Skip to content

Commit

Permalink
Fix file batch operation (#5519)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Oct 25, 2024
1 parent 0a450e9 commit 489bd7a
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import com.spotify.scio.util.RemoteFileUtil;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,7 +38,7 @@ public class FileDownloadDoFn<OutputT> extends DoFn<URI, OutputT> {

private static final Logger LOG = LoggerFactory.getLogger(FileDownloadDoFn.class);

private final List<Element> batch;
private final List<ValueInSingleWindow<URI>> batch;
private final RemoteFileUtil remoteFileUtil;
private final SerializableFunction<Path, OutputT> fn;
private final int batchSize;
Expand Down Expand Up @@ -78,21 +79,37 @@ public void startBundle(StartBundleContext context) {
this.batch.clear();
}

// kept for binary compatibility. Must not be used
// TODO: remove in 0.15.0
@Deprecated
public void processElement(
URI element, Instant timestamp, OutputReceiver<OutputT> out, BoundedWindow window) {
processElement(element, timestamp, window, null, out);
}

@ProcessElement
public void processElement(
@DoFn.Element URI element,
@Timestamp Instant timestamp,
OutputReceiver<OutputT> out,
BoundedWindow window) {
batch.add(new Element(element, timestamp, window));
BoundedWindow window,
PaneInfo pane,
OutputReceiver<OutputT> out) {
batch.add(ValueInSingleWindow.of(element, timestamp, window, pane));
if (batch.size() >= batchSize) {
processBatch(out);
flush(
r -> {
final OutputT o = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(o, ts, ws, p);
});
}
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
processBatch(context);
flush(p -> context.output(p.getValue(), p.getTimestamp(), p.getWindow()));
}

@Override
Expand All @@ -103,51 +120,31 @@ public void populateDisplayData(DisplayData.Builder builder) {
.add(DisplayData.item("Keep Downloaded Files", keep));
}

private void processBatch(OutputReceiver<OutputT> outputReceiver) {
private void flush(Consumer<ValueInSingleWindow<OutputT>> outputFn) {
if (batch.isEmpty()) {
return;
}
LOG.info("Processing batch of {}", batch.size());
List<URI> uris = batch.stream().map(e -> e.uri).collect(Collectors.toList());
remoteFileUtil.download(uris).stream().map(fn::apply).forEach(outputReceiver::output);
if (!keep) {
LOG.info("Deleting batch of {}", batch.size());
remoteFileUtil.delete(uris);
}
batch.clear();
}
List<URI> uris = batch.stream().map(ValueInSingleWindow::getValue).collect(Collectors.toList());
List<Path> paths = remoteFileUtil.download(uris);

private void processBatch(FinishBundleContext c) {
if (batch.isEmpty()) {
return;
}
LOG.info("Processing batch of {}", batch.size());
List<URI> uris = batch.stream().map(e -> e.uri).collect(Collectors.toList());
List<OutputT> outputs =
remoteFileUtil.download(uris).stream().map(fn::apply).collect(Collectors.toList());
// .forEach(c::output);
Iterator<OutputT> i1 = outputs.iterator();
Iterator<Element> i2 = batch.iterator();
while (i1.hasNext() && i2.hasNext()) {
Element e = i2.next();
c.output(i1.next(), e.timestamp, e.window);
Iterator<ValueInSingleWindow<URI>> inputIt = batch.iterator();
Iterator<Path> pathIt = paths.iterator();
while (inputIt.hasNext() && pathIt.hasNext()) {
final ValueInSingleWindow<URI> r = inputIt.next();
final Path path = pathIt.next();

final OutputT o = fn.apply(path);
final Instant ts = r.getTimestamp();
final BoundedWindow w = r.getWindow();
final PaneInfo p = r.getPane();
outputFn.accept(ValueInSingleWindow.of(o, ts, w, p));
}

if (!keep) {
LOG.info("Deleting batch of {}", batch.size());
remoteFileUtil.delete(uris);
}
batch.clear();
}

private class Element {
private URI uri;
private Instant timestamp;
private BoundedWindow window;

Element(URI uri, Instant timestamp, BoundedWindow window) {
this.uri = uri;
this.timestamp = timestamp;
this.window = window;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
Expand Down Expand Up @@ -104,7 +106,12 @@ public Path download(URI src) {
*/
public List<Path> download(List<URI> srcs) {
try {
return paths.getAll(srcs).values().asList();
Map<URI, Path> results = paths.getAll(srcs);
List<Path> paths = new ArrayList<>(srcs.size());
for (URI src : srcs) {
paths.add(results.get(src));
}
return paths;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package com.spotify.scio.transforms

import java.nio.file.{Files, Path}

import com.spotify.scio.testing._
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.{Files => GFiles}
import org.joda.time.Instant

import scala.jdk.CollectionConverters._

Expand All @@ -31,8 +31,13 @@ class FileDownloadDoFnTest extends PipelineSpec {
val files = createFiles(tmpDir, 100)
runWithContext { sc =>
val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn)
p.keys should containInAnyOrder((1 to 100).map(_.toString))
p.values.distinct should forAll { f: Path => !Files.exists(f) }

val content = p.keys
val paths = p.values.distinct

val expected = (1 to 100).map(_.toString)
content should containInAnyOrder(expected)
paths should forAll { f: Path => !Files.exists(f) }
}
files.foreach(Files.delete)
Files.delete(tmpDir)
Expand All @@ -42,9 +47,23 @@ class FileDownloadDoFnTest extends PipelineSpec {
val tmpDir = Files.createTempDirectory("filedofn-")
val files = createFiles(tmpDir, 100)
runWithContext { sc =>
val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn, 10, false)
p.keys should containInAnyOrder((1 to 100).map(_.toString))
p.values.distinct should forAll { f: Path => !Files.exists(f) }
// try to use a single bundle so we can check
// elements flushed in processElement as well as
// elements flushed in finishBundle
val p = sc
.parallelize(Seq(files.map(_.toUri).zipWithIndex))
.flatten
.timestampBy { case (_, i) => new Instant(i + 1) }
.keys
.flatMapFile(fn, 10, false)
.withTimestamp

val contentAndTimestamp = p.map { case ((i, _), ts) => (i, ts.getMillis) }
val paths = p.map { case ((_, f), _) => f }.distinct

val expected = (1L to 100L).map(i => (i.toString, i))
contentAndTimestamp should containInAnyOrder(expected)
paths should forAll { f: Path => !Files.exists(f) }
}
files.foreach(Files.delete)
Files.delete(tmpDir)
Expand Down

0 comments on commit 489bd7a

Please sign in to comment.