diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/AVGFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/AVGFunction.java index 1657e98..1be4812 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/AVGFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/AVGFunction.java @@ -17,6 +17,9 @@ import com.alibaba.rsqldb.common.RSQLConstant; import com.alibaba.rsqldb.common.exception.RSQLServerException; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import java.math.BigDecimal; @@ -24,11 +27,13 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +@JsonIgnoreProperties(ignoreUnknown = true) public class AVGFunction implements SQLFunction { private String fieldName; private String asName; - public AVGFunction(String fieldName, String asName) { + @JsonCreator + public AVGFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) { this.fieldName = fieldName; this.asName = asName; } diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/CountFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/CountFunction.java index eca3305..4382904 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/CountFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/CountFunction.java @@ -16,16 +16,19 @@ package com.alibaba.rsqldb.common.function; import com.alibaba.rsqldb.common.RSQLConstant; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import java.math.BigDecimal; import java.util.concurrent.ConcurrentHashMap; +@JsonIgnoreProperties(ignoreUnknown = true) public class CountFunction implements SQLFunction { private String fieldName; private String asName; - public CountFunction(String fieldName, String asName) { + public CountFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) { this.fieldName = fieldName; this.asName = asName; } diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/EmptyFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/EmptyFunction.java index 8f9c4b3..68fba0c 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/EmptyFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/EmptyFunction.java @@ -15,15 +15,20 @@ */ package com.alibaba.rsqldb.common.function; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import java.util.concurrent.ConcurrentHashMap; +@JsonIgnoreProperties(ignoreUnknown = true) public class EmptyFunction implements SQLFunction { private String fieldName; private String asName; - public EmptyFunction(String fieldName, String asName) { + @JsonCreator + public EmptyFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) { this.fieldName = fieldName; this.asName = asName; } diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MaxFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MaxFunction.java index 47c7f7c..1f03409 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MaxFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MaxFunction.java @@ -15,16 +15,21 @@ */ package com.alibaba.rsqldb.common.function; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import java.math.BigDecimal; import java.util.concurrent.ConcurrentHashMap; +@JsonIgnoreProperties(ignoreUnknown = true) public class MaxFunction implements SQLFunction { private String fieldName; private String asName; - public MaxFunction(String fieldName, String asName) { + @JsonCreator + public MaxFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) { this.fieldName = fieldName; this.asName = asName; } diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MinFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MinFunction.java index 67697a1..df0baed 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MinFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/MinFunction.java @@ -15,16 +15,21 @@ */ package com.alibaba.rsqldb.common.function; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import java.math.BigDecimal; import java.util.concurrent.ConcurrentHashMap; +@JsonIgnoreProperties(ignoreUnknown = true) public class MinFunction implements SQLFunction { private String fieldName; private String asName; - public MinFunction(String fieldName, String asName) { + @JsonCreator + public MinFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) { this.fieldName = fieldName; this.asName = asName; } diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SQLFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SQLFunction.java index 74ff694..2ba1470 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SQLFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SQLFunction.java @@ -15,11 +15,26 @@ */ package com.alibaba.rsqldb.common.function; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.JsonNode; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY +) +@JsonSubTypes({ + @JsonSubTypes.Type(value = AVGFunction.class, name = "aVGFunction"), + @JsonSubTypes.Type(value = CountFunction.class, name = "countFunction"), + @JsonSubTypes.Type(value = EmptyFunction.class, name = "emptyFunction"), + @JsonSubTypes.Type(value = MaxFunction.class, name = "maxFunction"), + @JsonSubTypes.Type(value = MinFunction.class, name = "minFunction"), + @JsonSubTypes.Type(value = SumFunction.class, name = "sumFunction"), + @JsonSubTypes.Type(value = WindowBoundaryTimeFunction.class, name = "windowBoundaryTimeFunction"), +}) public interface SQLFunction { void apply(JsonNode jsonNode, final ConcurrentHashMap container); diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SumFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SumFunction.java index 733d168..0f3b95e 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SumFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/SumFunction.java @@ -15,16 +15,22 @@ */ package com.alibaba.rsqldb.common.function; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NumericNode; import java.math.BigDecimal; import java.util.concurrent.ConcurrentHashMap; +@JsonIgnoreProperties(ignoreUnknown = true) public class SumFunction implements SQLFunction { private String fieldName; private String asName; - public SumFunction(String fieldName, String asName) { + @JsonCreator + public SumFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) { this.fieldName = fieldName; this.asName = asName; } @@ -32,14 +38,21 @@ public SumFunction(String fieldName, String asName) { @Override public void apply(JsonNode jsonNode, ConcurrentHashMap container) { JsonNode valueNode = jsonNode.get(fieldName); - if (valueNode != null) { + if (valueNode instanceof NumericNode) { String value = valueNode.asText(); BigDecimal newValue = new BigDecimal(value); if (!container.containsKey(asName)) { container.put(asName, newValue); }else { - BigDecimal old = (BigDecimal)container.get(asName); + Object temp = container.get(asName); + BigDecimal old; + if (temp instanceof Number) { + old = new BigDecimal(String.valueOf(temp)); + } else { + throw new RuntimeException("value is not number."); + } + BigDecimal add = old.add(newValue); container.put(asName, add); diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/WindowBoundaryTimeFunction.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/WindowBoundaryTimeFunction.java index 94adb55..4110a06 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/WindowBoundaryTimeFunction.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/function/WindowBoundaryTimeFunction.java @@ -15,16 +15,21 @@ */ package com.alibaba.rsqldb.common.function; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +@JsonIgnoreProperties(ignoreUnknown = true) public class WindowBoundaryTimeFunction implements SQLFunction { private String fieldName; private String asName; - public WindowBoundaryTimeFunction(String fieldName, String asName) { + @JsonCreator + public WindowBoundaryTimeFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) { this.fieldName = fieldName; this.asName = asName; } diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/expression/MultiValueExpression.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/expression/MultiValueExpression.java index 9f21b86..e70e74e 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/expression/MultiValueExpression.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/expression/MultiValueExpression.java @@ -63,10 +63,16 @@ public boolean isTrue(JsonNode jsonNode) { return this.values == null; } + boolean value = false; + List> literals = values.getLiterals(); for (Literal literal : literals) { - return super.isEqual(node, literal); + value = super.isEqual(node, literal); + if (value) { + break; + } } - return false; + + return value; } } diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/CreateTableStatement.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/CreateTableStatement.java index b902f91..e8ad51d 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/CreateTableStatement.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/CreateTableStatement.java @@ -198,9 +198,9 @@ public BuildContext build(BuildContext context) throws Throwable { GroupedStream groupedStream = context.getGroupedStreamResult(); if (windowStream != null) { - windowStream.sink(topicName, new JsonStringKVSer<>()); + windowStream.sink(topicName, new JsonStringKVSer<>(serializer)); } else if (groupedStream != null) { - groupedStream.sink(topicName, new JsonStringKVSer<>()); + groupedStream.sink(topicName, new JsonStringKVSer<>(serializer)); } else { stream.sink(topicName, new JsonObjectKVSer<>(serializer)); } diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/RSQLAccumulator.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/RSQLAccumulator.java index 85a7060..60f062c 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/RSQLAccumulator.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/RSQLAccumulator.java @@ -16,12 +16,15 @@ package com.alibaba.rsqldb.parser.model.statement.query; import com.alibaba.rsqldb.common.function.SQLFunction; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.rocketmq.streams.core.function.accumulator.Accumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.util.ArrayList; @@ -31,9 +34,12 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class RSQLAccumulator implements Accumulator { + private static final Logger logger = LoggerFactory.getLogger(RSQLAccumulator.class); + private List sqlFunctions; - private ConcurrentHashMap container = new ConcurrentHashMap<>(); + private ConcurrentHashMap tempHolder = new ConcurrentHashMap<>(); + @JsonCreator public RSQLAccumulator(@JsonProperty("sqlFunctions") List sqlFunctions) { this.sqlFunctions = sqlFunctions; } @@ -45,7 +51,7 @@ public void addValue(JsonNode value) { } for (SQLFunction function : sqlFunctions) { - function.apply(value, container); + function.apply(value, tempHolder); } } @@ -59,20 +65,28 @@ public void merge(Accumulator other) { public ObjectNode result(Properties context) { //需要二次计算的,进行二次计算 for (SQLFunction function : sqlFunctions) { - function.secondCalcu(container, context); + function.secondCalcu(tempHolder, context); } ObjectNode node = JsonNodeFactory.instance.objectNode(); - for (String key : container.keySet()) { - Object temp = container.get(key); - if (temp instanceof BigDecimal) { - BigDecimal value = (BigDecimal) temp; - node.put(key, value.toString()); + for (String key : tempHolder.keySet()) { + Object temp = tempHolder.get(key); + if (temp instanceof Number) { + BigDecimal value = new BigDecimal(String.valueOf(temp)); + + String valueStr = value.toString(); + if (valueStr.contains(".")) { + node.put(key, value.doubleValue()); + } else { + node.put(key, value.longValue()); + } + } else if (temp instanceof String) { node.put(key, (String) temp); } else if (temp instanceof JsonNode) { node.set(key, (JsonNode) temp); } else { + logger.error("unsupported type: " + temp.getClass()); throw new UnsupportedOperationException(); } } @@ -88,12 +102,12 @@ public void setSqlFunctions(List sqlFunctions) { this.sqlFunctions = sqlFunctions; } - public ConcurrentHashMap getContainer() { - return container; + public ConcurrentHashMap getTempHolder() { + return tempHolder; } - public void setContainer(ConcurrentHashMap container) { - this.container = container; + public void setTempHolder(ConcurrentHashMap tempHolder) { + this.tempHolder = tempHolder; } @Override diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonSer.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonSer.java index 0173e88..3424dd5 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonSer.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonSer.java @@ -38,27 +38,29 @@ public byte[] serialize(Object obj) throws SerializeException { @Override public byte[] serialize(Object key, Object value) throws SerializeException { - if (key == null) { - return this.serialize(value); - } - - try { - ObjectNode objectNode = objectMapper.createObjectNode(); +// if (key == null) { +// return this.serialize(value); +// } - String valueAsString = objectMapper.writeValueAsString(value); - JsonNode valueJsonNode = objectMapper.readTree(valueAsString); + return this.serialize(value); - if (key.getClass().isPrimitive()) { - objectNode.set(String.valueOf(key), valueJsonNode); - } else { - throw new UnsupportedOperationException("key is not primitive."); - } - - String result = objectNode.toPrettyString(); - - return objectMapper.writeValueAsBytes(result); - } catch (Throwable t) { - throw new SerializeException(t); - } +// try { +// ObjectNode objectNode = objectMapper.createObjectNode(); +// +// String valueAsString = objectMapper.writeValueAsString(value); +// JsonNode valueJsonNode = objectMapper.readTree(valueAsString); +// +// if (key.getClass().isPrimitive()) { +// objectNode.set(String.valueOf(key), valueJsonNode); +// } else { +// throw new UnsupportedOperationException("key is not primitive."); +// } +// +// String result = objectNode.toPrettyString(); +// +// return objectMapper.writeValueAsBytes(result); +// } catch (Throwable t) { +// throw new SerializeException(t); +// } } } diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonStringKVSer.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonStringKVSer.java index 6ab1524..1f6a460 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonStringKVSer.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/serialization/json/JsonStringKVSer.java @@ -16,12 +16,18 @@ package com.alibaba.rsqldb.parser.serialization.json; +import com.alibaba.rsqldb.parser.serialization.Serializer; import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer; public class JsonStringKVSer implements KeyValueSerializer { + private final Serializer serializer; + + public JsonStringKVSer(Serializer serializer) { + this.serializer = serializer; + } @Override public byte[] serialize(String s, V data) throws Throwable { - return new byte[0]; + return this.serializer.serialize(s, data); } }