diff --git a/pipeline/pipeline_whitebox_test.go b/pipeline/pipeline_whitebox_test.go index f5958b63c..5187d91e0 100644 --- a/pipeline/pipeline_whitebox_test.go +++ b/pipeline/pipeline_whitebox_test.go @@ -23,17 +23,25 @@ func TestPipeline_streamEvent(t *testing.T) { event.SourceID = SourceID(streamID) event.streamName = DefaultStreamName event.SeqID = 123456789 + key := streamKey{ + ID: streamID, + Name: DefaultStreamName, + } p.streamEvent(event) - assert.Equal(t, event, p.streamer.getStream(streamID, DefaultStreamName).first) + assert.Equal(t, event, p.streamer.getStream(key).first) p.UseSpread() p.streamEvent(event) expectedStreamID := StreamID(event.SeqID % uint64(procs)) - assert.Equal(t, event, p.streamer.getStream(expectedStreamID, DefaultStreamName).first) + key = streamKey{ + ID: expectedStreamID, + Name: DefaultStreamName, + } + assert.Equal(t, event, p.streamer.getStream(key).first) } // Can't use fake plugin here dye cycle import diff --git a/pipeline/streamer.go b/pipeline/streamer.go index 22c3c4a28..b9c4dc603 100644 --- a/pipeline/streamer.go +++ b/pipeline/streamer.go @@ -2,6 +2,7 @@ package pipeline import ( "fmt" + "strings" "sync" "time" @@ -11,34 +12,34 @@ import ( type StreamID uint64 +type streamKey struct { + ID StreamID + Name StreamName +} + type streamer struct { - streams map[StreamID]map[StreamName]*stream - mu *sync.RWMutex + streams map[streamKey]*stream + mu sync.RWMutex shouldStop atomic.Bool charged []*stream - chargedMu *sync.Mutex + chargedMu sync.Mutex chargedCond *sync.Cond blocked []*stream - blockedMu *sync.Mutex + blockedMu sync.Mutex eventTimeout time.Duration } func newStreamer(eventTimeout time.Duration) *streamer { streamer := &streamer{ - streams: make(map[StreamID]map[StreamName]*stream), - mu: &sync.RWMutex{}, - charged: make([]*stream, 0), - - chargedMu: &sync.Mutex{}, - blockedMu: &sync.Mutex{}, - + streams: make(map[streamKey]*stream), + charged: make([]*stream, 0), eventTimeout: eventTimeout, } - streamer.chargedCond = sync.NewCond(streamer.chargedMu) + streamer.chargedCond = sync.NewCond(&streamer.chargedMu) return streamer } @@ -51,22 +52,24 @@ func (s *streamer) stop() { s.shouldStop.Store(true) s.mu.Lock() - for _, source := range s.streams { - for _, stream := range source { - stream.put(unlockEvent(stream)) - } + for _, stream := range s.streams { + stream.put(unlockEvent(stream)) } s.mu.Unlock() } func (s *streamer) putEvent(streamID StreamID, streamName StreamName, event *Event) uint64 { - return s.getStream(streamID, streamName).put(event) + key := streamKey{ + ID: streamID, + Name: streamName, + } + return s.getStream(key).put(event) } -func (s *streamer) getStream(streamID StreamID, streamName StreamName) *stream { +func (s *streamer) getStream(key streamKey) *stream { // fast path, stream has been already created s.mu.RLock() - st, has := s.streams[streamID][streamName] + st, has := s.streams[key] s.mu.RUnlock() if has { return st @@ -75,20 +78,15 @@ func (s *streamer) getStream(streamID StreamID, streamName StreamName) *stream { // slow path, create new stream s.mu.Lock() defer s.mu.Unlock() - st, has = s.streams[streamID][streamName] + st, has = s.streams[key] if has { return st } - _, has = s.streams[streamID] - if !has { - s.streams[streamID] = make(map[StreamName]*stream) - } - // copy streamName because it's unsafe []byte instead of regular string - streamNameCopy := StreamName([]byte(streamName)) - st = newStream(streamNameCopy, streamID, s) - s.streams[streamID][streamNameCopy] = st + key.Name = StreamName(strings.Clone(string(key.Name))) + st = newStream(key.Name, key.ID, s) + s.streams[key] = st return st } @@ -170,18 +168,16 @@ func (s *streamer) dump() string { out := logger.Cond(len(s.streams) == 0, logger.Header("no streams"), func() string { o := logger.Header("streams") - for _, s := range s.streams { - for _, stream := range s { - state := "| UNATTACHED |" - if stream.isAttached { - state = "| ATTACHED |" - } - if stream.isDetaching { - state = "| DETACHING |" - } - - o += fmt.Sprintf("%d(%s) state=%s, away event id=%d, commit event id=%d, len=%d\n", stream.streamID, stream.name, state, stream.awaySeq, stream.commitSeq.Load(), stream.len) + for _, stream := range s.streams { + state := "| UNATTACHED |" + if stream.isAttached { + state = "| ATTACHED |" } + if stream.isDetaching { + state = "| DETACHING |" + } + + o += fmt.Sprintf("%d(%s) state=%s, away event id=%d, commit event id=%d, len=%d\n", stream.streamID, stream.name, state, stream.awaySeq, stream.commitSeq.Load(), stream.len) } return o