Skip to content

Commit

Permalink
doing
Browse files Browse the repository at this point in the history
  • Loading branch information
aptend committed May 15, 2024
1 parent a364e6c commit aea2924
Show file tree
Hide file tree
Showing 24 changed files with 1,080 additions and 1,555 deletions.
89 changes: 46 additions & 43 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,40 +146,7 @@ func ParseEntryList(es []*api.Entry) (any, []*api.Entry, error) {
cmds := genCreateTables(GenRows(bat))
idx := 0
for i := range cmds {
// tae's logic
if len(cmds[i].Comment) > 0 {
cmds[i].Defs = append(cmds[i].Defs, &engine.CommentDef{
Comment: cmds[i].Comment,
})
}
if len(cmds[i].Viewdef) > 0 {
cmds[i].Defs = append(cmds[i].Defs, &engine.ViewDef{
View: cmds[i].Viewdef,
})
}
if len(cmds[i].Constraint) > 0 {
c := new(engine.ConstraintDef)
if err = c.UnmarshalBinary(cmds[i].Constraint); err != nil {
return nil, nil, err
}
cmds[i].Defs = append(cmds[i].Defs, c)
}
if cmds[i].Partitioned > 0 || len(cmds[i].Partition) > 0 {
cmds[i].Defs = append(cmds[i].Defs, &engine.PartitionDef{
Partitioned: cmds[i].Partitioned,
Partition: cmds[i].Partition,
})
}
pro := new(engine.PropertiesDef)
pro.Properties = append(pro.Properties, engine.Property{
Key: SystemRelAttr_Kind,
Value: string(cmds[i].RelKind),
})
pro.Properties = append(pro.Properties, engine.Property{
Key: SystemRelAttr_CreateSQL,
Value: cmds[i].CreateSql,
})
cmds[i].Defs = append(cmds[i].Defs, pro)
// fill columns
if err = fillCreateTable(&idx, &cmds[i], es); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -232,19 +199,55 @@ func genCreateTables(rows [][]any) []CreateTable {
cmds[i].Constraint = row[MO_TABLES_CONSTRAINT_IDX].([]byte)
cmds[i].RelKind = string(row[MO_TABLES_RELKIND_IDX].([]byte))
}

for i := range cmds {
// tae's logic
if len(cmds[i].Comment) > 0 {
cmds[i].Defs = append(cmds[i].Defs, &engine.CommentDef{
Comment: cmds[i].Comment,
})
}
if len(cmds[i].Viewdef) > 0 {
cmds[i].Defs = append(cmds[i].Defs, &engine.ViewDef{
View: cmds[i].Viewdef,
})
}
if len(cmds[i].Constraint) > 0 {
c := new(engine.ConstraintDef)
if err := c.UnmarshalBinary(cmds[i].Constraint); err != nil {
panic(err)
}
cmds[i].Defs = append(cmds[i].Defs, c)
}
if cmds[i].Partitioned > 0 || len(cmds[i].Partition) > 0 {
cmds[i].Defs = append(cmds[i].Defs, &engine.PartitionDef{
Partitioned: cmds[i].Partitioned,
Partition: cmds[i].Partition,
})
}
pro := new(engine.PropertiesDef)
pro.Properties = append(pro.Properties, engine.Property{
Key: SystemRelAttr_Kind,
Value: string(cmds[i].RelKind),
})
pro.Properties = append(pro.Properties, engine.Property{
Key: SystemRelAttr_CreateSQL,
Value: cmds[i].CreateSql,
})
cmds[i].Defs = append(cmds[i].Defs, pro)
}
return cmds
}

func genUpdateConstraint(rows [][]any) []UpdateConstraint {
cmds := make([]UpdateConstraint, len(rows))
for i, row := range rows {
cmds[i].TableId = row[MO_TABLES_REL_ID_IDX].(uint64)
cmds[i].DatabaseId = row[MO_TABLES_RELDATABASE_ID_IDX].(uint64)
cmds[i].TableName = string(row[MO_TABLES_REL_NAME_IDX].([]byte))
cmds[i].DatabaseName = string(row[MO_TABLES_RELDATABASE_IDX].([]byte))
cmds[i].Constraint = row[MO_TABLES_UPDATE_CONSTRAINT].([]byte)
func genUpdateConstraint(rows [][]any) []*api.AlterTableReq {
reqs := make([]*api.AlterTableReq, len(rows))
for _, row := range rows {
did := row[MO_TABLES_RELDATABASE_ID_IDX].(uint64)
tid := row[MO_TABLES_REL_ID_IDX].(uint64)
cstr := row[MO_TABLES_UPDATE_CONSTRAINT].([]byte)
reqs = append(reqs, api.NewUpdateConstraintReq(did, tid, string(cstr)))
}
return cmds
return reqs
}

func genUpdateAltertable(rows [][]any) ([]*api.AlterTableReq, error) {
Expand Down
16 changes: 7 additions & 9 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package catalog

import (
"fmt"
"strings"

"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -402,9 +403,9 @@ type CreateDatabase struct {
Name string
CreateSql string
DatTyp string
Owner uint32
Creator uint32
AccountId uint32
Owner uint32 // roleid
Creator uint32 // userid
AccountId uint32 // tenantid
CreatedTime types.Timestamp
}

Expand All @@ -431,12 +432,9 @@ type CreateTable struct {
Defs []engine.TableDef
}

type UpdateConstraint struct {
DatabaseId uint64
TableId uint64
TableName string
DatabaseName string
Constraint []byte
func (t CreateTable) String() string {
return fmt.Sprintf("{aid-%v,uid-%v,rid-%v}: %d-%s:%d-%s, %q",
t.AccountId, t.Creator, t.Owner, t.DatabaseId, t.DatabaseName, t.TableId, t.Name, t.CreateSql)
}

type DropOrTruncateTable struct {
Expand Down
8 changes: 8 additions & 0 deletions pkg/pb/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ func (m *SyncLogTailResp) UnmarshalBinary(data []byte) error {
return m.Unmarshal(data)
}

func (m *TNStringResponse) MarshalBinary() ([]byte, error) {
return m.Marshal()
}

func (m *TNStringResponse) UnmarshalBinary(data []byte) error {
return m.Unmarshal(data)
}

func (m *PrecommitWriteCmd) MarshalBinary() ([]byte, error) {
return m.Marshal()
}
Expand Down
7 changes: 0 additions & 7 deletions pkg/vm/engine/tae/catalog/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ type BaseEntryImpl[T BaseNode[T]] struct {
*txnbase.MVCCChain[*MVCCNode[T]]
}

func NewReplayBaseEntry[T BaseNode[T]](factory func() T) *BaseEntryImpl[T] {
be := &BaseEntryImpl[T]{
MVCCChain: txnbase.NewMVCCChain(CompareBaseNode[T], NewEmptyMVCCNodeFactory(factory), nil),
}
return be
}

func NewBaseEntry[T BaseNode[T]](factory func() T) *BaseEntryImpl[T] {
return &BaseEntryImpl[T]{
MVCCChain: txnbase.NewMVCCChain(CompareBaseNode[T], NewEmptyMVCCNodeFactory(factory), nil),
Expand Down
33 changes: 13 additions & 20 deletions pkg/vm/engine/tae/catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,36 +105,28 @@ func NewDBEntryWithID(catalog *Catalog, name string, createSql, datTyp string, i
}

func NewSystemDBEntry(catalog *Catalog) *DBEntry {
entry := &DBEntry{
ID: pkgcatalog.MO_CATALOG_ID,
BaseEntryImpl: NewBaseEntry(
func() *EmptyMVCCNode {
return &EmptyMVCCNode{}
}),
catalog: catalog,
DBNode: &DBNode{
name: pkgcatalog.MO_CATALOG,
createSql: "create database " + pkgcatalog.MO_CATALOG,
},
entries: make(map[uint64]*common.GenericDLNode[*TableEntry]),
nameNodes: make(map[string]*nodeList[*TableEntry]),
link: common.NewGenericSortedDList((*TableEntry).Less),
isSys: true,
entry := NewReplayDBEntry()
entry.isSys = true
entry.ID = pkgcatalog.MO_CATALOG_ID
entry.catalog = catalog
entry.DBNode = &DBNode{
name: pkgcatalog.MO_CATALOG,
createSql: "create database " + pkgcatalog.MO_CATALOG,
}
entry.CreateWithTS(types.SystemDBTS, &EmptyMVCCNode{})
return entry
}

func NewReplayDBEntry() *DBEntry {
entry := &DBEntry{
BaseEntryImpl: NewReplayBaseEntry(
func() *EmptyMVCCNode { return &EmptyMVCCNode{} }),
entries: make(map[uint64]*common.GenericDLNode[*TableEntry]),
nameNodes: make(map[string]*nodeList[*TableEntry]),
link: common.NewGenericSortedDList((*TableEntry).Less),
BaseEntryImpl: NewBaseEntry(func() *EmptyMVCCNode { return &EmptyMVCCNode{} }),
entries: make(map[uint64]*common.GenericDLNode[*TableEntry]),
nameNodes: make(map[string]*nodeList[*TableEntry]),
link: common.NewGenericSortedDList((*TableEntry).Less),
}
return entry
}

func (e *DBEntry) GetID() uint64 { return e.ID }
func (e *DBEntry) IsSystemDB() bool { return e.isSys }
func (e *DBEntry) CoarseTableCnt() int {
Expand Down Expand Up @@ -173,6 +165,7 @@ func (e *DBEntry) String() string {
func (e *DBEntry) StringLocked() string {
return e.StringWithlevelLocked(common.PPL1)
}

func (e *DBEntry) StringWithLevel(level common.PPLevel) string {
e.RLock()
defer e.RUnlock()
Expand Down
3 changes: 2 additions & 1 deletion pkg/vm/engine/tae/catalog/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package catalog

import (
"context"
"sync"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -229,7 +230,7 @@ func (txn *mockTxn) CreateDatabase(name, createSql, datTyp string) (handle.Datab
return h, nil
}

func (txn *mockTxn) CreateDatabaseWithID(name, createSql, datTyp string, id uint64) (handle.Database, error) {
func (txn *mockTxn) CreateDatabaseWithID(ctx context.Context, name, createSql, datTyp string, id uint64) (handle.Database, error) {
entry, err := txn.catalog.CreateDBEntryWithID(name, createSql, datTyp, id, txn)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/vm/engine/tae/catalog/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func NewObjectEntryByMetaLocation(

func NewReplayObjectEntry() *ObjectEntry {
e := &ObjectEntry{
BaseEntryImpl: NewReplayBaseEntry(
BaseEntryImpl: NewBaseEntry(
func() *ObjectMVCCNode { return &ObjectMVCCNode{*objectio.NewObjectStats()} }),
}
return e
Expand Down Expand Up @@ -568,6 +568,7 @@ func (entry *ObjectEntry) GetSchemaLocked() *Schema {
// a block can be compacted:
// 1. no uncommited node
// 2. at least one committed node
// Note: Soft deleted nobjects might have in memory deletes to be flushed.
func (entry *ObjectEntry) PrepareCompact() bool {
entry.RLock()
defer entry.RUnlock()
Expand Down
38 changes: 15 additions & 23 deletions pkg/vm/engine/tae/catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,19 @@ func NewTableEntryWithTableId(db *DBEntry, schema *Schema, txnCtx txnif.AsyncTxn
}

func NewSystemTableEntry(db *DBEntry, id uint64, schema *Schema) *TableEntry {
opts := btree.Options{
Degree: 4,
}
e := &TableEntry{
ID: id,
BaseEntryImpl: NewBaseEntry(
func() *TableMVCCNode { return &TableMVCCNode{} }),
db: db,
TableNode: &TableNode{},
link: common.NewGenericSortedDList((*ObjectEntry).Less),
entries: make(map[types.Objectid]*common.GenericDLNode[*ObjectEntry]),
deleteList: btree.NewBTreeGOptions(DeleteEntry.Less, opts),
Stats: common.NewTableCompactStat(),
}
e := NewReplayTableEntry()
e.ID = id
e.db = db

e.TableNode.schema.Store(schema)
e.CreateWithTS(types.SystemDBTS, &TableMVCCNode{Schema: schema})

var sid types.Uuid
if schema.Name == SystemTableSchema.Name {
sid = SystemObject_Table_ID
} else if schema.Name == SystemDBSchema.Name {
if schema.Name == SystemDBSchema.Name {
e.tableData = DefaultTableDataFactory(e) // TODO(aptend): add data handle
sid = SystemObject_DB_ID
} else if schema.Name == SystemTableSchema.Name {
sid = SystemObject_Table_ID
} else if schema.Name == SystemColumnSchema.Name {
sid = SystemObject_Columns_ID
} else {
Expand All @@ -149,12 +141,12 @@ func NewReplayTableEntry() *TableEntry {
Degree: 4,
}
e := &TableEntry{
BaseEntryImpl: NewReplayBaseEntry(
func() *TableMVCCNode { return &TableMVCCNode{} }),
link: common.NewGenericSortedDList((*ObjectEntry).Less),
entries: make(map[types.Objectid]*common.GenericDLNode[*ObjectEntry]),
deleteList: btree.NewBTreeGOptions(DeleteEntry.Less, opts),
Stats: common.NewTableCompactStat(),
BaseEntryImpl: NewBaseEntry(func() *TableMVCCNode { return &TableMVCCNode{} }),
link: common.NewGenericSortedDList((*ObjectEntry).Less),
TableNode: &TableNode{},
entries: make(map[types.Objectid]*common.GenericDLNode[*ObjectEntry]),
deleteList: btree.NewBTreeGOptions(DeleteEntry.Less, opts),
Stats: common.NewTableCompactStat(),
}
return e
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/vm/engine/tae/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package catalog

type EntryState int8

var DefaultTableDataFactory TableDataFactory

const (
ES_Appendable EntryState = iota
ES_NotAppendable
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e
dataFactory := tables.NewDataFactory(
db.Runtime, db.Dir,
)
catalog.DefaultTableDataFactory = dataFactory.MakeTableFactory()
if db.Catalog, err = catalog.OpenCatalog(db.usageMemo); err != nil {
return
}
Expand Down
Loading

0 comments on commit aea2924

Please sign in to comment.