diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 2a89398fd1d6..12809d60aca3 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -1019,7 +1019,7 @@ func makeFeedFactoryWithOptions( } sink, cleanup := getInitialSinkForSinklessFactory(t, db, pgURLForUserSinkless) root, cleanupRoot := pgURLForUserSinkless(username.RootUser) - f := makeSinklessFeedFactory(s, sink, root, pgURLForUserSinkless) + f := makeSinklessFeedFactory(t, s, sink, root, pgURLForUserSinkless) return f, func() { cleanup() cleanupRoot() diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 3a0791c1695d..b3af6bafbaeb 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "sync/atomic" + "testing" "time" pubsubv1 "cloud.google.com/go/pubsub/apiv1" @@ -68,6 +69,7 @@ import ( ) type sinklessFeedFactory struct { + t *testing.T s serverutils.ApplicationLayerInterface // postgres url used for creating sinkless changefeeds. This may be the same as // the rootURL. @@ -80,9 +82,19 @@ type sinklessFeedFactory struct { // makeSinklessFeedFactory returns a TestFeedFactory implementation using the // `experimental-sql` uri. func makeSinklessFeedFactory( - s serverutils.ApplicationLayerInterface, sink url.URL, rootConn url.URL, sinkForUser sinkForUser, + t *testing.T, + s serverutils.ApplicationLayerInterface, + sink url.URL, + rootConn url.URL, + sinkForUser sinkForUser, ) cdctest.TestFeedFactory { - return &sinklessFeedFactory{s: s, sink: sink, rootURL: rootConn, sinkForUser: sinkForUser} + return &sinklessFeedFactory{ + t: t, + s: s, + sink: sink, + rootURL: rootConn, + sinkForUser: sinkForUser, + } } // AsUser executes fn as the specified user. @@ -133,6 +145,7 @@ func (f *sinklessFeedFactory) Feed(create string, args ...interface{}) (cdctest. return nil, err } s := &sinklessFeed{ + t: f.t, seenTrackerMap: make(map[string]struct{}), create: create, args: args, @@ -178,6 +191,7 @@ func (t seenTrackerMap) reset() { // sinklessFeed is an implementation of the `TestFeed` interface for a // "sinkless" (results returned over pgwire) feed. type sinklessFeed struct { + t *testing.T seenTrackerMap create string args []interface{} @@ -203,6 +217,7 @@ func (c *sinklessFeed) Partitions() []string { return []string{`sinkless`} } // Next implements the TestFeed interface. func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) { defer time.AfterFunc(timeout(), func() { + c.t.Logf("sinkless feed closing connection due to timeout (%s): %s", timeout(), c.create) _ = c.conn.Close(context.Background()) }).Stop() @@ -242,22 +257,23 @@ func (c *sinklessFeed) start() (err error) { return err } - create := c.create if !c.latestResolved.IsEmpty() { // NB: The TODO in Next means c.latestResolved is currently never set for // non-json feeds. - if strings.Contains(create, `WITH`) { - create += fmt.Sprintf(`, cursor='%s'`, c.latestResolved.AsOfSystemTime()) + if strings.Contains(c.create, `WITH`) { + c.create += fmt.Sprintf(`, cursor='%s'`, c.latestResolved.AsOfSystemTime()) } else { - create += fmt.Sprintf(` WITH cursor='%s'`, c.latestResolved.AsOfSystemTime()) + c.create += fmt.Sprintf(` WITH cursor='%s'`, c.latestResolved.AsOfSystemTime()) } } - c.rows, err = c.conn.Query(context.Background(), create, c.args...) + c.t.Logf("sinkless feed creating changefeed: %s", c.create) + c.rows, err = c.conn.Query(context.Background(), c.create, c.args...) return err } // Close implements the TestFeed interface. func (c *sinklessFeed) Close() error { + c.t.Logf("closing sinkless feed") c.rows = nil return c.conn.Close(context.Background()) }