From 05076858549f97ff8f82d5123e4fcfe358f334d2 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 6 Jul 2022 14:10:06 -0400 Subject: [PATCH] changefeedccl: Add timeout to testfeed library. Add timeout to testfeed sink implementations to timeout if no messages are received for too long. Release Notes: None --- pkg/ccl/changefeedccl/BUILD.bazel | 2 ++ pkg/ccl/changefeedccl/testfeed_test.go | 23 ++++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 5218bc69328b..2f27895defac 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -229,6 +229,8 @@ go_test( "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util", + "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 00ffdd37ba5c..cb00cf9b4c73 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -42,6 +42,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -127,16 +129,26 @@ type sinklessFeed struct { conn *pgx.Conn rows pgx.Rows + cancel context.CancelFunc latestResolved hlc.Timestamp } var _ cdctest.TestFeed = (*sinklessFeed)(nil) +func deadline() time.Duration { + if util.RaceEnabled { + return time.Minute + } + return 30 * time.Second +} + // Partitions implements the TestFeed interface. func (c *sinklessFeed) Partitions() []string { return []string{`sinkless`} } // Next implements the TestFeed interface. func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) { + defer time.AfterFunc(deadline(), c.cancel).Stop() + m := &cdctest.TestFeedMessage{Partition: `sinkless`} for { if !c.rows.Next() { @@ -168,7 +180,8 @@ func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) { // Resume implements the TestFeed interface. func (c *sinklessFeed) start() error { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel var err error c.conn, err = pgx.ConnectConfig(ctx, c.connCfg) if err != nil { @@ -707,6 +720,8 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { } select { + case <-time.After(deadline()): + return nil, &contextutil.TimeoutError{} case <-c.ss.eventReady(): case <-c.shutdown: return nil, c.terminalJobError() @@ -960,6 +975,8 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) { } select { + case <-time.After(deadline()): + return nil, &contextutil.TimeoutError{} case <-c.ss.eventReady(): case <-c.shutdown: return nil, c.terminalJobError() @@ -1541,6 +1558,8 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) { } select { + case <-time.After(deadline()): + return nil, &contextutil.TimeoutError{} case <-f.ss.eventReady(): case <-f.shutdown: return nil, f.terminalJobError() @@ -1784,6 +1803,8 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { return m, nil } select { + case <-time.After(deadline()): + return nil, &contextutil.TimeoutError{} case <-p.ss.eventReady(): case <-p.shutdown: return nil, p.terminalJobError()