Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add doPrivilege blocks for socket connect ops in repository-hdfs #22793

Merged
merged 8 commits into from
Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,28 @@
package org.elasticsearch.repositories.hdfs;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;

import java.io.BufferedInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
Expand All @@ -56,12 +61,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public boolean blobExists(String blobName) {
try {
return store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.util().exists(new Path(path, blobName));
}
});
return store.execute(fileContext -> fileContext.util().exists(new Path(path, blobName)));
} catch (Exception e) {
return false;
}
Expand All @@ -73,22 +73,14 @@ public void deleteBlob(String blobName) throws IOException {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}

store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.delete(new Path(path, blobName), true);
}
});
store.execute(fileContext -> fileContext.delete(new Path(path, blobName), true));
}

@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
return null;
}
store.execute((Operation<Void>) fileContext -> {
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
return null;
});
}

Expand All @@ -98,58 +90,44 @@ public InputStream readBlob(String blobName) throws IOException {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
// FSDataInputStream does buffering internally
return store.execute(new Operation<InputStream>() {
@Override
public InputStream run(FileContext fileContext) throws IOException {
return fileContext.open(new Path(path, blobName), bufferSize);
}
});
// FSDataInputStream can open connections on read() or skip() so we wrap in
// HDFSPrivilegedInputSteam which will ensure that underlying methods will
// be called with the proper privileges.
return store.execute(fileContext -> new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize)));
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
// that should be fixed there, no need to bring truncation into this, give the user an error.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
// For safety we also hsync each write as well, because of its docs:
// SYNC_BLOCK - to force closed blocks to the disk device
// "In addition Syncable.hsync() should be called after each write,
// if true synchronous behavior is required"
stream.hsync();
}
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
// that should be fixed there, no need to bring truncation into this, give the user an error.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
// For safety we also hsync each write as well, because of its docs:
// SYNC_BLOCK - to force closed blocks to the disk device
// "In addition Syncable.hsync() should be called after each write,
// if true synchronous behavior is required"
stream.hsync();
}
return null;
}
return null;
});
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
@Override
public FileStatus[] run(FileContext fileContext) throws IOException {
return (fileContext.util().listStatus(path, new PathFilter() {
@Override
public boolean accept(Path path) {
return prefix == null || path.getName().startsWith(prefix);
}
}));
}
});
FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
path -> prefix == null || path.getName().startsWith(prefix))));
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
for (FileStatus file : files) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
Expand All @@ -161,4 +139,51 @@ public boolean accept(Path path) {
public Map<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}

/**
* Exists to wrap underlying InputStream methods that might make socket connections in
* doPrivileged blocks. This is due to the way that hdfs client libraries might open
* socket connections when you are reading from an InputStream.
*/
private static class HDFSPrivilegedInputSteam extends FilterInputStream {

HDFSPrivilegedInputSteam(InputStream in) {
super(in);
}

public int read() throws IOException {
return doPrivilegedOrThrow(in::read);
}

public int read(byte b[]) throws IOException {
return doPrivilegedOrThrow(() -> in.read(b));
}

public int read(byte b[], int off, int len) throws IOException {
return doPrivilegedOrThrow(() -> in.read(b, off, len));
}

public long skip(long n) throws IOException {
return doPrivilegedOrThrow(() -> in.skip(n));
}

public int available() throws IOException {
return doPrivilegedOrThrow(() -> in.available());
}

public synchronized void reset() throws IOException {
doPrivilegedOrThrow(() -> {
in.reset();
return null;
});
}

private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
try {
return AccessController.doPrivileged(action);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.lang.reflect.ReflectPermission;
import java.net.SocketPermission;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
Expand All @@ -46,12 +47,7 @@ final class HdfsBlobStore implements BlobStore {
HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
this.fileContext = fileContext;
this.bufferSize = bufferSize;
this.root = execute(new Operation<Path>() {
@Override
public Path run(FileContext fileContext) throws IOException {
return fileContext.makeQualified(new Path(path));
}
});
this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
try {
mkdirs(root);
} catch (FileAlreadyExistsException ok) {
Expand All @@ -60,23 +56,17 @@ public Path run(FileContext fileContext) throws IOException {
}

private void mkdirs(Path path) throws IOException {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
fileContext.mkdir(path, null, true);
return null;
}
execute((Operation<Void>) fileContext -> {
fileContext.mkdir(path, null, true);
return null;
});
}

@Override
public void delete(BlobPath path) throws IOException {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fc) throws IOException {
fc.delete(translateToHdfsPath(path), true);
return null;
}
execute((Operation<Void>) fc -> {
fc.delete(translateToHdfsPath(path), true);
return null;
});
}

Expand Down Expand Up @@ -128,7 +118,7 @@ <V> V execute(Operation<V> operation) throws IOException {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<V>)
() -> operation.run(fileContext), null, new ReflectPermission("suppressAccessChecks"),
new AuthPermission("modifyPrivateCredentials"));
new AuthPermission("modifyPrivateCredentials"), new SocketPermission("*", "connect"));
} catch (PrivilegedActionException pae) {
throw (IOException) pae.getException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,12 @@ private static FileContext createContext(URI uri, Settings repositorySettings)
cfg.setBoolean("fs.hdfs.impl.disable.cache", true);

// create the filecontext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
try {
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,25 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;

public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {

@Override
protected BlobStore newBlobStore() throws IOException {
return AccessController.doPrivileged(
new PrivilegedAction<HdfsBlobStore>() {
@Override
public HdfsBlobStore run() {
try {
FileContext fileContext = createContext(new URI("hdfs:///"));
return new HdfsBlobStore(fileContext, "temp", 1024);
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}
});
FileContext fileContext;
try {
fileContext = AccessController.doPrivileged((PrivilegedExceptionAction<FileContext>)
() -> createContext(new URI("hdfs:///")));
} catch (PrivilegedActionException e) {
throw new RuntimeException(e.getCause());
}
return new HdfsBlobStore(fileContext, "temp", 1024);
}

@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
Expand Down Expand Up @@ -90,15 +87,12 @@ private FileContext createContext(URI uri) {
cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName());

// create the FileContext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
});
}
Expand Down