Sentinel核心架构源码深度剖析:从限流算法到Slot责任链实现
本文系统分析了Sentinel限流系统的核心架构与实现原理。主要内容包括:1)详细解析四种限流算法(计数器法、滑动窗口、漏桶、令牌桶)的实现机制与适用场景;2)深入剖析Sentinel责任链模式的设计,包括NodeSelectorSlot、StatisticSlot等核心组件的源码实现;3)介绍系统的高性能优化策略,如LongAdder并发计数和缓存机制;4)总结设计模式应用和最佳实践,提供算法选
一、限流算法理论基础与实现
1.1 计数器法:最简单直接的限流实现
计数器法是最基础也是最容易实现的限流算法,其核心思想是在固定时间窗口内统计请求次数,超过阈值则拒绝后续请求。
算法原理图解:
text
时间窗口:1分钟,阈值:100请求 ┌─────────────────────────────────┐ │ 0:00 0:30 0:59 │ │ ├──────┬──────┬──────┬───────┤│ │ │ 45 │ 30 │ 25 │ ││ │ └──────┴──────┴──────┴───────┘│ └─────────────────────────────────┘ 当0:59时,统计0:00-0:59的请求总数=100,新请求被拒绝
伪代码实现:
java
public class CounterLimiter {
private long timeStamp = System.currentTimeMillis(); // 当前时间
private long limitCount = 100; // 限流阈值
private long interval = 60 * 1000; // 时间窗口(毫秒)
private long requestCount = 0; // 当前窗口请求数
public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();
// 判断是否进入下一个时间窗口
if (now - timeStamp < interval) {
// 仍在当前窗口
if (requestCount < limitCount) {
requestCount++;
return true;
} else {
return false; // 限流
}
} else {
// 进入新的时间窗口
timeStamp = now;
requestCount = 1; // 重置计数器
return true;
}
}
}
计数器法的缺陷:
-
临界问题:在时间窗口切换瞬间可能出现流量突刺
-
精度问题:固定窗口导致统计不够精确
-
无法应对突发流量:窗口内前半段空闲,后半段突发流量仍会被拒绝
1.2 滑动时间窗口算法:解决计数器法的精度问题
滑动窗口算法是对计数器法的改进,将固定时间窗口划分为多个小格子,每次滑动一个格子,使统计更加精确平滑。
算法原理图解:
text
时间窗口:1分钟,划分为6个格子(每格10秒) 初始状态:格子0-5 [ 0-9s:15 ][10-19s:20][20-29s:25][30-39s:18][40-49s:22][50-59s:30] 总请求=130 滑动后:格子1-6 [10-19s:20][20-29s:25][30-39s:18][40-49s:22][50-59s:30][60-69s:0 ] 总请求=115 ↑新增格子 ↑移除最旧格子
Sentinel滑动窗口源码实现分析:
核心类:LeapArray 和 WindowWrap
java
// Sentinel中的滑动窗口核心数据结构
public abstract class LeapArray<T> {
// 窗口长度(毫秒)
protected int windowLengthInMs;
// 采样窗口数量
protected int sampleCount;
// 总的时间窗口长度(毫秒)= windowLengthInMs * sampleCount
protected int intervalInMs;
// 窗口数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
// 获取当前时间窗口
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间对应的窗口索引
int idx = calculateTimeIdx(timeMillis);
// 计算窗口开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 创建新窗口
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 正好是当前窗口
return old;
} else if (windowStart > old.windowStart()) {
// 需要创建新窗口,替换旧窗口
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 不应该发生,说明时钟回拨
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
}
窗口包装类 WindowWrap:
java
public class WindowWrap<T> {
// 窗口长度(毫秒)
private final long windowLengthInMs;
// 窗口开始时间(毫秒)
private long windowStart;
// 窗口存储的数据(如统计值)
private T value;
// 添加值到窗口
public void addValue(T item) {
// 具体实现由子类决定
}
// 重置窗口
public void resetTo(long startTime) {
this.windowStart = startTime;
// 重置存储的数据
}
}
滑动窗口统计实现 BucketLeapArray:
java
public class BucketLeapArray extends LeapArray<MetricBucket> {
public BucketLeapArray(int sampleCount, int intervalInMs) {
// 默认每个窗口500ms,2个窗口组成1秒的时间窗口
super(sampleCount, intervalInMs);
}
@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> windowWrap, long startTime) {
windowWrap.resetTo(startTime);
return windowWrap;
}
}
统计值类 MetricBucket:
java
public class MetricBucket {
// 使用LongAdder保证并发安全和高性能
private final LongAdder[] counters;
// 统计维度
private static final int PASS = 0; // 通过
private static final int BLOCK = 1; // 阻塞
private static final int EXCEPTION = 2; // 异常
private static final int SUCCESS = 3; // 成功
private static final int RT = 4; // 响应时间
public MetricBucket() {
counters = new LongAdder[MetricEvent.values().length];
for (int i = 0; i < counters.length; i++) {
counters[i] = new LongAdder();
}
}
// 增加统计值
public void add(MetricEvent event, long count) {
counters[event.ordinal()].add(count);
}
// 获取统计值
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
}
滑动窗口算法的优势:
-
更精确的统计:通过多格子细分,统计更接近真实流量
-
平滑过渡:窗口滑动而非跳跃,避免临界问题
-
可配置精度:格子数越多,统计越精确(但内存消耗越大)
1.3 漏桶算法:恒定速率处理请求
漏桶算法模拟一个固定容量的漏桶,请求以任意速率进入桶内,但以恒定速率流出,当桶满时新请求被拒绝。
算法原理图解:
text
请求流入 → [ 漏桶(容量100) ] → 恒定速率流出(10请求/秒)
↑ ↑
任意速率 恒定速率
伪代码实现:
java
public class LeakyBucketLimiter {
private long capacity; // 桶容量
private long rate; // 流出速率(请求/秒)
private long water; // 当前水量(当前请求数)
private long lastLeakTime; // 上次漏水时间
public LeakyBucketLimiter(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.water = 0;
this.lastLeakTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
leakWater(); // 先漏水
if (water < capacity) {
water++; // 加水
return true;
} else {
return false; // 桶满,拒绝
}
}
private void leakWater() {
long now = System.currentTimeMillis();
long elapsedTime = now - lastLeakTime;
// 计算这段时间应该漏掉的水量
long leakAmount = elapsedTime * rate / 1000;
if (leakAmount > 0) {
water = Math.max(0, water - leakAmount);
lastLeakTime = now;
}
}
}
Sentinel中的漏桶算法实现:
Sentinel中的匀速排队(Rate Limiter)模式就是基于漏桶算法实现的:
java
// 匀速排队控制器
public class RateLimiterController implements TrafficShapingController {
private final int maxQueueingTimeMs; // 最大排队时间
private final double count; // 阈值
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 当请求来到的时间点,最近一次请求通过的时间点
long currentTime = TimeUtil.currentTimeMillis();
// 计算每个请求的理论通过时间间隔
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// 理论通过时间
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// 可以通过,更新最后通过时间
latestPassedTime.set(currentTime);
return true;
} else {
// 计算需要等待的时间
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > maxQueueingTimeMs) {
// 等待时间超过最大排队时间,拒绝
return false;
} else {
// 更新最后通过时间
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 等待
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
// 处理中断
return false;
}
}
}
}
}
1.4 令牌桶算法:允许突发流量
令牌桶算法以恒定速率生成令牌放入桶中,请求需要获取令牌才能通过,当桶满时新令牌被丢弃,桶空时请求被拒绝。
算法原理图解:
text
令牌生成(10令牌/秒)→ [ 令牌桶(容量100) ] → 请求获取令牌
↑ ↓
恒定速率 突发获取
伪代码实现:
java
public class TokenBucketLimiter {
private long capacity; // 桶容量
private long rate; // 令牌生成速率(令牌/秒)
private long tokens; // 当前令牌数
private long lastRefillTime; // 上次补充令牌时间
public TokenBucketLimiter(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = capacity; // 初始时桶满
this.lastRefillTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
refillTokens(); // 先补充令牌
if (tokens > 0) {
tokens--; // 消耗令牌
return true;
} else {
return false; // 令牌不足,拒绝
}
}
private void refillTokens() {
long now = System.currentTimeMillis();
long elapsedTime = now - lastRefillTime;
// 计算这段时间应该补充的令牌数
long refillAmount = elapsedTime * rate / 1000;
if (refillAmount > 0) {
tokens = Math.min(capacity, tokens + refillAmount);
lastRefillTime = now;
}
}
}
Sentinel中的Warm Up预热模式:
Sentinel的Warm Up模式基于令牌桶算法实现,在冷启动阶段缓慢提升流量:
java
// 预热控制器
public class WarmUpController implements TrafficShapingController {
protected double count; // 阈值
private int coldFactor; // 冷启动因子,默认3
private int warningToken = 0; // 预警令牌数
private int maxToken; // 最大令牌数
private double slope; // 斜率
private AtomicLong storedTokens = new AtomicLong(0); // 当前存储的令牌数
private AtomicLong lastFilledTime = new AtomicLong(0); // 上次填充时间
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// 阈值除以冷启动因子,得到预热期间的阈值
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 斜率 = (coldFactor - 1) / count / (maxToken - warningToken)
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps(); // 当前通过的QPS
long previousQps = (long) node.previousPassQps(); // 上一秒的QPS
syncToken(previousQps); // 同步令牌
// 开始计算当前消耗的令牌数
long restToken = storedTokens.get();
if (restToken >= warningToken) {
// 当前令牌数高于预警值,说明还在预热期
long aboveToken = restToken - warningToken;
// 警告区间消耗令牌的速度更快
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// 已经过了预热期,使用正常的阈值
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
// 同步令牌,根据上一秒的QPS计算当前应存储的令牌数
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 从当前存储的令牌中减去上一秒的QPS
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
// 冷却(计算当前应存储的令牌数)
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的前提:当前令牌数低于预警值
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
// 当前QPS低于阈值/冷启动因子,说明系统压力不大,可以增加令牌
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
}
1.5 限流算法对比总结
| 算法 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 计数器法 | 固定时间窗口计数 | 实现简单,内存消耗小 | 临界问题,精度低 | 简单限流场景 |
| 滑动窗口 | 多个小时间窗口滑动统计 | 统计更精确,解决临界问题 | 内存消耗较大 | 需要精确统计的场景 |
| 漏桶算法 | 恒定速率处理,桶满则溢 | 输出流量恒定,平滑突发 | 无法应对突发流量 | 需要恒定速率处理的场景 |
| 令牌桶算法 | 恒定速率生成令牌 | 允许突发流量,灵活性高 | 实现较复杂 | 需要应对突发流量的场景 |
二、Sentinel核心架构:Slot责任链模式
2.1 Sentinel整体架构设计
Sentinel采用Slot责任链模式处理每个请求,每个Slot负责不同的功能,形成完整的处理流水线:
text
请求入口
↓
NodeSelectorSlot(节点选择)
↓
ClusterBuilderSlot(集群构建)
↓
LogSlot(日志记录)
↓
StatisticSlot(统计)
↓
AuthoritySlot(授权)
↓
SystemSlot(系统保护)
↓
FlowSlot(流量控制)
↓
DegradeSlot(熔断降级)
↓
业务逻辑执行/返回结果
2.2 核心Slot源码分析
2.2.1 NodeSelectorSlot:资源调用树构建
java
// NodeSelectorSlot负责构建调用树,记录资源之间的调用关系
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// 线程本地变量,存储当前上下文
private ThreadLocal<Context> contextThreadLocal = new ThreadLocal<>();
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
// 获取或创建默认节点
DefaultNode node = context.getCurNode();
if (node == null) {
// 这里可能会初始化入口节点
node = new EntranceNode(new StringResourceWrapper(resourceWrapper.getName(), EntryType.IN));
}
// 设置当前节点
context.setCurNode(node);
// 触发下一个Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
// 清理上下文
context.setCurNode(null);
// 触发下一个Slot
fireExit(context, resourceWrapper, count, args);
}
}
2.2.2 StatisticSlot:指标统计核心
java
// StatisticSlot负责统计各项指标,是限流和熔断的基础
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
try {
// 触发下一个Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 请求通过,增加通过计数
node.increaseThreadNum();
node.addPassRequest(count);
// 调用成功,增加成功计数
if (context.getCurEntry().getError() == null) {
node.increaseSuccess(count);
}
} catch (BlockException e) {
// 被限流或降级,增加阻塞计数
node.increaseBlockQps(count);
throw e;
} catch (Throwable e) {
// 出现业务异常,增加异常计数
node.increaseExceptionQps(count);
throw e;
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
// 减少线程数
node.decreaseThreadNum();
// 记录响应时间
Entry entry = context.getCurEntry();
if (entry.getError() == null) {
long rt = TimeUtil.currentTimeMillis() - entry.getCreateTime();
if (rt > TimeUtil.currentTimeMillis()) {
rt = 0L;
}
node.addRt(rt);
}
// 触发下一个Slot
fireExit(context, resourceWrapper, count, args);
}
}
2.2.3 FlowSlot:流量控制实现
java
// FlowSlot负责流量控制,检查是否超过阈值
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
// 检查流量规则
checker.checkFlow(resourceWrapper, context, node, count, prioritized);
// 触发下一个Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
// 流量规则检查器
class FlowRuleChecker {
public void checkFlow(ResourceWrapper resource, Context context,
DefaultNode node, int count, boolean prioritized) throws BlockException {
// 检查所有流量规则
List<FlowRule> rules = FlowRuleManager.getRulesForResource(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
private boolean canPassCheck(FlowRule rule, Context context,
DefaultNode node, int acquireCount, boolean prioritized) {
// 根据规则类型选择不同的检查策略
switch (rule.getGrade()) {
case RuleConstant.FLOW_GRADE_QPS:
// QPS限流检查
return passQpsCheck(rule, context, node, acquireCount, prioritized);
case RuleConstant.FLOW_GRADE_THREAD:
// 线程数限流检查
return passThreadCheck(rule, context, node);
default:
return true;
}
}
private boolean passQpsCheck(FlowRule rule, Context context,
DefaultNode node, int acquireCount, boolean prioritized) {
// 获取限流控制器
TrafficShapingController controller = rule.getRater();
if (controller == null) {
// 根据策略创建对应的控制器
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
// 预热模式
controller = new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), 3);
break;
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
// 匀速排队模式
controller = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
break;
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
// 预热+匀速排队模式
controller = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), rule.getColdFactor());
break;
default:
// 默认快速失败模式
controller = new DefaultController(rule.getCount(), rule.getGrade());
}
rule.setRater(controller);
}
// 检查是否可以通过
return controller.canPass(node, acquireCount, prioritized);
}
}
2.2.4 DegradeSlot:熔断降级实现
java
// DegradeSlot负责熔断降级检查
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
// 检查熔断规则
performChecking(context, resourceWrapper);
// 触发下一个Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
private void performChecking(Context context, ResourceWrapper resource) throws BlockException {
// 获取所有熔断规则
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(resource.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
// 检查每个熔断器
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
}
// 抽象熔断器
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
protected final DegradeRule rule;
protected final int recoveryTimeoutMs; // 恢复超时时间
protected volatile long nextRetryTimestamp; // 下次重试时间
// 熔断器状态
protected final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
enum State {
CLOSED, // 关闭状态(正常)
OPEN, // 打开状态(熔断)
HALF_OPEN // 半开状态(探测恢复)
}
@Override
public boolean tryPass(Context context) {
// 处理特殊情况
if (currentState() == State.CLOSED) {
return true;
}
if (currentState() == State.OPEN) {
// 对于熔断状态,如果已超过恢复时间,则切换到半开状态
if (retryTimeoutArrived()) {
return fromOpenToHalfOpen(context);
}
return false;
}
// 半开状态
return false;
}
@Override
public void onRequestComplete(Context context) {
// 请求完成时的回调,更新统计信息
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
// 更新成功/失败计数
// 根据规则判断是否需要熔断
if (currentState() == State.CLOSED) {
// 检查是否需要触发熔断
if (shouldPassCheck()) {
// 触发熔断
transformToOpen();
}
} else if (currentState() == State.HALF_OPEN) {
// 半开状态下,根据请求结果决定切换到关闭还是打开状态
if (error == null) {
// 请求成功,切换到关闭状态
fromHalfOpenToClose();
} else {
// 请求失败,重新切换到打开状态
fromHalfOpenToOpen();
}
}
}
}
2.2.5 SystemSlot:系统保护实现
java
// SystemSlot负责系统级别的保护
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final SystemRuleManager systemRuleManager = SystemRuleManager.getInstance();
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
// 检查系统规则
SystemRuleManager.checkSystem(resourceWrapper);
// 触发下一个Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
// 系统规则管理器
public class SystemRuleManager {
private static volatile Map<String, List<SystemRule>> systemRules = new HashMap<>();
private static final SystemStatusListener statusListener = new SystemStatusListener();
// 检查系统规则
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
// 如果系统规则为空,直接返回
if (systemRules == null) {
return;
}
// 检查每个系统规则
for (SystemRule rule : systemRules.values()) {
// 根据规则类型进行检查
switch (rule.getGrade()) {
case RuleConstant.DEGRADE_GRADE_RT:
// 检查平均RT
if (rule.getCount() < getCurrentRt()) {
throw new SystemBlockException(rule.getLimitApp(), rule);
}
break;
case RuleConstant.DEGRADE_GRADE_THREAD:
// 检查线程数
if (rule.getCount() < getCurrentThreadNum()) {
throw new SystemBlockException(rule.getLimitApp(), rule);
}
break;
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
// 检查异常比例
if (rule.getCount() < getCurrentExceptionRatio()) {
throw new SystemBlockException(rule.getLimitApp(), rule);
}
break;
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
// 检查异常数
if (rule.getCount() < getCurrentExceptionCount()) {
throw new SystemBlockException(rule.getLimitApp(), rule);
}
break;
}
}
}
// 获取当前平均RT
private static double getCurrentRt() {
// 从系统指标统计中获取
return ENTRY_NODE.avgRt();
}
// 获取当前线程数
private static double getCurrentThreadNum() {
// 从系统指标统计中获取
return ENTRY_NODE.curThreadNum();
}
}
2.3 Slot责任链构建过程
Sentinel通过CtSph类构建Slot责任链:
java
public class CtSph implements Sph {
private static final Object lock = new Object();
// Slot责任链
private static volatile ProcessorSlot<DefaultNode> chain;
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
// 获取或创建Slot责任链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 创建上下文
Context context = ContextUtil.getContext();
if (context == null) {
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", type);
}
// 执行Slot链
try {
chain.entry(context, resourceWrapper, null, count, args);
} catch (BlockException e1) {
throw e1;
} catch (Throwable e1) {
throw new SystemBlockException(e1.getMessage());
}
// 返回Entry
return new CtEntry(resourceWrapper, chain, context);
}
// 查找或创建Slot链
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 创建Slot链
chain = SlotChainProvider.newSlotChain();
chainMap.put(resourceWrapper, chain);
}
}
}
return chain;
}
}
// Slot链构建器
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// 按照顺序添加Slot
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new SystemSlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
三、Sentinel性能优化与设计模式
3.1 高性能统计设计
Sentinel使用LongAdder和Striped64实现高性能并发计数:
java
// 使用LongAdder替代AtomicLong提高并发性能
public class MetricBucket {
private final LongAdder[] counters;
public MetricBucket() {
counters = new LongAdder[MetricEvent.values().length];
for (int i = 0; i < counters.length; i++) {
counters[i] = new LongAdder();
}
}
public void add(MetricEvent event, long count) {
counters[event.ordinal()].add(count);
}
}
3.2 缓存优化
Sentinel使用缓存机制减少规则查找开销:
java
// 规则缓存
public class FlowRuleUtil {
private static final ConcurrentMap<String, List<FlowRule>> flowRules =
new ConcurrentHashMap<>();
// 带缓存的规则获取
public static List<FlowRule> getRules(String resourceName) {
List<FlowRule> rules = flowRules.get(resourceName);
if (rules == null) {
synchronized (flowRules) {
rules = flowRules.get(resourceName);
if (rules == null) {
// 从规则管理器获取
rules = FlowRuleManager.getRulesForResource(resourceName);
flowRules.put(resourceName, rules);
}
}
}
return rules;
}
}
3.3 设计模式应用
3.3.1 责任链模式(Chain of Responsibility)
java
// 抽象的Slot接口
public interface ProcessorSlot<T> {
void entry(Context context, ResourceWrapper resourceWrapper,
T param, int count, boolean prioritized, Object... args) throws Throwable;
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
// Slot链实现
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
private AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper,
Object param, int count, boolean prioritized, Object... args) throws Throwable {
super.fireEntry(context, resourceWrapper, param, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper,
int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
private AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
}
3.3.2 工厂模式(Factory Pattern)
java
// Slot链工厂
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder == null) {
synchronized (SlotChainProvider.class) {
if (slotChainBuilder == null) {
// 使用SPI机制加载SlotChainBuilder
slotChainBuilder = new DefaultSlotChainBuilder();
}
}
}
return slotChainBuilder.build();
}
}
3.3.3 策略模式(Strategy Pattern)
java
// 流量控制策略接口
public interface TrafficShapingController {
boolean canPass(Node node, int acquireCount, boolean prioritized);
}
// 不同的策略实现
class DefaultController implements TrafficShapingController { /* 快速失败 */ }
class WarmUpController implements TrafficShapingController { /* 预热 */ }
class RateLimiterController implements TrafficShapingController { /* 匀速排队 */ }
四、总结与最佳实践
4.1 Sentinel核心设计要点
-
模块化设计:通过Slot责任链实现功能解耦,每个Slot职责单一
-
高性能统计:使用LongAdder和滑动窗口实现高并发下的精确统计
-
灵活的规则引擎:支持多种限流算法和熔断策略
-
可扩展架构:通过SPI机制支持自定义扩展
4.2 限流算法选择建议
| 场景 | 推荐算法 | 理由 |
|---|---|---|
| API网关入口限流 | 令牌桶算法 | 允许突发流量,适合网关场景 |
| 内部服务保护 | 滑动窗口 | 统计精确,避免临界问题 |
| 数据库访问限流 | 漏桶算法 | 恒定速率,保护数据库 |
| 秒杀场景 | 预热+令牌桶 | 冷启动保护+允许合理突发 |
4.3 性能调优建议
-
合理设置滑动窗口参数:
java
// 窗口越小统计越精确,但内存消耗越大 // 推荐:500ms窗口,2个窗口组成1秒统计 LeapArray<MetricBucket> data = new BucketLeapArray(2, 1000);
-
监控关键指标:
java
// 监控内存使用 Runtime runtime = Runtime.getRuntime(); long usedMemory = runtime.totalMemory() - runtime.freeMemory(); // 监控规则数量 int ruleCount = FlowRuleManager.getRules().size();
-
合理配置JVM参数:
bash
# Sentinel对内存和GC比较敏感 -Xms2g -Xmx2g # 堆内存 -XX:+UseG1GC # 使用G1垃圾收集器 -XX:MaxGCPauseMillis=100 # 最大GC停顿时间
4.4 生产环境部署建议
-
Sentinel Dashboard高可用:
bash
# 多节点部署,使用Nginx负载均衡 upstream sentinel_dashboard { server 192.168.1.101:8080; server 192.168.1.102:8080; server 192.168.1.103:8080; } -
客户端配置优化:
yaml
spring: cloud: sentinel: transport: dashboard: sentinel-dashboard.com:8080 port: 8719 client-ip: ${spring.cloud.client.ip-address} eager: true # 立即初始化 log: dir: /var/log/sentinel -
规则持久化方案:
java
// 使用Nacos作为规则配置中心 @Configuration public class SentinelRuleConfig { @PostConstruct public void initRules() { // 从Nacos加载规则 List<FlowRule> rules = loadRulesFromNacos(); FlowRuleManager.loadRules(rules); } }
更多推荐

所有评论(0)