Go使用Sentinel实现限流和熔断重连

Go使用Sentinel实现限流和熔断重连

package main

import (
   "errors"
   "fmt"
   "log"
   "math/rand"
   "sync/atomic"
   "time"

   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/base"
   "github.com/alibaba/sentinel-golang/core/circuitbreaker"
   "github.com/alibaba/sentinel-golang/core/config"
   "github.com/alibaba/sentinel-golang/core/flow"
   "github.com/alibaba/sentinel-golang/logging"
   "github.com/alibaba/sentinel-golang/util"
)

func main() {
   err := sentinel.InitDefault()
   if err != nil {
      fmt.Println(err)
      return
   }

   _, err = flow.LoadRules([]*flow.Rule{
      {
         Resource:               "some-test",
         TokenCalculateStrategy: flow.Direct, // 直接
         ControlBehavior:        flow.Reject, // 直接拒绝
         Threshold:              10,          // 允许多少个
         StatIntervalInMs:       1000,        // 多长时间内
      },
      {
         Resource:               "some-test1",
         TokenCalculateStrategy: flow.Direct,     // 直接
         ControlBehavior:        flow.Throttling, // 匀速通过
         Threshold:              10,              // 允许多少个
         StatIntervalInMs:       1000,            // 多长时间内  当前设置代表1秒只允许10个,相当于每100毫秒放一个
      },
      {
         Resource:               "some-test2",
         TokenCalculateStrategy: flow.WarmUp, // 冷启动  预热方式  缓慢增长访问量
         ControlBehavior:        flow.Reject, // 直接拒绝
         Threshold:              1000,        // 允许多少个
         WarmUpPeriodSec:        10,          // 多长时间内达到顶峰
         WarmUpColdFactor:       3,           // 预热因子,默认3
      },
   })
   if err != nil {
      fmt.Println(err)
      return
   }

   // qps()
   // warmup()
   // throttling()
   breakerCount()
}

// stateChangeTestListener is a stateless circuit-breaker state-change listener
// whose methods print every transition (to Closed, Open, or Half-Open) to
// standard output.
type stateChangeTestListener struct {
}

func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
   fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func breakerCount() {
   conf := config.NewDefaultConfig()
   // for testing, logging output to console
   conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
   err := sentinel.InitWithConfig(conf)
   if err != nil {
      log.Fatal(err)
   }
   ch := make(chan struct{})
   // Register a state change listener so that we could observer the state change of the internal circuit breaker.
   circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

   _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
      // Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
      {
         Resource:                     "abc",
         Strategy:                     circuitbreaker.ErrorCount,
         RetryTimeoutMs:               3000, // 熔断后重试时间
         MinRequestAmount:             10,   // 忽略前多少个请求
         StatIntervalMs:               5000, // 几秒统计一次
         StatSlidingWindowBucketCount: 10,
         Threshold:                    50, // 达到多少个错误
      },
   })
   if err != nil {
      log.Fatal(err)
   }

   logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
   go func() {
      for {
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g1 blocked
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            if rand.Uint64()%20 > 9 {
               // Record current invocation as error.
               sentinel.TraceError(e, errors.New("biz error"))
            }
            // g1 passed
            time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
            e.Exit()
         }
      }
   }()
   go func() {
      for {
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g2 blocked
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            // g2 passed
            time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond)
            e.Exit()
         }
      }
   }()
   <-ch
}

func throttling() {
   for i := 0; i < 200; i++ {
      e, b := sentinel.Entry("some-test1", sentinel.WithTrafficType(base.Inbound))
      if b != nil {
         fmt.Println("限流了")
      } else {
         fmt.Println("访问成功")
         e.Exit()
      }
      time.Sleep(99 * time.Millisecond)
      // time.Sleep(100 * time.Millisecond)
   }
}

func warmup() {
   var total, totalBlock, totalSuc int

   for i := 0; i < 100; i++ {

      go func() {
         for {
            total++
            e, b := sentinel.Entry("some-test2", sentinel.WithTrafficType(base.Inbound))
            if b != nil {
               totalBlock++
               // fmt.Println("限流了")
            } else {
               totalSuc++
               // fmt.Println("访问成功")
               e.Exit()
            }
            time.Sleep(time.Duration(rand.Uint64()%20+10) * time.Millisecond)
         }
      }()

   }

   go func() {
      for {
         time.Sleep(1 * time.Second)
         fmt.Printf("请求数:%d,限流数:%d,通过数:%d\n\n", total, totalBlock, totalSuc)
         total, totalBlock, totalSuc = 0, 0, 0
      }
   }()
   time.Sleep(20 * time.Second)
}

func qps() {
   for i := 0; i < 12; i++ {

      e, b := sentinel.Entry("some-test", sentinel.WithTrafficType(base.Inbound))
      if b != nil {
         fmt.Println("限流了")
      } else {
         fmt.Println("访问成功")
         e.Exit()
      }
   }
}


在 Gin 中实际投入使用:


初始化时调用此方法

package initialize

import (
   "fmt"
   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/flow"
)

func Sentinel() {
   err := sentinel.InitDefault()
   if err != nil {
      fmt.Println(err)
      return
   }

   _, err = flow.LoadRules([]*flow.Rule{
      {
         Resource:               "some-test",
         TokenCalculateStrategy: flow.Direct, // 直接
         ControlBehavior:        flow.Reject, // 直接拒绝
         Threshold:              1,           // 允许多少个
         StatIntervalInMs:       2000,        // 多长时间内
      },
      {
         Resource:               "some-test1",
         TokenCalculateStrategy: flow.Direct,     // 直接
         ControlBehavior:        flow.Throttling, // 匀速通过
         Threshold:              10,              // 允许多少个
         StatIntervalInMs:       1000,            // 多长时间内  当前设置代表1秒只允许10个,相当于每100毫秒放一个
      },
      {
         Resource:               "some-test2",
         TokenCalculateStrategy: flow.WarmUp, // 冷启动  预热方式  缓慢增长访问量
         ControlBehavior:        flow.Reject, // 直接拒绝
         Threshold:              1000,        // 允许多少个
         WarmUpPeriodSec:        10,          // 多长时间内达到顶峰
         WarmUpColdFactor:       3,           // 预热因子,默认3
      },
   })
   if err != nil {
      fmt.Println(err)
      return
   }
}

需要限流时这样用

// Ask Sentinel for an entry on the "some-test" resource before doing the
// protected work. A non-nil second return value is treated as "request was
// rate-limited".
e, b := sentinel.Entry("some-test")
if b != nil {
   zap.S().Info("请求被限流")
   // Reply with HTTP 429 (Too Many Requests) and stop handling this request.
   c.JSON(http.StatusTooManyRequests, gin.H{
      "msg": "请求频繁,请稍后重试",
   })
   return
}
list, err := global.GoodsClient.GoodsList(context.WithValue(context.Background(), "ginContext", c), &req)  // this is the remote call being protected
// Release the entry once the protected call has finished.
e.Exit()


最后编辑于:2022/03/02 作者:牛逼PHP

发表评论