Skip to content

Commit

Permalink
feat: Added logging options
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Sep 3, 2023
1 parent 8fd9412 commit c363cab
Show file tree
Hide file tree
Showing 63 changed files with 3,364 additions and 1,025 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.1.5
3.2.0-SNAPSHOT
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.redis.riot.core;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.springframework.batch.core.Job;
Expand All @@ -14,8 +12,6 @@
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.util.ClassUtils;
Expand Down Expand Up @@ -100,17 +96,4 @@ protected ReaderStepBuilder step(String name) {
return StepBuilder.factory(stepFactory).name(name).options(stepOptions);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
protected <S, T> ItemProcessor<S, T> processor(Collection<? extends ItemProcessor<?, ?>> processors) {
if (processors.isEmpty()) {
return null;
}
if (processors.size() == 1) {
return (ItemProcessor) processors.iterator().next();
}
CompositeItemProcessor<S, T> composite = new CompositeItemProcessor<>();
composite.setDelegates(new ArrayList<>(processors));
return composite;
}

}
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
package com.redis.riot.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.AccessException;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.TypedValue;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import com.redis.spring.batch.util.PredicateItemProcessor;
import com.redis.spring.batch.writer.Operation;
import com.redis.spring.batch.writer.OperationItemWriter;
import com.redis.spring.batch.writer.operation.CompositeOperation;
Expand All @@ -36,10 +26,19 @@ protected AbstractMapImport(AbstractRedisClient client) {
super(client);
}

protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
return processorOptions.processor();
}

public void setProcessorOptions(MapProcessorOptions options) {
this.processorOptions = options;
}

@SuppressWarnings("unchecked")
public void setOperations(Operation<String, String, Map<String, Object>>... operations) {
setOperations(Arrays.asList(operations));
}

public void setOperations(List<Operation<String, String, Map<String, Object>>> operations) {
this.operations = operations;
}
Expand All @@ -65,47 +64,4 @@ private Operation<String, String, Map<String, Object>> operation() {
return operation;
}

protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
List<ItemProcessor<Map<String, Object>, Map<String, Object>>> processors = new ArrayList<>();
EvaluationContext context = evaluationContext();
if (!CollectionUtils.isEmpty(processorOptions.getExpressions())) {
processors.add(new FunctionItemProcessor<>(SpelUtils.mapOperator(context, processorOptions.getExpressions())));
}
if (processorOptions.getFilter() != null) {
Predicate<Map<String, Object>> predicate = SpelUtils.predicate(context, processorOptions.getFilter());
processors.add(new PredicateItemProcessor<>(predicate));
}
return processor(processors);
}

protected StandardEvaluationContext evaluationContext() {
StandardEvaluationContext context = processorOptions.getEvaluationContextOptions().evaluationContext();
context.addPropertyAccessor(new QuietMapAccessor());
return context;
}

