Skip to content

Commit

Permalink
objstorage: allow attaching of shared objects
Browse files Browse the repository at this point in the history
This commit introduces the concept of a `SharedObjectBacking`. This is
an opaque encoding of the metadata necessary to access a shared object
(specifically creator ID and creator file num). The backing can be
extracted for a shared object on an instance and then used to attach
the object to another instance. This will be used for rebalancing.
  • Loading branch information
RaduBerinde committed Feb 28, 2023
1 parent 8285e8d commit 63a8366
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 8 deletions.
4 changes: 2 additions & 2 deletions objstorage/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ type ObjectMetadata struct {
type CreatorID = sharedobjcat.CreatorID

// IsShared returns true if the object is on shared storage.
func (m *ObjectMetadata) IsShared() bool {
return m.Shared.CreatorID.IsSet()
func (meta *ObjectMetadata) IsShared() bool {
return meta.Shared.CreatorID.IsSet()
}

// Open creates the Provider.
Expand Down
45 changes: 44 additions & 1 deletion objstorage/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package objstorage

import (
"fmt"
"strings"
"testing"

"github.com/cockroachdb/datadriven"
Expand All @@ -25,6 +26,7 @@ func TestProvider(t *testing.T) {
})

providers := make(map[string]*Provider)
backings := make(map[string]SharedObjectBacking)
var curProvider *Provider
datadriven.RunTest(t, "testdata/provider", func(t *testing.T, d *datadriven.TestData) string {
scanArgs := func(desc string, args ...interface{}) {
Expand Down Expand Up @@ -106,7 +108,48 @@ func TestProvider(t *testing.T) {

case "list":
for _, meta := range curProvider.List() {
log.Infof("%s", curProvider.Path(meta))
log.Infof("%s -> %s", meta.FileNum, curProvider.Path(meta))
}
return log.String()

case "save-backing":
var key string
var fileNum base.FileNum
scanArgs("<key> <file-num>", &key, &fileNum)
meta, err := curProvider.Lookup(base.FileTypeTable, fileNum)
require.NoError(t, err)
backing, err := meta.SharedObjectBacking()
if err != nil {
return err.Error()
}
backings[key] = backing
return log.String()

case "attach":
lines := strings.Split(d.Input, "\n")
if len(lines) == 0 {
d.Fatalf(t, "at least one row expected; format: <key> <file-num>")
}
var objs []SharedObjectToAttach
for _, l := range lines {
var key string
var fileNum base.FileNum
_, err := fmt.Sscan(l, &key, &fileNum)
require.NoError(t, err)
backing, ok := backings[key]
if !ok {
d.Fatalf(t, "unknown backing key %q", key)
}
objs = append(objs, SharedObjectToAttach{
FileType: base.FileTypeTable,
FileNum: fileNum,
Backing: backing,
})
}
metas, err := curProvider.AttachSharedObjects(objs)
require.NoError(t, err)
for _, meta := range metas {
log.Infof("%s -> %s", meta.FileNum, curProvider.Path(meta))
}
return log.String()

Expand Down
5 changes: 4 additions & 1 deletion objstorage/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func (p *Provider) sharedPath(meta ObjectMetadata) string {

func sharedObjectName(meta ObjectMetadata) string {
// TODO(radu): prepend a "shard" value for better distribution within the bucket?
return fmt.Sprintf("%s-%s", meta.Shared.CreatorID, base.MakeFilename(meta.FileType, meta.FileNum))
return fmt.Sprintf(
"%s-%s",
meta.Shared.CreatorID, base.MakeFilename(meta.FileType, meta.Shared.CreatorFileNum),
)
}

func (p *Provider) sharedCreate(
Expand Down
145 changes: 145 additions & 0 deletions objstorage/shared_backing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package objstorage

import (
"bytes"
"encoding/binary"
"io"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage/sharedobjcat"
)

// SharedObjectBacking encodes the metadata necessary to incorporate a shared
// object into a different Pebble instance.
//
// The encoding is meant to be treated as opaque by callers. It is produced by
// (*ObjectMetadata).SharedObjectBacking and carries the creator ID and the
// creator file number of the object as tagged uvarints.
type SharedObjectBacking []byte

// Tags used by the SharedObjectBacking encoding. Every field is written as a
// uvarint tag followed by its payload.
const (
	// tagCreatorID precedes the uvarint-encoded creator ID.
	tagCreatorID = 1
	// tagCreatorFileNum precedes the uvarint-encoded creator file number.
	tagCreatorFileNum = 2

	// Compatibility rules for tags added in the future:
	//
	//  - A new tag that does NOT have the tagNotSafeToIgnoreMask bit set must
	//    be followed by the length of its data, so that code which does not
	//    know the tag can skip over it.
	//
	//  - A new tag that DOES have the tagNotSafeToIgnoreMask bit set causes an
	//    error when encountered by earlier code that doesn't know the tag.
	tagNotSafeToIgnoreMask = 1 << 6
)

// SharedObjectBacking returns the encoded backing for this object: the creator
// ID and creator file number, each written as a uvarint tag followed by a
// uvarint value.
//
// An assertion error is returned if the object is not on shared storage.
func (meta *ObjectMetadata) SharedObjectBacking() (SharedObjectBacking, error) {
	if shared := meta.IsShared(); !shared {
		return nil, errors.AssertionFailedf("object %s not on shared storage", meta.FileNum)
	}

	// (tag, value) pairs in the order they are written out.
	fields := [...][2]uint64{
		{tagCreatorID, uint64(meta.Shared.CreatorID)},
		{tagCreatorFileNum, uint64(meta.Shared.CreatorFileNum)},
	}

	// Reserve the worst-case size: two uvarints per field.
	encoded := make([]byte, 0, 2*len(fields)*binary.MaxVarintLen64)
	for _, field := range fields {
		encoded = binary.AppendUvarint(encoded, field[0])
		encoded = binary.AppendUvarint(encoded, field[1])
	}
	return encoded, nil
}

// fromSharedObjectBacking decodes the shared object metadata.
//
// fileType and fileNum are not part of the encoding; they are copied verbatim
// into the returned ObjectMetadata. buf is parsed as a sequence of uvarint
// tags, each followed by its payload:
//
//   - tagCreatorID and tagCreatorFileNum are followed by a uvarint value;
//   - an unknown tag without the tagNotSafeToIgnoreMask bit is followed by a
//     uvarint length and that many bytes of data, which are skipped;
//   - an unknown tag with the tagNotSafeToIgnoreMask bit is an error.
//
// An error is returned if the encoding is malformed or truncated, or if the
// creator ID or creator file number is missing (or zero).
func fromSharedObjectBacking(
	fileType base.FileType, fileNum base.FileNum, buf SharedObjectBacking,
) (ObjectMetadata, error) {
	var creatorID uint64
	var creatorFileNum uint64
	br := bytes.NewReader(buf)
	for {
		tag, err := binary.ReadUvarint(br)
		if err == io.EOF {
			// Clean end of input: no more tags.
			break
		}
		if err != nil {
			return ObjectMetadata{}, err
		}
		switch tag {
		case tagCreatorID:
			creatorID, err = binary.ReadUvarint(br)

		case tagCreatorFileNum:
			creatorFileNum, err = binary.ReadUvarint(br)

		// TODO(radu): encode file type as well?

		default:
			// Ignore unknown tags, unless they're not safe to ignore.
			if tag&tagNotSafeToIgnoreMask != 0 {
				return ObjectMetadata{}, errors.Newf("unknown tag %d", tag)
			}
			var dataLen uint64
			dataLen, err = binary.ReadUvarint(br)
			if err == nil {
				// bytes.Reader.Seek happily moves past the end of the buffer,
				// so a truncated payload must be detected explicitly. The
				// check also guarantees the int64 conversion cannot overflow.
				if dataLen > uint64(br.Len()) {
					return ObjectMetadata{}, errors.Newf(
						"tag %d: data length %d exceeds remaining %d bytes", tag, dataLen, br.Len(),
					)
				}
				_, err = br.Seek(int64(dataLen), io.SeekCurrent)
			}
		}
		if err == io.EOF {
			// A tag that is not followed by its payload is a truncated
			// encoding; don't surface it as a benign end-of-input.
			err = io.ErrUnexpectedEOF
		}
		if err != nil {
			return ObjectMetadata{}, err
		}
	}
	if creatorID == 0 {
		return ObjectMetadata{}, errors.Newf("shared object backing missing creator ID")
	}
	if creatorFileNum == 0 {
		return ObjectMetadata{}, errors.Newf("shared object backing missing creator file num")
	}
	meta := ObjectMetadata{
		FileNum:  fileNum,
		FileType: fileType,
	}
	meta.Shared.CreatorID = CreatorID(creatorID)
	meta.Shared.CreatorFileNum = base.FileNum(creatorFileNum)
	return meta, nil
}

// SharedObjectToAttach contains the arguments needed to attach an existing shared object.
type SharedObjectToAttach struct {
	// FileNum is the file number that will be used to refer to this object (in
	// the context of this instance).
	FileNum base.FileNum
	// FileType is the file type recorded for the attached object.
	FileType base.FileType
	// Backing contains the metadata for the shared object backing (normally
	// generated from a different instance).
	Backing SharedObjectBacking
}

// AttachSharedObjects registers existing shared objects with this provider and
// returns their metadata, in the same order as objs.
//
// All backings are decoded up front; if any of them fails to decode, an error
// is returned before anything is registered. The objects are then added to the
// shared catalog batch, the batch is synced via sharedSync, and only after a
// successful sync are the objects recorded in the provider's known objects.
func (p *Provider) AttachSharedObjects(objs []SharedObjectToAttach) ([]ObjectMetadata, error) {
	attached := make([]ObjectMetadata, 0, len(objs))
	for idx := range objs {
		obj := &objs[idx]
		decoded, err := fromSharedObjectBacking(obj.FileType, obj.FileNum, obj.Backing)
		if err != nil {
			return nil, err
		}
		attached = append(attached, decoded)
	}

	// Stage the catalog additions while holding the mutex. The mutex is
	// released again before sharedSync is called.
	stageInCatalog := func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		for i := range attached {
			m := &attached[i]
			p.mu.shared.catalogBatch.AddObject(sharedobjcat.SharedObjectMetadata{
				FileNum:        m.FileNum,
				FileType:       m.FileType,
				CreatorID:      m.Shared.CreatorID,
				CreatorFileNum: m.Shared.CreatorFileNum,
			})
		}
	}
	stageInCatalog()

	err := p.sharedSync()
	if err != nil {
		return nil, err
	}

	// The sync succeeded; make the objects visible through knownObjects.
	p.mu.Lock()
	defer p.mu.Unlock()
	for i := range attached {
		p.mu.knownObjects[attached[i].FileNum] = attached[i]
	}
	return attached, nil
}
47 changes: 47 additions & 0 deletions objstorage/shared_backing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package objstorage

import (
"encoding/binary"
"testing"

"github.com/cockroachdb/pebble/internal/base"
"github.com/stretchr/testify/require"
)

// TestSharedObjectBacking checks that shared object metadata round-trips
// through the SharedObjectBacking encoding, that unknown tags which are safe
// to ignore are skipped, and that unknown tags which are not safe to ignore
// cause a decoding error.
func TestSharedObjectBacking(t *testing.T) {
	meta := ObjectMetadata{
		FileNum:  1,
		FileType: base.FileTypeTable,
	}
	meta.Shared.CreatorID = 100
	meta.Shared.CreatorFileNum = 200

	buf, err := meta.SharedObjectBacking()
	require.NoError(t, err)

	meta1, err := fromSharedObjectBacking(meta.FileType, meta.FileNum, buf)
	require.NoError(t, err)
	require.Equal(t, meta, meta1)

	t.Run("unknown-tags", func(t *testing.T) {
		// Work on copies: buf has spare capacity, so appending to an alias of
		// it would write into the backing array shared with buf.
		buf2 := append(SharedObjectBacking(nil), buf...)
		// Append a tag that is safe to ignore: tag, data length, data.
		buf2 = binary.AppendUvarint(buf2, 13)
		buf2 = binary.AppendUvarint(buf2, 2)
		buf2 = append(buf2, 1, 1)

		meta2, err := fromSharedObjectBacking(meta.FileType, meta.FileNum, buf2)
		require.NoError(t, err)
		require.Equal(t, meta, meta2)

		// Append a tag that is not safe to ignore; decoding must fail.
		buf3 := append(SharedObjectBacking(nil), buf2...)
		buf3 = binary.AppendUvarint(buf3, tagNotSafeToIgnoreMask+5)
		_, err = fromSharedObjectBacking(meta.FileType, meta.FileNum, buf3)
		require.Error(t, err)
		require.Contains(t, err.Error(), "unknown tag")
	})
}
Loading

0 comments on commit 63a8366

Please sign in to comment.