From 155217b59cea3c0e3689afa6825160f8ad0c8828 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 6 Feb 2024 20:14:39 +0800 Subject: [PATCH 01/14] infoschema: split InfoSchemaMisc from InfoSchema interface --- pkg/infoschema/builder.go | 16 +++++++------ pkg/infoschema/infoschema.go | 44 ++++++++++++++++++++---------------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index 067ad24c04c83..445348609f802 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -531,7 +531,7 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 // TODO: Check how this would work with ADD/REMOVE Partitioning, // which may have AutoID not connected to tableID // TODO: can there be _tidb_rowid AutoID per partition? - oldAllocs, _ := b.is.AllocByID(oldTableID) + oldAllocs, _ := allocByID(b.is, oldTableID) allocs = filterAllocators(diff, oldAllocs) } @@ -1149,12 +1149,14 @@ func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error)) *B return &Builder{ Requirement: r, is: &infoSchema{ - schemaMap: map[string]*schemaTables{}, - policyMap: map[string]*model.PolicyInfo{}, - resourceGroupMap: map[string]*model.ResourceGroupInfo{}, - ruleBundleMap: map[int64]*placement.Bundle{}, - sortedTablesBuckets: make([]sortedTables, bucketCount), - referredForeignKeyMap: make(map[SchemaAndTableName][]*model.ReferredFKInfo), + infoSchemaMisc: infoSchemaMisc{ + policyMap: map[string]*model.PolicyInfo{}, + resourceGroupMap: map[string]*model.ResourceGroupInfo{}, + ruleBundleMap: map[int64]*placement.Bundle{}, + referredForeignKeyMap: make(map[SchemaAndTableName][]*model.ReferredFKInfo), + }, + schemaMap: map[string]*schemaTables{}, + sortedTablesBuckets: make([]sortedTables, bucketCount), }, dirtyDB: make(map[string]bool), factory: factory, diff --git a/pkg/infoschema/infoschema.go b/pkg/infoschema/infoschema.go index 3022f972b2d00..17540cf22afe3 100644 --- 
a/pkg/infoschema/infoschema.go +++ b/pkg/infoschema/infoschema.go @@ -42,15 +42,17 @@ type InfoSchema interface { TableExists(schema, table model.CIStr) bool SchemaByID(id int64) (*model.DBInfo, bool) SchemaByTable(tableInfo *model.TableInfo) (*model.DBInfo, bool) - PolicyByName(name model.CIStr) (*model.PolicyInfo, bool) - ResourceGroupByName(name model.CIStr) (*model.ResourceGroupInfo, bool) TableByID(id int64) (table.Table, bool) - AllocByID(id int64) (autoid.Allocators, bool) AllSchemas() []*model.DBInfo - Clone() (result []*model.DBInfo) SchemaTables(schema model.CIStr) []table.Table SchemaMetaVersion() int64 FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition) + InfoSchemaMisc +} + +type InfoSchemaMisc interface { + PolicyByName(name model.CIStr) (*model.PolicyInfo, bool) + ResourceGroupByName(name model.CIStr) (*model.ResourceGroupInfo, bool) // PlacementBundleByPhysicalTableID is used to get a rule bundle. PlacementBundleByPhysicalTableID(id int64) (*placement.Bundle, bool) // AllPlacementBundles is used to get all placement bundles @@ -85,6 +87,17 @@ type schemaTables struct { const bucketCount = 512 type infoSchema struct { + infoSchemaMisc + schemaMap map[string]*schemaTables + + // sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount). + sortedTablesBuckets []sortedTables + + // schemaMetaVersion is the version of schema, and we should check version when change schema. + schemaMetaVersion int64 +} + +type infoSchemaMisc struct { // ruleBundleMap stores all placement rules ruleBundleMap map[int64]*placement.Bundle @@ -96,17 +109,9 @@ type infoSchema struct { resourceGroupMutex sync.RWMutex resourceGroupMap map[string]*model.ResourceGroupInfo - schemaMap map[string]*schemaTables - - // sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount). 
- sortedTablesBuckets []sortedTables - // temporaryTables stores the temporary table ids temporaryTableIDs map[int64]struct{} - // schemaMetaVersion is the version of schema, and we should check version when change schema. - schemaMetaVersion int64 - // referredForeignKeyMap records all table's ReferredFKInfo. // referredSchemaAndTableName => child SchemaAndTableAndForeignKeyName => *model.ReferredFKInfo referredForeignKeyMap map[SchemaAndTableName][]*model.ReferredFKInfo @@ -283,7 +288,8 @@ func (is *infoSchema) TableByID(id int64) (val table.Table, ok bool) { return slice[idx], true } -func (is *infoSchema) AllocByID(id int64) (autoid.Allocators, bool) { +// AllocByID returns the Allocators of a table. +func allocByID(is *infoSchema, id int64) (autoid.Allocators, bool) { tbl, ok := is.TableByID(id) if !ok { return autoid.Allocators{}, false @@ -342,12 +348,12 @@ func (is *infoSchema) HasTemporaryTable() bool { return len(is.temporaryTableIDs) != 0 } -func (is *infoSchema) Clone() (result []*model.DBInfo) { - for _, v := range is.schemaMap { - result = append(result, v.dbInfo.Clone()) - } - return -} +// func (is *infoSchema) Clone() (result []*model.DBInfo) { +// for _, v := range is.schemaMap { +// result = append(result, v.dbInfo.Clone()) +// } +// return +// } // GetSequenceByName gets the sequence by name. 
func GetSequenceByName(is InfoSchema, schema, sequence model.CIStr) (util.SequenceTable, error) { From 6330aebdc5b448b1eb560c03d23572091a36a8d3 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 6 Feb 2024 20:20:23 +0800 Subject: [PATCH 02/14] clean up --- pkg/infoschema/infoschema.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/infoschema/infoschema.go b/pkg/infoschema/infoschema.go index 17540cf22afe3..e19cf836282a8 100644 --- a/pkg/infoschema/infoschema.go +++ b/pkg/infoschema/infoschema.go @@ -348,13 +348,6 @@ func (is *infoSchema) HasTemporaryTable() bool { return len(is.temporaryTableIDs) != 0 } -// func (is *infoSchema) Clone() (result []*model.DBInfo) { -// for _, v := range is.schemaMap { -// result = append(result, v.dbInfo.Clone()) -// } -// return -// } - // GetSequenceByName gets the sequence by name. func GetSequenceByName(is InfoSchema, schema, sequence model.CIStr) (util.SequenceTable, error) { tbl, err := is.TableByName(schema, sequence) From 0b4cff24ca699e2971f7c98099e3ddebfd4fa04a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 6 Feb 2024 21:02:24 +0800 Subject: [PATCH 03/14] make lint happy --- pkg/infoschema/infoschema.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/infoschema/infoschema.go b/pkg/infoschema/infoschema.go index e19cf836282a8..827d3235f6b54 100644 --- a/pkg/infoschema/infoschema.go +++ b/pkg/infoschema/infoschema.go @@ -34,7 +34,6 @@ import ( // InfoSchema is the interface used to retrieve the schema information. // It works as a in memory cache and doesn't handle any schema change. // InfoSchema is read-only, and the returned value is a copy. -// TODO: add more methods to retrieve tables and columns. 
type InfoSchema interface { SchemaByName(schema model.CIStr) (*model.DBInfo, bool) SchemaExists(schema model.CIStr) bool @@ -47,10 +46,11 @@ type InfoSchema interface { SchemaTables(schema model.CIStr) []table.Table SchemaMetaVersion() int64 FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition) - InfoSchemaMisc + Misc } -type InfoSchemaMisc interface { +// Misc contains the methods that are not closely related to InfoSchema. +type Misc interface { PolicyByName(name model.CIStr) (*model.PolicyInfo, bool) ResourceGroupByName(name model.CIStr) (*model.ResourceGroupInfo, bool) // PlacementBundleByPhysicalTableID is used to get a rule bundle. From 1dc7d48887fb958486b78c5e3da5e2c712ebc7d9 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 6 Feb 2024 21:15:38 +0800 Subject: [PATCH 04/14] fix lint --- pkg/infoschema/infoschema.go | 30 ++++++++++++++++-------------- pkg/infoschema/infoschema_test.go | 7 +------ 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/pkg/infoschema/infoschema.go b/pkg/infoschema/infoschema.go index 827d3235f6b54..590f52bbf5c98 100644 --- a/pkg/infoschema/infoschema.go +++ b/pkg/infoschema/infoschema.go @@ -67,6 +67,8 @@ type Misc interface { GetTableReferredForeignKeys(schema, table string) []*model.ReferredFKInfo } +var _ Misc = &infoSchemaMisc{} + type sortedTables []table.Table func (s sortedTables) searchTable(id int64) int { @@ -344,7 +346,7 @@ func (is *infoSchema) FindTableByPartitionID(partitionID int64) (table.Table, *m } // HasTemporaryTable returns whether information schema has temporary table -func (is *infoSchema) HasTemporaryTable() bool { +func (is *infoSchemaMisc) HasTemporaryTable() bool { return len(is.temporaryTableIDs) != 0 } @@ -405,7 +407,7 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) { } // PolicyByName is used to find the policy. 
-func (is *infoSchema) PolicyByName(name model.CIStr) (*model.PolicyInfo, bool) { +func (is *infoSchemaMisc) PolicyByName(name model.CIStr) (*model.PolicyInfo, bool) { is.policyMutex.RLock() defer is.policyMutex.RUnlock() t, r := is.policyMap[name.L] @@ -413,7 +415,7 @@ func (is *infoSchema) PolicyByName(name model.CIStr) (*model.PolicyInfo, bool) { } // ResourceGroupByName is used to find the resource group. -func (is *infoSchema) ResourceGroupByName(name model.CIStr) (*model.ResourceGroupInfo, bool) { +func (is *infoSchemaMisc) ResourceGroupByName(name model.CIStr) (*model.ResourceGroupInfo, bool) { is.resourceGroupMutex.RLock() defer is.resourceGroupMutex.RUnlock() t, r := is.resourceGroupMap[name.L] @@ -421,7 +423,7 @@ func (is *infoSchema) ResourceGroupByName(name model.CIStr) (*model.ResourceGrou } // AllResourceGroups returns all resource groups. -func (is *infoSchema) AllResourceGroups() []*model.ResourceGroupInfo { +func (is *infoSchemaMisc) AllResourceGroups() []*model.ResourceGroupInfo { is.resourceGroupMutex.RLock() defer is.resourceGroupMutex.RUnlock() groups := make([]*model.ResourceGroupInfo, 0, len(is.resourceGroupMap)) @@ -432,7 +434,7 @@ func (is *infoSchema) AllResourceGroups() []*model.ResourceGroupInfo { } // AllPlacementPolicies returns all placement policies -func (is *infoSchema) AllPlacementPolicies() []*model.PolicyInfo { +func (is *infoSchemaMisc) AllPlacementPolicies() []*model.PolicyInfo { is.policyMutex.RLock() defer is.policyMutex.RUnlock() policies := make([]*model.PolicyInfo, 0, len(is.policyMap)) @@ -442,12 +444,12 @@ func (is *infoSchema) AllPlacementPolicies() []*model.PolicyInfo { return policies } -func (is *infoSchema) PlacementBundleByPhysicalTableID(id int64) (*placement.Bundle, bool) { +func (is *infoSchemaMisc) PlacementBundleByPhysicalTableID(id int64) (*placement.Bundle, bool) { t, r := is.ruleBundleMap[id] return t, r } -func (is *infoSchema) AllPlacementBundles() []*placement.Bundle { +func (is *infoSchemaMisc) 
AllPlacementBundles() []*placement.Bundle { bundles := make([]*placement.Bundle, 0, len(is.ruleBundleMap)) for _, bundle := range is.ruleBundleMap { bundles = append(bundles, bundle) @@ -455,31 +457,31 @@ func (is *infoSchema) AllPlacementBundles() []*placement.Bundle { return bundles } -func (is *infoSchema) setResourceGroup(resourceGroup *model.ResourceGroupInfo) { +func (is *infoSchemaMisc) setResourceGroup(resourceGroup *model.ResourceGroupInfo) { is.resourceGroupMutex.Lock() defer is.resourceGroupMutex.Unlock() is.resourceGroupMap[resourceGroup.Name.L] = resourceGroup } -func (is *infoSchema) deleteResourceGroup(name string) { +func (is *infoSchemaMisc) deleteResourceGroup(name string) { is.resourceGroupMutex.Lock() defer is.resourceGroupMutex.Unlock() delete(is.resourceGroupMap, name) } -func (is *infoSchema) setPolicy(policy *model.PolicyInfo) { +func (is *infoSchemaMisc) setPolicy(policy *model.PolicyInfo) { is.policyMutex.Lock() defer is.policyMutex.Unlock() is.policyMap[policy.Name.L] = policy } -func (is *infoSchema) deletePolicy(name string) { +func (is *infoSchemaMisc) deletePolicy(name string) { is.policyMutex.Lock() defer is.policyMutex.Unlock() delete(is.policyMap, name) } -func (is *infoSchema) addReferredForeignKeys(schema model.CIStr, tbInfo *model.TableInfo) { +func (is *infoSchemaMisc) addReferredForeignKeys(schema model.CIStr, tbInfo *model.TableInfo) { for _, fk := range tbInfo.ForeignKeys { if fk.Version < model.FKVersion1 { continue @@ -519,7 +521,7 @@ func (is *infoSchema) addReferredForeignKeys(schema model.CIStr, tbInfo *model.T } } -func (is *infoSchema) deleteReferredForeignKeys(schema model.CIStr, tbInfo *model.TableInfo) { +func (is *infoSchemaMisc) deleteReferredForeignKeys(schema model.CIStr, tbInfo *model.TableInfo) { for _, fk := range tbInfo.ForeignKeys { if fk.Version < model.FKVersion1 { continue @@ -541,7 +543,7 @@ func (is *infoSchema) deleteReferredForeignKeys(schema model.CIStr, tbInfo *mode } // 
GetTableReferredForeignKeys gets the table's ReferredFKInfo by lowercase schema and table name. -func (is *infoSchema) GetTableReferredForeignKeys(schema, table string) []*model.ReferredFKInfo { +func (is *infoSchemaMisc) GetTableReferredForeignKeys(schema, table string) []*model.ReferredFKInfo { name := SchemaAndTableName{schema: schema, table: table} return is.referredForeignKeyMap[name] } diff --git a/pkg/infoschema/infoschema_test.go b/pkg/infoschema/infoschema_test.go index 3da8d6b5cf79c..811ca32079a26 100644 --- a/pkg/infoschema/infoschema_test.go +++ b/pkg/infoschema/infoschema_test.go @@ -128,8 +128,6 @@ func TestBasic(t *testing.T) { schemas := is.AllSchemas() require.Len(t, schemas, 4) - schemas = is.Clone() - require.Len(t, schemas, 4) require.True(t, is.SchemaExists(dbName)) require.False(t, is.SchemaExists(noexist)) @@ -171,10 +169,7 @@ func TestBasic(t *testing.T) { tb, ok = is.TableByID(dbID) require.False(t, ok) require.Nil(t, tb) - - alloc, ok := is.AllocByID(tbID) - require.True(t, ok) - require.NotNil(t, alloc) + require.NotNil(t, tb.Allocators(nil)) tb, err = is.TableByName(dbName, tbName) require.NoError(t, err) From 8fd891e2c08837ccf9becefe1243d3ace2c688cf Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 6 Feb 2024 22:53:07 +0800 Subject: [PATCH 05/14] fix ci --- pkg/infoschema/infoschema.go | 2 +- pkg/infoschema/infoschema_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/infoschema/infoschema.go b/pkg/infoschema/infoschema.go index 590f52bbf5c98..aaf4e700b5097 100644 --- a/pkg/infoschema/infoschema.go +++ b/pkg/infoschema/infoschema.go @@ -290,7 +290,7 @@ func (is *infoSchema) TableByID(id int64) (val table.Table, ok bool) { return slice[idx], true } -// AllocByID returns the Allocators of a table. +// allocByID returns the Allocators of a table. 
func allocByID(is *infoSchema, id int64) (autoid.Allocators, bool) { tbl, ok := is.TableByID(id) if !ok { diff --git a/pkg/infoschema/infoschema_test.go b/pkg/infoschema/infoschema_test.go index 811ca32079a26..619381b396e42 100644 --- a/pkg/infoschema/infoschema_test.go +++ b/pkg/infoschema/infoschema_test.go @@ -169,7 +169,6 @@ func TestBasic(t *testing.T) { tb, ok = is.TableByID(dbID) require.False(t, ok) require.Nil(t, tb) - require.NotNil(t, tb.Allocators(nil)) tb, err = is.TableByName(dbName, tbName) require.NoError(t, err) From 28423d2a7be15982bb3df8a072e76e5cb714a412 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 7 Feb 2024 19:09:47 +0800 Subject: [PATCH 06/14] infoschema,domain: introduce InfoSchemaV2, core data struct etc --- DEPS.bzl | 26 ++ go.mod | 2 + go.sum | 4 + pkg/domain/domain.go | 12 +- pkg/infoschema/BUILD.bazel | 5 + pkg/infoschema/builder.go | 79 +++++- pkg/infoschema/cache.go | 9 +- pkg/infoschema/infoschema_v2.go | 418 ++++++++++++++++++++++++++++++++ 8 files changed, 536 insertions(+), 19 deletions(-) create mode 100644 pkg/infoschema/infoschema_v2.go diff --git a/DEPS.bzl b/DEPS.bzl index 45b9163b40653..c9e2b006fa9d4 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -6413,6 +6413,19 @@ def go_deps(): "https://storage.googleapis.com/pingcapmirror/gomod/github.com/sashamelentyev/usestdlibvars/com_github_sashamelentyev_usestdlibvars-v1.24.0.zip", ], ) + go_repository( + name = "com_github_scalalang2_golang_fifo", + build_file_proto_mode = "disable_global", + importpath = "github.com/scalalang2/golang-fifo", + sha256 = "48ed9feefc3680b12116a212eaac53af5d6c7183ffe80ed1427eb8504a3b05cc", + strip_prefix = "github.com/scalalang2/golang-fifo@v0.1.5", + urls = [ + "http://bazel-cache.pingcap.net:8080/gomod/github.com/scalalang2/golang-fifo/com_github_scalalang2_golang_fifo-v0.1.5.zip", + "http://ats.apps.svc/gomod/github.com/scalalang2/golang-fifo/com_github_scalalang2_golang_fifo-v0.1.5.zip", + 
"https://cache.hawkingrei.com/gomod/github.com/scalalang2/golang-fifo/com_github_scalalang2_golang_fifo-v0.1.5.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/scalalang2/golang-fifo/com_github_scalalang2_golang_fifo-v0.1.5.zip", + ], + ) go_repository( name = "com_github_scaleway_scaleway_sdk_go", build_file_proto_mode = "disable_global", @@ -7024,6 +7037,19 @@ def go_deps(): "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tiancaiamao/gp/com_github_tiancaiamao_gp-v0.0.0-20221230034425-4025bc8a4d4a.zip", ], ) + go_repository( + name = "com_github_tidwall_btree", + build_file_proto_mode = "disable_global", + importpath = "github.com/tidwall/btree", + sha256 = "4a6619eb936c836841702933a9d66f27abe83b7ffb541de44d12db4aa3a809d5", + strip_prefix = "github.com/tidwall/btree@v1.7.0", + urls = [ + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tidwall/btree/com_github_tidwall_btree-v1.7.0.zip", + "http://ats.apps.svc/gomod/github.com/tidwall/btree/com_github_tidwall_btree-v1.7.0.zip", + "https://cache.hawkingrei.com/gomod/github.com/tidwall/btree/com_github_tidwall_btree-v1.7.0.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tidwall/btree/com_github_tidwall_btree-v1.7.0.zip", + ], + ) go_repository( name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", diff --git a/go.mod b/go.mod index fb3caf3b5b2a6..c10aa37057e90 100644 --- a/go.mod +++ b/go.mod @@ -97,6 +97,7 @@ require ( github.com/prometheus/prometheus v0.49.1 github.com/robfig/cron/v3 v3.0.1 github.com/sasha-s/go-deadlock v0.3.1 + github.com/scalalang2/golang-fifo v0.1.5 github.com/shirou/gopsutil/v3 v3.23.10 github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 github.com/soheilhy/cmux v0.1.5 @@ -107,6 +108,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 + github.com/tidwall/btree v1.7.0 
github.com/tikv/client-go/v2 v2.0.8-0.20240205071126-11cb7985f0ec github.com/tikv/pd/client v0.0.0-20240126020320-567c7d43a008 github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a diff --git a/go.sum b/go.sum index 78e92c6ace632..dd17971c11701 100644 --- a/go.sum +++ b/go.sum @@ -783,6 +783,8 @@ github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= +github.com/scalalang2/golang-fifo v0.1.5 h1:cl70TQhlMGGpI2DZGcr+7/GFTJOjHMeor0t7wynEEoA= +github.com/scalalang2/golang-fifo v0.1.5/go.mod h1:IK3OZBg7iHbVdQVGPDjcW1MWPb6JcWjaS/w0iRBS8gs= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -864,6 +866,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= +github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tikv/client-go/v2 v2.0.8-0.20240205071126-11cb7985f0ec h1:j/1OKeXulUHDlfm7uHb6PTlNdagwrHzoHZGPQNK0y68= github.com/tikv/client-go/v2 v2.0.8-0.20240205071126-11cb7985f0ec/go.mod h1:jZLZhtui1Po+x616K7jxVAMe+aQfAuWqUZluRdO75Kc= 
github.com/tikv/pd/client v0.0.0-20240126020320-567c7d43a008 h1:McTV/45piKWPinw0m7gFospPEGBW8AfK5J0RB/G9DP4= diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 468fd3521dee1..4cbf373dd3a7a 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -123,9 +123,8 @@ func init() { // NewMockDomain is only used for test func NewMockDomain() *Domain { - do := &Domain{ - infoCache: infoschema.NewCache(1), - } + do := &Domain{} + do.infoCache = infoschema.NewCache(do, 1) do.infoCache.Insert(infoschema.MockInfoSchema(nil), 0) return do } @@ -301,7 +300,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do, do.sysFacHack).InitWithDBInfos(schemas, policies, resourceGroups, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do, do.sysFacHack, do.infoCache.Data).InitWithDBInfos(schemas, policies, resourceGroups, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -450,7 +449,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do, do.sysFacHack, do.infoCache.Data).InitWithOldInfoSchema(do.infoCache.GetLatest()) builder.SetDeltaUpdateBundles() phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) @@ -1073,7 +1072,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio exit: make(chan struct{}), sysSessionPool: newSessionPool(capacity, factory), statsLease: statsLease, - infoCache: infoschema.NewCache(int(variable.SchemaVersionCacheLimit.Load())), slowQuery: newTopNSlowQueries(config.GetGlobalConfig().InMemSlowQueryTopNNum, time.Hour*24*7, config.GetGlobalConfig().InMemSlowQueryRecentNum), dumpFileGcChecker: 
&dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName(), GetExtractTaskDirName()}}, mdlCheckTableInfo: &mdlCheckTableInfo{ @@ -1083,6 +1081,8 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio }, mdlCheckCh: make(chan struct{}), } + + do.infoCache = infoschema.NewCache(do, int(variable.SchemaVersionCacheLimit.Load())) do.stopAutoAnalyze.Store(false) do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, config.GetGlobalConfig().TiDBEnableExitCheck) do.SchemaValidator = NewSchemaValidator(ddlLease, do) diff --git a/pkg/infoschema/BUILD.bazel b/pkg/infoschema/BUILD.bazel index d42caa569fb26..edcce41386a32 100644 --- a/pkg/infoschema/BUILD.bazel +++ b/pkg/infoschema/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "cluster.go", "error.go", "infoschema.go", + "infoschema_v2.go", "metric_table_def.go", "metrics_schema.go", "tables.go", @@ -54,11 +55,15 @@ go_library( "@com_github_pingcap_kvproto//pkg/diagnosticspb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", + "@com_github_scalalang2_golang_fifo//:go_default_library", + "@com_github_scalalang2_golang_fifo//sieve:go_default_library", + "@com_github_tidwall_btree//:go_default_library", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//credentials/insecure", + "@org_golang_x_sync//singleflight", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index 445348609f802..b6e0d77172267 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -18,6 +18,7 @@ import ( "cmp" "context" "fmt" + "math" "slices" "strings" @@ -188,6 +189,7 @@ type Builder struct { factory func() (pools.Resource, error) bundleInfoBuilder + infoData *InfoSchemaData } // ApplyDiff applies SchemaDiff to the new InfoSchema. 
@@ -557,7 +559,7 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 if tableIDIsValid(newTableID) { // All types except DropTableOrView. var err error - tblIDs, err = b.applyCreateTable(m, dbInfo, newTableID, allocs, diff.Type, tblIDs) + tblIDs, err = b.applyCreateTable(m, dbInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version) if err != nil { return nil, errors.Trace(err) } @@ -682,6 +684,9 @@ func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error ) } b.is.schemaMap[di.Name.L] = &schemaTables{dbInfo: di, tables: make(map[string]table.Table)} + if enableV2.Load() { + b.infoData.addDB(diff.Version, di) + } return nil } @@ -779,7 +784,7 @@ func (b *Builder) copySortedTablesBucket(bucketIdx int) { b.is.sortedTablesBuckets[bucketIdx] = newSortedTables } -func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64) ([]int64, error) { +func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64, schemaVersion int64) ([]int64, error) { tblInfo, err := m.GetTable(dbInfo.ID, tableID) if err != nil { return nil, errors.Trace(err) @@ -868,6 +873,15 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i slices.SortFunc(b.is.sortedTablesBuckets[bucketIdx], func(i, j table.Table) int { return cmp.Compare(i.Meta().ID, j.Meta().ID) }) + if enableV2.Load() { + b.infoData.add(Item{ + dbName: dbInfo.Name.L, + dbID: dbInfo.ID, + tableName: tblInfo.Name.L, + tableID: tblInfo.ID, + schemaVersion: schemaVersion, + }, tbl) + } if tblInfo.TempTableType != model.TempTableNone { b.addTemporaryTable(tableID) @@ -946,14 +960,34 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [ return affected } +// TODO: get rid of this and use infoschemaV2 directly. 
+type infoschemaProxy struct { + infoschemaV2 + v1 InfoSchema +} + // Build builds and returns the built infoschema. func (b *Builder) Build() InfoSchema { b.updateInfoSchemaBundles(b.is) + if enableV2.Load() { + return &infoschemaProxy{ + infoschemaV2: infoschemaV2{ + ts: math.MaxUint64, // TODO: should be the correct TS + r: b.Requirement, + InfoSchemaData: b.infoData, + schemaVersion: b.is.SchemaMetaVersion(), + }, + v1: b.is, + } + } return b.is } // InitWithOldInfoSchema initializes an empty new InfoSchema by copies all the data from old InfoSchema. func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) *Builder { + if proxy, ok := oldSchema.(*infoschemaProxy); ok { + oldSchema = proxy.v1 + } oldIS := oldSchema.(*infoSchema) b.is.schemaMetaVersion = oldIS.schemaMetaVersion b.copySchemasMap(oldIS) @@ -1056,18 +1090,23 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, policies []*model.Pol } for _, di := range dbInfos { - err := b.createSchemaTablesForDB(di, b.tableFromMeta) + schTbls, err := b.createSchemaTablesForDB(di, b.tableFromMeta, schemaVersion) if err != nil { return nil, errors.Trace(err) } + b.is.schemaMap[di.Name.L] = schTbls } // Initialize virtual tables. 
for _, driver := range drivers { - err := b.createSchemaTablesForDB(driver.DBInfo, driver.TableFromMeta) + schTbls, err := b.createSchemaTablesForDB(driver.DBInfo, driver.TableFromMeta, schemaVersion) if err != nil { return nil, errors.Trace(err) } + b.is.schemaMap[driver.DBInfo.Name.L] = schTbls + if enableV2.Load() { + b.infoData.addSpecialDB(driver.DBInfo, schTbls) + } } // Sort all tables by `ID` @@ -1101,28 +1140,43 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf type tableFromMetaFunc func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) -func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableFromMetaFunc) error { +func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableFromMetaFunc, schemaVersion int64) (*schemaTables, error) { schTbls := &schemaTables{ dbInfo: di, tables: make(map[string]table.Table, len(di.Tables)), } - b.is.schemaMap[di.Name.L] = schTbls - for _, t := range di.Tables { allocs := autoid.NewAllocatorsFromTblInfo(b.Requirement, di.ID, t) var tbl table.Table tbl, err := tableFromMeta(allocs, t) if err != nil { - return errors.Wrap(err, fmt.Sprintf("Build table `%s`.`%s` schema failed", di.Name.O, t.Name.O)) + return nil, errors.Wrap(err, fmt.Sprintf("Build table `%s`.`%s` schema failed", di.Name.O, t.Name.O)) } + schTbls.tables[t.Name.L] = tbl sortedTbls := b.is.sortedTablesBuckets[tableBucketIdx(t.ID)] b.is.sortedTablesBuckets[tableBucketIdx(t.ID)] = append(sortedTbls, tbl) + + if enableV2.Load() { + b.infoData.add(Item{ + dbName: di.Name.L, + dbID: di.ID, + tableName: t.Name.L, + tableID: t.ID, + schemaVersion: schemaVersion, + }, tbl) + } + if tblInfo := tbl.Meta(); tblInfo.TempTableType != model.TempTableNone { b.addTemporaryTable(tblInfo.ID) } } - return nil + + if enableV2.Load() { + b.infoData.addDB(schemaVersion, di) + } + + return schTbls, nil } func (b *Builder) addTemporaryTable(tblID int64) { @@ -1145,7 +1199,7 @@ func 
RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error)) *Builder { +func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error), infoData *InfoSchemaData) *Builder { return &Builder{ Requirement: r, is: &infoSchema{ @@ -1158,8 +1212,9 @@ func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error)) *B schemaMap: map[string]*schemaTables{}, sortedTablesBuckets: make([]sortedTables, bucketCount), }, - dirtyDB: make(map[string]bool), - factory: factory, + dirtyDB: make(map[string]bool), + factory: factory, + infoData: infoData, } } diff --git a/pkg/infoschema/cache.go b/pkg/infoschema/cache.go index 50d74580ea04d..3520a67200ba7 100644 --- a/pkg/infoschema/cache.go +++ b/pkg/infoschema/cache.go @@ -19,6 +19,7 @@ import ( "sync" infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics" + "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) @@ -30,6 +31,9 @@ type InfoCache struct { mu sync.RWMutex // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order cache []schemaAndTimestamp + + r autoid.Requirement + Data *InfoSchemaData } type schemaAndTimestamp struct { @@ -38,9 +42,12 @@ type schemaAndTimestamp struct { } // NewCache creates a new InfoCache. 
-func NewCache(capacity int) *InfoCache { +func NewCache(r autoid.Requirement, capacity int) *InfoCache { + infoData := NewInfoSchemaData() return &InfoCache{ cache: make([]schemaAndTimestamp, 0, capacity), + r: r, + Data: infoData, } } diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go new file mode 100644 index 0000000000000..d651125f955b1 --- /dev/null +++ b/pkg/infoschema/infoschema_v2.go @@ -0,0 +1,418 @@ +package infoschema + +import ( + "fmt" + "math" + "sync" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/meta/autoid" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/table" + "github.com/scalalang2/golang-fifo" + "github.com/scalalang2/golang-fifo/sieve" + "github.com/tidwall/btree" + "golang.org/x/sync/singleflight" +) + +var enableV2 atomic.Bool + +type Item struct { + dbName string + dbID int64 + tableName string + tableID int64 + schemaVersion int64 +} + +type schemaItem struct { + schemaVersion int64 + dbInfo *model.DBInfo +} + +func (si *schemaItem) Name() string { + return si.dbInfo.Name.L +} + +type VersionAndTimestamp struct { + schemaVersion int64 + timestamp uint64 +} + +type InfoSchemaData struct { + // For the TableByName API, sorted by {dbName, tableName, tableID} => schemaVersion + // + // If the schema version +1 but a specific table does not change, the old record is + // kept and no new {dbName, tableName, tableID} => schemaVersion+1 record been added. + // + // It means as long as we can find a item in it, the item is available, even through the + // schema version maybe smaller than required. + // + // *IMPORTANT RESTRICTION*: Do we have the full data in memory? NO! + name2id *btree.BTreeG[Item] + + // For the TableByID API, sorted by {tableID} + // To reload model.TableInfo, we need both table ID and database ID for meta kv API. + // It provides the tableID => databaseID mapping. 
+ // + // *IMPORTANT RESTRICTION*: Do we have the full data in memory? NO! + // But this mapping should be synced with name2id. + byID *btree.BTreeG[Item] + + cache fifo.Cache[cacheKey, table.Table] + + // For the SchemaByName API + schemaMap *btree.BTreeG[schemaItem] + + // sorted by both SchemaVersion and timestamp in descending order, assume they have same order + mu struct { + sync.RWMutex + versionTimestamps []VersionAndTimestamp + } + + // For information_schema/metrics_schema/performance_schema etc + specials map[string]*schemaTables +} + +func (isd *InfoSchemaData) getVersionByTS(ts uint64) (int64, bool) { + isd.mu.RLock() + defer isd.mu.RUnlock() + + return isd.getVersionByTSNoLock(ts) +} + +func (isd *InfoSchemaData) getVersionByTSNoLock(ts uint64) (int64, bool) { + // search one by one instead of binary search, because the timestamp of a schema could be 0 + // this is ok because the size of h.cache is small (currently set to 16) + // moreover, the most likely hit element in the array is the first one in steady mode + // thus it may have better performance than binary search + for i, vt := range isd.mu.versionTimestamps { + if vt.timestamp == 0 || ts < uint64(vt.timestamp) { + // is.timestamp == 0 means the schema ts is unknown, so we can't use it, then just skip it. + // ts < is.timestamp means the schema is newer than ts, so we can't use it too, just skip it to find the older one. + continue + } + // ts >= is.timestamp must be true after the above condition. + if i == 0 { + // the first element is the latest schema, so we can return it directly. + return vt.schemaVersion, true + } + if isd.mu.versionTimestamps[i-1].schemaVersion == vt.schemaVersion+1 && uint64(isd.mu.versionTimestamps[i-1].timestamp) > ts { + // This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, + // but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts. 
+ // The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. + return vt.schemaVersion, true + } + // current schema is not suitable for ts, then break the loop to avoid the unnecessary search. + break + } + + return 0, false +} + +type cacheKey struct { + tableID int64 + schemaVersion int64 +} + +func NewInfoSchemaData() *InfoSchemaData { + ret := &InfoSchemaData{ + byID: btree.NewBTreeG[Item](compareByID), + name2id: btree.NewBTreeG[Item](compareByName), + schemaMap: btree.NewBTreeG[schemaItem](compareBySchema), + // TODO: limit by size instead of by table count. + cache: sieve.New[cacheKey, table.Table](1000), + specials: make(map[string]*schemaTables), + } + return ret +} + +func (isd *InfoSchemaData) add(item Item, tbl table.Table) { + isd.byID.Set(item) + isd.name2id.Set(item) + isd.cache.Set(cacheKey{item.tableID, item.schemaVersion}, tbl) +} + +func (isd *InfoSchemaData) addSpecialDB(di *model.DBInfo, tables *schemaTables) { + isd.specials[di.Name.L] = tables +} + +func (isd *InfoSchemaData) addDB(schemaVersion int64, dbInfo *model.DBInfo) { + isd.schemaMap.Set(schemaItem{schemaVersion: schemaVersion, dbInfo: dbInfo}) +} + +func compareByID(a, b Item) bool { + if a.tableID < b.tableID { + return true + } + if a.tableID > b.tableID { + return false + } + + return a.dbID < b.dbID +} + +func compareByName(a, b Item) bool { + if a.dbName < b.dbName { + return true + } + if a.dbName > b.dbName { + return false + } + + if a.tableName < b.tableName { + return true + } + if a.tableName > b.tableName { + return false + } + + return a.tableID < b.tableID +} + +func compareBySchema(a, b schemaItem) bool { + if a.Name() < b.Name() { + return true + } + if a.Name() > b.Name() { + return false + } + return a.schemaVersion < b.schemaVersion +} + +var _ InfoSchema = &infoschemaV2{} + +type infoschemaV2 struct { + infoSchemaMisc + r autoid.Requirement + ts uint64 + schemaVersion int64 + 
*InfoSchemaData +} + +func search(bt *btree.BTreeG[Item], schemaVersion int64, end Item, eq func(a, b *Item) bool) (Item, bool) { + var ok bool + var itm Item + // Iterate through the btree, find the query item whose schema version is the largest one (latest). + bt.Descend(end, func(item Item) bool { + if !eq(&end, &item) { + return false + } + if item.schemaVersion > schemaVersion { + // We're seaching historical snapshot, and this record is newer than us, we can't use it. + // Skip the record. + return true + } + // schema version of the items should <= query's schema version. + if !ok { // The first one found. + ok = true + itm = item + } else { // The latest one + if item.schemaVersion > itm.schemaVersion { + itm = item + } + } + return true + }) + return itm, ok +} + +func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { + if isTableVirtual(id) { + return nil, false + } + + // Get from the cache. + key := cacheKey{id, is.schemaVersion} + tbl, found := is.cache.Get(key) + if found && tbl != nil { + return tbl.(table.Table), true + } + + eq := func(a, b *Item) bool { return a.tableID == b.tableID } + itm, ok := search(is.byID, is.schemaVersion, Item{tableID: id, dbID: math.MaxInt64}, eq) + if !ok { + return nil, false + } + + // Maybe the table is evicted? need to reload. 
+ ret, err := loadTableInfo(is.r, is.InfoSchemaData, id, itm.dbID, is.ts) + if err == nil { + is.cache.Set(key, ret) + return ret, true + } else { + // fmt.Println("load table error ==", id, err) + } + return nil, false +} + +func (is *infoschemaV2) TableByName(schema, tbl model.CIStr) (t table.Table, err error) { + if schema.L == "information_schema" || schema.L == "metrics_schema" || schema.L == "performance_schema" { + if tbNames, ok := is.specials[schema.L]; ok { + if t, ok = tbNames.tables[tbl.L]; ok { + return + } + } + return nil, ErrTableNotExists.GenWithStackByArgs(schema, tbl) + } + + eq := func(a, b *Item) bool { return a.dbName == b.dbName && a.tableName == b.tableName } + itm, ok := search(is.name2id, is.schemaVersion, Item{dbName: schema.L, tableName: tbl.L, tableID: math.MaxInt64}, eq) + if !ok { + // TODO: in the future, this may happen and we need to check tikv to see whether table exists. + return nil, ErrTableNotExists.GenWithStackByArgs(schema, tbl) + } + + // Get from the cache. + key := cacheKey{itm.tableID, is.schemaVersion} + res, found := is.cache.Get(key) + if found && res != nil { + return res.(table.Table), nil + } + + // Maybe the table is evicted? need to reload. + ret, err := loadTableInfo(is.r, is.InfoSchemaData, itm.tableID, itm.dbID, is.ts) + if err != nil { + return nil, errors.Trace(err) + } + is.cache.Set(key, ret) + return ret, nil +} + +func (is *infoschemaV2) SchemaByName(schema model.CIStr) (val *model.DBInfo, ok bool) { + var dbInfo model.DBInfo + dbInfo.Name = schema + is.schemaMap.Descend(schemaItem{dbInfo: &dbInfo, schemaVersion: math.MaxInt64}, func(item schemaItem) bool { + if item.Name() != schema.L { + ok = false + return false + } + if item.schemaVersion <= is.schemaVersion { + ok = true + val = item.dbInfo + return false + } + return true + }) + return +} + +func (is *infoschemaV2) AllSchemas() (schemas []*model.DBInfo) { + is.schemaMap.Scan(func(item schemaItem) bool { + // TODO: version? 
+ schemas = append(schemas, item.dbInfo) + return true + }) + return +} + +func (is *infoschemaV2) SchemaMetaVersion() int64 { + return is.schemaVersion +} + +func (is *infoschemaV2) SchemaExists(schema model.CIStr) bool { + var ok bool + // TODO: support different version + is.schemaMap.Scan(func(item schemaItem) bool { + if item.dbInfo.Name.L == schema.L { + ok = true + return false + } + return true + }) + return ok +} + +func (is *infoschemaV2) FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition) { + panic("TODO") +} + +func (is *infoschemaV2) TableExists(schema, table model.CIStr) bool { + _, err := is.TableByName(schema, table) + if err == nil { + return true + } + return false +} + +func (is *infoschemaV2) SchemaByID(id int64) (*model.DBInfo, bool) { + var ok bool + var dbInfo *model.DBInfo + is.schemaMap.Scan(func(item schemaItem) bool { + if item.dbInfo.ID == id { + ok = true + dbInfo = item.dbInfo + return false + } + return true + }) + return dbInfo, ok +} + +func (is *infoschemaV2) SchemaTables(schema model.CIStr) (tables []table.Table) { + dbInfo, ok := is.SchemaByName(schema) + if !ok { + return + } + snapshot := is.r.Store().GetSnapshot(kv.NewVersion(is.ts)) + // Using the KV timeout read feature to address the issue of potential DDL lease expiration when + // the meta region leader is slow. + snapshot.SetOption(kv.TiKVClientReadTimeout, uint64(3000)) // 3000ms. + m := meta.NewSnapshotMeta(snapshot) + tblInfos, err := m.ListSimpleTables(dbInfo.ID) + if err != nil { + if meta.ErrDBNotExists.Equal(err) { + return nil + } + panic(err) + } + tables = make([]table.Table, 0, len(tblInfos)) + for _, tblInfo := range tblInfos { + tbl, ok := is.TableByID(tblInfo.ID) + if !ok { + // what happen? 
+ continue + } + tables = append(tables, tbl) + } + return +} + +func loadTableInfo(r autoid.Requirement, infoData *InfoSchemaData, tblID, dbID int64, ts uint64) (table.Table, error) { + // Try to avoid repeated concurrency loading. + res, err, _ := sf.Do(fmt.Sprintf("%d-%d", dbID, tblID), func() (ret any, err error) { + snapshot := r.Store().GetSnapshot(kv.NewVersion(ts)) + // Using the KV timeout read feature to address the issue of potential DDL lease expiration when + // the meta region leader is slow. + snapshot.SetOption(kv.TiKVClientReadTimeout, uint64(3000)) // 3000ms. + m := meta.NewSnapshotMeta(snapshot) + + ret, err = m.GetTable(dbID, tblID) + return + }) + + if err != nil { + // TODO load table panic!!! + panic(err) + } + tblInfo := res.(*model.TableInfo) // TODO: it could be missing!!! + + ConvertCharsetCollateToLowerCaseIfNeed(tblInfo) + ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo) + allocs := autoid.NewAllocatorsFromTblInfo(r, dbID, tblInfo) + b := NewBuilder(r, nil, infoData) // TODO: handle cached table!!! 
+ ret, err := b.tableFromMeta(allocs, tblInfo) + if err != nil { + return nil, errors.Trace(err) + } + return ret, nil +} + +var sf = &singleflight.Group{} + +func isTableVirtual(id int64) bool { + return (id & (1 << 62)) > 0 +} From 8e8006cfed54963a52718a6282641daae0681983 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 18 Feb 2024 18:28:20 +0800 Subject: [PATCH 07/14] make lint happy --- pkg/infoschema/infoschema_v2.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index d651125f955b1..ddeda06ef325b 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -90,7 +90,7 @@ func (isd *InfoSchemaData) getVersionByTSNoLock(ts uint64) (int64, bool) { // moreover, the most likely hit element in the array is the first one in steady mode // thus it may have better performance than binary search for i, vt := range isd.mu.versionTimestamps { - if vt.timestamp == 0 || ts < uint64(vt.timestamp) { + if vt.timestamp == 0 || ts < vt.timestamp { // is.timestamp == 0 means the schema ts is unknown, so we can't use it, then just skip it. // ts < is.timestamp means the schema is newer than ts, so we can't use it too, just skip it to find the older one. continue @@ -100,7 +100,7 @@ func (isd *InfoSchemaData) getVersionByTSNoLock(ts uint64) (int64, bool) { // the first element is the latest schema, so we can return it directly. return vt.schemaVersion, true } - if isd.mu.versionTimestamps[i-1].schemaVersion == vt.schemaVersion+1 && uint64(isd.mu.versionTimestamps[i-1].timestamp) > ts { + if isd.mu.versionTimestamps[i-1].schemaVersion == vt.schemaVersion+1 && isd.mu.versionTimestamps[i-1].timestamp > ts { // This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, // but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts. 
// The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. @@ -229,7 +229,7 @@ func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { key := cacheKey{id, is.schemaVersion} tbl, found := is.cache.Get(key) if found && tbl != nil { - return tbl.(table.Table), true + return tbl, true } eq := func(a, b *Item) bool { return a.tableID == b.tableID } @@ -243,8 +243,6 @@ func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { if err == nil { is.cache.Set(key, ret) return ret, true - } else { - // fmt.Println("load table error ==", id, err) } return nil, false } @@ -270,7 +268,7 @@ func (is *infoschemaV2) TableByName(schema, tbl model.CIStr) (t table.Table, err key := cacheKey{itm.tableID, is.schemaVersion} res, found := is.cache.Get(key) if found && res != nil { - return res.(table.Table), nil + return res, nil } // Maybe the table is evicted? need to reload. @@ -332,10 +330,7 @@ func (is *infoschemaV2) FindTableByPartitionID(partitionID int64) (table.Table, func (is *infoschemaV2) TableExists(schema, table model.CIStr) bool { _, err := is.TableByName(schema, table) - if err == nil { - return true - } - return false + return err == nil } func (is *infoschemaV2) SchemaByID(id int64) (*model.DBInfo, bool) { From aa769048cf8aab6c280655d4d391d8cdcb9dbbd4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 18 Feb 2024 19:00:30 +0800 Subject: [PATCH 08/14] fix build --- pkg/infoschema/builder.go | 12 ++++----- pkg/infoschema/cache.go | 4 +-- pkg/infoschema/infoschema_test.go | 10 +++---- pkg/infoschema/infoschema_v2.go | 30 +++++++++++---------- pkg/infoschema/test/cachetest/cache_test.go | 16 +++++------ 5 files changed, 37 insertions(+), 35 deletions(-) diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index b6e0d77172267..7f054a39cf342 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -189,7 +189,7 @@ type Builder struct { 
factory func() (pools.Resource, error) bundleInfoBuilder - infoData *InfoSchemaData + infoData *Data } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -972,10 +972,10 @@ func (b *Builder) Build() InfoSchema { if enableV2.Load() { return &infoschemaProxy{ infoschemaV2: infoschemaV2{ - ts: math.MaxUint64, // TODO: should be the correct TS - r: b.Requirement, - InfoSchemaData: b.infoData, - schemaVersion: b.is.SchemaMetaVersion(), + ts: math.MaxUint64, // TODO: should be the correct TS + r: b.Requirement, + Data: b.infoData, + schemaVersion: b.is.SchemaMetaVersion(), }, v1: b.is, } @@ -1199,7 +1199,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error), infoData *InfoSchemaData) *Builder { +func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error), infoData *Data) *Builder { return &Builder{ Requirement: r, is: &infoSchema{ diff --git a/pkg/infoschema/cache.go b/pkg/infoschema/cache.go index 3520a67200ba7..2cee88b21bbbf 100644 --- a/pkg/infoschema/cache.go +++ b/pkg/infoschema/cache.go @@ -33,7 +33,7 @@ type InfoCache struct { cache []schemaAndTimestamp r autoid.Requirement - Data *InfoSchemaData + Data *Data } type schemaAndTimestamp struct { @@ -43,7 +43,7 @@ type schemaAndTimestamp struct { // NewCache creates a new InfoCache. 
func NewCache(r autoid.Requirement, capacity int) *InfoCache { - infoData := NewInfoSchemaData() + infoData := NewData() return &InfoCache{ cache: make([]schemaAndTimestamp, 0, capacity), r: r, diff --git a/pkg/infoschema/infoschema_test.go b/pkg/infoschema/infoschema_test.go index cd4c79c6a1a32..18a405aed3f55 100644 --- a/pkg/infoschema/infoschema_test.go +++ b/pkg/infoschema/infoschema_test.go @@ -111,7 +111,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos(dbInfos, nil, nil, 1) + builder, err := infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos(dbInfos, nil, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -250,7 +250,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -327,7 +327,7 @@ func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) { err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) for _, change := range changes { - builder := infoschema.NewBuilder(dom, nil).InitWithOldInfoSchema(curIs) + builder := infoschema.NewBuilder(dom, nil, nil).InitWithOldInfoSchema(curIs) change(m, builder) curIs = builder.Build() } @@ -405,7 +405,7 @@ func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) { // full load newDB, ok := newIS.SchemaByName(model.NewCIStr("test")) require.True(t, ok) - builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.AllResourceGroups(), newIS.SchemaMetaVersion()) + builder, err := infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.AllResourceGroups(), newIS.SchemaMetaVersion()) require.NoError(t, err) 
require.True(t, builder.Build().HasTemporaryTable()) @@ -530,7 +530,7 @@ func TestBuildBundle(t *testing.T) { assertBundle(is, tbl2.Meta().ID, nil) assertBundle(is, p1.ID, p1Bundle) - builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos([]*model.DBInfo{db}, is.AllPlacementPolicies(), is.AllResourceGroups(), is.SchemaMetaVersion()) + builder, err := infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos([]*model.DBInfo{db}, is.AllPlacementPolicies(), is.AllResourceGroups(), is.SchemaMetaVersion()) require.NoError(t, err) is2 := builder.Build() assertBundle(is2, tbl1.Meta().ID, tb1Bundle) diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index ddeda06ef325b..8c2c520935e16 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -20,6 +20,7 @@ import ( var enableV2 atomic.Bool +// Item is the btree item sorted by name or by id. type Item struct { dbName string dbID int64 @@ -37,12 +38,13 @@ func (si *schemaItem) Name() string { return si.dbInfo.Name.L } -type VersionAndTimestamp struct { +// versionAndTimestamp is the tuple of schema version and timestamp. 
+type versionAndTimestamp struct { schemaVersion int64 timestamp uint64 } -type InfoSchemaData struct { +type Data struct { // For the TableByName API, sorted by {dbName, tableName, tableID} => schemaVersion // // If the schema version +1 but a specific table does not change, the old record is @@ -70,21 +72,21 @@ type InfoSchemaData struct { // sorted by both SchemaVersion and timestamp in descending order, assume they have same order mu struct { sync.RWMutex - versionTimestamps []VersionAndTimestamp + versionTimestamps []versionAndTimestamp } // For information_schema/metrics_schema/performance_schema etc specials map[string]*schemaTables } -func (isd *InfoSchemaData) getVersionByTS(ts uint64) (int64, bool) { +func (isd *Data) getVersionByTS(ts uint64) (int64, bool) { isd.mu.RLock() defer isd.mu.RUnlock() return isd.getVersionByTSNoLock(ts) } -func (isd *InfoSchemaData) getVersionByTSNoLock(ts uint64) (int64, bool) { +func (isd *Data) getVersionByTSNoLock(ts uint64) (int64, bool) { // search one by one instead of binary search, because the timestamp of a schema could be 0 // this is ok because the size of h.cache is small (currently set to 16) // moreover, the most likely hit element in the array is the first one in steady mode @@ -118,8 +120,8 @@ type cacheKey struct { schemaVersion int64 } -func NewInfoSchemaData() *InfoSchemaData { - ret := &InfoSchemaData{ +func NewData() *Data { + ret := &Data{ byID: btree.NewBTreeG[Item](compareByID), name2id: btree.NewBTreeG[Item](compareByName), schemaMap: btree.NewBTreeG[schemaItem](compareBySchema), @@ -130,17 +132,17 @@ func NewInfoSchemaData() *InfoSchemaData { return ret } -func (isd *InfoSchemaData) add(item Item, tbl table.Table) { +func (isd *Data) add(item Item, tbl table.Table) { isd.byID.Set(item) isd.name2id.Set(item) isd.cache.Set(cacheKey{item.tableID, item.schemaVersion}, tbl) } -func (isd *InfoSchemaData) addSpecialDB(di *model.DBInfo, tables *schemaTables) { +func (isd *Data) addSpecialDB(di *model.DBInfo, 
tables *schemaTables) { isd.specials[di.Name.L] = tables } -func (isd *InfoSchemaData) addDB(schemaVersion int64, dbInfo *model.DBInfo) { +func (isd *Data) addDB(schemaVersion int64, dbInfo *model.DBInfo) { isd.schemaMap.Set(schemaItem{schemaVersion: schemaVersion, dbInfo: dbInfo}) } @@ -190,7 +192,7 @@ type infoschemaV2 struct { r autoid.Requirement ts uint64 schemaVersion int64 - *InfoSchemaData + *Data } func search(bt *btree.BTreeG[Item], schemaVersion int64, end Item, eq func(a, b *Item) bool) (Item, bool) { @@ -239,7 +241,7 @@ func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { } // Maybe the table is evicted? need to reload. - ret, err := loadTableInfo(is.r, is.InfoSchemaData, id, itm.dbID, is.ts) + ret, err := loadTableInfo(is.r, is.Data, id, itm.dbID, is.ts) if err == nil { is.cache.Set(key, ret) return ret, true @@ -272,7 +274,7 @@ func (is *infoschemaV2) TableByName(schema, tbl model.CIStr) (t table.Table, err } // Maybe the table is evicted? need to reload. - ret, err := loadTableInfo(is.r, is.InfoSchemaData, itm.tableID, itm.dbID, is.ts) + ret, err := loadTableInfo(is.r, is.Data, itm.tableID, itm.dbID, is.ts) if err != nil { return nil, errors.Trace(err) } @@ -376,7 +378,7 @@ func (is *infoschemaV2) SchemaTables(schema model.CIStr) (tables []table.Table) return } -func loadTableInfo(r autoid.Requirement, infoData *InfoSchemaData, tblID, dbID int64, ts uint64) (table.Table, error) { +func loadTableInfo(r autoid.Requirement, infoData *Data, tblID, dbID int64, ts uint64) (table.Table, error) { // Try to avoid repeated concurrency loading. 
res, err, _ := sf.Do(fmt.Sprintf("%d-%d", dbID, tblID), func() (ret any, err error) { snapshot := r.Store().GetSnapshot(kv.NewVersion(ts)) diff --git a/pkg/infoschema/test/cachetest/cache_test.go b/pkg/infoschema/test/cachetest/cache_test.go index 29e42491b0f7b..4f9f9249a3f66 100644 --- a/pkg/infoschema/test/cachetest/cache_test.go +++ b/pkg/infoschema/test/cachetest/cache_test.go @@ -23,12 +23,12 @@ import ( ) func TestNewCache(t *testing.T) { - ic := infoschema.NewCache(16) + ic := infoschema.NewCache(nil, 16) require.NotNil(t, ic) } func TestInsert(t *testing.T) { - ic := infoschema.NewCache(3) + ic := infoschema.NewCache(nil, 3) require.NotNil(t, ic) is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) @@ -100,7 +100,7 @@ func TestInsert(t *testing.T) { } func TestGetByVersion(t *testing.T) { - ic := infoschema.NewCache(2) + ic := infoschema.NewCache(nil, 2) require.NotNil(t, ic) is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) ic.Insert(is1, 1) @@ -115,7 +115,7 @@ func TestGetByVersion(t *testing.T) { } func TestGetLatest(t *testing.T) { - ic := infoschema.NewCache(16) + ic := infoschema.NewCache(nil, 16) require.NotNil(t, ic) require.Nil(t, ic.GetLatest()) @@ -135,7 +135,7 @@ func TestGetLatest(t *testing.T) { } func TestGetByTimestamp(t *testing.T) { - ic := infoschema.NewCache(16) + ic := infoschema.NewCache(nil, 16) require.NotNil(t, ic) require.Nil(t, ic.GetLatest()) require.Equal(t, 0, ic.Len()) @@ -180,7 +180,7 @@ func TestGetByTimestamp(t *testing.T) { } func TestReSize(t *testing.T) { - ic := infoschema.NewCache(2) + ic := infoschema.NewCache(nil, 2) require.NotNil(t, ic) is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) ic.Insert(is1, 1) @@ -214,7 +214,7 @@ func TestReSize(t *testing.T) { } func TestCacheWithSchemaTsZero(t *testing.T) { - ic := infoschema.NewCache(16) + ic := infoschema.NewCache(nil, 16) require.NotNil(t, ic) for i := 1; i <= 8; i++ { @@ -268,7 +268,7 @@ func TestCacheWithSchemaTsZero(t *testing.T) { require.Equal(t, 16, 
ic.Size()) // Test for there is a hole in the middle. - ic = infoschema.NewCache(16) + ic = infoschema.NewCache(nil, 16) // mock for restart with full load the latest version schema. ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 100), 100) From 4c9de327dcf894f78a09cf3aa6d176663d3e5cf5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 18 Feb 2024 19:43:24 +0800 Subject: [PATCH 09/14] fix build --- pkg/ddl/options_test.go | 2 +- pkg/ddl/placement_policy_ddl_test.go | 2 +- pkg/ddl/syncer/state_syncer_test.go | 2 +- pkg/ddl/syncer/syncer_test.go | 4 ++-- pkg/domain/ru_stats_test.go | 2 +- pkg/executor/slow_query_test.go | 2 +- pkg/executor/stmtsummary_test.go | 6 +++--- pkg/owner/manager_test.go | 6 +++--- 8 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/ddl/options_test.go b/pkg/ddl/options_test.go index 98447e65f776f..a7a299c1515e7 100644 --- a/pkg/ddl/options_test.go +++ b/pkg/ddl/options_test.go @@ -35,7 +35,7 @@ func TestOptions(t *testing.T) { callback := &ddl.BaseCallback{} lease := time.Second * 3 store := &mock.Store{} - infoHandle := infoschema.NewCache(16) + infoHandle := infoschema.NewCache(nil, 16) options := []ddl.Option{ ddl.WithEtcdClient(client), diff --git a/pkg/ddl/placement_policy_ddl_test.go b/pkg/ddl/placement_policy_ddl_test.go index 897e21d843907..7cdcdfcf33019 100644 --- a/pkg/ddl/placement_policy_ddl_test.go +++ b/pkg/ddl/placement_policy_ddl_test.go @@ -118,7 +118,7 @@ func TestPlacementPolicyInUse(t *testing.T) { t4.State = model.StatePublic db1.Tables = append(db1.Tables, t4) - builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos( []*model.DBInfo{db1, db2, dbP}, []*model.PolicyInfo{p1, p2, p3, p4, p5}, nil, diff --git a/pkg/ddl/syncer/state_syncer_test.go b/pkg/ddl/syncer/state_syncer_test.go index 1a4d984744c37..3a474f33a8f16 100644 --- a/pkg/ddl/syncer/state_syncer_test.go +++ b/pkg/ddl/syncer/state_syncer_test.go @@ -53,7 
+53,7 @@ func TestStateSyncerSimple(t *testing.T) { cli := cluster.RandClient() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ic := infoschema.NewCache(2) + ic := infoschema.NewCache(nil, 2) ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d := NewDDL( ctx, diff --git a/pkg/ddl/syncer/syncer_test.go b/pkg/ddl/syncer/syncer_test.go index 0b2afaf87bd49..a1d63bda02094 100644 --- a/pkg/ddl/syncer/syncer_test.go +++ b/pkg/ddl/syncer/syncer_test.go @@ -64,7 +64,7 @@ func TestSyncerSimple(t *testing.T) { cli := cluster.RandClient() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ic := infoschema.NewCache(2) + ic := infoschema.NewCache(nil, 2) ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d := NewDDL( ctx, @@ -88,7 +88,7 @@ func TestSyncerSimple(t *testing.T) { key := util2.DDLAllSchemaVersions + "/" + d.OwnerManager().ID() checkRespKV(t, 1, key, syncer.InitialVersion, resp.Kvs...) - ic2 := infoschema.NewCache(2) + ic2 := infoschema.NewCache(nil, 2) ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d1 := NewDDL( ctx, diff --git a/pkg/domain/ru_stats_test.go b/pkg/domain/ru_stats_test.go index 8ea0d8df2cdd7..9b16dee93f931 100644 --- a/pkg/domain/ru_stats_test.go +++ b/pkg/domain/ru_stats_test.go @@ -63,7 +63,7 @@ func TestWriteRUStatistics(t *testing.T) { testInfo := &testInfoschema{ groups: infoGroups, } - testInfoCache := infoschema.NewCache(1) + testInfoCache := infoschema.NewCache(nil, 1) testInfoCache.Insert(testInfo, uint64(time.Now().Unix())) testRUWriter.RMClient = testRMClient testRUWriter.InfoCache = testInfoCache diff --git a/pkg/executor/slow_query_test.go b/pkg/executor/slow_query_test.go index 74717a2a6f804..cfebbd75b8475 100644 --- a/pkg/executor/slow_query_test.go +++ b/pkg/executor/slow_query_test.go @@ -56,7 +56,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu } func newSlowQueryRetriever() (*slowQueryRetriever, error) { - 
newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) + newISBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/pkg/executor/stmtsummary_test.go b/pkg/executor/stmtsummary_test.go index 2da5935ffe6cd..092190656c733 100644 --- a/pkg/executor/stmtsummary_test.go +++ b/pkg/executor/stmtsummary_test.go @@ -30,7 +30,7 @@ import ( ) func TestStmtSummaryRetriverV2_TableStatementsSummary(t *testing.T) { - infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) + infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) infoSchema := infoSchemaBuilder.Build() table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummary)) @@ -73,7 +73,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummary(t *testing.T) { } func TestStmtSummaryRetriverV2_TableStatementsSummaryEvicted(t *testing.T) { - infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) + infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) infoSchema := infoSchemaBuilder.Build() table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummaryEvicted)) @@ -151,7 +151,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummaryHistory(t *testing.T) { stmtSummary.Add(stmtsummaryv2.GenerateStmtExecInfo4Test("digest3")) stmtSummary.Add(stmtsummaryv2.GenerateStmtExecInfo4Test("digest3")) - infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) + infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) infoSchema := infoSchemaBuilder.Build() table, err := infoSchema.TableByName(util.InformationSchemaName, 
model.NewCIStr(infoschema.TableStatementsSummaryHistory)) diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go index eff35dbd82108..6c86467b31b96 100644 --- a/pkg/owner/manager_test.go +++ b/pkg/owner/manager_test.go @@ -52,7 +52,7 @@ func newTestInfo(t *testing.T) *testInfo { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 4}) cli := cluster.Client(0) - ic := infoschema.NewCache(2) + ic := infoschema.NewCache(nil, 2) ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d := NewDDL( context.Background(), @@ -231,7 +231,7 @@ func TestCluster(t *testing.T) { require.True(t, isOwner) cli1 := cluster.Client(1) - ic2 := infoschema.NewCache(2) + ic2 := infoschema.NewCache(nil, 2) ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d1 := NewDDL( context.Background(), @@ -256,7 +256,7 @@ func TestCluster(t *testing.T) { d.OwnerManager().Cancel() // d3 (not owner) stop cli3 := cluster.Client(3) - ic3 := infoschema.NewCache(2) + ic3 := infoschema.NewCache(nil, 2) ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d3 := NewDDL( context.Background(), From 92e004ea0e55e856b5b7324478932fd53442ef03 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 18 Feb 2024 19:54:22 +0800 Subject: [PATCH 10/14] fix buil --- br/pkg/restore/db_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 5df07ea772d96..3aa9affeeb8bb 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -376,7 +376,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs := restore.GetExistedUserDBs(dom) require.Equal(t, 0, len(dbs)) - builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("test")}, @@ -387,7 +387,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs = restore.GetExistedUserDBs(dom) 
require.Equal(t, 0, len(dbs)) - builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos( + builder, err = infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("test")}, @@ -399,7 +399,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs = restore.GetExistedUserDBs(dom) require.Equal(t, 1, len(dbs)) - builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos( + builder, err = infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("d1")}, From 69c580381768a6420dc93be42796f9b46a699831 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 18 Feb 2024 20:25:29 +0800 Subject: [PATCH 11/14] make lint happy --- pkg/infoschema/infoschema_v2.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index 8c2c520935e16..712aab69c56ea 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -44,6 +44,7 @@ type versionAndTimestamp struct { timestamp uint64 } +// Data is the core data struct of infoschema V2. type Data struct { // For the TableByName API, sorted by {dbName, tableName, tableID} => schemaVersion // @@ -120,6 +121,7 @@ type cacheKey struct { schemaVersion int64 } +// NewData creates an infoschema V2 data struct. func NewData() *Data { ret := &Data{ byID: btree.NewBTreeG[Item](compareByID), From 520038b7dbe175bd67207af5fbed83cd722522a0 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 18 Feb 2024 20:37:09 +0800 Subject: [PATCH 12/14] add license header --- pkg/infoschema/infoschema_v2.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index 712aab69c56ea..8b2630512dc82 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package infoschema import ( From d99ca3cdd784c3708ccfd0fb1cf0bb0bd3152529 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 22 Feb 2024 21:36:52 +0800 Subject: [PATCH 13/14] address comment --- pkg/infoschema/builder.go | 4 +- pkg/infoschema/infoschema_v2.go | 107 ++++++++++++++++++-------------- 2 files changed, 64 insertions(+), 47 deletions(-) diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index 7f054a39cf342..407b097f83d8b 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -874,7 +874,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i return cmp.Compare(i.Meta().ID, j.Meta().ID) }) if enableV2.Load() { - b.infoData.add(Item{ + b.infoData.add(tableItem{ dbName: dbInfo.Name.L, dbID: dbInfo.ID, tableName: tblInfo.Name.L, @@ -1158,7 +1158,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF b.is.sortedTablesBuckets[tableBucketIdx(t.ID)] = append(sortedTbls, tbl) if enableV2.Load() { - b.infoData.add(Item{ + b.infoData.add(tableItem{ dbName: di.Name.L, dbID: di.ID, tableName: t.Name.L, diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index 8b2630512dc82..f2fa11672d29c 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -26,6 +26,8 @@ import ( "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/parser/model" 
"github.com/pingcap/tidb/pkg/table" + "github.com/pingcap/tidb/pkg/table/tables" + "github.com/pingcap/tidb/pkg/util" "github.com/scalalang2/golang-fifo" "github.com/scalalang2/golang-fifo/sieve" "github.com/tidwall/btree" @@ -34,8 +36,8 @@ import ( var enableV2 atomic.Bool -// Item is the btree item sorted by name or by id. -type Item struct { +// tableItem is the btree item sorted by name or by id. +type tableItem struct { dbName string dbID int64 tableName string @@ -65,23 +67,24 @@ type Data struct { // If the schema version +1 but a specific table does not change, the old record is // kept and no new {dbName, tableName, tableID} => schemaVersion+1 record been added. // - // It means as long as we can find a item in it, the item is available, even through the + // It means as long as we can find an item in it, the item is available, even through the // schema version maybe smaller than required. // // *IMPORTANT RESTRICTION*: Do we have the full data in memory? NO! - name2id *btree.BTreeG[Item] + byName *btree.BTreeG[tableItem] // For the TableByID API, sorted by {tableID} // To reload model.TableInfo, we need both table ID and database ID for meta kv API. // It provides the tableID => databaseID mapping. // // *IMPORTANT RESTRICTION*: Do we have the full data in memory? NO! - // But this mapping should be synced with name2id. - byID *btree.BTreeG[Item] + // But this mapping should be synced with byName. + byID *btree.BTreeG[tableItem] - cache fifo.Cache[cacheKey, table.Table] + tableCache fifo.Cache[tableCacheKey, table.Table] - // For the SchemaByName API + // For the SchemaByName API, sorted by {dbName, schemaVersion} => model.DBInfo + // Stores the full data in memory. 
schemaMap *btree.BTreeG[schemaItem] // sorted by both SchemaVersion and timestamp in descending order, assume they have same order @@ -103,7 +106,7 @@ func (isd *Data) getVersionByTS(ts uint64) (int64, bool) { func (isd *Data) getVersionByTSNoLock(ts uint64) (int64, bool) { // search one by one instead of binary search, because the timestamp of a schema could be 0 - // this is ok because the size of h.cache is small (currently set to 16) + // this is ok because the size of h.tableCache is small (currently set to 16) // moreover, the most likely hit element in the array is the first one in steady mode // thus it may have better performance than binary search for i, vt := range isd.mu.versionTimestamps { @@ -130,7 +133,7 @@ func (isd *Data) getVersionByTSNoLock(ts uint64) (int64, bool) { return 0, false } -type cacheKey struct { +type tableCacheKey struct { tableID int64 schemaVersion int64 } @@ -138,20 +141,20 @@ type cacheKey struct { // NewData creates an infoschema V2 data struct. func NewData() *Data { ret := &Data{ - byID: btree.NewBTreeG[Item](compareByID), - name2id: btree.NewBTreeG[Item](compareByName), - schemaMap: btree.NewBTreeG[schemaItem](compareBySchema), + byID: btree.NewBTreeG[tableItem](compareByID), + byName: btree.NewBTreeG[tableItem](compareByName), + schemaMap: btree.NewBTreeG[schemaItem](compareSchemaItem), // TODO: limit by size instead of by table count. 
- cache: sieve.New[cacheKey, table.Table](1000), - specials: make(map[string]*schemaTables), + tableCache: sieve.New[tableCacheKey, table.Table](1000), + specials: make(map[string]*schemaTables), } return ret } -func (isd *Data) add(item Item, tbl table.Table) { +func (isd *Data) add(item tableItem, tbl table.Table) { isd.byID.Set(item) - isd.name2id.Set(item) - isd.cache.Set(cacheKey{item.tableID, item.schemaVersion}, tbl) + isd.byName.Set(item) + isd.tableCache.Set(tableCacheKey{item.tableID, item.schemaVersion}, tbl) } func (isd *Data) addSpecialDB(di *model.DBInfo, tables *schemaTables) { @@ -162,7 +165,7 @@ func (isd *Data) addDB(schemaVersion int64, dbInfo *model.DBInfo) { isd.schemaMap.Set(schemaItem{schemaVersion: schemaVersion, dbInfo: dbInfo}) } -func compareByID(a, b Item) bool { +func compareByID(a, b tableItem) bool { if a.tableID < b.tableID { return true } @@ -173,7 +176,7 @@ func compareByID(a, b Item) bool { return a.dbID < b.dbID } -func compareByName(a, b Item) bool { +func compareByName(a, b tableItem) bool { if a.dbName < b.dbName { return true } @@ -191,7 +194,7 @@ func compareByName(a, b Item) bool { return a.tableID < b.tableID } -func compareBySchema(a, b schemaItem) bool { +func compareSchemaItem(a, b schemaItem) bool { if a.Name() < b.Name() { return true } @@ -211,12 +214,12 @@ type infoschemaV2 struct { *Data } -func search(bt *btree.BTreeG[Item], schemaVersion int64, end Item, eq func(a, b *Item) bool) (Item, bool) { +func search(bt *btree.BTreeG[tableItem], schemaVersion int64, end tableItem, matchFn func(a, b *tableItem) bool) (tableItem, bool) { var ok bool - var itm Item + var target tableItem // Iterate through the btree, find the query item whose schema version is the largest one (latest). 
- bt.Descend(end, func(item Item) bool { - if !eq(&end, &item) { + bt.Descend(end, func(item tableItem) bool { + if !matchFn(&end, &item) { return false } if item.schemaVersion > schemaVersion { @@ -227,46 +230,56 @@ func search(bt *btree.BTreeG[Item], schemaVersion int64, end Item, eq func(a, b // schema version of the items should <= query's schema version. if !ok { // The first one found. ok = true - itm = item + target = item } else { // The latest one - if item.schemaVersion > itm.schemaVersion { - itm = item + if item.schemaVersion > target.schemaVersion { + target = item } } return true }) - return itm, ok + return target, ok } func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { if isTableVirtual(id) { + // Don't store the virtual table in the tableCache, because on a cache miss + // we can't refill it from tikv. + // TODO: return the correct result. return nil, false } // Get from the cache. - key := cacheKey{id, is.schemaVersion} - tbl, found := is.cache.Get(key) + key := tableCacheKey{id, is.schemaVersion} + tbl, found := is.tableCache.Get(key) if found && tbl != nil { return tbl, true } - eq := func(a, b *Item) bool { return a.tableID == b.tableID } - itm, ok := search(is.byID, is.schemaVersion, Item{tableID: id, dbID: math.MaxInt64}, eq) + eq := func(a, b *tableItem) bool { return a.tableID == b.tableID } + itm, ok := search(is.byID, is.schemaVersion, tableItem{tableID: id, dbID: math.MaxInt64}, eq) if !ok { + // TODO: in the future, this may happen and we need to check tikv to see whether table exists. return nil, false } // Maybe the table is evicted? need to reload.
ret, err := loadTableInfo(is.r, is.Data, id, itm.dbID, is.ts) if err == nil { - is.cache.Set(key, ret) + is.tableCache.Set(key, ret) return ret, true } return nil, false } +func isSpecialDB(dbName string) bool { + return dbName == util.InformationSchemaName.L || + dbName == util.PerformanceSchemaName.L || + dbName == util.MetricSchemaName.L +} + func (is *infoschemaV2) TableByName(schema, tbl model.CIStr) (t table.Table, err error) { - if schema.L == "information_schema" || schema.L == "metrics_schema" || schema.L == "performance_schema" { + if isSpecialDB(schema.L) { if tbNames, ok := is.specials[schema.L]; ok { if t, ok = tbNames.tables[tbl.L]; ok { return @@ -275,16 +288,16 @@ func (is *infoschemaV2) TableByName(schema, tbl model.CIStr) (t table.Table, err return nil, ErrTableNotExists.GenWithStackByArgs(schema, tbl) } - eq := func(a, b *Item) bool { return a.dbName == b.dbName && a.tableName == b.tableName } - itm, ok := search(is.name2id, is.schemaVersion, Item{dbName: schema.L, tableName: tbl.L, tableID: math.MaxInt64}, eq) + eq := func(a, b *tableItem) bool { return a.dbName == b.dbName && a.tableName == b.tableName } + itm, ok := search(is.byName, is.schemaVersion, tableItem{dbName: schema.L, tableName: tbl.L, tableID: math.MaxInt64}, eq) if !ok { // TODO: in the future, this may happen and we need to check tikv to see whether table exists. return nil, ErrTableNotExists.GenWithStackByArgs(schema, tbl) } // Get from the cache. 
- key := cacheKey{itm.tableID, is.schemaVersion} - res, found := is.cache.Get(key) + key := tableCacheKey{itm.tableID, is.schemaVersion} + res, found := is.tableCache.Get(key) if found && res != nil { return res, nil } @@ -294,7 +307,7 @@ func (is *infoschemaV2) TableByName(schema, tbl model.CIStr) (t table.Table, err if err != nil { return nil, errors.Trace(err) } - is.cache.Set(key, ret) + is.tableCache.Set(key, ret) return ret, nil } @@ -380,6 +393,7 @@ func (is *infoschemaV2) SchemaTables(schema model.CIStr) (tables []table.Table) if meta.ErrDBNotExists.Equal(err) { return nil } + // TODO: error could happen, so do not panic! panic(err) } tables = make([]table.Table, 0, len(tblInfos)) @@ -396,7 +410,7 @@ func (is *infoschemaV2) SchemaTables(schema model.CIStr) (tables []table.Table) func loadTableInfo(r autoid.Requirement, infoData *Data, tblID, dbID int64, ts uint64) (table.Table, error) { // Try to avoid repeated concurrency loading. - res, err, _ := sf.Do(fmt.Sprintf("%d-%d", dbID, tblID), func() (ret any, err error) { + res, err, _ := loadTableSF.Do(fmt.Sprintf("%d-%d", dbID, tblID), func() (ret any, err error) { snapshot := r.Store().GetSnapshot(kv.NewVersion(ts)) // Using the KV timeout read feature to address the issue of potential DDL lease expiration when // the meta region leader is slow. @@ -416,16 +430,19 @@ func loadTableInfo(r autoid.Requirement, infoData *Data, tblID, dbID int64, ts u ConvertCharsetCollateToLowerCaseIfNeed(tblInfo) ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo) allocs := autoid.NewAllocatorsFromTblInfo(r, dbID, tblInfo) - b := NewBuilder(r, nil, infoData) // TODO: handle cached table!!! - ret, err := b.tableFromMeta(allocs, tblInfo) + // TODO: handle cached table!!! 
+ ret, err := tables.TableFromMeta(allocs, tblInfo) if err != nil { return nil, errors.Trace(err) } return ret, nil } -var sf = &singleflight.Group{} +var loadTableSF = &singleflight.Group{} func isTableVirtual(id int64) bool { - return (id & (1 << 62)) > 0 + // Tables in INFORMATION_SCHEMA/PERFORMANCE_SCHEMA/METRICS_SCHEMA use special ids + // that carry the autoid.SystemSchemaIDFlag bit, which is what this check tests. + // See meta/autoid/autoid.go for those definitions. + return (id & autoid.SystemSchemaIDFlag) > 0 } From 64e79696ba0380186c4cd17364a8bf4affe43bdd Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 23 Feb 2024 12:40:27 +0800 Subject: [PATCH 14/14] address comment --- pkg/infoschema/infoschema_v2.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index f2fa11672d29c..fb30fa902ef56 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -264,7 +264,7 @@ func (is *infoschemaV2) TableByID(id int64) (val table.Table, ok bool) { } // Maybe the table is evicted? need to reload. - ret, err := loadTableInfo(is.r, is.Data, id, itm.dbID, is.ts) + ret, err := loadTableInfo(is.r, is.Data, id, itm.dbID, is.ts, is.schemaVersion) if err == nil { is.tableCache.Set(key, ret) return ret, true @@ -303,7 +303,7 @@ func (is *infoschemaV2) TableByName(schema, tbl model.CIStr) (t table.Table, err } // Maybe the table is evicted? need to reload.
- ret, err := loadTableInfo(is.r, is.Data, itm.tableID, itm.dbID, is.ts) + ret, err := loadTableInfo(is.r, is.Data, itm.tableID, itm.dbID, is.ts, is.schemaVersion) if err != nil { return nil, errors.Trace(err) } @@ -408,9 +408,9 @@ func (is *infoschemaV2) SchemaTables(schema model.CIStr) (tables []table.Table) return } -func loadTableInfo(r autoid.Requirement, infoData *Data, tblID, dbID int64, ts uint64) (table.Table, error) { +func loadTableInfo(r autoid.Requirement, infoData *Data, tblID, dbID int64, ts uint64, schemaVersion int64) (table.Table, error) { // Try to avoid repeated concurrency loading. - res, err, _ := loadTableSF.Do(fmt.Sprintf("%d-%d", dbID, tblID), func() (ret any, err error) { + res, err, _ := loadTableSF.Do(fmt.Sprintf("%d-%d-%d", dbID, tblID, schemaVersion), func() (ret any, err error) { snapshot := r.Store().GetSnapshot(kv.NewVersion(ts)) // Using the KV timeout read feature to address the issue of potential DDL lease expiration when // the meta region leader is slow.