自适应限流以及四种经典限流算法

前言

在分布式系统中,如果某个服务节点发生故障或者网络发生异常,都有可能导致调用方被阻塞等待,如果超时时间设置很长,调用方资源很可能被耗尽。这又导致了调用方的上游系统发生资源耗尽的情况,最终导致系统雪崩。

要防止系统发生雪崩,就必须要有容错设计。如果遇到突增流量,一般的做法是对非核心业务功能采用熔断和服务降级的措施来保护核心业务功能正常服务,而对于核心功能服务,则需要采用限流的措施。

相信你看完本篇文章,一定能够对系统容错的常见策略——限流、熔断、降级有更深的理解和体会。

如果对同学有帮助的话,麻烦三连哦,不胜感激!!!

概述

2.1 熔断(客户端)

在服务的依赖调用中,被调用方出现故障时,出于自我保护的目的,调用方会主动停止调用,并根据业务需要进行相应处理。调用方这种主动停止调用的行为我们称之为熔断。

假定服务A依赖服务B,当服务B处于正常状态,整个调用是健康的,服务A可以得到服务B的正常响应。当服务B出现故障时,比如响应缓慢或者响应超时,如果服务A继续请求服务B,那么服务A的响应时间也会增加,进而导致服务A响应缓慢。如果服务A不进行熔断处理,服务B的故障会传导至服务A,最终导致服务A也不可用。


2.2 限流(服务端)

限流是针对服务请求数量的一种自我保护机制,当请求数量超出服务的处理能力时,会自动丢弃新来的请求。


任何一个系统的处理能力都是有极限的,假定服务A的处理能力为QPS=100,当QPS<100时服务A可以提供正常的服务。当QPS>100时,由于请求量增大,会出现争抢服务资源的情况(数据库连接、CPU、内存等),导致服务A处理缓慢;当QPS继续增大时,可能会造成服务A响应更加缓慢甚至奔溃。如果不进行限流控制,服务A始终会面临着被大流量冲击的风险。做好系统请求流量的评估,制定合理的限流策略,是我们进行系统高可用保护的第一步。

2.3 降级

降级是通过开关配置将某些不重要的业务功能屏蔽掉,以提高服务处理能力。在大促场景中经常会对某些服务进行降级处理,大促结束之后再进行复原。


在不影响业务核心链路的情况下,屏蔽某些不重要的业务功能,可以节省系统的处理时间,提供系统的响应能力,在服务器资源固定的前提下处理更多的请求。


源码拆解和分析

3.1 熔断

无论是令牌桶、漏桶还是自适应限流的方法,总的来说都是服务端的单机限流方式。虽然服务端限流虽然可以帮助我们抗住一定的压力,但是拒绝请求毕竟还是有成本的。如果我们的本来流量可以支撑 1w qps,加了限流可以支撑在 10w qps 的情况下,仍然可以提供 1w qps 的有效请求,但是流量突然再翻了 10 倍,来到 100w qps 那么服务该挂还是得挂。

所以我们的可用性建设不仅仅是服务端做建设就可以万事大吉了,得在整个链路上的每个组件都做好自己的事情才行,今天我们就来一起看一下客户端上的限流措施:熔断

熔断器存在三种状态:

3.1.1 方案对比


hystrix-go

Hystrix 是由 Netflex 开发的一款开源组件,提供了基础的熔断功能。 Hystrix 将降级的策略封装在 Command 中,提供了 runfallback 两个方法,前者表示正常的逻辑,比如微服务之间的调用……,如果发生了故障,再执行 fallback 方法返回结果,我们可以把它理解成保底操作。如果正常逻辑在短时间内频繁发生故障,那么可能会触发短路,也就是之后的请求不再执行 run, 而是直接执行 fallback。

hystrix-go 则是用 go 实现的 hystrix 版,更确切的说,是简化版。只是上一次更新还是 2018 年 的一次 pr, 也就毕业了?


