diff --git a/lib/Action/SynchronizationAction.php b/lib/Action/SynchronizationAction.php index 396e2627..347b1193 100644 --- a/lib/Action/SynchronizationAction.php +++ b/lib/Action/SynchronizationAction.php @@ -6,6 +6,7 @@ use OCA\OpenConnector\Service\SynchronizationService; use OCA\OpenConnector\Db\SynchronizationMapper; use OCA\OpenConnector\Db\SynchronizationContractMapper; +use Symfony\Component\HttpKernel\Exception\TooManyRequestsHttpException; /** * This action handles the synchronization of data from the source to the target. @@ -70,7 +71,15 @@ public function run(array $argument = []): array $response['stackTrace'][] = 'Doing the synchronization'; try { $objects = $this->synchronizationService->synchronize($synchronization); - } catch (Exception $e) { + } catch (TooManyRequestsHttpException $e) { + $response['level'] = 'WARNING'; + $response['stackTrace'][] = $response['message'] = 'Stopped synchronization: ' . $e->getMessage(); + if (isset($e->getHeaders()['X-RateLimit-Reset']) === true) { + $response['nextRun'] = $e->getHeaders()['X-RateLimit-Reset']; + $response['stackTrace'][] = 'Returning X-RateLimit-Reset header to update Job nextRun: ' . $response['nextRun']; + } + return $response; + } catch (Exception $e) { $response['level'] = 'ERROR'; $response['stackTrace'][] = $response['message'] = 'Failed to synchronize: ' . $e->getMessage(); return $response; diff --git a/lib/Controller/SynchronizationsController.php b/lib/Controller/SynchronizationsController.php index 9ceb1e9b..6ba49a31 100644 --- a/lib/Controller/SynchronizationsController.php +++ b/lib/Controller/SynchronizationsController.php @@ -2,6 +2,7 @@ namespace OCA\OpenConnector\Controller; +use GuzzleHttp\Exception\GuzzleException; use OCA\OpenConnector\Service\ObjectService; use OCA\OpenConnector\Service\SearchService; use OCA\OpenConnector\Service\SynchronizationService; @@ -15,6 +16,8 @@ use OCP\IRequest; use Exception; use OCP\AppFramework\Db\DoesNotExistException; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; class SynchronizationsController extends Controller { @@ -213,33 +216,36 @@ public function logs(int $id): JSONResponse } } - /** - * Tests a synchronization - * - * This method tests a synchronization without persisting anything to the database. - * - * @NoAdminRequired - * @NoCSRFRequired - * - * @param int $id The ID of the synchronization - * - * @return JSONResponse A JSON response containing the test results - * - * @example - * Request: - * empty POST - * - * Response: - * { - * "resultObject": { - * "fullName": "John Doe", - * "userAge": 30, - * "contactEmail": "john@example.com" - * }, - * "isValid": true, - * "validationErrors": [] - * } - */ + /** + * Tests a synchronization + * + * This method tests a synchronization without persisting anything to the database. + * + * @NoAdminRequired + * @NoCSRFRequired + * + * @param int $id The ID of the synchronization + * + * @return JSONResponse A JSON response containing the test results + * @throws GuzzleException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * + * @example + * Request: + * empty POST + * + * Response: + * { + * "resultObject": { + * "fullName": "John Doe", + * "userAge": 30, + * "contactEmail": "john@example.com" + * }, + * "isValid": true, + * "validationErrors": [] + * } + */ public function test(int $id): JSONResponse { try { @@ -250,17 +256,26 @@ public function test(int $id): JSONResponse // Try to synchronize try { - $logAndContractArray = $this->synchronizationService->synchronize(synchronization: $synchronization, isTest: true); + $logAndContractArray = $this->synchronizationService->synchronize( + synchronization: $synchronization, + isTest: true + ); + // Return the result as a JSON response return new JSONResponse(data: $logAndContractArray, statusCode: 200); } catch (Exception $e) { - // If synchronizaiton fails, return an error response - return new JSONResponse([ - 'error' => 'Synchronization error', - 'message' => $e->getMessage() - ], 400); - } + // Check if getHeaders method exists and use it if available + $headers = method_exists($e, 'getHeaders') ? $e->getHeaders() : []; - return new JSONResponse($resultFromTest, 200); + // If synchronization fails, return an error response + return new JSONResponse( + data: [ + 'error' => 'Synchronization error', + 'message' => $e->getMessage() + ], + statusCode: $e->getCode() ?? 400, + headers: $headers + ); + } } -} \ No newline at end of file +} diff --git a/lib/Cron/ActionTask.php b/lib/Cron/ActionTask.php index 0ad5a346..8c515067 100644 --- a/lib/Cron/ActionTask.php +++ b/lib/Cron/ActionTask.php @@ -10,7 +10,9 @@ use OCP\BackgroundJob\IJobList; use OCP\IUserManager; use OCP\IUserSession; +use Psr\Container\ContainerExceptionInterface; use Psr\Container\ContainerInterface; +use Psr\Container\NotFoundExceptionInterface; use Symfony\Component\Uid\Uuid; use DateInterval; use DateTime; @@ -59,6 +61,9 @@ public function __construct( * @param $argument * * @return JobLog|void + * @throws \OCP\DB\Exception + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface */ public function run($argument) { @@ -81,10 +86,22 @@ public function run($argument) // if the next run is in the the future, we don't need to do anything if ($job->getNextRun() !== null && $job->getNextRun() > new DateTime()) { - return; + $jobLog = $this->jobLogMapper->createFromArray([ + 'level' => 'WARNING', + 'message' => 'Next Run is still in the future for this job', + 'jobId' => $job->getId(), + 'jobClass' => $job->getJobClass(), + 'jobListId' => $job->getJobListId(), + 'arguments' => $job->getArguments(), + 'lastRun' => $job->getLastRun(), + 'nextRun' => $job->getNextRun(), + 'executionTime' => 0 + ]); + + return $jobLog; } - if(empty($job->getUserId()) === false && $this->userSession->getUser() === null) { + if (empty($job->getUserId()) === false && $this->userSession->getUser() === null) { $user = $this->userManager->get($job->getUserId()); $this->userSession->setUser($user); } @@ -106,11 +123,17 @@ public function run($argument) $job->setIsEnabled(false); } - // Update the job - $job->setLastRun(new DateTime()); $nextRun = new DateTime('now + '.$job->getInterval().' seconds'); + if (isset($result['nextRun']) === true) { + $nextRun = DateTime::createFromFormat('U', $result['nextRun'], $nextRun->getTimezone()); + // Check if the current seconds part is not zero, and if so, round up to the next minute + if ($nextRun->format('s') !== '00') { + $nextRun->modify('next minute'); + } + } $nextRun->setTime(hour: $nextRun->format('H'), minute: $nextRun->format('i')); + $job->setLastRun(new DateTime()); $job->setNextRun($nextRun); $this->jobMapper->update($job); diff --git a/lib/Db/Synchronization.php b/lib/Db/Synchronization.php index 473524b0..9cc466d9 100644 --- a/lib/Db/Synchronization.php +++ b/lib/Db/Synchronization.php @@ -21,6 +21,7 @@ class Synchronization extends Entity implements JsonSerializable protected ?DateTime $sourceLastChanged = null; // The last changed date of the source object protected ?DateTime $sourceLastChecked = null; // The last checked date of the source object protected ?DateTime $sourceLastSynced = null; // The last synced date of the source object + protected ?int $currentPage = 1; // The last page synced. Used for keeping track where to continue syncing after Rate Limit has been exceeded on source with pagination. // Target protected ?string $targetId = null; // The id of the target object protected ?string $targetType = null; // The type of the target object (e.g. api, database, register/schema.) @@ -51,6 +52,7 @@ public function __construct() { $this->addType('sourceLastChanged', 'datetime'); $this->addType('sourceLastChecked', 'datetime'); $this->addType('sourceLastSynced', 'datetime'); + $this->addType('currentPage', 'integer'); $this->addType('targetId', 'string'); $this->addType('targetType', 'string'); $this->addType('targetHash', 'string'); @@ -126,6 +128,7 @@ public function jsonSerialize(): array 'sourceLastChanged' => isset($this->sourceLastChanged) === true ? $this->sourceLastChanged->format('c') : null, 'sourceLastChecked' => isset($this->sourceLastChecked) === true ? $this->sourceLastChecked->format('c') : null, 'sourceLastSynced' => isset($this->sourceLastSynced) === true ? $this->sourceLastSynced->format('c') : null, + 'currentPage' => $this->currentPage, 'targetId' => $this->targetId, 'targetType' => $this->targetType, 'targetHash' => $this->targetHash, diff --git a/lib/Db/SynchronizationContractLog.php b/lib/Db/SynchronizationContractLog.php index 39df9164..efa213ce 100644 --- a/lib/Db/SynchronizationContractLog.php +++ b/lib/Db/SynchronizationContractLog.php @@ -9,6 +9,7 @@ class SynchronizationContractLog extends Entity implements JsonSerializable { protected ?string $uuid = null; + protected ?string $message = null; protected ?string $synchronizationId = null; protected ?string $synchronizationContractId = null; protected ?array $source = []; @@ -20,6 +21,7 @@ class SynchronizationContractLog extends Entity implements JsonSerializable public function __construct() { $this->addType('uuid', 'string'); + $this->addType('message', 'string'); $this->addType('synchronizationId', 'string'); $this->addType('synchronizationContractId', 'string'); $this->addType('source', 'json'); @@ -65,6 +67,7 @@ public function jsonSerialize(): array return [ 'id' => $this->id, 'uuid' => $this->uuid, + 'message' => $this->message, 'synchronizationId' => $this->synchronizationId, 'synchronizationContractId' => $this->synchronizationContractId, 'source' => $this->source, diff --git a/lib/Migration/Version0Date20240826193657.php b/lib/Migration/Version0Date20240826193657.php index 1306d7fc..373149b5 100644 --- a/lib/Migration/Version0Date20240826193657.php +++ b/lib/Migration/Version0Date20240826193657.php @@ -283,7 +283,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt $table->addIndex(['user_id'], 'openconnector_job_logs_user_id_index'); } - if (!$schema->hasTable('openconnector_source_contract_logs')) { + if (!$schema->hasTable('openconnector_synchronization_contract_logs')) { $table = $schema->createTable('openconnector_synchronization_contract_logs'); $table->addColumn('id', Types::BIGINT, ['autoincrement' => true, 'notnull' => true, 'length' => 20]); $table->addColumn('uuid', Types::STRING, ['notnull' => true, 'length' => 36]); diff --git a/lib/Migration/Version1Date20241121160300.php b/lib/Migration/Version1Date20241121160300.php index daef153b..8ea60305 100644 --- a/lib/Migration/Version1Date20241121160300.php +++ b/lib/Migration/Version1Date20241121160300.php @@ -18,7 +18,7 @@ /** * This migration changes the following: - * - Adding 3 new columns for the table Source: rateLimitLimit, rateLimitRemaining & rateLimitReset + * - Adding 4 new columns for the table Source: rateLimitLimit, rateLimitRemaining, rateLimitReset & rateLimitWindow */ class Version1Date20241121160300 extends SimpleMigrationStep { @@ -41,6 +41,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt * @var ISchemaWrapper $schema */ $schema = $schemaClosure(); + // Sources table $table = $schema->getTable('openconnector_sources'); if ($table->hasColumn('rate_limit_limit') === false) { diff --git a/lib/Migration/Version1Date20241210120155.php b/lib/Migration/Version1Date20241210120155.php new file mode 100644 index 00000000..f4beb3be --- /dev/null +++ b/lib/Migration/Version1Date20241210120155.php @@ -0,0 +1,79 @@ +hasTable('openconnector_synchronizations') === true) { + $table = $schema->getTable('openconnector_synchronizations'); + + if ($table->hasColumn('current_page') === false) { + $table->addColumn('current_page', Types::INTEGER, [ + 'notnull' => false, + 'default' => 1 + ]); + } + } + + // SynchronizationContractLogs table + if ($schema->hasTable('openconnector_synchronization_contract_logs') === true) { + $table = $schema->getTable('openconnector_synchronization_contract_logs'); + + if ($table->hasColumn('message') === false) { + $table->addColumn('message', Types::STRING, [ + 'length' => 255, + 'notnull' => false, + ])->setDefault(null); + } + } + + return $schema; + } + + /** + * @param IOutput $output + * @param Closure(): ISchemaWrapper $schemaClosure + * @param array $options + */ + public function postSchemaChange(IOutput $output, Closure $schemaClosure, array $options): void { + } +} diff --git a/lib/Service/CallService.php b/lib/Service/CallService.php index 8db6434c..24f84186 100644 --- a/lib/Service/CallService.php +++ b/lib/Service/CallService.php @@ -254,7 +254,7 @@ public function call( $body = $response->getBody()->getContents(); - // Let create the data array + // Let's create the data array $data = [ 'request' => [ 'url' => $url, diff --git a/lib/Service/SynchronizationService.php b/lib/Service/SynchronizationService.php index 3f51accd..6e67abd3 100644 --- a/lib/Service/SynchronizationService.php +++ b/lib/Service/SynchronizationService.php @@ -21,6 +21,7 @@ use OCP\AppFramework\Db\MultipleObjectsReturnedException; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; +use Symfony\Component\HttpKernel\Exception\TooManyRequestsHttpException; use Symfony\Component\Uid\Uuid; use OCP\AppFramework\Db\DoesNotExistException; use Adbar\Dot; @@ -92,10 +93,23 @@ public function __construct( public function synchronize(Synchronization $synchronization, ?bool $isTest = false): array { if (empty($synchronization->getSourceId()) === true) { - throw new Exception('sourceId of synchronziation cannot be empty. Canceling synchronization..'); + $log = [ + 'synchronizationId' => $synchronization->getId(), + 'synchronizationContractId' => 0, + 'message' => 'sourceId of synchronization cannot be empty. Canceling synchronization...', + 'created' => new DateTime(), + 'expires' => new DateTime('+1 day') + ]; + + $this->synchronizationContractLogMapper->createFromArray($log); + throw new Exception('sourceId of synchronization cannot be empty. Canceling synchronization...'); } - $objectList = $this->getAllObjectsFromSource(synchronization: $synchronization, isTest: $isTest); + try { + $objectList = $this->getAllObjectsFromSource(synchronization: $synchronization, isTest: $isTest); + } catch (TooManyRequestsHttpException $e) { + $rateLimitException = $e; + } foreach ($objectList as $key => $object) { // If the source configuration contains a dot notation for the id position, we need to extract the id from the source object @@ -146,6 +160,23 @@ public function synchronize(Synchronization $synchronization, ?bool $isTest = fa $this->synchronize($followUpSynchronization, $isTest); } + if (isset($rateLimitException) === true) { + $log = [ + 'synchronizationId' => $synchronization->getId(), + 'synchronizationContractId' => isset($synchronizationContract) === true ? $synchronizationContract->getId() : 0, + 'message' => $rateLimitException->getMessage(), + 'created' => new DateTime(), + 'expires' => new DateTime('+1 day') + ]; + + $this->synchronizationContractLogMapper->createFromArray($log); + throw new TooManyRequestsHttpException( + message: $rateLimitException->getMessage(), + code: 429, + headers: $rateLimitException->getHeaders() + ); + } + return $objectList; } @@ -334,7 +365,7 @@ public function synchronizeContract(SynchronizationContract $synchronizationCont // The object has not changed and the config has not been updated since last check return $synchronizationContract; } - + // The object has changed, oke let do mappig and bla die bla $synchronizationContract->setOriginHash($originHash); $synchronizationContract->setSourceLastChanged(new DateTime()); @@ -499,6 +530,7 @@ public function getAllObjectsFromSource(Synchronization $synchronization, ?bool /** * Retrieves all objects from an API source for a given synchronization. + * @todo re-write this entire function so that it is recursive. * * @param Synchronization $synchronization The synchronization object containing source information. * @param bool $isTest If we only want to return a single object (for example a test) @@ -512,7 +544,17 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is $objects = []; $source = $this->sourceMapper->find(id: $synchronization->getSourceId()); - // Lets get the source config + if ($source->getRateLimitRemaining() !== null && $source->getRateLimitReset() !== null + && $source->getRateLimitRemaining() <= 0 && $source->getRateLimitReset() > time() + ) { + throw new TooManyRequestsHttpException( + message: "Rate Limit on Source has been exceeded. Canceling synchronization...", + code: 429, + headers: $this->getRateLimitHeaders($source) + ); + } + + // Let's get the source config $sourceConfig = $this->callService->applyConfigDot($synchronization->getSourceConfig()); $endpoint = $sourceConfig['endpoint'] ?? ''; $headers = $sourceConfig['headers'] ?? []; @@ -522,13 +564,32 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is 'query' => $query, ]; + // Get CurrentPage from Synchronization. + $currentPage = $synchronization->getCurrentPage() ?? 1; + // Current Page could be higher than 1 if we continue after rate limit had been reached previously. + if ($currentPage > 1) { + $config = $this->getNextPage(config: $config, sourceConfig: $sourceConfig, currentPage: $currentPage); + } + // Make the initial API call // @TODO: method is now fixed to GET, but could end up in configuration. - $response = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config)->getResponse(); + $callLog = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config); + $response = $callLog->getResponse(); + if ($response === null) { + // @todo + if ($callLog->getStatusCode() === 429) { + throw new TooManyRequestsHttpException( + message: "Stopped sync because rate limit on Source has been exceeded.", + code: 429, + headers: $this->getRateLimitHeaders($source) + ); + } + } + $lastHash = md5($response['body']); $body = json_decode($response['body'], true); if (empty($body) === true) { - // @todo log that we got a empty response + // @todo log that we got an empty response return []; } $objects = array_merge($objects, $this->getAllObjectsFromArray(array: $body, synchronization: $synchronization)); @@ -538,29 +599,61 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is return [$objects[0]] ?? []; } - // Current page is 2 because the first call made above is page 1. - $currentPage = 2; + // Increase Current page because of the first call made above. + $currentPage++; + // Update CurrentPage on Synchronization + $synchronization->setCurrentPage($currentPage); + $this->synchronizationMapper->update($synchronization); + $useNextEndpoint = false; if (array_key_exists('next', $body)) { $useNextEndpoint = true; } - // Continue making API calls if there are more pages from 'next' the response body or if paginationQuery is set - while($useNextEndpoint === true && $nextEndpoint = $this->getNextEndpoint(body: $body, url: $source->getLocation(), sourceConfig: $sourceConfig, currentPage: $currentPage)) { - // Do not pass $config here becuase it overwrites the query attached to nextEndpoint - $response = $this->callService->call(source: $source, endpoint: $nextEndpoint)->getResponse(); + while ($useNextEndpoint === true && $nextEndpoint = $this->getNextEndpoint(body: $body, url: $source->getLocation())) { + // Do not pass $config here because it overwrites the query attached to nextEndpoint + $callLog = $this->callService->call(source: $source, endpoint: $nextEndpoint); + $response = $callLog->getResponse(); + if ($response === null) { + // @todo + if ($callLog->getStatusCode() === 429) { + throw new TooManyRequestsHttpException( + message: "Stopped sync because rate limit on Source has been exceeded.", + code: 429, + headers: $this->getRateLimitHeaders($source) + ); + } + } + $body = json_decode($response['body'], true); $objects = array_merge($objects, $this->getAllObjectsFromArray($body, $synchronization)); + + // Increase Current page. And update CurrentPage on Synchronization. + $currentPage++; + $synchronization->setCurrentPage($currentPage); + $this->synchronizationMapper->update($synchronization); } if ($useNextEndpoint === false && $synchronization->usesPagination() === true) { do { $config = $this->getNextPage(config: $config, sourceConfig: $sourceConfig, currentPage: $currentPage); - $response = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config)->getResponse(); + $callLog = $this->callService->call(source: $source, endpoint: $endpoint, method: 'GET', config: $config); + $response = $callLog->getResponse(); + if ($response === null) { + // @todo + if ($callLog->getStatusCode() === 429) { + throw new TooManyRequestsHttpException( + message: "Stopped sync because rate limit on Source has been exceeded.", + code: 429, + headers: $this->getRateLimitHeaders($source) + ); + } + } + $hash = md5($response['body']); - if($hash === $lastHash) { + if ($hash === $lastHash) { break; } @@ -573,24 +666,57 @@ public function getAllObjectsFromApi(Synchronization $synchronization, ?bool $is $newObjects = $this->getAllObjectsFromArray(array: $body, synchronization: $synchronization); $objects = array_merge($objects, $newObjects); + + // Increase Current page. And update CurrentPage on Synchronization. $currentPage++; + $synchronization->setCurrentPage($currentPage); + $this->synchronizationMapper->update($synchronization); } while (empty($newObjects) === false); } + // Reset Current Page to 1 when we are done with all pages. + $synchronization->setCurrentPage(1); + $this->synchronizationMapper->update($synchronization); + return $objects; } + /** + * Retrieves rate limit information from a given source and formats it as HTTP headers. + * + * This function extracts rate limit details from the provided source object and returns them + * as an associative array of headers. The headers can be used for communicating rate limit status + * in API responses or logging purposes. + * + * @param Source $source The source object containing rate limit details, such as limits, remaining requests, and reset times. + * + * @return array An associative array of rate limit headers: + * - 'X-RateLimit-Limit' (int|null): The maximum number of allowed requests. + * - 'X-RateLimit-Remaining' (int|null): The number of requests remaining in the current window. + * - 'X-RateLimit-Reset' (int|null): The Unix timestamp when the rate limit resets. + * - 'X-RateLimit-Used' (int|null): The number of requests used so far. + * - 'X-RateLimit-Window' (int|null): The duration of the rate limit window in seconds. + */ + private function getRateLimitHeaders(Source $source): array + { + return [ + 'X-RateLimit-Limit' => $source->getRateLimitLimit(), + 'X-RateLimit-Remaining' => $source->getRateLimitRemaining(), + 'X-RateLimit-Reset' => $source->getRateLimitReset(), + 'X-RateLimit-Used' => 0, + 'X-RateLimit-Window' => $source->getRateLimitWindow(), + ]; + } + /** * Determines the next API endpoint based on a provided next. * * @param array $body * @param string $url - * @param array $sourceConfig - * @param int $currentPage * * @return string|null The next endpoint URL if a next link or pagination query is available, or null if neither exists. */ - private function getNextEndpoint(array $body, string $url, array $sourceConfig, int $currentPage): ?string + private function getNextEndpoint(array $body, string $url): ?string { $nextLink = $this->getNextlinkFromCall($body);