一、限流算法理论基础与实现

1.1 计数器法:最简单直接的限流实现

计数器法是最基础也是最容易实现的限流算法,其核心思想是在固定时间窗口内统计请求次数,超过阈值则拒绝后续请求。

算法原理图解:

text

时间窗口:1分钟,阈值:100请求
┌─────────────────────────────────┐
│  0:00      0:30      0:59       │
│   ├──────┬──────┬──────┬───────┤│
│   │  45  │  30  │  25  │       ││
│   └──────┴──────┴──────┴───────┘│
└─────────────────────────────────┘
当0:59时,统计0:00-0:59的请求总数=100,新请求被拒绝
伪代码实现:

java

public class CounterLimiter {
    private long timeStamp = System.currentTimeMillis();  // 当前时间
    private long limitCount = 100;                       // 限流阈值
    private long interval = 60 * 1000;                   // 时间窗口(毫秒)
    private long requestCount = 0;                       // 当前窗口请求数
    
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        
        // 判断是否进入下一个时间窗口
        if (now - timeStamp < interval) {
            // 仍在当前窗口
            if (requestCount < limitCount) {
                requestCount++;
                return true;
            } else {
                return false;  // 限流
            }
        } else {
            // 进入新的时间窗口
            timeStamp = now;
            requestCount = 1;  // 重置计数器
            return true;
        }
    }
}
计数器法的缺陷
  1. 临界问题:在时间窗口切换瞬间可能出现流量突刺

  2. 精度问题:固定窗口导致统计不够精确

  3. 无法应对突发流量:窗口内前半段空闲,后半段突发流量仍会被拒绝

1.2 滑动时间窗口算法:解决计数器法的精度问题

滑动窗口算法是对计数器法的改进,将固定时间窗口划分为多个小格子,每次滑动一个格子,使统计更加精确平滑。

算法原理图解:

text

时间窗口:1分钟,划分为6个格子(每格10秒)
初始状态:格子0-5
[ 0-9s:15 ][10-19s:20][20-29s:25][30-39s:18][40-49s:22][50-59s:30] 总请求=130

滑动后:格子1-6
[10-19s:20][20-29s:25][30-39s:18][40-49s:22][50-59s:30][60-69s:0 ] 总请求=115
↑新增格子              ↑移除最旧格子
Sentinel滑动窗口源码实现分析:

核心类:LeapArray 和 WindowWrap

java

// Sentinel中的滑动窗口核心数据结构
public abstract class LeapArray<T> {
    // 窗口长度(毫秒)
    protected int windowLengthInMs;
    // 采样窗口数量
    protected int sampleCount;
    // 总的时间窗口长度(毫秒)= windowLengthInMs * sampleCount
    protected int intervalInMs;
    // 窗口数组
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    
    // 获取当前时间窗口
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        
        // 计算当前时间对应的窗口索引
        int idx = calculateTimeIdx(timeMillis);
        // 计算窗口开始时间
        long windowStart = calculateWindowStart(timeMillis);
        
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // 创建新窗口
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                // 正好是当前窗口
                return old;
            } else if (windowStart > old.windowStart()) {
                // 需要创建新窗口,替换旧窗口
                if (updateLock.tryLock()) {
                    try {
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 不应该发生,说明时钟回拨
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
}

窗口包装类 WindowWrap

java

public class WindowWrap<T> {
    // 窗口长度(毫秒)
    private final long windowLengthInMs;
    // 窗口开始时间(毫秒)
    private long windowStart;
    // 窗口存储的数据(如统计值)
    private T value;
    
    // 添加值到窗口
    public void addValue(T item) {
        // 具体实现由子类决定
    }
    
    // 重置窗口
    public void resetTo(long startTime) {
        this.windowStart = startTime;
        // 重置存储的数据
    }
}

滑动窗口统计实现 BucketLeapArray

java

public class BucketLeapArray extends LeapArray<MetricBucket> {
    
    public BucketLeapArray(int sampleCount, int intervalInMs) {
        // 默认每个窗口500ms,2个窗口组成1秒的时间窗口
        super(sampleCount, intervalInMs);
    }
    
    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }
    
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> windowWrap, long startTime) {
        windowWrap.resetTo(startTime);
        return windowWrap;
    }
}

