Skip to content

Commit

Permalink
TimingsRearType
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Sokolov committed Feb 23, 2021
1 parent a93d860 commit c356a64
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 10 deletions.
10 changes: 10 additions & 0 deletions core/src/main/java/com/spbsu/flamestream/core/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

import com.spbsu.flamestream.core.data.meta.GlobalTime;

import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

public interface Batch {
GlobalTime time();

Stream<DataItem> payload();

Map<Long, Instant> lastGlobalTimeProcessedAt();

enum Default implements Batch {
EMPTY;

Expand All @@ -21,5 +26,10 @@ public GlobalTime time() {
public Stream<DataItem> payload() {
return Stream.empty();
}

@Override
public Map<Long, Instant> lastGlobalTimeProcessedAt() {
return Collections.emptyMap();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import com.spbsu.flamestream.core.Batch;
import com.spbsu.flamestream.runtime.FlameRuntime;
import com.spbsu.flamestream.runtime.edge.EdgeContext;
import scala.Tuple2;

import java.util.List;
import java.text.NumberFormat;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

public class LatencyRearType implements FlameRuntime.RearType<LatencyRearType.Rear, LatencyRearType.Handle> {
public class TimingsRearType implements FlameRuntime.RearType<TimingsRearType.Rear, TimingsRearType.Handle> {
final ConcurrentHashMap<EdgeContext, CompletableFuture<?>> edgeContextDone = new ConcurrentHashMap<>();

public class Instance implements FlameRuntime.RearInstance<Rear> {
Expand All @@ -22,15 +24,15 @@ public Class<Rear> clazz() {

@Override
public Object[] params() {
return new Object[]{LatencyRearType.this};
return new Object[]{TimingsRearType.this};
}
}

public static class Rear implements com.spbsu.flamestream.runtime.edge.Rear {
private final EdgeContext edgeContext;
private final LatencyRearType type;
private final TimingsRearType type;

public Rear(EdgeContext edgeContext, LatencyRearType type) {
public Rear(EdgeContext edgeContext, TimingsRearType type) {
this.edgeContext = edgeContext;
this.type = type;
}
Expand All @@ -39,6 +41,17 @@ public Rear(EdgeContext edgeContext, LatencyRearType type) {

@Override
public CompletionStage<?> accept(Batch batch) {
final var now = Instant.now();
if (batch.time().time() < Long.MAX_VALUE) {
System.out.println(
withDigitSeparators(batch.time().time() - 10) + " seconds window timings"
+ ": processed = " + formattedLatencyNanos(
batch.lastGlobalTimeProcessedAt().get(batch.time().time() - 10),
now
)
+ ", notified = " + formattedLatencyNanos(Instant.ofEpochSecond(batch.time().time() - 10), now)
);
}
last = batch;
if (batch.time().time() == Long.MAX_VALUE) {
type.edgeContextDone.computeIfAbsent(edgeContext, __ -> new CompletableFuture<>()).complete(null);
Expand All @@ -50,6 +63,14 @@ public CompletionStage<?> accept(Batch batch) {
public Batch last() {
return last;
}

private static String formattedLatencyNanos(Instant from, Instant to) {
return NumberFormat.getNumberInstance(Locale.US).format(from.until(to, ChronoUnit.NANOS));
}

private static String withDigitSeparators(long nanos) {
return NumberFormat.getNumberInstance(Locale.US).format(nanos);
}
}

public class Handle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void test() throws Exception {
))
.build()) {
try (final FlameRuntime.Flame flame = runtime.run(graph)) {
final var rears = flame.attachRear("rear", new LatencyRearType()).collect(Collectors.toList());
final var rears = flame.attachRear("rear", new TimingsRearType()).collect(Collectors.toList());

final var handles = flame
.attachFront("front", new GeneratorFrontType(nexmarkConfiguration))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -381,6 +384,9 @@ private void inject(AddressedItem addressedItem) {
localCall(item, addressedItem.destination());
ack(item, wrappedJobas.get(addressedItem.destination()).vertex);
injectOutTracer.log(item.xor());
if (wrappedSinkJoba != null) {
wrappedSinkJoba.joba.touch(item.meta().globalTime().time());
}
}

private void ack(DataItem dataItem, Graph.Vertex to) {
Expand Down Expand Up @@ -411,6 +417,9 @@ private void accept(DataItem item) {
} else {
throw new IllegalStateException("Source doesn't belong to this component");
}
if (wrappedSinkJoba != null) {
wrappedSinkJoba.joba.touch(item.meta().globalTime().time());
}
}

private void onNewRear(NewRear attachRear) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
import com.spbsu.flamestream.runtime.utils.FlameConfig;
import com.spbsu.flamestream.runtime.utils.tracing.Tracing;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

Expand All @@ -29,6 +32,7 @@ public class SinkJoba extends Joba {
private final int sinkTrackingComponent;
private final InvalidatingBucket invalidatingBucket = new ArrayInvalidatingBucket();
private final Map<ActorRef, GlobalTime> rears = new HashMap<>();
private NavigableMap<Long, Instant> lastGlobalTimeProcessedAt = new TreeMap<>();

private final Tracing.Tracer barrierReceiveTracer = Tracing.TRACING.forEvent("barrier-receive");
private final Tracing.Tracer barrierSendTracer = Tracing.TRACING.forEvent("barrier-send");
Expand All @@ -48,7 +52,7 @@ public void accept(DataItem item, Sink sink) {
if (barrierDisabled) {
rears.forEach((rear, lastEmmit) -> emmitRearBatch(
rear,
new BatchImpl(item.meta().globalTime(), Collections.singletonList(item))
new BatchImpl(item.meta().globalTime(), Collections.singletonList(item), Collections.emptyMap())
));
} else {
invalidatingBucket.insert(item);
Expand Down Expand Up @@ -77,9 +81,14 @@ public void onMinTime(MinTimeUpdate minTime) {
}
}

public void touch(long time) {
lastGlobalTimeProcessedAt.put(time, Instant.now());
}

private void tryEmmit(GlobalTime upTo) {
final int pos = invalidatingBucket.lowerBound(upTo);

final var lastGlobalTimeProcessedAt = this.lastGlobalTimeProcessedAt.headMap(upTo.time());
rears.forEach((rear, lastEmmit) -> {
final List<DataItem> data = new ArrayList<>();
invalidatingBucket.forRange(0, pos, item -> {
Expand All @@ -89,7 +98,7 @@ private void tryEmmit(GlobalTime upTo) {
});

if (!data.isEmpty() || barrierDisabled) {
emmitRearBatch(rear, new BatchImpl(upTo, data));
emmitRearBatch(rear, new BatchImpl(upTo, data, Map.copyOf(lastGlobalTimeProcessedAt)));
}
});

Expand All @@ -99,6 +108,7 @@ private void tryEmmit(GlobalTime upTo) {
// https://github.com/flame-stream/FlameStream/issues/139
if (!rears.isEmpty()) {
invalidatingBucket.clearRange(0, pos);
lastGlobalTimeProcessedAt.clear();
}
}

Expand All @@ -114,10 +124,12 @@ private void emmitRearBatch(ActorRef rear, BatchImpl batch) {
public static class BatchImpl implements Batch {
private final List<DataItem> items;
private final GlobalTime time;
private final Map<Long, Instant> lastGlobalTimeProcessedAt;

private BatchImpl(GlobalTime time, List<DataItem> items) {
private BatchImpl(GlobalTime time, List<DataItem> items, Map<Long, Instant> lastGlobalTimeProcessedAt) {
this.items = items;
this.time = time;
this.lastGlobalTimeProcessedAt = lastGlobalTimeProcessedAt;
}

@Override
Expand All @@ -134,5 +146,10 @@ public GlobalTime time() {
public Stream<DataItem> payload() {
return items.stream();
}

@Override
public Map<Long, Instant> lastGlobalTimeProcessedAt() {
return lastGlobalTimeProcessedAt;
}
}
}

0 comments on commit c356a64

Please sign in to comment.