Performance Regression Upgrading from v0.71.0 -> v0.73.0 #19798

Closed
chriskuchin opened this issue Mar 20, 2023 · 21 comments · Fixed by #20721
Labels
bug (Something isn't working), receiver/fluentforward

Comments

@chriskuchin

Component(s)

No response

What happened?

Description

Last week we rolled an upgraded version of OpenTelemetry Contrib out to our production environment and started having memory and CPU problems, resulting in dropped metrics and high-CPU alarms.

We upgraded straight from v0.71.0 to v0.73.0, so we have no idea which version introduced the regression.

Steps to Reproduce

Expected Result

Actual Result

You can see from the ECS metrics below that the point where CPU and memory drop is where we rolled back to v0.71.0.

The gap before the lines is where the sidecar was dropping data because it was overwhelmed.

[image: ECS CPU and memory metrics for the sidecar]

Collector version

v0.73.0

Environment information

Environment

OS: ECS Fargate Linux

OpenTelemetry Collector configuration

---
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: sidecar
          scrape_interval: 60s
          metrics_path: /metrics
          scheme: http
          static_configs:
            - targets:
                - localhost:8080
        - job_name: otel
          scrape_interval: 60s
          metrics_path: /metrics
          scheme: http
          static_configs:
            - targets: ["localhost:8888"]

  fluentforward/net:
    endpoint: 0.0.0.0:34334

  fluentforward/sock:
    endpoint: unix://var/run/fluent.sock

  awsecscontainermetrics:
    collection_interval: 60s

  statsd:

  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

  jaeger:
    protocols:
      grpc:
      thrift_http:
      thrift_compact:
      thrift_binary:

  zipkin:


exporters:
  logging:
    verbosity: basic

  sumologic:
    endpoint: <url>
    compress_encoding: "gzip"
    max_request_body_size: 1_048_576 # 1MB
    log_format: "json"
    metric_format: "prometheus"
    source_category: ${ENVIRONMENT}/${SERVICE_NAME}
    source_name: ${SERVICE_NAME}
    # source_host: ${TASK}

  loki:
    endpoint: <url>
    format: json
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 5000
    labels:
      attributes:
        team:
        service:
      resource:
        account:
        region:
        environment:

  otlphttp/tempo:
    endpoint: <url>

  prometheusremotewrite/mimir:
    endpoint: <url>
    resource_to_telemetry_conversion:
      enabled: true

  prometheus:
    endpoint: 0.0.0.0:9273
    namespace: ${SERVICE_NAME}
    resource_to_telemetry_conversion:
      enabled: true

{{- $resourceDetection := map "aws.ecs.task.launch_type" "launch_type" "aws.ecs.task.version" "task_version" "cloud.account.id" "account" "cloud.region" "region" "cloud.subnet" "subnet" "cloud.vpc" "vpc" "cloud.availability_zone" "availability_zone" "cloud.availability_zone.id" "availability_zone_id" "aws.ecs.cluster.arn" "" "aws.ecs.task.arn" "" "aws.ecs.task.family" "task_family" "aws.ecs.task.revision" "task_revision" }}
{{- $metricAttributeRemap := map "aws.ecs.task.revision" "task_revision" "cloud.availability_zone" "availability_zone" "cloud.account.id" "account" "cloud.region" "region" }}
{{- $deleteFields := split "aws.ecs.task.pull.started_at,aws.ecs.task.known_status,aws.ecs.task.arn,aws.ecs.cluster.name,aws.ecs.task.pull_started_at,aws.ecs.task.pull_stopped_at,aws.ecs.service.name,container.id,aws.ecs.docker.name,aws.ecs.container.image.id,aws.ecs.container.exit_code,aws.ecs.container.created_at,aws.ecs.container.know_status,aws.ecs.container.image.id,aws.ecs.container.started_at,aws.ecs.container.finished_at,aws.ecs.launchtype" "," }}

processors:
  resource:
    attributes:
      - key: subnet
        action: insert
        value: ${SUBNET_ID}

      - key: environment
        action: upsert
        value: ${ENVIRONMENT}

      - key: cloud.availability_zone.id
        action: insert
        value: ${AVAILABILITY_ZONE_ID}

      - key: subnet
        action: insert
        value: ${SUBNET_ID}

      - key: vpc
        action: insert
        value: ${VPC}

      - key: aws.ecs.task.arn
        action: extract
        pattern: \/(?P<task_id>\w+)$
      - key: aws.ecs.task.id
        action: delete

      - key: aws.ecs.cluster.arn
        action: extract
        pattern: \/(?P<cluster_name>\w+)$
      - key: aws.ecs.cluster.arn
        action: delete

      # Remap resource detection fields
      {{- range $k, $v := $resourceDetection }}
      {{- if ne $v "" }}
      - key: {{ $v }}
        from_attribute: {{ $k }}
        action: upsert
      - key: {{ $k }}
        action: delete
      {{- end }}
      {{- end }}

      # Delete Resource Fields
      {{- range $k, $v := $deleteFields }}
      {{- if ne $v "" }}
      - key: {{ $v }}
        action: delete
      {{- end }}
      {{- end }}

  attributes:
    actions:
      - key: service
        action: upsert
        value: ${SERVICE_NAME}

      - key: team
        action: upsert
        value: ${BILLING_TEAM}

      {{- range $k, $v := $metricAttributeRemap }}
      {{- if ne $v "" }}
      - key: {{ $v }}
        from_attribute: {{ $k }}
        action: upsert
      - key: {{ $k }}
        action: delete
      {{- end }}
      {{- end }}

      {{- range $k, $v := $deleteFields }}
      {{- if ne $v "" }}
      - key: {{ $v }}
        action: delete
      {{- end }}
      {{- end }}

  resource/cloud:
    attributes:
      - key: cloud.availability_zone.id
        action: insert
        value: ${AVAILABILITY_ZONE_ID}
      - key: cloud.subnet.id
        action: insert
        value: ${SUBNET_ID}
      - key: cloud.vpc.id
        action: insert
        value: ${VPC}
      - key: route.environment
        action: insert
        value: ${ENVIRONMENT}
      - key: route.service
        action: insert
        value: ${SERVICE_NAME}
      - key: route.billing.team
        action: insert
        value: ${BILLING_TEAM}
      - key: aws.ecs.task.arn
        action: extract
        pattern: \/(?P<task_id>\w+)$
      - key: aws.ecs.task.id
        action: insert
        from_attribute: task_id
      - key: aws.ecs.cluster.arn
        action: extract
        pattern: \/(?P<cluster_name>\w+)$
      - key: aws.ecs.cluster.name
        action: insert
        from_attribute: cluster_name
      - key: aws.ecs.task.pull_started_at
        action: delete
      - key: aws.ecs.task.pull_stopped_at
        action: delete
      - key: aws.ecs.task.known_status
        action: delete
      - key: aws.ecs.launch_type
        action: delete
      - key: aws.ecs.container.created_at
        action: delete
      - key: aws.ecs.container.started_at
        action: delete
      - key: aws.ecs.container.finished_at
        action: delete
      - key: aws.ecs.container.know_status
        action: delete
      - key: aws.ecs.docker.name
        action: delete
      - key: aws.ecs.container.image.id
        action: delete
      - key: aws.ecs.container.exit_code
        action: delete

  attributes/cleanup:
    actions:
      - key: cluster_name
        action: delete
      - key: ecs_task_definition
        action: delete
      - key: fluent.tag
        action: delete
      - key: ecs_task_arn
        action: delete
      - key: task_id
        action: delete

  resourcedetection/ecs:
    detectors: [env, ecs]
    timeout: 2s
    override: false
    attributes:
      {{- range $k, $v := $resourceDetection }}
      - {{ $k }}
      {{- end }}

  probabilistic_sampler:
    hash_seed: 13
    sampling_percentage: 10

  probabilistic_sampler/tempo:
    hash_seed: 13
    sampling_percentage: 100

  memory_limiter:
    check_interval: 5s
    limit_mib: 128
    spike_limit_mib: 0

  batch:
    timeout: 200ms

extensions:
  memory_ballast:
    size_mib: 64

service:
  extensions: [memory_ballast]
  telemetry:
    logs:
      level: warn
  pipelines:
    metrics:
      receivers: [prometheus, statsd, otlp]
      processors: [memory_limiter, resourcedetection/ecs, resource/cloud, attributes/cleanup]
      exporters: [prometheus, sumologic]
    metrics/mimir:
      receivers: [awsecscontainermetrics, prometheus, statsd, otlp]
      processors: [memory_limiter, resourcedetection/ecs, resource, attributes, batch]
      exporters: [prometheusremotewrite/mimir]
    logs:
      receivers: [fluentforward/net, fluentforward/sock]
      processors: [memory_limiter, resourcedetection/ecs, resource/cloud, attributes/cleanup]
      exporters: [sumologic]
    logs/loki:
      receivers: [fluentforward/net, fluentforward/sock]
      processors: [memory_limiter, resourcedetection/ecs, resource, attributes, batch]
      exporters: [loki]
    traces:
      receivers: [otlp, zipkin, jaeger]
      processors: [memory_limiter, probabilistic_sampler, batch, resourcedetection/ecs, resource/cloud, attributes/cleanup]
      exporters: [logging]
    traces/tempo:
      receivers: [otlp, zipkin, jaeger]
      processors: [memory_limiter, probabilistic_sampler/tempo, resourcedetection/ecs, batch]
      exporters: [otlp/tempo]

Log output

No response

Additional context

I use a confd-based entrypoint in my container to render the config, hence the metadata templating pieces.

We use the APK published by the contrib project for Alpine.

@chriskuchin added the bug (Something isn't working) and needs triage (New item requiring triage) labels Mar 20, 2023
@atoulme removed the needs triage (New item requiring triage) label Mar 21, 2023
@atoulme
Contributor

atoulme commented Mar 21, 2023

Hard to tell what is causing this. Your setup is too complex to tell which component triggered the rise in usage.

Can you try upgrading to 72?

@fatsheep9146
Contributor

Hard to tell what is causing this. Your setup is too complex to tell which component triggered the rise in usage.

Can you try upgrading to 72?

I agree with this. @chriskuchin, can you try 0.72 first so we can narrow down the scope of the problem?

@chriskuchin
Author

Unfortunately, none of my test apps reproduced it, and when it hit production it was a non-trivial incident. However, if it helps, my best guess is that it's in the logs pipelines...

The reason is that we have only just introduced engineers to metrics and traces, so adoption is still low, yet the impact was much wider than that adoption would explain. The logs, on the other hand, are very chatty, since many services were using logs to derive metrics.

It's on my roadmap to build some sort of test that will trigger this, but with current priorities I have no way to try to reproduce it right now. We have pinned to 0.71 for now.

@chriskuchin
Author

The other detail is that the performance problem was probably driven by an increase in memory usage. We have the memory_limiter configured, so I assume most of the CPU was the memory limiter trying to bring memory back down.

@akunchamdd

akunchamdd commented Mar 24, 2023

We experienced a similar issue upgrading from 0.72 -> 0.73.

One commonality between the above config and ours is the fluentforward receiver. We didn't see this issue in the OTel Collector pods meant for traces, which don't use the fluentforward receiver.

[screenshot: collector CPU and memory usage after the 0.73 upgrade]

    exporters:
      kafka:
    extensions:
      health_check: {}
      memory_ballast:
        size_mib: 64
    processors:
      batch:
        send_batch_max_size: 12000
        send_batch_size: 8000
      groupbyattrs/kube:
        keys:
        - fluent.tag
        - host.name
        - k8s.node.name
        - k8s.namespace.name
        - k8s.pod.name
        - k8s.pod.uid
    receivers:
      fluentforward:
        endpoint: 0.0.0.0:8006
    service:
      extensions:
      - health_check
      - memory_ballast
      - zpages
      pipelines:
        logs:
          exporters:
          - kafka
          processors:
          - batch
          - groupbyattrs/kube
          receivers:
          - fluentforward
      telemetry:
        metrics:

@dlahn

dlahn commented Apr 4, 2023

We are also seeing this in 0.74. Like the config above, we are using the fluentforward receiver and the loki exporter. This started happening when we added these two components and started shipping more logs.

We see CPU and memory slowly going up.

@atoulme
Contributor

atoulme commented Apr 4, 2023

Please use the pprof extension to take a snapshot of memory usage over time and see where the leak might be.
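
For anyone wanting to do the same, a minimal sketch of that setup, assuming the contrib pprof extension with its usual HTTP endpoint (the listen address and port here are illustrative):

extensions:
  pprof:
    # exposes the standard Go net/http/pprof handlers
    endpoint: 0.0.0.0:1777

service:
  extensions: [pprof]  # add alongside any extensions already enabled

# heap and CPU profiles can then be pulled with, for example:
#   go tool pprof http://<collector-host>:1777/debug/pprof/heap
#   go tool pprof http://<collector-host>:1777/debug/pprof/profile?seconds=30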

@github-actions
Contributor

github-actions bot commented Apr 4, 2023

Pinging code owners for receiver/fluentforward: @dmitryax. See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions
Contributor

github-actions bot commented Apr 4, 2023

Pinging code owners for exporter/loki: @gramidt @gouthamve @jpkrohling @kovrus @mar4uk. See Adding Labels via Comments if you do not have permissions to add labels yourself.

@jpkrohling
Member

@mar4uk, @kovrus, are we seeing similar spikes related to the Loki exporter?

@dlahn

dlahn commented Apr 5, 2023

@jpkrohling I'm pretty sure it's actually the fluentforward receiver, as that is what changed between these releases. I'm trying to get some details from pprof.

@jpkrohling
Member

One idea in case pprof doesn't help: you can try building your own collector distribution, using the latest Loki exporter and the previous Fluentforward receiver. Something like:

dist:
  module: github.com/open-telemetry/opentelemetry-collector-releases/contrib
  name: otelcol-contrib
  description: OpenTelemetry Collector Contrib
  version: 0.74.0
  output_path: ./_build
  otelcol_version: 0.74.0

exporters:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter v0.74.0

receivers:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/fluentforwardreceiver v0.73.0

@dlahn

dlahn commented Apr 5, 2023

@atoulme It looks like it's the fluentforward receiver. I've attached the image.

[image: heap profile]

Perhaps this is related to #18860? It seems to be the only recent change.

Here is a screenshot of the tasks' resource usage. You can see that CPU and memory are increasing slowly over time. There are some dips, but those are reboots:
[screenshot: task CPU and memory usage over time]

This is a staging environment and it receives fairly minimal traffic.

@dlahn

dlahn commented Apr 5, 2023

Adding a CPU profile:

[image: CPU profile]

@jpkrohling removed the exporter/loki (Loki Exporter) label Apr 5, 2023
@dmitryax
Member

dmitryax commented Apr 5, 2023

Looks like it's indeed caused by #18860

We have one testbed test case covering the fluentforward receiver, and it shows a significant increase, from:

https://github.com/open-telemetry/opentelemetry-collector-contrib/actions/runs/4294311941/jobs/7483225968

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Log10kDPS/FluentForward-SplunkHEC       |PASS  |     15s|    19.2|    19.3|         57|         83|    148200|        148200|

to

https://github.com/open-telemetry/opentelemetry-collector-contrib/actions/runs/4295166937/jobs/7485433282

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Log10kDPS/FluentForward-SplunkHEC       |PASS  |     15s|    28.6|    31.7|         60|         88|    147500|        147500|

@TylerHelmuth, do you have a chance to take a look into that?

@TylerHelmuth
Member

I'm looking into it, but I don't see anything immediately wrong with #18860. The pattern the fluentforward receiver uses for obsreport is the same as in other receivers in Contrib.

@TylerHelmuth
Member

TylerHelmuth commented Apr 5, 2023

Ok I found the problem.

The #18860 implementation passed the function's ctx into c.obsrecv.StartLogsOp(ctx), which is the pattern we've seen in other receivers. StartLogsOp modifies the context, returning a new context with a span ID.

The problem is that the ctx never changes. The receiver is set up with a single context at startup, and that is the context that gets passed into processLogs and then into StartLogsOp. Each time the receiver forwards data to the processors, that context is re-wrapped by StartLogsOp, so the underlying context grows. I found this by printing the context during the load test after obsreport/consumerlogs returned: c.logger.Info("current ctx", zap.Any("ctx", ctx))

With the obsreport stuff:

2023-04-05T13:29:12.512-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.512-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.516-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.517-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.517-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.533-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.533-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.537-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.537-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
2023-04-05T13:29:12.537-0600	info	[email protected]/collector.go:71	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel.WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>).WithValue(type tag.ctxKey, val { {receiver {fluentforward {{0}}}}{transport {http {{0}}}} }).WithValue(type trace.traceContextKeyType, val <not Stringer>)"}
...

Without:

2023-04-05T13:31:04.415-0600	info	service/service.go:146	Everything is ready. Begin running and processing data.
2023-04-05T13:31:04.577-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.577-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
2023-04-05T13:31:04.578-0600	info	[email protected]/collector.go:68	current ctx	{"kind": "receiver", "name": "fluentforward", "data_type": "logs", "ctx": "context.Background.WithCancel"}
...

I'm not very familiar with receivers or fluentbit, so I'm not sure whether the single, long-lived context is inappropriate or whether there is a bug in obsreport that mutates the context incorrectly. A potential solution is to pass a new context.Background into StartLogsOp and then pass the returned context into both ConsumeLogs and EndLogsOp, or just into EndLogsOp; see the sketch below.
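
As a minimal, stdlib-only illustration of that growth (a sketch, not the actual receiver code): re-assigning a long-lived context on every iteration keeps wrapping it, while deriving a per-batch child leaves the parent untouched.

package main

import (
    "context"
    "fmt"
)

type ctxKey int

func main() {
    // Leaky pattern: the loop's long-lived ctx is overwritten each
    // iteration, so the chain of wrapped values grows without bound.
    ctx := context.Background()
    for i := 0; i < 3; i++ {
        ctx = context.WithValue(ctx, ctxKey(0), "receiver-tags")
    }
    fmt.Println(ctx) // context.Background.WithValue(...).WithValue(...).WithValue(...)

    // Safe pattern: derive a child context per batch; the parent stays flat.
    parent := context.Background()
    for i := 0; i < 3; i++ {
        batchCtx := context.WithValue(parent, ctxKey(0), "receiver-tags")
        _ = batchCtx // used only for this batch, then discarded
    }
    fmt.Println(parent) // context.Background
}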

@dmitryax how would you like to proceed?

@dmitryax
Member

dmitryax commented Apr 5, 2023

@TylerHelmuth nice investigation. Thank you! I believe we should not mutate the original long-lived context.

c.obsrecv.StartLogsOp(ctx) should create a new context which is passed to c.obsrecv.EndLogsOp after that.

It's done like this in other receivers too; we probably didn't run into this there because other receivers use a context per request, not one that is reused in a consume loop.

@TylerHelmuth
Member

@dmitryax
Member

dmitryax commented Apr 5, 2023

I think obsreport is good. We just shouldn't mutate the existing ctx here

Let's do

obsCtx := c.obsrecv.StartLogsOp(ctx)

instead
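
For illustration, a sketch of what the fixed consume step could look like, assuming the obsreport API of this collector version (StartLogsOp returning a derived context, EndLogsOp taking the format, record count, and error). The function name and wiring are hypothetical, not the actual fluentforward code:

package fluentforwardsketch

import (
    "context"

    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/obsreport"
    "go.opentelemetry.io/collector/pdata/plog"
)

// processLogs is a hypothetical stand-in for the receiver's consume step.
// The long-lived ctx is never overwritten; each batch gets its own derived
// obsCtx, which is used for both ConsumeLogs and EndLogsOp.
func processLogs(ctx context.Context, obsrecv *obsreport.Receiver, next consumer.Logs, ld plog.Logs) error {
    obsCtx := obsrecv.StartLogsOp(ctx)
    err := next.ConsumeLogs(obsCtx, ld)
    obsrecv.EndLogsOp(obsCtx, "fluentforward", ld.LogRecordCount(), err)
    return err
}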

@TylerHelmuth
Member

Ah, I misinterpreted your comment. I've actually already tested that scenario and it works as expected. I'll submit a PR.