统计值类 MetricBucket

java

public class MetricBucket {
    // 使用LongAdder保证并发安全和高性能
    private final LongAdder[] counters;
    
    // 统计维度
    private static final int PASS = 0;      // 通过
    private static final int BLOCK = 1;     // 阻塞
    private static final int EXCEPTION = 2; // 异常
    private static final int SUCCESS = 3;   // 成功
    private static final int RT = 4;        // 响应时间
    
    public MetricBucket() {
        counters = new LongAdder[MetricEvent.values().length];
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
    }
    
    // 增加统计值
    public void add(MetricEvent event, long count) {
        counters[event.ordinal()].add(count);
    }
    
    // 获取统计值
    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
}
滑动窗口算法的优势:
  1. 更精确的统计:通过多格子细分,统计更接近真实流量

  2. 平滑过渡:窗口滑动而非跳跃,避免临界问题

  3. 可配置精度:格子数越多,统计越精确(但内存消耗越大)

1.3 漏桶算法:恒定速率处理请求

漏桶算法模拟一个固定容量的漏桶,请求以任意速率进入桶内,但以恒定速率流出,当桶满时新请求被拒绝。

算法原理图解:

text

请求流入 → [ 漏桶(容量100) ] → 恒定速率流出(10请求/秒)
        ↑                     ↑
      任意速率               恒定速率
伪代码实现:

java

public class LeakyBucketLimiter {
    private long capacity;          // 桶容量
    private long rate;              // 流出速率(请求/秒)
    private long water;             // 当前水量(当前请求数)
    private long lastLeakTime;      // 上次漏水时间
    
    public LeakyBucketLimiter(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.water = 0;
        this.lastLeakTime = System.currentTimeMillis();
    }
    
    public synchronized boolean tryAcquire() {
        leakWater();  // 先漏水
        
        if (water < capacity) {
            water++;  // 加水
            return true;
        } else {
            return false;  // 桶满,拒绝
        }
    }
    
    private void leakWater() {
        long now = System.currentTimeMillis();
        long elapsedTime = now - lastLeakTime;
        
        // 计算这段时间应该漏掉的水量
        long leakAmount = elapsedTime * rate / 1000;
        
        if (leakAmount > 0) {
            water = Math.max(0, water - leakAmount);
            lastLeakTime = now;
        }
    }
}
Sentinel中的漏桶算法实现:

Sentinel中的匀速排队(Rate Limiter)模式就是基于漏桶算法实现的:

java

// 匀速排队控制器
public class RateLimiterController implements TrafficShapingController {
    
    private final int maxQueueingTimeMs;  // 最大排队时间
    private final double count;           // 阈值
    private final AtomicLong latestPassedTime = new AtomicLong(-1);
    
    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }
    
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 当请求来到的时间点,最近一次请求通过的时间点
        long currentTime = TimeUtil.currentTimeMillis();
        
        // 计算每个请求的理论通过时间间隔
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        
        // 理论通过时间
        long expectedTime = costTime + latestPassedTime.get();
        
        if (expectedTime <= currentTime) {
            // 可以通过,更新最后通过时间
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 计算需要等待的时间
            long waitTime = costTime + latestPassedTime.get() - currentTime;
            
            if (waitTime > maxQueueingTimeMs) {
                // 等待时间超过最大排队时间,拒绝
                return false;
            } else {
                // 更新最后通过时间
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    // 等待
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                    // 处理中断
                    return false;
                }
            }
        }
    }
}

1.4 令牌桶算法:允许突发流量

令牌桶算法以恒定速率生成令牌放入桶中,请求需要获取令牌才能通过,当桶满时新令牌被丢弃,桶空时请求被拒绝。

