Skip to content

Commit

Permalink
feat(exporter): export all record and value types by default
Browse files Browse the repository at this point in the history
* export all record/value types if no type is explicitly listed
* all configuration entries can be overridden by environment variables with the same name in upper case
  • Loading branch information
saig0 committed Mar 2, 2020
1 parent 295de98 commit 23784bd
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 71 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ final ZeebeHazelcast zeebeHazelcast = ZeebeHazelcast.newBuilder(hz)

// ...

long sequence = zeebeHazelcast.getSequence();

zeebeHazelcast.close();
```

Expand Down Expand Up @@ -116,11 +114,11 @@ className = "io.zeebe.hazelcast.exporter.HazelcastExporter"
# Hazelcast port
port = 5701
# comma separated list of io.zeebe.protocol.record.ValueType
enabledValueTypes = "JOB,WORKFLOW_INSTANCE,DEPLOYMENT,INCIDENT"
# comma separated list of io.zeebe.protocol.record.ValueType to export or empty to export all types
enabledValueTypes = ""
# comma separated list of io.zeebe.protocol.record.RecordType
enabledRecordTypes = "EVENT"
# comma separated list of io.zeebe.protocol.record.RecordType to export or empty to export all types
enabledRecordTypes = ""
# Hazelcast ringbuffer's name
name = "zeebe"
Expand All @@ -135,6 +133,8 @@ className = "io.zeebe.hazelcast.exporter.HazelcastExporter"
format = "protobuf"
```

The values can be overridden by environment variables with the same name and a `ZEEBE_HAZELCAST_` prefix (e.g. `ZEEBE_HAZELCAST_PORT`).

## Build it from Source

The exporter and the Java connector can be built with Maven
Expand Down
28 changes: 14 additions & 14 deletions connector-java/src/test/java/io/zeebe/hazelcast/ExporterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ public void shouldExportDeploymentEvents() {

client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();

TestUtil.waitUntil(() -> events.size() >= 2);
TestUtil.waitUntil(() -> events.size() >= 4);

assertThat(events)
.hasSize(2)
.hasSize(4)
.extracting(r -> r.getMetadata().getIntent())
.containsExactly("CREATED", "DISTRIBUTED");
.containsExactly("CREATE", "CREATED", "DISTRIBUTE", "DISTRIBUTED");
}

@Test
Expand All @@ -112,12 +112,12 @@ public void shouldExportJobEvents() {
.send()
.join();

TestUtil.waitUntil(() -> events.size() >= 1);
TestUtil.waitUntil(() -> events.size() >= 2);

assertThat(events)
.hasSize(1)
.extracting(r -> r.getMetadata().getIntent())
.containsExactly("CREATED");
.hasSize(2)
.extracting(r -> r.getMetadata().getIntent())
.containsExactly("CREATE", "CREATED");
}

@Test
Expand All @@ -130,12 +130,12 @@ public void shouldExportIncidentEvents() {

client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join();

TestUtil.waitUntil(() -> events.size() >= 1);
TestUtil.waitUntil(() -> events.size() >= 2);

assertThat(events)
.hasSize(1)
.hasSize(2)
.extracting(r -> r.getMetadata().getIntent())
.containsExactly("CREATED");
.containsExactly("CREATE", "CREATED");
}

