From 706ca8aa2a1f49d960004d03a86f2f3708ea91a4 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Sun, 24 Mar 2019 21:06:18 +0100 Subject: [PATCH 1/2] We have some services that are allergic to empty offsets so let's have a build that doesn't use transactions --- build.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/build.sh b/build.sh index 9fb980d..06ce2a4 100755 --- a/build.sh +++ b/build.sh @@ -21,10 +21,10 @@ set -e docker build -f ./Dockerfile -t yolean/kafka-topics-copy:dev . build-contract -docker tag yolean/kafka-topics-copy:dev yolean/kafka-topics-copy:latest -docker push yolean/kafka-topics-copy:latest +docker tag yolean/kafka-topics-copy:dev yolean/kafka-topics-copy:notx +docker push yolean/kafka-topics-copy:notx # Workaround for https://github.com/Yolean/kafka-topics-copy/issues/4 # TODO add to build-contract -docker build -f ./Dockerfile --target runtime-plainjava -t yolean/kafka-topics-copy:plainjava . -docker push yolean/kafka-topics-copy:plainjava +docker build -f ./Dockerfile --target runtime-plainjava -t yolean/kafka-topics-copy:plainjava-notx . +docker push yolean/kafka-topics-copy:plainjava-notx From 5d490c144d7149d2c3895afc7d5f79deb3a1d43e Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Sun, 24 Mar 2019 21:06:43 +0100 Subject: [PATCH 2/2] Crude de-support of transactions, i.e. lots of duplication in case of failed produce somewhere in the middle of a poll --- .../kafka/topicscopy/tasks/CopyByPoll.java | 20 ++++++------------- .../yolean/kafka/topicscopy/tasks/Create.java | 7 +------ .../kafka/topicscopy/tasks/Subscribe.java | 2 +- 3 files changed, 8 insertions(+), 21 deletions(-) diff --git a/src/main/java/se/yolean/kafka/topicscopy/tasks/CopyByPoll.java b/src/main/java/se/yolean/kafka/topicscopy/tasks/CopyByPoll.java index 36a9fd7..d2aa5e0 100644 --- a/src/main/java/se/yolean/kafka/topicscopy/tasks/CopyByPoll.java +++ b/src/main/java/se/yolean/kafka/topicscopy/tasks/CopyByPoll.java @@ -72,7 +72,7 @@ public void run() { } try { - producer.beginTransaction(); + // producer.beginTransaction(); List> sent = new ArrayList<>(count); Iterator> records = polled.iterator(); @@ -88,33 +88,25 @@ public void run() { metadata.add(i, m); } - // https://hevodata.com/blog/kafka-exactly-once/, but what do we send for the cross-cluster mirror case? - // producer.sendOffsetsToTransaction(offsets, consumerGroupId); - producer.commitTransaction(); + // producer.commitTransaction(); - // https://www.baeldung.com/kafka-exactly-once says: - // Conversely, applications that must read and write to different Kafka clusters - // must use the older commitSync and commitAsync API. Typically, applications - // will store consumer offsets into their external state storage to maintain - // transactionality. consumer.commitSync(); statusHandlers.forEach(h -> h.copied(count)); } catch (ProducerFencedException e) { - // https://hevodata.com/blog/kafka-exactly-once/ doesn't abortTransaction here throw new RuntimeException("Unhandled", e); } catch (KafkaException e) { - producer.abortTransaction(); + //producer.abortTransaction(); throw new RuntimeException("Unhandled", e); } catch (InterruptedException e) { - producer.abortTransaction(); + //producer.abortTransaction(); throw new RuntimeException("Unhandled", e); } catch (ExecutionException e) { - producer.abortTransaction(); + //producer.abortTransaction(); throw new RuntimeException("Unhandled", e); } catch (TimeoutException e) { - producer.abortTransaction(); + //producer.abortTransaction(); throw new RuntimeException("Unhandled", e); } } diff --git a/src/main/java/se/yolean/kafka/topicscopy/tasks/Create.java b/src/main/java/se/yolean/kafka/topicscopy/tasks/Create.java index c2699ad..097238a 100644 --- a/src/main/java/se/yolean/kafka/topicscopy/tasks/Create.java +++ b/src/main/java/se/yolean/kafka/topicscopy/tasks/Create.java @@ -32,7 +32,6 @@ public void run() { consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, options.getGroupId()); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.getSourceBootstrap()); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, options.getAutoOffsetReset()); @@ -41,11 +40,7 @@ public void run() { Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.getTargetBootstrap()); - producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); - // https://www.baeldung.com/kafka-exactly-once - // "All that we need to do is make sure the transaction id is distinct for each - // producer, though consistent across restarts." - producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, options.getGroupId() + "-tx1"); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, options.getTargetCompression().name); diff --git a/src/main/java/se/yolean/kafka/topicscopy/tasks/Subscribe.java b/src/main/java/se/yolean/kafka/topicscopy/tasks/Subscribe.java index 184132f..1c2a96e 100644 --- a/src/main/java/se/yolean/kafka/topicscopy/tasks/Subscribe.java +++ b/src/main/java/se/yolean/kafka/topicscopy/tasks/Subscribe.java @@ -18,7 +18,7 @@ public Create getCreated() { @Override public void run() { - this.created.getProducer().initTransactions(); + //this.created.getProducer().initTransactions(); this.created.getConsumer().subscribe(sourceTopics); }