Skip to content

Commit

Permalink
Bump libj to v5.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
slominskir committed Aug 27, 2024
1 parent 6c565d5 commit ddac183
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies {
"io.confluent:kafka-streams-avro-serde:7.4.0",
"org.apache.avro:avro:1.11.2",
"org.slf4j:slf4j-log4j12:1.7.36",
"org.jlab:jaws-libj:4.8.0"
"org.jlab:jaws-libj:5.0.0"
testImplementation "org.apache.kafka:kafka-streams-test-utils:3.5.0",
"junit:junit:4.13.2"
}
Expand Down
2 changes: 1 addition & 1 deletion deps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ services:
- SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka:9092

cli:
image: jeffersonlab/jaws-libp:4.9.2
image: jeffersonlab/jaws-libp:5.0.0
hostname: cli
container_name: cli
depends_on:
Expand Down
23 changes: 11 additions & 12 deletions src/main/java/org/jlab/jaws/Registrations2Epics.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.jlab.jaws.entity.AlarmInstance;
import org.jlab.jaws.entity.Alarm;
import org.jlab.jaws.entity.EPICSSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,8 +38,7 @@ public final class Registrations2Epics {
public static final String OUTPUT_TOPIC = "epics-channels";

public static final Serde<String> INPUT_KEY_SERDE = Serdes.String();
public static final SpecificAvroSerde<AlarmInstance> INPUT_VALUE_SERDE =
new SpecificAvroSerde<>();
public static final SpecificAvroSerde<Alarm> INPUT_VALUE_SERDE = new SpecificAvroSerde<>();
public static final Serde<String> OUTPUT_KEY_SERDE = INPUT_KEY_SERDE;
public static final Serde<String> OUTPUT_VALUE_SERDE = INPUT_KEY_SERDE;

Expand Down Expand Up @@ -80,7 +79,7 @@ static Topology createTopology(Properties props) {
config.put(SCHEMA_REGISTRY_URL_CONFIG, props.getProperty(SCHEMA_REGISTRY_URL_CONFIG));
INPUT_VALUE_SERDE.configure(config, false);

final StoreBuilder<KeyValueStore<String, AlarmInstance>> storeBuilder =
final StoreBuilder<KeyValueStore<String, Alarm>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("Registrations2EpicsStore"),
INPUT_KEY_SERDE,
Expand All @@ -89,7 +88,7 @@ static Topology createTopology(Properties props) {

builder.addStateStore(storeBuilder);

final KStream<String, AlarmInstance> input =
final KStream<String, Alarm> input =
builder.stream(INPUT_TOPIC, Consumed.with(INPUT_KEY_SERDE, INPUT_VALUE_SERDE));

final KStream<String, String> output =
Expand All @@ -104,7 +103,7 @@ private static String toJsonKey(String channel) {
return "{\"topic\":\"alarm-activations\",\"channel\":\"" + channel + "\"}";
}

private static String toJsonValue(String outkey, AlarmInstance registration) {
private static String toJsonValue(String outkey, Alarm registration) {
return registration == null ? null : "{\"mask\":\"a\",\"outkey\":\"" + outkey + "\"}";
}

Expand Down Expand Up @@ -144,7 +143,7 @@ public void run() {
* previous AlarmInstances.
*/
private static final class MyProcessorSupplier
implements ProcessorSupplier<String, AlarmInstance, String, String> {
implements ProcessorSupplier<String, Alarm, String, String> {

private final String storeName;

Expand All @@ -163,10 +162,10 @@ public MyProcessorSupplier(String storeName) {
* @return a new {@link Transformer} instance
*/
@Override
public Processor<String, AlarmInstance, String, String> get() {
return new Processor<String, AlarmInstance, String, String>() {
public Processor<String, Alarm, String, String> get() {
return new Processor<String, Alarm, String, String>() {
private ProcessorContext<String, String> context;
private KeyValueStore<String, AlarmInstance> store;
private KeyValueStore<String, Alarm> store;

@Override
public void init(ProcessorContext<String, String> context) {
Expand All @@ -175,7 +174,7 @@ public void init(ProcessorContext<String, String> context) {
}

@Override
public void process(Record<String, AlarmInstance> input) {
public void process(Record<String, Alarm> input) {
Record<String, String> output =
null; // null returned to mean no record - when not of type DirectCAAlarm OR when an
// unmatched tombstone is encountered
Expand All @@ -186,7 +185,7 @@ public void process(Record<String, AlarmInstance> input) {

if (input.value()
== null) { // Tombstone - we need most recent non-null registration to transform
AlarmInstance previous = store.get(input.key());
Alarm previous = store.get(input.key());
if (previous != null) { // We only store EPICSProducer, so no need to check type
channel = ((EPICSSource) previous.getSource()).getPv();
output =
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/org/jlab/jaws/Registrations2EpicsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

public class Registrations2EpicsTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, AlarmInstance> inputTopic;
private TestInputTopic<String, Alarm> inputTopic;
private TestOutputTopic<String, String> outputTopic;
private AlarmInstance instance = new AlarmInstance();
private Alarm instance = new Alarm();

@Before
public void setup() {
Expand All @@ -40,7 +40,7 @@ public void setup() {
source.setPv("channel1");

instance.setSource(source);
instance.setAlarmclass("base");
instance.setAction("base");
instance.setLocation(Arrays.asList("INJ"));
instance.setScreencommand("/");
}
Expand Down

0 comments on commit ddac183

Please sign in to comment.