From 7c40462132507db1b6bea7ffe04b539d6eb74ca5 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 23 Feb 2023 16:46:23 +0530 Subject: [PATCH 1/2] GetSchema rpc to streaming api Signed-off-by: Harshit Gangal --- go/vt/proto/queryservice/queryservice.pb.go | 12 +-- .../queryservice/queryservice_grpc.pb.go | 81 ++++++++++++------- go/vt/vtcombo/tablet_map.go | 6 +- go/vt/vtgate/schema/tracker.go | 40 +++++---- go/vt/vttablet/endtoend/framework/client.go | 10 ++- go/vt/vttablet/grpcqueryservice/server.go | 12 +-- go/vt/vttablet/grpctabletconn/conn.go | 42 +++++++--- go/vt/vttablet/queryservice/queryservice.go | 2 +- go/vt/vttablet/queryservice/wrapped.go | 7 +- go/vt/vttablet/sandboxconn/sandboxconn.go | 12 +-- .../tabletconntest/fakequeryservice.go | 2 +- go/vt/vttablet/tabletserver/query_executor.go | 37 ++++----- go/vt/vttablet/tabletserver/tabletserver.go | 5 +- proto/queryservice.proto | 4 +- 14 files changed, 158 insertions(+), 114 deletions(-) diff --git a/go/vt/proto/queryservice/queryservice.pb.go b/go/vt/proto/queryservice/queryservice.pb.go index 211030f1cee..3d72458cb27 100644 --- a/go/vt/proto/queryservice/queryservice.pb.go +++ b/go/vt/proto/queryservice/queryservice.pb.go @@ -45,7 +45,7 @@ var file_queryservice_proto_rawDesc = []byte{ 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x71, 0x75, 0x65, 0x72, 0x79, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x1a, 0x0b, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x10, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x32, 0xd0, 0x10, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3a, 0x0a, 0x07, 0x45, + 0x6f, 0x32, 0xd2, 0x10, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3a, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x12, 0x15, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, @@ -174,14 +174,14 @@ var file_queryservice_proto_rawDesc = []byte{ 0x6c, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x30, 0x01, 0x12, 0x40, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, + 0x30, 0x01, 0x12, 0x42, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x17, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x42, 0x2b, 0x5a, 0x29, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, - 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x2b, 0x5a, 0x29, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, + 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_queryservice_proto_goTypes = []interface{}{ diff --git a/go/vt/proto/queryservice/queryservice_grpc.pb.go b/go/vt/proto/queryservice/queryservice_grpc.pb.go index e40c8110f06..f9d596351e2 100644 --- a/go/vt/proto/queryservice/queryservice_grpc.pb.go +++ b/go/vt/proto/queryservice/queryservice_grpc.pb.go @@ -82,7 +82,7 @@ type QueryClient interface { // VStreamResults streams results along with the gtid of the snapshot. VStreamResults(ctx context.Context, in *binlogdata.VStreamResultsRequest, opts ...grpc.CallOption) (Query_VStreamResultsClient, error) // GetSchema returns the schema information. - GetSchema(ctx context.Context, in *query.GetSchemaRequest, opts ...grpc.CallOption) (*query.GetSchemaResponse, error) + GetSchema(ctx context.Context, in *query.GetSchemaRequest, opts ...grpc.CallOption) (Query_GetSchemaClient, error) } type queryClient struct { @@ -534,13 +534,36 @@ func (x *queryVStreamResultsClient) Recv() (*binlogdata.VStreamResultsResponse, return m, nil } -func (c *queryClient) GetSchema(ctx context.Context, in *query.GetSchemaRequest, opts ...grpc.CallOption) (*query.GetSchemaResponse, error) { - out := new(query.GetSchemaResponse) - err := c.cc.Invoke(ctx, "/queryservice.Query/GetSchema", in, out, opts...) +func (c *queryClient) GetSchema(ctx context.Context, in *query.GetSchemaRequest, opts ...grpc.CallOption) (Query_GetSchemaClient, error) { + stream, err := c.cc.NewStream(ctx, &Query_ServiceDesc.Streams[9], "/queryservice.Query/GetSchema", opts...) if err != nil { return nil, err } - return out, nil + x := &queryGetSchemaClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Query_GetSchemaClient interface { + Recv() (*query.GetSchemaResponse, error) + grpc.ClientStream +} + +type queryGetSchemaClient struct { + grpc.ClientStream +} + +func (x *queryGetSchemaClient) Recv() (*query.GetSchemaResponse, error) { + m := new(query.GetSchemaResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // QueryServer is the server API for Query service. @@ -605,7 +628,7 @@ type QueryServer interface { // VStreamResults streams results along with the gtid of the snapshot. VStreamResults(*binlogdata.VStreamResultsRequest, Query_VStreamResultsServer) error // GetSchema returns the schema information. - GetSchema(context.Context, *query.GetSchemaRequest) (*query.GetSchemaResponse, error) + GetSchema(*query.GetSchemaRequest, Query_GetSchemaServer) error mustEmbedUnimplementedQueryServer() } @@ -691,8 +714,8 @@ func (UnimplementedQueryServer) VStreamRows(*binlogdata.VStreamRowsRequest, Quer func (UnimplementedQueryServer) VStreamResults(*binlogdata.VStreamResultsRequest, Query_VStreamResultsServer) error { return status.Errorf(codes.Unimplemented, "method VStreamResults not implemented") } -func (UnimplementedQueryServer) GetSchema(context.Context, *query.GetSchemaRequest) (*query.GetSchemaResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetSchema not implemented") +func (UnimplementedQueryServer) GetSchema(*query.GetSchemaRequest, Query_GetSchemaServer) error { + return status.Errorf(codes.Unimplemented, "method GetSchema not implemented") } func (UnimplementedQueryServer) mustEmbedUnimplementedQueryServer() {} @@ -1202,22 +1225,25 @@ func (x *queryVStreamResultsServer) Send(m *binlogdata.VStreamResultsResponse) e return x.ServerStream.SendMsg(m) } -func _Query_GetSchema_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(query.GetSchemaRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(QueryServer).GetSchema(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/queryservice.Query/GetSchema", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServer).GetSchema(ctx, req.(*query.GetSchemaRequest)) +func _Query_GetSchema_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(query.GetSchemaRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - return interceptor(ctx, in, info, handler) + return srv.(QueryServer).GetSchema(m, &queryGetSchemaServer{stream}) +} + +type Query_GetSchemaServer interface { + Send(*query.GetSchemaResponse) error + grpc.ServerStream +} + +type queryGetSchemaServer struct { + grpc.ServerStream +} + +func (x *queryGetSchemaServer) Send(m *query.GetSchemaResponse) error { + return x.ServerStream.SendMsg(m) } // Query_ServiceDesc is the grpc.ServiceDesc for Query service. @@ -1295,10 +1321,6 @@ var Query_ServiceDesc = grpc.ServiceDesc{ MethodName: "Release", Handler: _Query_Release_Handler, }, - { - MethodName: "GetSchema", - Handler: _Query_GetSchema_Handler, - }, }, Streams: []grpc.StreamDesc{ { @@ -1346,6 +1368,11 @@ var Query_ServiceDesc = grpc.ServiceDesc{ Handler: _Query_VStreamResults_Handler, ServerStreams: true, }, + { + StreamName: "GetSchema", + Handler: _Query_GetSchema_Handler, + ServerStreams: true, + }, }, Metadata: "queryservice.proto", } diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index b68dd4ccc49..33548e1e1d3 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -662,9 +662,9 @@ func (itc *internalTabletConn) Release(ctx context.Context, target *querypb.Targ } // GetSchema is part of the QueryService interface. -func (itc *internalTabletConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) { - response, err := itc.tablet.qsc.QueryService().GetSchema(ctx, target, tableType, tableNames) - return response, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err)) +func (itc *internalTabletConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error { + err := itc.tablet.qsc.QueryService().GetSchema(ctx, target, tableType, tableNames, callback) + return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err)) } // Close is part of queryservice.QueryService diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index 98f1211e39b..caff0c3b07b 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -134,21 +134,18 @@ func (t *Tracker) loadViews(conn queryservice.QueryService, target *querypb.Targ return nil } - fvRes, err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_VIEWS, nil) - if err != nil { - return err - } - t.mu.Lock() defer t.mu.Unlock() // We must clear out any previous view definition before loading it here as this is called - // whenever a shard's primary tablet starts and sends the initial signal. Without - // clearing out the previous view definition removes any dropped views. + // whenever a shard's primary tablet starts and sends the initial signal. + // This is needed clear out any stale view definitions. t.clearKeyspaceViews(target.Keyspace) - t.updateViews(target.Keyspace, fvRes) - log.Infof("finished loading views for keyspace %s. Found %d views", target.Keyspace, len(fvRes)) - return nil + return conn.GetSchema(t.ctx, target, querypb.SchemaTableType_VIEWS, nil, func(schemaRes *querypb.GetSchemaResponse) error { + t.updateViews(target.Keyspace, schemaRes.TableDefinition) + log.Infof("finished loading views for keyspace %s. Found %d views", target.Keyspace, len(schemaRes.TableDefinition)) + return nil + }) } // Start starts the schema tracking. @@ -299,26 +296,27 @@ func (t *Tracker) updateTables(keyspace string, res *sqltypes.Result) { } func (t *Tracker) updatedViewSchema(th *discovery.TabletHealth) bool { - viewsUpdated := th.Stats.ViewSchemaChanged - res, err := th.Conn.GetSchema(t.ctx, th.Target, querypb.SchemaTableType_VIEWS, viewsUpdated) - if err != nil { - t.tracked[th.Target.Keyspace].setLoaded(false) - // TODO: optimize for the views that got errored out. - log.Warningf("error fetching new views definition for %v", viewsUpdated, err) - return false - } - t.mu.Lock() defer t.mu.Unlock() + viewsUpdated := th.Stats.ViewSchemaChanged + // first we empty all prior schema. deleted tables will not show up in the result, // so this is the only chance to delete for _, view := range viewsUpdated { t.views.delete(th.Target.Keyspace, view) } - t.updateViews(th.Target.Keyspace, res) + err := th.Conn.GetSchema(t.ctx, th.Target, querypb.SchemaTableType_VIEWS, viewsUpdated, func(schemaRes *querypb.GetSchemaResponse) error { + t.updateViews(th.Target.Keyspace, schemaRes.TableDefinition) + return nil + }) + if err != nil { + t.tracked[th.Target.Keyspace].setLoaded(false) + // TODO: optimize for the views that got errored out. + log.Warningf("error fetching new views definition for %v", viewsUpdated, err) + return false + } return true - } func (t *Tracker) updateViews(keyspace string, res map[string]string) { diff --git a/go/vt/vttablet/endtoend/framework/client.go b/go/vt/vttablet/endtoend/framework/client.go index 38967cab202..ea72d2ee89d 100644 --- a/go/vt/vttablet/endtoend/framework/client.go +++ b/go/vt/vttablet/endtoend/framework/client.go @@ -410,5 +410,13 @@ func (client *QueryClient) UpdateContext(ctx context.Context) { } func (client *QueryClient) GetSchema(tableType querypb.SchemaTableType, tableNames ...string) (map[string]string, error) { - return client.server.GetSchema(client.ctx, client.target, tableType, tableNames) + schemaDef := map[string]string{} + err := client.server.GetSchema(client.ctx, client.target, tableType, tableNames, func(schemaRes *querypb.GetSchemaResponse) error { + schemaDef = schemaRes.TableDefinition + return nil + }) + if err != nil { + return nil, err + } + return schemaDef, nil } diff --git a/go/vt/vttablet/grpcqueryservice/server.go b/go/vt/vttablet/grpcqueryservice/server.go index 69ebcbf39b9..e23acba630f 100644 --- a/go/vt/vttablet/grpcqueryservice/server.go +++ b/go/vt/vttablet/grpcqueryservice/server.go @@ -484,16 +484,10 @@ func (q *query) Release(ctx context.Context, request *querypb.ReleaseRequest) (r } // GetSchema implements the QueryServer interface -func (q *query) GetSchema(ctx context.Context, request *querypb.GetSchemaRequest) (response *querypb.GetSchemaResponse, err error) { +func (q *query) GetSchema(request *querypb.GetSchemaRequest, stream queryservicepb.Query_GetSchemaServer) (err error) { defer q.server.HandlePanic(&err) - var resp map[string]string - resp, err = q.server.GetSchema(ctx, request.Target, request.TableType, request.TableNames) - if err != nil { - return nil, vterrors.ToGRPC(err) - } - return &querypb.GetSchemaResponse{ - TableDefinition: resp, - }, nil + err = q.server.GetSchema(stream.Context(), request.Target, request.TableType, request.TableNames, stream.Send) + return vterrors.ToGRPC(err) } // Register registers the implementation on the provide gRPC Server. diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index 4a945d1a1af..7ef533f580d 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -1019,23 +1019,45 @@ func (conn *gRPCQueryClient) Release(ctx context.Context, target *querypb.Target } // GetSchema implements the queryservice interface -func (conn *gRPCQueryClient) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) { +func (conn *gRPCQueryClient) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error { conn.mu.RLock() defer conn.mu.RUnlock() if conn.cc == nil { - return nil, tabletconn.ConnClosed + return tabletconn.ConnClosed } - req := &querypb.GetSchemaRequest{ - Target: target, - TableType: tableType, - TableNames: tableNames, - } - reply, err := conn.c.GetSchema(ctx, req) + stream, err := func() (queryservicepb.Query_GetSchemaClient, error) { + conn.mu.RLock() + defer conn.mu.RUnlock() + if conn.cc == nil { + return nil, tabletconn.ConnClosed + } + + stream, err := conn.c.GetSchema(ctx, &querypb.GetSchemaRequest{ + Target: target, + TableType: tableType, + TableNames: tableNames, + }) + if err != nil { + return nil, tabletconn.ErrorFromGRPC(err) + } + return stream, nil + }() if err != nil { - return nil, tabletconn.ErrorFromGRPC(err) + return err + } + for { + shr, err := stream.Recv() + if err != nil { + return tabletconn.ErrorFromGRPC(err) + } + if err := callback(shr); err != nil { + if err == nil || err == io.EOF { + return nil + } + return err + } } - return reply.TableDefinition, nil } // Close closes underlying gRPC channel. diff --git a/go/vt/vttablet/queryservice/queryservice.go b/go/vt/vttablet/queryservice/queryservice.go index b72ae12d653..1b9bdac13e9 100644 --- a/go/vt/vttablet/queryservice/queryservice.go +++ b/go/vt/vttablet/queryservice/queryservice.go @@ -118,7 +118,7 @@ type QueryService interface { Release(ctx context.Context, target *querypb.Target, transactionID, reservedID int64) error // GetSchema returns the table definition for the specified tables. - GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) + GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error // Close must be called for releasing resources. Close(ctx context.Context) error diff --git a/go/vt/vttablet/queryservice/wrapped.go b/go/vt/vttablet/queryservice/wrapped.go index 97c4732da44..376c228b02a 100644 --- a/go/vt/vttablet/queryservice/wrapped.go +++ b/go/vt/vttablet/queryservice/wrapped.go @@ -324,13 +324,12 @@ func (ws *wrappedService) Release(ctx context.Context, target *querypb.Target, t }) } -func (ws *wrappedService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (schemaDef map[string]string, err error) { +func (ws *wrappedService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) (err error) { err = ws.wrapper(ctx, target, ws.impl, "GetSchema", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { - var innerErr error - schemaDef, innerErr = conn.GetSchema(ctx, target, tableType, tableNames) + innerErr := conn.GetSchema(ctx, target, tableType, tableNames, callback) return canRetry(ctx, innerErr), innerErr }) - return schemaDef, err + return err } func (ws *wrappedService) Close(ctx context.Context) error { diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 822f6774fe9..a1ef83f7a3a 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -585,14 +585,14 @@ func (sbc *SandboxConn) Release(ctx context.Context, target *querypb.Target, tra } // GetSchema implements the QueryService interface -func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) { +func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error { sbc.GetSchemaCount.Add(1) - var resp map[string]string - if len(sbc.getSchemaResult) > 0 { - resp = sbc.getSchemaResult[0] - sbc.getSchemaResult = sbc.getSchemaResult[1:] + if len(sbc.getSchemaResult) == 0 { + return nil } - return resp, nil + resp := sbc.getSchemaResult[0] + sbc.getSchemaResult = sbc.getSchemaResult[1:] + return callback(&querypb.GetSchemaResponse{TableDefinition: resp}) } // Close does not change ExecCount diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go index e78194fd5db..8bfb40bceee 100644 --- a/go/vt/vttablet/tabletconntest/fakequeryservice.go +++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go @@ -750,7 +750,7 @@ func (f *FakeQueryService) Release(ctx context.Context, target *querypb.Target, } // GetSchema implements the QueryService interface -func (f *FakeQueryService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) { +func (f *FakeQueryService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error { panic("implement me") } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index ee02901993d..5a92154e3d5 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1218,15 +1218,15 @@ func generateBindVarsForViewDDLInsert(createView *sqlparser.CreateView) map[stri return bindVars } -func (qre *QueryExecutor) GetSchemaDefinitions(tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) { +func (qre *QueryExecutor) GetSchemaDefinitions(tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error { switch tableType { case querypb.SchemaTableType_VIEWS: - return qre.getViewDefinitions(tableNames) + return qre.getViewDefinitions(tableNames, callback) } - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid table type %v", tableType) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid table type %v", tableType) } -func (qre *QueryExecutor) getViewDefinitions(viewNames []string) (map[string]string, error) { +func (qre *QueryExecutor) getViewDefinitions(viewNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error { query := mysql.FetchViews var bindVars map[string]*querypb.BindVariable if len(viewNames) > 0 { @@ -1235,36 +1235,33 @@ func (qre *QueryExecutor) getViewDefinitions(viewNames []string) (map[string]str "viewnames": sqltypes.StringBindVariable(strings.Join(viewNames, ",")), } } - res, err := qre.execQuery(query, bindVars) - if err != nil { - return nil, err - } - - schemaDef := make(map[string]string) - for _, row := range res.Rows { - schemaDef[row[0].ToString()] = row[1].ToString() - } - return schemaDef, nil + return qre.execQuery(query, bindVars, func(result *sqltypes.Result) error { + schemaDef := make(map[string]string) + for _, row := range result.Rows { + schemaDef[row[0].ToString()] = row[1].ToString() + } + return callback(&querypb.GetSchemaResponse{TableDefinition: schemaDef}) + }) } -func (qre *QueryExecutor) execQuery(query string, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { +func (qre *QueryExecutor) execQuery(query string, bindVars map[string]*querypb.BindVariable, callback func(result *sqltypes.Result) error) error { sql := query if len(bindVars) > 0 { stmt, err := sqlparser.Parse(query) if err != nil { - return nil, err + return err } sql, _, err = qre.generateFinalSQL(sqlparser.NewParsedQuery(stmt), bindVars) if err != nil { - return nil, err + return err } } - conn, err := qre.getConn() + conn, err := qre.getStreamConn() if err != nil { - return nil, err + return err } defer conn.Recycle() - return qre.execDBConn(conn, sql, true) + return qre.execStreamSQL(conn, false /* isTransaction */, sql, callback) } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index c9c5146783c..090098aded6 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1396,7 +1396,7 @@ func txToReserveState(state queryservice.TransactionState) queryservice.Reserved } // GetSchema returns table definitions for the specified tables. -func (tsv *TabletServer) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (schemaDef map[string]string, err error) { +func (tsv *TabletServer) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) (err error) { err = tsv.execRequest( ctx, tsv.QueryTimeout.Get(), "GetSchema", "", nil, @@ -1409,8 +1409,7 @@ func (tsv *TabletServer) GetSchema(ctx context.Context, target *querypb.Target, logStats: logStats, tsv: tsv, } - schemaDef, err = qre.GetSchemaDefinitions(tableType, tableNames) - return err + return qre.GetSchemaDefinitions(tableType, tableNames, callback) }, ) return diff --git a/proto/queryservice.proto b/proto/queryservice.proto index 295fca961eb..eeed60aa50d 100644 --- a/proto/queryservice.proto +++ b/proto/queryservice.proto @@ -110,5 +110,5 @@ service Query { rpc VStreamResults(binlogdata.VStreamResultsRequest) returns (stream binlogdata.VStreamResultsResponse) {}; // GetSchema returns the schema information. - rpc GetSchema(query.GetSchemaRequest) returns (query.GetSchemaResponse) {}; -} + rpc GetSchema(query.GetSchemaRequest) returns (stream query.GetSchemaResponse) {}; +} \ No newline at end of file From fdf3a303586bb80773afab4a97b2be53006ab750 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 23 Feb 2023 21:24:16 +0530 Subject: [PATCH 2/2] addressed review comment Signed-off-by: Harshit Gangal --- go/vt/vtgate/schema/tracker.go | 10 ++++++++-- go/vt/vttablet/tabletserver/query_executor.go | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index caff0c3b07b..a75a583ae11 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -141,11 +141,17 @@ func (t *Tracker) loadViews(conn queryservice.QueryService, target *querypb.Targ // This is needed clear out any stale view definitions. t.clearKeyspaceViews(target.Keyspace) - return conn.GetSchema(t.ctx, target, querypb.SchemaTableType_VIEWS, nil, func(schemaRes *querypb.GetSchemaResponse) error { + var numViews int + err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_VIEWS, nil, func(schemaRes *querypb.GetSchemaResponse) error { t.updateViews(target.Keyspace, schemaRes.TableDefinition) - log.Infof("finished loading views for keyspace %s. Found %d views", target.Keyspace, len(schemaRes.TableDefinition)) + numViews += len(schemaRes.TableDefinition) return nil }) + if err != nil { + return err + } + log.Infof("finished loading views for keyspace %s. Found %d views", target.Keyspace, numViews) + return nil } // Start starts the schema tracking. diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 5a92154e3d5..cf26d7c5019 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1235,7 +1235,7 @@ func (qre *QueryExecutor) getViewDefinitions(viewNames []string, callback func(s "viewnames": sqltypes.StringBindVariable(strings.Join(viewNames, ",")), } } - return qre.execQuery(query, bindVars, func(result *sqltypes.Result) error { + return qre.generateFinalQueryAndStreamExecute(query, bindVars, func(result *sqltypes.Result) error { schemaDef := make(map[string]string) for _, row := range result.Rows { schemaDef[row[0].ToString()] = row[1].ToString() @@ -1244,7 +1244,7 @@ func (qre *QueryExecutor) getViewDefinitions(viewNames []string, callback func(s }) } -func (qre *QueryExecutor) execQuery(query string, bindVars map[string]*querypb.BindVariable, callback func(result *sqltypes.Result) error) error { +func (qre *QueryExecutor) generateFinalQueryAndStreamExecute(query string, bindVars map[string]*querypb.BindVariable, callback func(result *sqltypes.Result) error) error { sql := query if len(bindVars) > 0 { stmt, err := sqlparser.Parse(query)