75519: cli: add tracing to `debug send-kv-batch` r=andreimatei,erikgrinaker a=knz

This extends the `debug send-kv-batch` CLI utility to optionally
collect and print a trace of the remote processing.

(big thanks to @andreimatei for the help)
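
A minimal invocation sketch (hypothetical input file; the `--trace` and `--trace-output` flags are the ones added to `pkg/cli/debug.go` in this commit, and the JSON-encoded BatchRequest input follows the command's pre-existing behavior):

```
# Print a text rendering of the server-side trace to stderr.
cockroach debug send-kv-batch --trace=text batch.json

# Write a Jaeger-compatible trace to a file instead of stderr.
cockroach debug send-kv-batch --trace=jaeger --trace-output=trace.json batch.json
```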



76213: spanconfig: replace roachpb.SpanConfigEntry in package spanconfig  r=arulajmani a=arulajmani

See individual commits for details. 

Polished up and carved out from #76138. 

In the next patch, we'll modify the existing span configuration RPC arguments to something similar to what's in the draft PR above. Then, we'll make `spanconfig.Target` a union over system targets and spans (instead of the alias for roachpb.Span, as in this PR). We'll also add the encoding and decoding methods for system targets that we've seen in prior draft patches.
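
As a rough sketch of the shapes this introduces (simplified and partly hypothetical; the constructor and accessors below match the usage visible in the diffs further down, but the real `spanconfig` package carries more methods):

```go
package spanconfig

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// Target identifies what a span configuration applies to. In this PR it is
// effectively a wrapper around roachpb.Span; a follow-up PR turns it into a
// union over spans and system targets.
type Target roachpb.Span

// MakeSpanTarget constructs a Target from a span.
func MakeSpanTarget(sp roachpb.Span) Target { return Target(sp) }

// GetSpan returns the underlying span.
func (t *Target) GetSpan() *roachpb.Span { return (*roachpb.Span)(t) }

// Record ties a target to its span configuration; it replaces
// roachpb.SpanConfigEntry within package spanconfig.
type Record struct {
	Target Target
	Config roachpb.SpanConfig
}
```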

76254: backupccl: flip `bulkio.backup.split_keys_on_timestamps` to true r=dt a=adityamaru

Release note (sql change): BACKUPs of ranges containing extremely large
numbers of revisions to a single row no longer fail with errors related
to exceeding the size limit.

76265: changefeedccl: Increase message size limits for kafka sink. r=miretskiy a=miretskiy

The Sarama library, used by the Kafka sink, limits the maximum message
size locally. When those limits are exceeded, Sarama returns a confusing
error message which seems to imply that the remote Kafka server rejected
the message, even though the rejection happened locally:
   `kafka server: Message was too large, server rejected it to avoid allocation error.`

This PR addresses the problem by increasing the Sarama limits to 2GB
(max int32).

An alternative approach would have been to extend `kafka_sink_config` to
specify a maximum message size. However, that alternative is less desirable.
For one, the user-supplied configuration could run afoul of other limits
imposed by the Sarama library (e.g. `MaxRequestSize`), so more configuration
options would have to be added. In addition, it would expose very low-level
implementation details of the Sarama library -- something we probably
should not do.

Fixes #76258

Release note (enterprise change): The Kafka sink now supports larger
messages, up to 2GB in size.
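
In essence, and mirroring the `sink_kafka.go` diff below, the change lifts Sarama's global request-size cap and pins the per-message cap just under it. A standalone sketch (not the actual sink code):

```go
package main

import (
	"math"

	"github.com/Shopify/sarama"
)

// applyMessageSizeLimits mirrors the limit changes in sink_kafka.go: lift
// Sarama's global request-size cap so large messages are not rejected
// locally (the Kafka brokers still enforce their own limits), and keep the
// per-message cap just under the request-size cap.
func applyMessageSizeLimits(cfg *sarama.Config) {
	sarama.MaxRequestSize = math.MaxInt32
	cfg.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)
}

func main() {
	cfg := sarama.NewConfig()
	applyMessageSizeLimits(cfg)
}
```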

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: arulajmani <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
5 people committed Feb 9, 2022
5 parents a3cfd63 + c6d292d + f38d556 + 532570d + e109b4b commit 90629ae
Showing 51 changed files with 1,565 additions and 1,255 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -103,7 +103,7 @@ var (
settings.TenantWritable,
"bulkio.backup.split_keys_on_timestamps",
"split backup data on timestamps when writing revision history",
false,
true,
)
)

3 changes: 1 addition & 2 deletions pkg/ccl/backupccl/backup_test.go
@@ -6946,8 +6946,7 @@ INSERT INTO baz.bar VALUES (110, 'a'), (210, 'b'), (310, 'c'), (410, 'd'), (510,
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='10b'`)
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage='10b'`)

// Allow mid key breaks for the tennant to verify timestamps on resume.
tenant10.Exec(t, `SET CLUSTER SETTING bulkio.backup.split_keys_on_timestamps = true`)
// Test mid key breaks for the tenant to verify timestamps on resume.
tenant10.Exec(t, `UPDATE baz.bar SET v = 'z' WHERE i = 210`)
tenant10.Exec(t, `BACKUP DATABASE baz TO 'userfile://defaultdb.myfililes/test4' with revision_history`)
expected = nil
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -14,6 +14,7 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"math"
"strings"
"sync"
"time"
@@ -56,6 +57,12 @@ func init() {
ctx := context.Background()
ctx = logtags.AddTag(ctx, "kafka-producer", nil)
sarama.Logger = &kafkaLogAdapter{ctx: ctx}

// Sarama should not be rejecting messages based on some arbitrary limits.
// This sink already manages its resource usage. Sarama should attempt to deliver
// messages, no matter their size. Of course, the downstream kafka may reject
// those messages, but this rejection should not be done locally.
sarama.MaxRequestSize = math.MaxInt32
}

// kafkaClient is a small interface restricting the functionality in sarama.Client
@@ -442,6 +449,12 @@ func (j *jsonDuration) UnmarshalJSON(b []byte) error {

// Apply configures provided kafka configuration struct based on this config.
func (c *saramaConfig) Apply(kafka *sarama.Config) error {
// Sarama limits the size of each message to MaxMessageSize (1MB) bytes.
// This is silly; this sink already manages its memory, and therefore, if we
// had enough resources to ingest and process the message, then sarama shouldn't
// get in the way. Set this limit to be just a bit under the maximum request size.
kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)

kafka.Producer.Flush.Bytes = c.Flush.Bytes
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
67 changes: 23 additions & 44 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -456,35 +456,47 @@ func (c *Connector) TokenBucket(
return nil, ctx.Err()
}

// GetSpanConfigEntriesFor implements the spanconfig.KVAccessor interface.
func (c *Connector) GetSpanConfigEntriesFor(
ctx context.Context, spans []roachpb.Span,
) (entries []roachpb.SpanConfigEntry, _ error) {
// GetSpanConfigRecords implements the spanconfig.KVAccessor interface.
func (c *Connector) GetSpanConfigRecords(
ctx context.Context, targets []spanconfig.Target,
) (records []spanconfig.Record, _ error) {
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
spans := make([]roachpb.Span, 0, len(targets))
for _, target := range targets {
spans = append(spans, *target.GetSpan())
}
resp, err := c.GetSpanConfigs(ctx, &roachpb.GetSpanConfigsRequest{
Spans: spans,
})
if err != nil {
return err
}

entries = resp.SpanConfigEntries
records = spanconfig.EntriesToRecords(resp.SpanConfigEntries)
return nil
}); err != nil {
return nil, err
}
return entries, nil
return records, nil
}

// UpdateSpanConfigEntries implements the spanconfig.KVAccessor
// UpdateSpanConfigRecords implements the spanconfig.KVAccessor
// interface.
func (c *Connector) UpdateSpanConfigEntries(
ctx context.Context, toDelete []roachpb.Span, toUpsert []roachpb.SpanConfigEntry,
func (c *Connector) UpdateSpanConfigRecords(
ctx context.Context, toDelete []spanconfig.Target, toUpsert []spanconfig.Record,
) error {
spansToDelete := make([]roachpb.Span, 0, len(toDelete))
for _, toDel := range toDelete {
spansToDelete = append(spansToDelete, roachpb.Span(toDel))
}

entriesToUpsert := spanconfig.RecordsToSpanConfigEntries(toUpsert)

return c.withClient(ctx, func(ctx context.Context, c *client) error {

_, err := c.UpdateSpanConfigs(ctx, &roachpb.UpdateSpanConfigsRequest{
ToDelete: toDelete,
ToUpsert: toUpsert,
ToDelete: spansToDelete,
ToUpsert: entriesToUpsert,
})
return err
})
@@ -495,39 +507,6 @@ func (c *Connector) WithTxn(context.Context, *kv.Txn) spanconfig.KVAccessor {
panic("not applicable")
}

// GetSystemSpanConfigEntries implements the spanconfig.KVAccessor interface.
func (c *Connector) GetSystemSpanConfigEntries(
ctx context.Context,
) (entries []roachpb.SystemSpanConfigEntry, _ error) {
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
resp, err := c.GetSystemSpanConfigs(ctx, &roachpb.GetSystemSpanConfigsRequest{})
if err != nil {
return err
}

entries = resp.SystemSpanConfigEntries
return nil
}); err != nil {
return nil, err
}
return entries, nil
}

// UpdateSystemSpanConfigEntries implements the spanconfig.KVAccessor interface.
func (c *Connector) UpdateSystemSpanConfigEntries(
ctx context.Context,
toDelete []roachpb.SystemSpanConfigTarget,
toUpsert []roachpb.SystemSpanConfigEntry,
) error {
return c.withClient(ctx, func(ctx context.Context, c *client) error {
_, err := c.UpdateSystemSpanConfigs(ctx, &roachpb.UpdateSystemSpanConfigsRequest{
ToDelete: toDelete,
ToUpsert: toUpsert,
})
return err
})
}

// withClient is a convenience wrapper that executes the given closure while
// papering over InternalClient retrieval errors.
func (c *Connector) withClient(
12 changes: 0 additions & 12 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
@@ -114,18 +114,6 @@ func (m *mockServer) UpdateSpanConfigs(
panic("unimplemented")
}

func (m *mockServer) GetSystemSpanConfigs(
context.Context, *roachpb.GetSystemSpanConfigsRequest,
) (*roachpb.GetSystemSpanConfigsResponse, error) {
panic("unimplemented")
}

func (m *mockServer) UpdateSystemSpanConfigs(
context.Context, *roachpb.UpdateSystemSpanConfigsRequest,
) (*roachpb.UpdateSystemSpanConfigsResponse, error) {
panic("unimplemented")
}

func gossipEventForClusterID(clusterID uuid.UUID) *roachpb.GossipSubscriptionEvent {
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeyClusterID,
@@ -70,12 +70,12 @@ func TestPreSeedSpanConfigsWrittenWhenActive(t *testing.T) {
tenantSeedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}

{
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
spanconfig.MakeSpanTarget(tenantSpan),
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}
}

@@ -106,7 +106,9 @@ func TestSeedTenantSpanConfigs(t *testing.T) {

tenantID := roachpb.MakeTenantID(10)
tenantPrefix := keys.MakeTenantPrefix(tenantID)
tenantSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()}
tenantTarget := spanconfig.MakeSpanTarget(
roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()},
)
tenantSeedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}
{
_, err := ts.StartTenant(ctx, base.TestTenantArgs{
@@ -123,31 +125,31 @@ func TestSeedTenantSpanConfigs(t *testing.T) {
require.NoError(t, err)
}

{ // Ensure that no span config entries are to be found
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that no span config records are to be found
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Empty(t, entries)
require.Empty(t, records)
}

tdb.Exec(t,
"SET CLUSTER SETTING version = $1",
clusterversion.ByKey(clusterversion.SeedTenantSpanConfigs).String(),
)

{ // Ensure that the tenant now has a span config entry.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that the tenant now has a span config record.
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}
}

// TestSeedTenantSpanConfigsWithExistingEntry tests that the migration ignores
// tenants with existing span config entries.
// tenants with existing span config records.
func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -173,7 +175,9 @@ func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {

tenantID := roachpb.MakeTenantID(10)
tenantPrefix := keys.MakeTenantPrefix(tenantID)
tenantSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()}
tenantTarget := spanconfig.MakeSpanTarget(
roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()},
)
tenantSeedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}
{
_, err := ts.StartTenant(ctx, base.TestTenantArgs{
@@ -190,13 +194,13 @@ func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {
require.NoError(t, err)
}

{ // Ensure that the tenant already has a span config entry.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that the tenant already has a span config record.
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}

// Ensure the cluster version bump goes through successfully.
@@ -205,12 +209,12 @@ func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {
clusterversion.ByKey(clusterversion.SeedTenantSpanConfigs).String(),
)

{ // Ensure that the tenant's span config entry stay as it was.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that the tenant's span config record stay as it was.
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}
}
16 changes: 9 additions & 7 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go
@@ -191,16 +191,18 @@ func TestDataDriven(t *testing.T) {
}
return nil
})
entries, err := kvAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{keys.EverythingSpan})
records, err := kvAccessor.GetSpanConfigRecords(
ctx, []spanconfig.Target{spanconfig.MakeSpanTarget(keys.EverythingSpan)},
)
require.NoError(t, err)
sort.Slice(entries, func(i, j int) bool {
return entries[i].Span.Key.Compare(entries[j].Span.Key) < 0
sort.Slice(records, func(i, j int) bool {
return records[i].Target.Less(records[j].Target)
})

lines := make([]string, len(entries))
for i, entry := range entries {
lines[i] = fmt.Sprintf("%-42s %s", entry.Span,
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config))
lines := make([]string, len(records))
for i, record := range records {
lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSpan().String(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config))
}
return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", lines)

24 changes: 12 additions & 12 deletions pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go
@@ -159,31 +159,31 @@ func TestDataDriven(t *testing.T) {
}

sqlTranslator := tenant.SpanConfigSQLTranslator().(spanconfig.SQLTranslator)
entries, _, err := sqlTranslator.Translate(ctx, descpb.IDs{objID})
records, _, err := sqlTranslator.Translate(ctx, descpb.IDs{objID})
require.NoError(t, err)
sort.Slice(entries, func(i, j int) bool {
return entries[i].Span.Key.Compare(entries[j].Span.Key) < 0
sort.Slice(records, func(i, j int) bool {
return records[i].Target.Less(records[j].Target)
})

var output strings.Builder
for _, entry := range entries {
output.WriteString(fmt.Sprintf("%-42s %s\n", entry.Span,
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config)))
for _, record := range records {
output.WriteString(fmt.Sprintf("%-42s %s\n", *record.Target.GetSpan(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config)))
}
return output.String()

case "full-translate":
sqlTranslator := tenant.SpanConfigSQLTranslator().(spanconfig.SQLTranslator)
entries, _, err := spanconfig.FullTranslate(ctx, sqlTranslator)
records, _, err := spanconfig.FullTranslate(ctx, sqlTranslator)
require.NoError(t, err)

sort.Slice(entries, func(i, j int) bool {
return entries[i].Span.Key.Compare(entries[j].Span.Key) < 0
sort.Slice(records, func(i, j int) bool {
return records[i].Target.Less(records[j].Target)
})
var output strings.Builder
for _, entry := range entries {
output.WriteString(fmt.Sprintf("%-42s %s\n", entry.Span,
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config)))
for _, record := range records {
output.WriteString(fmt.Sprintf("%-42s %s\n", *record.Target.GetSpan(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config)))
}
return output.String()

1 change: 1 addition & 0 deletions pkg/cli/context.go
@@ -69,6 +69,7 @@ func initCLIDefaults() {
setUserfileContextDefaults()
setCertContextDefaults()
setDebugRecoverContextDefaults()
setDebugSendKVBatchContextDefaults()

initPreFlagsDefaults()

8 changes: 8 additions & 0 deletions pkg/cli/debug.go
@@ -1700,6 +1700,14 @@ func init() {
f.Var(&debugTimeSeriesDumpOpts.format, "format", "output format (text, csv, tsv, raw)")
f.Var(&debugTimeSeriesDumpOpts.from, "from", "oldest timestamp to include (inclusive)")
f.Var(&debugTimeSeriesDumpOpts.to, "to", "newest timestamp to include (inclusive)")

f = debugSendKVBatchCmd.Flags()
f.StringVar(&debugSendKVBatchContext.traceFormat, "trace", debugSendKVBatchContext.traceFormat,
"which format to use for the trace output (off, text, jaeger)")
f.BoolVar(&debugSendKVBatchContext.keepCollectedSpans, "keep-collected-spans", debugSendKVBatchContext.keepCollectedSpans,
"whether to keep the CollectedSpans field on the response, to learn about how traces work")
f.StringVar(&debugSendKVBatchContext.traceFile, "trace-output", debugSendKVBatchContext.traceFile,
"the output file to use for the trace. If left empty, output to stderr.")
}

func initPebbleCmds(cmd *cobra.Command) {