Skip to content

Commit

Permalink
Merge pull request #69 from ni-ze/main
Browse files Browse the repository at this point in the history
[ISSUE #70] Make SQL "where field in(..)" right
  • Loading branch information
ni-ze authored Jan 27, 2023
2 parents 347d499 + e8b1dd8 commit 0d16603
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@

import com.alibaba.rsqldb.common.RSQLConstant;
import com.alibaba.rsqldb.common.exception.RSQLServerException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

@JsonIgnoreProperties(ignoreUnknown = true)
public class AVGFunction implements SQLFunction {
private String fieldName;
private String asName;

public AVGFunction(String fieldName, String asName) {
@JsonCreator
public AVGFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) {
this.fieldName = fieldName;
this.asName = asName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
package com.alibaba.rsqldb.common.function;

import com.alibaba.rsqldb.common.RSQLConstant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;

@JsonIgnoreProperties(ignoreUnknown = true)
public class CountFunction implements SQLFunction {
private String fieldName;
private String asName;

public CountFunction(String fieldName, String asName) {
public CountFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) {
this.fieldName = fieldName;
this.asName = asName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
*/
package com.alibaba.rsqldb.common.function;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.concurrent.ConcurrentHashMap;

@JsonIgnoreProperties(ignoreUnknown = true)
public class EmptyFunction implements SQLFunction {
private String fieldName;
private String asName;

public EmptyFunction(String fieldName, String asName) {
@JsonCreator
public EmptyFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) {
this.fieldName = fieldName;
this.asName = asName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package com.alibaba.rsqldb.common.function;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;

@JsonIgnoreProperties(ignoreUnknown = true)
public class MaxFunction implements SQLFunction {
private String fieldName;
private String asName;

public MaxFunction(String fieldName, String asName) {
@JsonCreator
public MaxFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) {
this.fieldName = fieldName;
this.asName = asName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package com.alibaba.rsqldb.common.function;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;

@JsonIgnoreProperties(ignoreUnknown = true)
public class MinFunction implements SQLFunction {
private String fieldName;
private String asName;

public MinFunction(String fieldName, String asName) {
@JsonCreator
public MinFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) {
this.fieldName = fieldName;
this.asName = asName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,26 @@
*/
package com.alibaba.rsqldb.common.function;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY
)
@JsonSubTypes({
@JsonSubTypes.Type(value = AVGFunction.class, name = "aVGFunction"),
@JsonSubTypes.Type(value = CountFunction.class, name = "countFunction"),
@JsonSubTypes.Type(value = EmptyFunction.class, name = "emptyFunction"),
@JsonSubTypes.Type(value = MaxFunction.class, name = "maxFunction"),
@JsonSubTypes.Type(value = MinFunction.class, name = "minFunction"),
@JsonSubTypes.Type(value = SumFunction.class, name = "sumFunction"),
@JsonSubTypes.Type(value = WindowBoundaryTimeFunction.class, name = "windowBoundaryTimeFunction"),
})
public interface SQLFunction {
void apply(JsonNode jsonNode, final ConcurrentHashMap<String, Object> container);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,44 @@
*/
package com.alibaba.rsqldb.common.function;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NumericNode;

import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;

@JsonIgnoreProperties(ignoreUnknown = true)
public class SumFunction implements SQLFunction {
private String fieldName;
private String asName;

public SumFunction(String fieldName, String asName) {
@JsonCreator
public SumFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) {
this.fieldName = fieldName;
this.asName = asName;
}

@Override
public void apply(JsonNode jsonNode, ConcurrentHashMap<String, Object> container) {
JsonNode valueNode = jsonNode.get(fieldName);
if (valueNode != null) {
if (valueNode instanceof NumericNode) {
String value = valueNode.asText();
BigDecimal newValue = new BigDecimal(value);

if (!container.containsKey(asName)) {
container.put(asName, newValue);
}else {
BigDecimal old = (BigDecimal)container.get(asName);
Object temp = container.get(asName);
BigDecimal old;
if (temp instanceof Number) {
old = new BigDecimal(String.valueOf(temp));
} else {
throw new RuntimeException("value is not number.");
}

BigDecimal add = old.add(newValue);

container.put(asName, add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package com.alibaba.rsqldb.common.function;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

@JsonIgnoreProperties(ignoreUnknown = true)
public class WindowBoundaryTimeFunction implements SQLFunction {
private String fieldName;
private String asName;

public WindowBoundaryTimeFunction(String fieldName, String asName) {
@JsonCreator
public WindowBoundaryTimeFunction(@JsonProperty("fieldName")String fieldName, @JsonProperty("asName")String asName) {
this.fieldName = fieldName;
this.asName = asName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,16 @@ public boolean isTrue(JsonNode jsonNode) {
return this.values == null;
}

boolean value = false;

List<Literal<?>> literals = values.getLiterals();
for (Literal<?> literal : literals) {
return super.isEqual(node, literal);
value = super.isEqual(node, literal);
if (value) {
break;
}
}
return false;

return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ public BuildContext build(BuildContext context) throws Throwable {
GroupedStream<String, ? extends JsonNode> groupedStream = context.getGroupedStreamResult();

if (windowStream != null) {
windowStream.sink(topicName, new JsonStringKVSer<>());
windowStream.sink(topicName, new JsonStringKVSer<>(serializer));
} else if (groupedStream != null) {
groupedStream.sink(topicName, new JsonStringKVSer<>());
groupedStream.sink(topicName, new JsonStringKVSer<>(serializer));
} else {
stream.sink(topicName, new JsonObjectKVSer<>(serializer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package com.alibaba.rsqldb.parser.model.statement.query;

import com.alibaba.rsqldb.common.function.SQLFunction;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.ArrayList;
Expand All @@ -31,9 +34,12 @@

@JsonIgnoreProperties(ignoreUnknown = true)
public class RSQLAccumulator implements Accumulator<JsonNode, ObjectNode> {
private static final Logger logger = LoggerFactory.getLogger(RSQLAccumulator.class);

private List<SQLFunction> sqlFunctions;
private ConcurrentHashMap<String, Object> container = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Object> tempHolder = new ConcurrentHashMap<>();

@JsonCreator
public RSQLAccumulator(@JsonProperty("sqlFunctions") List<SQLFunction> sqlFunctions) {
this.sqlFunctions = sqlFunctions;
}
Expand All @@ -45,7 +51,7 @@ public void addValue(JsonNode value) {
}

for (SQLFunction function : sqlFunctions) {
function.apply(value, container);
function.apply(value, tempHolder);
}
}

Expand All @@ -59,20 +65,28 @@ public void merge(Accumulator<JsonNode, ObjectNode> other) {
public ObjectNode result(Properties context) {
//需要二次计算的,进行二次计算
for (SQLFunction function : sqlFunctions) {
function.secondCalcu(container, context);
function.secondCalcu(tempHolder, context);
}

ObjectNode node = JsonNodeFactory.instance.objectNode();
for (String key : container.keySet()) {
Object temp = container.get(key);
if (temp instanceof BigDecimal) {
BigDecimal value = (BigDecimal) temp;
node.put(key, value.toString());
for (String key : tempHolder.keySet()) {
Object temp = tempHolder.get(key);
if (temp instanceof Number) {
BigDecimal value = new BigDecimal(String.valueOf(temp));

String valueStr = value.toString();
if (valueStr.contains(".")) {
node.put(key, value.doubleValue());
} else {
node.put(key, value.longValue());
}

} else if (temp instanceof String) {
node.put(key, (String) temp);
} else if (temp instanceof JsonNode) {
node.set(key, (JsonNode) temp);
} else {
logger.error("unsupported type: " + temp.getClass());
throw new UnsupportedOperationException();
}
}
Expand All @@ -88,12 +102,12 @@ public void setSqlFunctions(List<SQLFunction> sqlFunctions) {
this.sqlFunctions = sqlFunctions;
}

public ConcurrentHashMap<String, Object> getContainer() {
return container;
public ConcurrentHashMap<String, Object> getTempHolder() {
return tempHolder;
}

public void setContainer(ConcurrentHashMap<String, Object> container) {
this.container = container;
public void setTempHolder(ConcurrentHashMap<String, Object> tempHolder) {
this.tempHolder = tempHolder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,29 @@ public byte[] serialize(Object obj) throws SerializeException {

@Override
public byte[] serialize(Object key, Object value) throws SerializeException {
if (key == null) {
return this.serialize(value);
}

try {
ObjectNode objectNode = objectMapper.createObjectNode();
// if (key == null) {
// return this.serialize(value);
// }

String valueAsString = objectMapper.writeValueAsString(value);
JsonNode valueJsonNode = objectMapper.readTree(valueAsString);
return this.serialize(value);

if (key.getClass().isPrimitive()) {
objectNode.set(String.valueOf(key), valueJsonNode);
} else {
throw new UnsupportedOperationException("key is not primitive.");
}

String result = objectNode.toPrettyString();

return objectMapper.writeValueAsBytes(result);
} catch (Throwable t) {
throw new SerializeException(t);
}
// try {
// ObjectNode objectNode = objectMapper.createObjectNode();
//
// String valueAsString = objectMapper.writeValueAsString(value);
// JsonNode valueJsonNode = objectMapper.readTree(valueAsString);
//
// if (key.getClass().isPrimitive()) {
// objectNode.set(String.valueOf(key), valueJsonNode);
// } else {
// throw new UnsupportedOperationException("key is not primitive.");
// }
//
// String result = objectNode.toPrettyString();
//
// return objectMapper.writeValueAsBytes(result);
// } catch (Throwable t) {
// throw new SerializeException(t);
// }
}
}
Loading

0 comments on commit 0d16603

Please sign in to comment.