算法原理图解:

text

令牌生成(10令牌/秒)→ [ 令牌桶(容量100) ] → 请求获取令牌
                     ↑                     ↓
                   恒定速率              突发获取
伪代码实现:

java

public class TokenBucketLimiter {
    private long capacity;          // 桶容量
    private long rate;              // 令牌生成速率(令牌/秒)
    private long tokens;            // 当前令牌数
    private long lastRefillTime;    // 上次补充令牌时间
    
    public TokenBucketLimiter(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.tokens = capacity;  // 初始时桶满
        this.lastRefillTime = System.currentTimeMillis();
    }
    
    public synchronized boolean tryAcquire() {
        refillTokens();  // 先补充令牌
        
        if (tokens > 0) {
            tokens--;  // 消耗令牌
            return true;
        } else {
            return false;  // 令牌不足,拒绝
        }
    }
    
    private void refillTokens() {
        long now = System.currentTimeMillis();
        long elapsedTime = now - lastRefillTime;
        
        // 计算这段时间应该补充的令牌数
        long refillAmount = elapsedTime * rate / 1000;
        
        if (refillAmount > 0) {
            tokens = Math.min(capacity, tokens + refillAmount);
            lastRefillTime = now;
        }
    }
}
Sentinel中的Warm Up预热模式:

Sentinel的Warm Up模式基于令牌桶算法实现,在冷启动阶段缓慢提升流量:

java

// 预热控制器
public class WarmUpController implements TrafficShapingController {
    
    protected double count;  // 阈值
    private int coldFactor;  // 冷启动因子,默认3
    private int warningToken = 0;  // 预警令牌数
    private int maxToken;  // 最大令牌数
    private double slope;  // 斜率
    
    private AtomicLong storedTokens = new AtomicLong(0);  // 当前存储的令牌数
    private AtomicLong lastFilledTime = new AtomicLong(0);  // 上次填充时间
    
    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }
    
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }
        
        this.count = count;
        this.coldFactor = coldFactor;
        
        // 阈值除以冷启动因子,得到预热期间的阈值
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        
        // 斜率 = (coldFactor - 1) / count / (maxToken - warningToken)
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }
    
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();  // 当前通过的QPS
        
        long previousQps = (long) node.previousPassQps();  // 上一秒的QPS
        
        syncToken(previousQps);  // 同步令牌
        
        // 开始计算当前消耗的令牌数
        long restToken = storedTokens.get();
        
        if (restToken >= warningToken) {
            // 当前令牌数高于预警值,说明还在预热期
            long aboveToken = restToken - warningToken;
            
            // 警告区间消耗令牌的速度更快
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            // 已经过了预热期,使用正常的阈值
            if (passQps + acquireCount <= count) {
                return true;
            }
        }
        
        return false;
    }
    
    // 同步令牌,根据上一秒的QPS计算当前应存储的令牌数
    protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        
        if (currentTime <= oldLastFillTime) {
            return;
        }
        
        long oldValue = storedTokens.get();
        long newValue = coolDownTokens(currentTime, passQps);
        
        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // 从当前存储的令牌中减去上一秒的QPS
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }
    }
    
    // 冷却(计算当前应存储的令牌数)
    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;
        
        // 添加令牌的前提:当前令牌数低于预警值
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int)count / coldFactor) {
                // 当前QPS低于阈值/冷启动因子,说明系统压力不大,可以增加令牌
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        
        return Math.min(newValue, maxToken);
    }
}

1.5 限流算法对比总结

算法 原理 优点 缺点 适用场景
计数器法 固定时间窗口计数 实现简单,内存消耗小 临界问题,精度低 简单限流场景
滑动窗口 多个小时间窗口滑动统计 统计更精确,解决临界问题 内存消耗较大 需要精确统计的场景
漏桶算法 恒定速率处理,桶满则溢 输出流量恒定,平滑突发 无法应对突发流量 需要恒定速率处理的场景
令牌桶算法 恒定速率生成令牌 允许突发流量,灵活性高 实现较复杂 需要应对突发流量的场景

