From eb4562d7a59838615060213344215aab61d61f16 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Fri, 27 Jan 2017 15:01:44 -0600
Subject: [PATCH] Add doPrivilege blocks for socket connect ops in repository-hdfs (#22793)

This is related to #22116. The repository-hdfs plugin opens socket
connections. As SocketPermission is transitioned out of core, hdfs will
require connect permission. This pull request wraps operations that
require this permission in doPrivileged blocks.
---
 .../repositories/hdfs/HdfsBlobContainer.java  | 141 +++++++++++-------
 .../repositories/hdfs/HdfsBlobStore.java      |  28 ++--
 .../repositories/hdfs/HdfsRepository.java     |  15 +-
 .../hdfs/HdfsBlobStoreContainerTests.java     |  38 ++---
 4 files changed, 114 insertions(+), 108 deletions(-)

diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
index e485cf56b4c3..e64ab5c4d910 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
@@ -19,12 +19,12 @@
 package org.elasticsearch.repositories.hdfs;
 
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -32,10 +32,15 @@
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
 
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
@@ -56,12 +61,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     @Override
     public boolean blobExists(String blobName) {
         try {
-            return store.execute(new Operation<Boolean>() {
-                @Override
-                public Boolean run(FileContext fileContext) throws IOException {
-                    return fileContext.util().exists(new Path(path, blobName));
-                }
-            });
+            return store.execute(fileContext -> fileContext.util().exists(new Path(path, blobName)));
         } catch (Exception e) {
             return false;
         }
@@ -73,22 +73,14 @@ public void deleteBlob(String blobName) throws IOException {
             throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
         }
 
-        store.execute(new Operation<Boolean>() {
-            @Override
-            public Boolean run(FileContext fileContext) throws IOException {
-                return fileContext.delete(new Path(path, blobName), true);
-            }
-        });
+        store.execute(fileContext -> fileContext.delete(new Path(path, blobName), true));
     }
 
     @Override
     public void move(String sourceBlobName, String targetBlobName) throws IOException {
-        store.execute(new Operation<Void>() {
-            @Override
-            public Void run(FileContext fileContext) throws IOException {
-                fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
-                return null;
-            }
+        store.execute((Operation<Void>) fileContext -> {
+            fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
+            return null;
         });
     }
 
@@ -98,12 +90,10 @@ public InputStream readBlob(String blobName) throws IOException {
             throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
         }
         // FSDataInputStream does buffering internally
-        return store.execute(new Operation<InputStream>() {
-            @Override
-            public InputStream run(FileContext fileContext) throws IOException {
-                return fileContext.open(new Path(path, blobName), bufferSize);
-            }
-        });
+        // FSDataInputStream can open connections on read() or skip() so we wrap in
+        // HDFSPrivilegedInputSteam which will ensure that underlying methods will
+        // be called with the proper privileges.
+        return store.execute(fileContext -> new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize)));
     }
 
     @Override
@@ -111,45 +101,33 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
         if (blobExists(blobName)) {
             throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
         }
-        store.execute(new Operation<Void>() {
-            @Override
-            public Void run(FileContext fileContext) throws IOException {
-                Path blob = new Path(path, blobName);
-                // we pass CREATE, which means it fails if a blob already exists.
-                // NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
-                // that should be fixed there, no need to bring truncation into this, give the user an error.
-                EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
-                CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
-                try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
-                    int bytesRead;
-                    byte[] buffer = new byte[bufferSize];
-                    while ((bytesRead = inputStream.read(buffer)) != -1) {
-                        stream.write(buffer, 0, bytesRead);
-                        // For safety we also hsync each write as well, because of its docs:
-                        // SYNC_BLOCK - to force closed blocks to the disk device
-                        // "In addition Syncable.hsync() should be called after each write,
-                        // if true synchronous behavior is required"
-                        stream.hsync();
-                    }
+        store.execute((Operation<Void>) fileContext -> {
+            Path blob = new Path(path, blobName);
+            // we pass CREATE, which means it fails if a blob already exists.
+            // NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
+            // that should be fixed there, no need to bring truncation into this, give the user an error.
+            EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
+            CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
+            try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
+                int bytesRead;
+                byte[] buffer = new byte[bufferSize];
+                while ((bytesRead = inputStream.read(buffer)) != -1) {
+                    stream.write(buffer, 0, bytesRead);
+                    // For safety we also hsync each write as well, because of its docs:
+                    // SYNC_BLOCK - to force closed blocks to the disk device
+                    // "In addition Syncable.hsync() should be called after each write,
+                    // if true synchronous behavior is required"
+                    stream.hsync();
                 }
-                return null;
             }
+            return null;
         });
     }
 
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
-        FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
-            @Override
-            public FileStatus[] run(FileContext fileContext) throws IOException {
-                return (fileContext.util().listStatus(path, new PathFilter() {
-                    @Override
-                    public boolean accept(Path path) {
-                        return prefix == null || path.getName().startsWith(prefix);
-                    }
-                }));
-            }
-        });
+        FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
+            path -> prefix == null || path.getName().startsWith(prefix))));
         Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
         for (FileStatus file : files) {
             map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
@@ -161,4 +139,51 @@ public boolean accept(Path path) {
     public Map<String, BlobMetaData> listBlobs() throws IOException {
         return listBlobsByPrefix(null);
     }
+
+    /**
+     * Exists to wrap underlying InputStream methods that might make socket connections in
+     * doPrivileged blocks. This is due to the way that hdfs client libraries might open
+     * socket connections when you are reading from an InputStream.
+     */
+    private static class HDFSPrivilegedInputSteam extends FilterInputStream {
+
+        HDFSPrivilegedInputSteam(InputStream in) {
+            super(in);
+        }
+
+        public int read() throws IOException {
+            return doPrivilegedOrThrow(in::read);
+        }
+
+        public int read(byte b[]) throws IOException {
+            return doPrivilegedOrThrow(() -> in.read(b));
+        }
+
+        public int read(byte b[], int off, int len) throws IOException {
+            return doPrivilegedOrThrow(() -> in.read(b, off, len));
+        }
+
+        public long skip(long n) throws IOException {
+            return doPrivilegedOrThrow(() -> in.skip(n));
+        }
+
+        public int available() throws IOException {
+            return doPrivilegedOrThrow(() -> in.available());
+        }
+
+        public synchronized void reset() throws IOException {
+            doPrivilegedOrThrow(() -> {
+                in.reset();
+                return null;
+            });
+        }
+
+        private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
+            try {
+                return AccessController.doPrivileged(action);
+            } catch (PrivilegedActionException e) {
+                throw (IOException) e.getCause();
+            }
+        }
+    }
 }
diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
index fdd99a79a3ef..7ce5e8d3cd83 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
@@ -30,6 +30,7 @@
 
 import java.io.IOException;
 import java.lang.reflect.ReflectPermission;
+import java.net.SocketPermission;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
@@ -46,12 +47,7 @@ final class HdfsBlobStore implements BlobStore {
     HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
         this.fileContext = fileContext;
         this.bufferSize = bufferSize;
-        this.root = execute(new Operation<Path>() {
-            @Override
-            public Path run(FileContext fileContext) throws IOException {
-                return fileContext.makeQualified(new Path(path));
-            }
-        });
+        this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
         try {
             mkdirs(root);
         } catch (FileAlreadyExistsException ok) {
@@ -60,23 +56,17 @@ public Path run(FileContext fileContext) throws IOException {
     }
 
     private void mkdirs(Path path) throws IOException {
-        execute(new Operation<Void>() {
-            @Override
-            public Void run(FileContext fileContext) throws IOException {
-                fileContext.mkdir(path, null, true);
-                return null;
-            }
+        execute((Operation<Void>) fileContext -> {
+            fileContext.mkdir(path, null, true);
+            return null;
         });
     }
 
     @Override
     public void delete(BlobPath path) throws IOException {
-        execute(new Operation<Void>() {
-            @Override
-            public Void run(FileContext fc) throws IOException {
-                fc.delete(translateToHdfsPath(path), true);
-                return null;
-            }
+        execute((Operation<Void>) fc -> {
+            fc.delete(translateToHdfsPath(path), true);
+            return null;
         });
     }
 
@@ -128,7 +118,7 @@ <V> V execute(Operation<V> operation) throws IOException {
         try {
             return AccessController.doPrivileged((PrivilegedExceptionAction<V>) () -> operation.run(fileContext), null,
                 new ReflectPermission("suppressAccessChecks"),
-                new AuthPermission("modifyPrivateCredentials"));
+                new AuthPermission("modifyPrivateCredentials"), new SocketPermission("*", "connect"));
         } catch (PrivilegedActionException pae) {
             throw (IOException) pae.getException();
         }
diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
index 6d74d20fc83d..d784e8bf0931 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
@@ -134,15 +134,12 @@ private static FileContext createContext(URI uri, Settings repositorySettings) {
         cfg.setBoolean("fs.hdfs.impl.disable.cache", true);
 
         // create the filecontext with our user
-        return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
-            @Override
-            public FileContext run() {
-                try {
-                    AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
-                    return FileContext.getFileContext(fs, cfg);
-                } catch (UnsupportedFileSystemException e) {
-                    throw new RuntimeException(e);
-                }
+        return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
+            try {
+                AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
+                return FileContext.getFileContext(fs, cfg);
+            } catch (UnsupportedFileSystemException e) {
+                throw new RuntimeException(e);
             }
         });
     }
diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java
index cdc6dd968039..2bfe6843daff 100644
--- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java
+++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java
@@ -32,28 +32,25 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 
 public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
 
     @Override
     protected BlobStore newBlobStore() throws IOException {
-        return AccessController.doPrivileged(
-            new PrivilegedAction<HdfsBlobStore>() {
-                @Override
-                public HdfsBlobStore run() {
-                    try {
-                        FileContext fileContext = createContext(new URI("hdfs:///"));
-                        return new HdfsBlobStore(fileContext, "temp", 1024);
-                    } catch (IOException | URISyntaxException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            });
+        FileContext fileContext;
+        try {
+            fileContext = AccessController.doPrivileged((PrivilegedExceptionAction<FileContext>)
+                () -> createContext(new URI("hdfs:///")));
+        } catch (PrivilegedActionException e) {
+            throw new RuntimeException(e.getCause());
+        }
+        return new HdfsBlobStore(fileContext, "temp", 1024);
     }
 
     @SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
@@ -90,15 +87,12 @@ private FileContext createContext(URI uri) {
         cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName());
 
         // create the FileContext with our user
-        return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
-            @Override
-            public FileContext run() {
-                try {
-                    TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
-                    return FileContext.getFileContext(fs, cfg);
-                } catch (UnsupportedFileSystemException e) {
-                    throw new RuntimeException(e);
-                }
+        return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
+            try {
+                TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
+                return FileContext.getFileContext(fs, cfg);
+            } catch (UnsupportedFileSystemException e) {
+                throw new RuntimeException(e);
             }
         });
     }