限流器 (Rate Limiter)
- 文章發表於
前言
限流器(Rate Limiter)在現今的軟體架構中扮演著不可或缺的角色。它能作為一道防護牆,防止軟體服務遭到濫用或攻擊,確保系統穩定性,同時保障所有使用者的公平使用權益。
想像一家擁有破億流量的軟體服務商,像是 ChatGPT、x.com,如果沒有限流器的保護,系統很可能被單一使用者或惡意程式過度濫用,造成硬體資源被獨佔甚至崩潰,而影響到對其他使用者的體驗。使用限流器可以確保每位使用者都能公平地使用資源,除了維持整體系統的效能穩定,並避免任何單一使用者壟斷資源或遭受到惡意攻擊。
使用限流器的主要好處包括:
- 防止濫用:惡意使用者可能試圖透過大量請求(例如,DoS 攻擊)或濫用 API(例如,人工灌水、過度抓取資料)來破壞系統的正常運作。
- 資源管理:確保後端服務不會因為大量請求而過載,進而維持系統的穩定性與可用性,讓所有使用者都能獲得良好的使用體驗。
- 公平使用:讓所有使用者都能公平地取得資源,避免少數高流量使用者壟斷資源,影響其他使用者的服務品質。
- 成本控制:在雲端環境中,過多的請求可能導致高昂的營運成本;透過限流機制(Rate Limiting),可以有效控制並管理這些成本。
流程概述

