Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#5736 from planetscale/ss-vstream-defaults
Browse files Browse the repository at this point in the history
vstream: support default values
  • Loading branch information
deepthi authored Jan 20, 2020
2 parents 8c6fd67 + afcc462 commit 1a2cddf
Show file tree
Hide file tree
Showing 7 changed files with 783 additions and 479 deletions.
151 changes: 0 additions & 151 deletions go/vt/vtgate/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,14 @@ import (
"reflect"
"sort"
"strings"
"sync"

"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/gateway"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
Expand Down Expand Up @@ -344,152 +339,6 @@ func (res *Resolver) UpdateStream(ctx context.Context, keyspace string, shard st
})
}

// VStream streams events from one target. This function ensures that events of each
// transaction are streamed together, along with the corresponding GTID.
func (res *Resolver) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func(events []*binlogdatapb.VEvent) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// mu protects sending on ch, err and positions.
// mu is needed for sending because transactions can come
// in separate chunks. If so, we have to send all the
// chunks together.
var mu sync.Mutex
ch := make(chan []*binlogdatapb.VEvent)
var outerErr error

positions := make(map[topo.KeyspaceShard]*binlogdatapb.ShardGtid, len(vgtid.ShardGtids))
for _, shardPos := range vgtid.ShardGtids {
ks := topo.KeyspaceShard{Keyspace: shardPos.Keyspace, Shard: shardPos.Shard}
positions[ks] = shardPos
}

var loopwg, wg sync.WaitGroup
// Make sure goroutines don't start until loop has exited.
// Otherwise there's a race because the goroutines update the map.
loopwg.Add(1)
for ks, pos := range positions {
wg.Add(1)
go func(ks topo.KeyspaceShard, pos string) {
loopwg.Wait()
defer wg.Done()
err := res.vstreamOneShard(ctx, ks.Keyspace, ks.Shard, tabletType, pos, filter, func(eventss [][]*binlogdatapb.VEvent) error {
mu.Lock()
defer mu.Unlock()

// Send all chunks while holding the lock.
for _, evs := range eventss {
// Replace GTID and table names.
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_GTID:
// Update the VGtid and send that instead.
positions[ks].Gtid = ev.Gtid
ev.Type = binlogdatapb.VEventType_VGTID
ev.Gtid = ""
ev.Vgtid = proto.Clone(vgtid).(*binlogdatapb.VGtid)
case binlogdatapb.VEventType_FIELD:
ev.FieldEvent.TableName = ks.Keyspace + "." + ev.FieldEvent.TableName
case binlogdatapb.VEventType_ROW:
ev.RowEvent.TableName = ks.Keyspace + "." + ev.RowEvent.TableName
}
}
select {
case <-ctx.Done():
return ctx.Err()
case ch <- evs:
}
}
return nil
})

// Set the error on exit. First one wins.
mu.Lock()
defer mu.Unlock()
if outerErr == nil {
outerErr = err
cancel()
}
}(ks, pos.Gtid)
}
// Allow goroutines to start.
loopwg.Done()

go func() {
wg.Wait()
close(ch)
}()

for ev := range ch {
if err := send(ev); err != nil {
return err
}
}

return outerErr
}

// vstreamOneShard streams from one shard. If transactions come in separate chunks, they are grouped and sent.
func (res *Resolver) vstreamOneShard(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType, startPos string, filter *binlogdatapb.Filter, send func(eventss [][]*binlogdatapb.VEvent) error) error {
errCount := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

var eventss [][]*binlogdatapb.VEvent
rss, err := res.resolver.ResolveDestination(ctx, keyspace, tabletType, key.DestinationShard(shard))
if err != nil {
return err
}
if len(rss) != 1 {
// Unreachable.
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss)
}
err = rss[0].QueryService.VStream(ctx, rss[0].Target, startPos, filter, func(events []*binlogdatapb.VEvent) error {
// Remove all heartbeat events for now.
// Otherwise they can accumulate indefinitely if there are no real events.
// TODO(sougou): figure out a model for this.
for i := 0; i < len(events); i++ {
if events[i].Type == binlogdatapb.VEventType_HEARTBEAT {
events = append(events[:i], events[i+1:]...)
}
}
if len(events) == 0 {
return nil
}
// We received a valid event. Reset error count.
errCount = 0

eventss = append(eventss, events)
lastEvent := events[len(events)-1]
switch lastEvent.Type {
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL:
if err := send(eventss); err != nil {
return err
}
eventss = nil
}
return nil
})
if err == nil {
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vstream ended unexpectedly")
}
if !isRetryableError(err) {
log.Errorf("vstream for %s/%s error: %v", keyspace, shard, err)
return err
}
errCount++
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", keyspace, shard, err)
return err
}
}
}

// GetGatewayCacheStatus returns a displayable version of the Gateway cache.
func (res *Resolver) GetGatewayCacheStatus() gateway.TabletCacheStatusList {
return res.scatterConn.GetGatewayCacheStatus()
Expand Down
Loading

0 comments on commit 1a2cddf

Please sign in to comment.