3.5.1 beta (#178)
* add github files

* client config as one json (#166)

* client config as one json

* remove logic

---------

Co-authored-by: Idan Asulin <[email protected]>

* test fat jar

* fix branch

* add plugin

* add archive option

* fix serve file

* fix

* test app

* fix

* fix

* Rnd 759 client parameters to store (#168)

* client config as one json

* remove logic

* remove superstream connection entry from configToSend

* update version beta file

* fix

* remove comments

* remove imports

* support update full config for client

* change the versions and pass client ip and host

* handle consumer full config update

* handle consumer full config update

* gradle_build_imports

* handle admin client update full config

---------

Co-authored-by: Idan Asulin <[email protected]>

* Rnd 759 client parameters to store (#169)

* client config as one json

* remove logic

* remove superstream connection entry from configToSend

* update version beta file

* fix

* remove comments

* remove imports

* support update full config for client

* change the versions and pass client ip and host

* handle consumer full config update

* handle consumer full config update

* gradle_build_imports

* handle admin client update full config

* back to 113

---------

Co-authored-by: Idan Asulin <[email protected]>

* revert non-relevant changes to Jenkinsfile

* remove non-relevant comments

* Rnd 759 client parameters to store (#172)

* client config as one json

* remove logic

* remove superstream connection entry from configToSend

* update version beta file

* fix

* remove comments

* remove imports

* support update full config for client

* change the versions and pass client ip and host

* handle consumer full config update

* handle consumer full config update

* gradle_build_imports

* handle admin client update full config

* back to 113

* buildNewGradle

* wait mechanism for canStart

* synchronized the other thread

* wait mechanism refactor

* remove comment

* beta version upgrade

---------

Co-authored-by: Idan Asulin <[email protected]>

* added SUPERSTREAM_DEBUG env var handling - disable and enable all stdout (#173)

* added SUPERSTREAM_DEBUG env var handling - disable and enable all stdout

* version beta

* version beta --> 3

* refactor with consts for initSuperstreamConfig method

* change SUPERSTREAM_DEBUG env var to affect only Superstream stdout

* log for test

* revert test log

* change consts env var names

* revert partitions.contains

* serializer/deserializer handling for payload reduction; empty methods

* revert contains check

* stdout handling outside of the Superstream class

* changed Superstream connection log in KafkaAdminClient

* move the successful-connection-to-Superstream log to after waitForStart

* move it again

* Rnd 955 support in changing client config parameters (#174)

* mechanism of waiting for Superstream config; remove bootstrap servers config for test

* move place for test

* add getter for AbstractConfig values; remove the bootstrap servers key-value

* set the Superstream config values inside the Kafka producer config

* move wait for Superstream config into the Superstream class; wait with an object lock to release the CPU during the wait interval

* refactor of waiting methods

* default timeout for Superstream config

* move getter location

* list of supported clients added in Consts - we register clients only if the type is in the list

* toLowerCase conversion added

* move type check to the AbstractConfig

* move import

* upgrade beta version-beta.conf

* Rnd 955 support in changing client config parameters (#175)

* mechanism of waiting for Superstream config; remove bootstrap servers config for test

* move place for test

* add getter for AbstractConfig values; remove the bootstrap servers key-value

* set the Superstream config values inside the Kafka producer config

* move wait for Superstream config into the Superstream class; wait with an object lock to release the CPU during the wait interval

* refactor of waiting methods

* default timeout for Superstream config

* move getter location

* list of supported clients added in Consts - we register clients only if the type is in the list

* toLowerCase conversion added

* move type check to the AbstractConfig

* move import

* upgrade beta version-beta.conf

* fix stdout when Superstream fails to initialize

* upgrade version beta

* remove_pr_template (#176)

* Remove pr template (#177)

* remove_pr_template

* move PULL_REQUEST_TEMPLATE

---------

Co-authored-by: idanasulinStrech <[email protected]>
Co-authored-by: liranbahar <[email protected]>
Co-authored-by: Idan Asulin <[email protected]>
Co-authored-by: Beka Kotchauri <[email protected]>
Co-authored-by: Beka Kotchauri <[email protected]>
6 people authored Aug 26, 2024
1 parent aa46b48 commit 5001b76
Showing 13 changed files with 437 additions and 166 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -0,0 +1 @@
* @idanasulin2706 @shay23b
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -196,4 +196,4 @@ Triggered by: ${env.TRIGGERED_BY}
:link: *Build URL:* <${jobUrl}|View Build Details>
"""
)
}
}
64 changes: 50 additions & 14 deletions PULL_REQUEST_TEMPLATE.md
@@ -1,14 +1,50 @@
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
## How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Please also note any relevant details for your test configuration.
- [ ] Test A
- [ ] Test B

## Checklist:

- [ ] I have performed a self-review of my code
- [ ] I have made corresponding changes to the knowledge base (if needed)
- [ ] My changes generate no new warnings
- [ ] I have verified that the specification is met and all functionalities are working as expected

## Reviewer Score - 0-100%

- [ ] **Meeting Task Specifications (50%)**
- This includes both UI design and backend functionality.
- Ensure that the task requirements are fully met and that the implementation aligns with the specifications provided.

- [ ] **Attention to Edge Cases (10%)**
- Identify and handle edge cases that may not be immediately obvious.
- Demonstrate thorough testing and consideration of potential issues.

- [ ] **Writing Performant and Efficient Code (10%)**
- Optimize the code for performance and efficiency.
- Avoid unnecessary computations and strive for optimal resource usage.

- [ ] **Addressing Feedback from Previous Code Reviews (10%)**
- Act on feedback provided in previous code reviews.
- Show improvement and a proactive approach to learning from past reviews.

- [ ] **Adherence to Coding Conventions (5%)**
- Follow the established coding standards and guidelines.
- Maintain consistency in style and structure throughout the codebase.

- [ ] **Writing Readable Code (5%)**
- Write code that is easy to read and understand.
- Use clear and meaningful variable names, and include comments where necessary.

- [ ] **Considering Aspects Not Explicitly Mentioned in the Specification (5%)**
- Demonstrate initiative by considering aspects that may not be explicitly mentioned in the task specification.
- Enhance the implementation by thinking beyond the basic requirements.

- [ ] **Completing Pull Request Form (2.5%)**
- Fill out the pull request form completely and accurately.
- Give reviewers the context they need to understand the change.

- [ ] **Up to 2 Cycles of Code Review (2.5%)**
- Engage in up to two cycles of code review to refine and improve the code.
- Incorporate suggestions and resolve any identified issues.
11 changes: 11 additions & 0 deletions build.gradle
@@ -1333,10 +1333,17 @@ project(':generator') {
project(':clients') {
archivesBaseName = "kafka-clients"

apply plugin: 'com.github.johnrengelman.shadow'

configurations {
generator
}

shadowJar {
archiveClassifier.set('all')
destinationDirectory.set(file("/tmp/Builds/${rootProject.name}/${project.name}"))
}

dependencies {
implementation libs.zstd
implementation libs.lz4
@@ -1351,6 +1358,10 @@ project(':clients') {

implementation 'io.nats:jnats:2.13.1'

implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.1'
implementation 'org.slf4j:slf4j-api:1.7.32'
implementation 'org.slf4j:slf4j-simple:1.7.32'

// Added by Superstream **

compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
KafkaAdminClient.java
@@ -392,12 +392,16 @@ public class KafkaAdminClient extends AdminClient {

// ** Added by Superstream
public Superstream superstreamConnection;
public void configureSuperstream(Map<String, ?> configs) {
public void configureSuperstream(Map<String, ?> configs, AdminClientConfig fullClientConfig) {
Superstream superstreamConn = (Superstream) configs.get(Consts.superstreamConnectionKey);
if (superstreamConn == null) {
System.out.println("Failed to connect to Superstream");
} else {
if (superstreamConn != null) {
this.superstreamConnection = superstreamConn;
this.superstreamConnection.setFullClientConfigs(fullClientConfig.getValues());
try {
this.superstreamConnection.waitForSuperstreamConfigs(fullClientConfig);
} catch (InterruptedException e) {
this.superstreamConnection.getSuperstreamPrintStream().println("Error waiting for admin client Superstream configs: " + e.getMessage());
}
}
}
// Added by Superstream **
@@ -600,7 +604,7 @@ private KafkaAdminClient(AdminClientConfig config,
TimeoutProcessorFactory timeoutProcessorFactory,
LogContext logContext) {
// ** Added by Superstream
configureSuperstream(config.originals());
configureSuperstream(config.originals(), config);
// Added by Superstream **
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
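The admin client above, and the consumer and producer changes below, share one pattern introduced in this commit: hand the full client configuration to the Superstream connection via setFullClientConfigs, then block in waitForSuperstreamConfigs until the optimized configuration arrives or a timeout expires. The body of waitForSuperstreamConfigs is not part of this diff; the commit messages only describe it as waiting with an object lock so the CPU is released during the wait interval. A minimal, self-contained sketch of that pattern, using illustrative names (SuperstreamConfigWaiter, configLock, optimizedConfig, onConfigUpdate) that are assumptions rather than the actual Superstream fields:

import java.util.Map;

// Sketch only: all names below are illustrative, not the real Superstream members.
class SuperstreamConfigWaiter {
    private final Object configLock = new Object();
    private Map<String, Object> optimizedConfig; // written by the config-update handler thread

    // Called from the update-handler thread once the optimized configuration arrives.
    void onConfigUpdate(Map<String, Object> config) {
        synchronized (configLock) {
            optimizedConfig = config;
            configLock.notifyAll(); // wake the client thread blocked in waitForConfigs
        }
    }

    // Called from the client constructor thread; Object.wait releases the lock and the CPU
    // between checks instead of busy-spinning, which is what the commit messages describe.
    Map<String, Object> waitForConfigs(long timeoutMs, long intervalMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (configLock) {
            while (optimizedConfig == null && System.currentTimeMillis() < deadline) {
                configLock.wait(intervalMs);
            }
            return optimizedConfig; // may still be null if the timeout expired
        }
    }
}

With the constants added to Consts.java further down, the wait interval would be WAIT_INTERVAL_SUPERSTREAM_CONFIG (30 ms) and the default timeout TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT (3000 ms), optionally overridden via SUPERSTREAM_RESPONSE_TIMEOUT.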
KafkaConsumer.java
@@ -717,6 +717,12 @@ public KafkaConsumer(Map<String, Object> configs,
if (superstreamConn != null) {
this.superstreamConnection = superstreamConn;
this.superstreamConnection.clientCounters.setMetrics(this.metrics);
this.superstreamConnection.setFullClientConfigs(config.values());
try {
this.superstreamConnection.waitForSuperstreamConfigs(config);
} catch (InterruptedException e) {
this.superstreamConnection.getSuperstreamPrintStream().println("Error while waiting for consumer superstream configs");
}
}
// Added by Superstream **
List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
KafkaProducer.java
@@ -89,6 +89,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.kafka.common.superstream.Consts.SUPERSTREAM_RESPONSE_TIMEOUT_ENV_VAR;


/**
* A Kafka client that publishes records to the Kafka cluster.
@@ -380,6 +382,12 @@ private void warnIfPartitionerDeprecated() {
if (superstreamConn != null) {
this.superstreamConnection = superstreamConn;
this.superstreamConnection.clientCounters.setMetrics(this.metrics);
this.superstreamConnection.setFullClientConfigs(config.values());
try {
this.superstreamConnection.waitForSuperstreamConfigs(config);
} catch (InterruptedException e) {
this.superstreamConnection.getSuperstreamPrintStream().println("Error while waiting for producer superstream configs");
}
}
// Added by Superstream **
this.producerMetrics = new KafkaProducerMetrics(metrics);
@@ -1059,7 +1067,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
this.compressionType = CompressionType.ZSTD;
break;
default:
System.out.println("Superstream: unknown compression type: " + superstreamConnection.compressionType + ", defaulting to ZSTD");
this.superstreamConnection.getSuperstreamPrintStream().println("Superstream: unknown compression type: " + superstreamConnection.compressionType + ", defaulting to ZSTD");
accumulator.updateCompressionType(CompressionType.ZSTD);
this.compressionType = CompressionType.ZSTD;
break;
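The doSend change above routes the unknown-compression-type warning through superstreamConnection.getSuperstreamPrintStream() instead of System.out, in line with the SUPERSTREAM_DEBUG commits listed in the message. This diff does not show how that stream is built; a hedged sketch of one way a debug-gated print stream could work (the helper class and its exact behaviour are assumptions, not the actual implementation):

import java.io.OutputStream;
import java.io.PrintStream;

// Sketch only: a PrintStream gated by the SUPERSTREAM_DEBUG env var.
class SuperstreamLog {
    static PrintStream superstreamPrintStream() {
        if (Boolean.parseBoolean(System.getenv("SUPERSTREAM_DEBUG"))) {
            return System.out; // debug enabled: Superstream messages go to stdout
        }
        // debug disabled: discard Superstream's own output without touching client stdout
        return new PrintStream(new OutputStream() {
            @Override
            public void write(int b) { /* swallow */ }
        });
    }
}

Either way, the intent of the change is that Superstream's own output no longer goes through the client's System.out unconditionally.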
AbstractConfig.java
@@ -26,18 +26,11 @@

// ** Added by Superstream
import org.apache.kafka.common.superstream.Superstream;
import static org.apache.kafka.common.superstream.Consts.CLIENT_TYPES_LIST;
import java.util.*;
// Added by Superstream **

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/**
* A convenient base class for configurations to extend.
* <p>
@@ -141,7 +134,7 @@ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?>
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");

// ** Added by Superstream
if (type != null && type != "") {
if (type != null && Arrays.asList(CLIENT_TYPES_LIST).contains(type.toLowerCase())) {
originals = Superstream.initSuperstreamConfig((Map<String, Object>) originals, type);
}
// Added by Superstream **
@@ -790,4 +783,8 @@ public V get(Object key) {
return super.get(key);
}
}

public Map<String, Object> getValues() {
return values;
}
}
Consts.java
@@ -1,9 +1,13 @@
package org.apache.kafka.common.superstream;

import java.util.ArrayList;
import java.util.List;

public class Consts {
public static final String sdkVersion = "3.5.111";
public static final String sdkVersion = "3.5.113";
public static final String clientReconnectionUpdateSubject = "internal_tasks.clientReconnectionUpdate";
public static final String clientTypeUpdateSubject = "internal.clientTypeUpdate";
public static final String clientConfigUpdateSubject = "internal.clientConfigUpdate";
public static final String clientRegisterSubject = "internal.registerClient";
public static final String originalSerializer = "original.serializer";
public static final String originalDeserializer = "original.deserializer";
@@ -26,4 +30,26 @@ public class Consts {
public static final String superstreamInnerConsumerKey = "superstream.inner.consumer";
public static final String superstreamMetadataTopic = "superstream.metadata";
public static final String clientStartSubject = "internal.startClient.%s";

public static final String PRODUCER = "producer";
public static final String CONSUMER = "consumer";
public static final String ADMIN = "admin";
public static final String[] CLIENT_TYPES_LIST = {PRODUCER, CONSUMER, ADMIN};
public static final String OPTIMIZED_CONFIGURATION_KEY = "optimized_configuration";
public static final String START_KEY = "start";
public static final String ERROR_KEY = "error";
public static final long MAX_TIME_WAIT_CAN_START = 10 * 60 * 1000;
public static final long WAIT_INTERVAL_CAN_START = 3000;
public static final long WAIT_INTERVAL_SUPERSTREAM_CONFIG = 30;
public static final long TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT = 3000;

public static final String SUPERSTREAM_RESPONSE_TIMEOUT_ENV_VAR = "SUPERSTREAM_RESPONSE_TIMEOUT";
public static final String SUPERSTREAM_DEBUG_ENV_VAR_ENV_VAR = "SUPERSTREAM_DEBUG";
public static final String SUPERSTREAM_COMPRESSION_ENABLED_ENV_VAR = "SUPERSTREAM_COMPRESSION_ENABLED";
public static final String SUPERSTREAM_REDUCTION_ENABLED_ENV_VAR = "SUPERSTREAM_REDUCTION_ENABLED";
public static final String SUPERSTREAM_TAGS_ENV_VAR = "SUPERSTREAM_TAGS";
public static final String SUPERSTREAM_LEARNING_FACTOR_ENV_VAR = "SUPERSTREAM_LEARNING_FACTOR";
public static final String SUPERSTREAM_TOKEN_ENV_VAR = "SUPERSTREAM_TOKEN";
public static final String SUPERSTREAM_HOST_ENV_VAR = "SUPERSTREAM_HOST";

}
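The new *_ENV_VAR constants suggest that Superstream behaviour (response timeout, compression, payload reduction, tags, learning factor, token, host) is driven by environment variables. As one example, a plausible way the SUPERSTREAM_RESPONSE_TIMEOUT override could be resolved against the new TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT constant; the parsing and fallback behaviour here is an assumption, not taken from the diff:

import org.apache.kafka.common.superstream.Consts;

// Sketch only: combining an env-var override with the new default constant.
class SuperstreamTimeouts {
    static long superstreamConfigTimeoutMs() {
        String raw = System.getenv(Consts.SUPERSTREAM_RESPONSE_TIMEOUT_ENV_VAR); // "SUPERSTREAM_RESPONSE_TIMEOUT"
        if (raw == null || raw.isEmpty()) {
            return Consts.TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT; // 3000 ms
        }
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            return Consts.TIMEOUT_SUPERSTREAM_CONFIG_DEFAULT; // fall back on malformed values
        }
    }
}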