Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
84572: obsservice: update README r=andreimatei a=dhartunian

update README to reference the `./dev` command
which includes the UI build.

Release note: None

84678: changefeedccl: De-flake TestChangefeedBackfillCheckpoint test. r=miretskiy a=miretskiy

Fixes cockroachdb#84121

cockroachdb#84007 introduced a change to add a timeout
to test feed library to prevent flaky tests from hanging for a long
time.  This timeout lead to `TestChangefeedBackfillCheckpoint` test
to become flaky.  The main contributor of the slowness of that test
was the fact that the test processes 1000 messages (twice), and
the fact that a `webhook` sink and it's mock sink implementation
are very slow (50+ms per message).

The webhook sink, and mock webhook sink performance will be
addressed separately (cockroachdb#84676)

For now, marginally improve mock webhook sink performance
by detecting when messages become available directly, instead
of relying on resolved timestamps.  Also, significantly increase
the internal test timeout when reading many messages in a unit test.

While troubleshooting this issue, observed large number of
error messages `http: TLS handshake error from 127.0.0.1:34276: EOF`.
The problem is that the webhook sink specified an arbitrary, and
very small default timeout of 3 seconds.  The default in Go
library is 0 -- no timeout; and we should have this default
as well.  Fixes cockroachdb#75745

Release Notes: None

84682: opt: fix crdb_internal.decode_plan_gist to work with unknown index r=rytaft a=rytaft

Release note (bug fix): `crdb_internal.decode_plan_gist` will no longer
produce an internal error when it is used to decode a plan gist for which
no schema information is available.

Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
  • Loading branch information
4 people committed Jul 20, 2022
4 parents a3039fe + e30a719 + 2b2bfcb + 61c1938 commit 4a8f1be
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 14 deletions.
19 changes: 19 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type MockWebhookSink struct {
statusCodes []int
statusCodesIndex int
rows []string
notify chan struct{}
}
}

Expand Down Expand Up @@ -144,6 +145,19 @@ func (s *MockWebhookSink) Pop() string {
return ""
}

// NotifyMessage arranges for channel to be closed when message arrives.
func (s *MockWebhookSink) NotifyMessage() chan struct{} {
c := make(chan struct{})
s.mu.Lock()
defer s.mu.Unlock()
if len(s.mu.rows) > 0 {
close(c)
} else {
s.mu.notify = c
}
return c
}

func (s *MockWebhookSink) requestHandler(hw http.ResponseWriter, hr *http.Request) {
method := hr.Method

Expand Down Expand Up @@ -177,7 +191,12 @@ func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) erro
s.mu.numCalls++
if s.mu.statusCodes[s.mu.statusCodesIndex] >= http.StatusOK && s.mu.statusCodes[s.mu.statusCodesIndex] < http.StatusMultipleChoices {
s.mu.rows = append(s.mu.rows, string(row))
if s.mu.notify != nil {
close(s.mu.notify)
s.mu.notify = nil
}
}

hw.WriteHeader(s.mu.statusCodes[s.mu.statusCodesIndex])
s.mu.statusCodesIndex = (s.mu.statusCodesIndex + 1) % len(s.mu.statusCodes)
s.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5608,7 +5608,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
// TODO(ssd): Tenant testing disabled because of use of DB()
for _, sz := range []int64{100 << 20, 100} {
maxCheckpointSize = sz
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestEnterpriseSinks)
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"))
}
}

