diff --git a/src/Trace/Reporter/AsyncReporter.php b/src/Trace/Reporter/AsyncReporter.php new file mode 100644 index 000000000000..35d175bb4077 --- /dev/null +++ b/src/Trace/Reporter/AsyncReporter.php @@ -0,0 +1,166 @@ + 1000, + * 'callPeriod' => 2.0, + * 'workerNum' => 2] + * @type array $clientConfig A config to LoggingClient + * {@see \Google\Cloud\Logging\LoggingClient::__construct()} + * **Defaults to** [] + * @type BatchRunner $batchRunner A BatchRunner object. Mainly used for + * the tests to inject a mock. **Defaults to** a newly created + * BatchRunner. + * } + */ + public function __construct(array $options = []) + { + $this->debugOutput = array_key_exists('debugOutput', $options) + ? $options['debugOutput'] + : false; + $this->clientConfig = array_key_exists('clientConfig', $options) + ? $options['clientConfig'] + : []; + $batchOptions = array_key_exists('batchOptions', $options) + ? $options['batchOptions'] + : []; + $this->batchOptions = $batchOptions + [ + 'batchSize' => 1000, + 'callPeriod' => 2.0, + 'workerNum' => 2 + ]; + + $this->batchRunner = array_key_exists('batchRunner', $options) + ? $options['batchRunner'] + : new BatchRunner(); + $this->batchRunner->registerJob( + self::BATCH_RUNNER_JOB_NAME, + [$this, 'sendEntries'], + $this->batchOptions + ); + } + + /** + * Report the provided Trace to a backend. + * + * @param TracerInterface $tracer + * @return bool + */ + public function report(TracerInterface $tracer) + { + $spans = $tracer->spans(); + if (empty($spans)) { + return false; + } + + $entry = [ + 'traceId' => $tracer->context()->traceId(), + 'spans' => $spans + ]; + try { + return $this->batchRunner->submitItem(self::BATCH_RUNNER_JOB_NAME, $entry); + } catch (\Exception $e) { + return false; + } + } + + /** + * BatchRunner callback handler for reporting serialied traces + * + * @param array $entries An array of traces to send. + * @return bool + */ + public function sendEntries(array $entries) + { + $start = microtime(true); + $client = $this->getClient(); + $traces = array_map(function ($entry) use ($client) { + $trace = $client->trace($entry['traceId']); + $trace->setSpans($entry['spans']); + return $trace; + }, $entries); + + try { + $client->insertBatch($traces); + } catch (ServiceException $e) { + fwrite(STDERR, $e->getMessage() . PHP_EOL); + return false; + } + $end = microtime(true); + if ($this->debugOutput) { + printf( + '%f seconds for insertBatch %d entries' . PHP_EOL, + $end - $start, + count($entries) + ); + printf('memory used: %d' . PHP_EOL, memory_get_usage()); + } + return true; + } + + protected function getClient() + { + if (!isset(self::$client)) { + self::$client = new TraceClient($this->clientConfig); + } + return self::$client; + } +} diff --git a/tests/unit/Trace/Reporter/AsyncReporterTest.php b/tests/unit/Trace/Reporter/AsyncReporterTest.php new file mode 100644 index 000000000000..489dd5a3d367 --- /dev/null +++ b/tests/unit/Trace/Reporter/AsyncReporterTest.php @@ -0,0 +1,127 @@ +runner = $this->prophesize(BatchRunner::class); + $this->tracer = $this->prophesize(TracerInterface::class); + + $this->runner->registerJob( + Argument::type('string'), + Argument::type('array'), + Argument::type('array') + )->willReturn(true); + } + + public function testReportsTrace() + { + $spans = [ + new TraceSpan([ + 'name' => 'span', + 'startTime' => microtime(true), + 'endTime' => microtime(true) + 10 + ]) + ]; + $this->tracer->context()->willReturn(new TraceContext('testtraceid')); + $this->tracer->spans()->willReturn($spans); + + $this->runner->submitItem(Argument::type('string'), Argument::type('array')) + ->willReturn(true); + + $reporter = new AsyncReporter([ + 'batchRunner' => $this->runner->reveal() + ]); + $this->assertTrue($reporter->report($this->tracer->reveal())); + } + + public function testSkipsReportingWhenNoSpans() + { + $this->tracer->spans()->willReturn([]); + + $reporter = new AsyncReporter([ + 'batchRunner' => $this->runner->reveal() + ]); + $this->assertFalse($reporter->report($this->tracer->reveal())); + } + + public function testCallback() + { + $client = $this->prophesize(TraceClient::class); + $reporter = new TestAsyncReporter([ + 'batchRunner' => $this->runner->reveal() + ]); + $trace1 = $this->prophesize(Trace::class); + $trace2 = $this->prophesize(Trace::class); + $trace1->setSpans(Argument::any())->shouldBeCalled(); + $trace2->setSpans(Argument::any())->shouldBeCalled(); + + $client->insertBatch([$trace1, $trace2])->willReturn(true); + $client->trace('trace1')->willReturn($trace1)->shouldBeCalled(); + $client->trace('trace2')->willReturn($trace2)->shouldBeCalled(); + + $reporter->setClient($client->reveal()); + $entries = [ + [ + 'traceId' => 'trace1', + 'spans' => [[ + 'name' => 'main', + 'spanId' => '012345', + 'startTime' => '2017-03-28T21:44:10.484299000Z', + 'endTime' => '2017-03-28T21:44:10.625299000Z' + ]] + ], + [ + 'traceId' => 'trace2', + 'spans' => [[ + 'name' => 'main', + 'spanId' => '234567', + 'startTime' => '2017-03-28T21:44:10.484299000Z', + 'endTime' => '2017-03-28T21:44:10.625299000Z' + ]] + ] + ]; + + $reporter->sendEntries($entries); + } +} + +class TestAsyncReporter extends AsyncReporter +{ + public function setClient($client) + { + self::$client = $client; + } +}