Architecture · #rate-limiting#algorithms#system-design

限流算法详解:令牌桶、滑动窗口与实现

2024.02.21 8 min 3.3k
// 目录 · contents

前言

限流(Rate Limiting)是保护系统稳定性的核心机制。当系统面临突发流量、恶意攻击或上下游异常时,限流可以确保系统在可承受的负载下运行,避免因过载导致的级联故障。本文将深入分析四种主流限流算法的原理,并提供 Java、Go 和 Redis 的实际实现。

为什么需要限流

graph LR
    subgraph 无限流保护
        Users1[大量用户] -->|10万QPS| Server1[服务器<br/>容量:1万QPS]
        Server1 -->|过载崩溃| X[系统不可用]
    end

    subgraph 有限流保护
        Users2[大量用户] -->|10万QPS| RateLimiter[限流器]
        RateLimiter -->|1万QPS| Server2[服务器<br/>容量:1万QPS]
        RateLimiter -->|9万QPS| Reject[拒绝/排队]
        Server2 --> OK[系统正常运行]
    end

限流的核心目的:

1. 保护系统:防止过载导致宕机
2. 公平分配:确保所有用户公平使用资源
3. 防止滥用:阻止爬虫或恶意调用
4. 成本控制:避免资源消耗超出预算

固定窗口算法(Fixed Window)

原理

将时间划分为固定大小的窗口(如每分钟一个窗口),在每个窗口内使用计数器记录请求数。当计数超过阈值时拒绝后续请求。

graph LR
    subgraph "窗口1: 00:00-01:00"
        W1[计数: 95/100 ✓]
    end
    subgraph "窗口2: 01:00-02:00"
        W2[计数: 100/100 ✗]
    end
    subgraph "窗口3: 02:00-03:00"
        W3[计数: 42/100 ✓]
    end

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class FixedWindowRateLimiter {
private final int maxRequests;
private final long windowSizeMs;
private long windowStart;
private int counter;

public FixedWindowRateLimiter(int maxRequests, long windowSizeMs) {
this.maxRequests = maxRequests;
this.windowSizeMs = windowSizeMs;
this.windowStart = System.currentTimeMillis();
this.counter = 0;
}

public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();

// 进入新窗口,重置计数器
if (now - windowStart >= windowSizeMs) {
windowStart = now;
counter = 0;
}

if (counter < maxRequests) {
counter++;
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type FixedWindowLimiter struct {
mu sync.Mutex
maxRequests int
windowSize time.Duration
windowStart time.Time
counter int
}

func NewFixedWindowLimiter(maxRequests int, windowSize time.Duration) *FixedWindowLimiter {
return &FixedWindowLimiter{
maxRequests: maxRequests,
windowSize: windowSize,
windowStart: time.Now(),
}
}

func (l *FixedWindowLimiter) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()

now := time.Now()
if now.Sub(l.windowStart) >= l.windowSize {
l.windowStart = now
l.counter = 0
}

if l.counter < l.maxRequests {
l.counter++
return true
}
return false
}

临界问题

固定窗口的最大问题是窗口边界的突发流量:如果在窗口末尾和下个窗口开头分别涌入最大量请求,实际瞬时流量可能达到限制值的两倍。

graph LR
    subgraph "问题场景: 限制100/min"
        A["窗口1末尾<br/>00:59 → 100请求"] --> B["窗口2开头<br/>01:00 → 100请求"]
    end
    C["实际: 1秒内200请求<br/>是限制的2倍!"]

滑动窗口算法(Sliding Window)

原理

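滑动窗口将一个完整窗口细分为多个小格子(bucket),窗口随时间逐格向前滑动,始终统计最近一个完整窗口内所有格子的请求总数,从而平滑固定窗口的边界效应;格子划分得越细,统计越精确。另一种常见变体(滑动窗口计数器)不保存全部格子,而是按时间重叠比例对相邻窗口或格子的计数做加权计算来近似,下图中对 30-40s 格子乘以 0.5 就是这种加权方式。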

graph TB
    subgraph "滑动窗口 (窗口=1min, 6个格子)"
        G1["0-10s<br/>15"] --> G2["10-20s<br/>20"]
        G2 --> G3["20-30s<br/>10"]
        G3 --> G4["30-40s<br/>25"]
        G4 --> G5["40-50s<br/>18"]
        G5 --> G6["50-60s<br/>12"]
    end
    Note["当前时间: 35s<br/>窗口范围: 前35s到当前<br/>总计 = 15+20+10+25×0.5 = 57.5"]

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * Sliding-window rate limiter that splits the window into {@code bucketCount}
 * equal sub-intervals kept in a ring buffer. A request is admitted while the
 * sum of all buckets is below {@code maxRequests}. As time passes the oldest
 * buckets are cleared, so the window slides forward one bucket at a time.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()} (monotonic), so
 * wall-clock adjustments cannot freeze the window. All state is guarded by the
 * instance monitor.
 */
public class SlidingWindowRateLimiter {
    private final int maxRequests;
    private final int bucketCount;
    private final long bucketSizeNanos;  // length of one bucket in nanoseconds (always >= 1)
    private final int[] buckets;         // per-bucket request counts, used as a ring buffer
    private long lastBucketTime;         // System.nanoTime() at which the current bucket began
    private int lastBucketIndex;         // ring index of the bucket receiving new requests

    /**
     * @param maxRequests  maximum requests admitted within any window
     * @param windowSizeMs window length in milliseconds; must be positive
     * @param bucketCount  number of buckets per window; must be positive
     * @throws IllegalArgumentException if {@code windowSizeMs} or {@code bucketCount} is not positive
     */
    public SlidingWindowRateLimiter(int maxRequests, long windowSizeMs, int bucketCount) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be positive: " + windowSizeMs);
        }
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be positive: " + bucketCount);
        }
        this.maxRequests = maxRequests;
        this.bucketCount = bucketCount;
        // Work in nanoseconds so a window shorter than bucketCount milliseconds
        // still gets non-zero buckets; the size is a divisor, so never let it be 0.
        this.bucketSizeNanos = Math.max(1L,
                java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(windowSizeMs) / bucketCount);
        this.buckets = new int[bucketCount];
        this.lastBucketTime = System.nanoTime();
        this.lastBucketIndex = 0;
    }

    /**
     * Attempts to admit one request.
     *
     * @return {@code true} if the total over the current window is below the limit
     */
    public synchronized boolean tryAcquire() {
        advanceWindow(System.nanoTime());

        int totalCount = 0;
        for (int count : buckets) {
            totalCount += count;
        }

        if (totalCount < maxRequests) {
            buckets[lastBucketIndex]++;
            return true;
        }
        return false;
    }

    /** Slides the ring forward by the number of whole buckets elapsed since lastBucketTime. */
    private void advanceWindow(long now) {
        // Keep the quotient as a long: after a long idle period the number of
        // elapsed buckets can exceed Integer.MAX_VALUE, and a narrowing cast
        // would wrap negative and leave stale counts in the window.
        long bucketsToAdvance = (now - lastBucketTime) / bucketSizeNanos;
        if (bucketsToAdvance <= 0) {
            return;
        }

        // Advancing a full ring or more clears every bucket; stop there.
        int steps = (int) Math.min(bucketsToAdvance, bucketCount);
        for (int i = 0; i < steps; i++) {
            lastBucketIndex = (lastBucketIndex + 1) % bucketCount;
            buckets[lastBucketIndex] = 0;
        }
        // Cannot overflow: the product is at most (now - lastBucketTime).
        lastBucketTime += bucketsToAdvance * bucketSizeNanos;
    }
}

基于 Redis Sorted Set 的滑动窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Distributed sliding-window rate limiter backed by a Redis sorted set.
 *
 * <p>Each admitted request is stored as a member scored by its timestamp in
 * milliseconds, taken from this JVM's wall clock. A Lua script removes
 * entries at or before the window start, counts the rest, and adds the new
 * request only if the count is below the limit. This runs as a single script
 * execution on the Redis server.
 *
 * <p>NOTE(review): timestamps come from each client's clock, so clock skew
 * between application instances distorts the window; consider Redis TIME if
 * that matters.
 */
public class RedisSlidingWindowLimiter {
    /*
     * KEYS[1] = sorted-set key
     * ARGV[1] = window start (ms); members with score <= this are removed
     * ARGV[2] = maximum requests per window
     * ARGV[3] = score for the new member (current time, ms)
     * ARGV[4] = unique member id
     * ARGV[5] = key TTL in ms
     * Returns 1 if the request was admitted, 0 otherwise.
     *
     * Built once and shared: DefaultRedisScript caches the script's SHA1, so
     * reusing one instance avoids recomputing it on every call.
     */
    private static final DefaultRedisScript<Long> SLIDING_WINDOW_SCRIPT = new DefaultRedisScript<>(
            """
            -- 移除窗口外的旧请求
            redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
            -- 统计窗口内的请求数
            local count = redis.call('ZCARD', KEYS[1])
            if count < tonumber(ARGV[2]) then
            -- 未超限,添加新请求
            redis.call('ZADD', KEYS[1], ARGV[3], ARGV[4])
            redis.call('PEXPIRE', KEYS[1], ARGV[5])
            return 1
            end
            return 0
            """,
            Long.class);

    private final RedisTemplate<String, String> redis;
    private final int maxRequests;
    private final long windowSizeMs;

    /**
     * @param redis        template used to execute the Lua script
     * @param maxRequests  maximum requests admitted per key within any window
     * @param windowSizeMs window length in milliseconds (also used as the key TTL)
     */
    public RedisSlidingWindowLimiter(RedisTemplate<String, String> redis, int maxRequests, long windowSizeMs) {
        this.redis = redis;
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
    }

    /**
     * Attempts to admit one request for {@code key}.
     *
     * @param key logical limiter key; stored under {@code "rate_limit:" + key}
     * @return {@code true} if the request was admitted
     */
    public boolean tryAcquire(String key) {
        long now = System.currentTimeMillis();
        // The UUID keeps two requests in the same millisecond from collapsing
        // into a single sorted-set member.
        String member = now + ":" + UUID.randomUUID();

        String redisKey = "rate_limit:" + key;

        Long result = redis.execute(
                SLIDING_WINDOW_SCRIPT,
                List.of(redisKey),
                String.valueOf(now - windowSizeMs), // window start
                String.valueOf(maxRequests),        // request limit
                String.valueOf(now),                // score of the new member
                member,                             // unique member
                String.valueOf(windowSizeMs)        // TTL
        );

        // A null result is treated as a rejection.
        return result != null && result == 1;
    }
}

令牌桶算法(Token Bucket)

原理

系统以固定速率向桶中添加令牌,每个请求需要消耗一个令牌。桶有固定容量,满了之后新令牌被丢弃。这允许一定程度的突发流量(桶中积累的令牌)。

graph TB
    subgraph 令牌桶
        Generator[令牌生成器<br/>10个/秒] -->|定时添加| Bucket[令牌桶<br/>容量:100]
        Request[请求] -->|取令牌| Bucket
        Bucket -->|有令牌| Allow[放行]
        Bucket -->|无令牌| Deny[拒绝]
    end
sequenceDiagram
    participant G as 令牌生成器
    participant B as 令牌桶(容量=5)
    participant R as 请求

    Note over B: 初始令牌数: 5
    G->>B: +1 令牌 (桶满,丢弃)
    R->>B: 请求1, 取1令牌 ✓ (剩余4)
    R->>B: 请求2, 取1令牌 ✓ (剩余3)
    R->>B: 请求3, 取1令牌 ✓ (剩余2)
    G->>B: +1 令牌 (剩余3)
    R->>B: 请求4, 取1令牌 ✓ (剩余2)
    R->>B: 突发: 取3令牌 ✗ (不足,拒绝)

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Token-bucket rate limiter. The bucket holds up to {@code maxTokens} tokens
 * and is refilled continuously at {@code refillRate} tokens per second. Each
 * request consumes tokens and is rejected when too few are available. The
 * bucket starts full, so an initial burst of up to {@code maxTokens} passes.
 *
 * <p>Refill is computed lazily on each call from {@link System#nanoTime()}.
 * All public methods are {@code synchronized}.
 */
public class TokenBucketRateLimiter {
    private final double maxTokens;   // bucket capacity
    private final double refillRate;  // refill rate in tokens per second
    private double currentTokens;     // tokens currently available (may be fractional)
    private long lastRefillTime;      // System.nanoTime() of the last refill

    /**
     * @param maxTokens  bucket capacity; also the initial token count
     * @param refillRate tokens added per second; must be finite and non-negative
     * @throws IllegalArgumentException if {@code refillRate} is negative, NaN or infinite
     */
    public TokenBucketRateLimiter(double maxTokens, double refillRate) {
        // A negative rate drains the bucket on its own; NaN (or 0 * Infinity on
        // back-to-back calls) would turn the token count into NaN and reject forever.
        if (!(refillRate >= 0) || Double.isInfinite(refillRate)) {
            throw new IllegalArgumentException("refillRate must be finite and non-negative: " + refillRate);
        }
        this.maxTokens = maxTokens;
        this.refillRate = refillRate;
        this.currentTokens = maxTokens;
        this.lastRefillTime = System.nanoTime();
    }

    /** Attempts to take a single token; equivalent to {@code tryAcquire(1)}. */
    public synchronized boolean tryAcquire() {
        return tryAcquire(1);
    }

    /**
     * Attempts to take {@code tokens} tokens at once.
     *
     * <p>A request for more than {@code maxTokens} can never succeed.
     *
     * @param tokens number of tokens to take; must be non-negative
     * @return {@code true} if enough tokens were available and were consumed
     * @throws IllegalArgumentException if {@code tokens} is negative
     */
    public synchronized boolean tryAcquire(int tokens) {
        if (tokens < 0) {
            // Subtracting a negative amount would add tokens and push the
            // bucket past maxTokens.
            throw new IllegalArgumentException("tokens must be non-negative: " + tokens);
        }
        refill();
        if (currentTokens < tokens) {
            return false;
        }
        currentTokens -= tokens;
        return true;
    }

    /** Adds the tokens accrued since the last refill, capped at maxTokens. */
    private void refill() {
        long now = System.nanoTime();
        double elapsedSeconds = (now - lastRefillTime) / 1_000_000_000.0;
        currentTokens = Math.min(maxTokens, currentTokens + elapsedSeconds * refillRate);
        lastRefillTime = now;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// TokenBucket is a token-bucket rate limiter guarded by mu. It holds up to
// maxTokens tokens, is refilled continuously at refillRate tokens per second,
// and starts full.
type TokenBucket struct {
	mu           sync.Mutex
	maxTokens    float64
	refillRate   float64 // tokens per second
	tokens       float64 // tokens currently available (may be fractional)
	lastRefillAt time.Time
}

// NewTokenBucket returns a full bucket with capacity maxTokens that refills
// at refillRate tokens per second.
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
	return &TokenBucket{
		maxTokens:    maxTokens,
		refillRate:   refillRate,
		tokens:       maxTokens,
		lastRefillAt: time.Now(),
	}
}

// Allow reports whether one token is available, consuming it if so.
func (tb *TokenBucket) Allow() bool {
	return tb.AllowN(1)
}

// AllowN reports whether n tokens are available, consuming them if so.
// A negative n is rejected without touching the bucket, because subtracting
// it would add tokens and push the bucket past maxTokens. A request for more
// than maxTokens can never succeed.
func (tb *TokenBucket) AllowN(n int) bool {
	if n < 0 {
		return false
	}

	tb.mu.Lock()
	defer tb.mu.Unlock()

	tb.refill()

	requested := float64(n)
	if tb.tokens < requested {
		return false
	}
	tb.tokens -= requested
	return true
}

// refill adds the tokens accrued since the last refill, capped at maxTokens.
// It does not lock; callers must hold tb.mu.
func (tb *TokenBucket) refill() {
	now := time.Now()
	elapsed := now.Sub(tb.lastRefillAt).Seconds()
	tb.tokens = math.Min(tb.maxTokens, tb.tokens+elapsed*tb.refillRate)
	tb.lastRefillAt = now
}

Redis 分布式令牌桶

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- Redis Lua script: distributed token bucket.
-- KEYS[1] = token bucket key (hash with fields 'tokens' and 'lastRefillTime')
-- ARGV[1] = maxTokens (bucket capacity)
-- ARGV[2] = refillRate (tokens/sec); must be > 0 (used as a divisor for the TTL)
-- ARGV[3] = now (current timestamp in milliseconds, supplied by the caller)
-- ARGV[4] = requested (number of tokens to take)
-- Returns 1 if the tokens were granted, 0 otherwise.

local key = KEYS[1]
local maxTokens = tonumber(ARGV[1])
local refillRate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- Load current state; a missing key is treated as a full bucket.
local bucket = redis.call('HMGET', key, 'tokens', 'lastRefillTime')
local tokens = tonumber(bucket[1]) or maxTokens
local lastRefillTime = tonumber(bucket[2]) or now

-- Timestamps come from different clients whose clocks may disagree.
-- Never refill for negative elapsed time (that would remove tokens), and
-- never move lastRefillTime backwards (the next caller would then refill
-- the same interval a second time).
local elapsed = math.max(0, now - lastRefillTime) / 1000.0
local tokensToAdd = elapsed * refillRate
tokens = math.min(maxTokens, tokens + tokensToAdd)
local refillTime = math.max(now, lastRefillTime)

-- Try to take the requested tokens.
local allowed = 0
if tokens >= requested then
  tokens = tokens - requested
  allowed = 1
end

-- Persist state. HSET with several field/value pairs (Redis >= 4.0)
-- replaces the deprecated HMSET.
redis.call('HSET', key, 'tokens', tokens, 'lastRefillTime', refillTime)
-- Expire after the time a full refill takes (rounded up to whole seconds)
-- plus 1s of slack.
redis.call('PEXPIRE', key, math.ceil(maxTokens / refillRate) * 1000 + 1000)

return allowed

漏桶算法(Leaky Bucket)

原理

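漏桶以固定速率处理请求(像水从桶底漏出),请求以任意速率进入桶中。桶满时新请求被丢弃。与令牌桶不同,漏桶不允许突发流量,它的输出速率是恒定的。需要注意:下面的实现是漏桶的“计量器”形式——它只根据漏出速率判断是否放行,被放行的请求会立即执行而不会排队,因此桶空时仍可能连续放行最多 capacity 个请求;若要真正做到恒定速率输出,需要把请求放入队列,再由消费者按漏出速率依次取出处理。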

graph TB
    subgraph 漏桶
        Input[请求入口<br/>速率不定] -->|入桶| Bucket[漏桶<br/>容量:100]
        Bucket -->|恒定速率<br/>10个/秒| Output[处理请求]
        Input -.->|桶满| Overflow[溢出/丢弃]
    end

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Leaky-bucket rate limiter in "meter" form. Each admitted request adds one
 * unit of water, and water drains continuously at {@code leakRate} units per
 * second. A request is admitted while the water level is below
 * {@code capacity}.
 *
 * <p>This class only decides admission: an admitted request is not queued or
 * delayed, so after an idle period up to {@code capacity} requests can pass
 * back-to-back. Over time, admissions are bounded by {@code capacity} plus
 * {@code leakRate} per second. Draining is computed lazily from
 * {@link System#nanoTime()}. All state is guarded by the instance monitor.
 */
public class LeakyBucketRateLimiter {
    private final int capacity;     // maximum water level
    private final double leakRate;  // drain rate in units per second
    private double water;           // current water level (may be fractional)
    private long lastLeakTime;      // System.nanoTime() of the last drain

    /**
     * @param capacity maximum water level; requests are admitted while the level is below it
     * @param leakRate drain rate in requests per second; must be finite and non-negative
     * @throws IllegalArgumentException if {@code leakRate} is negative, NaN or infinite
     */
    public LeakyBucketRateLimiter(int capacity, double leakRate) {
        // A negative rate would make the bucket fill on its own; NaN (or
        // 0 * Infinity on back-to-back calls) would turn the level into NaN
        // and reject every request.
        if (!(leakRate >= 0) || Double.isInfinite(leakRate)) {
            throw new IllegalArgumentException("leakRate must be finite and non-negative: " + leakRate);
        }
        this.capacity = capacity;
        this.leakRate = leakRate;
        this.water = 0;
        this.lastLeakTime = System.nanoTime();
    }

    /**
     * Drains the bucket, then admits the request if the level is below capacity.
     *
     * @return {@code true} if the request was admitted (one unit of water added),
     *         {@code false} if the bucket is full
     */
    public synchronized boolean tryAcquire() {
        leak();

        if (water < capacity) {
            water++;
            return true;
        }
        return false; // bucket full: reject
    }

    /** Removes the water drained since the last call, never going below zero. */
    private void leak() {
        long now = System.nanoTime();
        double elapsedSeconds = (now - lastLeakTime) / 1_000_000_000.0;
        water = Math.max(0, water - elapsedSeconds * leakRate);
        lastLeakTime = now;
    }
}

算法对比

graph TB
    subgraph "固定窗口"
        FW[简单高效<br/>有边界突发问题]
    end
    subgraph "滑动窗口"
        SW[平滑计数<br/>解决边界问题<br/>内存开销稍大]
    end
    subgraph "令牌桶"
        TB2[允许突发<br/>平均速率控制<br/>最常用]
    end
    subgraph "漏桶"
        LB[严格恒定速率<br/>不允许突发<br/>队列模型]
    end
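| 算法 | 突发处理 | 精确度 | 内存 | 适用场景 |
|------|----------|--------|------|----------|
| 固定窗口 | 有边界问题 | 低(边界处可达限制值的 2 倍) | O(1) | 简单场景,粗粒度限流 |
| 滑动窗口 | 平滑 | 高(取决于格子粒度) | O(N) | 需要精确计数的场景 |
| 令牌桶 | 允许突发 | 高 | O(1) | API 限流(最推荐) |
| 漏桶 | 不允许 | 高 | O(1) | 需要恒定处理速率 |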

Nginx 限流配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Rate limiting with nginx limit_req (leaky-bucket algorithm).
http {
# Define the rate-limit zones.
# $binary_remote_addr: limit per client IP.
# zone=api:10m: shared-memory zone name and size.
# rate=10r/s: 10 requests per second.
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

# Limit per API key: use the X-API-Key header value when present,
# otherwise fall back to the client IP.
map $http_x_api_key $api_key {
default $http_x_api_key;
"" $binary_remote_addr;
}
# NOTE(review): zone "api_key" is defined here, but no limit_req directive
# in this config references it, so it currently has no effect.
limit_req_zone $api_key zone=api_key:10m rate=100r/s;

server {
location /api/ {
# burst=20: allow up to 20 requests above the rate as a burst.
# nodelay: burst requests are processed immediately rather than delayed.
limit_req zone=api burst=20 nodelay;

# Respond with 429 when the limit is exceeded.
limit_req_status 429;

proxy_pass http://backend;
}

location /api/v1/upload {
# Stricter limit for the upload endpoint (smaller burst, no nodelay).
# NOTE(review): this shares zone "api" (same per-IP key and 10r/s rate)
# with /api/, so only the burst setting differs.
limit_req zone=api burst=5;
limit_req_status 429;

proxy_pass http://upload-backend;
}
}
}

Go 官方扩展库限流

Go 官方扩展库 golang.org/x/time/rate(不属于标准库,需要单独引入依赖)提供了基于令牌桶的 rate.Limiter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main

import (
"net"
"net/http"
"sync"

"golang.org/x/time/rate"
)

// UserRateLimiter hands out one token-bucket rate.Limiter per user key,
// creating it on first use. The map is guarded by mu.
// NOTE(review): entries are never removed, so memory grows with the number of
// distinct keys seen; a long-running server needs an eviction policy.
type UserRateLimiter struct {
	limiters map[string]*rate.Limiter
	mu       sync.RWMutex
	rate     rate.Limit
	burst    int
}

// NewUserRateLimiter returns a UserRateLimiter whose per-user limiters are
// created with rate r and burst size burst.
func NewUserRateLimiter(r rate.Limit, burst int) *UserRateLimiter {
	return &UserRateLimiter{
		limiters: make(map[string]*rate.Limiter),
		rate:     r,
		burst:    burst,
	}
}

// GetLimiter returns the limiter for userID, creating it if needed. Every
// caller asking for the same userID receives the same *rate.Limiter.
func (u *UserRateLimiter) GetLimiter(userID string) *rate.Limiter {
	// Fast path: most lookups hit an existing limiter under the read lock.
	u.mu.RLock()
	limiter, exists := u.limiters[userID]
	u.mu.RUnlock()

	if exists {
		return limiter
	}

	u.mu.Lock()
	defer u.mu.Unlock()

	// Re-check under the write lock: another goroutine may have created the
	// limiter between RUnlock and Lock. Overwriting it would hand out two
	// different limiters and reset that user's bucket.
	if limiter, exists = u.limiters[userID]; exists {
		return limiter
	}

	limiter = rate.NewLimiter(u.rate, u.burst)
	u.limiters[userID] = limiter
	return limiter
}

// RateLimitMiddleware wraps a handler so that each request first consults the
// caller's limiter and is answered with 429 Too Many Requests when the
// limiter does not allow it.
//
// Callers are keyed by the X-User-ID header when present, otherwise by the
// client host taken from r.RemoteAddr (port stripped).
// NOTE(review): X-User-ID comes straight from the client. Unless a trusted
// upstream sets or validates it, a client can rotate values to evade the limit.
func RateLimitMiddleware(limiter *UserRateLimiter) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			userID := r.Header.Get("X-User-ID")
			if userID == "" {
				// net/http sets RemoteAddr to "IP:port". Keying on the whole
				// string would give every new connection (new source port) its
				// own fresh bucket. Fall back to the raw value if it cannot be split.
				userID = r.RemoteAddr
				if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
					userID = host
				}
			}

			if !limiter.GetLimiter(userID).Allow() {
				http.Error(w, `{"error":"rate limit exceeded"}`,
					http.StatusTooManyRequests)
				return
			}

			next.ServeHTTP(w, r)
		})
	}
}

// main serves /api/ on :8080 behind a per-user limiter allowing 10 requests
// per second with bursts of up to 20.
func main() {
	userLimiter := NewUserRateLimiter(10, 20)
	mux := http.NewServeMux()
	mux.HandleFunc("/api/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("OK"))
	})

	handler := RateLimitMiddleware(userLimiter)(mux)
	// Surface startup/serve failures (e.g. port already in use) instead of
	// exiting silently with status 0.
	if err := http.ListenAndServe(":8080", handler); err != nil {
		panic(err)
	}
}

多级限流策略

在实际生产环境中,通常需要多级限流:

graph TB
    Request[请求] --> L1[第1层: CDN/WAF<br/>IP级别限流]
    L1 --> L2[第2层: API Gateway<br/>用户/API Key限流]
    L2 --> L3[第3层: 服务级别<br/>单机限流]
    L3 --> L4[第4层: 方法级别<br/>热点接口限流]
    L4 --> Business[业务处理]

总结

限流是系统稳定性的重要保障。四种算法各有特点:固定窗口实现最简单但有边界问题,滑动窗口更精确,令牌桶允许突发且最为通用,漏桶保证恒定处理速率。在实际应用中,推荐使用令牌桶作为默认选择,结合 Redis 实现分布式限流,并在网关、服务和方法级别部署多级限流策略。

作者 · author: zt
发布 · date: 2024-02-21
篇幅 · length: 3.3k 字 · 8 min
许可 · license: CC BY-SA 4.0
$ echo "comments" · 评论