二、Sentinel核心架构:Slot责任链模式

2.1 Sentinel整体架构设计

Sentinel采用Slot责任链模式处理每个请求,每个Slot负责不同的功能,形成完整的处理流水线:

text

请求入口
    ↓
NodeSelectorSlot(节点选择)
    ↓
ClusterBuilderSlot(集群构建)
    ↓
LogSlot(日志记录)
    ↓
StatisticSlot(统计)
    ↓
AuthoritySlot(授权)
    ↓
SystemSlot(系统保护)
    ↓
FlowSlot(流量控制)
    ↓
DegradeSlot(熔断降级)
    ↓
业务逻辑执行/返回结果

2.2 核心Slot源码分析

2.2.1 NodeSelectorSlot:资源调用树构建

java

// NodeSelectorSlot负责构建调用树,记录资源之间的调用关系
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
    // 线程本地变量,存储当前上下文
    private ThreadLocal<Context> contextThreadLocal = new ThreadLocal<>();
    
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, 
                     DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        
        // 获取或创建默认节点
        DefaultNode node = context.getCurNode();
        if (node == null) {
            // 这里可能会初始化入口节点
            node = new EntranceNode(new StringResourceWrapper(resourceWrapper.getName(), EntryType.IN));
        }
        
        // 设置当前节点
        context.setCurNode(node);
        
        // 触发下一个Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        // 清理上下文
        context.setCurNode(null);
        
        // 触发下一个Slot
        fireExit(context, resourceWrapper, count, args);
    }
}
2.2.2 StatisticSlot:指标统计核心

java

// StatisticSlot负责统计各项指标,是限流和熔断的基础
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, 
                     DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        try {
            // 触发下一个Slot
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            
            // 请求通过,增加通过计数
            node.increaseThreadNum();
            node.addPassRequest(count);
            
            // 调用成功,增加成功计数
            if (context.getCurEntry().getError() == null) {
                node.increaseSuccess(count);
            }
            
        } catch (BlockException e) {
            // 被限流或降级,增加阻塞计数
            node.increaseBlockQps(count);
            throw e;
        } catch (Throwable e) {
            // 出现业务异常,增加异常计数
            node.increaseExceptionQps(count);
            throw e;
        }
    }
    
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        // 减少线程数
        node.decreaseThreadNum();
        
        // 记录响应时间
        Entry entry = context.getCurEntry();
        if (entry.getError() == null) {
            long rt = TimeUtil.currentTimeMillis() - entry.getCreateTime();
            if (rt > TimeUtil.currentTimeMillis()) {
                rt = 0L;
            }
            node.addRt(rt);
        }
        
        // 触发下一个Slot
        fireExit(context, resourceWrapper, count, args);
    }
}
2.2.3 FlowSlot:流量控制实现

java

// FlowSlot负责流量控制,检查是否超过阈值
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
    private final FlowRuleChecker checker;
    
    public FlowSlot() {
        this(new FlowRuleChecker());
    }
    
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, 
                     DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        // 检查流量规则
        checker.checkFlow(resourceWrapper, context, node, count, prioritized);
        
        // 触发下一个Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

// 流量规则检查器
class FlowRuleChecker {
    
