Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add table functions mo_configurations #11792

Merged
merged 32 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3eb20d7
add mo_configurations
daviszhen Sep 15, 2023
9d96bf1
update
daviszhen Sep 15, 2023
0abae23
fix sca
daviszhen Sep 15, 2023
3d2eb0d
fix ut
daviszhen Sep 15, 2023
eb2dd28
fix sca
daviszhen Sep 15, 2023
b56c6e0
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 15, 2023
80333c0
Merge branch 'main' into 0914-mo-confis
mergify[bot] Sep 15, 2023
26814e8
Merge branch '0914-mo-confis' of https://github.com/daviszhen/matrixo…
daviszhen Sep 15, 2023
8ac4211
add config counting
daviszhen Sep 15, 2023
853dbb9
add config data count
daviszhen Sep 15, 2023
0266b16
update log update
daviszhen Sep 15, 2023
8075e1f
add mo_configurations view and upgrader
daviszhen Sep 16, 2023
dac2953
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 16, 2023
5a3e4d8
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 17, 2023
8b980b4
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 18, 2023
0c0e859
fix bvt cases
daviszhen Sep 18, 2023
9fa09af
fix bvt case
daviszhen Sep 18, 2023
34cc001
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 18, 2023
780fd76
fix compile error
daviszhen Sep 18, 2023
26df8f9
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 18, 2023
9158edc
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 18, 2023
fb49560
fix default values
daviszhen Sep 19, 2023
330802a
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 19, 2023
76924e3
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 19, 2023
b72f202
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 19, 2023
08fdcc0
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 19, 2023
bcbb09a
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 19, 2023
e263348
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 20, 2023
6af3af7
fix sca
daviszhen Sep 20, 2023
e53ad4f
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 20, 2023
e108df9
fix bvt cases
daviszhen Sep 20, 2023
cb5e533
Merge branch 'main' into 0914-mo-confis
daviszhen Sep 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/cnservice/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ package cnservice

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/lockservice"
"github.com/matrixorigin/matrixone/pkg/logservice"
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/queryservice"
"github.com/matrixorigin/matrixone/pkg/taskservice"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/util"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -54,10 +56,18 @@ func WithMessageHandle(f func(ctx context.Context,
fs fileservice.FileService,
lockService lockservice.LockService,
queryService queryservice.QueryService,
hakeeper logservice.CNHAKeeperClient,
cli client.TxnClient,
aicm *defines.AutoIncrCacheManager,
mAcquirer func() morpc.Message) error) Option {
return func(s *service) {
s.requestHandler = f
}
}

// WithConfigData saves the data from the config file
func WithConfigData(data map[string]*logservicepb.ConfigItem) Option {
return func(s *service) {
s.config = util.NewConfigData(data)
}
}
6 changes: 6 additions & 0 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func NewService(
return nil, err
}

configKVMap, _ := dumpCnConfig(*cfg)
options = append(options, WithConfigData(configKVMap))

