Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the mqtt source and sink, fix the udf issue and the window size problem #4

Merged
merged 1 commit into from
Jan 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
* [创建metaq源表](stream_source/metaq/README.md)
* [创建RocketMQ源表](stream_source/rocketmq/README.md)
* [创建文件源表](stream_source/file/README.md)
* [创建Mqtt源表](stream_source/mqtt/README.md)
* [创建自定义源表](stream_source/custom/README.md)
* [创建结果表](stream_sink/README.md)
* [创建metaq结果表](stream_sink/metaq/README.md)
@@ -15,6 +16,7 @@
* [创建打印结果表](stream_sink/print/README.md)
* [创建DB结果表](stream_sink/db/README.md)
* [创建ES结果表](stream_sink/es/README.md)
* [创建Mqtt结果表](stream_sink/mqtt/README.md)
* [创建自定义结果表](stream_sink/custom/README.md)
* [创建维表](stream_dim/README.md)
* [创建mysql维表](stream_dim/mysql/README.md)
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package com.alibaba.rsqldb.clients.sql;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.alibaba.rsqldb.parser.entity.SqlTask;
import org.apache.rocketmq.streams.client.strategy.Strategy;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;

import com.alibaba.rsqldb.parser.entity.SqlTask;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
* can execute sql directly can submit sql to server support sql assemble

This file was deleted.

Original file line number Diff line number Diff line change
@@ -16,13 +16,14 @@
*/
package com.alibaba.rsqldb.clients.strategy;

import java.util.Properties;
import com.alibaba.rsqldb.parser.parser.builder.BlinkUDFScan;
import org.apache.rocketmq.streams.client.strategy.Strategy;
import org.apache.rocketmq.streams.common.classloader.IsolationClassLoader;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.script.ScriptComponent;
import com.alibaba.rsqldb.parser.parser.builder.BlinkUDFScan;

import java.util.Properties;

