Skip to content

Commit

Permalink
KYLIN-3690 New streaming backend implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
allenma authored and shaofengshi committed Mar 24, 2019
1 parent c05ae0c commit 0943599
Show file tree
Hide file tree
Showing 268 changed files with 29,831 additions and 20 deletions.
4 changes: 4 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-storage-hbase</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-storage-stream</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-mr</artifactId>
Expand Down
109 changes: 109 additions & 0 deletions build/bin/kylin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,115 @@ then
else
quit "Kylin is not running"
fi

# streaming command
elif [ "$1" == "streaming" ]
then
if [ $# -lt 2 ]
then
echo "invalid input args $@"
exit -1
fi
if [ "$2" == "start" ]
then
if [ -f "${KYLIN_HOME}/streaming_receiver_pid" ]
then
PID=`cat $KYLIN_HOME/streaming_receiver_pid`
if ps -p $PID > /dev/null
then
echo "Kylin is running, stop it first"
exit 1
fi
fi
#retrive $hbase_dependency
source ${dir}/find-hbase-dependency.sh
#retrive $KYLIN_EXTRA_START_OPTS
if [ -f "${KYLIN_HOME}/conf/setenv.sh" ]
then source ${KYLIN_HOME}/conf/setenv.sh
fi

mkdir -p ${KYLIN_HOME}/ext
HBASE_CLASSPATH=`hbase classpath`
#echo "hbase class path:"$HBASE_CLASSPATH
STREAM_CLASSPATH=${KYLIN_HOME}/lib/streaming/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH}

# KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
${JAVA_HOME}/bin/java -cp $STREAM_CLASSPATH ${KYLIN_EXTRA_START_OPTS} \
-Dlog4j.configuration=stream-receiver-log4j.properties\
-DKYLIN_HOME=${KYLIN_HOME}\
-Dkylin.hbase.dependency=${hbase_dependency} \
org.apache.kylin.stream.server.StreamingReceiver $@ > ${KYLIN_HOME}/logs/streaming_receiver.out 2>&1 & echo $! > ${KYLIN_HOME}/streaming_receiver_pid &
exit 0
elif [ "$2" == "stop" ]
then
if [ ! -f "${KYLIN_HOME}/streaming_receiver_pid" ]
then
echo "streaming is not running, please check"
exit 1
fi
PID=`cat ${KYLIN_HOME}/streaming_receiver_pid`
if [ "$PID" = "" ]
then
echo "streaming is not running, please check"
exit 1
else
echo "stopping streaming:$PID"
WAIT_TIME=2
LOOP_COUNTER=20
if ps -p $PID > /dev/null
then
echo "Stopping Kylin: $PID"
kill $PID

for ((i=0; i<$LOOP_COUNTER; i++))
do
# wait to process stopped
sleep $WAIT_TIME
if ps -p $PID > /dev/null ; then
echo "Stopping in progress. Will check after $WAIT_TIME secs again..."
continue;
else
break;
fi
done

# if process is still around, use kill -9
if ps -p $PID > /dev/null
then
echo "Initial kill failed, getting serious now..."
kill -9 $PID
sleep 1 #give kill -9 sometime to "kill"
if ps -p $PID > /dev/null
then
quit "Warning, even kill -9 failed, giving up! Sorry..."
fi
fi

# process is killed , remove pid file
rm -rf ${KYLIN_HOME}/streaming_receiver_pid
echo "Kylin with pid ${PID} has been stopped."
exit 0
else
quit "Kylin with pid ${PID} is not running"
fi
fi
elif [[ "$2" = org.apache.kylin.* ]]
then
source ${KYLIN_HOME}/conf/setenv.sh
HBASE_CLASSPATH=`hbase classpath`
#echo "hbase class path:"$HBASE_CLASSPATH
STREAM_CLASSPATH=${KYLIN_HOME}/lib/streaming/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH}

shift
# KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
${JAVA_HOME}/bin/java -cp $STREAM_CLASSPATH ${KYLIN_EXTRA_START_OPTS} \
-Dlog4j.configuration=stream-receiver-log4j.properties\
-DKYLIN_HOME=${KYLIN_HOME}\
-Dkylin.hbase.dependency=${hbase_dependency} \
"$@"
exit 0
fi

