Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azeventhubs,azservicebus] Some API cleanup, renames #20754

Merged
merged 9 commits into from
May 8, 2023
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/amqp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package azeventhubs
import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs.
Expand Down
8 changes: 4 additions & 4 deletions sdk/messaging/azeventhubs/checkpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type CheckpointStore interface {
// ListOwnership lists all ownerships.
ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)

// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
UpdateCheckpoint(ctx context.Context, checkpoint Checkpoint, options *UpdateCheckpointOptions) error
// SetCheckpoint updates a specific checkpoint with a sequence and offset.
SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}

// Ownership tracks which consumer owns a particular partition.
Expand Down Expand Up @@ -59,8 +59,8 @@ type ListOwnershipOptions struct {
// For future expansion
}

// UpdateCheckpointOptions contains optional parameters for the UpdateCheckpoint function
type UpdateCheckpointOptions struct {
// SetCheckpointOptions contains optional parameters for the UpdateCheckpoint function
type SetCheckpointOptions struct {
// For future expansion
}

Expand Down
9 changes: 7 additions & 2 deletions sdk/messaging/azeventhubs/checkpoints/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
Expand Down Expand Up @@ -63,6 +64,8 @@ func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []aze
if bloberror.HasCode(err,
bloberror.ConditionNotMet, // updated before we could update it
bloberror.BlobAlreadyExists) { // created before we could create it

log.Writef(azeventhubs.EventConsumer, "[%s] skipping %s because: %s", po.OwnerID, po.PartitionID, err)
continue
}

Expand Down Expand Up @@ -180,10 +183,10 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s
return ownerships, nil
}

// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
// SetCheckpoint updates a specific checkpoint with a sequence and offset.
//
// NOTE: This function doesn't attempt to prevent simultaneous checkpoint updates - ownership is assumed.
func (b *BlobStore) UpdateCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.UpdateCheckpointOptions) error {
func (b *BlobStore) SetCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.SetCheckpointOptions) error {
blobName, err := nameForCheckpointBlob(checkpoint)

if err != nil {
Expand All @@ -199,6 +202,7 @@ func (b *BlobStore) setOwnershipMetadata(ctx context.Context, blobName string, o
blobClient := b.cc.NewBlockBlobClient(blobName)

if ownership.ETag != nil {
log.Writef(azeventhubs.EventConsumer, "[%s] claiming ownership for %s with etag %s", ownership.OwnerID, ownership.PartitionID, string(*ownership.ETag))
setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, &blob.SetMetadataOptions{
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
Expand All @@ -214,6 +218,7 @@ func (b *BlobStore) setOwnershipMetadata(ctx context.Context, blobName string, o
return setMetadataResp.LastModified, *setMetadataResp.ETag, nil
}

log.Writef(azeventhubs.EventConsumer, "[%s] claiming ownership for %s with NO etags", ownership.PartitionID, ownership.OwnerID)
uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
Metadata: blobMetadata,
AccessConditions: &blob.AccessConditions{
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azeventhubs/checkpoints/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBlobStore_Checkpoints(t *testing.T) {
require.NoError(t, err)
require.Empty(t, checkpoints)

err = store.UpdateCheckpoint(context.Background(), azeventhubs.Checkpoint{
err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
FullyQualifiedNamespace: "ns.servicebus.windows.net",
Expand All @@ -57,7 +57,7 @@ func TestBlobStore_Checkpoints(t *testing.T) {

// There's a code path to allow updating the blob after it's been created but without an etag
// in which case it just updates it.
err = store.UpdateCheckpoint(context.Background(), azeventhubs.Checkpoint{
err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
FullyQualifiedNamespace: "ns.servicebus.windows.net",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestConsumerClient_Recovery(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 1, len(events))

t.Logf("[%s] Received seq:%d, offset:%d", sr.PartitionID, events[0].SequenceNumber, *events[0].Offset)
t.Logf("[%s] Received seq:%d, offset:%d", sr.PartitionID, events[0].SequenceNumber, events[0].Offset)

require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body))
}(i, sr)
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/event_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/eh"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// EventData is an event that can be sent, using the ProducerClient, to an Event Hub.
Expand Down Expand Up @@ -52,7 +52,7 @@ type ReceivedEventData struct {
PartitionKey *string

// Offset is the offset of the event.
Offset *int64
Offset int64

// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
Expand Down Expand Up @@ -177,7 +177,7 @@ func updateFromAMQPAnnotations(src *amqp.Message, dest *ReceivedEventData) error
case offsetNumberAnnotation:
if offsetStr, ok := v.(string); ok {
if offset, err := strconv.ParseInt(offsetStr, 10, 64); err == nil {
dest.Offset = &offset
dest.Offset = offset
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/event_data_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// ErrEventDataTooLarge is returned when a message cannot fit into a batch when using the [azeventhubs.EventDataBatch.AddEventData] function.
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/event_data_batch_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/mock"
"github.com/Azure/go-amqp"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/event_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)

Expand All @@ -20,7 +20,7 @@ func TestEventData_Annotations(t *testing.T) {
require.Empty(t, re.Body)
require.Nil(t, re.EnqueuedTime)
require.Equal(t, int64(0), re.SequenceNumber)
require.Nil(t, re.Offset)
require.Zero(t, re.Offset)
require.Nil(t, re.PartitionKey)
})

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestEventData_newReceivedEventData(t *testing.T) {
SystemProperties: map[string]any{
"hello": "world",
},
Offset: to.Ptr[int64](102),
Offset: int64(102),
PartitionKey: to.Ptr("partition key"),
RawAMQPMessage: &AMQPAnnotatedMessage{
Properties: &AMQPAnnotatedMessageProperties{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Example_migrateCheckpoints() {
newCheckpoint.Offset = &offset
newCheckpoint.SequenceNumber = &oldCheckpoint.Checkpoint.SequenceNumber

if err := newCheckpointStore.UpdateCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
if err := newCheckpointStore.SetCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
panic(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionCl

// Updates the checkpoint with the latest event received. If processing needs to restart
// it will restart from this point, automatically.
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
Expand All @@ -154,7 +154,7 @@ func shutdownPartitionResources(partitionClient *azeventhubs.ProcessorPartitionC
defer partitionClient.Close(context.TODO())
}

func createClientsForExample(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.ConsumerClient, *checkpoints.BlobStore, error) {
func createClientsForExample(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.ConsumerClient, azeventhubs.CheckpointStore, error) {
// NOTE: the storageContainerName must exist before the checkpoint store can be used.
azBlobContainerClient, err := container.NewClientFromConnectionString(storageConnectionString, storageContainerName, nil)

Expand Down
3 changes: 2 additions & 1 deletion sdk/messaging/azeventhubs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/Azure/go-amqp v1.0.0
github.com/golang/mock v1.6.0
github.com/joho/godotenv v1.4.0
github.com/stretchr/testify v1.7.1
nhooyr.io/websocket v1.8.7
nhooyr.io/websocket v1.8.7
)

require (
Expand Down
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0/go.mod h1:Y3gnVwfaz8h6L1YHar+NfWORtBoVUSB5h4GlGkdeF7Q=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag=
github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA=
github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_InMemoryCheckpointStore_Checkpoints(t *testing.T) {
require.Empty(t, checkpoints)

for i := int64(0); i < 5; i++ {
err = store.UpdateCheckpoint(context.Background(), Checkpoint{
err = store.SetCheckpoint(context.Background(), Checkpoint{
FullyQualifiedNamespace: "ns",
EventHubName: "eh",
ConsumerGroup: "cg",
Expand Down Expand Up @@ -269,7 +269,7 @@ func (cps *testCheckpointStore) ListOwnership(ctx context.Context, fullyQualifie
return ownerships, nil
}

func (cps *testCheckpointStore) UpdateCheckpoint(ctx context.Context, checkpoint Checkpoint, options *UpdateCheckpointOptions) error {
func (cps *testCheckpointStore) SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error {
cps.checkpointsMu.Lock()
defer cps.checkpointsMu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/amqp_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

type FakeNSForPartClient struct {
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/amqpwrap/amqpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// AMQPReceiver is implemented by *amqp.Receiver
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/amqpwrap/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package amqpwrap
import (
"context"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// RPCResponse is the simplified response structure from an RPC like call
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/cbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/auth"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/cbs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/auth"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/mock"
"github.com/Azure/go-amqp"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (inf *processorStressTest) receiveForever(ctx context.Context, partClient *

if len(events) > 0 {
// we're okay, let's update our checkpoint
if err := partClient.UpdateCheckpoint(ctx, events[len(events)-1]); err != nil {
if err := partClient.UpdateCheckpoint(ctx, events[len(events)-1], nil); err != nil {
logger("Fatal error updating checkpoint: %s", err)
inf.TC.TrackException(err)
panic(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func initCheckpointStore(ctx context.Context, containerName string, testData *st
newCheckpoint.SequenceNumber = &partProps.LastEnqueuedSequenceNumber
}

if err = cps.UpdateCheckpoint(ctx, newCheckpoint, nil); err != nil {
if err = cps.SetCheckpoint(ctx, newCheckpoint, nil); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

type errNonRetriable struct {
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)

Expand Down
22 changes: 0 additions & 22 deletions sdk/messaging/azeventhubs/internal/go-amqp/LICENSE

This file was deleted.

Loading