/**
* {@link org.springframework.context.expression.MapAccessor} that always returns true for canRead and does not throw
* AccessExceptions
*
* @author Julien Ruaux
*/
private static class QuietMapAccessor extends MapAccessor {

@Override
public boolean canRead(EvaluationContext context, @Nullable Object target, String name) {
return true;
}

@Override
public TypedValue read(EvaluationContext context, @Nullable Object target, String name) {
try {
return super.read(context, target, name);
} catch (AccessException e) {
return new TypedValue(null);
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import com.redis.lettucemod.util.GeoLocation;
Expand Down Expand Up @@ -53,13 +52,8 @@ public void addExpression(String field, Expression expression) {
public StandardEvaluationContext evaluationContext() {
StandardEvaluationContext context = new StandardEvaluationContext();
context.registerFunction("geo", geoMethod());
Map<String, Object> contextVariables = variables();
if (!CollectionUtils.isEmpty(contextVariables)) {
context.setVariables(contextVariables);
}
if (!CollectionUtils.isEmpty(expressions)) {
expressions.forEach((k, v) -> context.setVariable(k, v.getValue(context)));
}
context.setVariables(variables());
expressions.forEach((k, v) -> context.setVariable(k, v.getValue(context)));
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class KeyFilterOptions {
private List<IntRange> slots;

public KeyFilterOptions() {

}

private KeyFilterOptions(Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
package com.redis.riot.core;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.AccessException;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.TypedValue;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

import com.redis.spring.batch.util.BatchUtils;
import com.redis.spring.batch.util.PredicateItemProcessor;

public class MapProcessorOptions {

Expand Down Expand Up @@ -36,4 +51,42 @@ public void setFilter(Expression filter) {
this.filter = filter;
}

public ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
List<ItemProcessor<Map<String, Object>, Map<String, Object>>> processors = new ArrayList<>();
StandardEvaluationContext context = evaluationContextOptions.evaluationContext();
context.addPropertyAccessor(new QuietMapAccessor());
if (!CollectionUtils.isEmpty(expressions)) {
processors.add(new FunctionItemProcessor<>(SpelUtils.mapOperator(context, expressions)));
}
if (filter != null) {
Predicate<Map<String, Object>> predicate = SpelUtils.predicate(context, filter);
processors.add(new PredicateItemProcessor<>(predicate));
}
return BatchUtils.processor(processors);
}

/**
* {@link org.springframework.context.expression.MapAccessor} that always returns true for canRead and does not throw
* AccessExceptions
*
* @author Julien Ruaux
*/
private static class QuietMapAccessor extends MapAccessor {

@Override
public boolean canRead(EvaluationContext context, @Nullable Object target, String name) {
return true;
}

@Override
public TypedValue read(EvaluationContext context, @Nullable Object target, String name) {
try {
return super.read(context, target, name);
} catch (AccessException e) {
return new TypedValue(null);
}
}

}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.redis.riot.core;

import java.io.PrintStream;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;
Expand All @@ -17,7 +17,7 @@
import io.lettuce.core.metrics.CommandMetrics.CommandLatency;
import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions;

public class PingExecutable implements Executable {
public class Ping implements Executable {

private final AbstractRedisClient client;

Expand All @@ -29,7 +29,7 @@ public class PingExecutable implements Executable {

public static final double[] DEFAULT_PERCENTILES = DefaultCommandLatencyCollectorOptions.DEFAULT_TARGET_PERCENTILES;

private final PrintStream printStream;
private final PrintWriter out;

private int iterations = DEFAULT_ITERATIONS;

Expand All @@ -43,9 +43,9 @@ public class PingExecutable implements Executable {

private Duration sleep;

public PingExecutable(AbstractRedisClient client, PrintStream printStream) {
public Ping(AbstractRedisClient client, PrintWriter out) {
this.client = client;
this.printStream = printStream;
this.out = out;
}

public void setSleep(Duration sleep) {
Expand Down Expand Up @@ -123,7 +123,7 @@ private void execute(StatefulRedisModulesConnection<String, String> connection)
}
Histogram histogram = stats.getIntervalHistogram();
if (latencyDistribution) {
histogram.outputPercentileDistribution(printStream, (double) timeUnit.toNanos(1));
histogram.outputPercentileDistribution(System.out, (double) timeUnit.toNanos(1));
}
Map<Double, Long> percentileMap = new TreeMap<>();
for (double targetPercentile : percentiles) {
Expand All @@ -133,7 +133,7 @@ private void execute(StatefulRedisModulesConnection<String, String> connection)
long min = toTimeUnit(histogram.getMinValue());
long max = toTimeUnit(histogram.getMaxValue());
CommandLatency latency = new CommandLatency(min, max, percentileMap);
printStream.println(latency.toString());
out.println(latency.toString());
}

private long toTimeUnit(long value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ protected Job job() {

private FakerItemReader reader() {
FakerItemReader reader = new FakerItemReader();
reader.setEvaluationContext(evaluationContext());
reader.setMaxItemCount(count);
reader.setLocale(locale);
reader.setFields(fields());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.DataBindingMethodResolver;
import org.springframework.expression.spel.support.StandardEvaluationContext;
Expand All @@ -26,24 +27,16 @@ public class FakerItemReader extends AbstractItemCountingItemStreamItemReader<Ma

public static final Locale DEFAULT_LOCALE = Locale.getDefault();

private static final String EXECUTION_CONTEXT = "ctx";

private StandardEvaluationContext evaluationContext = new StandardEvaluationContext();

private Map<String, Expression> fields = new LinkedHashMap<>();

private Locale locale = DEFAULT_LOCALE;

private boolean open;
private EvaluationContext evaluationContext;

public FakerItemReader() {
setName(ClassUtils.getShortName(getClass()));
}

public void setEvaluationContext(StandardEvaluationContext context) {
this.evaluationContext = context;
}

public void setLocale(Locale locale) {
this.locale = locale;
}
Expand All @@ -62,14 +55,18 @@ public void setStringFields(Map<String, String> stringFields) {
protected synchronized void doOpen() throws Exception {
if (!isOpen()) {
Assert.notEmpty(fields, "No field specified");
evaluationContext.addPropertyAccessor(new ReflectivePropertyAccessor());
evaluationContext.addMethodResolver(DataBindingMethodResolver.forInstanceMethodInvocation());
evaluationContext.setRootObject(new Faker(locale));
evaluationContext.setVariable(EXECUTION_CONTEXT, new ExecutionContext());
open = true;
evaluationContext = evaluationContext();
}
}

private EvaluationContext evaluationContext() {
StandardEvaluationContext context = new StandardEvaluationContext();
context.addPropertyAccessor(new ReflectivePropertyAccessor());
context.addMethodResolver(DataBindingMethodResolver.forInstanceMethodInvocation());
context.setRootObject(new AugmentedFaker(locale));
return context;
}

@Override
protected Map<String, Object> doRead() throws Exception {
Map<String, Object> map = new HashMap<>();
Expand All @@ -80,15 +77,19 @@ protected Map<String, Object> doRead() throws Exception {
@Override
protected synchronized void doClose() throws Exception {
if (isOpen()) {
open = false;
evaluationContext = null;
}
}

public boolean isOpen() {
return open;
return evaluationContext != null;
}

public class ExecutionContext {
public class AugmentedFaker extends Faker {

public AugmentedFaker(Locale locale) {
super(locale);
}

public int getIndex() {
return index();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;
import org.springframework.batch.item.support.AbstractFileItemWriter;
import org.springframework.core.io.Resource;
import org.springframework.core.io.WritableResource;

Expand All @@ -29,7 +28,7 @@ public class FileDumpExport extends AbstractExport {

public static final String DEFAULT_ROOT_NAME = "root";

public static final String DEFAULT_LINE_SEPARATOR = AbstractFileItemWriter.DEFAULT_LINE_SEPARATOR;
public static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");

private final String file;

Expand Down
Loading

0 comments on commit c363cab

Please sign in to comment.