Skip to content

Commit

Permalink
Wrap arguments in TestStageTaskSourceFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Aug 9, 2022
1 parent bcfe05f commit 586b5fd
Showing 1 changed file with 42 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,16 +335,28 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
DataSize.of(0, BYTE));
assertFalse(taskSource.isFinished());
assertEquals(getFutureValue(taskSource.getMoreTasks()), ImmutableList.of(
new TaskDescriptor(0, ImmutableListMultimap.of(), ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(1, ImmutableListMultimap.of(), ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(2, ImmutableListMultimap.of(), ImmutableListMultimap.of(
PLAN_NODE_2, new TestingExchangeSourceHandle(3, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
new TaskDescriptor(
0,
ImmutableListMultimap.of(),
ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
1,
ImmutableListMultimap.of(),
ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
2,
ImmutableListMultimap.of(),
ImmutableListMultimap.of(
PLAN_NODE_2, new TestingExchangeSourceHandle(3, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
assertTrue(taskSource.isFinished());

Split bucketedSplit1 = createBucketedSplit(0, 0);
Expand All @@ -371,25 +383,29 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
ImmutableListMultimap.of(
PLAN_NODE_4, bucketedSplit1),
ImmutableListMultimap.of(
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
1,
ImmutableListMultimap.of(
PLAN_NODE_5, bucketedSplit4),
ImmutableListMultimap.of(
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
2,
ImmutableListMultimap.of(
PLAN_NODE_4, bucketedSplit2),
ImmutableListMultimap.of(
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
3,
ImmutableListMultimap.of(
PLAN_NODE_4, bucketedSplit3),
ImmutableListMultimap.of(
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
assertTrue(taskSource.isFinished());

taskSource = createHashDistributionTaskSource(
Expand Down Expand Up @@ -417,27 +433,31 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
1,
ImmutableListMultimap.of(
PLAN_NODE_5, bucketedSplit4),
ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
2,
ImmutableListMultimap.of(
PLAN_NODE_4, bucketedSplit2),
ImmutableListMultimap.of(
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
3,
ImmutableListMultimap.of(
PLAN_NODE_4, bucketedSplit3),
ImmutableListMultimap.of(
PLAN_NODE_2, new TestingExchangeSourceHandle(3, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
assertTrue(taskSource.isFinished());

taskSource = createHashDistributionTaskSource(
Expand All @@ -464,15 +484,17 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))),
new TaskDescriptor(
1,
ImmutableListMultimap.of(
PLAN_NODE_4, bucketedSplit3,
PLAN_NODE_5, bucketedSplit4),
ImmutableListMultimap.of(
PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1),
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)),
new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE)))));
assertTrue(taskSource.isFinished());

// join based on split target split weight
Expand Down

0 comments on commit 586b5fd

Please sign in to comment.