Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix file batch operation #5519

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import com.spotify.scio.util.RemoteFileUtil;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,7 +38,7 @@ public class FileDownloadDoFn<OutputT> extends DoFn<URI, OutputT> {

private static final Logger LOG = LoggerFactory.getLogger(FileDownloadDoFn.class);

private final List<Element> batch;
private final List<ValueInSingleWindow<URI>> batch;
private final RemoteFileUtil remoteFileUtil;
private final SerializableFunction<Path, OutputT> fn;
private final int batchSize;
Expand Down Expand Up @@ -78,21 +79,37 @@ public void startBundle(StartBundleContext context) {
this.batch.clear();
}

// Legacy four-argument overload, kept only for binary compatibility. Must not be used.
// It carries no @ProcessElement annotation; it simply forwards to the five-argument
// processElement, reordering the arguments and passing null for the pane.
// NOTE(review): the five-argument method hands the pane to ValueInSingleWindow.of(...);
// confirm whether a null pane is accepted there, otherwise any call through this
// overload will fail at runtime.
// TODO: remove in 0.15.0
@Deprecated
public void processElement(
URI element, Instant timestamp, OutputReceiver<OutputT> out, BoundedWindow window) {
processElement(element, timestamp, window, null, out);
}

@ProcessElement
public void processElement(
@DoFn.Element URI element,
@Timestamp Instant timestamp,
OutputReceiver<OutputT> out,
BoundedWindow window) {
batch.add(new Element(element, timestamp, window));
BoundedWindow window,
PaneInfo pane,
OutputReceiver<OutputT> out) {
batch.add(ValueInSingleWindow.of(element, timestamp, window, pane));
if (batch.size() >= batchSize) {
processBatch(out);
flush(
r -> {
final OutputT o = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(o, ts, ws, p);
});
}
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
processBatch(context);
flush(p -> context.output(p.getValue(), p.getTimestamp(), p.getWindow()));
}

@Override
Expand All @@ -103,51 +120,31 @@ public void populateDisplayData(DisplayData.Builder builder) {
.add(DisplayData.item("Keep Downloaded Files", keep));
}

private void processBatch(OutputReceiver<OutputT> outputReceiver) {
private void flush(Consumer<ValueInSingleWindow<OutputT>> outputFn) {
if (batch.isEmpty()) {
return;
}
LOG.info("Processing batch of {}", batch.size());
List<URI> uris = batch.stream().map(e -> e.uri).collect(Collectors.toList());
remoteFileUtil.download(uris).stream().map(fn::apply).forEach(outputReceiver::output);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original element's metadata (timestamp, window) is not passed back to the output.

if (!keep) {
LOG.info("Deleting batch of {}", batch.size());
remoteFileUtil.delete(uris);
}
batch.clear();
}
List<URI> uris = batch.stream().map(ValueInSingleWindow::getValue).collect(Collectors.toList());
List<Path> paths = remoteFileUtil.download(uris);

private void processBatch(FinishBundleContext c) {
if (batch.isEmpty()) {
return;
}
LOG.info("Processing batch of {}", batch.size());
List<URI> uris = batch.stream().map(e -> e.uri).collect(Collectors.toList());
List<OutputT> outputs =
remoteFileUtil.download(uris).stream().map(fn::apply).collect(Collectors.toList());
// .forEach(c::output);
Iterator<OutputT> i1 = outputs.iterator();
Iterator<Element> i2 = batch.iterator();
while (i1.hasNext() && i2.hasNext()) {
Comment on lines -129 to -131
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part assumes that List<OutputT> is a 1:1, same-order mapping of List<URI>.

Element e = i2.next();
c.output(i1.next(), e.timestamp, e.window);
Iterator<ValueInSingleWindow<URI>> inputIt = batch.iterator();
Iterator<Path> pathIt = paths.iterator();
while (inputIt.hasNext() && pathIt.hasNext()) {
final ValueInSingleWindow<URI> r = inputIt.next();
final Path path = pathIt.next();

final OutputT o = fn.apply(path);
final Instant ts = r.getTimestamp();
final BoundedWindow w = r.getWindow();
final PaneInfo p = r.getPane();
outputFn.accept(ValueInSingleWindow.of(o, ts, w, p));
}

if (!keep) {
LOG.info("Deleting batch of {}", batch.size());
remoteFileUtil.delete(uris);
}
batch.clear();
}

private class Element {
private URI uri;
private Instant timestamp;
private BoundedWindow window;

Element(URI uri, Instant timestamp, BoundedWindow window) {
this.uri = uri;
this.timestamp = timestamp;
this.window = window;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
Expand Down Expand Up @@ -104,7 +106,12 @@ public Path download(URI src) {
*/
public List<Path> download(List<URI> srcs) {
try {
return paths.getAll(srcs).values().asList();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getAll(srcs) returns a Map<URI, Path>. Using values() gives no guarantee that the returned Paths are in the same order as srcs.

Map<URI, Path> results = paths.getAll(srcs);
List<Path> paths = new ArrayList<>(srcs.size());
for (URI src : srcs) {
paths.add(results.get(src));
}
return paths;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package com.spotify.scio.transforms

import java.nio.file.{Files, Path}

import com.spotify.scio.testing._
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.{Files => GFiles}
import org.joda.time.Instant

import scala.jdk.CollectionConverters._

Expand All @@ -31,8 +31,13 @@ class FileDownloadDoFnTest extends PipelineSpec {
val files = createFiles(tmpDir, 100)
runWithContext { sc =>
val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn)
p.keys should containInAnyOrder((1 to 100).map(_.toString))
p.values.distinct should forAll { f: Path => !Files.exists(f) }

val content = p.keys
val paths = p.values.distinct

val expected = (1 to 100).map(_.toString)
content should containInAnyOrder(expected)
paths should forAll { f: Path => !Files.exists(f) }
}
files.foreach(Files.delete)
Files.delete(tmpDir)
Expand All @@ -42,9 +47,23 @@ class FileDownloadDoFnTest extends PipelineSpec {
val tmpDir = Files.createTempDirectory("filedofn-")
val files = createFiles(tmpDir, 100)
runWithContext { sc =>
val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn, 10, false)
p.keys should containInAnyOrder((1 to 100).map(_.toString))
p.values.distinct should forAll { f: Path => !Files.exists(f) }
// try to use a single bundle so we can check
// elements flushed in processElement as well as
// elements flushed in finishBundle
val p = sc
.parallelize(Seq(files.map(_.toUri).zipWithIndex))
.flatten
.timestampBy { case (_, i) => new Instant(i + 1) }
.keys
.flatMapFile(fn, 10, false)
.withTimestamp

val contentAndTimestamp = p.map { case ((i, _), ts) => (i, ts.getMillis) }
val paths = p.map { case ((_, f), _) => f }.distinct

val expected = (1L to 100L).map(i => (i.toString, i))
contentAndTimestamp should containInAnyOrder(expected)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran this test on the original code and got the unexpected item (1, 10): the 1st element was flushed while processing the 10th.

paths should forAll { f: Path => !Files.exists(f) }
}
files.foreach(Files.delete)
Files.delete(tmpDir)
Expand Down