五分钟搞懂分布式流控算法原理和滑动窗口设计实现

流控的使用场景

固定窗口算法

图片来自互联网,如有侵权请联系作者删除

设计原理

维护一个单位时间内的计数值,每当一个请求通过时,就将计数值加1,当计数值超过预先设定的阈值时,就拒绝单位时间内的其他请求

问题

假设我们设定1秒内允许通过的请求阈值是99,如果有用户在时间窗口的最后几毫秒发送了99个请求,紧接着又在下一个时间窗口开始时发送了99个请求,那么这个用户其实在一秒内成功请求了198次,显然超过了阈值但并不会被限流(会有突刺问题)

滑动窗口算法

图片来自互联网,如有侵权请联系作者删除

设计原理

假设我们设定1秒内允许通过的请求是200个,但是在这里我们需要把1秒的时间分成多格,假设分成5格(格数越多,流量过渡越平滑),每格窗口的时间大小是200毫秒,每过200毫秒,就将窗口向前移动一格

问题

流量的过渡是否平滑依赖于我们设置的窗口格数也就是统计时间间隔,格数越多,统计越精确,但是具体要分多少呢?

小总结: 固定窗口 和 滑动窗口 解决:单位时间总流量

漏斗算法

图片来自互联网,如有侵权请联系作者删除


设计原理

漏斗算法以一个常量限制了出口流量速率,因此漏斗算法可以应对平滑突发的流量。其中漏斗作为流量容器我们可以看做一个FIFO的队列,当入口流量速率大于出口流量速率时,因为流量容器是有限的,当超出流量容器大小时,超出的流量会被丢弃

优点

  1. 可以平滑限制请求的处理速度,避免瞬间请求过多导致系统崩溃或者雪崩
  2. 可以控制请求的处理速度,使得系统可以适应不同的流量需求,避免过载或者过度闲置。
  3. 可以通过调整桶的大小和漏出速率来满足不同的限流需求,可以灵活地适应不同的场景。

问题

因为漏桶算法限制了流出速率是一个固定常量值,所以漏桶算法不支持出现突发流出流量。但是在实际情况下,流量往往是突发的。

令牌桶算法

图片来自互联网,如有侵权请联系作者删除

设计原理

以恒定速率往令牌桶里加入令牌,令牌桶被装满时,多余的令牌会被丢 弃。当请求到来时,会先尝试从令牌桶获取令牌(相当于从令牌桶移除一个令牌),获取成功则请求被放行,获取失败则阻塞或直接拒绝请求

算法实现

目前来说滑动窗口的实现分为两种【环形和线性】,环形的代表是Sentinel的LeapArray,线性的是EasyRetry自研的SlidingWindow,下面分别介绍这两种的设计与实现

LeapArray【Sentinel】

核心字段

javaprotected int windowLengthInMs;  // 窗口长度 (intervalInMs / sampleCount)
protected int sampleCount;  // 总窗口间隔
protected int intervalInMs; // 总窗口时间 单位毫秒
private double intervalInSecond; // 总窗口时间 单位秒

核心算法

计算起始时间【windowStart】 => 利用求余运算,保证区间内桶的开始时间是一致的

javaprotected long calculateWindowStart(long timeMillis) {
  return timeMillis - timeMillis % windowLengthInMs;
}

计算下标 【timeIdx】 => 利用除法取整和求余运算,保证区间内桶的下标位是一致的

javapublic int calculateTimeIdx(long timeMillis) {
  long timeId = timeMillis / windowLengthInMs;
  return (int) (timeId % array.length());
}

获取窗口对象

java// 根据给定的时间戳计算出在存储桶数组中的索引位置。存储桶数组是一个循环数组,存储了不同时间窗口的计数桶。
int idx = calculateTimeIdx(timeMillis);

// 根据给定的时间戳计算出对应时间窗口的开始时间。
long windowStart = calculateWindowStart(timeMillis);

while (true) {
    // 获取环形数组中对象下标的窗口对象
    WindowWrap old = array.get(idx);
    if (old == null) {
        // 不存在则创建窗口
        WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        // 通过CAS(Compare-And-Swap)原子操作来确保只有一个线程能够成功插入
        if (array.compareAndSet(idx, null, window)) {
           // 如果插入成功,返回新创建的计数桶;
            return window;
        } else {
        //  如果插入失败,线程将让出时间片,等待下一次循环。
            Thread.yield();
        }
    } else if (windowStart == old.windowStart()) {
    // 如果找到的计数桶的开始时间与给定时间戳的开始时间一致,说明该计数桶是最新的,并且当前时间戳处于这个时间窗口内。直接返回找到的计数桶。
        return old;
    } else if (windowStart > old.windowStart()) {
// 如果找到的计数桶的开始时间早于给定时间戳的开始时间,说明该计数桶已过时(不再使用)。
// 在这种情况下,算法会尝试获取一个更新锁(updateLock)来对这个过时的计数桶进行重置
//如果成功获取锁,就会调用 `resetWindowTo` 方法来重置过时的计数桶,并返回重置后的计数桶。如果获取锁失败,线程将让出时间片,等待下一次循环。
        if (updateLock.tryLock()) {
            try {
                return resetWindowTo(old, windowStart);
            } finally {
                updateLock.unlock();
            }
        } else {
            Thread.yield();
        }
    } else if (windowStart < old.windowStart()) {
    // 如果找到的计数桶的开始时间晚于给定时间戳的开始时间,这是一个不应该出现的情况,代码中并没有详细处理这种情况,只是返回一个新创建的空计数桶。
        return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
    }
}

完整代码实现: https://github.com/alibaba/Sentinel/blob/v1.8.0/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java

