diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 48928b508e318..4e819cd896a2c 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -1419,7 +1419,7 @@ com.microsoft.azure azure-storage - 7.0.0 + 7.0.1 diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java index 1c868ea0ff1e6..2c99b84394f82 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Assert; @@ -130,15 +131,56 @@ public void testConcurrentDeleteFile() throws Exception { } } + /** + * Validate the bug fix for HADOOP-17089. 
Please note that we were never
+   * able to reproduce this except during a Spark job that ran for multiple days
+   * and in a hacked-up azure-storage SDK that added sleep before and after
+   * the call to factory.setNamespaceAware(true), as shown in the description of the pull request linked below.
+   *
+   * @see <a href="https://github.com/Azure/azure-storage-java/pull/546">https://github.com/Azure/azure-storage-java/pull/546</a>
+   */
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testConcurrentList() throws Exception {
+    final Path testDir = new Path("/tmp/data-loss/11230174258112/_temporary/0/_temporary/attempt_20200624190514_0006_m_0");
+    final Path testFile = new Path(testDir, "part-00004-15ea87b1-312c-4fdf-1820-95afb3dfc1c3-a010.snappy.parquet");
+    fs.create(testFile).close();
+    List<ListTask> tasks = new ArrayList<>(THREAD_COUNT);
+
+    for (int i = 0; i < THREAD_COUNT; i++) {
+      tasks.add(new ListTask(fs, testDir));
+    }
+
+    ExecutorService es = null;
+    try {
+      es = Executors.newFixedThreadPool(THREAD_COUNT);
+
+      List<Future<Integer>> futures = es.invokeAll(tasks);
+
+      for (Future<Integer> future : futures) {
+        Assert.assertTrue(future.isDone());
+
+        // we are using Callable, so if an exception
+        // occurred during the operation, it will be thrown
+        // when we call get
+        long fileCount = future.get();
+        assertEquals("The list should always contain 1 file.", 1, fileCount);
+      }
+    } finally {
+      if (es != null) {
+        es.shutdownNow();
+      }
+    }
+  }
+
   abstract class FileSystemTask<V> implements Callable<V> {
     private final FileSystem fileSystem;
     private final Path path;
 
-    protected FileSystem getFileSystem() {
+    FileSystem getFileSystem() {
       return this.fileSystem;
     }
 
-    protected Path getFilePath() {
+    Path getFilePath() {
       return this.path;
     }
 
@@ -182,4 +224,22 @@ public Void call() throws Exception {
       return null;
     }
   }
+
+  /**
+   * Task that lists the given path and returns the number of entries
+   * reported by {@code listStatus}.
+   */
+  class ListTask extends FileSystemTask<Integer> {
+    ListTask(FileSystem fs, Path p) {
+      super(fs, p);
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      FileSystem fs = getFileSystem();
+      Path p = getFilePath();
+      FileStatus[] files = fs.listStatus(p);
+      return files.length;
+    }
+  }
 }