Skip to content

Commit

Permalink
OTel-Arrow exporter timeout propagation (open-telemetry#34733)
Browse files Browse the repository at this point in the history
**Description:** Exporter side of
open-telemetry/otel-arrow#227. The receiver
side is
open-telemetry#34742.

**Link to tracking Issue:**
open-telemetry/otel-arrow#227

**Testing:** Adds a test for the expected metadata propagation.

**Documentation:** Since this is expected of gRPC receivers, no docs are
changed.
  • Loading branch information
jmacd authored and f7o committed Sep 12, 2024
1 parent c899faf commit 0480119
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 49 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-exporttimeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add gRPC timeout propagation.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34733]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,5 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/cfgardenobserver => ../../extension/observer/cfgardenobserver
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter => ../../exporter/rabbitmqexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver => ../../receiver/githubreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil

3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.108.0 // indirect
Expand Down Expand Up @@ -1371,3 +1372,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/obse
replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter => ../../exporter/rabbitmqexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver => ../../receiver/githubreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil
3 changes: 3 additions & 0 deletions exporter/otelarrowexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/apache/arrow/go/v16 v16.1.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.108.0
github.com/open-telemetry/otel-arrow v0.25.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -105,3 +106,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/otela
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver => ../../receiver/otelarrowreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil
5 changes: 5 additions & 0 deletions exporter/otelarrowexporter/internal/arrow/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
)

Expand Down Expand Up @@ -310,6 +311,10 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
}
md["otlp-pdata-size"] = strconv.Itoa(uncompSize)

if dead, ok := ctx.Deadline(); ok {
md["grpc-timeout"] = grpcutil.EncodeTimeout(time.Until(dead))
}

wri := writeItem{
records: data,
md: md,
Expand Down
128 changes: 79 additions & 49 deletions exporter/otelarrowexporter/internal/arrow/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata"
)
Expand Down Expand Up @@ -576,65 +577,94 @@ func TestArrowExporterStreaming(t *testing.T) {

// TestArrowExporterHeaders tests a mix of outgoing context headers.
func TestArrowExporterHeaders(t *testing.T) {
tc := newSingleStreamMetadataTestCase(t)
channel := newHealthyTestChannel()
for _, withDeadline := range []bool{true, false} {
t.Run(fmt.Sprint("with_deadline=", withDeadline), func(t *testing.T) {

tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel))
tc := newSingleStreamMetadataTestCase(t)
channel := newHealthyTestChannel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, tc.exporter.Start(ctx))
tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel))

var expectOutput []metadata.MD
var actualOutput []metadata.MD
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
md := metadata.MD{}
hpd := hpack.NewDecoder(4096, func(f hpack.HeaderField) {
md[f.Name] = append(md[f.Name], f.Value)
})
for data := range channel.sendChannel() {
if len(data.Headers) == 0 {
actualOutput = append(actualOutput, nil)
} else {
_, err := hpd.Write(data.Headers)
require.NoError(t, tc.exporter.Start(ctx))

var expectOutput []metadata.MD
var actualOutput []metadata.MD

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
md := metadata.MD{}
hpd := hpack.NewDecoder(4096, func(f hpack.HeaderField) {
md[f.Name] = append(md[f.Name], f.Value)
})
for data := range channel.sendChannel() {
if len(data.Headers) == 0 {
actualOutput = append(actualOutput, nil)
} else {
_, err := hpd.Write(data.Headers)
require.NoError(t, err)
actualOutput = append(actualOutput, md)
md = metadata.MD{}
}
channel.recv <- statusOKFor(data.BatchId)
}
}()

for times := 0; times < 10; times++ {
input := testdata.GenerateTraces(2)

if times%2 == 1 {
md := metadata.MD{
"expected1": []string{"metadata1"},
"expected2": []string{fmt.Sprint(times)},
"otlp-pdata-size": []string{"329"},
}
expectOutput = append(expectOutput, md)
} else {
expectOutput = append(expectOutput, metadata.MD{
"otlp-pdata-size": []string{"329"},
})
}

sendCtx := ctx
if withDeadline {
var sendCancel context.CancelFunc
sendCtx, sendCancel = context.WithTimeout(sendCtx, time.Second)
defer sendCancel()
}

sent, err := tc.exporter.SendAndWait(sendCtx, input)
require.NoError(t, err)
actualOutput = append(actualOutput, md)
md = metadata.MD{}
require.True(t, sent)
}
channel.recv <- statusOKFor(data.BatchId)
}
}()

for times := 0; times < 10; times++ {
input := testdata.GenerateTraces(2)
// Stop the test conduit started above.
cancel()
wg.Wait()

if times%2 == 1 {
md := metadata.MD{
"expected1": []string{"metadata1"},
"expected2": []string{fmt.Sprint(times)},
"otlp-pdata-size": []string{"329"},
// Manual check for proper deadline propagation. Since the test
// is timed we don't expect an exact match.
if withDeadline {
for _, out := range actualOutput {
dead := out.Get("grpc-timeout")
require.Len(t, dead, 1)
require.NotEmpty(t, dead[0])
to, err := grpcutil.DecodeTimeout(dead[0])
require.NoError(t, err)
// Allow the test to lapse for 0.5s.
require.Less(t, time.Second/2, to)
require.GreaterOrEqual(t, time.Second, to)
out.Delete("grpc-timeout")
}
}
expectOutput = append(expectOutput, md)
} else {
expectOutput = append(expectOutput, metadata.MD{
"otlp-pdata-size": []string{"329"},
})
}

sent, err := tc.exporter.SendAndWait(context.Background(), input)
require.NoError(t, err)
require.True(t, sent)
require.Equal(t, expectOutput, actualOutput)
require.NoError(t, tc.exporter.Shutdown(ctx))
})
}
// Stop the test conduit started above.
cancel()
wg.Wait()

require.Equal(t, expectOutput, actualOutput)
require.NoError(t, tc.exporter.Shutdown(ctx))
}

// TestArrowExporterIsTraced tests whether trace and span ID are
Expand Down
3 changes: 3 additions & 0 deletions internal/otelarrow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.2.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.108.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.108.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down Expand Up @@ -110,3 +111,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otela
replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../sharedcomponent

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../grpcutil
2 changes: 2 additions & 0 deletions receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/otela
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil

0 comments on commit 0480119

Please sign in to comment.