golang 框架通过限流和熔断提升并发能力,具体实现如下:限流:限制每秒请求数量,防止系统超负荷,实现方式为使用令牌桶算法;熔断:当系统故障时暂停处理请求,防止故障传播,实现方式为使用断路器模式,设置最大失败次数、打开时间和复位时间。
Golang 框架如何通过限流和熔断提高系统的并发能力
限流
限流是一种用来限制系统每秒钟处理请求数量的技术。它可以防止系统超负荷,从而导致性能下降或崩溃。在 Golang 中,可以使用以下代码段来实现限流:
import (
"context"
"errors"
"time"
)
// Limit is a struct that implements the Token Bucket algorithm
type Limit struct {
rate time.Duration
burst int
queue chan struct{}
ctx context.Context
cancel context.CancelFunc
}
// NewLimit creates a new Limit
func NewLimit(rate time.Duration, burst int) *Limit {
ctx, cancel := context.WithCancel(context.Background())
return &Limit{
rate: rate,
burst: burst,
queue: make(chan struct{}, burst),
ctx: ctx,
cancel: cancel,
}
}
// Wait waits for a token to become available and then consumes it
func (l *Limit) Wait() error {
select {
case <-l.ctx.Done():
return errors.New("context canceled")
case <-l.queue:
return nil
}
}
// Acquire acquires a token and increments the counter
func (l *Limit) Acquire() error {
if err := l.Wait(); err != nil {
return err
}
go func() {
time.Sleep(l.rate)
select {
case l.queue <- struct{}{}:
case <-l.ctx.Done():
}
}()
return nil
}
熔断
立即学习“go语言免费学习笔记(深入)”;
熔断是一种当系统出现故障时暂时停止处理请求的技术。它可以防止故障传播并允许系统自我恢复。在 Golang 中,可以使用以下代码段来实现熔断:
import (
"context"
"sync/atomic"
"time"
)
// CircuitBreaker is a struct that implements the Circuit Breaker pattern
type CircuitBreaker struct {
maxFailures int
openTime time.Duration
resetTime time.Duration
count int64
state string
openAt time.Time
resetAt time.Time
closedAt time.Time
lastFailedAt time.Time
totalFailures int64
totalSuccesses int64
}
// NewCircuitBreaker creates a new CircuitBreaker
func NewCircuitBreaker(maxFailures int, openTime time.Duration, resetTime time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
openTime: openTime,
resetTime: resetTime,
state: "CLOSED",
closedAt: time.Now(),
}
}
// Execute executes a function and handles circuit breaker logic
func (cb *CircuitBreaker) Execute(f func()) {
switch cb.state {
case "OPEN":
if time.Since(cb.openAt) > cb.openTime {
cb.setState("HALF_OPEN")
cb.openAt = time.Now()
}
return
case "HALF_OPEN":
if time.Since(cb.resetAt) > cb.resetTime {
cb.setState("CLOSED")
cb.resetAt = time.Now()
}
}
start := time.Now()
f()
end := time.Since(start)
if end >= cb.openTime {
atomic.AddInt64(&cb.count, 1)
} else {
atomic.AddInt64(&cb.totalSuccesses, 1)
}
if atomic.LoadInt64(&cb.count) >= int64(maxFailures) {
cb.setState("OPEN")
cb.count = 0
}
}
// setState changes the state of the circuit breaker
func (cb *CircuitBreaker) setState(state string) {
switch state {
case "OPEN":
cb.state = "OPEN"
cb.openAt = time.Now()
case "HALF_OPEN":
cb.state = "HALF_OPEN"
cb.resetAt = time.Now()
case "CLOSED":
cb.state = "CLOSED"
cb.closedAt = time.Now()
}
}
实战案例
以下是一个使用限流和熔断来提高 Golang 应用并发能力的实战案例:
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/google/go-github/v42/github"
)
// Limit the number of requests to the GitHub API
var rateLimiter = NewLimit(time.Second, 10)
// Circuit breaker to prevent overloading the GitHub API
var cb = NewCircuitBreaker(3, time.Minute, time.Minute*5)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if err := acquireToken(); err != nil {
// Handle error
return
}
cb.Execute(func() {
// Make request to GitHub API
client := github.NewClient(nil)
ctx := context.Background()
repos, _, err := client.Repositories.List(ctx, "", nil)
if err != nil {
// Handle error
return
}
for _, repo := range repos {
fmt.Fprintf(w, "%sn", repo.GetFullName())
}
})
})
log.Fatal(http.ListenAndServe(":8080", nil))
}
// acquireToken blocks until a token is available and then consumes it
func acquireToken() error {
if err := rateLimiter.Acquire(); err != nil {
return err
}
return nil
}
在这个示例中,rateLimiter 用于限制对 GitHub API 的请求速率,而 cb 则用于防止因故障导致请求过载。这使得应用程序能够以受控和健壮的方式处理来自用户的请求,从而提高了系统的整体并发能力。