diff --git a/composer.json b/composer.json index dd4520ae..85529f54 100644 --- a/composer.json +++ b/composer.json @@ -14,9 +14,12 @@ ], "require": { "php": "^7.3 || ^8.0", - "ext-json": ">=1.3.7", "ext-curl": "*", - "ezimuel/ringphp": "^1.1.2", + "ext-json": ">=1.3.7", + "guzzlehttp/guzzle": "^7.9", + "guzzlehttp/promises": "^2.0", + "psr/http-client": "^1.0", + "psr/http-message": "^1.1 || ^2.0", "psr/log": "^1|^2|^3", "symfony/yaml": "*" }, diff --git a/src/OpenSearch/ClientBuilder.php b/src/OpenSearch/ClientBuilder.php index 20c6b7af..b0e28f13 100644 --- a/src/OpenSearch/ClientBuilder.php +++ b/src/OpenSearch/ClientBuilder.php @@ -38,12 +38,14 @@ use OpenSearch\Namespaces\NamespaceBuilderInterface; use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Serializers\SmartSerializer; -use GuzzleHttp\Ring\Client\CurlHandler; -use GuzzleHttp\Ring\Client\CurlMultiHandler; -use GuzzleHttp\Ring\Client\Middleware; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use ReflectionClass; +use GuzzleHttp\Handler\CurlHandler; +use GuzzleHttp\Handler\CurlMultiHandler; +use GuzzleHttp\Handler\Proxy; +use Psr\Http\Message\RequestInterface; +use GuzzleHttp\Utils; class ClientBuilder { @@ -239,30 +241,30 @@ public static function fromConfig(array $config, bool $quiet = false): Client return $builder->build(); } - /** - * Get the default handler - * - * @param array $multiParams - * @param array $singleParams - * @throws \RuntimeException - */ - public static function defaultHandler(array $multiParams = [], array $singleParams = []): callable - { - $future = null; - if (extension_loaded('curl')) { - $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); - if (function_exists('curl_reset')) { - $default = new CurlHandler($singleParams); - $future = new CurlMultiHandler($config); - } else { - $default = new CurlMultiHandler($config); - } - } else { - throw new \RuntimeException('OpenSearch-PHP requires cURL, or a custom HTTP handler.'); - } - - return $future ? Middleware::wrapFuture($default, $future) : $default; - } + // /** + // * Get the default handler + // * + // * @param array $multiParams + // * @param array $singleParams + // * @throws \RuntimeException + // */ + // public static function defaultHandler(array $multiParams = [], array $singleParams = []): callable + // { + // $future = null; + // if (extension_loaded('curl')) { + // $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); + // if (function_exists('curl_reset')) { + // $default = new CurlHandler($singleParams); + // $future = new CurlMultiHandler($config); + // } else { + // $default = new CurlMultiHandler($config); + // } + // } else { + // throw new \RuntimeException('OpenSearch-PHP requires cURL, or a custom HTTP handler.'); + // } + + // return $future ? Proxy::wrapSync($default, $future) : $default; + // } /** * Get the multi handler for async (CurlMultiHandler) @@ -583,7 +585,7 @@ public function build(): Client $this->buildLoggers(); if (is_null($this->handler)) { - $this->handler = ClientBuilder::defaultHandler(); + $this->handler = Utils::chooseHandler(); // ClientBuilder::defaultHandler(); } if (!is_null($this->sigV4CredentialProvider)) { @@ -610,18 +612,21 @@ public function build(): Client } if (!is_null($sslOptions)) { - $sslHandler = function (callable $handler, array $sslOptions) { - return function (array $request) use ($handler, $sslOptions) { - // Add our custom headers - foreach ($sslOptions as $key => $value) { - $request['client'][$key] = $value; - } - - // Send the request using the handler and return the response. - return $handler($request); - }; - }; - $this->handler = $sslHandler($this->handler, $sslOptions); + // $sslHandler = function (callable $handler, array $sslOptions) { + // return function (RequestInterface $request, array $options) use ($handler, $sslOptions) { + // // Add our custom headers + // foreach ($sslOptions as $key => $value) { + // $request['client'][$key] = $value; + // } + + // // Send the request using the handler and return the response. + // return $handler($request); + // }; + // }; + // $this->handler = $sslHandler($this->handler, $sslOptions); + foreach ($sslOptions as $key => $value) { + $this->connectionParams['client'][$key] = $value; + } } if (is_null($this->serializer)) { diff --git a/src/OpenSearch/Connections/Connection.php b/src/OpenSearch/Connections/Connection.php index 3a67e930..e82b794a 100644 --- a/src/OpenSearch/Connections/Connection.php +++ b/src/OpenSearch/Connections/Connection.php @@ -42,10 +42,9 @@ use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Transport; use Exception; -use GuzzleHttp\Ring\Core; -use GuzzleHttp\Ring\Exception\ConnectException; -use GuzzleHttp\Ring\Exception\RingException; use Psr\Log\LoggerInterface; +use GuzzleHttp\Psr7\Request; +use Psr\Http\Message\RequestInterface; class Connection implements ConnectionInterface { @@ -211,31 +210,34 @@ public function performRequest(string $method, string $uri, ?array $params = [], $host = $this->host; if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) { - $host .= ':' . $this->port; + $host .= ':' . $this->port; } - $request = [ + $request_options = [ 'http_method' => $method, 'scheme' => $this->transportSchema, + 'address' => $this->transportSchema . '://' . $host . ':' . $this->port, 'uri' => $this->getURI($uri, $params), 'body' => $body, 'headers' => array_merge( [ - 'Host' => [$host] + 'Host' => [$host] ], $headers ) ]; - $request = array_replace_recursive($request, $this->connectionParams, $options); + $request_options = array_replace_recursive($request_options, $this->connectionParams, $options); - // RingPHP does not like if client is empty - if (empty($request['client'])) { - unset($request['client']); - } + $request = new Request( + $request_options['http_method'], + $request_options['address'] . $request_options['uri'], + $request_options['headers'], + $request_options['body'] + ); $handler = $this->handler; - $future = $handler($request, $this, $transport, $options); + $future = $handler($request, $this, $transport, $request_options['client']); return $future; } @@ -252,91 +254,88 @@ public function getLastRequestInfo(): array private function wrapHandler(callable $handler): callable { - return function (array $request, Connection $connection, Transport $transport = null, $options) use ($handler) { + return function (RequestInterface $request, Connection $connection, Transport $transport = null, $options) use ($handler) { $this->lastRequest = []; $this->lastRequest['request'] = $request; - // Send the request using the wrapped handler. - $response = Core::proxy( - $handler($request), - function ($response) use ($connection, $transport, $request, $options) { + $response = $handler($request, $options)->then( + function ($response) { $this->lastRequest['response'] = $response; - if (isset($response['error']) === true) { - if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) { - $this->log->warning("Curl exception encountered."); - - $exception = $this->getCurlRetryException($request, $response); - - $this->logRequestFail($request, $response, $exception); - - $node = $connection->getHost(); - $this->log->warning("Marking node $node dead."); - $connection->markDead(); - - // If the transport has not been set, we are inside a Ping or Sniff, - // so we don't want to retrigger retries anyway. - // - // TODO this could be handled better, but we are limited because connectionpools do not - // have access to Transport. Architecturally, all of this needs to be refactored - if (isset($transport) === true) { - $transport->connectionPool->scheduleCheck(); - - $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; - $shouldRetry = $transport->shouldRetry($request); - $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; - - $this->log->warning("Retries left? $shouldRetryText"); - if ($shouldRetry && !$neverRetry) { - return $transport->performRequest( - $request['http_method'], - $request['uri'], - [], - $request['body'], - $options - ); - } - } + // $connection->markAlive(); - $this->log->warning("Out of retries, throwing exception from $node"); - // Only throw if we run out of retries - throw $exception; - } else { - // Something went seriously wrong, bail - $exception = new TransportException($response['error']->getMessage()); - $this->logRequestFail($request, $response, $exception); - throw $exception; - } - } else { - $connection->markAlive(); + if (isset($response->getHeaders()['Warning'])) { + $this->logWarning($request, $response); + } - if (isset($response['headers']['Warning'])) { - $this->logWarning($request, $response); - } - if (isset($response['body']) === true) { - $response['body'] = stream_get_contents($response['body']); - $this->lastRequest['response']['body'] = $response['body']; + if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500) { + $ignore = $request['client']['ignore'] ?? []; + // Skip 404 if succeeded true in the body (e.g. clear_scroll) + $body = $response->getBody()->getContents() ?? ''; + if (strpos($body, '"succeeded":true') !== false) { + $ignore[] = 404; } + $this->process4xxError($request, $response, $ignore); + } elseif ($response->getStatusCode() >= 500) { + $ignore = $request['client']['ignore'] ?? []; + $this->process5xxError($request, $response, $ignore); + } - if ($response['status'] >= 400 && $response['status'] < 500) { - $ignore = $request['client']['ignore'] ?? []; - // Skip 404 if succeeded true in the body (e.g. clear_scroll) - $body = $response['body'] ?? ''; - if (strpos($body, '"succeeded":true') !== false) { - $ignore[] = 404; + // return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; + + // No error, deserialize + $responseBody = $this->serializer->deserialize( + $response->getBody()->getContents(), + $response->getHeaders() // ['transfer_stats'] + ); + + // $this->logRequestSuccess($request, $response); + return $responseBody; + }, function ($error) { + if ($error instanceof ConnectException) { + $this->log->warning("Curl exception encountered."); + + $exception = $this->getCurlRetryException($request, $response); + + // $this->logRequestFail($request, $response, $exception); + + $node = $connection->getHost(); + $this->log->warning("Marking node $node dead."); + $connection->markDead(); + + // If the transport has not been set, we are inside a Ping or Sniff, + // so we don't want to retrigger retries anyway. + // + // TODO this could be handled better, but we are limited because connectionpools do not + // have access to Transport. Architecturally, all of this needs to be refactored + if (isset($transport) === true) { + $transport->connectionPool->scheduleCheck(); + + $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; + $shouldRetry = $transport->shouldRetry($request); + $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; + + $this->log->warning("Retries left? $shouldRetryText"); + if ($shouldRetry && !$neverRetry) { + return $transport->performRequest( + $request['http_method'], + $request['uri'], + [], + $request['body'], + $options + ); } - $this->process4xxError($request, $response, $ignore); - } elseif ($response['status'] >= 500) { - $ignore = $request['client']['ignore'] ?? []; - $this->process5xxError($request, $response, $ignore); } - // No error, deserialize - $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']); + $this->log->warning("Out of retries, throwing exception from $node"); + // Only throw if we run out of retries + throw $exception; + } else { + // Something went seriously wrong, bail + $exception = new TransportException($error->getMessage()); + // $this->logRequestFail($request, $response, $exception); + throw $exception; } - $this->logRequestSuccess($request, $response); - - return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; } ); @@ -375,7 +374,7 @@ public function getHeaders(): array return $this->headers; } - public function logWarning(array $request, array $response): void + public function logWarning(RequestInterface $request, ResponseInterface $response): void { $this->log->warning('Deprecation', $response['headers']['Warning']); } @@ -383,11 +382,11 @@ public function logWarning(array $request, array $response): void /** * Log a successful request * - * @param array $request - * @param array $response + * @param RequestInterface $request + * @param ResponseInterface $response * @return void */ - public function logRequestSuccess(array $request, array $response): void + public function logRequestSuccess(RequestInterface $request, ResponseInterface $response): void { $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; $uri = $this->addPortInUrl($response['effective_url'], (int) $port); @@ -400,7 +399,7 @@ public function logRequestSuccess(array $request, array $response): void 'uri' => $uri, 'port' => $port, 'headers' => $request['headers'], - 'HTTP code' => $response['status'], + 'HTTP code' => $response->getStatusCode(), 'duration' => $response['transfer_stats']['total_time'], ) ); @@ -416,7 +415,7 @@ public function logRequestSuccess(array $request, array $response): void 'method' => $request['http_method'], 'uri' => $uri, 'port' => $port, - 'HTTP code' => $response['status'], + 'HTTP code' => $response->getStatusCode(), 'duration' => $response['transfer_stats']['total_time'], ) ); @@ -431,7 +430,7 @@ public function logRequestSuccess(array $request, array $response): void * * @return void */ - public function logRequestFail(array $request, array $response, \Throwable $exception): void + public function logRequestFail(RequestInterface $request, ResponseInterface $response, \Throwable $exception): void { $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; $uri = $this->addPortInUrl($response['effective_url'], (int) $port); @@ -444,7 +443,7 @@ public function logRequestFail(array $request, array $response, \Throwable $exce 'uri' => $uri, 'port' => $port, 'headers' => $request['headers'], - 'HTTP code' => $response['status'], + 'HTTP code' => $response->getStatusCode(), 'duration' => $response['transfer_stats']['total_time'], 'error' => $exception->getMessage(), ) @@ -461,7 +460,7 @@ public function logRequestFail(array $request, array $response, \Throwable $exce 'method' => $request['http_method'], 'uri' => $uri, 'port' => $port, - 'HTTP code' => $response['status'], + 'HTTP code' => $response->getStatusCode(), 'duration' => $response['transfer_stats']['total_time'], ) ); @@ -485,7 +484,7 @@ public function ping(): bool return false; } - if ($response['status'] === 200) { + if ($response->getStatusCode() === 200) { $this->markAlive(); return true; @@ -563,7 +562,7 @@ public function getPort() return $this->port; } - protected function getCurlRetryException(array $request, array $response): OpenSearchException + protected function getCurlRetryException(RequestInterface $request, ResponseInterface $response): OpenSearchException { $exception = null; $message = $response['error']->getMessage(); @@ -634,16 +633,16 @@ private function buildCurlCommand(string $method, string $url, ?string $body): s /** * @throws OpenSearchException */ - private function process4xxError(array $request, array $response, array $ignore): void + private function process4xxError(RequestInterface $request, ResponseInterface $response, array $ignore): void { - $statusCode = $response['status']; + $statusCode = $response->getStatusCode(); /** * @var \Exception $exception */ $exception = $this->tryDeserialize400Error($response); - if (array_search($response['status'], $ignore) !== false) { + if (array_search($response->getStatusCode(), $ignore) !== false) { return; } @@ -672,9 +671,9 @@ private function process4xxError(array $request, array $response, array $ignore) /** * @throws OpenSearchException */ - private function process5xxError(array $request, array $response, array $ignore): void + private function process5xxError(RequestInterface $request, ResponseInterface $response, array $ignore): void { - $statusCode = (int) $response['status']; + $statusCode = (int) $response->getStatusCode(); $responseBody = $response['body']; /** @@ -742,7 +741,7 @@ private function tryDeserializeError(array $response, string $errorClass): OpenS // <2.0 "i just blew up" nonstructured exception // $error is an array but we don't know the format, reuse the response body instead // added json_encode to convert into a string - return new $errorClass(json_encode($response['body']), (int) $response['status']); + return new $errorClass(json_encode($response['body']), (int) $response->getStatusCode()); } // 2.0 structured exceptions @@ -752,19 +751,19 @@ private function tryDeserializeError(array $response, string $errorClass): OpenS $cause = $info['reason']; $type = $info['type']; // added json_encode to convert into a string - $original = new $errorClass(json_encode($response['body']), $response['status']); + $original = new $errorClass(json_encode($response['body']), $response->getStatusCode()); - return new $errorClass("$type: $cause", (int) $response['status'], $original); + return new $errorClass("$type: $cause", (int) $response->getStatusCode(), $original); } // <2.0 semi-structured exceptions // added json_encode to convert into a string - $original = new $errorClass(json_encode($response['body']), $response['status']); + $original = new $errorClass(json_encode($response['body']), $response->getStatusCode()); $errorEncoded = $error['error']; if (is_array($errorEncoded)) { $errorEncoded = json_encode($errorEncoded); } - return new $errorClass($errorEncoded, (int) $response['status'], $original); + return new $errorClass($errorEncoded, (int) $response->getStatusCode(), $original); } // if responseBody is not string, we convert it so it can be used as Exception message diff --git a/src/OpenSearch/Handlers/SigV4Handler.php b/src/OpenSearch/Handlers/SigV4Handler.php index 41bc8b8c..330aab24 100644 --- a/src/OpenSearch/Handlers/SigV4Handler.php +++ b/src/OpenSearch/Handlers/SigV4Handler.php @@ -46,7 +46,7 @@ public function __construct( ?: CredentialProvider::defaultProvider(); } - public function __invoke(array $request) + public function __invoke(RequestInterface $request) { $creds = call_user_func($this->credentialProvider)->wait(); diff --git a/src/OpenSearch/Namespaces/BooleanRequestWrapper.php b/src/OpenSearch/Namespaces/BooleanRequestWrapper.php index 9c29f656..245777ba 100644 --- a/src/OpenSearch/Namespaces/BooleanRequestWrapper.php +++ b/src/OpenSearch/Namespaces/BooleanRequestWrapper.php @@ -25,7 +25,6 @@ use OpenSearch\Common\Exceptions\RoutingMissingException; use OpenSearch\Endpoints\AbstractEndpoint; use OpenSearch\Transport; -use GuzzleHttp\Ring\Future\FutureArrayInterface; abstract class BooleanRequestWrapper { diff --git a/src/OpenSearch/Transport.php b/src/OpenSearch/Transport.php index 8655ce5e..bb7a8749 100644 --- a/src/OpenSearch/Transport.php +++ b/src/OpenSearch/Transport.php @@ -25,8 +25,8 @@ use OpenSearch\ConnectionPool\AbstractConnectionPool; use OpenSearch\Connections\Connection; use OpenSearch\Connections\ConnectionInterface; -use GuzzleHttp\Ring\Future\FutureArrayInterface; use Psr\Log\LoggerInterface; +use GuzzleHttp\Promise\Promise; class Transport { @@ -96,7 +96,7 @@ public function getConnection(): ConnectionInterface * * @throws Common\Exceptions\NoNodesAvailableException|\Exception */ - public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface + public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): Promise { try { $connection = $this->getConnection(); @@ -118,7 +118,7 @@ public function performRequest(string $method, string $uri, array $params = [], $this ); - $future->promise()->then( + $future->then( //onSuccess function ($response) { $this->retryAttempts = 0; @@ -144,19 +144,19 @@ function ($response) { * * @return callable|array */ - public function resultOrFuture(FutureArrayInterface $result, array $options = []) + public function resultOrFuture(Promise $result, array $options = []) { $response = null; $async = isset($options['client']['future']) ? $options['client']['future'] : null; if (is_null($async) || $async === false) { do { $result = $result->wait(); - } while ($result instanceof FutureArrayInterface); + } while ($result instanceof Promise); } return $result; } - public function shouldRetry(array $request): bool + public function shouldRetry(RequestInterface $request): bool { if ($this->retryAttempts < $this->retries) { $this->retryAttempts += 1; diff --git a/tests/ClientTest.php b/tests/ClientTest.php index 43470607..845e0f61 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -21,8 +21,6 @@ namespace OpenSearch\Tests; -use GuzzleHttp\Ring\Client\MockHandler; -use GuzzleHttp\Ring\Future\FutureArray; use Mockery as m; use OpenSearch; use OpenSearch\Client; diff --git a/tests/Handlers/SigV4HandlerTest.php b/tests/Handlers/SigV4HandlerTest.php index 862043b2..4cc821b8 100644 --- a/tests/Handlers/SigV4HandlerTest.php +++ b/tests/Handlers/SigV4HandlerTest.php @@ -6,7 +6,6 @@ use Aws\Credentials\CredentialProvider; use Aws\Credentials\Credentials; -use GuzzleHttp\Ring\Future\CompletedFutureArray; use OpenSearch\ClientBuilder; use OpenSearch\Handlers\SigV4Handler; use PHPUnit\Framework\TestCase; diff --git a/tests/TransportTest.php b/tests/TransportTest.php index 96469261..613fbac1 100644 --- a/tests/TransportTest.php +++ b/tests/TransportTest.php @@ -26,8 +26,6 @@ use OpenSearch\Connections\Connection; use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Transport; -use GuzzleHttp\Ring\Future\FutureArray; -use GuzzleHttp\Ring\Future\FutureArrayInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface;