diff --git a/pkg/frontend/migrate.go b/pkg/frontend/migrate.go
new file mode 100644
index 0000000000000..cba70e4403f16
--- /dev/null
+++ b/pkg/frontend/migrate.go
@@ -0,0 +1,82 @@
+// Copyright 2021 - 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package frontend
+
+import "sync"
+
+// migrateController is created in Routine and used to:
+//  1. wait for the migration to finish before the routine is closed.
+//  2. check whether the routine has been closed before doing the migration.
+type migrateController struct {
+	sync.Mutex
+	migrateOnce sync.Once
+	// closed indicates if the session has been closed.
+	closed bool
+	// inProgress indicates if the migration is in progress.
+	inProgress bool
+	// c is closed by endMigrate to wake up waitAndClose. beginMigrate
+	// installs a fresh channel for every migration, so a signal left over
+	// from an earlier migration can never be observed by a later wait.
+	c chan struct{}
+}
+
+// newMigrateController returns a controller that is neither closed nor migrating.
+func newMigrateController() *migrateController {
+	return &migrateController{
+		closed:     false,
+		inProgress: false,
+		c:          make(chan struct{}),
+	}
+}
+
+// waitAndClose is called in the routine before the routine is cleaned up.
+// It marks the controller as closed, so that no new migration can begin, and
+// then waits for the migration in progress, if any, to finish. The mutex is
+// released before waiting, so beginMigrate and endMigrate never block on it.
+func (mc *migrateController) waitAndClose() {
+	mc.Lock()
+	mc.closed = true
+	waiting, done := mc.inProgress, mc.c
+	mc.Unlock()
+	if waiting {
+		<-done
+	}
+}
+
+// beginMigrate is called before the migration starts. It returns false if the
+// routine has been closed, in which case the migration must not be started.
+func (mc *migrateController) beginMigrate() bool {
+	mc.Lock()
+	defer mc.Unlock()
+	if mc.closed {
+		return false
+	}
+	mc.inProgress = true
+	mc.c = make(chan struct{})
+	return true
+}
+
+// endMigrate is called after the migration finishes. It clears the in-progress
+// flag and notifies waitAndClose that the routine can be cleaned up. Calling
+// it without a matching beginMigrate is a no-op.
+func (mc *migrateController) endMigrate() {
+	mc.Lock()
+	defer mc.Unlock()
+	if !mc.inProgress {
+		return
+	}
+	mc.inProgress = false
+	close(mc.c)
+}
diff --git a/pkg/frontend/migrate_test.go b/pkg/frontend/migrate_test.go
new file mode 100644
index 0000000000000..8c3a9e68f5ab7
--- /dev/null
+++ b/pkg/frontend/migrate_test.go
@@ -0,0 +1,59 @@
+// Copyright 2021 - 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package frontend
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewMigrateController(t *testing.T) {
+	mc := newMigrateController()
+	assert.NotNil(t, mc)
+}
+
+func TestCloseOnly(t *testing.T) {
+	mc := newMigrateController()
+	assert.NotNil(t, mc)
+	mc.waitAndClose()
+}
+
+func TestFirstCloseThenMigrate(t *testing.T) {
+	mc := newMigrateController()
+	assert.NotNil(t, mc)
+	mc.waitAndClose()
+	assert.False(t, mc.beginMigrate())
+}
+
+func TestFirstMigrateThenClose(t *testing.T) {
+	mc := newMigrateController()
+	assert.NotNil(t, mc)
+	assert.True(t, mc.beginMigrate())
+	mc.endMigrate()
+	mc.waitAndClose()
+}
+
+func TestCloseWaitMigrate(t *testing.T) {
+	mc := newMigrateController()
+	assert.NotNil(t, mc)
+	assert.True(t, mc.beginMigrate())
+	go func() {
+		<-time.After(time.Second)
+		mc.endMigrate()
+	}()
+	mc.waitAndClose()
+}
diff --git a/pkg/frontend/routine.go b/pkg/frontend/routine.go
index 81194f581b1ef..3abd9b2e15764 100644
--- a/pkg/frontend/routine.go
+++ b/pkg/frontend/routine.go
@@ -22,8 +22,7 @@ import (
 	"time"
 
 	"github.com/fagongzi/goetty/v2"
-	"go.uber.org/zap"
-
+	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/common/runtime"
 	"github.com/matrixorigin/matrixone/pkg/config"
 	"github.com/matrixorigin/matrixone/pkg/defines"
@@ -33,6 +32,7 @@ import (
 	v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
 	"github.com/matrixorigin/matrixone/pkg/util/status"
 	"github.com/matrixorigin/matrixone/pkg/util/trace"
+	"go.uber.org/zap"
 )
 
 // Routine handles requests.
@@ -67,7 +67,7 @@ type Routine struct {
 
 	printInfoOnce bool
 
-	migrateOnce sync.Once
+	mc *migrateController
 }
 
 func (rt *Routine) needPrintSessionInfo() bool {
@@ -391,6 +391,9 @@ func (rt *Routine) cleanup() {
 	//step 1: cancel the query if there is a running query.
 	//step 2: close the connection.
 	rt.closeOnce.Do(func() {
+		// Wait for any in-progress migration to finish and close the migration controller first.
+		rt.mc.waitAndClose()
+
 		ses := rt.getSession()
 		//step A: rollback the txn
 		if ses != nil {
@@ -424,7 +427,12 @@ func (rt *Routine) cleanup() {
 
 func (rt *Routine) migrateConnectionTo(req *query.MigrateConnToRequest) error {
 	var err error
-	rt.migrateOnce.Do(func() {
+	rt.mc.migrateOnce.Do(func() {
+		if !rt.mc.beginMigrate() {
+			err = moerr.NewInternalErrorNoCtx("cannot start migrate as routine has been closed")
+			return
+		}
+		defer rt.mc.endMigrate()
 		ses := rt.getSession()
 		err = ses.Migrate(req)
 	})
@@ -453,6 +461,7 @@ func NewRoutine(ctx context.Context, protocol MysqlProtocol, parameters *config.
 		cancelRoutineFunc: cancelRoutineFunc,
 		parameters:        parameters,
 		printInfoOnce:     true,
+		mc:                newMigrateController(),
 	}
 
 	return ri
diff --git a/pkg/proxy/tunnel.go b/pkg/proxy/tunnel.go
index 834443254193b..7f18bb0efb779 100644
--- a/pkg/proxy/tunnel.go
+++ b/pkg/proxy/tunnel.go
@@ -367,14 +367,15 @@ func (t *tunnel) transfer(ctx context.Context) error {
 	}
 	if err := t.doReplaceConnection(ctx, false); err != nil {
 		v2.ProxyTransferFailCounter.Inc()
-		return err
+		t.logger.Error("failed to replace connection", zap.Error(err))
 	}
-	// After replace connections, restart pipes.
+	// Restart the pipes even if replacing the connection failed. NOTE(review): the success counter below is then incremented as well - confirm this is intended.
 	if err := t.kickoff(); err != nil {
 		t.logger.Error("failed to kickoff tunnel", zap.Error(err))
 		_ = t.Close()
+	} else {
+		v2.ProxyTransferSuccessCounter.Inc()
 	}
-	v2.ProxyTransferSuccessCounter.Inc()
 	return nil
 }
 