限流和熔断是高并发系统中防止系统崩溃的至关重要技术。限流:控制传入系统的请求速率,防止超载。实战代码:使用 go-limiter 库实现限流中间件,控制每秒请求数量。熔断:当系统出现故障时断开与故障服务之间的连接,防止故障扩散。实战代码:使用自定义 circuitbreaker 结构体实现熔断机制,根据请求失败次数和时间参数自动切换熔断状态。
Golang 框架限流与熔断的最佳实践
简介
在高并发系统中,限流和熔断是至关重要的技术,可以防止系统因流量激增而崩溃。本文将探讨 Golang 框架中限流和熔断的最佳实践,并提供实战案例。
限流
定义
限流是指控制传入系统的请求速率,以防止其超载。
实战案例
import (
"context"
"net/http"
"sync/atomic"
"time"
"github.com/goccy/go-limiter"
)
// LimitedHandler 是一个限流中间件。
func LimitedHandler(limit float64) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
limiter := limiter.NewFixed(limit, time.Second)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if rate, _, ok := limiter.TakeAvailable(context.Background(), 1); !ok {
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
return
}
atomic.AddInt64(&inFlightRequests, int64(rate))
defer atomic.AddInt64(&inFlightRequests, -int64(rate))
next.ServeHTTP(w, r)
})
}
}
// inFlightRequests 用于跟踪正在处理的请求数量。
var inFlightRequests int64
熔断
定义
熔断是指当系统出现故障时,自动断开与故障服务之间的连接,以防止故障扩散。
立即学习“go语言免费学习笔记(深入)”;
实战案例
import (
"context"
"errors"
"time"
"github.com/robfig/cron/v3"
)
// CircuitBreaker 是一个熔断器。
type CircuitBreaker struct {
state int64
closedAt time.Time
failedCount uint64
}
const (
closedState = 0
openState = 1
halfOpenState = 2
)
// closedTimeout 是熔断器处于闭合状态的最大时长。
const closedTimeout = 30 * time.Second
// halfOpenTimeout 是熔断器处于半开状态的最大时长。
const halfOpenTimeout = 30 * time.Second
// threshold 是触发熔断的失败请求次数阈值。
const threshold = 5
// NewCircuitBreaker 创建一个新的熔断器。
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
state: closedState,
closedAt: time.Now(),
failedCount: 0,
}
}
// AllowCheck 允许或拒绝请求。
func (cb *CircuitBreaker) AllowCheck() error {
switch cb.state {
case closedState:
if time.Since(cb.closedAt) > closedTimeout {
cb.setState(halfOpenState)
cb.failedCount = 0
}
return nil
case openState:
return errors.New("Circuit breaker open")
case halfOpenState:
if time.Since(cb.closedAt) > halfOpenTimeout {
cb.setState(openState)
cb.failedCount = 0
}
return nil
}
return errors.New("Invalid circuit breaker state")
}
// TryCall 尝试调用给定的函数,并处理熔断。
func (cb *CircuitBreaker) TryCall(ctx context.Context, call func() error) error {
if err := cb.AllowCheck(); err != nil {
return err
}
err := call()
if err != nil {
cb.failedCount++
if cb.failedCount >= threshold {
cb.setState(openState)
cb.closedAt = time.Now()
}
} else {
cb.setState(closedState)
}
return err
}
// setState 更新熔断器的状态。
func (cb *CircuitBreaker) setState(state int64) {
oldState := atomic.LoadInt64(&cb.state)
if oldState != state && atomic.CompareAndSwapInt64(&cb.state, oldState, state) {
cb.closedAt = time.Now()
switch state {
case closedState:
cron.New().Schedule(time.Minute.String(), func() { cb.setState(openState) })
case openState:
cron.New().Schedule(time.Second*10.String(), func() { cb.setState(halfOpenState) })
}
}
}