Skip to content

Commit

Permalink
Merge pull request #2 from ZacBlanco/ALLUXIO-3327
Browse files Browse the repository at this point in the history
client changes + minor versioning changes
  • Loading branch information
apc999 authored Oct 2, 2018
2 parents d43a763 + 5c6d7d7 commit e01a760
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 11 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,26 @@ implementing HDFS's file system API.
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-client-fs</artifactId>
<version>1.8.0</version>
<version>1.8.1</version>
</dependency>
```

#### Gradle

```groovy
compile 'org.alluxio:alluxio-core-client-fs:1.8.0'
compile 'org.alluxio:alluxio-core-client-fs:1.8.1'
```

#### Apache Ant
```xml
<dependency org="org.alluxio" name="alluxio" rev="1.8.0">
<dependency org="org.alluxio" name="alluxio" rev="1.8.1">
<artifact name="alluxio-core-client-fs" type="jar" />
</dependency>
```

#### SBT
```
libraryDependencies += "org.alluxio" % "alluxio-core-client-fs" % "1.8.0"
libraryDependencies += "org.alluxio" % "alluxio-core-client-fs" % "1.8.1"
```

## Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,52 @@ public BlockOutStream getOutStream(long blockId, long blockSize, OutStreamOption
WorkerNetAddress address;
FileWriteLocationPolicy locationPolicy = Preconditions.checkNotNull(options.getLocationPolicy(),
PreconditionMessage.FILE_WRITE_LOCATION_POLICY_UNSPECIFIED);
address = locationPolicy.getWorkerForNextBlock(getEligibleWorkers(), blockSize);
if (address == null) {
throw new UnavailableException(
ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(blockSize));
java.util.Set<BlockWorkerInfo> blockWorkers;
blockWorkers = com.google.common.collect.Sets.newHashSet(getEligibleWorkers());
// The number of initial copies depends on the write type: if ASYNC_THROUGH, it is the property
// "alluxio.user.file.replication.durable" before data has been persisted; otherwise
// "alluxio.user.file.replication.min"
int initialReplicas = (options.getWriteType() == alluxio.client.WriteType.ASYNC_THROUGH
&& options.getReplicationDurable() > options.getReplicationMin())
? options.getReplicationDurable() : options.getReplicationMin();
if (initialReplicas <= 1) {
address = locationPolicy.getWorkerForNextBlock(blockWorkers, blockSize);
if (address == null) {
throw new UnavailableException(
ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(blockSize));
}
return getOutStream(blockId, blockSize, address, options);
}

// Group different block workers by their hostnames
java.util.Map<String, java.util.Set<BlockWorkerInfo>> blockWorkersByHost =
new java.util.HashMap<>();
for (BlockWorkerInfo blockWorker : blockWorkers) {
String hostName = blockWorker.getNetAddress().getHost();
if (blockWorkersByHost.containsKey(hostName)) {
blockWorkersByHost.get(hostName).add(blockWorker);
} else {
blockWorkersByHost.put(hostName, com.google.common.collect.Sets.newHashSet(blockWorker));
}
}

// Select N workers on different hosts where N is the value of initialReplicas for this block
List<WorkerNetAddress> workerAddressList = new java.util.ArrayList<>();
for (int i = 0; i < initialReplicas; i++) {
address = locationPolicy.getWorkerForNextBlock(blockWorkers, blockSize);
if (address == null) {
break;
}
workerAddressList.add(address);
blockWorkers.removeAll(blockWorkersByHost.get(address.getHost()));
}
if (workerAddressList.size() < initialReplicas) {
throw new alluxio.exception.status.ResourceExhaustedException(String.format(
"Not enough workers for replications, %d workers selected but %d required",
workerAddressList.size(), initialReplicas));
}
return getOutStream(blockId, blockSize, address, options);
return BlockOutStream
.createReplicatedBlockOutStream(mContext, blockId, blockSize, workerAddressList, options);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,47 @@ public long remaining() {
return mLength - pos - (mCurrentPacket != null ? mCurrentPacket.readableBytes() : 0);
}

/**
* Creates a new remote block output stream.
*
* @param context the file system context
* @param blockId the block id
* @param blockSize the block size
* @param workerNetAddresses the worker network addresses
* @param options the options
* @return the {@link BlockOutStream} instance created
*/
public static BlockOutStream createReplicatedBlockOutStream(FileSystemContext context,
long blockId, long blockSize, java.util.List<WorkerNetAddress> workerNetAddresses,
OutStreamOptions options) throws IOException {
List<PacketWriter> packetWriters = new ArrayList<>();
for (WorkerNetAddress address: workerNetAddresses) {
PacketWriter packetWriter =
PacketWriter.Factory.create(context, blockId, blockSize, address, options);
packetWriters.add(packetWriter);
}
return new BlockOutStream(packetWriters, blockSize, workerNetAddresses);
}

/**
* Constructs a new {@link BlockOutStream} with only one {@link PacketWriter}.
*
* @param packetWriters the packet writer
* @param length the length of the stream
* @param workerNetAddresses the worker network addresses
*/
protected BlockOutStream(List<PacketWriter> packetWriters, long length,
java.util.List<WorkerNetAddress> workerNetAddresses) {
mCloser = Closer.create();
mLength = length;
mAddress = workerNetAddresses.get(0);
mPacketWriters = packetWriters;
for (PacketWriter packetWriter : packetWriters) {
mCloser.register(packetWriter);
}
mClosed = false;
}

@Override
public void write(int b) throws IOException {
Preconditions.checkState(remaining() > 0, PreconditionMessage.ERR_END_OF_BLOCK);
Expand Down Expand Up @@ -123,6 +164,38 @@ public void write(byte[] b, int off, int len) throws IOException {
updateCurrentPacket(false);
}

/**
* Writes the data in the specified byte buf to this output stream.
*
* @param buf the buffer
* @throws IOException
*/
public void write(io.netty.buffer.ByteBuf buf) throws IOException {
write(buf, 0, buf.readableBytes());
}

/**
* Writes len bytes from the specified byte buf starting at offset off to this output stream.
*
* @param buf the buffer
* @param off the offset
* @param len the length
*/
public void write(io.netty.buffer.ByteBuf buf, int off, int len) throws IOException {
if (len == 0) {
return;
}

while (len > 0) {
updateCurrentPacket(false);
int toWrite = Math.min(len, mCurrentPacket.writableBytes());
mCurrentPacket.writeBytes(buf, off, toWrite);
off += toWrite;
len -= toWrite;
}
updateCurrentPacket(false);
}

@Override
public void flush() throws IOException {
if (mClosed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ private void closeBlockInStream(BlockInStream stream) throws IOException {

// Send an async cache request to a worker based on read type and passive cache options.
boolean cache = mOptions.getOptions().getReadType().isCache();
boolean overReplicated = mStatus.getReplicationMax() > 0
&& mStatus.getFileBlockInfos().get((int) (getPos() / mBlockSize))
.getBlockInfo().getLocations().size() >= mStatus.getReplicationMax();
cache = cache && !overReplicated;
boolean passiveCache = Configuration.getBoolean(PropertyKey.USER_FILE_PASSIVE_CACHE_ENABLED);
long channelTimeout = Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
if (cache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ private CreateFileOptions() {
PropertyKey.USER_FILE_WRITE_LOCATION_POLICY), new Class[] {}, new Object[] {});
mWriteTier = Configuration.getInt(PropertyKey.USER_FILE_WRITE_TIER_DEFAULT);
mWriteType = Configuration.getEnum(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class);
mReplicationDurable = Configuration.getInt(PropertyKey.USER_FILE_REPLICATION_DURABLE);
mReplicationMax = Configuration.getInt(PropertyKey.USER_FILE_REPLICATION_MAX);
mReplicationMin = Configuration.getInt(PropertyKey.USER_FILE_REPLICATION_MIN);
mMode = Mode.defaults().applyFileUMask();
}

Expand Down
2 changes: 1 addition & 1 deletion integration/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

FROM openjdk:8-jdk-alpine

ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.0/alluxio-1.8.0-bin.tar.gz
ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.1/alluxio-1.8.1-bin.tar.gz

RUN apk add --update bash && \
rm -rf /var/cache/apk/*
Expand Down
2 changes: 1 addition & 1 deletion integration/docker/Dockerfile.fuse
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

FROM ubuntu:16.04

ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.0/alluxio-1.8.0-bin.tar.gz
ARG ALLUXIO_TARBALL=http://downloads.alluxio.org/downloads/files/1.8.1/alluxio-1.8.1-bin.tar.gz
ENV ENABLE_FUSE true

RUN apt-get update && apt-get install -y --no-install-recommends software-properties-common && \
Expand Down

0 comments on commit e01a760

Please sign in to comment.