Skip to content

Commit

Permalink
Merge branch 'main' into remove-expand-logic-for-multi-update
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 17, 2024
2 parents 652e541 + 5dcbe05 commit 2acdfd3
Show file tree
Hide file tree
Showing 14 changed files with 1,042 additions and 249 deletions.
25 changes: 25 additions & 0 deletions pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (s *service) initQueryCommandHandler() {
s.queryService.AddHandleFunc(query.CmdMethod_MetadataCache, s.handleMetadataCacheRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_FaultInject, s.handleFaultInjection, false)
s.queryService.AddHandleFunc(query.CmdMethod_CtlMoTableStats, s.handleMoTableStats, false)
s.queryService.AddHandleFunc(query.CmdMethod_WorkspaceThreshold, s.handleWorkspaceThresholdRequest, false)
}

func (s *service) handleKillConn(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
Expand Down Expand Up @@ -630,3 +631,27 @@ func (s *service) handleMetadataCacheRequest(

return nil
}

func (s *service) handleWorkspaceThresholdRequest(
ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer,
) error {

logutil.Info(
"WORKSPACE-THRESHOLD-CHANGED",
zap.Uint64("commit-threshold", req.WorkspaceThresholdRequest.CommitThreshold),
zap.Uint64("write-threshold", req.WorkspaceThresholdRequest.WriteThreshold),
)

e := s.storeEngine.(*disttae.Engine)
commit, write := e.SetWorkspaceThreshold(
req.WorkspaceThresholdRequest.CommitThreshold,
req.WorkspaceThresholdRequest.WriteThreshold,
)

resp.WorkspaceThresholdResponse = &query.WorkspaceThresholdResponse{
CommitThreshold: commit,
WriteThreshold: write,
}

return nil
}
930 changes: 721 additions & 209 deletions pkg/pb/query/query.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/queryservice/client/query_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var methodVersions = map[pb.CmdMethod]int64{
pb.CmdMethod_GOGCPercent: defines.MORPCVersion4,
pb.CmdMethod_FaultInject: defines.MORPCVersion4,
pb.CmdMethod_CtlMoTableStats: defines.MORPCVersion4,
pb.CmdMethod_WorkspaceThreshold: defines.MORPCVersion4,
}

type queryClient struct {
Expand Down
93 changes: 93 additions & 0 deletions pkg/sql/plan/function/ctl/cmd_workspace_threshold.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ctl

import (
"context"
"github.com/matrixorigin/matrixone/pkg/clusterservice"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"strconv"
"strings"
"time"
)

type Res struct {
PodID string `json:"pod_id,omitempty"`
CommitThreshold uint64 `json:"commit_threshold,omitempty"`
WriteThreshold uint64 `json:"write_threshold,omitempty"`
ErrorStr string `json:"error,omitempty"`
}

func handleWorkspaceThreshold(
proc *process.Process,
service serviceType,
parameter string,
sender requestSender,
) (Result, error) {
if service != cn {
return Result{}, moerr.NewWrongServiceNoCtx("expected CN", string(service))
}

thresholds := strings.Split(parameter, ":")
if len(thresholds) != 2 {
return Result{}, moerr.NewInvalidInput(proc.Ctx, "invalid parameter")
}
commit, err := strconv.ParseUint(thresholds[0], 10, 64)
if err != nil {
return Result{}, moerr.NewInvalidInput(proc.Ctx, "invalid commit threshold")
}
write, err := strconv.ParseUint(thresholds[1], 10, 64)
if err != nil {
return Result{}, moerr.NewInvalidInput(proc.Ctx, "invalid write threshold")
}

request := proc.GetQueryClient().NewRequest(query.CmdMethod_WorkspaceThreshold)
request.WorkspaceThresholdRequest = &query.WorkspaceThresholdRequest{
CommitThreshold: commit,
WriteThreshold: write,
}

results := make([]Res, 0)

clusterservice.GetMOCluster(
proc.GetService()).GetCNService(clusterservice.Selector{}, func(cn metadata.CNService) bool {
ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second, moerr.CauseTransferRequest2OtherCNs)
defer cancel()

resp, err := proc.GetQueryClient().SendMessage(ctx, cn.QueryAddress, request)
err = moerr.AttachCause(ctx, err)

res := Res{
PodID: cn.ServiceID,
}

if err != nil {
res.ErrorStr = err.Error()
} else {
res.CommitThreshold = resp.WorkspaceThresholdResponse.CommitThreshold
res.WriteThreshold = resp.WorkspaceThresholdResponse.WriteThreshold
}
results = append(results, res)
return true
})

return Result{
Method: WorkspaceThreshold,
Data: results,
}, nil
}
59 changes: 59 additions & 0 deletions pkg/sql/plan/function/ctl/cmd_workspace_threshold_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ctl

import (
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/queryservice/client"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/stretchr/testify/require"
"testing"
)

func TestHandleWorkspaceThreshold(t *testing.T) {
id := uuid.New().String()
addr := "127.0.0.1:7755"
initRuntime([]string{id}, []string{addr})

runtime.SetupServiceBasedRuntime(id, runtime.ServiceRuntime(""))

cli, err := client.NewQueryClient(id, morpc.Config{})
require.Nil(t, err)

proc := new(process.Process)
proc.Base = &process.BaseProcess{}
proc.Base.QueryClient = cli

result, err := handleWorkspaceThreshold(proc, tn, "xxx", nil)
require.Equal(t, Result{}, result)
require.True(t, moerr.IsMoErrCode(err, moerr.ErrWrongService))

runtime.ServiceRuntime("").SetGlobalVariables(runtime.ClusterService, new(mockCluster))

result, err = handleWorkspaceThreshold(proc, cn, "2:3", nil)
require.NoError(t, err)
require.Equal(t, result, Result{
Method: WorkspaceThreshold,
Data: []Res{
{
PodID: "not exist",
ErrorStr: "internal error: invalid CN query address ",
},
},
})
}
2 changes: 2 additions & 0 deletions pkg/sql/plan/function/ctl/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
CtlReaderMethod = strings.ToUpper("reader")
GetTableShards = strings.ToUpper("get-table-shards")
MoTableStats = strings.ToUpper("MoTableStats")
WorkspaceThreshold = strings.ToUpper("WorkspaceThreshold")
)

