Skip to content

Latest commit

 

History

History
347 lines (281 loc) · 14.7 KB

ch9-07.md

File metadata and controls

347 lines (281 loc) · 14.7 KB

9.7. 示例: 併發的非阻塞緩存

本節中我們會做一個無阻塞的緩存,這種工具可以幫助我們來解決現實世界中併發程序出現但沒有現成的庫可以解決的問題。這個問題叫作緩存(memoizing)函數(譯註:Memoization的定義: memoization 一詞是Donald Michie 根據拉丁語memorandum杜撰的一個詞。相應的動詞、過去分詞、ing形式有memoiz、memoized、memoizing.),也就是説,我們需要緩存函數的返迴結果,這樣在對函數進行調用的時候,我們就隻需要一次計算,之後隻要返迴計算的結果就可以了。我們的解決方案會是併發安全且會避免對整個緩存加鎖而導致所有操作都去爭一個鎖的設計。

我們將使用下面的httpGetBody函數作爲我們需要緩存的函數的一個樣例。這個函數會去進行HTTP GET請求併且獲取http響應body。對這個函數的調用本身開銷是比較大的,所以我們盡量盡量避免在不必要的時候反複調用。

func httpGetBody(url string) (interface{}, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}

最後一行稍微隱藏了一些細節。ReadAll會返迴兩個結果,一個[]byte數組和一個錯誤,不過這兩個對象可以被賦值給httpGetBody的返迴聲明里的interface{}和error類型,所以我們也就可以這樣返迴結果併且不需要額外的工作了。我們在httpGetBody中選用這種返迴類型是爲了使其可以與緩存匹配。

下面是我們要設計的cache的第一個“草稿”:

gopl.io/ch9/memo1
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo

// A Memo caches the results of calling a Func.
type Memo struct {
	f     Func
	cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

Memo實例會記録需要緩存的函數f(類型爲Func),以及緩存內容(里面是一個string到result映射的map)。每一個result都是都是簡單的函數返迴的值對兒--一個值和一個錯誤值。繼續下去我們會展示一些Memo的變種,不過所有的例子都會遵循這些上面的這些方面。

下面是一個使用Memo的例子。對於流入的URL的每一個元素我們都會調用Get,併打印調用延時以及其返迴的數據大小的log:

m := memo.New(httpGetBody)
for url := range incomingURLs() {
	start := time.Now()
	value, err := m.Get(url)
	if err != nil {
		log.Print(err)
	}
	fmt.Printf("%s, %s, %d bytes\n",
	url, time.Since(start), len(value.([]byte)))
}

我們可以使用測試包(第11章的主題)來繫統地鑒定緩存的效果。從下面的測試輸出,我們可以看到URL流包含了一些重複的情況,盡管我們第一次對每一個URL的(*Memo).Get的調用都會花上幾百毫秒,但第二次就隻需要花1毫秒就可以返迴完整的數據了。

$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok  gopl.io/ch9/memo1   1.257s

這個測試是順序地去做所有的調用的。

由於這種彼此獨立的HTTP請求可以很好地併發,我們可以把這個測試改成併發形式。可以使用sync.WaitGroup來等待所有的請求都完成之後再返迴。

m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
	n.Add(1)
	go func(url string) {
		start := time.Now()
		value, err := m.Get(url)
		if err != nil {
			log.Print(err)
		}
		fmt.Printf("%s, %s, %d bytes\n",
		url, time.Since(start), len(value.([]byte)))
		n.Done()
	}(url)
}
n.Wait()

這次測試跑起來更快了,然而不幸的是貌似這個測試不是每次都能夠正常工作。我們註意到有一些意料之外的cache miss(緩存未命中),或者命中了緩存但卻返迴了錯誤的值,或者甚至會直接崩潰。

但更糟糕的是,有時候這個程序還是能正確的運行(譯:也就是最讓人崩潰的偶發bug),所以我們甚至可能都不會意識到這個程序有bug。。但是我們可以使用-race這個flag來運行程序,競爭檢測器(§9.6)會打印像下面這樣的報告:

$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s

memo.go的32行出現了兩次,説明有兩個goroutine在沒有同步榦預的情況下更新了cache map。這表明Get不是併發安全的,存在數據競爭。

28  func (memo *Memo) Get(key string) (interface{}, error) {
29      res, ok := memo.cache(key)
30      if !ok {
31          res.value, res.err = memo.f(key)
32          memo.cache[key] = res
33      }
34      return res.value, res.err
35  }

最簡單的使cache併發安全的方式是使用基於監控的同步。隻要給Memo加上一個mutex,在Get的一開始獲取互斥鎖,return的時候釋放鎖,就可以讓cache的操作發生在臨界區內了:

gopl.io/ch9/memo2
type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
	res, ok := memo.cache[key] if!ok{
	res.value, res.err = memo.f(key)
	memo.cache[key] = res
	memo.mu.Lock()
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()
	return res.value, res.err
}

測試依然併發進行,但這迴競爭檢査器“沉默”了。不幸的是對於Memo的這一點改變使我們完全喪失了併發的性能優點。每次對f的調用期間都會持有鎖,Get將本來可以併行運行的I/O操作串行化了。我們本章的目的是完成一個無鎖緩存,而不是現在這樣的將所有請求串行化的函數的緩存。

下一個Get的實現,調用Get的goroutine會兩次獲取鎖:査找階段獲取一次,如果査找沒有返迴任何內容,那麽進入更新階段會再次獲取。在這兩次獲取鎖的中間階段,其它goroutine可以隨意使用cache。

