Skip to content

Commit

Permalink
changefeedccl: plumb context to sink creation
Browse files Browse the repository at this point in the history
Release note: none.
  • Loading branch information
dt committed Feb 24, 2020
1 parent 9fc70cc commit 8820c13
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 17 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
nodeID := ca.flowCtx.EvalCtx.NodeID
var err error
if ca.sink, err = getSink(
ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets,
ctx, ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets,
ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI,
); err != nil {
err = MarkRetryableError(err)
Expand Down Expand Up @@ -481,7 +481,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
// but the oracle is only used when emitting row updates.
var nilOracle timestampLowerBoundOracle
if cf.sink, err = getSink(
cf.spec.Feed.SinkURI, nodeID, cf.spec.Feed.Opts, cf.spec.Feed.Targets,
ctx, cf.spec.Feed.SinkURI, nodeID, cf.spec.Feed.Opts, cf.spec.Feed.Targets,
cf.flowCtx.Cfg.Settings, nilOracle, cf.flowCtx.Cfg.ExternalStorageFromURI,
); err != nil {
err = MarkRetryableError(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func changefeedPlanHook(
nodeID := p.ExtendedEvalContext().NodeID
var nilOracle timestampLowerBoundOracle
canarySink, err := getSink(
details.SinkURI, nodeID, details.Opts, details.Targets,
ctx, details.SinkURI, nodeID, details.Opts, details.Targets,
settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI,
)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Sink interface {
}

func getSink(
ctx context.Context,
sinkURI string,
nodeID roachpb.NodeID,
opts map[string]string,
Expand Down Expand Up @@ -189,7 +190,7 @@ func getSink(
q = url.Values{}
makeSink = func() (Sink, error) {
return makeCloudStorageSink(
u.String(), nodeID, fileSize, settings,
ctx, u.String(), nodeID, fileSize, settings,
opts, timestampOracle, makeExternalStorageFromURI,
)
}
Expand Down Expand Up @@ -232,7 +233,10 @@ type errorWrapperSink struct {
}

func (s errorWrapperSink) EmitRow(
ctx context.Context, table *sqlbase.TableDescriptor, key, value []byte, updated hlc.Timestamp,
ctx context.Context,
table *sqlbase.TableDescriptor,
key, value []byte,
updated hlc.Timestamp,
) error {
if err := s.wrapped.EmitRow(ctx, table, key, value, updated); err != nil {
return MarkRetryableError(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ type cloudStorageSink struct {
var cloudStorageSinkIDAtomic int64

func makeCloudStorageSink(
ctx context.Context,
baseURI string,
nodeID roachpb.NodeID,
targetMaxFileSize int64,
Expand Down Expand Up @@ -333,7 +334,6 @@ func makeCloudStorageSink(
return nil, errors.Errorf(`this sink requires the WITH %s option`, changefeedbase.OptKeyInValue)
}

ctx := context.TODO()
var err error
if s.es, err = makeExternalStorageFromURI(ctx, baseURI); err != nil {
return nil, err
Expand Down
22 changes: 11 additions & 11 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestCloudStorageSink(t *testing.T) {
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
sinkDir := `golden`
s, err := makeCloudStorageSink(
`nodelocal:///`+sinkDir, 1, unlimitedFileSize,
ctx, `nodelocal:///`+sinkDir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestCloudStorageSink(t *testing.T) {
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `single-node`
s, err := makeCloudStorageSink(
`nodelocal:///`+dir, 1, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
Expand Down Expand Up @@ -188,12 +188,12 @@ func TestCloudStorageSink(t *testing.T) {
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `multi-node`
s1, err := makeCloudStorageSink(
`nodelocal:///`+dir, 1, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
s2, err := makeCloudStorageSink(
`nodelocal:///`+dir, 2, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 2, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
Expand Down Expand Up @@ -222,12 +222,12 @@ func TestCloudStorageSink(t *testing.T) {
// this happens before checkpointing, some data is written again but
// this is unavoidable.
s1R, err := makeCloudStorageSink(
`nodelocal:///`+dir, 1, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
s2R, err := makeCloudStorageSink(
`nodelocal:///`+dir, 2, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 2, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
Expand Down Expand Up @@ -268,14 +268,14 @@ func TestCloudStorageSink(t *testing.T) {
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `zombie`
s1, err := makeCloudStorageSink(
`nodelocal:///`+dir, 1, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
s1.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
s1.(*cloudStorageSink).jobSessionID = "a" // Force deterministic job session ID.
s2, err := makeCloudStorageSink(
`nodelocal:///`+dir, 1, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestCloudStorageSink(t *testing.T) {
dir := `bucketing`
const targetMaxFileSize = 6
s, err := makeCloudStorageSink(
`nodelocal:///`+dir, 1, targetMaxFileSize,
ctx, `nodelocal:///`+dir, 1, targetMaxFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
Expand Down Expand Up @@ -397,7 +397,7 @@ func TestCloudStorageSink(t *testing.T) {
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `file-ordering`
s, err := makeCloudStorageSink(
`nodelocal:///`+dir, 1, unlimitedFileSize,
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)

Expand Down Expand Up @@ -456,7 +456,7 @@ func TestCloudStorageSink(t *testing.T) {
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `ordering-among-schema-versions`
var targetMaxFileSize int64 = 10
s, err := makeCloudStorageSink(`nodelocal:///`+dir, 1, targetMaxFileSize, settings,
s, err := makeCloudStorageSink(ctx, `nodelocal:///`+dir, 1, targetMaxFileSize, settings,
opts, timestampOracle, externalStorageFromURI)
require.NoError(t, err)

Expand Down

0 comments on commit 8820c13

Please sign in to comment.