var (
Expand Down Expand Up @@ -98,6 +99,7 @@ var (
CtlReaderMethod: handleCtlReader,
GetTableShards: handleGetTableShards,
MoTableStats: handleMoTableStats,
WorkspaceThreshold: handleWorkspaceThreshold,
}
)

Expand Down
25 changes: 22 additions & 3 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ func (e *Engine) fillDefaults() {
if e.config.insertEntryMaxCount <= 0 {
e.config.insertEntryMaxCount = InsertEntryThreshold
}
if e.config.workspaceThreshold <= 0 {
e.config.workspaceThreshold = WorkspaceThreshold
if e.config.commitWorkspaceThreshold <= 0 {
e.config.commitWorkspaceThreshold = CommitWorkspaceThreshold
}
if e.config.writeWorkspaceThreshold <= 0 {
e.config.writeWorkspaceThreshold = WriteWorkspaceThreshold
}
if e.config.cnTransferTxnLifespanThreshold <= 0 {
e.config.cnTransferTxnLifespanThreshold = CNTransferTxnLifespanThreshold
Expand All @@ -167,11 +170,27 @@ func (e *Engine) fillDefaults() {
logutil.Info(
"INIT-ENGINE-CONFIG",
zap.Int("InsertEntryMaxCount", e.config.insertEntryMaxCount),
zap.Uint64("WorkspaceThreshold", e.config.workspaceThreshold),
zap.Uint64("CommitWorkspaceThreshold", e.config.commitWorkspaceThreshold),
zap.Uint64("WriteWorkspaceThreshold", e.config.writeWorkspaceThreshold),
zap.Duration("CNTransferTxnLifespanThreshold", e.config.cnTransferTxnLifespanThreshold),
)
}

// SetWorkspaceThreshold updates the commit and write workspace thresholds (in MB).
// Non-zero values override the current thresholds, while zero keeps them unchanged.
// Returns the previous thresholds (in MB).
func (e *Engine) SetWorkspaceThreshold(commitThreshold, writeThreshold uint64) (commit, write uint64) {
commit = e.config.commitWorkspaceThreshold / mpool.MB
write = e.config.writeWorkspaceThreshold / mpool.MB
if commitThreshold != 0 {
e.config.commitWorkspaceThreshold = commitThreshold * mpool.MB
}
if writeThreshold != 0 {
e.config.writeWorkspaceThreshold = writeThreshold * mpool.MB
}
return
}

func (e *Engine) GetService() string {
return e.service
}
Expand Down
51 changes: 48 additions & 3 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"fmt"
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -450,13 +451,13 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {

//offset < 0 indicates commit.
if offset < 0 {
if txn.approximateInMemInsertSize < txn.engine.config.workspaceThreshold &&
if txn.approximateInMemInsertSize < txn.commitWorkspaceThreshold &&
txn.approximateInMemInsertCnt < txn.engine.config.insertEntryMaxCount &&
txn.approximateInMemDeleteCnt < txn.engine.config.insertEntryMaxCount {
return nil
}
} else {
if txn.approximateInMemInsertSize < txn.engine.config.workspaceThreshold {
if txn.approximateInMemInsertSize < txn.writeWorkspaceThreshold {
return nil
}
}
Expand All @@ -478,7 +479,7 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
size += uint64(txn.writes[i].bat.Size())
}
}
if size < txn.engine.config.workspaceThreshold {
if size < txn.writeWorkspaceThreshold {
return nil
}
size = 0
Expand Down Expand Up @@ -516,10 +517,51 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
}

func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, size *uint64, pkCount *int) error {
tbSize := make(map[uint64]int)
tbCount := make(map[uint64]int)
skipTable := make(map[uint64]bool)

for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
continue
}
if txn.writes[i].typ == INSERT && txn.writes[i].fileName == "" {
tbSize[txn.writes[i].tableId] += txn.writes[i].bat.Size()
tbCount[txn.writes[i].tableId] += txn.writes[i].bat.RowCount()
}
}

keys := make([]uint64, 0, len(tbSize))
for k := range tbSize {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
return tbSize[keys[i]] < tbSize[keys[j]]
})
sum := 0
for _, k := range keys {
if tbCount[k] >= txn.engine.config.insertEntryMaxCount {
continue
}
if uint64(sum+tbSize[k]) >= txn.writeWorkspaceThreshold {
break
}
sum += tbSize[k]
skipTable[k] = true
}

lastWritesIndex := offset
writes := txn.writes
mp := make(map[tableKey][]*batch.Batch)
for i := offset; i < len(txn.writes); i++ {
if skipTable[txn.writes[i].tableId] {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
continue
}
if txn.writes[i].isCatalog() {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
Expand Down Expand Up @@ -1470,6 +1512,9 @@ func (txn *Transaction) CloneSnapshotWS() client.Workspace {
cnBlkId_Pos: map[types.Blockid]Pos{},
batchSelectList: make(map[*batch.Batch][]int64),
cn_flushed_s3_tombstone_object_stats_list: new(sync.Map),

commitWorkspaceThreshold: txn.commitWorkspaceThreshold,
writeWorkspaceThreshold: txn.writeWorkspaceThreshold,
}

ws.readOnly.Store(true)
Expand Down
Loading

0 comments on commit 2acdfd3

Please sign in to comment.