    public void checkFlow(ResourceWrapper resource, Context context, 
                         DefaultNode node, int count, boolean prioritized) throws BlockException {
        
        // 检查所有流量规则
        List<FlowRule> rules = FlowRuleManager.getRulesForResource(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    
    private boolean canPassCheck(FlowRule rule, Context context, 
                                DefaultNode node, int acquireCount, boolean prioritized) {
        
        // 根据规则类型选择不同的检查策略
        switch (rule.getGrade()) {
            case RuleConstant.FLOW_GRADE_QPS:
                // QPS限流检查
                return passQpsCheck(rule, context, node, acquireCount, prioritized);
            case RuleConstant.FLOW_GRADE_THREAD:
                // 线程数限流检查
                return passThreadCheck(rule, context, node);
            default:
                return true;
        }
    }
    
    private boolean passQpsCheck(FlowRule rule, Context context, 
                                DefaultNode node, int acquireCount, boolean prioritized) {
        
        // 获取限流控制器
        TrafficShapingController controller = rule.getRater();
        
        if (controller == null) {
            // 根据策略创建对应的控制器
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    // 预热模式
                    controller = new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), 3);
                    break;
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    // 匀速排队模式
                    controller = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                    break;
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    // 预热+匀速排队模式
                    controller = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            rule.getMaxQueueingTimeMs(), rule.getColdFactor());
                    break;
                default:
                    // 默认快速失败模式
                    controller = new DefaultController(rule.getCount(), rule.getGrade());
            }
            
            rule.setRater(controller);
        }
        
        // 检查是否可以通过
        return controller.canPass(node, acquireCount, prioritized);
    }
}
2.2.4 DegradeSlot:熔断降级实现

java

// DegradeSlot负责熔断降级检查
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, 
                     DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        // 检查熔断规则
        performChecking(context, resourceWrapper);
        
        // 触发下一个Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
    private void performChecking(Context context, ResourceWrapper resource) throws BlockException {
        // 获取所有熔断规则
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(resource.getName());
        
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        
        for (CircuitBreaker cb : circuitBreakers) {
            // 检查每个熔断器
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
}

// 抽象熔断器
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    
    protected final DegradeRule rule;
    protected final int recoveryTimeoutMs;  // 恢复超时时间
    protected volatile long nextRetryTimestamp;  // 下次重试时间
    
    // 熔断器状态
    protected final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    
    enum State {
        CLOSED,      // 关闭状态(正常)
        OPEN,        // 打开状态(熔断)
        HALF_OPEN    // 半开状态(探测恢复)
    }
    
    @Override
    public boolean tryPass(Context context) {
        // 处理特殊情况
        if (currentState() == State.CLOSED) {
            return true;
        }
        
        if (currentState() == State.OPEN) {
            // 对于熔断状态,如果已超过恢复时间,则切换到半开状态
            if (retryTimeoutArrived()) {
                return fromOpenToHalfOpen(context);
            }
            return false;
        }
        
        // 半开状态
        return false;
    }
    
    @Override
    public void onRequestComplete(Context context) {
        // 请求完成时的回调,更新统计信息
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        
        Throwable error = entry.getError();
        // 更新成功/失败计数
        // 根据规则判断是否需要熔断
        if (currentState() == State.CLOSED) {
            // 检查是否需要触发熔断
            if (shouldPassCheck()) {
                // 触发熔断
                transformToOpen();
            }
        } else if (currentState() == State.HALF_OPEN) {
            // 半开状态下,根据请求结果决定切换到关闭还是打开状态
            if (error == null) {
                // 请求成功,切换到关闭状态
                fromHalfOpenToClose();
            } else {
                // 请求失败,重新切换到打开状态
                fromHalfOpenToOpen();
            }
        }
    }
}
2.2.5 SystemSlot:系统保护实现

java

// SystemSlot负责系统级别的保护
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
    private final SystemRuleManager systemRuleManager = SystemRuleManager.getInstance();
    
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, 
                     DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        // 检查系统规则
        SystemRuleManager.checkSystem(resourceWrapper);
        
        // 触发下一个Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

// 系统规则管理器
public class SystemRuleManager {
    
    private static volatile Map<String, List<SystemRule>> systemRules = new HashMap<>();
    
    private static final SystemStatusListener statusListener = new SystemStatusListener();
    
    // 检查系统规则
    public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
        // 如果系统规则为空,直接返回
        if (systemRules == null) {
            return;
        }
        
