*: refactor partition definition radondb#480
[summary]
partition_options:
    /* empty */
  | GLOBAL
  | SINGLE
  | DISTRIBUTED BY (backend-name)
  | PARTITION BY HASH(shard-key) {PARTITIONS num}
  | PARTITION BY LIST(shard-key)(PARTITION backend VALUES IN (value_list),...)

[test case]
src/proxy/ddl_test.go
src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ddl_test.go

[patch codecov]
src/proxy/ddl.go 94.7%
src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ast_funcs.go 95.5%
src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/sql.go 92.3%
zhyass committed Jun 5, 2020
1 parent a7e8c48 commit 92200c2
Showing 13 changed files with 2,128 additions and 2,203 deletions.
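
The partition_options grammar in the summary above maps onto the CREATE TABLE forms exercised by the tests in this diff. As a quick reference, the statements below illustrate one form per alternative. They are not taken from the commit; the table and backend names (t1..t6, backend1, node1, node2) are invented and only mirror the style of src/proxy/ddl_test.go.

package examples

// Illustrative DDL, one statement per partition_options alternative.
var partitionOptionExamples = []string{
	// empty: no partition clause; the table is hash-sharded on its
	// primary/unique key column (the PartOptNormal branch in ddl.go below).
	"create table t1(a int primary key, b int)",

	// GLOBAL: the table is created on every backend.
	"create table t2(a int, b int) global",

	// SINGLE: the table is created on a single backend.
	"create table t3(a int, b int) single",

	// DISTRIBUTED BY (backend-name): a single table pinned to the named backend.
	"create table t4(a int, b int) distributed by (backend1)",

	// PARTITION BY HASH(shard-key) [PARTITIONS num].
	"create table t5(a int, b int) partition by hash(a) partitions 8",

	// PARTITION BY LIST(shard-key)(PARTITION backend VALUES IN (value_list), ...).
	"create table t6(a int primary key, b int) partition by list(a)(" +
		"partition node1 values in (1, 2)," +
		"partition node2 values in (3, 4))",
}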
2 changes: 1 addition & 1 deletion src/planner/builder/sqlparser_test.go
@@ -263,7 +263,7 @@ func TestSQLShardKey(t *testing.T) {

ddl := node.(*sqlparser.DDL)
want := "col1"
got := ddl.PartitionName
got := ddl.PartitionOption.(*sqlparser.PartOptHash).Name
assert.Equal(t, want, got)
}
}
133 changes: 71 additions & 62 deletions src/proxy/ddl.go
@@ -39,54 +39,19 @@ func checkEngine(ddl *sqlparser.DDL) error {
}

func tryGetShardKey(ddl *sqlparser.DDL) (string, error) {
shardKey := ddl.PartitionName
table := ddl.Table.Name.String()

if "dual" == table {
return "", fmt.Errorf("spanner.ddl.check.create.table[%s].error:not support", table)
}

if shardKey != "" {
shardKeyOK := false
constraintCheckOK := true
// shardKey check and constraint check in column definition
for _, col := range ddl.TableSpec.Columns {
colName := col.Name.String()
if colName == shardKey {
shardKeyOK = true
} else {
if col.Type.PrimaryKeyOpt == sqlparser.ColKeyPrimary ||
col.Type.UniqueKeyOpt == sqlparser.ColKeyUniqueKey {
constraintCheckOK = false
}
}
}

if !shardKeyOK {
return "", fmt.Errorf("Sharding Key column '%s' doesn't exist in table", shardKey)
}
if !constraintCheckOK {
return "", fmt.Errorf("The unique/primary constraint should be only defined on the sharding key column[%s]", shardKey)
}

// constraint check in index definition
for _, index := range ddl.TableSpec.Indexes {
constraintCheckOK = false
if index.Unique || index.Primary {
for _, colIdx := range index.Opts.Columns {
colName := colIdx.Column.String()
if colName == shardKey {
constraintCheckOK = true
break
}
}
if !constraintCheckOK {
return "", fmt.Errorf("The unique/primary constraint should be only defined on the sharding key column[%s]", shardKey)
}
}
}
return shardKey, nil
} else {
switch partOpt := ddl.PartitionOption.(type) {
case *sqlparser.PartOptHash:
shardKey := partOpt.Name
return shardKey, checkShardKey(ddl, shardKey)
case *sqlparser.PartOptList:
shardKey := partOpt.Name
return shardKey, checkShardKey(ddl, shardKey)
case *sqlparser.PartOptNormal:
for _, col := range ddl.TableSpec.Columns {
colName := col.Name.String()
if col.Type.PrimaryKeyOpt == sqlparser.ColKeyPrimary ||
@@ -102,10 +67,54 @@ func tryGetShardKey(ddl *sqlparser.DDL) (string, error) {
}
}
}
case *sqlparser.PartOptGlobal, *sqlparser.PartOptSingle:
return "", nil
}
return "", fmt.Errorf("The unique/primary constraint shoule be defined or add 'PARTITION BY HASH' to mandatory indication")
}

func checkShardKey(ddl *sqlparser.DDL, shardKey string) error {
shardKeyOK := false
constraintCheckOK := true
// shardKey check and constraint check in column definition
for _, col := range ddl.TableSpec.Columns {
colName := col.Name.String()
if colName == shardKey {
shardKeyOK = true
} else {
if col.Type.PrimaryKeyOpt == sqlparser.ColKeyPrimary ||
col.Type.UniqueKeyOpt == sqlparser.ColKeyUniqueKey {
constraintCheckOK = false
}
}
}

if !shardKeyOK {
return fmt.Errorf("Sharding Key column '%s' doesn't exist in table", shardKey)
}
if !constraintCheckOK {
return fmt.Errorf("The unique/primary constraint should be only defined on the sharding key column[%s]", shardKey)
}

// constraint check in index definition
for _, index := range ddl.TableSpec.Indexes {
constraintCheckOK = false
if index.Unique || index.Primary {
for _, colIdx := range index.Opts.Columns {
colName := colIdx.Column.String()
if colName == shardKey {
constraintCheckOK = true
break
}
}
if !constraintCheckOK {
return fmt.Errorf("The unique/primary constraint should be only defined on the sharding key column[%s]", shardKey)
}
}
}
return nil
}

func checkTableExists(database string, table string, router *router.Router) bool {
tblList := router.Tables()
tables, ok := tblList[database]
@@ -200,10 +209,8 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s
}
return qr, nil
case sqlparser.CreateTableStr:
var err error
table := ddl.Table.Name.String()
backends := scatter.Backends()
shardKey := ddl.PartitionName
tableType := router.TableTypeUnknown

if err := route.CheckDatabase(database); err != nil {
@@ -220,6 +227,11 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s
return nil, err
}

shardKey, err := tryGetShardKey(ddl)
if err != nil {
return nil, err
}

autoinc, err := autoincrement.GetAutoIncrement(node)
if err != nil {
return nil, err
@@ -228,40 +240,37 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s
AutoIncrement: autoinc,
}

switch ddl.TableSpec.Options.Type {
case sqlparser.PartitionTableHash, sqlparser.NormalTableType:
if shardKey, err = tryGetShardKey(ddl); err != nil {
return nil, err
}

switch partOpt := ddl.PartitionOption.(type) {
case *sqlparser.PartOptHash:
tableType = router.TableTypePartitionHash
if err := route.CreateHashTable(database, table, shardKey, tableType, backends, ddl.PartitionNum, extra); err != nil {
if err := route.CreateHashTable(database, table, shardKey, tableType, backends, partOpt.PartitionNum, extra); err != nil {
return nil, err
}
case sqlparser.PartitionTableList:
if shardKey, err = tryGetShardKey(ddl); err != nil {
case *sqlparser.PartOptNormal:
tableType = router.TableTypePartitionHash
if err := route.CreateHashTable(database, table, shardKey, tableType, backends, nil, extra); err != nil {
return nil, err
}

case *sqlparser.PartOptList:
tableType = router.TableTypePartitionList
if err := route.CreateListTable(database, table, shardKey, tableType, ddl.PartitionOptions, extra); err != nil {
if err := route.CreateListTable(database, table, shardKey, tableType, partOpt.PartDefs, extra); err != nil {
return nil, err
}
case sqlparser.GlobalTableType:
case *sqlparser.PartOptGlobal:
tableType = router.TableTypeGlobal
if err := route.CreateNonPartTable(database, table, tableType, backends, extra); err != nil {
return nil, err
}
case sqlparser.SingleTableType:
case *sqlparser.PartOptSingle:
tableType = router.TableTypeSingle
if ddl.BackendName != "" {
if partOpt.BackendName != "" {
// TODO(andy): distributed by a list of backends
if isExist := scatter.CheckBackend(ddl.BackendName); !isExist {
if isExist := scatter.CheckBackend(partOpt.BackendName); !isExist {
log.Error("spanner.ddl.execute[%v].backend.doesn't.exist", query)
return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", ddl.BackendName)
return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", partOpt.BackendName)
}

assignedBackends := []string{ddl.BackendName}
assignedBackends := []string{partOpt.BackendName}
if err := route.CreateNonPartTable(database, table, tableType, assignedBackends, extra); err != nil {
return nil, err
}
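
For readers skimming the diff: the refactor replaces the old flat DDL fields (PartitionName, PartitionNum, PartitionOptions, BackendName) with a single ddl.PartitionOption value that tryGetShardKey and handleDDL type-switch on, as above. The sketch below is a simplified, self-contained mirror of the new AST shapes as inferred from this diff; it is not the real go-mysqlstack/sqlparser code, and the fields marked as assumptions use stand-in types.

package sketch

// Simplified mirrors of the partition-option nodes introduced by this refactor.
type PartitionOption interface{ isPartitionOption() }

type PartOptNormal struct{}                     // no partition clause
type PartOptGlobal struct{}                     // GLOBAL
type PartOptSingle struct{ BackendName string } // SINGLE | DISTRIBUTED BY (backend)
type PartOptHash struct {                       // PARTITION BY HASH(col) [PARTITIONS n]
	Name         string
	PartitionNum string // assumption: stands in for the real SQL value node
}
type PartOptList struct { // PARTITION BY LIST(col)(PARTITION backend VALUES IN (...), ...)
	Name     string
	PartDefs []*PartitionDefinition
}
type PartitionDefinition struct {
	Backend string
	Row     []string // assumption: stands in for sqlparser.ValTuple
}

func (*PartOptNormal) isPartitionOption() {}
func (*PartOptGlobal) isPartitionOption() {}
func (*PartOptSingle) isPartitionOption() {}
func (*PartOptHash) isPartitionOption()   {}
func (*PartOptList) isPartitionOption()   {}

// tableTypeOf mirrors the switch in handleDDL: each option selects a router table type.
func tableTypeOf(opt PartitionOption) string {
	switch opt.(type) {
	case *PartOptHash, *PartOptNormal:
		return "hash"
	case *PartOptList:
		return "list"
	case *PartOptGlobal:
		return "global"
	case *PartOptSingle:
		return "single"
	default:
		return "unknown"
	}
}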
8 changes: 5 additions & 3 deletions src/proxy/ddl_test.go
@@ -259,8 +259,8 @@ func TestProxyDDLTable(t *testing.T) {
client, err := driver.NewConn("mock", "mock", address, "test", "utf8")
assert.Nil(t, err)
querys := []string{
"create table if not exists t4(a int, b int) SINGLE comment 'comment test' charset='utf8'",
"create table if not exists t5(a int, b int) Global default charset utf8",
"create table if not exists t4(a int, b int) comment 'comment test' charset='utf8' SINGLE",
"create table if not exists t5(a int, b int) default charset utf8 Global",
"create table if not exists t6(a int key, b int) default character set='utf8' comment 'test' engine innodb",
}
for _, query := range querys {
@@ -779,7 +779,7 @@ func TestProxyDDLCreateTable(t *testing.T) {
"create table t7(a int collate utf8_bin Collate 'utf8_bin' collate \"utf8_bin\") partition by hash(a)",
"create table t8(a int, b int) partition by hash(a)",
"create table t9(a int, b timestamp(5) on update current_timestamp(5) column_format fixed column_format default column_format dynamic) partition by hash(a)",
"create table t10(a int column_format fixed column_format default column_format dynamic) partition by hash(a) comment='comment option' engine=tokudb default charset='utf8' avg_row_length=123 checksum=1 collate='utf8_bin' compression='lz4' connection='id' data directory='/data' index directory='/index' delay_key_write=1 encryption='n' insert_method=First key_block_size=1 max_rows=3 min_rows=2 pack_keys=default password='pwd' row_format=dynamic stats_auto_recalc=1 stats_persistent=default stats_sample_pages=65535 tablespace=storage",
"create table t10(a int column_format fixed column_format default column_format dynamic) comment='comment option' engine=tokudb default charset='utf8' avg_row_length=123 checksum=1 collate='utf8_bin' compression='lz4' connection='id' data directory='/data' index directory='/index' delay_key_write=1 encryption='n' insert_method=First key_block_size=1 max_rows=3 min_rows=2 pack_keys=default password='pwd' row_format=dynamic stats_auto_recalc=1 stats_persistent=default stats_sample_pages=65535 tablespace=storage partition by hash(a)",
}

for _, query := range querys {
@@ -1181,6 +1181,7 @@ func TestProxyDDLGlobalSingleNormalList(t *testing.T) {
"CREATE TABLE t5(a int ,b int, primary key(a))",
"CREATE TABLE t6(a int ,b int, primary key(a, b))",
"create table t7(a int, b int unique)",
"create table `t8/t/t`(a int,b int, primary key(a))",

// partition list
"CREATE TABLE l(a int primary key,b int ) partition by list(a)(" +
@@ -1206,6 +1207,7 @@
"",
"The unique/primary constraint shoule be defined or add 'PARTITION BY HASH' to mandatory indication (errno 1105) (sqlstate HY000)",
"",
"invalid.table.name.currently.not.support.tablename[t8/t/t].contains.with.char:'/' or space ' ' (errno 1105) (sqlstate HY000)",

// partition list
"",
4 changes: 2 additions & 2 deletions src/router/compute.go
@@ -147,7 +147,7 @@ func (r *Router) SingleUniform(table string, backends []string) (*config.TableCo
}, nil
}

func listMergePartition(partitionDef sqlparser.PartitionOptions) (map[string]string, error) {
func listMergePartition(partitionDef sqlparser.PartitionDefinitions) (map[string]string, error) {
partitionMap := make(map[string]string)
for _, onePart := range partitionDef {
row := onePart.Row
@@ -167,7 +167,7 @@ func (r *Router) ListUniform(table string, shardkey string, partitionDef sqlpars
}

// ListUniform used to uniform the list table to backends.
func (r *Router) ListUniform(table string, shardkey string, partitionDef sqlparser.PartitionOptions) (*config.TableConfig, error) {
func (r *Router) ListUniform(table string, shardkey string, partitionDef sqlparser.PartitionDefinitions) (*config.TableConfig, error) {
if table == "" {
return nil, errors.New("table.cant.be.null")
}
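
A note on listMergePartition above, whose body is collapsed in this view: judging from its signature and the "different backends with the same list value" case in compute_test.go below, it appears to flatten the per-backend VALUES IN lists into a single value-to-backend map and to reject a value claimed by two different backends. The sketch below is a minimal illustration under those assumptions, with simplified stand-in types and an illustrative error message; it is not the actual implementation.

package sketch

import "fmt"

// partitionDefinition is a simplified stand-in for *sqlparser.PartitionDefinition.
type partitionDefinition struct {
	Backend string
	Row     []string // stands in for sqlparser.ValTuple
}

// mergeListPartitions builds value -> backend, erroring on conflicting assignments.
func mergeListPartitions(defs []*partitionDefinition) (map[string]string, error) {
	partitionMap := make(map[string]string)
	for _, onePart := range defs {
		for _, val := range onePart.Row {
			if backend, ok := partitionMap[val]; ok && backend != onePart.Backend {
				// Illustrative error; the real message differs.
				return nil, fmt.Errorf("list value %q assigned to both %q and %q", val, backend, onePart.Backend)
			}
			partitionMap[val] = onePart.Backend
		}
	}
	return partitionMap, nil
}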
12 changes: 6 additions & 6 deletions src/router/compute_test.go
@@ -381,20 +381,20 @@ func TestRouterComputeListError(t *testing.T) {
// Shardkey is NULL.
{
assert.NotNil(t, router)
_, err := router.ListUniform("t1", "", sqlparser.PartitionOptions{})
_, err := router.ListUniform("t1", "", sqlparser.PartitionDefinitions{})
assert.NotNil(t, err)
}

// Table is NULL.
{
assert.NotNil(t, router)
_, err := router.ListUniform("", "i", sqlparser.PartitionOptions{})
_, err := router.ListUniform("", "i", sqlparser.PartitionDefinitions{})
assert.NotNil(t, err)
}

// different backends with the same list value.
{
partitionDef := sqlparser.PartitionOptions{
partitionDef := sqlparser.PartitionDefinitions{
&sqlparser.PartitionDefinition{
Backend: "node1",
Row: sqlparser.ValTuple{sqlparser.NewStrVal([]byte("1"))},
Expand All @@ -410,10 +410,10 @@ func TestRouterComputeListError(t *testing.T) {
assert.NotNil(t, err)
}

// empty PartitionOptions
// empty PartitionDefinitions
{
assert.NotNil(t, router)
_, err := router.ListUniform("t1", "i", sqlparser.PartitionOptions{})
_, err := router.ListUniform("t1", "i", sqlparser.PartitionDefinitions{})
assert.NotNil(t, err)
}
}
@@ -449,7 +449,7 @@ func TestRouterComputeList(t *testing.T) {
defer cleanup()
assert.NotNil(t, router)

partitionDef := sqlparser.PartitionOptions{
partitionDef := sqlparser.PartitionDefinitions{
&sqlparser.PartitionDefinition{
Backend: "node1",
Row: sqlparser.ValTuple{sqlparser.NewStrVal([]byte("2"))},
2 changes: 1 addition & 1 deletion src/router/frm.go
@@ -284,7 +284,7 @@ func (r *Router) CreateHashTable(db, table, shardKey string, tableType string, b

// CreateListTable used to add a list table to router and flush the schema to disk.
func (r *Router) CreateListTable(db, table, shardKey string, tableType string,
partitionDef sqlparser.PartitionOptions, extra *Extra) error {
partitionDef sqlparser.PartitionDefinitions, extra *Extra) error {
r.mu.Lock()
defer r.mu.Unlock()

6 changes: 3 additions & 3 deletions src/router/frm_test.go
@@ -101,7 +101,7 @@ func TestFrmTable(t *testing.T) {

// Add list table.
{
partitionDef := sqlparser.PartitionOptions{
partitionDef := sqlparser.PartitionDefinitions{
&sqlparser.PartitionDefinition{
Backend: "node1",
Row: sqlparser.ValTuple{sqlparser.NewStrVal([]byte("2"))},
@@ -547,7 +547,7 @@ func TestFrmTableCreateListTable(t *testing.T) {

// Add list table.
{
partitionDef := sqlparser.PartitionOptions{
partitionDef := sqlparser.PartitionDefinitions{
&sqlparser.PartitionDefinition{
Backend: "node1",
Row: sqlparser.ValTuple{sqlparser.NewStrVal([]byte("2"))},
@@ -567,7 +567,7 @@
err = router.CreateListTable("test", "l", "", TableTypePartitionHash, partitionDef, nil)
assert.NotNil(t, err)

err = router.CreateListTable("test", "l", "id", TableTypePartitionList, sqlparser.PartitionOptions{}, nil)
err = router.CreateListTable("test", "l", "id", TableTypePartitionList, sqlparser.PartitionDefinitions{}, nil)
assert.NotNil(t, err)

err = router.CreateListTable("test", "l", "id", TableTypePartitionList, partitionDef, &Extra{&config.AutoIncrement{"id"}})
8 changes: 1 addition & 7 deletions src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ast.go

Some generated files are not rendered by default.
