Skip to content

Commit

Permalink
insertInto sql success.
Browse files Browse the repository at this point in the history
  • Loading branch information
ni-ze committed Jan 5, 2023
1 parent 81f6964 commit f466b49
Show file tree
Hide file tree
Showing 21 changed files with 213 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package com.alibaba.rsqldb.common.exception;

public class DeserializeException extends RuntimeException {
public class DeserializeException extends Exception {
public DeserializeException() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package com.alibaba.rsqldb.common.exception;

public class SerializeException extends RuntimeException{
public class SerializeException extends Exception {
public SerializeException() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alibaba.rsqldb.parser.model.Columns;
import com.alibaba.rsqldb.parser.model.baseType.Literal;
import com.alibaba.rsqldb.parser.model.baseType.StringType;
import com.alibaba.rsqldb.parser.serialization.Serializer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -37,6 +38,8 @@
import org.apache.rocketmq.streams.core.rstream.StreamBuilder;
import org.apache.rocketmq.streams.core.rstream.WindowStream;
import org.apache.rocketmq.streams.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
Expand All @@ -49,6 +52,7 @@
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class CreateTableStatement extends Statement {
private static final Logger logger = LoggerFactory.getLogger(CreateTableStatement.class);
private Columns columns;
private List<Pair<String, Literal<?>>> properties;

Expand Down Expand Up @@ -166,7 +170,7 @@ private SerializeType getSerializeTypeFromProperties() {

@Override
public BuildContext build(BuildContext context) throws Throwable {
Set<String> fieldNames = this.columns.getFields();
Set<String> fieldNames = this.columns.getFields();

if (context.getHeader(RSQLConstant.TABLE_TYPE) == RSQLConstant.TableType.SOURCE) {
StreamBuilder builder = context.getStreamBuilder();
Expand All @@ -178,6 +182,7 @@ public BuildContext build(BuildContext context) throws Throwable {
while (entryIterator.hasNext()) {
Map.Entry<String, JsonNode> next = entryIterator.next();
if (!fieldNames.contains(next.getKey())) {
logger.info("remove field, name:{}, value:{}", next.getKey(), next.getValue());
entryIterator.remove();
}
}
Expand All @@ -188,6 +193,7 @@ public BuildContext build(BuildContext context) throws Throwable {
context.addRStreamSource(this.getTableName(), rStream);
context.setCreateTableStatement(this);
} else if (context.getHeader(RSQLConstant.TABLE_TYPE) == RSQLConstant.TableType.SINK) {
Serializer serializer = SerializeTypeContainer.getSerializer(serializeType);
RStream<? extends JsonNode> stream = context.getrStreamResult();
WindowStream<String, ? extends JsonNode> windowStream = context.getWindowStreamResult();
GroupedStream<String, ? extends JsonNode> groupedStream = context.getGroupedStreamResult();
Expand All @@ -197,7 +203,7 @@ public BuildContext build(BuildContext context) throws Throwable {
} else if (groupedStream != null) {
groupedStream.sink(topicName, new JsonStringKVSer<>());
} else {
stream.sink(topicName, new JsonObjectKVSer<>());
stream.sink(topicName, new JsonObjectKVSer<>(serializer));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@

import com.alibaba.rsqldb.parser.impl.BuildContext;
import com.alibaba.rsqldb.parser.model.statement.query.QueryStatement;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class CreateViewStatement extends Statement {
private QueryStatement queryStatement;

public CreateViewStatement(String content, String tableName, QueryStatement queryStatement) {
@JsonCreator
public CreateViewStatement(@JsonProperty("content") String content, @JsonProperty("tableName") String tableName,
@JsonProperty("queryStatement") QueryStatement queryStatement) {
super(content, tableName);
this.queryStatement = queryStatement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.alibaba.rsqldb.parser.impl.BuildContext;
import com.alibaba.rsqldb.parser.model.Columns;
import com.alibaba.rsqldb.parser.model.statement.query.QueryStatement;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.rocketmq.streams.core.rstream.GroupedStream;
import org.apache.rocketmq.streams.core.rstream.RStream;
Expand All @@ -33,23 +36,27 @@

/**
* String sql = "INSERT INTO Customers (CustomerName, ContactName, Address, City)\n" +
* "select field_1\n" +
* " , field_2\n" +
* " , field_3\n" +
* " , field_4\n" +
* "from test_source where field_1='1';";
* "select field_1\n" +
* " , field_2\n" +
* " , field_3\n" +
* " , field_4\n" +
* "from test_source where field_1='1';";
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InsertQueryStatement extends Statement {
private static final Logger logger = LoggerFactory.getLogger(InsertQueryStatement.class);
private QueryStatement queryStatement;
private Columns columns;


public InsertQueryStatement(String content, String tableName, QueryStatement queryStatement) {
super(content, tableName);
this.queryStatement = queryStatement;
}

public InsertQueryStatement(String content, String tableName, QueryStatement queryStatement, Columns columns) {
@JsonCreator
public InsertQueryStatement(@JsonProperty("content") String content, @JsonProperty("tableName") String tableName,
@JsonProperty("queryStatement") QueryStatement queryStatement, @JsonProperty("columns") Columns columns) {
super(content, tableName);
this.queryStatement = queryStatement;
this.columns = columns;
Expand Down Expand Up @@ -87,7 +94,8 @@ public BuildContext build(BuildContext context) throws Throwable {
fieldName2NewName.put(name, name);
}

RStream<JsonNode> stream = context.getRStreamSource(this.getTableName());
//the right RStream will in StreamResult field, because build queryStatement will happen before this.
RStream<? extends JsonNode> stream = context.getrStreamResult();

WindowStream<String, ? extends JsonNode> windowStream = context.getWindowStreamResult();
GroupedStream<String, ? extends JsonNode> groupedStream = context.getGroupedStreamResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.alibaba.rsqldb.parser.model.baseType.Literal;
import com.alibaba.rsqldb.parser.model.baseType.NumberType;
import com.alibaba.rsqldb.parser.model.baseType.StringType;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.streams.core.util.Pair;

Expand All @@ -46,12 +49,15 @@
*
* 没有source,只有sink
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InsertValueStatement extends Statement {
private static final String template = "insert sql=[%s], create table sql=[%s]";
//columns中顺序就是insert语句中值的顺序:INSERT INTO `purchaser_dim` VALUES (1,'tom','male','16');
private List<ColumnValue> columns = new ArrayList<>();

public InsertValueStatement(String content, String tableName, List<ColumnValue> columns) {
@JsonCreator
public InsertValueStatement(@JsonProperty("content") String content, @JsonProperty("tableName") String tableName,
@JsonProperty("columns") List<ColumnValue> columns) {
super(content, tableName);
this.columns = columns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,26 @@
import com.alibaba.rsqldb.parser.impl.BuildContext;
import com.alibaba.rsqldb.parser.model.Calculator;
import com.alibaba.rsqldb.parser.model.Field;
import com.alibaba.rsqldb.parser.model.baseType.BooleanType;
import com.alibaba.rsqldb.parser.model.baseType.NumberType;
import com.alibaba.rsqldb.parser.model.baseType.StringType;
import com.alibaba.rsqldb.parser.model.expression.AndExpression;
import com.alibaba.rsqldb.parser.model.expression.Expression;
import com.alibaba.rsqldb.parser.model.expression.OrExpression;
import com.alibaba.rsqldb.parser.model.expression.SingleExpression;
import com.alibaba.rsqldb.parser.model.expression.SingleValueCalcuExpression;
import com.alibaba.rsqldb.parser.model.statement.Statement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointGroupByHavingStatement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointGroupByStatement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointStatement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointWhereGBHavingStatement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointWhereGroupByStatement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointWhereStatement;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -61,6 +72,21 @@
* 单纯的select * from 语句map中的value都是null,不会存在计算。
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY
)
@JsonSubTypes({
@JsonSubTypes.Type(value = FilterQueryStatement.class, name = "filterQueryStatement"),
@JsonSubTypes.Type(value = GroupByQueryStatement.class, name = "groupByQueryStatement"),
@JsonSubTypes.Type(value = JointGroupByHavingStatement.class, name = "jointGroupByHavingStatement"),
@JsonSubTypes.Type(value = JointGroupByStatement.class, name = "jointGroupByStatement"),
@JsonSubTypes.Type(value = JointStatement.class, name = "jointStatement"),
@JsonSubTypes.Type(value = JointWhereGBHavingStatement.class, name = "jointWhereGBHavingStatement"),
@JsonSubTypes.Type(value = JointWhereGroupByStatement.class, name = "jointWhereGroupByStatement"),
@JsonSubTypes.Type(value = JointWhereStatement.class, name = "jointWhereStatement"),
@JsonSubTypes.Type(value = WindowQueryStatement.class, name = "windowQueryStatement")
})
public class QueryStatement extends Statement {
private static final Logger logger = LoggerFactory.getLogger(QueryStatement.class);

Expand Down Expand Up @@ -90,39 +116,40 @@ private List<SQLFunction> buildFunction() {
Calculator calculator = selectFieldAndCalculator.get(field);

String fieldName = field.getFieldName();
String asName = !StringUtils.isEmpty(field.getAsFieldName()) ? field.getAsFieldName() : field.getFieldName();
String asName = getAsName(field);
String newName = !StringUtils.isEmpty(asName) ? asName : field.getFieldName();

SQLFunction function;
if (calculator == null) {
function = new EmptyFunction(fieldName, asName);
function = new EmptyFunction(fieldName, newName);
} else {
switch (calculator) {
case COUNT: {
function = new CountFunction(fieldName, asName);
function = new CountFunction(fieldName, newName);
break;
}
case MAX: {
function = new MaxFunction(fieldName, asName);
function = new MaxFunction(fieldName, newName);
break;
}
case MIN: {
function = new MinFunction(fieldName, asName);
function = new MinFunction(fieldName, newName);
break;
}
case SUM: {
function = new SumFunction(fieldName, asName);
function = new SumFunction(fieldName, newName);
break;
}
case AVG: {
function = new AVGFunction(fieldName, asName);
function = new AVGFunction(fieldName, newName);
break;
}
case WINDOW_START: {
function = new WindowBoundaryTimeFunction(Constant.WINDOW_START_TIME, asName);
function = new WindowBoundaryTimeFunction(Constant.WINDOW_START_TIME, newName);
break;
}
case WINDOW_END: {
function = new WindowBoundaryTimeFunction(Constant.WINDOW_END_TIME, asName);
function = new WindowBoundaryTimeFunction(Constant.WINDOW_END_TIME, newName);
break;
}
default: {
Expand Down Expand Up @@ -277,10 +304,20 @@ protected HashMap<String, String> fieldName2AsName() {

HashMap<String, String> result = new HashMap<>();
for (Field field : fields) {
String asName = !StringUtils.isEmpty(field.getAsFieldName()) ? field.getAsFieldName() : field.getFieldName();
result.put(field.getFieldName(), asName);
String asName = getAsName(field);
String newName = !StringUtils.isEmpty(asName) ? asName : field.getFieldName();
result.put(field.getFieldName(), newName);
}

return result;
}

private String getAsName(Field field) {
String asFieldName = field.getAsFieldName();
if (StringUtils.isEmpty(asFieldName) || "null".equalsIgnoreCase(asFieldName)) {
return null;
}

return asFieldName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

import com.alibaba.rsqldb.parser.model.Field;
import com.alibaba.rsqldb.parser.model.Node;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.concurrent.TimeUnit;

@JsonIgnoreProperties(ignoreUnknown = true)
public class WindowInfoInSQL extends Node {
public enum WindowType {
TUMBLE, HOP, SESSION
Expand Down Expand Up @@ -51,8 +55,9 @@ public int getIndex() {
private FirstWordInSQL firstWordInSQL;
private String newFieldName;


public WindowInfoInSQL(String content, WindowType type, long slide, long size, Field timeField) {
@JsonCreator
public WindowInfoInSQL(@JsonProperty("content") String content, @JsonProperty("type") WindowType type,
@JsonProperty("slide") long slide, @JsonProperty("size") long size, @JsonProperty("timeField") Field timeField) {
super(content);
this.type = type;
this.slide = slide;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
*/
package com.alibaba.rsqldb.parser.serialization;

import com.alibaba.rsqldb.common.exception.DeserializeException;
import com.fasterxml.jackson.databind.JsonNode;

public interface Deserializer {
JsonNode deserialize(byte[] source);
JsonNode deserialize(byte[] source) throws DeserializeException;

default <T> T deserialize(byte[] source, Class<T> clazz) {
default <T> T deserialize(byte[] source, Class<T> clazz) throws DeserializeException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.alibaba.rsqldb.common.exception.DeserializeException;
import com.alibaba.rsqldb.parser.model.Field;
import com.alibaba.rsqldb.parser.model.expression.Expression;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
Expand Down Expand Up @@ -47,7 +46,7 @@ public JsonNode deserialize(byte[] source) throws DeserializeException {
}

@Override
public <T> T deserialize(byte[] source, Class<T> clazz) {
public <T> T deserialize(byte[] source, Class<T> clazz) throws DeserializeException {
if (source == null || source.length == 0) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@
import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;

public class JsonObjectKVSer<V> implements KeyValueSerializer<Object, V> {
private final Serializer serializer;

public JsonObjectKVSer(Serializer serializer) {
this.serializer = serializer;
}

@Override
public byte[] serialize(Object o, V data) throws Throwable {
return new byte[0];
return serializer.serialize(o, data);
}
}
Loading

0 comments on commit f466b49

Please sign in to comment.