        // 检查每个系统规则
        for (SystemRule rule : systemRules.values()) {
            // 根据规则类型进行检查
            switch (rule.getGrade()) {
                case RuleConstant.DEGRADE_GRADE_RT:
                    // 检查平均RT
                    if (rule.getCount() < getCurrentRt()) {
                        throw new SystemBlockException(rule.getLimitApp(), rule);
                    }
                    break;
                case RuleConstant.DEGRADE_GRADE_THREAD:
                    // 检查线程数
                    if (rule.getCount() < getCurrentThreadNum()) {
                        throw new SystemBlockException(rule.getLimitApp(), rule);
                    }
                    break;
                case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
                    // 检查异常比例
                    if (rule.getCount() < getCurrentExceptionRatio()) {
                        throw new SystemBlockException(rule.getLimitApp(), rule);
                    }
                    break;
                case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
                    // 检查异常数
                    if (rule.getCount() < getCurrentExceptionCount()) {
                        throw new SystemBlockException(rule.getLimitApp(), rule);
                    }
                    break;
            }
        }
    }
    
    // 获取当前平均RT
    private static double getCurrentRt() {
        // 从系统指标统计中获取
        return ENTRY_NODE.avgRt();
    }
    
    // 获取当前线程数
    private static double getCurrentThreadNum() {
        // 从系统指标统计中获取
        return ENTRY_NODE.curThreadNum();
    }
}

2.3 Slot责任链构建过程

Sentinel通过CtSph类构建Slot责任链:

java

public class CtSph implements Sph {
    
    private static final Object lock = new Object();
    
    // Slot责任链
    private static volatile ProcessorSlot<DefaultNode> chain;
    
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        // 获取或创建Slot责任链
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        
        // 创建上下文
        Context context = ContextUtil.getContext();
        if (context == null) {
            context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", type);
        }
        
        // 执行Slot链
        try {
            chain.entry(context, resourceWrapper, null, count, args);
        } catch (BlockException e1) {
            throw e1;
        } catch (Throwable e1) {
            throw new SystemBlockException(e1.getMessage());
        }
        
        // 返回Entry
        return new CtEntry(resourceWrapper, chain, context);
    }
    
    // 查找或创建Slot链
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // 创建Slot链
                    chain = SlotChainProvider.newSlotChain();
                    chainMap.put(resourceWrapper, chain);
                }
            }
        }
        return chain;
    }
}

// Slot链构建器
public class DefaultSlotChainBuilder implements SlotChainBuilder {
    
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        
        // 按照顺序添加Slot
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        
        return chain;
    }
}

三、Sentinel性能优化与设计模式

3.1 高性能统计设计

Sentinel使用LongAdderStriped64实现高性能并发计数:

java

// 使用LongAdder替代AtomicLong提高并发性能
public class MetricBucket {
    private final LongAdder[] counters;
    
    public MetricBucket() {
        counters = new LongAdder[MetricEvent.values().length];
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
    }
    
    public void add(MetricEvent event, long count) {
        counters[event.ordinal()].add(count);
    }
}

3.2 缓存优化

Sentinel使用缓存机制减少规则查找开销:

java

// 规则缓存
public class FlowRuleUtil {
    private static final ConcurrentMap<String, List<FlowRule>> flowRules = 
        new ConcurrentHashMap<>();
    
    // 带缓存的规则获取
    public static List<FlowRule> getRules(String resourceName) {
        List<FlowRule> rules = flowRules.get(resourceName);
        if (rules == null) {
            synchronized (flowRules) {
                rules = flowRules.get(resourceName);
                if (rules == null) {
                    // 从规则管理器获取
                    rules = FlowRuleManager.getRulesForResource(resourceName);
                    flowRules.put(resourceName, rules);
                }
            }
        }
        return rules;
    }
}

3.3 设计模式应用

3.3.1 责任链模式(Chain of Responsibility)

java

