Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetSchema rpc to streaming api #12447

Merged
merged 2 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions go/vt/proto/queryservice/queryservice.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 54 additions & 27 deletions go/vt/proto/queryservice/queryservice_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,9 +662,9 @@ func (itc *internalTabletConn) Release(ctx context.Context, target *querypb.Targ
}

// GetSchema is part of the QueryService interface.
func (itc *internalTabletConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) {
response, err := itc.tablet.qsc.QueryService().GetSchema(ctx, target, tableType, tableNames)
return response, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
func (itc *internalTabletConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
err := itc.tablet.qsc.QueryService().GetSchema(ctx, target, tableType, tableNames, callback)
return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// Close is part of queryservice.QueryService
Expand Down
44 changes: 24 additions & 20 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,23 @@ func (t *Tracker) loadViews(conn queryservice.QueryService, target *querypb.Targ
return nil
}

fvRes, err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_VIEWS, nil)
if err != nil {
return err
}

t.mu.Lock()
defer t.mu.Unlock()
// We must clear out any previous view definition before loading it here as this is called
// whenever a shard's primary tablet starts and sends the initial signal. Without
// clearing out the previous view definition removes any dropped views.
// whenever a shard's primary tablet starts and sends the initial signal.
// This is needed clear out any stale view definitions.
t.clearKeyspaceViews(target.Keyspace)
t.updateViews(target.Keyspace, fvRes)
log.Infof("finished loading views for keyspace %s. Found %d views", target.Keyspace, len(fvRes))

var numViews int
err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_VIEWS, nil, func(schemaRes *querypb.GetSchemaResponse) error {
t.updateViews(target.Keyspace, schemaRes.TableDefinition)
numViews += len(schemaRes.TableDefinition)
return nil
})
if err != nil {
return err
}
log.Infof("finished loading views for keyspace %s. Found %d views", target.Keyspace, numViews)
return nil
}

Expand Down Expand Up @@ -299,26 +302,27 @@ func (t *Tracker) updateTables(keyspace string, res *sqltypes.Result) {
}

func (t *Tracker) updatedViewSchema(th *discovery.TabletHealth) bool {
viewsUpdated := th.Stats.ViewSchemaChanged
res, err := th.Conn.GetSchema(t.ctx, th.Target, querypb.SchemaTableType_VIEWS, viewsUpdated)
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
// TODO: optimize for the views that got errored out.
log.Warningf("error fetching new views definition for %v", viewsUpdated, err)
return false
}

t.mu.Lock()
defer t.mu.Unlock()

viewsUpdated := th.Stats.ViewSchemaChanged

// first we empty all prior schema. deleted tables will not show up in the result,
// so this is the only chance to delete
for _, view := range viewsUpdated {
t.views.delete(th.Target.Keyspace, view)
}
t.updateViews(th.Target.Keyspace, res)
err := th.Conn.GetSchema(t.ctx, th.Target, querypb.SchemaTableType_VIEWS, viewsUpdated, func(schemaRes *querypb.GetSchemaResponse) error {
t.updateViews(th.Target.Keyspace, schemaRes.TableDefinition)
return nil
})
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
// TODO: optimize for the views that got errored out.
log.Warningf("error fetching new views definition for %v", viewsUpdated, err)
return false
}
return true

}

