From 820af4f1a3908e4fde3e690d83b8fc698a38ac1d Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Wed, 25 Jan 2017 15:46:58 -0600
Subject: [PATCH 1/7] Add doPrivileged blocks to hdfs plugin

---
 .../repositories/hdfs/HdfsBlobContainer.java | 99 +++++++++----------
 .../repositories/hdfs/HdfsBlobStore.java     | 28 ++----
 .../repositories/hdfs/HdfsRepository.java    | 15 ++-
 .../hdfs/HdfsBlobStoreContainerTests.java    | 38 +++----
 4 files changed, 75 insertions(+), 105 deletions(-)

diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
index e485cf56b4c35..06821dc5dfd17 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
@@ -19,12 +19,14 @@
 package org.elasticsearch.repositories.hdfs;
 
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.elasticsearch.SpecialPermission;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -36,6 +38,9 @@
 import java.io.InputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
@@ -56,12 +61,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     @Override
     public boolean blobExists(String blobName) {
         try {
-            return store.execute(new Operation<Boolean>() {
-                @Override
-                public Boolean run(FileContext fileContext) throws IOException {
-                    return fileContext.util().exists(new Path(path, blobName));
-                }
-            });
+            return store.execute(fileContext -> fileContext.util().exists(new Path(path, blobName)));
         } catch (Exception e) {
             return false;
         }
@@ -73,22 +73,14 @@ public void deleteBlob(String blobName) throws IOException {
             throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
         }
 
-        store.execute(new Operation<Boolean>() {
-            @Override
-            public Boolean run(FileContext fileContext) throws IOException {
-                return fileContext.delete(new Path(path, blobName), true);
-            }
-        });
+        store.execute(fileContext -> fileContext.delete(new Path(path, blobName), true));
     }
 
     @Override
     public void move(String sourceBlobName, String targetBlobName) throws IOException {
-        store.execute(new Operation<Void>() {
-            @Override
-            public Void run(FileContext fileContext) throws IOException {
-                fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
-                return null;
-            }
+        store.execute((Operation<Void>) fileContext -> {
+            fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
+            return null;
         });
     }
 
@@ -98,11 +90,20 @@ public InputStream readBlob(String blobName) throws IOException {
             throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
         }
         // FSDataInputStream does buffering internally
-        return store.execute(new Operation<InputStream>() {
-            @Override
-            public InputStream run(FileContext fileContext)
throws IOException { - return fileContext.open(new Path(path, blobName), bufferSize); - } + return store.execute((Operation) fileContext -> { + FSDataInputStream is = fileContext.open(new Path(path, blobName), bufferSize); + return new InputStream() { + @Override + public int read() throws IOException { + try { + SpecialPermission.check(); + // FSDataInputStream can open connection on read() + return AccessController.doPrivileged((PrivilegedExceptionAction) is::read); + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + } + }; }); } @@ -111,45 +112,33 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t if (blobExists(blobName)) { throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); } - store.execute(new Operation() { - @Override - public Void run(FileContext fileContext) throws IOException { - Path blob = new Path(path, blobName); - // we pass CREATE, which means it fails if a blob already exists. - // NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING - // that should be fixed there, no need to bring truncation into this, give the user an error. - EnumSet flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK); - CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) }; - try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) { - int bytesRead; - byte[] buffer = new byte[bufferSize]; - while ((bytesRead = inputStream.read(buffer)) != -1) { - stream.write(buffer, 0, bytesRead); - // For safety we also hsync each write as well, because of its docs: - // SYNC_BLOCK - to force closed blocks to the disk device - // "In addition Syncable.hsync() should be called after each write, - // if true synchronous behavior is required" - stream.hsync(); - } + store.execute((Operation) fileContext -> { + Path blob = new Path(path, blobName); + // we pass CREATE, which means it fails if a blob already exists. + // NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING + // that should be fixed there, no need to bring truncation into this, give the user an error. 
+ EnumSet flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK); + CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) }; + try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) { + int bytesRead; + byte[] buffer = new byte[bufferSize]; + while ((bytesRead = inputStream.read(buffer)) != -1) { + stream.write(buffer, 0, bytesRead); + // For safety we also hsync each write as well, because of its docs: + // SYNC_BLOCK - to force closed blocks to the disk device + // "In addition Syncable.hsync() should be called after each write, + // if true synchronous behavior is required" + stream.hsync(); } - return null; } + return null; }); } @Override public Map listBlobsByPrefix(@Nullable final String prefix) throws IOException { - FileStatus[] files = store.execute(new Operation() { - @Override - public FileStatus[] run(FileContext fileContext) throws IOException { - return (fileContext.util().listStatus(path, new PathFilter() { - @Override - public boolean accept(Path path) { - return prefix == null || path.getName().startsWith(prefix); - } - })); - } - }); + FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path, + path -> prefix == null || path.getName().startsWith(prefix)))); Map map = new LinkedHashMap(); for (FileStatus file : files) { map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen())); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java index fdd99a79a3efe..7ce5e8d3cd830 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.lang.reflect.ReflectPermission; +import java.net.SocketPermission; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; @@ -46,12 +47,7 @@ final class HdfsBlobStore implements BlobStore { HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException { this.fileContext = fileContext; this.bufferSize = bufferSize; - this.root = execute(new Operation() { - @Override - public Path run(FileContext fileContext) throws IOException { - return fileContext.makeQualified(new Path(path)); - } - }); + this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path))); try { mkdirs(root); } catch (FileAlreadyExistsException ok) { @@ -60,23 +56,17 @@ public Path run(FileContext fileContext) throws IOException { } private void mkdirs(Path path) throws IOException { - execute(new Operation() { - @Override - public Void run(FileContext fileContext) throws IOException { - fileContext.mkdir(path, null, true); - return null; - } + execute((Operation) fileContext -> { + fileContext.mkdir(path, null, true); + return null; }); } @Override public void delete(BlobPath path) throws IOException { - execute(new Operation() { - @Override - public Void run(FileContext fc) throws IOException { - fc.delete(translateToHdfsPath(path), true); - return null; - } + execute((Operation) fc -> { + fc.delete(translateToHdfsPath(path), true); + return null; }); } @@ -128,7 +118,7 @@ V execute(Operation operation) throws IOException { try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> operation.run(fileContext), 
null, new ReflectPermission("suppressAccessChecks"), - new AuthPermission("modifyPrivateCredentials")); + new AuthPermission("modifyPrivateCredentials"), new SocketPermission("*", "connect")); } catch (PrivilegedActionException pae) { throw (IOException) pae.getException(); } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 6d74d20fc83dc..d784e8bf09314 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -134,15 +134,12 @@ private static FileContext createContext(URI uri, Settings repositorySettings) cfg.setBoolean("fs.hdfs.impl.disable.cache", true); // create the filecontext with our user - return Subject.doAs(subject, new PrivilegedAction() { - @Override - public FileContext run() { - try { - AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg); - return FileContext.getFileContext(fs, cfg); - } catch (UnsupportedFileSystemException e) { - throw new RuntimeException(e); - } + return Subject.doAs(subject, (PrivilegedAction) () -> { + try { + AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg); + return FileContext.getFileContext(fs, cfg); + } catch (UnsupportedFileSystemException e) { + throw new RuntimeException(e); } }); } diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java index cdc6dd968039d..2bfe6843dafff 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java @@ -32,28 +32,25 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.net.URISyntaxException; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.Collections; public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase { @Override protected BlobStore newBlobStore() throws IOException { - return AccessController.doPrivileged( - new PrivilegedAction() { - @Override - public HdfsBlobStore run() { - try { - FileContext fileContext = createContext(new URI("hdfs:///")); - return new HdfsBlobStore(fileContext, "temp", 1024); - } catch (IOException | URISyntaxException e) { - throw new RuntimeException(e); - } - } - }); + FileContext fileContext; + try { + fileContext = AccessController.doPrivileged((PrivilegedExceptionAction) + () -> createContext(new URI("hdfs:///"))); + } catch (PrivilegedActionException e) { + throw new RuntimeException(e.getCause()); + } + return new HdfsBlobStore(fileContext, "temp", 1024); } @SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)") @@ -90,15 +87,12 @@ private FileContext createContext(URI uri) { cfg.set("fs.AbstractFileSystem." 
+ uri.getScheme() + ".impl", TestingFs.class.getName()); // create the FileContext with our user - return Subject.doAs(subject, new PrivilegedAction() { - @Override - public FileContext run() { - try { - TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg); - return FileContext.getFileContext(fs, cfg); - } catch (UnsupportedFileSystemException e) { - throw new RuntimeException(e); - } + return Subject.doAs(subject, (PrivilegedAction) () -> { + try { + TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg); + return FileContext.getFileContext(fs, cfg); + } catch (UnsupportedFileSystemException e) { + throw new RuntimeException(e); } }); } From 37dac922dd216b8400a9f1d796c0c7805e4ca14d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 27 Jan 2017 09:11:02 -0600 Subject: [PATCH 2/7] Ensure stream is opened in access block --- .../repositories/hdfs/HdfsBlobContainer.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 06821dc5dfd17..66ea4f61cb3d0 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; @@ -90,20 +91,20 @@ public InputStream readBlob(String blobName) throws IOException { throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); } // FSDataInputStream does buffering internally - return store.execute((Operation) fileContext -> { + return store.execute(fileContext -> { FSDataInputStream is = fileContext.open(new Path(path, blobName), bufferSize); - return new InputStream() { - @Override - public int read() throws IOException { - try { - SpecialPermission.check(); - // FSDataInputStream can open connection on read() - return AccessController.doPrivileged((PrivilegedExceptionAction) is::read); - } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); - } - } - }; + InputStream stream = is.markSupported() ? is : new BufferedInputStream(is); + stream.mark(1); + + try { + SpecialPermission.check(); + // FSDataInputStream can open connection on read() + AccessController.doPrivileged((PrivilegedExceptionAction) () -> stream.skip(1)); + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + stream.reset(); + return stream; }); } @@ -118,7 +119,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t // NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING // that should be fixed there, no need to bring truncation into this, give the user an error. 
EnumSet flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK); - CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) }; + CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)}; try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) { int bytesRead; byte[] buffer = new byte[bufferSize]; From 07a72280614afd0e0bb707a7c5a69393a30facdc Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 27 Jan 2017 09:23:04 -0600 Subject: [PATCH 3/7] Remove unneeded second doPrivileged block --- .../repositories/hdfs/HdfsBlobContainer.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 66ea4f61cb3d0..2ce1611420bbb 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -91,18 +91,12 @@ public InputStream readBlob(String blobName) throws IOException { throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); } // FSDataInputStream does buffering internally + // FSDataInputStream can open connection on read() or skip() return store.execute(fileContext -> { FSDataInputStream is = fileContext.open(new Path(path, blobName), bufferSize); InputStream stream = is.markSupported() ? is : new BufferedInputStream(is); stream.mark(1); - - try { - SpecialPermission.check(); - // FSDataInputStream can open connection on read() - AccessController.doPrivileged((PrivilegedExceptionAction) () -> stream.skip(1)); - } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); - } + stream.skip(1); stream.reset(); return stream; }); From f45de3987562ae865783fb78db9febb8eeb6601b Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 27 Jan 2017 09:52:42 -0600 Subject: [PATCH 4/7] Close input stream --- .../elasticsearch/bootstrap/security.policy | 2 +- .../repositories/hdfs/HdfsBlobContainer.java | 26 +++++++++++++++---- .../plugin-metadata/plugin-security.policy | 2 ++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/core/src/main/resources/org/elasticsearch/bootstrap/security.policy b/core/src/main/resources/org/elasticsearch/bootstrap/security.policy index 2b88253532aef..2aaba57fdc4b4 100644 --- a/core/src/main/resources/org/elasticsearch/bootstrap/security.policy +++ b/core/src/main/resources/org/elasticsearch/bootstrap/security.policy @@ -56,7 +56,7 @@ grant { permission org.elasticsearch.SpecialPermission; // Allow connecting to the internet anywhere - permission java.net.SocketPermission "*", "connect,resolve"; + permission java.net.SocketPermission "*", "resolve"; // Allow read access to all system properties permission java.util.PropertyPermission "*", "read"; diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 2ce1611420bbb..648bc6f15b8dc 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import 
org.apache.lucene.util.IOUtils; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -92,13 +93,28 @@ public InputStream readBlob(String blobName) throws IOException { } // FSDataInputStream does buffering internally // FSDataInputStream can open connection on read() or skip() + // + // In order to deal with socket permissions we ensure that we read (and then reset) + // some of the input stream. So that the caller of this method receives a stream that + // has already been opened. return store.execute(fileContext -> { + boolean success = false; FSDataInputStream is = fileContext.open(new Path(path, blobName), bufferSize); - InputStream stream = is.markSupported() ? is : new BufferedInputStream(is); - stream.mark(1); - stream.skip(1); - stream.reset(); - return stream; + + try { + InputStream bufferedStream = is.markSupported() ? is : new BufferedInputStream(is); + bufferedStream.mark(1); + bufferedStream.skip(1); + bufferedStream.reset(); + success = true; + return bufferedStream; + } finally { + if (success == false) { + if (is != null) { + IOUtils.closeWhileHandlingException(is); + } + } + } }); } diff --git a/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy index 85447245c96f2..67d7ff6711281 100644 --- a/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy +++ b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy @@ -35,4 +35,6 @@ grant { permission javax.security.auth.AuthPermission "getSubject"; permission javax.security.auth.AuthPermission "doAs"; permission javax.security.auth.AuthPermission "modifyPrivateCredentials"; + + permission java.net.SocketPermission "*", "connect"; }; From 68630017ab366f1fdb155d9dd05efc85ed7f53ed Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 27 Jan 2017 10:04:07 -0600 Subject: [PATCH 5/7] Remove policy changes --- .../main/resources/org/elasticsearch/bootstrap/security.policy | 2 +- .../src/main/plugin-metadata/plugin-security.policy | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/resources/org/elasticsearch/bootstrap/security.policy b/core/src/main/resources/org/elasticsearch/bootstrap/security.policy index 2aaba57fdc4b4..2b88253532aef 100644 --- a/core/src/main/resources/org/elasticsearch/bootstrap/security.policy +++ b/core/src/main/resources/org/elasticsearch/bootstrap/security.policy @@ -56,7 +56,7 @@ grant { permission org.elasticsearch.SpecialPermission; // Allow connecting to the internet anywhere - permission java.net.SocketPermission "*", "resolve"; + permission java.net.SocketPermission "*", "connect,resolve"; // Allow read access to all system properties permission java.util.PropertyPermission "*", "read"; diff --git a/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy index 67d7ff6711281..85447245c96f2 100644 --- a/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy +++ b/plugins/repository-hdfs/src/main/plugin-metadata/plugin-security.policy @@ -35,6 +35,4 @@ grant { permission javax.security.auth.AuthPermission "getSubject"; permission javax.security.auth.AuthPermission "doAs"; permission javax.security.auth.AuthPermission "modifyPrivateCredentials"; - - permission java.net.SocketPermission "*", "connect"; }; From 61a28fb699966958bae6ede03177c0238532fb39 Mon Sep 
17 00:00:00 2001
From: Tim Brooks
Date: Fri, 27 Jan 2017 10:43:19 -0600
Subject: [PATCH 6/7] Add wrapping inputstream

---
 .../repositories/hdfs/HdfsBlobContainer.java | 74 ++++++++++++-------
 1 file changed, 47 insertions(+), 27 deletions(-)

diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
index 648bc6f15b8dc..a266f63d173b5 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
@@ -21,13 +21,10 @@
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.lucene.util.IOUtils;
-import org.elasticsearch.SpecialPermission;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -36,6 +33,7 @@
 import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
 
 import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.FileAlreadyExistsException;
@@ -92,30 +90,10 @@ public InputStream readBlob(String blobName) throws IOException {
             throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
         }
         // FSDataInputStream does buffering internally
-        // FSDataInputStream can open connection on read() or skip()
-        //
-        // In order to deal with socket permissions we ensure that we read (and then reset)
-        // some of the input stream. So that the caller of this method receives a stream that
-        // has already been opened.
-        return store.execute(fileContext -> {
-            boolean success = false;
-            FSDataInputStream is = fileContext.open(new Path(path, blobName), bufferSize);
-
-            try {
-                InputStream bufferedStream = is.markSupported() ? is : new BufferedInputStream(is);
-                bufferedStream.mark(1);
-                bufferedStream.skip(1);
-                bufferedStream.reset();
-                success = true;
-                return bufferedStream;
-            } finally {
-                if (success == false) {
-                    if (is != null) {
-                        IOUtils.closeWhileHandlingException(is);
-                    }
-                }
-            }
-        });
+        // FSDataInputStream can open connections on read() or skip() so we wrap in
+        // HDFSPrivilegedInputSteam which will ensure that underlying methods will
+        // be called with the proper privileges.
+        return store.execute(fileContext -> new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize)));
     }
 
     @Override
@@ -161,4 +139,46 @@ public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix
     public Map<String, BlobMetaData> listBlobs() throws IOException {
         return listBlobsByPrefix(null);
     }
+
+    private static class HDFSPrivilegedInputSteam extends FilterInputStream {
+
+        HDFSPrivilegedInputSteam(InputStream in) {
+            super(in);
+        }
+
+        public int read() throws IOException {
+            return doPrivilegedOrThrow(in::read);
+        }
+
+        public int read(byte b[]) throws IOException {
+            return doPrivilegedOrThrow(() -> in.read(b));
+        }
+
+        public int read(byte b[], int off, int len) throws IOException {
+            return doPrivilegedOrThrow(() -> in.read(b, off, len));
+        }
+
+        public long skip(long n) throws IOException {
+            return doPrivilegedOrThrow(() -> in.skip(n));
+        }
+
+        public int available() throws IOException {
+            return doPrivilegedOrThrow(() -> in.available());
+        }
+
+        public synchronized void reset() throws IOException {
+            doPrivilegedOrThrow(() -> {
+                in.reset();
+                return null;
+            });
+        }
+
+        private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
+            try {
+                return AccessController.doPrivileged(action);
+            } catch (PrivilegedActionException e) {
+                throw (IOException) e.getCause();
+            }
+        }
+    }
 }

From 1a363c6c929c402c6b0234e54bc1bfd414b68ff4 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Fri, 27 Jan 2017 10:44:56 -0600
Subject: [PATCH 7/7] Add comment about class

---
 .../elasticsearch/repositories/hdfs/HdfsBlobContainer.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
index a266f63d173b5..e64ab5c4d910f 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
@@ -140,6 +140,11 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
         return listBlobsByPrefix(null);
     }
 
+    /**
+     * Exists to wrap underlying InputStream methods that might make socket connections in
+     * doPrivileged blocks. This is due to the way that hdfs client libraries might open
+     * socket connections when you are reading from an InputStream.
+     */
     private static class HDFSPrivilegedInputSteam extends FilterInputStream {
 
         HDFSPrivilegedInputSteam(InputStream in) {
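
Note on the final state reached by patches 6 and 7: readBlob hands callers a FilterInputStream whose read/skip/available/reset calls run inside AccessController.doPrivileged, so the HDFS client may lazily open data-node sockets without every caller on the stack needing the socket permission. The following is a minimal, self-contained sketch of that pattern using only JDK classes; it is an illustration, not code from the patch series, the class name PrivilegedInputStreamSketch is invented for the example, and the Elasticsearch-specific SpecialPermission check and HDFS types are deliberately omitted.

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

// Sketch only: wraps an InputStream so that methods which may lazily open
// network connections are executed inside a doPrivileged block.
final class PrivilegedInputStreamSketch extends FilterInputStream {

    PrivilegedInputStreamSketch(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        return doPrivilegedOrThrow(in::read);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return doPrivilegedOrThrow(() -> in.read(b, off, len));
    }

    @Override
    public long skip(long n) throws IOException {
        return doPrivilegedOrThrow(() -> in.skip(n));
    }

    private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
        try {
            // doPrivileged stops the permission check at this frame, so only this
            // class's own protection domain needs the socket permission, not its callers.
            return AccessController.doPrivileged(action);
        } catch (PrivilegedActionException e) {
            // InputStream methods declare only IOException, so unwrapping the cause as
            // IOException is safe here.
            throw (IOException) e.getCause();
        }
    }
}

The unwrapping at the end works because the wrapped stream methods can only throw IOException; if the privileged action could throw other checked exceptions, the cast of the cause would need an explicit type check.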