Skip to content
This repository has been archived by the owner on Dec 27, 2022. It is now read-only.

Commit

Permalink
Interface name support for Tigon SQL queries
Browse files Browse the repository at this point in the history
  • Loading branch information
achalpandey committed Oct 20, 2014
1 parent f8b27cb commit 20e71fe
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,21 @@ public void create() {
setName("SQLinputFlowlet");
setDescription("Executes an inner join of two streams <uid, name>, <uid, age> and emits <uid, name, age>");
StreamSchema nameSchema = new StreamSchema.Builder()
.setName("nameDataStream")
.addField("uid", GDATFieldType.INT, GDATSlidingWindowAttribute.INCREASING)
.addField("name", GDATFieldType.STRING)
.build();
addJSONInput("nameInput", nameSchema);

StreamSchema ageSchema = new StreamSchema.Builder()
.setName("ageDataStream")
.addField("uid", GDATFieldType.INT, GDATSlidingWindowAttribute.INCREASING)
.addField("age", GDATFieldType.INT)
.build();
addJSONInput("ageInput", ageSchema);

addQuery("userDetails", "SELECT nI.uid, nI.name, aI.age INNER_JOIN " +
"FROM [nameInput].nameInput nI, [ageInput].ageInput aI WHERE nI.uid = aI.uid AND aI.age > 40");
"FROM nameInput.nameDataStream nI, ageInput.ageDataStream aI WHERE nI.uid = aI.uid AND aI.age > 40");
}

@QueryOutput("userDetails")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ public interface StreamSchema {

List<GDATField> getFields();

String getName();

/**
* Builder for creating instance of {@link StreamSchema}.
*/
static final class Builder {

private Set<String> fieldNames = Sets.newHashSet();
private List<GDATField> fields = Lists.newArrayList();
private String name;

private void fieldCheck(String name) {
Preconditions.checkArgument(name != null, "Field name cannot be null.");
Expand All @@ -59,8 +62,13 @@ public Builder addField(String name, GDATFieldType fieldType, GDATSlidingWindowA
return this;
}

public Builder setName(String name) {
this.name = name;
return this;
}

public StreamSchema build() {
return new DefaultStreamSchema(fields);
return new DefaultStreamSchema(name, fields);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
*/
public class DefaultStreamSchema implements StreamSchema {
private List<GDATField> fields;
private String name;

public DefaultStreamSchema(List<GDATField> fields) {
public DefaultStreamSchema(String name, List<GDATField> fields) {
this.name = name;
this.fields = fields;
}

Expand All @@ -36,4 +38,8 @@ public List<GDATField> getFields() {
return fields;
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Map<String, String> generateQueryFiles() {
}

public Map.Entry<String, String> generateHostIfq() {
String contents = createLocalHostIfq(spec.getInputSchemas());
String contents = createLocalHostIfq();
return Maps.immutableEntry(HOSTNAME, contents);
}

Expand All @@ -79,28 +79,8 @@ private String createIfresXML(Set<String> inputNames) {
return stringBuilder.toString();
}

/**
* This function generates the content for *.ifq file. It generates one interface for each schema.
* The interface name is auto-generated and is the same as the schema name.
* The ifq file data is of the format:
*
* [interface set name 1] : [predicate 1] ;
* [interface set name n-1] : [predicate n-1] ;
* [interface set name n] : [predicate n]
*
* Note : The last line must not end with a semi-colon
*
* @param schemaMap Map containing input schemas
* @return String that contains the file content for *.ifq file
*/
private String createLocalHostIfq(Map<String, Map.Entry<InputStreamFormat, StreamSchema>> schemaMap) {
StringBuilder stringBuilder = new StringBuilder();
for (String streamName : schemaMap.keySet()) {
// For now interface set names are same as the Schema names (one-to-one mapping)
stringBuilder.append(streamName).append(" : ").append("Contains[Filename, ").append(streamName).append("];")
.append(Constants.NEWLINE);
}
return stringBuilder.toString().substring(0, stringBuilder.length() - 2);
private String createLocalHostIfq() {
return "default : NOT Contains[InterfaceType, GDAT]";
}

private String createOutputSpec(Map<String, String> sql) {
Expand All @@ -113,8 +93,10 @@ private String createOutputSpec(Map<String, String> sql) {

private String createPacketSchema(Map<String, Map.Entry<InputStreamFormat, StreamSchema>> schemaMap) {
StringBuilder stringBuilder = new StringBuilder();
for (String streamName : schemaMap.keySet()) {
stringBuilder.append(createProtocol(streamName, schemaMap.get(streamName).getValue()));
for (Map.Entry<String, Map.Entry<InputStreamFormat, StreamSchema>> mapEntry : schemaMap.entrySet()) {
String interfaceName = mapEntry.getKey();
String schemaName = mapEntry.getValue().getValue().getName();
stringBuilder.append(createProtocol(schemaName, schemaMap.get(interfaceName).getValue()));
}
return stringBuilder.toString();
}
Expand Down
Binary file modified tigon-sql/src/main/resources/StreamLib_x64_osx.tar.gz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static void beforeClass() throws Exception {
final int port = getRandomPort();
int tcpPort = getRandomPort();
runtimeArgs.put(Constants.HTTP_PORT, Integer.toString(port));
runtimeArgs.put(Constants.TCP_INGESTION_PORT_PREFIX + "intInput", Integer.toString(tcpPort));
runtimeArgs.put(Constants.TCP_INGESTION_PORT_PREFIX + "inputInterface", Integer.toString(tcpPort));
flowManager = deployFlow(SQLFlow.class, runtimeArgs);
TimeUnit.SECONDS.sleep(60);
ingestData = new Thread(new Runnable() {
Expand All @@ -109,7 +109,7 @@ public void run() {
for (int j = 1; j <= i; j++) {
try {
// TODO eliminate org.apache.http dependency TIGON-5
HttpPost httpPost = new HttpPost("http://localhost:" + port + "/v1/tigon/intInput");
HttpPost httpPost = new HttpPost("http://localhost:" + port + "/v1/tigon/inputInterface");
JsonObject bodyJson = new JsonObject();
JsonArray dataArray = new JsonArray();
dataArray.add(new JsonPrimitive(Integer.toString(i)));
Expand Down Expand Up @@ -190,11 +190,12 @@ public void create() {
setName("Summation");
setDescription("sums up the input value over a timewindow");
StreamSchema schema = new StreamSchema.Builder()
.setName("intInput")
.addField("timestamp", GDATFieldType.LONG, GDATSlidingWindowAttribute.INCREASING)
.addField("intStream", GDATFieldType.INT)
.build();
addJSONInput("intInput", schema);
addQuery("sumOut", "SELECT timestamp, SUM(intStream) AS sumValue FROM [intInput].intInput GROUP BY timestamp");
addJSONInput("inputInterface", schema);
addQuery("sumOut","SELECT timestamp, SUM(intStream) AS sumValue FROM inputInterface.intInput GROUP BY timestamp");
}

@QueryOutput("sumOut")
Expand Down

0 comments on commit 20e71fe

Please sign in to comment.