diff --git a/pom.xml b/pom.xml index 450f343b..7de771cc 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ co.cask.tigon tigon - 0.2.0 + 0.2.1 pom Tigon https://github.com/caskdata/tigon @@ -772,7 +772,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.14.1 + 2.15 -Xmx4096m -Djava.awt.headless=true -XX:MaxPermSize=256m ${surefire.redirectTestOutputToFile} @@ -886,7 +886,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.14.1 + 2.15 org.apache.rat @@ -1122,7 +1122,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.14.1 + 2.15 ${argLine} -Xmx4096m -Djava.awt.headless=true -XX:MaxPermSize=256m @@ -1224,7 +1224,7 @@ org.apache.maven.plugins maven-surefire-report-plugin - 2.14.1 + 2.15 diff --git a/tigon-api/pom.xml b/tigon-api/pom.xml index 125c6377..9e3a9d87 100644 --- a/tigon-api/pom.xml +++ b/tigon-api/pom.xml @@ -21,7 +21,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-archetypes/pom.xml b/tigon-archetypes/pom.xml index c37f2d99..ff317042 100644 --- a/tigon-archetypes/pom.xml +++ b/tigon-archetypes/pom.xml @@ -21,7 +21,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-archetypes/tigon-app-archetype/pom.xml b/tigon-archetypes/tigon-app-archetype/pom.xml index aa062afc..78e7286c 100644 --- a/tigon-archetypes/tigon-app-archetype/pom.xml +++ b/tigon-archetypes/tigon-app-archetype/pom.xml @@ -22,7 +22,7 @@ co.cask.tigon tigon-archetypes - 0.2.0 + 0.2.1 tigon-app-archetype diff --git a/tigon-archetypes/tigon-app-archetype/src/main/resources/archetype-resources/pom.xml b/tigon-archetypes/tigon-app-archetype/src/main/resources/archetype-resources/pom.xml index 794b6505..670df21f 100644 --- a/tigon-archetypes/tigon-app-archetype/src/main/resources/archetype-resources/pom.xml +++ b/tigon-archetypes/tigon-app-archetype/src/main/resources/archetype-resources/pom.xml @@ -28,7 +28,7 @@ ${package}.CountRandom UTF-8 - 0.2.0 + 0.2.1 1.7.5 4.11 @@ -140,7 +140,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.14.1 + 2.15 org.apache.felix diff --git a/tigon-client/pom.xml b/tigon-client/pom.xml index 49c3ebbf..e3f24137 100644 --- a/tigon-client/pom.xml +++ b/tigon-client/pom.xml @@ -20,7 +20,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-client/src/main/java/co/cask/tigon/DistributedMain.java b/tigon-client/src/main/java/co/cask/tigon/DistributedMain.java index c3994e76..66ca77c9 100644 --- a/tigon-client/src/main/java/co/cask/tigon/DistributedMain.java +++ b/tigon-client/src/main/java/co/cask/tigon/DistributedMain.java @@ -33,6 +33,7 @@ import co.cask.tigon.guice.ZKClientModule; import co.cask.tigon.metrics.MetricsCollectionService; import co.cask.tigon.metrics.NoOpMetricsCollectionService; +import co.cask.tigon.utils.ProjectInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -214,7 +215,7 @@ public void startUp(PrintStream out) throws Exception { } else if (cmd.equals(CLICommands.SERVICEINFO)) { out.println(StringUtils.join(flowOperations.getServices(args[1]), "\n")); } else if (cmd.equals(CLICommands.VERSION)) { - out.println(Constants.VERSION); + out.println(ProjectInfo.getVersion().getBuildVersion()); } else if (cmd.equals(CLICommands.HELP)) { try { out.println(CLICommands.valueOf(args[1].toUpperCase()).printHelp()); diff --git a/tigon-common/pom.xml b/tigon-common/pom.xml index 76287cf0..1dd861ae 100644 --- a/tigon-common/pom.xml +++ b/tigon-common/pom.xml @@ -20,7 +20,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-common/src/main/java/co/cask/tigon/conf/Constants.java b/tigon-common/src/main/java/co/cask/tigon/conf/Constants.java index 3348da46..17ff5b5a 100644 --- a/tigon-common/src/main/java/co/cask/tigon/conf/Constants.java +++ b/tigon-common/src/main/java/co/cask/tigon/conf/Constants.java @@ -20,9 +20,6 @@ * Constants used by different systems are all defined here. */ public final class Constants { - - public static final String VERSION = "0.1.0"; - /** * HDFS Namespaces used in Distributed Mode. */ diff --git a/tigon-common/src/main/java/co/cask/tigon/utils/Networks.java b/tigon-common/src/main/java/co/cask/tigon/utils/Networks.java new file mode 100644 index 00000000..cd42960c --- /dev/null +++ b/tigon-common/src/main/java/co/cask/tigon/utils/Networks.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2014 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + +package co.cask.tigon.utils; + +import java.io.IOException; +import java.net.ServerSocket; + +/** + * Utility class to provide methods for common network related operations. + */ +public final class Networks { + + /** + * Find a random free port in localhost for binding. + * @return A port number or -1 for failure. + */ + public static int getRandomPort() { + try { + ServerSocket socket = new ServerSocket(0); + try { + return socket.getLocalPort(); + } finally { + socket.close(); + } + } catch (IOException e) { + return -1; + } + } +} + diff --git a/tigon-common/src/main/java/co/cask/tigon/utils/ProjectInfo.java b/tigon-common/src/main/java/co/cask/tigon/utils/ProjectInfo.java index 5edde0b7..6458ebff 100644 --- a/tigon-common/src/main/java/co/cask/tigon/utils/ProjectInfo.java +++ b/tigon-common/src/main/java/co/cask/tigon/utils/ProjectInfo.java @@ -150,6 +150,13 @@ public long getBuildTime() { return buildTime; } + public String getBuildVersion() { + if (isSnapshot()) { + return String.format("%d.%d.%d-SNAPSHOT", major, minor, fix); + } + return String.format("%d.%d.%d", major, minor, fix); + } + @Override public String toString() { if (isSnapshot()) { diff --git a/tigon-distribution/pom.xml b/tigon-distribution/pom.xml index 6d478ddd..525badb2 100644 --- a/tigon-distribution/pom.xml +++ b/tigon-distribution/pom.xml @@ -21,7 +21,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-docs/developer-guide/source/developer.rst b/tigon-docs/developer-guide/source/developer.rst index d553fef7..427ea649 100644 --- a/tigon-docs/developer-guide/source/developer.rst +++ b/tigon-docs/developer-guide/source/developer.rst @@ -578,16 +578,29 @@ schema: - STRING The Builder’s ``addField`` method takes the name of the field, the field type and the -``SlidingWindowAttribute``. The sliding window attribute is used to annotate that a field is -monotonically increasing or decreasing. A field with this attribute set to +``SlidingWindowAttribute``. The sliding window attribute is used to annotate that a field +is monotonically increasing or decreasing. A field with this attribute set to increasing or decreasing might be required for certain SQL queries; for example, "GROUP BY *increasingField*". Once one or more ``StreamSchemas`` are created, they are added as an input using the -``addJSONInput`` method. This method takes the name of the input stream and the schema of -the stream. Once the inputs streams have been added, one or more SQL queries can be -defined using an ``addQuery`` method. The ``addQuery`` method takes the name of the query -and the SQL statement. +``addJSONInput`` method. This method takes the name of an input stream (an interface) and +the associated schema object. Once all the input streams have been added, one +or more SQL queries can be defined using an ``addQuery`` method. The ``addQuery`` method +takes the name of the query and the SQL statement. + +Conceptually, an interface represents the TCP end-point at which the data stream is +ingested and this data is interpreted by all the schemas associated with this +end-point. Currently, Tigon supports only one schema per interface. This restriction +may be removed in a subsequent Tigon release. + +The ``FROM`` clause in Tigon SQL queries should be followed by a data source name that +follows the format of *interfaceName.schemaName*. In the example below ``intInput`` is +the schema name and ``inputStream`` is the interface name. To access ``intInput`` the +data source is referenced as ``inputStream.intInput``. + +For more information on interface and interface sets, please refer to the :doc:`Tigon SQL +User Manual. ` The output of the SQL queries will be POJOs, whose output class you can define. The names of the members of the output class should match the names used in the SQL query @@ -608,11 +621,12 @@ that method or emit the object to a subsequent Flowlet. In the example given bel setName("Summation"); setDescription("Sums up the input value over a timewindow"); StreamSchema schema = new StreamSchema.Builder() + .setName("intInput") .addField("timestamp", GDATFieldType.LONG, GDATSlidingWindowAttribute.INCREASING) .addField("intStream", GDATFieldType.INT) .build(); - addJSONInput("intInput", schema); - addQuery("sumOut", "SELECT timestamp, SUM(intStream) AS sumValue FROM intInput GROUP BY timestamp"); + addJSONInput("inputStream", schema); + addQuery("sumOut", "SELECT timestamp, SUM(intStream) AS sumValue FROM inputStream.intInput GROUP BY timestamp"); } @QueryOutput("sumOut") diff --git a/tigon-examples/HelloWorld/pom.xml b/tigon-examples/HelloWorld/pom.xml index 11722056..bfa2a2fd 100644 --- a/tigon-examples/HelloWorld/pom.xml +++ b/tigon-examples/HelloWorld/pom.xml @@ -21,7 +21,7 @@ co.cask.tigon tigon-examples - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-examples/SQLJoinFlow/pom.xml b/tigon-examples/SQLJoinFlow/pom.xml index 17e3825a..d8e13fba 100644 --- a/tigon-examples/SQLJoinFlow/pom.xml +++ b/tigon-examples/SQLJoinFlow/pom.xml @@ -21,7 +21,7 @@ co.cask.tigon tigon-examples - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-examples/SQLJoinFlow/src/main/java/co/cask/tigon/sqljoinflow/SQLJoinFlow.java b/tigon-examples/SQLJoinFlow/src/main/java/co/cask/tigon/sqljoinflow/SQLJoinFlow.java index 4f862762..089babef 100644 --- a/tigon-examples/SQLJoinFlow/src/main/java/co/cask/tigon/sqljoinflow/SQLJoinFlow.java +++ b/tigon-examples/SQLJoinFlow/src/main/java/co/cask/tigon/sqljoinflow/SQLJoinFlow.java @@ -59,21 +59,23 @@ public static class SQLInputFlowlet extends AbstractInputFlowlet { @Override public void create() { setName("SQLinputFlowlet"); - setDescription("blah blah"); + setDescription("Executes an inner join of two streams , and emits "); StreamSchema nameSchema = new StreamSchema.Builder() + .setName("nameDataStream") .addField("uid", GDATFieldType.INT, GDATSlidingWindowAttribute.INCREASING) .addField("name", GDATFieldType.STRING) .build(); addJSONInput("nameInput", nameSchema); StreamSchema ageSchema = new StreamSchema.Builder() + .setName("ageDataStream") .addField("uid", GDATFieldType.INT, GDATSlidingWindowAttribute.INCREASING) .addField("age", GDATFieldType.INT) .build(); addJSONInput("ageInput", ageSchema); addQuery("userDetails", "SELECT nI.uid, nI.name, aI.age INNER_JOIN " + - "FROM nameInput nI, ageInput aI WHERE nI.uid = aI.uid AND aI.age > 40"); + "FROM nameInput.nameDataStream nI, ageInput.ageDataStream aI WHERE nI.uid = aI.uid AND aI.age > 40"); } @QueryOutput("userDetails") diff --git a/tigon-examples/TwitterAnalytics/pom.xml b/tigon-examples/TwitterAnalytics/pom.xml index 95579aed..dbe03b5f 100644 --- a/tigon-examples/TwitterAnalytics/pom.xml +++ b/tigon-examples/TwitterAnalytics/pom.xml @@ -21,7 +21,7 @@ tigon-examples co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-examples/deploy_pom.xml b/tigon-examples/deploy_pom.xml index 227697f4..ae513c06 100644 --- a/tigon-examples/deploy_pom.xml +++ b/tigon-examples/deploy_pom.xml @@ -178,7 +178,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.14.1 + 2.15 -Xmx2048m diff --git a/tigon-examples/pom.xml b/tigon-examples/pom.xml index 1f253464..ca0272c1 100644 --- a/tigon-examples/pom.xml +++ b/tigon-examples/pom.xml @@ -23,7 +23,7 @@ co.cask.tigon tigon - 0.2.0 + 0.2.1 @@ -192,7 +192,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.14.1 + 2.15 -Xmx2048m diff --git a/tigon-flow/pom.xml b/tigon-flow/pom.xml index 23e95d89..32410251 100644 --- a/tigon-flow/pom.xml +++ b/tigon-flow/pom.xml @@ -20,7 +20,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-hbase-compat-0.94/pom.xml b/tigon-hbase-compat-0.94/pom.xml index ba4820f3..991d82e1 100644 --- a/tigon-hbase-compat-0.94/pom.xml +++ b/tigon-hbase-compat-0.94/pom.xml @@ -20,7 +20,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-hbase-compat-0.96/pom.xml b/tigon-hbase-compat-0.96/pom.xml index dacfadc7..bef5bff2 100644 --- a/tigon-hbase-compat-0.96/pom.xml +++ b/tigon-hbase-compat-0.96/pom.xml @@ -20,7 +20,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-queue/pom.xml b/tigon-queue/pom.xml index e5593506..d0033e36 100644 --- a/tigon-queue/pom.xml +++ b/tigon-queue/pom.xml @@ -20,7 +20,7 @@ co.cask.tigon tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-sql/pom.xml b/tigon-sql/pom.xml index b3619abd..818f7bb6 100644 --- a/tigon-sql/pom.xml +++ b/tigon-sql/pom.xml @@ -20,7 +20,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 diff --git a/tigon-sql/src/main/c/ftacmp/analyze_fta.cc b/tigon-sql/src/main/c/ftacmp/analyze_fta.cc index f9a5318e..05e9136a 100644 --- a/tigon-sql/src/main/c/ftacmp/analyze_fta.cc +++ b/tigon-sql/src/main/c/ftacmp/analyze_fta.cc @@ -4063,7 +4063,11 @@ query_summary_class *analyze_fta(table_exp_t *fta_tree, table_list *schema, if(tbl_vec[i]->get_ifq()){ ifstr = "["+tbl_vec[i]->get_interface()+"]"; }else{ - ifstr = "''"+tbl_vec[i]->get_machine()+"''."+tbl_vec[i]->get_interface(); + if(tbl_vec[i]->get_machine() != "localhost"){ + ifstr = "'"+tbl_vec[i]->get_machine()+"'."+tbl_vec[i]->get_interface(); + }else{ + ifstr = tbl_vec[i]->get_interface(); + } } //printf("ifstr is %s, i=%d, machine=%s, interface=%s\n",ifstr.c_str(),i,tbl_vec[i]->get_machine().c_str(),tbl_vec[i]->get_interface().c_str()); if(qs->definitions.count("_referenced_ifaces")){ diff --git a/tigon-sql/src/main/c/ftacmp/parse_fta.h b/tigon-sql/src/main/c/ftacmp/parse_fta.h index 52849c3b..0dfc3f37 100644 --- a/tigon-sql/src/main/c/ftacmp/parse_fta.h +++ b/tigon-sql/src/main/c/ftacmp/parse_fta.h @@ -662,7 +662,7 @@ class tablevar_t{ std::string to_string(){ std::string retval; - if(machine != "") + if(machine != "" && !iface_is_query) retval += "'"+machine+"'."; if(interface != ""){ if(iface_is_query){ diff --git a/tigon-sql/src/main/c/ftacmp/translate_fta.cc b/tigon-sql/src/main/c/ftacmp/translate_fta.cc index 0f0260e6..e3fc533c 100644 --- a/tigon-sql/src/main/c/ftacmp/translate_fta.cc +++ b/tigon-sql/src/main/c/ftacmp/translate_fta.cc @@ -205,7 +205,7 @@ int main(int argc, char **argv){ "\t[-P] : link with PADS\n" "\t[-h] : override host name.\n" "\t[-c] : clean out Makefile and hfta_*.cc first.\n" - "\t[-R] : path to root of STREAMING\n" + "\t[-R] : path to root of tigon\n" ; // parameters gathered from command line processing @@ -1846,6 +1846,8 @@ for(q=0;q tvec = split_queries[l]->query_plan[0]->get_input_tbls(); string liface = tvec[0]->get_interface(); string lmach = tvec[0]->get_machine(); + if (lmach == "") + lmach = hostname; interface_names.push_back(liface); machine_names.push_back(lmach); //printf("Machine is %s\n",lmach.c_str()); @@ -2390,6 +2392,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e // iterate through interface properties vector iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str); + if (erri) { + fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str()); + exit(1); + } if (iface_properties.empty()) lfta_val[lmach] += "\t\treturn NULL;\n"; else { diff --git a/tigon-sql/src/main/c/lib/gscprts/rts_main.c b/tigon-sql/src/main/c/lib/gscprts/rts_main.c index 17977814..96a19360 100644 --- a/tigon-sql/src/main/c/lib/gscprts/rts_main.c +++ b/tigon-sql/src/main/c/lib/gscprts/rts_main.c @@ -146,7 +146,10 @@ int main (int argc, char* argv[]) { lmap[lmapcnt]=device[x]; lmapcnt++; - interfacetype=get_iface_properties(device[x],"interfacetype"); + if ((interfacetype=get_iface_properties(device[x],"interfacetype"))==0) { + gslog(LOG_EMERG,"Interface Type not configured but required"); + exit(1); + } if (strncmp(interfacetype,"CSV",3)==0) { main_csv(x,device[x],lmapcnt,lmap); diff --git a/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/AbstractInputFlowlet.java b/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/AbstractInputFlowlet.java index deb5822f..ad1bf5d4 100644 --- a/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/AbstractInputFlowlet.java +++ b/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/AbstractInputFlowlet.java @@ -302,7 +302,11 @@ public void destroy() { */ @Override public void notifyFailure(Set errorProcessNames) { - LOG.info("Missing Pings From : " + errorProcessNames.toString()); + if (errorProcessNames != null) { + LOG.warn("Missing pings from : " + errorProcessNames.toString()); + } else { + LOG.warn("No heartbeats registered"); + } healthInspector.stopAndWait(); healthInspector = new HealthInspector(this); inputFlowletService.restartService(healthInspector); @@ -312,6 +316,10 @@ public void notifyFailure(Set errorProcessNames) { @Override public void announceReady() { FlowletContext ctx = getContext(); + if (portsAnnouncementList.size() > 0) { + // Ingestion end-points have already been announced + return; + } for (String key : dataIngestionPortsMap.keySet()) { portsAnnouncementList.add(ctx.announce(key, inputFlowletService.getDataPort(key))); LOG.info("Announced Data Port {} - {}", key, inputFlowletService.getDataPort(key)); diff --git a/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/StreamSchema.java b/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/StreamSchema.java index 3caa7b7b..07af9c58 100644 --- a/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/StreamSchema.java +++ b/tigon-sql/src/main/java/co/cask/tigon/sql/flowlet/StreamSchema.java @@ -32,6 +32,8 @@ public interface StreamSchema { List getFields(); + String getName(); + /** * Builder for creating instance of {@link StreamSchema}. */ @@ -39,6 +41,7 @@ static final class Builder { private Set fieldNames = Sets.newHashSet(); private List fields = Lists.newArrayList(); + private String name; private void fieldCheck(String name) { Preconditions.checkArgument(name != null, "Field name cannot be null."); @@ -59,8 +62,13 @@ public Builder addField(String name, GDATFieldType fieldType, GDATSlidingWindowA return this; } + public Builder setName(String name) { + this.name = name; + return this; + } + public StreamSchema build() { - return new DefaultStreamSchema(fields); + return new DefaultStreamSchema(name, fields); } } } diff --git a/tigon-sql/src/main/java/co/cask/tigon/sql/internal/DefaultStreamSchema.java b/tigon-sql/src/main/java/co/cask/tigon/sql/internal/DefaultStreamSchema.java index 56a9c9fb..e95680e1 100644 --- a/tigon-sql/src/main/java/co/cask/tigon/sql/internal/DefaultStreamSchema.java +++ b/tigon-sql/src/main/java/co/cask/tigon/sql/internal/DefaultStreamSchema.java @@ -26,8 +26,10 @@ */ public class DefaultStreamSchema implements StreamSchema { private List fields; + private String name; - public DefaultStreamSchema(List fields) { + public DefaultStreamSchema(String name, List fields) { + this.name = name; this.fields = fields; } @@ -36,4 +38,8 @@ public List getFields() { return fields; } + @Override + public String getName() { + return name; + } } diff --git a/tigon-sql/src/main/java/co/cask/tigon/sql/internal/StreamConfigGenerator.java b/tigon-sql/src/main/java/co/cask/tigon/sql/internal/StreamConfigGenerator.java index 436d5d14..ad116372 100644 --- a/tigon-sql/src/main/java/co/cask/tigon/sql/internal/StreamConfigGenerator.java +++ b/tigon-sql/src/main/java/co/cask/tigon/sql/internal/StreamConfigGenerator.java @@ -60,7 +60,7 @@ public Map generateQueryFiles() { } public Map.Entry generateHostIfq() { - String contents = createLocalHostIfq(HOSTNAME); + String contents = createLocalHostIfq(); return Maps.immutableEntry(HOSTNAME, contents); } @@ -79,8 +79,8 @@ private String createIfresXML(Set inputNames) { return stringBuilder.toString(); } - private String createLocalHostIfq(String hostname) { - return "default : Contains[InterfaceType, GDAT]"; + private String createLocalHostIfq() { + return "default : NOT Contains[InterfaceType, GDAT]"; } private String createOutputSpec(Map sql) { @@ -93,8 +93,10 @@ private String createOutputSpec(Map sql) { private String createPacketSchema(Map> schemaMap) { StringBuilder stringBuilder = new StringBuilder(); - for (String streamName : schemaMap.keySet()) { - stringBuilder.append(createProtocol(streamName, schemaMap.get(streamName).getValue())); + for (Map.Entry> mapEntry : schemaMap.entrySet()) { + String interfaceName = mapEntry.getKey(); + String schemaName = mapEntry.getValue().getValue().getName(); + stringBuilder.append(createProtocol(schemaName, schemaMap.get(interfaceName).getValue())); } return stringBuilder.toString(); } diff --git a/tigon-sql/src/main/java/co/cask/tigon/sql/manager/HubHttpHandler.java b/tigon-sql/src/main/java/co/cask/tigon/sql/manager/HubHttpHandler.java index a972266e..ebe9dcee 100644 --- a/tigon-sql/src/main/java/co/cask/tigon/sql/manager/HubHttpHandler.java +++ b/tigon-sql/src/main/java/co/cask/tigon/sql/manager/HubHttpHandler.java @@ -192,8 +192,6 @@ public void announceStreamSubscription(HttpRequest request, HttpResponder respon current = hubDataStoreReference.get(); hds = new HubDataStore.Builder(current).outputReady().build(); } while (!hubDataStoreReference.compareAndSet(current, hds)); - // Invokes Event Listener. All GSEXIT processes have completed /announce-stream-processing - listener.announceReady(); } responder.sendStatus(HttpResponseStatus.OK); } @@ -310,6 +308,8 @@ public void discoverStartProcessing(HttpRequest request, HttpResponder responder if ((hubDataStoreReference.get().isOutputReady()) && (hubDataStoreReference.get().getInstanceName().equals(instance))) { responder.sendJson(HttpResponseStatus.OK, new JsonObject()); + // Invokes Event Listener. All GSEXIT processes have completed /announce-stream-processing + listener.announceReady(); return; } //Instance name does not match diff --git a/tigon-sql/src/main/resources/StreamLib_x64_linux.tar.gz b/tigon-sql/src/main/resources/StreamLib_x64_linux.tar.gz index 48c98d37..0a334ebd 100644 Binary files a/tigon-sql/src/main/resources/StreamLib_x64_linux.tar.gz and b/tigon-sql/src/main/resources/StreamLib_x64_linux.tar.gz differ diff --git a/tigon-sql/src/main/resources/StreamLib_x64_osx.tar.gz b/tigon-sql/src/main/resources/StreamLib_x64_osx.tar.gz index 17cb206b..ab596dbb 100644 Binary files a/tigon-sql/src/main/resources/StreamLib_x64_osx.tar.gz and b/tigon-sql/src/main/resources/StreamLib_x64_osx.tar.gz differ diff --git a/tigon-sql/src/test/java/co/cask/tigon/sql/io/DataRouterTest.java b/tigon-sql/src/test/java/co/cask/tigon/sql/io/DataRouterTest.java index 81143008..62211300 100644 --- a/tigon-sql/src/test/java/co/cask/tigon/sql/io/DataRouterTest.java +++ b/tigon-sql/src/test/java/co/cask/tigon/sql/io/DataRouterTest.java @@ -16,6 +16,7 @@ package co.cask.tigon.sql.io; +import co.cask.tigon.utils.Networks; import com.google.common.collect.Maps; import org.apache.http.HttpHost; import org.apache.http.client.methods.HttpPost; @@ -38,7 +39,6 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -87,13 +87,7 @@ public ChannelPipeline getPipeline() throws Exception { @Test public void testDataRouter() throws Exception { - ServerSocket socket = new ServerSocket(0); - int port = -1; - try { - port = socket.getLocalPort(); - } finally { - socket.close(); - } + int port = Networks.getRandomPort(); DataIngestionRouter router = new DataIngestionRouter(serverMap, port); try { router.startAndWait(); @@ -111,7 +105,6 @@ public void testDataRouter() throws Exception { Assert.assertEquals(1, testMap.get("stream3").intValue()); Assert.assertTrue(!testMap.containsKey("stream4")); } finally { - socket.close(); router.stopAndWait(); } } diff --git a/tigon-unit-test/pom.xml b/tigon-unit-test/pom.xml index 61b66362..dc1268ee 100644 --- a/tigon-unit-test/pom.xml +++ b/tigon-unit-test/pom.xml @@ -5,7 +5,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0 @@ -23,7 +23,6 @@ co.cask.tigon tigon-sql ${project.version} - test junit @@ -38,7 +37,6 @@ co.cask.http netty-http - test org.apache.hadoop diff --git a/tigon-unit-test/src/main/java/co/cask/tigon/test/DefaultFlowManager.java b/tigon-unit-test/src/main/java/co/cask/tigon/test/DefaultFlowManager.java index 9fb0a221..02f4bbc0 100644 --- a/tigon-unit-test/src/main/java/co/cask/tigon/test/DefaultFlowManager.java +++ b/tigon-unit-test/src/main/java/co/cask/tigon/test/DefaultFlowManager.java @@ -19,6 +19,7 @@ import co.cask.tigon.internal.app.runtime.ProgramController; import co.cask.tigon.internal.app.runtime.ProgramOptionConstants; import com.google.common.collect.Maps; +import org.apache.twill.discovery.ServiceDiscovered; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,4 +52,9 @@ public void stop() { LOG.warn(e.getMessage(), e); } } + + @Override + public ServiceDiscovered discover(String service) { + return controller.discover(service); + } } diff --git a/tigon-unit-test/src/main/java/co/cask/tigon/test/FlowManager.java b/tigon-unit-test/src/main/java/co/cask/tigon/test/FlowManager.java index 5d70ae68..ca24552f 100644 --- a/tigon-unit-test/src/main/java/co/cask/tigon/test/FlowManager.java +++ b/tigon-unit-test/src/main/java/co/cask/tigon/test/FlowManager.java @@ -16,6 +16,8 @@ package co.cask.tigon.test; +import org.apache.twill.discovery.ServiceDiscovered; + /** * Instance for this class is for managing a running {@link co.cask.tigon.api.flow.Flow}. */ @@ -33,4 +35,11 @@ public interface FlowManager { * Stops the running flow. */ void stop(); + + /** + * Discover a service announced by the Flow. + * @param service Name of the Service. + * @return A {@link ServiceDiscovered} + */ + ServiceDiscovered discover(String service); } diff --git a/tigon-unit-test/src/main/java/co/cask/tigon/test/SQLFlowTestBase.java b/tigon-unit-test/src/main/java/co/cask/tigon/test/SQLFlowTestBase.java new file mode 100644 index 00000000..0cd4c3e3 --- /dev/null +++ b/tigon-unit-test/src/main/java/co/cask/tigon/test/SQLFlowTestBase.java @@ -0,0 +1,228 @@ +/* + * Copyright © 2014 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + +package co.cask.tigon.test; + +import co.cask.http.AbstractHttpHandler; +import co.cask.http.HttpResponder; +import co.cask.http.NettyHttpService; +import co.cask.tigon.api.flow.Flow; +import co.cask.tigon.sql.conf.Constants; +import co.cask.tigon.utils.Networks; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.gson.Gson; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.junit.AfterClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; + +/** + * This is a base class that provides all the common functionality to test a Tigon SQL Flow. + * The contract to be followed is that the last flowlet of the Flow that is being tested, pings output data packets to + * an http end-point specified in the runtime args as "baseURL". + */ +public class SQLFlowTestBase extends TestBase { + private static FlowManager flowManager; + private static List ingestionThreads = Lists.newArrayList(); + static CountDownLatch latch; + private static TestHandler handler; + private static NettyHttpService service; + private static String serviceURL; + private static final int httpPort = Networks.getRandomPort(); + private static final Gson GSON = new Gson(); + + /** + * This function deploys an instance of the flowClass. + * Recommended, make this function call from a {@link org.junit.BeforeClass} annotated method. + * @param flowClass Class of the {@link co.cask.tigon.api.flow.Flow} to be deployed + * @throws Exception + */ + public static void setupFlow(Class flowClass) throws Exception { + Map runtimeArgs = Maps.newHashMap(); + handler = new TestHandler(); + service = NettyHttpService.builder() + .addHttpHandlers(ImmutableList.of(handler)) + .setPort(Networks.getRandomPort()) + .build(); + service.startAndWait(); + InetSocketAddress address = service.getBindAddress(); + serviceURL = "http://" + address.getHostName() + ":" + address.getPort() + "/queue"; + runtimeArgs.put("baseURL", serviceURL); + runtimeArgs.put(Constants.HTTP_PORT, Integer.toString(httpPort)); + flowManager = deployFlow(flowClass, runtimeArgs); + int maxWait = 100; + // Waiting for the Tigon SQL Flow initialization + while ((!flowManager.discover(Constants.HTTP_PORT).iterator().hasNext()) && (maxWait > 0)) { + TimeUnit.SECONDS.sleep(1); + maxWait = maxWait - 1; + } + if (maxWait <= 0) { + throw new TimeoutException("Timeout Error, Tigon SQL flow took too long to initiate"); + } + } + + /** + * This function is used to add additional error checking by allowing the user to assert on the number of expected + * output data packets. + * @param count Number of expected output data packets + * @return {@link java.util.concurrent.CountDownLatch} object that will count down to zero as output data packets are + * received + */ + public static CountDownLatch setExpectedOutputCount(int count) { + latch = new CountDownLatch(count); + return latch; + } + + /** + * This function is used to ingest data packets into multiple interfaces. + * @param inputDataStreams {@link java.util.List} of {@link java.util.Map.Entry} that contains the name of the + * interface as the key and the json encoded data packet as the value. + * + * Input Format : + * [ Map.Entry>, + Map.Entry>, + Map.Entry>, + . + . + Map.Entry>] + */ + public static void ingestData(List>> inputDataStreams) { + if (latch == null) { + throw new RuntimeException("Expected output count not defined. Use setExpectedOutputCount()"); + } else if (service == null) { + throw new RuntimeException("Flow not setup. Use setupFlow()"); + } + final List>> finalInputDataStreams = inputDataStreams; + Thread ingestData = new Thread(new Runnable() { + @Override + public void run() { + HttpClient httpClient = new DefaultHttpClient(); + for (Map.Entry> dataStreamEntry : finalInputDataStreams) { + for (String dataPacket : dataStreamEntry.getValue()) { + try { + // TODO eliminate org.apache.http dependency TIGON-5 + HttpPost httpPost = + new HttpPost("http://localhost:" + httpPort + "/v1/tigon/" + dataStreamEntry.getKey()); + httpPost.addHeader("Content-Type", "application/json"); + httpPost.setEntity(new StringEntity(dataPacket, Charsets.UTF_8)); + EntityUtils.consumeQuietly(httpClient.execute(httpPost).getEntity()); + } catch (Exception e) { + Throwables.propagate(e); + } + } + } + } + }); + ingestionThreads.add(ingestData); + ingestData.start(); + try { + ingestData.join(60000); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + } + + /** + * This function is used to ingest data packets to a single interface. + * @param inputDataStream {@link java.util.List} of {@link java.util.Map.Entry} that contains the name of the + * interface as the key and the json encoded data packet as the value. + * + * Input Format : + * Map.Entry> + */ + public static void ingestData(Map.Entry> inputDataStream) { + List>> inputData = Lists.newArrayList(); + inputData.add(inputDataStream); + ingestData(inputData); + } + + /** + * A HTTP handler that maintains a queue of the generated output objects + */ + public static final class TestHandler extends AbstractHttpHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestHandler.class); + private static Queue queue = Queues.newConcurrentLinkedQueue(); + @Path("/queue/poll") + @GET + public void pollData(HttpRequest request, HttpResponder responder) { + if (queue.size() == 0) { + responder.sendStatus(HttpResponseStatus.NO_CONTENT); + return; + } + String dataPacket = queue.poll(); + responder.sendString(HttpResponseStatus.OK, dataPacket); + } + @Path("/queue") + @POST + public void getPing(HttpRequest request, HttpResponder responder) { + String dataPacket = request.getContent().toString(Charsets.UTF_8); + LOG.info("/ping Got Data {}", dataPacket); + queue.add(dataPacket); + latch.countDown(); + responder.sendStatus(HttpResponseStatus.OK); + } + } + + /** + * This function returns the output data packets generated by the given flow + * @param outputClass {@link java.lang.Class} of the output type object + * @param The output type object class + * @return An instance of the output type object + * @throws IOException + */ + public static T getDataPacket(Class outputClass) throws IOException { + HttpClient httpClient = new DefaultHttpClient(); + HttpGet httpGet = new HttpGet(serviceURL + "/poll"); + HttpResponse response = httpClient.execute(httpGet); + if (response.getStatusLine().getStatusCode() != 200) { + return null; + } + String serializedDataPacket = EntityUtils.toString(response.getEntity(), Charsets.UTF_8); + return GSON.fromJson(serializedDataPacket, outputClass); + } + + @AfterClass + public static void afterClass() { + flowManager.stop(); + service.stopAndWait(); + } +} diff --git a/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLFlowTest.java b/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLFlowTest.java deleted file mode 100644 index 13fe46b3..00000000 --- a/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLFlowTest.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package co.cask.tigon.test; - -import co.cask.http.AbstractHttpHandler; -import co.cask.http.HttpResponder; -import co.cask.http.NettyHttpService; -import co.cask.tigon.api.annotation.ProcessInput; -import co.cask.tigon.api.annotation.RoundRobin; -import co.cask.tigon.api.flow.Flow; -import co.cask.tigon.api.flow.FlowSpecification; -import co.cask.tigon.api.flow.flowlet.AbstractFlowlet; -import co.cask.tigon.api.flow.flowlet.FlowletContext; -import co.cask.tigon.api.flow.flowlet.OutputEmitter; -import co.cask.tigon.sql.conf.Constants; -import co.cask.tigon.sql.flowlet.AbstractInputFlowlet; -import co.cask.tigon.sql.flowlet.GDATFieldType; -import co.cask.tigon.sql.flowlet.GDATSlidingWindowAttribute; -import co.cask.tigon.sql.flowlet.StreamSchema; -import co.cask.tigon.sql.flowlet.annotation.QueryOutput; -import com.google.common.base.Charsets; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.util.EntityUtils; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; - -/** - * SQLFlowTest - */ -public class SQLFlowTest extends TestBase { - private static final Gson GSON = new Gson(); - private static FlowManager flowManager; - private static Thread ingestData; - static final int MAX_TIMESTAMP = 10; - static CountDownLatch latch; - private static TestHandler handler; - private static NettyHttpService service; - private static String serviceURL; - - @BeforeClass - public static void beforeClass() throws Exception { - latch = new CountDownLatch(MAX_TIMESTAMP); - Map runtimeArgs = Maps.newHashMap(); - //Identifying free port - handler = new TestHandler(); - service = NettyHttpService.builder() - .addHttpHandlers(ImmutableList.of(handler)) - .setPort(getRandomPort()) - .build(); - service.startAndWait(); - InetSocketAddress address = service.getBindAddress(); - serviceURL = "http://" + address.getHostName() + ":" + address.getPort(); - runtimeArgs.put("baseURL", serviceURL); - final int port = getRandomPort(); - int tcpPort = getRandomPort(); - runtimeArgs.put(Constants.HTTP_PORT, Integer.toString(port)); - runtimeArgs.put(Constants.TCP_INGESTION_PORT_PREFIX + "intInput", Integer.toString(tcpPort)); - flowManager = deployFlow(SQLFlow.class, runtimeArgs); - TimeUnit.SECONDS.sleep(60); - ingestData = new Thread(new Runnable() { - @Override - public void run() { - HttpClient httpClient = new DefaultHttpClient(); - for (int i = 1; i <= MAX_TIMESTAMP; i++) { - for (int j = 1; j <= i; j++) { - try { - // TODO eliminate org.apache.http dependency TIGON-5 - HttpPost httpPost = new HttpPost("http://localhost:" + port + "/v1/tigon/intInput"); - JsonObject bodyJson = new JsonObject(); - JsonArray dataArray = new JsonArray(); - dataArray.add(new JsonPrimitive(Integer.toString(i))); - dataArray.add(new JsonPrimitive(Integer.toString(j))); - bodyJson.add("data", dataArray); - StringEntity params = new StringEntity(bodyJson.toString(), Charsets.UTF_8); - httpPost.addHeader("Content-Type", "application/json"); - httpPost.setEntity(params); - EntityUtils.consumeQuietly(httpClient.execute(httpPost).getEntity()); - } catch (Exception e) { - Throwables.propagate(e); - } - } - } - latch.countDown(); - } - }); - } - - private static DataPacket getDataPacket() throws IOException { - HttpClient httpClient = new DefaultHttpClient(); - HttpGet httpGet = new HttpGet(serviceURL + "/poll-data"); - HttpResponse response = httpClient.execute(httpGet); - if (response.getStatusLine().getStatusCode() != 200) { - return null; - } - String serializedDataPacket = EntityUtils.toString(response.getEntity(), Charsets.UTF_8); - return GSON.fromJson(serializedDataPacket, DataPacket.class); - } - - @Test - public void testSQLFlow() throws Exception { - ingestData.start(); - ingestData.join(); - latch.await(60, TimeUnit.SECONDS); - int dataPacketCounter = MAX_TIMESTAMP; - DataPacket dataPacket; - while ((dataPacket = getDataPacket()) != null) { - Assert.assertEquals(evalSum(dataPacket.timestamp), dataPacket.sumValue); - dataPacketCounter = dataPacketCounter - 1; - } - Assert.assertEquals(1, dataPacketCounter); - } - - @AfterClass - public static void afterClass() { - flowManager.stop(); - service.stopAndWait(); - } - - private int evalSum(long val) { - // Sum of N natural numbers - return (int) val * ((int) val + 1) / 2; - } - - - public static final class SQLFlow implements Flow { - @Override - public FlowSpecification configure() { - return FlowSpecification.Builder.with() - .setName("SQLFlow") - .setDescription("Sample Flow") - .withFlowlets() - .add("SQLProcessor", new SQLFlowlet(), 1) - .add("dataSink", new SinkFlowlet(), 3) - .connect() - .from("SQLProcessor").to("dataSink") - .build(); - } - } - - private static final class SQLFlowlet extends AbstractInputFlowlet { - private OutputEmitter dataEmitter; - private final Logger flowletLOG = LoggerFactory.getLogger(SQLFlowlet.class); - - @Override - public void create() { - setName("Summation"); - setDescription("sums up the input value over a timewindow"); - StreamSchema schema = new StreamSchema.Builder() - .addField("timestamp", GDATFieldType.LONG, GDATSlidingWindowAttribute.INCREASING) - .addField("intStream", GDATFieldType.INT) - .build(); - addJSONInput("intInput", schema); - addQuery("sumOut", "SELECT timestamp, SUM(intStream) AS sumValue FROM intInput GROUP BY timestamp"); - } - - @QueryOutput("sumOut") - public void emitData(DataPacket dataPacket) { - flowletLOG.info("Emitting data to next flowlet"); - //Each data packet is forwarded to the next flowlet - dataEmitter.emit(dataPacket); - } - } - - private static final class SinkFlowlet extends AbstractFlowlet { - private final Logger flowletLOG = LoggerFactory.getLogger(SinkFlowlet.class); - private String flowletName; - private String baseURL; - // TODO eliminate org.apache.http dependency TIGON-5 - private HttpClient httpClient; - - - @Override - public void initialize(FlowletContext context) throws Exception { - flowletName = context.getName() + "_" + context.getInstanceId(); - baseURL = context.getRuntimeArguments().get("baseURL"); - httpClient = new DefaultHttpClient(); - } - - @RoundRobin - @ProcessInput - public void process(DataPacket value) throws Exception { - flowletLOG.info("{} got {}", flowletName, value.toString()); - try { - JsonObject bodyJson = new JsonObject(); - bodyJson.addProperty("time", value.getTime()); - bodyJson.addProperty("sum", value.getSum()); - HttpPost httpPost = new HttpPost(baseURL + "/ping"); - StringEntity params = new StringEntity(bodyJson.toString(), Charsets.UTF_8); - httpPost.addHeader("Content-Type", "application/json"); - httpPost.setEntity(params); - EntityUtils.consumeQuietly(httpClient.execute(httpPost).getEntity()); - } catch (Exception e) { - Throwables.propagate(e); - } - } - } - - public static int getRandomPort() { - try { - ServerSocket socket = new ServerSocket(0); - try { - return socket.getLocalPort(); - } finally { - socket.close(); - } - } catch (IOException e) { - return -1; - } - } - - public static final class TestHandler extends AbstractHttpHandler { - private static final Logger LOG = LoggerFactory.getLogger(TestHandler.class); - private static JsonObject requestData; - private static Queue queue = Queues.newConcurrentLinkedQueue(); - - @Path("/poll-data") - @GET - public void getDataPacket(HttpRequest request, HttpResponder responder) { - if (queue.size() == 0) { - responder.sendStatus(HttpResponseStatus.NO_CONTENT); - return; - } - DataPacket dataPacket = queue.poll(); - responder.sendJson(HttpResponseStatus.OK, dataPacket); - } - - - @Path("/ping") - @POST - public void getPing(HttpRequest request, HttpResponder responder) { - try { - requestData = GSON.fromJson(request.getContent().toString(Charsets.UTF_8), JsonObject.class); - } catch (Exception e) { - throw new RuntimeException("Cannot parse JSON data from the HTTP response"); - } - DataPacket dataPacket = new DataPacket(requestData.get("time").getAsLong(), requestData.get("sum").getAsInt()); - LOG.info("/ping Got Data {}", dataPacket.toString()); - queue.add(dataPacket); - latch.countDown(); - responder.sendStatus(HttpResponseStatus.OK); - } - } - - private static class DataPacket { - //Using the same data type and variable name as specified in the query output - long timestamp; - int sumValue; - - public String toString() { - return "timestamp : " + timestamp + "\tsumValue : " + sumValue; - } - - public DataPacket(long timestamp, int sumValue) { - this.timestamp = timestamp; - this.sumValue = sumValue; - } - - public String getTime() { - return Long.toString(timestamp); - } - - public String getSum() { - return Integer.toString(sumValue); - } - } -} diff --git a/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLJoinTest.java b/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLJoinTest.java new file mode 100644 index 00000000..bab9292e --- /dev/null +++ b/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLJoinTest.java @@ -0,0 +1,204 @@ +/** + * Copyright 2012-2014 Continuuity, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + +package co.cask.tigon.test; + +import co.cask.tigon.api.annotation.ProcessInput; +import co.cask.tigon.api.annotation.RoundRobin; +import co.cask.tigon.api.flow.Flow; +import co.cask.tigon.api.flow.FlowSpecification; +import co.cask.tigon.api.flow.flowlet.AbstractFlowlet; +import co.cask.tigon.api.flow.flowlet.FlowletContext; +import co.cask.tigon.api.flow.flowlet.OutputEmitter; +import co.cask.tigon.sql.flowlet.AbstractInputFlowlet; +import co.cask.tigon.sql.flowlet.GDATFieldType; +import co.cask.tigon.sql.flowlet.GDATSlidingWindowAttribute; +import co.cask.tigon.sql.flowlet.StreamSchema; +import co.cask.tigon.sql.flowlet.annotation.QueryOutput; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * SQLJoinTest + */ +public class SQLJoinTest extends SQLFlowTestBase { + private static final int MAX = 10; + private static CountDownLatch latch; + + /** + * Setup and deploy {@link co.cask.tigon.test.SQLTest.SQLFlow} + * @throws Exception + */ + @BeforeClass + public static void beforeClass() throws Exception { + setupFlow(SQLFlow.class); + latch = setExpectedOutputCount(MAX); + } + + /** + * Run test + * @throws Exception + */ + @Test + public void testSQLFlow() throws Exception { + //Generating input data packets + List nameData = Lists.newArrayList(); + for (int i = 1; i <= MAX; i++) { + JsonObject bodyJson = new JsonObject(); + JsonArray dataArray = new JsonArray(); + dataArray.add(new JsonPrimitive(Integer.toString(i))); + dataArray.add(new JsonPrimitive("NAME" + i)); + bodyJson.add("data", dataArray); + nameData.add(bodyJson.toString()); + } + List ageData = Lists.newArrayList(); + for (int i = 1; i <= MAX; i++) { + JsonObject bodyJson = new JsonObject(); + JsonArray dataArray = new JsonArray(); + dataArray.add(new JsonPrimitive(Integer.toString(i))); + dataArray.add(new JsonPrimitive(i * 10)); + bodyJson.add("data", dataArray); + ageData.add(bodyJson.toString()); + } + List>> dataStreams = Lists.newArrayList(); + dataStreams.add(new AbstractMap.SimpleEntry>("nameInput", nameData)); + dataStreams.add(new AbstractMap.SimpleEntry>("ageInput", ageData)); + ingestData(dataStreams); + latch.await(60, TimeUnit.SECONDS); + int dataPacketCounter = MAX; + DataPacket dataPacket; + while ((dataPacket = getDataPacket(DataPacket.class)) != null) { + Assert.assertEquals(dataPacket.uid * 10, dataPacket.age); + Assert.assertEquals("NAME" + dataPacket.uid, dataPacket.name); + dataPacketCounter = dataPacketCounter - 1; + } + Assert.assertEquals(0, dataPacketCounter); + } + + public static final class SQLFlow implements Flow { + @Override + public FlowSpecification configure() { + return FlowSpecification.Builder.with() + .setName("SQLFlow") + .setDescription("Sample Flow") + .withFlowlets() + .add("SQLProcessor", new SQLFlowlet(), 1) + .add("dataSink", new SinkFlowlet(), 3) + .connect() + .from("SQLProcessor").to("dataSink") + .build(); + } + } + + private static class DataPacket { + int uid; + String name; + int age; + + public String toString() { + return "UID: " + uid + "\tName: " + name + "\tAge: " + age; + } + } + + private static final class SQLFlowlet extends AbstractInputFlowlet { + private OutputEmitter dataEmitter; + private final Logger flowletLOG = LoggerFactory.getLogger(SQLFlowlet.class); + + @Override + public void create() { + setName("Joining"); + setDescription("Join two Streams"); + StreamSchema ageSchema = new StreamSchema.Builder() + .setName("ageData") + .addField("uid", GDATFieldType.INT, GDATSlidingWindowAttribute.INCREASING) + .addField("age", GDATFieldType.INT) + .build(); + StreamSchema nameSchema = new StreamSchema.Builder() + .setName("nameData") + .addField("uid", GDATFieldType.INT, GDATSlidingWindowAttribute.INCREASING) + .addField("name", GDATFieldType.STRING) + .build(); + addJSONInput("ageInput", ageSchema); + addJSONInput("nameInput", nameSchema); + + addQuery("userDetails", "SELECT nI.uid as uid, nI.name as name, aI.age as age INNER_JOIN " + + "FROM nameInput.nameData nI, ageInput.ageData aI WHERE nI.uid = aI.uid"); + } + + @QueryOutput("userDetails") + public void emitData(DataPacket dataPacket) { + flowletLOG.info("Emitting data to next flowlet"); + //Each data packet is forwarded to the next flowlet + dataEmitter.emit(dataPacket); + } + } + + private static final class SinkFlowlet extends AbstractFlowlet { + private final Logger flowletLOG = LoggerFactory.getLogger(SinkFlowlet.class); + private String flowletName; + private String baseURL; + // TODO eliminate org.apache.http dependency TIGON-5 + private HttpClient httpClient; + + + @Override + public void initialize(FlowletContext context) throws Exception { + flowletName = context.getName() + "_" + context.getInstanceId(); + baseURL = context.getRuntimeArguments().get("baseURL"); + httpClient = new DefaultHttpClient(); + } + + @RoundRobin + @ProcessInput + public void process(DataPacket value) throws Exception { + flowletLOG.info("{} got {}", flowletName, value.toString()); + try { + JsonObject bodyJson = new JsonObject(); + bodyJson.addProperty("uid", value.uid); + bodyJson.addProperty("name", value.name); + bodyJson.addProperty("age", value.age); + HttpPost httpPost = new HttpPost(baseURL); + StringEntity params = new StringEntity(bodyJson.toString(), Charsets.UTF_8); + httpPost.addHeader("Content-Type", "application/json"); + httpPost.setEntity(params); + EntityUtils.consumeQuietly(httpClient.execute(httpPost).getEntity()); + } catch (Exception e) { + Throwables.propagate(e); + } + } + } +} diff --git a/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLTest.java b/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLTest.java new file mode 100644 index 00000000..477857f2 --- /dev/null +++ b/tigon-unit-test/src/test/java/co/cask/tigon/test/SQLTest.java @@ -0,0 +1,189 @@ +/* + * Copyright © 2014 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + +package co.cask.tigon.test; + +import co.cask.tigon.api.annotation.ProcessInput; +import co.cask.tigon.api.annotation.RoundRobin; +import co.cask.tigon.api.flow.Flow; +import co.cask.tigon.api.flow.FlowSpecification; +import co.cask.tigon.api.flow.flowlet.AbstractFlowlet; +import co.cask.tigon.api.flow.flowlet.FlowletContext; +import co.cask.tigon.api.flow.flowlet.OutputEmitter; +import co.cask.tigon.sql.flowlet.AbstractInputFlowlet; +import co.cask.tigon.sql.flowlet.GDATFieldType; +import co.cask.tigon.sql.flowlet.GDATSlidingWindowAttribute; +import co.cask.tigon.sql.flowlet.StreamSchema; +import co.cask.tigon.sql.flowlet.annotation.QueryOutput; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * SQLTest + */ +public class SQLTest extends SQLFlowTestBase { + private static final int MAX = 10; + private static CountDownLatch latch; + + /** + * Setup and deploy {@link co.cask.tigon.test.SQLTest.SQLFlow} + * @throws Exception + */ + @BeforeClass + public static void beforeClass() throws Exception { + setupFlow(SQLFlow.class); + latch = setExpectedOutputCount(MAX - 1); + } + + /** + * Run test + * @throws Exception + */ + @Test + public void testSQLFlow() throws Exception { + List inputDataList = Lists.newArrayList(); + for (int i = 1; i <= MAX; i++) { + for (int j = 1; j <= i; j++) { + JsonObject bodyJson = new JsonObject(); + JsonArray dataArray = new JsonArray(); + dataArray.add(new JsonPrimitive(Integer.toString(i))); + dataArray.add(new JsonPrimitive(Integer.toString(j))); + bodyJson.add("data", dataArray); + inputDataList.add(bodyJson.toString()); + } + } + ingestData(new AbstractMap.SimpleEntry>("inputDataStream", inputDataList)); + latch.await(60, TimeUnit.SECONDS); + int dataPacketCounter = MAX; + DataPacket dataPacket; + while ((dataPacket = getDataPacket(DataPacket.class)) != null) { + Assert.assertEquals(evalSum(dataPacket.timestamp), dataPacket.sumValue); + dataPacketCounter = dataPacketCounter - 1; + } + // Expected output count is MAX - 1 as the last timestamp value is not processed + Assert.assertEquals(1, dataPacketCounter); + } + + private static int evalSum(long val) { + // Sum of N natural numbers + return (int) val * ((int) val + 1) / 2; + } + + public static final class SQLFlow implements Flow { + @Override + public FlowSpecification configure() { + return FlowSpecification.Builder.with() + .setName("SQLFlow") + .setDescription("Sample Flow") + .withFlowlets() + .add("SQLProcessor", new SQLFlowlet(), 1) + .add("dataSink", new SinkFlowlet(), 3) + .connect() + .from("SQLProcessor").to("dataSink") + .build(); + } + } + + private static class DataPacket { + //Using the same data type and variable name as specified in the query output + long timestamp; + int sumValue; + + public String toString() { + return "timestamp : " + timestamp + "\tsumValue : " + sumValue; + } + } + + private static final class SQLFlowlet extends AbstractInputFlowlet { + private OutputEmitter dataEmitter; + private final Logger flowletLOG = LoggerFactory.getLogger(SQLFlowlet.class); + + @Override + public void create() { + setName("Summation"); + setDescription("sums up the input value over a timewindow"); + StreamSchema schema = new StreamSchema.Builder() + .setName("intInput") + .addField("timestamp", GDATFieldType.LONG, GDATSlidingWindowAttribute.INCREASING) + .addField("intStream", GDATFieldType.INT) + .build(); + addJSONInput("inputDataStream", schema); + addQuery("sumOut", + "SELECT timestamp, SUM(intStream) AS sumValue FROM inputDataStream.intInput GROUP BY timestamp"); + } + + @QueryOutput("sumOut") + public void emitData(DataPacket dataPacket) { + flowletLOG.info("Emitting data to next flowlet"); + //Each data packet is forwarded to the next flowlet + dataEmitter.emit(dataPacket); + } + } + + private static final class SinkFlowlet extends AbstractFlowlet { + private final Logger flowletLOG = LoggerFactory.getLogger(SinkFlowlet.class); + private String flowletName; + private String baseURL; + // TODO eliminate org.apache.http dependency TIGON-5 + private HttpClient httpClient; + + + @Override + public void initialize(FlowletContext context) throws Exception { + flowletName = context.getName() + "_" + context.getInstanceId(); + baseURL = context.getRuntimeArguments().get("baseURL"); + httpClient = new DefaultHttpClient(); + } + + @RoundRobin + @ProcessInput + public void process(DataPacket value) throws Exception { + flowletLOG.info("{} got {}", flowletName, value.toString()); + try { + JsonObject bodyJson = new JsonObject(); + bodyJson.addProperty("timestamp", value.timestamp); + bodyJson.addProperty("sumValue", value.sumValue); + HttpPost httpPost = new HttpPost(baseURL); + StringEntity params = new StringEntity(bodyJson.toString(), Charsets.UTF_8); + httpPost.addHeader("Content-Type", "application/json"); + httpPost.setEntity(params); + EntityUtils.consumeQuietly(httpClient.execute(httpPost).getEntity()); + } catch (Exception e) { + Throwables.propagate(e); + } + } + } +} diff --git a/tigon-yarn/pom.xml b/tigon-yarn/pom.xml index b90ad37f..94413cf9 100644 --- a/tigon-yarn/pom.xml +++ b/tigon-yarn/pom.xml @@ -20,7 +20,7 @@ tigon co.cask.tigon - 0.2.0 + 0.2.1 4.0.0