Skip to content

Commit

Permalink
fixed max item count issue
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 5, 2019
1 parent 51e309e commit d176d64
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ buildNumber.properties
bin/
/build/
/.gradle/
*.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.redislabs.riot.batch;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

public class ThrottlingItemStreamReader<T> extends ThrottlingItemReader<T> implements ItemStreamReader<T> {

private ItemStreamReader<T> reader;

public ThrottlingItemStreamReader(ItemStreamReader<T> reader, long sleep) {
super(reader, sleep);
this.reader = reader;
}

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
reader.open(executionContext);
}

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
reader.update(executionContext);
}

@Override
public void close() throws ItemStreamException {
reader.close();
}

}
2 changes: 1 addition & 1 deletion src/main/java/com/redislabs/riot/cli/RedisCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

public enum RedisCommand {

evalsha, expire, geoadd, hmset, lpush, noop, rpush, sadd, set, xadd, zadd;
print, evalsha, expire, geoadd, hmset, lpush, noop, rpush, sadd, set, xadd, zadd;

}
5 changes: 5 additions & 0 deletions src/main/java/com/redislabs/riot/cli/TransferCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;

import com.redislabs.riot.batch.JobExecutor;
import com.redislabs.riot.batch.ThrottlingItemReader;
import com.redislabs.riot.batch.ThrottlingItemStreamReader;

import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Option;
Expand Down Expand Up @@ -50,6 +52,9 @@ private ItemReader throttle(ItemReader reader) {
if (sleep == null) {
return reader;
}
if (reader instanceof ItemStreamReader) {
return new ThrottlingItemStreamReader((ItemStreamReader) reader, sleep);
}
return new ThrottlingItemReader(reader, sleep);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.redislabs.riot.cli.RedisCommand;
import com.redislabs.riot.redis.writer.CollectionMapWriter;
import com.redislabs.riot.redis.writer.DebugMapWriter;
import com.redislabs.riot.redis.writer.HmsetMapWriter;
import com.redislabs.riot.redis.writer.LpushMapWriter;
import com.redislabs.riot.redis.writer.NoopMapWriter;
Expand Down Expand Up @@ -60,6 +61,8 @@ private RedisMapWriter redisItemWriter(RedisCommand command) {
return stream.writer();
case zadd:
return zset.writer();
case print:
return new DebugMapWriter();
case noop:
return new NoopMapWriter();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class GeneratorReader extends AbstractItemCountingItemStreamItemReader<Ma
private ThreadLocal<Long> count = new ThreadLocal<>();
private ThreadLocal<Integer> partition = new ThreadLocal<>();
private int partitions;
private int maxItemCount;
private Integer maxItemCount;
private int partitionSize;
private Locale locale;
private Map<String, Expression> fieldExpressions;
Expand Down Expand Up @@ -59,10 +59,11 @@ public int partition() {
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.partition.set(IndexedPartitioner.getPartitionIndex(executionContext));
this.partitions = IndexedPartitioner.getPartitions(executionContext);
this.partitionSize = maxItemCount / partitions;
this.partitionSize = maxItemCount == null ? Integer.MAX_VALUE : (maxItemCount / partitions);
super.open(executionContext);
}

@Override
public void setMaxItemCount(int count) {
this.maxItemCount = count;
super.setMaxItemCount(count);
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/com/redislabs/riot/redis/writer/DebugMapWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.redislabs.riot.redis.writer;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.RedisFuture;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

public class DebugMapWriter implements RedisMapWriter {

private final Logger log = LoggerFactory.getLogger(DebugMapWriter.class);

@Override
public RedisFuture<?> write(Object commands, Map<String, Object> item) {
debug(item);
return null;
}

private void debug(Map<String, Object> item) {
log.info("{}", item);
}

@Override
public Response<?> write(Pipeline pipeline, Map<String, Object> item) {
debug(item);
return null;
}

@Override
public void write(JedisCluster cluster, Map<String, Object> item) {
debug(item);
}

}
7 changes: 6 additions & 1 deletion src/test/java/com/redislabs/riot/TestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import java.io.File;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -185,7 +189,8 @@ public void testImportCsvProcessorHashDateFormat() throws Exception {
List<String> keys = commands().keys("event:*");
Assertions.assertEquals(568, keys.size());
Map<String, String> event = commands().hgetall("event:248206");
Assertions.assertEquals("1512838800000", event.get("EpochStart"));
Instant date = Instant.ofEpochMilli(Long.parseLong(event.get("EpochStart")));
Assertions.assertTrue(date.isBefore(Instant.now()));
long index = Long.parseLong(event.get("index"));
Assertions.assertTrue(index > 0);
}
Expand Down
12 changes: 12 additions & 0 deletions src/test/resources/timestamp.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[
{
"facility": 1234,
"userFullName": "Julien Ruaux",
"userId": "jruaux",
"sessionId": "c7ac5e75-33a2-466c-99be-1f112b394dg7",
"roles": "[RN, PCT]",
"timestamp": "2019-01-24T10:27:47.825Z",
"body": "What do you call someone who hangs around with computer professionals? A DBA",
"id": 123
}
]

0 comments on commit d176d64

Please sign in to comment.