From ca320b5eb58493f4cf5a4fb928e5d75e31ad8cd9 Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Tue, 26 Nov 2024 12:55:34 +0100 Subject: [PATCH 1/8] w.i.p. RateLimit for Synchronizations --- lib/Db/Synchronization.php | 3 + lib/Migration/Version1Date20241121160300.php | 11 +++ lib/Service/SynchronizationService.php | 78 ++++++++++++++++---- 3 files changed, 78 insertions(+), 14 deletions(-) diff --git a/lib/Db/Synchronization.php b/lib/Db/Synchronization.php index 0460859c..f5220616 100644 --- a/lib/Db/Synchronization.php +++ b/lib/Db/Synchronization.php @@ -21,6 +21,7 @@ class Synchronization extends Entity implements JsonSerializable protected ?DateTime $sourceLastChanged = null; // The last changed date of the source object protected ?DateTime $sourceLastChecked = null; // The last checked date of the source object protected ?DateTime $sourceLastSynced = null; // The last synced date of the source object + protected ?int $currentPage = 1; // The last page synced. Used for keeping track where to continue syncing after Rate Limit has been exceeded on source with pagination. // Target protected ?string $targetId = null; // The id of the target object protected ?string $targetType = null; // The type of the target object (e.g. api, database, register/schema.) @@ -48,6 +49,7 @@ public function __construct() { $this->addType('sourceLastChanged', 'datetime'); $this->addType('sourceLastChecked', 'datetime'); $this->addType('sourceLastSynced', 'datetime'); + $this->addType('currentPage', 'integer'); $this->addType('targetId', 'string'); $this->addType('targetType', 'string'); $this->addType('targetHash', 'string'); @@ -106,6 +108,7 @@ public function jsonSerialize(): array 'sourceLastChanged' => isset($this->sourceLastChanged) === true ? $this->sourceLastChanged->format('c') : null, 'sourceLastChecked' => isset($this->sourceLastChecked) === true ? $this->sourceLastChecked->format('c') : null, 'sourceLastSynced' => isset($this->sourceLastSynced) === true ? $this->sourceLastSynced->format('c') : null, + 'currentPage' => $this->currentPage, 'targetId' => $this->targetId, 'targetType' => $this->targetType, 'targetHash' => $this->targetHash, diff --git a/lib/Migration/Version1Date20241121160300.php b/lib/Migration/Version1Date20241121160300.php index daef153b..b6dc8b4f 100644 --- a/lib/Migration/Version1Date20241121160300.php +++ b/lib/Migration/Version1Date20241121160300.php @@ -41,6 +41,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt * @var ISchemaWrapper $schema */ $schema = $schemaClosure(); + // Sources table $table = $schema->getTable('openconnector_sources'); if ($table->hasColumn('rate_limit_limit') === false) { @@ -71,6 +72,16 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt ]); } + // Synchronizations table + $table = $schema->getTable('openconnector_synchronizations'); + + if ($table->hasColumn('current_page') === false) { + $table->addColumn('current_page', Types::INTEGER, [ + 'notnull' => false, + 'default' => 1 + ]); + } + return $schema; } diff --git a/lib/Service/SynchronizationService.php b/lib/Service/SynchronizationService.php index 37fcb836..0bff1d7d 100644 --- a/lib/Service/SynchronizationService.php +++ b/lib/Service/SynchronizationService.php @@ -359,6 +359,7 @@ public function getAllObjectsFromSource(Synchronization $synchronization, ?bool /** * Retrieves all objects from an API source for a given synchronization. + * @todo re-write this entire function so that it is recursive. * * @param Synchronization $synchronization The synchronization object containing source information. * @param bool $isTest If we only want to return a single object (for example a test) @@ -372,7 +373,11 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is $objects = []; $source = $this->sourceMapper->find(id: $synchronization->getSourceId()); - // Lets get the source config + if ($source->getRateLimitRemaining() !== null && $source->getRateLimitRemaining() <= 0) { + // @todo + } + + // Let's get the source config $sourceConfig = $synchronization->getSourceConfig(); $endpoint = $sourceConfig['endpoint'] ?? ''; $headers = $sourceConfig['headers'] ?? []; @@ -382,12 +387,27 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is 'query' => $query, ]; + // Get CurrentPage from Synchronization. + $currentPage = $synchronization->getCurrentPage() ?? 1; + // Current Page could be higher than 1 if we continue after rate limit had been reached previously. + if ($currentPage > 1) { + $config = $this->getNextPage(config: $config, sourceConfig: $sourceConfig, currentPage: $currentPage); + } + // Make the initial API call - $response = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config)->getResponse(); + $callLog = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config); + $response = $callLog->getResponse(); + if ($response === null) { + // @todo + if ($callLog->getStatusCode() === 429) { + // @todo + } + } + $lastHash = md5($response['body']); $body = json_decode($response['body'], true); if (empty($body) === true) { - // @todo log that we got a empty response + // @todo log that we got an empty response return []; } $objects = array_merge($objects, $this->getAllObjectsFromArray(array: $body, synchronization: $synchronization)); @@ -397,29 +417,53 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is return [$objects[0]] ?? []; } - // Current page is 2 because the first call made above is page 1. - $currentPage = 2; + // Increase Current page because of the first call made above. + $currentPage++; + // Update CurrentPage on Synchronization + $synchronization->setCurrentPage($currentPage); + $this->synchronizationMapper->update($synchronization); + $useNextEndpoint = false; if (array_key_exists('next', $body)) { $useNextEndpoint = true; } - // Continue making API calls if there are more pages from 'next' the response body or if paginationQuery is set - while($useNextEndpoint === true && $nextEndpoint = $this->getNextEndpoint(body: $body, url: $source->getLocation(), sourceConfig: $sourceConfig, currentPage: $currentPage)) { - // Do not pass $config here becuase it overwrites the query attached to nextEndpoint - $response = $this->callService->call(source: $source, endpoint: $nextEndpoint)->getResponse(); + while ($useNextEndpoint === true && $nextEndpoint = $this->getNextEndpoint(body: $body, url: $source->getLocation())) { + // Do not pass $config here because it overwrites the query attached to nextEndpoint + $callLog = $this->callService->call(source: $source, endpoint: $nextEndpoint); + $response = $callLog->getResponse(); + if ($response === null) { + // @todo + if ($callLog->getStatusCode() === 429) { + // @todo + } + } + $body = json_decode($response['body'], true); $objects = array_merge($objects, $this->getAllObjectsFromArray($body, $synchronization)); + + // Increase Current page. And update CurrentPage on Synchronization. + $currentPage++; + $synchronization->setCurrentPage($currentPage); + $this->synchronizationMapper->update($synchronization); } if ($useNextEndpoint === false) { do { $config = $this->getNextPage(config: $config, sourceConfig: $sourceConfig, currentPage: $currentPage); - $response = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config)->getResponse(); + $callLog = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config); + $response = $callLog->getResponse(); + if ($response === null) { + // @todo + if ($callLog->getStatusCode() === 429) { + // @todo + } + } + $hash = md5($response['body']); - if($hash === $lastHash) { + if ($hash === $lastHash) { break; } @@ -432,10 +476,18 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is $newObjects = $this->getAllObjectsFromArray(array: $body, synchronization: $synchronization); $objects = array_merge($objects, $newObjects); + + // Increase Current page. And update CurrentPage on Synchronization. $currentPage++; + $synchronization->setCurrentPage($currentPage); + $this->synchronizationMapper->update($synchronization); } while (empty($newObjects) === false); } + // Reset Current Page to 1 when we are done with all pages. + $synchronization->setCurrentPage(1); + $this->synchronizationMapper->update($synchronization); + return $objects; } @@ -444,12 +496,10 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is * * @param array $body * @param string $url - * @param array $sourceConfig - * @param int $currentPage * * @return string|null The next endpoint URL if a next link or pagination query is available, or null if neither exists. */ - private function getNextEndpoint(array $body, string $url, array $sourceConfig, int $currentPage): ?string + private function getNextEndpoint(array $body, string $url): ?string { $nextLink = $this->getNextlinkFromCall($body); From 05f444c52e77dbd899c45402e57baee705e57a37 Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Tue, 26 Nov 2024 14:44:42 +0100 Subject: [PATCH 2/8] Updated migration docblock --- lib/Migration/Version1Date20241121160300.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/Migration/Version1Date20241121160300.php b/lib/Migration/Version1Date20241121160300.php index b6dc8b4f..66ae4e42 100644 --- a/lib/Migration/Version1Date20241121160300.php +++ b/lib/Migration/Version1Date20241121160300.php @@ -18,7 +18,8 @@ /** * This migration changes the following: - * - Adding 3 new columns for the table Source: rateLimitLimit, rateLimitRemaining & rateLimitReset + * - Adding 4 new columns for the table Source: rateLimitLimit, rateLimitRemaining, rateLimitReset & rateLimitWindow + * - Adding 1 new column for the table Synchronization: CurrentPage */ class Version1Date20241121160300 extends SimpleMigrationStep { From 8d67010f5bc2facf4c929fa61e8b5248fc226d81 Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Tue, 26 Nov 2024 17:52:27 +0100 Subject: [PATCH 3/8] w.i.p. try to somehow set job nextrun when RateLimit has been reached --- lib/Action/SynchronizationAction.php | 10 +++- lib/Cron/ActionTask.php | 6 ++- lib/Service/SynchronizationService.php | 64 ++++++++++++++++++++++++-- 3 files changed, 73 insertions(+), 7 deletions(-) diff --git a/lib/Action/SynchronizationAction.php b/lib/Action/SynchronizationAction.php index 396e2627..12cc9780 100644 --- a/lib/Action/SynchronizationAction.php +++ b/lib/Action/SynchronizationAction.php @@ -6,6 +6,7 @@ use OCA\OpenConnector\Service\SynchronizationService; use OCA\OpenConnector\Db\SynchronizationMapper; use OCA\OpenConnector\Db\SynchronizationContractMapper; +use Symfony\Component\HttpKernel\Exception\TooManyRequestsHttpException; /** * This action handles the synchronization of data from the source to the target. @@ -70,7 +71,14 @@ public function run(array $argument = []): array $response['stackTrace'][] = 'Doing the synchronization'; try { $objects = $this->synchronizationService->synchronize($synchronization); - } catch (Exception $e) { + } catch (TooManyRequestsHttpException $e) { + if (isset($e->getHeaders()['X-RateLimit-Reset']) === true) { + $response['nextRun'] = $e->getHeaders()['X-RateLimit-Reset']; + } + $response['level'] = 'WARNING'; + $response['stackTrace'][] = $response['message'] = 'Stopped synchronization: ' . $e->getMessage(); + return $response; + } catch (Exception $e) { $response['level'] = 'ERROR'; $response['stackTrace'][] = $response['message'] = 'Failed to synchronize: ' . $e->getMessage(); return $response; diff --git a/lib/Cron/ActionTask.php b/lib/Cron/ActionTask.php index a5b3a54d..c3924b0e 100644 --- a/lib/Cron/ActionTask.php +++ b/lib/Cron/ActionTask.php @@ -99,8 +99,12 @@ public function run($argument) // Update the job + $nextRun = new DateTime(); + if (isset($result['nextRun']) === true) { + $nextRun = $result['nextRun']; + } $job->setLastRun(new DateTime()); - $job->setNextRun(new DateTime()); + $job->setNextRun($nextRun); $this->jobMapper->update($job); // Log the job diff --git a/lib/Service/SynchronizationService.php b/lib/Service/SynchronizationService.php index 09efcb92..0c82604d 100644 --- a/lib/Service/SynchronizationService.php +++ b/lib/Service/SynchronizationService.php @@ -20,6 +20,7 @@ use OCP\AppFramework\Db\MultipleObjectsReturnedException; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; +use Symfony\Component\HttpKernel\Exception\TooManyRequestsHttpException; use Symfony\Component\Uid\Uuid; use OCP\AppFramework\Db\DoesNotExistException; use Adbar\Dot; @@ -88,7 +89,11 @@ public function synchronize(Synchronization $synchronization, ?bool $isTest = fa throw new Exception('sourceId of synchronziation cannot be empty. Canceling synchronization..'); } - $objectList = $this->getAllObjectsFromSource(synchronization: $synchronization, isTest: $isTest); + try { + $objectList = $this->getAllObjectsFromSource(synchronization: $synchronization, isTest: $isTest); + } catch (TooManyRequestsHttpException $e) { + $rateLimitException = $e; + } foreach ($objectList as $key => $object) { // If the source configuration contains a dot notation for the id position, we need to extract the id from the source object @@ -136,6 +141,15 @@ public function synchronize(Synchronization $synchronization, ?bool $isTest = fa $this->synchronizationContractMapper->update($synchronizationContract); } + if (isset($rateLimitException) === true) { + var_dump('test'); + throw new TooManyRequestsHttpException( + message: $rateLimitException->getMessage(), + code: 429, + headers: $rateLimitException->getHeaders() + ); + } + return $objectList; } @@ -423,7 +437,17 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is $source = $this->sourceMapper->find(id: $synchronization->getSourceId()); if ($source->getRateLimitRemaining() !== null && $source->getRateLimitRemaining() <= 0) { - // @todo + throw new TooManyRequestsHttpException( + message: "Rate Limit on Source has been exceeded. Canceling synchronization...", + code: 429, + headers: [ + 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], + 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], + 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], + 'X-RateLimit-Used' => ["0"], + 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], + ] + ); } // Let's get the source config @@ -450,7 +474,17 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is if ($response === null) { // @todo if ($callLog->getStatusCode() === 429) { - // @todo + throw new TooManyRequestsHttpException( + message: "Stopped sync because rate limit on Source has been exceeded.", + code: 429, + headers: [ + 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], + 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], + 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], + 'X-RateLimit-Used' => ["1"], + 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], + ] + ); } } @@ -486,7 +520,17 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is if ($response === null) { // @todo if ($callLog->getStatusCode() === 429) { - // @todo + throw new TooManyRequestsHttpException( + message: "Stopped sync because rate limit on Source has been exceeded.", + code: 429, + headers: [ + 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], + 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], + 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], + 'X-RateLimit-Used' => ["1"], + 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], + ] + ); } } @@ -507,7 +551,17 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is if ($response === null) { // @todo if ($callLog->getStatusCode() === 429) { - // @todo + throw new TooManyRequestsHttpException( + message: "Stopped sync because rate limit on Source has been exceeded.", + code: 429, + headers: [ + 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], + 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], + 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], + 'X-RateLimit-Used' => ["1"], + 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], + ] + ); } } From 927e6297a46620ed3ee8983e0c14aa1c86949fec Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Fri, 29 Nov 2024 11:15:04 +0100 Subject: [PATCH 4/8] Improvements for checking RateLimit during synchronization --- lib/Controller/SynchronizationsController.php | 87 +++++++++++-------- lib/Cron/ActionTask.php | 8 +- lib/Service/SynchronizationService.php | 48 ++++------ 3 files changed, 76 insertions(+), 67 deletions(-) diff --git a/lib/Controller/SynchronizationsController.php b/lib/Controller/SynchronizationsController.php index 9ceb1e9b..d2c63c2e 100644 --- a/lib/Controller/SynchronizationsController.php +++ b/lib/Controller/SynchronizationsController.php @@ -2,6 +2,7 @@ namespace OCA\OpenConnector\Controller; +use GuzzleHttp\Exception\GuzzleException; use OCA\OpenConnector\Service\ObjectService; use OCA\OpenConnector\Service\SearchService; use OCA\OpenConnector\Service\SynchronizationService; @@ -15,6 +16,8 @@ use OCP\IRequest; use Exception; use OCP\AppFramework\Db\DoesNotExistException; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; class SynchronizationsController extends Controller { @@ -213,33 +216,36 @@ public function logs(int $id): JSONResponse } } - /** - * Tests a synchronization - * - * This method tests a synchronization without persisting anything to the database. - * - * @NoAdminRequired - * @NoCSRFRequired - * - * @param int $id The ID of the synchronization - * - * @return JSONResponse A JSON response containing the test results - * - * @example - * Request: - * empty POST - * - * Response: - * { - * "resultObject": { - * "fullName": "John Doe", - * "userAge": 30, - * "contactEmail": "john@example.com" - * }, - * "isValid": true, - * "validationErrors": [] - * } - */ + /** + * Tests a synchronization + * + * This method tests a synchronization without persisting anything to the database. + * + * @NoAdminRequired + * @NoCSRFRequired + * + * @param int $id The ID of the synchronization + * + * @return JSONResponse A JSON response containing the test results + * @throws GuzzleException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * + * @example + * Request: + * empty POST + * + * Response: + * { + * "resultObject": { + * "fullName": "John Doe", + * "userAge": 30, + * "contactEmail": "john@example.com" + * }, + * "isValid": true, + * "validationErrors": [] + * } + */ public function test(int $id): JSONResponse { try { @@ -250,17 +256,26 @@ public function test(int $id): JSONResponse // Try to synchronize try { - $logAndContractArray = $this->synchronizationService->synchronize(synchronization: $synchronization, isTest: true); + $logAndContractArray = $this->synchronizationService->synchronize( + synchronization: $synchronization, + isTest: true + ); + // Return the result as a JSON response return new JSONResponse(data: $logAndContractArray, statusCode: 200); } catch (Exception $e) { - // If synchronizaiton fails, return an error response - return new JSONResponse([ - 'error' => 'Synchronization error', - 'message' => $e->getMessage() - ], 400); - } + // Check if getHeaders method exists and use it if available + $headers = method_exists($e, 'getHeaders') ? $e->getHeaders() : null; - return new JSONResponse($resultFromTest, 200); + // If synchronization fails, return an error response + return new JSONResponse( + data: [ + 'error' => 'Synchronization error', + 'message' => $e->getMessage() + ], + statusCode: $e->getCode() ?? 400, + headers: $headers + ); + } } -} \ No newline at end of file +} diff --git a/lib/Cron/ActionTask.php b/lib/Cron/ActionTask.php index c3924b0e..56c14cf9 100644 --- a/lib/Cron/ActionTask.php +++ b/lib/Cron/ActionTask.php @@ -8,7 +8,10 @@ use OCP\BackgroundJob\TimedJob; use OCP\AppFramework\Utility\ITimeFactory; use OCP\BackgroundJob\IJobList; +use OCP\DB\Exception; +use Psr\Container\ContainerExceptionInterface; use Psr\Container\ContainerInterface; +use Psr\Container\NotFoundExceptionInterface; use Symfony\Component\Uid\Uuid; use DateInterval; use DateTime; @@ -55,6 +58,9 @@ public function __construct( * @param $argument * * @return JobLog|void + * @throws Exception + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface */ public function run($argument) { @@ -101,7 +107,7 @@ public function run($argument) // Update the job $nextRun = new DateTime(); if (isset($result['nextRun']) === true) { - $nextRun = $result['nextRun']; + $nextRun = DateTime::createFromFormat('U', $result['nextRun']); } $job->setLastRun(new DateTime()); $job->setNextRun($nextRun); diff --git a/lib/Service/SynchronizationService.php b/lib/Service/SynchronizationService.php index 8b9f6b42..32134c94 100644 --- a/lib/Service/SynchronizationService.php +++ b/lib/Service/SynchronizationService.php @@ -146,7 +146,6 @@ public function synchronize(Synchronization $synchronization, ?bool $isTest = fa } if (isset($rateLimitException) === true) { - var_dump('test'); throw new TooManyRequestsHttpException( message: $rateLimitException->getMessage(), code: 429, @@ -445,17 +444,13 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is $objects = []; $source = $this->sourceMapper->find(id: $synchronization->getSourceId()); - if ($source->getRateLimitRemaining() !== null && $source->getRateLimitRemaining() <= 0) { + if ($source->getRateLimitRemaining() !== null && $source->getRateLimitReset() !== null + && $source->getRateLimitRemaining() <= 0 && $source->getRateLimitReset() > time() + ) { throw new TooManyRequestsHttpException( message: "Rate Limit on Source has been exceeded. Canceling synchronization...", code: 429, - headers: [ - 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], - 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], - 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], - 'X-RateLimit-Used' => ["0"], - 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], - ] + headers: $this->getRateLimitHeaders($source) ); } @@ -486,13 +481,7 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is throw new TooManyRequestsHttpException( message: "Stopped sync because rate limit on Source has been exceeded.", code: 429, - headers: [ - 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], - 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], - 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], - 'X-RateLimit-Used' => ["1"], - 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], - ] + headers: $this->getRateLimitHeaders($source) ); } } @@ -532,13 +521,7 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is throw new TooManyRequestsHttpException( message: "Stopped sync because rate limit on Source has been exceeded.", code: 429, - headers: [ - 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], - 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], - 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], - 'X-RateLimit-Used' => ["1"], - 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], - ] + headers: $this->getRateLimitHeaders($source) ); } } @@ -563,13 +546,7 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is throw new TooManyRequestsHttpException( message: "Stopped sync because rate limit on Source has been exceeded.", code: 429, - headers: [ - 'X-RateLimit-Limit' => [(string) $source->getRateLimitLimit()], - 'X-RateLimit-Remaining' => [(string) $source->getRateLimitRemaining()], - 'X-RateLimit-Reset' => [(string) $source->getRateLimitReset()], - 'X-RateLimit-Used' => ["1"], - 'X-RateLimit-Window' => [(string) $source->getRateLimitWindow()], - ] + headers: $this->getRateLimitHeaders($source) ); } } @@ -604,6 +581,17 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is return $objects; } + private function getRateLimitHeaders($source): array + { + return [ + 'X-RateLimit-Limit' => $source->getRateLimitLimit(), + 'X-RateLimit-Remaining' => $source->getRateLimitRemaining(), + 'X-RateLimit-Reset' => $source->getRateLimitReset(), + 'X-RateLimit-Used' => 0, + 'X-RateLimit-Window' => $source->getRateLimitWindow(), + ]; + } + /** * Determines the next API endpoint based on a provided next. * From 43f4ac7fc45952f66f9228effefdb3110b2aa2dd Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Tue, 10 Dec 2024 15:54:47 +0100 Subject: [PATCH 5/8] Create logs for rate limit and tested setting the job nextrun correctly --- lib/Controller/SynchronizationsController.php | 2 +- lib/Cron/ActionTask.php | 4 ++ lib/Db/SynchronizationContractLog.php | 3 + lib/Migration/Version0Date20240826193657.php | 2 +- lib/Migration/Version1Date20241210120155.php | 65 +++++++++++++++++++ lib/Service/CallService.php | 2 +- lib/Service/SynchronizationService.php | 22 ++++++- 7 files changed, 95 insertions(+), 5 deletions(-) create mode 100644 lib/Migration/Version1Date20241210120155.php diff --git a/lib/Controller/SynchronizationsController.php b/lib/Controller/SynchronizationsController.php index d2c63c2e..6ba49a31 100644 --- a/lib/Controller/SynchronizationsController.php +++ b/lib/Controller/SynchronizationsController.php @@ -265,7 +265,7 @@ public function test(int $id): JSONResponse return new JSONResponse(data: $logAndContractArray, statusCode: 200); } catch (Exception $e) { // Check if getHeaders method exists and use it if available - $headers = method_exists($e, 'getHeaders') ? $e->getHeaders() : null; + $headers = method_exists($e, 'getHeaders') ? $e->getHeaders() : []; // If synchronization fails, return an error response return new JSONResponse( diff --git a/lib/Cron/ActionTask.php b/lib/Cron/ActionTask.php index efaa66de..45e12e64 100644 --- a/lib/Cron/ActionTask.php +++ b/lib/Cron/ActionTask.php @@ -116,6 +116,10 @@ public function run($argument) $nextRun = new DateTime('now + '.$job->getInterval().' seconds'); if (isset($result['nextRun']) === true) { $nextRun = DateTime::createFromFormat('U', $result['nextRun']); + // Check if the current seconds part is not zero, and if so, round up to the next minute + if ($nextRun->format('s') !== '00') { + $nextRun->modify('next minute'); + } } $nextRun->setTime(hour: $nextRun->format('H'), minute: $nextRun->format('i')); $job->setLastRun(new DateTime()); diff --git a/lib/Db/SynchronizationContractLog.php b/lib/Db/SynchronizationContractLog.php index 39df9164..efa213ce 100644 --- a/lib/Db/SynchronizationContractLog.php +++ b/lib/Db/SynchronizationContractLog.php @@ -9,6 +9,7 @@ class SynchronizationContractLog extends Entity implements JsonSerializable { protected ?string $uuid = null; + protected ?string $message = null; protected ?string $synchronizationId = null; protected ?string $synchronizationContractId = null; protected ?array $source = []; @@ -20,6 +21,7 @@ class SynchronizationContractLog extends Entity implements JsonSerializable public function __construct() { $this->addType('uuid', 'string'); + $this->addType('message', 'string'); $this->addType('synchronizationId', 'string'); $this->addType('synchronizationContractId', 'string'); $this->addType('source', 'json'); @@ -65,6 +67,7 @@ public function jsonSerialize(): array return [ 'id' => $this->id, 'uuid' => $this->uuid, + 'message' => $this->message, 'synchronizationId' => $this->synchronizationId, 'synchronizationContractId' => $this->synchronizationContractId, 'source' => $this->source, diff --git a/lib/Migration/Version0Date20240826193657.php b/lib/Migration/Version0Date20240826193657.php index 1306d7fc..373149b5 100644 --- a/lib/Migration/Version0Date20240826193657.php +++ b/lib/Migration/Version0Date20240826193657.php @@ -283,7 +283,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt $table->addIndex(['user_id'], 'openconnector_job_logs_user_id_index'); } - if (!$schema->hasTable('openconnector_source_contract_logs')) { + if (!$schema->hasTable('openconnector_synchronization_contract_logs')) { $table = $schema->createTable('openconnector_synchronization_contract_logs'); $table->addColumn('id', Types::BIGINT, ['autoincrement' => true, 'notnull' => true, 'length' => 20]); $table->addColumn('uuid', Types::STRING, ['notnull' => true, 'length' => 36]); diff --git a/lib/Migration/Version1Date20241210120155.php b/lib/Migration/Version1Date20241210120155.php new file mode 100644 index 00000000..bca70ed1 --- /dev/null +++ b/lib/Migration/Version1Date20241210120155.php @@ -0,0 +1,65 @@ +hasTable('openconnector_synchronization_contract_logs') === true) { + $table = $schema->getTable('openconnector_synchronization_contract_logs'); + + if ($table->hasColumn('message') === false) { + $table->addColumn('message', Types::STRING, [ + 'length' => 255, + 'notnull' => false, + ])->setDefault(null); + } + } + + return $schema; + } + + /** + * @param IOutput $output + * @param Closure(): ISchemaWrapper $schemaClosure + * @param array $options + */ + public function postSchemaChange(IOutput $output, Closure $schemaClosure, array $options): void { + } +} diff --git a/lib/Service/CallService.php b/lib/Service/CallService.php index 8db6434c..24f84186 100644 --- a/lib/Service/CallService.php +++ b/lib/Service/CallService.php @@ -254,7 +254,7 @@ public function call( $body = $response->getBody()->getContents(); - // Let create the data array + // Let's create the data array $data = [ 'request' => [ 'url' => $url, diff --git a/lib/Service/SynchronizationService.php b/lib/Service/SynchronizationService.php index fc5bff99..64b59342 100644 --- a/lib/Service/SynchronizationService.php +++ b/lib/Service/SynchronizationService.php @@ -93,7 +93,16 @@ public function __construct( public function synchronize(Synchronization $synchronization, ?bool $isTest = false): array { if (empty($synchronization->getSourceId()) === true) { - throw new Exception('sourceId of synchronziation cannot be empty. Canceling synchronization..'); + $log = [ + 'synchronizationId' => $synchronization->getId(), + 'synchronizationContractId' => 0, + 'message' => 'sourceId of synchronization cannot be empty. Canceling synchronization...', + 'created' => new DateTime(), + 'expires' => new DateTime('+1 day') + ]; + + $this->synchronizationContractLogMapper->createFromArray($log); + throw new Exception('sourceId of synchronization cannot be empty. Canceling synchronization...'); } try { @@ -152,6 +161,15 @@ public function synchronize(Synchronization $synchronization, ?bool $isTest = fa } if (isset($rateLimitException) === true) { + $log = [ + 'synchronizationId' => $synchronization->getId(), + 'synchronizationContractId' => isset($synchronizationContract) === true ? $synchronizationContract->getId() : 0, + 'message' => $rateLimitException->getMessage(), + 'created' => new DateTime(), + 'expires' => new DateTime('+1 day') + ]; + + $this->synchronizationContractLogMapper->createFromArray($log); throw new TooManyRequestsHttpException( message: $rateLimitException->getMessage(), code: 429, @@ -347,7 +365,7 @@ public function synchronizeContract(SynchronizationContract $synchronizationCont // The object has not changed and the config has not been updated since last check return $synchronizationContract; } - + // The object has changed, oke let do mappig and bla die bla $synchronizationContract->setOriginHash($originHash); $synchronizationContract->setSourceLastChanged(new DateTime()); From 6843ff49dc489a58ebef9c2af36c1b229d7a08ce Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Tue, 10 Dec 2024 16:16:07 +0100 Subject: [PATCH 6/8] Fix timezone for setting job next run --- lib/Action/SynchronizationAction.php | 5 +++-- lib/Cron/ActionTask.php | 19 +++++++++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/Action/SynchronizationAction.php b/lib/Action/SynchronizationAction.php index 12cc9780..1bd27db0 100644 --- a/lib/Action/SynchronizationAction.php +++ b/lib/Action/SynchronizationAction.php @@ -72,11 +72,12 @@ public function run(array $argument = []): array try { $objects = $this->synchronizationService->synchronize($synchronization); } catch (TooManyRequestsHttpException $e) { + $response['level'] = 'WARNING'; + $response['stackTrace'][] = $response['message'] = 'Stopped synchronization: ' . $e->getMessage(); if (isset($e->getHeaders()['X-RateLimit-Reset']) === true) { $response['nextRun'] = $e->getHeaders()['X-RateLimit-Reset']; + $response['stackTrace'][] = $response['message'] = 'Returning X-RateLimit-Reset header to update Job nextRun: ' . $response['nextRun']; } - $response['level'] = 'WARNING'; - $response['stackTrace'][] = $response['message'] = 'Stopped synchronization: ' . $e->getMessage(); return $response; } catch (Exception $e) { $response['level'] = 'ERROR'; diff --git a/lib/Cron/ActionTask.php b/lib/Cron/ActionTask.php index 45e12e64..8c515067 100644 --- a/lib/Cron/ActionTask.php +++ b/lib/Cron/ActionTask.php @@ -86,10 +86,22 @@ public function run($argument) // if the next run is in the the future, we don't need to do anything if ($job->getNextRun() !== null && $job->getNextRun() > new DateTime()) { - return; + $jobLog = $this->jobLogMapper->createFromArray([ + 'level' => 'WARNING', + 'message' => 'Next Run is still in the future for this job', + 'jobId' => $job->getId(), + 'jobClass' => $job->getJobClass(), + 'jobListId' => $job->getJobListId(), + 'arguments' => $job->getArguments(), + 'lastRun' => $job->getLastRun(), + 'nextRun' => $job->getNextRun(), + 'executionTime' => 0 + ]); + + return $jobLog; } - if(empty($job->getUserId()) === false && $this->userSession->getUser() === null) { + if (empty($job->getUserId()) === false && $this->userSession->getUser() === null) { $user = $this->userManager->get($job->getUserId()); $this->userSession->setUser($user); } @@ -111,11 +123,10 @@ public function run($argument) $job->setIsEnabled(false); } - // Update the job $nextRun = new DateTime('now + '.$job->getInterval().' seconds'); if (isset($result['nextRun']) === true) { - $nextRun = DateTime::createFromFormat('U', $result['nextRun']); + $nextRun = DateTime::createFromFormat('U', $result['nextRun'], $nextRun->getTimezone()); // Check if the current seconds part is not zero, and if so, round up to the next minute if ($nextRun->format('s') !== '00') { $nextRun->modify('next minute'); From 71e938eb31c0ca32d03c1954450b328e28b72e45 Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Tue, 10 Dec 2024 16:29:13 +0100 Subject: [PATCH 7/8] Fix migrations & docblocks --- lib/Migration/Version1Date20241121160300.php | 11 ----------- lib/Migration/Version1Date20241210120155.php | 16 +++++++++++++++- lib/Service/SynchronizationService.php | 18 +++++++++++++++++- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/lib/Migration/Version1Date20241121160300.php b/lib/Migration/Version1Date20241121160300.php index 66ae4e42..8ea60305 100644 --- a/lib/Migration/Version1Date20241121160300.php +++ b/lib/Migration/Version1Date20241121160300.php @@ -19,7 +19,6 @@ /** * This migration changes the following: * - Adding 4 new columns for the table Source: rateLimitLimit, rateLimitRemaining, rateLimitReset & rateLimitWindow - * - Adding 1 new column for the table Synchronization: CurrentPage */ class Version1Date20241121160300 extends SimpleMigrationStep { @@ -73,16 +72,6 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt ]); } - // Synchronizations table - $table = $schema->getTable('openconnector_synchronizations'); - - if ($table->hasColumn('current_page') === false) { - $table->addColumn('current_page', Types::INTEGER, [ - 'notnull' => false, - 'default' => 1 - ]); - } - return $schema; } diff --git a/lib/Migration/Version1Date20241210120155.php b/lib/Migration/Version1Date20241210120155.php index bca70ed1..f4beb3be 100644 --- a/lib/Migration/Version1Date20241210120155.php +++ b/lib/Migration/Version1Date20241210120155.php @@ -17,7 +17,8 @@ /** * This migration changes the following: - * - Add 1 new column for the table SynchronizationContractLogs: message + * - Adding 1 new column for the table Synchronization: currentPage + * - Adding 1 new column for the table SynchronizationContractLogs: message */ class Version1Date20241210120155 extends SimpleMigrationStep { @@ -41,6 +42,19 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt */ $schema = $schemaClosure(); + // Synchronizations table + if ($schema->hasTable('openconnector_synchronizations') === true) { + $table = $schema->getTable('openconnector_synchronizations'); + + if ($table->hasColumn('current_page') === false) { + $table->addColumn('current_page', Types::INTEGER, [ + 'notnull' => false, + 'default' => 1 + ]); + } + } + + // SynchronizationContractLogs table if ($schema->hasTable('openconnector_synchronization_contract_logs') === true) { $table = $schema->getTable('openconnector_synchronization_contract_logs'); diff --git a/lib/Service/SynchronizationService.php b/lib/Service/SynchronizationService.php index 64b59342..6e67abd3 100644 --- a/lib/Service/SynchronizationService.php +++ b/lib/Service/SynchronizationService.php @@ -681,7 +681,23 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is return $objects; } - private function getRateLimitHeaders($source): array + /** + * Retrieves rate limit information from a given source and formats it as HTTP headers. + * + * This function extracts rate limit details from the provided source object and returns them + * as an associative array of headers. The headers can be used for communicating rate limit status + * in API responses or logging purposes. + * + * @param Source $source The source object containing rate limit details, such as limits, remaining requests, and reset times. + * + * @return array An associative array of rate limit headers: + * - 'X-RateLimit-Limit' (int|null): The maximum number of allowed requests. + * - 'X-RateLimit-Remaining' (int|null): The number of requests remaining in the current window. + * - 'X-RateLimit-Reset' (int|null): The Unix timestamp when the rate limit resets. + * - 'X-RateLimit-Used' (int|null): The number of requests used so far. + * - 'X-RateLimit-Window' (int|null): The duration of the rate limit window in seconds. + */ + private function getRateLimitHeaders(Source $source): array { return [ 'X-RateLimit-Limit' => $source->getRateLimitLimit(), From 07565d73128ac5579d4fc3ea50deca0b6ed82545 Mon Sep 17 00:00:00 2001 From: Wilco Louwerse Date: Tue, 10 Dec 2024 17:39:08 +0100 Subject: [PATCH 8/8] Small fix for syncaction stacktrace --- lib/Action/SynchronizationAction.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Action/SynchronizationAction.php b/lib/Action/SynchronizationAction.php index 1bd27db0..347b1193 100644 --- a/lib/Action/SynchronizationAction.php +++ b/lib/Action/SynchronizationAction.php @@ -76,7 +76,7 @@ public function run(array $argument = []): array $response['stackTrace'][] = $response['message'] = 'Stopped synchronization: ' . $e->getMessage(); if (isset($e->getHeaders()['X-RateLimit-Reset']) === true) { $response['nextRun'] = $e->getHeaders()['X-RateLimit-Reset']; - $response['stackTrace'][] = $response['message'] = 'Returning X-RateLimit-Reset header to update Job nextRun: ' . $response['nextRun']; + $response['stackTrace'][] = 'Returning X-RateLimit-Reset header to update Job nextRun: ' . $response['nextRun']; } return $response; } catch (Exception $e) {