Skip to content

Commit

Permalink
chore: don't reimplement Channels.newOutputStream (#4363)
Browse files Browse the repository at this point in the history
Change-Id: I9eafa3fa692db20f77b8a526761a8f76f597df50

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable-hbase/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
igorbernstein2 authored May 23, 2024
1 parent 95628dc commit f244ff9
Showing 1 changed file with 2 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@
package com.google.cloud.bigtable.beam.sequencefiles;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.Writer;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
Expand Down Expand Up @@ -155,7 +150,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
configuration.setStrings("io.serializations", writeOperation.serializationNames);

FSDataOutputStream outputStream =
new FSDataOutputStream(new OutputStreamWrapper(channel), new Statistics("dataflow"));
new FSDataOutputStream(Channels.newOutputStream(channel), new Statistics("dataflow"));
sequenceFile =
SequenceFile.createWriter(
configuration,
Expand All @@ -181,51 +176,4 @@ public void write(KV<K, V> value) throws Exception {
sequenceFile.append(value.getKey(), value.getValue());
}
}

/**
* Adapter to allow Hadoop's {@link SequenceFile} to write to Beam's {@link WritableByteChannel}.
*/
static class OutputStreamWrapper extends OutputStream {
private final WritableByteChannel inner;
private final ByteBuffer singleByteBuffer = ByteBuffer.allocate(1);

/**
* Constructs a new {@link OutputStreamWrapper}.
*
* @param inner An instance of Beam's {@link WritableByteChannel}.
*/
OutputStreamWrapper(WritableByteChannel inner) {
this.inner = inner;
}

/** {@inheritDoc} */
@Override
public void write(byte[] b, int off, int len) throws IOException {
int written = 0;

ByteBuffer byteBuffer = ByteBuffer.wrap(b, off, len);

while (written < len) {
// Workaround Java 9 overridden methods with covariant return types
((Buffer) byteBuffer).position(written + off);
written += this.inner.write(byteBuffer);
}
}

/** {@inheritDoc} */
@Override
public void write(int b) throws IOException {
// Workaround Java 9 overridden methods with covariant return types
((Buffer) singleByteBuffer).clear();
singleByteBuffer.put((byte) b);

int written = 0;

while (written == 0) {
// Workaround Java 9 overridden methods with covariant return types
((Buffer) singleByteBuffer).position(0);
written = this.inner.write(singleByteBuffer);
}
}
}
}

0 comments on commit f244ff9

Please sign in to comment.