diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go
index 6390cb99..de79f3a5 100644
--- a/downstreamadapter/eventcollector/event_collector.go
+++ b/downstreamadapter/eventcollector/event_collector.go
@@ -375,9 +375,6 @@ func (c *EventCollector) updateResolvedTsMetric() {
 	if minResolvedTs > 0 {
 		phyResolvedTs := oracle.ExtractPhysical(minResolvedTs)
 		lagMs := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
-		log.Info("EventCollector resolved ts lag",
-			zap.Uint64("resolvedTs", minResolvedTs),
-			zap.Float64("lagMs", lagMs))
 		metrics.EventCollectorResolvedTsLagGauge.Set(lagMs)
 	}
 }
diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go
index ae214e1b..1e19638a 100644
--- a/logservice/eventstore/event_store.go
+++ b/logservice/eventstore/event_store.go
@@ -1,3 +1,16 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package eventstore
 
 import (
@@ -12,6 +25,8 @@ import (
 
 	"github.com/pingcap/ticdc/logservice/logservicepb"
 
+	"github.com/pingcap/ticdc/utils/dynstream"
+
 	"github.com/cockroachdb/pebble"
 	"github.com/klauspost/compress/zstd"
 	"github.com/pingcap/log"
@@ -73,32 +88,8 @@ type EventIterator interface {
 	Close() (eventCnt int64, err error)
 }
 
-type eventType int
-
-// TODO: better name
-const (
-	eventTypeBatchSignal eventType = iota
-	eventTypeNormal
-)
-
-type eventWithState struct {
-	eventType
-	raw      *common.RawKVEntry
-	subID    logpuller.SubscriptionID
-	tableID  int64
-	uniqueID uint64
-}
-
-var uniqueIDGen uint64 = 0
-
-func genUniqueID() uint64 {
-	return atomic.AddUint64(&uniqueIDGen, 1)
-}
-
 type dispatcherStat struct {
 	dispatcherID common.DispatcherID
-	// called when new resolved ts event come
-	notifier ResolvedTsNotifier
 	tableSpan *heartbeatpb.TableSpan
 
 	// the max ts of events which is not needed by this dispatcher
@@ -108,33 +99,49 @@
 }
 
 type subscriptionStat struct {
+	subID logpuller.SubscriptionID
+
+	tableID int64
+
 	// dispatchers depend on this subscription
-	ids map[common.DispatcherID]bool
-	// events of this subscription will be send to the channel identified by chIndex
-	chIndex int
+	dispatchers struct {
+		sync.RWMutex
+		notifiers map[common.DispatcherID]ResolvedTsNotifier
+	}
+
+	dbIndex int
+
+	eventCh chan kvEvents
 	// data <= checkpointTs can be deleted
-	checkpointTs uint64
+	checkpointTs atomic.Uint64
 	// the resolveTs persisted in the store
-	resolvedTs uint64
+	resolvedTs atomic.Uint64
 	// the max commit ts of dml event in the store
-	maxEventCommitTs uint64
-	// an id encode in the event key of this dispatcher
-	// used to separate data between dispatchers with overlapping spans
-	uniqueKeyID uint64
+	maxEventCommitTs atomic.Uint64
+}
+
+type eventWithSubID struct {
+	subID logpuller.SubscriptionID
+	raw   *common.RawKVEntry
+}
+
+type kvEvents struct {
+	kvs     []*common.RawKVEntry
+	subID   logpuller.SubscriptionID
+	tableID int64
 }
 
 type eventStore struct {
 	pdClock pdutil.Clock
 
-	dbs      []*pebble.DB
-	eventChs []chan eventWithState
+	dbs            []*pebble.DB
+	chs            []chan kvEvents
+	writeTaskPools []*writeTaskPool
 
 	puller *logpuller.LogPuller
 
 	gcManager *gcManager
 
-	closed atomic.Bool
-
 	messageCenter messaging.MessageCenter
 
 	coordinatorInfo struct {
@@ -142,7 +149,7 @@
 		id node.ID
 	}
 
-	wgBatchSignal sync.WaitGroup
+	ds dynstream.DynamicStream[int, logpuller.SubscriptionID, eventWithSubID, *subscriptionStat, *eventsHandler]
 
 	// To manage background goroutines.
 	wg sync.WaitGroup
@@ -160,8 +167,19 @@
 	decoder *zstd.Decoder
 }
 
-const dataDir = "event_store"
-const dbCount = 32
+const (
+	dataDir             = "event_store"
+	dbCount             = 8
+	writeWorkerNumPerDB = 100
+	streamCount         = 4
+)
+
+type pathHasher struct {
+}
+
+func (h pathHasher) HashPath(subID logpuller.SubscriptionID) uint64 {
+	return uint64(subID)
+}
 
 func New(
 	ctx context.Context,
@@ -204,10 +222,19 @@
 		log.Panic("Failed to create zstd decoder", zap.Error(err))
 	}
 
+	option := dynstream.NewOption()
+	option.InputBufferSize = 102400
+	ds := dynstream.NewParallelDynamicStream(streamCount, pathHasher{}, &eventsHandler{}, option)
+	ds.Start()
+
 	store := &eventStore{
-		pdClock:  pdClock,
-		dbs:      make([]*pebble.DB, 0, dbCount),
-		eventChs: make([]chan eventWithState, 0, dbCount),
+		pdClock: pdClock,
+
+		dbs:            make([]*pebble.DB, 0, dbCount),
+		chs:            make([]chan kvEvents, 0, dbCount),
+		writeTaskPools: make([]*writeTaskPool, 0, dbCount),
+
+		ds: ds,
 
 		gcManager: newGCManager(),
 		encoder:   encoder,
@@ -235,32 +262,21 @@
 			log.Fatal("open db failed", zap.Error(err))
 		}
 		store.dbs = append(store.dbs, db)
-		store.eventChs = append(store.eventChs, make(chan eventWithState, 8192))
+		store.chs = append(store.chs, make(chan kvEvents, 100000))
+		store.writeTaskPools = append(store.writeTaskPools, newWriteTaskPool(store, store.dbs[i], store.chs[i], writeWorkerNumPerDB))
 	}
 	store.dispatcherMeta.dispatcherStats = make(map[common.DispatcherID]*dispatcherStat)
 	store.dispatcherMeta.subscriptionStats = make(map[logpuller.SubscriptionID]*subscriptionStat)
 	store.dispatcherMeta.tableToDispatchers = make(map[int64]map[common.DispatcherID]bool)
 
-	// start background goroutines to handle events from puller
-	for i := range store.dbs {
-		// send batch signal periodically
-		eventCh := store.eventChs[i]
-		store.wgBatchSignal.Add(1)
-		go func() {
-			defer store.wgBatchSignal.Done()
-			store.sendBatchSignalPeriodically(ctx, eventCh)
-		}()
-
-		db := store.dbs[i]
-		store.wg.Add(1)
-		go func(index int) {
-			defer store.wg.Done()
-			store.batchAndWriteEvents(ctx, db, eventCh)
-		}(i)
-	}
-
-	consume := func(ctx context.Context, raw *common.RawKVEntry, subID logpuller.SubscriptionID, extraData interface{}) error {
-		store.consumeEvent(subID, raw, extraData)
+	consume := func(ctx context.Context, raw *common.RawKVEntry, subID logpuller.SubscriptionID) error {
+		if raw == nil {
+			log.Panic("should not happen: meet nil event")
+		}
+		store.ds.In(subID) <- eventWithSubID{
+			subID: subID,
+			raw:   raw,
+		}
 		return nil
 	}
 	puller := logpuller.NewLogPuller(client, pdClock, consume)
@@ -274,67 +290,46 @@
 	return store
 }
 
-func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error {
-	for _, msg := range targetMessage.Message {
-		switch msg.(type) {
-		case *common.LogCoordinatorBroadcastRequest:
-			e.coordinatorInfo.Lock()
-			e.coordinatorInfo.id = targetMessage.From
-			e.coordinatorInfo.Unlock()
-		default:
-			log.Panic("invalid message type", zap.Any("msg", msg))
-		}
+type writeTaskPool struct {
+	store     *eventStore
+	db        *pebble.DB
+	dataCh    chan kvEvents
+	workerNum int
+}
+
+func newWriteTaskPool(store *eventStore, db *pebble.DB, ch chan kvEvents, workerNum int) *writeTaskPool {
+	return &writeTaskPool{
+		store:     store,
+		db:        db,
+		dataCh:    ch,
+		workerNum: workerNum,
 	}
-	return nil
 }
 
-func (e *eventStore) uploadStatePeriodically(ctx context.Context) error {
-	tick := time.NewTicker(30 * time.Second)
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-tick.C:
-			e.dispatcherMeta.RLock()
-			state := &logservicepb.EventStoreState{
-				Subscriptions: make(map[int64]*logservicepb.SubscriptionStates),
-			}
-			for tableID, dispatcherIDs := range e.dispatcherMeta.tableToDispatchers {
-				subStates := make([]*logservicepb.SubscriptionState, 0, len(dispatcherIDs))
-				subIDs := make(map[logpuller.SubscriptionID]bool)
-				for dispatcherID := range dispatcherIDs {
-					dispatcherStat := e.dispatcherMeta.dispatcherStats[dispatcherID]
-					subID := dispatcherStat.subID
-					subStat := e.dispatcherMeta.subscriptionStats[subID]
-					if _, ok := subIDs[subID]; ok {
-						continue
-					}
-					subStates = append(subStates, &logservicepb.SubscriptionState{
-						SubID:        uint64(subID),
-						Span:         dispatcherStat.tableSpan,
-						CheckpointTs: subStat.checkpointTs,
-						ResolvedTs:   subStat.resolvedTs,
-					})
-					subIDs[subID] = true
-				}
-				sort.Slice(subStates, func(i, j int) bool {
-					return subStates[i].SubID < subStates[j].SubID
-				})
-				state.Subscriptions[tableID] = &logservicepb.SubscriptionStates{
-					Subscriptions: subStates,
+func (p *writeTaskPool) run(ctx context.Context) {
+	p.store.wg.Add(p.workerNum)
+	for i := 0; i < p.workerNum; i++ {
+		go func() {
+			defer p.store.wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case data := <-p.dataCh:
+					// TODO: batch it
+					p.store.writeEvents(p.db, data.kvs, uint64(data.subID), data.tableID)
+					// each kvEvents batch only contains data for one subscription
+					p.store.wakeSubscription(data.subID)
 				}
 			}
-
-			message := messaging.NewSingleTargetMessage(e.coordinatorInfo.id, messaging.LogCoordinatorTopic, state)
-			e.dispatcherMeta.RUnlock()
-			// just ignore messagees fail to send
-			if err := e.messageCenter.SendEvent(message); err != nil {
-				log.Debug("send broadcast message to node failed", zap.Error(err))
-			}
-		}
+		}()
 	}
 }
 
+func (e *eventStore) wakeSubscription(subID logpuller.SubscriptionID) {
+	e.ds.Wake(subID) <- subID
+}
+
 func (e *eventStore) Name() string {
 	return appcontext.EventStore
 }
@@ -342,6 +337,14 @@
 func (e *eventStore) Run(ctx context.Context) error {
 	eg, ctx := errgroup.WithContext(ctx)
 
+	for _, p := range e.writeTaskPools {
+		p := p
+		eg.Go(func() error {
+			p.run(ctx)
+			return nil
+		})
+	}
+
 	eg.Go(func() error {
 		return e.puller.Run(ctx)
 	})
@@ -363,21 +366,11 @@
 }
 
 func (e *eventStore) Close(ctx context.Context) error {
-	// notify and wait the background routine for sending batch signal to exit
-	e.closed.Store(true)
-	e.wgBatchSignal.Wait()
-
 	if err := e.puller.Close(ctx); err != nil {
 		log.Error("failed to close log puller", zap.Error(err))
 	}
+	e.ds.Close()
 
-	// now we can be sure that e.eventChs won't be used
-
-	// notify and wait background goroutines for writing events to exit before cloase pebble db
-	for i := range e.eventChs {
-		close(e.eventChs[i])
-	}
-
 	// TODO: wait gc manager, because it may also write data to pebble db
 	e.wg.Wait()
 	for _, db := range e.dbs {
@@ -388,12 +381,6 @@ func (e *eventStore) Close(ctx context.Context) error {
 	return nil
 }
 
-type subscriptionTag struct {
-	chIndex     int
-	tableID     int64
-	uniqueKeyID uint64
-}
-
 func (e *eventStore) RegisterDispatcher(
 	dispatcherID common.DispatcherID,
 	tableSpan *heartbeatpb.TableSpan,
@@ -417,7 +404,6 @@ func (e *eventStore) RegisterDispatcher(
 
 	stat := &dispatcherStat{
 		dispatcherID: dispatcherID,
-		notifier:     notifier,
 		tableSpan:    tableSpan,
 		checkpointTs: startTs,
 	}
@@ -437,17 +423,19 @@
 			// check whether startTs is in the range [checkpointTs, resolvedTs]
 			// for `[checkpointTs`: because we want data > startTs, so data <= checkpointTs == startTs deleted is ok.
 			// for `resolvedTs]`: startTs == resolvedTs is a special case that no resolved ts has been recieved, so it is ok.
-			if subscriptionStat.checkpointTs <= startTs && startTs <= subscriptionStat.resolvedTs {
+			if subscriptionStat.checkpointTs.Load() <= startTs && startTs <= subscriptionStat.resolvedTs.Load() {
 				stat.subID = candidateDispatcher.subID
 				e.dispatcherMeta.dispatcherStats[dispatcherID] = stat
 				// add dispatcher to existing subscription and return
-				subscriptionStat.ids[dispatcherID] = true
+				subscriptionStat.dispatchers.Lock()
+				subscriptionStat.dispatchers.notifiers[dispatcherID] = notifier
+				subscriptionStat.dispatchers.Unlock()
 				candidateIDs[dispatcherID] = true
 				e.dispatcherMeta.Unlock()
 				log.Info("reuse existing subscription",
 					zap.Any("dispatcherID", dispatcherID),
 					zap.Uint64("subID", uint64(stat.subID)),
-					zap.Uint64("checkpointTs", subscriptionStat.checkpointTs),
+					zap.Uint64("checkpointTs", subscriptionStat.checkpointTs.Load()),
 					zap.Uint64("startTs", startTs))
 				return true, nil
 			}
@@ -465,31 +453,33 @@
 	// TODO: hash span is only needed when we need to reuse data after restart
 	// (if we finally decide not to reuse data after restart, use round robin instead)
 	// But if we need to share data for sub span, we need hash table id instead.
-	chIndex := common.HashTableSpan(tableSpan, len(e.eventChs))
-	uniqueKeyID := genUniqueID()
+	chIndex := common.HashTableSpan(tableSpan, len(e.chs))
+
	// Note: don't hold any lock when call Subscribe
 	// TODO: if puller event come before we initialize dispatcherStat,
 	// maxEventCommitTs may not be updated correctly and cause data loss.(lost resolved ts is harmless)
 	// To fix it, we need to alloc subID and initialize dispatcherStat before puller may send events.
 	// That is allocate subID in a separate method.
-	stat.subID = e.puller.Subscribe(*tableSpan, startTs, subscriptionTag{
-		chIndex:     chIndex,
-		tableID:     tableSpan.TableID,
-		uniqueKeyID: uniqueKeyID,
-	})
+	stat.subID = e.puller.Subscribe(*tableSpan, startTs)
 	metrics.EventStoreSubscriptionGauge.Inc()
 
 	e.dispatcherMeta.Lock()
 	defer e.dispatcherMeta.Unlock()
 	e.dispatcherMeta.dispatcherStats[dispatcherID] = stat
-	e.dispatcherMeta.subscriptionStats[stat.subID] = &subscriptionStat{
-		ids:              map[common.DispatcherID]bool{dispatcherID: true},
-		chIndex:          chIndex,
-		checkpointTs:     startTs,
-		resolvedTs:       startTs,
-		maxEventCommitTs: startTs,
-		uniqueKeyID:      uniqueKeyID,
-	}
+	subStat := &subscriptionStat{
+		subID:   stat.subID,
+		tableID: tableSpan.TableID,
+		dbIndex: chIndex,
+		eventCh: e.chs[chIndex],
+	}
+	subStat.dispatchers.notifiers = make(map[common.DispatcherID]ResolvedTsNotifier)
+	subStat.dispatchers.notifiers[dispatcherID] = notifier
+	subStat.checkpointTs.Store(startTs)
+	subStat.resolvedTs.Store(startTs)
+	subStat.maxEventCommitTs.Store(startTs)
+	e.dispatcherMeta.subscriptionStats[stat.subID] = subStat
+	e.ds.AddPath(stat.subID, subStat, dynstream.AreaSettings{})
+
 	dispatchersForSameTable, ok := e.dispatcherMeta.tableToDispatchers[tableSpan.TableID]
 	if !ok {
 		e.dispatcherMeta.tableToDispatchers[tableSpan.TableID] = map[common.DispatcherID]bool{dispatcherID: true}
@@ -519,8 +509,8 @@ func (e *eventStore) UnregisterDispatcher(dispatcherID common.DispatcherID) erro
 		if !ok {
 			log.Panic("should not happen")
 		}
-		delete(subscriptionStat.ids, dispatcherID)
-		if len(subscriptionStat.ids) == 0 {
+		delete(subscriptionStat.dispatchers.notifiers, dispatcherID)
+		if len(subscriptionStat.dispatchers.notifiers) == 0 {
 			delete(e.dispatcherMeta.subscriptionStats, subID)
 			// TODO: do we need unlock before puller.Unsubscribe?
 			e.puller.Unsubscribe(subID)
@@ -535,6 +525,7 @@ func (e *eventStore) UnregisterDispatcher(dispatcherID common.DispatcherID) erro
 	delete(dispatchersForSameTable, dispatcherID)
 	if len(dispatchersForSameTable) == 0 {
 		delete(e.dispatcherMeta.tableToDispatchers, tableID)
+		e.ds.RemovePath(subID)
 	}
 
 	return nil
@@ -544,45 +535,45 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
 	dispatcherID common.DispatcherID,
 	checkpointTs uint64,
 ) error {
-	e.dispatcherMeta.RLock()
-	defer e.dispatcherMeta.RUnlock()
-	if stat, ok := e.dispatcherMeta.dispatcherStats[dispatcherID]; ok {
-		stat.checkpointTs = checkpointTs
-		subscriptionStat := e.dispatcherMeta.subscriptionStats[stat.subID]
-		// calculate the new checkpoint ts of the subscription
-		newCheckpointTs := uint64(0)
-		for dispatcherID := range subscriptionStat.ids {
-			dispatcherStat := e.dispatcherMeta.dispatcherStats[dispatcherID]
-			if newCheckpointTs == 0 || dispatcherStat.checkpointTs < newCheckpointTs {
-				newCheckpointTs = dispatcherStat.checkpointTs
-			}
-		}
-		if newCheckpointTs == 0 {
-			return nil
-		}
-		if newCheckpointTs < subscriptionStat.checkpointTs {
-			log.Panic("should not happen",
-				zap.Uint64("newCheckpointTs", newCheckpointTs),
-				zap.Uint64("oldCheckpointTs", subscriptionStat.checkpointTs))
-		}
-		if subscriptionStat.checkpointTs < newCheckpointTs {
-			e.gcManager.addGCItem(
-				subscriptionStat.chIndex,
-				subscriptionStat.uniqueKeyID,
-				stat.tableSpan.TableID,
-				subscriptionStat.checkpointTs,
-				newCheckpointTs,
-			)
-			if log.GetLevel() <= zap.DebugLevel {
-				log.Debug("update checkpoint ts",
-					zap.Any("dispatcherID", dispatcherID),
-					zap.Uint64("subID", uint64(stat.subID)),
-					zap.Uint64("newCheckpointTs", newCheckpointTs),
-					zap.Uint64("oldCheckpointTs", subscriptionStat.checkpointTs))
-				subscriptionStat.checkpointTs = newCheckpointTs
-			}
-		}
-	}
+	// e.dispatcherMeta.RLock()
+	// defer e.dispatcherMeta.RUnlock()
+	// if stat, ok := e.dispatcherMeta.dispatcherStats[dispatcherID]; ok {
+	// 	stat.checkpointTs = checkpointTs
+	// 	subscriptionStat := e.dispatcherMeta.subscriptionStats[stat.subID]
+	// 	// calculate the new checkpoint ts of the subscription
+	// 	newCheckpointTs := uint64(0)
+	// 	for dispatcherID := range subscriptionStat.ids {
+	// 		dispatcherStat := e.dispatcherMeta.dispatcherStats[dispatcherID]
+	// 		if newCheckpointTs == 0 || dispatcherStat.checkpointTs < newCheckpointTs {
+	// 			newCheckpointTs = dispatcherStat.checkpointTs
+	// 		}
+	// 	}
+	// 	if newCheckpointTs == 0 {
+	// 		return nil
+	// 	}
+	// 	if newCheckpointTs < subscriptionStat.checkpointTs {
+	// 		log.Panic("should not happen",
+	// 			zap.Uint64("newCheckpointTs", newCheckpointTs),
+	// 			zap.Uint64("oldCheckpointTs", subscriptionStat.checkpointTs))
+	// 	}
+	// 	if subscriptionStat.checkpointTs < newCheckpointTs {
+	// 		e.gcManager.addGCItem(
+	// 			subscriptionStat.chIndex,
+	// 			subscriptionStat.uniqueKeyID,
+	// 			stat.tableSpan.TableID,
+	// 			subscriptionStat.checkpointTs,
+	// 			newCheckpointTs,
+	// 		)
+	// 		if log.GetLevel() <= zap.DebugLevel {
+	// 			log.Debug("update checkpoint ts",
+	// 				zap.Any("dispatcherID", dispatcherID),
+	// 				zap.Uint64("subID", uint64(stat.subID)),
+	// 				zap.Uint64("newCheckpointTs", newCheckpointTs),
+	// 				zap.Uint64("oldCheckpointTs", subscriptionStat.checkpointTs))
+	// 			subscriptionStat.checkpointTs = newCheckpointTs
+	// 		}
+	// 	}
+	// }
 	return nil
 }
 
@@ -600,7 +591,7 @@ func (e *eventStore) GetDispatcherDMLEventState(dispatcherID common.DispatcherID
 	subscriptionStat := e.dispatcherMeta.subscriptionStats[stat.subID]
 	return true, DMLEventState{
 		// ResolvedTs:       subscriptionStat.resolvedTs,
-		MaxEventCommitTs: subscriptionStat.maxEventCommitTs,
+		MaxEventCommitTs: subscriptionStat.maxEventCommitTs.Load(),
 	}
 }
 
@@ -613,18 +604,18 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
 		return nil, nil
 	}
 	subscriptionStat := e.dispatcherMeta.subscriptionStats[stat.subID]
-	if dataRange.StartTs < subscriptionStat.checkpointTs {
+	if dataRange.StartTs < subscriptionStat.checkpointTs.Load() {
 		log.Panic("should not happen",
 			zap.Any("dispatcherID", dispatcherID),
-			zap.Uint64("checkpointTs", subscriptionStat.checkpointTs),
+			zap.Uint64("checkpointTs", subscriptionStat.checkpointTs.Load()),
 			zap.Uint64("startTs", dataRange.StartTs))
 	}
-	db := e.dbs[subscriptionStat.chIndex]
+	db := e.dbs[subscriptionStat.dbIndex]
 	e.dispatcherMeta.RUnlock()
 
 	// convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1)
-	start := EncodeKeyPrefix(subscriptionStat.uniqueKeyID, stat.tableSpan.TableID, dataRange.StartTs+1)
-	end := EncodeKeyPrefix(subscriptionStat.uniqueKeyID, stat.tableSpan.TableID, dataRange.EndTs+1)
+	start := EncodeKeyPrefix(uint64(subscriptionStat.subID), stat.tableSpan.TableID, dataRange.StartTs+1)
+	end := EncodeKeyPrefix(uint64(subscriptionStat.subID), stat.tableSpan.TableID, dataRange.EndTs+1)
 	// TODO: optimize read performance
 	iter, err := db.NewIter(&pebble.IterOptions{
 		LowerBound: start,
@@ -669,7 +660,7 @@ func (e *eventStore) updateMetricsOnce() {
 	e.dispatcherMeta.RLock()
 	for _, subscriptionStat := range e.dispatcherMeta.subscriptionStats {
 		// resolved ts lag
-		resolvedTs := atomic.LoadUint64(&subscriptionStat.resolvedTs)
+		resolvedTs := subscriptionStat.resolvedTs.Load()
 		resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
 		resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
 		metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
@@ -677,7 +668,7 @@ func (e *eventStore) updateMetricsOnce() {
 			minResolvedTs = resolvedTs
 		}
 		// checkpoint ts lag
-		checkpointTs := subscriptionStat.checkpointTs
+		checkpointTs := subscriptionStat.checkpointTs.Load()
 		watermarkPhyTs := oracle.ExtractPhysical(checkpointTs)
 		watermarkLag := float64(currentPhyTs-watermarkPhyTs) / 1e3
 		metrics.EventStoreDispatcherWatermarkLagHist.Observe(float64(watermarkLag))
@@ -691,68 +682,11 @@ func (e *eventStore) updateMetricsOnce() {
 	metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLag)
 }
 
-type DBBatchEvent struct {
-	batch *pebble.Batch
-
-	maxEventCommitTsMap map[logpuller.SubscriptionID]uint64
-
-	resolvedTsMap map[logpuller.SubscriptionID]uint64
-}
-
-const (
-	batchCommitSize     int = 16 * 1024 * 1024
-	batchCommitInterval     = 20 * time.Millisecond
-)
-
-// handleEvents fetch events from puller and write them to pebble db
-func (e *eventStore) batchAndWriteEvents(ctx context.Context, db *pebble.DB, inputCh <-chan eventWithState) {
-	batchCh := make(chan *DBBatchEvent, 10240)
-
-	// consume batch events
-	e.wg.Add(1)
-	go func() {
-		defer e.wg.Done()
-		for batchEvent := range batchCh {
-			batch := batchEvent.batch
-			if batch != nil && !batch.Empty() {
-				size := batch.Len()
-				if err := batch.Commit(pebble.NoSync); err != nil {
-					log.Panic("failed to commit pebble batch", zap.Error(err))
-				}
-				metrics.EventStoreWriteBytes.Add(float64(size))
-			}
-
-			e.dispatcherMeta.RLock()
-			for subID, maxEventCommitTs := range batchEvent.maxEventCommitTsMap {
-				subscriptionStat, ok := e.dispatcherMeta.subscriptionStats[subID]
-				if !ok {
-					// the subscription is removed?
-					log.Warn("unknown subscriptionID", zap.Uint64("subID", uint64(subID)))
-					continue
-				}
-				subscriptionStat.maxEventCommitTs = maxEventCommitTs
-			}
-			// update resolved ts after commit successfully
-			for subID, resolvedTs := range batchEvent.resolvedTsMap {
-				subscriptionStat, ok := e.dispatcherMeta.subscriptionStats[subID]
-				if !ok {
-					// the subscription is removed?
-					log.Warn("unknown subscriptionID", zap.Uint64("subID", uint64(subID)))
-					continue
-				}
-				atomic.StoreUint64(&subscriptionStat.resolvedTs, resolvedTs)
-				for dispatcherID := range subscriptionStat.ids {
-					dispatcherStat := e.dispatcherMeta.dispatcherStats[dispatcherID]
-					dispatcherStat.notifier(resolvedTs)
-				}
-			}
-			e.dispatcherMeta.RUnlock()
-		}
-	}()
-
-	addEvent2Batch := func(batch *pebble.Batch, item eventWithState) {
-		key := EncodeKey(item.uniqueID, item.tableID, item.raw)
-		value := item.raw.Encode()
+func (e *eventStore) writeEvents(db *pebble.DB, items []*common.RawKVEntry, subID uint64, tableID int64) error {
+	batch := db.NewBatch()
+	for _, item := range items {
+		key := EncodeKey(subID, tableID, item)
+		value := item.Encode()
 		compressedValue := e.encoder.EncodeAll(value, nil)
 		ratio := float64(len(value)) / float64(len(compressedValue))
 		metrics.EventStoreCompressRatio.Set(ratio)
@@ -760,86 +694,8 @@ func (e *eventStore) batchAndWriteEvents(ctx context.Context, db *pebble.DB, inp
 			log.Panic("failed to update pebble batch", zap.Error(err))
 		}
 	}
-
-	// fetch events from channel and return a batch when the batch is full or timeout
-	doBatching := func() *DBBatchEvent {
-		var batch *pebble.Batch
-		resolvedTsMap := make(map[logpuller.SubscriptionID]uint64)
-		maxEventCommitTsMap := make(map[logpuller.SubscriptionID]uint64)
-		startToBatch := time.Now()
-		// Note: don't use select here for performance
-		for item := range inputCh {
-			if item.eventType == eventTypeBatchSignal {
-				if time.Since(startToBatch) >= batchCommitInterval {
-					if batch != nil || len(resolvedTsMap) > 0 {
-						return &DBBatchEvent{batch, maxEventCommitTsMap, resolvedTsMap}
-					}
-				}
-				continue
-			}
-
-			if item.raw.IsResolved() {
-				resolvedTsMap[item.subID] = item.raw.CRTs
-				continue
-			} else {
-				if batch == nil {
-					batch = db.NewBatch()
-				}
-				if item.raw.CRTs > maxEventCommitTsMap[item.subID] {
-					maxEventCommitTsMap[item.subID] = item.raw.CRTs
-				}
-				addEvent2Batch(batch, item)
-				if batch.Len() >= batchCommitSize {
-					return &DBBatchEvent{batch, maxEventCommitTsMap, resolvedTsMap}
-				}
-			}
-		}
-		return nil
-	}
-
-	for {
-		batchEvent := doBatching()
-		if batchEvent == nil {
-			// notify batch goroutine to exit
-			close(batchCh)
-			return
-		}
-		select {
-		case <-ctx.Done():
-			// notify batch goroutine to exit
-			close(batchCh)
-			return
-		case batchCh <- batchEvent:
-		}
-	}
-}
-
-// TODO: maybe we can remove it and just rely on resolved ts? Do it after we know how to share
-func (e *eventStore) sendBatchSignalPeriodically(ctx context.Context, inputCh chan<- eventWithState) {
-	ticker := time.NewTicker(batchCommitInterval / 2)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			if e.closed.Load() {
-				return
-			}
-			inputCh <- eventWithState{eventType: eventTypeBatchSignal}
-		}
-	}
-}
-
-func (e *eventStore) consumeEvent(subID logpuller.SubscriptionID, raw *common.RawKVEntry, tag interface{}) {
-	subTag := tag.(subscriptionTag)
-	e.eventChs[subTag.chIndex] <- eventWithState{
-		eventType: eventTypeNormal,
-		raw:       raw,
-		subID:     subID,
-		tableID:   subTag.tableID,
-		uniqueID:  subTag.uniqueKeyID,
-	}
+	metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
+	return batch.Commit(pebble.NoSync)
 }
 
 func (e *eventStore) deleteEvents(dbIndex int, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error {
@@ -906,3 +762,64 @@ func (iter *eventStoreIter) Close() (int64, error) {
 	iter.innerIter = nil
 	return iter.rowCount, err
 }
+
+func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error {
+	for _, msg := range targetMessage.Message {
+		switch msg.(type) {
+		case *common.LogCoordinatorBroadcastRequest:
+			e.coordinatorInfo.Lock()
+			e.coordinatorInfo.id = targetMessage.From
+			e.coordinatorInfo.Unlock()
+		default:
+			log.Panic("invalid message type", zap.Any("msg", msg))
+		}
+	}
+	return nil
+}
+
+func (e *eventStore) uploadStatePeriodically(ctx context.Context) error {
+	tick := time.NewTicker(30 * time.Second)
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-tick.C:
+			e.dispatcherMeta.RLock()
+			state := &logservicepb.EventStoreState{
+				Subscriptions: make(map[int64]*logservicepb.SubscriptionStates),
+			}
+			for tableID, dispatcherIDs := range e.dispatcherMeta.tableToDispatchers {
+				subStates := make([]*logservicepb.SubscriptionState, 0, len(dispatcherIDs))
+				subIDs := make(map[logpuller.SubscriptionID]bool)
+				for dispatcherID := range dispatcherIDs {
+					dispatcherStat := e.dispatcherMeta.dispatcherStats[dispatcherID]
+					subID := dispatcherStat.subID
+					subStat := e.dispatcherMeta.subscriptionStats[subID]
+					if _, ok := subIDs[subID]; ok {
+						continue
+					}
+					subStates = append(subStates, &logservicepb.SubscriptionState{
+						SubID:        uint64(subID),
+						Span:         dispatcherStat.tableSpan,
+						CheckpointTs: subStat.checkpointTs.Load(),
+						ResolvedTs:   subStat.resolvedTs.Load(),
+					})
+					subIDs[subID] = true
+				}
+				sort.Slice(subStates, func(i, j int) bool {
+					return subStates[i].SubID < subStates[j].SubID
+				})
+				state.Subscriptions[tableID] = &logservicepb.SubscriptionStates{
+					Subscriptions: subStates,
+				}
+			}
+
+			message := messaging.NewSingleTargetMessage(e.coordinatorInfo.id, messaging.LogCoordinatorTopic, state)
+			e.dispatcherMeta.RUnlock()
+			// just ignore messages that fail to send
+			if err := e.messageCenter.SendEvent(message); err != nil {
+				log.Debug("send broadcast message to node failed", zap.Error(err))
+			}
+		}
+	}
+}
diff --git a/logservice/eventstore/helper.go b/logservice/eventstore/helper.go
new file mode 100644
index 00000000..87f18ed7
--- /dev/null
+++ b/logservice/eventstore/helper.go
@@ -0,0 +1,73 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package eventstore
+
+import (
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/logservice/logpuller"
+	"github.com/pingcap/ticdc/pkg/common"
+	"github.com/pingcap/ticdc/utils/dynstream"
+)
+
+const (
+	DataGroupResolvedTs = 1
+	DataGroupDML        = 2
+)
+
+type eventsHandler struct {
+}
+
+func (h *eventsHandler) Path(event eventWithSubID) logpuller.SubscriptionID {
+	return event.subID
+}
+
+func (h *eventsHandler) Handle(subStat *subscriptionStat, events ...eventWithSubID) bool {
+	if events[0].raw.IsResolved() {
+		if len(events) != 1 {
+			log.Panic("should not happen")
+		}
+		subStat.resolvedTs.Store(events[0].raw.CRTs)
+		subStat.dispatchers.RLock()
+		defer subStat.dispatchers.RUnlock()
+		for _, notifier := range subStat.dispatchers.notifiers {
+			notifier(events[0].raw.CRTs)
+		}
+		return false
+	}
+	subStat.maxEventCommitTs.Store(events[len(events)-1].raw.CRTs)
+	items := make([]*common.RawKVEntry, 0, len(events))
+	for _, e := range events {
+		items = append(items, e.raw)
+	}
+	subStat.eventCh <- kvEvents{
+		kvs:     items,
+		subID:   subStat.subID,
+		tableID: subStat.tableID,
+	}
+	return true
+}
+
+func (h *eventsHandler) GetSize(event eventWithSubID) int { return 0 }
+func (h *eventsHandler) GetArea(path logpuller.SubscriptionID, dest *subscriptionStat) int { return 0 }
+func (h *eventsHandler) GetTimestamp(event eventWithSubID) dynstream.Timestamp { return 0 }
+func (h *eventsHandler) IsPaused(event eventWithSubID) bool { return false }
+
+func (h *eventsHandler) GetType(event eventWithSubID) dynstream.EventType {
+	if event.raw.IsResolved() {
+		return dynstream.EventType{DataGroup: DataGroupResolvedTs, Property: dynstream.PeriodicSignal}
+	}
+	return dynstream.EventType{DataGroup: DataGroupDML, Property: dynstream.BatchableData}
+}
+
+func (h *eventsHandler) OnDrop(event eventWithSubID) {}
diff --git a/logservice/logpuller/log_puller.go b/logservice/logpuller/log_puller.go
index 86d164bd..5b820d68 100644
--- a/logservice/logpuller/log_puller.go
+++ b/logservice/logpuller/log_puller.go
@@ -47,9 +47,6 @@ type spanProgress struct {
 	resolvedTsUpdated atomic.Int64
 	resolvedTs        atomic.Uint64
 
-	// tag is supplied at subscription time and is passed to the consume function.
-	tag interface{}
-
 	consume struct {
 		// This lock is used to prevent the table progress from being
 		// removed while consuming events.
@@ -77,7 +74,7 @@ func (p *spanProgress) resolveLock(currentTime time.Time) {
 type LogPuller struct {
 	client  *SubscriptionClient
 	pdClock pdutil.Clock
-	consume func(context.Context, *common.RawKVEntry, SubscriptionID, interface{}) error
+	consume func(context.Context, *common.RawKVEntry, SubscriptionID) error
 
 	subscriptions struct {
 		sync.RWMutex
@@ -92,7 +89,7 @@
 func NewLogPuller(
 	client *SubscriptionClient,
 	pdClock pdutil.Clock,
-	consume func(context.Context, *common.RawKVEntry, SubscriptionID, interface{}) error,
+	consume func(context.Context, *common.RawKVEntry, SubscriptionID) error,
 ) *LogPuller {
 	puller := &LogPuller{
 		client:  client,
@@ -177,7 +174,6 @@ func (p *LogPuller) Close(ctx context.Context) error {
 func (p *LogPuller) Subscribe(
 	span heartbeatpb.TableSpan,
 	startTs uint64,
-	tag interface{},
 ) SubscriptionID {
 	p.subscriptions.Lock()
 
@@ -186,7 +182,6 @@ func (p *LogPuller) Subscribe(
 	progress := &spanProgress{
 		span:  span,
 		subID: subID,
-		tag:   tag,
 	}
 
 	progress.consume.f = func(
@@ -197,7 +192,7 @@ func (p *LogPuller) Subscribe(
 		progress.consume.RLock()
 		defer progress.consume.RUnlock()
 		if !progress.consume.removed {
-			return p.consume(ctx, raw, subID, progress.tag)
+			return p.consume(ctx, raw, subID)
 		}
 		return nil
 	}
diff --git a/logservice/logpuller/log_puller_multi_span.go b/logservice/logpuller/log_puller_multi_span.go
index 7a83c71e..57dee707 100644
--- a/logservice/logpuller/log_puller_multi_span.go
+++ b/logservice/logpuller/log_puller_multi_span.go
@@ -72,7 +72,7 @@ func NewLogPullerMultiSpan(
 	}
 
 	// consumeWrapper may be called concurrently
-	consumeWrapper := func(ctx context.Context, entry *common.RawKVEntry, subID SubscriptionID, _ interface{}) error {
+	consumeWrapper := func(ctx context.Context, entry *common.RawKVEntry, subID SubscriptionID) error {
 		if entry.IsResolved() {
 			pullerWrapper.tryUpdatePendingResolvedTs(subID, entry.CRTs)
 			return nil
@@ -82,7 +82,7 @@
 	pullerWrapper.innerPuller = NewLogPuller(client, pdClock, consumeWrapper)
 
 	for _, span := range spans {
-		subID := pullerWrapper.innerPuller.Subscribe(span, startTs, nil)
+		subID := pullerWrapper.innerPuller.Subscribe(span, startTs)
 		item := &resolvedTsItem{
 			resolvedTs: 0,
 		}
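
For context, the write path introduced by this change is: LogPuller -> dynamic stream (one path per subscription, hashed by subscription ID) -> per-DB write workers -> pebble, with resolved-ts events applied to the subscription immediately and fanned out to dispatcher notifiers instead of being written. The standalone Go sketch below illustrates only that flow; it does not use the real dynstream package, and its names (rawEvent, subscription, handle) are illustrative, not part of this patch.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type rawEvent struct {
	crTs     uint64
	resolved bool // true for resolved-ts signals, false for DML
}

type subscription struct {
	id         uint64
	resolvedTs atomic.Uint64
	notifiers  []func(uint64)   // stand-ins for ResolvedTsNotifier callbacks
	writeCh    chan []rawEvent  // per-DB write queue shared by subscriptions
}

// handle mirrors the shape of eventsHandler.Handle: a resolved ts is applied
// immediately and fanned out to notifiers, DML batches go to the write workers.
func (s *subscription) handle(events []rawEvent) {
	if len(events) == 1 && events[0].resolved {
		s.resolvedTs.Store(events[0].crTs)
		for _, notify := range s.notifiers {
			notify(events[0].crTs)
		}
		return
	}
	s.writeCh <- events
}

func main() {
	writeCh := make(chan []rawEvent, 16)
	var wg sync.WaitGroup

	// One write worker standing in for writeTaskPool.run.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for batch := range writeCh {
			fmt.Printf("write %d DML events to the per-DB store\n", len(batch))
		}
	}()

	sub := &subscription{
		id:        1,
		notifiers: []func(uint64){func(ts uint64) { fmt.Println("notify dispatcher, resolved ts =", ts) }},
		writeCh:   writeCh,
	}
	sub.handle([]rawEvent{{crTs: 100}, {crTs: 101}})      // DML batch -> write worker
	sub.handle([]rawEvent{{crTs: 102, resolved: true}})   // resolved ts -> notifier

	close(writeCh)
	wg.Wait()
}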