From 5c5736df926618a949f9665ce121d5972ed5dc3b Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Thu, 17 Nov 2022 14:55:55 +0800 Subject: [PATCH] fix(row): add lock for ToMap to avoid concurrent map write It is possible to have multiple sinks call ToMap at once Signed-off-by: Jiyong Huang --- internal/xsql/row.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/xsql/row.go b/internal/xsql/row.go index 1e687bcaa5..5d5fb8279e 100644 --- a/internal/xsql/row.go +++ b/internal/xsql/row.go @@ -220,7 +220,7 @@ type Tuple struct { Metadata Metadata // immutable AffiliateRow - + lock sync.Mutex // lock for the cachedMap, because it is possible to access by multiple sinks cachedMap map[string]interface{} // clone of the row and cached for performance } @@ -230,6 +230,7 @@ var _ TupleRow = &Tuple{} type JoinTuple struct { Tuples []TupleRow // The content is immutable, but the slice may be add or removed AffiliateRow + lock sync.Mutex cachedMap map[string]interface{} // clone of the row and cached for performance of toMap } @@ -244,6 +245,7 @@ type GroupedTuples struct { Content []TupleRow *WindowRange AffiliateRow + lock sync.Mutex cachedMap map[string]interface{} // clone of the row and cached for performance of toMap } @@ -339,6 +341,8 @@ func (t *Tuple) Clone() TupleRow { // ToMap should only use in sink. func (t *Tuple) ToMap() map[string]interface{} { + t.lock.Lock() + defer t.lock.Unlock() if t.AffiliateRow.IsEmpty() { return t.Message } @@ -487,6 +491,8 @@ func (jt *JoinTuple) Clone() TupleRow { } func (jt *JoinTuple) ToMap() map[string]interface{} { + jt.lock.Lock() + defer jt.lock.Unlock() if jt.cachedMap == nil { // clone the message m := make(map[string]interface{}) for i := len(jt.Tuples) - 1; i >= 0; i-- { @@ -544,6 +550,8 @@ func (s *GroupedTuples) All(_ string) (Message, bool) { } func (s *GroupedTuples) ToMap() map[string]interface{} { + s.lock.Lock() + defer s.lock.Unlock() if s.cachedMap == nil { m := make(map[string]interface{}) for k, v := range s.Content[0].ToMap() {