From fb956849ed66c31b83a2feea103c608bb83416ed Mon Sep 17 00:00:00 2001
From: Shay Bratslavsky
Date: Mon, 26 Aug 2024 11:43:00 +0300
Subject: [PATCH] 3.5.1 beta (#178)

* add github files
* client config as one json (#166)
* client config as one json
* remove logic
---------
Co-authored-by: Idan Asulin <74712806+idanasulin2706@users.noreply.github.com>
* test fat jar
* fix branch
* add plugin
* add archive option
* fix serve file
* fix
* test app
* fix
* fix
* Rnd 759 client parameters to store (#168)
* client config as one json
* remove logic
* remove superstream connection entry from configToSend
* update version beta file
* fix
* remove comments
* remove imports
* support update full config for client
* change the versions and pass client ip and host
* handle consumer full config update
* handle consumer full config update
* gradle_build_imports
* handle admin client update full config
---------
Co-authored-by: Idan Asulin <74712806+idanasulin2706@users.noreply.github.com>
* Rnd 759 client parameters to store (#169)
* client config as one json
* remove logic
* remove superstream connection entry from configToSend
* update version beta file
* fix
* remove comments
* remove imports
* support update full config for client
* change the versions and pass client ip and host
* handle consumer full config update
* handle consumer full config update
* gradle_build_imports
* handle admin client update full config
* back to 113
---------
Co-authored-by: Idan Asulin <74712806+idanasulin2706@users.noreply.github.com>
* revert non-relevant changes in Jenkinsfile
* remove non-relevant comments
* Rnd 759 client parameters to store (#172)
* client config as one json
* remove logic
* remove superstream connection entry from configToSend
* update version beta file
* fix
* remove comments
* remove imports
* support update full config for client
* change the versions and pass client ip and host
* handle consumer full config update
* handle consumer full config update
* gradle_build_imports
* handle admin client update full config
* back to 113
* buildNewGradle
* wait mechanism for canStart
* synchronized the other thread
* wait mechanism refactor
* remove comment
* beta version upgrade
---------
Co-authored-by: Idan Asulin <74712806+idanasulin2706@users.noreply.github.com>
* added SUPERSTREAM_DEBUG env var handling - disable and enable all stdout (#173)
* added SUPERSTREAM_DEBUG env var handling - disable and enable all stdout
* version beta
* version beta --> 3
* refactor with consts for initSuperstreamConfig method
* change SUPERSTREAM_DEBUG env var to affect only superstream stdout
* log for test
* revert test log
* change consts env var names
* revert partitions.contains
* serializer/deserializer handling for payload reduction; empty methods
* revert contains check
* stdout handling outside of superstream class
* changed superstream connection log in KafkaAdminClient
* move log for successful connection to superstream after waitForStart
* move it again
* Rnd 955 support in changing client config parameters (#174)
* mechanism of wait for superstream config, and remove config bootstrap servers for test
* move place for test
* add getter to abstract config values. remove the bootstrap servers key-val
* set the superstream config values inside kafka producer config
* wait for superstream config moved to Superstream class, wait with object lock to support releasing cpu in wait interval
* refactor for waiting methods
* default timeout for superstream config
* move getter location
* list of supported clients added in consts - we register clients only if type is in the list
* to lower case added
* move type check to the AbstractConfig
* move import
* upgrade beta version-beta.conf
* Rnd 955 support in changing client config parameters (#175)
* mechanism of wait for superstream config, and remove config bootstrap servers for test
* move place for test
* add getter to abstract config values. remove the bootstrap servers key-val
* set the superstream config values inside kafka producer config
* wait for superstream config moved to Superstream class, wait with object lock to support releasing cpu in wait interval
* refactor for waiting methods
* default timeout for superstream config
* move getter location
* list of supported clients added in consts - we register clients only if type is in the list
* to lower case added
* move type check to the AbstractConfig
* move import
* upgrade beta version-beta.conf
* fix stdout when superstream failed initializing
* upgrade version beta
* remove_pr_template (#176)
* Remove pr template (#177)
* remove_pr_template
* move PULL_REQUEST_TEMPLATE
---------
Co-authored-by: idanasulinStrech
Co-authored-by: liranbahar
Co-authored-by: Idan Asulin <74712806+idanasulin2706@users.noreply.github.com>
Co-authored-by: Beka Kotchauri
Co-authored-by: Beka Kotchauri <145648546+bkochauri-memphis@users.noreply.github.com>
---
 .github/CODEOWNERS | 1 +
 Jenkinsfile | 2 +-
 PULL_REQUEST_TEMPLATE.md | 64 ++-
 build.gradle | 11 +
 .../kafka/clients/admin/KafkaAdminClient.java | 14 +-
 .../kafka/clients/consumer/KafkaConsumer.java | 6 +
 .../kafka/clients/producer/KafkaProducer.java | 10 +-
 .../kafka/common/config/AbstractConfig.java | 17 +-
 .../kafka/common/superstream/Consts.java | 28 +-
 .../kafka/common/superstream/Superstream.java | 444 ++++++++++++------
 .../superstream/superstream-version.conf | 2 +-
 version.conf | 2 +-
 12 files changed, 436 insertions(+), 165 deletions(-)
 create mode 100644 .github/CODEOWNERS

diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 000000000..241456729
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1 @@
+* @idanasulin2706 @shay23b
\ No newline at end of file
diff --git a/Jenkinsfile b/Jenkinsfile
index 3aa402f0b..968ec3fc4 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -196,4 +196,4 @@ Triggered by: ${env.TRIGGERED_BY}
 :link: *Build URL:* <${jobUrl}|View Build Details>
 """
 )
-}
+}
\ No newline at end of file
diff --git a/PULL_REQUEST_TEMPLATE.md b/PULL_REQUEST_TEMPLATE.md
index 552a4d03e..159da8d44 100644
--- a/PULL_REQUEST_TEMPLATE.md
+++ b/PULL_REQUEST_TEMPLATE.md
@@ -1,14 +1,50 @@
-*More detailed description of your change,
-if necessary. The PR title and PR message become
-the squashed commit message, so use a separate
-comment to ping reviewers.*
-
-*Summary of testing strategy (including rationale)
-for the feature or bug fix. Unit and/or integration
-tests are expected for any behaviour change and
-system tests should be considered for larger changes.*
-
-### Committer Checklist (excluded from commit message)
-- [ ] Verify design and implementation
-- [ ] Verify test coverage and CI build status
-- [ ] Verify documentation (including upgrade notes)
+## How Has This Been Tested?
+
+Please describe the tests that you ran to verify your changes. Please also note any relevant details for your test configuration.
+- [ ] Test A
+- [ ] Test B
+
+## Checklist:
+
+- [ ] I have performed a self-review of my code
+- [ ] I have made corresponding changes to the knowledge base (if needed)
+- [ ] My changes generate no new warnings
+- [ ] I have verified that the specification is met and all functionalities are working as expected
+
+## Reviewer Score - 0-100%
+
+- [ ] **Meeting Task Specifications (50%)**
+  - This includes both UI design and backend functionality.
+  - Ensure that the task requirements are fully met and that the implementation aligns with the specifications provided.
+
+- [ ] **Attention to Edge Cases (10%)**
+  - Identify and handle edge cases that may not be immediately obvious.
+  - Demonstrate thorough testing and consideration of potential issues.
+
+- [ ] **Writing Performant and Efficient Code (10%)**
+  - Optimize the code for performance and efficiency.
+  - Avoid unnecessary computations and strive for optimal resource usage.
+
+- [ ] **Addressing Feedback from Previous Code Reviews (10%)**
+  - Act on feedback provided in previous code reviews.
+  - Show improvement and a proactive approach to learning from past reviews.
+
+- [ ] **Adherence to Coding Conventions (5%)**
+  - Follow the established coding standards and guidelines.
+  - Maintain consistency in style and structure throughout the codebase.
+
+- [ ] **Writing Readable Code (5%)**
+  - Write code that is easy to read and understand.
+  - Use clear and meaningful variable names, and include comments where necessary.
+
+- [ ] **Considering Aspects Not Explicitly Mentioned in the Specification (5%)**
+  - Demonstrate initiative by considering aspects that may not be explicitly mentioned in the task specification.
+  - Enhance the implementation by thinking beyond the basic requirements.
+
+- [ ] **Completing Pull Request Form (2.5%)**
+  - Fill out the pull request form completely and accurately.
+  - Provide the test details and context reviewers need to evaluate the change.
+
+- [ ] **Up to 2 Cycles of Code Review (2.5%)**
+  - Engage in up to two cycles of code review to refine and improve the code.
+  - Incorporate suggestions and resolve any identified issues.
\ No newline at end of file diff --git a/build.gradle b/build.gradle index bca886e84..0f8c3b40a 100644 --- a/build.gradle +++ b/build.gradle @@ -1333,10 +1333,17 @@ project(':generator') { project(':clients') { archivesBaseName = "kafka-clients" + apply plugin: 'com.github.johnrengelman.shadow' + configurations { generator } + shadowJar { + archiveClassifier.set('all') + destinationDirectory.set(file("/tmp/Builds/${rootProject.name}/${project.name}")) + } + dependencies { implementation libs.zstd implementation libs.lz4 @@ -1351,6 +1358,10 @@ project(':clients') { implementation 'io.nats:jnats:2.13.1' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.1' + implementation 'org.slf4j:slf4j-api:1.7.32' + implementation 'org.slf4j:slf4j-simple:1.7.32' + // Added by Superstream ** compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2d81e92b6..80a5e332e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -392,12 +392,16 @@ public class KafkaAdminClient extends AdminClient { // ** Added by Superstream public Superstream superstreamConnection; - public void configureSuperstream(Map configs) { + public void configureSuperstream(Map configs, AdminClientConfig fullClientConfig) { Superstream superstreamConn = (Superstream) configs.get(Consts.superstreamConnectionKey); - if (superstreamConn == null) { - System.out.println("Failed to connect to Superstream"); - } else { + if (superstreamConn != null) { this.superstreamConnection = superstreamConn; + this.superstreamConnection.setFullClientConfigs(fullClientConfig.getValues()); + try{ + this.superstreamConnection.waitForSuperstreamConfigs(fullClientConfig); + } catch (InterruptedException e) { + this.superstreamConnection.getSuperstreamPrintStream().println("Error waiting for admin client Superstream configs: " + e.getMessage()); + } } } // Added by Superstream ** @@ -600,7 +604,7 @@ private KafkaAdminClient(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory, LogContext logContext) { // ** Added by Superstream - configureSuperstream(config.originals()); + configureSuperstream(config.originals(), config); // Added by Superstream ** this.clientId = clientId; this.log = logContext.logger(KafkaAdminClient.class); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2729b382b..28feff3ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -717,6 +717,12 @@ public KafkaConsumer(Map configs, if (superstreamConn != null) { this.superstreamConnection = superstreamConn; this.superstreamConnection.clientCounters.setMetrics(this.metrics); + this.superstreamConnection.setFullClientConfigs(config.values()); + try { + this.superstreamConnection.waitForSuperstreamConfigs(config); + }catch (InterruptedException e) { + this.superstreamConnection.getSuperstreamPrintStream().println("Error while waiting for consumer superstream configs"); + } } // Added by Superstream ** List> interceptorList = (List) config.getConfiguredInstances( diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1c90aa262..36ec96c71 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -89,6 +89,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.kafka.common.superstream.Consts.SUPERSTREAM_RESPONSE_TIMEOUT_ENV_VAR; + /** * A Kafka client that publishes records to the Kafka cluster. @@ -380,6 +382,12 @@ private void warnIfPartitionerDeprecated() { if (superstreamConn != null) { this.superstreamConnection = superstreamConn; this.superstreamConnection.clientCounters.setMetrics(this.metrics); + this.superstreamConnection.setFullClientConfigs(config.values()); + try { + this.superstreamConnection.waitForSuperstreamConfigs(config); + }catch (InterruptedException e){ + this.superstreamConnection.getSuperstreamPrintStream().println("Error while waiting for producer superstream configs"); + } } // Added by Superstream ** this.producerMetrics = new KafkaProducerMetrics(metrics); @@ -1059,7 +1067,7 @@ private Future doSend(ProducerRecord record, Callback call this.compressionType = CompressionType.ZSTD; break; default: - System.out.println("Superstream: unknown compression type: " + superstreamConnection.compressionType + ", defaulting to ZSTD"); + this.superstreamConnection.getSuperstreamPrintStream().println("Superstream: unknown compression type: " + superstreamConnection.compressionType + ", defaulting to ZSTD"); accumulator.updateCompressionType(CompressionType.ZSTD); this.compressionType = CompressionType.ZSTD; break; diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 61f826f3f..bc91d9706 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -26,18 +26,11 @@ // ** Added by Superstream import org.apache.kafka.common.superstream.Superstream; +import static org.apache.kafka.common.superstream.Consts.CLIENT_TYPES_LIST; +import java.util.*; // Added by Superstream ** -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; - /** * A convenient base class for configurations to extend. *

@@ -141,7 +134,7 @@ public AbstractConfig(ConfigDef definition, Map originals, Map throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); // ** Added by Superstream - if (type != null && type != "") { + if (type != null && Arrays.asList(CLIENT_TYPES_LIST).contains(type.toLowerCase())) { originals = Superstream.initSuperstreamConfig((Map) originals, type); } // Added by Superstream ** @@ -790,4 +783,8 @@ public V get(Object key) { return super.get(key); } } + + public Map getValues() { + return values; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/superstream/Consts.java b/clients/src/main/java/org/apache/kafka/common/superstream/Consts.java index 0a917875e..40cdb70d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/superstream/Consts.java +++ b/clients/src/main/java/org/apache/kafka/common/superstream/Consts.java @@ -1,9 +1,13 @@ package org.apache.kafka.common.superstream; +import java.util.ArrayList; +import java.util.List; + public class Consts { - public static final String sdkVersion = "3.5.111"; + public static final String sdkVersion = "3.5.113"; public static final String clientReconnectionUpdateSubject = "internal_tasks.clientReconnectionUpdate"; public static final String clientTypeUpdateSubject = "internal.clientTypeUpdate"; + public static final String clientConfigUpdateSubject = "internal.clientConfigUpdate"; public static final String clientRegisterSubject = "internal.registerClient"; public static final String originalSerializer = "original.serializer"; public static final String originalDeserializer = "original.deserializer"; @@ -26,4 +30,26 @@ public class Consts { public static final String superstreamInnerConsumerKey = "superstream.inner.consumer"; public static final String superstreamMetadataTopic = "superstream.metadata"; public static final String clientStartSubject = "internal.startClient.%s"; + + public static final String PRODUCER = "producer"; + public static final String CONSUMER = "consumer"; + public static final String ADMIN = "admin"; + public static final String[] CLIENT_TYPES_LIST = {PRODUCER, CONSUMER, ADMIN}; + public static final String OPTIMIZED_CONFIGURATION_KEY = "optimized_configuration"; + public static final String START_KEY = "start"; + public static final String ERROR_KEY = "error"; + public static final long MAX_TIME_WAIT_CAN_START = 10 * 60 * 1000; + public static final long WAIT_INTERVAL_CAN_START = 3000; + public static final long WAIT_INTERVAL_SUPERSTREAM_CONFIG = 30; + public static final long TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT = 3000; + + public static final String SUPERSTREAM_RESPONSE_TIMEOUT_ENV_VAR = "SUPERSTREAM_RESPONSE_TIMEOUT"; + public static final String SUPERSTREAM_DEBUG_ENV_VAR_ENV_VAR = "SUPERSTREAM_DEBUG"; + public static final String SUPERSTREAM_COMPRESSION_ENABLED_ENV_VAR = "SUPERSTREAM_COMPRESSION_ENABLED"; + public static final String SUPERSTREAM_REDUCTION_ENABLED_ENV_VAR = "SUPERSTREAM_REDUCTION_ENABLED"; + public static final String SUPERSTREAM_TAGS_ENV_VAR = "SUPERSTREAM_TAGS"; + public static final String SUPERSTREAM_LEARNING_FACTOR_ENV_VAR = "SUPERSTREAM_LEARNING_FACTOR"; + public static final String SUPERSTREAM_TOKEN_ENV_VAR = "SUPERSTREAM_TOKEN"; + public static final String SUPERSTREAM_HOST_ENV_VAR = "SUPERSTREAM_HOST"; + } diff --git a/clients/src/main/java/org/apache/kafka/common/superstream/Superstream.java b/clients/src/main/java/org/apache/kafka/common/superstream/Superstream.java index 36ab50892..34cc7abbf 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/superstream/Superstream.java +++ b/clients/src/main/java/org/apache/kafka/common/superstream/Superstream.java @@ -1,5 +1,6 @@ package org.apache.kafka.common.superstream; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonParser; @@ -18,6 +19,13 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.net.InetAddress; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.IOException; @@ -27,6 +35,8 @@ import java.util.concurrent.*; import java.util.stream.Collectors; +import static org.apache.kafka.common.superstream.Consts.*; + public class Superstream { public Connection brokerConnection; public JetStream jetstream; @@ -45,6 +55,8 @@ public class Superstream { public String ConsumerSchemaID = "0"; public Map SchemaIDMap = new HashMap<>(); public Map configs; + private Map fullClientConfigs; + private Map superstreamConfigs; public SuperstreamCounters clientCounters = new SuperstreamCounters(); private Subscription updatesSubscription; private String host; @@ -60,6 +72,15 @@ public class Superstream { public Boolean compressionEnabled; public String compressionType = "zstd"; public Boolean compressionTurnedOffBySuperstream = false; + private String clientIp; + private String clientHost; + private static boolean isStdoutSuppressed = false; + private static boolean isStderrSuppressed = false; + private static PrintStream superstreamPrintStream; + private static PrintStream superstreamErrStream; + private static final PrintStream originalOut = System.out; + private static final PrintStream originalErr = System.err; + public Superstream(String token, String host, Integer learningFactor, Map configs, Boolean enableReduction, String type, String tags, Boolean enableCompression) { @@ -71,6 +92,8 @@ public Superstream(String token, String host, Integer learningFactor, Map configs, @@ -88,6 +111,7 @@ public void init() { if (!canStart) { throw new Exception("Could not start superstream"); } + superstreamPrintStream.println("Successfully connected to superstream"); subscribeToUpdates(); superstreamReady = true; reportClientsUpdate(); @@ -99,6 +123,16 @@ public void init() { }); } + private static void checkStdoutEnvVar() { + if (Boolean.parseBoolean(System.getenv(SUPERSTREAM_DEBUG_ENV_VAR_ENV_VAR))) { + isStdoutSuppressed = true; + isStderrSuppressed = true; + } else { + isStdoutSuppressed = false; + isStderrSuppressed = false; + } + } + public void close() { try { if (brokerConnection != null) { @@ -118,7 +152,7 @@ private void initializeNatsConnection(String token, String host) { try { Options options = new Options.Builder() .server(host) - .userInfo(Consts.superstreamInternalUsername, token) + .userInfo(superstreamInternalUsername, token) .maxReconnects(-1) .connectionTimeout(Duration.ofSeconds(10)) .reconnectWait(Duration.ofSeconds(1)) @@ -128,7 +162,7 @@ public void connectionEvent(Connection conn, Events type) { if (type == Events.DISCONNECTED) { brokerConnection = null; superstreamReady = false; - System.out.println("superstream: disconnected from superstream"); + 
superstreamPrintStream.println("superstream: disconnected from superstream");
                         } else if (type == Events.RECONNECTED) {
                             try {
                                 brokerConnection = conn;
@@ -139,16 +173,16 @@ public void connectionEvent(Connection conn, Events type) {
                                     reqData.put("client_hash", clientHash);
                                     ObjectMapper mapper = new ObjectMapper();
                                     byte[] reqBytes = mapper.writeValueAsBytes(reqData);
-                                    brokerConnection.publish(Consts.clientReconnectionUpdateSubject, reqBytes);
+                                    brokerConnection.publish(clientReconnectionUpdateSubject, reqBytes);
                                     subscribeToUpdates();
                                     superstreamReady = true;
                                     reportClientsUpdate();
                                 }
                             } catch (Exception e) {
-                                System.out.println(
+                                superstreamPrintStream.println(
                                         "superstream: failed to reconnect: " + e.getMessage());
                             }
-                            System.out.println("superstream: reconnected to superstream");
+                            superstreamPrintStream.println("superstream: reconnected to superstream");
                         }
                     }
                 })
@@ -166,7 +200,7 @@ public void connectionEvent(Connection conn, Events type) {
             jetstream = js;
             natsConnectionID = generateNatsConnectionID();
         } catch (Exception e) {
-            System.out.println(String.format("superstream: %s", e.getMessage()));
+            superstreamPrintStream.println(String.format("superstream: %s", e.getMessage()));
         }
     }
@@ -187,19 +221,25 @@ public void registerClient(Map<String, ?> configs) {
                     kafkaConnectionID = 0;
                 }
             }
+            InetAddress localHost = InetAddress.getLocalHost();
+            this.clientIp = localHost.getHostAddress();
+            this.clientHost = localHost.getHostName();
+            Map<String, Object> configToSend = populateConfigToSend(configs);
             Map<String, Object> reqData = new HashMap<>();
             reqData.put("nats_connection_id", natsConnectionID);
             reqData.put("language", "java");
             reqData.put("learning_factor", learningFactor);
-            reqData.put("version", Consts.sdkVersion);
-            Map<String, Object> configToSend = populateConfigToSend(configs);
+            reqData.put("version", sdkVersion);
             reqData.put("config", configToSend);
             reqData.put("reduction_enabled", reductionEnabled);
             reqData.put("connection_id", kafkaConnectionID);
             reqData.put("tags", tags);
+            reqData.put("client_ip", clientIp);
+            reqData.put("client_host", clientHost);
+            reqData.put("type", this.type);
             ObjectMapper mapper = new ObjectMapper();
             byte[] reqBytes = mapper.writeValueAsBytes(reqData);
-            Message reply = brokerConnection.request(Consts.clientRegisterSubject, reqBytes, Duration.ofMinutes(5));
+            Message reply = brokerConnection.request(clientRegisterSubject, reqBytes, Duration.ofMinutes(5));
             if (reply != null) {
                 @SuppressWarnings("unchecked")
                 Map<String, Object> replyData = mapper.readValue(reply.getData(), Map.class);
@@ -207,13 +247,13 @@ public void registerClient(Map<String, ?> configs) {
                 if (clientHashObject != null) {
                     clientHash = clientHashObject.toString();
                 } else {
-                    System.out.println("superstream: client_hash is not a valid string: " + clientHashObject);
+                    superstreamPrintStream.println("superstream: client_hash is not a valid string: " + clientHashObject);
                 }
                 Object accountNameObject = replyData.get("account_name");
                 if (accountNameObject != null) {
                     accountName = accountNameObject.toString();
                 } else {
-                    System.out.println("superstream: account_name is not a valid string: " + accountNameObject);
+                    superstreamPrintStream.println("superstream: account_name is not a valid string: " + accountNameObject);
                 }
                 Object learningFactorObject = replyData.get("learning_factor");
                 if (learningFactorObject instanceof Integer) {
@@ -222,20 +262,35 @@ public void registerClient(Map<String, ?> configs) {
                     try {
                         learningFactor = Integer.parseInt((String) learningFactorObject);
                     } catch (NumberFormatException e) {
-                        System.out.println(
+                        superstreamPrintStream.println(
                                 "superstream: learning_factor is not a valid integer: " + learningFactorObject);
                     }
                 } else {
-                    System.out.println("superstream: learning_factor is not a valid integer: " + learningFactorObject);
+                    superstreamPrintStream.println("superstream: learning_factor is not a valid integer: " + learningFactorObject);
+                }
+            } else {
+                String errMsg = "superstream: registering client: No reply received within the timeout period.";
+                superstreamPrintStream.println(errMsg);
+                handleError(errMsg);
+            }
+
+        } catch (Exception e) {
+            superstreamPrintStream.println(String.format("superstream: %s", e.getMessage()));
+        }
+    }
+
+    private Map<String, Object> populateConfigToSend(Map<String, ?> configs) {
+        Map<String, Object> configToSend = new HashMap<>();
+        if (configs != null && !configs.isEmpty()) {
+            for (Map.Entry<String, ?> entry : configs.entrySet()) {
+                if (!superstreamConnectionKey.equalsIgnoreCase(entry.getKey())) {
+                    configToSend.put(entry.getKey(), entry.getValue());
                 }
-            } else {
-                String errMsg = "superstream: registering client: No reply received within the timeout period.";
-                System.out.println(errMsg);
-                handleError(errMsg);
             }
-        } catch (Exception e) {
-            System.out.println(String.format("superstream: %s", e.getMessage()));
+        }
+
+        return configToSend;
     }
     private Map populateConfigToSend(Map configs) {
@@ -258,14 +313,17 @@ private void waitForStart() {
                 try {
                     ObjectMapper mapper = new ObjectMapper();
                     Map<String, Object> messageData = mapper.readValue(msg.getData(), Map.class);
-                    if (messageData.containsKey("start")) {
-                        boolean start = (Boolean) messageData.get("start");
+                    if (messageData.containsKey(START_KEY)) {
+                        boolean start = (Boolean) messageData.get(START_KEY);
                         if (start) {
                             canStart = true;
-                            latch.countDown(); // continue and stop the wait
+                            if (messageData.containsKey(OPTIMIZED_CONFIGURATION_KEY)) {
+                                this.superstreamConfigs = (Map<String, Object>) messageData.get(OPTIMIZED_CONFIGURATION_KEY);
+                            }
+                            latch.countDown();
                         } else {
-                            String err = (String) messageData.get("error");
-                            System.out.println("superstream: could not start: " + err);
+                            String err = (String) messageData.get(ERROR_KEY);
+                            superstreamPrintStream.println("superstream: could not start: " + err);
                             Thread.currentThread().interrupt();
                         }
                     }
@@ -274,18 +332,17 @@
             }
         });
-        dispatcher.subscribe(String.format(Consts.clientStartSubject, clientHash)); // replace with your specific
-        // subject
+        dispatcher.subscribe(String.format(clientStartSubject, clientHash));
         try {
             if (!latch.await(10, TimeUnit.MINUTES)) {
-                System.out.println("superstream: unable not connect with superstream for 10 minutes");
+                superstreamPrintStream.println("superstream: unable to connect with superstream for 10 minutes");
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            System.out.println("superstream: Could not start superstream: " + e.getMessage());
+            superstreamPrintStream.println("superstream: Could not start superstream: " + e.getMessage());
         } finally {
-            dispatcher.unsubscribe(String.format(Consts.clientStartSubject, clientHash));
+            dispatcher.unsubscribe(String.format(clientStartSubject, clientHash));
         }
     }
@@ -294,13 +351,13 @@ private String consumeConnectionID() {
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-        consumerProps.put(Consts.superstreamInnerConsumerKey, "true");
+        consumerProps.put(superstreamInnerConsumerKey, "true");
         consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
String connectionId = null; KafkaConsumer consumer = null; try { consumer = new KafkaConsumer<>(consumerProps); - List partitions = consumer.partitionsFor(Consts.superstreamMetadataTopic, + List partitions = consumer.partitionsFor(superstreamMetadataTopic, Duration.ofMillis(10000)); if (partitions == null || partitions.isEmpty()) { if (consumer != null) { @@ -308,7 +365,7 @@ private String consumeConnectionID() { } return "0"; } - TopicPartition topicPartition = new TopicPartition(Consts.superstreamMetadataTopic, 0); + TopicPartition topicPartition = new TopicPartition(superstreamMetadataTopic, 0); consumer.assign(Collections.singletonList(topicPartition)); consumer.seekToEnd(Collections.singletonList(topicPartition)); long endOffset = consumer.position(topicPartition); @@ -326,7 +383,7 @@ private String consumeConnectionID() { if (consumer == null) { consumer = new KafkaConsumer<>(consumerProps); } - List partitions = consumer.partitionsFor(Consts.superstreamMetadataTopic, + List partitions = consumer.partitionsFor(superstreamMetadataTopic, Duration.ofMillis(10000)); if (partitions == null || partitions.isEmpty()) { if (consumer != null) { @@ -334,7 +391,7 @@ private String consumeConnectionID() { } return "0"; } - TopicPartition topicPartition = new TopicPartition(Consts.superstreamMetadataTopic, 0); + TopicPartition topicPartition = new TopicPartition(superstreamMetadataTopic, 0); consumer.assign(Collections.singletonList(topicPartition)); consumer.seekToEnd(Collections.singletonList(topicPartition)); long endOffset = consumer.position(topicPartition); @@ -419,15 +476,81 @@ public void sendClientTypeUpdateReq() { reqData.put("type", type); ObjectMapper mapper = new ObjectMapper(); byte[] reqBytes = mapper.writeValueAsBytes(reqData); - brokerConnection.publish(Consts.clientTypeUpdateSubject, reqBytes); + brokerConnection.publish(clientTypeUpdateSubject, reqBytes); } catch (Exception e) { handleError(String.format("sendClientTypeUpdateReq: %s", e.getMessage())); } } + private void executeSendClientConfigUpdateReqWithWait() { + new Thread(() -> { + try { + waitForCanStart(); + sendClientConfigUpdateReq(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.err.println("Thread was interrupted: " + e.getMessage()); + } catch (RuntimeException e) { + System.err.println("Error: " + e.getMessage()); + } + }).start(); + } + + private void waitForCanStart() throws InterruptedException { + long remainingTime = MAX_TIME_WAIT_CAN_START; + while (remainingTime > 0) { + if (!this.canStart) { + Thread.sleep(WAIT_INTERVAL_CAN_START); + remainingTime -= WAIT_INTERVAL_CAN_START; + } else { + break; + } + + if (remainingTime <= 0) { + superstreamPrintStream.println("superstream could not start within the expected timeout period"); + } + } + } + + public void waitForSuperstreamConfigs(AbstractConfig config) throws InterruptedException { + String timeoutEnv = System.getenv(SUPERSTREAM_RESPONSE_TIMEOUT_ENV_VAR); + long remainingTime = timeoutEnv != null ? 
Long.parseLong(timeoutEnv) : TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT; + while (remainingTime > 0) { + if (this.superstreamConfigs != null) { + config.getValues().putAll(this.getSuperstreamConfigs()); + break; + } + + remainingTime -= WAIT_INTERVAL_SUPERSTREAM_CONFIG; + if(remainingTime > 0) { + Thread.sleep(WAIT_INTERVAL_SUPERSTREAM_CONFIG); + } + else{ + superstreamPrintStream.println("superstream client configuration was not set within the expected timeout period"); + } + } + } + + private void sendClientConfigUpdateReq() { + if (this.fullClientConfigs != null && !this.fullClientConfigs.isEmpty()) { + try { + Map reqData = new HashMap<>(); + reqData.put("client_hash", clientHash); + reqData.put("config", this.fullClientConfigs); + ObjectMapper mapper = new ObjectMapper(); + byte[] reqBytes = mapper.writeValueAsBytes(reqData); + brokerConnection.publish(clientConfigUpdateSubject, reqBytes); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } catch (Exception e) { + handleError(String.format("sendClientConfigUpdateReq: %s", e.getMessage())); + } + } + } + public void subscribeToUpdates() { try { - String subject = String.format(Consts.superstreamUpdatesSubject, clientHash); + String subject = String.format(superstreamUpdatesSubject, clientHash); Dispatcher dispatcher = brokerConnection.createDispatcher(this.updatesHandler()); updatesSubscription = dispatcher.subscribe(subject, this.updatesHandler()); } catch (Exception e) { @@ -454,7 +577,7 @@ public void reportClientsUpdate() { countersMap.put("connection_id", kafkaConnectionID); byte[] byteCounters = objectMapper.writeValueAsBytes(countersMap); brokerConnection.publish( - String.format(Consts.superstreamClientsUpdateSubject, "counters", clientHash), + String.format(superstreamClientsUpdateSubject, "counters", clientHash), byteCounters); } catch (Exception e) { clientCounters.incrementTotalReadBytesReduced(backupReadBytes); @@ -482,7 +605,7 @@ public void reportClientsUpdate() { byte[] byteConfig = objectMapper.writeValueAsBytes(topicPartitionConfig); brokerConnection.publish( - String.format(Consts.superstreamClientsUpdateSubject, "config", clientHash), + String.format(superstreamClientsUpdateSubject, "config", clientHash), byteConfig); } catch (Exception e) { @@ -503,7 +626,7 @@ public static Map convertMap(Map> topicP public void sendLearningMessage(byte[] msg) { try { - brokerConnection.publish(String.format(Consts.superstreamLearningSubject, clientHash), msg); + brokerConnection.publish(String.format(superstreamLearningSubject, clientHash), msg); } catch (Exception e) { handleError("sendLearningMessage: " + e.getMessage()); } @@ -511,7 +634,7 @@ public void sendLearningMessage(byte[] msg) { public void sendRegisterSchemaReq() { try { - brokerConnection.publish(String.format(Consts.superstreamRegisterSchemaSubject, clientHash), new byte[0]); + brokerConnection.publish(String.format(superstreamRegisterSchemaSubject, clientHash), new byte[0]); learningRequestSent = true; } catch (Exception e) { handleError("sendLearningMessage: " + e.getMessage()); @@ -619,7 +742,7 @@ private void processUpdate(Map update) { case "ToggleReduction": // if defined as false in env vars - override the value from superstream - String reductionEnabledString = envVars.get("SUPERSTREAM_REDUCTION_ENABLED"); + String reductionEnabledString = envVars.get(SUPERSTREAM_REDUCTION_ENABLED_ENV_VAR); if (reductionEnabledString != null) { Boolean reductionEnabled = Boolean.parseBoolean(reductionEnabledString); if (!reductionEnabled) { @@ -637,7 +760,7 
@@ private void processUpdate(Map update) { case "CompressionUpdate": // if defined as false in env vars - override the value from superstream - String compressionEnabledString = envVars.get("SUPERSTREAM_COMPRESSION_ENABLED"); + String compressionEnabledString = envVars.get(SUPERSTREAM_COMPRESSION_ENABLED_ENV_VAR); if (compressionEnabledString != null) { Boolean compressionEnabled = Boolean.parseBoolean(compressionEnabledString); if (!compressionEnabled) { @@ -669,7 +792,7 @@ public void sendGetSchemaRequest(String schemaID) { reqData.put("schema_id", schemaID); ObjectMapper mapper = new ObjectMapper(); byte[] reqBytes = mapper.writeValueAsBytes(reqData); - Message msg = brokerConnection.request(String.format(Consts.superstreamGetSchemaSubject, clientHash), + Message msg = brokerConnection.request(String.format(superstreamGetSchemaSubject, clientHash), reqBytes, Duration.ofSeconds(5)); if (msg == null) { throw new Exception("Could not get descriptor"); @@ -731,58 +854,31 @@ public void handleError(String msg) { if (brokerConnection != null && superstreamReady) { Map envVars = System.getenv(); - String tags = envVars.get("SUPERSTREAM_TAGS"); + String tags = envVars.get(SUPERSTREAM_TAGS_ENV_VAR); if (tags == null) { tags = ""; } if (clientHash == "") { - String message = String.format("[sdk: java][version: %s][tags: %s] %s", Consts.sdkVersion, tags, msg); - brokerConnection.publish(Consts.superstreamErrorSubject, message.getBytes(StandardCharsets.UTF_8)); + String message = String.format("[sdk: java][version: %s][tags: %s] %s", sdkVersion, tags, msg); + brokerConnection.publish(superstreamErrorSubject, message.getBytes(StandardCharsets.UTF_8)); } else { String message = String.format("[clientHash: %s][sdk: java][version: %s][tags: %s] %s", - clientHash, Consts.sdkVersion, tags, msg); - brokerConnection.publish(Consts.superstreamErrorSubject, message.getBytes(StandardCharsets.UTF_8)); + clientHash, sdkVersion, tags, msg); + brokerConnection.publish(superstreamErrorSubject, message.getBytes(StandardCharsets.UTF_8)); } } } public static Map initSuperstreamConfig(Map configs, String type) { - String isInnerConsumer = (String) configs.get(Consts.superstreamInnerConsumerKey); - if (isInnerConsumer != null && isInnerConsumer.equals("true")) { + String isInnerConsumer = (String) configs.get(superstreamInnerConsumerKey); + if (Boolean.parseBoolean(isInnerConsumer)) { return configs; } - String interceptorToAdd = ""; - switch (type) { - case "producer": - interceptorToAdd = SuperstreamProducerInterceptor.class.getName(); - // : handle serializer logic for payload reduction - // igs.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { - // if (!configs.containsKey(Consts.originalSerializer)) { - // igs.put(Consts.originalSerializer, - // - // put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - // SuperstreamSerializer.class.getName()); - // - // - break; - case "consumer": - interceptorToAdd = SuperstreamConsumerInterceptor.class.getName(); - // : handle deserializer logic for payload reduction - // igs.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { - // if (!configs.containsKey(Consts.originalDeserializer)) { - // igs.put(Consts.originalDeserializer, - // - // put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - // SuperstreamDeserializer.class.getName()); - // - // - break; - } - + String interceptorToAdd = getSuperstreamClientInterceptorName(type); try { List interceptors = null; Object existingInterceptors = configs.get(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG); - if 
(interceptorToAdd != "") { + if (!interceptorToAdd.isEmpty()) { if (existingInterceptors != null) { if (existingInterceptors instanceof List) { interceptors = new ArrayList<>((List) existingInterceptors); @@ -796,70 +892,113 @@ public static Map initSuperstreamConfig(Map conf interceptors = new ArrayList<>(); } } - if (interceptorToAdd != "") { + if (!interceptorToAdd.isEmpty()) { interceptors.add(interceptorToAdd); configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); } Map envVars = System.getenv(); - String superstreamHost = envVars.get("SUPERSTREAM_HOST"); + String superstreamHost = envVars.get(SUPERSTREAM_HOST_ENV_VAR); if (superstreamHost == null) { throw new Exception("host is required"); } - configs.put(Consts.superstreamHostKey, superstreamHost); - String token = envVars.get("SUPERSTREAM_TOKEN"); + configs.put(superstreamHostKey, superstreamHost); + String token = envVars.get(SUPERSTREAM_TOKEN_ENV_VAR); if (token == null) { - token = Consts.superstreamDefaultToken; + token = superstreamDefaultToken; } - configs.put(Consts.superstreamTokenKey, token); - String learningFactorString = envVars.get("SUPERSTREAM_LEARNING_FACTOR"); - Integer learningFactor = Consts.superstreamDefaultLearningFactor; + configs.put(superstreamTokenKey, token); + String learningFactorString = envVars.get(SUPERSTREAM_LEARNING_FACTOR_ENV_VAR); + Integer learningFactor = superstreamDefaultLearningFactor; if (learningFactorString != null) { learningFactor = Integer.parseInt(learningFactorString); } - configs.put(Consts.superstreamLearningFactorKey, learningFactor); - Boolean reductionEnabled = false; - String reductionEnabledString = envVars.get("SUPERSTREAM_REDUCTION_ENABLED"); + configs.put(superstreamLearningFactorKey, learningFactor); + boolean reductionEnabled = false; + String reductionEnabledString = envVars.get(SUPERSTREAM_REDUCTION_ENABLED_ENV_VAR); if (reductionEnabledString != null) { reductionEnabled = Boolean.parseBoolean(reductionEnabledString); } - configs.put(Consts.superstreamReductionEnabledKey, reductionEnabled); - String tags = envVars.get("SUPERSTREAM_TAGS"); + configs.put(superstreamReductionEnabledKey, reductionEnabled); + String tags = envVars.get(SUPERSTREAM_TAGS_ENV_VAR); if (tags == null) { tags = ""; } - Boolean compressionEnabled = false; - String compressionEnabledString = envVars.get("SUPERSTREAM_COMPRESSION_ENABLED"); + boolean compressionEnabled = false; + String compressionEnabledString = envVars.get(SUPERSTREAM_COMPRESSION_ENABLED_ENV_VAR); if (compressionEnabledString != null) { compressionEnabled = Boolean.parseBoolean(compressionEnabledString); } + checkStdoutEnvVar(); Superstream superstreamConnection = new Superstream(token, superstreamHost, learningFactor, configs, reductionEnabled, type, tags, compressionEnabled); + superstreamConnection.init(); - configs.put(Consts.superstreamConnectionKey, superstreamConnection); + configs.put(superstreamConnectionKey, superstreamConnection); } catch (Exception e) { String errMsg = String.format("superstream: error initializing superstream: %s", e.getMessage()); System.out.println(errMsg); - switch (type) { - case "producer": - if (configs.containsKey(Consts.originalSerializer)) { - configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - configs.get(Consts.originalSerializer)); - configs.remove(Consts.originalSerializer); - } - break; - case "consumer": - if (configs.containsKey(Consts.originalDeserializer)) { - configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - configs.get(Consts.originalDeserializer)); 
-                    configs.remove(Consts.originalDeserializer);
-                }
-                break;
-            }
+            handleConfigsWhenErrorInitializeSuperstream(type, configs);
         }
+
         return configs;
     }
+    private static void handleConfigsWhenErrorInitializeSuperstream(String type, Map<String, Object> configs) {
+        switch (type) {
+            case PRODUCER:
+                if (configs.containsKey(originalSerializer)) {
+                    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                            configs.get(originalSerializer));
+                    configs.remove(originalSerializer);
+                }
+                break;
+            case CONSUMER:
+                if (configs.containsKey(originalDeserializer)) {
+                    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                            configs.get(originalDeserializer));
+                    configs.remove(originalDeserializer);
+                }
+                break;
+        }
+    }
+
+    private static String getSuperstreamClientInterceptorName(String type) {
+        switch (type) {
+            case "producer":
+                handleSerializerLogicForPayloadReduction();
+                return SuperstreamProducerInterceptor.class.getName();
+            case "consumer":
+                handleDeserializerLogicForPayloadReduction();
+                return SuperstreamConsumerInterceptor.class.getName();
+            default:
+                return "";
+        }
+    }
+
+    private static void handleDeserializerLogicForPayloadReduction() {
+        // TODO: handle deserializer logic for payload reduction; intentionally left empty for now
+    }
+
+    private static void handleSerializerLogicForPayloadReduction() {
+        // TODO: handle serializer logic for payload reduction; intentionally left empty for now
+    }
+
     public static Properties initSuperstreamProps(Properties properties, String type) {
         String interceptors = (String) properties.get(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG);
         switch (type) {
@@ -870,8 +1009,8 @@ public static Properties initSuperstreamProps(Properties properties, String type
                 interceptors = SuperstreamProducerInterceptor.class.getName();
             }
             if (properties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-                if (!properties.containsKey(Consts.originalSerializer)) {
-                    properties.put(Consts.originalSerializer,
+                if (!properties.containsKey(originalSerializer)) {
+                    properties.put(originalSerializer,
                             properties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
                     properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                             SuperstreamSerializer.class.getName());
@@ -885,8 +1024,8 @@ public static Properties initSuperstreamProps(Properties properties, String type
                 interceptors = SuperstreamConsumerInterceptor.class.getName();
             }
             if (properties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-                if (!properties.containsKey(Consts.originalDeserializer)) {
-                    properties.put(Consts.originalDeserializer,
+                if (!properties.containsKey(originalDeserializer)) {
+                    properties.put(originalDeserializer,
                             properties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
                     properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                             SuperstreamDeserializer.class.getName());
@@ -900,40 +1039,40 @@ public static Properties initSuperstreamProps(Properties properties, String type
         try {
             Map<String, String> envVars = System.getenv();
-            String superstreamHost = envVars.get("SUPERSTREAM_HOST");
+            String superstreamHost =
envVars.get(SUPERSTREAM_HOST_ENV_VAR); if (superstreamHost == null) { throw new Exception("host is required"); } - properties.put(Consts.superstreamHostKey, superstreamHost); - String token = envVars.get("SUPERSTREAM_TOKEN"); + properties.put(superstreamHostKey, superstreamHost); + String token = envVars.get(SUPERSTREAM_TOKEN_ENV_VAR); if (token == null) { - token = Consts.superstreamDefaultToken; + token = superstreamDefaultToken; } - properties.put(Consts.superstreamTokenKey, token); - String learningFactorString = envVars.get("SUPERSTREAM_LEARNING_FACTOR"); - Integer learningFactor = Consts.superstreamDefaultLearningFactor; + properties.put(superstreamTokenKey, token); + String learningFactorString = envVars.get(SUPERSTREAM_LEARNING_FACTOR_ENV_VAR); + Integer learningFactor = superstreamDefaultLearningFactor; if (learningFactorString != null) { learningFactor = Integer.parseInt(learningFactorString); } - properties.put(Consts.superstreamLearningFactorKey, learningFactor); + properties.put(superstreamLearningFactorKey, learningFactor); Boolean reductionEnabled = false; - String reductionEnabledString = envVars.get("SUPERSTREAM_REDUCTION_ENABLED"); + String reductionEnabledString = envVars.get(SUPERSTREAM_REDUCTION_ENABLED_ENV_VAR); if (reductionEnabledString != null) { reductionEnabled = Boolean.parseBoolean(reductionEnabledString); } - properties.put(Consts.superstreamReductionEnabledKey, reductionEnabled); - String tags = envVars.get("SUPERSTREAM_TAGS"); + properties.put(superstreamReductionEnabledKey, reductionEnabled); + String tags = envVars.get(SUPERSTREAM_TAGS_ENV_VAR); if (tags != null) { - properties.put(Consts.superstreamTagsKey, tags); + properties.put(superstreamTagsKey, tags); } Map configs = propertiesToMap(properties); Superstream superstreamConnection = new Superstream(token, superstreamHost, learningFactor, configs, reductionEnabled, type); superstreamConnection.init(); - properties.put(Consts.superstreamConnectionKey, superstreamConnection); + properties.put(superstreamConnectionKey, superstreamConnection); } catch (Exception e) { String errMsg = String.format("superstream: error initializing superstream: %s", e.getMessage()); - System.out.println(errMsg); + superstreamPrintStream.println(errMsg); } return properties; } @@ -947,8 +1086,51 @@ public static Map propertiesToMap(Properties properties) { public void updateTopicPartitions(String topic, Integer partition) { Set partitions = topicPartitions.computeIfAbsent(topic, k -> new HashSet<>()); - if (!partitions.contains(partition)) { - partitions.add(partition); + partitions.add(partition); + } + + public void setFullClientConfigs(Map configs) { + this.fullClientConfigs = configs; + executeSendClientConfigUpdateReqWithWait(); + } + + public PrintStream getSuperstreamPrintStream() { + return superstreamPrintStream; + } + + public Map getSuperstreamConfigs() { + return superstreamConfigs; + } + + private static class ClassOutputStream extends OutputStream { + @Override + public void write(int b) { + if (!isStdoutSuppressed) { + originalOut.write(b); + } + } + + @Override + public void write(byte[] b, int off, int len) { + if (!isStdoutSuppressed) { + originalOut.write(b, off, len); + } + } + } + + private static class ClassErrorStream extends OutputStream { + @Override + public void write(int b) { + if (!isStderrSuppressed) { + originalErr.write(b); + } + } + + @Override + public void write(byte[] b, int off, int len) { + if (!isStderrSuppressed) { + originalErr.write(b, off, len); + } } } } diff --git 
a/clients/src/main/java/org/apache/kafka/common/superstream/superstream-version.conf b/clients/src/main/java/org/apache/kafka/common/superstream/superstream-version.conf
index 11101872f..c43d59218 100644
--- a/clients/src/main/java/org/apache/kafka/common/superstream/superstream-version.conf
+++ b/clients/src/main/java/org/apache/kafka/common/superstream/superstream-version.conf
@@ -1 +1 @@
-3.5.111
\ No newline at end of file
+3.5.113
\ No newline at end of file
diff --git a/version.conf b/version.conf
index 1a883a56c..0dc3538e2 100644
--- a/version.conf
+++ b/version.conf
@@ -1 +1 @@
-3.5.112
\ No newline at end of file
+3.5.113
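
Usage sketch for reviewers (not part of the patch): the example below shows how a client application would pick up these changes, assuming the patched kafka-clients jar is on the classpath. The bootstrap address, topic name, class name, and env-var values are placeholders. With SUPERSTREAM_HOST set, the type check added to AbstractConfig routes the producer through Superstream.initSuperstreamConfig, and the KafkaProducer constructor then calls waitForSuperstreamConfigs, which polls every WAIT_INTERVAL_SUPERSTREAM_CONFIG (30 ms) for up to SUPERSTREAM_RESPONSE_TIMEOUT ms (default TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT, 3000) for the optimized_configuration map before construction completes.

// Hypothetical example; names and values below are placeholders.
// Environment is read at client construction, e.g.:
//   export SUPERSTREAM_HOST=nats.superstream.example   # required; placeholder address
//   export SUPERSTREAM_TOKEN=my-token                  # optional; falls back to the default token
//   export SUPERSTREAM_RESPONSE_TIMEOUT=5000           # optional; ms to wait for optimized configs
//   export SUPERSTREAM_DEBUG=true                      # optional; when true, the Superstream print streams are muted (see checkStdoutEnvVar)
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SuperstreamProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // With the patched clients jar, AbstractConfig recognizes the "producer"
        // client type, initializes the Superstream connection, and the constructor
        // blocks briefly in waitForSuperstreamConfigs until the optimized
        // configuration arrives or the timeout elapses.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // placeholder topic
        }
    }
}

If registration fails or no optimized configuration arrives in time, construction still completes with the user-supplied settings, matching the timeout paths added to waitForSuperstreamConfigs in this patch.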