elif [ "$1" = "version" ]
then
retrieveDependency
Expand Down
3 changes: 3 additions & 0 deletions build/script/prepare-libs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ echo "version ${version}"
echo "copy lib file"
rm -rf build/lib build/tool
mkdir build/lib build/tool
mkdir build/lib/streaming
cp assembly/target/kylin-assembly-${version}-job.jar build/lib/kylin-job-${version}.jar
cp storage-hbase/target/kylin-storage-hbase-${version}-coprocessor.jar build/lib/kylin-coprocessor-${version}.jar
cp jdbc/target/kylin-jdbc-${version}.jar build/lib/kylin-jdbc-${version}.jar
cp tool-assembly/target/kylin-tool-assembly-${version}-assembly.jar build/tool/kylin-tool-${version}.jar
cp datasource-sdk/target/kylin-datasource-sdk-${version}-lib.jar build/lib/kylin-datasource-sdk-${version}.jar
cp stream-receiver/target/kylin-stream-receiver-${version}-all.jar build/lib/streaming/kylin-stream-receiver-${version}-all.jar

# Copied file becomes 000 for some env (e.g. my Cygwin)
chmod 644 build/lib/kylin-job-${version}.jar
chmod 644 build/lib/kylin-coprocessor-${version}.jar
chmod 644 build/lib/kylin-jdbc-${version}.jar
chmod 644 build/tool/kylin-tool-${version}.jar
chmod 644 build/lib/kylin-datasource-sdk-${version}.jar
chmod 644 build/lib/streaming/kylin-stream-receiver-${version}-all.jar
133 changes: 133 additions & 0 deletions core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ public Map<Integer, String> getSourceEngines() {
r.put(1, "org.apache.kylin.source.kafka.KafkaSource");
r.put(8, "org.apache.kylin.source.jdbc.JdbcSource");
r.put(16, "org.apache.kylin.source.jdbc.extensible.JdbcSource");
r.put(20, "org.apache.kylin.stream.source.kafka.KafkaBatchSourceAdaptor");
r.put(21, "org.apache.kylin.stream.source.kafka.KafkaBatchSourceAdaptor");
r.putAll(convertKeyToInteger(getPropertiesByPrefix("kylin.source.provider.")));
return r;
}
Expand Down Expand Up @@ -1031,6 +1033,7 @@ public Map<Integer, String> getStorageEngines() {
r.put(0, "org.apache.kylin.storage.hbase.HBaseStorage");
r.put(1, "org.apache.kylin.storage.hybrid.HybridStorage");
r.put(2, "org.apache.kylin.storage.hbase.HBaseStorage");
r.put(3, "org.apache.kylin.storage.stream.StreamStorage");
r.putAll(convertKeyToInteger(getPropertiesByPrefix("kylin.storage.provider.")));
return r;
}
Expand Down Expand Up @@ -1984,5 +1987,135 @@ public String getJdbcSourceAdaptor() {

public boolean isLimitPushDownEnabled() {
return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));

// ============================================================================
// streaming
// ============================================================================
public String getStreamingStoreClass() {
return getOptional("kylin.stream.store.class", "org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore");
}

public String getStreamingBasicCuboidJobDFSBlockSize() {
return getOptional("kylin.stream.job.dfs.block.size", String.valueOf(16 * 1024 * 1024));
}

public String getStreamingIndexPath() {
return getOptional("kylin.stream.index.path", "stream_index");
}

public int getStreamingCubeConsumerTasksNum() {
return Integer.parseInt(getOptional("kylin.stream.cube-num-of-consumer-tasks", "3"));
}

public int getStreamingCubeWindowInSecs() {
return Integer.parseInt(getOptional("kylin.stream.cube.window", "3600"));
}

public int getStreamingCubeDurationInSecs() {
return Integer.parseInt(getOptional("kylin.stream.cube.duration", "7200"));
}

public int getStreamingCubeMaxDurationInSecs() {
return Integer.parseInt(getOptional("kylin.stream.cube.duration.max", "43200"));
}

public int getStreamingCheckPointFileMaxNum() {
return Integer.parseInt(getOptional("kylin.stream.checkpoint.file.max.num", "5"));
}

public int getStreamingCheckPointIntervalsInSecs() {
return Integer.parseInt(getOptional("kylin.stream.index.checkpoint.intervals", "300"));
}

public int getStreamingIndexMaxRows() {
return Integer.parseInt(getOptional("kylin.stream.index.maxrows", "50000"));
}

public int getStreamingMaxImmutableSegments() {
return Integer.parseInt(getOptional("kylin.stream.immutable.segments.max.num", "100"));
}

public boolean isStreamingConsumeFromLatestOffsets() {
return Boolean.parseBoolean(getOptional("kylin.stream.consume.offsets.latest", "true"));
}

public String getStreamingNode() {
return getOptional("kylin.stream.node", null);
}

