Skip to content

Latest commit

 

History

History

kafka

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 

Kafka Bindings

This document defines how to describe Kafka-specific information on AsyncAPI.

Version

Current version is 0.5.0.

Server Binding Object

This object contains information about the server representation in Kafka.

Fixed Fields
Field Name Type Description Applicability [default] Constraints
schemaRegistryUrl string (url) API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used) OPTIONAL -
schemaRegistryVendor string The vendor of Schema Registry and Kafka serdes library that should be used (e.g. apicurio, confluent, ibm, or karapace) OPTIONAL MUST NOT be specified if schemaRegistryUrl is not specified
bindingVersion string The version of this binding. OPTIONAL [latest]
Example
servers:
  production:
    bindings:
      kafka:
        schemaRegistryUrl: 'https://my-schema-registry.com'
        schemaRegistryVendor: 'confluent'
        bindingVersion: '0.5.0'

Channel Binding Object

This object contains information about the channel representation in Kafka (eg. a Kafka topic).

Fixed Fields
Field Name Type Description Applicability [default] Constraints
topic string Kafka topic name if different from channel name. OPTIONAL -
partitions integer Number of partitions configured on this topic (useful to know how many parallel consumers you may run). OPTIONAL Must be positive
replicas integer Number of replicas configured on this topic. OPTIONAL MUST be positive
topicConfiguration TopicConfiguration Object Topic configuration properties that are relevant for the API. OPTIONAL -
bindingVersion string The version of this binding. If omitted, "latest" MUST be assumed. OPTIONAL [latest] -

This object MUST contain only the properties defined above.

Example
channels:
  user-signedup:
    bindings:
      kafka:
        topic: 'my-specific-topic-name'
        partitions: 20
        replicas: 3
        topicConfiguration:
          cleanup.policy: ["delete", "compact"]
          retention.ms: 604800000
          retention.bytes: 1000000000
          delete.retention.ms: 86400000
          max.message.bytes: 1048588
        bindingVersion: '0.5.0'

TopicConfiguration Object

This objects contains information about the API relevant topic configuration in Kafka.

Field Name Type Description Applicability [default] Constraints
cleanup.policy array The cleanup.policy configuration option. OPTIONAL array may only contain delete and/or compact
retention.ms long The retention.ms configuration option. OPTIONAL see kafka documentation
retention.bytes long The retention.bytes configuration option. OPTIONAL see kafka documentation
delete.retention.ms long The delete.retention.ms configuration option. OPTIONAL see kafka documentation
max.message.bytes integer The max.message.bytes configuration option. OPTIONAL see kafka documentation
confluent.key.schema.validation boolean It shows whether the schema validation for the message key is enabled. Vendor specific config. OPTIONAL -
confluent.key.subject.name.strategy string The name of the schema lookup strategy for the message key. Vendor specific config. OPTIONAL Clients should default to the vendor default if not supplied.
confluent.value.schema.validation boolean It shows whether the schema validation for the message value is enabled. Vendor specific config. OPTIONAL -
confluent.value.subject.name.strategy string The name of the schema lookup strategy for the message value. Vendor specific config. OPTIONAL Clients should default to the vendor default if not supplied.

This object MAY contain the properties defined above including optional additional properties.

Example
topicConfiguration:
  cleanup.policy: ["delete", "compact"]
  retention.ms: 604800000
  retention.bytes: 1000000000
  delete.retention.ms: 86400000
  max.message.bytes: 1048588
  confluent.key.schema.validation: true
  confluent.key.subject.name.strategy: "TopicNameStrategy"
  confluent.value.schema.validation: true
  confluent.value.subject.name.strategy: "TopicNameStrategy"

Operation Binding Object

This object contains information about the operation representation in Kafka (eg. the way to consume messages)

Fixed Fields
Field Name Type Description Applicability [default] Constraints
groupId Schema Object | Reference Object Id of the consumer group. OPTIONAL -
clientId Schema Object | Reference Object Id of the consumer inside a consumer group. OPTIONAL -
bindingVersion string The version of this binding. If omitted, "latest" MUST be assumed. OPTIONAL [latest] -

This object MUST contain only the properties defined above.

Example
channels:
  user-signedup:
operations:
  userSignup:
    action: receive
    bindings:
      kafka:
        groupId:
          type: string
          enum: ['myGroupId']
        clientId:
          type: string
          enum: ['myClientId']
        bindingVersion: '0.5.0'

Message Binding Object

This object contains information about the message representation in Kafka.

Fixed Fields
Field Name Type Description Applicability [default] Constraints
key Schema Object | Reference Object | AVRO Schema Object The message key. NOTE: You can also use the reference object way. OPTIONAL -
schemaIdLocation string If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. header or payload). OPTIONAL MUST NOT be specified if schemaRegistryUrl is not specified at the Server level
schemaIdPayloadEncoding string Number of bytes or vendor specific values when schema id is encoded in payload (e.g confluent/ apicurio-legacy / apicurio-new). OPTIONAL MUST NOT be specified if schemaRegistryUrl is not specified at the Server level
schemaLookupStrategy string Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. OPTIONAL MUST NOT be specified if schemaRegistryUrl is not specified at the Server level
bindingVersion string The version of this binding. If omitted, "latest" MUST be assumed. OPTIONAL [latest] -

This object MUST contain only the properties defined above.

This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store schema identifier.

channels:
  test:
    address: test-topic
    messages:
      testMessage:
        bindings:
          kafka:
            key:
              type: string
              enum: ['myKey']
            schemaIdLocation: 'payload'
            schemaIdPayloadEncoding: '4'
            bindingVersion: '0.5.0'

This is another example that describes the use if Apicurio schema registry. We describe the apicurio-new way of serializing without details on how it's implemented. We reference a specific lookup strategy that may be used to retrieve schema Id from registry during serialization.

channels:
  test:
    address: test-topic
    messages:
      testMessage:
        bindings:
          kafka:
            key:
              type: string
              enum: ['myKey']
            schemaIdLocation: 'payload'
            schemaIdPayloadEncoding: 'apicurio-new'
            schemaLookupStrategy: 'TopicIdStrategy'
            bindingVersion: '0.5.0'