Skip to content

Commit

Permalink
support embed mo for testing (#17670)
Browse files Browse the repository at this point in the history
Add embed mo support, and support for cluster testing in UT.

Approved by: @xzxiong, @daviszhen, @aptend, @badboynt1, @ouyuanning, @reusee, @m-schen, @qingxinhome, @sukki37, @XuPeng-SH, @aunjgr, @fengttt
  • Loading branch information
zhangxu19830126 authored Jul 26, 2024
1 parent 3c65bc1 commit 839495f
Show file tree
Hide file tree
Showing 35 changed files with 2,242 additions and 67 deletions.
3 changes: 3 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@
# pkg/defines
/pkg/defines @daviszhen

# pkg/embed
/pkg/embed @zhangxu19830126

# pkg/tnservice
/pkg/tnservice @zhangxu19830126 @XuPeng-SH

Expand Down
2 changes: 2 additions & 0 deletions cmd/mo-service/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
Expand Down Expand Up @@ -56,6 +57,7 @@ func setupServiceRuntime(
cfg.mustGetServiceUUID(),
r,
)
catalog.SetupDefines(cfg.mustGetServiceUUID())
return nil
}

Expand Down
54 changes: 42 additions & 12 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"regexp"
"strconv"

"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/compress"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand All @@ -35,22 +36,50 @@ const (
CatalogVersion_Curr uint32 = CatalogVersion_V1
)

func init() {
MoDatabaseTableDefs = make([]engine.TableDef, len(MoDatabaseSchema))
type Defines struct {
// used by memengine or tae
MoDatabaseTableDefs []engine.TableDef
// used by memengine or tae
MoTablesTableDefs []engine.TableDef
// used by memengine or tae
MoColumnsTableDefs []engine.TableDef
// used by memengine or tae or cn
MoTableMetaDefs []engine.TableDef
MoDatabaseConstraint []byte
MoTableConstraint []byte
MoColumnConstraint []byte
}

func SetupDefines(sid string) {
runtime.ServiceRuntime(sid).SetGlobalVariables("catalog_defines", newDefines())
}

func GetDefines(sid string) *Defines {
v, ok := runtime.ServiceRuntime(sid).GetGlobalVariables("catalog_defines")
if !ok {
panic("catalog_defines is not set: " + sid)
}
return v.(*Defines)
}

func newDefines() *Defines {
d := &Defines{}

d.MoDatabaseTableDefs = make([]engine.TableDef, len(MoDatabaseSchema))
for i, name := range MoDatabaseSchema {
MoDatabaseTableDefs[i] = newAttributeDef(name, MoDatabaseTypes[i], i == 0)
d.MoDatabaseTableDefs[i] = newAttributeDef(name, MoDatabaseTypes[i], i == 0)
}
MoTablesTableDefs = make([]engine.TableDef, len(MoTablesSchema))
d.MoTablesTableDefs = make([]engine.TableDef, len(MoTablesSchema))
for i, name := range MoTablesSchema {
MoTablesTableDefs[i] = newAttributeDef(name, MoTablesTypes[i], i == 0)
d.MoTablesTableDefs[i] = newAttributeDef(name, MoTablesTypes[i], i == 0)
}
MoColumnsTableDefs = make([]engine.TableDef, len(MoColumnsSchema))
d.MoColumnsTableDefs = make([]engine.TableDef, len(MoColumnsSchema))
for i, name := range MoColumnsSchema {
MoColumnsTableDefs[i] = newAttributeDef(name, MoColumnsTypes[i], i == 0)
d.MoColumnsTableDefs[i] = newAttributeDef(name, MoColumnsTypes[i], i == 0)
}
MoTableMetaDefs = make([]engine.TableDef, len(MoTableMetaSchema))
d.MoTableMetaDefs = make([]engine.TableDef, len(MoTableMetaSchema))
for i, name := range MoTableMetaSchema {
MoTableMetaDefs[i] = newAttributeDef(name, MoTableMetaTypes[i], i == 0)
d.MoTableMetaDefs[i] = newAttributeDef(name, MoTableMetaTypes[i], i == 0)
}

def := &engine.ConstraintDef{
Expand All @@ -65,7 +94,7 @@ func init() {
},
},
}
MoDatabaseConstraint, _ = def.MarshalBinary()
d.MoDatabaseConstraint, _ = def.MarshalBinary()

def = &engine.ConstraintDef{
Cts: []engine.Constraint{
Expand All @@ -79,7 +108,7 @@ func init() {
},
},
}
MoTableConstraint, _ = def.MarshalBinary()
d.MoTableConstraint, _ = def.MarshalBinary()

def = &engine.ConstraintDef{
Cts: []engine.Constraint{
Expand All @@ -93,7 +122,8 @@ func init() {
},
},
}
MoColumnConstraint, _ = def.MarshalBinary()
d.MoColumnConstraint, _ = def.MarshalBinary()
return d
}

