Skip to content

Commit

Permalink
feat: support reset ETLMergeTask and StorageUsage cron_task Options.L…
Browse files Browse the repository at this point in the history
…abels (#20104)

1. new config `observability.task-label`. specify `options.Label` for mo_task `ETLMergeTask`, `StorageUsage`
2. new config `observability.reset-task-label`. Do reset and check labels op at the beginning of  `StorageUsage` task.
3. fix internalExecutor got query result, in `append` situation.

Approved by: @daviszhen, @zhangxu19830126, @aptend, @w-zr, @sukki37
  • Loading branch information
xzxiong authored Nov 29, 2024
1 parent 446e06a commit e3e8d76
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cmd/mo-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ func initTraceMetric(ctx context.Context, st metadata.ServiceType, cfg *Config,
selector := clusterservice.NewSelector().SelectByLabel(SV.LabelSelector, clusterservice.Contain)
mustGetRuntime(cfg).SetGlobalVariables(runtime.BackgroundCNSelector, selector)
mustGetRuntime(cfg).SetGlobalVariables(motrace.MaxStatementSize, int(cfg.getCNServiceConfig().Frontend.LengthOfQueryPrinted))
mustGetRuntime(cfg).SetGlobalVariables(mometric.MOMetricResetTaskLabel, SV.ResetTaskLabel)
mustGetRuntime(cfg).SetGlobalVariables(mometric.MOMetricTaskLabel, SV.TaskLabel)

if !SV.DisableTrace || !SV.DisableMetric {
writerFactory = export.GetWriterFactory(fs, UUID, nodeRole, !SV.DisableSqlWriter)
Expand Down
10 changes: 8 additions & 2 deletions pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ type ObservabilityParameters struct {
// LabelSelector
LabelSelector map[string]string `toml:"label-selector"`

// TaskLabel
TaskLabel map[string]string `toml:"task-label"`
ResetTaskLabel bool `toml:"reset-task-label"`

// estimate tcp network packet cost
TCPPacket bool `toml:"tcp-packet"`

Expand Down Expand Up @@ -652,6 +656,8 @@ func NewObservabilityParameters() *ObservabilityParameters {
SelectAggThreshold: toml.Duration{},
EnableStmtMerge: false,
LabelSelector: map[string]string{}, /*default: role=logging_cn*/
TaskLabel: map[string]string{},
ResetTaskLabel: false,
TCPPacket: true,
MaxLogMessageSize: toml.ByteSize(defaultMaxLogMessageSize),
CU: *NewOBCUConfig(),
Expand Down Expand Up @@ -768,9 +774,9 @@ func (op *ObservabilityParameters) resetConfigByOld() {
}
}
resetMapConfig := func(target map[string]string, defaultVal map[string]string, setVal map[string]string) {
eq := true
eq := len(target) == len(defaultVal)
// check eq
if len(target) == len(defaultVal) {
if eq {
for k, v := range defaultVal {
if target[k] != v {
eq = false
Expand Down
19 changes: 17 additions & 2 deletions pkg/frontend/internal_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,24 @@ func (ip *internalProtocol) GetBool(PropertyID) bool {
}

func (ip *internalProtocol) Write(execCtx *ExecCtx, crs *perfcounter.CounterSet, bat *batch.Batch) error {
// init ip.result.resultSet
mrs := execCtx.ses.GetMysqlResultSet()
err := fillResultSet(execCtx.reqCtx, bat, execCtx.ses, mrs)
res := ip.result.resultSet
if res == nil {
res = &MysqlResultSet{}
ip.result.resultSet = res
for _, col := range mrs.Columns {
res.AddColumn(col)
}
}

// copy into result set
err := fillResultSet(execCtx.reqCtx, bat, execCtx.ses, res)
if err != nil {
return err
}
return ip.sendRows(mrs, uint64(bat.RowCount()))
ip.result.affectedRows += uint64(bat.RowCount())
return nil
}

func (ip *internalProtocol) WriteHandshake() error {
Expand Down Expand Up @@ -468,6 +480,9 @@ func (ip *internalProtocol) SetUserName(username string) {

func (ip *internalProtocol) Close() {}

// sendRows
// case 1: used in WriteResponse and WriteResultSetRow, which are 'copy' op
// case 2: used in Write, which is 'append' op. (deprecated)
func (ip *internalProtocol) sendRows(mrs *MysqlResultSet, cnt uint64) error {
if ip.stashResult {
res := ip.result.resultSet
Expand Down
61 changes: 61 additions & 0 deletions pkg/frontend/internal_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/testutil"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
)

Expand Down Expand Up @@ -134,3 +137,61 @@ func TestIeResult(t *testing.T) {
require.NoError(t, e)
require.Equal(t, 42, v.(int))
}

func Test_internalProtocol_Write(t *testing.T) {
setPu("", config.NewParameterUnit(&config.FrontendParameters{}, nil, nil, nil))
// Mock autoIncrCaches
setAicm("", &defines.AutoIncrCacheManager{})

executorVar := NewInternalExecutor("")
ip := executorVar.proto
assert.True(t, ip.IsEstablished())
ip.stashResult = true
ip.SetEstablished()
ip.Close()
ip.ResetStatistics()
_ = ip.ConnectionID()
ctx := context.TODO()
assert.Nil(t, ip.WriteColumnDef(ctx, nil, 1))
assert.Nil(t, ip.WriteLengthEncodedNumber(1))
assert.Nil(t, ip.WriteEOFIF(0, 1))
assert.Nil(t, ip.WriteOK(1, 1, 0, 0, ""))
assert.Nil(t, ip.WriteEOFOrOK(0, 1))

ses := executorVar.newCmdSession(ctx, ie.NewOptsBuilder().Finish())
col1 := &MysqlColumn{}
col1.SetName("col1")
col1.SetColumnType(defines.MYSQL_TYPE_LONG)
ses.mrs = &MysqlResultSet{}
ses.mrs.AddColumn(col1)

execCtx := &ExecCtx{
reqCtx: ctx,
ses: ses,
}

mockBatch := func(vals []int64) *batch.Batch {
bat := batch.New([]string{"col1"})
vecs := make([]*vector.Vector, 1)
vecs[0] = testutil.MakeInt64Vector(vals, nil)
bat.Vecs = vecs
bat.SetRowCount(len(vals))
return bat
}
batch1 := mockBatch([]int64{100})
batch2 := mockBatch([]int64{200, 201})

// ======================= main ===================
ip.Reset(ses)
err := ip.Write(execCtx, nil, batch1)
require.NoError(t, err)
require.Equal(t, 1, int(ip.result.affectedRows))
require.Equal(t, 1, len(ip.result.resultSet.Data))
require.Equal(t, [][]any{{int64(100)} /*colum1, rows: 1*/}, ip.result.resultSet.Data)

err = ip.Write(execCtx, nil, batch2)
require.NoError(t, err)
require.Equal(t, 3, int(ip.result.affectedRows))
require.Equal(t, 3, len(ip.result.resultSet.Data))
require.Equal(t, [][]any{{int64(100)}, {int64(200)}, {int64(201)} /*column1, rows: 3*/}, ip.result.resultSet.Data)
}
5 changes: 3 additions & 2 deletions pkg/predefine/predefine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
"fmt"
"time"

"github.com/robfig/cron/v3"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/pb/task"
"github.com/matrixorigin/matrixone/pkg/util/export"
"github.com/matrixorigin/matrixone/pkg/util/metric/mometric"
"github.com/robfig/cron/v3"
)

// genInitCronTaskSQL Generate `insert` statement for creating system cron tasks, which works on the `mo_task`.`sys_cron_task` table.
Expand Down Expand Up @@ -63,7 +64,7 @@ func GenInitCronTaskSQL() (string, error) {
}
cronTasks = append(cronTasks, task1)

task2, err := createCronTask(mometric.TaskMetadata(mometric.StorageUsageCronTask, task.TaskCode_MetricStorageUsage), export.MergeTaskCronExprEvery05Min)
task2, err := createCronTask(mometric.TaskMetadata(mometric.StorageUsageCronTask, task.TaskCode_MetricStorageUsage), mometric.StorageUsageTaskCronExpr)
if err != nil {
return "", err
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/export/etl/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etl

// ETLMergeTask name for cron task in mo_task.sys_cron_task
const ETLMergeTask = "ETLMergeTask"
2 changes: 1 addition & 1 deletion pkg/util/export/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ const ParamSeparator = " "
// MergeTaskMetadata handle args like: "{db_tbl_name} [date, default: today]"
func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata {
return task.TaskMetadata{
ID: path.Join("ETLMergeTask", path.Join(args...)),
ID: path.Join(etl.ETLMergeTask, path.Join(args...)),
Executor: id,
Context: []byte(strings.Join(args, ParamSeparator)),
Options: task.TaskOptions{Concurrency: 1},
Expand Down
148 changes: 148 additions & 0 deletions pkg/util/metric/mometric/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mometric

import (
"context"
"encoding/json"
"fmt"
"math"
"path"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/task"
"github.com/matrixorigin/matrixone/pkg/taskservice"
"github.com/matrixorigin/matrixone/pkg/util/export/etl"
"github.com/matrixorigin/matrixone/pkg/util/export/table"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/matrixorigin/matrixone/pkg/util/metric"
Expand Down Expand Up @@ -182,6 +184,7 @@ func CalculateStorageUsage(
service string,
sqlExecutor func() ie.InternalExecutor,
) (err error) {
var restart bool
var account string
var sizeMB, snapshotSizeMB, objectCount float64
ctx, span := trace.Start(ctx, "MetricStorageUsage")
Expand All @@ -200,6 +203,12 @@ func CalculateStorageUsage(
cleanStorageUsageMetric(logger, "CalculateStorageUsage")
}()

restart, err = checkAndResetTaskLabels(ctx, logger, service, sqlExecutor)
if err != nil || restart {
logger.Info("checkAndResetTaskLabels", zap.Error(err), zap.Bool("need-restart", restart))
return err
}

// init metric value
v2.GetTraceCheckStorageUsageAllCounter().Add(0)
v2.GetTraceCheckStorageUsageNewCounter().Add(0)
Expand Down Expand Up @@ -451,3 +460,142 @@ func checkNewAccountSize(ctx context.Context, logger *log.MOLogger, sqlExecutor
logger.Debug("wait next round, check new account")
}
}

const (
MOMetricResetTaskLabel = "mo_metric_reset_task_labels"
MOMetricTaskLabel = "mo_metric_task_labels"
)

const (
idxCronTaskId = iota
idxTaskMetadataId
idxTaskMetadataOption
)

func getQueryCronTaskRecord() string {
// like:
// mysql> select cron_task_id, task_metadata_id, task_metadata_option from mo_task.sys_cron_task where task_metadata_id in ('ETLMergeTask', 'StorageUsage');
// +--------------+------------------+----------------------+
// | cron_task_id | task_metadata_id | task_metadata_option |
// +--------------+------------------+----------------------+
// | 2 | StorageUsage | {"Concurrency":1} |
// | 1 | ETLMergeTask | {"Concurrency":1} |
// +--------------+------------------+----------------------+
return fmt.Sprintf(`select cron_task_id, task_metadata_id, task_metadata_option from mo_task.sys_cron_task where task_metadata_id in ('%s', '%s')`,
etl.ETLMergeTask,
StorageUsageCronTask,
)
}

func checkAndResetTaskLabels(ctx context.Context, logger *log.MOLogger, service string, sqlExecutor func() ie.InternalExecutor) (restart bool, err error) {

var reset = false
var labels = map[string]string{}

if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricResetTaskLabel); exist {
reset = s.(bool)
}
if s, exist := runtime.ServiceRuntime(service).GetGlobalVariables(MOMetricTaskLabel); exist {
labels = s.(map[string]string)
}

if !reset {
logger.Info("skip reset task labels")
return false, nil
}

opts := ie.NewOptsBuilder().Database(MetricDBConst).Internal(true).Finish()
executor := sqlExecutor()
sql := getQueryCronTaskRecord()
logger.Info("query", zap.String("sql", sql))
result := executor.Query(ctx, sql, opts)

tasks := make([]task.CronTask, 0, 2)
dstTask := make([]task.CronTask, 0, 2)
cnt := result.RowCount()
if cnt == 0 {
logger.Warn("got empty sys_cron_task", zap.String("sql", sql))
return false, moerr.NewInternalErrorf(ctx, "ResetTaskLabels: got empty sys_cron_task")
}
logger.Debug("fetch sys_cron_task", zap.Uint64("cnt", cnt))

defer func() {
if err != nil {
logger.Error("fetch sys_cron_task failed", zap.Error(err))
}
}()

// fetch task
for rowIdx := uint64(0); rowIdx < cnt; rowIdx++ {
var t task.CronTask
var options string
t.ID, err = result.GetUint64(ctx, rowIdx, idxCronTaskId)
if err != nil {
return false, err
}

t.Metadata.ID, err = result.GetString(ctx, rowIdx, idxTaskMetadataId)
if err != nil {
return false, err
}

options, err = result.GetString(ctx, rowIdx, idxTaskMetadataOption)
if err != nil {
return false, err
}
if err = json.Unmarshal([]byte(options), &t.Metadata.Options); err != nil {
return false, err
}

tasks = append(tasks, t)
}

// check task labels
checkL:
for _, t := range tasks {
if len(t.Metadata.Options.Labels) != len(labels) {
dstTask = append(dstTask, t)
continue
}
for k, v := range labels {
if t.Metadata.Options.Labels[k] != v {
dstTask = append(dstTask, t)
continue checkL
}
}
}

if len(dstTask) == 0 {
logger.Info("all task label ok", zap.Any("labels", labels))
return false, nil
}

// update task
for _, t := range dstTask {
var options []byte
logger.Info("diff task labels",
zap.String("task", t.Metadata.ID),
zap.Any("src", t.Metadata.Options.Labels),
zap.Any("dest", labels),
)
t.Metadata.Options.Labels = labels

options, err = json.Marshal(t.Metadata.Options)
if err != nil {
return false, err
}

sql = fmt.Sprintf(
"update mo_task.sys_cron_task set task_metadata_option='%s' where cron_task_id=%d",
options,
t.ID,
)
logger.Info("query", zap.String("sql", sql), zap.ByteString("options", options))
err = executor.Exec(ctx, sql, opts)
if err != nil {
return false, err
}
}

return true, nil
}

0 comments on commit e3e8d76

Please sign in to comment.