在 go 中实现分布式并发可以提高程序性能和可扩展性,主要通过利用 goroutine 和通道实现。goroutine 是轻量级线程,可以并发执行任务,而通道用于 goroutine 之间安全地交换数据。通过创建 goroutine 池和分配任务的方式,可以在多台机器上并行处理大量数据,从而充分利用 cpu 资源和提高效率。
使用 Go 实现分布式并发
分布式并发是一种编程技术,它使程序能够跨越多个机器同时执行任务,从而提高性能和可扩展性。在 Go 中,可以使用 goroutine 和通道等内置特性来实现分布式并发。
goroutine
立即学习“go语言免费学习笔记(深入)”;
goroutine 是 Go 中的轻量级线程,它与传统线程不同,不需要堆栈,并且开销更小。goroutine 可以并发运行,从而最大限度地利用 CPU 资源。以下代码示例演示了如何使用 goroutine 实现并发任务:
package main
import (
"fmt"
"runtime"
)
func main() {
// 创建一个通道,用于 goroutine 之间通信
ch := make(chan int)
// 创建并启动 goroutine
go func() {
for i := 0; i < 100; i++ {
ch <- i
}
close(ch) // 关闭通道,表示没有更多数据发送
}()
// 从通道中读取数据并打印
for v := range ch {
fmt.Println(v)
}
fmt.Println("Number of goroutines:", runtime.NumGoroutine())
}
通道
通道是一种用于 goroutine 之间通信的机制。它可以安全地传递数据,同时防止数据竞争。以下代码示例演示了如何使用通道进行 goroutine 之间的数据交换:
package main
import (
"fmt"
"time"
)
func main() {
// 创建一个通道,用于传递数据
ch := make(chan int)
// 创建并启动 goroutine,将数据发送到通道中
go func() {
for i := 0; i < 10; i++ {
ch <- i
time.Sleep(100 * time.Millisecond) // 模拟延迟
}
close(ch) // 关闭通道,表示没有更多数据发送
}()
// 从通道中读取数据并打印
for v := range ch {
fmt.Println(v)
}
}
实战案例
一个常见的分布式并发场景是利用多个机器来并行处理大量数据。以下代码示例演示了如何在 Go 中使用分布式并发来处理一个大型文件:
package main
import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"sync"
)
// 用于处理请求的 goroutine 数量
var numOfWorkers int = 5
// 用于同步 goroutine
var wg sync.WaitGroup
func main() {
// 从命令行读取文件名
if len(os.Args) != 2 {
log.Fatal("Usage: go run worker_pool.go <filename>")
}
filename := os.Args[1]
// 为每个请求创建一个 goroutine 池
ctx := context.Background()
pool := make(chan context.Context, numOfWorkers)
for i := 0; i < numOfWorkers; i++ {
go worker(ctx, pool, filename)
}
// 读取文件并分配每行到 goroutine 池中
file, err := ioutil.ReadFile(filename)
if err != nil {
log.Fatal(err)
}
scanner := bufio.NewScanner(bytes.NewReader(file))
count := 0
for scanner.Scan() {
pool <- ctx
wg.Add(1)
go handleLine(scanner.Text(), &wg)
count++
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
wg.Wait()
fmt.Println("Total lines processed:", count)
}
// 处理一行数据的 goroutine
func handleLine(line string, wg *sync.WaitGroup) {
defer wg.Done()
defer func() { <-pool }()
// 处理行的数据
num, err := strconv.Atoi(line)
if err != nil {
log.Fatal(err)
}
fmt.Println(num * num)
}
// 分配请求的 goroutine
func worker(ctx context.Context, pool chan context.Context, filename string) {
for {
// 从池中获取请求
select {
case <-pool:
case <-ctx.Done():
return
}
// 打开文件并读取下一行
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
var line string
_, err = file.ReadString('n')
if err != nil && err != io.EOF {
log.Fatal(err)
}
if err == io.EOF {
file.Close()
continue
}
line = strings.TrimRight(line, "n")
// 处理行的数据
num, err := strconv.Atoi(line)
if err != nil {
log.Fatal(err)
}
fmt.Println(num * num)
file.Close()
}
}