From 489bd7a34b95b140ed53484c4fab0cb28bad69dc Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 25 Oct 2024 09:07:18 +0200 Subject: [PATCH] Fix file batch operation (#5519) --- .../scio/transforms/FileDownloadDoFn.java | 85 +++++++++---------- .../com/spotify/scio/util/RemoteFileUtil.java | 9 +- .../transforms/FileDownloadDoFnTest.scala | 31 +++++-- 3 files changed, 74 insertions(+), 51 deletions(-) diff --git a/scio-core/src/main/java/com/spotify/scio/transforms/FileDownloadDoFn.java b/scio-core/src/main/java/com/spotify/scio/transforms/FileDownloadDoFn.java index ea4254fd4a..94a899f819 100644 --- a/scio-core/src/main/java/com/spotify/scio/transforms/FileDownloadDoFn.java +++ b/scio-core/src/main/java/com/spotify/scio/transforms/FileDownloadDoFn.java @@ -20,14 +20,15 @@ import com.spotify.scio.util.RemoteFileUtil; import java.net.URI; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +38,7 @@ public class FileDownloadDoFn extends DoFn { private static final Logger LOG = LoggerFactory.getLogger(FileDownloadDoFn.class); - private final List batch; + private final List> batch; private final RemoteFileUtil remoteFileUtil; private final SerializableFunction fn; private final int batchSize; @@ -78,21 +79,37 @@ public void startBundle(StartBundleContext context) { this.batch.clear(); } + // kept for binary compatibility. Must not be used + // TODO: remove in 0.15.0 + @Deprecated + public void processElement( + URI element, Instant timestamp, OutputReceiver out, BoundedWindow window) { + processElement(element, timestamp, window, null, out); + } + @ProcessElement public void processElement( @DoFn.Element URI element, @Timestamp Instant timestamp, - OutputReceiver out, - BoundedWindow window) { - batch.add(new Element(element, timestamp, window)); + BoundedWindow window, + PaneInfo pane, + OutputReceiver out) { + batch.add(ValueInSingleWindow.of(element, timestamp, window, pane)); if (batch.size() >= batchSize) { - processBatch(out); + flush( + r -> { + final OutputT o = r.getValue(); + final Instant ts = r.getTimestamp(); + final Collection ws = Collections.singleton(r.getWindow()); + final PaneInfo p = r.getPane(); + out.outputWindowedValue(o, ts, ws, p); + }); } } @FinishBundle public void finishBundle(FinishBundleContext context) { - processBatch(context); + flush(p -> context.output(p.getValue(), p.getTimestamp(), p.getWindow())); } @Override @@ -103,51 +120,31 @@ public void populateDisplayData(DisplayData.Builder builder) { .add(DisplayData.item("Keep Downloaded Files", keep)); } - private void processBatch(OutputReceiver outputReceiver) { + private void flush(Consumer> outputFn) { if (batch.isEmpty()) { return; } LOG.info("Processing batch of {}", batch.size()); - List uris = batch.stream().map(e -> e.uri).collect(Collectors.toList()); - remoteFileUtil.download(uris).stream().map(fn::apply).forEach(outputReceiver::output); - if (!keep) { - LOG.info("Deleting batch of {}", batch.size()); - remoteFileUtil.delete(uris); - } - batch.clear(); - } + List uris = batch.stream().map(ValueInSingleWindow::getValue).collect(Collectors.toList()); + List paths = remoteFileUtil.download(uris); - private void processBatch(FinishBundleContext c) { - if (batch.isEmpty()) { - return; - } - LOG.info("Processing batch of {}", batch.size()); - List uris = batch.stream().map(e -> e.uri).collect(Collectors.toList()); - List outputs = - remoteFileUtil.download(uris).stream().map(fn::apply).collect(Collectors.toList()); - // .forEach(c::output); - Iterator i1 = outputs.iterator(); - Iterator i2 = batch.iterator(); - while (i1.hasNext() && i2.hasNext()) { - Element e = i2.next(); - c.output(i1.next(), e.timestamp, e.window); + Iterator> inputIt = batch.iterator(); + Iterator pathIt = paths.iterator(); + while (inputIt.hasNext() && pathIt.hasNext()) { + final ValueInSingleWindow r = inputIt.next(); + final Path path = pathIt.next(); + + final OutputT o = fn.apply(path); + final Instant ts = r.getTimestamp(); + final BoundedWindow w = r.getWindow(); + final PaneInfo p = r.getPane(); + outputFn.accept(ValueInSingleWindow.of(o, ts, w, p)); } + if (!keep) { LOG.info("Deleting batch of {}", batch.size()); remoteFileUtil.delete(uris); } batch.clear(); } - - private class Element { - private URI uri; - private Instant timestamp; - private BoundedWindow window; - - Element(URI uri, Instant timestamp, BoundedWindow window) { - this.uri = uri; - this.timestamp = timestamp; - this.window = window; - } - } } diff --git a/scio-core/src/main/java/com/spotify/scio/util/RemoteFileUtil.java b/scio-core/src/main/java/com/spotify/scio/util/RemoteFileUtil.java index 2467ff5069..bfab544c61 100644 --- a/scio-core/src/main/java/com/spotify/scio/util/RemoteFileUtil.java +++ b/scio-core/src/main/java/com/spotify/scio/util/RemoteFileUtil.java @@ -28,7 +28,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; @@ -104,7 +106,12 @@ public Path download(URI src) { */ public List download(List srcs) { try { - return paths.getAll(srcs).values().asList(); + Map results = paths.getAll(srcs); + List paths = new ArrayList<>(srcs.size()); + for (URI src : srcs) { + paths.add(results.get(src)); + } + return paths; } catch (ExecutionException e) { throw new RuntimeException(e); } diff --git a/scio-core/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala b/scio-core/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala index 4760ea58a5..471f7549a4 100644 --- a/scio-core/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala @@ -18,10 +18,10 @@ package com.spotify.scio.transforms import java.nio.file.{Files, Path} - import com.spotify.scio.testing._ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.{Files => GFiles} +import org.joda.time.Instant import scala.jdk.CollectionConverters._ @@ -31,8 +31,13 @@ class FileDownloadDoFnTest extends PipelineSpec { val files = createFiles(tmpDir, 100) runWithContext { sc => val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn) - p.keys should containInAnyOrder((1 to 100).map(_.toString)) - p.values.distinct should forAll { f: Path => !Files.exists(f) } + + val content = p.keys + val paths = p.values.distinct + + val expected = (1 to 100).map(_.toString) + content should containInAnyOrder(expected) + paths should forAll { f: Path => !Files.exists(f) } } files.foreach(Files.delete) Files.delete(tmpDir) @@ -42,9 +47,23 @@ class FileDownloadDoFnTest extends PipelineSpec { val tmpDir = Files.createTempDirectory("filedofn-") val files = createFiles(tmpDir, 100) runWithContext { sc => - val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn, 10, false) - p.keys should containInAnyOrder((1 to 100).map(_.toString)) - p.values.distinct should forAll { f: Path => !Files.exists(f) } + // try to use a single bundle so we can check + // elements flushed in processElement as well as + // elements flushed in finishBundle + val p = sc + .parallelize(Seq(files.map(_.toUri).zipWithIndex)) + .flatten + .timestampBy { case (_, i) => new Instant(i + 1) } + .keys + .flatMapFile(fn, 10, false) + .withTimestamp + + val contentAndTimestamp = p.map { case ((i, _), ts) => (i, ts.getMillis) } + val paths = p.map { case ((_, f), _) => f }.distinct + + val expected = (1L to 100L).map(i => (i.toString, i)) + contentAndTimestamp should containInAnyOrder(expected) + paths should forAll { f: Path => !Files.exists(f) } } files.foreach(Files.delete) Files.delete(tmpDir)