func (t *Tracker) updateViews(keyspace string, res map[string]string) {
Expand Down
10 changes: 9 additions & 1 deletion go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,5 +410,13 @@ func (client *QueryClient) UpdateContext(ctx context.Context) {
}

func (client *QueryClient) GetSchema(tableType querypb.SchemaTableType, tableNames ...string) (map[string]string, error) {
return client.server.GetSchema(client.ctx, client.target, tableType, tableNames)
schemaDef := map[string]string{}
err := client.server.GetSchema(client.ctx, client.target, tableType, tableNames, func(schemaRes *querypb.GetSchemaResponse) error {
schemaDef = schemaRes.TableDefinition
return nil
})
if err != nil {
return nil, err
}
return schemaDef, nil
}
12 changes: 3 additions & 9 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,16 +484,10 @@ func (q *query) Release(ctx context.Context, request *querypb.ReleaseRequest) (r
}

// GetSchema implements the QueryServer interface
func (q *query) GetSchema(ctx context.Context, request *querypb.GetSchemaRequest) (response *querypb.GetSchemaResponse, err error) {
func (q *query) GetSchema(request *querypb.GetSchemaRequest, stream queryservicepb.Query_GetSchemaServer) (err error) {
defer q.server.HandlePanic(&err)
var resp map[string]string
resp, err = q.server.GetSchema(ctx, request.Target, request.TableType, request.TableNames)
if err != nil {
return nil, vterrors.ToGRPC(err)
}
return &querypb.GetSchemaResponse{
TableDefinition: resp,
}, nil
err = q.server.GetSchema(stream.Context(), request.Target, request.TableType, request.TableNames, stream.Send)
return vterrors.ToGRPC(err)
}

// Register registers the implementation on the provide gRPC Server.
Expand Down
42 changes: 32 additions & 10 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,23 +1019,45 @@ func (conn *gRPCQueryClient) Release(ctx context.Context, target *querypb.Target
}

// GetSchema implements the queryservice interface
func (conn *gRPCQueryClient) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) {
func (conn *gRPCQueryClient) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return nil, tabletconn.ConnClosed
return tabletconn.ConnClosed
}

req := &querypb.GetSchemaRequest{
Target: target,
TableType: tableType,
TableNames: tableNames,
}
reply, err := conn.c.GetSchema(ctx, req)
stream, err := func() (queryservicepb.Query_GetSchemaClient, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return nil, tabletconn.ConnClosed
}

stream, err := conn.c.GetSchema(ctx, &querypb.GetSchemaRequest{
Target: target,
TableType: tableType,
TableNames: tableNames,
})
if err != nil {
return nil, tabletconn.ErrorFromGRPC(err)
}
return stream, nil
}()
if err != nil {
return nil, tabletconn.ErrorFromGRPC(err)
return err
}
for {
shr, err := stream.Recv()
if err != nil {
return tabletconn.ErrorFromGRPC(err)
}
if err := callback(shr); err != nil {
if err == nil || err == io.EOF {
return nil
}
return err
}
}
return reply.TableDefinition, nil
}

// Close closes underlying gRPC channel.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type QueryService interface {
Release(ctx context.Context, target *querypb.Target, transactionID, reservedID int64) error

// GetSchema returns the table definition for the specified tables.
GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error)
GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error

// Close must be called for releasing resources.
Close(ctx context.Context) error
Expand Down
7 changes: 3 additions & 4 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,12 @@ func (ws *wrappedService) Release(ctx context.Context, target *querypb.Target, t
})
}

func (ws *wrappedService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (schemaDef map[string]string, err error) {
func (ws *wrappedService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) (err error) {
err = ws.wrapper(ctx, target, ws.impl, "GetSchema", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
schemaDef, innerErr = conn.GetSchema(ctx, target, tableType, tableNames)
innerErr := conn.GetSchema(ctx, target, tableType, tableNames, callback)
return canRetry(ctx, innerErr), innerErr
})
return schemaDef, err
return err
}

func (ws *wrappedService) Close(ctx context.Context) error {
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,14 +585,14 @@ func (sbc *SandboxConn) Release(ctx context.Context, target *querypb.Target, tra
}

// GetSchema implements the QueryService interface
func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) {
func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
sbc.GetSchemaCount.Add(1)
var resp map[string]string
if len(sbc.getSchemaResult) > 0 {
resp = sbc.getSchemaResult[0]
sbc.getSchemaResult = sbc.getSchemaResult[1:]
if len(sbc.getSchemaResult) == 0 {
return nil
}
return resp, nil
resp := sbc.getSchemaResult[0]
sbc.getSchemaResult = sbc.getSchemaResult[1:]
return callback(&querypb.GetSchemaResponse{TableDefinition: resp})
}

// Close does not change ExecCount
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func (f *FakeQueryService) Release(ctx context.Context, target *querypb.Target,
}

// GetSchema implements the QueryService interface
func (f *FakeQueryService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string) (map[string]string, error) {
func (f *FakeQueryService) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
panic("implement me")
}

Expand Down
Loading