Skip to content

Commit

Permalink
feat: request/response support (#847)
Browse files Browse the repository at this point in the history
Co-authored-by: ue85540 <[email protected]>
  • Loading branch information
GreenRover and ue85540 authored Mar 10, 2023
1 parent a2fdc03 commit d367c23
Show file tree
Hide file tree
Showing 4 changed files with 1,136 additions and 1 deletion.
332 changes: 332 additions & 0 deletions examples/adeo-kafka-request-reply.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
asyncapi: 3.0.0
info:
title: Adeo AsyncAPI Case Study
version: "%REPLACED_BY_MAVEN%"
description: >
This Adeo specification illustrates how ADEO uses AsyncAPI to document some of their exchanges
contact:
name: AsyncAPI team
email: [email protected]
servers:
production:
url: "prod.url:9092"
protocol: kafka-secure
description: Kafka PRODUCTION cluster
security:
- sasl-ssl: []
bindings:
kafka:
schemaRegistryUrl: >-
https://schema-registry.prod.url/
staging:
url: "staging.url:9092"
protocol: kafka-secure
description: Kafka STAGING cluster for `uat` and `preprod` environments
security:
- sasl-ssl: []
bindings:
kafka:
schemaRegistryUrl: >-
https://schema-registry.staging.url/
dev:
url: "dev.url:9092"
protocol: kafka-secure
description: Kafka DEV cluster for `dev` and `sit` environments
security:
- sasl-ssl: []
bindings:
kafka:
schemaRegistryUrl: >-
https://schema-registry.dev.url/
tags:
- name: costing
description: "Costing channels, used by Costing clients."
channels:
costingRequest:
address: "adeo-{env}-case-study-COSTING-REQUEST-{version}"
description: >
Use this topic to do a Costing Request to Costing product.
We use the
[**RecordNameStrategy**](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy)
to infer the messages schema.
You have to define `x-value.subject.name.strategy` to
`io.confluent.kafka.serializers.subject.RecordNameStrategy` in your
producer to use the schema we manage.
The schema below illustrates how Costing Request messages are
handled.
![](https://user-images.githubusercontent.com/5501911/188920831-689cec5f-8dc3-460b-8794-0b54ec8b0ac8.png)
parameters:
env:
$ref: "#/components/parameters/Env"
version:
$ref: "#/components/parameters/Version"
bindings:
kafka:
replicas: 3
partitions: 3
topicConfiguration
cleanup.policy: delete
retention.ms: 7 days
messages:
costingRequest:
$ref: "#/components/messages/costingRequestV1"





costingResponse:
address: "adeo-{env}-case-study-COSTING-RESPONSE-{version}"
description: >
This topic is used to REPLY Costing Requests and is targeted by the
`REPLY_TOPIC` header.
**You must grant PUBLISH access to our `svc-ccr-app` service account.**.
We use the
[**RecordNameStrategy**](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy)
to infer the messages schema.
You have to define `key.subject.name.strategy` and
`x-value.subject.name.strategy` to
`io.confluent.kafka.serializers.subject.RecordNameStrategy` in your
consumer.
The schema below illustrates how Costing Response messages are
handled.
![](https://user-images.githubusercontent.com/5501911/188920831-689cec5f-8dc3-460b-8794-0b54ec8b0ac8.png)
parameters:
env:
$ref: "#/components/parameters/Env"
version:
$ref: "#/components/parameters/Version"
bindings:
kafka:
groupId:
type: string
description: >
The groupId must be prefixed by your `svc` account, deliver by the
Adeo Kafka team.
This `svc` must have the read access to the topic.
x-key.subject.name.strategy:
type: string
description: >
We use the RecordNameStrategy to infer the messages schema.
Use
`x-key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy`
in your consumer configuration.
x-value.subject.name.strategy:
type: string
description: >
We use the RecordNameStrategy to infer the messages schema.
Use
`x-value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy`
in your consumer configuration.
messages:
costingResponse:
$ref: "#/components/messages/costingResponse"





operations:
requestCosting:
action: receive
channel:
$ref: '#/channels/costingRequest'
reply:
channel:
$ref: '#/channels/costingResponse'
address:
locaton: '$message.header#/REPLY_TOPIC'
summary: |
[COSTING] Request one or more Costing calculation for any product
description: >
You can try a costing request using our [Conduktor producer
template](https://conduktor.url)
tags:
- name: costing
bindings:
kafka:
groupId:
type: string
description: >
The groupId must be prefixed by your `svc` account, deliver by the
Adeo Kafka team.
This `svc` must have the write access to the topic.
x-value.subject.name.strategy:
type: string
description: >
We use the RecordNameStrategy to infer the messages schema.
Use
`x-value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy`
in your producer configuration.
getCostingResponse:
action: send
channel:
$ref: '#/channels/costingResponse'
summary: >
[COSTING] Get the costing responses matching an initial Costing
Request.
tags:
- name: costing





components:
correlationIds:
costingCorrelationId:
description: >
This correlation ID is used for message tracing and messages
correlation.
This correlation ID is generated at runtime based on the `REQUEST_ID`
and sent to the RESPONSE message.
location: $message.header#/REQUEST_ID
messages:
costingRequestV1:
name: CostingRequestV1
title: Costing Request V1
summary: Costing Request V1 inputs.
tags:
- name: costing
schemaFormat: application/vnd.apache.avro;version=1.9.0
correlationId:
$ref: "#/components/correlationIds/costingCorrelationId"
headers:
type: object
required:
- REQUESTER_ID
- REQUESTER_CODE
- REQUEST_ID
- REPLY_TOPIC
properties:
REQUEST_ID:
$ref: "#/components/schemas/RequestId"
REPLY_TOPIC:
$ref: "#/components/schemas/ReplyTopic"
REQUESTER_ID:
$ref: "#/components/schemas/RequesterId"
REQUESTER_CODE:
$ref: "#/components/schemas/RequesterCode"
payload:
$ref: "https://deploy-preview-921--asyncapi-website.netlify.app/resources/casestudies/adeo/CostingRequestPayload.avsc"
costingResponse:
name: CostingResponse
title: Costing Response
summary: Costing Response ouputs.
tags:
- name: costing
description: >
Please refer to the `CostingResponseKey.avsc` schema, available on [our
github
project](https://github.url/).
schemaFormat: application/vnd.apache.avro;version=1.9.0
correlationId:
$ref: "#/components/correlationIds/costingCorrelationId"
headers:
type: object
properties:
CALCULATION_ID:
$ref: "#/components/schemas/MessageId"
CORRELATION_ID:
$ref: "#/components/schemas/CorrelationId"
REQUEST_TIMESTAMP:
type: string
format: date-time
description: Timestamp of the costing request
CALCULATION_TIMESTAMP:
type: string
format: date-time
description: Technical timestamp for the costing calculation
bindings:
kafka:
key:
$ref: "https://deploy-preview-921--asyncapi-website.netlify.app/resources/casestudies/adeo/CostingResponseKey.avsc"
payload:
$ref: "https://deploy-preview-921--asyncapi-website.netlify.app/resources/casestudies/adeo/CostingResponsePayload.avsc"
schemas:
RequesterId:
type: string
description: The Costing requester service account used to produce costing request.
example: svc-ecollect-app
RequesterCode:
type: string
description: >-
The Costing requester code (generally the BU Code). The requester code
is useful to get the dedicated context (tenant).
example: 1
MessageId:
type: string
format: uuid
description: A unique Message ID.
example: 1fa6ef40-8f47-40a8-8cf6-f8607d0066ef
RequestId:
type: string
format: uuid
description: >-
A unique Request ID needed to define a `CORRELATION_ID` for exchanges,
which will be sent back in the Costing Responses.
example: 1fa6ef40-8f47-40a8-8cf6-f8607d0066ef
CorrelationId:
type: string
format: uuid
description: >-
A unique Correlation ID defined from the `REQUEST_ID` or the
`MESSAGE_ID` provided in the Costing Request.
example: 1fa6ef40-8f47-40a8-8cf6-f8607d0066ef
BuCode:
type: string
description: The Business Unit code for which data are applicable.
example: 1
ReplyTopic:
type: string
description: >
The Kafka topic where to send the Costing Response. This is required for
the [Return Address EIP
pattern](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html).
**You must grant WRITE access to our `svc-ccr-app` service account.**
example: adeo-case-study-COSTING-RESPONSE-V1
ErrorStep:
type: string
description: |
The woker that has thrown the error.
example: EXPOSE_RESULT
ErrorMessage:
type: string
description: |
The error message describing the error.
example: Error message
ErrorCode:
type: string
description: |
The error code.
example: CURRENCY_NOT_FOUND
parameters:
Env:
description: Adeo Kafka Environement for messages publications.
schema:
type: string
enum:
- dev
- sit
- uat1
- preprod
- prod
Version:
description: the topic version you want to use
schema:
type: string
example: V1
default: V1
securitySchemes:
sasl-ssl:
type: plain
x-sasl.jaas.config: >-
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
x-security.protocol: SASL_SSL
x-ssl.endpoint.identification.algorithm: https
x-sasl.mechanism: PLAIN
description: >
Use [SASL authentication with SSL
encryption](https://docs.confluent.io/platform/current/security/security_tutorial.html#configure-clients)
to connect to the ADEO Broker.
Loading

0 comments on commit d367c23

Please sign in to comment.