Skip to content

Commit

Permalink
Remove expired members from subscription topic stored in Redis set map
Browse files Browse the repository at this point in the history
  • Loading branch information
kirills-morozovs authored Aug 20, 2024
1 parent 0b9da72 commit e58f99b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
35 changes: 27 additions & 8 deletions src/Subscriptions/Storage/RedisStorageManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

/**
* Stores subscribers and topics in redis.
*
* - Topics are subscriptions like "userCreated" or "userDeleted".
* - Subscribers are clients that are listening to channels like "private-lighthouse-a7ef3d".
*
Expand Down Expand Up @@ -61,10 +62,13 @@ public function subscribersByTopic(string $topic): Collection
// As explained in storeSubscriber, we use redis sets to store the names of subscribers of a topic.
// We can retrieve all members of a set using the command smembers.
$subscriberIds = $this->connection->command('smembers', [$this->topicKey($topic)]);
if (count($subscriberIds) === 0) {
if ($subscriberIds === []) {
return new Collection();
}

// Store all keys as missing keys to remove the ones which are expired later.
$missingKeys = $subscriberIds;

// Since we store the individual subscribers with a prefix,
// but not in the set, we have to add the prefix here.
$subscriberIds = array_map([$this, 'channelKey'], $subscriberIds);
Expand All @@ -73,30 +77,45 @@ public function subscribersByTopic(string $topic): Collection
// This is like using multiple get calls (getSubscriber uses the get command).
$subscribers = $this->connection->command('mget', [$subscriberIds]);

return (new Collection($subscribers))
$subscribersCollection = (new Collection($subscribers))
->filter()
->map(static function (?string $subscriber): ?Subscriber {
// Some entries may be expired
->map(static function (?string $subscriber) use (&$missingKeys): ?Subscriber {
// Some entries may be expired.
if ($subscriber === null) {
return null;
}

// Other entries may contain invalid values
// Other entries may contain invalid values.
try {
return unserialize($subscriber);
$subscriber = unserialize($subscriber);

// This key exists so remove it from the list of missing keys.
$missingKeys = array_diff($missingKeys, [$subscriber->channel]);

return $subscriber;
} catch (\ErrorException) {
return null;
}
})
->filter();

// Remove expired subscribers from the set of subscribers of this topic.
if ($missingKeys !== []) {
$this->connection->command('srem', [
$this->topicKey($topic),
...$missingKeys,
]);
}

return $subscribersCollection;
}

public function storeSubscriber(Subscriber $subscriber, string $topic): void
{
$subscriber->topic = $topic;

// In contrast to the CacheStorageManager, we use redis sets.
// Instead of reading the entire list, adding the subscriber and storing the list;
// Instead of reading the entire list, adding the subscriber, and storing the list;
// we simply add the name of the subscriber to the set of subscribers of this topic using the sadd command...
$topicKey = $this->topicKey($topic);
$this->connection->command('sadd', [
Expand All @@ -108,7 +127,7 @@ public function storeSubscriber(Subscriber $subscriber, string $topic): void
$this->connection->command('expire', [$topicKey, $this->ttl]);
}

// Lastly, we store the subscriber as a serialized string...
// Lastly, we store the subscriber as a serialized string.
$setCommand = 'set';
$setArguments = [
$this->channelKey($subscriber->channel),
Expand Down
7 changes: 5 additions & 2 deletions tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,20 @@ public function testSubscribersByTopic(): void
$subscriber2,
];

$redisConnection->expects($this->exactly(2))
$redisConnection->expects($this->exactly(3))
->method('command')
->with(...$this->withConsecutive(
['smembers', ["graphql.topic.{$topic}"]],
['mget', [[
'graphql.subscriber.foo1',
'graphql.subscriber.foo2',
'graphql.subscriber.foo3',
'graphql.subscriber.foo4',
]]],
['srem', ["graphql.topic.{$topic}", 'foo3', 'foo4']],
))
->willReturnOnConsecutiveCalls(
['foo1', 'foo2', 'foo3'],
['foo1', 'foo2', 'foo3', 'foo4'],
[
serialize($subscriber1),
serialize($subscriber2),
Expand All @@ -219,6 +221,7 @@ public function testSubscribersByTopic(): void
// mget non-existing-entry
false,
],
null,
);

$this->assertEquals(
Expand Down

0 comments on commit e58f99b

Please sign in to comment.