Skip to content

Commit

Permalink
fix(row): add lock for ToMap to avoid concurrent map write
Browse files Browse the repository at this point in the history
It is possible to have multiple sinks call ToMap at once

Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Nov 17, 2022
1 parent 15627df commit 5c5736d
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion internal/xsql/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ type Tuple struct {
Metadata Metadata // immutable

AffiliateRow

lock sync.Mutex // lock for the cachedMap, because it is possible to access by multiple sinks
cachedMap map[string]interface{} // clone of the row and cached for performance
}

Expand All @@ -230,6 +230,7 @@ var _ TupleRow = &Tuple{}
type JoinTuple struct {
Tuples []TupleRow // The content is immutable, but the slice may be add or removed
AffiliateRow
lock sync.Mutex
cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
}

Expand All @@ -244,6 +245,7 @@ type GroupedTuples struct {
Content []TupleRow
*WindowRange
AffiliateRow
lock sync.Mutex
cachedMap map[string]interface{} // clone of the row and cached for performance of toMap
}

Expand Down Expand Up @@ -339,6 +341,8 @@ func (t *Tuple) Clone() TupleRow {

// ToMap should only use in sink.
func (t *Tuple) ToMap() map[string]interface{} {
t.lock.Lock()
defer t.lock.Unlock()
if t.AffiliateRow.IsEmpty() {
return t.Message
}
Expand Down Expand Up @@ -487,6 +491,8 @@ func (jt *JoinTuple) Clone() TupleRow {
}

func (jt *JoinTuple) ToMap() map[string]interface{} {
jt.lock.Lock()
defer jt.lock.Unlock()
if jt.cachedMap == nil { // clone the message
m := make(map[string]interface{})
for i := len(jt.Tuples) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -544,6 +550,8 @@ func (s *GroupedTuples) All(_ string) (Message, bool) {
}

func (s *GroupedTuples) ToMap() map[string]interface{} {
s.lock.Lock()
defer s.lock.Unlock()
if s.cachedMap == nil {
m := make(map[string]interface{})
for k, v := range s.Content[0].ToMap() {
Expand Down

0 comments on commit 5c5736d

Please sign in to comment.