// get metadata fs
metadataFS, err := fileservice.Get[fileservice.ReplaceableFileService](fileService, defines.LocalFileServiceName)
if err != nil {
Expand Down Expand Up @@ -173,6 +176,7 @@ func NewService(
fService fileservice.FileService,
lockService lockservice.LockService,
queryService queryservice.QueryService,
hakeeper logservice.CNHAKeeperClient,
cli client.TxnClient,
aicm *defines.AutoIncrCacheManager,
messageAcquirer func() morpc.Message) error {
Expand Down Expand Up @@ -353,6 +357,7 @@ func (s *service) handleRequest(
s.fileService,
s.lockService,
s.queryService,
s._hakeeperClient,
s._txnClient,
s.aicm,
s.acquireMessage)
Expand Down Expand Up @@ -651,6 +656,7 @@ func (s *service) initInternalSQlExecutor(mp *mpool.MPool) {
s._txnClient,
s.fileService,
s.queryService,
s._hakeeperClient,
s.aicm)
runtime.ProcessLevelRuntime().SetGlobalVariables(runtime.InternalSQLExecutor, exec)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cnservice/server_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,17 @@ func (s *service) heartbeat(ctx context.Context) {
InitWorkState: s.cfg.InitWorkState,
GossipAddress: s.gossipServiceAddr(),
GossipJoined: s.gossipNode.Joined(),
ConfigData: s.config.GetData(),
}

cb, err := s._hakeeperClient.SendCNHeartbeat(ctx2, hb)
if err != nil {
s.logger.Error("failed to send cn heartbeat", zap.Error(err))
return
}

s.config.DecrCount()

s.handleCommands(cb.Commands)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/cnservice/service_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,8 @@ func TestService_RegisterServices(t *testing.T) {
assert.Equal(t, fmt.Sprintf("%s:%d", serviceHost, port2+3), s.queryServiceServiceAddr())
assert.Equal(t, fmt.Sprintf("%s:%d", listenHost, port2+3), s.queryServiceListenAddr())
}

func TestDefaultCnConfig(t *testing.T) {
cfg := Config{}
cfg.SetDefaultValue()
}
132 changes: 132 additions & 0 deletions pkg/cnservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package cnservice

import (
"context"
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
moconnector "github.com/matrixorigin/matrixone/pkg/stream/connector"
"github.com/matrixorigin/matrixone/pkg/util"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -389,6 +391,128 @@ func (c *Config) Validate() error {
return nil
}

// SetDefaultValue setups the default of the config.
// most of the code are copied from the Validate.
// But, the Validate may change some global variables that the SetDefaultValue does not need.
// So, need a different function.
func (c *Config) SetDefaultValue() {
foundMachineHost := ""
if c.ListenAddress == "" {
c.ListenAddress = defaultListenAddress
}
if c.ServiceAddress == "" {
c.ServiceAddress = c.ListenAddress
} else {
foundMachineHost = strings.Split(c.ServiceAddress, ":")[0]
}
if c.Role == "" {
c.Role = metadata.CNRole_TP.String()
}
if c.HAKeeper.DiscoveryTimeout.Duration == 0 {
c.HAKeeper.DiscoveryTimeout.Duration = time.Second * 30
}
if c.HAKeeper.HeatbeatInterval.Duration == 0 {
c.HAKeeper.HeatbeatInterval.Duration = time.Second
}
if c.HAKeeper.HeatbeatTimeout.Duration == 0 {
c.HAKeeper.HeatbeatTimeout.Duration = time.Second * 3
}
if c.TaskRunner.Parallelism == 0 {
c.TaskRunner.Parallelism = runtime.NumCPU() / 16
if c.TaskRunner.Parallelism <= ReservedTasks {
c.TaskRunner.Parallelism = 1 + ReservedTasks
}
}
if c.TaskRunner.FetchInterval.Duration == 0 {
c.TaskRunner.FetchInterval.Duration = time.Second * 10
}
if c.TaskRunner.FetchTimeout.Duration == 0 {
c.TaskRunner.FetchTimeout.Duration = time.Second * 10
}
if c.TaskRunner.HeartbeatInterval.Duration == 0 {
c.TaskRunner.HeartbeatInterval.Duration = time.Second * 5
}
if c.TaskRunner.MaxWaitTasks == 0 {
c.TaskRunner.MaxWaitTasks = 256
}
if c.TaskRunner.QueryLimit == 0 {
c.TaskRunner.QueryLimit = c.TaskRunner.Parallelism
}
if c.TaskRunner.RetryInterval.Duration == 0 {
c.TaskRunner.RetryInterval.Duration = time.Second
}
if c.Engine.Type == "" {
c.Engine.Type = EngineDistributedTAE
}
if c.Engine.Logstore == "" {
c.Engine.Logstore = options.LogstoreLogservice
}
if c.Cluster.RefreshInterval.Duration == 0 {
c.Cluster.RefreshInterval.Duration = time.Second * 10
}

if c.Txn.Mode == "" {
c.Txn.Mode = defaultTxnMode.String()
}

if c.Txn.Isolation == "" {
if txn.GetTxnMode(c.Txn.Mode) == txn.TxnMode_Pessimistic {
c.Txn.Isolation = txn.TxnIsolation_RC.String()
} else {
c.Txn.Isolation = txn.TxnIsolation_SI.String()
}
}
// Fix txn mode various config, simply override
if txn.GetTxnMode(c.Txn.Mode) == txn.TxnMode_Pessimistic {
if c.Txn.EnableSacrificingFreshness == 0 {
c.Txn.EnableSacrificingFreshness = 1
}
if c.Txn.EnableCNBasedConsistency == 0 {
c.Txn.EnableCNBasedConsistency = -1
}
// We don't support the following now, so always disable
c.Txn.EnableRefreshExpression = -1
if c.Txn.EnableLeakCheck == 0 {
c.Txn.EnableLeakCheck = -1
}
} else {
if c.Txn.EnableSacrificingFreshness == 0 {
c.Txn.EnableSacrificingFreshness = 1
}
if c.Txn.EnableCNBasedConsistency == 0 {
c.Txn.EnableCNBasedConsistency = 1
}
// We don't support the following now, so always disable
c.Txn.EnableRefreshExpression = -1
if c.Txn.EnableLeakCheck == 0 {
c.Txn.EnableLeakCheck = -1
}
}

if c.Txn.MaxActiveAges.Duration == 0 {
c.Txn.MaxActiveAges.Duration = time.Minute * 2
}
if c.Txn.MaxActive == 0 {
c.Txn.MaxActive = runtime.NumCPU() * 4
}
c.Ctl.Adjust(foundMachineHost, defaultCtlListenAddress)
c.LockService.ServiceID = "temp"
c.LockService.Validate()
c.LockService.ServiceID = c.UUID

c.QueryServiceConfig.Adjust(foundMachineHost, defaultQueryServiceListenAddress)

if c.PortBase != 0 {
if c.ServiceHost == "" {
c.ServiceHost = defaultServiceHost
}
}

if !metadata.ValidStateString(c.InitWorkState) {
c.InitWorkState = metadata.WorkState_Working.String()
}
}

func (s *service) getLockServiceConfig() lockservice.Config {
s.cfg.LockService.ServiceID = s.cfg.UUID
s.cfg.LockService.RPC = s.cfg.RPC
Expand All @@ -410,6 +534,7 @@ type service struct {
fService fileservice.FileService,
lockService lockservice.LockService,
queryService queryservice.QueryService,
hakeeper logservice.CNHAKeeperClient,
cli client.TxnClient,
aicm *defines.AutoIncrCacheManager,
messageAcquirer func() morpc.Message) error
Expand Down Expand Up @@ -448,4 +573,11 @@ type service struct {
connectorMgr moconnector.ConnectorManagerInterface
gossipNode *gossip.Node
cacheServer cacheservice.CacheService
config *util.ConfigData
}

func dumpCnConfig(cfg Config) (map[string]*logservicepb.ConfigItem, error) {
defCfg := Config{}
defCfg.SetDefaultValue()
return util.DumpConfig(cfg, defCfg)
}
20 changes: 19 additions & 1 deletion pkg/cnservice/upgrader/newAddTable.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,24 @@ var MoSessionsView = &table.Table{
CreateTableSql: "drop view `mo_catalog`.`mo_sessions`;",
}


var MoConfigurationsView = &table.Table{
Account: table.AccountAll,
Database: catalog.MO_CATALOG,
Table: "mo_configurations",
Columns: []table.Column{
table.StringColumn("node_type", "the type of the node. cn,tn,log,proxy."),
table.StringColumn("node_id", "the id of the node"),
table.StringColumn("name", "the name of the configuration item"),
table.UInt64Column("current_value", "the current value of the configuration item"),
table.StringColumn("default_value", "the default value of the configuration item"),
table.StringColumn("internal", "the configuration item is internal or external"),
},
CreateViewSql: "CREATE VIEW IF NOT EXISTS `mo_catalog`.`mo_configurations` AS SELECT * FROM mo_configurations() AS mo_configurations_tmp;",
//actually drop view here
CreateTableSql: "drop view `mo_catalog`.`mo_configurations`;",
}

var SqlStatementHotspotView = &table.Table{
Account: table.AccountAll,
Database: catalog.MO_SYSTEM,
Expand All @@ -191,5 +209,5 @@ var SqlStatementHotspotView = &table.Table{
CreateTableSql: "DROP VIEW IF EXISTS `system`.`sql_statement_hotspot`;",
}

var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView, MoSessionsView, SqlStatementHotspotView}
var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView, MoSessionsView, SqlStatementHotspotView, MoConfigurationsView}
var registeredViews = []*table.Table{processlistView}
4 changes: 4 additions & 0 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ var (
"mo_stages": 0,
catalog.MOAutoIncrTable: 0,
"mo_sessions": 0,
"mo_configurations": 0,
}
configInitVariables = map[string]int8{
"save_query_result": 0,
Expand Down Expand Up @@ -843,6 +844,7 @@ var (
"mo_pubs": 0,
"mo_stages": 0,
"mo_sessions": 0,
"mo_configurations": 0,
}
createDbInformationSchemaSql = "create database information_schema;"
createAutoTableSql = fmt.Sprintf(`create table if not exists %s (
Expand Down Expand Up @@ -1018,6 +1020,7 @@ var (
primary key(stage_id)
);`,
`CREATE VIEW IF NOT EXISTS mo_sessions AS SELECT * FROM mo_sessions() AS mo_sessions_tmp;`,
`CREATE VIEW IF NOT EXISTS mo_configurations AS SELECT * FROM mo_configurations() AS mo_configurations_tmp;`,
}

//drop tables for the tenant
Expand All @@ -1032,6 +1035,7 @@ var (
`drop table if exists mo_catalog.mo_mysql_compatibility_mode;`,
`drop table if exists mo_catalog.mo_stages;`,
`drop view if exists mo_catalog.mo_sessions;`,
`drop view if exists mo_catalog.mo_configurations;`,
}
dropMoPubsSql = `drop table if exists mo_catalog.mo_pubs;`
deleteMoPubsSql = `delete from mo_catalog.mo_pubs;`
Expand Down
2 changes: 2 additions & 0 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3484,6 +3484,7 @@ func (mce *MysqlCmdExecutor) doComQuery(requestCtx context.Context, input *UserI
pu.FileService,
pu.LockService,
pu.QueryService,
pu.HAKeeperClient,
ses.GetAutoIncrCacheManager())
proc.CopyVectorPool(ses.proc)
proc.CopyValueScanBatch(ses.proc)
Expand Down Expand Up @@ -3660,6 +3661,7 @@ func (mce *MysqlCmdExecutor) doComQueryInProgress(requestCtx context.Context, in
pu.FileService,
pu.LockService,
pu.QueryService,
pu.HAKeeperClient,
ses.GetAutoIncrCacheManager())
proc.CopyVectorPool(ses.proc)
proc.CopyValueScanBatch(ses.proc)
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func NewSession(proto Protocol, mp *mpool.MPool, pu *config.ParameterUnit,
pu.FileService,
pu.LockService,
pu.QueryService,
pu.HAKeeperClient,
ses.GetAutoIncrCacheManager())

runtime.SetFinalizer(ses, func(ss *Session) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/hakeeper/rsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ func (s *stateMachine) handleClusterDetailsQuery(cfg Config) *pb.ClusterDetails
WorkState: info.WorkState,
Labels: info.Labels,
QueryAddress: info.QueryAddress,
ConfigData: info.ConfigData,
}
cd.CNStores = append(cd.CNStores, n)
}
Expand All @@ -692,6 +693,7 @@ func (s *stateMachine) handleClusterDetailsQuery(cfg Config) *pb.ClusterDetails
LogtailServerAddress: info.LogtailServerAddress,
LockServiceAddress: info.LockServiceAddress,
CtlAddress: info.CtlAddress,
ConfigData: info.ConfigData,
}
cd.TNStores = append(cd.TNStores, n)
}
Expand All @@ -706,6 +708,7 @@ func (s *stateMachine) handleClusterDetailsQuery(cfg Config) *pb.ClusterDetails
State: state,
ServiceAddress: info.ServiceAddress,
Replicas: info.Replicas,
ConfigData: info.ConfigData,
}
cd.LogStores = append(cd.LogStores, n)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/logservice/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package logservice

import (
"fmt"
logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/util"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -600,3 +602,8 @@ func (c *Config) GossipServiceAddr() string {
}
return c.GossipAddress
}

func dumpLogConfig(cfg Config) (map[string]*logservicepb.ConfigItem, error) {
defCfg := Config{}
return util.DumpConfig(cfg, defCfg)
}
Loading