func newAttributeDef(name string, typ types.Type, isPrimary bool) engine.TableDef {
Expand Down
13 changes: 0 additions & 13 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,19 +741,6 @@ var (
MO_COLUMNS_ATT_SEQNUM_IDX,
MO_COLUMNS_ATT_ENUM_IDX,
}

// used by memengine or tae
MoDatabaseTableDefs = []engine.TableDef{}
// used by memengine or tae
MoTablesTableDefs = []engine.TableDef{}
// used by memengine or tae
MoColumnsTableDefs = []engine.TableDef{}
// used by memengine or tae or cn
MoTableMetaDefs = []engine.TableDef{}

MoDatabaseConstraint = []byte{}
MoTableConstraint = []byte{}
MoColumnConstraint = []byte{}
)

var (
Expand Down
1 change: 1 addition & 0 deletions pkg/cnservice/memory_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (s *service) initMemoryEngineNonDist(
runtime.ServiceRuntime(s.cfg.UUID).SetGlobalVariables(runtime.ClusterService, cluster)

storage, err := memorystorage.NewMemoryStorage(
s.cfg.UUID,
mp,
ck,
memoryengine.RandomIDGenerator,
Expand Down
11 changes: 0 additions & 11 deletions pkg/cnservice/server_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
Expand Down Expand Up @@ -249,16 +248,6 @@ func (s *service) registerExecutorsLocked() {
return
}

pu := config.NewParameterUnit(
&s.cfg.Frontend,
nil,
nil,
nil)
pu.StorageEngine = s.storeEngine
pu.TxnClient = s._txnClient
s.cfg.Frontend.SetDefaultValues()
pu.FileService = s.fileService
pu.LockService = s.lockService
ieFactory := func() ie.InternalExecutor {
return frontend.NewInternalExecutor(s.cfg.UUID)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/cnservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,9 @@ func (c *Config) Validate() error {

// pessimistic mode implies primary key check
if txn.GetTxnMode(c.Txn.Mode) == txn.TxnMode_Pessimistic || c.PrimaryKeyCheck {
config.CNPrimaryCheck = true
config.CNPrimaryCheck.Store(true)
} else {
config.CNPrimaryCheck = false
config.CNPrimaryCheck.Store(false)
}

if c.LargestEntryLimit > 0 {
Expand All @@ -406,12 +406,12 @@ func (c *Config) Validate() error {

if c.MaxPreparedStmtCount > 0 {
if c.MaxPreparedStmtCount > maxForMaxPreparedStmtCount {
frontend.MaxPrepareNumberInOneSession = maxForMaxPreparedStmtCount
frontend.MaxPrepareNumberInOneSession.Store(uint32(maxForMaxPreparedStmtCount))
} else {
frontend.MaxPrepareNumberInOneSession = c.MaxPreparedStmtCount
frontend.MaxPrepareNumberInOneSession.Store(uint32(c.MaxPreparedStmtCount))
}
} else {
frontend.MaxPrepareNumberInOneSession = 100000
frontend.MaxPrepareNumberInOneSession.Store(100000)
}
c.QueryServiceConfig.Adjust(foundMachineHost, defaultQueryServiceListenAddress)

Expand Down
3 changes: 2 additions & 1 deletion pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/fileservice"
Expand Down Expand Up @@ -165,7 +166,7 @@ var (
// largestEntryLimit is the max size for reading file to csv buf
LargestEntryLimit = 10 * 1024 * 1024

CNPrimaryCheck = false
CNPrimaryCheck atomic.Bool
)

// FrontendParameters of the frontend
Expand Down
53 changes: 53 additions & 0 deletions pkg/embed/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2021-2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package embed

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
)

const (
localClockBackend = "LOCAL"
hlcClockBackend = "HLC"
)

func newClock(
cfg ServiceConfig,
stopper *stopper.Stopper,
) (clock.Clock, error) {
var c clock.Clock
switch cfg.Clock.Backend {
case localClockBackend:
c = newLocalClock(cfg, stopper)
default:
return nil, moerr.NewInternalError(context.Background(), "not implement for %s", cfg.Clock.Backend)
}
c.SetNodeID(cfg.hashNodeID())
return c, nil
}

func newLocalClock(
cfg ServiceConfig,
stopper *stopper.Stopper,
) clock.Clock {
return clock.NewUnixNanoHLCClockWithStopper(
stopper,
cfg.Clock.MaxClockOffset.Duration,
)
}
Loading

0 comments on commit 839495f

Please sign in to comment.