Expand Down
14 changes: 12 additions & 2 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -87,18 +88,21 @@ func readNextMessages(
ctx context.Context, f cdctest.TestFeed, numMessages int,
) ([]cdctest.TestFeedMessage, error) {
var actual []cdctest.TestFeedMessage
lastMessage := timeutil.Now()
for len(actual) < numMessages {
if ctx.Err() != nil {
return nil, ctx.Err()
}
m, err := f.Next()
if log.V(1) {
if m != nil {
log.Infof(context.Background(), `msg %s: %s->%s (%s)`, m.Topic, m.Key, m.Value, m.Resolved)
log.Infof(context.Background(), `msg %s: %s->%s (%s) (%s)`,
m.Topic, m.Key, m.Value, m.Resolved, timeutil.Since(lastMessage))
} else {
log.Infof(context.Background(), `err %v`, err)
}
}
lastMessage = timeutil.Now()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,8 +181,14 @@ func assertPayloadsBase(
t testing.TB, f cdctest.TestFeed, expected []string, stripTs bool, perKeyOrdered bool,
) {
t.Helper()
timeout := assertPayloadsTimeout()
if len(expected) > 100 {
// Webhook sink is very slow; We have few tests that read 1000 messages.
timeout += 5 * time.Minute
}

require.NoError(t,
withTimeout(f, assertPayloadsTimeout(),
withTimeout(f, timeout,
func(ctx context.Context) error {
return assertPayloadsBaseErr(ctx, f, expected, stripTs, perKeyOrdered)
},
Expand Down
10 changes: 4 additions & 6 deletions pkg/ccl/changefeedccl/sink_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
applicationTypeJSON = `application/json`
applicationTypeCSV = `text/csv`
authorizationHeader = `Authorization`
defaultConnTimeout = 3 * time.Second
)

func isWebhookSink(u *url.URL) bool {
Expand Down Expand Up @@ -303,10 +302,9 @@ func makeWebhookSink(
return nil, errors.Errorf(`this sink requires the WITH %s option`, changefeedbase.OptTopicInValue)
}

connTimeout := opts.ClientTimeout
if connTimeout == nil {
t := defaultConnTimeout
connTimeout = &t
var connTimeout time.Duration
if opts.ClientTimeout != nil {
connTimeout = *opts.ClientTimeout
}

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -328,7 +326,7 @@ func makeWebhookSink(
}

// TODO(yevgeniy): Establish HTTP connection in Dial().
sink.client, err = makeWebhookClient(u, *connTimeout)
sink.client, err = makeWebhookClient(u, connTimeout)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,9 @@ func (f *webhookFeedFactory) Feed(create string, args ...interface{}) (cdctest.T

if createStmt.SinkURI == nil {
createStmt.SinkURI = tree.NewStrVal(
fmt.Sprintf("webhook-%s?insecure_tls_skip_verify=true&client_cert=%s&client_key=%s", sinkDest.URL(), base64.StdEncoding.EncodeToString(clientCertPEM), base64.StdEncoding.EncodeToString(clientKeyPEM)))
fmt.Sprintf("webhook-%s?insecure_tls_skip_verify=true&client_cert=%s&client_key=%s",
sinkDest.URL(), base64.StdEncoding.EncodeToString(clientCertPEM),
base64.StdEncoding.EncodeToString(clientKeyPEM)))
}
} else {
sinkDest, err = cdctest.StartMockWebhookSink(cert)
Expand Down Expand Up @@ -1582,6 +1584,7 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) {
case <-time.After(timeout()):
return nil, &contextutil.TimeoutError{}
case <-f.ss.eventReady():
case <-f.mockSink.NotifyMessage():
case <-f.shutdown:
return nil, f.terminalJobError()
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/obsservice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@ the future).
Build with

```shell
go build ./pkg/obsservice/cmd/obsservice
./dev build obsservice
```

or
which will include the DB Console UI served on the HTTP port. This adds the
`"--config=with_ui"` bazel flag that embeds the UI.

You can also build without the UI using:

```shell
./dev build pkg/obsservice/cmd/obsservice
```

which will produce a binary in `./bin/obsservice`.

## Running

Assuming you're already running a local CRDB instance:

```shell
obsservice --http-addr=localhost:8081 --crdb-http-url=http://localhost:8080 --ui-cert=certs/cert.pem --ui-cert-key=certs/key.pem --ca-cert=certs/ca.crt
```
Expand Down
2 changes: 1 addition & 1 deletion pkg/obsservice/cmd/obsservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {
RootCmd.PersistentFlags().StringVar(
&sinkPGURL,
"sink-pgurl",
"postgresql://root@andrei-desktop:26257/defaultdb?sslmode=disable",
"postgresql://root@localhost:26257/defaultdb?sslmode=disable",
"PGURL for the sink cluster.")

if err := RootCmd.Execute(); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/explain_gist
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,17 @@ SELECT crdb_internal.decode_plan_gist('$lookup_join_gist')
└── • scan
table: s83537@s83537_pkey
spans: FULL SCAN

# Ensure that we can decode a gist even if we do not have the schema info.
query T
SELECT crdb_internal.decode_plan_gist('AgGSARIAAwlAsJ8BE5IBAhcGFg==')
----
• limit
└── • index join
│ table: ?@?
└── • scan
table: ?@?
spans: 32 spans
limit: 10200
2 changes: 1 addition & 1 deletion pkg/sql/opt/exec/explain/plan_gist_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (u *unknownTable) DeletableIndexCount() int {
}

func (u *unknownTable) Index(i cat.IndexOrdinal) cat.Index {
panic(errors.AssertionFailedf("not implemented"))
return &unknownIndex{}
}

func (u *unknownTable) StatisticCount() int {
Expand Down

0 comments on commit 4a8f1be

Please sign in to comment.