From 310202b5f9304eeb8a5ec7ded1ff7c64bff4e378 Mon Sep 17 00:00:00 2001 From: ryans Date: Mon, 8 Jul 2024 12:53:08 -0400 Subject: [PATCH] Use Google Java format --- .gitattributes | 4 +- build.gradle | 7 + .../org/jlab/jaws/Registrations2Epics.java | 336 +++++++++--------- .../java/org/jlab/jaws/util/CreateTopic.java | 64 ++-- .../java/org/jlab/jaws/util/ListTopic.java | 49 ++- .../jlab/jaws/Registrations2EpicsTest.java | 107 +++--- 6 files changed, 300 insertions(+), 267 deletions(-) diff --git a/.gitattributes b/.gitattributes index 4e7efec..a4193d6 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,3 +1,3 @@ -* text=auto -*.sh text eol=lf +* text eol=lf +*.png binary *.bat text eol=crlf \ No newline at end of file diff --git a/build.gradle b/build.gradle index f5d6037..a66cbee 100644 --- a/build.gradle +++ b/build.gradle @@ -3,6 +3,7 @@ plugins { id 'application' id 'distribution' id 'com.github.jk1.dependency-license-report' version '2.0' + id "com.diffplug.spotless" version "6.25.0" } group 'org.jlab' @@ -82,4 +83,10 @@ startScripts { windowsScriptFile.text = windowsScriptFile.text.replace('%APP_HOME%\\lib\\config', '%APP_HOME%\\config') unixScriptFile.text = unixScriptFile.text.replace('$APP_HOME/lib/config', '$APP_HOME/config') } +} + +spotless { + java { + googleJavaFormat() + } } \ No newline at end of file diff --git a/src/main/java/org/jlab/jaws/Registrations2Epics.java b/src/main/java/org/jlab/jaws/Registrations2Epics.java index 2e915cf..cd24a8f 100644 --- a/src/main/java/org/jlab/jaws/Registrations2Epics.java +++ b/src/main/java/org/jlab/jaws/Registrations2Epics.java @@ -1,222 +1,238 @@ package org.jlab.jaws; +import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.CountDownLatch; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import org.jlab.jaws.entity.EPICSSource; import org.jlab.jaws.entity.AlarmInstance; +import org.jlab.jaws.entity.EPICSSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.CountDownLatch; - -import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; - /** * A Kafka Streams application to populate the epics2kafka epics-channels topic from the JAWS * effective-registrations topic for the subset of messages of type EPICSProducer.
*/ public final class Registrations2Epics { - private static final Logger LOGGER = LoggerFactory.getLogger(Registrations2Epics.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Registrations2Epics.class); - // TODO: these need to be configurable - public static final String INPUT_TOPIC = "alarm-instances"; - public static final String OUTPUT_TOPIC = "epics-channels"; + // TODO: these need to be configurable + public static final String INPUT_TOPIC = "alarm-instances"; + public static final String OUTPUT_TOPIC = "epics-channels"; - public static final Serde INPUT_KEY_SERDE = Serdes.String(); - public static final SpecificAvroSerde INPUT_VALUE_SERDE = new SpecificAvroSerde<>(); - public static final Serde OUTPUT_KEY_SERDE = INPUT_KEY_SERDE; - public static final Serde OUTPUT_VALUE_SERDE = INPUT_KEY_SERDE; + public static final Serde INPUT_KEY_SERDE = Serdes.String(); + public static final SpecificAvroSerde INPUT_VALUE_SERDE = + new SpecificAvroSerde<>(); + public static final Serde OUTPUT_KEY_SERDE = INPUT_KEY_SERDE; + public static final Serde OUTPUT_VALUE_SERDE = INPUT_KEY_SERDE; - static Properties getStreamsConfig() { + static Properties getStreamsConfig() { - LOGGER.trace("getStreamConfig()"); + LOGGER.trace("getStreamConfig()"); - String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS"); + String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS"); - bootstrapServers = (bootstrapServers == null) ? "localhost:9092" : bootstrapServers; + bootstrapServers = (bootstrapServers == null) ? "localhost:9092" : bootstrapServers; - String registry = System.getenv("SCHEMA_REGISTRY"); + String registry = System.getenv("SCHEMA_REGISTRY"); - registry = (registry == null) ? "http://localhost:8081" : registry; + registry = (registry == null) ? "http://localhost:8081" : registry; - final Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "registrations2epics"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); // Disable caching - props.put(SCHEMA_REGISTRY_URL_CONFIG, registry); + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "registrations2epics"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); // Disable caching + props.put(SCHEMA_REGISTRY_URL_CONFIG, registry); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return props; - } + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return props; + } - /** - * Create the Kafka Streams Domain Specific Language (DSL) Topology. - * - * @param props The streams configuration - * @return The Topology - */ - static Topology createTopology(Properties props) { + /** + * Create the Kafka Streams Domain Specific Language (DSL) Topology.
* + * @param props The streams configuration + * @return The Topology + */ + static Topology createTopology(Properties props) { - LOGGER.trace("createTopology()"); + LOGGER.trace("createTopology()"); - final StreamsBuilder builder = new StreamsBuilder(); - Map config = new HashMap<>(); - config.put(SCHEMA_REGISTRY_URL_CONFIG, props.getProperty(SCHEMA_REGISTRY_URL_CONFIG)); - INPUT_VALUE_SERDE.configure(config, false); + final StreamsBuilder builder = new StreamsBuilder(); + Map config = new HashMap<>(); + config.put(SCHEMA_REGISTRY_URL_CONFIG, props.getProperty(SCHEMA_REGISTRY_URL_CONFIG)); + INPUT_VALUE_SERDE.configure(config, false); - final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( + final StoreBuilder> storeBuilder = + Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("Registrations2EpicsStore"), INPUT_KEY_SERDE, - INPUT_VALUE_SERDE - ).withCachingEnabled(); - - builder.addStateStore(storeBuilder); - - final KStream input = builder.stream(INPUT_TOPIC, Consumed.with(INPUT_KEY_SERDE, INPUT_VALUE_SERDE)); - - final KStream output = input.process(new MyProcessorSupplier(storeBuilder.name()), storeBuilder.name()); - - output.to(OUTPUT_TOPIC, Produced.with(OUTPUT_KEY_SERDE, OUTPUT_VALUE_SERDE)); - - return builder.build(); + INPUT_VALUE_SERDE) + .withCachingEnabled(); + + builder.addStateStore(storeBuilder); + + final KStream input = + builder.stream(INPUT_TOPIC, Consumed.with(INPUT_KEY_SERDE, INPUT_VALUE_SERDE)); + + final KStream output = + input.process(new MyProcessorSupplier(storeBuilder.name()), storeBuilder.name()); + + output.to(OUTPUT_TOPIC, Produced.with(OUTPUT_KEY_SERDE, OUTPUT_VALUE_SERDE)); + + return builder.build(); + } + + private static String toJsonKey(String channel) { + return "{\"topic\":\"alarm-activations\",\"channel\":\"" + channel + "\"}"; + } + + private static String toJsonValue(String outkey, AlarmInstance registration) { + return registration == null ? null : "{\"mask\":\"a\",\"outkey\":\"" + outkey + "\"}"; + } + + /** + * Entrypoint of the application. + * + * @param args The command line arguments + */ + public static void main(String[] args) { + final Properties props = getStreamsConfig(); + final Topology top = createTopology(props); + final KafkaStreams streams = new KafkaStreams(top, props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime() + .addShutdownHook( + new Thread("streams-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (final Throwable e) { + System.exit(1); } + System.exit(0); + } - private static String toJsonKey(String channel) { - return "{\"topic\":\"alarm-activations\",\"channel\":\"" + channel + "\"}"; - } + /** + * Factory to create Kafka Streams Transformer instances; references a stateStore to maintain + * previous AlarmInstances. + */ + private static final class MyProcessorSupplier + implements ProcessorSupplier { - private static String toJsonValue(String outkey, AlarmInstance registration) { - return registration == null ? null : "{\"mask\":\"a\",\"outkey\":\"" + outkey + "\"}"; - } + private final String storeName; /** - * Entrypoint of the application. + * Create a new MsgTransformerFactory.
* - * @param args The command line arguments + * @param storeName The state store name */ - public static void main(String[] args) { - final Properties props = getStreamsConfig(); - final Topology top = createTopology(props); - final KafkaStreams streams = new KafkaStreams(top, props); - final CountDownLatch latch = new CountDownLatch(1); - - // attach shutdown handler to catch control-c - Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { - @Override - public void run() { - streams.close(); - latch.countDown(); - } - }); - - try { - streams.start(); - latch.await(); - } catch (final Throwable e) { - System.exit(1); - } - System.exit(0); + public MyProcessorSupplier(String storeName) { + this.storeName = storeName; } /** - * Factory to create Kafka Streams Transformer instances; references a stateStore to maintain previous - * AlarmInstances. + * Return a new {@link Transformer} instance. + * + * @return a new {@link Transformer} instance */ - private static final class MyProcessorSupplier implements ProcessorSupplier { + @Override + public Processor get() { + return new Processor() { + private ProcessorContext context; + private KeyValueStore store; - private final String storeName; + @Override + public void init(ProcessorContext context) { + this.context = context; + this.store = context.getStateStore(storeName); + } - /** - * Create a new MsgTransformerFactory. - * - * @param storeName The state store name - */ - public MyProcessorSupplier(String storeName) { - this.storeName = storeName; + @Override + public void process(Record input) { + Record output = + null; // null returned to mean no record - when not of type DirectCAAlarm OR when an + // unmatched tombstone is encountered + + long timestamp = System.currentTimeMillis(); + + String channel; + + if (input.value() + == null) { // Tombstone - we need most recent non-null registration to transform + AlarmInstance previous = store.get(input.key()); + if (previous != null) { // We only store EPICSProducer, so no need to check type + channel = ((EPICSSource) previous.getSource()).getPv(); + output = + new Record<>( + toJsonKey(channel), toJsonValue(input.key(), input.value()), timestamp); + populateHeaders(output); + } + } else if (input.value().getSource() instanceof EPICSSource) { + channel = ((EPICSSource) input.value().getSource()).getPv(); + output = + new Record<>( + toJsonKey(channel), toJsonValue(input.key(), input.value()), timestamp); + populateHeaders(output); + store.put( + input.key(), + input.value()); // Store most recent non-null registration for each CA alarm (key) + } + + LOGGER.trace("Transformed: {}={} -> {}", input.key(), input.value(), output); + + if (output != null) { + context.forward(output); + } } /** - * Return a new {@link Transformer} instance.
- * - * @return a new {@link Transformer} instance - */ @Override - public Processor get() { - return new Processor() { - private ProcessorContext context; - private KeyValueStore store; - - @Override - public void init(ProcessorContext context) { - this.context = context; - this.store = context.getStateStore(storeName); - } - - @Override - public void process(Record input) { - Record output = null; // null returned to mean no record - when not of type DirectCAAlarm OR when an unmatched tombstone is encountered - - long timestamp = System.currentTimeMillis(); - - String channel; - - if(input.value() == null) { // Tombstone - we need most recent non-null registration to transform - AlarmInstance previous = store.get(input.key()); - if(previous != null) { // We only store EPICSProducer, so no need to check type - channel = ((EPICSSource)previous.getSource()).getPv(); - output = new Record<>(toJsonKey(channel), toJsonValue(input.key(), input.value()), timestamp); - populateHeaders(output); - } - } else if(input.value().getSource() instanceof EPICSSource) { - channel = ((EPICSSource) input.value().getSource()).getPv(); - output = new Record<>(toJsonKey(channel), toJsonValue(input.key(), input.value()), timestamp); - populateHeaders(output); - store.put(input.key(), input.value()); // Store most recent non-null registration for each CA alarm (key) - } - - LOGGER.trace("Transformed: {}={} -> {}", input.key(), input.value(), output); - - if(output != null) { - context.forward(output); - } - } - - @Override - public void close() { - // Nothing to do - } - }; + @Override + public void close() { + // Nothing to do } + }; + } - private void populateHeaders(Record record) { - String host = "unknown"; + private void populateHeaders(Record record) { + String host = "unknown"; - try { - host = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - LOGGER.debug("Unable to obtain host name"); - } + try { + host = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOGGER.debug("Unable to obtain host name"); + } - record.headers().add("user", System.getProperty("user.name").getBytes(StandardCharsets.UTF_8)); - record.headers().add("producer", "registrations2epics".getBytes(StandardCharsets.UTF_8)); - record.headers().add("host", host.getBytes(StandardCharsets.UTF_8)); - } + record + .headers() + .add("user", System.getProperty("user.name").getBytes(StandardCharsets.UTF_8)); + record.headers().add("producer", "registrations2epics".getBytes(StandardCharsets.UTF_8)); + record.headers().add("host", host.getBytes(StandardCharsets.UTF_8)); + } + } } diff --git a/src/main/java/org/jlab/jaws/util/CreateTopic.java b/src/main/java/org/jlab/jaws/util/CreateTopic.java index a0b834b..f7647e1 100644 --- a/src/main/java/org/jlab/jaws/util/CreateTopic.java +++ b/src/main/java/org/jlab/jaws/util/CreateTopic.java @@ -1,50 +1,54 @@ package org.jlab.jaws.util; -import org.apache.kafka.clients.admin.*; -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.config.TopicConfig; - import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; public class CreateTopic { - public static void main(String[] args) throws
ExecutionException, InterruptedException { + public static void main(String[] args) throws ExecutionException, InterruptedException { - String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS"); + String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS"); - if(bootstrapServers == null) { - bootstrapServers = "localhost:9094"; - } + if (bootstrapServers == null) { + bootstrapServers = "localhost:9094"; + } - System.out.println("Using BOOTSTRAP_SERVERS = " + bootstrapServers); + System.out.println("Using BOOTSTRAP_SERVERS = " + bootstrapServers); - Map conf = new HashMap<>(); - conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); - AdminClient client = AdminClient.create(conf); + Map conf = new HashMap<>(); + conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); + AdminClient client = AdminClient.create(conf); - int partitions = 1; - short replicationFactor = 1; + int partitions = 1; + short replicationFactor = 1; - KafkaFuture future = client - .createTopics(Collections.singleton(new NewTopic("epics-channels", partitions, replicationFactor)), - new CreateTopicsOptions().timeoutMs(10000)) - .all(); - future.get(); + KafkaFuture future = + client + .createTopics( + Collections.singleton( + new NewTopic("epics-channels", partitions, replicationFactor)), + new CreateTopicsOptions().timeoutMs(10000)) + .all(); + future.get(); - ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "epics-channels"); + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "epics-channels"); - ConfigEntry entry = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); - Map> updateConfig = new HashMap<>(); - updateConfig.put(resource, Collections.singleton(new AlterConfigOp(entry, AlterConfigOp.OpType.SET))); + ConfigEntry entry = + new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); + Map> updateConfig = new HashMap<>(); + updateConfig.put( + resource, Collections.singleton(new AlterConfigOp(entry, AlterConfigOp.OpType.SET))); - AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(updateConfig); - alterConfigsResult.all(); + AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(updateConfig); + alterConfigsResult.all(); - System.out.println("Topic 'epics-channels' created"); - } + System.out.println("Topic 'epics-channels' created"); + } } diff --git a/src/main/java/org/jlab/jaws/util/ListTopic.java b/src/main/java/org/jlab/jaws/util/ListTopic.java index 0e0b056..c1030fc 100644 --- a/src/main/java/org/jlab/jaws/util/ListTopic.java +++ b/src/main/java/org/jlab/jaws/util/ListTopic.java @@ -1,40 +1,39 @@ package org.jlab.jaws.util; -import org.apache.kafka.clients.admin.*; -import org.apache.kafka.common.KafkaFuture; - import java.util.*; import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.common.KafkaFuture; public class ListTopic { - public static void main(String[] args) throws ExecutionException, InterruptedException { - - String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS"); + public static void main(String[] args) throws ExecutionException, InterruptedException { - if(bootstrapServers == null) { - bootstrapServers = "localhost:9094"; - } + String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS"); - System.err.println("Using
BOOTSTRAP_SERVERS = " + bootstrapServers); + if (bootstrapServers == null) { + bootstrapServers = "localhost:9094"; + } - Map conf = new HashMap<>(); - conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); - AdminClient client = AdminClient.create(conf); + System.err.println("Using BOOTSTRAP_SERVERS = " + bootstrapServers); - ListTopicsResult ltr = client.listTopics(); - KafkaFuture> names = ltr.names(); - Set nameSet = names.get(); + Map conf = new HashMap<>(); + conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); + AdminClient client = AdminClient.create(conf); - System.out.println(nameSet); + ListTopicsResult ltr = client.listTopics(); + KafkaFuture> names = ltr.names(); + Set nameSet = names.get(); - for(String name: nameSet) { - if("alarm-instances".equals(name)) { - System.out.println("Topic 'alarm-instances' exists!"); - System.exit(0); - } - } + System.out.println(nameSet); - System.exit(1); + for (String name : nameSet) { + if ("alarm-instances".equals(name)) { + System.out.println("Topic 'alarm-instances' exists!"); + System.exit(0); + } } + + System.exit(1); + } } diff --git a/src/test/java/org/jlab/jaws/Registrations2EpicsTest.java b/src/test/java/org/jlab/jaws/Registrations2EpicsTest.java index afb8f07..c3640a4 100644 --- a/src/test/java/org/jlab/jaws/Registrations2EpicsTest.java +++ b/src/test/java/org/jlab/jaws/Registrations2EpicsTest.java @@ -1,5 +1,9 @@ package org.jlab.jaws; +import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + +import java.util.Arrays; +import java.util.Properties; import org.apache.kafka.streams.*; import org.jlab.jaws.entity.*; import org.junit.After; @@ -7,64 +11,67 @@ import org.junit.Before; import org.junit.Test; -import java.util.Arrays; -import java.util.Properties; - -import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; - public class Registrations2EpicsTest { - private TopologyTestDriver testDriver; - private TestInputTopic inputTopic; - private TestOutputTopic outputTopic; - private AlarmInstance instance = new AlarmInstance(); + private TopologyTestDriver testDriver; + private TestInputTopic inputTopic; + private TestOutputTopic outputTopic; + private AlarmInstance instance = new AlarmInstance(); - @Before - public void setup() { - final Properties streamsConfig = Registrations2Epics.getStreamsConfig(); - streamsConfig.put(SCHEMA_REGISTRY_URL_CONFIG, "mock://testing"); - final Topology top = Registrations2Epics.createTopology(streamsConfig); - testDriver = new TopologyTestDriver(top, streamsConfig); + @Before + public void setup() { + final Properties streamsConfig = Registrations2Epics.getStreamsConfig(); + streamsConfig.put(SCHEMA_REGISTRY_URL_CONFIG, "mock://testing"); + final Topology top = Registrations2Epics.createTopology(streamsConfig); + testDriver = new TopologyTestDriver(top, streamsConfig); - // setup test topics - inputTopic = testDriver.createInputTopic(Registrations2Epics.INPUT_TOPIC, Registrations2Epics.INPUT_KEY_SERDE.serializer(), Registrations2Epics.INPUT_VALUE_SERDE.serializer()); - outputTopic = testDriver.createOutputTopic(Registrations2Epics.OUTPUT_TOPIC, Registrations2Epics.OUTPUT_KEY_SERDE.deserializer(), Registrations2Epics.OUTPUT_VALUE_SERDE.deserializer()); + // setup test topics + inputTopic = + testDriver.createInputTopic(
Registrations2Epics.INPUT_TOPIC, + Registrations2Epics.INPUT_KEY_SERDE.serializer(), + Registrations2Epics.INPUT_VALUE_SERDE.serializer()); + outputTopic = + testDriver.createOutputTopic( + Registrations2Epics.OUTPUT_TOPIC, + Registrations2Epics.OUTPUT_KEY_SERDE.deserializer(), + Registrations2Epics.OUTPUT_VALUE_SERDE.deserializer()); - EPICSSource source = new EPICSSource(); - source.setPv("channel1"); + EPICSSource source = new EPICSSource(); + source.setPv("channel1"); - instance.setSource(source); - instance.setAlarmclass("base"); - instance.setLocation(Arrays.asList("INJ")); - instance.setScreencommand("/"); - } + instance.setSource(source); + instance.setAlarmclass("base"); + instance.setLocation(Arrays.asList("INJ")); + instance.setScreencommand("/"); + } - @After - public void tearDown() { - if(testDriver != null) { - testDriver.close(); - } + @After + public void tearDown() { + if (testDriver != null) { + testDriver.close(); } + } - @Test - public void matchedTombstoneMsg() { - inputTopic.pipeInput("alarm1", instance); - inputTopic.pipeInput("alarm1", null); - KeyValue result = outputTopic.readKeyValuesToList().get(1); - Assert.assertNull(result.value); - } + @Test + public void matchedTombstoneMsg() { + inputTopic.pipeInput("alarm1", instance); + inputTopic.pipeInput("alarm1", null); + KeyValue result = outputTopic.readKeyValuesToList().get(1); + Assert.assertNull(result.value); + } + @Test + public void unmatchedTombstoneMsg() { + inputTopic.pipeInput("alarm1", null); + Assert.assertTrue( + outputTopic.isEmpty()); // Cannot transform a tombstone without a prior registration! + } - @Test - public void unmatchedTombstoneMsg() { - inputTopic.pipeInput("alarm1", null); - Assert.assertTrue(outputTopic.isEmpty()); // Cannot transform a tombstone without a prior registration! - } - - @Test - public void regularMsg() { - inputTopic.pipeInput("alarm1", instance); - KeyValue result = outputTopic.readKeyValue(); - Assert.assertEquals("{\"topic\":\"alarm-activations\",\"channel\":\"channel1\"}", result.key); - Assert.assertEquals("{\"mask\":\"a\",\"outkey\":\"alarm1\"}", result.value); - } + @Test + public void regularMsg() { + inputTopic.pipeInput("alarm1", instance); + KeyValue result = outputTopic.readKeyValue(); + Assert.assertEquals("{\"topic\":\"alarm-activations\",\"channel\":\"channel1\"}", result.key); + Assert.assertEquals("{\"mask\":\"a\",\"outkey\":\"alarm1\"}", result.value); + } }
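With the Spotless plugin wired into build.gradle as above, the Google Java Format rules can be checked and applied from the command line. A minimal usage sketch, assuming only the standard task names that the com.diffplug.spotless plugin registers:

./gradlew spotlessCheck   # verify formatting; fails if any file deviates from google-java-format
./gradlew spotlessApply   # rewrite non-conforming files in place

Spotless also adds spotlessCheck as a dependency of the standard check task, so a plain ./gradlew build will fail on unformatted code, which keeps a formatting commit like this one from regressing.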