Skip to content

Commit

Permalink
capture(ticdc): disable best effort graceful shutdown (#6647)
Browse files Browse the repository at this point in the history
Because TiCDC release-6.2 does not support cross version upgrade and stops
service during upgrade, it is impossible to perform graceful shutdown.

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored Aug 8, 2022
1 parent bd21a6e commit c0eaa00
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
9 changes: 8 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type captureImpl struct {
processorManager processor.Manager
liveness model.Liveness
config *config.ServerConfig
disableDrain bool

pdEndpoints []string
ownerMu sync.Mutex
Expand Down Expand Up @@ -114,13 +115,19 @@ type captureImpl struct {
newOwner func(upstreamManager *upstream.Manager) owner.Owner
}

// disableDrain is a boolean that disable capture drain feature.
// Because TiCDC release-6.2 does not support cross version upgrade and stops
// service during upgrade, it is impossible to perform graceful shutdown.
const disableDrain = true

// NewCapture returns a new Capture instance
func NewCapture(pdEndpoints []string,
etcdClient etcd.CDCEtcdClient,
grpcService *p2p.ServerWrapper,
) Capture {
conf := config.GetGlobalServerConfig()
return &captureImpl{
disableDrain: disableDrain,
config: config.GetGlobalServerConfig(),
liveness: model.LivenessCaptureAlive,
EtcdClient: etcdClient,
Expand Down Expand Up @@ -666,7 +673,7 @@ func (c *captureImpl) Drain(ctx context.Context) <-chan struct{} {
}

func (c *captureImpl) drainImpl(ctx context.Context) bool {
if !c.config.Debug.EnableSchedulerV3 {
if !c.config.Debug.EnableSchedulerV3 || c.disableDrain {
// Skip drain as two phase scheduler is disabled.
return true
}
Expand Down
42 changes: 42 additions & 0 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,45 @@ func TestCampaignLiveness(t *testing.T) {

wg.Wait()
}

func TestDisableDrainByDefault(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)
mm := mock_processor.NewMockManager(ctrl)
me := mock_etcd.NewMockCDCEtcdClient(ctrl)
cp := &captureImpl{
EtcdClient: me,
info: &model.CaptureInfo{
ID: "capture-for-test",
AdvertiseAddr: "127.0.0.1", Version: "test",
},
processorManager: mm,
owner: mo,
config: config.GetDefaultServerConfig(),
disableDrain: disableDrain,
}
cp.config.Debug.EnableSchedulerV3 = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

// Owner can not be resigned.
mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
// Two captures to allow owner resign.
query.Data = []*model.CaptureInfo{{}, {}}
close(done)
}).AnyTimes()
mo.EXPECT().AsyncStop().AnyTimes()

done := cp.Drain(ctx)

// Must skipping drain by default.
select {
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
}
}

0 comments on commit c0eaa00

Please sign in to comment.