Skip to content

Commit

Permalink
feat: add support for gcp pubsub (#266)
Browse files Browse the repository at this point in the history
Adds support for instrumenting `cloud.google.com/go/pubsub`

Requires DataDog/dd-trace-go#2852

---------

Co-authored-by: Romain Marcadier <[email protected]>
  • Loading branch information
rarguelloF and RomainMuller authored Sep 23, 2024
1 parent c6451d2 commit b610c54
Show file tree
Hide file tree
Showing 7 changed files with 1,398 additions and 2 deletions.
6 changes: 5 additions & 1 deletion _integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.6
replace github.com/DataDog/orchestrion => ../

require (
cloud.google.com/go/pubsub v1.42.0
github.com/DataDog/orchestrion v0.7.4
github.com/IBM/sarama v1.43.3
github.com/Shopify/sarama v1.38.1
Expand Down Expand Up @@ -32,12 +33,14 @@ require (
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.33.0
github.com/testcontainers/testcontainers-go/modules/cassandra v0.33.0
github.com/testcontainers/testcontainers-go/modules/gcloud v0.33.0
github.com/testcontainers/testcontainers-go/modules/mongodb v0.32.0
github.com/testcontainers/testcontainers-go/modules/redis v0.33.0
github.com/testcontainers/testcontainers-go/modules/redpanda v0.33.0
github.com/testcontainers/testcontainers-go/modules/vault v0.32.0
github.com/xlab/treeprint v1.2.0
go.mongodb.org/mongo-driver v1.16.1
google.golang.org/api v0.194.0
google.golang.org/grpc v1.65.0
google.golang.org/grpc/examples v0.0.0-20240816220358-f8d98a477c22
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0
Expand Down Expand Up @@ -75,9 +78,11 @@ require (
github.com/DataDog/go-tuf v1.1.0-0.5.2 // indirect
github.com/DataDog/gostackparse v0.7.0 // indirect
github.com/DataDog/sketches-go v1.4.6 // indirect
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/alecthomas/chroma/v2 v2.14.0 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/armon/go-radix v1.0.1-0.20221118154546-54df44f2176c // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect
Expand Down Expand Up @@ -343,7 +348,6 @@ require (
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9 // indirect
google.golang.org/api v0.194.0 // indirect
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240823204242-4ba0660f739c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c // indirect
Expand Down
1,058 changes: 1,058 additions & 0 deletions _integration-tests/go.sum

Large diffs are not rendered by default.

155 changes: 155 additions & 0 deletions _integration-tests/tests/gcp_pubsub/gcp_pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build integration

package gcppubsub

import (
"context"
"testing"
"time"

"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/gcloud"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"orchestrion/integration/utils"
"orchestrion/integration/validator/trace"
)

const (
testTopic = "pstest-orchestrion-topic"
testSubscription = "pstest-orchestrion-subscription"
)

type TestCase struct {
container *gcloud.GCloudContainer
client *pubsub.Client
publishTime time.Time
messageID string
}

func (tc *TestCase) Setup(t *testing.T) {
var (
err error
ctx = context.Background()
)

tc.container, err = gcloud.RunPubsub(ctx,
"gcr.io/google.com/cloudsdktool/cloud-sdk:490.0.0-emulators",
gcloud.WithProjectID("pstest-orchestrion"),
testcontainers.WithLogger(testcontainers.TestLogger(t)),
utils.WithTestLogConsumer(t),
)
utils.AssertTestContainersError(t, err)

projectID := tc.container.Settings.ProjectID

//dd:ignore
conn, err := grpc.NewClient(tc.container.URI, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

tc.client, err = pubsub.NewClient(ctx, projectID, option.WithGRPCConn(conn))
require.NoError(t, err)

topic, err := tc.client.CreateTopic(ctx, testTopic)
require.NoError(t, err)

_, err = tc.client.CreateSubscription(ctx, testSubscription, pubsub.SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
})
require.NoError(t, err)
}

func (tc *TestCase) publishMessage(t *testing.T) {
t.Helper()

ctx := context.Background()
topic := tc.client.Topic(testTopic)
topic.EnableMessageOrdering = true
res := topic.Publish(context.Background(), &pubsub.Message{
Data: []byte("Hello, World!"),
OrderingKey: "ordering-key",
})
_, err := res.Get(ctx)
require.NoError(t, err)
t.Log("finished publishing result")
}

func (tc *TestCase) receiveMessage(t *testing.T) {
t.Helper()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

sub := tc.client.Subscription(testSubscription)
err := sub.Receive(ctx, func(_ context.Context, message *pubsub.Message) {
assert.Equal(t, message.Data, []byte("Hello, World!"))
message.Ack()
tc.publishTime = message.PublishTime
tc.messageID = message.ID
cancel()
})
require.NoError(t, err)

<-ctx.Done()
require.NotErrorIs(t, ctx.Err(), context.DeadlineExceeded)
}

func (tc *TestCase) Run(t *testing.T) {
tc.publishMessage(t)
tc.receiveMessage(t)
}

func (tc *TestCase) Teardown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

require.NoError(t, tc.client.Close())
require.NoError(t, tc.container.Terminate(ctx))
}

func (tc *TestCase) ExpectedTraces() trace.Spans {
return trace.Spans{
{
Tags: map[string]any{
"name": "pubsub.publish",
"type": "queue",
"resource": "projects/pstest-orchestrion/topics/pstest-orchestrion-topic",
"service": "gcp_pubsub.test",
},
Meta: map[string]any{
"span.kind": "producer",
"component": "cloud.google.com/go/pubsub.v1",
"ordering_key": "ordering-key",
},
Children: trace.Spans{
{
Tags: map[string]any{
"name": "pubsub.receive",
"type": "queue",
"resource": "projects/pstest-orchestrion/subscriptions/pstest-orchestrion-subscription",
"service": "gcp_pubsub.test",
},
Meta: map[string]any{
"span.kind": "consumer",
"component": "cloud.google.com/go/pubsub.v1",
"messaging.system": "googlepubsub",
"ordering_key": "ordering-key",
"publish_time": tc.publishTime.String(),
"message_id": tc.messageID,
},
},
},
},
}
}
19 changes: 19 additions & 0 deletions _integration-tests/tests/gcp_pubsub/gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 53 additions & 1 deletion internal/injector/builtin/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/injector/builtin/generated_deps.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b610c54

Please sign in to comment.