可以先看看上一篇沒Thread Pool,Limiter也好?真的嗎? 來個前情提要。

Worker Group其實基本概念跟Thread Pool很像,就是:

  • 一堆Worker在跑,等著接收參數並且輸出結果
  • 單一Worker擁有一個用來輸入參數的channel,以及一個用以輸出結果的channel
  • Worker可被context的Done()終結,或者:
  • 可被WaitGroup終結

不過由於go沒有generic(目前版本是1.16,在1.17/1.18會支援),所以這兩個channel都沒辦法寫得很漂亮,這也是大多數Worker Pool要不就是得用chan interface{}來寫,不然就是寫不出來。不過Worker Group這種東西其實夠輕量,輕量到其實自己打造都是可以的,這邊就介紹一下怎麼自己打造一個Worker Pool,以及揭秘為什麼很多CLI/UI都需要有一個自己的UI Thread(go沒thread,所以稱為UI routine吧)。

所以我們可以假設一個流程:

  1. 產生20個worker thread,用一個channel把參數傳進去,用一個channel接出參數
  2. 把參數塞進channel
  3. 用WaitGroup來控制流程

那看起來的code應該大致上會像是這樣

//假設資料從csv讀入 
data := GetFromCSV("laz_prd.csv", 1)
//輸入參數用的channel
c := make(chan string, len(data)) 
//存放結果用的channel
r := make(chan ProbeResult, len(data))
//用以控制流程的wait group
var wg sync.WaitGroup
wg.Add(len(data))

for i := 0; i < WORKER_SIZE; i++ {
	//傳入該輸入用channel 輸出用channel以及waitgroup給worker
	go worker(c, r, &wg) 
}
//把參數灌進channel讓worker接收
for _, pid := range data {
	c <- pid
}
wg.Wait()
//印出結果
for {
	select {
		case result := <- r:
			PrintResult(r)
		default: //這邊的default是必要的,不然channel r讀空了就會卡在select
			break
	}
}

我們可以來設計一下worker,應該大致上會長這樣:

func worker(p chan string, r chan string, wg *sync.WaitGroup) {
	for {
		result := &Result{}
		select {
			case input := <-p:
				r <- CreateResultFromInput(input)
				wg.Done()
		}
	}
}

其實這就是一個最基本的Worker Group了,有WORKER_SIZE 個Worker在處理資料。不過,這顯然有一點問題,就是:

  1. 如果以一個Application來講這樣是ok的,程式跑完就算還有殘留的go routine,問題也不大,因為會隨著程式結束而消滅。但是如果是一個Service的話這樣顯然會有routine leak的問題
  2. 這顯然是沒辦法邊跑邊印,一定得把所有東西都處理完才會一口氣印出來。
  3. (Optional) 沒sleep,就如同上一篇所提到的問題,會被當DDoS

第一個的話其實用context可以解決這問題,第二個的話解法很多,有一些人會選在wg.Wait()前面再開一個for + select去接,結束條件就設「接受到 len(data) 個結果」就好,不過比較通用的解法是額外開一個go routine專門處理UI問題。用context的話,整個code大概長這樣,就不另外再貼了

data := GetFromCSV("laz_prd.csv", 1)
c := make(chan string, len(data)) 
r := make(chan ProbeResult, len(data))
var wg sync.WaitGroup
wg.Add(len(data))
//增加一個context with cancel
//第二個參數cancel是一個func,執行他便可以讓該context傳出ctx.Done()的信號,告訴Worker說你該死了
ctx, cancel := context.WithCancel(context.Background())

for i := 0; i < WORKER_SIZE; i++ {
	//傳入該context給worker
	go worker(ctx, c, r, &wg) 
}

for _, pid := range data {
	c <- pid
}
wg.Wait()
//傳入ctx.Done()給每一個worker
cancel()

然後Worker也要實作能對ctx.Done()反應的自殺機制

//簽名要額外能收context
func worker(ctx context.Context, p chan string, r chan string, wg *sync.WaitGroup) {
	for {
		result := &Result{}
		select {
			//自殺指令,收到就會跳出回圈結束這個worker
			case ctx<-Done():
				break

			case input := <-p:
				r <- CreateResultFromInput(input)
				time.Sleep(200 * time.Millisecond) //給個sleep避免request太兇
				wg.Done()
		}
	}
}

這樣,worker就能正確被回收了。

One Comment

  1. Pingback: 沒Thread Pool,Limiter也好?真的嗎? - Fox Nest

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。