使用限流器後,整體的使用者流程就如下:
- 用戶端發送請求
使用者(透過瀏覽器或行動應用)發送一個 HTTP 請求。 - 負載平衡器接收請求
負載平衡器負責執行健康檢查,並將請求分配至可用的 API Gateway 節點。 - API Gateway 轉發至限流器
API Gateway 在進行任何後端處理前,會將請求傳送至限流器。 - 限流器檢查使用數據
限流器查詢快取系統(例如 Redis 或 Memcached),以取得該用戶當前的請求次數。若快取中沒有資料,可能會查詢資料庫以獲取限流規則,並更新快取。 - 判斷:允許或拒絕
- 允許 – 限流器通知 API Gateway 繼續處理請求。
- 拒絕 – 限流器通知 API Gateway 立即回傳 HTTP 429 Too Many Requests 錯誤,並可選擇附加
Retry-After
標頭,告知用戶端多久後可以再次嘗試。
- 轉發至後端 API(若允許)
API Gateway 將請求轉發至後端 API 伺服器,執行業務邏輯、查詢資料庫或呼叫其他服務。 - 將回應返回給用戶端
API 回應會透過 API Gateway 和負載平衡器返回給用戶端。
本篇文章將會著重在如何實作限流器常見的演算法,並實作一個簡單的限流器。
實作
如同在先前的限流器探討中所提及,設計一個限流器的核心目的,無非是為了防止單一使用者濫用資源、應對惡意攻擊,並同時兼顧成本控制。然而,該如何將這些概念實際地落實到程式碼中呢?
在真正動手實作之前,不妨先思考一下:我們需要哪些元素,以及應該回傳什麼資訊?
舉例來說,有使用過 Claude 或是 ChatGPT 的讀者應該多少都有被限流的經驗,當你使用量超標時就會看到介面上顯示 "您已超過使用上限,請在幾點後再試",或是快超標時就會顯示 "您的請求次數剩餘 X 次,下次更新時間為 Y 日" 的訊息。
要有上述的效果,後端會需要用 HTTP 狀態碼(例如 429 Too Many Requests)配合 Header(如 X-RateLimit-Limit
、X-RateLimit-Remaining
、X-RateLimit-Reset
或 Retry-After
)來告知用戶端該使用者是否被限流與剩餘額度。用戶端只要偵測到狀態碼或讀取相關 Header,就能正確呈現相對應的訊息告知使用者。
而一個理想的限流回應應該包含以下欄位:
{success: boolean, // 是否允許此次請求繼續執行limit: number, // 當前時間視窗內允許的最大請求數remaining: number, // 剩餘可用的請求次數reset: number, // 當前時間視窗重置的時間點(Unix timestamp)pending: Promise // 背景處理任務(例如資料同步或快取清理)}
如此一來,用戶端可以根據這個統一的資料結構,判斷是否需要顯示提示訊息,或者在必要時自動排程重試。
基本架構
限流器的基本功能通常包含:設定特定期間內的請求上限、選擇限流演算法,以及自訂儲存空間。我們將透過 Strategy Pattern 讓使用套件的開發者自行指定所要使用的演算法。
我們將會介紹以下幾種常見的限流演算法:
- 固定視窗 (Fixed Window)
- 滑動視窗 (Sliding Window Log)
- 滑動視窗計數 (Sliding Window Count)
- 令牌桶 (Token Bucket)
實作框架
/*** 用於應用速率限制的主要類別。* 使用策略模式,允許在執行時傳入不同的限流演算法。*/class RateLimit {/*** @param {object} config - 限流器的設定物件。* @param {function} config.limiter - 特定的限流演算法函式 (由靜態方法之一產生)。* @param {RedisMap} [config.storage] - (可選) 用於儲存狀態的實例,預設為新的 RedisMap。* @param {number} [config.delay] - (可選) 用於模擬網路延遲的毫秒數。*/constructor(config) {this.limiter = config.limiter;this.storage = config.storage || new RedisMap();this.delay = config.delay || 10; // 預設模擬延遲}/*** 對給定的識別符應用已設定的速率限制。* @param {string} identifier - 用於區分被限制實體的唯一識別符 (例如,使用者 ID、IP 地址)。* @returns {Promise<object>} 一個 Promise,其解析值為一個物件,包含成功/失敗及限制詳情。* @example* const result = await rateLimiter.limit("user-123");* // result: { success: boolean, limit: number, remaining: number, reset: number }*/async limit(identifier) {return await this.limiter(this.storage, identifier, this.delay);}/*** 在固定的、不重疊的時間視窗內限制請求。* @param {number} tokens - 每個視窗內允許的最大請求數。* @param {string} window - 時間視窗的持續時間 (例如 "60s", "1m")。* @returns {function} 限流器函式,可傳入 RateLimit 的建構子。*/static fixedWindow(tokens, window) {const windowDuration = parseWindow(window);return async (storage, identifier, delay) => {// 演算法實作...};}/*** 透過加權計算當前和前一個固定視窗來近似一個滑動視窗。* @param {number} tokens - 每個視窗內允許的最大請求數。* @param {string} window - 時間視窗的持續時間 (例如 "60s", "1m")。* @returns {function} 限流器函式。*/static slidingWindow(tokens, window) {const windowDuration = parseWindow(window);return async (storage, identifier, delay) => {// 演算法實作...};}/*** 允許突發請求,最高可達桶的容量,並以固定速率補充令牌。* @param {number} refillRate - 每個間隔時間補充的令牌數量。* @param {string} interval - 補充令牌的間隔時間 (例如 "1s", "10s")。* @param {number} maxTokens - 桶能容納的最大令牌數。* @returns {function} 限流器函式。*/static tokenBucket(refillRate, interval, maxTokens) {return async (storage, identifier, delay) => {// 演算法實作...};}/*** 維護一個在滑動視窗內請求的時間戳日誌。* @param {number} tokens - 每個視窗內允許的最大請求數。* @param {string} window - 時間視窗的持續時間 (例如 "60s", "1m")。* @returns {function} 限流器函式。*/static slidingWindowLog(tokens, window) {return async (storage, identifier, delay) => {// 演算法實作...};}}
固定視窗 (Fixed Window)
固定視窗顧名思義就是將時間劃分為固定長度的「時間視窗」(例如每分鐘、每小時),並在每個視窗內追蹤、累計收到的請求數量。當一個時間視窗結束,進入下一個新的視窗時,前一個視窗的計數器便會自動重置為零。
整體流程說明如下:
- 我們會接受到兩個參數:
tokens
:在這個時間視窗內,允許的最大請求數量。window
:視窗長度(毫秒),例如 1m(代表一分鐘)。
- 計算當前請求所屬的 bucket
- 將
identifier
(例如使用者 ID、API 金鑰)與 bucket 結合成唯一 key,向儲存空間 (Redis/Memcached) 讀取目前已記錄的請求數。 - 判斷是否超過配額
- 如果目前已累計的請求數量 ≥ tokens,表示配額已滿,必須拒絕本次請求。
- 如果目前累計的請求數量 < tokens,代表還能允許更多請求,就將該 bucket 的計數加一,並回傳成功訊息。
- 自動重置
當進入下一個新的 bucket 後,舊的 bucket 編號自然被淘汰,其對應的計數器也不再使用;新 bucket 的計數器會從零開始累計。
上述大多數只是取值判斷是否要拒絕或接受請求,再來就是更新值,最具挑戰性的地方在於:如何在不儲存每個請求的時間戳(timestamps)前提下,仍能高效地追蹤「在特定時間視窗內的請求數量」?
const bucket = Math.floor(Date.now() / windowDuration);
這樣我們就不需要去儲存每個請求的時間戳,而是可以透過計算當前時間與視窗長度來得到當前請求所屬的 bucket。
同時這樣做的好處是:
- 全域一致性:同一時間區間內的所有請求都落在相同的 bucket。
- 自動過期:當 bucket 編號向前推進後,過去的 bucket 自然而然就不再被使用。
O(1)
操作:無需掃描整個時間戳陣列,只要直接計算 bucket 編號即可
class RateLimit {...static fixedWindow(tokens, window) {const windowDuration = parseWindow(window);return async (storage, identifier, delay) => {// 1. 計算目前請求所屬的 bucketconst currentBucket = Math.floor(now / windowDuration);const key = `${identifier}:${currentBucket}`// 2. 讀取此 bucket 已使用的 tokens(若不存在就視為 0)const previousTokens = storage.get(key) || 0;const reset = (currentBucket + 1) * windowDuration;// 3. 如果已經用滿,就拒絕if (previousTokens > tokens) {return { success: false, remaining: 0, limit: tokens, reset }}// 4. 尚未超過,累加並更新儲存空間const currentTokens = previousTokens + 1;// 5. 如果是此 bucket 第一次被寫入,設定過期時間if (currentTokens === 1) {storage.pexpire(key, Math.floor(windowDuration * 1.5))}storage.set(key, currentTokens)return { success: true, limit: tokens, remaining: tokens - currentTokens, reset }};}...}
優點:
- 實現簡單: 邏輯清晰,易於理解和實現。
- 記憶體效率高: 只需要為每個受限制的實體 (如用戶 ID 或 IP 地址) 在當前時間視窗內維護一個計數器。
- 計算開銷小: 計數和重置操作通常很快。
缺點:
- 視窗邊界效應 (Window Boundary Effect) / 臨界區問題:
這是此演算法最主要的缺陷。在時間視窗的邊界處,流量的實際速率可能超過設定的平均速率。例如,若限制為每分鐘 100 次請求,用戶可以在 0:00:59 時刻發送 100 次請求 (被允許),緊接著在 0:01:00 時刻 (新視窗開始) 再發送 100 次請求 (也被允許)。這導致在短短的幾秒鐘內,系統實際處理了 200 次請求,可能會對後端服務造成衝擊。 - 流量不平滑: 由於計數器在每個視窗開始時重置,可能會導致流量在視窗初期集中爆發。
滑動視窗 (Sliding Window Log)
滑動視窗旨在解決固定視窗的缺點,它允許在視窗邊界處的流量平滑過渡,避免流量在視窗開始時突然爆發。其會記錄每個請求到達的時間戳。當一個新請求到達時,系統會檢查在過去一個「滑動時間視窗」(例如過去 60 秒) 內的所有請求時間戳。如果這些時間戳的數量超過了預設的閾值,則拒絕新請求。
class RateLimit {...static slidingWindow(tokens, window) {const windowDuration = parseWindow(window);return async (storage, identifier, delay) => {const now = Date.now();const key = `log:${identifier}`;// 1. 取出儲存的時間戳陣列,若不存在就用空陣列let timestamps = storage.get(key) || [];// 2. 過濾掉「視窗外」(<= now - windowDuration) 的時間戳const windowStartTime = now - windowDuration;timestamps = timestamps.filter(ts => ts > windowStartTime);// 3. 如果在這個視窗已有 tokens 筆請求,就拒絕// earliest = timestamps[0]if (timestamps.length >= tokens) {// 計算下一次可用的時間:earliestTimestamp + windowDurationconst resetTime = timestamps[0] + windowDuration;// 把「過濾後的陣列」存回去,並設定 TTLstorage.set(key, timestamps);// 多加 1000 毫秒 (1 秒) 是為了保證 key 不會因為極端情況(如網路延遲、程式執行時間)過早過期,確保所有在視窗內的請求都能被正確統計。storage.pexpire(key, windowDuration + 1000);return {success: false,limit: tokens,remaining: 0,reset: resetTime};}// 4. 還沒到上限,就把 now 推進去,並存回去timestamps.push(now);storage.set(key, timestamps);storage.pexpire(key, windowDuration + 1000);// 5. 計算剩餘額度與重置時間const remaining = tokens - timestamps.length;// 重新算 earliestTimestamp + windowDuration 作為 resetconst reset = timestamps[0] + windowDuration;// 6. FOR DEV PURPOSE, 如果要模擬 background work,就用 delayif (delay) {await new Promise(resolve => setTimeout(resolve, delay));}return {success: true,limit: tokens,remaining,reset};};}...}
優點:
- 精確的流量控制: 能夠非常精確地限制在任何給定時間段內的請求數量,有效避免了固定視窗的邊界效應。
- 流量平滑: 由於視窗是連續滑動的,流量控制更為平滑。
缺點:
- 記憶體消耗大: 需要儲存每個請求的時間戳,當請求量很大時,記憶體開銷會非常顯著。
- 計算成本高: 每次請求都需要對時間戳列表進行清理和計數操作,尤其是在高併發情況下,這些操作可能成為效能瓶頸。
滑動視窗計數 (Sliding Window Count)
滑動視窗計數 旨在結合固定視窗計數器的低記憶體消耗和滑動視窗日誌的平滑流量控制特性。它通過維護當前時間視窗和前一個時間視窗的請求計數,並根據當前時間在視窗中的位置,加權計算一個近似的滑動視窗內的請求總數。
整體流程說明如下:
- 時間分片與計數器: 與固定視窗類似,時間被劃分為固定長度的視窗,並為每個視窗維護計數器。
- 近似滑動計數: 當一個新請求到達時,它不僅考慮當前視窗 (currentWindowCount) 的請求數,還會考慮前一個視窗 (previousWindowCount) 的請求數。
- 近似總數 = (previousWindowCount * 重疊比例) + currentWindowCount
- 其中,「重疊比例」指的是前一個視窗有多少部分依然落在當前的「滑動視窗」內。例如,如果視窗大小是 60 秒,當前時間是 1:00:15 (即當前視窗已過 15 秒),那麼前一個視窗 (0:59:00-1:00:00) 有 45 秒 (即 75%) 的內容依然在過去 60 秒的滑動視窗內。
- 限制檢查: 如果這個近似總數小於閾值,則允許請求,並增加當前視窗的計數器;否則拒絕。
範例說明(假設限制為每 60 秒最多 100 個請求,視窗長度為 60 秒):
- 視窗 1(0:00:00 - 0:01:00):共收到 80 個請求,
count_window1 = 80
- 視窗 2(0:01:00 - 0:02:00):
- 在 0:01:15(當前視窗已過 15 秒,佔 25%)時:
- 前一視窗的貢獻比例 = (60 - 15) / 60 = 45 / 60 = 75%
- 假設此時視窗 2 已有 10 個請求(
count_window2 = 10
) - 近似滑動總數 = (count_window1 × 75%) + count_window2 = (80 × 0.75) + 10 = 60 + 10 = 70
- 70 < 100,請求允許
- 在 0:01:45(當前視窗已過 45 秒,佔 75%)時:
- 前一視窗的貢獻比例 = (60 - 45) / 60 = 15 / 60 = 25%
- 假設此時視窗 2 已有 50 個請求(
count_window2 = 50
) - 近似滑動總數 = (count_window1 × 25%) + count_window2 = (80 × 0.25) + 50 = 20 + 50 = 70
- 70 < 100,請求允許
- 在 0:01:15(當前視窗已過 15 秒,佔 25%)時:
class RateLimit {...static slidingWindow(tokens, window) {const windowDuration = parseWindow(window);return async (storage, identifier, delay) => {const now = Date.now();// 1. 計算「當前」與「上一個」bucketconst currentBucket = Math.floor(now / windowDuration);const previousBucket = currentBucket - 1;const currentKey = `${identifier}:${currentBucket}`;const previousKey = `${identifier}:${previousBucket}`;// 2. 取得各 bucket 的 token 數(預設為 0)const currentTokens = storage.get(currentKey) || 0;const previousTokens = storage.get(previousKey) || 0;// 3. 計算上一個 bucket 的加權 token 數const elapsed = now % windowDuration;const overlapRatio = 1 - (elapsed / windowDuration);const weightedPrevious = Math.floor(previousTokens * overlapRatio);// 4. 計算滑動視窗內的總 token 數const totalTokens = currentTokens + weightedPrevious;const reset = (currentBucket + 1) * windowDuration;// 5. 檢查是否超過配額if (totalTokens >= tokens) {return {success: false,limit: tokens,remaining: 0,reset};}// 6. 尚未超過,累加當前 bucket 的 token 並更新 storageconst newCurrentTokens = currentTokens + 1;storage.set(currentKey, newCurrentTokens);// 7. 若是第一次寫入,設定 TTLif (newCurrentTokens === 1) {// TTL 設為 windowDuration * 2 + 1000,確保前一個 bucket 尚未過期storage.pexpire(currentKey, Math.floor(windowDuration * 2 + 1000));}// 8. 計算新的剩餘次數const newTotalTokens = newCurrentTokens + weightedPrevious;const remaining = Math.max(0, tokens - newTotalTokens);// 9. FOR DEV PURPOSE, 模擬背景延遲if (delay) {await new Promise(resolve => setTimeout(resolve, delay));}// 10. 回傳成功訊息return {success: true,limit: tokens,remaining,reset};};}}
優點:
- 流量平滑:與固定視窗計數法相比,滑動視窗計數能有效減少視窗邊界帶來的突發流量,使限流效果更平滑。
- 記憶體效率高:僅需儲存當前與前一個視窗的計數,所需記憶體遠低於滑動視窗日誌法。
- 實作難度適中:比滑動視窗日誌簡單,僅稍微複雜於固定視窗計數。
缺點:
- 僅為近似值:無法像滑動視窗日誌法那樣做到完全精確,遇到極端流量分布時可能有誤差。
- 需精確計算重疊比例:必須正確計算視窗重疊比例,並妥善維護兩個視窗的計數。
令牌桶 (Token Bucket)
令牌桶的概念就是將每個請求想像成從一個「令牌桶」中獲取令牌。系統會以恆定的速率向這個桶中添加令牌。每個進入系統的請求都必須先從桶中獲取一個令牌才能被處理。如果桶中有足夠的令牌,請求會被立即處理;如果桶中令牌不足,請求則會被拒絕、排隊等待,或採取其他降級策略。
class RateLimit {...static tokenBucket(refillRate, interval, maxTokens) {const intervalDuration = parseWindow(interval);return async (storage, identifier, delay) => {const now = Date.now();const key = `token:${identifier}`;// 1. 取得現有 bucket 狀態,預設為滿桶let bucketData = storage.get(key) || {tokens: maxTokens,lastRefillAt: now,};// 2. 計算距離上次 refill 經過的時間const timePassed = now - bucketData.lastRefillAt;// 3. 計算應該 refill 幾次const refills = Math.floor(timePassed / intervalDuration);// 4. 根據 refill 次數補充 tokenlet currentTokens = bucketData.tokens;let newLastRefillAt = bucketData.lastRefillAt;if (refills > 0) {currentTokens = Math.min(maxTokens,bucketData.tokens + refills * refillRate);newLastRefillAt = bucketData.lastRefillAt + refills * intervalDuration;}// 5. 若 token 不足,拒絕請求,計算下次可用時間if (currentTokens < 1) {const timeSinceLastRefill = now - newLastRefillAt;const timeToNextRefill = intervalDuration - (timeSinceLastRefill % intervalDuration);const reset = now + timeToNextRefill;storage.set(key, {tokens: currentTokens,lastRefillAt: newLastRefillAt,});storage.pexpire(key, Math.max(intervalDuration * 5, parseWindow("1h")));return {success: false,limit: maxTokens,remaining: 0,reset,};}// 6. token 足夠,消耗 1 個const tokensLeft = currentTokens - 1;storage.set(key, {tokens: tokensLeft,lastRefillAt: newLastRefillAt,});storage.pexpire(key, Math.max(intervalDuration * 5, parseWindow("1h")));// 7. 模擬延遲if (delay) {await new Promise(resolve => setTimeout(resolve, delay));}// 8. 計算下次 refill 時間let reset;if (tokensLeft < maxTokens) {const timeSinceLastRefill = now - newLastRefillAt;const timeToNextRefill = intervalDuration - (timeSinceLastRefill % intervalDuration);reset = now + timeToNextRefill;} else {reset = now;}return {success: true,limit: maxTokens,remaining: tokensLeft,reset,};};}}