Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #78]support evevt_time window #77

Merged
merged 1 commit into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class RSQLConstant {

public static final String BODY_TYPE = "body.type";

public static final String COMMAND_OPERATOR = "command_operator";
public static final String CONFIG_PREFIX = "configKey@";

public static final String TABLE_TYPE = "table.type";
public enum TableType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.rsqldb.parser.impl;

import com.alibaba.rsqldb.common.RSQLConstant;
import com.alibaba.rsqldb.parser.model.statement.CreateTableStatement;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand All @@ -23,6 +24,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.rstream.GroupedStream;
import org.apache.rocketmq.streams.core.rstream.JoinedStream;
import org.apache.rocketmq.streams.core.rstream.RStream;
Expand Down Expand Up @@ -140,6 +142,21 @@ public Object getHeader(String key) {
return this.header.get(key);
}

public Map<String, Object> getConfigSetAtBuild() {
HashMap<String, Object> result = new HashMap<>();

for (String key : header.keySet()) {
if (key.startsWith(RSQLConstant.CONFIG_PREFIX)) {
String configKey = key.substring(RSQLConstant.CONFIG_PREFIX.length());
Object value = header.get(key);

result.put(configKey, value);
}
}

return result;
}

public byte[] getInsertValueData() {
return insertValueData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.alibaba.rsqldb.parser.model.expression.AndExpression;
import com.alibaba.rsqldb.parser.model.expression.Expression;
import com.alibaba.rsqldb.parser.model.expression.OrExpression;
import com.alibaba.rsqldb.parser.model.expression.SingleExpression;
import com.alibaba.rsqldb.parser.model.expression.SingleValueCalcuExpression;
import com.alibaba.rsqldb.parser.model.statement.Statement;
import com.alibaba.rsqldb.parser.model.statement.query.join.JointGroupByHavingStatement;
Expand All @@ -53,15 +52,13 @@
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.rstream.GroupedStream;
import org.apache.rocketmq.streams.core.rstream.RStream;
import org.apache.rocketmq.streams.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.rsqldb.parser.model.statement.query;

import com.alibaba.rsqldb.common.RSQLConstant;
import com.alibaba.rsqldb.common.exception.SyntaxErrorException;
import com.alibaba.rsqldb.parser.impl.BuildContext;
import com.alibaba.rsqldb.parser.model.Calculator;
Expand All @@ -27,6 +28,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.rstream.GroupedStream;
import org.apache.rocketmq.streams.core.rstream.RStream;
import org.apache.rocketmq.streams.core.rstream.WindowStream;
Expand Down Expand Up @@ -89,14 +91,16 @@ private void validator() {

@Override
public BuildContext build(BuildContext context) throws Throwable {
context.putHeader(RSQLConstant.CONFIG_PREFIX + StreamConfig.ALLOW_LATENESS_MILLISECOND, 10*1000);

RStream<JsonNode> rStream = context.getRStreamSource(this.getTableName());
RStream<JsonNode> stream = rStream.selectTimestamp(value -> {
String timeField = groupByWindow.getTimeField().getFieldName();
JsonNode node = value.get(timeField);
try {
return node.asLong();
} catch (Throwable t) {
logger.info("get time from value error, time field :[{}], value=[{}]", timeField, value);
logger.error("get time from value error, time field :[{}], value=[{}]", timeField, value);
throw t;
}
});
Expand Down Expand Up @@ -149,7 +153,7 @@ public BuildContext build(BuildContext context) throws Throwable {
return this.getHavingExpression().isTrue(value);
} catch (Throwable t) {
//使用错误,例如字段是string,使用>过滤;
logger.info("having filter error, sql:[{}], value=[{}]", WindowQueryStatement.this.getContent(), value, t);
logger.warn("having filter error, sql:[{}], value=[{}]", WindowQueryStatement.this.getContent(), value, t);
return false;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.streams.core.RocketMQStream;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -212,7 +213,7 @@ private void startStream(Command command) throws Throwable {

Properties properties = new Properties();
properties.put(MixAll.NAMESRV_ADDR_PROPERTY, rsqlConfig.getNamesrvAddr());
properties.put(Constant.SKIP_DATA_ERROR, true);
properties.putAll(dispatch.getConfigSetAtBuild());

RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);
RocketMQStream previous = rStreams.put(jobId, rocketMQStream);
Expand Down