Skip to content

Commit

Permalink
[bug] migrate: if the session is closed, skip migration (#15776)
Browse files Browse the repository at this point in the history
create migration controller to control the migration and session:
1. wait migration finished before close the routine.
2. check routine if closed before do the migration.

Approved by: @zhangxu19830126, @daviszhen
  • Loading branch information
volgariver6 authored Apr 28, 2024
1 parent 4f6722b commit 0c34482
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 7 deletions.
75 changes: 75 additions & 0 deletions pkg/frontend/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021 - 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package frontend

import "sync"

// migrateController is created in Routine and used to:
// 1. wait migration finished before close the routine.
// 2. check routine if closed before do the migration.
type migrateController struct {
sync.Mutex
migrateOnce sync.Once
// closed indicates if the session has been closed.
closed bool
// inProgress indicates if the migration is in progress.
inProgress bool
// c is the channel which is used to wait for the migration
// finished when close the routine.
c chan struct{}
}

func newMigrateController() *migrateController {
return &migrateController{
closed: false,
inProgress: false,
c: make(chan struct{}, 1),
}
}

// waitAndClose is called in the routine before the routine is cleaned up.
// if the migration is in progress, wait for it finished and set the closed to true.
func (mc *migrateController) waitAndClose() {
mc.Lock()
defer mc.Unlock()
if mc.inProgress {
<-mc.c
}
mc.closed = true
}

// beginMigrate is called before the migration started. It check if the routine
// has been closed.
func (mc *migrateController) beginMigrate() bool {
mc.Lock()
defer mc.Unlock()
if mc.closed {
return false
}
mc.inProgress = true
return true
}

// endMigrate is called after the migration finished. It notifies the routine that
// it could clean up and set in progress to false.
func (mc *migrateController) endMigrate() {
select {
case mc.c <- struct{}{}:
default:
}
mc.Lock()
defer mc.Unlock()
mc.inProgress = false
}
59 changes: 59 additions & 0 deletions pkg/frontend/migrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2021 - 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package frontend

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewMigrateController(t *testing.T) {
mc := newMigrateController()
assert.NotNil(t, mc)
}

func TestCloseOnly(t *testing.T) {
mc := newMigrateController()
assert.NotNil(t, mc)
mc.waitAndClose()
}

func TestFirstCloseThenMigrate(t *testing.T) {
mc := newMigrateController()
assert.NotNil(t, mc)
mc.waitAndClose()
assert.Equal(t, mc.beginMigrate(), false)
}

func TestFirstMigrateThenClose(t *testing.T) {
mc := newMigrateController()
assert.NotNil(t, mc)
assert.Equal(t, mc.beginMigrate(), true)
mc.endMigrate()
mc.waitAndClose()
}

func TestCloseWaitMigrate(t *testing.T) {
mc := newMigrateController()
assert.NotNil(t, mc)
assert.Equal(t, mc.beginMigrate(), true)
go func() {
<-time.After(time.Second)
mc.endMigrate()
}()
mc.waitAndClose()
}
17 changes: 13 additions & 4 deletions pkg/frontend/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
"time"

"github.com/fagongzi/goetty/v2"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/defines"
Expand All @@ -33,6 +32,7 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/util/status"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"go.uber.org/zap"
)

// Routine handles requests.
Expand Down Expand Up @@ -67,7 +67,7 @@ type Routine struct {

printInfoOnce bool

migrateOnce sync.Once
mc *migrateController
}

func (rt *Routine) needPrintSessionInfo() bool {
Expand Down Expand Up @@ -391,6 +391,9 @@ func (rt *Routine) cleanup() {
//step 1: cancel the query if there is a running query.
//step 2: close the connection.
rt.closeOnce.Do(func() {
// we should wait for the migration and close the migration controller.
rt.mc.waitAndClose()

ses := rt.getSession()
//step A: rollback the txn
if ses != nil {
Expand Down Expand Up @@ -424,7 +427,12 @@ func (rt *Routine) cleanup() {

func (rt *Routine) migrateConnectionTo(req *query.MigrateConnToRequest) error {
var err error
rt.migrateOnce.Do(func() {
rt.mc.migrateOnce.Do(func() {
if !rt.mc.beginMigrate() {
err = moerr.NewInternalErrorNoCtx("cannot start migrate as routine has been closed")
return
}
defer rt.mc.endMigrate()
ses := rt.getSession()
err = ses.Migrate(req)
})
Expand Down Expand Up @@ -453,6 +461,7 @@ func NewRoutine(ctx context.Context, protocol MysqlProtocol, parameters *config.
cancelRoutineFunc: cancelRoutineFunc,
parameters: parameters,
printInfoOnce: true,
mc: newMigrateController(),
}

return ri
Expand Down
7 changes: 4 additions & 3 deletions pkg/proxy/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,15 @@ func (t *tunnel) transfer(ctx context.Context) error {
}
if err := t.doReplaceConnection(ctx, false); err != nil {
v2.ProxyTransferFailCounter.Inc()
return err
t.logger.Error("failed to replace connection", zap.Error(err))
}
// After replace connections, restart pipes.
// Restart pipes even if the error happened in last step.
if err := t.kickoff(); err != nil {
t.logger.Error("failed to kickoff tunnel", zap.Error(err))
_ = t.Close()
} else {
v2.ProxyTransferSuccessCounter.Inc()
}
v2.ProxyTransferSuccessCounter.Inc()
return nil
}

Expand Down

0 comments on commit 0c34482

Please sign in to comment.