changefeedccl: add option to shard cloudstorage folders
Previously we always bucketed cloudstorage paths into 'folders' delineated
by date, where the date describes the earliest possible timestamp that could
exist in that folder.  Ex: topic-name/2021-09-13/file.ndjson

A customer with a ton of files wanted the folders to be separated hourly
rather than just into dates.  Ex: topic-name/2021-09-13/04/file.ndjson

This patch adds a `partition_format` query param which lets the user keep
the default behaviour with the "daily" value, split folders further into
date/hour/ with the "hourly" value, or not partition at all with the
"flat" value.

So to solve the customer's issue, they would add the query parameter
partition_format="hourly".

Release justification: low-risk new UX option

Release note (api change): CREATE CHANGEFEED on a CloudStorage sink now
allows a new query parameter to specify how the file paths are
partitioned: partition_format="daily" represents the default behavior of
splitting into dates (2021-05-01/), partition_format="hourly" will further
partition by hour (2021-05-01/05/), and partition_format="flat" will not
partition at all.
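
For illustration (not part of the commit): the partition layouts are Go
reference-time format strings, so a file's folder is simply the sink's
inclusive lower-bound timestamp rendered through the chosen layout. A minimal
runnable sketch, with the layout strings mirroring those added in
sink_cloudstorage.go below:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Layout strings as added in sink_cloudstorage.go; "15" is Go's
	// reference-time token for the zero-padded 24-hour hour.
	layouts := []struct{ name, layout string }{
		{"flat", "/"},
		{"daily", "2006-01-02/"},
		{"hourly", "2006-01-02/15/"},
	}
	lowerBound := time.Date(2021, time.September, 13, 4, 30, 0, 0, time.UTC)
	for _, l := range layouts {
		fmt.Printf("%-6s -> %q\n", l.name, lowerBound.Format(l.layout))
	}
	// Output:
	// flat   -> "/"
	// daily  -> "2021-09-13/"
	// hourly -> "2021-09-13/04/"
}
```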
samiskin committed Sep 15, 2021
1 parent 7d93e04 commit 02d2869
Showing 4 changed files with 153 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -112,6 +112,7 @@ const (
	SinkParamClientCert = `client_cert`
	SinkParamClientKey = `client_key`
	SinkParamFileSize = `file_size`
	SinkParamPartitionFormat = `partition_format`
	SinkParamSchemaTopic = `schema_topic`
	SinkParamTLSEnabled = `tls_enabled`
	SinkParamSkipTLSVerify = `insecure_tls_skip_verify`
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/sink.go
@@ -172,6 +172,13 @@ func (u *sinkURL) consumeParam(p string) string {
	return v
}

func (u *sinkURL) addParam(p string, value string) {
	if u.q == nil {
		u.q = u.Query()
	}
	u.q.Add(p, value)
}

func (u *sinkURL) consumeBool(param string, dest *bool) (wasSet bool, err error) {
	if paramVal := u.consumeParam(param); paramVal != "" {
		wasSet, err := strToBool(paramVal, dest)
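
Aside (not part of the commit): addParam reuses the same lazy materialization
of the parsed query values that consumeParam relies on, so the URL's RawQuery
is only parsed once, on first access. A self-contained sketch of the pattern;
the struct shape and the consumeParam body are assumptions inferred from this
hunk, not copied from the repository:

```go
package main

import (
	"fmt"
	"net/url"
)

// sinkURL wraps a parsed URL plus a lazily materialized copy of its query
// values (shape assumed for this sketch).
type sinkURL struct {
	url.URL
	q url.Values
}

func (u *sinkURL) addParam(p string, value string) {
	if u.q == nil {
		u.q = u.Query() // parse RawQuery once, on first mutation
	}
	u.q.Add(p, value)
}

// consumeParam returns a parameter's value and deletes it, so leftover
// (unrecognized) parameters can be rejected later (body assumed).
func (u *sinkURL) consumeParam(p string) string {
	if u.q == nil {
		u.q = u.Query()
	}
	v := u.q.Get(p)
	u.q.Del(p)
	return v
}

func main() {
	parsed, _ := url.Parse("gs://bucket/path?file_size=16MB")
	u := &sinkURL{URL: *parsed}
	u.addParam("partition_format", "hourly")
	fmt.Println(u.consumeParam("partition_format")) // hourly
	fmt.Println(u.consumeParam("file_size"))        // 16MB
}
```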
31 changes: 24 additions & 7 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -302,6 +302,17 @@ const sinkCompressionGzip = "gzip"

var cloudStorageSinkIDAtomic int64

// Files that are emitted can be partitioned by their earliest event time,
// for example being emitted to topic/date/file.ndjson, or further split by hour.
// Note that a file may contain events with timestamps that would normally
// fall under a different partition had they been flushed later.
var partitionDateFormats = map[string]string{
	"flat":   "/",
	"daily":  "2006-01-02/",
	"hourly": "2006-01-02/15/",
}
var defaultPartitionFormat = partitionDateFormats["daily"]

func makeCloudStorageSink(
	ctx context.Context,
	u sinkURL,
@@ -321,10 +332,6 @@
	}
	u.Scheme = strings.TrimPrefix(u.Scheme, `experimental-`)

	// Date partitioning is pretty standard, so no override for now, but we could
	// plumb one down if someone needs it.
	const defaultPartitionFormat = `2006-01-02`

	sinkID := atomic.AddInt64(&cloudStorageSinkIDAtomic, 1)
	s := &cloudStorageSink{
		srcID: srcID,
@@ -337,9 +344,19 @@
		// TODO(dan,ajwerner): Use the jobs framework's session ID once that's available.
		jobSessionID: generateChangefeedSessionID(),
	}
	if timestampOracle != nil {
		s.dataFileTs = cloudStorageFormatTime(timestampOracle.inclusiveLowerBoundTS())
		s.dataFilePartition = timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)

	if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" {
		dateFormat, ok := partitionDateFormats[partitionFormat]
		if !ok {
			return nil, errors.Errorf("invalid partition_format of %s", partitionFormat)
		}

		s.partitionFormat = dateFormat
	}

	if s.timestampOracle != nil {
		s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS())
		s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)
	}

	switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) {
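
Aside (not part of the commit): the net effect of the hunk above is that an
unset parameter keeps the daily default, a recognized value swaps in its
layout, and anything else fails sink creation. A standalone sketch of that
decision; resolvePartitionFormat is a hypothetical helper, not a function in
the repository:

```go
package main

import "fmt"

var partitionDateFormats = map[string]string{
	"flat":   "/",
	"daily":  "2006-01-02/",
	"hourly": "2006-01-02/15/",
}

// resolvePartitionFormat (hypothetical) mirrors the validation in
// makeCloudStorageSink: empty keeps the default, unknown values error out.
func resolvePartitionFormat(partitionFormat string) (string, error) {
	if partitionFormat == "" {
		return partitionDateFormats["daily"], nil
	}
	dateFormat, ok := partitionDateFormats[partitionFormat]
	if !ok {
		return "", fmt.Errorf("invalid partition_format of %s", partitionFormat)
	}
	return dateFormat, nil
}

func main() {
	for _, v := range []string{"", "hourly", "weekly"} {
		layout, err := resolvePartitionFormat(v)
		fmt.Printf("%q -> layout=%q err=%v\n", v, layout, err)
	}
}
```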
121 changes: 121 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -21,6 +21,7 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs"
@@ -70,6 +71,42 @@ func TestCloudStorageSink(t *testing.T) {
		return decompressed
	}

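	// listLeafDirectories returns the relative paths of the deepest
	// directories under root, i.e. those with no child directories.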
	listLeafDirectories := func(root string) []string {
		absRoot := filepath.Join(dir, root)

		var folders []string

		hasChildDirs := func(path string) bool {
			files, err := ioutil.ReadDir(path)
			if err != nil {
				return false
			}
			for _, file := range files {
				if file.IsDir() {
					return true
				}
			}
			return false
		}

		walkFn := func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if path == absRoot {
				return nil
			}
			if info.IsDir() && !hasChildDirs(path) {
				relPath, _ := filepath.Rel(absRoot, path)
				folders = append(folders, relPath)
			}
			return nil
		}

		require.NoError(t, filepath.Walk(absRoot, walkFn))
		return folders
	}

	// slurpDir returns the contents of every file under root (relative to the
	// temp dir created above), sorted by the name of the file.
	slurpDir := func(t *testing.T, root string) []string {
@@ -477,6 +514,90 @@
		}, slurpDir(t, dir))
	})

	t.Run(`partition-formatting`, func(t *testing.T) {
		t1 := makeTopic(`t1`)
		testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
		const targetMaxFileSize = 6

		opts := opts

		timestamps := []time.Time{
			time.Date(2000, time.January, 1, 1, 1, 1, 0, time.UTC),
			time.Date(2000, time.January, 1, 1, 2, 1, 0, time.UTC),
			time.Date(2000, time.January, 1, 2, 1, 1, 0, time.UTC),
			time.Date(2000, time.January, 2, 1, 1, 1, 0, time.UTC),
			time.Date(2000, time.January, 2, 6, 1, 1, 0, time.UTC),
		}

		for i, tc := range []struct {
			format          string
			expectedFolders []string
		}{
			{
				"hourly",
				[]string{
					"2000-01-01/01",
					"2000-01-01/02",
					"2000-01-02/01",
					"2000-01-02/06",
				},
			},
			{
				"daily",
				[]string{
					"2000-01-01",
					"2000-01-02",
				},
			},
			{
				"flat",
				[]string{},
			},
			{
				"", // should fall back to default
				[]string{
					"2000-01-01",
					"2000-01-02",
				},
			},
		} {
			t.Run(tc.format, func(t *testing.T) {
				sf, err := span.MakeFrontier(testSpan)
				require.NoError(t, err)
				timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}

				dir := fmt.Sprintf(`partition-formatting-%d`, i)

				sinkUriWithParam := sinkURI(dir, targetMaxFileSize)
				sinkUriWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format)
				s, err := makeCloudStorageSink(
					ctx, sinkUriWithParam, 1,
					settings, opts, timestampOracle, externalStorageFromURI, user,
				)

				require.NoError(t, err)
				defer func() { require.NoError(t, s.Close()) }()
				s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

				for i, timestamp := range timestamps {
					hlcTime := ts(timestamp.UnixNano())

					// Move the frontier and flush to update the dataFilePartition value
					_, err = sf.Forward(testSpan, hlcTime)
					require.NoError(t, err)
					require.NoError(t, s.Flush(ctx))

					require.NoError(t, s.EmitRow(ctx, t1,
						noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, zeroAlloc))
				}

				require.NoError(t, s.Flush(ctx)) // Flush the last file
				require.ElementsMatch(t, tc.expectedFolders, listLeafDirectories(dir))
				require.Equal(t, []string{"v0\n", "v1\n", "v2\n", "v3\n", "v4\n"}, slurpDir(t, dir))
			})
		}
	})

	t.Run(`file-ordering`, func(t *testing.T) {
		t1 := makeTopic(`t1`)
		testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
