Skip to content

Commit

Permalink
fix: move sync first time tick into single function
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Jul 2, 2024
1 parent f8b3336 commit 5b7025b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 23 deletions.
2 changes: 2 additions & 0 deletions internal/streamingnode/server/wal/adaptor/scanner_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
if msg.MessageType() == message.MessageTypeTimeTick {
// If the time tick message incoming,
// the reorder buffer can be consumed into a pending queue with latest timetick.

// TODO: !!! should we drop the unexpected broken timetick rule message.
s.pendingQueue.Add(s.reorderBuffer.PopUtilTimeTick(msg.TimeTick()))
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
)

var _ interceptors.AppendInterceptor = (*timeTickAppendInterceptor)(nil)
Expand Down Expand Up @@ -65,6 +66,37 @@ func (impl *timeTickAppendInterceptor) executeSyncTimeTick(interval time.Duratio
logger.Info("start to sync time tick...")
defer logger.Info("sync time tick stopped")

if err := impl.blockUntilSyncTimeTickReady(underlyingWALImpls); err != nil {
logger.Warn("sync first time tick failed", zap.Error(err))
return

Check warning on line 71 in internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go#L70-L71

Added lines #L70 - L71 were not covered by tests
}

// interceptor is ready, wait for the final wal object is ready to use.
wal := param.WAL.Get()

// TODO: sync time tick message to wal periodically.
// Add a trigger on `AckManager` to sync time tick message without periodically.
// `AckManager` gather detail information, time tick sync can check it and make the message between tt more smaller.
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-impl.ctx.Done():
return
case <-ticker.C:
if err := impl.sendTsMsg(impl.ctx, wal.Append); err != nil {
log.Warn("send time tick sync message failed", zap.Error(err))

Check warning on line 88 in internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go#L88

Added line #L88 was not covered by tests
}
}
}
}

// blockUntilSyncTimeTickReady blocks until the first time tick message is sent.
func (impl *timeTickAppendInterceptor) blockUntilSyncTimeTickReady(underlyingWALImpls walimpls.WALImpls) error {
logger := log.With(zap.Any("channel", underlyingWALImpls.Channel()))
logger.Info("start to sync first time tick")
defer logger.Info("sync first time tick done")

// Send first timetick message to wal before interceptor is ready.
for count := 0; ; count++ {
// Sent first timetick message to wal before ready.
Expand All @@ -75,40 +107,20 @@ func (impl *timeTickAppendInterceptor) executeSyncTimeTick(interval time.Duratio
// !!! Send a timetick message into walimpls directly is safe.
select {
case <-impl.ctx.Done():
return
return impl.ctx.Err()

Check warning on line 110 in internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go#L110

Added line #L110 was not covered by tests
default:
}
if err := impl.sendTsMsg(impl.ctx, underlyingWALImpls.Append); err != nil {
log.Warn("send first timestamp message failed", zap.Error(err), zap.Int("retryCount", count))
logger.Warn("send first timestamp message failed", zap.Error(err), zap.Int("retryCount", count))

Check warning on line 114 in internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go#L114

Added line #L114 was not covered by tests
// TODO: exponential backoff.
time.Sleep(50 * time.Millisecond)
continue
}
break
}

// interceptor is ready now.
close(impl.ready)
logger.Info("start to sync time ready")

// interceptor is ready, wait for the final wal object is ready to use.
wal := param.WAL.Get()

// TODO: sync time tick message to wal periodically.
// Add a trigger on `AckManager` to sync time tick message without periodically.
// `AckManager` gather detail information, time tick sync can check it and make the message between tt more smaller.
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-impl.ctx.Done():
return
case <-ticker.C:
if err := impl.sendTsMsg(impl.ctx, wal.Append); err != nil {
log.Warn("send time tick sync message failed", zap.Error(err))
}
}
}
return nil
}

// syncAcknowledgedDetails syncs the timestamp acknowledged details.
Expand Down

0 comments on commit 5b7025b

Please sign in to comment.