Skip to content

Commit

Permalink
support subscription in mo table size/rows. (#20785)
Browse files Browse the repository at this point in the history
mo table size/rows currently do not support subscription db and it never was.

Approved by: @qingxinhome, @daviszhen, @aunjgr, @heni02, @m-schen
  • Loading branch information
gouhongshen authored Dec 20, 2024
1 parent f491bcd commit 599d364
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 12 deletions.
17 changes: 14 additions & 3 deletions pkg/frontend/back_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,11 +1140,12 @@ func (sh *SqlHelper) GetSubscriptionMeta(dbName string) (*plan.SubscriptionMeta,
return sh.ses.txnCompileCtx.GetSubscriptionMeta(dbName, nil)
}

// Made for sequence func. nextval, setval.
func (sh *SqlHelper) ExecSql(sql string) (ret [][]interface{}, err error) {
func (sh *SqlHelper) execSql(
ctx context.Context,
sql string,
) (ret [][]interface{}, err error) {
var erArray []ExecResult

ctx := sh.ses.txnCompileCtx.execCtx.reqCtx
/*
if we run the transaction statement (BEGIN, ect) here , it creates an independent transaction.
if we do not run the transaction statement (BEGIN, ect) here, it runs the sql in the share transaction
Expand All @@ -1171,3 +1172,13 @@ func (sh *SqlHelper) ExecSql(sql string) (ret [][]interface{}, err error) {

return erArray[0].(*MysqlResultSet).Data, nil
}

// Made for sequence func. nextval, setval.
func (sh *SqlHelper) ExecSql(sql string) (ret [][]interface{}, err error) {
ctx := sh.ses.txnCompileCtx.execCtx.reqCtx
return sh.execSql(ctx, sql)
}

func (sh *SqlHelper) ExecSqlWithCtx(ctx context.Context, sql string) ([][]interface{}, error) {
return sh.execSql(ctx, sql)
}
182 changes: 173 additions & 9 deletions pkg/sql/plan/function/func_mo.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,113 @@ type GetMoTableSizeRowsFuncType = func() func(
var GetMoTableSizeFunc atomic.Pointer[GetMoTableSizeRowsFuncType]
var GetMoTableRowsFunc atomic.Pointer[GetMoTableSizeRowsFuncType]

type subscription struct {
valid bool

oriAccId uint64
oriDatabaseId uint64
oriTableId uint64

oriTableName string
oriDatabaseName string
}

func (s subscription) String() string {
return fmt.Sprintf("valid: %v, oriAcc(%d), oriDatabase(%d-%s), oriTable(%d-%s)",
s.valid,
s.oriAccId,
s.oriDatabaseId,
s.oriDatabaseName,
s.oriTableId,
s.oriTableName)
}

func isSubscribedTable(
proc *process.Process,
reqAcc uint32,
db engine.Database,
dbName, tblName string,
) (sub subscription, err error) {

var (
sql string
ret [][]interface{}
meta *plan.SubscriptionMeta
)

if db.IsSubscription(proc.Ctx) {
defer func() {
if err != nil {
sub.valid = false

metaInfo := ""
if meta != nil {
metaInfo = fmt.Sprintf("ACC(%s,%d)-DB(%s)-TBLS(%s)",
meta.AccountName, meta.AccountId, meta.DbName, meta.Tables)
}

logutil.Error("MO_TABLE_SIZE/ROWS",
zap.String("source", "isSubscribedTable"),
zap.Error(err),
zap.String("sub meta", metaInfo),
zap.Uint32("request acc", reqAcc),
zap.String("db name", dbName),
zap.String("tbl name", tblName),
zap.String("subscription", sub.String()),
zap.String("sql", sql),
)
}
}()

meta, err = proc.GetSessionInfo().SqlHelper.GetSubscriptionMeta(dbName)
if err != nil {
return sub,
moerr.NewInternalErrorNoCtx(fmt.Sprintf("get subscription meta failed, err: %v", err))
}

if meta.Tables != pubsub.TableAll && !strings.Contains(meta.Tables, tblName) {
return sub, moerr.NewInternalErrorNoCtx("no such subscribed table")
}

// check passed, get acc, db, tbl info
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx = defines.AttachAccountId(ctx, uint32(sysAccountID))
defer cancel()

sql = fmt.Sprintf(`
select
reldatabase_id, rel_id
from
mo_catalog.mo_tables
where
account_id = %d and reldatabase = '%s' and relname = '%s';`,
meta.AccountId, meta.DbName, tblName)

ret, err = proc.GetSessionInfo().SqlHelper.ExecSqlWithCtx(ctx, sql)
if err != nil {
return sub,
moerr.NewInternalErrorNoCtx(fmt.Sprintf("exec get subscribed tbl info sql failed, err: %v", err))
}

if len(ret) != 1 {
return sub,
moerr.NewInternalErrorNoCtx(fmt.Sprintf("get the subscribed tbl info empty: %s", tblName))
}

sub.valid = true
sub.oriAccId = uint64(meta.AccountId)
sub.oriDatabaseId = ret[0][0].(uint64)
sub.oriTableId = ret[0][1].(uint64)
sub.oriTableName = tblName
sub.oriDatabaseName = meta.DbName

return sub, nil
}

sub.valid = false
return sub, nil
}

func MoTableSizeRowsHelper(
iVecs []*vector.Vector,
result vector.FunctionResultWrapper,
Expand Down Expand Up @@ -283,17 +390,28 @@ func MoTableSizeRowsHelper(
return err
}

if rel, err = db.Relation(proc.Ctx, tblName, nil); err != nil {
if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) {
return moerr.NewInternalErrorNoCtxf("tbl not exist: %s-%s(%s)",
dbName, tblName, "OkExpectedEOB")
}
var sub subscription
if sub, err = isSubscribedTable(
proc, accountId, db, dbName, tblName); err != nil {
return err
}
} else if sub.valid {
// is subscription
accIds = append(accIds, sub.oriAccId)
dbIds = append(dbIds, sub.oriDatabaseId)
tblIds = append(tblIds, sub.oriTableId)
} else {
if rel, err = db.Relation(proc.Ctx, tblName, nil); err != nil {
if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) {
return moerr.NewInternalErrorNoCtxf("tbl not exist: %s-%s(%s)",
dbName, tblName, "OkExpectedEOB")
}
return err
}

accIds = append(accIds, uint64(accountId))
dbIds = append(dbIds, uint64(rel.GetDBID(proc.Ctx)))
tblIds = append(tblIds, uint64(rel.GetTableID(proc.Ctx)))
accIds = append(accIds, uint64(accountId))
dbIds = append(dbIds, uint64(rel.GetDBID(proc.Ctx)))
tblIds = append(tblIds, uint64(rel.GetTableID(proc.Ctx)))
}
}

ret, err = (*executor.Load())()(
Expand Down Expand Up @@ -399,6 +517,29 @@ func MoTableRowsOld(ivecs []*vector.Vector, result vector.FunctionResultWrapper,
}
return err
}

var accId uint32
accId, err = defines.GetAccountId(foolCtx)
if err != nil {
return err
}

var sub subscription
if sub, err = isSubscribedTable(
proc, accId, dbo, dbStr, tblStr); err != nil {
logutil.Error("MoTableRowsOld",
zap.String("source", "isSubscribeTable"),
zap.Error(err))
return err
} else if sub.valid {
// subscription
foolCtx = defines.AttachAccountId(foolCtx, uint32(sub.oriAccId))
dbo, err = e.Database(foolCtx, sub.oriDatabaseName, txn)
if err != nil {
return err
}
}

rel, err = dbo.Relation(foolCtx, tblStr, nil)
if err != nil {
return err
Expand Down Expand Up @@ -518,6 +659,29 @@ func MoTableSizeOld(ivecs []*vector.Vector, result vector.FunctionResultWrapper,
}
return err
}

var accId uint32
accId, err = defines.GetAccountId(foolCtx)
if err != nil {
return err
}

var sub subscription
if sub, err = isSubscribedTable(
proc, accId, dbo, dbStr, tblStr); err != nil {
logutil.Error("MoTableSizeOld",
zap.String("source", "isSubscribeTable"),
zap.Error(err))
return err
} else if sub.valid {
// subscription
foolCtx = defines.AttachAccountId(foolCtx, uint32(sub.oriAccId))
dbo, err = e.Database(foolCtx, sub.oriDatabaseName, txn)
if err != nil {
return err
}
}

rel, err = dbo.Relation(foolCtx, tblStr, nil)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/process/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ type Process struct {
type sqlHelper interface {
GetCompilerContext() any
ExecSql(string) ([][]interface{}, error)
ExecSqlWithCtx(context.Context, string) ([][]interface{}, error)
GetSubscriptionMeta(string) (sub *plan.SubscriptionMeta, err error)
}

Expand Down
67 changes: 67 additions & 0 deletions test/distributed/cases/function/func_mo.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
drop database if exists testdb;
create database testdb;
use testdb;
create account acc admin_name "root" identified by "111";
create publication pub1 database testdb account all;
create table t1 (a int);
create table t2 (a int);
create table t3 (a int);
insert into t1 select * from generate_series(1, 1000)g;
insert into t2 select * from generate_series(1, 1000)g;
insert into t3 select * from generate_series(1, 1000)g;
drop database if exists testdb_sub;
create database testdb_sub from sys publication pub1;
drop database if exists testdb_nor;
create database testdb_nor;
use testdb_nor;
create table t1 (a int);
create table t2 (a int);
create table t3 (a int);
insert into t1 select * from generate_series(1, 1001)g;
insert into t2 select * from generate_series(1, 1001)g;
insert into t3 select * from generate_series(1, 1001)g;
create table tmp(dbName varchar, tblName varchar);
insert into tmp values ("testdb_nor", "t1"), ("testdb_nor", "t2"), ("testdb_nor", "t3");
insert into tmp values ("testdb_sub", "t1"), ("testdb_sub", "t2"), ("testdb_sub", "t3");
set mo_table_stats.force_update = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
mo_table_rows(dbName, tblName)
1001
1001
1001
1000
1000
1000
insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
internal error: get the subscribed tbl info empty: t4
set mo_table_stats.force_update = no;
delete from tmp where dbName = "testdb_sub" and tblName = "t4";
set mo_table_stats.use_old_impl = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
mo_table_rows(dbName, tblName)
1001
1001
1001
1000
1000
1000
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
mo_table_size(dbName, tblName)
36036
36036
36036
36000
36000
36000
insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
internal error: get the subscribed tbl info empty: t4
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
internal error: get the subscribed tbl info empty: t4
set mo_table_stats.use_old_impl = no;
drop database testdb_nor;
drop database testdb_sub;
drop account acc;
drop publication pub1;
drop database testdb;
64 changes: 64 additions & 0 deletions test/distributed/cases/function/func_mo.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
drop database if exists testdb;
create database testdb;
use testdb;

create account acc admin_name "root" identified by "111";
create publication pub1 database testdb account all;

create table t1 (a int);
create table t2 (a int);
create table t3 (a int);

insert into t1 select * from generate_series(1, 1000)g;
insert into t2 select * from generate_series(1, 1000)g;
insert into t3 select * from generate_series(1, 1000)g;

-- @session:id=2&user=acc:root&password=111
drop database if exists testdb_sub;
create database testdb_sub from sys publication pub1;

drop database if exists testdb_nor;
create database testdb_nor;
use testdb_nor;

create table t1 (a int);
create table t2 (a int);
create table t3 (a int);

insert into t1 select * from generate_series(1, 1001)g;
insert into t2 select * from generate_series(1, 1001)g;
insert into t3 select * from generate_series(1, 1001)g;

create table tmp(dbName varchar, tblName varchar);
insert into tmp values ("testdb_nor", "t1"), ("testdb_nor", "t2"), ("testdb_nor", "t3");
insert into tmp values ("testdb_sub", "t1"), ("testdb_sub", "t2"), ("testdb_sub", "t3");

set mo_table_stats.force_update = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

set mo_table_stats.force_update = no;
delete from tmp where dbName = "testdb_sub" and tblName = "t4";

set mo_table_stats.use_old_impl = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
-- @ignore:0
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
-- @ignore:0
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

set mo_table_stats.use_old_impl = no;

drop database testdb_nor;
drop database testdb_sub;

-- @session

drop account acc;
drop publication pub1;
drop database testdb;

0 comments on commit 599d364

Please sign in to comment.