Skip to content

Commit

Permalink
Modernize Transports
Browse files Browse the repository at this point in the history
  • Loading branch information
bzikarsky committed Apr 5, 2022
1 parent 89c6906 commit f2625cf
Show file tree
Hide file tree
Showing 23 changed files with 492 additions and 1,178 deletions.
39 changes: 10 additions & 29 deletions src/Gelf/Transport/AbstractTransport.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php
declare(strict_types=1);

/*
* This file is part of the php-gelf package.
Expand All @@ -12,6 +13,7 @@
namespace Gelf\Transport;

use Gelf\Encoder\EncoderInterface;
use Gelf\Encoder\JsonEncoder;
use Gelf\MessageInterface;
use Gelf\PublisherInterface;

Expand All @@ -21,22 +23,19 @@
*
* @author Benjamin Zikarsky <[email protected]>
*/
abstract class AbstractTransport implements TransportInterface, PublisherInterface
abstract class AbstractTransport implements TransportInterface
{
protected EncoderInterface $messageEncoder;

/**
* @var EncoderInterface
*/
protected $messageEncoder;
public function __construct(?EncoderInterface $messageEncoder = null)
{
$this->messageEncoder = $messageEncoder ?? new JsonEncoder();
}

/**
* Sets a message encoder
*
* @param EncoderInterface $encoder
*
* @return $this
*/
public function setMessageEncoder(EncoderInterface $encoder)
public function setMessageEncoder(EncoderInterface $encoder): self
{
$this->messageEncoder = $encoder;

Expand All @@ -45,27 +44,9 @@ public function setMessageEncoder(EncoderInterface $encoder)

/**
* Returns the current message encoder
*
* @return EncoderInterface
*/
public function getMessageEncoder()
public function getMessageEncoder(): EncoderInterface
{
return $this->messageEncoder;
}

/**
* Alias to send() without return value
* Required to fulfill the PublisherInterface
*
* @deprecated deprecated since 1.1
* @codeCoverageIgnore
*
* @param MessageInterface $message
*
* @return int the number of bytes sent
*/
public function publish(MessageInterface $message)
{
return $this->send($message);
}
}
31 changes: 9 additions & 22 deletions src/Gelf/Transport/AmqpTransport.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php
declare(strict_types=1);

/*
* This file is part of the php-gelf package.
Expand All @@ -24,37 +25,23 @@
*/
class AmqpTransport extends AbstractTransport
{
/**
* @var AMQPExchange $exchange
*/
protected $exchange;

/**
* @var AMQPQueue $exchange
*/
protected $queue;

/**
* @param AMQPExchange $exchange
* @param AMQPQueue $queue
*/
public function __construct(AMQPExchange $exchange, AMQPQueue $queue)
{
$this->queue = $queue;
$this->exchange = $exchange;
$this->messageEncoder = new DefaultEncoder();
public function __construct(
private AMQPExchange $exchange,
private AMQPQueue $queue
) {
parent::__construct();
}

/**
* @inheritdoc
*/
public function send(Message $message)
public function send(Message $message): int
{
$rawMessage = $this->getMessageEncoder()->encode($message);

$attributes = array(
$attributes = [
'Content-type' => 'application/json'
);
];

// if queue is durable then mark message as 'persistent'
if (($this->queue->getFlags() & AMQP_DURABLE) > 0) {
Expand Down
143 changes: 34 additions & 109 deletions src/Gelf/Transport/HttpTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,76 +27,29 @@
*/
class HttpTransport extends AbstractTransport
{
const DEFAULT_HOST = "127.0.0.1";
const DEFAULT_PORT = 12202;
const DEFAULT_PATH = "/gelf";

const AUTO_SSL_PORT = 443;

/**
* @var string
*/
protected $host;

/**
* @var int
*/
protected $port;

/**
* @var string
*/
protected $path;

/**
* @var StreamSocketClient
*/
protected $socketClient;

/**
* @var SslOptions|null
*/
protected $sslOptions = null;

/**
* @var string|null
*/
protected $authentication = null;
private const DEFAULT_HOST = "127.0.0.1";
private const DEFAULT_PORT = 12202;
private const DEFAULT_PATH = "/gelf";
private const AUTO_SSL_PORT = 443;

/**
* @var string|null
*/
protected $proxyUri = null;
private StreamSocketClient $socketClient;

/**
* @var bool
*/
protected $requestFullUri = false;
private ?string $authentication = null;
private ?string $proxyUri = null;
private ?bool $requestFullUri = false;

/**
* Class constructor
*
* @param string|null $host when NULL or empty default-host is used
* @param int|null $port when NULL or empty default-port is used
* @param string|null $path when NULL or empty default-path is used
* @param SslOptions|null $sslOptions when null not SSL is used
*/
public function __construct(
$host = self::DEFAULT_HOST,
$port = self::DEFAULT_PORT,
$path = self::DEFAULT_PATH,
SslOptions $sslOptions = null
private string $host = self::DEFAULT_HOST,
private int $port = self::DEFAULT_PORT,
private string $path = self::DEFAULT_PATH,
private ?SslOptions $sslOptions = null
) {
$this->host = $host;
$this->port = $port;
$this->path = $path;
parent::__construct();

if ($port == self::AUTO_SSL_PORT && $sslOptions == null) {
$sslOptions = new SslOptions();
if ($port == self::AUTO_SSL_PORT && $sslOptions === null) {
$this->sslOptions = new SslOptions();
}

$this->sslOptions = $sslOptions;
$this->messageEncoder = new DefaultEncoder();
$this->socketClient = new StreamSocketClient(
$this->getScheme(),
$this->host,
Expand All @@ -113,13 +66,8 @@ public function __construct(
* If a username but no password is given, and empty password is used.
* If a https URI is given, the provided SslOptions (with a fallback to
* the default SslOptions) are used.
*
* @param string $url
* @param SslOptions|null $sslOptions
*
* @return HttpTransport
*/
public static function fromUrl($url, SslOptions $sslOptions = null)
public static function fromUrl(string $url, ?SslOptions $sslOptions = null): static
{
$parsed = parse_url($url);

Expand All @@ -130,12 +78,12 @@ public static function fromUrl($url, SslOptions $sslOptions = null)

// check it's http or https
$scheme = strtolower($parsed['scheme']);
if (!in_array($scheme, array('http', 'https'))) {
if (!in_array($scheme, ['http', 'https'])) {
throw new \InvalidArgumentException("$url is not a valid http/https URL");
}

// setup defaults
$defaults = array('port' => 80, 'path' => '', 'user' => null, 'pass' => '');
$defaults = ['port' => 80, 'path' => '', 'user' => null, 'pass' => ''];

// change some defaults for https
if ($scheme == 'https') {
Expand All @@ -157,22 +105,16 @@ public static function fromUrl($url, SslOptions $sslOptions = null)

/**
* Sets HTTP basic authentication
*
* @param string $username
* @param string $password
*/
public function setAuthentication($username, $password)
public function setAuthentication(string $username, string $password): void
{
$this->authentication = $username . ":" . $password;
}

/**
* Enables HTTP proxy
*
* @param $proxyUri
* @param bool $requestFullUri
*/
public function setProxy($proxyUri, $requestFullUri = false)
public function setProxy(string $proxyUri, bool $requestFullUri = false): void
{
$this->proxyUri = $proxyUri;
$this->requestFullUri = $requestFullUri;
Expand All @@ -181,25 +123,21 @@ public function setProxy($proxyUri, $requestFullUri = false)
}

/**
* Sends a Message over this transport
*
* @param MessageInterface $message
*
* @return int the number of bytes sent
* @inheritDoc
*/
public function send(MessageInterface $message)
public function send(MessageInterface $message): int
{
$messageEncoder = $this->getMessageEncoder();
$rawMessage = $messageEncoder->encode($message);

$request = array(
$request = [
sprintf("POST %s HTTP/1.1", $this->path),
sprintf("Host: %s:%d", $this->host, $this->port),
sprintf("Content-Length: %d", strlen($rawMessage)),
"Content-Type: application/json",
"Connection: Keep-Alive",
"Accept: */*"
);
];

if (null !== $this->authentication) {
$request[] = "Authorization: Basic " . base64_encode($this->authentication);
Expand All @@ -219,7 +157,7 @@ public function send(MessageInterface $message)

// if we don't have a HTTP/1.1 connection, or the server decided to close the connection
// we should do so as well. next read/write-attempt will open a new socket in this case.
if (strpos($headers, "HTTP/1.1") !== 0 || preg_match("!Connection:\s*Close!i", $headers)) {
if (!str_starts_with($headers, "HTTP/1.1") || preg_match("!Connection:\s*Close!i", $headers)) {
$this->socketClient->close();
}

Expand All @@ -235,10 +173,7 @@ public function send(MessageInterface $message)
return $byteCount;
}

/**
* @return string
*/
private function readResponseHeaders()
private function readResponseHeaders(): string
{
$chunkSize = 1024; // number of bytes to read at once
$delimiter = "\r\n\r\n"; // delimiter between headers and response
Expand All @@ -247,58 +182,48 @@ private function readResponseHeaders()
do {
$chunk = $this->socketClient->read($chunkSize);
$response .= $chunk;
} while (false === strpos($chunk, $delimiter) && strlen($chunk) > 0);
} while (!str_contains($chunk, $delimiter) && strlen($chunk) > 0);

$elements = explode($delimiter, $response, 2);

return $elements[0];
}

/**
* @return string
*/
private function getScheme()
private function getScheme(): string
{
return null === $this->sslOptions ? 'tcp' : 'ssl';
}

/**
* @return array
*/
private function getContext()
private function getContext(): array
{
$options = array();
$options = [];

if (null !== $this->sslOptions) {
$options = array_merge($options, $this->sslOptions->toStreamContext($this->host));
}

if (null !== $this->proxyUri) {
$options['http'] = array(
$options['http'] = [
'proxy' => $this->proxyUri,
'request_fulluri' => $this->requestFullUri
);
];
}

return $options;
}

/**
* Sets the connect-timeout
*
* @param int $timeout
*/
public function setConnectTimeout($timeout)
public function setConnectTimeout(int $timeout): void
{
$this->socketClient->setConnectTimeout($timeout);
}

/**
* Returns the connect-timeout
*
* @return int
*/
public function getConnectTimeout()
public function getConnectTimeout(): int
{
return $this->socketClient->getConnectTimeout();
}
Expand Down
Loading

0 comments on commit f2625cf

Please sign in to comment.