public class SQLStrategy implements Strategy {

Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@
package com.alibaba.rsqldb.clients.sql;

import com.alibaba.rsqldb.parser.entity.SqlTask;

import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.utils.FileUtil;

This file was deleted.

Original file line number Diff line number Diff line change
@@ -16,12 +16,11 @@
*/
package com.alibaba.rsqldb.parser.builder;

import java.util.Map;

import com.alibaba.rsqldb.parser.entity.SqlTask;

import org.apache.rocketmq.streams.configurable.ConfigurableComponent;

import java.util.Map;

/**
* 同源,相同数据源的sql会动态装配在一起 可以通过getInsertSql 插入数据库完成任务发布,会自动和正在运行的数据源完成装配
*/

This file was deleted.

Original file line number Diff line number Diff line change
@@ -16,6 +16,19 @@
*/
package com.alibaba.rsqldb.parser.builder;

import com.alibaba.rsqldb.parser.parser.ISqlParser;
import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.SQLParserContext;
import com.alibaba.rsqldb.parser.parser.SQLTree;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.CreateSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.FunctionSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.ISQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.InsertSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.NotSupportSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.SQLCreateTables;
import com.alibaba.rsqldb.parser.parser.builder.ViewSQLBuilder;
import com.alibaba.rsqldb.parser.parser.sqlnode.IBuilderCreator;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,14 +43,12 @@
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;

import com.alibaba.rsqldb.parser.parser.ISqlParser;
import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.SQLParserContext;
import com.alibaba.rsqldb.parser.parser.SQLTree;
import com.alibaba.rsqldb.parser.parser.builder.*;
import com.alibaba.rsqldb.parser.parser.sqlnode.IBuilderCreator;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SQLTreeBuilder {

Original file line number Diff line number Diff line change
@@ -16,12 +16,8 @@
*/
package com.alibaba.rsqldb.parser.entity;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.rsqldb.parser.builder.SQLTreeBuilder;
import com.alibaba.rsqldb.parser.parser.builder.BlinkUDFScan;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
import org.apache.rocketmq.streams.common.configurable.IConfigurable;
@@ -31,8 +27,9 @@
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;

import com.alibaba.rsqldb.parser.builder.SQLTreeBuilder;
import com.alibaba.rsqldb.parser.parser.builder.BlinkUDFScan;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SqlTask {

Original file line number Diff line number Diff line change
@@ -16,30 +16,24 @@
*/
package com.alibaba.rsqldb.parser.parser;

import com.alibaba.rsqldb.parser.parser.builder.FunctionSQLBuilder;
import com.alibaba.rsqldb.parser.parser.expression.BlinkRuleV2Parser;

import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.map.HashedMap;
import org.apache.rocketmq.streams.common.model.NameCreator;
import org.apache.rocketmq.streams.common.calssscaner.AbstractScan;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;

import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.FunctionSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.IBuilderCreator;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.collections.map.HashedMap;
import org.apache.rocketmq.streams.common.calssscaner.AbstractScan;
import org.apache.rocketmq.streams.common.model.NameCreator;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;

import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* 可以通过扩展来增加sql解析能力,主体框架不变化
Original file line number Diff line number Diff line change
@@ -16,15 +16,9 @@
*/
package com.alibaba.rsqldb.parser.parser;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.CreateSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.FunctionSQLBuilder;

import org.apache.rocketmq.streams.common.configurable.IConfigurable;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
@@ -34,6 +28,11 @@
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.PrintUtil;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SQLTree {

protected String namespace;
Original file line number Diff line number Diff line change
@@ -16,8 +16,8 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;

import java.util.ArrayList;
import java.util.HashMap;
Original file line number Diff line number Diff line change
@@ -16,26 +16,10 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.rsqldb.udf.FunctionUDFScript;
import com.alibaba.rsqldb.udf.udaf.BlinkUDAFScript;
import com.alibaba.rsqldb.udf.udf.BlinkUDFScript;
import com.alibaba.rsqldb.udf.udtf.BlinkUDTFScript;

import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
@@ -50,6 +34,21 @@
import org.apache.rocketmq.streams.script.service.udf.UDFScript;
import sun.misc.JarFilter;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* 支持blink udf的扫描,指定扫描路径完成udf函数扫描,会把jar包中所有udf扫描出来进行处处,目标把blink udf转化成dipper函数
*/
@@ -104,6 +103,7 @@ public static void main(String[] args) {
@Deprecated
public void scan(String jarFilePath, String classname, String functionName) {
scanInnerBlinkUDF();

String localJarFielPath = ComponentCreator.getProperties().getProperty(BLINK_UDF_JAR_PATH);
if (localJarFielPath == null || "".equalsIgnoreCase(localJarFielPath)) {
localJarFielPath = "./udflib";
@@ -130,9 +130,13 @@ public void scanInnerBlinkUDF() {
scan(null, "com.aliyun.yundun.dipper.sql.udf");
scan(null, "com.aliyun.isec.seraph.udtf");
scan(null, "com.lyra.udf.ext");
scan(null, "org.apache.rocketmq.streams.script.function.impl.flatmap");
}
}




/**
* 扫描某个目录下jar包的包名
*
@@ -233,14 +237,15 @@ public void registerJarUDF(String dir, String className, String methodName) {
@Override
protected void doProcessor(Class clazz, String functionName) {
try {
if (ScalarFunction.class.isAssignableFrom(clazz) || TableFunction.class.isAssignableFrom(clazz) || AggregateFunction.class.isAssignableFrom(clazz)) {
String blinkSuperClassName = getBlinkSuperClassName(clazz);
if (!"".equalsIgnoreCase(blinkSuperClassName)) {
if (notSupportUDF.contains(clazz.getSimpleName())) {
return;
}
UDFScript script = null;
if (TableFunction.class.isAssignableFrom(clazz)) {
if (TableFunction.class.isAssignableFrom(clazz) || TableFunction.class.getSimpleName().equalsIgnoreCase(blinkSuperClassName)) {
script = new BlinkUDTFScript();
} else if (AggregateFunction.class.isAssignableFrom(clazz)) {
} else if (AggregateFunction.class.isAssignableFrom(clazz) || AggregateFunction.class.getSimpleName().equalsIgnoreCase(blinkSuperClassName)) {
script = new BlinkUDAFScript();
} else {
script = new BlinkUDFScript();
@@ -263,6 +268,19 @@ protected void doProcessor(Class clazz, String functionName) {
}
}

private String getBlinkSuperClassName(Class clazz) {
if (Object.class.getSimpleName().equalsIgnoreCase(clazz.getSimpleName())) {
return "";
}
if (ScalarFunction.class.getSimpleName().equalsIgnoreCase(clazz.getSimpleName()) ||
TableFunction.class.getSimpleName().equalsIgnoreCase(clazz.getSimpleName()) ||
AggregateFunction.class.getSimpleName().equalsIgnoreCase(clazz.getSimpleName())) {
return clazz.getSimpleName();
} else {
return this.getBlinkSuperClassName(clazz.getSuperclass());
}
}

/**
* 将带有@FunctionMethod注解的方法注册为udfscript
*
@@ -293,8 +311,7 @@ public void registerAnnotationFunction(Class clazz) {
}

/**
* 根据class和函数名字注册udf,如果方法列表中包含eval方法,则将functionname全部与eval方法绑定并注册UDFScript
* 如果class中不包含eval方法,则将functionname与对应的方法名进行绑定并注册
* 根据class和函数名字注册udf,如果方法列表中包含eval方法,则将functionname全部与eval方法绑定并注册UDFScript 如果class中不包含eval方法,则将functionname与对应的方法名进行绑定并注册
*
* @param clazz
* @param functionName
@@ -347,6 +364,7 @@ private List<Method> getMethodList(Class clazz) {

/**
* 提取class中方法名为eval的函数列表
*
* @param clazz
* @return
*/
Original file line number Diff line number Diff line change
@@ -68,7 +68,10 @@ public void build() {
return;
}
this.source = createSource();
this.source.setGroupName(getPipelineBuilder().getPipelineName());
if(StringUtil.isEmpty(this.source.getGroupName())){
this.source.setGroupName(StringUtil.getUUID());
}

getPipelineBuilder().setSource(source);
getPipelineBuilder().setChannelMetaData(metaData);
if(this.getScripts()!=null&&this.getScripts().size()>0){
Original file line number Diff line number Diff line change
@@ -16,15 +16,15 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.model.NameCreator;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.script.service.udf.UDFScript;

import java.util.HashSet;
import java.util.Set;

/**
* UDX's SQL Builder
*/
@@ -42,16 +42,19 @@ public void build() {
if (blinkUDFScript == null) {
blinkUDFScan.scan(className, null, functionName);
blinkUDFScript = blinkUDFScan.getScript(className, functionName);
if (blinkUDFScript == null) {
blinkUDFScript = blinkUDFScan.getScript(className, null);
}
if (blinkUDFScript == null) {
LOG.error("can not find udf, the udf is " + className);
return;
}
}
// blinkUDFScan.scan(null);
blinkUDFScript.setFunctionName(functionName);
// blinkUDFScan.scan(null);
blinkUDFScript.setFunctionName(functionName);
blinkUDFScript.setNameSpace(getPipelineBuilder().getPipelineNameSpace());
String name = MapKeyUtil.createKey(getPipelineBuilder().getPipelineName(),
NameCreator.createNewName(functionName));
String name = MapKeyUtil.createKey(getPipelineBuilder().getPipelineName(), NameCreator.createNewName(functionName));
blinkUDFScript.setConfigureName(name);
getPipelineBuilder().addConfigurables(blinkUDFScript);
}
Original file line number Diff line number Diff line change
@@ -16,10 +16,10 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import java.util.Set;

import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;

import java.util.Set;

/**
* 把一条独立sql对应的描述信息保存下来,并能够builder成dipper的pipeline的节点
*/
Original file line number Diff line number Diff line change
@@ -16,21 +16,16 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configure.StreamsConfigure;
import org.apache.rocketmq.streams.common.topology.stages.OutputChainStage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.utils.ContantsUtil;
import org.apache.rocketmq.streams.common.utils.PrintUtil;
import com.alibaba.rsqldb.parser.util.ColumnUtil;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.rocketmq.streams.script.operator.impl.ScriptOperator;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InsertSQLBuilder extends AbstractSQLBuilder {

protected AbstractSQLBuilder builder;
Original file line number Diff line number Diff line change
@@ -16,18 +16,18 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
import org.apache.rocketmq.streams.script.operator.expression.ScriptParameter;
import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
import org.apache.rocketmq.streams.script.operator.expression.ScriptParameter;

public class JoinConditionSQLBuilder extends SelectSQLBuilder {
protected Set<String> dimFieldNames;
protected String dimAsAlias;
Original file line number Diff line number Diff line change
@@ -16,6 +16,9 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import com.alibaba.rsqldb.parser.parser.result.BuilderParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.NotSupportParseResult;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,13 +35,15 @@
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
import org.apache.rocketmq.streams.script.operator.impl.ScriptOperator;
import com.alibaba.rsqldb.parser.parser.result.BuilderParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.NotSupportParseResult;
import org.apache.rocketmq.streams.window.builder.WindowBuilder;
import org.apache.rocketmq.streams.window.operator.join.JoinWindow;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -60,8 +65,7 @@ public class JoinSQLBuilder extends SelectSQLBuilder {

protected String rightPiplineName;//默认是空,在双流join场景,左右流都可能填充这个值


protected boolean needWhereToCondition=false;//need where as onCondition
protected boolean needWhereToCondition = false;//need where as onCondition

@Override
public void build() {
@@ -97,8 +101,7 @@ public void build() {
}
getPipelineBuilder().addChainStage(new ScriptOperator(stringBuilder.toString()));
}
if (right != null && BuilderParseResult.class.isInstance(right) && NotSupportParseResult.class.isInstance(right)
== false) {
if (right != null && BuilderParseResult.class.isInstance(right) && NotSupportParseResult.class.isInstance(right) == false) {
BuilderParseResult result = (BuilderParseResult)right;
AbstractSQLBuilder builder = result.getBuilder();
builder.setPipelineBuilder(pipelineBuilder);
@@ -108,8 +111,8 @@ public void build() {
if (SnapshotBuilder.class.isInstance(builder)) {
SnapshotBuilder snapshotBuilder = (SnapshotBuilder)builder;
//snapshotBuilder.setExpression(onCondition);
snapshotBuilder.buildDimCondition(conditionSQLNode,joinType,onCondition);
}else {
snapshotBuilder.buildDimCondition(conditionSQLNode, joinType, onCondition);
} else {
builder.buildSQL();
}

@@ -202,9 +205,9 @@ public void addConfigurables(PipelineBuilder pipelineBuilder) {
public ChainStage createStageChain(PipelineBuilder pipelineBuilder) {
JoinChainStage joinChainStage = new JoinChainStage();
joinChainStage.setWindow(joinWindow);
joinChainStage.setLeftPipline(leftPipelineBuilder.getPipeline());
joinChainStage.setRightPipline(rightBuilder.getPipeline());
joinChainStage.setRigthDependentTableName(((BuilderParseResult)right).getBuilder().getTableName());
joinChainStage.setLeftPipeline(leftPipelineBuilder.getPipeline());
joinChainStage.setRightPipeline(rightBuilder.getPipeline());
joinChainStage.setRightDependentTableName(((BuilderParseResult)right).getBuilder().getTableName());
return joinChainStage;
}

@@ -229,13 +232,13 @@ protected boolean isRightBranch(String parentName) {
if (this.rootTableNames.size() <= 1) {
return false;
}
BuilderParseResult rightResult = (BuilderParseResult) getRight();
if( rightResult.getBuilder().getTableName().equals(parentName)){
BuilderParseResult rightResult = (BuilderParseResult)getRight();
if (rightResult.getBuilder().getTableName().equals(parentName)) {
return true;
}
return false;
// BuilderParseResult rightResult = (BuilderParseResult)getRight();
// return rightResult.getBuilder().getTableName().equals(parentName);
// BuilderParseResult rightResult = (BuilderParseResult)getRight();
// return rightResult.getBuilder().getTableName().equals(parentName);
}

/**
@@ -332,8 +335,7 @@ public boolean isJoin() {
* @return
*/
private AbstractSQLBuilder getJoinBuilder(IParseResult result) {
if (result != null && BuilderParseResult.class.isInstance(result) && NotSupportParseResult.class.isInstance(result)
== false) {
if (result != null && BuilderParseResult.class.isInstance(result) && NotSupportParseResult.class.isInstance(result) == false) {
BuilderParseResult parseResult = (BuilderParseResult)result;
return parseResult.getBuilder();
} else {
@@ -441,12 +443,12 @@ public String getFieldName(String fieldName) {
asName = fieldName.substring(0, index);
fieldName = fieldName.substring(index + 1);
}
String tableAsName=null;
String tableAsName = null;
if (BuilderParseResult.class.isInstance(getLeft())) {
BuilderParseResult builderParseResult = (BuilderParseResult)getLeft();
tableAsName=builderParseResult.getBuilder().getAsName();
if(asName!=null&&tableAsName==null){
tableAsName=builderParseResult.getBuilder().getTableName();
tableAsName = builderParseResult.getBuilder().getAsName();
if (asName != null && tableAsName == null) {
tableAsName = builderParseResult.getBuilder().getTableName();
}
if ((asName != null && asName.equals(tableAsName)) | StringUtil.isEmpty(asName)) {
if (SelectSQLBuilder.class.isInstance(builderParseResult.getBuilder())) {
@@ -460,9 +462,9 @@ public String getFieldName(String fieldName) {
}
if (BuilderParseResult.class.isInstance(getRight())) {
BuilderParseResult builderParseResult = (BuilderParseResult)getRight();
tableAsName=builderParseResult.getBuilder().getAsName();
if(asName!=null&&tableAsName==null){
tableAsName=builderParseResult.getBuilder().getTableName();
tableAsName = builderParseResult.getBuilder().getAsName();
if (asName != null && tableAsName == null) {
tableAsName = builderParseResult.getBuilder().getTableName();
}
if ((asName != null && asName.equals(tableAsName)) | StringUtil.isEmpty(asName)) {
if (SelectSQLBuilder.class.isInstance(builderParseResult.getBuilder())) {
Original file line number Diff line number Diff line change
@@ -16,15 +16,15 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import com.alibaba.rsqldb.parser.parser.result.VarParseResult;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.script.function.model.FunctionType;
import org.apache.rocketmq.streams.script.operator.impl.ScriptOperator;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.rocketmq.streams.script.function.model.FunctionType;
import org.apache.rocketmq.streams.script.operator.impl.ScriptOperator;
import com.alibaba.rsqldb.parser.parser.result.VarParseResult;
import org.apache.calcite.sql.SqlNode;

public class LateralTableBuilder extends SelectSQLBuilder {

/**
Original file line number Diff line number Diff line change
@@ -16,25 +16,23 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import org.apache.rocketmq.streams.common.model.NameCreator;
import org.apache.rocketmq.streams.filter.operator.FilterOperator;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.PrintUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.script.operator.impl.ScriptOperator;
import com.alibaba.rsqldb.parser.parser.SQLParserContext;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.SelectParser;

import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlSelect;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.model.NameCreator;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.PrintUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.filter.operator.FilterOperator;
import org.apache.rocketmq.streams.script.operator.impl.ScriptOperator;

import java.util.ArrayList;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -49,38 +47,60 @@
public class SelectSQLBuilder extends AbstractSQLBuilder<AbstractSQLBuilder> {
private static final Log LOG = LogFactory.getLog(SelectParser.class);

protected String expression;//where部分逻辑判断的转换成表达式字符串(varName,functionName,value)&((varName,functionName,value)|(varName,functionName,int,value))

protected Set<String> allFieldNames = null;//如果做过字段解析,把所有的字段放到这里面,在做sub select 取值时,直接取

/**
* 在sql select部分出现的字段
* where部分逻辑判断的转换成表达式字符串(varName,functionName,value)&((varName,functionName,value)|(varName,functionName,int,value))
*/
protected String expression;
/**
* 如果做过字段解析,把所有的字段放到这里面,在做sub select 取值时,直接取
*/
protected Set<String> allFieldNames = null;
/**
* 在sql select部分出现的字段,每个字段和对应解析器,在必要的时候完成解析
*/
protected Map<String, IParseResult> fieldName2ParseResult = new HashMap<>();//每个字段和对应解析器,在必要的时候完成解析
protected List<String> fieldNamesOrderByDeclare=new ArrayList<>();//field name order by sql select,use in insert table(field,field) select
protected int parseStage = 0;//0:select,1:from,2:where
protected Map<String, IParseResult> fieldName2ParseResult = new HashMap<>();
/**
* field name order by sql select,use in insert table(field,field) select
*/
protected List<String> fieldNamesOrderByDeclare = new ArrayList<>();
/**
* 0:select,1:from,2:where
*/
protected int parseStage = 0;

protected List<String> selectScripts = new ArrayList<>();

protected JoinSQLBuilder joinSQLBuilder;// select场景中有join的场景
protected SelectSQLBuilder parentSelect;//如果有嵌套查询,则表示外侧查询
protected SelectSQLBuilder subSelect;//如果有嵌套查询,则表示子查询
protected boolean isDistinct=false;
/**
* select场景中有join的场景
*/
protected JoinSQLBuilder joinSQLBuilder;
/**
* 如果有嵌套查询,则表示外侧查询
*/
protected SelectSQLBuilder parentSelect;
/**
* 如果有嵌套查询,则表示子查询
*/
protected SelectSQLBuilder subSelect;
protected boolean isDistinct = false;
/**
* group by 相关参数
*/
protected WindowBuilder windowBuilder;
protected String overName;

//有union的场景
/**
* 有union的场景
*/
protected UnionSQLBuilder unionSQLBuilder;

//主要是lower或trim等函数,如果同一字段添加多次函数,最终只返回一个名字。使用时需要谨慎
//string:包含函数名;变量名
/**
* 主要是lower或trim等函数,如果同一字段添加多次函数,最终只返回一个名字。使用时需要谨慎 string:包含函数名;变量名
*/
protected Map<String, String> innerVarNames = new HashMap();

/**
* use in create table
*/
protected boolean closeFieldCheck = false;

protected boolean closeFieldCheck=false;//use in create table
@Override
public void buildSQL() {
if (pipelineBuilder == null) {
@@ -101,14 +121,14 @@ public void buildSQL() {
}

if (unionSQLBuilder != null) {
if(!unionSQLBuilder.getTableName().equals(pipelineBuilder.getParentTableName())){
if(unionSQLBuilder.containsTableName(pipelineBuilder.getParentTableName())){
if (!unionSQLBuilder.getTableName().equals(pipelineBuilder.getParentTableName())) {
if (unionSQLBuilder.containsTableName(pipelineBuilder.getParentTableName())) {
unionSQLBuilder.setTableName(pipelineBuilder.getParentTableName());
this.setTableName(unionSQLBuilder.getTableName());
SelectSQLBuilder parent=this.parentSelect;
while (parent!=null){
SelectSQLBuilder parent = this.parentSelect;
while (parent != null) {
parent.setTableName(this.getTableName());
parent=parent.parentSelect;
parent = parent.parentSelect;
}
}
unionSQLBuilder.setTableName(pipelineBuilder.getParentTableName());
@@ -175,8 +195,8 @@ protected void bulidExpression() {
if (StringUtil.isNotEmpty(expression)) {
SqlSelect sqlSelect = (SqlSelect)sqlNode;
sqlSelect.setWhere(null);
String ruleName= NameCreator.createOrGet(this.getPipelineBuilder().getPipelineName()).createName(this.getPipelineBuilder().getPipelineName(),"rule");
pipelineBuilder.addChainStage(new FilterOperator(getNamespace(),ruleName,expression));
String ruleName = NameCreator.createOrGet(this.getPipelineBuilder().getPipelineName()).createName(this.getPipelineBuilder().getPipelineName(), "rule");
pipelineBuilder.addChainStage(new FilterOperator(getNamespace(), ruleName, expression));
}

}
@@ -187,7 +207,7 @@ protected void bulidExpression() {
protected void buildGroup() {
if (windowBuilder != null) {
windowBuilder.setPipelineBuilder(pipelineBuilder);
// windowBuilder.setTreeSQLBulider(getTreeSQLBulider());
//windowBuilder.setTreeSQLBulider(getTreeSQLBulider());
windowBuilder.setOwner(this);
windowBuilder.build();
}
@@ -205,8 +225,8 @@ protected void buildSelect() {
*/
String retainScript = "retainField(";
boolean isFirst = true;
String[] fieldNames=new String[fieldName2ParseResult.size()];
int i=0;
String[] fieldNames = new String[fieldName2ParseResult.size()];
int i = 0;
if (fieldName2ParseResult != null) {
Iterator<Entry<String, IParseResult>> it = fieldName2ParseResult.entrySet().iterator();

@@ -219,9 +239,9 @@ protected void buildSelect() {
retainScript += ",";
}
if (entry.getKey().indexOf("*") != -1) {
String fieldName=doAsteriskTrimAliasName(entry.getKey(), stringBuilder, allFieldNames);
String fieldName = doAsteriskTrimAliasName(entry.getKey(), stringBuilder, allFieldNames);
retainScript += fieldName;
fieldNames[i]=fieldName;
fieldNames[i] = fieldName;
} else {
String name = entry.getKey();
if (name.indexOf(".") != -1) {
@@ -233,7 +253,7 @@ protected void buildSelect() {
}
allFieldNames.add(name);
retainScript += name;
fieldNames[i]=name;
fieldNames[i] = name;
}
IParseResult parseResult = entry.getValue();

@@ -265,9 +285,9 @@ protected void buildSelect() {
String scriptValue = stringBuilder.toString();
this.allFieldNames = allFieldNames;
if (isFirst == false) {
if(isDistinct){
String distinctScript=scriptValue.replace("retainField","distinct");
scriptValue+=distinctScript;
if (isDistinct) {
String distinctScript = scriptValue.replace("retainField", "distinct");
scriptValue += distinctScript;
}
pipelineBuilder.addChainStage(new ScriptOperator(scriptValue));
//optimizer.put(scriptValue,true);
@@ -400,7 +420,7 @@ protected String doAsteriskTrimAliasName(String key, StringBuilder stringBuilder
if (index != -1) {
String name = fieldName.substring(index + 1);
if (fieldNames.contains(name)) {
stringBuilder.append(name + "=" + fieldName + ";" + PrintUtil.LINE);
stringBuilder.append(name).append("=").append(fieldName).append(";").append(PrintUtil.LINE);
list.add(name);
}
} else {
@@ -423,19 +443,15 @@ protected boolean inSelectField(String fieldName) {
Entry<String, IParseResult> entry = it.next();
String key = entry.getKey();
String value = null;
if (ScriptParseResult.class.isInstance(entry.getValue())) {
if (entry.getValue() instanceof ScriptParseResult) {
ScriptParseResult scriptParseResult = (ScriptParseResult)entry.getValue();
value = scriptParseResult.getScript();
if (value.endsWith("=" + fieldName + ";")) {
return true;
}
} else {
value = entry.getValue().getReturnValue();
if (key.equals(value)) {
continue;
}
}

}
return false;
}
@@ -461,8 +477,8 @@ public Set<String> parseDependentTables() {
if (join != null) {
dependentTables.addAll(join.parseDependentTables());
}
UnionSQLBuilder union=this.unionSQLBuilder;
if(union!=null){
UnionSQLBuilder union = this.unionSQLBuilder;
if (union != null) {
dependentTables.addAll(union.parseDependentTables());
}
return dependentTables;
@@ -608,9 +624,9 @@ public String getFieldName(String fieldName, boolean containsSelf) {
} else {
String ailasName = fieldName.substring(0, index);
fieldName = fieldName.substring(index + 1);
String tableAilasName=getAsName();
if(ailasName!=null&&tableAilasName==null){
tableAilasName=getTableName();
String tableAilasName = getAsName();
if (ailasName != null && tableAilasName == null) {
tableAilasName = getTableName();
}
if (ailasName.equals(tableAilasName)) {
if (fieldNames.contains(fieldName)) {
@@ -631,7 +647,7 @@ public String getFieldName(String fieldName, boolean containsSelf) {
@Override
@Deprecated
public String getFieldName(String fieldName) {
if(isCloseFieldCheck()){
if (isCloseFieldCheck()) {
return fieldName;
}
return getFieldName(fieldName, false);

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -16,11 +16,11 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import com.alibaba.rsqldb.parser.parser.SQLParserContext;

import java.util.HashSet;
import java.util.Set;

import com.alibaba.rsqldb.parser.parser.SQLParserContext;

public class TableNodeBuilder extends SelectSQLBuilder {

@Override
Original file line number Diff line number Diff line change
@@ -16,13 +16,8 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.model.NameCreator;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.ChainStage;
@@ -31,8 +26,13 @@
import org.apache.rocketmq.streams.common.topology.stages.UnionChainStage;
import org.apache.rocketmq.streams.common.utils.NameCreatorUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UnionSQLBuilder extends SelectSQLBuilder {

Original file line number Diff line number Diff line change
@@ -16,31 +16,31 @@
*/
package com.alibaba.rsqldb.parser.parser.builder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import org.apache.rocketmq.streams.common.configure.StreamsConfigure;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.filter.optimization.dependency.ScriptDependent;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import org.apache.rocketmq.streams.window.operator.impl.SessionOperator;
import org.apache.rocketmq.streams.window.operator.impl.ShuffleOverWindow;
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

public class WindowBuilder extends SelectSQLBuilder {

/**
@@ -83,52 +83,54 @@ public class WindowBuilder extends SelectSQLBuilder {
*/
protected String type;


/**
* 时间字段
*/
protected String timeFieldName;
protected int timeUnitAdjust=60;
protected int timeUnitAdjust = 60;

protected List<String> groupByFieldNames = new ArrayList<>();

protected SelectSQLBuilder owner;
protected boolean isLocalStorageOnly = true;//是否只用本地存储
/**
* 是否只用本地存储
*/
protected boolean isLocalStorageOnly = true;
protected String having;
protected List<String> havingScript;
/**
* over window
* over window, 值为over时,是over window
*/
protected String overWindowName;//值为over时,是over window
protected boolean isShuffleOverWindow=false;
protected String overWindowName;
protected boolean isShuffleOverWindow = false;
protected List<String> shuffleOverWindowOrderByFieldNames;
protected int overWindowTopN=100;
protected int overWindowTopN = 100;

@Override
protected void build() {
AbstractWindow window;
if (overWindowName != null) {
if(!isShuffleOverWindow){
if (!isShuffleOverWindow) {
buildOverWindow();
return;
}

return;
}


window = org.apache.rocketmq.streams.window.builder.WindowBuilder.createWindow(type);
if(window.getEmitBeforeValue()==null||window.getEmitBeforeValue()==0){
window.setEmitBeforeValue( StreamsConfigure.getEmitBeforeValue());
if (window.getEmitBeforeValue() == null || window.getEmitBeforeValue() == 0) {
window.setEmitBeforeValue(StreamsConfigure.getEmitBeforeValue());
}
if(window.getEmitAfterValue()==null||window.getEmitAfterValue()==0){
if (window.getEmitAfterValue() == null || window.getEmitAfterValue() == 0) {
window.setEmitAfterValue(StreamsConfigure.getEmitAfterValue());
if(StreamsConfigure.getEmitMaxDelay()!=null){
if (StreamsConfigure.getEmitMaxDelay() != null) {
window.setMaxDelay(StreamsConfigure.getEmitMaxDelay());
}
}
window.setLocalStorageOnly(isLocalStorageOnly);
window.setTimeFieldName(timeFieldName);
window.setWindowType(type);
window.setTimeUnitAdjust(1);

if (window instanceof WindowOperator) {
window.setSizeInterval(Optional.ofNullable(size).orElse(AbstractWindow.DEFAULT_WINDOW_SIZE));
@@ -138,17 +140,13 @@ protected void build() {
window.setSlideInterval(Optional.ofNullable(slide).orElse(AbstractWindow.DEFAULT_WINDOW_SLIDE));
window.setSlideVariable(slideVariable);
window.setSlideAdjust(slideAdjust);


window.setTimeUnitAdjust(Optional.ofNullable(timeUnitAdjust).orElse(60));
}

if (window instanceof SessionOperator) {
SessionOperator theWindow = (SessionOperator) window;
SessionOperator theWindow = (SessionOperator)window;
theWindow.setSessionTimeOut(Optional.ofNullable(timeout).orElse(AbstractWindow.DEFAULT_WINDOW_SESSION_TIMEOUT));
}


Map<String, String> selectMap = new HashMap<>(32);
if (owner.getFieldName2ParseResult() != null) {
//select部分处理,map:key字段名,value:脚本或字段名
@@ -173,47 +171,47 @@ protected void build() {
}

protected void buildHaving(AbstractWindow window) {
if(StringUtil.isEmpty(having)){
if (StringUtil.isEmpty(having)) {
return;
}
window.setHavingExpression(having);
if(havingScript!=null){
List<Expression> expressions=new ArrayList<>();
ExpressionBuilder.createExpression("tmp","tmp",having,expressions,new ArrayList<>());
for(Expression expression:expressions){
String varName=expression.getVarName();
List<String> dependentScripts=getDependentScripts(varName,havingScript);
if(CollectionUtil.isNotEmpty(dependentScripts)){
window.getSelectMap().put(varName,MapKeyUtil.createKey(";",dependentScripts)+";");
if (havingScript != null) {
List<Expression> expressions = new ArrayList<>();
ExpressionBuilder.createExpression("tmp", "tmp", having, expressions, new ArrayList<>());
for (Expression expression : expressions) {
String varName = expression.getVarName();
List<String> dependentScripts = getDependentScripts(varName, havingScript);
if (CollectionUtil.isNotEmpty(dependentScripts)) {
window.getSelectMap().put(varName, MapKeyUtil.createKey(";", dependentScripts) + ";");
}
}
}

}

private List<String> getDependentScripts(String varName, List<String> script) {
ScriptDependent scriptDependent=new ScriptDependent(null,MapKeyUtil.createKey(";",script)+";");
List<IScriptExpression> scriptExpressions= scriptDependent.getDependencyExpression(varName);
List<String> expressions=new ArrayList<>();
for(IScriptExpression scriptExpression:scriptExpressions){
ScriptDependent scriptDependent = new ScriptDependent(null, MapKeyUtil.createKey(";", script) + ";");
List<IScriptExpression> scriptExpressions = scriptDependent.getDependencyExpression(varName);
List<String> expressions = new ArrayList<>();
for (IScriptExpression scriptExpression : scriptExpressions) {
expressions.add(scriptExpression.toString());
}
return expressions;
}

protected void buildOverWindow() {
AbstractWindow overWindow=null;
String groupBy=MapKeyUtil.createKey(";", groupByFieldNames);
if(!isShuffleOverWindow){
overWindow = org.apache.rocketmq.streams.window.builder.WindowBuilder.createOvertWindow(groupBy, overWindowName);
}else {
ShuffleOverWindow shuffleOverWindow=new ShuffleOverWindow();
AbstractWindow overWindow = null;
String groupBy = MapKeyUtil.createKey(";", groupByFieldNames);
if (!isShuffleOverWindow) {
overWindow = org.apache.rocketmq.streams.window.builder.WindowBuilder.createOvertWindow(groupBy, overWindowName);
} else {
ShuffleOverWindow shuffleOverWindow = new ShuffleOverWindow();
shuffleOverWindow.setTimeFieldName("");
shuffleOverWindow.setGroupByFieldName(groupBy);
shuffleOverWindow.setRowNumerName(overWindowName);
shuffleOverWindow.setTopN(overWindowTopN);
shuffleOverWindow.setOrderFieldNames(shuffleOverWindowOrderByFieldNames);
overWindow=shuffleOverWindow;
overWindow = shuffleOverWindow;
}

getPipelineBuilder().addChainStage(overWindow);
Original file line number Diff line number Diff line change
@@ -17,13 +17,14 @@
package com.alibaba.rsqldb.parser.parser.expression;

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;

public class BlinkRule {
/**
Original file line number Diff line number Diff line change
@@ -24,6 +24,12 @@
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.script.utils.FunctionUtils;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
@@ -32,12 +38,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.script.utils.FunctionUtils;

/**
* create by udf
Original file line number Diff line number Diff line change
@@ -16,10 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.expression;

import java.util.List;

import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.StringDataType;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.ConstantParseResult;
@@ -28,6 +24,10 @@
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.StringDataType;

import java.util.List;

public class CompareParser extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -16,9 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.expression;

import java.util.List;

import org.apache.rocketmq.streams.common.datatype.StringDataType;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.ConstantParseResult;
@@ -28,6 +25,9 @@
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.rocketmq.streams.common.datatype.StringDataType;

import java.util.List;

public class InParser extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -16,13 +16,14 @@
*/
package com.alibaba.rsqldb.parser.parser.expression;

import java.util.List;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class IsTrueFunction extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.expression;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.ConstantParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -27,6 +25,8 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlLikeOperator;

import java.util.List;

public class LikeFunction extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.expression;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -26,6 +24,8 @@
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class RegexpParser extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -16,9 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import org.apache.rocketmq.streams.common.datatype.DataType;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -27,6 +24,9 @@
import com.alibaba.rsqldb.parser.util.SqlDataTypeUtil;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.datatype.DataType;

import java.util.List;

public class CastFunctionParser extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -26,6 +24,8 @@
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class CountParser extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -16,14 +16,15 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.ArrayList;
import java.util.List;

public class FilterFunction extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -26,6 +24,8 @@
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class GroupFunctionParser extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -16,14 +16,14 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.VarParseResult;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
import org.apache.rocketmq.streams.window.builder.WindowBuilder;

public class HopParser extends TumbleParser {
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -26,6 +24,8 @@
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class IFFunctionParser extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -16,14 +16,15 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class ItemParser extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -16,15 +16,15 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class JsonConcatParser extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -26,6 +24,8 @@
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class LowerParser extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -16,22 +16,22 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.ArrayList;
import java.util.List;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.model.NameCreator;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.WindowBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.VarParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlWindow;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.model.NameCreator;

import java.util.ArrayList;
import java.util.List;

public class OverWindowParser extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -16,19 +16,19 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.streams.filter.function.expression.ScriptFunction;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.filter.function.expression.ScriptFunction;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScriptFunctionParser extends AbstractSelectNodeParser<SqlBasicCall> {

Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.alibaba.rsqldb.parser.parser.function;

import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlNode;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.WindowBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.VarParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;

/**
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.NotSupportParseResult;
@@ -29,6 +27,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.List;

public class SqlAsOperatorParser extends AbstractSelectNodeParser<SqlBasicCall> {

private static final Log LOG = LogFactory.getLog(SqlAsOperatorParser.class);
Original file line number Diff line number Diff line change
@@ -16,12 +16,12 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import org.apache.rocketmq.streams.common.datatype.StringDataType;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.AbstractSqlParser;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.ConstantParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.rocketmq.streams.common.datatype.StringDataType;

public class SqlDataTypeSpecParser extends AbstractSqlParser<SqlDataTypeSpec, AbstractSQLBuilder> {

Original file line number Diff line number Diff line change
@@ -16,8 +16,8 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.AbstractSqlParser;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
Original file line number Diff line number Diff line change
@@ -16,18 +16,19 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.utils.ContantsUtil;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.AbstractSqlParser;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.namecreator.ParserNameCreator;
import com.alibaba.rsqldb.parser.parser.result.ConstantParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import com.alibaba.rsqldb.parser.util.SqlDataTypeUtil;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.utils.ContantsUtil;

import java.util.ArrayList;
import java.util.List;

public class SqlLiteralParser extends AbstractSqlParser<SqlLiteral, AbstractSQLBuilder> {

Original file line number Diff line number Diff line change
@@ -16,14 +16,14 @@
*/
package com.alibaba.rsqldb.parser.parser.function;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;

import java.util.List;

public class TransJsonArrayParser extends AbstractSelectNodeParser<SqlBasicCall> {

@Override
Original file line number Diff line number Diff line change
@@ -21,12 +21,12 @@
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.VarParseResult;
import com.alibaba.rsqldb.parser.parser.sqlnode.AbstractSelectNodeParser;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;

public class TumbleParser extends AbstractSelectNodeParser<SqlBasicCall> {

@@ -83,7 +83,7 @@ protected int getWindowPeriod(SqlIntervalLiteral sqlIntervalLiteral) {
//TODO default value
int interval = 10;
try {
interval = Integer.valueOf(intervalValue.getIntervalLiteral());
interval = Integer.parseInt(intervalValue.getIntervalLiteral());
} catch (Exception e) {

}
@@ -97,11 +97,11 @@ protected static int getDiff2Minute(TimeUnit timeUnit) {
case SECOND:
return 1;
case MINUTE:
return 1;
case DAY:
return 24 * 60;
case HOUR:
return 60;
case HOUR:
return 60 * 60;
case DAY:
return 24 * 60 * 60;
default:
throw new RuntimeException("can not this time unit :" + timeUnit.toString()
+ ", support second,minute,houre,day, millsecond");
@@ -118,15 +118,16 @@ protected static int getDiff2Minute(TimeUnit timeUnit) {
protected static int convert2Minute(int interval, TimeUnit timeUnit) {
int tumblePeriod = interval;
if (timeUnit != null) {
if (TimeUnit.MINUTE == timeUnit || TimeUnit.SECOND == timeUnit) {
if (TimeUnit.SECOND == timeUnit) {
tumblePeriod = interval;
} else if (TimeUnit.HOUR == timeUnit) {
} else if (TimeUnit.MINUTE == timeUnit) {
tumblePeriod = interval * 60;
} else if (TimeUnit.HOUR == timeUnit) {
tumblePeriod = interval * 60 * 60;
} else if (TimeUnit.DAY == timeUnit) {
tumblePeriod = interval * 24 * 60;
tumblePeriod = interval * 24 * 60 * 60;
} else {
throw new RuntimeException("can not this time unit :" + timeUnit.toString()
+ ", support second,minute,houre,day, millsecond");
throw new RuntimeException("can not this time unit :" + timeUnit + ", support second,minute,houre,day, millsecond");
}
}
return tumblePeriod;
Original file line number Diff line number Diff line change
@@ -16,12 +16,12 @@
*/
package com.alibaba.rsqldb.parser.parser.result;

import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import org.apache.rocketmq.streams.common.utils.StringUtil;

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.streams.common.utils.StringUtil;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;

/**
* 每个解析的返回结果。需要考虑变量,常量和脚本的情况
*/
Original file line number Diff line number Diff line change
@@ -16,17 +16,17 @@
*/
package com.alibaba.rsqldb.parser.parser.result;

import com.alibaba.rsqldb.parser.parser.ISqlParser;
import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import org.apache.calcite.sql.SqlNode;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.ISqlParser;
import org.apache.calcite.sql.SqlNode;

/**
* 每个解析的返回结果。需要考虑变量,常量和脚本的情况
*/
Original file line number Diff line number Diff line change
@@ -16,11 +16,11 @@
*/
package com.alibaba.rsqldb.parser.parser.sqlnode;

import org.apache.rocketmq.streams.common.datatype.StringDataType;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.ConstantParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.parser.result.ScriptParseResult;
import org.apache.rocketmq.streams.common.datatype.StringDataType;

public abstract class AbstractSelectNodeParser<T> extends AbstractSqlNodeParser<T, SelectSQLBuilder> {

Original file line number Diff line number Diff line change
@@ -16,8 +16,8 @@
*/
package com.alibaba.rsqldb.parser.parser.sqlnode;

import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.AbstractSqlParser;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;

public abstract class AbstractSqlNodeParser<T, DESCRIPTOR extends AbstractSQLBuilder>
extends AbstractSqlParser<T, DESCRIPTOR> implements
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.sqlnode;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.builder.CreateSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.BuilderParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -27,6 +25,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;

import java.util.List;

/**
* Create Table Parser
*/
Original file line number Diff line number Diff line change
@@ -21,14 +21,15 @@
import com.alibaba.rsqldb.parser.parser.builder.InsertSQLBuilder;
import com.alibaba.rsqldb.parser.parser.result.BuilderParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.sql.SqlEmit;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.rocketmq.streams.common.configure.StreamsConfigure;

import java.util.ArrayList;
import java.util.List;

public class InsertParser extends AbstractSqlNodeParser<SqlInsert, InsertSQLBuilder> {

@Override
Original file line number Diff line number Diff line change
@@ -16,15 +16,13 @@
*/
package com.alibaba.rsqldb.parser.parser.sqlnode;

import java.util.List;

import com.alibaba.rsqldb.parser.parser.ISqlParser;
import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.JoinSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.LateralTableBuilder;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.TableNodeBuilder;
import com.alibaba.rsqldb.parser.parser.ISqlParser;
import com.alibaba.rsqldb.parser.parser.function.SqlIndentifierParser;
import com.alibaba.rsqldb.parser.parser.result.BuilderParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -37,6 +35,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.List;

public class JoinParser extends AbstractSqlNodeParser<SqlJoin, JoinSQLBuilder> {

private static final Log LOG = LogFactory.getLog(JoinParser.class);
Original file line number Diff line number Diff line change
@@ -16,17 +16,11 @@
*/
package com.alibaba.rsqldb.parser.parser.sqlnode;

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import com.alibaba.rsqldb.parser.parser.ISqlParser;
import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.JoinSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.UnionSQLBuilder;
import com.alibaba.rsqldb.parser.parser.ISqlParser;
import com.alibaba.rsqldb.parser.parser.function.HopParser;
import com.alibaba.rsqldb.parser.parser.result.BuilderParseResult;
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
@@ -38,6 +32,12 @@
import org.apache.calcite.sql.SqlSelect;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;

import java.util.ArrayList;
import java.util.List;

public class SelectParser extends AbstractSelectNodeParser<SqlSelect> {

Original file line number Diff line number Diff line change
@@ -16,13 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.sqlnode;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.UnionSQLBuilder;
@@ -33,6 +26,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UnionParser extends AbstractSqlNodeParser<SqlBasicCall, UnionSQLBuilder> {
private static final Log LOG = LogFactory.getLog(UnionParser.class);

Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@
*/
package com.alibaba.rsqldb.parser.parser.sqlnode;

import java.util.Set;

import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.AbstractSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
@@ -27,6 +25,8 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.sql.parser.ddl.SqlCreateView;

import java.util.Set;

public class ViewParser extends AbstractSqlNodeParser<SqlCreateView, ViewSQLBuilder> {

@Override
Original file line number Diff line number Diff line change
@@ -16,20 +16,21 @@
*/
package com.alibaba.rsqldb.parser.util;

import java.util.List;
import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.CreateSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.StringDataType;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import com.alibaba.rsqldb.parser.parser.SQLNodeParserFactory;
import com.alibaba.rsqldb.parser.parser.builder.CreateSQLBuilder;
import com.alibaba.rsqldb.parser.parser.builder.SelectSQLBuilder;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.StringDataType;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;

import java.util.List;

public class ColumnUtil {
private static final Log LOG = LogFactory.getLog(ColumnUtil.class);
Original file line number Diff line number Diff line change
@@ -16,9 +16,6 @@
*/
package com.alibaba.rsqldb.parser.util;

import java.util.HashMap;
import java.util.Map;

import org.apache.rocketmq.streams.common.datatype.BooleanDataType;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.DateDataType;
@@ -29,6 +26,9 @@
import org.apache.rocketmq.streams.common.datatype.StringDataType;
import org.apache.rocketmq.streams.script.utils.FunctionUtils;

import java.util.HashMap;
import java.util.Map;

public class SqlDataTypeUtil {

private static Map<String, DataType> dataTypeMap = new HashMap<>();
51 changes: 0 additions & 51 deletions rsqldb-parser/src/main/resources/log4j.xml

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -16,17 +16,17 @@
*/
package org.apache.rsqldb.runner;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.topology.task.StreamsTask;
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* @author junjie.cheng
* @date 2021/11/25
Original file line number Diff line number Diff line change
@@ -17,8 +17,6 @@
package org.apache.rsqldb.runner;

import com.alibaba.rsqldb.clients.sql.SQLStream;
import com.alibaba.rsqldb.parser.entity.SqlTask;

import org.apache.rocketmq.streams.common.utils.FileUtil;

/**
Original file line number Diff line number Diff line change
@@ -16,21 +16,20 @@
*/
package org.apache.rsqldb.runner;

import java.io.File;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.stream.Collectors;

import com.alibaba.rsqldb.parser.entity.SqlTask;

import com.google.common.collect.Lists;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.PropertiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.io.File;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.stream.Collectors;

/**
* @author junjie.cheng
* @date 2021/11/19
9 changes: 6 additions & 3 deletions rsqldb-runner/src/main/resources/dipper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## checkpoint存储配置,可以是memory, DB 或者file, 除了checkpoint外, 任务序列化的内容也会被缓存在该存储
# dipper.configurable.service.type=memory

window.shuffle.channel.type=memory
## 当checkpoint为DB时
# dipper.rds.jdbc.type=
# dipper.rds.jdbc.url=
@@ -11,10 +11,13 @@

## 任务从存储反序列化的频次
# dipper.configurable.polling.time=60 #单位秒(s)

pipeline.qps.print=true
homologous.expression.cache.size=100000
homologous.pre.fingerprint.cache.size=100000
homologous.optimization.switch=true
## 监控日志的相关配置
# dipper.monitor.output.level=INFO #日志等级,有三种INFO,SLOW,ERROR
# dipper.monitor.slow.timeout=60000 #慢查询超时时间
dipper.monitor.slow.timeout=60000
# dipper.monitor.logs.dir=./logs #日志目录

## 窗口配置
Original file line number Diff line number Diff line change
@@ -16,9 +16,9 @@
*/
package com.alibaba.rsqldb.udf;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class BlinkDataType {

Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.alibaba.rsqldb.udf;

import java.io.File;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
@@ -9,6 +8,8 @@
import org.apache.flink.table.functions.FunctionContext;
import org.apache.rocketmq.streams.script.service.udf.UDFScript;

import java.io.File;

public class FunctionUDFScript extends UDFScript {

protected transient FunctionContext functionContext = new FunctionContext() {
Original file line number Diff line number Diff line change
@@ -16,10 +16,66 @@
*/
package com.alibaba.rsqldb.udf.udaf;

import java.io.File;

import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.rocketmq.streams.script.service.udf.UDAFScript;

public class BlinkUDAFScript extends UDAFScript {

protected transient FunctionContext functionContext = new FunctionContext() {

@Override
public MetricGroup getMetricGroup() {
return null;
}

@Override
public File getCachedFile(String name) {
return null;
}

@Override
public int getNumberOfParallelSubtasks() {
return 0;
}

@Override
public int getIndexOfThisSubtask() {
return 0;
}

@Override
public IntCounter getIntCounter(String name) {
return null;
}

@Override
public LongCounter getLongCounter(String name) {
return null;
}

@Override
public DoubleCounter getDoubleCounter(String name) {
return null;
}

@Override
public Histogram getHistogram(String name) {
return null;
}

@Override
public String getJobParameter(String key, String defaultValue) {
return "0";
}
};

public BlinkUDAFScript() {
this.accumulateMethodName = "accumulate";
this.createAccumulatorMethodName = "createAccumulator";
@@ -28,6 +84,7 @@ public BlinkUDAFScript() {
this.mergeMethodName = "merge";
this.methodName = "eval";
this.initMethodName = "open";
this.initParameters = new Object[] {functionContext};
}

@Override
Original file line number Diff line number Diff line change
@@ -16,16 +16,15 @@
*/
package com.alibaba.rsqldb.udf.udf;

import java.io.File;

import org.apache.rocketmq.streams.script.service.udf.UDFScript;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.rocketmq.streams.script.service.udf.UDFScript;

import java.io.File;

public class BlinkUDFScript extends UDFScript {

Original file line number Diff line number Diff line change
@@ -17,14 +17,70 @@
package com.alibaba.rsqldb.udf.udtf;

import com.alibaba.rsqldb.udf.udtf.collector.BlinkCollector;
import java.io.File;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.rocketmq.streams.script.service.udf.UDTFScript;
import org.apache.flink.util.Collector;

public class BlinkUDTFScript extends UDTFScript {

protected transient FunctionContext functionContext = new FunctionContext() {

@Override
public MetricGroup getMetricGroup() {
return null;
}

@Override
public File getCachedFile(String name) {
return null;
}

@Override
public int getNumberOfParallelSubtasks() {
return 0;
}

@Override
public int getIndexOfThisSubtask() {
return 0;
}

@Override
public IntCounter getIntCounter(String name) {
return null;
}

@Override
public LongCounter getLongCounter(String name) {
return null;
}

@Override
public DoubleCounter getDoubleCounter(String name) {
return null;
}

@Override
public Histogram getHistogram(String name) {
return null;
}

@Override
public String getJobParameter(String key, String defaultValue) {
return "0";
}
};

public BlinkUDTFScript() {
this.methodName = "eval";
this.initMethodName = "open";
this.initParameters = new Object[] {functionContext};
this.setSetCollectorMethodName("setCollector");
}

Original file line number Diff line number Diff line change
@@ -18,14 +18,13 @@

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rsqldb.udf.udtf.BlinkUDTFScript;
import org.apache.rocketmq.streams.common.context.IMessage;

import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.script.context.FunctionContext;
import org.apache.rocketmq.streams.common.model.ThreadContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.model.ThreadContext;
import org.apache.rocketmq.streams.script.context.FunctionContext;
import org.apache.rocketmq.streams.script.function.model.FunctionType;

public class BlinkCollector implements Collector {
Original file line number Diff line number Diff line change
@@ -19,22 +19,27 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.rsqldb.udf.BlinkDataType;
import com.alibaba.rsqldb.udf.udtf.BlinkUDTFScript;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.types.RowType;
import org.apache.flink.table.types.TypeInfoWrappedType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.datatype.*;
import org.apache.rocketmq.streams.common.datatype.BooleanDataType;
import org.apache.rocketmq.streams.common.datatype.ByteDataType;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.DateDataType;
import org.apache.rocketmq.streams.common.datatype.DoubleDataType;
import org.apache.rocketmq.streams.common.datatype.FloatDataType;
import org.apache.rocketmq.streams.common.datatype.IntDataType;
import org.apache.rocketmq.streams.common.datatype.LongDataType;
import org.apache.rocketmq.streams.common.datatype.ShortDataType;
import org.apache.rocketmq.streams.common.datatype.StringDataType;
import org.apache.rocketmq.streams.common.model.ThreadContext;
import org.apache.rocketmq.streams.script.context.FunctionContext;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.types.RowType;
import org.apache.flink.table.types.TypeInfoWrappedType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.streams.script.function.model.FunctionType;

import java.lang.reflect.InvocationTargetException;
Original file line number Diff line number Diff line change
@@ -17,13 +17,12 @@
package com.alibaba.rsqldb.udf.udtf.collector;

import com.alibaba.fastjson.JSONObject;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.script.context.FunctionContext;
import org.apache.rocketmq.streams.common.model.ThreadContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.streams.script.context.FunctionContext;
import org.apache.rocketmq.streams.script.function.model.FunctionType;

public class BlinkTupleCollector implements Collector<Tuple> {
20 changes: 0 additions & 20 deletions rsqldb-udf/src/test/resources/log4j.xml

This file was deleted.