From d60e64cd7549ccccbfac992cd84ffee9cb5a0481 Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Tue, 4 Jan 2022 10:21:21 +0100 Subject: [PATCH 1/9] Standalone jar with avro serializer and kafka producer and consumer --- pom.xml | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index a40d407..863097a 100644 --- a/pom.xml +++ b/pom.xml @@ -13,14 +13,24 @@ 0.1.2 1.8 UTF-8 - 2.4.0 + 2.8.1 + 3.2.4 2.12.1 - 0.9.6 + 1.1.0 5.4.0 3.8.1 github + 3.3.1 + + + + confluent + http://packages.confluent.io/maven/ + + + internal.repo @@ -40,9 +50,8 @@ com.intuit.karate - karate-apache + karate-core ${karate.version} - test @@ -74,6 +83,12 @@ 2.4.0 + + io.confluent + kafka-avro-serializer + ${confluent.version} + + @@ -176,4 +191,40 @@ - \ No newline at end of file + + + fatjar + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.version} + + + package + + shade + + + karate-kafka-${project.version} + + + *:* + + + + com.intuit.karate.Main + + + + + + + + + + + + + From 8c61ff18db8a2ea236abc6efb6605e932a59e861 Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Sat, 8 Jan 2022 14:35:59 +0100 Subject: [PATCH 2/9] Update documentation pointing to Karate standalone --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index bc19639..1e05e05 100644 --- a/README.md +++ b/README.md @@ -226,6 +226,16 @@ On the consumer side, you need to specify a deserializer for the key / value the On the Producer Side, you should never have to configure a serializer either for the key or data +### Karate standalone + +If you want to use a standalone Kafka Jar please use this: + +```shell +mvn -Pfatjar clean install +``` + +There is an example on how to use this in the [example karate standalone](examples/karate-standalone-with-kafka-schema-registry) + ## Managing the local Kafka broker The configuration for Kafka and Zookeeper is specified in `kafka-single-broker.yml`. See From 27a1993d49ee5eb697911843c33c6a3116f95f0d Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Wed, 12 Jan 2022 17:17:59 +0100 Subject: [PATCH 3/9] Add docker-compose to bring up a kafka with schema-registry --- README.md | 2 +- .../docker-compose.yml | 92 +++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 examples/karate_standalone_with_kafka_schema_registry/docker-compose.yml diff --git a/README.md b/README.md index 1e05e05..f7c19d8 100644 --- a/README.md +++ b/README.md @@ -234,7 +234,7 @@ If you want to use a standalone Kafka Jar please use this: mvn -Pfatjar clean install ``` -There is an example on how to use this in the [example karate standalone](examples/karate-standalone-with-kafka-schema-registry) +There is an example on how to use this in the [example karate standalone](examples/karate_standalone_with_kafka_schema_registry) ## Managing the local Kafka broker diff --git a/examples/karate_standalone_with_kafka_schema_registry/docker-compose.yml b/examples/karate_standalone_with_kafka_schema_registry/docker-compose.yml new file mode 100644 index 0000000..3783635 --- /dev/null +++ b/examples/karate_standalone_with_kafka_schema_registry/docker-compose.yml @@ -0,0 +1,92 @@ +version: '3' +services: + zookeeper: + image: confluentinc/cp-zookeeper:6.2.2 + hostname: zookeeper + container_name: zookeeper + ports: + - "2181:2181" + environment: + KAFKA_HEAP_OPTS: "-Xmx512M -Xms512M" + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zookeeper:2888:3888 + ZOOKEEPER_4LW_COMMANDS_WHITELIST: stat,ruok,conf,isro + restart: always + kafka: + image: confluentinc/cp-kafka:6.2.2 + hostname: kafka + container_name: kafka + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_HEAP_OPTS: "-Xmx1G -Xms1G" + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LOG_SEGMENT_BYTES: 16777216 + KAFKA_LOG_RETENTION_BYTES: 134217728 + restart: always + schemaregistry: + image: confluentinc/cp-schema-registry:6.2.2 + hostname: schemaregistry + container_name: schemaregistry + depends_on: + - kafka + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:19092 + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + SCHEMA_REGISTRY_HOST_NAME: "schemaregistry" + SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "full" + # SCHEMA_REGISTRY_DEBUG: 'true' + restart: always + restproxy: + image: confluentinc/cp-kafka-rest:6.2.2 + hostname: restproxy + container_name: restproxy + ports: + - "8082:8082" + environment: + KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:19092 + KAFKA_REST_LISTENERS: http://0.0.0.0:8082/ + KAFKA_REST_SCHEMA_REGISTRY_URL: http://schemaregistry:8081/ + KAFKA_REST_HOST_NAME: "restproxy" + depends_on: + - schemaregistry + restart: always +# Uncomment this for the schemaregistry ui and kafkatopics ui + schemaregistryui: + image: landoop/schema-registry-ui + hostname: schemaregistryui + container_name: schemaregistryui + ports: + - "8083:8083" + environment: + SCHEMAREGISTRY_URL: http://schemaregistry:8081 + PROXY: "TRUE" + ALLOW_GLOBAL: "1" + ALLOW_DELETION: "1" + ALLOW_TRANSITIVE: "1" + PORT: 8083 + depends_on: + - schemaregistry + restart: always + kafkatopicsui: + image: landoop/kafka-topics-ui:latest + hostname: kafkatopicsui + container_name: kafkatopicsui + depends_on: + - restproxy + ports: + - "8000:8000" + environment: + KAFKA_REST_PROXY_URL: http://restproxy:8082 + PROXY: "TRUE" + restart: always From dfd8954f5e266958798de187eecf668134065ca2 Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Wed, 12 Jan 2022 17:27:05 +0100 Subject: [PATCH 4/9] Test schemas --- .../test/message_key.avsc | 18 +++++++++ .../test/message_value.avsc | 37 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 examples/karate_standalone_with_kafka_schema_registry/test/message_key.avsc create mode 100644 examples/karate_standalone_with_kafka_schema_registry/test/message_value.avsc diff --git a/examples/karate_standalone_with_kafka_schema_registry/test/message_key.avsc b/examples/karate_standalone_with_kafka_schema_registry/test/message_key.avsc new file mode 100644 index 0000000..27ecca5 --- /dev/null +++ b/examples/karate_standalone_with_kafka_schema_registry/test/message_key.avsc @@ -0,0 +1,18 @@ +{ + "type": "record", + "namespace": "com.qindel.karate_kafka", + "name": "MessageStatusKey", + "doc": "Indicates the identifier of the message based on campaign_id and email.", + "fields": [ + { + "name": "campaign_id", + "type": "string", + "doc": "Is the campaign id returned when the campaign is launched" + }, + { + "name": "destination_address", + "type": "string", + "doc": "Is the destination address of the message, that is, the destination email for email" + } + ] +} \ No newline at end of file diff --git a/examples/karate_standalone_with_kafka_schema_registry/test/message_value.avsc b/examples/karate_standalone_with_kafka_schema_registry/test/message_value.avsc new file mode 100644 index 0000000..52b5752 --- /dev/null +++ b/examples/karate_standalone_with_kafka_schema_registry/test/message_value.avsc @@ -0,0 +1,37 @@ +{ + "type": "record", + "namespace": "com.qindel.karate_kafka", + "name": "MessageStatusValue", + "doc": "Indicates status of the message. Whether it is queued, sent, or has been delivered", + "fields": [ + { + "name": "status", + "type": { + "type": "enum", + "name": "status", + "symbols": [ + "other", + "queued", + "sent", + "delivered", + "error" + ], + "default": "other" + }, + "doc": "The different status where the message can be: 'queued': the message is in the output queue. 'sent': the message has been sent, 'delivered': the message has reached the end user device. 'error': the message will not be sent, more information about the error will be in the description field, 'revoked': The message was not be possible to be delivered once sent for a period of time and has been removed of the system, in case of rcs, if sms fallback was configured, the next status might be 'queued' and the channel might change from 'rcs' to 'sms' and start again via 'queued', 'sent', 'delivered'... 'other' is mainly used for schema evolution for new statuses that might be defined in the future. If you get 'other' the most likely case is that there is a new version of the schema" + }, + { + "name": "status_ts", + "type": "long", + "logicalType": "timestamp-millis", + "default": 0, + "doc": "Is the timestamp when the status was changed. Number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC" + }, + { + "name": "description", + "type": "string", + "default": "", + "doc": "Is extra information about the status. It might indicate the reason of an error, or the list that filtered the message" + } + ] +} \ No newline at end of file From 908e2774a08d83a73de250465b1b98e4145be0b4 Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Wed, 12 Jan 2022 17:27:18 +0100 Subject: [PATCH 5/9] Test key and test value --- .../test/message_key.json | 4 ++++ .../test/message_value.json | 5 +++++ 2 files changed, 9 insertions(+) create mode 100644 examples/karate_standalone_with_kafka_schema_registry/test/message_key.json create mode 100644 examples/karate_standalone_with_kafka_schema_registry/test/message_value.json diff --git a/examples/karate_standalone_with_kafka_schema_registry/test/message_key.json b/examples/karate_standalone_with_kafka_schema_registry/test/message_key.json new file mode 100644 index 0000000..ecc22a9 --- /dev/null +++ b/examples/karate_standalone_with_kafka_schema_registry/test/message_key.json @@ -0,0 +1,4 @@ +{ + "campaign_id": "test_campaign", + "destination_address": "you@example.com" +} diff --git a/examples/karate_standalone_with_kafka_schema_registry/test/message_value.json b/examples/karate_standalone_with_kafka_schema_registry/test/message_value.json new file mode 100644 index 0000000..09766c8 --- /dev/null +++ b/examples/karate_standalone_with_kafka_schema_registry/test/message_value.json @@ -0,0 +1,5 @@ +{ + "status": "sent", + "status_ts": 0, + "description": "test" +} \ No newline at end of file From 413c8e56cee27bd057fc7389dffaaf870ccf9278 Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Wed, 12 Jan 2022 17:27:59 +0100 Subject: [PATCH 6/9] Add example feature for karate-kafka --- .../kafka_rest_proxy_schema_registry.feature | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 examples/karate_standalone_with_kafka_schema_registry/test/kafka_rest_proxy_schema_registry.feature diff --git a/examples/karate_standalone_with_kafka_schema_registry/test/kafka_rest_proxy_schema_registry.feature b/examples/karate_standalone_with_kafka_schema_registry/test/kafka_rest_proxy_schema_registry.feature new file mode 100644 index 0000000..a605f06 --- /dev/null +++ b/examples/karate_standalone_with_kafka_schema_registry/test/kafka_rest_proxy_schema_registry.feature @@ -0,0 +1,42 @@ +Feature: Kafka Rest Proxy Avro Producer and Kafka Avro Consumer Demo + + Background: + # Global info + * def topic = 'test-topic' + # Kafka consumer info + * def KafkaConsumer = Java.type('karate.kafka.KarateKafkaConsumer') + * def KafkaAvroDeserializer = Java.type("io.confluent.kafka.serializers.KafkaAvroDeserializer") + # Rest proxy info + * def rest_proxy_path = "/topics/" + topic + * configure headers = { "Accept": "application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json", "Content-Type": "application/vnd.kafka.avro.v2+json" } + # Schemas + # We need to escape the avsc quotes with backslash, and not send any newlines + * def keySchema = karate.readAsString('this:message_key.avsc').replace(/[\""]/g, '\\"').replace(/[\n\r]/g, ' ') + * def valueSchema = karate.readAsString('this:message_value.avsc').replace(/[\""]/g, '\\"').replace(/[\n\r]/g, ' ') + # Example data + * def recordKey = karate.readAsString('this:message_key.json').replace(/[\n\r]/g, ' ') + * def recordValue = karate.readAsString('this:message_value.json').replace(/[\n\r]/g, ' ') + * def requestString = '{ "key_schema": "' + keySchema +'", "value_schema": "' + valueSchema + '", "records": [ { "key": ' + recordKey + ', "value": ' + recordValue + ' } ] }' + * print "requestString:", requestString + + + Scenario Outline: Write messages to and read it back from a KafkaConsumer with props + + # Create a consumer with the properties per tag + * def kc = new KafkaConsumer(topic,) + Given url "" + And path rest_proxy_path + And request requestString + When method post + # Fetch message from kafka + * json out = kc.take() + # The values should match + Then match out.key == { "campaign_id": "test_campaign", "destination_address": "you@example.com" } + And match out.value contains { "status": "sent" } + # Close the Consumer + * kc.close() + + @dev + Examples: + | restproxy | kafkaprops | + | http://127.0.0.1:8082 | { "bootstrap.servers": "127.0.0.1:9092", "group.id": "my.test", "auto.offset.reset": "earliest", "schema.registry.url": "http://127.0.0.1:8081", "key.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer", "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer" } | \ No newline at end of file From 033855536b7e289a868d9a6a99589365186611c1 Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Wed, 12 Jan 2022 17:29:56 +0100 Subject: [PATCH 7/9] Add some documentation --- .../README.md | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 examples/karate_standalone_with_kafka_schema_registry/README.md diff --git a/examples/karate_standalone_with_kafka_schema_registry/README.md b/examples/karate_standalone_with_kafka_schema_registry/README.md new file mode 100644 index 0000000..5a50929 --- /dev/null +++ b/examples/karate_standalone_with_kafka_schema_registry/README.md @@ -0,0 +1,188 @@ +## Test Kafka Standalone with Schema Registry + +### Description + +The objective is to create an example using: + +- Karate standalone JAR with Kafka libraries. This is mainly for non Java based installations. +See https://github.com/karatelabs/karate/blob/master/karate-netty/README.md#standalone-jar +- Create a REST request which will generate some Kafka topic output. This is simulated using a REST producer with the +Confluent REST proxy +- Avro Consumer using the confluent schema registry. + +### Build of the standalone JAR file + +From the source tree directory run: + +```shell +mvn -Pfatjar clean install +``` + +this will generate the following fat jar: `target/karate-kafka-0.1.2.jar` + +For more information see: https://github.com/karatelabs/karate/blob/master/karate-netty/README.md#standalone-jar + +### Start up of Kafka and Schema Registry + +The following command will create: + +- Zookeeper on port 2181 +- Kafka on port 9092 (internally it listens on 19092) +- Confluent Schema Registry on http://127.0.0.1:8081 +- Confluent Rest proxy on http://127.0.0.1:8082 +- Landoop Schema registry UI on http://127.0.0.1:8083 +- Landoop Topics UI browser on http://127.0.0.1:8000 + +```shell +cd examples/karate_standalone_with_kafka_schema_registry/ +docker-compose up +``` + +To stop the environment run: + + +```shell +cd examples/karate_standalone_with_kafka_schema_registry/ +docker-compose down +``` + +### Run the example test + +The example test files are at `examples/karate_standalone_with_kafka_schema_registry/test`. And it includes the files: + +- `kafa-rest-proxy-schema-registry.feature`: Example scenario to run +- `message_key.avsc`: Key Avro Schema +- `message_value.avsc`: Value Avro Schema +- `message_key.json`: Example test key +- `message_value.json`: Example test value + + +To run the test: + +```shell +KARATE_JAR=$(pwd)/target/karate-kafka-0.1.2.jar +cd examples/karate_standalone_with_kafka_schema_registry/test +java -jar $KARATE_JAR --tags=@dev kafka_rest_proxy_schema_registry.feature +``` + +### Gherkin/Karate file + +1. It creates a test entry from the `message_key.json` and `message_value.json` files. + +The basic idea is that there is some kind of REST request which will generate some Kafka messages. +In our example this is simulated using the Kafka Confluent Rest proxy and pushing directly a message +to the test topic. See https://docs.confluent.io/platform/current/kafka-rest/api.html + +The key elements are the `Accept` and the `Content-Type` header which indicates that the content generated is of type +AVRO and sent to the proxy with JSON + +2. It then reads with the Kafka Consumer using a Schema registry. + +The Kafka Consumer is using the well known class `karate.kafka.KarateKafkaConsumer` + +3. The parameters which might vary between the environments have been setup in the variables: + * restproxy: http://127.0.0.1:8082 + * kafkaprops: Here we indicate the bootstrap server, the group id, the key and value deserializer and the schema +registry url + + + +```gherkin +Feature: Kafka Rest Proxy Avro Producer and Kafka Avro Consumer Demo + + Background: + # Global info + * def topic = 'test-topic' + # Kafka consumer info + * def KafkaConsumer = Java.type('karate.kafka.KarateKafkaConsumer') + * def KafkaAvroDeserializer = Java.type("io.confluent.kafka.serializers.KafkaAvroDeserializer") + # Rest proxy info + * def rest_proxy_path = "/topics/" + topic + * configure headers = { "Accept": "application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json", "Content-Type": "application/vnd.kafka.avro.v2+json" } + # Schemas + # We need to escape the avsc quotes with backslash, and not send any newlines + * def keySchema = karate.readAsString('this:message_key.avsc').replace(/[\""]/g, '\\"').replace(/[\n\r]/g, ' ') + * def valueSchema = karate.readAsString('this:message_value.avsc').replace(/[\""]/g, '\\"').replace(/[\n\r]/g, ' ') + # Example data + * def recordKey = karate.readAsString('this:message_key.json').replace(/[\n\r]/g, ' ') + * def recordValue = karate.readAsString('this:message_value.json').replace(/[\n\r]/g, ' ') + * def requestString = '{ "key_schema": "' + keySchema +'", "value_schema": "' + valueSchema + '", "records": [ { "key": ' + recordKey + ', "value": ' + recordValue + ' } ] }' + * print "requestString:", requestString + + + Scenario Outline: Write messages to and read it back from a KafkaConsumer with props + + # Create a consumer with the properties per tag + * def kc = new KafkaConsumer(topic,) + Given url "" + And path rest_proxy_path + And request requestString + When method post + # Fetch message from kafka + * json out = kc.take() + # The values should match + Then match out.key == { "campaign_id": "test_campaign", "destination_address": "you@example.com" } + And match out.value contains { "status": "sent" } + # Close the Consumer + * kc.close() + + @dev + Examples: + | restproxy | kafkaprops | + | http://127.0.0.1:8082 | { "bootstrap.servers": "127.0.0.1:9092", "group.id": "my.test", "auto.offset.reset": "earliest", "schema.registry.url": "http://127.0.0.1:8081", "key.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer", "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer" } | +``` + + +### Example output + +```shell +KARATE_JAR=$(pwd)/target/karate-kafka-0.1.2.jar +cd examples/karate_standalone_with_kafka_schema_registry/test +java -jar $KARATE_JAR --tags=@dev kafka_rest_proxy_schema_registry.feature +14:51:46.827 [main] INFO com.intuit.karate - Karate version: 1.1.0 +14:51:48.205 [main] INFO com.intuit.karate - [print] requestString: { "key_schema": "{ \"type\": \"record\", \"namespace\": \"com.qindel.karate-kafka\", \"name\": \"MessageStatusKey\", \"doc\": \"Indicates the identifier of the message based on campaign_id and email.\", \"fields\": [ { \"name\": \"campaign_id\", \"type\": \"string\", \"doc\": \"Is the campaign id returned when the campaign is launched\" }, { \"name\": \"destination_address\", \"type\": \"string\", \"doc\": \"Is the destination address of the message, that is, the destination email for email\" } ] }", "value_schema": "{ \"type\": \"record\", \"namespace\": \"com.qindel.karate-kafka\", \"name\": \"MessageStatusValue\", \"doc\": \"Indicates status of the message. Whether it is queued, sent, or has been delivered\", \"fields\": [ { \"name\": \"status\", \"type\": { \"type\": \"enum\", \"name\": \"status\", \"symbols\": [ \"other\", \"queued\", \"sent\", \"delivered\", \"error\" ], \"default\": \"other\" }, \"doc\": \"The different status where the message can be: 'queued': the message is in the output queue. 'sent': the message has been sent, 'delivered': the message has reached the end user device. 'error': the message will not be sent, more information about the error will be in the description field, 'revoked': The message was not be possible to be delivered once sent for a period of time and has been removed of the system, in case of rcs, if sms fallback was configured, the next status might be 'queued' and the channel might change from 'rcs' to 'sms' and start again via 'queued', 'sent', 'delivered'... 'other' is mainly used for schema evolution for new statuses that might be defined in the future. If you get 'other' the most likely case is that there is a new version of the schema\" }, { \"name\": \"status_ts\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\", \"default\": 0, \"doc\": \"Is the timestamp when the status was changed. Number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC\" }, { \"name\": \"description\", \"type\": \"string\", \"default\": \"\", \"doc\": \"Is extra information about the status. It might indicate the reason of an error, or the list that filtered the message\" } ] }", "records": [ { "key": { "campaign_id": "test_campaign", "destination_address": "you@example.com" } , "value": { "status": "sent", "status_ts": 0, "description": "test" } } ] } +14:51:48.519 [main] DEBUG com.intuit.karate - request: +1 > POST http://127.0.0.1:8082/topics/test-topic +1 > Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json +1 > Content-Type: application/vnd.kafka.avro.v2+json; charset=UTF-8 +1 > Content-Length: 2620 +1 > Host: 127.0.0.1:8082 +1 > Connection: Keep-Alive +1 > User-Agent: Apache-HttpClient/4.5.13 (Java/11.0.13) +1 > Accept-Encoding: gzip,deflate +{ "key_schema": "{ \"type\": \"record\", \"namespace\": \"com.qindel.karate-kafka\", \"name\": \"MessageStatusKey\", \"doc\": \"Indicates the identifier of the message based on campaign_id and email.\", \"fields\": [ { \"name\": \"campaign_id\", \"type\": \"string\", \"doc\": \"Is the campaign id returned when the campaign is launched\" }, { \"name\": \"destination_address\", \"type\": \"string\", \"doc\": \"Is the destination address of the message, that is, the destination email for email\" } ] }", "value_schema": "{ \"type\": \"record\", \"namespace\": \"com.qindel.karate-kafka\", \"name\": \"MessageStatusValue\", \"doc\": \"Indicates status of the message. Whether it is queued, sent, or has been delivered\", \"fields\": [ { \"name\": \"status\", \"type\": { \"type\": \"enum\", \"name\": \"status\", \"symbols\": [ \"other\", \"queued\", \"sent\", \"delivered\", \"error\" ], \"default\": \"other\" }, \"doc\": \"The different status where the message can be: 'queued': the message is in the output queue. 'sent': the message has been sent, 'delivered': the message has reached the end user device. 'error': the message will not be sent, more information about the error will be in the description field, 'revoked': The message was not be possible to be delivered once sent for a period of time and has been removed of the system, in case of rcs, if sms fallback was configured, the next status might be 'queued' and the channel might change from 'rcs' to 'sms' and start again via 'queued', 'sent', 'delivered'... 'other' is mainly used for schema evolution for new statuses that might be defined in the future. If you get 'other' the most likely case is that there is a new version of the schema\" }, { \"name\": \"status_ts\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\", \"default\": 0, \"doc\": \"Is the timestamp when the status was changed. Number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC\" }, { \"name\": \"description\", \"type\": \"string\", \"default\": \"\", \"doc\": \"Is extra information about the status. It might indicate the reason of an error, or the list that filtered the message\" } ] }", "records": [ { "key": { "campaign_id": "test_campaign", "destination_address": "you@example.com" } , "value": { "status": "sent", "status_ts": 0, "description": "test" } } ] } + +14:51:50.716 [main] DEBUG com.intuit.karate - response time in milliseconds: 2193 +1 < 200 +1 < Date: Sat, 08 Jan 2022 13:51:48 GMT +1 < Content-Type: application/vnd.kafka.v2+json +1 < Vary: Accept-Encoding, User-Agent +1 < Transfer-Encoding: chunked +{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":1,"value_schema_id":2} +14:51:55.062 [main] INFO com.intuit.karate - [print] kafka out: { + "key": { + "campaign_id": "test_campaign", + "destination_address": "you@example.com" + }, + "value": { + "status": "sent", + "status_ts": 0, + "description": "test" + } +} + +--------------------------------------------------------- +feature: kafa-rest-proxy-schema-registry.feature +scenarios: 1 | passed: 1 | failed: 0 | time: 7.2697 +--------------------------------------------------------- + +14:51:55.863 [main] INFO com.intuit.karate.Suite - <> feature 1 of 1 (0 remaining) kafa-rest-proxy-schema-registry.feature +Karate version: 1.1.0 +====================================================== +elapsed: 8.91 | threads: 1 | thread time: 7.27 +features: 1 | skipped: 0 | efficiency: 0.82 +scenarios: 1 | passed: 1 | failed: 0 +====================================================== + +HTML report: (paste into browser to view) | Karate version: 1.1.0 +``` \ No newline at end of file From 78988616ef80b760e2fe86d0ab833974ab39bd9a Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Wed, 12 Jan 2022 17:35:10 +0100 Subject: [PATCH 8/9] Update documentation --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f7c19d8..d8794e9 100644 --- a/README.md +++ b/README.md @@ -234,7 +234,7 @@ If you want to use a standalone Kafka Jar please use this: mvn -Pfatjar clean install ``` -There is an example on how to use this in the [example karate standalone](examples/karate_standalone_with_kafka_schema_registry) +There is an example on how to use this in the [example karate standalone with kafka using schema_registry](examples/karate_standalone_with_kafka_schema_registry) ## Managing the local Kafka broker From 96087c1762bec6659a11d4901e5578bda3239846 Mon Sep 17 00:00:00 2001 From: Nito Martinez Date: Mon, 31 Jan 2022 11:38:09 +0100 Subject: [PATCH 9/9] Add karate_version-karate_kafka_version as version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 863097a..3c7e5f3 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ jar - 0.1.2 + 1.1.0-0.1.3 1.8 UTF-8 2.8.1