Skip to content

Commit

Permalink
fix(doc): fix broken code includes
Browse files Browse the repository at this point in the history
Doc aggregation on the Quarkiverse is not compatible with the inclusion
of code samples.
Replace include statements by the original content, and remove the
associated tags in source code
  • Loading branch information
flazarus1A committed Jan 25, 2024
1 parent 493a471 commit 39e2ed6
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 10 deletions.
73 changes: 69 additions & 4 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,18 @@ The POJO is read from kafka, or written to kafka, with the POJO's JSON textual r
.PojoProcessor.java
[source,java]
----
include::../../../../integration-tests/json-pojo/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessor.java[]
@Slf4j
@Processor // <1>
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { // <2>
@Override
public void process(Record<String, SamplePojo> record) { // <3>
String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
!record.value().getBooleanField());
context().forward(record.withValue(pojo)); // <4>
}
}
----

<1> Your Processor is declared with the annotation as for a regular processor.
Expand Down Expand Up @@ -281,7 +292,18 @@ The POJO is read from kafka, or written to kafka, with a custom serialization fo
.PojoProcessor.java
[source,java]
----
include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessor.java[tag=pojoprocessor]
@Slf4j
@Processor // <1>
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { // <2>
@Override
public void process(Record<String, SamplePojo> record) { // <3>
String reversedMsg = new StringBuilder(record.value().getStringField()).reverse().toString();
log.info("Received value {} sending back {} in response", record.value().getStringField(), reversedMsg);
SamplePojo pojo = new SamplePojo(reversedMsg, record.value().getNumericalField() + 37,
!record.value().getBooleanField());
context().forward(record.withValue(pojo)); // <4>
}
}
----
<1> Your Processor is declared with the annotation as for a regular processor.
<2> The handled value type, in this example, is a simple POJO, nothing fancy.
Expand Down Expand Up @@ -588,7 +610,37 @@ In this example, we implement a processor which is using a State-Store.
.StateStoreProcessor.java
[source,java]
----
include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingProcessor.java[tag=statefulproc]
@Slf4j
@Processor // <1>
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> {
private KeyValueStore<String, String> pingData;
@Override
public void init(ProcessorContext<String, Ping> context) {
super.init(context);
pingData = context.getStateStore("ping-data"); // <2>
context.schedule(Duration.ofMillis(1L), PunctuationType.STREAM_TIME, new DuplicateValuePunctuator(pingData));
}
/**
* {@inheritDoc}
*/
@Override
public void process(Record<String, Ping> record) {
log.info("Process the message: {}", record.value().getMessage());
String previousValue = pingData.get(record.key());
pingData.put(record.key(), record.value().getMessage());
if (previousValue == null) {
context().forward(
record.withValue(Ping.newBuilder().setMessage("Store initialization OK for " + record.key()).build()));
} else {
context().forward(record.withValue(
Ping.newBuilder().setMessage("Previous value for " + record.key() + " is " + previousValue).build()));
}
}
}
----
<1> Your Processor is declared with the annotation as for a regular processor.
<2> The definition and initialization of your state store.
Expand All @@ -598,7 +650,20 @@ include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingPr
.SampleConfigurationCustomizer.java
[source,java]
----
include::{sourcedir}/io/quarkiverse/kafkastreamsprocessor/sample/stateful/SampleConfigurationCustomizer.java[tag=customconf]
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
@Override
public void fillConfiguration(Configuration configuration) {
List<StoreConfiguration> storeConfigurations = new ArrayList<>();
// Add a key value store for indexes
StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("ping-data"),
Serdes.String(),
Serdes.String());
storeConfigurations.add(new StoreConfiguration(storeBuilder));
configuration.setStoreConfigurations(storeConfigurations);
}
}
----

The `io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer` implementation provided sets the required State-Store configurations into the method's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.quarkiverse.kafkastreamsprocessor.api.Processor;
import lombok.extern.slf4j.Slf4j;

// tag::pojoprocessor[]
@Slf4j
@Processor // <1>
public class PojoProcessor extends ContextualProcessor<String, SamplePojo, String, SamplePojo> { // <2>
Expand All @@ -38,4 +37,3 @@ public void process(Record<String, SamplePojo> record) { // <3>
context().forward(record.withValue(pojo)); // <4>
}
}
// end::pojoprocessor[]
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
* &lt;previous-value&gt;</b></li>
* </ul>
*/
// tag::statefulproc[]
@Slf4j
@Processor // <1>
public class PingProcessor extends ContextualProcessor<String, Ping, String, Ping> {
Expand Down Expand Up @@ -72,4 +71,3 @@ public void process(Record<String, Ping> record) {
}
}
}
// end::statefulproc[]
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer;
import io.quarkiverse.kafkastreamsprocessor.api.configuration.store.StoreConfiguration;

// tag::customconf[]
@Dependent
public class SampleConfigurationCustomizer implements ConfigurationCustomizer {
@Override
Expand All @@ -48,4 +47,3 @@ public void fillConfiguration(Configuration configuration) {
configuration.setStoreConfigurations(storeConfigurations);
}
}
// end::customconf[]

0 comments on commit 39e2ed6

Please sign in to comment.