之前開發的一款基于OpenTelemetry的Tracing組件需要使用基于速率限制(Rate Limiting)的跟蹤采樣策略,本想使用現有的解決方案,比如System.Threading.RateLimiting命名空間下的RateLimiter。大體看了RateLimiter的三種實現(固定窗口、滑動窗口和令牌桶),覺得過于相對復雜了點,代碼還涉及到鎖,而且提供的功能我也不太需要,于是嘗試實現一種簡單且無鎖解決方案。
我為RateLimiter定義了如下這個簡單的IRateLimiter接口,唯一的無參方法TryAcquire利用返回的布爾值確定當前是否超出設定的速率限制。我只提供的兩種基于時間窗口的實現,如下所示的基于“滑動時間窗口”的實現類型SliddingWindowRateLimiter,我們在構造的時候指定時間窗口和閾值。SliddingWindowRateLimiter采用一種“討巧”的實現,它直接利用了BoundedChannel<DateTimeOffset>對象,我們將指定的閾值作為它的最大容量。
public interface IRateLimiter{ bool TryAcquire();}public sealed class SliddingWindowRateLimiter: IRateLimiter{ private readonly TimeSpan _window; private readonly ChannelReader<DateTimeOffset> _reader; private readonly ChannelWriter<DateTimeOffset> _writer; public SliddingWindowRateLimiter(TimeSpan window, int permit) { _window = window; var options = new BoundedChannelOptions (permit) { FullMode = BoundedChannelFullMode.Wait, SingleReader = false, SingleWriter = true }; var channel = Channel .CreateBounded<DateTimeOffset>(options); _reader = channel.Reader; _writer = channel.Writer; Task.Factory.StartNew( Trim,TaskCreationOptions.LongRunning); } public bool TryAcquire() => _writer.TryWrite(DateTimeOffset.UtcNow); private void Trim() { if (!_reader.TryPeek(out var timestamp)) { Task.Delay(_window).Wait(); Trim(); } else { var delay = _window - (DateTimeOffset.UtcNow - timestamp); if (delay > TimeSpan.Zero) { Task.Delay(delay).Wait(); Trim(); } else { var valueTask = _reader.ReadAsync(); if (!valueTask.IsCompleted) _ = valueTask.Result; Trim(); } } }}
在實現的TryAcquire方法中,我們試著將當前時間戳寫入這個Channel,并將寫入的結果(成功或者失敗)作為返回值。為了讓Channel中只包含指定時間窗口的時間戳,我們利用一個LongRuning的Task執行Trim方法對過期的時間戳進行“裁剪”。Trim會調用ChannelReader的TRyPeek方法,如果返回False,意味著Channel為空,此時會等待一段窗口時間再進行“裁剪”。如果提取出來時間戳在Now-Window與當前時間之間,意味著Channel里面的時間戳均在設定的窗口內,此時同樣需要等待,等待時間為Window - (Now - Timestamp);只有在提取的時間超出窗口范圍,我們才需要將其從Channel中移除。
var limiter = new SliddingWindowRateLimiter( TimeSpan.FromSeconds(2),2);var index = 0;await Task.WhenAll( Enumerable.Range(1, 100) .Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine( $"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));
我們在上面的演示程序中使用這個SliddingWindowRateLimiter,設定的限速規則為 2/2s。我們創建了100個Task并發地調用這個SliddingWindowRateLimiter,并將它返回True時的時間戳顯示出來,具體輸出如下所示。
圖片
如下這個FixedWindowRateLimiter類型是針對“固定窗口”的實現,字段_windowTicks和_permit同樣表示時間窗口的時長(這里我們使用Int64類型的Ticks屬性)和閾值。_nextWindowStartTimeTicks表示下一次固定窗口的起始時間,這個需要動態調整,為了確保只有一個線程能夠修改它,我們定義了_windowReseting這個“信號量”。_count是一個計數器,我們使用它確定是否“超速”。
public sealed class FixedWindowRateLimiter : IRateLimiter{ private readonly long _windowTicks; private readonly int _permit; private long _nextWindowStartTimeTicks; private volatile int _count = 0; public FixedWindowRateLimiter(TimeSpan window, int permit) { _windowTicks = window.Ticks; _permit = permit; _nextWindowStartTimeTicks = DateTimeOffset.UtcNow.Add(window).Ticks; } public bool TryAcquire() { // 超出時間窗口,重置計數器,并調整下一個時間窗口的開始時間 var now = DateTimeOffset.UtcNow.Ticks; var nextWindowStartTimeTicks = nextWindowStartTimeTicks; if (now >= nextWindowStartTimeTicks && Interlocked.CompareExchange( ref _nextWindowStartTimeTicks , now + _windowTicks, nextWindowStartTimeTicks) == nextWindowStartTimeTicks) { Interlocked.Exchange(ref _count, 1); return true; } return _count < _permit && Interlocked.Increment(ref _count) <= _permit; }}
在實現的TryAcquire方法中,我們先確定當前時間是否超過了設定的“下一個窗口開始時間”,如果是則調用Interlocked.CompareExchange方法修改__nextWindowStartTimeTicks字段。成功修改__nextWindowStartTimeTicks的線程會調整窗口開始時間,并重置計數器_count為1,并返回True。如果計數器大于等于設定閾值,方法返回False。否則我們讓計數器+1,如果該值<=閾值,返回True,否則返回False。
IRateLimiter limiter = new FixedWindowRateLimiter( window: TimeSpan.FromSeconds(2), permit: 2);var index = 0;await Task.WhenAll( Enumerable.Range(1, 100) .Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine( $"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));
將FixedWindowRateLimiter應用到上面的演示程序,依然能得到我們希望的輸出結果。
本文鏈接:http://www.www897cc.com/showinfo-26-17896-0.html兩種基于時間窗口的限流器的簡單實現
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com