Skip to content

Commit

Permalink
fix(storage): fix gRPC generation/condition issues
Browse files Browse the repository at this point in the history
Fixes issues in piping through generation and preconditions
for object Compose and Update. Adds a test against the emulator
which validates this logic across both transports.

I'll add another test for bucket preconditions in a separate
PR.
  • Loading branch information
tritone committed Aug 8, 2023
1 parent 8c46711 commit d2c2490
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 6 deletions.
188 changes: 188 additions & 0 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,194 @@ func TestHMACKeyCRUDEmulated(t *testing.T) {
})
}

func TestObjectConditionsEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
ctx := context.Background()

// Create test bucket
if _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{Name: bucket}); err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

cases := []struct {
name string
call func() error
}{
{
name: "Object update generation",
call: func() error {
objName, gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
uattrs := &ObjectAttrsToUpdate{CustomTime: time.Now()}
_, err = client.UpdateObject(ctx, bucket, objName, uattrs, gen, nil, nil)
return err
},
},
{
name: "Object update ifMetagenerationMatch",
call: func() error {
objName, gen, metaGen, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
uattrs := &ObjectAttrsToUpdate{CustomTime: time.Now()}
conds := &Conditions{
GenerationMatch: gen,
MetagenerationMatch: metaGen,
}
_, err = client.UpdateObject(ctx, bucket, objName, uattrs, gen, nil, conds)
return err
},
},
{
name: "Object write ifGenerationMatch",
call: func() error {
var err error
_, err = client.OpenWriter(&openWriterParams{
ctx: ctx,
chunkSize: 256 * 1024,
chunkRetryDeadline: 0,
bucket: bucket,
attrs: &ObjectAttrs{},
conds: &Conditions{DoesNotExist: true},
encryptionKey: nil,
sendCRC32C: false,
donec: nil,
setError: func(e error) {
if e != nil {
err = e
}
},
progress: nil,
setObj: nil,
})
return err
},
},
{
name: "Object rewrite ifMetagenerationMatch",
call: func() error {
objName, gen, metaGen, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
_, err = client.RewriteObject(ctx, &rewriteObjectRequest{
srcObject: sourceObject{
name: objName,
bucket: bucket,
gen: gen,
conds: &Conditions{
GenerationMatch: gen,
MetagenerationMatch: metaGen,
},
},
dstObject: destinationObject{
name: fmt.Sprintf("%d-object", time.Now().Nanosecond()),
bucket: bucket,
conds: &Conditions{
DoesNotExist: true,
},
attrs: &ObjectAttrs{},
},
})
return err
},
},
{
name: "Object compose ifGenerationMatch",
call: func() error {
obj1, obj1Gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
obj2, obj2Gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
_, err = client.ComposeObject(ctx, &composeObjectRequest{
dstBucket: bucket,
dstObject: destinationObject{
name: fmt.Sprintf("%d-object", time.Now().Nanosecond()),
bucket: bucket,
conds: &Conditions{DoesNotExist: true},
attrs: &ObjectAttrs{},
},
srcs: []sourceObject{
{
name: obj1,
bucket: bucket,
gen: obj1Gen,
conds: &Conditions{
GenerationMatch: obj1Gen,
},
},
{
name: obj2,
bucket: bucket,
conds: &Conditions{
GenerationMatch: obj2Gen,
},
},
},
})
return err
},
},
{
name: "Object delete ifGenerationMatch",
call: func() error {
objName, gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
err = client.DeleteObject(ctx, bucket, objName, gen, &Conditions{GenerationMatch: gen})
return err
},
},
{
name: "Object get ifMetagenerationMatch",
call: func() error {
objName, gen, metaGen, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
_, err = client.GetObject(ctx, bucket, objName, gen, nil, &Conditions{GenerationMatch: gen, MetagenerationMatch: metaGen})
return err
},
},
}
for _, c := range cases {
t.Run(c.name, func(r *testing.T) {
if err := c.call(); err != nil {
r.Errorf("error: %v", err)
}
})
}
})
}

// createObject creates an object in the emulator and returns its name, generation, and
// metageneration.
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
prefix := time.Now().Nanosecond()
objName := fmt.Sprintf("%d-object", prefix)

w := veneerClient.Bucket(bucket).Object(objName).NewWriter(ctx)
if _, err := w.Write(randomBytesToWrite); err != nil {
return "", 0, 0, fmt.Errorf("failed to populate test data: %w", err)
}
if err := w.Close(); err != nil {
return "", 0, 0, fmt.Errorf("closing object: %v", err)
}
attrs, err := veneerClient.Bucket(bucket).Object(objName).Attrs(ctx)
if err != nil {
return "", 0, 0, fmt.Errorf("update object: %v", err)
}
return objName, attrs.Generation, attrs.Metageneration, nil
}

// transportClienttest executes the given function with a sub-test, a project name
// based on the transport, a unique bucket name also based on the transport, and
// the transport-specific client to run the test with. It also checks the environment
Expand Down
20 changes: 14 additions & 6 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,15 @@ func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string
func (c *grpcStorageClient) UpdateObject(ctx context.Context, bucket, object string, uattrs *ObjectAttrsToUpdate, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
s := callSettings(c.settings, opts...)
o := uattrs.toProtoObject(bucketResourceName(globalProjectAlias, bucket), object)
// For Update, generation is passed via the object message rather than a field on the request.
if gen >= 0 {
o.Generation = gen
}
req := &storagepb.UpdateObjectRequest{
Object: o,
PredefinedAcl: uattrs.PredefinedACL,
}
if err := applyCondsProto("grpcStorageClient.UpdateObject", gen, conds, req); err != nil {
if err := applyCondsProto("grpcStorageClient.UpdateObject", defaultGen, conds, req); err != nil {
return nil, err
}
if s.userProject != "" {
Expand Down Expand Up @@ -783,17 +787,18 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec

dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
dstObjPb.Name = req.dstObject.name
if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, dstObjPb); err != nil {
return nil, err
}

if req.sendCRC32C {
dstObjPb.Checksums.Crc32C = &req.dstObject.attrs.CRC32C
}

srcs := []*storagepb.ComposeObjectRequest_SourceObject{}
for _, src := range req.srcs {
srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name}
if err := applyCondsProto("ComposeObject source", src.gen, src.conds, srcObjPb); err != nil {
srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name, ObjectPreconditions: &storagepb.ComposeObjectRequest_SourceObject_ObjectPreconditions{}}
if src.gen >= 0 {
srcObjPb.Generation = src.gen
}
if err := applyCondsProto("ComposeObject source", defaultGen, src.conds, srcObjPb.ObjectPreconditions); err != nil {
return nil, err
}
srcs = append(srcs, srcObjPb)
Expand All @@ -803,6 +808,9 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec
Destination: dstObjPb,
SourceObjects: srcs,
}
if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, rawReq); err != nil {
return nil, err
}
if req.predefinedACL != "" {
rawReq.DestinationPredefinedAcl = req.predefinedACL
}
Expand Down

0 comments on commit d2c2490

Please sign in to comment.