使用方法

hystric实现熔断一般包括两步:

第一步:配置熔断规则

第二部:设置熔断逻辑

一个简单的:

// 第一步:配置熔断规则
hystrix.ConfigureCommand("wuqq", hystrix.CommandConfig{
        Timeout:                int(3 * time.Second),
        MaxConcurrentRequests:  10,
        SleepWindow:            5000,
        RequestVolumeThreshold: 10,
        ErrorPercentThreshold:  30,
    })
    
// 第二步:设置熔断逻辑
// Do是异步,Go是同步

    _ = hystrix.Do("wuqq", func() error {
        // talk to other services
        _, err := http.Get("https://www.baidu.com/")
        if err != nil {
            fmt.Println("get error:%v",err)
            return err
        }
        return nil
    }, func(err error) error {
        fmt.Printf("handle  error:%v
", err)
        return nil
    })

Do 函数需要三个参数,第一个参数 commmand 名称,你可以把每个名称当成一个独立当服务,第二个参数是处理正常的逻辑,比如 http 调用服务,返回参数是 err。如果处理调用失败,那么就执行第三个参数逻辑, 我们称为保底操作。由于服务错误率过高导致熔断器开启,那么之后的请求也直接回调此函数。

配置参数含义:

Timeout: 执行 command 的超时时间。

MaxConcurrentRequests:command 的最大并发量 。

SleepWindow:当熔断器被打开后,SleepWindow 的时间就是控制过多久后去尝试服务是否可用了。

RequestVolumeThreshold: 一个统计窗口 10 秒内请求数量。达到这个请求数量后才去判断是否要开启熔断

ErrorPercentThreshold:错误百分比,请求数量大于等于 RequestVolumeThreshold 并且错误率到达这个百分比后就会启动熔断


核心实现

核心实现的方法是 AllowRequest,IsOpen判断当前是否处于熔断状态,allowSingleTest就是去看是否过了一段时间需要重新进行尝试

func (circuit *CircuitBreaker) AllowRequest() bool {        
    return !circuit.IsOpen() || circuit.allowSingleTest()
}

IsOpen先看当前是否已经打开了,如果已经打开了就直接返回就行了,如果还没打开就去判断

func (circuit *CircuitBreaker) IsOpen() bool {
   circuit.mutex.RLock()
   o := circuit.forceOpen || circuit.open
   circuit.mutex.RUnlock()

   if o {
      return true
   }

   if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
      return false
   }

   if !circuit.metrics.IsHealthy(time.Now()) {
      // too many failures, open the circuit
      circuit.setOpen()
      return true
   }

   return false
}

hystrix-go已经可以比较好的满足我们的需求,但是存在一个问题就是一旦触发了熔断,在一段时间之内就会被一刀切的拦截请求,所以我们来看看 google sre 的一个实现


Google SRE保护算法

这个算法的好处是不会直接一刀切的丢弃所有请求,而是计算出一个概率来进行判断,当成功的请求数量越少,K越小的时候计算出的概率就越大,表示这个请求被丢弃的概率越大


Kratos源码分析

func (b *sreBreaker) Allow() error {
   // 统计成功的请求,和总的请求
   success, total := b.summary()

   // 计算当前的成功率
   k := b.k * float64(success)
   if log.V(5) {
      log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success)
   }
   // 统计请求量和成功率
   // 如果 qps 比较小,不触发熔断
   // 如果成功率比较高,不触发熔断,如果 k = 2,那么就是成功率 >= 50% 的时候就不熔断
   if total < b.request || float64(total) < k {
      if atomic.LoadInt32(&b.state) == StateOpen {
         atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
      }
      return nil
   }
   if atomic.LoadInt32(&b.state) == StateClosed {
      atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
   }

   // 计算一个概率,当 dr 值越大,那么被丢弃的概率也就越大
   // dr 值是,如果失败率越高或者是 k 值越小,那么它越大
   dr := math.Max(0, (float64(total)-k)/float64(total+1))
   drop := b.trueOnProba(dr)
   if log.V(5) {
      log.Info("breaker: drop ratio: %f, drop: %t", dr, drop)
   }
   if drop {
      return ecode.ServiceUnavailable
   }
   return nil
}

