Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

OperateSchema: init workerReq with source (#1104) #1106

Merged
merged 1 commit into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1717,18 +1717,6 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateSchemaRequest
}, nil
}

workerReq := workerrpc.Request{
Type: workerrpc.CmdOperateSchema,
OperateSchema: &pb.OperateWorkerSchemaRequest{
Op: req.Op,
Task: req.Task,
Source: "", // set below.
Database: req.Database,
Table: req.Table,
Schema: req.Schema,
},
}

workerRespCh := make(chan *pb.CommonWorkerResponse, len(req.Sources))
var wg sync.WaitGroup
for _, source := range req.Sources {
Expand All @@ -1740,9 +1728,19 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateSchemaRequest
workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("source %s relevant worker-client not found", source), source, "")
return
}
workerReq2 := workerReq
workerReq2.OperateSchema.Source = source
resp, err := worker.SendRequest(ctx, &workerReq2, s.cfg.RPCTimeout)
workerReq := workerrpc.Request{
Type: workerrpc.CmdOperateSchema,
OperateSchema: &pb.OperateWorkerSchemaRequest{
Op: req.Op,
Task: req.Task,
Source: source,
Database: req.Database,
Table: req.Table,
Schema: req.Schema,
},
}

resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout)
workerResp := &pb.CommonWorkerResponse{}
if err != nil {
workerResp = errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name)
Expand Down
4 changes: 4 additions & 0 deletions tests/sequence_sharding_optimistic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ run() {
curl -X PUT ${API_URL} -d '{"op":1, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' > ${WORK_DIR}/get_schema.log
check_log_contains ${WORK_DIR}/get_schema.log "Table 'sharding_seq_opt.t1' doesn't exist" 1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-schema get -s mysql-replica-01,mysql-replica-02 sequence_sharding_optimistic -d sharding_seq_opt -t t2" \
"\"result\": true" 3

# try to set another schema, `c3` `int` -> `bigint`.
echo 'CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c2` varchar(20) DEFAULT NULL, `c3` bigint(11) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' > ${WORK_DIR}/schema.sql
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down