diff --git a/README.md b/README.md index 8ab46d0..68c74f4 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ $notifications = [ 'payload' => '{"message":"Hello World!"}', ], [ // current PushSubscription format (browsers might change this in the future) - 'subscription' => Subscription::create([ + 'subscription' => Subscription::create([ "endpoint" => "https://example.com/other/endpoint/of/another/vendor/abcdef...", "keys" => [ 'p256dh' => '(stringOf88Chars)', @@ -253,18 +253,18 @@ foreach ($webPush->flush() as $report) { echo "[v] Message sent successfully for subscription {$endpoint}."; } else { echo "[x] Message failed to sent for subscription {$endpoint}: {$report->getReason()}"; - + // also available (to get more info) - + /** @var \Psr\Http\Message\RequestInterface $requestToPushService */ $requestToPushService = $report->getRequest(); - + /** @var \Psr\Http\Message\ResponseInterface $responseOfPushService */ $responseOfPushService = $report->getResponse(); - + /** @var string $failReason */ $failReason = $report->getReason(); - + /** @var bool $isTheEndpointWrongOrExpired */ $isTheEndpointWrongOrExpired = $report->isSubscriptionExpired(); } @@ -364,6 +364,7 @@ Here are some ideas: 1. Make sure MultiCurl is available on your server 2. Find the right balance for your needs between security and performance (see above) 3. Find the right batch size (set it in `defaultOptions` or as parameter to `flush()`) +4. Use `flushPooled()` instead of `flush()`. The former uses concurrent requests, accelerating the process and often doubling the speed of the requests. ### How to solve "SSL certificate problem: unable to get local issuer certificate"? diff --git a/src/WebPush.php b/src/WebPush.php index 4f20faa..47ca09c 100644 --- a/src/WebPush.php +++ b/src/WebPush.php @@ -14,6 +14,7 @@ namespace Minishlink\WebPush; use GuzzleHttp\Client; +use GuzzleHttp\Pool; use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Psr7\Request; use ParagonIE\ConstantTime\Base64UrlSafe; @@ -30,7 +31,7 @@ class WebPush protected ?array $notifications = null; /** - * @var array Default options: TTL, urgency, topic, batchSize + * @var array Default options: TTL, urgency, topic, batchSize, requestConcurrency */ protected array $defaultOptions; @@ -53,7 +54,7 @@ class WebPush * WebPush constructor. * * @param array $auth Some servers need authentication - * @param array $defaultOptions TTL, urgency, topic, batchSize + * @param array $defaultOptions TTL, urgency, topic, batchSize, requestConcurrency * @param int|null $timeout Timeout of POST request * * @throws \ErrorException @@ -175,6 +176,58 @@ public function flush(?int $batchSize = null): \Generator } } + /** + * Flush notifications. Triggers concurrent requests. + * + * @param callable(MessageSentReport): void $callback Callback for each notification + * @param null|int $batchSize Defaults the value defined in defaultOptions during instantiation (which defaults to 1000). + * @param null|int $requestConcurrency Defaults the value defined in defaultOptions during instantiation (which defaults to 100). + */ + public function flushPooled($callback, ?int $batchSize = null, ?int $requestConcurrency = null): void + { + if (empty($this->notifications)) { + return; + } + + if (null === $batchSize) { + $batchSize = $this->defaultOptions['batchSize']; + } + + if (null === $requestConcurrency) { + $requestConcurrency = $this->defaultOptions['requestConcurrency']; + } + + $batches = array_chunk($this->notifications, $batchSize); + $this->notifications = []; + + foreach ($batches as $batch) { + $batch = $this->prepare($batch); + $pool = new Pool($this->client, $batch, [ + 'requestConcurrency' => $requestConcurrency, + 'fulfilled' => function (ResponseInterface $response, int $index) use ($callback, $batch) { + /** @var \Psr\Http\Message\RequestInterface $request **/ + $request = $batch[$index]; + $callback(new MessageSentReport($request, $response)); + }, + 'rejected' => function (RequestException $reason) use ($callback) { + if (method_exists($reason, 'getResponse')) { + $response = $reason->getResponse(); + } else { + $response = null; + } + $callback(new MessageSentReport($reason->getRequest(), $response, false, $reason->getMessage())); + }, + ]); + + $promise = $pool->promise(); + $promise->wait(); + } + + if ($this->reuseVAPIDHeaders) { + $this->vapidHeaders = []; + } + } + /** * @throws \ErrorException|\Random\RandomException */ @@ -315,7 +368,7 @@ public function getDefaultOptions(): array } /** - * @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize' + * @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize', 'requestConcurrency' */ public function setDefaultOptions(array $defaultOptions): WebPush { @@ -323,6 +376,8 @@ public function setDefaultOptions(array $defaultOptions): WebPush $this->defaultOptions['urgency'] = $defaultOptions['urgency'] ?? null; $this->defaultOptions['topic'] = $defaultOptions['topic'] ?? null; $this->defaultOptions['batchSize'] = $defaultOptions['batchSize'] ?? 1000; + $this->defaultOptions['requestConcurrency'] = $defaultOptions['requestConcurrency'] ?? 100; + return $this; } diff --git a/tests/WebPushTest.php b/tests/WebPushTest.php index ef3d1d8..f02855e 100644 --- a/tests/WebPushTest.php +++ b/tests/WebPushTest.php @@ -222,6 +222,46 @@ public function testFlush(): void } } + /** + * @throws \ErrorException + * @throws \JsonException + */ + public function testFlushPooled(): void + { + $subscription = new Subscription(self::$endpoints['standard']); + + $report = $this->webPush->sendOneNotification($subscription); + $this->assertFalse($report->isSuccess()); // it doesn't have VAPID + + // queue has been reset + $this->assertEmpty(iterator_to_array($this->webPush->flush())); + + $report = $this->webPush->sendOneNotification($subscription); + $this->assertFalse($report->isSuccess()); // it doesn't have VAPID + + $nonExistentSubscription = Subscription::create([ + 'endpoint' => 'https://fcm.googleapis.com/fcm/send/fCd2-8nXJhU:APA91bGi2uaqFXGft4qdolwyRUcUPCL1XV_jWy1tpCRqnu4sk7ojUpC5gnq1PTncbCdMq9RCVQIIFIU9BjzScvjrDqpsI7J-K_3xYW8xo1xSNCfge1RvJ6Xs8RGL_Sw7JtbCyG1_EVgWDc22on1r_jozD8vsFbB0Fg', + 'publicKey' => 'BME-1ZSAv2AyGjENQTzrXDj6vSnhAIdKso4n3NDY0lsd1DUgEzBw7ARMKjrYAm7JmJBPsilV5CWNH0mVPyJEt0Q', + 'authToken' => 'hUIGbmiypj9_EQea8AnCKA', + 'contentEncoding' => 'aes128gcm', + ]); + + // test multiple requests + $this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 1], JSON_THROW_ON_ERROR)); + $this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 2], JSON_THROW_ON_ERROR)); + $this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 3], JSON_THROW_ON_ERROR)); + + $callback = function ($report) { + $this->assertFalse($report->isSuccess()); + $this->assertTrue($report->isSubscriptionExpired()); + $this->assertEquals(410, $report->getResponse()->getStatusCode()); + $this->assertNotEmpty($report->getReason()); + $this->assertNotFalse(filter_var($report->getEndpoint(), FILTER_VALIDATE_URL)); + }; + + $this->webPush->flushPooled($callback); + } + public function testFlushEmpty(): void { $this->assertEmpty(iterator_to_array($this->webPush->flush(300)));