Skip to content

Commit

Permalink
Merge pull request #411 from prooph/bugfix/consider-stream-order
Browse files Browse the repository at this point in the history
Order messages in consideration of provided stream order
  • Loading branch information
codeliner authored Sep 6, 2020
2 parents c603c6e + 5391f41 commit 9b74792
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/StreamIterator/MergedStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ class MergedStreamIterator implements StreamIterator
*/
private $numberOfIterators;

/**
* @var array
*/
private $originalIteratorOrder;

public function __construct(array $streamNames, StreamIterator ...$iterators)
{
foreach ($iterators as $key => $iterator) {
$this->iterators[$key][0] = $iterator;
$this->iterators[$key][1] = $streamNames[$key];
}
$this->numberOfIterators = \count($this->iterators);
$this->originalIteratorOrder = $this->iterators;

$this->prioritizeIterators();
}
Expand Down Expand Up @@ -100,6 +106,8 @@ public function count(): int
private function prioritizeIterators(): void
{
if ($this->numberOfIterators > 1) {
$this->iterators = $this->originalIteratorOrder;

$this->timSort($this->iterators, $this->numberOfIterators);
}
}
Expand Down
27 changes: 27 additions & 0 deletions tests/StreamIterator/MergedStreamIteratorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,33 @@ public function it_returns_messages_in_order_for_large_streams(): void
}
}

/**
* @test
*/
public function it_returns_messages_in_order_in_consideration_of_provided_stream_order(): void
{
$streams = [
'streamA' => new InMemoryStreamIterator([
1 => TestDomainEvent::withPayloadAndSpecifiedCreatedAt(['expected_index' => 0, 'expected_position' => 1, 'expected_stream_name' => 'streamA'], 1, DateTimeImmutable::createFromFormat('Y-m-d\TH:i:s.u', '2018-02-26T17:29:45.000000')),
2 => TestDomainEvent::withPayloadAndSpecifiedCreatedAt(['expected_index' => 2, 'expected_position' => 2, 'expected_stream_name' => 'streamA'], 2, DateTimeImmutable::createFromFormat('Y-m-d\TH:i:s.u', '2018-02-26T20:28:25.000000')),
]),
'streamB' => new InMemoryStreamIterator([
1 => TestDomainEvent::withPayloadAndSpecifiedCreatedAt(['expected_index' => 1, 'expected_position' => 1, 'expected_stream_name' => 'streamB'], 1, DateTimeImmutable::createFromFormat('Y-m-d\TH:i:s.u', '2018-02-26T17:29:45.000000')),
2 => TestDomainEvent::withPayloadAndSpecifiedCreatedAt(['expected_index' => 3, 'expected_position' => 2, 'expected_stream_name' => 'streamB'], 2, DateTimeImmutable::createFromFormat('Y-m-d\TH:i:s.u', '2018-02-26T20:28:25.000000')),
]),
];

$iterator = new MergedStreamIterator(\array_keys($streams), ...\array_values($streams));

$index = 0;
foreach ($iterator as $position => $message) {
$this->assertEquals($index, $message->payload()['expected_index']);
$this->assertEquals($iterator->streamName(), $message->payload()['expected_stream_name']);

$index++;
}
}

/**
* @test
*/
Expand Down

0 comments on commit 9b74792

Please sign in to comment.