Skip to content

Commit

Permalink
changefeedccl: Add timeout to testfeed library.
Browse files Browse the repository at this point in the history
Add timeout to testfeed sink implementations to timeout
if no messages are received for too long.

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Jul 7, 2022
1 parent 3b22cdd commit 0507685
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
23 changes: 22 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
Expand Down Expand Up @@ -127,16 +129,26 @@ type sinklessFeed struct {

conn *pgx.Conn
rows pgx.Rows
cancel context.CancelFunc
latestResolved hlc.Timestamp
}

var _ cdctest.TestFeed = (*sinklessFeed)(nil)

func deadline() time.Duration {
if util.RaceEnabled {
return time.Minute
}
return 30 * time.Second
}

// Partitions implements the TestFeed interface.
func (c *sinklessFeed) Partitions() []string { return []string{`sinkless`} }

// Next implements the TestFeed interface.
func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) {
defer time.AfterFunc(deadline(), c.cancel).Stop()

m := &cdctest.TestFeedMessage{Partition: `sinkless`}
for {
if !c.rows.Next() {
Expand Down Expand Up @@ -168,7 +180,8 @@ func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) {

// Resume implements the TestFeed interface.
func (c *sinklessFeed) start() error {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
var err error
c.conn, err = pgx.ConnectConfig(ctx, c.connCfg)
if err != nil {
Expand Down Expand Up @@ -707,6 +720,8 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {
}

select {
case <-time.After(deadline()):
return nil, &contextutil.TimeoutError{}
case <-c.ss.eventReady():
case <-c.shutdown:
return nil, c.terminalJobError()
Expand Down Expand Up @@ -960,6 +975,8 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) {
}

select {
case <-time.After(deadline()):
return nil, &contextutil.TimeoutError{}
case <-c.ss.eventReady():
case <-c.shutdown:
return nil, c.terminalJobError()
Expand Down Expand Up @@ -1541,6 +1558,8 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) {
}

select {
case <-time.After(deadline()):
return nil, &contextutil.TimeoutError{}
case <-f.ss.eventReady():
case <-f.shutdown:
return nil, f.terminalJobError()
Expand Down Expand Up @@ -1784,6 +1803,8 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
return m, nil
}
select {
case <-time.After(deadline()):
return nil, &contextutil.TimeoutError{}
case <-p.ss.eventReady():
case <-p.shutdown:
return nil, p.terminalJobError()
Expand Down

0 comments on commit 0507685

Please sign in to comment.