Skip to content

Commit

Permalink
changefeedccl: add some debug logging to the test sinkless feed
Browse files Browse the repository at this point in the history
This patch adds some debug logging to the test sinkless feed to
help with debugging test failures.

Release note: None
  • Loading branch information
andyyang890 committed Jul 24, 2024
1 parent a7ec4dd commit 8051c01
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func makeFeedFactoryWithOptions(
}
sink, cleanup := getInitialSinkForSinklessFactory(t, db, pgURLForUserSinkless)
root, cleanupRoot := pgURLForUserSinkless(username.RootUser)
f := makeSinklessFeedFactory(s, sink, root, pgURLForUserSinkless)
f := makeSinklessFeedFactory(t, s, sink, root, pgURLForUserSinkless)
return f, func() {
cleanup()
cleanupRoot()
Expand Down
30 changes: 23 additions & 7 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"sync"
"sync/atomic"
"testing"
"time"

pubsubv1 "cloud.google.com/go/pubsub/apiv1"
Expand Down Expand Up @@ -68,6 +69,7 @@ import (
)

type sinklessFeedFactory struct {
t *testing.T
s serverutils.ApplicationLayerInterface
// postgres url used for creating sinkless changefeeds. This may be the same as
// the rootURL.
Expand All @@ -80,9 +82,19 @@ type sinklessFeedFactory struct {
// makeSinklessFeedFactory returns a TestFeedFactory implementation using the
// `experimental-sql` uri.
func makeSinklessFeedFactory(
s serverutils.ApplicationLayerInterface, sink url.URL, rootConn url.URL, sinkForUser sinkForUser,
t *testing.T,
s serverutils.ApplicationLayerInterface,
sink url.URL,
rootConn url.URL,
sinkForUser sinkForUser,
) cdctest.TestFeedFactory {
return &sinklessFeedFactory{s: s, sink: sink, rootURL: rootConn, sinkForUser: sinkForUser}
return &sinklessFeedFactory{
t: t,
s: s,
sink: sink,
rootURL: rootConn,
sinkForUser: sinkForUser,
}
}

// AsUser executes fn as the specified user.
Expand Down Expand Up @@ -133,6 +145,7 @@ func (f *sinklessFeedFactory) Feed(create string, args ...interface{}) (cdctest.
return nil, err
}
s := &sinklessFeed{
t: f.t,
seenTrackerMap: make(map[string]struct{}),
create: create,
args: args,
Expand Down Expand Up @@ -178,6 +191,7 @@ func (t seenTrackerMap) reset() {
// sinklessFeed is an implementation of the `TestFeed` interface for a
// "sinkless" (results returned over pgwire) feed.
type sinklessFeed struct {
t *testing.T
seenTrackerMap
create string
args []interface{}
Expand All @@ -203,6 +217,7 @@ func (c *sinklessFeed) Partitions() []string { return []string{`sinkless`} }
// Next implements the TestFeed interface.
func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) {
defer time.AfterFunc(timeout(), func() {
c.t.Logf("sinkless feed closing connection due to timeout (%s): %s", timeout(), c.create)
_ = c.conn.Close(context.Background())
}).Stop()

Expand Down Expand Up @@ -242,22 +257,23 @@ func (c *sinklessFeed) start() (err error) {
return err
}

create := c.create
if !c.latestResolved.IsEmpty() {
// NB: The TODO in Next means c.latestResolved is currently never set for
// non-json feeds.
if strings.Contains(create, `WITH`) {
create += fmt.Sprintf(`, cursor='%s'`, c.latestResolved.AsOfSystemTime())
if strings.Contains(c.create, `WITH`) {
c.create += fmt.Sprintf(`, cursor='%s'`, c.latestResolved.AsOfSystemTime())
} else {
create += fmt.Sprintf(` WITH cursor='%s'`, c.latestResolved.AsOfSystemTime())
c.create += fmt.Sprintf(` WITH cursor='%s'`, c.latestResolved.AsOfSystemTime())
}
}
c.rows, err = c.conn.Query(context.Background(), create, c.args...)
c.t.Logf("sinkless feed creating changefeed: %s", c.create)
c.rows, err = c.conn.Query(context.Background(), c.create, c.args...)
return err
}

// Close implements the TestFeed interface.
func (c *sinklessFeed) Close() error {
c.t.Logf("closing sinkless feed")
c.rows = nil
return c.conn.Close(context.Background())
}
Expand Down

0 comments on commit 8051c01

Please sign in to comment.