// 抽象的Slot接口
public interface ProcessorSlot<T> {
    void entry(Context context, ResourceWrapper resourceWrapper, 
              T param, int count, boolean prioritized, Object... args) throws Throwable;
    
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

// Slot链实现
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    private AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, 
                         Object param, int count, boolean prioritized, Object... args) throws Throwable {
            super.fireEntry(context, resourceWrapper, param, count, prioritized, args);
        }
        
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, 
                        int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };
    
    private AbstractLinkedProcessorSlot<?> end = first;
    
    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }
    
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
}
3.3.2 工厂模式(Factory Pattern)

java

// Slot链工厂
public final class SlotChainProvider {
    private static volatile SlotChainBuilder slotChainBuilder = null;
    
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder == null) {
            synchronized (SlotChainProvider.class) {
                if (slotChainBuilder == null) {
                    // 使用SPI机制加载SlotChainBuilder
                    slotChainBuilder = new DefaultSlotChainBuilder();
                }
            }
        }
        return slotChainBuilder.build();
    }
}
3.3.3 策略模式(Strategy Pattern)

java

// 流量控制策略接口
public interface TrafficShapingController {
    boolean canPass(Node node, int acquireCount, boolean prioritized);
}

// 不同的策略实现
class DefaultController implements TrafficShapingController { /* 快速失败 */ }
class WarmUpController implements TrafficShapingController { /* 预热 */ }
class RateLimiterController implements TrafficShapingController { /* 匀速排队 */ }

四、总结与最佳实践

4.1 Sentinel核心设计要点

  1. 模块化设计:通过Slot责任链实现功能解耦,每个Slot职责单一

  2. 高性能统计:使用LongAdder和滑动窗口实现高并发下的精确统计

  3. 灵活的规则引擎:支持多种限流算法和熔断策略

  4. 可扩展架构:通过SPI机制支持自定义扩展

4.2 限流算法选择建议

场景 推荐算法 理由
API网关入口限流 令牌桶算法 允许突发流量,适合网关场景
内部服务保护 滑动窗口 统计精确,避免临界问题
数据库访问限流 漏桶算法 恒定速率,保护数据库
秒杀场景 预热+令牌桶 冷启动保护+允许合理突发

4.3 性能调优建议

  1. 合理设置滑动窗口参数

    java

    // 窗口越小统计越精确,但内存消耗越大
    // 推荐:500ms窗口,2个窗口组成1秒统计
    LeapArray<MetricBucket> data = new BucketLeapArray(2, 1000);
  2. 监控关键指标

    java

    // 监控内存使用
    Runtime runtime = Runtime.getRuntime();
    long usedMemory = runtime.totalMemory() - runtime.freeMemory();
    
    // 监控规则数量
    int ruleCount = FlowRuleManager.getRules().size();
  3. 合理配置JVM参数

    bash

    # Sentinel对内存和GC比较敏感
    -Xms2g -Xmx2g  # 堆内存
    -XX:+UseG1GC   # 使用G1垃圾收集器
    -XX:MaxGCPauseMillis=100  # 最大GC停顿时间

4.4 生产环境部署建议

  1. Sentinel Dashboard高可用

    bash

    # 多节点部署,使用Nginx负载均衡
    upstream sentinel_dashboard {
        server 192.168.1.101:8080;
        server 192.168.1.102:8080;
        server 192.168.1.103:8080;
    }
  2. 客户端配置优化

    yaml

    spring:
      cloud:
        sentinel:
          transport:
            dashboard: sentinel-dashboard.com:8080
            port: 8719
            client-ip: ${spring.cloud.client.ip-address}
          eager: true  # 立即初始化
          log:
            dir: /var/log/sentinel
  3. 规则持久化方案

    java

    // 使用Nacos作为规则配置中心
    @Configuration
    public class SentinelRuleConfig {
        
        @PostConstruct
        public void initRules() {
            // 从Nacos加载规则
            List<FlowRule> rules = loadRulesFromNacos();
            FlowRuleManager.loadRules(rules);
        }
    }
Logo

立足具身智能前沿赛道,致力于搭建全球化、开源化、全栈式技术交流与实践共创平台。

更多推荐