一次又一次运行后,我有时会遇到这个问题。我知道这与计数器相关。当调用sync.waitgroup的done()方法的次数多于调用add()方法的次数时,它将抛出此错误。
如何解决这个问题?
我的代码创建了大小为 4 的批次,并对每个批次进行一些处理,但我在解决此恐慌时遇到了问题。
package main
import (
"fmt"
"sync"
)
func main() {
// create input channel
input := make(chan int)
// create wait group
var wg sync.waitgroup
// start batcher goroutine
wg.add(1)
go batcher(input, &wg)
// send input values to the batcher
for i := 1; i <= 10; i++ {
input <- i
}
// close input channel
close(input)
// wait for batcher goroutine to finish
wg.wait()
}
func batcher(input chan int, wg *sync.waitgroup) {
// create batch channel with buffer of size 4
batch := make(chan int, 4)
// create channel to synchronize worker goroutines
done := make(chan bool)
// create wait group for worker goroutines
var workerwg sync.waitgroup
// start worker goroutines
for i := 0; i < 4; i++ {
workerwg.add(1)
go worker(batch, &workerwg, done)
}
// read input values and send to batch
for value := range input {
batch <- value
if len(batch) == 4 {
// wait for worker goroutines to finish processing batch
workerwg.wait()
// send batch to worker goroutines
for i := 0; i < 4; i++ {
workerwg.add(1)
go sendbatch(batch, &workerwg, done)
}
}
}
// wait for worker goroutines to finish processing remaining batch
workerwg.wait()
// close done channel to notify that all batches have been processed
close(done)
wg.done()
}
func sendbatch(batch chan int, workerwg *sync.waitgroup, done chan bool) {
// process batch
for value := range batch {
fmt.println("processing value:", value)
}
// notify worker goroutines that batch has been processed
workerwg.done()
select {
case done <- true:
default:
// done channel has been closed
}
}
func worker(batch chan int, workerwg *sync.waitgroup, done chan bool) {
// process batches received from batch channel
for batch := range batch {
// process batch
fmt.println("processing batch:", batch)
workerwg.done()
}
// notify batcher goroutine that worker goroutine has finished
select {
case done <- true:
default:
// done channel has been closed
}
}
编写批处理程序的基本代码:
package main
import (
"fmt"
"sync"
)
func main() {
input := make(chan int)
output := make(chan []int)
var wg sync.waitgroup
wg.add(2)
// start the batcher goroutine
go func() {
batch := []int{}
for value := range input {
batch = append(batch, value)
if len(batch) == 4 {
output <- batch
batch = []int{}
}
}
if len(batch) > 0 {
output <- batch
}
close(output)
wg.done()
}()
// start the worker goroutine
go func() {
for batch := range output {
sum := 0
for _, value := range batch {
sum += value
}
fmt.printf("sum of batch %v: %dn", batch, sum)
}
wg.done()
}()
// send input values to the batcher
for _, v := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
input <- v
}
close(input)
// wait for both goroutines to finish
wg.wait()
}
Sum of batch [1 2 3 4]: 10
Sum of batch [5 6 7 8]: 26
Sum of batch [9 10]: 19
早期的设计有点复杂,我会尝试扩展这个基本设计。
正确答案
根据这段代码:
for i := 0; i < 4; i++ {
workerwg.add(1)
go worker(batch, &workerwg, done)
}
我认为 workerwg.done()
应该移到循环之外:
func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
+ defer workerWg.Done()
// process batches received from batch channel
for batch := range batch {
// process batch
fmt.Println("Processing batch:", batch)
- workerWg.Done()
}
// notify batcher goroutine that worker goroutine has finished
select {
case done <- true:
default:
// done channel has been closed
}
}
但是batch
在demo中并没有关闭。所以事实上,goroutine 将永远运行,直到程序结束。
不知道是否还有其他问题。设计太复杂了。复杂的代码难以理解并且容易出错。考虑重新设计它。