Skip to content

Commit

Permalink
Fix file batch operation
Browse files Browse the repository at this point in the history
RemoteFileUtil did not preserve element ordering wich was assumed in the
FileDownloadDoFn

Pane information was not propagated
  • Loading branch information
RustedBones committed Oct 24, 2024
1 parent 0a450e9 commit fac16a0
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import com.spotify.scio.util.RemoteFileUtil;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,7 +38,7 @@ public class FileDownloadDoFn<OutputT> extends DoFn<URI, OutputT> {

private static final Logger LOG = LoggerFactory.getLogger(FileDownloadDoFn.class);

private final List<Element> batch;
private final List<ValueInSingleWindow<URI>> batch;
private final RemoteFileUtil remoteFileUtil;
private final SerializableFunction<Path, OutputT> fn;
private final int batchSize;
Expand Down Expand Up @@ -78,21 +79,37 @@ public void startBundle(StartBundleContext context) {
this.batch.clear();
}

// kept for binary compatibility. Must not be used
// TODO: remove in 0.15.0
@Deprecated
public void processElement(
URI element, Instant timestamp, OutputReceiver<OutputT> out, BoundedWindow window) {
processElement(element, timestamp, window, null, out);
}

@ProcessElement
public void processElement(
@DoFn.Element URI element,
@Timestamp Instant timestamp,
OutputReceiver<OutputT> out,
BoundedWindow window) {
batch.add(new Element(element, timestamp, window));
BoundedWindow window,
PaneInfo pane,
OutputReceiver<OutputT> out) {
batch.add(ValueInSingleWindow.of(element, timestamp, window, pane));
if (batch.size() >= batchSize) {
processBatch(out);
flush(
r -> {
final OutputT o = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(o, ts, ws, p);
});
}
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
processBatch(context);
flush(p -> context.output(p.getValue(), p.getTimestamp(), p.getWindow()));
}

@Override
Expand All @@ -103,51 +120,31 @@ public void populateDisplayData(DisplayData.Builder builder) {
.add(DisplayData.item("Keep Downloaded Files", keep));
}

private void processBatch(OutputReceiver<OutputT> outputReceiver) {
private void flush(Consumer<ValueInSingleWindow<OutputT>> outputFn) {
if (batch.isEmpty()) {
return;
}
LOG.info("Processing batch of {}", batch.size());
List<URI> uris = batch.stream().map(e -> e.uri).collect(Collectors.toList());
remoteFileUtil.download(uris).stream().map(fn::apply).forEach(outputReceiver::output);
if (!keep) {
LOG.info("Deleting batch of {}", batch.size());
remoteFileUtil.delete(uris);
}
batch.clear();
}
List<URI> uris = batch.stream().map(ValueInSingleWindow::getValue).collect(Collectors.toList());
List<Path> paths = remoteFileUtil.download(uris);

private void processBatch(FinishBundleContext c) {
if (batch.isEmpty()) {
return;
}
LOG.info("Processing batch of {}", batch.size());
List<URI> uris = batch.stream().map(e -> e.uri).collect(Collectors.toList());
List<OutputT> outputs =
remoteFileUtil.download(uris).stream().map(fn::apply).collect(Collectors.toList());
// .forEach(c::output);
Iterator<OutputT> i1 = outputs.iterator();
Iterator<Element> i2 = batch.iterator();
while (i1.hasNext() && i2.hasNext()) {
Element e = i2.next();
c.output(i1.next(), e.timestamp, e.window);
Iterator<ValueInSingleWindow<URI>> inputIt = batch.iterator();
Iterator<Path> pathIt = paths.iterator();
while (inputIt.hasNext() && pathIt.hasNext()) {
final ValueInSingleWindow<URI> r = inputIt.next();
final Path path = pathIt.next();

final OutputT o = fn.apply(path);
final Instant ts = r.getTimestamp();
final BoundedWindow w = r.getWindow();
final PaneInfo p = r.getPane();
outputFn.accept(ValueInSingleWindow.of(o, ts, w, p));
}

if (!keep) {
LOG.info("Deleting batch of {}", batch.size());
remoteFileUtil.delete(uris);
}
batch.clear();
}

private class Element {
private URI uri;
private Instant timestamp;
private BoundedWindow window;

Element(URI uri, Instant timestamp, BoundedWindow window) {
this.uri = uri;
this.timestamp = timestamp;
this.window = window;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
Expand Down Expand Up @@ -104,7 +106,12 @@ public Path download(URI src) {
*/
public List<Path> download(List<URI> srcs) {
try {
return paths.getAll(srcs).values().asList();
Map<URI, Path> results = paths.getAll(srcs);
List<Path> paths = new ArrayList<>(srcs.size());
for (URI src : srcs) {
paths.add(results.get(src));
}
return paths;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package com.spotify.scio.transforms

import java.nio.file.{Files, Path}

import com.spotify.scio.testing._
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.{Files => GFiles}
import org.joda.time.Instant

import scala.jdk.CollectionConverters._

Expand All @@ -31,8 +31,13 @@ class FileDownloadDoFnTest extends PipelineSpec {
val files = createFiles(tmpDir, 100)
runWithContext { sc =>
val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn)
p.keys should containInAnyOrder((1 to 100).map(_.toString))
p.values.distinct should forAll { f: Path => !Files.exists(f) }

val content = p.keys
val paths = p.values.distinct

val expected = (1 to 100).map(_.toString)
content should containInAnyOrder(expected)
paths should forAll { f: Path => !Files.exists(f) }
}
files.foreach(Files.delete)
Files.delete(tmpDir)
Expand All @@ -42,9 +47,23 @@ class FileDownloadDoFnTest extends PipelineSpec {
val tmpDir = Files.createTempDirectory("filedofn-")
val files = createFiles(tmpDir, 100)
runWithContext { sc =>
val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn, 10, false)
p.keys should containInAnyOrder((1 to 100).map(_.toString))
p.values.distinct should forAll { f: Path => !Files.exists(f) }
// try to use a single bundle so we can check
// elements flushed in processElement as well as
// elements flushed in finishBundle
val p = sc
.parallelize(Seq(files.map(_.toUri).zipWithIndex))
.flatten
.timestampBy { case (_, i) => new Instant(i + 1) }
.keys
.flatMapFile(fn, 10, false)
.withTimestamp

val contentAndTimestamp = p.map { case ((i, _), ts) => (i, ts.getMillis) }
val paths = p.map { case ((_, f), _) => f }.distinct

val expected = (1L to 100L).map(i => (i.toString, i))
contentAndTimestamp should containInAnyOrder(expected)
paths should forAll { f: Path => !Files.exists(f) }
}
files.foreach(Files.delete)
Files.delete(tmpDir)
Expand Down

0 comments on commit fac16a0

Please sign in to comment.