
Supplying metadata causes error when publishing message with Kafka PubSub component #3171

Closed · olitomlinson opened this issue Oct 9, 2023 · 6 comments
Labels: kind/bug (Something isn't working)

@olitomlinson

tl;dr: error message: Detail="error when publish to topic workflowTopic in pubsub kafka-pubsub: kafka: invalid configuration (Producing headers requires Kafka at least v0.11)"

I've tried this on Dapr runtime 1.10.0, 1.11.0, and 1.12.0-rc.5, and the error message is the same, so it doesn't seem to be a recent regression.

If I swap out Kafka for Redis, the message publishes as expected, so this sounds like a bug.


I'm trying to override the CloudEvent fields like below:

    // Override CloudEvent envelope fields via publish metadata
    var metadata = new Dictionary<string, string>();
    metadata.Add("cloudevent.id", "123");
    metadata.Add("cloudevent.type", "123");
    metadata.Add("cloudevent.source", "123");

    // pubsub component name, topic, payload, metadata, cancellation token
    await daprClient.PublishEventAsync<StartWorkflowRequest>("kafka-pubsub", "workflowTopic", request, metadata, cts.Token);

However, on publish the sidecar rejects the message:

2023-10-09 21:50:49 fail: Microsoft.AspNetCore.Server.Kestrel[13]
2023-10-09 21:50:49       Connection id "0HMU915F0635H", Request id "0HMU915F0635H:00000002": An unhandled exception was thrown by the application.
2023-10-09 21:50:49       Dapr.DaprException: Publish operation failed: the Dapr endpoint indicated a failure. See InnerException for details.
2023-10-09 21:50:49        ---> Grpc.Core.RpcException: Status(StatusCode="Internal", Detail="error when publish to topic workflowTopic in pubsub kafka-pubsub: kafka: invalid configuration (Producing headers requires Kafka at least v0.11)")
2023-10-09 21:50:49          at Dapr.Client.DaprClientGrpc.MakePublishRequest(String pubsubName, String topicName, ByteString content, Dictionary`2 metadata, String dataContentType, CancellationToken cancellationToken)
2023-10-09 21:50:49          --- End of inner exception stack trace ---
2023-10-09 21:50:49          at Dapr.Client.DaprClientGrpc.MakePublishRequest(String pubsubName, String topicName, ByteString content, Dictionary`2 metadata, String dataContentType, CancellationToken cancellationToken)
2023-10-09 21:50:49          at Program.<>c__DisplayClass0_1.<<<Main>$>b__7>d.MoveNext() in /src/Client/Program.cs:line 41
2023-10-09 21:50:49       --- End of stack trace from previous location ---
2023-10-09 21:50:49          at System.Threading.Tasks.Parallel.<>c__50`1.<<ForEachAsync>b__50_0>d.MoveNext()
2023-10-09 21:50:49       --- End of stack trace from previous location ---
2023-10-09 21:50:49          at Program.<>c__DisplayClass0_0.<<<Main>$>b__0>d.MoveNext() in /src/Client/Program.cs:line 32
2023-10-09 21:50:49       --- End of stack trace from previous location ---
2023-10-09 21:50:49          at Microsoft.AspNetCore.Http.RequestDelegateFactory.<ExecuteTask>g__ExecuteAwaited|58_0[T](Task`1 task, HttpContext httpContext)
2023-10-09 21:50:49          at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
2023-10-09 21:50:49          at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)

Kafka/ZooKeeper configuration (I've also tried with the latest kafka/zookeeper image tags):

  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.4
    networks:
      - network
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ALLOW_ANONYMOUS_LOGIN: "true"
    ports:
      - 2181:2181
    volumes:
      - zookeeper_logs:/var/lib/zookeeper/log
      - zookeeper_data:/var/lib/zookeeper/data
  kafka:
    image: confluentinc/cp-kafka:7.0.4
    networks:
      - network
    restart: unless-stopped
    depends_on:
      - zookeeper
    healthcheck:
      test: nc -z localhost 9092 || exit -1
      interval: 10s
      retries: 10
      timeout: 30s
      start_period: 10s
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG4J_ROOT_LOGLEVEL: ERROR
      KAFKA_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.apache.kafka=ERROR,kafka=ERROR,kafka.cluster=ERROR,kafka.controller=ERROR,kafka.coordinator=ERROR,kafka.log=ERROR,kafka.server=ERROR,kafka.zookeeper=ERROR,state.change.logger=ERROR
      KAFKA_JMX_PORT: 9997
      KAFKA_NUM_PARTITIONS: 2
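
For reference, the kafka-pubsub component used against this setup is shaped roughly like this (a minimal sketch; the broker address is assumed from the PLAINTEXT_HOST listener in the compose file above, and the metadata entries are illustrative):

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "localhost:9092" # assumed from the compose file above
    - name: authType
      value: "none"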
@olitomlinson (Author)

This is blocking the adoption of Kafka pubsub for my team, who are in the process of migrating from Redis to Kafka components.

@yaron2 (Member) commented Oct 18, 2023

Is your Kafka version 0.11 or above?

@olitomlinson (Author)

@yaron2 yes, I'm using Confluent's 7.x images, which use Kafka 3.x according to this doc:

https://docs.confluent.io/platform/current/installation/versions-interoperability.html

Unless I'm massively getting the wrong end of the stick here :)

@yaron2 (Member) commented Oct 18, 2023


So that we can narrow down the cause, can you please replace the Confluent image with vanilla Kafka (see here) and report whether that works?

@yaron2 (Member) commented Oct 18, 2023

Also, can you look at the logs of the Confluent image, get the exact Kafka version used, and then try adding the following to the component YAML?

- name: version
  value: 3.0.0 # REPLACE WITH THE ACTUAL VERSION
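
That entry goes in the component's spec.metadata list, alongside the broker settings, e.g. (values illustrative; note this Kafka version metadata entry is distinct from the component's own spec.version field):

spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "localhost:9092"
    - name: version
      value: 3.0.0 # REPLACE WITH THE ACTUAL VERSION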

@olitomlinson (Author)

@yaron2

Turns out I copied the component metadata verbatim from here:

  - name: version # Optional.
    value: 0.10.2.0

I deleted this, and everything works as expected, which makes sense, as the docs say it defaults to 2.0.0.
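
That also explains the error itself: the cloudevent.* overrides are carried as Kafka record headers, and with the version pinned to 0.10.2.0 the underlying Kafka client refuses to produce headers, which require broker protocol 0.11+. For anyone else landing here, the working metadata either omits the entry entirely (falling back to the 2.0.0 default) or pins it to a value at or above 0.11.0.0 that matches the broker, e.g.:

  - name: version # Optional.
    value: 2.0.0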

So I'll raise a docs issue to get the example YAML changed to 2.0.0, so people who copy the component spec don't fall into the same trap as me :)

Thanks for the guidance!
