diff --git a/pkg/plugin/BUILD.bazel b/pkg/plugin/BUILD.bazel new file mode 100644 index 0000000000000..60157479817b1 --- /dev/null +++ b/pkg/plugin/BUILD.bazel @@ -0,0 +1,55 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "plugin", + srcs = [ + "audit.go", + "const.go", + "errors.go", + "helper.go", + "plugin.go", + "spi.go", + ], + importpath = "github.com/pingcap/tidb/pkg/plugin", + visibility = ["//visibility:public"], + deps = [ + "//pkg/domain", + "//pkg/errno", + "//pkg/sessionctx/variable", + "//pkg/util", + "//pkg/util/dbterror", + "//pkg/util/logutil", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_log//:log", + "@io_etcd_go_etcd_client_v3//:client", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "plugin_test", + timeout = "short", + srcs = [ + "const_test.go", + "helper_test.go", + "integration_test.go", + "main_test.go", + "plugin_test.go", + "spi_test.go", + ], + embed = [":plugin"], + flaky = True, + shard_count = 12, + deps = [ + "//pkg/kv", + "//pkg/parser/mysql", + "//pkg/server", + "//pkg/session", + "//pkg/sessionctx/variable", + "//pkg/testkit", + "//pkg/testkit/testsetup", + "@com_github_stretchr_testify//require", + "@io_etcd_go_etcd_client_v3//:client", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/plugin/plugin.go b/plugin/plugin.go index b3aac9694193f..4d8327c9378d9 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -20,6 +20,7 @@ import ( gplugin "plugin" "strconv" "sync/atomic" + "time" "unsafe" "github.com/pingcap/errors" @@ -245,11 +246,48 @@ type flushWatcher struct { plugin *Plugin } +<<<<<<< HEAD:plugin/plugin.go +======= +func (w *flushWatcher) refreshPluginState() error { + disabled, err := w.getPluginDisabledFlag() + if err != nil { + logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err)) + return err + } + if disabled { + atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 1) + } else { + atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 0) + } + err = w.manifest.OnFlush(w.ctx, w.manifest) + if err != nil { + logutil.BgLogger().Error("plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err)) + return err + } + return nil +} +>>>>>>> 169617d22dc (plugin: fix bug that watch loop will refresh frequently when channel closed (#49275)):pkg/plugin/plugin.go func (w *flushWatcher) watchLoop() { - watchChan := w.etcd.Watch(w.ctx, w.path) + const reWatchInterval = time.Second * 5 + logutil.BgLogger().Info("plugin flushWatcher loop started", zap.String("plugin", w.manifest.Name)) + for w.ctx.Err() == nil { + ch := w.etcd.Watch(w.ctx, w.path) + if exit := w.watchLoopWithChan(ch); exit { + break + } + + logutil.BgLogger().Info( + "plugin flushWatcher old chan closed, restart loop later", + zap.String("plugin", w.manifest.Name), + zap.Duration("after", reWatchInterval)) + } +} + +func (w *flushWatcher) watchLoopWithChan(ch clientv3.WatchChan) (exit bool) { for { select { case <-w.ctx.Done(): +<<<<<<< HEAD:plugin/plugin.go return case <-watchChan: disabled, err := w.getPluginDisabledFlag() @@ -265,6 +303,15 @@ func (w *flushWatcher) watchLoop() { if err != nil { logutil.BgLogger().Error("notify plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err)) } +======= + return true + case _, ok := <-ch: + if !ok { + return false + } + logutil.BgLogger().Info("plugin flushWatcher detected event to reload plugin config", zap.String("plugin", w.manifest.Name)) + _ = w.refreshPluginState() +>>>>>>> 169617d22dc (plugin: fix bug that watch loop will refresh frequently when channel closed (#49275)):pkg/plugin/plugin.go } } } diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index cfcc85ef310f3..da1da115df210 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -18,10 +18,17 @@ import ( "context" "io" "strconv" + "sync/atomic" "testing" +<<<<<<< HEAD:plugin/plugin_test.go +======= + "time" + "unsafe" +>>>>>>> 169617d22dc (plugin: fix bug that watch loop will refresh frequently when channel closed (#49275)):pkg/plugin/plugin_test.go "github.com/pingcap/tidb/sessionctx/variable" "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" ) func TestLoadPluginSuccess(t *testing.T) { @@ -242,3 +249,43 @@ func TestPluginsClone(t *testing.T) { require.Equal(t, uint16(1), cps.versions["whitelist"]) require.Len(t, cps.dyingPlugins, 1) } + +func TestPluginWatcherLoop(t *testing.T) { + // exit when ctx done + ctx, cancel := context.WithCancel(context.Background()) + watcher := &flushWatcher{ + ctx: ctx, + manifest: &Manifest{ + Name: "test", + }, + } + ch := make(chan clientv3.WatchResponse) + var cancelled atomic.Bool + go func() { + time.Sleep(10 * time.Millisecond) + cancelled.Store(true) + cancel() + }() + exit := watcher.watchLoopWithChan(ch) + require.True(t, exit) + require.True(t, cancelled.Load()) + + // exit when ch closed + watcher = &flushWatcher{ + ctx: context.Background(), + manifest: &Manifest{ + Name: "test", + }, + } + + var closed atomic.Bool + ch = make(chan clientv3.WatchResponse) + go func() { + time.Sleep(10 * time.Millisecond) + closed.Store(true) + close(ch) + }() + exit = watcher.watchLoopWithChan(ch) + require.False(t, exit) + require.True(t, cancelled.Load()) +}