Skip to content

Commit

Permalink
Merge pull request #77 from ni-ze/main
Browse files Browse the repository at this point in the history
[ISSUE #78]support evevt_time window
  • Loading branch information
ni-ze authored Feb 6, 2023
2 parents f92b12b + 5005125 commit 674ccc6
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class RSQLConstant {

public static final String BODY_TYPE = "body.type";

public static final String COMMAND_OPERATOR = "command_operator";
public static final String CONFIG_PREFIX = "configKey@";

public static final String TABLE_TYPE = "table.type";
public enum TableType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.rsqldb.parser.impl;

import com.alibaba.rsqldb.common.RSQLConstant;
import com.alibaba.rsqldb.parser.model.statement.CreateTableStatement;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand All @@ -23,6 +24,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.rstream.GroupedStream;
import org.apache.rocketmq.streams.core.rstream.JoinedStream;
import org.apache.rocketmq.streams.core.rstream.RStream;
Expand Down Expand Up @@ -140,6 +142,21 @@ public Object getHeader(String key) {
return this.header.get(key);
}

public Map<String, Object> getConfigSetAtBuild() {
HashMap<String, Object> result = new HashMap<>();

for (String key : header.keySet()) {
if (key.startsWith(RSQLConstant.CONFIG_PREFIX)) {
String configKey = key.substring(RSQLConstant.CONFIG_PREFIX.length());
Object value = header.get(key);

result.put(configKey, value);
}
}

return result;
}

public byte[] getInsertValueData() {
return insertValueData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.alibaba.rsqldb.parser.model.expression.AndExpression;
import com.alibaba.rsqldb.parser.model.expression.Expression;
import com.alibaba.rsqldb.parser.model.expression.OrExpression;
import com.alibaba.rsqldb.parser.model.expression.SingleExpression;
import com.alibaba.rsqldb.parser.model.expression.SingleValueCalcuExpression;
import com.alibaba.rsqldb.parser.model.statement.Statement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointGroupByHavingStatement;
Expand All @@ -53,15 +52,13 @@
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.rstream.GroupedStream;
import org.apache.rocketmq.streams.core.rstream.RStream;
import org.apache.rocketmq.streams.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.rsqldb.parser.model.statement.query;

import com.alibaba.rsqldb.common.RSQLConstant;
import com.alibaba.rsqldb.common.exception.SyntaxErrorException;
import com.alibaba.rsqldb.parser.impl.BuildContext;
import com.alibaba.rsqldb.parser.model.Calculator;
Expand All @@ -27,6 +28,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.rstream.GroupedStream;
import org.apache.rocketmq.streams.core.rstream.RStream;
import org.apache.rocketmq.streams.core.rstream.WindowStream;
Expand Down Expand Up @@ -89,14 +91,16 @@ private void validator() {

@Override
public BuildContext build(BuildContext context) throws Throwable {
context.putHeader(RSQLConstant.CONFIG_PREFIX + StreamConfig.ALLOW_LATENESS_MILLISECOND, 10*1000);

RStream<JsonNode> rStream = context.getRStreamSource(this.getTableName());
RStream<JsonNode> stream = rStream.selectTimestamp(value -> {
String timeField = groupByWindow.getTimeField().getFieldName();
JsonNode node = value.get(timeField);
try {
return node.asLong();
} catch (Throwable t) {
logger.info("get time from value error, time field :[{}], value=[{}]", timeField, value);
logger.error("get time from value error, time field :[{}], value=[{}]", timeField, value);
throw t;
}
});
Expand Down Expand Up @@ -149,7 +153,7 @@ public BuildContext build(BuildContext context) throws Throwable {
return this.getHavingExpression().isTrue(value);
} catch (Throwable t) {
//使用错误,例如字段是string,使用>过滤;
logger.info("having filter error, sql:[{}], value=[{}]", WindowQueryStatement.this.getContent(), value, t);
logger.warn("having filter error, sql:[{}], value=[{}]", WindowQueryStatement.this.getContent(), value, t);
return false;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.streams.core.RocketMQStream;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -212,7 +213,7 @@ private void startStream(Command command) throws Throwable {

Properties properties = new Properties();
properties.put(MixAll.NAMESRV_ADDR_PROPERTY, rsqlConfig.getNamesrvAddr());
properties.put(Constant.SKIP_DATA_ERROR, true);
properties.putAll(dispatch.getConfigSetAtBuild());

RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);
RocketMQStream previous = rStreams.put(jobId, rocketMQStream);
Expand Down

0 comments on commit 674ccc6

Please sign in to comment.