// 通过随机来判断是否需要进行熔断
func (b *sreBreaker) trueOnProba(proba float64) (truth bool) {
   b.randLock.Lock()
   truth = b.r.Float64() < proba
   b.randLock.Unlock()
   return
}


熔断与failover结合的思想

一句话总结:请求先进入CircuitBreaker根据当前熔断器策略决定请求主集群或备集群,若请求主集群且主集群请求失败,则进入Failover逻辑Failover到备集群中获取数据。


3.2 限流

限流,也称流量控制。是指系统在面临高并发,或者大流量请求的情况下,限制新的请求对系统的访问,从而保证系统的稳定性。限流会导致部分用户请求处理不及时或者被拒,这就影响了用户体验。所以一般需要在系统稳定和用户体验之间平衡一下。

3.2.1 固定窗口

固定时间内对请求书进行限制,例如说每秒请求不超过50次,那就在0-1秒,1-2秒……n-n+1秒,每秒不超过50次请求。

可是会出现一个问题,在0.99秒和1.01秒分别有50次请求,对于固定窗口方法,不会限流,但是实际上在0.99秒-1.01秒,这一段不到1s的时间内已经达到了阙值的两倍,以下的滑动窗口方法可以解决这个问题。

3.2.2 滑动窗口

package main

import (
        "fmt"
        "sync"
        "time"
)

var winMu map[string]*sync.RWMutex

func init() {
        winMu = make(map[string]*sync.RWMutex)
}

type timeSlot struct {
        timestamp time.Time // 这个timeSlot的时间起点
        count     int       // 落在这个timeSlot内的请求数
}

func countReq(win []*timeSlot) int {
        var count int
        for _, ts := range win {
                count += ts.count
        }
        return count
}

type SlidingWindowLimiter struct {
        SlotDuration time.Duration // time slot的长度
        WinDuration  time.Duration // sliding window的长度
        numSlots     int           // window内最多有多少个slot
        windows      map[string][]*timeSlot
        maxReq       int // win duration内允许的最大请求数
}

func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter {
        return &SlidingWindowLimiter{
                SlotDuration: slotDuration,
                WinDuration:  winDuration,
                numSlots:     int(winDuration / slotDuration),
                windows:      make(map[string][]*timeSlot),
                maxReq:       maxReq,
        }
}

// 获取user_id/ip的时间窗口
func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot {
        win, ok := l.windows[uidOrIp]
        if !ok {
                win = make([]*timeSlot, 0, l.numSlots)
        }
        return win
}

func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) {
        l.windows[uidOrIp] = win
}

func (l *SlidingWindowLimiter) validate(uidOrIp string) bool {
        // 同一user_id/ip并发安全
        mu, ok := winMu[uidOrIp]
        if !ok {
                var m sync.RWMutex
                mu = &m
                winMu[uidOrIp] = mu
        }
        mu.Lock()
        defer mu.Unlock()

        win := l.getWindow(uidOrIp)
        now := time.Now()
        // 已经过期的time slot移出时间窗
        timeoutOffset := -1
        for i, ts := range win {
                if ts.timestamp.Add(l.WinDuration).After(now) {
                        break
                }
                timeoutOffset = i
        }
        if timeoutOffset > -1 {
                win = win[timeoutOffset+1:]
        }

        // 判断请求是否超限
        var result bool
        if countReq(win) < l.maxReq {
                result = true
        }

        // 记录这次的请求数
        var lastSlot *timeSlot
        if len(win) > 0 {
                lastSlot = win[len(win)-1]
                if lastSlot.timestamp.Add(l.SlotDuration).Before(now) {
                        lastSlot = &timeSlot{timestamp: now, count: 1}
                        win = append(win, lastSlot)
                } else {
                        lastSlot.count++
                }
        } else {
                lastSlot = &timeSlot{timestamp: now, count: 1}
                win = append(win, lastSlot)
        }

        l.storeWindow(uidOrIp, win)

        return result
}