SlidingWindow[【EasyRetry】]https://www.easyretry.com/)

核心字段

java/**  
* 这是一个用于存储数据的 TreeMap,其中的键为窗口期的开始时间(LocalDateTime 类型),
* 值为一个并发链表队列(ConcurrentLinkedQueue),用于存储在该窗口期内的数据。  
*/  
public final TreeMap> saveData = new TreeMap<>();  
  
/**  
* 总量窗口期阈值,指定一个窗口期内数据的最大数量。
*/  
private final Integer totalThreshold;  
  
/**  
* 开启的窗口数据预警阈值,当存活的窗口数量过多时会进行预警。
*/  
private final Integer windowTotalThreshold;  
  
/**  
* 一个监听器列表,用于处理达到窗口期阈值时的操作。
*/  
private final List> listeners;

/**  
* 新增窗口锁  
*/  
private static final ReentrantLock SAVE_LOCK = new ReentrantLock();  
  
/**  
* 到达时间窗口期或者总量窗口期锁  
*/  
private static final ReentrantLock NOTICE_LOCK = new ReentrantLock();

新增数据&&开启新窗口

javapublic void add(T data) {

    LocalDateTime now = LocalDateTime.now();
    if (isOpenNewWindow(now)) {

        SAVE_LOCK.lock();
        LocalDateTime windowPeriod = now.plus(duration, chronoUnit);
        try {

            // 防止开启两个间隔时间小于窗口期的窗口
            if (isOpenNewWindow(now)) {
                ConcurrentLinkedQueue list = new ConcurrentLinkedQueue<>();
                list.add(data);

                LogUtils
                    .info(log, "添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size());
                saveData.put(windowPeriod, list);

                // 扫描n-1个窗口,是否过期,过期则删除
                removeInvalidWindow();

                // 超过窗口阈值预警
                alarmWindowTotal();

            } else {
                oldWindowAdd(data);
            }

        } finally {
            SAVE_LOCK.unlock();
        }

    } else {
        oldWindowAdd(data);
    }

}

往已存在的窗口期内添加数据

java    private void oldWindowAdd(T data) {

        LocalDateTime windowPeriod = getNewWindowPeriod();
        // 添加数据
        ConcurrentLinkedQueue list = saveData.get(windowPeriod);
        list.add(data);

        // 到达总量窗口期,将数据传递给监听器进行处理。
        if (list.size() >= totalThreshold) {
            doHandlerListener(windowPeriod);
        }

    }

处理到达窗口期的数据监听器

javaprivate void doHandlerListener(LocalDateTime windowPeriod) {

    NOTICE_LOCK.lock();

    try {

        ConcurrentLinkedQueue list = saveData.get(windowPeriod);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        // 深拷贝
        ConcurrentLinkedQueue deepCopy = new ConcurrentLinkedQueue<>(list);
        clear(windowPeriod, deepCopy);

        if (CollectionUtils.isEmpty(deepCopy)) {
            return;
        }

        for (Listener listener : listeners) {
            // 用户自定义实现具体的数据处理逻辑
            listener.handler(new ArrayList<>(deepCopy));
        }

    } catch (Exception e) {
        log.error("到达总量窗口期通知异常", e);
    } finally {
        NOTICE_LOCK.unlock();
    }

}

删除过期窗口

扫描n-1个窗口,是否过期,过期则删除 过期条件为窗口期内无数据

java    private void removeInvalidWindow() {

        for (int i = 0; i < saveData.size() - 1; i++) {
            Map.Entry> firstEntry = saveData.firstEntry();
            if (CollectionUtils.isEmpty(firstEntry.getValue())) {
                saveData.remove(firstEntry.getKey());
            }
        }
    }

完成代码实现: https://gitee.com/aizuda/easy-retry/blob/master/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java

项目实战

本篇文章多次提到EasyRetry,想必小伙伴们都很好奇,下面简单介绍一下EasyRetry

EasyRetry是致力提高分布式业务系统一致性的分布式重试平台,它提供了控制台任务观测、可配置的重试策略、重试后执行回调以及丰富地告警配置等功能。通过这些手段,可以对异常数据进行全面监测和回放,从而在确保系统高可用性的同时,大大提升数据的一致性。详细了解

EasyRetry为什么使用滑动窗口?

场景一 客户端批量上报异常数据

EasyRetry作为高性能的分布式重试平台,从设计之初就充分考虑了重试数据上报的集中性和异步特点.使用滑动窗口批量上报异常数据可以减少网络传输的频率,从而降低网络开销。滑动窗口可以将一定时间内的异常数据进行缓冲和合并,然后一次性发送,减少了频繁的网络通信。

具体使用案例

java// 滑动窗口的参数配置
SlidingWindowConfig slidingWindowConfig = easyRetryProperties.getSlidingWindow();  
  
slidingWindow = SlidingWindow  
    .Builder  
    .newBuilder()  
    .withTotalThreshold(slidingWindowConfig.getTotalThreshold())  
    .withWindowTotalThreshold(slidingWindowConfig.getWindowTotalThreshold())  
    .withDuration(slidingWindowConfig.getDuration(), slidingWindowConfig.getChronoUnit())  
    .withListener(new ReportListener())  
.build();  
  
slidingWindow.start();

场景二 重试流量的统计(计划在后期版本实现)

通过LeapArray对重试流量进行统计,EasyRetry将进一步实现重试风暴的管控,从而确保服务的稳定性并提升服务端的质量。

有兴趣的小伙伴可以关注EasyRetry

展开阅读全文

页面更新:2024-02-24

标签:算法   窗口   阈值   令牌   分布式   速率   流量   原理   时间   数据   系统   窗口期

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top