php小编西瓜在编程过程中,经常会遇到程序挂起的问题。程序挂起是指程序在执行过程中突然停止响应,并且没有任何错误提示。这种情况常常让人感到困惑,不知道出了什么问题。究竟为什么这个程序挂起?在本文中,我们将探讨一些常见的程序挂起原因,并提供解决方案来帮助解决这个问题。无论你是初学者还是有经验的开发者,相信这些内容都能对你有所帮助。
问题内容
我有在 go 中的通道之间进行通信的代码。它似乎完成了所需的操作,但最后挂起。我正在尝试诊断它为何挂起。
代码使用 httpbin.org
获取随机 uuid,然后将其发布,同时遵守我通过信号量通道和速率通道建立的并发和速率限制。
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
)
type HttpBinGetRequest struct {
url string
}
type HttpBinGetResponse struct {
Uuid string `json:"uuid"`
StatusCode int
}
type HttpBinPostRequest struct {
url string
uuid string // Item to post to API
}
type HttpBinPostResponse struct {
Data string `json:"data"`
StatusCode int
}
func main() {
// Prepare GET requests for n requests
var requests []*HttpBinGetRequest
for i := 0; i < 10; i++ {
uri := "https://httpbin.org/uuid"
request := &HttpBinGetRequest{
url: uri,
}
requests = append(requests, request)
}
// Create semaphore and rate limit for the GET endpoint
getSemaphore := make(chan struct{}, 10)
getRate := make(chan struct{}, 10)
defer close(getRate)
defer close(getSemaphore)
for i := 0; i < cap(getRate); i++ {
getRate <- struct{}{}
}
go func() {
// ticker corresponding to 1/nth of a second
// where n = rate limit
// basically (1000 / rps) * time.Millisecond
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := <-getRate
if !ok {
return
}
}
}()
// Send our GET requests to obtain a random UUID
respChan := make(chan HttpBinGetResponse)
var wg sync.WaitGroup
for _, request := range requests {
wg.Add(1)
// cnt := c
// Go func to make request and receive the response
go func(r *HttpBinGetRequest) {
defer wg.Done()
// Check the rate limiter and block if it is empty
getRate <- struct{}{}
// fmt.Printf("Request #%d at: %sn", cnt, time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"))
resp, _ := get(r, getSemaphore)
fmt.Printf("%+vn", resp)
// Place our response into the channel
respChan <- *resp
// fmt.Printf("%+v,%sn", resp, time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"))
}(request)
}
// Set up for POST requests 10/s
postSemaphore := make(chan struct{}, 10)
postRate := make(chan struct{}, 10)
defer close(postRate)
defer close(postSemaphore)
for i := 0; i < cap(postRate); i++ {
postRate <- struct{}{}
}
go func() {
// ticker corresponding to 1/nth of a second
// where n = rate limit
// basically (1000 / rps) * time.Millisecond
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := <-postRate
if !ok {
return
}
}
}()
// Read responses as they become available
for ele := range respChan {
postReq := &HttpBinPostRequest{
url: "https://httpbin.org/post",
uuid: ele.Uuid,
}
go func(r *HttpBinPostRequest) {
postRate <- struct{}{}
postResp, err := post(r, postSemaphore)
if err != nil {
fmt.Println(err)
}
fmt.Printf("%+vn", postResp)
}(postReq)
}
wg.Wait()
close(respChan)
}
func get(hbgr *HttpBinGetRequest, sem chan struct{}) (*HttpBinGetResponse, error) {
// Add a token to the semaphore
sem <- struct{}{}
// Remove token when function is complete
defer func() { <-sem }()
httpResp := &HttpBinGetResponse{}
client := &http.Client{}
req, err := http.NewRequest("GET", hbgr.url, nil)
if err != nil {
fmt.Println("error making request")
return httpResp, err
}
req.Header = http.Header{
"accept": {"application/json"},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
fmt.Println("error getting response")
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("error reading response body")
return httpResp, err
}
json.Unmarshal(body, &httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}
// Method to post data to httpbin
func post(hbr *HttpBinPostRequest, sem chan struct{}) (*HttpBinPostResponse, error) {
// Add a token to the semaphore
sem <- struct{}{}
defer func() { <-sem }()
httpResp := &HttpBinPostResponse{}
client := &http.Client{}
req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
if err != nil {
fmt.Println("error making request")
return httpResp, err
}
req.Header = http.Header{
"accept": {"application/json"},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println("error getting response")
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("error reading response body")
return httpResp, err
}
json.Unmarshal(body, &httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}
解决方法
您正在通过 range
语句从代码末尾的 respchan
读取内容。在通道关闭之前,此代码不会退出 - 这发生在该代码块之后。
for ele := range respchan {
// ...
}
wg.wait()
close(respchan)
所以程序永远不会退出 - 因为所有这些逻辑都在同一个 goroutine 中。
要修复并确保在程序退出之前处理所有记录,请将通道读取代码保留在主 goroutine 中,并将等待/关闭逻辑放入其自己的 goroutine 中:
go func() {
wg.wait() // wait for workers to finish ...
close(respchan) // ... now signal the main goroutine we're done
}()
for ele := range respchan {
// ...
}
编辑以等待最终 range
循环中的任何子 goroutine - 可能有一种更简洁的方法来仅使用一个等待组,但一个快速修复可能是:
var swg sync.WaitGroup
go func() {
wg.Wait() // wait for workers to finish ...
swg.Wait() // ... and sub-tasks
close(respChan) // ... now signal the main goroutine we're done
}()
for ele := range respChan {
// ...
swg.Add(1)
go func() {
defer swg.Done()
// ...
}()
}