Skip to content

Commit

Permalink
fix: Check for null when building composite processor
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 19, 2024
1 parent 15ac9bd commit 886a6be
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import java.io.IOException;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;
import org.springframework.core.io.Resource;
Expand Down Expand Up @@ -128,10 +126,7 @@ protected Job job() {
RedisItemReader<String, String, KeyValue<String, Object>> reader = RedisItemReader.struct();
reader.setClient(getRedisClient());
configureReader(reader);
ItemWriter<KeyValue<String, Object>> writer = writer();
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = new FunctionItemProcessor<>(
processor(StringCodec.UTF8));
return jobBuilder().start(step(reader, processor, writer)).build();
return jobBuilder().start(step(reader, processor(StringCodec.UTF8), writer())).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.spel.support.StandardEvaluationContext;

Expand All @@ -27,8 +27,9 @@
import com.redis.spring.batch.RedisItemReader.ReaderMode;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.FlushingStepBuilder;
import com.redis.spring.batch.reader.KeyComparatorOptions;
import com.redis.spring.batch.reader.KeyComparatorOptions.StreamMessageIdPolicy;
import com.redis.spring.batch.reader.KeyComparisonItemReader;
import com.redis.spring.batch.reader.KeyComparisonItemReader.StreamMessageIdPolicy;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
Expand All @@ -55,7 +56,7 @@ public class Replication extends AbstractExport {
private ReplicationType type = DEFAULT_TYPE;
private boolean showDiffs;
private CompareMode compareMode = DEFAULT_COMPARE_MODE;
private Duration ttlTolerance = KeyComparisonItemReader.DEFAULT_TTL_TOLERANCE;
private Duration ttlTolerance = KeyComparatorOptions.DEFAULT_TTL_TOLERANCE;
private RedisClientOptions targetRedisClientOptions = new RedisClientOptions();
private ReadFrom targetReadFrom;
private RedisWriterOptions writerOptions = new RedisWriterOptions();
Expand Down Expand Up @@ -98,8 +99,8 @@ protected void close() {

@Override
protected Job job() {
FunctionItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor = new FunctionItemProcessor<>(
processor(ByteArrayCodec.INSTANCE));
ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor = processor(
ByteArrayCodec.INSTANCE);
SimpleStepBuilder<KeyValue<byte[], Object>, KeyValue<byte[], Object>> scanStep = stepBuilder(STEP_SCAN,
reader(), processor, writer());
RedisItemReader<byte[], byte[], KeyValue<byte[], Object>> liveReader = reader();
Expand Down Expand Up @@ -203,11 +204,17 @@ private KeyComparisonItemReader comparisonReader() {
reader.setTargetClient(targetRedisClient);
reader.setTargetPoolSize(writerOptions.getPoolSize());
reader.setTargetReadFrom(targetReadFrom);
reader.setTtlTolerance(ttlTolerance);
reader.setStreamMessageIdPolicy(streamMessageIdPolicy());
reader.setComparatorOptions(comparatorOptions());
return reader;
}

private KeyComparatorOptions comparatorOptions() {
KeyComparatorOptions options = new KeyComparatorOptions();
options.setTtlTolerance(ttlTolerance);
options.setStreamMessageIdPolicy(streamMessageIdPolicy());
return options;
}

private StreamMessageIdPolicy streamMessageIdPolicy() {
if (getProcessorOptions().isDropStreamMessageId()) {
return StreamMessageIdPolicy.IGNORE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.redis.testcontainers.RedisStackContainer;

class StackTests extends AbstractReplicationTests {
class ReplicationStackTests extends ReplicationTests {

private static final RedisStackContainer source = RedisContainerFactory.stack();
private static final RedisStackContainer target = RedisContainerFactory.stack();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.redis.riot.redis;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.springframework.batch.item.support.ListItemWriter;
import org.testcontainers.shaded.org.bouncycastle.util.encoders.Hex;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.ExportProcessorOptions;
import com.redis.riot.core.PredicateItemProcessor;
import com.redis.riot.core.RedisClientOptions;
Expand All @@ -20,8 +25,9 @@
import com.redis.testcontainers.RedisServer;

import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.codec.ByteArrayCodec;

public abstract class AbstractReplicationTests extends AbstractTargetTestBase {
public abstract class ReplicationTests extends AbstractTargetTestBase {

public static final String BEERS_JSON_URL = "https://storage.googleapis.com/jrx/beers.json";
public static final int BEER_CSV_COUNT = 2410;
Expand All @@ -44,6 +50,7 @@ protected void execute(Replication replication, TestInfo info) {
replication.setJobFactory(jobFactory);
replication.setRedisClientOptions(redisOptions(getRedisServer()));
replication.setTargetRedisClientOptions(redisOptions(getTargetRedisServer()));
replication.getReaderOptions().setIdleTimeout(getIdleTimeout());
replication.run();
}

Expand Down Expand Up @@ -93,6 +100,23 @@ void keyProcessorWithDate(TestInfo info) throws Throwable {
Assertions.assertEquals(value1, targetRedisCommands.get("1273449600000:" + key1));
}

@Test
void binaryKeyLiveReplication(TestInfo info) throws Exception {
enableKeyspaceNotifications();
Executors.newSingleThreadExecutor().execute(() -> {
awaitPubSub();
byte[] key = Hex.decode("aced0005");
StatefulRedisModulesConnection<byte[], byte[]> connection = RedisModulesUtils.connection(redisClient,
ByteArrayCodec.INSTANCE);
connection.sync().set(key, "value".getBytes());
});
Replication replication = new Replication();
replication.setMode(ReplicationMode.LIVE);
replication.setCompareMode(CompareMode.NONE);
execute(replication, info);
Assertions.assertEquals(Collections.emptyList(), compare(info).mismatches());
}

@Test
void filterKeySlot(TestInfo info) throws Exception {
enableKeyspaceNotifications();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.expression.spel.support.StandardEvaluationContext;

Expand All @@ -30,11 +31,14 @@ public abstract class AbstractExport extends AbstractJobRunnable {
private RedisReaderOptions readerOptions = new RedisReaderOptions();
private ExportProcessorOptions processorOptions = new ExportProcessorOptions();

protected <K> Function<KeyValue<K, Object>, KeyValue<K, Object>> processor(RedisCodec<K, ?> codec) {
protected <K> ItemProcessor<KeyValue<K, Object>, KeyValue<K, Object>> processor(RedisCodec<K, ?> codec) {
if (processorOptions.isEmpty()) {
return null;
}
ToStringKeyValueFunction<K, Object> code = new ToStringKeyValueFunction<>(codec);
StringKeyValueFunction<K, Object> decode = new StringKeyValueFunction<>(codec);
KeyValueOperator operator = keyValueOperator();
return code.andThen(operator).andThen(decode);
return new FunctionItemProcessor<>(code.andThen(operator).andThen(decode));
}

protected StandardEvaluationContext evaluationContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ protected RedisItemReader<String, String, KeyValue<String, Object>> reader() {
}

protected ItemProcessor<KeyValue<String, Object>, Map<String, Object>> processor() {
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = new FunctionItemProcessor<>(
processor(StringCodec.UTF8));
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processor(StringCodec.UTF8);
StructToMapFunction toMapFunction = new StructToMapFunction();
if (keyRegex != null) {
toMapFunction.setKey(new RegexNamedGroupFunction(keyRegex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -102,11 +103,12 @@ public static Predicate<String> globPredicate(List<String> patterns) {
}

public static <S, T> ItemProcessor<S, T> processor(ItemProcessor<?, ?>... processors) {
return processor(Arrays.asList(processors));
return processor(new ArrayList<>(Arrays.asList(processors)));
}

@SuppressWarnings("unchecked")
public static <S, T> ItemProcessor<S, T> processor(Collection<? extends ItemProcessor<?, ?>> processors) {
processors.removeIf(Objects::isNull);
if (processors.isEmpty()) {
return null;
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ latencyutilsVersion = 2.0.3
lettucemodVersion = 3.7.3
picocliVersion = 4.7.5
progressbarVersion = 0.10.0
springBatchRedisVersion = 4.1.1
springBatchRedisVersion = 4.1.2-SNAPSHOT
testcontainersRedisVersion = 2.2.0

org.gradle.daemon = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import com.redis.riot.redis.ReplicationMode;
import com.redis.riot.redis.ReplicationType;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.reader.KeyComparatorOptions;
import com.redis.spring.batch.reader.KeyComparison.Status;
import com.redis.spring.batch.reader.KeyComparisonItemReader;
import com.redis.spring.batch.reader.KeyNotificationItemReader;

import picocli.CommandLine.ArgGroup;
Expand All @@ -40,7 +40,7 @@ public class ReplicateCommand extends AbstractExportCommand {
boolean type;

@Option(names = "--ttl-tolerance", description = "Max TTL offset in millis to use for dataset verification (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
long ttlTolerance = KeyComparisonItemReader.DEFAULT_TTL_TOLERANCE.toMillis();
long ttlTolerance = KeyComparatorOptions.DEFAULT_TTL_TOLERANCE.toMillis();

@Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification. Disables progress reporting.")
boolean showDiffs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.testcontainers.containers.JdbcDatabaseContainer;

Expand All @@ -25,8 +23,6 @@

abstract class AbstractDbTests extends AbstractRiotTestBase {

private final Logger log = LoggerFactory.getLogger(getClass());

private static final RedisStackContainer redis = new RedisStackContainer(
RedisStackContainer.DEFAULT_IMAGE_NAME.withTag(RedisStackContainer.DEFAULT_TAG));

Expand Down

0 comments on commit 886a6be

Please sign in to comment.