Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49275
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lcwangchao authored and ti-chi-bot committed Dec 8, 2023
1 parent a102bd1 commit cf7ddc2
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 1 deletion.
55 changes: 55 additions & 0 deletions pkg/plugin/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "plugin",
srcs = [
"audit.go",
"const.go",
"errors.go",
"helper.go",
"plugin.go",
"spi.go",
],
importpath = "github.com/pingcap/tidb/pkg/plugin",
visibility = ["//visibility:public"],
deps = [
"//pkg/domain",
"//pkg/errno",
"//pkg/sessionctx/variable",
"//pkg/util",
"//pkg/util/dbterror",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "plugin_test",
timeout = "short",
srcs = [
"const_test.go",
"helper_test.go",
"integration_test.go",
"main_test.go",
"plugin_test.go",
"spi_test.go",
],
embed = [":plugin"],
flaky = True,
shard_count = 12,
deps = [
"//pkg/kv",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/session",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_goleak//:goleak",
],
)
49 changes: 48 additions & 1 deletion plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
gplugin "plugin"
"strconv"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -245,11 +246,48 @@ type flushWatcher struct {
plugin *Plugin
}

<<<<<<< HEAD:plugin/plugin.go
=======
func (w *flushWatcher) refreshPluginState() error {
disabled, err := w.getPluginDisabledFlag()
if err != nil {
logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err))
return err
}
if disabled {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 1)
} else {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 0)
}
err = w.manifest.OnFlush(w.ctx, w.manifest)
if err != nil {
logutil.BgLogger().Error("plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err))
return err
}
return nil
}
>>>>>>> 169617d22dc (plugin: fix bug that watch loop will refresh frequently when channel closed (#49275)):pkg/plugin/plugin.go
func (w *flushWatcher) watchLoop() {
watchChan := w.etcd.Watch(w.ctx, w.path)
const reWatchInterval = time.Second * 5
logutil.BgLogger().Info("plugin flushWatcher loop started", zap.String("plugin", w.manifest.Name))
for w.ctx.Err() == nil {
ch := w.etcd.Watch(w.ctx, w.path)
if exit := w.watchLoopWithChan(ch); exit {
break
}

logutil.BgLogger().Info(
"plugin flushWatcher old chan closed, restart loop later",
zap.String("plugin", w.manifest.Name),
zap.Duration("after", reWatchInterval))
}
}

func (w *flushWatcher) watchLoopWithChan(ch clientv3.WatchChan) (exit bool) {
for {
select {
case <-w.ctx.Done():
<<<<<<< HEAD:plugin/plugin.go
return
case <-watchChan:
disabled, err := w.getPluginDisabledFlag()
Expand All @@ -265,6 +303,15 @@ func (w *flushWatcher) watchLoop() {
if err != nil {
logutil.BgLogger().Error("notify plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err))
}
=======
return true
case _, ok := <-ch:
if !ok {
return false
}
logutil.BgLogger().Info("plugin flushWatcher detected event to reload plugin config", zap.String("plugin", w.manifest.Name))
_ = w.refreshPluginState()
>>>>>>> 169617d22dc (plugin: fix bug that watch loop will refresh frequently when channel closed (#49275)):pkg/plugin/plugin.go
}
}
}
Expand Down
47 changes: 47 additions & 0 deletions plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ import (
"context"
"io"
"strconv"
"sync/atomic"
"testing"
<<<<<<< HEAD:plugin/plugin_test.go
=======
"time"
"unsafe"
>>>>>>> 169617d22dc (plugin: fix bug that watch loop will refresh frequently when channel closed (#49275)):pkg/plugin/plugin_test.go

"github.com/pingcap/tidb/sessionctx/variable"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)

func TestLoadPluginSuccess(t *testing.T) {
Expand Down Expand Up @@ -242,3 +249,43 @@ func TestPluginsClone(t *testing.T) {
require.Equal(t, uint16(1), cps.versions["whitelist"])
require.Len(t, cps.dyingPlugins, 1)
}

func TestPluginWatcherLoop(t *testing.T) {
// exit when ctx done
ctx, cancel := context.WithCancel(context.Background())
watcher := &flushWatcher{
ctx: ctx,
manifest: &Manifest{
Name: "test",
},
}
ch := make(chan clientv3.WatchResponse)
var cancelled atomic.Bool
go func() {
time.Sleep(10 * time.Millisecond)
cancelled.Store(true)
cancel()
}()
exit := watcher.watchLoopWithChan(ch)
require.True(t, exit)
require.True(t, cancelled.Load())

// exit when ch closed
watcher = &flushWatcher{
ctx: context.Background(),
manifest: &Manifest{
Name: "test",
},
}

var closed atomic.Bool
ch = make(chan clientv3.WatchResponse)
go func() {
time.Sleep(10 * time.Millisecond)
closed.Store(true)
close(ch)
}()
exit = watcher.watchLoopWithChan(ch)
require.False(t, exit)
require.True(t, cancelled.Load())
}

0 comments on commit cf7ddc2

Please sign in to comment.