From 6512b6bb4509ae537982f75cf5a1583f1f9bd47c Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Fri, 4 Nov 2022 14:04:32 +1000 Subject: [PATCH] feat: add trace context to both http and event requests --- api/documents/document_ref.go | 18 ++++----- api/documents/document_ref_test.go | 13 ++++--- api/documents/query.go | 12 +++--- api/documents/query_test.go | 15 ++++---- api/events/events_test.go | 5 ++- api/events/topic.go | 6 +-- api/queues/queue.go | 12 +++--- api/queues/queue_test.go | 9 +++-- api/queues/task.go | 6 +-- api/queues/task_test.go | 5 ++- api/secrets/secret_ref.go | 6 +-- api/secrets/secret_ref_test.go | 5 ++- api/storage/bucket.go | 6 +-- api/storage/bucket_test.go | 5 ++- api/storage/file.go | 34 ++++++++--------- api/storage/file_test.go | 19 +++++----- constants/grpc.go | 3 ++ examples/documents/delete.go | 4 +- examples/documents/get.go | 3 +- examples/documents/paged_results.go | 5 ++- examples/documents/query.go | 3 +- examples/documents/query_filter.go | 3 +- examples/documents/query_limits.go | 3 +- examples/documents/set.go | 4 +- examples/documents/stream_results.go | 3 +- examples/documents/sub_col_query.go | 3 +- examples/documents/sub_doc_query.go | 3 +- examples/events/publish.go | 3 +- examples/events/publish_id.go | 3 +- examples/queues/failed.go | 5 ++- examples/queues/receive.go | 6 ++- examples/queues/send.go | 4 +- examples/queues/send_id.go | 4 +- examples/secrets/put.go | 3 +- examples/storage/delete.go | 4 +- examples/storage/read.go | 3 +- examples/storage/sign_read.go | 3 +- examples/storage/sign_write.go | 3 +- examples/storage/write.go | 4 +- faas/context.go | 15 ++++++-- faas/request.go | 24 +++++++++++- go.mod | 16 +++++++- go.sum | 41 +++++++++++++++++++- mocks/mockapi/events.go | 11 +++--- mocks/mockapi/queues.go | 17 +++++---- mocks/mockapi/secrets.go | 9 +++-- mocks/mockapi/storage.go | 9 +++-- resources/example_test.go | 24 ++++++------ resources/manager.go | 30 +++++++++++++-- resources/trace.go | 56 ++++++++++++++++++++++++++++ 50 files changed, 354 insertions(+), 156 deletions(-) create mode 100644 resources/trace.go diff --git a/api/documents/document_ref.go b/api/documents/document_ref.go index 433f82b..0c01909 100644 --- a/api/documents/document_ref.go +++ b/api/documents/document_ref.go @@ -36,13 +36,13 @@ type DocumentRef interface { Id() string // Get - Retrieve the value of the document - Get() (Document, error) + Get(context.Context) (Document, error) // Set - Sets the value of the document - Set(map[string]interface{}) error + Set(context.Context, map[string]interface{}) error // Delete - Deletes the document - Delete() error + Delete(context.Context) error // Collection - Retrieve a child collection of this document Collection(string) (CollectionRef, error) @@ -119,8 +119,8 @@ func (d *documentRefImpl) Collection(c string) (CollectionRef, error) { } // Delete - Deletes the document this reference refers to if it exists -func (d *documentRefImpl) Delete() error { - _, err := d.dc.Delete(context.TODO(), &v1.DocumentDeleteRequest{ +func (d *documentRefImpl) Delete(ctx context.Context) error { + _, err := d.dc.Delete(ctx, &v1.DocumentDeleteRequest{ Key: d.toWireKey(), }) @@ -128,7 +128,7 @@ func (d *documentRefImpl) Delete() error { } // Set - Sets the contents of the document this reference refers to -func (d *documentRefImpl) Set(content map[string]interface{}) error { +func (d *documentRefImpl) Set(ctx context.Context, content map[string]interface{}) error { sv, err := protoutils.NewStruct(content) if err != nil { return errors.NewWithCause( @@ -138,7 +138,7 @@ func (d *documentRefImpl) Set(content map[string]interface{}) error { ) } - if _, err = d.dc.Set(context.TODO(), &v1.DocumentSetRequest{ + if _, err = d.dc.Set(ctx, &v1.DocumentSetRequest{ Key: d.toWireKey(), Content: sv, }); err != nil { @@ -153,8 +153,8 @@ type DecodeOption interface { } // Get - Retrieves the Document this reference refers to if it exists -func (d *documentRefImpl) Get() (Document, error) { - res, err := d.dc.Get(context.TODO(), &v1.DocumentGetRequest{ +func (d *documentRefImpl) Get(ctx context.Context) (Document, error) { + res, err := d.dc.Get(ctx, &v1.DocumentGetRequest{ Key: d.toWireKey(), }) if err != nil { diff --git a/api/documents/document_ref_test.go b/api/documents/document_ref_test.go index 3552165..3679f0d 100644 --- a/api/documents/document_ref_test.go +++ b/api/documents/document_ref_test.go @@ -15,6 +15,7 @@ package documents import ( + "context" "fmt" "github.com/golang/mock/gomock" @@ -108,7 +109,7 @@ var _ = Describe("DocumentRef", func() { }, } - err := md.Delete() + err := md.Delete(context.TODO()) It("should pass through the returned error", func() { Expect(err).To(HaveOccurred()) @@ -133,7 +134,7 @@ var _ = Describe("DocumentRef", func() { }, } - err := md.Delete() + err := md.Delete(context.TODO()) It("should not return an error", func() { Expect(err).ToNot(HaveOccurred()) @@ -159,7 +160,7 @@ var _ = Describe("DocumentRef", func() { }, } - err := md.Set(map[string]interface{}{ + err := md.Set(context.TODO(), map[string]interface{}{ "test": "test", }) @@ -186,7 +187,7 @@ var _ = Describe("DocumentRef", func() { }, } - err := md.Set(map[string]interface{}{ + err := md.Set(context.TODO(), map[string]interface{}{ "test": "test", }) @@ -214,7 +215,7 @@ var _ = Describe("DocumentRef", func() { }, } - _, err := md.Get() + _, err := md.Get(context.TODO()) It("should pass through the returned error", func() { Expect(err).To(HaveOccurred()) @@ -246,7 +247,7 @@ var _ = Describe("DocumentRef", func() { }, } - d, _ := md.Get() + d, _ := md.Get(context.TODO()) It("should provide the returned document", func() { Expect(d.Content()).To(Equal(map[string]interface{}{ diff --git a/api/documents/query.go b/api/documents/query.go index 6f9510d..f316d77 100644 --- a/api/documents/query.go +++ b/api/documents/query.go @@ -33,10 +33,10 @@ type Query interface { FromPagingToken(interface{}) Query // Fetch - Return paged values - Fetch() (*FetchResult, error) + Fetch(ctx context.Context) (*FetchResult, error) // Stream - Return an iterator containing values - Stream() (DocumentIter, error) + Stream(ctx context.Context) (DocumentIter, error) } // Defacto Query interface implementation @@ -84,7 +84,7 @@ func (q *queryImpl) expressionsToWire() ([]*v1.Expression, error) { return expressions, nil } -func (q *queryImpl) Fetch() (*FetchResult, error) { +func (q *queryImpl) Fetch(ctx context.Context) (*FetchResult, error) { // build the expressions list expressions, err := q.expressionsToWire() if err != nil { @@ -101,7 +101,7 @@ func (q *queryImpl) Fetch() (*FetchResult, error) { token = t } - r, err := q.dc.Query(context.TODO(), &v1.DocumentQueryRequest{ + r, err := q.dc.Query(ctx, &v1.DocumentQueryRequest{ Collection: q.col.ToWire(), Expressions: expressions, Limit: int32(q.limit), @@ -132,14 +132,14 @@ func (q *queryImpl) Fetch() (*FetchResult, error) { }, nil } -func (q *queryImpl) Stream() (DocumentIter, error) { +func (q *queryImpl) Stream(ctx context.Context) (DocumentIter, error) { // build the expressions list expressions, err := q.expressionsToWire() if err != nil { return nil, err } - r, err := q.dc.QueryStream(context.TODO(), &v1.DocumentQueryStreamRequest{ + r, err := q.dc.QueryStream(ctx, &v1.DocumentQueryStreamRequest{ Collection: q.col.ToWire(), Expressions: expressions, Limit: int32(q.limit), diff --git a/api/documents/query_test.go b/api/documents/query_test.go index 5dbce98..0f7ff32 100644 --- a/api/documents/query_test.go +++ b/api/documents/query_test.go @@ -15,6 +15,7 @@ package documents import ( + "context" "fmt" "github.com/golang/mock/gomock" @@ -109,7 +110,7 @@ var _ = Describe("Query", func() { }) q.Where(Condition("test").Eq(StringValue("test"))) - _, err := q.Fetch() + _, err := q.Fetch(context.TODO()) It("should unwrap the gRPC error", func() { Expect(err).To(HaveOccurred()) @@ -149,7 +150,7 @@ var _ = Describe("Query", func() { Condition("test").Eq(StringValue("test")), ) - r, err := q.Fetch() + r, err := q.Fetch(context.TODO()) It("should return documents", func() { By("not returning an error") @@ -176,7 +177,7 @@ var _ = Describe("Query", func() { q.FromPagingToken("blah") - _, err := q.Fetch() + _, err := q.Fetch(context.TODO()) It("should return an error", func() { Expect(err).To(HaveOccurred()) @@ -194,7 +195,7 @@ var _ = Describe("Query", func() { q.Where(&queryExpression{}) - _, err := q.Fetch() + _, err := q.Fetch(context.TODO()) It("should return an error", func() { Expect(err).To(HaveOccurred()) @@ -219,7 +220,7 @@ var _ = Describe("Query", func() { }) q.Where(Condition("test").Eq(StringValue("test"))) - _, err := q.Stream() + _, err := q.Stream(context.TODO()) It("should unwrap the gRPC error", func() { Expect(err).To(HaveOccurred()) @@ -245,7 +246,7 @@ var _ = Describe("Query", func() { Condition("test").Eq(StringValue("test")), ) - r, err := q.Stream() + r, err := q.Stream(context.TODO()) It("should not return an error", func() { Expect(err).ToNot(HaveOccurred()) @@ -273,7 +274,7 @@ var _ = Describe("Query", func() { q.Where(&queryExpression{}) - _, err := q.Stream() + _, err := q.Stream(context.TODO()) It("should return an error", func() { Expect(err).To(HaveOccurred()) diff --git a/api/events/events_test.go b/api/events/events_test.go index 4ee539d..1b09b71 100644 --- a/api/events/events_test.go +++ b/api/events/events_test.go @@ -15,6 +15,7 @@ package events import ( + "context" "fmt" "os" @@ -144,7 +145,7 @@ var _ = Describe("Events", func() { tc: mockTopic, } - returnEvt, err := evt.Topic("test-topic").Publish(&Event{ + returnEvt, err := evt.Topic("test-topic").Publish(context.TODO(), &Event{ Payload: map[string]interface{}{ "test": "test", }, @@ -171,7 +172,7 @@ var _ = Describe("Events", func() { tc: mockTopic, } - _, err := evt.Topic("test-topic").Publish(&Event{ + _, err := evt.Topic("test-topic").Publish(context.TODO(), &Event{ Payload: map[string]interface{}{ "test": "test", }, diff --git a/api/events/topic.go b/api/events/topic.go index 89fe306..4cff2fc 100644 --- a/api/events/topic.go +++ b/api/events/topic.go @@ -32,7 +32,7 @@ type Topic interface { Name() string // Publish will publish the provided events on the topic. - Publish(*Event, ...PublishOption) (*Event, error) + Publish(context.Context, *Event, ...PublishOption) (*Event, error) } type topicImpl struct { @@ -51,7 +51,7 @@ func WithDelay(duration time.Duration) func(*v1.EventPublishRequest) { } } -func (s *topicImpl) Publish(evt *Event, opts ...PublishOption) (*Event, error) { +func (s *topicImpl) Publish(ctx context.Context, evt *Event, opts ...PublishOption) (*Event, error) { // Convert payload to Protobuf Struct payloadStruct, err := protoutils.NewStruct(evt.Payload) if err != nil { @@ -72,7 +72,7 @@ func (s *topicImpl) Publish(evt *Event, opts ...PublishOption) (*Event, error) { opt(event) } - r, err := s.ec.Publish(context.TODO(), event) + r, err := s.ec.Publish(ctx, event) if err != nil { return nil, errors.FromGrpcError(err) } diff --git a/api/queues/queue.go b/api/queues/queue.go index b647a73..6bd3912 100644 --- a/api/queues/queue.go +++ b/api/queues/queue.go @@ -27,9 +27,9 @@ type Queue interface { // Name - The name of the queue Name() string // Send - Push a number of tasks to a queue - Send([]*Task) ([]*FailedTask, error) + Send(context.Context, []*Task) ([]*FailedTask, error) // Receive - Retrieve tasks from a queue to a maximum of the given depth - Receive(int) ([]ReceivedTask, error) + Receive(context.Context, int) ([]ReceivedTask, error) } type queueImpl struct { @@ -41,12 +41,12 @@ func (q *queueImpl) Name() string { return q.name } -func (q *queueImpl) Receive(depth int) ([]ReceivedTask, error) { +func (q *queueImpl) Receive(ctx context.Context, depth int) ([]ReceivedTask, error) { if depth < 1 { return nil, errors.New(codes.InvalidArgument, "Queue.Receive: depth cannot be less than 1") } - r, err := q.c.Receive(context.TODO(), &v1.QueueReceiveRequest{ + r, err := q.c.Receive(ctx, &v1.QueueReceiveRequest{ Queue: q.name, Depth: int32(depth), }) @@ -68,7 +68,7 @@ func (q *queueImpl) Receive(depth int) ([]ReceivedTask, error) { return rts, nil } -func (q *queueImpl) Send(tasks []*Task) ([]*FailedTask, error) { +func (q *queueImpl) Send(ctx context.Context, tasks []*Task) ([]*FailedTask, error) { // Convert SDK Task objects to gRPC Task objects wireTasks := make([]*v1.NitricTask, len(tasks)) for i, task := range tasks { @@ -84,7 +84,7 @@ func (q *queueImpl) Send(tasks []*Task) ([]*FailedTask, error) { } // Push the tasks to the queue - res, err := q.c.SendBatch(context.Background(), &v1.QueueSendBatchRequest{ + res, err := q.c.SendBatch(ctx, &v1.QueueSendBatchRequest{ Queue: q.name, Tasks: wireTasks, }) diff --git a/api/queues/queue_test.go b/api/queues/queue_test.go index b53d407..5e5b18f 100644 --- a/api/queues/queue_test.go +++ b/api/queues/queue_test.go @@ -15,6 +15,7 @@ package queues import ( + "context" "fmt" "github.com/golang/mock/gomock" @@ -40,7 +41,7 @@ var _ = Describe("Queue", func() { c: mockQ, } - _, err := q.Send([]*Task{ + _, err := q.Send(context.TODO(), []*Task{ { ID: "1234", PayloadType: "test-payload", @@ -80,7 +81,7 @@ var _ = Describe("Queue", func() { c: mockQ, } - fts, _ := q.Send([]*Task{ + fts, _ := q.Send(context.TODO(), []*Task{ { ID: "1234", PayloadType: "test-payload", @@ -107,7 +108,7 @@ var _ = Describe("Queue", func() { name: "test-queue", } - _, err := q.Receive(0) + _, err := q.Receive(context.TODO(), 0) It("should return an error", func() { Expect(err).To(HaveOccurred()) @@ -137,7 +138,7 @@ var _ = Describe("Queue", func() { c: mockQ, } - t, _ := q.Receive(1) + t, _ := q.Receive(context.TODO(), 1) It("should receive a single task", func() { Expect(t).To(HaveLen(1)) diff --git a/api/queues/task.go b/api/queues/task.go index 4d909b4..3a5ce0c 100644 --- a/api/queues/task.go +++ b/api/queues/task.go @@ -38,7 +38,7 @@ type ReceivedTask interface { // Task - Returns the Task data contained in this Received Task instance Task() *Task // Complete - Completes the task removing it from the queue - Complete() error + Complete(context.Context) error } type receivedTaskImpl struct { @@ -56,8 +56,8 @@ func (r *receivedTaskImpl) Queue() string { return r.queue } -func (r *receivedTaskImpl) Complete() error { - _, err := r.qc.Complete(context.TODO(), &v1.QueueCompleteRequest{ +func (r *receivedTaskImpl) Complete(ctx context.Context) error { + _, err := r.qc.Complete(ctx, &v1.QueueCompleteRequest{ Queue: r.queue, LeaseId: r.leaseId, }) diff --git a/api/queues/task_test.go b/api/queues/task_test.go index e10e77a..b722ff6 100644 --- a/api/queues/task_test.go +++ b/api/queues/task_test.go @@ -15,6 +15,7 @@ package queues import ( + "context" "fmt" "github.com/golang/mock/gomock" @@ -130,7 +131,7 @@ var _ = Describe("Task", func() { }, } - err := t.Complete() + err := t.Complete(context.TODO()) It("should not return an error", func() { Expect(err).ToNot(HaveOccurred()) @@ -155,7 +156,7 @@ var _ = Describe("Task", func() { }, } - err := t.Complete() + err := t.Complete(context.TODO()) It("should pass through the gRPC error", func() { Expect(err).To(HaveOccurred()) diff --git a/api/secrets/secret_ref.go b/api/secrets/secret_ref.go index d13f026..22f1d92 100644 --- a/api/secrets/secret_ref.go +++ b/api/secrets/secret_ref.go @@ -24,7 +24,7 @@ import ( // SecretRef is a reference to a cloud secret for secret storage. type SecretRef interface { Name() string - Put([]byte) (SecretVersionRef, error) + Put(context.Context, []byte) (SecretVersionRef, error) Version(string) SecretVersionRef Latest() SecretVersionRef } @@ -38,8 +38,8 @@ func (s *secretRefImpl) Name() string { return s.name } -func (s *secretRefImpl) Put(sec []byte) (SecretVersionRef, error) { - r, err := s.sc.Put(context.TODO(), &v1.SecretPutRequest{ +func (s *secretRefImpl) Put(ctx context.Context, sec []byte) (SecretVersionRef, error) { + r, err := s.sc.Put(ctx, &v1.SecretPutRequest{ Secret: &v1.Secret{ Name: s.name, }, diff --git a/api/secrets/secret_ref_test.go b/api/secrets/secret_ref_test.go index 01e6569..157bd96 100644 --- a/api/secrets/secret_ref_test.go +++ b/api/secrets/secret_ref_test.go @@ -15,6 +15,7 @@ package secrets import ( + "context" "fmt" "github.com/golang/mock/gomock" @@ -129,7 +130,7 @@ var _ = Describe("secretRefImpl", func() { }, nil).Times(1) // Call the service - sv, err := s.Put([]byte("ssssshhhh... it's a secret")) + sv, err := s.Put(context.TODO(), []byte("ssssshhhh... it's a secret")) By("not returning an error") Expect(err).ToNot(HaveOccurred()) @@ -164,7 +165,7 @@ var _ = Describe("secretRefImpl", func() { ).Return(nil, fmt.Errorf("mock-error")).Times(1) // Call the service - sv, err := s.Put([]byte("ssssshhhh... it's a secret")) + sv, err := s.Put(context.TODO(), []byte("ssssshhhh... it's a secret")) By("returning the error") Expect(err).To(HaveOccurred()) diff --git a/api/storage/bucket.go b/api/storage/bucket.go index ba158a9..c4fc574 100644 --- a/api/storage/bucket.go +++ b/api/storage/bucket.go @@ -25,7 +25,7 @@ type Bucket interface { // File - Get a file reference for in this bucket File(key string) File // Files - Get all file references for this bucket - Files() ([]File, error) + Files(ctx context.Context) ([]File, error) } type bucketImpl struct { @@ -41,8 +41,8 @@ func (b *bucketImpl) File(key string) File { } } -func (b *bucketImpl) Files() ([]File, error) { - resp, err := b.sc.ListFiles(context.TODO(), &v1.StorageListFilesRequest{ +func (b *bucketImpl) Files(ctx context.Context) ([]File, error) { + resp, err := b.sc.ListFiles(ctx, &v1.StorageListFilesRequest{ BucketName: b.name, }) if err != nil { diff --git a/api/storage/bucket_test.go b/api/storage/bucket_test.go index adafefd..3d4f84f 100644 --- a/api/storage/bucket_test.go +++ b/api/storage/bucket_test.go @@ -15,6 +15,7 @@ package storage import ( + "context" "fmt" "github.com/golang/mock/gomock" @@ -73,7 +74,7 @@ var _ = Describe("Bucket", func() { }).Times(1).Return(nil, fmt.Errorf("mock-error")) By("calling Files() on the bucket reference") - files, err := bucketRef.Files() + files, err := bucketRef.Files(context.TODO()) By("receiving nil files") Expect(files).To(BeNil()) @@ -103,7 +104,7 @@ var _ = Describe("Bucket", func() { }, nil) By("bucket.Files() being called") - files, err := bucketRef.Files() + files, err := bucketRef.Files(context.TODO()) By("not returning an error") Expect(err).ShouldNot(HaveOccurred()) diff --git a/api/storage/file.go b/api/storage/file.go index 2fda902..1a8eda5 100644 --- a/api/storage/file.go +++ b/api/storage/file.go @@ -35,15 +35,15 @@ type File interface { // Name - Get the name of the file Name() string // Read - Read this object - Read() ([]byte, error) + Read(ctx context.Context) ([]byte, error) // Write - Write this object - Write([]byte) error + Write(ctx context.Context, data []byte) error // Delete - Delete this object - Delete() error + Delete(ctx context.Context) error // UploadUrl - Creates a signed Url for uploading this file reference - UploadUrl(expiry int) (string, error) + UploadUrl(ctx context.Context, expiry int) (string, error) // DownloadUrl - Creates a signed Url for downloading this file reference - DownloadUrl(expiry int) (string, error) + DownloadUrl(ctx context.Context, expiry int) (string, error) } type fileImpl struct { @@ -56,8 +56,8 @@ func (o *fileImpl) Name() string { return o.key } -func (o *fileImpl) Read() ([]byte, error) { - r, err := o.sc.Read(context.TODO(), &v1.StorageReadRequest{ +func (o *fileImpl) Read(ctx context.Context) ([]byte, error) { + r, err := o.sc.Read(ctx, &v1.StorageReadRequest{ BucketName: o.bucket, Key: o.key, }) @@ -68,8 +68,8 @@ func (o *fileImpl) Read() ([]byte, error) { return r.GetBody(), nil } -func (o *fileImpl) Write(content []byte) error { - if _, err := o.sc.Write(context.TODO(), &v1.StorageWriteRequest{ +func (o *fileImpl) Write(ctx context.Context, content []byte) error { + if _, err := o.sc.Write(ctx, &v1.StorageWriteRequest{ BucketName: o.bucket, Key: o.key, Body: content, @@ -80,8 +80,8 @@ func (o *fileImpl) Write(content []byte) error { return nil } -func (o *fileImpl) Delete() error { - if _, err := o.sc.Delete(context.TODO(), &v1.StorageDeleteRequest{ +func (o *fileImpl) Delete(ctx context.Context) error { + if _, err := o.sc.Delete(ctx, &v1.StorageDeleteRequest{ BucketName: o.bucket, Key: o.key, }); err != nil { @@ -104,15 +104,15 @@ func (p PresignUrlOptions) isValid() error { return nil } -func (o *fileImpl) UploadUrl(expiry int) (string, error) { - return o.signUrl(PresignUrlOptions{Expiry: expiry, Mode: ModeWrite}) +func (o *fileImpl) UploadUrl(ctx context.Context, expiry int) (string, error) { + return o.signUrl(ctx, PresignUrlOptions{Expiry: expiry, Mode: ModeWrite}) } -func (o *fileImpl) DownloadUrl(expiry int) (string, error) { - return o.signUrl(PresignUrlOptions{Expiry: expiry, Mode: ModeRead}) +func (o *fileImpl) DownloadUrl(ctx context.Context, expiry int) (string, error) { + return o.signUrl(ctx, PresignUrlOptions{Expiry: expiry, Mode: ModeRead}) } -func (o *fileImpl) signUrl(opts PresignUrlOptions) (string, error) { +func (o *fileImpl) signUrl(ctx context.Context, opts PresignUrlOptions) (string, error) { if err := opts.isValid(); err != nil { return "", errors.NewWithCause(codes.InvalidArgument, "invalid options", err) } @@ -123,7 +123,7 @@ func (o *fileImpl) signUrl(opts PresignUrlOptions) (string, error) { op = v1.StoragePreSignUrlRequest_WRITE } - r, err := o.sc.PreSignUrl(context.TODO(), &v1.StoragePreSignUrlRequest{ + r, err := o.sc.PreSignUrl(ctx, &v1.StoragePreSignUrlRequest{ BucketName: o.bucket, Key: o.key, Operation: op, diff --git a/api/storage/file_test.go b/api/storage/file_test.go index 093de72..53d9966 100644 --- a/api/storage/file_test.go +++ b/api/storage/file_test.go @@ -15,6 +15,7 @@ package storage import ( + "context" "fmt" "github.com/golang/mock/gomock" @@ -41,7 +42,7 @@ var _ = Describe("Object", func() { By("the gRPC server returning an error") mockStorage.EXPECT().Read(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("mock error")) - _, err := obj.Read() + _, err := obj.Read(context.TODO()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Unknown: error from grpc library: \n mock error")) @@ -64,7 +65,7 @@ var _ = Describe("Object", func() { Body: []byte("test"), }, nil) - b, _ := obj.Read() + b, _ := obj.Read(context.TODO()) Expect(b).To(Equal([]byte("test"))) ctrl.Finish() @@ -86,7 +87,7 @@ var _ = Describe("Object", func() { By("the gRPC server returning an error") mockStorage.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("mock error")) - err := obj.Write([]byte("test")) + err := obj.Write(context.TODO(), []byte("test")) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Unknown: error from grpc library: \n mock error")) @@ -107,7 +108,7 @@ var _ = Describe("Object", func() { By("the gRPC server returning a successful response") mockStorage.EXPECT().Write(gomock.Any(), gomock.Any()).Return(&v1.StorageWriteResponse{}, nil) - err := obj.Write([]byte("test")) + err := obj.Write(context.TODO(), []byte("test")) Expect(err).ToNot(HaveOccurred()) ctrl.Finish() @@ -128,7 +129,7 @@ var _ = Describe("Object", func() { By("the gRPC server returning an error") mockStorage.EXPECT().Delete(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("mock error")) - err := obj.Delete() + err := obj.Delete(context.TODO()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Unknown: error from grpc library: \n mock error")) @@ -149,7 +150,7 @@ var _ = Describe("Object", func() { By("the gRPC server returning a successful response") mockStorage.EXPECT().Delete(gomock.Any(), gomock.Any()).Return(&v1.StorageDeleteResponse{}, nil) - err := obj.Delete() + err := obj.Delete(context.TODO()) Expect(err).ToNot(HaveOccurred()) ctrl.Finish() }) @@ -164,7 +165,7 @@ var _ = Describe("Object", func() { key: "test-object", } - _, err := obj.signUrl(PresignUrlOptions{ + _, err := obj.signUrl(context.TODO(), PresignUrlOptions{ Mode: 7, }) Expect(err).Should(HaveOccurred()) @@ -190,7 +191,7 @@ var _ = Describe("Object", func() { Operation: v1.StoragePreSignUrlRequest_READ, }).Return(nil, fmt.Errorf("mock error")) - _, err := obj.signUrl(PresignUrlOptions{}) + _, err := obj.signUrl(context.TODO(), PresignUrlOptions{}) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Unknown: error from grpc library: \n mock error")) @@ -217,7 +218,7 @@ var _ = Describe("Object", func() { Url: "http://example.com", }, nil) - url, err := obj.signUrl(PresignUrlOptions{Mode: ModeWrite}) + url, err := obj.signUrl(context.TODO(), PresignUrlOptions{Mode: ModeWrite}) Expect(err).ToNot(HaveOccurred()) Expect(url).To(Equal("http://example.com")) diff --git a/constants/grpc.go b/constants/grpc.go index ec72314..c4fdbc9 100644 --- a/constants/grpc.go +++ b/constants/grpc.go @@ -15,6 +15,7 @@ package constants import ( + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" ) @@ -25,5 +26,7 @@ func DefaultOptions() []grpc.DialOption { grpc.WithInsecure(), //nolint:staticcheck grpc.WithBlock(), grpc.WithTimeout(NitricDialTimeout()), //nolint:staticcheck + grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()), } } diff --git a/examples/documents/delete.go b/examples/documents/delete.go index f4a644a..c96aca9 100644 --- a/examples/documents/delete.go +++ b/examples/documents/delete.go @@ -16,6 +16,8 @@ package documents_examples // [START import] import ( + "context" + "github.com/nitrictech/go-sdk/api/documents" ) @@ -27,7 +29,7 @@ func delete() { document := docs.Collection("products").Doc("nitric") - err := document.Delete() + err := document.Delete(context.TODO()) if err != nil { // handle error diff --git a/examples/documents/get.go b/examples/documents/get.go index 7a6e680..d7fc586 100644 --- a/examples/documents/get.go +++ b/examples/documents/get.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/documents" @@ -29,7 +30,7 @@ func get() { document := docs.Collection("products").Doc("nitric") - product, err := document.Get() + product, err := document.Get(context.TODO()) if err != nil { // handle error diff --git a/examples/documents/paged_results.go b/examples/documents/paged_results.go index 60a1298..8835c43 100644 --- a/examples/documents/paged_results.go +++ b/examples/documents/paged_results.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/documents" @@ -33,10 +34,10 @@ func pagedResults() { ).Limit(100) // Fetch first page - results, _ := query.Fetch() + results, _ := query.Fetch(context.TODO()) // Fetch next page - results, _ = query.FromPagingToken(results.PagingToken).Fetch() + results, _ = query.FromPagingToken(results.PagingToken).Fetch(context.TODO()) fmt.Println("results: ", results) // [END snippet] diff --git a/examples/documents/query.go b/examples/documents/query.go index 4527b02..4eae1b9 100644 --- a/examples/documents/query.go +++ b/examples/documents/query.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/documents" @@ -29,7 +30,7 @@ func query() { query := docs.Collection("customers").Query() - results, err := query.Fetch() + results, err := query.Fetch(context.TODO()) if err != nil { // handle error diff --git a/examples/documents/query_filter.go b/examples/documents/query_filter.go index d10d210..03345e2 100644 --- a/examples/documents/query_filter.go +++ b/examples/documents/query_filter.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/documents" @@ -32,7 +33,7 @@ func queryFilter() { documents.Condition("age").Gt(documents.NumberValue(21)), ) - results, err := query.Fetch() + results, err := query.Fetch(context.TODO()) if err != nil { // handle error diff --git a/examples/documents/query_limits.go b/examples/documents/query_limits.go index 93d762a..94957b6 100644 --- a/examples/documents/query_limits.go +++ b/examples/documents/query_limits.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/documents" @@ -29,7 +30,7 @@ func queryLimit() { query := docs.Collection("customers").Query().Limit(1000) - results, err := query.Fetch() + results, err := query.Fetch(context.TODO()) if err != nil { // handle error diff --git a/examples/documents/set.go b/examples/documents/set.go index cf878fc..4ba5dfc 100644 --- a/examples/documents/set.go +++ b/examples/documents/set.go @@ -16,6 +16,8 @@ package documents_examples // [START import] import ( + "context" + "github.com/nitrictech/go-sdk/api/documents" ) @@ -27,7 +29,7 @@ func set() { document := docs.Collection("products").Doc("nitric") - err := document.Set(map[string]interface{}{ + err := document.Set(context.TODO(), map[string]interface{}{ "id": "nitric", "name": "Nitric Framework", "description": "A development framework", diff --git a/examples/documents/stream_results.go b/examples/documents/stream_results.go index ef6dc97..048c23e 100644 --- a/examples/documents/stream_results.go +++ b/examples/documents/stream_results.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "io" @@ -29,7 +30,7 @@ func stream() { docs, _ := documents.New() query := docs.Collection("Customers").Query() - itr, _ := query.Stream() + itr, _ := query.Stream(context.TODO()) for d, err := itr.Next(); err != io.EOF; d, err = itr.Next() { if err != nil { diff --git a/examples/documents/sub_col_query.go b/examples/documents/sub_col_query.go index 942433f..daa8ae5 100644 --- a/examples/documents/sub_col_query.go +++ b/examples/documents/sub_col_query.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/documents" @@ -31,7 +32,7 @@ func subColQuery() { query := collection.Query() - results, err := query.Fetch() + results, err := query.Fetch(context.TODO()) if err != nil { // handle error diff --git a/examples/documents/sub_doc_query.go b/examples/documents/sub_doc_query.go index c1232cd..47b2f93 100644 --- a/examples/documents/sub_doc_query.go +++ b/examples/documents/sub_doc_query.go @@ -16,6 +16,7 @@ package documents_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/documents" @@ -31,7 +32,7 @@ func subDocQuery() { query := collection.Query() - results, err := query.Fetch() + results, err := query.Fetch(context.TODO()) if err != nil { // handle error diff --git a/examples/events/publish.go b/examples/events/publish.go index 3fb1614..9c0f880 100644 --- a/examples/events/publish.go +++ b/examples/events/publish.go @@ -16,6 +16,7 @@ package events_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/events" @@ -32,7 +33,7 @@ func publishEvent() error { return err } - pEvt, err := ec.Topic("my-topic").Publish(&events.Event{ + pEvt, err := ec.Topic("my-topic").Publish(context.TODO(), &events.Event{ Payload: map[string]interface{}{ "test": "event", }, diff --git a/examples/events/publish_id.go b/examples/events/publish_id.go index 85ede78..05b4c1b 100644 --- a/examples/events/publish_id.go +++ b/examples/events/publish_id.go @@ -16,6 +16,7 @@ package events_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/events" @@ -31,7 +32,7 @@ func publishEventId() error { return err } - pEvt, err := ec.Topic("my-topic").Publish(&events.Event{ + pEvt, err := ec.Topic("my-topic").Publish(context.TODO(), &events.Event{ ID: "unique-event-id", Payload: map[string]interface{}{ "test": "event", diff --git a/examples/queues/failed.go b/examples/queues/failed.go index 81f3e43..3aad960 100644 --- a/examples/queues/failed.go +++ b/examples/queues/failed.go @@ -16,6 +16,7 @@ package queues_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/queues" @@ -27,7 +28,7 @@ func failed() { // [START snippet] qc, _ := queues.New() - ftasks, err := qc.Queue("my-queue").Send([]*queues.Task{ + ftasks, err := qc.Queue("my-queue").Send(context.TODO(), []*queues.Task{ { ID: "1234", PayloadType: "test-payload", @@ -45,7 +46,7 @@ func failed() { // print out the failure reason fmt.Println("Failed Task", ft.Reason) // resend the failed task - qc.Queue("my-queue").Send([]*queues.Task{ft.Task}) + qc.Queue("my-queue").Send(context.TODO(), []*queues.Task{ft.Task}) } // [END snippet] } diff --git a/examples/queues/receive.go b/examples/queues/receive.go index c21e722..19cfa89 100644 --- a/examples/queues/receive.go +++ b/examples/queues/receive.go @@ -16,6 +16,8 @@ package queues_examples // [START import] import ( + "context" + "github.com/nitrictech/go-sdk/api/queues" ) @@ -25,14 +27,14 @@ func receive() { // [START snippet] qc, _ := queues.New() - tasks, err := qc.Queue("my-queue").Receive(10) + tasks, err := qc.Queue("my-queue").Receive(context.TODO(), 10) if err != nil { // handle error } for _, t := range tasks { - err := t.Complete() + err := t.Complete(context.TODO()) if err != nil { // handle completion error diff --git a/examples/queues/send.go b/examples/queues/send.go index 2c2c39d..f1c78ba 100644 --- a/examples/queues/send.go +++ b/examples/queues/send.go @@ -16,6 +16,8 @@ package queues_examples // [START import] import ( + "context" + "github.com/nitrictech/go-sdk/api/queues" ) @@ -25,7 +27,7 @@ func send() { // [START snippet] qc, _ := queues.New() - _, err := qc.Queue("my-queue").Send([]*queues.Task{ + _, err := qc.Queue("my-queue").Send(context.TODO(), []*queues.Task{ { ID: "1234", PayloadType: "test-payload", diff --git a/examples/queues/send_id.go b/examples/queues/send_id.go index 68c997b..75076d7 100644 --- a/examples/queues/send_id.go +++ b/examples/queues/send_id.go @@ -16,6 +16,8 @@ package queues_examples // [START import] import ( + "context" + "github.com/nitrictech/go-sdk/api/queues" ) @@ -25,7 +27,7 @@ func sendId() { // [START snippet] qc, _ := queues.New() - _, err := qc.Queue("my-queue").Send([]*queues.Task{ + _, err := qc.Queue("my-queue").Send(context.TODO(), []*queues.Task{ { ID: "unique-task-id", PayloadType: "test-payload", diff --git a/examples/secrets/put.go b/examples/secrets/put.go index 0122307..bd4435d 100644 --- a/examples/secrets/put.go +++ b/examples/secrets/put.go @@ -16,6 +16,7 @@ package secrets_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/secrets" @@ -28,7 +29,7 @@ func put() { sc, _ := secrets.New() // Access the latest secret - version, err := sc.Secret("my-secret").Put([]byte("sssshhhh.... it's a secret")) + version, err := sc.Secret("my-secret").Put(context.TODO(), []byte("sssshhhh.... it's a secret")) if err != nil { // handle error diff --git a/examples/storage/delete.go b/examples/storage/delete.go index 52f4588..bd9d1ba 100644 --- a/examples/storage/delete.go +++ b/examples/storage/delete.go @@ -16,6 +16,8 @@ package storage_examples // [START import] import ( + "context" + "github.com/nitrictech/go-sdk/api/storage" ) @@ -30,7 +32,7 @@ func deleteFile() { // handle client creation error... } - err = sc.Bucket("my-bucket").File("path/to/file").Delete() + err = sc.Bucket("my-bucket").File("path/to/file").Delete(context.TODO()) if err != nil { // handle bucket file delete error diff --git a/examples/storage/read.go b/examples/storage/read.go index 60abcdc..346172a 100644 --- a/examples/storage/read.go +++ b/examples/storage/read.go @@ -16,6 +16,7 @@ package storage_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/storage" @@ -32,7 +33,7 @@ func readFile() { // handle client creation error... } - data, err := sc.Bucket("my-bucket").File("path/to/file").Read() + data, err := sc.Bucket("my-bucket").File("path/to/file").Read(context.TODO()) if err != nil { // handle bucket file delete error diff --git a/examples/storage/sign_read.go b/examples/storage/sign_read.go index 6a324d4..c09e0df 100644 --- a/examples/storage/sign_read.go +++ b/examples/storage/sign_read.go @@ -16,6 +16,7 @@ package storage_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/storage" @@ -31,7 +32,7 @@ func signUrlRead() { // handle client creation error... } - url, err := sc.Bucket("my-bucket").File("path/to/file").DownloadUrl(3600) + url, err := sc.Bucket("my-bucket").File("path/to/file").DownloadUrl(context.TODO(), 3600) if err != nil { // handle bucket file presign url error } diff --git a/examples/storage/sign_write.go b/examples/storage/sign_write.go index 7ad4583..e865d37 100644 --- a/examples/storage/sign_write.go +++ b/examples/storage/sign_write.go @@ -16,6 +16,7 @@ package storage_examples // [START import] import ( + "context" "fmt" "github.com/nitrictech/go-sdk/api/storage" @@ -31,7 +32,7 @@ func signUrlWrite() { // handle client creation error... } - url, err := sc.Bucket("my-bucket").File("path/to/file").UploadUrl(3600) + url, err := sc.Bucket("my-bucket").File("path/to/file").UploadUrl(context.TODO(), 3600) if err != nil { // handle bucket file presign url error } diff --git a/examples/storage/write.go b/examples/storage/write.go index b311db6..e1938d5 100644 --- a/examples/storage/write.go +++ b/examples/storage/write.go @@ -16,6 +16,8 @@ package storage_examples // [START import] import ( + "context" + "github.com/nitrictech/go-sdk/api/storage" ) @@ -30,7 +32,7 @@ func writeFile() { // handle client creation error... } - err = sc.Bucket("my-bucket").File("path/to/file").Write([]byte("Hello World!")) + err = sc.Bucket("my-bucket").File("path/to/file").Write(context.TODO(), []byte("Hello World!")) if err != nil { // handle bucket file delete error diff --git a/faas/context.go b/faas/context.go index 24ca06e..c1128dd 100644 --- a/faas/context.go +++ b/faas/context.go @@ -41,6 +41,11 @@ func (t triggerContextImpl) Event() *EventContext { func triggerContextFromGrpcTriggerRequest(triggerReq *pb.TriggerRequest) (*triggerContextImpl, error) { trigCtx := &triggerContextImpl{} + tc := map[string]string{} + if triggerReq.TraceContext != nil { + tc = triggerReq.TraceContext.GetValues() + } + if triggerReq.GetHttp() != nil { httpTrig := triggerReq.GetHttp() @@ -69,8 +74,9 @@ func triggerContextFromGrpcTriggerRequest(triggerReq *pb.TriggerRequest) (*trigg trigCtx.http = &HttpContext{ Request: &httpRequestImpl{ dataRequestImpl: dataRequestImpl{ - data: triggerReq.GetData(), - mimeType: triggerReq.GetMimeType(), + data: triggerReq.GetData(), + mimeType: triggerReq.GetMimeType(), + traceContext: tc, }, method: httpTrig.GetMethod(), headers: headers, @@ -93,8 +99,9 @@ func triggerContextFromGrpcTriggerRequest(triggerReq *pb.TriggerRequest) (*trigg trigCtx.event = &EventContext{ Request: &eventRequestImpl{ dataRequestImpl: dataRequestImpl{ - data: triggerReq.GetData(), - mimeType: triggerReq.GetMimeType(), + data: triggerReq.GetData(), + mimeType: triggerReq.GetMimeType(), + traceContext: tc, }, topic: topic.GetTopic(), }, diff --git a/faas/request.go b/faas/request.go index c4654ad..5bb1d14 100644 --- a/faas/request.go +++ b/faas/request.go @@ -14,14 +14,23 @@ package faas +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" +) + type DataRequest interface { Data() []byte MimeType() string + Context() context.Context } type dataRequestImpl struct { - data []byte - mimeType string + data []byte + mimeType string + traceContext map[string]string } func (d *dataRequestImpl) Data() []byte { @@ -32,8 +41,19 @@ func (d *dataRequestImpl) MimeType() string { return d.mimeType } +func (d *dataRequestImpl) Context() context.Context { + phc := propagation.HeaderCarrier{} + + for k, v := range d.traceContext { + phc.Set(k, v) + } + + return otel.GetTextMapPropagator().Extract(context.Background(), phc) +} + type HttpRequest interface { DataRequest + Context() context.Context Method() string Path() string Query() map[string][]string diff --git a/go.mod b/go.mod index 547fff7..5e745cd 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,10 @@ require ( github.com/onsi/gomega v1.22.1 github.com/pkg/errors v0.9.1 github.com/uw-labs/lichen v0.1.7 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4 + go.opentelemetry.io/otel v1.11.1 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 + go.opentelemetry.io/otel/sdk v1.11.1 google.golang.org/grpc v1.50.1 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0 google.golang.org/protobuf v1.28.1 @@ -47,6 +51,7 @@ require ( github.com/breml/bidichk v0.2.3 // indirect github.com/breml/errchkjson v0.3.0 // indirect github.com/butuzov/ireturn v0.1.1 // indirect + github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/charithe/durationcheck v0.0.9 // indirect github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4 // indirect @@ -62,6 +67,8 @@ require ( github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/fzipp/gocyclo v0.5.1 // indirect github.com/go-critic/go-critic v0.6.3 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/go-toolsmith/astcast v1.0.0 // indirect github.com/go-toolsmith/astcopy v1.0.1 // indirect @@ -82,13 +89,14 @@ require ( github.com/golangci/misspell v0.3.5 // indirect github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect - github.com/google/go-cmp v0.5.8 // indirect + github.com/google/go-cmp v0.5.9 // indirect github.com/google/licenseclassifier v0.0.0-20220326190949-7c62d6fe8d3a // indirect github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect github.com/gostaticanalysis/comment v1.4.2 // indirect github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-version v1.4.0 // indirect @@ -162,7 +170,7 @@ require ( github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect github.com/stretchr/objx v0.4.0 // indirect - github.com/stretchr/testify v1.7.1 // indirect + github.com/stretchr/testify v1.8.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect github.com/sylvia7788/contextcheck v1.0.5 // indirect github.com/tdakkota/asciicheck v0.1.1 // indirect @@ -177,6 +185,10 @@ require ( github.com/yagipy/maintidx v1.0.0 // indirect github.com/yeya24/promlinter v0.2.0 // indirect gitlab.com/bosi/decorder v0.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1 // indirect + go.opentelemetry.io/otel/trace v1.11.1 // indirect + go.opentelemetry.io/proto/otlp v0.19.0 // indirect golang.org/x/exp/typeparams v0.0.0-20220428152302-39d4317da171 // indirect golang.org/x/mod v0.6.0 // indirect golang.org/x/net v0.1.0 // indirect diff --git a/go.sum b/go.sum index 71ad95b..622b664 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,7 @@ cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc= cloud.google.com/go v0.98.0/go.mod h1:ua6Ush4NALrHk5QXDWnjvZHN93OuF0HfuEPq9I1X0cM= cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA= +cloud.google.com/go v0.100.2 h1:t9Iw5QH5v4XtlEQaCtUY7x6sCABps8sW0acw7e2WQ6Y= cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= @@ -42,6 +43,7 @@ cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow= cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM= +cloud.google.com/go/compute v1.5.0 h1:b1zWmYuuHz7gO9kDcM/EpHGr06UgsYNRpNJzI2kFiLM= cloud.google.com/go/compute v1.5.0/go.mod h1:9SMHyhJlzhlkJqrPAc839t2BZFTSk6Jdj6mkzQJeu0M= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= @@ -135,6 +137,8 @@ github.com/breml/errchkjson v0.3.0 h1:YdDqhfqMT+I1vIxPSas44P+9Z9HzJwCeAzjB8PxP1x github.com/breml/errchkjson v0.3.0/go.mod h1:9Cogkyv9gcT8HREpzi3TiqBxCqDzo8awa92zSDFcofU= github.com/butuzov/ireturn v0.1.1 h1:QvrO2QF2+/Cx1WA/vETCIYBKtRjc30vesdoPUNo1EbY= github.com/butuzov/ireturn v0.1.1/go.mod h1:Wh6Zl3IMtTpaIKbmwzqi6olnM9ptYQxxVacMsOEFPoc= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= @@ -248,6 +252,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -288,6 +297,8 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -365,8 +376,9 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= -github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/licenseclassifier v0.0.0-20201113175434-78a70215ca36 h1:YGB3wNLUTvq+lbIwdNRsaMJvoX4mCKkwzHlmlT1V+ow= github.com/google/licenseclassifier v0.0.0-20201113175434-78a70215ca36/go.mod h1:qsqn2hxC+vURpyBRygGUuinTO42MFRLcsmQ/P8v94+M= @@ -437,7 +449,10 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.2.2/go.mod h1:EaizFBKfUKtMIF5iaD github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= +github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/hashicorp/consul/api v1.10.1/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M= github.com/hashicorp/consul/api v1.11.0/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M= github.com/hashicorp/consul/api v1.12.0/go.mod h1:6pVBMo0ebnYdt2S3H87XhekM/HHrUoTD2XXb/VrZVy0= @@ -883,8 +898,9 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/sylvia7788/contextcheck v1.0.4/go.mod h1:vuPKJMQ7MQ91ZTqfdyreNKwZjyUg6KO+IebVyQDedZQ= @@ -969,11 +985,29 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4 h1:PRXhsszxTt5bbPriTjmaweWUsAnJYeWBhUMLRetUgBU= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4/go.mod h1:05eWWy6ZWzmpeImD3UowLTB3VjDMU1yxQ+ENuVWDM3c= +go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4= +go.opentelemetry.io/otel v1.11.1/go.mod h1:1nNhXBbWSD0nsL38H6btgnFN2k4i0sNLHNNMZMSbUGE= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 h1:X2GndnMCsUPh6CiY2a+frAbNsXaPLbB0soHRYhAZ5Ig= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1/go.mod h1:i8vjiSzbiUC7wOQplijSXMYUpNM93DtlS5CbUT+C6oQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1 h1:MEQNafcNCB0uQIti/oHgU7CZpUMYQ7qigBwMVKycHvc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1/go.mod h1:19O5I2U5iys38SsmT2uDJja/300woyzE1KPIQxEUBUc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 h1:LYyG/f1W/jzAix16jbksJfMQFpOH/Ma6T639pVPMgfI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1/go.mod h1:QrRRQiY3kzAoYPNLP0W/Ikg0gR6V3LMc+ODSxr7yyvg= +go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs= +go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= +go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ= +go.opentelemetry.io/otel/trace v1.11.1/go.mod h1:f/Q9G7vzk5u91PhbmKbg1Qn0rzH1LJ4vbPHFGkTPtOk= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= +go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= @@ -1129,6 +1163,7 @@ golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= +golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 h1:OSnWWcOd/CtWQC2cYSBgbTSJv3ciqd8r54ySIW2y3RE= golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1248,6 +1283,7 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= @@ -1323,6 +1359,7 @@ google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww google.golang.org/appengine v1.6.2/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20170818010345-ee236bd376b0/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= diff --git a/mocks/mockapi/events.go b/mocks/mockapi/events.go index 3fb5a6d..0c87932 100644 --- a/mocks/mockapi/events.go +++ b/mocks/mockapi/events.go @@ -5,6 +5,7 @@ package mockapi import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -102,10 +103,10 @@ func (mr *MockTopicMockRecorder) Name() *gomock.Call { } // Publish mocks base method. -func (m *MockTopic) Publish(arg0 *events.Event, arg1 ...func(*v1.EventPublishRequest)) (*events.Event, error) { +func (m *MockTopic) Publish(arg0 context.Context, arg1 *events.Event, arg2 ...func(*v1.EventPublishRequest)) (*events.Event, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Publish", varargs...) @@ -115,8 +116,8 @@ func (m *MockTopic) Publish(arg0 *events.Event, arg1 ...func(*v1.EventPublishReq } // Publish indicates an expected call of Publish. -func (mr *MockTopicMockRecorder) Publish(arg0 interface{}, arg1 ...interface{}) *gomock.Call { +func (mr *MockTopicMockRecorder) Publish(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) + varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockTopic)(nil).Publish), varargs...) } diff --git a/mocks/mockapi/queues.go b/mocks/mockapi/queues.go index bcbf433..ba83ed8 100644 --- a/mocks/mockapi/queues.go +++ b/mocks/mockapi/queues.go @@ -5,6 +5,7 @@ package mockapi import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -86,31 +87,31 @@ func (mr *MockQueueMockRecorder) Name() *gomock.Call { } // Receive mocks base method. -func (m *MockQueue) Receive(arg0 int) ([]queues.ReceivedTask, error) { +func (m *MockQueue) Receive(arg0 context.Context, arg1 int) ([]queues.ReceivedTask, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Receive", arg0) + ret := m.ctrl.Call(m, "Receive", arg0, arg1) ret0, _ := ret[0].([]queues.ReceivedTask) ret1, _ := ret[1].(error) return ret0, ret1 } // Receive indicates an expected call of Receive. -func (mr *MockQueueMockRecorder) Receive(arg0 interface{}) *gomock.Call { +func (mr *MockQueueMockRecorder) Receive(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Receive", reflect.TypeOf((*MockQueue)(nil).Receive), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Receive", reflect.TypeOf((*MockQueue)(nil).Receive), arg0, arg1) } // Send mocks base method. -func (m *MockQueue) Send(arg0 []*queues.Task) ([]*queues.FailedTask, error) { +func (m *MockQueue) Send(arg0 context.Context, arg1 []*queues.Task) ([]*queues.FailedTask, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Send", arg0) + ret := m.ctrl.Call(m, "Send", arg0, arg1) ret0, _ := ret[0].([]*queues.FailedTask) ret1, _ := ret[1].(error) return ret0, ret1 } // Send indicates an expected call of Send. -func (mr *MockQueueMockRecorder) Send(arg0 interface{}) *gomock.Call { +func (mr *MockQueueMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockQueue)(nil).Send), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockQueue)(nil).Send), arg0, arg1) } diff --git a/mocks/mockapi/secrets.go b/mocks/mockapi/secrets.go index ed1ba0f..381550c 100644 --- a/mocks/mockapi/secrets.go +++ b/mocks/mockapi/secrets.go @@ -5,6 +5,7 @@ package mockapi import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -100,18 +101,18 @@ func (mr *MockSecretRefMockRecorder) Name() *gomock.Call { } // Put mocks base method. -func (m *MockSecretRef) Put(arg0 []byte) (secrets.SecretVersionRef, error) { +func (m *MockSecretRef) Put(arg0 context.Context, arg1 []byte) (secrets.SecretVersionRef, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Put", arg0) + ret := m.ctrl.Call(m, "Put", arg0, arg1) ret0, _ := ret[0].(secrets.SecretVersionRef) ret1, _ := ret[1].(error) return ret0, ret1 } // Put indicates an expected call of Put. -func (mr *MockSecretRefMockRecorder) Put(arg0 interface{}) *gomock.Call { +func (mr *MockSecretRefMockRecorder) Put(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockSecretRef)(nil).Put), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockSecretRef)(nil).Put), arg0, arg1) } // Version mocks base method. diff --git a/mocks/mockapi/storage.go b/mocks/mockapi/storage.go index c9500ec..1dd54e1 100644 --- a/mocks/mockapi/storage.go +++ b/mocks/mockapi/storage.go @@ -5,6 +5,7 @@ package mockapi import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -86,16 +87,16 @@ func (mr *MockBucketMockRecorder) File(arg0 interface{}) *gomock.Call { } // Files mocks base method. -func (m *MockBucket) Files() ([]storage.File, error) { +func (m *MockBucket) Files(arg0 context.Context) ([]storage.File, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Files") + ret := m.ctrl.Call(m, "Files", arg0) ret0, _ := ret[0].([]storage.File) ret1, _ := ret[1].(error) return ret0, ret1 } // Files indicates an expected call of Files. -func (mr *MockBucketMockRecorder) Files() *gomock.Call { +func (mr *MockBucketMockRecorder) Files(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Files", reflect.TypeOf((*MockBucket)(nil).Files)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Files", reflect.TypeOf((*MockBucket)(nil).Files), arg0) } diff --git a/resources/example_test.go b/resources/example_test.go index 0aea316..1c3f0e8 100644 --- a/resources/example_test.go +++ b/resources/example_test.go @@ -86,7 +86,7 @@ func ExampleNewSchedule() { err = NewSchedule("job", "10 minutes", func(ec *faas.EventContext, next faas.EventHandler) (*faas.EventContext, error) { fmt.Println("got scheduled event ", string(ec.Request.Data())) - tasks, err := queue.Receive(10) + tasks, err := queue.Receive(ec.Request.Context(), 10) if err != nil { fmt.Println(err) return nil, err @@ -94,7 +94,7 @@ func ExampleNewSchedule() { for _, task := range tasks { fmt.Printf("processing task %s", task.Task().ID) - if err = task.Complete(); err != nil { + if err = task.Complete(ec.Request.Context()); err != nil { fmt.Println(err) } } @@ -128,14 +128,14 @@ func ExampleNewQueue() { os.Exit(1) } - exampleApi.Get("/hello/:name", func(ctx *faas.HttpContext, next faas.HttpHandler) (*faas.HttpContext, error) { - params := ctx.Request.PathParams() + exampleApi.Get("/hello/:name", func(hc *faas.HttpContext, next faas.HttpHandler) (*faas.HttpContext, error) { + params := hc.Request.PathParams() if params == nil || len(params["name"]) == 0 { - ctx.Response.Body = []byte("error retrieving path params") - ctx.Response.Status = http.StatusBadRequest + hc.Response.Body = []byte("error retrieving path params") + hc.Response.Status = http.StatusBadRequest } else { - _, err = queue.Send([]*queues.Task{ + _, err = queue.Send(hc.Request.Context(), []*queues.Task{ { ID: uuid.NewString(), PayloadType: "custom-X", @@ -143,15 +143,15 @@ func ExampleNewQueue() { }, }) if err != nil { - ctx.Response.Body = []byte("error sending event") - ctx.Response.Status = http.StatusInternalServerError + hc.Response.Body = []byte("error sending event") + hc.Response.Status = http.StatusInternalServerError } else { - ctx.Response.Body = []byte("Hello " + params["name"]) - ctx.Response.Status = http.StatusOK + hc.Response.Body = []byte("Hello " + params["name"]) + hc.Response.Status = http.StatusOK } } - return next(ctx) + return next(hc) }) fmt.Println("running example") diff --git a/resources/manager.go b/resources/manager.go index c3fe537..792b156 100644 --- a/resources/manager.go +++ b/resources/manager.go @@ -15,13 +15,17 @@ package resources import ( + "context" "errors" + "fmt" "io" "os" "strings" "sync" multierror "github.com/missionMeteora/toolkit/errors" + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/grpc" "github.com/nitrictech/go-sdk/api/documents" @@ -49,6 +53,7 @@ type Manager interface { NewQueue(name string, permissions ...QueuePermission) (queues.Queue, error) NewSchedule(name, rate string, handlers ...faas.EventMiddleware) error NewTopic(name string, permissions ...TopicPermission) (Topic, error) + NewRoute(apiName, apiPath string) Route } type manager struct { @@ -65,15 +70,26 @@ type manager struct { builders map[string]faas.HandlerBuilder } -var run = &manager{ - blockers: map[string]Starter{}, - builders: map[string]faas.HandlerBuilder{}, -} +var ( + run = New() + traceInit = sync.Once{} +) // New is used to create the top level resource manager. // Note: this is not required if you are using // resources.NewApi() and the like. These use a default manager instance. func New() Manager { + traceInit.Do(func() { + if os.Getenv("OTELCOL_BIN") != "" { + tp, err := newTracerProvider(context.TODO()) + if err != nil { + fmt.Println(err) + } else { + otel.SetTracerProvider(tp) + } + } + }) + return &manager{ blockers: map[string]Starter{}, builders: map[string]faas.HandlerBuilder{}, @@ -127,6 +143,12 @@ func (m *manager) Run() error { wg.Wait() + tp, ok := otel.GetTracerProvider().(*sdktrace.TracerProvider) + if ok { + _ = tp.ForceFlush(context.TODO()) + _ = tp.Shutdown(context.TODO()) + } + return errList.Err() } diff --git a/resources/trace.go b/resources/trace.go new file mode 100644 index 0000000..3db199c --- /dev/null +++ b/resources/trace.go @@ -0,0 +1,56 @@ +// Copyright 2021 Nitric Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resources + +import ( + "context" + "os" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +func newTracerProvider(ctx context.Context) (*sdktrace.TracerProvider, error) { + exp, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure()) + if err != nil { + return nil, err + } + + res, err := resource.New(ctx, + resource.WithTelemetrySDK(), + resource.WithAttributes( + semconv.ServiceNamespaceKey.String(os.Getenv("NITRIC_STACK")), + ), + ) + if err != nil { + return nil, err + } + + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + return sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())), + sdktrace.WithBatcher(exp), + sdktrace.WithResource(res), + ), nil +}