Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: integrate plugin framework with TiDB (#9006) #9888

Merged
merged 2 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOBUILD := CGO_ENABLED=1 $(GO) build $(BUILD_FLAG)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3
OVERALLS := CGO_ENABLED=1 overalls
GOVERALLS := goveralls
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Config struct {
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
CheckMb4ValueInUtf8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"`
Plugin Plugin `toml:"plugin" json:"plugin"`
}

// Log is the log section of config.
Expand Down Expand Up @@ -240,6 +241,12 @@ type TiKVClient struct {
CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

// Binlog is the config for binlog.
type Binlog struct {
Enable bool `toml:"enable" json:"enable"`
Expand Down
7 changes: 7 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"bytes"
"fmt"
"github.com/pingcap/tidb/plugin"
lysu marked this conversation as resolved.
Show resolved Hide resolved
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -807,6 +808,12 @@ func (e *ShowExec) fetchShowProcedureStatus() error {
}

func (e *ShowExec) fetchShowPlugins() error {
tiPlugins := plugin.GetAll()
for _, ps := range tiPlugins {
for _, p := range ps {
e.appendRow([]interface{}{p.Name, p.State.String(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))})
}
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,9 +1631,9 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) {
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeLonglong,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar}
case ast.ShowPlugins:
names = []string{"Name", "Status", "Type", "Library", "License"}
names = []string{"Name", "Status", "Type", "Library", "License", "Version"}
ftypes = []byte{
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
}
case ast.ShowProcessList:
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info", "Mem"}
Expand Down
80 changes: 57 additions & 23 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/pingcap/tidb/plugin"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -1168,6 +1169,21 @@ func loadSystemTZ(se *session) (string, error) {

// BootstrapSession runs the first time when the TiDB server start.
func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
cfg := config.GetGlobalConfig()
if len(cfg.Plugin.Load) > 0 {
err := plugin.Init(context.Background(), plugin.Config{
Plugins: strings.Split(cfg.Plugin.Load, ","),
PluginDir: cfg.Plugin.Dir,
GlobalSysVar: &variable.SysVars,
PluginVarNames: &variable.PluginVarNames,
})
if err != nil {
return nil, err
}
}

initLoadCommonGlobalVarsSQL()

ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand Down Expand Up @@ -1338,33 +1354,51 @@ func finishBootstrap(store kv.Storage) {
}

const quoteCommaQuote = "', '"
const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" +
variable.AutocommitVar + quoteCommaQuote +
variable.SQLModeVar + quoteCommaQuote +
variable.MaxAllowedPacket + quoteCommaQuote +
variable.TimeZone + quoteCommaQuote +
variable.BlockEncryptionMode + quoteCommaQuote +

var builtinGlobalVariable = []string{
variable.AutocommitVar,
variable.SQLModeVar,
variable.MaxAllowedPacket,
variable.TimeZone,
variable.BlockEncryptionMode,
/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check + quoteCommaQuote +
variable.TiDBIndexJoinBatchSize + quoteCommaQuote +
variable.TiDBIndexLookupSize + quoteCommaQuote +
variable.TiDBIndexLookupConcurrency + quoteCommaQuote +
variable.TiDBIndexLookupJoinConcurrency + quoteCommaQuote +
variable.TiDBIndexSerialScanConcurrency + quoteCommaQuote +
variable.TiDBHashJoinConcurrency + quoteCommaQuote +
variable.TiDBProjectionConcurrency + quoteCommaQuote +
variable.TiDBHashAggPartialConcurrency + quoteCommaQuote +
variable.TiDBHashAggFinalConcurrency + quoteCommaQuote +
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBConstraintCheckInPlace + quoteCommaQuote +
variable.TiDBOptInSubqUnFolding + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
variable.TiDBDisableTxnAutoRetry + "')"
variable.TiDBSkipUTF8Check,
variable.TiDBIndexJoinBatchSize,
variable.TiDBIndexLookupSize,
variable.TiDBIndexLookupConcurrency,
variable.TiDBIndexLookupJoinConcurrency,
variable.TiDBIndexSerialScanConcurrency,
variable.TiDBHashJoinConcurrency,
variable.TiDBProjectionConcurrency,
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBBackoffLockFast,
variable.TiDBConstraintCheckInPlace,
variable.TiDBOptInSubqUnFolding,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBMaxChunkSize,
variable.TiDBRetryLimit,
variable.TiDBDisableTxnAutoRetry,
}

var (
loadCommonGlobalVarsSQLOnce sync.Once
loadCommonGlobalVarsSQL string
)

func initLoadCommonGlobalVarsSQL() {
loadCommonGlobalVarsSQLOnce.Do(func() {
vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...)
if len(variable.PluginVarNames) > 0 {
vars = append(vars, variable.PluginVarNames...)
}
loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')"
})
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
initLoadCommonGlobalVarsSQL()
vars := s.sessionVars
if vars.CommonGlobalLoaded {
return nil
Expand Down
10 changes: 9 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func GetSysVar(name string) *SysVar {
return SysVars[name]
}

// PluginVarNames is global plugin var names set.
var PluginVarNames []string

// Variable error codes.
const (
CodeUnknownStatusVar terror.ErrCode = 1
Expand Down Expand Up @@ -312,7 +315,8 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, "sort_buffer_size", "262144"},
{ScopeGlobal, "innodb_flush_neighbors", "1"},
{ScopeNone, "innodb_use_sys_malloc", "ON"},
{ScopeNone, "plugin_dir", "/usr/local/mysql/lib/plugin/"},
{ScopeSession, PluginLoad, ""},
{ScopeNone, PluginDir, "/data/deploy/plugin"},
{ScopeNone, "performance_schema_max_socket_classes", "10"},
{ScopeNone, "performance_schema_max_stage_classes", "150"},
{ScopeGlobal, "innodb_purge_batch_size", "300"},
Expand Down Expand Up @@ -777,6 +781,10 @@ const (
SyncBinlog = "sync_binlog"
// BlockEncryptionMode is the name for 'block_encryption_mode' system variable.
BlockEncryptionMode = "block_encryption_mode"
// PluginDir is the name of 'plugin_dir' system variable.
PluginDir = "plugin_dir"
// PluginLoad is the name of 'plugin_load' system variable.
PluginLoad = "plugin_load"
)

// GlobalVarAccessor is the interface for accessing global scope system and status variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) {
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.SlowThreshold), 10), true, nil
case TiDBQueryLogMaxLen:
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.QueryLogMaxLen), 10), true, nil
case PluginDir:
return config.GetGlobalConfig().Plugin.Dir, true, nil
case PluginLoad:
return config.GetGlobalConfig().Plugin.Load, true, nil
case TiDBCheckMb4ValueInUtf8:
return BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUtf8), true, nil
}
Expand Down
10 changes: 10 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const (
nmMetricsInterval = "metrics-interval"
nmDdlLease = "lease"
nmTokenLimit = "token-limit"
nmPluginDir = "plugin-dir"
nmPluginLoad = "plugin-load"

nmProxyProtocolNetworks = "proxy-protocol-networks"
nmProxyProtocolHeaderTimeout = "proxy-protocol-header-timeout"
Expand All @@ -94,6 +96,8 @@ var (
runDDL = flagBoolean(nmRunDDL, true, "run ddl worker on this tidb-server")
ddlLease = flag.String(nmDdlLease, "45s", "schema lease duration, very dangerous to change only if you know what you do")
tokenLimit = flag.Int(nmTokenLimit, 1000, "the limit of concurrent executed sessions")
pluginDir = flag.String(nmPluginDir, "/data/deploy/plugin", "the folder that hold plugin")
pluginLoad = flag.String(nmPluginLoad, "", "wait load plugin name(seperated by comma)")

// Log
logLevel = flag.String(nmLogLevel, "info", "log level: info, debug, warn, error, fatal")
Expand Down Expand Up @@ -315,6 +319,12 @@ func overrideConfig() {
if actualFlags[nmTokenLimit] {
cfg.TokenLimit = uint(*tokenLimit)
}
if actualFlags[nmPluginLoad] {
cfg.Plugin.Load = *pluginLoad
}
if actualFlags[nmPluginDir] {
cfg.Plugin.Dir = *pluginDir
}

// Log
if actualFlags[nmLogLevel] {
Expand Down