func (l *SlidingWindowLimiter) getUidOrIp() string {
        return "127.0.0.1"
}

func (l *SlidingWindowLimiter) IsLimited() bool {
        return !l.validate(l.getUidOrIp())
}

func main() {
        limiter := NewSliding(100*time.Millisecond, time.Second, 10)
        for i := 0; i < 5; i++ {
                fmt.Println(limiter.IsLimited())
        }
        time.Sleep(100 * time.Millisecond)
        for i := 0; i < 5; i++ {
                fmt.Println(limiter.IsLimited())
        }
        fmt.Println(limiter.IsLimited())
        for _, v := range limiter.windows[limiter.getUidOrIp()] {
                fmt.Println(v.timestamp, v.count)
        }

        fmt.Println("a thousand years later...")
        time.Sleep(time.Second)
        for i := 0; i < 7; i++ {
                fmt.Println(limiter.IsLimited())
        }
        for _, v := range limiter.windows[limiter.getUidOrIp()] {
                fmt.Println(v.timestamp, v.count)
        }
}

3.2.3 漏桶

package main

import (
        "fmt"
        "sync"
        "time"
)

// 每个请求来了,把需要执行的业务逻辑封装成Task,放入木桶,等待worker取出执行
type Task struct {
        handler func() Result // worker从木桶中取出请求对象后要执行的业务逻辑函数
        resChan chan Result   // 等待worker执行并返回结果的channel
        taskID  int
}

// 封装业务逻辑的执行结果
type Result struct {
}

// 模拟业务逻辑的函数
func handler() Result {
        time.Sleep(300 * time.Millisecond)
        return Result{}
}

func NewTask(id int) Task {
        return Task{
                handler: handler,
                resChan: make(chan Result),
                taskID:  id,
        }
}

// 漏桶
type LeakyBucket struct {
        BucketSize int       // 木桶的大小
        NumWorker  int       // 同时从木桶中获取任务执行的worker数量
        bucket     chan Task // 存方任务的木桶
}

func NewLeakyBucket(bucketSize int, numWorker int) *LeakyBucket {
        return &LeakyBucket{
                BucketSize: bucketSize,
                NumWorker:  numWorker,
                bucket:     make(chan Task, bucketSize),
        }
}

func (b *LeakyBucket) validate(task Task) bool {
        // 如果木桶已经满了,返回false
        select {
        case b.bucket <- task:
        default:
                fmt.Printf("request[id=%d] is refused
", task.taskID)
                return false
        }

        // 等待worker执行
        <-task.resChan
        fmt.Printf("request[id=%d] is run
", task.taskID)
        return true
}

func (b *LeakyBucket) Start() {
        // 开启worker从木桶拉取任务执行
        go func() {
                for i := 0; i < b.NumWorker; i++ {
                        go func() {
                                for {
                                        task := <-b.bucket
                                        result := task.handler()
                                        task.resChan <- result
                                }
                        }()
                }
        }()
}

func main() {
        bucket := NewLeakyBucket(10, 4)
        bucket.Start()

        var wg sync.WaitGroup
        for i := 0; i < 20; i++ {
                wg.Add(1)
                go func(id int) {
                        defer wg.Done()
                        task := NewTask(id)
                        bucket.validate(task)
                }(i)
        }
        wg.Wait()
}
展开阅读全文

页面更新:2024-04-22

标签:错误率   熔断器   木桶   算法   流量   逻辑   数量   状态   核心   业务   经典   系统

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top