From 50051255949ae7607f55489c15c97688eaa885d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Wed, 1 Feb 2023 20:55:33 +0800 Subject: [PATCH] support evevt_time window --- .../com/alibaba/rsqldb/common/RSQLConstant.java | 2 +- .../rsqldb/parser/impl/BuildContext.java | 17 +++++++++++++++++ .../model/statement/query/QueryStatement.java | 3 --- .../statement/query/WindowQueryStatement.java | 8 ++++++-- .../rsqldb/rest/service/iml/RSQLEngin.java | 3 ++- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/RSQLConstant.java b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/RSQLConstant.java index eae1ec5..ce6da94 100644 --- a/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/RSQLConstant.java +++ b/rsqldb-common/src/main/java/com/alibaba/rsqldb/common/RSQLConstant.java @@ -21,7 +21,7 @@ public class RSQLConstant { public static final String BODY_TYPE = "body.type"; - public static final String COMMAND_OPERATOR = "command_operator"; + public static final String CONFIG_PREFIX = "configKey@"; public static final String TABLE_TYPE = "table.type"; public enum TableType { diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/impl/BuildContext.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/impl/BuildContext.java index 5baa8bf..ba3f278 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/impl/BuildContext.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/impl/BuildContext.java @@ -15,6 +15,7 @@ */ package com.alibaba.rsqldb.parser.impl; +import com.alibaba.rsqldb.common.RSQLConstant; import com.alibaba.rsqldb.parser.model.statement.CreateTableStatement; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -23,6 +24,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.streams.core.metadata.StreamConfig; import org.apache.rocketmq.streams.core.rstream.GroupedStream; import org.apache.rocketmq.streams.core.rstream.JoinedStream; import org.apache.rocketmq.streams.core.rstream.RStream; @@ -140,6 +142,21 @@ public Object getHeader(String key) { return this.header.get(key); } + public Map getConfigSetAtBuild() { + HashMap result = new HashMap<>(); + + for (String key : header.keySet()) { + if (key.startsWith(RSQLConstant.CONFIG_PREFIX)) { + String configKey = key.substring(RSQLConstant.CONFIG_PREFIX.length()); + Object value = header.get(key); + + result.put(configKey, value); + } + } + + return result; + } + public byte[] getInsertValueData() { return insertValueData; } diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/QueryStatement.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/QueryStatement.java index 5cc3124..baa6016 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/QueryStatement.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/QueryStatement.java @@ -32,7 +32,6 @@ import com.alibaba.rsqldb.parser.model.expression.AndExpression; import com.alibaba.rsqldb.parser.model.expression.Expression; import com.alibaba.rsqldb.parser.model.expression.OrExpression; -import com.alibaba.rsqldb.parser.model.expression.SingleExpression; import com.alibaba.rsqldb.parser.model.expression.SingleValueCalcuExpression; import com.alibaba.rsqldb.parser.model.statement.Statement; import com.alibaba.rsqldb.parser.model.statement.query.join.JointGroupByHavingStatement; @@ -53,7 +52,6 @@ import org.apache.rocketmq.streams.core.function.accumulator.Accumulator; import org.apache.rocketmq.streams.core.rstream.GroupedStream; import org.apache.rocketmq.streams.core.rstream.RStream; -import org.apache.rocketmq.streams.core.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +59,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/WindowQueryStatement.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/WindowQueryStatement.java index ad936d2..f8674cc 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/WindowQueryStatement.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/model/statement/query/WindowQueryStatement.java @@ -15,6 +15,7 @@ */ package com.alibaba.rsqldb.parser.model.statement.query; +import com.alibaba.rsqldb.common.RSQLConstant; import com.alibaba.rsqldb.common.exception.SyntaxErrorException; import com.alibaba.rsqldb.parser.impl.BuildContext; import com.alibaba.rsqldb.parser.model.Calculator; @@ -27,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.rocketmq.streams.core.function.accumulator.Accumulator; +import org.apache.rocketmq.streams.core.metadata.StreamConfig; import org.apache.rocketmq.streams.core.rstream.GroupedStream; import org.apache.rocketmq.streams.core.rstream.RStream; import org.apache.rocketmq.streams.core.rstream.WindowStream; @@ -89,6 +91,8 @@ private void validator() { @Override public BuildContext build(BuildContext context) throws Throwable { + context.putHeader(RSQLConstant.CONFIG_PREFIX + StreamConfig.ALLOW_LATENESS_MILLISECOND, 10*1000); + RStream rStream = context.getRStreamSource(this.getTableName()); RStream stream = rStream.selectTimestamp(value -> { String timeField = groupByWindow.getTimeField().getFieldName(); @@ -96,7 +100,7 @@ public BuildContext build(BuildContext context) throws Throwable { try { return node.asLong(); } catch (Throwable t) { - logger.info("get time from value error, time field :[{}], value=[{}]", timeField, value); + logger.error("get time from value error, time field :[{}], value=[{}]", timeField, value); throw t; } }); @@ -149,7 +153,7 @@ public BuildContext build(BuildContext context) throws Throwable { return this.getHavingExpression().isTrue(value); } catch (Throwable t) { //使用错误,例如字段是string,使用>过滤; - logger.info("having filter error, sql:[{}], value=[{}]", WindowQueryStatement.this.getContent(), value, t); + logger.warn("having filter error, sql:[{}], value=[{}]", WindowQueryStatement.this.getContent(), value, t); return false; } }); diff --git a/rsqldb-rest/src/main/java/com/alibaba/rsqldb/rest/service/iml/RSQLEngin.java b/rsqldb-rest/src/main/java/com/alibaba/rsqldb/rest/service/iml/RSQLEngin.java index c87713f..ad37500 100644 --- a/rsqldb-rest/src/main/java/com/alibaba/rsqldb/rest/service/iml/RSQLEngin.java +++ b/rsqldb-rest/src/main/java/com/alibaba/rsqldb/rest/service/iml/RSQLEngin.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.streams.core.RocketMQStream; import org.apache.rocketmq.streams.core.common.Constant; +import org.apache.rocketmq.streams.core.metadata.StreamConfig; import org.apache.rocketmq.streams.core.topology.TopologyBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,7 +213,7 @@ private void startStream(Command command) throws Throwable { Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, rsqlConfig.getNamesrvAddr()); - properties.put(Constant.SKIP_DATA_ERROR, true); + properties.putAll(dispatch.getConfigSetAtBuild()); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); RocketMQStream previous = rStreams.put(jobId, rocketMQStream);