diff --git a/cmd/mo-service/main.go b/cmd/mo-service/main.go index 423ae8abf153a..d07ca7bb1c61f 100644 --- a/cmd/mo-service/main.go +++ b/cmd/mo-service/main.go @@ -442,6 +442,8 @@ func initTraceMetric(ctx context.Context, st metadata.ServiceType, cfg *Config, selector := clusterservice.NewSelector().SelectByLabel(SV.LabelSelector, clusterservice.Contain) mustGetRuntime(cfg).SetGlobalVariables(runtime.BackgroundCNSelector, selector) mustGetRuntime(cfg).SetGlobalVariables(motrace.MaxStatementSize, int(cfg.getCNServiceConfig().Frontend.LengthOfQueryPrinted)) + mustGetRuntime(cfg).SetGlobalVariables(mometric.MOMetricResetTaskLabel, SV.ResetTaskLabel) + mustGetRuntime(cfg).SetGlobalVariables(mometric.MOMetricTaskLabel, SV.TaskLabel) if !SV.DisableTrace || !SV.DisableMetric { writerFactory = export.GetWriterFactory(fs, UUID, nodeRole, !SV.DisableSqlWriter) diff --git a/pkg/config/configuration.go b/pkg/config/configuration.go index 539eb66cf0ea0..f9d4ba950f7ce 100644 --- a/pkg/config/configuration.go +++ b/pkg/config/configuration.go @@ -582,6 +582,10 @@ type ObservabilityParameters struct { // LabelSelector LabelSelector map[string]string `toml:"label-selector"` + // TaskLabel + TaskLabel map[string]string `toml:"task-label"` + ResetTaskLabel bool `toml:"reset-task-label"` + // estimate tcp network packet cost TCPPacket bool `toml:"tcp-packet"` @@ -652,6 +656,8 @@ func NewObservabilityParameters() *ObservabilityParameters { SelectAggThreshold: toml.Duration{}, EnableStmtMerge: false, LabelSelector: map[string]string{}, /*default: role=logging_cn*/ + TaskLabel: map[string]string{}, + ResetTaskLabel: false, TCPPacket: true, MaxLogMessageSize: toml.ByteSize(defaultMaxLogMessageSize), CU: *NewOBCUConfig(), @@ -768,9 +774,9 @@ func (op *ObservabilityParameters) resetConfigByOld() { } } resetMapConfig := func(target map[string]string, defaultVal map[string]string, setVal map[string]string) { - eq := true + eq := len(target) == len(defaultVal) // check eq - if len(target) == 
len(defaultVal) {
+		if eq {
 			for k, v := range defaultVal {
 				if target[k] != v {
 					eq = false
diff --git a/pkg/frontend/internal_executor.go b/pkg/frontend/internal_executor.go
index 3bc733bdb100b..6f7ef8c89140c 100644
--- a/pkg/frontend/internal_executor.go
+++ b/pkg/frontend/internal_executor.go
@@ -294,12 +294,24 @@ func (ip *internalProtocol) GetBool(PropertyID) bool {
 }
 
 func (ip *internalProtocol) Write(execCtx *ExecCtx, crs *perfcounter.CounterSet, bat *batch.Batch) error {
+	// Write stashes bat instead of sending it: on the first batch, create ip.result.resultSet with the columns of the session's result set
 	mrs := execCtx.ses.GetMysqlResultSet()
-	err := fillResultSet(execCtx.reqCtx, bat, execCtx.ses, mrs)
+	res := ip.result.resultSet
+	if res == nil {
+		res = &MysqlResultSet{}
+		ip.result.resultSet = res
+		for _, col := range mrs.Columns {
+			res.AddColumn(col)
+		}
+	}
+
+	// fill the rows of bat into ip.result.resultSet; the running row total is kept in ip.result.affectedRows
+	err := fillResultSet(execCtx.reqCtx, bat, execCtx.ses, res)
 	if err != nil {
 		return err
 	}
-	return ip.sendRows(mrs, uint64(bat.RowCount()))
+	ip.result.affectedRows += uint64(bat.RowCount())
+	return nil
 }
 
 func (ip *internalProtocol) WriteHandshake() error {
@@ -468,6 +480,9 @@ func (ip *internalProtocol) SetUserName(username string) {
 
 func (ip *internalProtocol) Close() {}
 
+// sendRows
+// case 1: used in WriteResponse and WriteResultSetRow, which are 'copy' op
+// case 2: used in Write, which is 'append' op. 
(deprecated)
 func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64) error {
 	if ip.stashResult {
 		res := ip.result.resultSet
diff --git a/pkg/frontend/internal_executor_test.go b/pkg/frontend/internal_executor_test.go
index a014767cbf1db..2c051da3a6eb0 100644
--- a/pkg/frontend/internal_executor_test.go
+++ b/pkg/frontend/internal_executor_test.go
@@ -24,7 +24,10 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/common/runtime"
 	"github.com/matrixorigin/matrixone/pkg/config"
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
 	"github.com/matrixorigin/matrixone/pkg/defines"
+	"github.com/matrixorigin/matrixone/pkg/testutil"
 	ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
 )
 
@@ -134,3 +137,61 @@ func TestIeResult(t *testing.T) {
 	require.NoError(t, e)
 	require.Equal(t, 42, v.(int))
 }
+
+func Test_internalProtocol_Write(t *testing.T) {
+	setPu("", config.NewParameterUnit(&config.FrontendParameters{}, nil, nil, nil))
+	// mock autoIncrCaches with an empty AutoIncrCacheManager
+	setAicm("", &defines.AutoIncrCacheManager{})
+
+	executorVar := NewInternalExecutor("")
+	ip := executorVar.proto
+	assert.True(t, ip.IsEstablished())
+	ip.stashResult = true
+	ip.SetEstablished()
+	ip.Close()
+	ip.ResetStatistics()
+	_ = ip.ConnectionID()
+	ctx := context.TODO()
+	assert.Nil(t, ip.WriteColumnDef(ctx, nil, 1))
+	assert.Nil(t, ip.WriteLengthEncodedNumber(1))
+	assert.Nil(t, ip.WriteEOFIF(0, 1))
+	assert.Nil(t, ip.WriteOK(1, 1, 0, 0, ""))
+	assert.Nil(t, ip.WriteEOFOrOK(0, 1))
+
+	ses := executorVar.newCmdSession(ctx, ie.NewOptsBuilder().Finish())
+	col1 := &MysqlColumn{}
+	col1.SetName("col1")
+	col1.SetColumnType(defines.MYSQL_TYPE_LONG)
+	ses.mrs = &MysqlResultSet{}
+	ses.mrs.AddColumn(col1)
+
+	execCtx := &ExecCtx{
+		reqCtx: ctx,
+		ses:    ses,
+	}
+
+	mockBatch := func(vals []int64) *batch.Batch {
+		bat := batch.New([]string{"col1"})
+		vecs := make([]*vector.Vector, 1)
+		vecs[0] = testutil.MakeInt64Vector(vals, nil)
+		bat.Vecs = vecs
+		bat.SetRowCount(len(vals))
+		return bat
+	}
+	batch1 := mockBatch([]int64{100})
+	batch2 := mockBatch([]int64{200, 201})
+
+	// ======================= main: every Write must append its batch to ip.result ===================
+	ip.Reset(ses)
+	err := ip.Write(execCtx, nil, batch1)
+	require.NoError(t, err)
+	require.Equal(t, 1, int(ip.result.affectedRows))
+	require.Equal(t, 1, len(ip.result.resultSet.Data))
+	require.Equal(t, [][]any{{int64(100)} /*column1, rows: 1*/}, ip.result.resultSet.Data)
+
+	err = ip.Write(execCtx, nil, batch2)
+	require.NoError(t, err)
+	require.Equal(t, 3, int(ip.result.affectedRows))
+	require.Equal(t, 3, len(ip.result.resultSet.Data))
+	require.Equal(t, [][]any{{int64(100)}, {int64(200)}, {int64(201)} /*column1, rows: 3*/}, ip.result.resultSet.Data)
+}
diff --git a/pkg/predefine/predefine.go b/pkg/predefine/predefine.go
index 236fe45ad1f4b..6aafcf13ec6ed 100644
--- a/pkg/predefine/predefine.go
+++ b/pkg/predefine/predefine.go
@@ -19,11 +19,12 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/robfig/cron/v3"
+
 	"github.com/matrixorigin/matrixone/pkg/catalog"
 	"github.com/matrixorigin/matrixone/pkg/pb/task"
 	"github.com/matrixorigin/matrixone/pkg/util/export"
 	"github.com/matrixorigin/matrixone/pkg/util/metric/mometric"
-	"github.com/robfig/cron/v3"
 )
 
 // genInitCronTaskSQL Generate `insert` statement for creating system cron tasks, which works on the `mo_task`.`sys_cron_task` table. 
@@ -63,7 +64,7 @@ func GenInitCronTaskSQL() (string, error) { } cronTasks = append(cronTasks, task1) - task2, err := createCronTask(mometric.TaskMetadata(mometric.StorageUsageCronTask, task.TaskCode_MetricStorageUsage), export.MergeTaskCronExprEvery05Min) + task2, err := createCronTask(mometric.TaskMetadata(mometric.StorageUsageCronTask, task.TaskCode_MetricStorageUsage), mometric.StorageUsageTaskCronExpr) if err != nil { return "", err } diff --git a/pkg/util/export/etl/type.go b/pkg/util/export/etl/type.go new file mode 100644 index 0000000000000..56f0b1b4a8ab0 --- /dev/null +++ b/pkg/util/export/etl/type.go @@ -0,0 +1,18 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package etl + +// ETLMergeTask name for cron task in mo_task.sys_cron_task +const ETLMergeTask = "ETLMergeTask" diff --git a/pkg/util/export/merge.go b/pkg/util/export/merge.go index 404d94246f071..8ad6df0731e5c 100644 --- a/pkg/util/export/merge.go +++ b/pkg/util/export/merge.go @@ -761,7 +761,7 @@ const ParamSeparator = " " // MergeTaskMetadata handle args like: "{db_tbl_name} [date, default: today]" func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata { return task.TaskMetadata{ - ID: path.Join("ETLMergeTask", path.Join(args...)), + ID: path.Join(etl.ETLMergeTask, path.Join(args...)), Executor: id, Context: []byte(strings.Join(args, ParamSeparator)), Options: task.TaskOptions{Concurrency: 1}, diff --git a/pkg/util/metric/mometric/cron_task.go b/pkg/util/metric/mometric/cron_task.go index d863412c9458c..8252d9ff7bf0d 100644 --- a/pkg/util/metric/mometric/cron_task.go +++ b/pkg/util/metric/mometric/cron_task.go @@ -16,6 +16,7 @@ package mometric import ( "context" + "encoding/json" "fmt" "math" "path" @@ -34,6 +35,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/pb/task" "github.com/matrixorigin/matrixone/pkg/taskservice" + "github.com/matrixorigin/matrixone/pkg/util/export/etl" "github.com/matrixorigin/matrixone/pkg/util/export/table" ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" "github.com/matrixorigin/matrixone/pkg/util/metric" @@ -182,6 +184,7 @@ func CalculateStorageUsage( service string, sqlExecutor func() ie.InternalExecutor, ) (err error) { + var restart bool var account string var sizeMB, snapshotSizeMB, objectCount float64 ctx, span := trace.Start(ctx, "MetricStorageUsage") @@ -200,6 +203,12 @@ func CalculateStorageUsage( cleanStorageUsageMetric(logger, "CalculateStorageUsage") }() + restart, err = checkAndResetTaskLabels(ctx, logger, service, sqlExecutor) + if err != nil || restart { + logger.Info("checkAndResetTaskLabels", zap.Error(err), 
zap.Bool("need-restart", restart)) + return err + } + // init metric value v2.GetTraceCheckStorageUsageAllCounter().Add(0) v2.GetTraceCheckStorageUsageNewCounter().Add(0) @@ -451,3 +460,142 @@ func checkNewAccountSize(ctx context.Context, logger *log.MOLogger, sqlExecutor logger.Debug("wait next round, check new account") } } + +const ( + MOMetricResetTaskLabel = "mo_metric_reset_task_labels" + MOMetricTaskLabel = "mo_metric_task_labels" +) + +const ( + idxCronTaskId = iota + idxTaskMetadataId + idxTaskMetadataOption +) + +func getQueryCronTaskRecord() string { + // like: + // mysql> select cron_task_id, task_metadata_id, task_metadata_option from mo_task.sys_cron_task where task_metadata_id in ('ETLMergeTask', 'StorageUsage'); + // +--------------+------------------+----------------------+ + // | cron_task_id | task_metadata_id | task_metadata_option | + // +--------------+------------------+----------------------+ + // | 2 | StorageUsage | {"Concurrency":1} | + // | 1 | ETLMergeTask | {"Concurrency":1} | + // +--------------+------------------+----------------------+ + return fmt.Sprintf(`select cron_task_id, task_metadata_id, task_metadata_option from mo_task.sys_cron_task where task_metadata_id in ('%s', '%s')`, + etl.ETLMergeTask, + StorageUsageCronTask, + ) +} + +func checkAndResetTaskLabels(ctx context.Context, logger *log.MOLogger, service string, sqlExecutor func() ie.InternalExecutor) (restart bool, err error) { + + var reset = false + var labels = map[string]string{} + + if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricResetTaskLabel); exist { + reset = s.(bool) + } + if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricTaskLabel); exist { + labels = s.(map[string]string) + } + + if !reset { + logger.Info("skip reset task labels") + return false, nil + } + + opts := ie.NewOptsBuilder().Database(MetricDBConst).Internal(true).Finish() + executor := sqlExecutor() + sql := getQueryCronTaskRecord() + 
logger.Info("query", zap.String("sql", sql)) + result := executor.Query(ctx, sql, opts) + + tasks := make([]task.CronTask, 0, 2) + dstTask := make([]task.CronTask, 0, 2) + cnt := result.RowCount() + if cnt == 0 { + logger.Warn("got empty sys_cron_task", zap.String("sql", sql)) + return false, moerr.NewInternalErrorf(ctx, "ResetTaskLabels: got empty sys_cron_task") + } + logger.Debug("fetch sys_cron_task", zap.Uint64("cnt", cnt)) + + defer func() { + if err != nil { + logger.Error("fetch sys_cron_task failed", zap.Error(err)) + } + }() + + // fetch task + for rowIdx := uint64(0); rowIdx < cnt; rowIdx++ { + var t task.CronTask + var options string + t.ID, err = result.GetUint64(ctx, rowIdx, idxCronTaskId) + if err != nil { + return false, err + } + + t.Metadata.ID, err = result.GetString(ctx, rowIdx, idxTaskMetadataId) + if err != nil { + return false, err + } + + options, err = result.GetString(ctx, rowIdx, idxTaskMetadataOption) + if err != nil { + return false, err + } + if err = json.Unmarshal([]byte(options), &t.Metadata.Options); err != nil { + return false, err + } + + tasks = append(tasks, t) + } + + // check task labels +checkL: + for _, t := range tasks { + if len(t.Metadata.Options.Labels) != len(labels) { + dstTask = append(dstTask, t) + continue + } + for k, v := range labels { + if t.Metadata.Options.Labels[k] != v { + dstTask = append(dstTask, t) + continue checkL + } + } + } + + if len(dstTask) == 0 { + logger.Info("all task label ok", zap.Any("labels", labels)) + return false, nil + } + + // update task + for _, t := range dstTask { + var options []byte + logger.Info("diff task labels", + zap.String("task", t.Metadata.ID), + zap.Any("src", t.Metadata.Options.Labels), + zap.Any("dest", labels), + ) + t.Metadata.Options.Labels = labels + + options, err = json.Marshal(t.Metadata.Options) + if err != nil { + return false, err + } + + sql = fmt.Sprintf( + "update mo_task.sys_cron_task set task_metadata_option='%s' where cron_task_id=%d", + options, + 
t.ID, + ) + logger.Info("query", zap.String("sql", sql), zap.ByteString("options", options)) + err = executor.Exec(ctx, sql, opts) + if err != nil { + return false, err + } + } + + return true, nil +}