Skip to content

Commit

Permalink
feat: add trace context to both http and event requests
Browse files Browse the repository at this point in the history
  • Loading branch information
asalkeld committed Nov 22, 2022
1 parent 1cf363f commit 6512b6b
Show file tree
Hide file tree
Showing 50 changed files with 354 additions and 156 deletions.
18 changes: 9 additions & 9 deletions api/documents/document_ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type DocumentRef interface {
Id() string

// Get - Retrieve the value of the document
Get() (Document, error)
Get(context.Context) (Document, error)

// Set - Sets the value of the document
Set(map[string]interface{}) error
Set(context.Context, map[string]interface{}) error

// Delete - Deletes the document
Delete() error
Delete(context.Context) error

// Collection - Retrieve a child collection of this document
Collection(string) (CollectionRef, error)
Expand Down Expand Up @@ -119,16 +119,16 @@ func (d *documentRefImpl) Collection(c string) (CollectionRef, error) {
}

// Delete - Deletes the document this reference refers to if it exists
func (d *documentRefImpl) Delete() error {
_, err := d.dc.Delete(context.TODO(), &v1.DocumentDeleteRequest{
func (d *documentRefImpl) Delete(ctx context.Context) error {
_, err := d.dc.Delete(ctx, &v1.DocumentDeleteRequest{
Key: d.toWireKey(),
})

return err
}

// Set - Sets the contents of the document this reference refers to
func (d *documentRefImpl) Set(content map[string]interface{}) error {
func (d *documentRefImpl) Set(ctx context.Context, content map[string]interface{}) error {
sv, err := protoutils.NewStruct(content)
if err != nil {
return errors.NewWithCause(
Expand All @@ -138,7 +138,7 @@ func (d *documentRefImpl) Set(content map[string]interface{}) error {
)
}

if _, err = d.dc.Set(context.TODO(), &v1.DocumentSetRequest{
if _, err = d.dc.Set(ctx, &v1.DocumentSetRequest{
Key: d.toWireKey(),
Content: sv,
}); err != nil {
Expand All @@ -153,8 +153,8 @@ type DecodeOption interface {
}

// Get - Retrieves the Document this reference refers to if it exists
func (d *documentRefImpl) Get() (Document, error) {
res, err := d.dc.Get(context.TODO(), &v1.DocumentGetRequest{
func (d *documentRefImpl) Get(ctx context.Context) (Document, error) {
res, err := d.dc.Get(ctx, &v1.DocumentGetRequest{
Key: d.toWireKey(),
})
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions api/documents/document_ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package documents

import (
"context"
"fmt"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -108,7 +109,7 @@ var _ = Describe("DocumentRef", func() {
},
}

err := md.Delete()
err := md.Delete(context.TODO())

It("should pass through the returned error", func() {
Expect(err).To(HaveOccurred())
Expand All @@ -133,7 +134,7 @@ var _ = Describe("DocumentRef", func() {
},
}

err := md.Delete()
err := md.Delete(context.TODO())

It("should not return an error", func() {
Expect(err).ToNot(HaveOccurred())
Expand All @@ -159,7 +160,7 @@ var _ = Describe("DocumentRef", func() {
},
}

err := md.Set(map[string]interface{}{
err := md.Set(context.TODO(), map[string]interface{}{
"test": "test",
})

Expand All @@ -186,7 +187,7 @@ var _ = Describe("DocumentRef", func() {
},
}

err := md.Set(map[string]interface{}{
err := md.Set(context.TODO(), map[string]interface{}{
"test": "test",
})

Expand Down Expand Up @@ -214,7 +215,7 @@ var _ = Describe("DocumentRef", func() {
},
}

_, err := md.Get()
_, err := md.Get(context.TODO())

It("should pass through the returned error", func() {
Expect(err).To(HaveOccurred())
Expand Down Expand Up @@ -246,7 +247,7 @@ var _ = Describe("DocumentRef", func() {
},
}

d, _ := md.Get()
d, _ := md.Get(context.TODO())

It("should provide the returned document", func() {
Expect(d.Content()).To(Equal(map[string]interface{}{
Expand Down
12 changes: 6 additions & 6 deletions api/documents/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ type Query interface {
FromPagingToken(interface{}) Query

// Fetch - Return paged values
Fetch() (*FetchResult, error)
Fetch(ctx context.Context) (*FetchResult, error)

// Stream - Return an iterator containing values
Stream() (DocumentIter, error)
Stream(ctx context.Context) (DocumentIter, error)
}

// Defacto Query interface implementation
Expand Down Expand Up @@ -84,7 +84,7 @@ func (q *queryImpl) expressionsToWire() ([]*v1.Expression, error) {
return expressions, nil
}

func (q *queryImpl) Fetch() (*FetchResult, error) {
func (q *queryImpl) Fetch(ctx context.Context) (*FetchResult, error) {
// build the expressions list
expressions, err := q.expressionsToWire()
if err != nil {
Expand All @@ -101,7 +101,7 @@ func (q *queryImpl) Fetch() (*FetchResult, error) {
token = t
}

r, err := q.dc.Query(context.TODO(), &v1.DocumentQueryRequest{
r, err := q.dc.Query(ctx, &v1.DocumentQueryRequest{
Collection: q.col.ToWire(),
Expressions: expressions,
Limit: int32(q.limit),
Expand Down Expand Up @@ -132,14 +132,14 @@ func (q *queryImpl) Fetch() (*FetchResult, error) {
}, nil
}

func (q *queryImpl) Stream() (DocumentIter, error) {
func (q *queryImpl) Stream(ctx context.Context) (DocumentIter, error) {
// build the expressions list
expressions, err := q.expressionsToWire()
if err != nil {
return nil, err
}

r, err := q.dc.QueryStream(context.TODO(), &v1.DocumentQueryStreamRequest{
r, err := q.dc.QueryStream(ctx, &v1.DocumentQueryStreamRequest{
Collection: q.col.ToWire(),
Expressions: expressions,
Limit: int32(q.limit),
Expand Down
15 changes: 8 additions & 7 deletions api/documents/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package documents

import (
"context"
"fmt"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -109,7 +110,7 @@ var _ = Describe("Query", func() {
})
q.Where(Condition("test").Eq(StringValue("test")))

_, err := q.Fetch()
_, err := q.Fetch(context.TODO())

It("should unwrap the gRPC error", func() {
Expect(err).To(HaveOccurred())
Expand Down Expand Up @@ -149,7 +150,7 @@ var _ = Describe("Query", func() {
Condition("test").Eq(StringValue("test")),
)

r, err := q.Fetch()
r, err := q.Fetch(context.TODO())

It("should return documents", func() {
By("not returning an error")
Expand All @@ -176,7 +177,7 @@ var _ = Describe("Query", func() {

q.FromPagingToken("blah")

_, err := q.Fetch()
_, err := q.Fetch(context.TODO())

It("should return an error", func() {
Expect(err).To(HaveOccurred())
Expand All @@ -194,7 +195,7 @@ var _ = Describe("Query", func() {

q.Where(&queryExpression{})

_, err := q.Fetch()
_, err := q.Fetch(context.TODO())

It("should return an error", func() {
Expect(err).To(HaveOccurred())
Expand All @@ -219,7 +220,7 @@ var _ = Describe("Query", func() {
})
q.Where(Condition("test").Eq(StringValue("test")))

_, err := q.Stream()
_, err := q.Stream(context.TODO())

It("should unwrap the gRPC error", func() {
Expect(err).To(HaveOccurred())
Expand All @@ -245,7 +246,7 @@ var _ = Describe("Query", func() {
Condition("test").Eq(StringValue("test")),
)

r, err := q.Stream()
r, err := q.Stream(context.TODO())

It("should not return an error", func() {
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -273,7 +274,7 @@ var _ = Describe("Query", func() {

q.Where(&queryExpression{})

_, err := q.Stream()
_, err := q.Stream(context.TODO())

It("should return an error", func() {
Expect(err).To(HaveOccurred())
Expand Down
5 changes: 3 additions & 2 deletions api/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package events

import (
"context"
"fmt"
"os"

Expand Down Expand Up @@ -144,7 +145,7 @@ var _ = Describe("Events", func() {
tc: mockTopic,
}

returnEvt, err := evt.Topic("test-topic").Publish(&Event{
returnEvt, err := evt.Topic("test-topic").Publish(context.TODO(), &Event{
Payload: map[string]interface{}{
"test": "test",
},
Expand All @@ -171,7 +172,7 @@ var _ = Describe("Events", func() {
tc: mockTopic,
}

_, err := evt.Topic("test-topic").Publish(&Event{
_, err := evt.Topic("test-topic").Publish(context.TODO(), &Event{
Payload: map[string]interface{}{
"test": "test",
},
Expand Down
6 changes: 3 additions & 3 deletions api/events/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Topic interface {
Name() string

// Publish will publish the provided events on the topic.
Publish(*Event, ...PublishOption) (*Event, error)
Publish(context.Context, *Event, ...PublishOption) (*Event, error)
}

type topicImpl struct {
Expand All @@ -51,7 +51,7 @@ func WithDelay(duration time.Duration) func(*v1.EventPublishRequest) {
}
}

func (s *topicImpl) Publish(evt *Event, opts ...PublishOption) (*Event, error) {
func (s *topicImpl) Publish(ctx context.Context, evt *Event, opts ...PublishOption) (*Event, error) {
// Convert payload to Protobuf Struct
payloadStruct, err := protoutils.NewStruct(evt.Payload)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func (s *topicImpl) Publish(evt *Event, opts ...PublishOption) (*Event, error) {
opt(event)
}

r, err := s.ec.Publish(context.TODO(), event)
r, err := s.ec.Publish(ctx, event)
if err != nil {
return nil, errors.FromGrpcError(err)
}
Expand Down
12 changes: 6 additions & 6 deletions api/queues/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type Queue interface {
// Name - The name of the queue
Name() string
// Send - Push a number of tasks to a queue
Send([]*Task) ([]*FailedTask, error)
Send(context.Context, []*Task) ([]*FailedTask, error)
// Receive - Retrieve tasks from a queue to a maximum of the given depth
Receive(int) ([]ReceivedTask, error)
Receive(context.Context, int) ([]ReceivedTask, error)
}

type queueImpl struct {
Expand All @@ -41,12 +41,12 @@ func (q *queueImpl) Name() string {
return q.name
}

func (q *queueImpl) Receive(depth int) ([]ReceivedTask, error) {
func (q *queueImpl) Receive(ctx context.Context, depth int) ([]ReceivedTask, error) {
if depth < 1 {
return nil, errors.New(codes.InvalidArgument, "Queue.Receive: depth cannot be less than 1")
}

r, err := q.c.Receive(context.TODO(), &v1.QueueReceiveRequest{
r, err := q.c.Receive(ctx, &v1.QueueReceiveRequest{
Queue: q.name,
Depth: int32(depth),
})
Expand All @@ -68,7 +68,7 @@ func (q *queueImpl) Receive(depth int) ([]ReceivedTask, error) {
return rts, nil
}

func (q *queueImpl) Send(tasks []*Task) ([]*FailedTask, error) {
func (q *queueImpl) Send(ctx context.Context, tasks []*Task) ([]*FailedTask, error) {
// Convert SDK Task objects to gRPC Task objects
wireTasks := make([]*v1.NitricTask, len(tasks))
for i, task := range tasks {
Expand All @@ -84,7 +84,7 @@ func (q *queueImpl) Send(tasks []*Task) ([]*FailedTask, error) {
}

// Push the tasks to the queue
res, err := q.c.SendBatch(context.Background(), &v1.QueueSendBatchRequest{
res, err := q.c.SendBatch(ctx, &v1.QueueSendBatchRequest{
Queue: q.name,
Tasks: wireTasks,
})
Expand Down
9 changes: 5 additions & 4 deletions api/queues/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package queues

import (
"context"
"fmt"

"github.com/golang/mock/gomock"
Expand All @@ -40,7 +41,7 @@ var _ = Describe("Queue", func() {
c: mockQ,
}

_, err := q.Send([]*Task{
_, err := q.Send(context.TODO(), []*Task{
{
ID: "1234",
PayloadType: "test-payload",
Expand Down Expand Up @@ -80,7 +81,7 @@ var _ = Describe("Queue", func() {
c: mockQ,
}

fts, _ := q.Send([]*Task{
fts, _ := q.Send(context.TODO(), []*Task{
{
ID: "1234",
PayloadType: "test-payload",
Expand All @@ -107,7 +108,7 @@ var _ = Describe("Queue", func() {
name: "test-queue",
}

_, err := q.Receive(0)
_, err := q.Receive(context.TODO(), 0)

It("should return an error", func() {
Expect(err).To(HaveOccurred())
Expand Down Expand Up @@ -137,7 +138,7 @@ var _ = Describe("Queue", func() {
c: mockQ,
}

t, _ := q.Receive(1)
t, _ := q.Receive(context.TODO(), 1)

It("should receive a single task", func() {
Expect(t).To(HaveLen(1))
Expand Down
6 changes: 3 additions & 3 deletions api/queues/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ReceivedTask interface {
// Task - Returns the Task data contained in this Received Task instance
Task() *Task
// Complete - Completes the task removing it from the queue
Complete() error
Complete(context.Context) error
}

type receivedTaskImpl struct {
Expand All @@ -56,8 +56,8 @@ func (r *receivedTaskImpl) Queue() string {
return r.queue
}

func (r *receivedTaskImpl) Complete() error {
_, err := r.qc.Complete(context.TODO(), &v1.QueueCompleteRequest{
func (r *receivedTaskImpl) Complete(ctx context.Context) error {
_, err := r.qc.Complete(ctx, &v1.QueueCompleteRequest{
Queue: r.queue,
LeaseId: r.leaseId,
})
Expand Down
Loading

0 comments on commit 6512b6b

Please sign in to comment.