diff --git a/presto-main/src/main/java/com/facebook/presto/execution/buffer/SpoolingOutputBufferFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/buffer/SpoolingOutputBufferFactory.java index 0910d6989c21..5dd64294c5c7 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/buffer/SpoolingOutputBufferFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/buffer/SpoolingOutputBufferFactory.java @@ -46,7 +46,7 @@ public class SpoolingOutputBufferFactory private final Closer closer = Closer.create(); - private static ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed("spooling-outputbuffer-%s")); + private final ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed("spooling-outputbuffer-%s")); @Inject public SpoolingOutputBufferFactory(FeaturesConfig featuresConfig, TempStorageManager tempStorageManager, FinalizerService finalizerService) diff --git a/presto-main/src/test/java/com/facebook/presto/execution/buffer/TestSpoolingOutputBuffer.java b/presto-main/src/test/java/com/facebook/presto/execution/buffer/TestSpoolingOutputBuffer.java index 30d14b96eaad..5873dc9c2b53 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/buffer/TestSpoolingOutputBuffer.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/buffer/TestSpoolingOutputBuffer.java @@ -28,6 +28,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -61,60 +62,65 @@ public class TestSpoolingOutputBuffer { private static final String TASK_INSTANCE_ID = "task-instance-id"; + private static final DataSize THRESHOLD = sizeOfPages(3); private static final List TYPES = ImmutableList.of(BIGINT); private static final OutputBufferId BUFFER_ID = new OutputBufferId(0); private static final OutputBufferId INVALID_BUFFER_ID = new OutputBufferId(1); + private static final OutputBuffers OUTPUT_BUFFERS = OutputBuffers.createSpoolingOutputBuffers(); private static final QueryIdGenerator queryIdGenerator = new QueryIdGenerator(); + private static SpoolingOutputBufferFactory spoolingOutputBufferFactory; + private ScheduledExecutorService stateNotificationExecutor; @BeforeClass public void setUp() { stateNotificationExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-%s")); + + FeaturesConfig featuresConfig = new FeaturesConfig(); + featuresConfig.setSpoolingOutputBufferThreshold(THRESHOLD); + spoolingOutputBufferFactory = new SpoolingOutputBufferFactory(featuresConfig); } @AfterClass(alwaysRun = true) public void tearDown() + throws IOException { if (stateNotificationExecutor != null) { stateNotificationExecutor.shutdownNow(); stateNotificationExecutor = null; } + spoolingOutputBufferFactory.shutdown(); } - @Test(enabled = false) + @Test public void testSimpleInMemory() { - DataSize threshold = sizeOfPages(5); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); // add three pages - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 2; i++) { addPage(buffer, createPage(i)); } - compareTotalBuffered(buffer, 3); + compareTotalBuffered(buffer, 2); assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(1), MAX_WAIT), bufferResult(0, createPage(0))); - compareTotalBuffered(buffer, 3); - - assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 1, sizeOfPages(1), MAX_WAIT), bufferResult(1, createPage(1))); compareTotalBuffered(buffer, 2); - assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 2, sizeOfPages(1), MAX_WAIT), bufferResult(2, createPage(2))); + assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 1, sizeOfPages(1), MAX_WAIT), bufferResult(1, createPage(1))); compareTotalBuffered(buffer, 1); buffer.setNoMorePages(); - assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 3, sizeOfPages(1), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 3, true)); + assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 2, sizeOfPages(1), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 2, true)); compareTotalBuffered(buffer, 0); } - @Test(enabled = false) + @Test public void testSimple() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); List pages = new LinkedList<>(); // add five pages to storage @@ -171,11 +177,10 @@ public void testSimple() assertEquals(buffer.getInfo().getTotalPagesSent(), 9); } - @Test(enabled = false) + @Test void testUnevenMaxSize() { - DataSize threshold = sizeOfPages(5); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); List pages = new LinkedList<>(); // add five pages to storage @@ -196,11 +201,10 @@ void testUnevenMaxSize() compareTotalBuffered(buffer, 8); } - @Test(enabled = false) + @Test void testGetOutOfOrder() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); List pages = new LinkedList<>(); // add five pages to storage @@ -219,11 +223,10 @@ void testGetOutOfOrder() compareTotalBuffered(buffer, 5); } - @Test(enabled = false) + @Test public void testSimplePendingRead() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); // attempt to get a page ListenableFuture future = buffer.get(BUFFER_ID, 0, sizeOfPages(2)); @@ -256,11 +259,10 @@ public void testSimplePendingRead() assertFalse(future.isDone()); } - @Test(enabled = false) + @Test public void testMultiplePendingReads() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); // attempt to get a page ListenableFuture oldPendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(3)); @@ -281,11 +283,10 @@ public void testMultiplePendingReads() assertBufferResultEquals(TYPES, getFuture(newPendingRead, MAX_WAIT), createBufferResult(TASK_INSTANCE_ID, 0, pages)); } - @Test(enabled = false) + @Test public void testAddAfterPendingRead() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); ListenableFuture pendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(5)); assertFalse(pendingRead.isDone()); @@ -309,11 +310,10 @@ public void testAddAfterPendingRead() assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 5, sizeOfPages(1), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 5, true)); } - @Test(enabled = false) + @Test public void testNoMorePagesAfterPendingRead() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); ListenableFuture pendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(5)); assertFalse(pendingRead.isDone()); @@ -323,11 +323,10 @@ public void testNoMorePagesAfterPendingRead() assertBufferResultEquals(TYPES, getFuture(pendingRead, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, true)); } - @Test(enabled = false) + @Test public void testDestroyAfterPendingRead() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); ListenableFuture pendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(5)); assertFalse(pendingRead.isDone()); @@ -338,11 +337,10 @@ public void testDestroyAfterPendingRead() assertBufferResultEquals(TYPES, getFuture(pendingRead, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, false)); } - @Test(enabled = false) + @Test public void testAcknowledgeSimple() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); for (int i = 0; i < 3; i++) { addPage(buffer, createPage(i)); @@ -378,11 +376,10 @@ public void testAcknowledgeSimple() compareTotalBuffered(buffer, 3); } - @Test(enabled = false) + @Test public void testAcknowledgeStorageAndMemory() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); // add three pages to file for (int i = 0; i < 3; i++) { @@ -399,11 +396,10 @@ public void testAcknowledgeStorageAndMemory() compareTotalBuffered(buffer, 1); } - @Test(enabled = false) + @Test public void testDuplicateGet() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); // add three pages for (int i = 0; i < 3; i++) { @@ -427,11 +423,10 @@ public void testDuplicateGet() compareTotalBuffered(buffer, 0); } - @Test(enabled = false) + @Test public void testAddAfterNoMorePages() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); for (int i = 0; i < 2; i++) { addPage(buffer, createPage(i)); @@ -450,11 +445,10 @@ public void testAddAfterNoMorePages() assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 2, sizeOfPages(3), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 2, true)); } - @Test(enabled = false) + @Test public void testAddAfterDestroy() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); for (int i = 0; i < 2; i++) { addPage(buffer, createPage(i)); @@ -473,11 +467,10 @@ public void testAddAfterDestroy() assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(3), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, true)); } - @Test(enabled = false) + @Test public void testAbort() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); // add three pages into a file for (int i = 0; i < 3; i++) { @@ -501,11 +494,10 @@ public void testAbort() compareTotalBuffered(buffer, 0); } - @Test(enabled = false) + @Test public void testSetOutputBuffers() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); OutputBuffers newBuffers = new OutputBuffers(SPOOLING, 1, true, ImmutableMap.of()); buffer.setOutputBuffers(newBuffers); @@ -519,11 +511,10 @@ public void testSetOutputBuffers() } } - @Test(enabled = false) + @Test public void testBufferCompletion() { - DataSize threshold = sizeOfPages(3); - SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(threshold); + SpoolingOutputBuffer buffer = createSpoolingOutputBuffer(); assertFalse(buffer.isFinished()); @@ -558,18 +549,13 @@ public void testBufferCompletion() assertTrue(buffer.isFinished()); } - private SpoolingOutputBuffer createSpoolingOutputBuffer(DataSize threshold) + private SpoolingOutputBuffer createSpoolingOutputBuffer() { - FeaturesConfig featuresConfig = new FeaturesConfig(); - featuresConfig.setSpoolingOutputBufferThreshold(threshold); - - SpoolingOutputBufferFactory spoolingOutputBufferFactory = new SpoolingOutputBufferFactory(featuresConfig); - OutputBuffers outputBuffers = OutputBuffers.createSpoolingOutputBuffers(); TaskId taskId = new TaskId(queryIdGenerator.createNextQueryId().toString(), 0, 0, 0); return spoolingOutputBufferFactory.createSpoolingOutputBuffer( taskId, TASK_INSTANCE_ID, - outputBuffers, + OUTPUT_BUFFERS, new StateMachine<>("bufferState", stateNotificationExecutor, OPEN, TERMINAL_BUFFER_STATES)); }