Merge #29206 #29402
29206: ui: allow long table rows to wrap r=couchand a=couchand

Fixes: #29132

Release note (admin ui change): Allow long table rows to wrap, if necessary.

Before:
<img width="1351" alt="screen shot 2018-08-27 at 12 19 04 pm" src="https://user-images.githubusercontent.com/7085343/44674574-8e0a8000-a9f3-11e8-8b2a-72358f65c098.png">

After:
<img width="1159" alt="screen shot 2018-08-28 at 3 30 49 pm" src="https://user-images.githubusercontent.com/793969/44746086-67be1080-aad7-11e8-8616-158de5354fe1.png">


29402: changefeedccl,jobspb: select initial scan timestamp on gateway r=mrtracy a=danhhz

Fixes a buglet introduced in #28984 where each node in a distsql
changefeed flow would pick its own timestamp at which to do the initial scan.
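
To make the fix concrete: the gateway now picks the statement time once, stores it in the job's `ChangefeedDetails`, and every aggregator in the flow reads that stored value instead of consulting its own clock. Below is a minimal, runnable sketch of that shape — the `Timestamp` and `ChangefeedDetails` types and the `planChangefeed`/`runAggregator` names are simplified, illustrative stand-ins, not the real `hlc`/`jobspb` definitions:

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-in for hlc.Timestamp.
type Timestamp struct{ WallTime int64 }

// Simplified stand-in for jobspb.ChangefeedDetails.
type ChangefeedDetails struct {
	// StatementTime is picked once on the gateway and persisted with the
	// job, so every node scans at the same timestamp.
	StatementTime Timestamp
}

// planChangefeed runs on the gateway: choose the initial scan timestamp
// exactly once and record it in the job details.
func planChangefeed(now func() Timestamp) ChangefeedDetails {
	return ChangefeedDetails{StatementTime: now()}
}

// runAggregator runs on each node of the distsql flow: it must use the
// timestamp carried in the details, not its own clock (which was the bug).
func runAggregator(node int, details ChangefeedDetails) {
	fmt.Printf("node %d: initial scan at %d\n", node, details.StatementTime.WallTime)
}

func main() {
	now := func() Timestamp { return Timestamp{WallTime: time.Now().UnixNano()} }
	details := planChangefeed(now)
	for node := 1; node <= 3; node++ {
		runAggregator(node, details) // every node prints the same timestamp
	}
}
```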

I thought I got all the backward-incompatible changes out of the way
with #27996, but I didn't foresee this one. While I'm making a
backward-incompatible change anyway, switch targets from an `id->string`
map to an `id->proto` map for future expansion. Row and partition
watches, for example, will need this.
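
For context, the new map value is a message rather than a bare string. A hedged sketch of the shape, using hypothetical simplified Go stand-ins for the generated protobuf types in `pkg/jobs/jobspb` (the real ones are proto messages, and `TableID` stands in for `sqlbase.ID`):

```go
package jobspbsketch

// TableID stands in for sqlbase.ID.
type TableID uint32

// ChangefeedTarget replaces the bare string value; additional per-target
// fields (row or partition watch specs, say) can now be added without
// another backward-incompatible change.
type ChangefeedTarget struct {
	// StatementTimeName is the table's name as of the changefeed's
	// statement time.
	StatementTimeName string
}

// ChangefeedTargets was previously map[sqlbase.ID]string.
type ChangefeedTargets map[TableID]ChangefeedTarget
```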

Release note (backward-incompatible change): CHANGEFEEDs created with
previous betas and alphas will not work with this version.

Co-authored-by: Andrew Couch <[email protected]>
Co-authored-by: Daniel Harrison <[email protected]>
3 people committed Sep 4, 2018
3 parents 55e3da6 + a004ddb + ba1a92f commit 07ebdf1
Showing 11 changed files with 455 additions and 250 deletions.
14 changes: 9 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -73,13 +73,17 @@ func distChangefeedFlow(
 		return err
 	}
 
-	var highWater hlc.Timestamp
-	if h := progress.GetHighWater(); h != nil {
-		highWater = *h
+	spansTS := details.StatementTime
+	var initialHighWater hlc.Timestamp
+	if h := progress.GetHighWater(); h != nil && *h != (hlc.Timestamp{}) {
+		initialHighWater = *h
+		// If we have a high-water set, use it to compute the spans, since the
+		// ones at the statement time may have been garbage collected by now.
+		spansTS = initialHighWater
 	}
 
 	execCfg := phs.ExecCfg()
-	trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, highWater)
+	trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, spansTS)
 	if err != nil {
 		return err
 	}
@@ -107,7 +111,7 @@ func distChangefeedFlow(
 	for i, nodeSpan := range nodeSpans {
 		watches[i] = distsqlrun.ChangeAggregatorSpec_Watch{
 			Span:            nodeSpan,
-			InitialResolved: highWater,
+			InitialResolved: initialHighWater,
 		}
 	}
 
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
@@ -115,7 +115,7 @@ func newChangeAggregatorProcessor(
 	buf := makeBuffer()
 	ca.poller = makePoller(
 		flowCtx.Settings, flowCtx.ClientDB, flowCtx.ClientDB.Clock(), flowCtx.Gossip, spans,
-		spec.Feed.Targets, initialHighWater, buf)
+		spec.Feed, initialHighWater, buf)
 	rowsFn := kvsToRows(flowCtx.LeaseManager.(*sql.LeaseManager), spec.Feed, buf.Get)
 	ca.tickFn = emitEntries(spec.Feed, ca.sink, rowsFn)
 
28 changes: 14 additions & 14 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -129,14 +129,15 @@ func changefeedPlanHook(
 			return err
 		}
 
-		now := p.ExecCfg().Clock.Now()
-		var highWater hlc.Timestamp
+		statementTime := p.ExecCfg().Clock.Now()
+		var initialHighWater hlc.Timestamp
 		if cursor, ok := opts[optCursor]; ok {
 			asOf := tree.AsOfClause{Expr: tree.NewStrVal(cursor)}
 			var err error
-			if highWater, err = p.EvalAsOfTimestamp(asOf, now); err != nil {
+			if initialHighWater, err = p.EvalAsOfTimestamp(asOf, statementTime); err != nil {
 				return err
 			}
+			statementTime = initialHighWater
 		}
 
 		// For now, disallow targeting a database or wildcard table selection.
@@ -157,32 +158,31 @@
 		}
 
 		// This grabs table descriptors once to get their ids.
-		descriptorTime := now
-		if highWater != (hlc.Timestamp{}) {
-			descriptorTime = highWater
-		}
 		targetDescs, _, err := backupccl.ResolveTargetsToDescriptors(
-			ctx, p, descriptorTime, changefeedStmt.Targets)
+			ctx, p, statementTime, changefeedStmt.Targets)
 		if err != nil {
 			return err
 		}
-		targets := make(map[sqlbase.ID]string, len(targetDescs))
+		targets := make(jobspb.ChangefeedTargets, len(targetDescs))
 		for _, desc := range targetDescs {
 			if tableDesc := desc.GetTable(); tableDesc != nil {
 				if err := validateChangefeedTable(tableDesc); err != nil {
 					return err
 				}
-				targets[tableDesc.ID] = tableDesc.Name
+				targets[tableDesc.ID] = jobspb.ChangefeedTarget{
+					StatementTimeName: tableDesc.Name,
+				}
 			}
 		}
 
 		details := jobspb.ChangefeedDetails{
-			Targets: targets,
-			Opts:    opts,
-			SinkURI: sinkURI,
+			Targets:       targets,
+			Opts:          opts,
+			SinkURI:       sinkURI,
+			StatementTime: statementTime,
 		}
 		progress := jobspb.Progress{
-			Progress: &jobspb.Progress_HighWater{HighWater: &highWater},
+			Progress: &jobspb.Progress_HighWater{HighWater: &initialHighWater},
 			Details: &jobspb.Progress_Changefeed{
 				Changefeed: &jobspb.ChangefeedProgress{},
 			},
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -94,7 +94,9 @@ func createBenchmarkChangefeed(
 	tableDesc := sqlbase.GetTableDescriptor(s.DB(), database, table)
 	spans := []roachpb.Span{tableDesc.PrimaryIndexSpan()}
 	details := jobspb.ChangefeedDetails{
-		Targets: map[sqlbase.ID]string{tableDesc.ID: tableDesc.Name},
+		Targets: jobspb.ChangefeedTargets{tableDesc.ID: jobspb.ChangefeedTarget{
+			StatementTimeName: tableDesc.Name,
+		}},
 		Opts: map[string]string{
 			optEnvelope: string(optEnvelopeRow),
 		},
@@ -104,8 +106,7 @@
 
 	buf := makeBuffer()
 	poller := makePoller(
-		s.ClusterSettings(), s.DB(), feedClock, s.Gossip(), spans, details.Targets,
-		initialHighWater, buf)
+		s.ClusterSettings(), s.DB(), feedClock, s.Gossip(), spans, details, initialHighWater, buf)
 
 	rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), details, buf.Get)
 	tickFn := emitEntries(details, sink, rowsFn)
54 changes: 26 additions & 28 deletions pkg/ccl/changefeedccl/poller.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/intervalccl"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -44,7 +45,7 @@ type poller struct {
 	clock    *hlc.Clock
 	gossip   *gossip.Gossip
 	spans    []roachpb.Span
-	targets  map[sqlbase.ID]string
+	details  jobspb.ChangefeedDetails
 	buf      *buffer
 
 	highWater hlc.Timestamp
@@ -56,45 +57,44 @@ func makePoller(
 	clock *hlc.Clock,
 	gossip *gossip.Gossip,
 	spans []roachpb.Span,
-	targets map[sqlbase.ID]string,
-	startTime hlc.Timestamp,
+	details jobspb.ChangefeedDetails,
+	highWater hlc.Timestamp,
 	buf *buffer,
 ) *poller {
 	return &poller{
 		settings:  settings,
 		db:        db,
 		clock:     clock,
		gossip:    gossip,
-		highWater: startTime,
+		highWater: highWater,
 		spans:     spans,
-		targets:   targets,
+		details:   details,
 		buf:       buf,
 	}
 }
 
 func fetchSpansForTargets(
-	ctx context.Context, db *client.DB, targets map[sqlbase.ID]string, ts hlc.Timestamp,
+	ctx context.Context, db *client.DB, targets jobspb.ChangefeedTargets, ts hlc.Timestamp,
 ) ([]roachpb.Span, error) {
 	var spans []roachpb.Span
 	err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
 		spans = nil
-		if ts != (hlc.Timestamp{}) {
-			txn.SetFixedTimestamp(ctx, ts)
-		}
+		txn.SetFixedTimestamp(ctx, ts)
 		// Note that all targets are currently guaranteed to be tables.
-		for tableID, origName := range targets {
+		for tableID, t := range targets {
 			tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
 			if err != nil {
 				if errors.Cause(err) == sqlbase.ErrDescriptorNotFound {
-					return errors.Errorf(`"%s" was dropped or truncated`, origName)
+					return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName)
 				}
 				return err
 			}
 			if tableDesc.State == sqlbase.TableDescriptor_DROP {
-				return errors.Errorf(`"%s" was dropped or truncated`, origName)
+				return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName)
 			}
-			if tableDesc.Name != origName {
-				return errors.Errorf(`"%s" was renamed to "%s"`, origName, tableDesc.Name)
+			if tableDesc.Name != t.StatementTimeName {
+				return errors.Errorf(
+					`"%s" was renamed to "%s"`, t.StatementTimeName, tableDesc.Name)
 			}
 			if err := validateChangefeedTable(tableDesc); err != nil {
 				return err
@@ -137,7 +137,7 @@ func (p *poller) Run(ctx context.Context) error {
 		log.VEventf(ctx, 1, `changefeed poll [%s,%s): %s`,
 			p.highWater, nextHighWater, time.Duration(nextHighWater.WallTime-p.highWater.WallTime))
 
-		_, err := fetchSpansForTargets(ctx, p.db, p.targets, nextHighWater)
+		_, err := fetchSpansForTargets(ctx, p.db, p.details.Targets, nextHighWater)
 		if err != nil {
 			return err
 		}
@@ -301,22 +301,15 @@ func (p *poller) slurpSST(ctx context.Context, sst []byte) error {
 // TODO(nvanbenschoten): this should probably be a whole different type that
 // shares a common interface with poller.
 func (p *poller) runUsingRangefeeds(ctx context.Context) error {
-	startTS := p.highWater
-	spans, err := fetchSpansForTargets(ctx, p.db, p.targets, startTS)
-	if err != nil {
-		return err
-	}
-
 	g := ctxgroup.WithContext(ctx)
 	sender := p.db.NonTransactionalSender()
-	if startTS == (hlc.Timestamp{}) {
-		startTS = p.clock.Now()
-
+	if p.highWater == (hlc.Timestamp{}) {
 		// TODO(nvanbenschoten/danhhz): This should be replaced by a series of
 		// ScanRequests and this structure should be completely reworked. Right
 		// now it's copied verbatim from above.
 		var ranges []roachpb.RangeDescriptor
 		if err := p.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+			txn.SetFixedTimestamp(ctx, p.details.StatementTime)
 			var err error
 			ranges, err = allRangeDescriptors(ctx, txn)
 			return err
@@ -378,7 +371,7 @@ func (p *poller) runUsingRangefeeds(ctx context.Context) error {
 					if log.V(2) {
 						log.Infof(ctx, `sending ExportRequest [%s,%s)`, span.Key, span.EndKey)
 					}
-					header := roachpb.Header{Timestamp: startTS}
+					header := roachpb.Header{Timestamp: p.details.StatementTime}
 					req := &roachpb.ExportRequest{
 						RequestHeader: roachpb.RequestHeaderFromSpan(span),
 						StartTime:     hlc.Timestamp{},
@@ -401,20 +394,25 @@ func (p *poller) runUsingRangefeeds(ctx context.Context) error {
 							return err
 						}
 					}
-					return p.buf.AddResolved(ctx, span, startTS)
+					return p.buf.AddResolved(ctx, span, p.details.StatementTime)
 				})
 			}
 			return nil
 		})
 	}
 
+	rangeFeedTS := p.details.StatementTime
+	if rangeFeedTS.Less(p.highWater) {
+		rangeFeedTS = p.highWater
+	}
+
 	// TODO(nvanbenschoten): This is horrible.
 	ds := sender.(*client.CrossRangeTxnWrapperSender).Wrapped().(*kv.DistSender)
 	eventC := make(chan *roachpb.RangeFeedEvent, 128)
-	for _, span := range spans {
+	for _, span := range p.spans {
 		req := &roachpb.RangeFeedRequest{
 			Header: roachpb.Header{
-				Timestamp: startTS,
+				Timestamp: rangeFeedTS,
 			},
 			Span: span,
 		}
15 changes: 8 additions & 7 deletions pkg/ccl/changefeedccl/sink.go
@@ -19,6 +19,7 @@ import (
 	"sync"
 
 	"github.com/Shopify/sarama"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -47,7 +48,7 @@ type Sink interface {
 	Close() error
 }
 
-func getSink(sinkURI string, targets map[sqlbase.ID]string) (Sink, error) {
+func getSink(sinkURI string, targets jobspb.ChangefeedTargets) (Sink, error) {
 	u, err := url.Parse(sinkURI)
 	if err != nil {
 		return nil, err
@@ -96,14 +97,14 @@ type kafkaSink struct {
 }
 
 func getKafkaSink(
-	kafkaTopicPrefix string, bootstrapServers string, targets map[sqlbase.ID]string,
+	kafkaTopicPrefix string, bootstrapServers string, targets jobspb.ChangefeedTargets,
 ) (Sink, error) {
 	sink := &kafkaSink{
 		kafkaTopicPrefix: kafkaTopicPrefix,
 	}
 	sink.topics = make(map[string]struct{})
-	for _, tableName := range targets {
-		sink.topics[kafkaTopicPrefix+tableName] = struct{}{}
+	for _, t := range targets {
+		sink.topics[kafkaTopicPrefix+t.StatementTimeName] = struct{}{}
 	}
 
 	config := sarama.NewConfig()
@@ -354,7 +355,7 @@ type sqlSink struct {
 	rowBuf []interface{}
 }
 
-func makeSQLSink(uri, tableName string, targets map[sqlbase.ID]string) (*sqlSink, error) {
+func makeSQLSink(uri, tableName string, targets jobspb.ChangefeedTargets) (*sqlSink, error) {
 	if u, err := url.Parse(uri); err != nil {
 		return nil, err
 	} else if u.Path == `` {
@@ -374,8 +375,8 @@ func makeSQLSink(uri, tableName string, targets map[sqlbase.ID]string) (*sqlSink
 		topics: make(map[string]struct{}),
 		hasher: fnv.New32a(),
 	}
-	for _, tableName := range targets {
-		s.topics[tableName] = struct{}{}
+	for _, t := range targets {
+		s.topics[t.StatementTimeName] = struct{}{}
 	}
 	return s, nil
 }
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/sink_test.go
@@ -19,8 +19,8 @@ import (
 
 	"github.com/Shopify/sarama"
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/security"
-	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -141,7 +141,10 @@ func TestSQLSink(t *testing.T) {
 	defer cleanup()
 	sinkURL.Path = `d`
 
-	targets := map[sqlbase.ID]string{0: `foo`, 1: `bar`}
+	targets := jobspb.ChangefeedTargets{
+		0: jobspb.ChangefeedTarget{StatementTimeName: `foo`},
+		1: jobspb.ChangefeedTarget{StatementTimeName: `bar`},
+	}
 	sink, err := makeSQLSink(sinkURL.String(), `sink`, targets)
 	require.NoError(t, err)
 	defer func() { require.NoError(t, sink.Close()) }()
