diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java
index 1d6ce16de5..129fa68a82 100644
--- a/core/src/main/java/feast/core/service/SpecService.java
+++ b/core/src/main/java/feast/core/service/SpecService.java
@@ -143,8 +143,8 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) {
    * possible if a project name is not set explicitly
    *
    * <p>The version field can be one of - '*' - This will match all versions - 'latest' - This will
-   * match the latest feature set version - '<number>' - This will match a specific feature set
-   * version. This property can only be set if both the feature set name and project name are
+   * match the latest feature set version - '<number>' - This will match a specific feature
+   * set version. This property can only be set if both the feature set name and project name are
    * explicitly set.
    *
    * @param filter filter containing the desired featureSet name and version filter
diff --git a/core/src/main/java/feast/core/util/PackageUtil.java b/core/src/main/java/feast/core/util/PackageUtil.java
index 20b2310644..99c5d73ba7 100644
--- a/core/src/main/java/feast/core/util/PackageUtil.java
+++ b/core/src/main/java/feast/core/util/PackageUtil.java
@@ -44,9 +44,9 @@ public class PackageUtil {
    * points to the resource location. Note that the extraction process can take several minutes to
    * complete.
    *
-   * <p>One use case of this function is to detect the class path of resources to stage when
-   * using Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be
-   * handled by default in Apache Beam.
+   * <p>One use case of this function is to detect the class path of resources to stage when using
+   * Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be handled by
+   * default in Apache Beam.
    *
    * <p>
    * <pre>
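
A note on the PackageUtil hunk above: the javadoc it re-wraps describes resolving resource URLs of the form "jar:file:...!/..." so resources can be staged for the Dataflow runner. As a hedged illustration of that URL shape only, here is a minimal sketch; the helper name and example path are invented for this note and are not the PackageUtil API:

```java
import java.net.URL;

public class JarUrlSketch {
  // Hypothetical helper: recover the outer jar's path from a "jar:file:" URL,
  // e.g. "jar:file:/tmp/feast.jar!/BOOT-INF/lib/dep.jar" -> "/tmp/feast.jar".
  static String outerJarPath(URL resourceUrl) {
    String spec = resourceUrl.toString();
    if (!spec.startsWith("jar:file:")) {
      throw new IllegalArgumentException("Not a jar:file: URL: " + spec);
    }
    int separator = spec.indexOf("!/");
    return spec.substring("jar:file:".length(), separator >= 0 ? separator : spec.length());
  }

  public static void main(String[] args) throws Exception {
    URL url = new URL("jar:file:/tmp/feast.jar!/BOOT-INF/lib/feast-storage.jar");
    System.out.println(outerJarPath(url)); // prints /tmp/feast.jar
  }
}
```
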
diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
index 778540595a..b7901c2f90 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
@@ -89,15 +89,14 @@ public PDone expand(PCollection<FeatureRow> input) {
     switch (storeType) {
       case REDIS:
         RedisConfig redisConfig = getStore().getRedisConfig();
-        PCollection<FailedElement> redisWriteResult = input
-            .apply(
-                "FeatureRowToRedisMutation",
-                ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
-            .apply(
-                "WriteRedisMutationToRedis",
-                RedisCustomIO.write(redisConfig));
+        PCollection<FailedElement> redisWriteResult =
+            input
+                .apply(
+                    "FeatureRowToRedisMutation",
+                    ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
+                .apply("WriteRedisMutationToRedis", RedisCustomIO.write(redisConfig));
         if (options.getDeadLetterTableSpec() != null) {
-            redisWriteResult.apply(
+          redisWriteResult.apply(
               WriteFailedElementToBigQuery.newBuilder()
                   .setTableSpec(options.getDeadLetterTableSpec())
                   .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
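
For context on the block above: the hunk only re-wraps the Redis write path, but it shows the dead-letter pattern this pipeline uses, namely that when `options.getDeadLetterTableSpec()` is set, the `FailedElement` records returned by `RedisCustomIO.write` are shipped to a BigQuery dead-letter table. The record shape, per the `toFailedElement` builder further down in this diff, looks like the following sketch; every field value here is a placeholder, not real pipeline output:

```java
import feast.ingestion.values.FailedElement;

public class DeadLetterRecordSketch {
  public static void main(String[] args) {
    // Builder methods mirror toFailedElement(...) in the RedisCustomIO hunk below.
    FailedElement element =
        FailedElement.newBuilder()
            .setJobName("feast-ingestion-job") // context.getPipelineOptions().getJobName()
            .setTransformName("RedisCustomIO")
            .setPayload("<serialized FeatureRow>") // mutation.getValue().toString()
            .setErrorMessage("connection refused")
            .setStackTrace("JedisConnectionException: ...")
            .build();
    System.out.println(element);
  }
}
```
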
diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java
index 7d61a62f3f..c31d3c535e 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java
@@ -24,10 +24,8 @@
 import feast.types.FieldProto;
 import feast.types.ValueProto.Value.ValCase;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -111,10 +109,7 @@ public void processElement(ProcessContext context) {
       }
       context.output(getFailureTag(), failedElement.build());
     } else {
-      featureRow = featureRow.toBuilder()
-                    .clearFields()
-                    .addAllFields(fields)
-                    .build();
+      featureRow = featureRow.toBuilder().clearFields().addAllFields(fields).build();
       context.output(getSuccessTag(), featureRow);
     }
   }
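
The ValidateFeatureRowDoFn hunks are formatting-only (plus two now-unused imports dropped), but they reference the success/failure TupleTag pattern via `getFailureTag()` and `getSuccessTag()`. For readers unfamiliar with it, here is a self-contained sketch of a two-output Beam DoFn; the element types and the validation rule are placeholders, not the Feast implementation:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class TaggedOutputSketch {
  static final TupleTag<String> SUCCESS = new TupleTag<String>() {};
  static final TupleTag<String> FAILURE = new TupleTag<String>() {};

  // Placeholder validation: non-empty strings pass, empty ones fail.
  static class SplitDoFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      String element = context.element();
      if (element.isEmpty()) {
        context.output(FAILURE, element); // routed like getFailureTag() above
      } else {
        context.output(SUCCESS, element); // routed like getSuccessTag() above
      }
    }
  }

  static PCollectionTuple split(PCollection<String> rows) {
    // Request both outputs when applying the DoFn; downstream code selects each
    // branch with out.get(SUCCESS) / out.get(FAILURE).
    return rows.apply(
        "SplitValidInvalid",
        ParDo.of(new SplitDoFn()).withOutputTags(SUCCESS, TupleTagList.of(FAILURE)));
  }
}
```
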
diff --git a/ingestion/src/main/java/feast/retry/BackOffExecutor.java b/ingestion/src/main/java/feast/retry/BackOffExecutor.java
index 7e38a3cf70..344c65ac42 100644
--- a/ingestion/src/main/java/feast/retry/BackOffExecutor.java
+++ b/ingestion/src/main/java/feast/retry/BackOffExecutor.java
@@ -1,38 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package feast.retry;
 
+import java.io.Serializable;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Sleeper;
 import org.joda.time.Duration;
 
-import java.io.IOException;
-import java.io.Serializable;
-
 public class BackOffExecutor implements Serializable {
 
-    private static FluentBackoff backoff;
+  private final Integer maxRetries;
+  private final Duration initialBackOff;
 
-    public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
-        backoff = FluentBackoff.DEFAULT
-                .withMaxRetries(maxRetries)
-                .withInitialBackoff(initialBackOff);
-    }
+  public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
+    this.maxRetries = maxRetries;
+    this.initialBackOff = initialBackOff;
+  }
+
+  public void execute(Retriable retriable) throws Exception {
+    FluentBackoff backoff =
+        FluentBackoff.DEFAULT.withMaxRetries(maxRetries).withInitialBackoff(initialBackOff);
+    execute(retriable, backoff);
+  }
 
-    public void execute(Retriable retriable) throws Exception {
-        Sleeper sleeper = Sleeper.DEFAULT;
-        BackOff backOff = backoff.backoff();
-        while(true) {
-            try {
-                retriable.execute();
-                break;
-            } catch (Exception e) {
-                if(retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
-                    retriable.cleanUpAfterFailure();
-                } else {
-                    throw e;
-                }
-            }
+  private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backOff = backoff.backoff();
+    while (true) {
+      try {
+        retriable.execute();
+        break;
+      } catch (Exception e) {
+        if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
+          retriable.cleanUpAfterFailure();
+        } else {
+          throw e;
         }
+      }
     }
+  }
 }
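
Beyond formatting and the license header, this file fixes a real hazard: the old version kept the FluentBackoff in a static field that each constructor call reassigned, so every BackOffExecutor instance silently shared the most recently constructed configuration. The rewrite stores only the serializable parameters and builds a fresh FluentBackoff per execute() call. A minimal illustration of the hazard, with a hypothetical class standing in for the old code:

```java
public class StaticFieldHazard {
  // Hypothetical stand-in for the old BackOffExecutor: a static field assigned
  // in the constructor is one slot shared by every instance.
  static class StaticBackOffExecutor {
    private static int maxRetries;

    StaticBackOffExecutor(int retries) {
      maxRetries = retries; // each construction overwrites the shared slot
    }

    int maxRetries() {
      return maxRetries;
    }
  }

  public static void main(String[] args) {
    StaticBackOffExecutor a = new StaticBackOffExecutor(3);
    StaticBackOffExecutor b = new StaticBackOffExecutor(10);
    System.out.println(a.maxRetries()); // prints 10, not 3: a's config was clobbered
  }
}
```
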
diff --git a/ingestion/src/main/java/feast/retry/Retriable.java b/ingestion/src/main/java/feast/retry/Retriable.java
index 8fd76fedbb..0a788fcdd6 100644
--- a/ingestion/src/main/java/feast/retry/Retriable.java
+++ b/ingestion/src/main/java/feast/retry/Retriable.java
@@ -1,7 +1,25 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package feast.retry;
 
 public interface Retriable {
-    void execute();
-    Boolean isExceptionRetriable(Exception e);
-    void cleanUpAfterFailure();
+  void execute();
+
+  Boolean isExceptionRetriable(Exception e);
+
+  void cleanUpAfterFailure();
 }
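
Taken together with BackOffExecutor, the reformatted interface is used like the sketch below: execute() performs the work, isExceptionRetriable() decides whether an exception warrants another attempt, and cleanUpAfterFailure() resets state between attempts (RedisCustomIO uses it to rebuild the Jedis pipeline). The flaky-call logic here is invented for illustration; only BackOffExecutor and Retriable come from this diff:

```java
import feast.retry.BackOffExecutor;
import feast.retry.Retriable;
import org.joda.time.Duration;

public class BackOffExecutorExample {
  public static void main(String[] args) throws Exception {
    // Up to 3 retries, starting with a 100 ms backoff.
    BackOffExecutor executor = new BackOffExecutor(3, Duration.millis(100));
    executor.execute(
        new Retriable() {
          int attempts = 0;

          @Override
          public void execute() {
            attempts++;
            if (attempts < 3) { // fail twice, then succeed
              throw new IllegalStateException("transient failure " + attempts);
            }
            System.out.println("succeeded on attempt " + attempts);
          }

          @Override
          public Boolean isExceptionRetriable(Exception e) {
            return e instanceof IllegalStateException; // only retry transient errors
          }

          @Override
          public void cleanUpAfterFailure() {
            System.out.println("cleaning up before the next attempt");
          }
        });
  }
}
```
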
diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
index 20afc43d76..8c142b66c9 100644
--- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
+++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
@@ -20,6 +20,9 @@
 import feast.ingestion.values.FailedElement;
 import feast.retry.BackOffExecutor;
 import feast.retry.Retriable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.avro.reflect.Nullable;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
@@ -38,10 +41,6 @@
 import redis.clients.jedis.Response;
 import redis.clients.jedis.exceptions.JedisConnectionException;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class RedisCustomIO {
 
   private static final int DEFAULT_BATCH_SIZE = 1000;
@@ -164,7 +163,8 @@ public void setScore(@Nullable Long score) {
   }
 
   /** ServingStoreWrite data to a Redis server. */
-  public static class Write extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {
+  public static class Write
+      extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {
 
     private WriteDoFn dofn;
 
@@ -202,9 +202,10 @@ public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
       WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
         this.host = redisConfig.getHost();
         this.port = redisConfig.getPort();
-        long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
-        this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(),
-                Duration.millis(backoffMs));
+        long backoffMs =
+            redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
+        this.backOffExecutor =
+            new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs));
       }
 
       public WriteDoFn withBatchSize(int batchSize) {
@@ -233,47 +234,50 @@ public void startBundle() {
       }
 
       private void executeBatch() throws Exception {
-        backOffExecutor.execute(new Retriable() {
-          @Override
-          public void execute() {
-            pipeline.multi();
-            mutations.forEach(mutation -> {
-              writeRecord(mutation);
-              if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
-                pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
+        backOffExecutor.execute(
+            new Retriable() {
+              @Override
+              public void execute() {
+                pipeline.multi();
+                mutations.forEach(
+                    mutation -> {
+                      writeRecord(mutation);
+                      if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
+                        pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
+                      }
+                    });
+                pipeline.exec();
+                pipeline.sync();
+                mutations.clear();
               }
-            });
-            pipeline.exec();
-            pipeline.sync();
-            mutations.clear();
-          }
 
-          @Override
-          public Boolean isExceptionRetriable(Exception e) {
-            return e instanceof JedisConnectionException;
-          }
+              @Override
+              public Boolean isExceptionRetriable(Exception e) {
+                return e instanceof JedisConnectionException;
+              }
 
-          @Override
-          public void cleanUpAfterFailure() {
-            try {
-              pipeline.close();
-            } catch (IOException e) {
-              log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
-            }
-            jedis = new Jedis(host, port, timeout);
-            pipeline = jedis.pipelined();
-          }
-        });
+              @Override
+              public void cleanUpAfterFailure() {
+                try {
+                  pipeline.close();
+                } catch (IOException e) {
+                  log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
+                }
+                jedis = new Jedis(host, port, timeout);
+                pipeline = jedis.pipelined();
+              }
+            });
       }
 
-      private FailedElement toFailedElement(RedisMutation mutation, Exception exception, String jobName) {
+      private FailedElement toFailedElement(
+          RedisMutation mutation, Exception exception, String jobName) {
         return FailedElement.newBuilder()
-          .setJobName(jobName)
-          .setTransformName("RedisCustomIO")
-          .setPayload(mutation.getValue().toString())
-          .setErrorMessage(exception.getMessage())
-          .setStackTrace(ExceptionUtils.getStackTrace(exception))
-          .build();
+            .setJobName(jobName)
+            .setTransformName("RedisCustomIO")
+            .setPayload(mutation.getValue().toString())
+            .setErrorMessage(exception.getMessage())
+            .setStackTrace(ExceptionUtils.getStackTrace(exception))
+            .build();
       }
 
       @ProcessElement
@@ -284,11 +288,12 @@ public void processElement(ProcessContext context) {
           try {
             executeBatch();
           } catch (Exception e) {
-            mutations.forEach(failedMutation -> {
-              FailedElement failedElement = toFailedElement(
-                failedMutation, e, context.getPipelineOptions().getJobName());
-              context.output(failedElement);
-            });
+            mutations.forEach(
+                failedMutation -> {
+                  FailedElement failedElement =
+                      toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
+                  context.output(failedElement);
+                });
             mutations.clear();
           }
         }
@@ -315,16 +320,18 @@ private Response<?> writeRecord(RedisMutation mutation) {
       }
 
       @FinishBundle
-      public void finishBundle(FinishBundleContext context) throws IOException, InterruptedException {
-        if(mutations.size() > 0) {
+      public void finishBundle(FinishBundleContext context)
+          throws IOException, InterruptedException {
+        if (mutations.size() > 0) {
           try {
             executeBatch();
           } catch (Exception e) {
-            mutations.forEach(failedMutation -> {
-              FailedElement failedElement = toFailedElement(
-                failedMutation, e, context.getPipelineOptions().getJobName());
-              context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
-            });
+            mutations.forEach(
+                failedMutation -> {
+                  FailedElement failedElement =
+                      toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
+                  context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
+                });
             mutations.clear();
           }
         }
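
One timing note on the retry path above: Beam's FluentBackoff grows delays exponentially (multiplier 1.5 by default, with roughly +/-50% randomization per delay), so the shouldRetryFailConnection test below, with maxRetries=4 and initialBackoffMs=2000, waits roughly 2s, then 3s, then 4.5s, then 6.75s across attempts; the first retry or two already covers the 3-second Redis restart the test schedules. A small sketch that prints the actual sequence for those settings (values vary per run because of the randomization):

```java
import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

public class BackoffTimingSketch {
  public static void main(String[] args) throws IOException {
    // Same settings WriteDoFn derives from RedisConfig in the hunk above.
    BackOff backOff =
        FluentBackoff.DEFAULT
            .withMaxRetries(4)
            .withInitialBackoff(Duration.millis(2000))
            .backoff();
    long next;
    while ((next = backOff.nextBackOffMillis()) != BackOff.STOP) {
      System.out.println("sleep " + next + " ms, then retry");
    }
  }
}
```
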
diff --git a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java
index aca3956387..5c9860ed97 100644
--- a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java
+++ b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java
@@ -180,12 +180,14 @@ public void shouldExcludeUnregisteredFields() {
 
     FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);
     expected.add(randomRow);
-    input.add(randomRow.toBuilder()
-        .addFields(Field.newBuilder()
-          .setName("extra")
-          .setValue(Value.newBuilder().setStringVal("hello")))
-        .build()
-    );
+    input.add(
+        randomRow
+            .toBuilder()
+            .addFields(
+                Field.newBuilder()
+                    .setName("extra")
+                    .setValue(Value.newBuilder().setStringVal("hello")))
+            .build());
 
     PCollectionTuple output =
         p.apply(Create.of(input))
diff --git a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java
index 94167059b4..fc17f6207f 100644
--- a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java
+++ b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java
@@ -16,12 +16,24 @@
  */
 package feast.store.serving.redis;
 
+import static feast.test.TestUtil.field;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 import feast.core.StoreProto;
 import feast.storage.RedisProto.RedisKey;
 import feast.store.serving.redis.RedisCustomIO.Method;
 import feast.store.serving.redis.RedisCustomIO.RedisMutation;
 import feast.types.FeatureRowProto.FeatureRow;
 import feast.types.ValueProto.ValueType.Enum;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -35,29 +47,14 @@
 import redis.embedded.Redis;
 import redis.embedded.RedisServer;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static feast.test.TestUtil.field;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 public class RedisCustomIOTest {
-  @Rule
-  public transient TestPipeline p = TestPipeline.create();
+  @Rule public transient TestPipeline p = TestPipeline.create();
 
   private static String REDIS_HOST = "localhost";
   private static int REDIS_PORT = 51234;
   private Redis redis;
   private Jedis jedis;
 
-
   @Before
   public void setUp() throws IOException {
     redis = new RedisServer(REDIS_PORT);
@@ -72,10 +69,8 @@ public void teardown() {
 
   @Test
   public void shouldWriteToRedis() {
-    StoreProto.Store.RedisConfig redisConfig = StoreProto.Store.RedisConfig.newBuilder()
-            .setHost(REDIS_HOST)
-            .setPort(REDIS_PORT)
-            .build();
+    StoreProto.Store.RedisConfig redisConfig =
+        StoreProto.Store.RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build();
     HashMap<RedisKey, FeatureRow> kvs = new LinkedHashMap<>();
     kvs.put(
         RedisKey.newBuilder()
@@ -110,8 +105,7 @@ public void shouldWriteToRedis() {
                         null))
             .collect(Collectors.toList());
 
-    p.apply(Create.of(featureRowWrites))
-        .apply(RedisCustomIO.write(redisConfig));
+    p.apply(Create.of(featureRowWrites)).apply(RedisCustomIO.write(redisConfig));
     p.run();
 
     kvs.forEach(
@@ -123,68 +117,95 @@ public void shouldWriteToRedis() {
 
   @Test(timeout = 10000)
   public void shouldRetryFailConnection() throws InterruptedException {
-    StoreProto.Store.RedisConfig redisConfig = StoreProto.Store.RedisConfig.newBuilder()
+    StoreProto.Store.RedisConfig redisConfig =
+        StoreProto.Store.RedisConfig.newBuilder()
             .setHost(REDIS_HOST)
             .setPort(REDIS_PORT)
             .setMaxRetries(4)
             .setInitialBackoffMs(2000)
             .build();
     HashMap<RedisKey, FeatureRow> kvs = new LinkedHashMap<>();
-    kvs.put(RedisKey.newBuilder().setFeatureSet("fs:1")
-                    .addEntities(field("entity", 1, Enum.INT64)).build(),
-            FeatureRow.newBuilder().setFeatureSet("fs:1")
-                    .addFields(field("entity", 1, Enum.INT64))
-                    .addFields(field("feature", "one", Enum.STRING)).build());
-
-    List<RedisMutation> featureRowWrites = kvs.entrySet().stream()
-            .map(kv -> new RedisMutation(Method.SET, kv.getKey().toByteArray(),
-                    kv.getValue().toByteArray(),
-                    null, null)
-            )
+    kvs.put(
+        RedisKey.newBuilder()
+            .setFeatureSet("fs:1")
+            .addEntities(field("entity", 1, Enum.INT64))
+            .build(),
+        FeatureRow.newBuilder()
+            .setFeatureSet("fs:1")
+            .addFields(field("entity", 1, Enum.INT64))
+            .addFields(field("feature", "one", Enum.STRING))
+            .build());
+
+    List<RedisMutation> featureRowWrites =
+        kvs.entrySet().stream()
+            .map(
+                kv ->
+                    new RedisMutation(
+                        Method.SET,
+                        kv.getKey().toByteArray(),
+                        kv.getValue().toByteArray(),
+                        null,
+                        null))
             .collect(Collectors.toList());
 
-    PCollection<Long> failedElementCount = p.apply(Create.of(featureRowWrites))
-        .apply(RedisCustomIO.write(redisConfig))
-        .apply(Count.globally());
+    PCollection<Long> failedElementCount =
+        p.apply(Create.of(featureRowWrites))
+            .apply(RedisCustomIO.write(redisConfig))
+            .apply(Count.globally());
 
     redis.stop();
     final ScheduledThreadPoolExecutor redisRestartExecutor = new ScheduledThreadPoolExecutor(1);
-    ScheduledFuture<?> scheduledRedisRestart = redisRestartExecutor.schedule(() -> {
-      redis.start();
-    }, 3, TimeUnit.SECONDS);
+    ScheduledFuture<?> scheduledRedisRestart =
+        redisRestartExecutor.schedule(
+            () -> {
+              redis.start();
+            },
+            3,
+            TimeUnit.SECONDS);
 
     PAssert.that(failedElementCount).containsInAnyOrder(0L);
     p.run();
     scheduledRedisRestart.cancel(true);
 
-    kvs.forEach((key, value) -> {
-      byte[] actual = jedis.get(key.toByteArray());
-      assertThat(actual, equalTo(value.toByteArray()));
-    });
+    kvs.forEach(
+        (key, value) -> {
+          byte[] actual = jedis.get(key.toByteArray());
+          assertThat(actual, equalTo(value.toByteArray()));
+        });
   }
 
   @Test
   public void shouldProduceFailedElementIfRetryExceeded() {
-    StoreProto.Store.RedisConfig redisConfig = StoreProto.Store.RedisConfig.newBuilder()
-        .setHost(REDIS_HOST)
-        .setPort(REDIS_PORT)
-        .build();
+    StoreProto.Store.RedisConfig redisConfig =
+        StoreProto.Store.RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build();
     HashMap<RedisKey, FeatureRow> kvs = new LinkedHashMap<>();
-    kvs.put(RedisKey.newBuilder().setFeatureSet("fs:1")
-            .addEntities(field("entity", 1, Enum.INT64)).build(),
-        FeatureRow.newBuilder().setFeatureSet("fs:1")
+    kvs.put(
+        RedisKey.newBuilder()
+            .setFeatureSet("fs:1")
+            .addEntities(field("entity", 1, Enum.INT64))
+            .build(),
+        FeatureRow.newBuilder()
+            .setFeatureSet("fs:1")
             .addFields(field("entity", 1, Enum.INT64))
-            .addFields(field("feature", "one", Enum.STRING)).build());
+            .addFields(field("feature", "one", Enum.STRING))
+            .build());
 
-    List<RedisMutation> featureRowWrites = kvs.entrySet().stream()
-            .map(kv -> new RedisMutation(Method.SET, kv.getKey().toByteArray(),
-                    kv.getValue().toByteArray(),
-                    null, null)
-            ).collect(Collectors.toList());
+    List<RedisMutation> featureRowWrites =
+        kvs.entrySet().stream()
+            .map(
+                kv ->
+                    new RedisMutation(
+                        Method.SET,
+                        kv.getKey().toByteArray(),
+                        kv.getValue().toByteArray(),
+                        null,
+                        null))
+            .collect(Collectors.toList());
 
-    PCollection<Long> failedElementCount = p.apply(Create.of(featureRowWrites))
-        .apply(RedisCustomIO.write(redisConfig))
-        .apply(Count.globally());
+    PCollection<Long> failedElementCount =
+        p.apply(Create.of(featureRowWrites))
+            .apply(RedisCustomIO.write(redisConfig))
+            .apply(Count.globally());
 
     redis.stop();
     PAssert.that(failedElementCount).containsInAnyOrder(1L);