Skip to content

Commit

Permalink
nexmark.Generator
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Sokolov committed Jan 29, 2021
1 parent f2d33dc commit 92eb32b
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 51 deletions.
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[submodule "examples/nexmark"]
path = examples/nexmark
url = https://github.com/nexmark/nexmark/
branch = master
1 change: 1 addition & 0 deletions examples/nexmark
Submodule nexmark added at 4d952a
29 changes: 29 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,35 @@
<artifactId>jblas</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>com.github.nexmark</groupId>
<artifactId>nexmark-flink</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.11.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.11.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.11.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.11.1</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.spbsu.flamestream.example.nexmark;

import com.github.nexmark.flink.model.Auction;
import com.github.nexmark.flink.model.Person;
import com.spbsu.flamestream.core.graph.SerializableBiFunction;
import com.spbsu.flamestream.core.graph.SerializableFunction;
import com.spbsu.flamestream.example.labels.Flow;
Expand All @@ -10,46 +12,20 @@
import scala.runtime.AbstractFunction1;
import scala.util.Either;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;

public class Query8 {
public static class Input {
private Input() {
}

public static class Auction extends Input {
public final String seller;
public final long dateTime;

public Auction(String seller, long dateTime) {
this.seller = seller;
this.dateTime = dateTime;
}
}

public static class Person extends Input {
public final String id;
public final String name;
public final long dateTime;

public Person(String id, String name, long dateTime) {
this.id = id;
this.name = name;
this.dateTime = dateTime;
}
}
}

public static final class PersonGroupingKey {
public final String id;
public final long id;
public final String name;
public final long startTime;

private PersonGroupingKey(String id, String name, long startTime) {
private PersonGroupingKey(long id, String name, long startTime) {
this.id = id;
this.name = name;
this.startTime = startTime;
Expand Down Expand Up @@ -79,10 +55,10 @@ public String toString() {
}

private static final class AuctionGroupingKey {
public final String seller;
public final long seller;
public final long startTime;

private AuctionGroupingKey(String seller, long startTime) {
private AuctionGroupingKey(long seller, long startTime) {
this.seller = seller;
this.startTime = startTime;
}
Expand All @@ -99,7 +75,7 @@ public boolean equals(Object obj) {
}
if (obj instanceof AuctionGroupingKey) {
final var that = (AuctionGroupingKey) obj;
return Objects.equals(seller, that.seller) && startTime == that.startTime;
return seller == that.seller && startTime == that.startTime;
}
return false;
}
Expand All @@ -110,21 +86,21 @@ public String toString() {
}
}

public static Flow<Input, PersonGroupingKey> create(long timeWindow) {
final var inputs = new Operator.Input<>(Input.class);
public static Flow<Object, PersonGroupingKey> create() {
final var inputs = new Operator.Input<>(Object.class);
return new Flow<>(inputs, selectJoinOn(
PersonGroupingKey.class,
(p, a) -> p,
selectGroupBy(
PersonGroupingKey.class,
person -> new PersonGroupingKey(person.id, person.name, tumbleStart(person.dateTime, timeWindow)),
filterType(inputs, Input.Person.class),
person -> new PersonGroupingKey(person.id, person.name, tumbleStart(person.dateTime, 10)),
filterType(inputs, Person.class),
person -> person.startTime
),
selectGroupBy(
AuctionGroupingKey.class,
auction -> new AuctionGroupingKey(auction.seller, tumbleStart(auction.dateTime, timeWindow)),
filterType(inputs, Input.Auction.class),
auction -> new AuctionGroupingKey(auction.seller, tumbleStart(auction.dateTime, 10)),
filterType(inputs, Auction.class),
auction -> auction.startTime
),
p -> p.id, a -> a.seller, p -> p.startTime, a -> a.startTime
Expand Down Expand Up @@ -237,7 +213,7 @@ public Output apply(Right right) {
);
}

private static long tumbleStart(long dateTime, long interval) {
return (dateTime / interval + 1) * interval;
private static long tumbleStart(Instant dateTime, long interval) {
return (dateTime.getEpochSecond() / interval + 1) * interval;
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package com.spbsu.flamestream.example.nexmark;

import com.github.nexmark.flink.model.Auction;
import com.github.nexmark.flink.model.Person;
import com.spbsu.flamestream.core.Graph;
import com.spbsu.flamestream.example.labels.Materializer;
import com.spbsu.flamestream.runtime.FlameRuntime;
import com.spbsu.flamestream.runtime.LocalRuntime;
import com.spbsu.flamestream.runtime.acceptance.FlameAkkaSuite;
import com.spbsu.flamestream.runtime.edge.akka.AkkaFront;
import com.spbsu.flamestream.runtime.edge.akka.AkkaFrontType;
import com.spbsu.flamestream.runtime.edge.akka.AkkaRearType;
import com.spbsu.flamestream.runtime.utils.AwaitResultConsumer;
import org.testng.annotations.Test;

import java.util.List;
import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -20,25 +21,24 @@
public class Query8Test extends FlameAkkaSuite {
@Test
public void test() throws InterruptedException {
final Graph graph = Materializer.materialize(Query8.create(2));
final Graph graph = Materializer.materialize(Query8.create());

try (final LocalRuntime runtime = new LocalRuntime.Builder().maxElementsInGraph(2)
.millisBetweenCommits(500)
.build()) {
try (final FlameRuntime.Flame flame = runtime.run(graph)) {
final Queue<Query8.Input> input = new ConcurrentLinkedQueue<>();
input.add(new Query8.Input.Person("person", "name", 0));
input.add(new Query8.Input.Auction("person", 1));
input.add(new Query8.Input.Auction("person", 2));
final Queue<Object> input = new ConcurrentLinkedQueue<>();
input.add(new Person(0, "name", null, null, null, null, Instant.ofEpochSecond(0), null));
input.add(new Auction(0, null, null, 0, 0, Instant.ofEpochSecond(1), null, 0, 0, null));
input.add(new Auction(0, null, null, 0, 0, Instant.ofEpochSecond(10), null, 0, 0, null));

final AwaitResultConsumer<Query8.PersonGroupingKey> awaitConsumer =
new AwaitResultConsumer<>(1);
flame.attachRear("rear", new AkkaRearType<>(runtime.system(), Query8.PersonGroupingKey.class))
.forEach(r -> r.addListener(awaitConsumer));
final List<AkkaFront.FrontHandle<Query8.Input>> handles = flame
.attachFront("front", new AkkaFrontType<Query8.Input>(runtime.system()))
.collect(Collectors.toList());
applyDataToAllHandlesAsync(input, handles);
applyDataToAllHandlesAsync(input, flame
.attachFront("front", new AkkaFrontType<>(runtime.system()))
.collect(Collectors.toList()));
awaitConsumer.await(200, TimeUnit.SECONDS);
}
}
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
<module>examples</module>
<module>client</module>
<module>benchmark/flink-benchmark</module>
<module>examples/nexmark/nexmark-flink</module>
</modules>
</project>

0 comments on commit 92eb32b

Please sign in to comment.