Skip to content

Commit

Permalink
Close ExecutorService in test
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Aug 29, 2019
1 parent a8bd67c commit 28dc4ee
Showing 1 changed file with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.prestosql.spi.type.Type;
import io.prestosql.sql.planner.plan.PlanNodeId;
import io.prestosql.testing.TestingTaskContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.List;
Expand Down Expand Up @@ -65,8 +67,6 @@ public class TestPartitionedOutputOperator
private static final DataSize PARTITION_MAX_MEMORY = new DataSize(5, MEGABYTE);
private static final List<Type> TYPES = ImmutableList.of(BIGINT);
private static final List<Type> REPLICATION_TYPES = ImmutableList.of(BIGINT, BIGINT);
private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-EXECUTOR-%s"));
private static final ScheduledExecutorService SCHEDULER = newScheduledThreadPool(1, daemonThreadsNamed("test-%s"));

private static final Block NULL_BLOCK = new RunLengthEncodedBlock(BIGINT.createBlockBuilder(null, 1).appendNull().build(), POSITIONS_PER_PAGE);
private static final Block TESTING_BLOCK = createLongSequenceBlock(0, POSITIONS_PER_PAGE);
Expand All @@ -75,6 +75,25 @@ public class TestPartitionedOutputOperator
private static final Page TESTING_PAGE = new Page(TESTING_BLOCK);
private static final Page TESTING_PAGE_WITH_NULL_BLOCK = new Page(POSITIONS_PER_PAGE, NULL_BLOCK, TESTING_BLOCK);

private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;

@BeforeClass
public void setUp()
{
executor = newCachedThreadPool(daemonThreadsNamed("test-EXECUTOR-%s"));
scheduledExecutor = newScheduledThreadPool(1, daemonThreadsNamed("test-%s"));
}

@AfterClass(alwaysRun = true)
public void tearDown()
{
executor.shutdownNow();
executor = null;
scheduledExecutor.shutdownNow();
scheduledExecutor = null;
}

@Test
public void testOutputForSimplePage()
{
Expand Down Expand Up @@ -159,12 +178,12 @@ public void testOutputForPageWithRunLengthAndReplication()
assertEquals(operatorContext.getOutputPositions().getTotalCount(), PAGE_COUNT * PARTITION_COUNT * TESTING_PAGE_WITH_NULL_BLOCK.getPositionCount());
}

private static PartitionedOutputOperator createPartitionedOutputOperator(boolean shouldReplicate)
private PartitionedOutputOperator createPartitionedOutputOperator(boolean shouldReplicate)
{
PartitionFunction partitionFunction = new LocalPartitionGenerator(new InterpretedHashGenerator(ImmutableList.of(BIGINT), new int[] {0}), PARTITION_COUNT);
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(createTestMetadataManager().getBlockEncodingSerde(), false);

DriverContext driverContext = TestingTaskContext.builder(EXECUTOR, SCHEDULER, TEST_SESSION)
DriverContext driverContext = TestingTaskContext.builder(executor, scheduledExecutor, TEST_SESSION)
.setMemoryPoolSize(MAX_MEMORY)
.build()
.addPipelineContext(0, true, true, false)
Expand All @@ -176,11 +195,11 @@ private static PartitionedOutputOperator createPartitionedOutputOperator(boolean
}
PartitionedOutputBuffer buffer = new PartitionedOutputBuffer(
"task-instance-id",
new StateMachine<>("bufferState", SCHEDULER, OPEN, TERMINAL_BUFFER_STATES),
new StateMachine<>("bufferState", scheduledExecutor, OPEN, TERMINAL_BUFFER_STATES),
buffers.withNoMoreBufferIds(),
new DataSize(Long.MAX_VALUE, BYTE),
() -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"),
SCHEDULER);
scheduledExecutor);

PartitionedOutputOperator.PartitionedOutputFactory operatorFactory;
if (shouldReplicate) {
Expand Down

0 comments on commit 28dc4ee

Please sign in to comment.