Skip to content

Commit

Permalink
Merge branch 'master' into issue40351
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 6, 2023
2 parents a81453e + e1a2b58 commit 689e921
Show file tree
Hide file tree
Showing 15 changed files with 681 additions and 181 deletions.
36 changes: 36 additions & 0 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,3 +985,39 @@ func TestIssue39080(t *testing.T) {
" UNIQUE KEY `authorIdx` (`authorId`)\n"+
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}

// TestWriteDataWriteOnlyMode verifies that inserts issued while a
// "change column" / "drop column" DDL job sits in the write-only schema
// state do not break unique-key duplicate checking (regression test for
// issue #40351).
func TestWriteDataWriteOnlyMode(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)

	ddlSession := testkit.NewTestKit(t, store)
	dmlSession := testkit.NewTestKit(t, store)
	ddlSession.MustExec("use test")
	dmlSession.MustExec("use test")
	ddlSession.MustExec("CREATE TABLE t (`col1` bigint(20) DEFAULT 1,`col2` float,UNIQUE KEY `key1` (`col1`))")

	origHook := dom.DDL().GetHook()
	defer dom.DDL().SetHook(origHook)

	// Phase 1: run inserts from a second session while the "change column"
	// job is in the write-only state.
	changeColHook := &ddl.TestDDLCallback{Do: dom}
	changeColHook.OnJobRunBeforeExported = func(job *model.Job) {
		if job.SchemaState != model.StateWriteOnly {
			return
		}
		dmlSession.MustExec("insert ignore into t values (1, 2)")
		dmlSession.MustExec("insert ignore into t values (2, 2)")
	}
	dom.DDL().SetHook(changeColHook)
	ddlSession.MustExec("alter table t change column `col1` `col1` varchar(20)")

	// Phase 2: run inserts while the "drop column" job is in the
	// write-only state.
	dropColHook := &ddl.TestDDLCallback{Do: dom}
	dropColHook.OnJobRunBeforeExported = func(job *model.Job) {
		if job.SchemaState != model.StateWriteOnly {
			return
		}
		dmlSession.MustExec("insert ignore into t values (1)")
		dmlSession.MustExec("insert ignore into t values (2)")
	}
	dom.DDL().SetHook(dropColHook)
	ddlSession.MustExec("alter table t drop column `col1`")
	dom.DDL().SetHook(origHook)
}
39 changes: 28 additions & 11 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,33 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
}
}

// addChangingColTimes is used to fetch values while processing "modify/change column" operation.
addChangingColTimes := 0
// extraColumns is used to fetch values while processing "add/drop/modify/change column" operation.
extraColumns := 0
for _, col := range t.WritableCols() {
// if there is a changing column, append the dependency column for index fetch values
if col.ChangeStateInfo != nil && col.State != model.StatePublic {
value, err := table.CastValue(ctx, row[col.DependencyColumnOffset], col.ColumnInfo, false, false)
if err != nil {
return nil, err
}
row = append(row, value)
extraColumns++
continue
}

if col.State != model.StatePublic {
// only append origin default value for index fetch values
if col.Offset >= len(row) {
value, err := table.GetColOriginDefaultValue(ctx, col.ToInfo())
if err != nil {
return nil, err
}

row = append(row, value)
extraColumns++
}
}
}
// append unique keys and errors
for _, v := range t.Indices() {
if !tables.IsIndexWritable(v) {
Expand All @@ -159,12 +184,6 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
if t.Meta().IsCommonHandle && v.Meta().Primary {
continue
}
if len(row) < len(t.WritableCols()) && addChangingColTimes == 0 {
if col := tables.FindChangingCol(t.WritableCols(), v.Meta()); col != nil {
row = append(row, row[col.DependencyColumnOffset])
addChangingColTimes++
}
}
colVals, err1 := v.FetchValues(row, nil)
if err1 != nil {
return nil, err1
Expand Down Expand Up @@ -194,9 +213,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
commonHandle: t.Meta().IsCommonHandle,
})
}
if addChangingColTimes == 1 {
row = row[:len(row)-1]
}
row = row[:len(row)-extraColumns]
result = append(result, toBeCheckedRow{
row: row,
handleKey: handleKey,
Expand Down
128 changes: 4 additions & 124 deletions executor/memtable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -47,7 +46,6 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -177,7 +175,7 @@ func fetchClusterConfig(sctx sessionctx.Context, nodeTypes, nodeAddrs set.String
if err != nil {
return nil, err
}
serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs)
serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs)
//nolint: prealloc
var finalRows [][]types.Datum
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -310,108 +308,12 @@ func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionc
return nil, nil
}
e.retrieved = true

serversInfo, err := infoschema.GetClusterServerInfo(sctx)
if err != nil {
return nil, err
}
serversInfo = filterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances)

