Skip to content

Commit

Permalink
Merge pull request #18 from flazarus1A/bugfix/Fix-doc
Browse files Browse the repository at this point in the history
fix(doc): fix broken code includes
  • Loading branch information
flazarus1A authored Jan 25, 2024
2 parents f83816c + 39e2ed6 commit d80de95
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 10 deletions.
73 changes: 69 additions & 4 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,18 @@ The POJO is read from kafka, or written to kafka, with the POJO's JSON textual r
.PojoProcessor.java
[source,java]
----
include::../../../../integration-tests/json-pojo/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessor.java[]
@Slf4j
@Processor // <1>
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { // <2>
@Override
public void process(Record<String, SamplePojo> record) { // <3>
String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
!record.value().getBooleanField());
context().forward(record.withValue(pojo)); // <4>
}
}
----

<1> Your Processor is declared with the annotation as for a regular processor.
Expand Down Expand Up @@ -281,7 +292,18 @@ The POJO is read from kafka, or written to kafka, with a custom serialization fo
.PojoProcessor.java
[source,java]
----
include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessor.java[tag=pojoprocessor]
@Slf4j
@Processor // <1>
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { // <2>
@Override
public void process(Record<String, SamplePojo> record) { // <3>
String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
!record.value().getBooleanField());
context().forward(record.withValue(pojo)); // <4>
}
}
----
<1> Your Processor is declared with the annotation as for a regular processor.
<2> The handled value type, in this example, is a simple POJO, nothing fancy.
Expand Down Expand Up @@ -588,7 +610,37 @@ In this example, we implement a processor which is using a State-Store.
.StateStoreProcessor.java
[source,java]
----
include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingProcessor.java[tag=statefulproc]
@Slf4j
@Processor // <1>
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> {
private KeyValueStore<String, String> pingData;
@Override
public void init(ProcessorContext<String, Ping> context) {
super.init(context);
pingData = context.getStateStore("ping-data"); // <2>
context.schedule(Duration.ofMillis(1L), PunctuationType.STREAM_TIME, new DuplicateValuePunctuator(pingData));
}
/**
* {@inheritDoc}
*/
@Override
public void process(Record<String, Ping> record) {
log.info("Process the message: {}", record.value().getMessage());
String previousValue = pingData.get(record.key());
pingData.put(record.key(), record.value().getMessage());
if (previousValue == null) {
context().forward(
record.withValue(Ping.newBuilder().setMessage("Store initialization OK for " + record.key()).build()));
} else {
context().forward(record.withValue(
Ping.newBuilder().setMessage("Previous value for " + record.key() + " is " + previousValue).build()));
}
}
}
----
<1> Your Processor is declared with the annotation as for a regular processor.
<2> The definition and initialization of your state store.
Expand All @@ -598,7 +650,20 @@ include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingPr
.SampleConfigurationCustomizer.java
[source,java]
----
include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/stateful/SampleConfigurationCustomizer.java[tag=customconf]
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
@Override
public void fillConfiguration(Configuration configuration) {
List<StoreConfiguration> storeConfigurations = new ArrayList<>();
// Add a key value store for indexes
StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("ping-data"),
Serdes.String(),
Serdes.String());
storeConfigurations.add(new StoreConfiguration(storeBuilder));
configuration.setStoreConfigurations(storeConfigurations);
}
}
----

The `io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer` implementation provided sets the required State-Store configurations into the method's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.quarkiverse.kafkastreamsprocessor.api.Processor;
import lombok.extern.slf4j.Slf4j;

// tag::pojoprocessor[]
@Slf4j
@Processor // <1>
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { // <2>
Expand All @@ -38,4 +37,3 @@ public void process(Record<String, SamplePojo> record) { // <3>
context().forward(record.withValue(pojo)); // <4>
}
}
// end::pojoprocessor[]
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
* &lt;previous-value&gt;</b></li>
* </ul>
*/
// tag::statefulproc[]
@Slf4j
@Processor // <1>
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> {
Expand Down Expand Up @@ -72,4 +71,3 @@ public void process(Record<String, Ping> record) {
}
}
}
// end::statefulproc[]
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer;
import io.quarkiverse.kafkastreamsprocessor.api.configuration.store.StoreConfiguration;

// tag::customconf[]
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
@Override
Expand All @@ -48,4 +47,3 @@ public void fillConfiguration(Configuration configuration) {
configuration.setStoreConfigurations(storeConfigurations);
}
}
// end::customconf[]

0 comments on commit d80de95

Please sign in to comment.