{% hint style="info" %} This functionality is available in Aidbox versions 2409 and later and requires FHIR Schema validation engine to be enabled. {% endhint %}
This page describes an AidboxTopicDestination which allows to store events described by an AidboxSubscriptionTopic in Kafka.
Aidbox provides two kinds of Kafka integrations:
- Best effort: Aidbox stores events in memory. In some cases (for example, if Aidbox crashes or Kafka is unavailable), events can be lost.
- At least once: Aidbox stores events in the database in the same transaction with a CRUD operation. Aidbox guarantees at least once delivery for an event.
Best effort
incurs a lower performance cost than the at least once
approach. Choose at least once
if performance is not a concern for you.
{% hint style="warning" %}
Be aware of using Best effort
with batch transactions. Messages are generated while processing batch entries, so if the batch transaction fails at the end, the messages will not be revoked.
{% endhint %}
{% hint style="warning" %}
Please note that at least once
approach uses transactional Kafka producers. Please make sure that transaction.state.log.replication.factor
is less or equal then the number of brokers in your Kafka cluster. Otherwise sending messages from Aidbox to Kafka may fail with Timeout expired after ...ms while awaiting InitProducerId
error.
{% endhint %}
{% content-ref url="./" %} . {% endcontent-ref %}
To use Kafka with #aidboxsubscriptiontopic you have to create #aidboxtopicdestination resource.
There are two FHIR profiles available to use with Kafka:
for best effort:
http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-best-effort
for at least once:
http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-at-least-once
{% hint style="info" %} For additional details see Kafka Producer Configs Documentation {% endhint %}
Parameter name | Value type | Description |
---|---|---|
kafkaTopic * | valueString | The Kafka topic where the data should be sent. |
bootstrapServers * | valueString | Comma-separated string. A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
compressionType | valueString | Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). |
batchSize | valueInteger | This configuration controls the default batch size in bytes. |
deliveryTimeoutMs | valueInteger | A maximum time limit for reporting the success or failure of a record sent by a producer, covering delays before sending, waiting for broker acknowledgment, and handling retriable errors. |
maxBlockMs | valueInteger | The configuration controls how long the KafkaProducer 's send() method will block. |
maxRequestSize | valueInteger | The maximum size of a request in bytes. |
requestTimeoutMs | valueInteger | The maximum amount of time the client will wait for the response of a request. |
sslKeystoreKey | valueString | Private key in the format specified by 'ssl.keystore.type'. |
securityProtocol | valueString | Protocol used to communicate with brokers. |
saslMechanism | valueString | SASL mechanism used for client connections. |
saslJaasConfig | valueString | JAAS login context parameters for SASL connections in the format used by JAAS configuration files. |
saslClientCallbackHandlerClass | valueString | The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface. |
* required parameter.
Full example see on Github
POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json
{
"resourceType": "AidboxTopicDestination",
"meta": {
"profile": [
"http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-at-least-once"
]
},
"kind": "kafka-at-least-once",
"id": "kafka-destination",
"topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
"parameter": [
{
"name": "kafkaTopic",
"valueString": "aidbox-forms"
},
{
"name": "bootstrapServers",
"valueString": "kafka:29092"
}
]
}
POST /fhir/AidboxTopicDestination
content-type: application/json
accept: application/json
{
"resourceType": "AidboxTopicDestination",
"meta": {
"profile": [
"http://aidbox.app/StructureDefinition/aidboxtopicdestination-kafka-best-effort"
]
},
"kind": "kafka-best-effort",
"id": "kafka-destination",
"topic": "http://example.org/FHIR/R5/SubscriptionTopic/QuestionnaireResponse-topic",
"parameter": [
{
"name": "kafkaTopic",
"valueString": "aidbox-forms"
},
{
"name": "bootstrapServers",
"valueString": "<...>.amazonaws.com:9098,<...>.amazonaws.com:9098"
},
{
"name": "securityProtocol",
"valueString": "SASL_SSL"
},
{
"name": "saslMechanism",
"valueString": "AWS_MSK_IAM"
},
{
"name": "saslJaasConfig",
"valueString": "software.amazon.msk.auth.iam.IAMLoginModule required;"
},
{
"name": "saslClientCallbackHandlerClass",
"valueString": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
]
}
Aidbox provides $status
operation which provides short status information on the integration status:
{% tabs %} {% tab title="Request" %}
GET /fhir/AidboxTopicDestination/<topic-destination-id>/$status
content-type: application/json
accept: application/json
{% endtab %}
{% tab title="Response (best-effort)" %} {% code title="200 OK" %}
{
"resourceType": "Parameters",
"parameter": [
{
"valueDecimal": 1,
"name": "messagesDelivered"
},
{
"valueDecimal": 0,
"name": "messagesInProcess"
},
{
"valueDecimal": 1,
"name": "messagesLost"
},
{
"valueDateTime": "2024-10-03T08:43:36Z",
"name": "startTimestamp"
},
{
"valueString": "active",
"name": "status"
},
{
"name": "lastErrorDetail",
"part": [
{
"valueString": "Timeout expired after 10000ms while awaiting InitProducerId",
"name": "message"
},
{
"valueDateTime": "2024-10-03T08:44:09Z",
"name": "timestamp"
}
]
}
]
}
{% endcode %} {% endtab %}
{% tab title="Response (at-least-once)" %}
200 OK
{
"resourceType": "Parameters",
"parameter": [
{
"name": "messagesDelivered",
"valueDecimal": 1
},
{
"name": "messagesDeliveryAttempts",
"valueDecimal": 2
},
{
"name": "messagesInProcess",
"valueDecimal": 0
},
{
"name": "messagesQueued",
"valueDecimal": 0
},
{
"name": "startTimestamp",
"valueDateTime": "2024-10-03T08:18:47Z"
},
{
"name": "status",
"valueString": "active"
},
{
"name": "lastErrorDetail",
"part": [
{
"valueString": "Timeout expired after 10000ms while awaiting InitProducerId",
"name": "message"
},
{
"valueDateTime": "2024-10-03T08:19:32Z",
"name": "timestamp"
}
]
}
]
}
{% endtab %} {% endtabs %}
Response parameters for best-effort
:
Property | Type | Description |
---|---|---|
startTimestamp | valueDateTime | AidboxTopicDestination start time in UTC. |
status | valueString | AidboxTopicDestination status is always active , which means that AidboxTopicDestination will try to send all received notifications. |
messagesDelivered | valueDecimal | Total number of events that have been successfully delivered. |
messagesInProcess | valueDecimal | Current number of events in the buffer being processed for delivery. |
messagesLost | string | Total number of events that failed to be delivered. |
lastErrorDetail | valueDateTime | Information about errors of the latest failed attempt to send an event. This parameter can be repeated up to 5 times. Includes the following parameters. |
lastErrorDetail.message | string | Error message of the given error. |
lastErrorDetail.timestamp | valueDateTime | Timestamp of the given error. |
Response parameters for at-least-once
:
Property | Type | Description |
---|---|---|
messagesDelivered | valueDecimal | Total number of events that have been successfully delivered. |
messagesDeliveryAttempts | valueDecimal | Number of delivery attempts that failed. |
messagesInProcess | valueDecimal | Current number of events in the buffer being processed for delivery. |
messagesQueued | valueDecimal | Number of events pending in the queue for dispatch to the Kafka driver. |
startTimestamp | valueDateTime | AidboxTopicDestination start time in UTC. |
status | valueString | AidboxTopicDestination status is always active , which means that AidboxTopicDestination will try to send all received notifications. |
lastErrorDetail | part | Information about errors of the latest failed attempt to send an event. This parameter can be repeated up to 5 times. Includes the following parameters. |
lastErrorDetail.message | valueString | Error message of the given error. |
lastErrorDetail.timestamp | valueDateTime | Timestamp of the given error. |