diff --git a/README.md b/README.md
index 9f4de367b030..cac60357ee64 100644
--- a/README.md
+++ b/README.md
@@ -31,26 +31,26 @@ implementing HDFS's file system API.
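 <dependency>
   <groupId>org.alluxio</groupId>
   <artifactId>alluxio-core-client-fs</artifactId>
-  <version>1.8.0</version>
+  <version>1.8.1</version>
 </dependency>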
```
#### Gradle
```groovy
-compile 'org.alluxio:alluxio-core-client-fs:1.8.0'
+compile 'org.alluxio:alluxio-core-client-fs:1.8.1'
```
#### Apache Ant
```xml
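-<dependency org="org.alluxio" name="alluxio-core-client-fs" rev="1.8.0" />
+<dependency org="org.alluxio" name="alluxio-core-client-fs" rev="1.8.1" />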
```
#### SBT
```
-libraryDependencies += "org.alluxio" % "alluxio-core-client-fs" % "1.8.0"
+libraryDependencies += "org.alluxio" % "alluxio-core-client-fs" % "1.8.1"
```
## Contributing
diff --git a/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java b/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java
index b02e907663a3..e66b0df6f770 100644
--- a/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java
+++ b/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java
@@ -285,12 +285,52 @@ public BlockOutStream getOutStream(long blockId, long blockSize, OutStreamOption
WorkerNetAddress address;
FileWriteLocationPolicy locationPolicy = Preconditions.checkNotNull(options.getLocationPolicy(),
PreconditionMessage.FILE_WRITE_LOCATION_POLICY_UNSPECIFIED);
- address = locationPolicy.getWorkerForNextBlock(getEligibleWorkers(), blockSize);
- if (address == null) {
- throw new UnavailableException(
- ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(blockSize));
+ java.util.Set blockWorkers;
+ blockWorkers = com.google.common.collect.Sets.newHashSet(getEligibleWorkers());
+ // The number of initial copies depends on the write type: if ASYNC_THROUGH, it is the property
+ // "alluxio.user.file.replication.durable" before data has been persisted; otherwise
+ // "alluxio.user.file.replication.min"
+ int initialReplicas = (options.getWriteType() == alluxio.client.WriteType.ASYNC_THROUGH
+ && options.getReplicationDurable() > options.getReplicationMin())
+ ? options.getReplicationDurable() : options.getReplicationMin();
+ if (initialReplicas <= 1) {
+ address = locationPolicy.getWorkerForNextBlock(blockWorkers, blockSize);
+ if (address == null) {
+ throw new UnavailableException(
+ ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(blockSize));
+ }
+ return getOutStream(blockId, blockSize, address, options);
+ }
+
+ // Group different block workers by their hostnames
+ java.util.Map> blockWorkersByHost =
+ new java.util.HashMap<>();
+ for (BlockWorkerInfo blockWorker : blockWorkers) {
+ String hostName = blockWorker.getNetAddress().getHost();
+ if (blockWorkersByHost.containsKey(hostName)) {
+ blockWorkersByHost.get(hostName).add(blockWorker);
+ } else {
+ blockWorkersByHost.put(hostName, com.google.common.collect.Sets.newHashSet(blockWorker));
+ }
+ }
+
+ // Select N workers on different hosts where N is the value of initialReplicas for this block
+ List workerAddressList = new java.util.ArrayList<>();
+ for (int i = 0; i < initialReplicas; i++) {
+ address = locationPolicy.getWorkerForNextBlock(blockWorkers, blockSize);
+ if (address == null) {
+ break;
+ }
+ workerAddressList.add(address);
+ blockWorkers.removeAll(blockWorkersByHost.get(address.getHost()));
+ }
+ if (workerAddressList.size() < initialReplicas) {
+ throw new alluxio.exception.status.ResourceExhaustedException(String.format(
+ "Not enough workers for replications, %d workers selected but %d required",
+ workerAddressList.size(), initialReplicas));
}
- return getOutStream(blockId, blockSize, address, options);
+ return BlockOutStream
+ .createReplicatedBlockOutStream(mContext, blockId, blockSize, workerAddressList, options);
}
/**
diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java
index 70108e63125b..18306867a27a 100644
--- a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java
+++ b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java
@@ -95,6 +95,47 @@ public long remaining() {
return mLength - pos - (mCurrentPacket != null ? mCurrentPacket.readableBytes() : 0);
}
+  /**
+   * Creates a new block output stream backed by one {@link PacketWriter} per given worker
+   * address.
+   *
+   * @param context the file system context
+   * @param blockId the block id
+   * @param blockSize the block size
+   * @param workerNetAddresses the worker network addresses, one packet writer is created for each
+   * @param options the options
+   * @return the {@link BlockOutStream} instance created
+   * @throws IOException if a packet writer cannot be created
+   */
+  public static BlockOutStream createReplicatedBlockOutStream(FileSystemContext context,
+      long blockId, long blockSize, java.util.List<WorkerNetAddress> workerNetAddresses,
+      OutStreamOptions options) throws IOException {
+    // Writers are kept in the same order as the addresses they were created for.
+    List<PacketWriter> packetWriters = new ArrayList<>(workerNetAddresses.size());
+    for (WorkerNetAddress address : workerNetAddresses) {
+      // NOTE(review): if creation fails part-way through, the writers created so far are not
+      // released here -- confirm whether they should be cancelled/closed before rethrowing.
+      packetWriters.add(
+          PacketWriter.Factory.create(context, blockId, blockSize, address, options));
+    }
+    return new BlockOutStream(packetWriters, blockSize, workerNetAddresses);
+  }
+
+  /**
+   * Constructs a new {@link BlockOutStream} backed by a list of {@link PacketWriter}s.
+   *
+   * @param packetWriters the packet writers; each one is registered with this stream's closer
+   * @param length the length of the stream
+   * @param workerNetAddresses the worker network addresses; must not be empty, the first entry
+   *        is recorded as this stream's address
+   */
+  protected BlockOutStream(List<PacketWriter> packetWriters, long length,
+      java.util.List<WorkerNetAddress> workerNetAddresses) {
+    // Fail with a descriptive message instead of an IndexOutOfBoundsException from get(0).
+    Preconditions.checkArgument(!workerNetAddresses.isEmpty(),
+        "At least one worker address is required");
+    mCloser = Closer.create();
+    mLength = length;
+    mAddress = workerNetAddresses.get(0);
+    mPacketWriters = packetWriters;
+    for (PacketWriter packetWriter : packetWriters) {
+      mCloser.register(packetWriter);
+    }
+    mClosed = false;
+  }
+
@Override
public void write(int b) throws IOException {
Preconditions.checkState(remaining() > 0, PreconditionMessage.ERR_END_OF_BLOCK);
@@ -123,6 +164,38 @@ public void write(byte[] b, int off, int len) throws IOException {
updateCurrentPacket(false);
}
+  /**
+   * Writes the readable bytes of the specified byte buf to this output stream.
+   *
+   * @param buf the buffer
+   * @throws IOException if the underlying write fails
+   */
+  public void write(io.netty.buffer.ByteBuf buf) throws IOException {
+    // readableBytes() is measured from readerIndex(), so copying must start there; starting at
+    // index 0 would write the wrong bytes whenever part of the buffer has already been consumed.
+    write(buf, buf.readerIndex(), buf.readableBytes());
+  }
+
+  /**
+   * Copies {@code len} bytes out of the given byte buf, beginning at index {@code off}, into
+   * this output stream.
+   *
+   * @param buf the buffer to copy from
+   * @param off the index in {@code buf} of the first byte to copy
+   * @param len the number of bytes to copy
+   * @throws IOException if updating the current packet fails
+   */
+  public void write(io.netty.buffer.ByteBuf buf, int off, int len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+
+    int srcIndex = off;
+    int pending = len;
+    while (pending > 0) {
+      // Make sure there is a current packet with room before copying into it.
+      updateCurrentPacket(false);
+      int chunk = Math.min(pending, mCurrentPacket.writableBytes());
+      mCurrentPacket.writeBytes(buf, srcIndex, chunk);
+      srcIndex += chunk;
+      pending -= chunk;
+    }
+    updateCurrentPacket(false);
+  }
+
@Override
public void flush() throws IOException {
if (mClosed) {
diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java b/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java
index 93562c134d29..db57e259b946 100644
--- a/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java
+++ b/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java
@@ -315,6 +315,10 @@ private void closeBlockInStream(BlockInStream stream) throws IOException {
// Send an async cache request to a worker based on read type and passive cache options.
boolean cache = mOptions.getOptions().getReadType().isCache();
+ boolean overReplicated = mStatus.getReplicationMax() > 0
+ && mStatus.getFileBlockInfos().get((int) (getPos() / mBlockSize))
+ .getBlockInfo().getLocations().size() >= mStatus.getReplicationMax();
+ cache = cache && !overReplicated;
boolean passiveCache = Configuration.getBoolean(PropertyKey.USER_FILE_PASSIVE_CACHE_ENABLED);
long channelTimeout = Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
if (cache) {
diff --git a/core/client/fs/src/main/java/alluxio/client/file/options/CreateFileOptions.java b/core/client/fs/src/main/java/alluxio/client/file/options/CreateFileOptions.java
index 08cbe384cc78..74de98d2de4f 100644
--- a/core/client/fs/src/main/java/alluxio/client/file/options/CreateFileOptions.java
+++ b/core/client/fs/src/main/java/alluxio/client/file/options/CreateFileOptions.java
@@ -69,6 +69,9 @@ private CreateFileOptions() {
PropertyKey.USER_FILE_WRITE_LOCATION_POLICY), new Class[] {}, new Object[] {});
mWriteTier = Configuration.getInt(PropertyKey.USER_FILE_WRITE_TIER_DEFAULT);
mWriteType = Configuration.getEnum(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class);
+ mReplicationDurable = Configuration.getInt(PropertyKey.USER_FILE_REPLICATION_DURABLE);
+ mReplicationMax = Configuration.getInt(PropertyKey.USER_FILE_REPLICATION_MAX);
+ mReplicationMin = Configuration.getInt(PropertyKey.USER_FILE_REPLICATION_MIN);
mMode = Mode.defaults().applyFileUMask();
}
diff --git a/integration/docker/Dockerfile b/integration/docker/Dockerfile
index fb1f639c00f1..fad3dc610555 100644
--- a/integration/docker/Dockerfile
+++ b/integration/docker/Dockerfile
@@ -11,7 +11,7 @@
FROM openjdk:8-jdk-alpine
-ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.0/alluxio-1.8.0-bin.tar.gz
+ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.1/alluxio-1.8.1-bin.tar.gz
RUN apk add --update bash && \
rm -rf /var/cache/apk/*
diff --git a/integration/docker/Dockerfile.fuse b/integration/docker/Dockerfile.fuse
index c50c475b47ac..da6677af003f 100644
--- a/integration/docker/Dockerfile.fuse
+++ b/integration/docker/Dockerfile.fuse
@@ -11,7 +11,7 @@
FROM ubuntu:16.04
-ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.0/alluxio-1.8.0-bin.tar.gz
+ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.1/alluxio-1.8.1-bin.tar.gz
ENV ENABLE_FUSE true
RUN apt-get update && apt-get install -y --no-install-recommends software-properties-common && \