public Map<String, String> getStreamingNodeProperties() {
return getPropertiesByPrefix("kylin.stream.node");
}

public String getStreamingMetadataStoreType() {
return getOptional("kylin.stream.metadata.store.type", "zk");
}

public String getStreamingCoordinateZK() {
return getOptional("kylin.stream.zookeeper", null);
}

public String getStreamingSegmentRetentionPolicy() {
return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
}

public int getStreamingReceiverHttpMaxThreads() {
return Integer.parseInt(getOptional("kylin.stream.receiver.http.max.threads", "200"));
}

public int getStreamingReceiverHttpMinThreads() {
return Integer.parseInt(getOptional("kylin.stream.receiver.http.min.threads", "10"));
}

public int getStreamingReceiverQueryCoreThreads() {
return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
}

public int getStreamingReceiverQueryMaxThreads() {
return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
}

public int getStreamingReceiverUseThreadsPerQuery() {
return Integer.parseInt(getOptional("kylin.stream.receiver.use-threads-per-query", "8"));
}

public int getStreamingRPCHttpConnTimeout() {
return Integer.parseInt(getOptional("kylin.stream.rpc.http.connect.timeout", "10000"));
}

public int getStreamingRPCHttpReadTimeout() {
return Integer.parseInt(getOptional("kylin.stream.rpc.http.read.timeout", "60000"));
}

public boolean isStreamingBuildAdditionalCuboids() {
return Boolean.parseBoolean(getOptional("kylin.stream.build.additional.cuboids", "false"));
}

public Map<String, String> getStreamingSegmentRetentionPolicyProperties(String policyName) {
return getPropertiesByPrefix("kylin.stream.segment.retention.policy." + policyName + ".");
}

public int getStreamingMaxFragmentsInSegment() {
return Integer.parseInt(getOptional("kylin.stream.segment-max-fragments", "50"));
}

public int getStreamingMinFragmentsInSegment() {
return Integer.parseInt(getOptional("kylin.stream.segment-min-fragments", "15"));
}

public int getStreamingMaxFragmentSizeInMb() {
return Integer.parseInt(getOptional("kylin.stream.max-fragment-size-mb", "300"));
}

public boolean isStreamingFragmentsAutoMergeEnabled() {
return Boolean.parseBoolean(getOptional("kylin.stream.fragments-auto-merge-enable", "true"));
}

public boolean isStreamingConcurrentScanEnabled() {
return Boolean.parseBoolean(getOptional("kylin.stream.segment.concurrent.scan", "false"));
}

public String getLocalStorageImpl() {
return getOptional("kylin.stream.settled.storage", null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private KylinConfigExt(KylinConfigExt ext, Map<String, String> overrides) {
this.overrides = BCC.check(overrides);
}

protected String getOptional(String prop, String dft) {
public String getOptional(String prop, String dft) {
String value = overrides.get(prop);
if (value != null)
return StrSubstitutor.replace(value, System.getenv());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public static boolean getHtraceEnabled() {
return getBoolean(DEBUG_TOGGLE_HTRACE_ENABLED);
}

public static boolean isStreamingProfileEnable() {
return getBoolean(DEBUG_TOGGLE_STREAMING_DETAIL_PROFILE);
}

public static int getQueryTimeout() {
String v = getString(DEBUG_TOGGLE_QUERY_TIMEOUT);
if (v == null)
Expand Down Expand Up @@ -369,4 +373,14 @@ public static Properties getJdbcDriverClientCalciteProps() {
* extra calcite props from jdbc client
*/
public static final String JDBC_CLIENT_CALCITE_PROPS = "JDBC_CLIENT_CALCITE_PROPS";

/**
* set DEBUG_TOGGLE_STREAMING_PROFILE="true" to profile streaming query
*
example:(put it into request body)
"backdoorToggles": {
"DEBUG_TOGGLE_STREAMING_DETAIL_PROFILE": "true"
}
*/
public final static String DEBUG_TOGGLE_STREAMING_DETAIL_PROFILE = "DEBUG_TOGGLE_STREAMING_DETAIL_PROFILE";
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ abstract public class ResourceStore {
public static final String EXECUTE_RESOURCE_ROOT = "/execute";
public static final String EXECUTE_OUTPUT_RESOURCE_ROOT = "/execute_output";
public static final String STREAMING_RESOURCE_ROOT = "/streaming";
public static final String STREAMING_V2_RESOURCE_ROOT = "/streaming_v2";
public static final String KAFKA_RESOURCE_ROOT = "/kafka";
public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
Expand Down
Loading

0 comments on commit 0943599

Please sign in to comment.