type result struct {
idx int
rows [][]types.Datum
err error
}
wg := sync.WaitGroup{}
ch := make(chan result, len(serversInfo))
infoTp := e.serverInfoType
finalRows := make([][]types.Datum, 0, len(serversInfo)*10)
for i, srv := range serversInfo {
address := srv.Address
remote := address
if srv.ServerType == "tidb" {
remote = srv.StatusAddr
}
wg.Add(1)
go func(index int, remote, address, serverTP string) {
util.WithRecovery(func() {
defer wg.Done()
items, err := getServerInfoByGRPC(ctx, remote, infoTp)
if err != nil {
ch <- result{idx: index, err: err}
return
}
partRows := serverInfoItemToRows(items, serverTP, address)
ch <- result{idx: index, rows: partRows}
}, nil)
}(i, remote, address, srv.ServerType)
}
wg.Wait()
close(ch)
// Keep the original order to make the result more stable
var results []result //nolint: prealloc
for result := range ch {
if result.err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
continue
}
results = append(results, result)
}
slices.SortFunc(results, func(i, j result) bool { return i.idx < j.idx })
for _, result := range results {
finalRows = append(finalRows, result.rows...)
}
return finalRows, nil
}

// serverInfoItemToRows flattens the gRPC ServerInfo response into datum
// rows. Each (item, key/value pair) combination becomes one row of the form
// (serverType, address, itemType, itemName, pairKey, pairValue).
func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string) [][]types.Datum {
	// Pre-size for the total number of pairs rather than the number of
	// items: each item typically carries several key/value pairs, so
	// len(items) alone undercounts and forces repeated re-allocations.
	total := 0
	for _, v := range items {
		total += len(v.Pairs)
	}
	rows := make([][]types.Datum, 0, total)
	for _, v := range items {
		for _, item := range v.Pairs {
			row := types.MakeDatums(
				tp,
				addr,
				v.Tp,
				v.Name,
				item.Key,
				item.Value,
			)
			rows = append(rows, row)
		}
	}
	return rows
}

func getServerInfoByGRPC(ctx context.Context, address string, tp diagnosticspb.ServerInfoType) ([]*diagnosticspb.ServerInfoItem, error) {
opt := grpc.WithInsecure()
security := config.GetGlobalConfig().Security
if len(security.ClusterSSLCA) != 0 {
clusterSecurity := security.ClusterSecurity()
tlsConfig, err := clusterSecurity.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
conn, err := grpc.Dial(address, opt)
if err != nil {
return nil, err
}
defer func() {
err := conn.Close()
if err != nil {
log.Error("close grpc connection error", zap.Error(err))
}
}()

cli := diagnosticspb.NewDiagnosticsClient(conn)
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
r, err := cli.ServerInfo(ctx, &diagnosticspb.ServerInfoRequest{Tp: tp})
if err != nil {
return nil, err
}
return r.Items, nil
serversInfo = infoschema.FilterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances)
return infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, serversInfo, e.serverInfoType, true)
}

func parseFailpointServerInfo(s string) []infoschema.ServerInfo {
Expand All @@ -428,28 +330,6 @@ func parseFailpointServerInfo(s string) []infoschema.ServerInfo {
return serversInfo
}

// filterClusterServerInfo drops servers that do not match the node-type or
// address filters extracted from the query's WHERE clause. An empty filter
// set means "no restriction" for that dimension.
func filterClusterServerInfo(serversInfo []infoschema.ServerInfo, nodeTypes, addresses set.StringSet) []infoschema.ServerInfo {
	if len(nodeTypes) == 0 && len(addresses) == 0 {
		return serversInfo
	}

	matches := func(srv infoschema.ServerInfo) bool {
		// Skip node types filtered out in the WHERE clause,
		// e.g: SELECT * FROM cluster_config WHERE type='tikv'
		if len(nodeTypes) > 0 && !nodeTypes.Exist(srv.ServerType) {
			return false
		}
		// Skip node addresses filtered out in the WHERE clause,
		// e.g: SELECT * FROM cluster_config WHERE address='192.16.8.12:2379'
		return len(addresses) == 0 || addresses.Exist(srv.Address)
	}

	kept := make([]infoschema.ServerInfo, 0, len(serversInfo))
	for _, srv := range serversInfo {
		if matches(srv) {
			kept = append(kept, srv)
		}
	}
	return kept
}

type clusterLogRetriever struct {
isDrained bool
retrieving bool
Expand Down Expand Up @@ -515,7 +395,7 @@ func (e *clusterLogRetriever) initialize(ctx context.Context, sctx sessionctx.Co

instances := e.extractor.Instances
nodeTypes := e.extractor.NodeTypes
serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, instances)
serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, instances)

var levels = make([]diagnosticspb.LogLevel, 0, len(e.extractor.LogLevels))
for l := range e.extractor.LogLevels {
Expand Down
2 changes: 1 addition & 1 deletion executor/set_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *SetConfigExec) Next(ctx context.Context, req *chunk.Chunk) error {
if s.p.Instance != "" {
nodeAddrs.Insert(s.p.Instance)
}
serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs)
serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs)
if s.p.Instance != "" && len(serversInfo) == 0 {
return errors.Errorf("instance %v is not found in this cluster", s.p.Instance)
}
Expand Down
4 changes: 4 additions & 0 deletions infoschema/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ go_library(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/diagnosticspb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
Expand Down
Loading

0 comments on commit 689e921

Please sign in to comment.