@Test
Expand Down Expand Up @@ -181,7 +181,7 @@ public void shouldReadFromHead() throws Exception {
.build();

client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();
TestUtil.waitUntil(() -> deploymentRecords.size() >= 2);
TestUtil.waitUntil(() -> deploymentRecords.size() >= 4);

zeebeHazelcast.close();
deploymentRecords.clear();
Expand All @@ -198,7 +198,7 @@ public void shouldReadFromHead() throws Exception {
TestUtil.waitUntil(() -> wfRecords.size() >= 4);

// then
assertThat(deploymentRecords).hasSize(2);
assertThat(deploymentRecords).hasSize(4);
}

@Test
Expand All @@ -211,7 +211,7 @@ public void shouldReadFromTail() throws Exception {
ZeebeHazelcast.newBuilder(hz).addDeploymentListener(deploymentRecords::add).build();

client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();
TestUtil.waitUntil(() -> deploymentRecords.size() >= 2);
TestUtil.waitUntil(() -> deploymentRecords.size() >= 4);

zeebeHazelcast.close();
deploymentRecords.clear();
Expand Down Expand Up @@ -241,7 +241,7 @@ public void shouldReadFromSequence() throws Exception {
ZeebeHazelcast.newBuilder(hz).addDeploymentListener(deploymentRecords::add).build();

client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();
TestUtil.waitUntil(() -> deploymentRecords.size() >= 2);
TestUtil.waitUntil(() -> deploymentRecords.size() >= 4);

final var sequence = zeebeHazelcast.getSequence();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,54 @@
package io.zeebe.hazelcast.exporter;

import java.util.Optional;

public class ExporterConfiguration {

public int port = 5701;
private static final String ENV_PREFIX = "ZEEBE_HAZELCAST_";

private int port = 5701;

private String name = "zeebe";

private int capacity = -1;
private int timeToLiveInSeconds = -1;

private String format = "protobuf";

private String enabledValueTypes = "";
private String enabledRecordTypes = "";

public int getPort() {
return getEnv("PORT").map(Integer::parseInt).orElse(port);
}

public String getName() {
return getEnv("NAME").orElse(name);
}

public String name = "zeebe";
public int getCapacity() {
return getEnv("CAPACITY").map(Integer::parseInt).orElse(capacity);
}

public int capacity = -1;
public int timeToLiveInSeconds = -1;
public int getTimeToLiveInSeconds() {
return getEnv("TIME_TO_LIVE_IN_SECONDS").map(Integer::parseInt).orElse(timeToLiveInSeconds);
}

public String format = "protobuf";
public String getFormat() {
return getEnv("FORMAT").orElse(format);
}

public String enabledValueTypes = "JOB,WORKFLOW_INSTANCE,DEPLOYMENT,INCIDENT";
public String enabledRecordTypes = "EVENT";
public String getEnabledValueTypes() {
return getEnv("ENABLED_VALUE_TYPES").orElse(enabledValueTypes);
}

public String getEnabledRecordTypes() {
return getEnv("ENABLED_RECORD_TYPES").orElse(enabledRecordTypes);
}

private Optional<String> getEnv(String name) {
return Optional.ofNullable(System.getenv(ENV_PREFIX + name));
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,9 @@
import io.zeebe.exporter.proto.RecordTransformer;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import org.slf4j.Logger;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HazelcastExporter implements Exporter {

Expand All @@ -39,59 +33,39 @@ public void configure(Context context) {

logger.debug("Starting exporter with configuration: {}", config);

final List<RecordType> enabledRecordTypes =
parseList(config.enabledRecordTypes).map(RecordType::valueOf).collect(Collectors.toList());

final List<ValueType> enabledValueTypes =
parseList(config.enabledValueTypes).map(ValueType::valueOf).collect(Collectors.toList());

context.setFilter(
new Context.RecordFilter() {

@Override
public boolean acceptType(RecordType recordType) {
return enabledRecordTypes.contains(recordType);
}

@Override
public boolean acceptValue(ValueType valueType) {
return enabledValueTypes.contains(valueType);
}
});
final var filter = new HazelcastRecordFilter(config);
context.setFilter(filter);

configureFormat();
}

private void configureFormat() {
if (config.format.equalsIgnoreCase("protobuf")) {
final var format = config.getFormat();
if (format.equalsIgnoreCase("protobuf")) {
recordTransformer = this::recordToProtobuf;

} else if (config.format.equalsIgnoreCase("json")) {
} else if (format.equalsIgnoreCase("json")) {
recordTransformer = this::recordToJson;

} else {
throw new IllegalArgumentException(
String.format(
"Expected the parameter 'format' to be one fo 'protobuf' or 'json' but was '%s'",
config.format));
format));
}
}

private Stream<String> parseList(String list) {
return Arrays.stream(list.split(",")).map(String::trim).map(String::toUpperCase);
}

@Override
public void open(Controller controller) {
this.controller = controller;

final Config cfg = buildHazelcastConfig();
hazelcast = Hazelcast.newHazelcastInstance(cfg);

ringbuffer = hazelcast.getRingbuffer(config.name);
ringbuffer = hazelcast.getRingbuffer(config.getName());
if (ringbuffer == null) {
throw new IllegalStateException(
String.format("Failed to open ringbuffer with name '%s'", config.name));
String.format("Failed to open ringbuffer with name '%s'", config.getName()));
}

logger.info(
Expand All @@ -106,16 +80,16 @@ public void open(Controller controller) {
private Config buildHazelcastConfig() {

final Config cfg = new Config();
cfg.getNetworkConfig().setPort(config.port);
cfg.getNetworkConfig().setPort(config.getPort());
cfg.setProperty("hazelcast.logging.type", "slf4j");

final var ringbufferConfig = new RingbufferConfig(config.name);
final var ringbufferConfig = new RingbufferConfig(config.getName());

if (config.capacity > 0) {
ringbufferConfig.setCapacity(config.capacity);
if (config.getCapacity() > 0) {
ringbufferConfig.setCapacity(config.getCapacity());
}
if (config.timeToLiveInSeconds > 0) {
ringbufferConfig.setTimeToLiveSeconds(config.timeToLiveInSeconds);
if (config.getTimeToLiveInSeconds() > 0) {
ringbufferConfig.setTimeToLiveSeconds(config.getTimeToLiveInSeconds());
}

cfg.addRingBufferConfig(ringbufferConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.zeebe.hazelcast.exporter;

import io.zeebe.exporter.api.context.Context;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public final class HazelcastRecordFilter implements Context.RecordFilter {

private final List<RecordType> enabledRecordTypes;
private final List<ValueType> enabledValueTypes;

public HazelcastRecordFilter(ExporterConfiguration config) {

final var enabledRecordTypeList = parseAsList(config.getEnabledRecordTypes());

enabledRecordTypes =
Arrays.stream(RecordType.values())
.filter(
recordType ->
enabledRecordTypeList.isEmpty()
|| enabledRecordTypeList.contains(recordType.name()))
.collect(Collectors.toList());

final var enabledValueTypeList = parseAsList(config.getEnabledValueTypes());

enabledValueTypes =
Arrays.stream(ValueType.values())
.filter(
valueType ->
enabledValueTypeList.isEmpty()
|| enabledValueTypeList.contains(valueType.name()))
.collect(Collectors.toList());
}

private List<String> parseAsList(String list) {
return Arrays.stream(list.split(","))
.map(String::trim)
.filter(item -> !item.isEmpty())
.map(String::toUpperCase)
.collect(Collectors.toList());
}

@Override
public boolean acceptType(RecordType recordType) {
return enabledRecordTypes.contains(recordType);
}

@Override
public boolean acceptValue(ValueType valueType) {
return enabledValueTypes.contains(valueType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void cleanUp() {
@Test
public void shouldExportEventsAsProtobuf() throws Exception {
// given
final Ringbuffer<byte[]> buffer = hz.getRingbuffer(CONFIGURATION.name);
final Ringbuffer<byte[]> buffer = hz.getRingbuffer(CONFIGURATION.getName());

var sequence = buffer.headSequence();

Expand All @@ -72,7 +72,7 @@ public void shouldExportEventsAsProtobuf() throws Exception {
.startsWith("{")
.endsWith("}")
.contains("\"valueType\":\"DEPLOYMENT\"")
.contains("\"recordType\":\"EVENT\"")
.contains("\"intent\":\"CREATED\"");
.contains("\"recordType\":\"COMMAND\"")
.contains("\"intent\":\"CREATE\"");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void cleanUp() {
@Test
public void shouldExportEventsAsProtobuf() throws Exception {
// given
final Ringbuffer<byte[]> buffer = hz.getRingbuffer(CONFIGURATION.name);
final Ringbuffer<byte[]> buffer = hz.getRingbuffer(CONFIGURATION.getName());

var sequence = buffer.headSequence();

Expand Down

0 comments on commit 23784bd

Please sign in to comment.