Skip to content

Commit

Permalink
Merge pull request #127680 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-127553

release-24.1: changefeedccl: add some debug logging to the test sinkless feed
  • Loading branch information
andyyang890 authored Jul 25, 2024
2 parents 9d08ab4 + 8051c01 commit 2dc562a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func makeFeedFactoryWithOptions(
}
sink, cleanup := getInitialSinkForSinklessFactory(t, db, pgURLForUserSinkless)
root, cleanupRoot := pgURLForUserSinkless(username.RootUser)
f := makeSinklessFeedFactory(s, sink, root, pgURLForUserSinkless)
f := makeSinklessFeedFactory(t, s, sink, root, pgURLForUserSinkless)
return f, func() {
cleanup()
cleanupRoot()
Expand Down
30 changes: 23 additions & 7 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"sync"
"sync/atomic"
"testing"
"time"

pubsubv1 "cloud.google.com/go/pubsub/apiv1"
Expand Down Expand Up @@ -68,6 +69,7 @@ import (
)

type sinklessFeedFactory struct {
t *testing.T
s serverutils.ApplicationLayerInterface
// postgres url used for creating sinkless changefeeds. This may be the same as
// the rootURL.
Expand All @@ -80,9 +82,19 @@ type sinklessFeedFactory struct {
// makeSinklessFeedFactory returns a TestFeedFactory implementation using the
// `experimental-sql` uri.
func makeSinklessFeedFactory(
s serverutils.ApplicationLayerInterface, sink url.URL, rootConn url.URL, sinkForUser sinkForUser,
t *testing.T,
s serverutils.ApplicationLayerInterface,
sink url.URL,
rootConn url.URL,
sinkForUser sinkForUser,
) cdctest.TestFeedFactory {
return &sinklessFeedFactory{s: s, sink: sink, rootURL: rootConn, sinkForUser: sinkForUser}
return &sinklessFeedFactory{
t: t,
s: s,
sink: sink,
rootURL: rootConn,
sinkForUser: sinkForUser,
}
}

// AsUser executes fn as the specified user.
Expand Down Expand Up @@ -133,6 +145,7 @@ func (f *sinklessFeedFactory) Feed(create string, args ...interface{}) (cdctest.
return nil, err
}
s := &sinklessFeed{
t: f.t,
seenTrackerMap: make(map[string]struct{}),
create: create,
args: args,
Expand Down Expand Up @@ -178,6 +191,7 @@ func (t seenTrackerMap) reset() {
// sinklessFeed is an implementation of the `TestFeed` interface for a
// "sinkless" (results returned over pgwire) feed.
type sinklessFeed struct {
t *testing.T
seenTrackerMap
create string
args []interface{}
Expand All @@ -203,6 +217,7 @@ func (c *sinklessFeed) Partitions() []string { return []string{`sinkless`} }
// Next implements the TestFeed interface.
func (c *sinklessFeed) Next() (*cdctest.TestFeedMessage, error) {
defer time.AfterFunc(timeout(), func() {
c.t.Logf("sinkless feed closing connection due to timeout (%s): %s", timeout(), c.create)
_ = c.conn.Close(context.Background())
}).Stop()

Expand Down Expand Up @@ -242,22 +257,23 @@ func (c *sinklessFeed) start() (err error) {
return err
}

create := c.create
if !c.latestResolved.IsEmpty() {
// NB: The TODO in Next means c.latestResolved is currently never set for
// non-json feeds.
if strings.Contains(create, `WITH`) {
create += fmt.Sprintf(`, cursor='%s'`, c.latestResolved.AsOfSystemTime())
if strings.Contains(c.create, `WITH`) {
c.create += fmt.Sprintf(`, cursor='%s'`, c.latestResolved.AsOfSystemTime())
} else {
create += fmt.Sprintf(` WITH cursor='%s'`, c.latestResolved.AsOfSystemTime())
c.create += fmt.Sprintf(` WITH cursor='%s'`, c.latestResolved.AsOfSystemTime())
}
}
c.rows, err = c.conn.Query(context.Background(), create, c.args...)
c.t.Logf("sinkless feed creating changefeed: %s", c.create)
c.rows, err = c.conn.Query(context.Background(), c.create, c.args...)
return err
}

// Close implements the TestFeed interface.
func (c *sinklessFeed) Close() error {
c.t.Logf("closing sinkless feed")
c.rows = nil
return c.conn.Close(context.Background())
}
Expand Down

0 comments on commit 2dc562a

Please sign in to comment.