Skip to content

Commit

Permalink
Add doPrivilege blocks for socket connect ops in repository-hdfs (#22793
Browse files Browse the repository at this point in the history
)

This is related to #22116. The repository-hdfs plugin opens socket
connections. As SocketPermission is transitioned out of core, hdfs
will require connect permission. This pull request wraps operations
that require this permission in doPrivileged blocks.
  • Loading branch information
Tim-Brooks authored Jan 27, 2017
1 parent aad51d4 commit eb4562d
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,28 @@
package org.elasticsearch.repositories.hdfs;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;

import java.io.BufferedInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
Expand All @@ -56,12 +61,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public boolean blobExists(String blobName) {
try {
return store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.util().exists(new Path(path, blobName));
}
});
return store.execute(fileContext -> fileContext.util().exists(new Path(path, blobName)));
} catch (Exception e) {
return false;
}
Expand All @@ -73,22 +73,14 @@ public void deleteBlob(String blobName) throws IOException {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}

store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.delete(new Path(path, blobName), true);
}
});
store.execute(fileContext -> fileContext.delete(new Path(path, blobName), true));
}

@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
return null;
}
store.execute((Operation<Void>) fileContext -> {
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
return null;
});
}

Expand All @@ -98,58 +90,44 @@ public InputStream readBlob(String blobName) throws IOException {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
// FSDataInputStream does buffering internally
return store.execute(new Operation<InputStream>() {
@Override
public InputStream run(FileContext fileContext) throws IOException {
return fileContext.open(new Path(path, blobName), bufferSize);
}
});
// FSDataInputStream can open connections on read() or skip() so we wrap in
// HDFSPrivilegedInputSteam which will ensure that underlying methods will
// be called with the proper privileges.
return store.execute(fileContext -> new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize)));
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
// that should be fixed there, no need to bring truncation into this, give the user an error.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
// For safety we also hsync each write as well, because of its docs:
// SYNC_BLOCK - to force closed blocks to the disk device
// "In addition Syncable.hsync() should be called after each write,
// if true synchronous behavior is required"
stream.hsync();
}
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
// that should be fixed there, no need to bring truncation into this, give the user an error.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
// For safety we also hsync each write as well, because of its docs:
// SYNC_BLOCK - to force closed blocks to the disk device
// "In addition Syncable.hsync() should be called after each write,
// if true synchronous behavior is required"
stream.hsync();
}
return null;
}
return null;
});
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
@Override
public FileStatus[] run(FileContext fileContext) throws IOException {
return (fileContext.util().listStatus(path, new PathFilter() {
@Override
public boolean accept(Path path) {
return prefix == null || path.getName().startsWith(prefix);
}
}));
}
});
FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
path -> prefix == null || path.getName().startsWith(prefix))));
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
for (FileStatus file : files) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
Expand All @@ -161,4 +139,51 @@ public boolean accept(Path path) {
public Map<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}

/**
* Exists to wrap underlying InputStream methods that might make socket connections in
* doPrivileged blocks. This is due to the way that hdfs client libraries might open
* socket connections when you are reading from an InputStream.
*/
private static class HDFSPrivilegedInputSteam extends FilterInputStream {

HDFSPrivilegedInputSteam(InputStream in) {
super(in);
}

public int read() throws IOException {
return doPrivilegedOrThrow(in::read);
}

public int read(byte b[]) throws IOException {
return doPrivilegedOrThrow(() -> in.read(b));
}

public int read(byte b[], int off, int len) throws IOException {
return doPrivilegedOrThrow(() -> in.read(b, off, len));
}

public long skip(long n) throws IOException {
return doPrivilegedOrThrow(() -> in.skip(n));
}

public int available() throws IOException {
return doPrivilegedOrThrow(() -> in.available());
}

public synchronized void reset() throws IOException {
doPrivilegedOrThrow(() -> {
in.reset();
return null;
});
}

private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
try {
return AccessController.doPrivileged(action);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.lang.reflect.ReflectPermission;
import java.net.SocketPermission;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
Expand All @@ -46,12 +47,7 @@ final class HdfsBlobStore implements BlobStore {
HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
this.fileContext = fileContext;
this.bufferSize = bufferSize;
this.root = execute(new Operation<Path>() {
@Override
public Path run(FileContext fileContext) throws IOException {
return fileContext.makeQualified(new Path(path));
}
});
this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
try {
mkdirs(root);
} catch (FileAlreadyExistsException ok) {
Expand All @@ -60,23 +56,17 @@ public Path run(FileContext fileContext) throws IOException {
}

private void mkdirs(Path path) throws IOException {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
fileContext.mkdir(path, null, true);
return null;
}
execute((Operation<Void>) fileContext -> {
fileContext.mkdir(path, null, true);
return null;
});
}

@Override
public void delete(BlobPath path) throws IOException {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fc) throws IOException {
fc.delete(translateToHdfsPath(path), true);
return null;
}
execute((Operation<Void>) fc -> {
fc.delete(translateToHdfsPath(path), true);
return null;
});
}

Expand Down Expand Up @@ -128,7 +118,7 @@ <V> V execute(Operation<V> operation) throws IOException {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<V>)
() -> operation.run(fileContext), null, new ReflectPermission("suppressAccessChecks"),
new AuthPermission("modifyPrivateCredentials"));
new AuthPermission("modifyPrivateCredentials"), new SocketPermission("*", "connect"));
} catch (PrivilegedActionException pae) {
throw (IOException) pae.getException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,12 @@ private static FileContext createContext(URI uri, Settings repositorySettings)
cfg.setBoolean("fs.hdfs.impl.disable.cache", true);

// create the filecontext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
try {
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,25 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;

public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {

@Override
protected BlobStore newBlobStore() throws IOException {
return AccessController.doPrivileged(
new PrivilegedAction<HdfsBlobStore>() {
@Override
public HdfsBlobStore run() {
try {
FileContext fileContext = createContext(new URI("hdfs:///"));
return new HdfsBlobStore(fileContext, "temp", 1024);
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}
});
FileContext fileContext;
try {
fileContext = AccessController.doPrivileged((PrivilegedExceptionAction<FileContext>)
() -> createContext(new URI("hdfs:///")));
} catch (PrivilegedActionException e) {
throw new RuntimeException(e.getCause());
}
return new HdfsBlobStore(fileContext, "temp", 1024);
}

@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
Expand Down Expand Up @@ -90,15 +87,12 @@ private FileContext createContext(URI uri) {
cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName());

// create the FileContext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
});
}
Expand Down

0 comments on commit eb4562d

Please sign in to comment.