滑动时间窗口
首先来看时间窗口,假设我们要统计QPS(每秒内的访问数),并且要求QPS<=200。在如下两个时间窗口内,各自的时间窗口内进行统计,大致一看没啥问题。
再看下图,在每一个时间窗口内都没啥问题,所以根据统计结果不会限流。但是在第一个的后500ms+第二个的前500ms内的QPS=380>200,超出了系统的承受能力。所以此时出现了滑动事件窗口。
滑动时间窗口将原来的1s跨度窗口按照需求进行拆分,比如拆分成2个500ms的窗口,这样随着窗口的滑动,中间的两个500ms也能组成一个1s的窗口,此时发现这个窗口内的QPS=380>200,会进行限流。当然,按照假设如下的四个窗口中,第一个的后250ms+第二个窗口500ms+第三个窗口的前250ms的访问量之和也可能大于200,为了解决这个问题,可以将窗口再拆分成250ms跨度的。所以,滑动窗口随着拆分粒度越小,准确度越高,但是单来的计算消耗,空间占用也会越大,sentinel 秒级滑动窗口默认是500ms的跨度。
随着时间的推移,如果不采用一些手段,每500ms,就要新建一个时间窗口,我们发现时间窗口会越来越多,内存占用会越来越大,而一段时间前的滑动窗口可能已经没用了。此时有两种方案,一是新建窗口 + 回收老窗口;二是直接进行窗口复用,后者更容易实现且后者的好处更多。sentinel 使用了窗口复用机制。
由于我们要做到窗口复用,那么每个时间窗口就需要记录时间窗口时间,不然,由于窗口覆盖机制,我们不知道窗口具体统计的时哪一个时段。来看下 sentinel 滑动时间窗口的最终设计。
sentinel 实现滑动窗口
计数:获取当前的 WindowWrapper,然后取出其中的 MetricBucket 进行计数。
取值:计算一个 QPS 时,需要获取整个秒级滑动窗口内的所有 WindowWrapper,然后累加器 MetricBucket 的相关统计数据即可。
这里只讲最核心的部分,sentinel 的滑动窗口还做了一些其他事情,比如“预占用”。假设当前时间窗口的名额已满,但是当前这个请求很重要且可以支持一定的等待时间,其会预占用下一时间窗口的名额,然后其线程wait到下一个时间窗口后开始运行。
计数器 MetricBucket
/**
* Represents metrics data in a period of time span.
* 统计一段时间内(采样周期)的指标数据
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class MetricBucket {
/**
* 存储各项指标计数
* 指标类型:MetricEvent.values()
*/
private final LongAdder[] counters;
/**
* 记录最小耗时
*/
private volatile long minRt;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
public MetricBucket reset(MetricBucket bucket) {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
counters[event.ordinal()].add(bucket.get(event));
}
initMinRt();
return this;
}
private void initMinRt() {
this.minRt = SentinelConfig.statisticMaxRt();
}
/**
* Reset the adders.
*
* @return new metric bucket in initial state
*/
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public long pass() {
return get(MetricEvent.PASS);
}
public long block() {
return get(MetricEvent.BLOCK);
}
...
public long minRt() {
return minRt;
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public void addBlock(int n) {
add(MetricEvent.BLOCK, n);
}
public void addRT(long rt) {
add(MetricEvent.RT, rt);
// Not thread-safe, but it's okay.
if (rt < minRt) {
minRt = rt;
}
}
这里统计数据用的是 LongAdder,在高并发下其相较于 AtomicInteger 和 AtomicLong 具有更高的吞吐,更高的性能。
LongAdder longAdder = new LongAdder();
longAdder.increment(); // 等价于 longAdder.add(1)
longAdder.add(1);
longAdder.add(1);
System.out.println(longAdder.sum());
时间窗口 WindowWrapper
public class WindowWrap<T> {
/**
* Time length of a single window bucket in milliseconds.
* 时间窗口长度
*/
private final long windowLengthInMs;
/**
* Start timestamp of the window in milliseconds.
* 当前时间窗口的开始时间
*/
private long windowStart;
/**
* Statistic data.
* 统计数据:即 MetricBucket
*/
private T value;
/**
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param windowStart the start timestamp of the window
* @param value statistic data
*/
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
public long windowLength() {
return windowLengthInMs;
}
public long windowStart() {
return windowStart;
}
public T value() {
return value;
}
public void setValue(T value) {
this.value = value;
}
/**
* Reset start timestamp of current bucket to provided time.
*
* @param startTime valid start timestamp
* @return bucket after reset
*/
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
/**
* Check whether given timestamp is in current bucket.
* 传入的时间是否在当前的时间窗口内
*
* @param timeMillis valid timestamp in ms
* @return true if the given time is in current bucket, otherwise false
* @since 1.5.0
*/
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
}
滑动窗口 LeapArray
public abstract class LeapArray<T> {
/**
* 每个窗口 windowWrapper(metricBucket) 的大小
* 秒级:500ms
* 分钟级:1000ms
*/
protected int windowLengthInMs;
/**
* 滑动窗口的窗口个数
* 秒级:默认为2
* 分钟级:默认为60
*/
protected int sampleCount;
/**
* 滑动窗口的时间跨度,
* 秒级:1000ms
* 分钟级:60*1000ms
*/
protected int intervalInMs;
/**
* 滑动窗口的时间跨度,
* 秒级:1s
* 分钟级:60s
*/
private double intervalInSecond;
/**
* 滑动时间窗口
*/
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* Get the bucket at current timestamp.
*
* @return the bucket at current timestamp
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
/**
* Create a new statistic value for bucket.
*
* @param timeMillis current time in milliseconds
* @return the new empty bucket
*/
public abstract T newEmptyBucket(long timeMillis);
/**
* Reset given bucket to provided start time and reset the value.
*
* @param startTime the start time of the bucket in milliseconds
* @param windowWrap current bucket
* @return new clean bucket at given start time
*/
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);
/**
* 计算当前时间在滑动窗口数组中的索引位置
* @param timeMillis
* @return
*/
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
/**
* 计算某一个 windowWrapper 的开始时间
* @param timeMillis
* @return
*/
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
/**
* Get bucket item at provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算给定时间在滑动窗口数组中的索引位置
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
* 从滑动时间窗口中根据给定时间获取 时间窗口
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array. 如果 Bucket 不存在,则新建,并且放置到滑动时间窗口中
* (2) Bucket is up-to-date, then just return the bucket. 如果查询到的 Bucket 就是当前时间节点的 Bucket,则直接返回
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. 如果获取到的 Bucket 已经过期了,则重置 Bucket,用于新的统计计算
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
}