diff --git a/pkg/upstream/manager.go b/pkg/upstream/manager.go index d9c69077f4f..b12f022653a 100644 --- a/pkg/upstream/manager.go +++ b/pkg/upstream/manager.go @@ -197,7 +197,9 @@ func (m *Manager) Tick(ctx context.Context, activeUpstreams := make(map[uint64]struct{}) for _, cf := range globalState.Changefeeds { - activeUpstreams[cf.Info.UpstreamID] = struct{}{} + if cf != nil && cf.Info != nil { + activeUpstreams[cf.Info.UpstreamID] = struct{}{} + } } m.mu.Lock() defer m.mu.Unlock() diff --git a/pkg/upstream/manager_test.go b/pkg/upstream/manager_test.go index 4bc544d9868..04ebc7efa80 100644 --- a/pkg/upstream/manager_test.go +++ b/pkg/upstream/manager_test.go @@ -55,16 +55,23 @@ func TestUpstream(t *testing.T) { require.NotNil(t, up) // test Tick - _ = manager.Tick(context.Background(), &orchestrator.GlobalReactorState{}) + globalState := &orchestrator.GlobalReactorState{ + Changefeeds: make(map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState), + } + // add one changefeed state whose info is nil to make sure it won't be checked + globalState.Changefeeds[model.DefaultChangeFeedID("1")] = &orchestrator.ChangefeedReactorState{ + Info: nil, + } + _ = manager.Tick(context.Background(), globalState) mockClock.Add(maxIdleDuration * 2) manager.lastTickTime = atomic.Time{} - _ = manager.Tick(context.Background(), &orchestrator.GlobalReactorState{}) + _ = manager.Tick(context.Background(), globalState) // wait until up2 is closed for !up2.IsClosed() { } manager.lastTickTime = atomic.Time{} - _ = manager.Tick(context.Background(), &orchestrator.GlobalReactorState{}) - _ = manager.Tick(context.Background(), &orchestrator.GlobalReactorState{}) + _ = manager.Tick(context.Background(), globalState) + _ = manager.Tick(context.Background(), globalState) up, ok = manager.Get(testID) require.False(t, ok) require.Nil(t, up)