From 7e0d8270537503bd9e568b7e453fa10a4623b8b0 Mon Sep 17 00:00:00 2001 From: Nikita Sokolov Date: Thu, 25 Feb 2021 16:51:13 +0300 Subject: [PATCH] SimpleFront --- .../flamestream/runtime/edge/SimpleFront.java | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 runtime/src/main/java/com/spbsu/flamestream/runtime/edge/SimpleFront.java diff --git a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/SimpleFront.java b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/SimpleFront.java new file mode 100644 index 000000000..05ee13e79 --- /dev/null +++ b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/SimpleFront.java @@ -0,0 +1,108 @@ +package com.spbsu.flamestream.runtime.edge; + +import com.spbsu.flamestream.core.data.PayloadDataItem; +import com.spbsu.flamestream.core.data.meta.GlobalTime; +import com.spbsu.flamestream.core.data.meta.Meta; +import com.spbsu.flamestream.runtime.FlameRuntime; +import com.spbsu.flamestream.runtime.master.acker.api.Heartbeat; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +public class SimpleFront implements Front { + private final CompletableFuture> consumers; + + public static class Handle { + private final Consumer consumer; + private final EdgeContext context; + private Meta basicMeta; + private int childId = 0; + + public Handle(ConcurrentMap>> consumer, EdgeContext context) { + try { + this.consumer = consumer.computeIfAbsent(context, __ -> new CompletableFuture<>()).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + this.context = context; + } + + public synchronized boolean onDataItem(long time, Object object) { + if (basicMeta != null && basicMeta.globalTime().time() > time) { + throw new IllegalArgumentException(); + } + boolean heartbeat = false; + if (basicMeta == null || basicMeta.globalTime().time() < time) { + final var globalTime = new GlobalTime(time, context.edgeId()); + consumer.accept(new Heartbeat(globalTime)); + basicMeta = new Meta(globalTime); + childId = 0; + heartbeat = true; + } + final var dataItem = new PayloadDataItem(new Meta(basicMeta, 0, childId++), object); + consumer.accept(dataItem); + return heartbeat; + } + + public synchronized void unregister() { + consumer.accept(new Heartbeat(new GlobalTime(Long.MAX_VALUE, context.edgeId()))); + } + } + + public static class Instance implements FlameRuntime.FrontInstance { + private final Map>> consumers; + + public Instance(Map>> consumers) {this.consumers = consumers;} + + @Override + public Class clazz() { + return SimpleFront.class; + } + + @Override + public Object[] params() { + return new Object[]{consumers}; + } + } + + public static class Type implements FlameRuntime.FrontType { + private final ConcurrentMap>> consumers + = new ConcurrentHashMap<>(); + + @Override + public Instance instance() { + return new Instance(consumers); + } + + @Override + public Handle handle(EdgeContext context) { + return new Handle(consumers, context); + } + } + + public SimpleFront( + EdgeContext edgeContext, + ConcurrentMap>> consumers + ) { + this.consumers = consumers.computeIfAbsent(edgeContext, __ -> new CompletableFuture<>()); + } + + @Override + public void onStart(Consumer consumer, GlobalTime from) { + if (!consumers.complete(consumer)) { + throw new IllegalStateException(); + } + } + + @Override + public void onRequestNext() { + } + + @Override + public void onCheckpoint(GlobalTime to) { + } +}