gopl.io/ch9/memo3
func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	if !ok {
		res.value, res.err = memo.f(key)

		// Between the two critical sections, several goroutines
		// may race to compute f(key) and update the map.
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}
	return res.value, res.err
}

這些脩改使性能再次得到了提陞,但有一些URL被獲取了兩次。這種情況在兩個以上的goroutine同一時刻調用Get來請求同樣的URL時會發生。多個goroutine一起査詢cache,發現沒有值,然後一起調用f這個慢不拉嘰的函數。在得到結果後,也都會去去更新map。其中一個獲得的結果會覆蓋掉另一個的結果。

理想情況下是應該避免掉多餘的工作的。而這種“避免”工作一般被稱爲duplicate suppression(重複抑製/避免)。下面版本的Memo每一個map元素都是指向一個條目的指針。每一個條目包含對函數f調用結果的內容緩存。與之前不同的是這次entry還包含了一個叫ready的channel。在條目的結果被設置之後,這個channel就會被關閉,以向其它goroutine廣播(§8.9)去讀取該條目內的結果是安全的了。

gopl.io/ch9/memo4
type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// This is the first request for this key.
		// This goroutine becomes responsible for computing
		// the value and broadcasting the ready condition.
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(key)

		close(e.ready) // broadcast ready condition
	} else {
		// This is a repeat request for this key.
		memo.mu.Unlock()

		<-e.ready // wait for ready condition
	}
	return e.res.value, e.res.err
}

現在Get函數包括下面這些步驟了:獲取互斥鎖來保護共享變量cache map,査詢map中是否存在指定條目,如果沒有找到那麽分配空間插入一個新條目,釋放互斥鎖。如果存在條目的話且其值沒有寫入完成(也就是有其它的goroutine在調用f這個慢函數)時,goroutine必須等待值ready之後才能讀到條目的結果。而想知道是否ready的話,可以直接從ready channel中讀取,由於這個讀取操作在channel關閉之前一直是阻塞。

如果沒有條目的話,需要向map中插入一個沒有ready的條目,當前正在調用的goroutine就需要負責調用慢函數、更新條目以及向其它所有goroutine廣播條目已經ready可讀的消息了。

條目中的e.res.value和e.res.err變量是在多個goroutine之間共享的。創建條目的goroutine同時也會設置條目的值,其它goroutine在收到"ready"的廣播消息之後立刻會去讀取條目的值。盡管會被多個goroutine同時訪問,但卻併不需要互斥鎖。ready channel的關閉一定會發生在其它goroutine接收到廣播事件之前,因此第一個goroutine對這些變量的寫操作是一定發生在這些讀操作之前的。不會發生數據競爭。

這樣併發、不重複、無阻塞的cache就完成了。

上面這樣Memo的實現使用了一個互斥量來保護多個goroutine調用Get時的共享map變量。不妨把這種設計和前面提到的把map變量限製在一個單獨的monitor goroutine的方案做一些對比,後者在調用Get時需要發消息。

Func、result和entry的聲明和之前保持一致:

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

然而Memo類型現在包含了一個叫做requests的channel,Get的調用方用這個channel來和monitor goroutine來通信。requests channel中的元素類型是request。Get的調用方會把這個結構中的兩組key都填充好,實際上用這兩個變量來對函數進行緩存的。另一個叫response的channel會被拿來發送響應結果。這個channel隻會傳迴一個單獨的值。

gopl.io/ch9/memo5
// A request is a message requesting that the Func be applied to key.
type request struct {
	key      string
	response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }
// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
	memo := &Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

上面的Get方法,會創建一個response channel,把它放進request結構中,然後發送給monitor goroutine,然後馬上又會接收到它。

cache變量被限製在了monitor goroutine (*Memo).server中,下面會看到。monitor會在循環中一直讀取請求,直到request channel被Close方法關閉。每一個請求都會去査詢cache,如果沒有找到條目的話,那麽就會創建/插入一個新的條目。

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			// This is the first request for this key.
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key) // call f(key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	// Evaluate the function.
	e.res.value, e.res.err = f(key)
	// Broadcast the ready condition.
	close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
	// Wait for the ready condition.
	<-e.ready
	// Send the result to the client.
	response <- e.res
}

和基於互斥量的版本類似,第一個對某個key的請求需要負責去調用函數f併傳入這個key,將結果存在條目里,併關閉ready channel來廣播條目的ready消息。使用(*entry).call來完成上述工作。

緊接着對同一個key的請求會發現map中已經有了存在的條目,然後會等待結果變爲ready,併將結果從response發送給客戶端的goroutien。上述工作是用(*entry).deliver來完成的。對call和deliver方法的調用必須在自己的goroutine中進行以確保monitor goroutines不會因此而被阻塞住而沒法處理新的請求。

這個例子説明我們無論可以用上鎖,還是通信來建立併發程序都是可行的。

上面的兩種方案併不好説特定情境下哪種更好,不過了解他們還是有價值的。有時候從一種方式切換到另一種可以使你的代碼更爲簡潔。(譯註:不是説好的golang推崇通信併發麽)

練習 9.3: 擴展Func類型和(*Memo).Get方法,支持調用方提供一個可選的done channel,使其具備通過該channel來取消整個操作的能力(§8.9)。一個被取消了的Func的調用結果不應該被緩存。