Skip to content

Commit

Permalink
Handle retry for redis io flow (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
khorshuheng authored and feast-ci-bot committed Jan 5, 2020
1 parent 3d44ad7 commit 8a0f53b
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,20 @@ public PDone expand(PCollection<FeatureRow> input) {
switch (storeType) {
case REDIS:
RedisConfig redisConfig = getStore().getRedisConfig();
input
PCollection<FailedElement> redisWriteResult = input
.apply(
"FeatureRowToRedisMutation",
ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
.apply(
"WriteRedisMutationToRedis",
RedisCustomIO.write(redisConfig.getHost(), redisConfig.getPort()));
RedisCustomIO.write(redisConfig));
if (options.getDeadLetterTableSpec() != null) {
redisWriteResult.apply(
WriteFailedElementToBigQuery.newBuilder()
.setTableSpec(options.getDeadLetterTableSpec())
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.build());
}
break;
case BIGQUERY:
BigQueryConfig bigqueryConfig = getStore().getBigqueryConfig();
Expand Down
38 changes: 38 additions & 0 deletions ingestion/src/main/java/feast/retry/BackOffExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package feast.retry;

import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

import java.io.IOException;
import java.io.Serializable;

/**
 * Runs a {@link Retriable} operation and re-runs it with backoff when it fails with an exception
 * that the operation itself declares retriable.
 *
 * <p>The retry settings are kept in serializable instance fields and the {@link FluentBackoff}
 * policy is built on each {@link #execute(Retriable)} call. A static policy field would not be
 * carried along by Java serialization (so a deserialized executor in another JVM would see
 * {@code null}) and would also be overwritten by every newly constructed executor.
 */
public class BackOffExecutor implements Serializable {

  private final Integer maxRetries;
  private final Duration initialBackOff;

  /**
   * @param maxRetries maximum number of retries passed to {@link FluentBackoff#withMaxRetries}
   * @param initialBackOff initial delay passed to {@link FluentBackoff#withInitialBackoff}
   */
  public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
    this.maxRetries = maxRetries;
    this.initialBackOff = initialBackOff;
    // Build the policy once and discard it so invalid settings are still rejected at
    // construction time, as they were when the policy was built here.
    newBackoffPolicy();
  }

  /**
   * Executes {@code retriable}, retrying after failures for as long as the exception is reported
   * as retriable and the backoff policy allows another attempt.
   *
   * @param retriable the operation to run; {@link Retriable#cleanUpAfterFailure()} is invoked
   *     before each re-attempt
   * @throws Exception the last exception thrown by the operation when it is not retriable or the
   *     backoff policy is exhausted; also any exception raised while waiting between attempts or
   *     during clean-up
   */
  public void execute(Retriable retriable) throws Exception {
    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backOff = newBackoffPolicy().backoff();
    while (true) {
      try {
        retriable.execute();
        return;
      } catch (Exception e) {
        // BackOffUtils.next is only consulted for retriable exceptions, so non-retriable
        // failures are rethrown immediately without sleeping.
        if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
          retriable.cleanUpAfterFailure();
        } else {
          throw e;
        }
      }
    }
  }

  /** Creates the backoff policy from this executor's settings. */
  private FluentBackoff newBackoffPolicy() {
    return FluentBackoff.DEFAULT.withMaxRetries(maxRetries).withInitialBackoff(initialBackOff);
  }
}
7 changes: 7 additions & 0 deletions ingestion/src/main/java/feast/retry/Retriable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package feast.retry;

/**
 * An operation that can be attempted repeatedly by {@code BackOffExecutor}.
 *
 * <p>The executor calls {@link #execute()}; when it throws, the executor asks
 * {@link #isExceptionRetriable(Exception)} whether to try again and, if a retry is going to
 * happen, calls {@link #cleanUpAfterFailure()} before the next attempt.
 */
public interface Retriable {
/** Performs one attempt of the operation; a thrown exception marks the attempt as failed. */
void execute();
/**
 * Decides whether a failure is worth retrying.
 *
 * @param e the exception thrown by {@link #execute()}
 * @return true if the operation may be attempted again after this exception
 */
Boolean isExceptionRetriable(Exception e);
/** Restores whatever state is needed so that {@link #execute()} can be called again. */
void cleanUpAfterFailure();
}
132 changes: 100 additions & 32 deletions ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,31 @@
*/
package feast.store.serving.redis;

import feast.core.StoreProto;
import feast.ingestion.values.FailedElement;
import feast.retry.BackOffExecutor;
import feast.retry.Retriable;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RedisCustomIO {

Expand All @@ -39,8 +51,8 @@ public class RedisCustomIO {

private RedisCustomIO() {}

public static Write write(String host, int port) {
return new Write(host, port);
public static Write write(StoreProto.Store.RedisConfig redisConfig) {
return new Write(redisConfig);
}

public enum Method {
Expand Down Expand Up @@ -152,12 +164,12 @@ public void setScore(@Nullable Long score) {
}

/** ServingStoreWrite data to a Redis server. */
public static class Write extends PTransform<PCollection<RedisMutation>, PDone> {
public static class Write extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {

private WriteDoFn dofn;

private Write(String host, int port) {
this.dofn = new WriteDoFn(host, port);
private Write(StoreProto.Store.RedisConfig redisConfig) {
this.dofn = new WriteDoFn(redisConfig);
}

public Write withBatchSize(int batchSize) {
Expand All @@ -171,24 +183,28 @@ public Write withTimeout(int timeout) {
}

@Override
public PDone expand(PCollection<RedisMutation> input) {
input.apply(ParDo.of(dofn));
return PDone.in(input.getPipeline());
public PCollection<FailedElement> expand(PCollection<RedisMutation> input) {
return input.apply(ParDo.of(dofn));
}

public static class WriteDoFn extends DoFn<RedisMutation, Void> {
public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {

private final String host;
private int port;
private final int port;
private final BackOffExecutor backOffExecutor;
private final List<RedisMutation> mutations = new ArrayList<>();

private Jedis jedis;
private Pipeline pipeline;
private int batchCount;
private int batchSize = DEFAULT_BATCH_SIZE;
private int timeout = DEFAULT_TIMEOUT;

WriteDoFn(String host, int port) {
this.host = host;
this.port = port;
WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
this.host = redisConfig.getHost();
this.port = redisConfig.getPort();
long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(),
Duration.millis(backoffMs));
}

public WriteDoFn withBatchSize(int batchSize) {
Expand All @@ -212,24 +228,69 @@ public void setup() {

@StartBundle
public void startBundle() {
mutations.clear();
pipeline = jedis.pipelined();
pipeline.multi();
batchCount = 0;
}

/**
 * Writes all buffered mutations to Redis inside one MULTI/EXEC block on the current pipeline,
 * delegating retries to {@code backOffExecutor}.
 *
 * <p>Only {@link JedisConnectionException} is treated as retriable. Before each retry the old
 * pipeline and the old Jedis client are closed on a best-effort basis and replaced by a fresh
 * connection. The buffer is cleared only after a successful EXEC and sync, so on failure the
 * caller still finds the unwritten mutations in {@code mutations}.
 *
 * @throws Exception whatever {@code backOffExecutor} rethrows once the failure is not retriable
 *     or the retries are used up
 */
private void executeBatch() throws Exception {
  backOffExecutor.execute(
      new Retriable() {
        @Override
        public void execute() {
          pipeline.multi();
          mutations.forEach(
              mutation -> {
                writeRecord(mutation);
                if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
                  pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
                }
              });
          pipeline.exec();
          pipeline.sync();
          mutations.clear();
        }

        @Override
        public Boolean isExceptionRetriable(Exception e) {
          return e instanceof JedisConnectionException;
        }

        @Override
        public void cleanUpAfterFailure() {
          // Closing happens after a connection failure, so it may itself fail with a runtime
          // exception rather than an IOException; catch broadly so the reconnect below still
          // runs and the retry is not aborted by the clean-up.
          try {
            pipeline.close();
          } catch (Exception e) {
            log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
          }
          // Release the previous client before replacing it; otherwise every retry leaves
          // an unreferenced connection behind.
          try {
            jedis.close();
          } catch (Exception e) {
            log.error(String.format("Error while closing Redis connection: %s", e.getMessage()));
          }
          jedis = new Jedis(host, port, timeout);
          pipeline = jedis.pipelined();
        }
      });
}

/**
 * Converts a mutation that could not be written into a {@link FailedElement} record.
 *
 * @param mutation the mutation that failed; its value's {@code toString()} becomes the payload
 * @param exception the failure; supplies the error message and the stack trace
 * @param jobName name of the job, stored on the record
 * @return the populated failed-element record, tagged with transform name "RedisCustomIO"
 */
private FailedElement toFailedElement(RedisMutation mutation, Exception exception, String jobName) {
  // NOTE(review): the payload is whatever toString() yields for the mutation value; confirm
  // that this is a readable representation for the value's actual type.
  String payload = mutation.getValue().toString();
  String errorMessage = exception.getMessage();
  String stackTrace = ExceptionUtils.getStackTrace(exception);

  return FailedElement.newBuilder()
      .setJobName(jobName)
      .setTransformName("RedisCustomIO")
      .setPayload(payload)
      .setErrorMessage(errorMessage)
      .setStackTrace(stackTrace)
      .build();
}

@ProcessElement
public void processElement(ProcessContext context) {
RedisMutation mutation = context.element();
writeRecord(mutation);
if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
}
batchCount++;
if (batchCount >= batchSize) {
pipeline.exec();
pipeline.sync();
pipeline.multi();
batchCount = 0;
mutations.add(mutation);
if (mutations.size() >= batchSize) {
try {
executeBatch();
} catch (Exception e) {
mutations.forEach(failedMutation -> {
FailedElement failedElement = toFailedElement(
failedMutation, e, context.getPipelineOptions().getJobName());
context.output(failedElement);
});
mutations.clear();
}
}
}

Expand All @@ -254,12 +315,19 @@ private Response<?> writeRecord(RedisMutation mutation) {
}

@FinishBundle
public void finishBundle() {
if (pipeline.isInMulti()) {
pipeline.exec();
pipeline.sync();
public void finishBundle(FinishBundleContext context) throws IOException, InterruptedException {
if(mutations.size() > 0) {
try {
executeBatch();
} catch (Exception e) {
mutations.forEach(failedMutation -> {
FailedElement failedElement = toFailedElement(
failedMutation, e, context.getPipelineOptions().getJobName());
context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
});
mutations.clear();
}
}
batchCount = 0;
}

@Teardown
Expand Down
Loading

0 comments on commit 8a0f53b

Please sign in to comment.