From d176d64466bbb3a5ce18c40e0b7081887ca9506c Mon Sep 17 00:00:00 2001 From: Julien Ruaux Date: Sun, 6 Oct 2019 00:33:14 +0200 Subject: [PATCH] fixed max item count issue --- .gitignore | 1 + .../batch/ThrottlingItemStreamReader.java | 31 +++++++++++++++ .../com/redislabs/riot/cli/RedisCommand.java | 2 +- .../redislabs/riot/cli/TransferCommand.java | 5 +++ .../riot/cli/redis/CommandOptions.java | 3 ++ .../riot/generator/GeneratorReader.java | 5 ++- .../riot/redis/writer/DebugMapWriter.java | 38 +++++++++++++++++++ .../java/com/redislabs/riot/TestFile.java | 7 +++- src/test/resources/timestamp.json | 12 ++++++ 9 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/redislabs/riot/batch/ThrottlingItemStreamReader.java create mode 100644 src/main/java/com/redislabs/riot/redis/writer/DebugMapWriter.java create mode 100644 src/test/resources/timestamp.json diff --git a/.gitignore b/.gitignore index 0af161d69..921cac7ac 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ buildNumber.properties bin/ /build/ /.gradle/ +*.log \ No newline at end of file diff --git a/src/main/java/com/redislabs/riot/batch/ThrottlingItemStreamReader.java b/src/main/java/com/redislabs/riot/batch/ThrottlingItemStreamReader.java new file mode 100644 index 000000000..4fab13a95 --- /dev/null +++ b/src/main/java/com/redislabs/riot/batch/ThrottlingItemStreamReader.java @@ -0,0 +1,31 @@ +package com.redislabs.riot.batch; + +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamReader; + +public class ThrottlingItemStreamReader extends ThrottlingItemReader implements ItemStreamReader { + + private ItemStreamReader reader; + + public ThrottlingItemStreamReader(ItemStreamReader reader, long sleep) { + super(reader, sleep); + this.reader = reader; + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + reader.open(executionContext); + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + reader.update(executionContext); + } + + @Override + public void close() throws ItemStreamException { + reader.close(); + } + +} diff --git a/src/main/java/com/redislabs/riot/cli/RedisCommand.java b/src/main/java/com/redislabs/riot/cli/RedisCommand.java index 66badb5e3..065fbcfa9 100644 --- a/src/main/java/com/redislabs/riot/cli/RedisCommand.java +++ b/src/main/java/com/redislabs/riot/cli/RedisCommand.java @@ -2,6 +2,6 @@ public enum RedisCommand { - evalsha, expire, geoadd, hmset, lpush, noop, rpush, sadd, set, xadd, zadd; + print, evalsha, expire, geoadd, hmset, lpush, noop, rpush, sadd, set, xadd, zadd; } diff --git a/src/main/java/com/redislabs/riot/cli/TransferCommand.java b/src/main/java/com/redislabs/riot/cli/TransferCommand.java index 708c04cfe..3e6f8726d 100644 --- a/src/main/java/com/redislabs/riot/cli/TransferCommand.java +++ b/src/main/java/com/redislabs/riot/cli/TransferCommand.java @@ -11,11 +11,13 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemStreamReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; import com.redislabs.riot.batch.JobExecutor; import com.redislabs.riot.batch.ThrottlingItemReader; +import com.redislabs.riot.batch.ThrottlingItemStreamReader; import picocli.CommandLine.Model.CommandSpec; import picocli.CommandLine.Option; @@ -50,6 +52,9 @@ private ItemReader throttle(ItemReader reader) { if (sleep == null) { return reader; } + if (reader instanceof ItemStreamReader) { + return new ThrottlingItemStreamReader((ItemStreamReader) reader, sleep); + } return new ThrottlingItemReader(reader, sleep); } diff --git a/src/main/java/com/redislabs/riot/cli/redis/CommandOptions.java b/src/main/java/com/redislabs/riot/cli/redis/CommandOptions.java index d18a3ccc4..8b910c75b 100644 --- a/src/main/java/com/redislabs/riot/cli/redis/CommandOptions.java +++ b/src/main/java/com/redislabs/riot/cli/redis/CommandOptions.java @@ -2,6 +2,7 @@ import com.redislabs.riot.cli.RedisCommand; import com.redislabs.riot.redis.writer.CollectionMapWriter; +import com.redislabs.riot.redis.writer.DebugMapWriter; import com.redislabs.riot.redis.writer.HmsetMapWriter; import com.redislabs.riot.redis.writer.LpushMapWriter; import com.redislabs.riot.redis.writer.NoopMapWriter; @@ -60,6 +61,8 @@ private RedisMapWriter redisItemWriter(RedisCommand command) { return stream.writer(); case zadd: return zset.writer(); + case print: + return new DebugMapWriter(); case noop: return new NoopMapWriter(); default: diff --git a/src/main/java/com/redislabs/riot/generator/GeneratorReader.java b/src/main/java/com/redislabs/riot/generator/GeneratorReader.java index f7be422d6..f4e0bcadd 100644 --- a/src/main/java/com/redislabs/riot/generator/GeneratorReader.java +++ b/src/main/java/com/redislabs/riot/generator/GeneratorReader.java @@ -26,7 +26,7 @@ public class GeneratorReader extends AbstractItemCountingItemStreamItemReader count = new ThreadLocal<>(); private ThreadLocal partition = new ThreadLocal<>(); private int partitions; - private int maxItemCount; + private Integer maxItemCount; private int partitionSize; private Locale locale; private Map fieldExpressions; @@ -59,10 +59,11 @@ public int partition() { public void open(ExecutionContext executionContext) throws ItemStreamException { this.partition.set(IndexedPartitioner.getPartitionIndex(executionContext)); this.partitions = IndexedPartitioner.getPartitions(executionContext); - this.partitionSize = maxItemCount / partitions; + this.partitionSize = maxItemCount == null ? Integer.MAX_VALUE : (maxItemCount / partitions); super.open(executionContext); } + @Override public void setMaxItemCount(int count) { this.maxItemCount = count; super.setMaxItemCount(count); diff --git a/src/main/java/com/redislabs/riot/redis/writer/DebugMapWriter.java b/src/main/java/com/redislabs/riot/redis/writer/DebugMapWriter.java new file mode 100644 index 000000000..e51770c70 --- /dev/null +++ b/src/main/java/com/redislabs/riot/redis/writer/DebugMapWriter.java @@ -0,0 +1,38 @@ +package com.redislabs.riot.redis.writer; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.lettuce.core.RedisFuture; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; + +public class DebugMapWriter implements RedisMapWriter { + + private final Logger log = LoggerFactory.getLogger(DebugMapWriter.class); + + @Override + public RedisFuture write(Object commands, Map item) { + debug(item); + return null; + } + + private void debug(Map item) { + log.info("{}", item); + } + + @Override + public Response write(Pipeline pipeline, Map item) { + debug(item); + return null; + } + + @Override + public void write(JedisCluster cluster, Map item) { + debug(item); + } + +} diff --git a/src/test/java/com/redislabs/riot/TestFile.java b/src/test/java/com/redislabs/riot/TestFile.java index d37675461..58ca7a8f9 100644 --- a/src/test/java/com/redislabs/riot/TestFile.java +++ b/src/test/java/com/redislabs/riot/TestFile.java @@ -2,6 +2,10 @@ import java.io.File; import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -185,7 +189,8 @@ public void testImportCsvProcessorHashDateFormat() throws Exception { List keys = commands().keys("event:*"); Assertions.assertEquals(568, keys.size()); Map event = commands().hgetall("event:248206"); - Assertions.assertEquals("1512838800000", event.get("EpochStart")); + Instant date = Instant.ofEpochMilli(Long.parseLong(event.get("EpochStart"))); + Assertions.assertTrue(date.isBefore(Instant.now())); long index = Long.parseLong(event.get("index")); Assertions.assertTrue(index > 0); } diff --git a/src/test/resources/timestamp.json b/src/test/resources/timestamp.json new file mode 100644 index 000000000..bed82756c --- /dev/null +++ b/src/test/resources/timestamp.json @@ -0,0 +1,12 @@ +[ + { + "facility": 1234, + "userFullName": "Julien Ruaux", + "userId": "jruaux", + "sessionId": "c7ac5e75-33a2-466c-99be-1f112b394dg7", + "roles": "[RN, PCT]", + "timestamp": "2019-01-24T10:27:47.825Z", + "body": "What do you call someone who hangs around with computer professionals? A DBA", + "id": 123 + } +] \ No newline at end of file