Skip to content

Commit

Permalink
Merge branch 'main' into fix-context
Browse files Browse the repository at this point in the history
  • Loading branch information
m-schen authored Aug 8, 2024
2 parents 7500a22 + 6816ea5 commit 3a0508a
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 56 deletions.
19 changes: 0 additions & 19 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,25 +223,6 @@ func NewService(
srv.server = server
srv.storeEngine = pu.StorageEngine

srv.requestHandler = func(ctx context.Context,
cnAddr string,
message morpc.Message,
cs morpc.ClientSession,
engine engine.Engine,
fService fileservice.FileService,
lockService lockservice.LockService,
queryClient qclient.QueryClient,
hakeeper logservice.CNHAKeeperClient,
udfService udf.Service,
cli client.TxnClient,
aicm *defines.AutoIncrCacheManager,
messageAcquirer func() morpc.Message) error {
return nil
}
for _, opt := range options {
opt(srv)
}

// TODO: global client need to refactor
c, err := cnclient.NewPipelineClient(
cfg.UUID,
Expand Down
2 changes: 0 additions & 2 deletions pkg/embed/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
)

func TestBasicCluster(t *testing.T) {
// TODO: #17842
t.SkipNow()
c, err := NewCluster(WithCNCount(3))
require.NoError(t, err)
require.NoError(t, c.Start())
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/export/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import (
)

func init() {
time.Local = time.FixedZone("CST", 0) // set time-zone +0000
// Tips: Op 'time.Local = time.FixedZone(...)' would cause DATA RACE against to time.Now()

logutil.SetupMOLogger(&logutil.LogConfig{
Level: zapcore.DebugLevel.String(),
Format: "console",
Expand Down
7 changes: 4 additions & 3 deletions pkg/util/export/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ import (
)

func init() {
time.Local = time.FixedZone("CST", 0) // set time-zone +0000
// Tips: Op 'time.Local = time.FixedZone(...)' would cause DATA RACE against to time.Now()

table.RegisterTableDefine(dummyTable)
runtime.SetupServiceBasedRuntime("", runtime.NewRuntime(metadata.ServiceType_CN, "test", logutil.GetGlobalLogger()))
}
Expand Down Expand Up @@ -124,8 +125,8 @@ func TestInitCronExpr(t *testing.T) {
sche, err := parser.Parse(MergeTaskCronExpr)
require.Nil(t, err)

now := time.Unix(60, 0)
next := sche.Next(time.UnixMilli(now.UnixMilli()))
now := time.Unix(60, 0).UTC()
next := sche.Next(time.UnixMilli(now.UnixMilli()).UTC())
t.Logf("duration: %v, expr: %s, next: %v", tt.args.duration, MergeTaskCronExpr, next)
if tt.expectDuration > 0 {
require.Equal(t, tt.expectDuration, next.Sub(now))
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/export/table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func init() {
time.Local = time.FixedZone("CST", 0) // set time-zone +0000
// Tips: Op 'time.Local = time.FixedZone(...)' would cause DATA RACE against to time.Now()
}

func TestNoopTableOptions_FormatDdl(t *testing.T) {
Expand Down Expand Up @@ -364,9 +364,9 @@ func TestColumnField_EncodedDatetime(t *testing.T) {
{
name: "Unix_Zero",
fields: fields{
cf: TimeField(time.Unix(0, 0)),
cf: TimeField(time.Unix(0, 0).UTC()),
},
wantT: time.Unix(0, 0),
wantT: time.Unix(0, 0).UTC(),
want: "1970-01-01 00:00:00.000000",
},
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/export/table/type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func init() {
time.Local = time.FixedZone("CST", 0) // set time-zone +0000
// Tips: Op 'time.Local = time.FixedZone(...)' would cause DATA RACE against to time.Now()
}

func TestPathBuilder(t *testing.T) {
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestPathBuilder(t *testing.T) {
args: args{
account: "user",
typ: MergeLogTypeLogs,
ts: time.Unix(0, 0),
ts: time.Unix(0, 0).UTC(),
db: "db",
name: "table",
nodeUUID: "123456",
Expand All @@ -75,7 +75,7 @@ func TestPathBuilder(t *testing.T) {
args: args{
account: "user",
typ: MergeLogTypeLogs,
ts: time.Unix(0, 0),
ts: time.Unix(0, 0).UTC(),
db: "db",
name: "table",
nodeUUID: "123456",
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/trace/impl/motrace/buffer_pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ func noopReportError(context.Context, error, int) {}
var dummyBaseTime time.Time

func init() {
time.Local = time.FixedZone("CST", 0) // set time-zone +0000
dummyBaseTime = time.Unix(0, 0)
// Tips: Op 'time.Local = time.FixedZone(...)' would cause DATA RACE against to time.Now()

dummyBaseTime = time.Unix(0, 0).UTC()
SV := config.ObservabilityParameters{}
SV.SetDefaultValues("v0.test.0")
SV.TraceExportInterval = 15
Expand Down
30 changes: 11 additions & 19 deletions pkg/vm/engine/disttae/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)

var (
timeFixed bool
moCatalogCreatedTime *vector.Vector
moDatabaseCreatedTime *vector.Vector
moTablesCreatedTime *vector.Vector
moColumnsCreatedTime *vector.Vector
)

// tryAdjustThreeTablesCreatedTime analyzes the mo_tables batch and tries to adjust the created time of the three tables.
func tryAdjustThreeTablesCreatedTimeWithBatch(b *batch.Batch) {
if timeFixed {
func (e *Engine) tryAdjustThreeTablesCreatedTimeWithBatch(b *batch.Batch) {
if e.timeFixed {
return
}

Expand All @@ -55,11 +47,11 @@ func tryAdjustThreeTablesCreatedTimeWithBatch(b *batch.Batch) {
tname := b.Vecs[tnameIdx].GetStringAt(i)
if aid == 0 && tname == "mo_user" {
ts := vector.GetFixedAt[types.Timestamp](b.Vecs[createdTsIdx], i)
vector.SetFixedAt(moDatabaseCreatedTime, 0, ts)
vector.SetFixedAt(moTablesCreatedTime, 0, ts)
vector.SetFixedAt(moColumnsCreatedTime, 0, ts)
vector.SetFixedAt(moCatalogCreatedTime, 0, ts)
timeFixed = true
vector.SetFixedAt(e.moDatabaseCreatedTime, 0, ts)
vector.SetFixedAt(e.moTablesCreatedTime, 0, ts)
vector.SetFixedAt(e.moColumnsCreatedTime, 0, ts)
vector.SetFixedAt(e.moCatalogCreatedTime, 0, ts)
e.timeFixed = true
return
}
}
Expand Down Expand Up @@ -90,7 +82,7 @@ func (e *Engine) init(ctx context.Context) error {
if err != nil {
return err
}
moCatalogCreatedTime = bat.Vecs[catalog.MO_DATABASE_CREATED_TIME_IDX]
e.moCatalogCreatedTime = bat.Vecs[catalog.MO_DATABASE_CREATED_TIME_IDX]
ibat, err := fillRandomRowidAndZeroTs(bat, m)
if err != nil {
bat.Clean(m)
Expand Down Expand Up @@ -119,7 +111,7 @@ func (e *Engine) init(ctx context.Context) error {
if err != nil {
return err
}
moDatabaseCreatedTime = bat.Vecs[catalog.MO_TABLES_CREATED_TIME_IDX]
e.moDatabaseCreatedTime = bat.Vecs[catalog.MO_TABLES_CREATED_TIME_IDX]
ibat, err := fillRandomRowidAndZeroTs(bat, m)
if err != nil {
bat.Clean(m)
Expand Down Expand Up @@ -177,7 +169,7 @@ func (e *Engine) init(ctx context.Context) error {
if err != nil {
return err
}
moTablesCreatedTime = bat.Vecs[catalog.MO_TABLES_CREATED_TIME_IDX]
e.moTablesCreatedTime = bat.Vecs[catalog.MO_TABLES_CREATED_TIME_IDX]
ibat, err := fillRandomRowidAndZeroTs(bat, m)
if err != nil {
bat.Clean(m)
Expand Down Expand Up @@ -225,7 +217,7 @@ func (e *Engine) init(ctx context.Context) error {
if err != nil {
return err
}
moColumnsCreatedTime = bat.Vecs[catalog.MO_TABLES_CREATED_TIME_IDX]
e.moColumnsCreatedTime = bat.Vecs[catalog.MO_TABLES_CREATED_TIME_IDX]
ibat, err := fillRandomRowidAndZeroTs(bat, m)
if err != nil {
bat.Clean(m)
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/logtail_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (c *PushClient) replayCatalogCache(ctx context.Context, e *Engine) error {
if err = fillTsVecForSysTableQueryBatch(b, typeTs, result.Mp); err != nil {
return err
}
tryAdjustThreeTablesCreatedTimeWithBatch(b)
e.tryAdjustThreeTablesCreatedTimeWithBatch(b)
e.catalog.InsertTable(b)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ type Engine struct {

//for message on multiCN, use uuid to get the messageBoard
messageCenter *message.MessageCenter

timeFixed bool
moCatalogCreatedTime *vector.Vector
moDatabaseCreatedTime *vector.Vector
moTablesCreatedTime *vector.Vector
moColumnsCreatedTime *vector.Vector
}

func (txn *Transaction) String() string {
Expand Down
3 changes: 0 additions & 3 deletions pkg/vm/engine/tae/containers/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,3 @@ func newVectorElement(pool *VectorPool, t *types.Type, mp *mpool.MPool) *vectorP
}
return element
}

//go:linkname fastrand runtime.fastrand
func fastrand() uint32
24 changes: 24 additions & 0 deletions pkg/vm/engine/tae/containers/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !go1.22

package containers

import (
_ "unsafe"
)

//go:linkname fastrand runtime.fastrand
func fastrand() uint32
24 changes: 24 additions & 0 deletions pkg/vm/engine/tae/containers/runtime_1.22.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.22

package containers

import (
_ "unsafe"
)

//go:linkname fastrand runtime.cheaprand
func fastrand() uint32

0 comments on commit 3a0508a

Please sign in to comment.