diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index 03822958541cb..ee90f6e5f39d3 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -247,12 +247,15 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, // init flowgraph fg := flowgraph.NewTimeTickedFlowGraph(params.Ctx) - dmStreamNode, err := newDmInputNode(initCtx, params.DispClient, info.GetVchan().GetSeekPosition(), config, input) + + var dmStreamNode *flowgraph.InputNode + dmStreamNode, err = newDmInputNode(initCtx, params.DispClient, info.GetVchan().GetSeekPosition(), config, input) if err != nil { return nil, err } - ddNode, err := newDDNode( + var ddNode *ddNode + ddNode, err = newDDNode( params.Ctx, collectionID, channelName, @@ -267,7 +270,8 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, } writeNode := newWriteNode(params.Ctx, params.WriteBufferManager, ds.timetickSender, config) - ttNode, err := newTTNode(config, params.WriteBufferManager, params.CheckpointUpdater) + var ttNode *ttNode + ttNode, err = newTTNode(config, params.WriteBufferManager, params.CheckpointUpdater) if err != nil { return nil, err }