From bac9091df90fbf913378505ef9ea48ff03037b54 Mon Sep 17 00:00:00 2001 From: Jackson Date: Tue, 12 Nov 2024 19:59:57 +0800 Subject: [PATCH 01/16] add config taskLabelSeelctor --- pkg/config/configuration.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/config/configuration.go b/pkg/config/configuration.go index 5b2693ed2b6e2..def2c8cd12773 100644 --- a/pkg/config/configuration.go +++ b/pkg/config/configuration.go @@ -163,6 +163,9 @@ var ( defaultLoggerLabelKey = "role" defaultLoggerLabelVal = "logging_cn" defaultLoggerMap = map[string]string{defaultLoggerLabelKey: defaultLoggerLabelVal} + labelNoOpKey = "op" + labelNoOpValue = "NoOp" + defaultTaskLoggerMap = map[string]string{labelNoOpKey: labelNoOpValue} defaultMaxLogMessageSize = 16 << 10 @@ -560,6 +563,9 @@ type ObservabilityParameters struct { // LabelSelector LabelSelector map[string]string `toml:"label-selector"` + // TaskLabelSelector + TaskLabelSelector map[string]string `toml:"task-label-selector"` + // estimate tcp network packet cost TCPPacket bool `toml:"tcp-packet"` @@ -630,6 +636,7 @@ func NewObservabilityParameters() *ObservabilityParameters { SelectAggThreshold: toml.Duration{}, EnableStmtMerge: false, LabelSelector: map[string]string{}, /*default: role=logging_cn*/ + TaskLabelSelector: defaultTaskLoggerMap, TCPPacket: true, MaxLogMessageSize: toml.ByteSize(defaultMaxLogMessageSize), CU: *NewOBCUConfig(), From 54953a857be559f50efcb2992103a5f0f7d05b9d Mon Sep 17 00:00:00 2001 From: Jackson Date: Tue, 12 Nov 2024 20:34:58 +0800 Subject: [PATCH 02/16] fix config adapting --- pkg/config/configuration.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/config/configuration.go b/pkg/config/configuration.go index def2c8cd12773..3a6e868d1e8db 100644 --- a/pkg/config/configuration.go +++ b/pkg/config/configuration.go @@ -753,9 +753,9 @@ func (op *ObservabilityParameters) resetConfigByOld() { } } resetMapConfig := func(target map[string]string, defaultVal map[string]string, setVal map[string]string) { - eq := true + eq := len(target) == len(defaultVal) // check eq - if len(target) == len(defaultVal) { + if eq { for k, v := range defaultVal { if target[k] != v { eq = false From f24ebcd079ee30c0cace4482daadaeb5a10674cc Mon Sep 17 00:00:00 2001 From: Jackson Date: Tue, 12 Nov 2024 20:37:59 +0800 Subject: [PATCH 03/16] add switch --- pkg/config/configuration.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/config/configuration.go b/pkg/config/configuration.go index 3a6e868d1e8db..11d6e87ca91fe 100644 --- a/pkg/config/configuration.go +++ b/pkg/config/configuration.go @@ -563,8 +563,9 @@ type ObservabilityParameters struct { // LabelSelector LabelSelector map[string]string `toml:"label-selector"` - // TaskLabelSelector - TaskLabelSelector map[string]string `toml:"task-label-selector"` + // TaskLabel + TaskLabel map[string]string `toml:"task-label"` + ResetTaskLabel bool `toml:"reset-task-label"` // estimate tcp network packet cost TCPPacket bool `toml:"tcp-packet"` @@ -636,7 +637,8 @@ func NewObservabilityParameters() *ObservabilityParameters { SelectAggThreshold: toml.Duration{}, EnableStmtMerge: false, LabelSelector: map[string]string{}, /*default: role=logging_cn*/ - TaskLabelSelector: defaultTaskLoggerMap, + TaskLabel: map[string]string{}, + ResetTaskLabel: false, TCPPacket: true, MaxLogMessageSize: toml.ByteSize(defaultMaxLogMessageSize), CU: *NewOBCUConfig(), From ee266e1059ce9201edb24c6420b3be4fa4e13206 Mon Sep 17 00:00:00 2001 From: Jackson Date: Tue, 12 Nov 2024 20:50:35 +0800 Subject: [PATCH 04/16] reset defaultLoggerMap as nil --- pkg/config/configuration.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/config/configuration.go b/pkg/config/configuration.go index 11d6e87ca91fe..6df10ad959bbe 100644 --- a/pkg/config/configuration.go +++ b/pkg/config/configuration.go @@ -162,10 +162,7 @@ var ( // defaultLoggerLabelKey and defaultLoggerLabelVal defaultLoggerLabelKey = "role" defaultLoggerLabelVal = "logging_cn" - defaultLoggerMap = map[string]string{defaultLoggerLabelKey: defaultLoggerLabelVal} - labelNoOpKey = "op" - labelNoOpValue = "NoOp" - defaultTaskLoggerMap = map[string]string{labelNoOpKey: labelNoOpValue} + defaultLoggerMap = map[string]string{} defaultMaxLogMessageSize = 16 << 10 From ac8ad652ac593b7f19d4083f9d965ebcaea96575 Mon Sep 17 00:00:00 2001 From: Jackson Date: Thu, 14 Nov 2024 21:18:35 +0800 Subject: [PATCH 05/16] base version --- pkg/util/export/etl/type.go | 18 ++++ pkg/util/export/merge.go | 2 +- pkg/util/metric/mometric/cron_task.go | 141 ++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 pkg/util/export/etl/type.go diff --git a/pkg/util/export/etl/type.go b/pkg/util/export/etl/type.go new file mode 100644 index 0000000000000..56f0b1b4a8ab0 --- /dev/null +++ b/pkg/util/export/etl/type.go @@ -0,0 +1,18 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etl + +// ETLMergeTask name for cron task in mo_task.sys_cron_task +const ETLMergeTask = "ETLMergeTask" diff --git a/pkg/util/export/merge.go b/pkg/util/export/merge.go index 404d94246f071..8ad6df0731e5c 100644 --- a/pkg/util/export/merge.go +++ b/pkg/util/export/merge.go @@ -761,7 +761,7 @@ const ParamSeparator = " " // MergeTaskMetadata handle args like: "{db_tbl_name} [date, default: today]" func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata { return task.TaskMetadata{ - ID: path.Join("ETLMergeTask", path.Join(args...)), + ID: path.Join(etl.ETLMergeTask, path.Join(args...)), Executor: id, Context: []byte(strings.Join(args, ParamSeparator)), Options: task.TaskOptions{Concurrency: 1}, diff --git a/pkg/util/metric/mometric/cron_task.go b/pkg/util/metric/mometric/cron_task.go index d863412c9458c..ab8b03213b170 100644 --- a/pkg/util/metric/mometric/cron_task.go +++ b/pkg/util/metric/mometric/cron_task.go @@ -16,6 +16,7 @@ package mometric import ( "context" + "encoding/json" "fmt" "math" "path" @@ -34,6 +35,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/pb/task" "github.com/matrixorigin/matrixone/pkg/taskservice" + "github.com/matrixorigin/matrixone/pkg/util/export/etl" "github.com/matrixorigin/matrixone/pkg/util/export/table" ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" "github.com/matrixorigin/matrixone/pkg/util/metric" @@ -200,6 +202,8 @@ func CalculateStorageUsage( cleanStorageUsageMetric(logger, "CalculateStorageUsage") }() + err = checkAndResetTaskLabels(ctx, logger, service, sqlExecutor) + // init metric value v2.GetTraceCheckStorageUsageAllCounter().Add(0) v2.GetTraceCheckStorageUsageNewCounter().Add(0) @@ -451,3 +455,140 @@ func checkNewAccountSize(ctx context.Context, logger *log.MOLogger, sqlExecutor logger.Debug("wait next round, check new account") } } + +const ( + MOMetricResetTaskLabels = "mo_metric_reset_task_labels" + MOMetricTaskLabels = "mo_metric_task_labels" +) + +const ( + idxCronTaskId = iota + idxTaskMetadataId + idxTaskMetadataOption +) + +func getQueryCronTaskRecord() string { + // like: + // mysql> select cron_task_id, task_metadata_id, task_metadata_option from mo_task.sys_cron_task where task_metadata_id in ('ETLMergeTask', 'StorageUsage'); + // +--------------+------------------+----------------------+ + // | cron_task_id | task_metadata_id | task_metadata_option | + // +--------------+------------------+----------------------+ + // | 2 | StorageUsage | {"Concurrency":1} | + // | 1 | ETLMergeTask | {"Concurrency":1} | + // +--------------+------------------+----------------------+ + return fmt.Sprintf(`select cron_task_id, task_metadata_id, task_metadata_option from mo_task.sys_cron_task where task_metadata_id in ('%s', '%s')`, + etl.ETLMergeTask, + StorageUsageCronTask, + ) +} + +func checkAndResetTaskLabels( + ctx context.Context, + logger *log.MOLogger, + service string, + sqlExecutor func() ie.InternalExecutor, +) (err error) { + + var reset = false + var labels = map[string]string{} + + if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricResetTaskLabels); exist { + reset = s.(bool) + } + if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricTaskLabels); exist { + labels = s.(map[string]string) + } + + if !reset { + logger.Info("skip reset task labels") + return nil + } + + opts := ie.NewOptsBuilder().Database(MetricDBConst).Internal(true).Finish() + executor := sqlExecutor() + sql := getQueryCronTaskRecord() + result := executor.Query(ctx, sql, opts) + + tasks := make([]task.CronTask, 0, 2) + dstTask := make([]task.CronTask, 0, 2) + cnt := result.RowCount() + if cnt == 0 { + logger.Warn("got empty sys_cron_task", zap.String("sql", sql)) + return moerr.NewInternalErrorf(ctx, "ResetTaskLabels: got empty sys_cron_task") + } + logger.Debug("fetch sys_cron_task", zap.Uint64("cnt", cnt)) + + defer func() { + if err != nil { + logger.Error("fetch sys_cron_task failed", zap.Error(err)) + } + }() + + // fetch task + for rowIdx := uint64(0); rowIdx < cnt; rowIdx++ { + var t task.CronTask + var options string + t.ID, err = result.GetUint64(ctx, rowIdx, idxCronTaskId) + if err != nil { + return err + } + + t.Metadata.ID, err = result.GetString(ctx, rowIdx, idxTaskMetadataId) + if err != nil { + return err + } + + options, err = result.GetString(ctx, rowIdx, idxTaskMetadataOption) + if err != nil { + return err + } + if err = json.Unmarshal([]byte(options), &t.Metadata.Options); err != nil { + return err + } + + tasks = append(tasks, t) + } + + // check task labels +checkL: + for _, t := range tasks { + if len(t.Metadata.Options.Labels) != len(labels) { + dstTask = append(dstTask, t) + continue + } + for k, v := range labels { + if t.Metadata.Options.Labels[k] != v { + dstTask = append(dstTask, t) + continue checkL + } + } + } + + // update task + for _, t := range dstTask { + var options []byte + logger.Info("diff task labels", + zap.String("task", t.Metadata.ID), + zap.Any("src", t.Metadata.Options.Labels), + zap.Any("dest", labels), + ) + t.Metadata.Options.Labels = labels + + options, err = json.Marshal(t.Metadata.Options) + if err != nil { + return err + } + + sql = fmt.Sprintf( + "update mo_task.sys_cron_task set task_metadata_context='%s' where cron_task_id=%d", + options, + t.ID, + ) + err = executor.Exec(ctx, sql, opts) + if err != nil { + return err + } + } + + return nil +} From cb57cdff04131524b74561bd629eaa349d100e23 Mon Sep 17 00:00:00 2001 From: Jackson Date: Thu, 14 Nov 2024 21:21:20 +0800 Subject: [PATCH 06/16] const --- cmd/mo-service/main.go | 2 ++ pkg/util/metric/mometric/cron_task.go | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/mo-service/main.go b/cmd/mo-service/main.go index 2c0a8b94bfe39..f4758c6ff035d 100644 --- a/cmd/mo-service/main.go +++ b/cmd/mo-service/main.go @@ -441,6 +441,8 @@ func initTraceMetric(ctx context.Context, st metadata.ServiceType, cfg *Config, selector := clusterservice.NewSelector().SelectByLabel(SV.LabelSelector, clusterservice.Contain) mustGetRuntime(cfg).SetGlobalVariables(runtime.BackgroundCNSelector, selector) + mustGetRuntime(cfg).SetGlobalVariables(mometric.MOMetricResetTaskLabel, SV.ResetTaskLabel) + mustGetRuntime(cfg).SetGlobalVariables(mometric.MOMetricTaskLabel, SV.TaskLabel) if !SV.DisableTrace || !SV.DisableMetric { writerFactory = export.GetWriterFactory(fs, UUID, nodeRole, !SV.DisableSqlWriter) diff --git a/pkg/util/metric/mometric/cron_task.go b/pkg/util/metric/mometric/cron_task.go index ab8b03213b170..cd09277b0cad8 100644 --- a/pkg/util/metric/mometric/cron_task.go +++ b/pkg/util/metric/mometric/cron_task.go @@ -457,8 +457,8 @@ func checkNewAccountSize(ctx context.Context, logger *log.MOLogger, sqlExecutor } const ( - MOMetricResetTaskLabels = "mo_metric_reset_task_labels" - MOMetricTaskLabels = "mo_metric_task_labels" + MOMetricResetTaskLabel = "mo_metric_reset_task_labels" + MOMetricTaskLabel = "mo_metric_task_labels" ) const ( @@ -492,10 +492,10 @@ func checkAndResetTaskLabels( var reset = false var labels = map[string]string{} - if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricResetTaskLabels); exist { + if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricResetTaskLabel); exist { reset = s.(bool) } - if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricTaskLabels); exist { + if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricTaskLabel); exist { labels = s.(map[string]string) } From 57ab5f2983f278f62bac44719573428628bd9793 Mon Sep 17 00:00:00 2001 From: Jackson Date: Thu, 14 Nov 2024 21:22:35 +0800 Subject: [PATCH 07/16] note --- pkg/util/metric/mometric/cron_task.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/util/metric/mometric/cron_task.go b/pkg/util/metric/mometric/cron_task.go index cd09277b0cad8..692596f744e5c 100644 --- a/pkg/util/metric/mometric/cron_task.go +++ b/pkg/util/metric/mometric/cron_task.go @@ -507,6 +507,7 @@ func checkAndResetTaskLabels( opts := ie.NewOptsBuilder().Database(MetricDBConst).Internal(true).Finish() executor := sqlExecutor() sql := getQueryCronTaskRecord() + logger.Info("query", zap.String("sql", sql)) result := executor.Query(ctx, sql, opts) tasks := make([]task.CronTask, 0, 2) @@ -584,6 +585,7 @@ checkL: options, t.ID, ) + logger.Info("query", zap.String("sql", sql)) err = executor.Exec(ctx, sql, opts) if err != nil { return err From 2fefeb1ef7db175afb7a68fde3df2fc11310ac3e Mon Sep 17 00:00:00 2001 From: Jackson Date: Thu, 14 Nov 2024 21:31:34 +0800 Subject: [PATCH 08/16] fix quit logic --- etc/launch/cn.toml | 13 ++++++++++ pkg/util/metric/mometric/cron_task.go | 37 +++++++++++++++------------ 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/etc/launch/cn.toml b/etc/launch/cn.toml index 8c57e6a64281f..877ac690a6934 100644 --- a/etc/launch/cn.toml +++ b/etc/launch/cn.toml @@ -12,3 +12,16 @@ port-base = 18000 check-fraction = 65536 enable-metrics = true +[observability.label-selector] +key1 = "value1" +key2 = "value2" + +[observability.task-label] +key1 = "value1" +key2 = "value2" + +[observability] +reset-task-label = true + +[cn.frontend] +proxy-enabled = true diff --git a/pkg/util/metric/mometric/cron_task.go b/pkg/util/metric/mometric/cron_task.go index 692596f744e5c..fa5ee12a8997a 100644 --- a/pkg/util/metric/mometric/cron_task.go +++ b/pkg/util/metric/mometric/cron_task.go @@ -184,6 +184,7 @@ func CalculateStorageUsage( service string, sqlExecutor func() ie.InternalExecutor, ) (err error) { + var restart bool var account string var sizeMB, snapshotSizeMB, objectCount float64 ctx, span := trace.Start(ctx, "MetricStorageUsage") @@ -202,7 +203,11 @@ func CalculateStorageUsage( cleanStorageUsageMetric(logger, "CalculateStorageUsage") }() - err = checkAndResetTaskLabels(ctx, logger, service, sqlExecutor) + restart, err = checkAndResetTaskLabels(ctx, logger, service, sqlExecutor) + if err != nil || restart { + logger.Info("checkAndResetTaskLabels", zap.Error(err), zap.Bool("need-restart", restart)) + return err + } // init metric value v2.GetTraceCheckStorageUsageAllCounter().Add(0) @@ -482,12 +487,7 @@ func getQueryCronTaskRecord() string { ) } -func checkAndResetTaskLabels( - ctx context.Context, - logger *log.MOLogger, - service string, - sqlExecutor func() ie.InternalExecutor, -) (err error) { +func checkAndResetTaskLabels(ctx context.Context, logger *log.MOLogger, service string, sqlExecutor func() ie.InternalExecutor) (restart bool, err error) { var reset = false var labels = map[string]string{} @@ -501,7 +501,7 @@ func checkAndResetTaskLabels( if !reset { logger.Info("skip reset task labels") - return nil + return false, nil } opts := ie.NewOptsBuilder().Database(MetricDBConst).Internal(true).Finish() @@ -515,7 +515,7 @@ func checkAndResetTaskLabels( cnt := result.RowCount() if cnt == 0 { logger.Warn("got empty sys_cron_task", zap.String("sql", sql)) - return moerr.NewInternalErrorf(ctx, "ResetTaskLabels: got empty sys_cron_task") + return false, moerr.NewInternalErrorf(ctx, "ResetTaskLabels: got empty sys_cron_task") } logger.Debug("fetch sys_cron_task", zap.Uint64("cnt", cnt)) @@ -531,20 +531,20 @@ func checkAndResetTaskLabels( var options string t.ID, err = result.GetUint64(ctx, rowIdx, idxCronTaskId) if err != nil { - return err + return false, err } t.Metadata.ID, err = result.GetString(ctx, rowIdx, idxTaskMetadataId) if err != nil { - return err + return false, err } options, err = result.GetString(ctx, rowIdx, idxTaskMetadataOption) if err != nil { - return err + return false, err } if err = json.Unmarshal([]byte(options), &t.Metadata.Options); err != nil { - return err + return false, err } tasks = append(tasks, t) @@ -565,6 +565,11 @@ checkL: } } + if len(dstTask) == 0 { + logger.Info("all task label ok", zap.Any("labels", labels)) + return false, nil + } + // update task for _, t := range dstTask { var options []byte @@ -577,7 +582,7 @@ checkL: options, err = json.Marshal(t.Metadata.Options) if err != nil { - return err + return false, err } sql = fmt.Sprintf( @@ -588,9 +593,9 @@ checkL: logger.Info("query", zap.String("sql", sql)) err = executor.Exec(ctx, sql, opts) if err != nil { - return err + return false, err } } - return nil + return true, nil } From 7cce4b5472f11027c586917890a843f4d47f4520 Mon Sep 17 00:00:00 2001 From: xzxiong Date: Thu, 14 Nov 2024 23:28:00 +0800 Subject: [PATCH 09/16] fix --- pkg/util/metric/mometric/cron_task.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/util/metric/mometric/cron_task.go b/pkg/util/metric/mometric/cron_task.go index fa5ee12a8997a..8252d9ff7bf0d 100644 --- a/pkg/util/metric/mometric/cron_task.go +++ b/pkg/util/metric/mometric/cron_task.go @@ -586,11 +586,11 @@ checkL: } sql = fmt.Sprintf( - "update mo_task.sys_cron_task set task_metadata_context='%s' where cron_task_id=%d", + "update mo_task.sys_cron_task set task_metadata_option='%s' where cron_task_id=%d", options, t.ID, ) - logger.Info("query", zap.String("sql", sql)) + logger.Info("query", zap.String("sql", sql), zap.ByteString("options", options)) err = executor.Exec(ctx, sql, opts) if err != nil { return false, err From d87b8d78d1905c48c733a2ad9555346155322c2a Mon Sep 17 00:00:00 2001 From: xzxiong Date: Thu, 14 Nov 2024 23:36:23 +0800 Subject: [PATCH 10/16] per minute --- etc/launch/cn.toml | 3 +-- pkg/predefine/predefine.go | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/etc/launch/cn.toml b/etc/launch/cn.toml index 877ac690a6934..0cd596c0d44b9 100644 --- a/etc/launch/cn.toml +++ b/etc/launch/cn.toml @@ -17,8 +17,7 @@ key1 = "value1" key2 = "value2" [observability.task-label] -key1 = "value1" -key2 = "value2" +key3 = "value3" [observability] reset-task-label = true diff --git a/pkg/predefine/predefine.go b/pkg/predefine/predefine.go index 236fe45ad1f4b..6aafcf13ec6ed 100644 --- a/pkg/predefine/predefine.go +++ b/pkg/predefine/predefine.go @@ -19,11 +19,12 @@ import ( "fmt" "time" + "github.com/robfig/cron/v3" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/pb/task" "github.com/matrixorigin/matrixone/pkg/util/export" "github.com/matrixorigin/matrixone/pkg/util/metric/mometric" - "github.com/robfig/cron/v3" ) // genInitCronTaskSQL Generate `insert` statement for creating system cron tasks, which works on the `mo_task`.`sys_cron_task` table. @@ -63,7 +64,7 @@ func GenInitCronTaskSQL() (string, error) { } cronTasks = append(cronTasks, task1) - task2, err := createCronTask(mometric.TaskMetadata(mometric.StorageUsageCronTask, task.TaskCode_MetricStorageUsage), export.MergeTaskCronExprEvery05Min) + task2, err := createCronTask(mometric.TaskMetadata(mometric.StorageUsageCronTask, task.TaskCode_MetricStorageUsage), mometric.StorageUsageTaskCronExpr) if err != nil { return "", err } From f4cabba59435f828b5ab751a614cd9a49d6ae202 Mon Sep 17 00:00:00 2001 From: xzxiong Date: Fri, 15 Nov 2024 00:30:22 +0800 Subject: [PATCH 11/16] bugfix --- etc/launch/cn.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/etc/launch/cn.toml b/etc/launch/cn.toml index 0cd596c0d44b9..e472319cc70e6 100644 --- a/etc/launch/cn.toml +++ b/etc/launch/cn.toml @@ -17,7 +17,6 @@ key1 = "value1" key2 = "value2" [observability.task-label] -key3 = "value3" [observability] reset-task-label = true From 59a7180b49895884e962424d2735931a76b86287 Mon Sep 17 00:00:00 2001 From: Jackson Date: Fri, 15 Nov 2024 11:22:49 +0800 Subject: [PATCH 12/16] fix append op. --- pkg/frontend/internal_executor.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/frontend/internal_executor.go b/pkg/frontend/internal_executor.go index 3bc733bdb100b..3fe050f0e9222 100644 --- a/pkg/frontend/internal_executor.go +++ b/pkg/frontend/internal_executor.go @@ -299,7 +299,8 @@ func (ip *internalProtocol) Write(execCtx *ExecCtx, crs *perfcounter.CounterSet, if err != nil { return err } - return ip.sendRows(mrs, uint64(bat.RowCount())) + cnt := uint64(bat.RowCount()) + return ip.sendRows(mrs, cnt, mrs.GetRowCount()-cnt) } func (ip *internalProtocol) WriteHandshake() error { @@ -364,7 +365,7 @@ func (ip *internalProtocol) WriteResponse(ctx context.Context, resp *Response) e ip.ResetStatistics() if resp.category == ResultResponse { if mer := resp.data.(*MysqlExecutionResult); mer != nil && mer.Mrs() != nil { - ip.sendRows(mer.Mrs(), mer.mrs.GetRowCount()) + ip.sendRows(mer.Mrs(), mer.mrs.GetRowCount(), 0) } } else { // OkResponse. this is NOT ErrorResponse because error will be returned by doComQuery @@ -468,7 +469,11 @@ func (ip *internalProtocol) SetUserName(username string) { func (ip *internalProtocol) Close() {} -func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64) error { +// sendRows +// case 1: used in WriteResponse and WriteResultSetRow, which are 'copy' op +// case 2: used in Write, which is 'append' op. +// - add @startIdx to adapt append op. +func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64, startIdx uint64) error { if ip.stashResult { res := ip.result.resultSet if res == nil { @@ -487,7 +492,7 @@ func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64) error { } } colCnt := res.GetColumnCount() - for i := uint64(0); i < cnt; i++ { + for i := startIdx; i < cnt; i++ { row := make([]any, colCnt) copy(row, mrs.Data[i]) res.Data = append(res.Data, row) @@ -510,7 +515,7 @@ func (ip *internalProtocol) swapOutResult() *internalExecResult { func (ip *internalProtocol) WriteResultSetRow(mrs *MysqlResultSet, cnt uint64) error { ip.Lock() defer ip.Unlock() - return ip.sendRows(mrs, cnt) + return ip.sendRows(mrs, cnt, 0) } func (ip *internalProtocol) WriteColumnDefBytes(payload []byte) error { return nil From 9b1063fb537a3efc89aaaccc637dfbf5932aa283 Mon Sep 17 00:00:00 2001 From: Jackson Date: Fri, 15 Nov 2024 11:50:22 +0800 Subject: [PATCH 13/16] revert launch config example --- etc/launch/cn.toml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/etc/launch/cn.toml b/etc/launch/cn.toml index e472319cc70e6..8c57e6a64281f 100644 --- a/etc/launch/cn.toml +++ b/etc/launch/cn.toml @@ -12,14 +12,3 @@ port-base = 18000 check-fraction = 65536 enable-metrics = true -[observability.label-selector] -key1 = "value1" -key2 = "value2" - -[observability.task-label] - -[observability] -reset-task-label = true - -[cn.frontend] -proxy-enabled = true From 8eb5737fe5a560d44d66d0f78fecbd21f5bbdba8 Mon Sep 17 00:00:00 2001 From: Jackson Date: Fri, 15 Nov 2024 14:35:12 +0800 Subject: [PATCH 14/16] improve code --- pkg/frontend/internal_executor.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/frontend/internal_executor.go b/pkg/frontend/internal_executor.go index 3fe050f0e9222..6f7ef8c89140c 100644 --- a/pkg/frontend/internal_executor.go +++ b/pkg/frontend/internal_executor.go @@ -294,13 +294,24 @@ func (ip *internalProtocol) GetBool(PropertyID) bool { } func (ip *internalProtocol) Write(execCtx *ExecCtx, crs *perfcounter.CounterSet, bat *batch.Batch) error { + // init ip.result.resultSet mrs := execCtx.ses.GetMysqlResultSet() - err := fillResultSet(execCtx.reqCtx, bat, execCtx.ses, mrs) + res := ip.result.resultSet + if res == nil { + res = &MysqlResultSet{} + ip.result.resultSet = res + for _, col := range mrs.Columns { + res.AddColumn(col) + } + } + + // copy into result set + err := fillResultSet(execCtx.reqCtx, bat, execCtx.ses, res) if err != nil { return err } - cnt := uint64(bat.RowCount()) - return ip.sendRows(mrs, cnt, mrs.GetRowCount()-cnt) + ip.result.affectedRows += uint64(bat.RowCount()) + return nil } func (ip *internalProtocol) WriteHandshake() error { @@ -365,7 +376,7 @@ func (ip *internalProtocol) WriteResponse(ctx context.Context, resp *Response) e ip.ResetStatistics() if resp.category == ResultResponse { if mer := resp.data.(*MysqlExecutionResult); mer != nil && mer.Mrs() != nil { - ip.sendRows(mer.Mrs(), mer.mrs.GetRowCount(), 0) + ip.sendRows(mer.Mrs(), mer.mrs.GetRowCount()) } } else { // OkResponse. this is NOT ErrorResponse because error will be returned by doComQuery @@ -471,9 +482,8 @@ func (ip *internalProtocol) Close() {} // sendRows // case 1: used in WriteResponse and WriteResultSetRow, which are 'copy' op -// case 2: used in Write, which is 'append' op. -// - add @startIdx to adapt append op. -func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64, startIdx uint64) error { +// case 2: used in Write, which is 'append' op. (deprecated) +func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64) error { if ip.stashResult { res := ip.result.resultSet if res == nil { @@ -492,7 +502,7 @@ func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64, startIdx u } } colCnt := res.GetColumnCount() - for i := startIdx; i < cnt; i++ { + for i := uint64(0); i < cnt; i++ { row := make([]any, colCnt) copy(row, mrs.Data[i]) res.Data = append(res.Data, row) @@ -515,7 +525,7 @@ func (ip *internalProtocol) swapOutResult() *internalExecResult { func (ip *internalProtocol) WriteResultSetRow(mrs *MysqlResultSet, cnt uint64) error { ip.Lock() defer ip.Unlock() - return ip.sendRows(mrs, cnt, 0) + return ip.sendRows(mrs, cnt) } func (ip *internalProtocol) WriteColumnDefBytes(payload []byte) error { return nil From ecabb74ce42b2d78e22b20b34e0910d6167ea479 Mon Sep 17 00:00:00 2001 From: Jackson Date: Fri, 15 Nov 2024 16:07:40 +0800 Subject: [PATCH 15/16] fix sca --- pkg/config/configuration.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/configuration.go b/pkg/config/configuration.go index 6df10ad959bbe..127cf3ef14552 100644 --- a/pkg/config/configuration.go +++ b/pkg/config/configuration.go @@ -162,7 +162,7 @@ var ( // defaultLoggerLabelKey and defaultLoggerLabelVal defaultLoggerLabelKey = "role" defaultLoggerLabelVal = "logging_cn" - defaultLoggerMap = map[string]string{} + defaultLoggerMap = map[string]string{defaultLoggerLabelKey: defaultLoggerLabelVal} defaultMaxLogMessageSize = 16 << 10 From 5fc1cf46a81260dbd079984a9575c4f7bf0ce380 Mon Sep 17 00:00:00 2001 From: xzxiong Date: Sat, 23 Nov 2024 12:50:42 +0800 Subject: [PATCH 16/16] imp code cover --- pkg/frontend/internal_executor_test.go | 61 ++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/pkg/frontend/internal_executor_test.go b/pkg/frontend/internal_executor_test.go index a014767cbf1db..2c051da3a6eb0 100644 --- a/pkg/frontend/internal_executor_test.go +++ b/pkg/frontend/internal_executor_test.go @@ -24,7 +24,10 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/config" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/testutil" ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" ) @@ -134,3 +137,61 @@ func TestIeResult(t *testing.T) { require.NoError(t, e) require.Equal(t, 42, v.(int)) } + +func Test_internalProtocol_Write(t *testing.T) { + setPu("", config.NewParameterUnit(&config.FrontendParameters{}, nil, nil, nil)) + // Mock autoIncrCaches + setAicm("", &defines.AutoIncrCacheManager{}) + + executorVar := NewInternalExecutor("") + ip := executorVar.proto + assert.True(t, ip.IsEstablished()) + ip.stashResult = true + ip.SetEstablished() + ip.Close() + ip.ResetStatistics() + _ = ip.ConnectionID() + ctx := context.TODO() + assert.Nil(t, ip.WriteColumnDef(ctx, nil, 1)) + assert.Nil(t, ip.WriteLengthEncodedNumber(1)) + assert.Nil(t, ip.WriteEOFIF(0, 1)) + assert.Nil(t, ip.WriteOK(1, 1, 0, 0, "")) + assert.Nil(t, ip.WriteEOFOrOK(0, 1)) + + ses := executorVar.newCmdSession(ctx, ie.NewOptsBuilder().Finish()) + col1 := &MysqlColumn{} + col1.SetName("col1") + col1.SetColumnType(defines.MYSQL_TYPE_LONG) + ses.mrs = &MysqlResultSet{} + ses.mrs.AddColumn(col1) + + execCtx := &ExecCtx{ + reqCtx: ctx, + ses: ses, + } + + mockBatch := func(vals []int64) *batch.Batch { + bat := batch.New([]string{"col1"}) + vecs := make([]*vector.Vector, 1) + vecs[0] = testutil.MakeInt64Vector(vals, nil) + bat.Vecs = vecs + bat.SetRowCount(len(vals)) + return bat + } + batch1 := mockBatch([]int64{100}) + batch2 := mockBatch([]int64{200, 201}) + + // ======================= main =================== + ip.Reset(ses) + err := ip.Write(execCtx, nil, batch1) + require.NoError(t, err) + require.Equal(t, 1, int(ip.result.affectedRows)) + require.Equal(t, 1, len(ip.result.resultSet.Data)) + require.Equal(t, [][]any{{int64(100)} /*colum1, rows: 1*/}, ip.result.resultSet.Data) + + err = ip.Write(execCtx, nil, batch2) + require.NoError(t, err) + require.Equal(t, 3, int(ip.result.affectedRows)) + require.Equal(t, 3, len(ip.result.resultSet.Data)) + require.Equal(t, [][]any{{int64(100)}, {int64(200)}, {int64(201)} /*column1, rows: 3*/}, ip.result.resultSet.Data) +}