Skip to content

Commit

Permalink
fix stream being stuck forever
Browse files Browse the repository at this point in the history
  • Loading branch information
felixdorn committed Jul 5, 2022
1 parent da71525 commit 7cba036
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 20 deletions.
8 changes: 7 additions & 1 deletion src/TwitterResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

namespace RWC\TwitterStream;

use GuzzleHttp\Psr7\Response;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamInterface;
use RWC\TwitterStream\Exceptions\TwitterException;

class TwitterResponse implements ResponseInterface
{
protected array $payload;
protected array $payload = [];

public function __construct(protected ResponseInterface $response)
{
// The response is a stream.
if ($response->getBody()->getMetadata('wrapper_type') === 'http') {
return;
}

$payload = json_decode($response->getBody()->getContents(), true, flags: JSON_THROW_ON_ERROR);

if (array_key_exists('errors', $payload)) {
Expand Down
14 changes: 7 additions & 7 deletions src/TwitterStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ abstract class TwitterStream

public function listen(Connection $connection, callable $callback): void
{
$this->response = $connection->request('GET', $this->toURL(), ['stream' => true]);
$this->stream = $this->response->getBody();

$parser = new Parser();
$response = $connection->request('GET', $this->toURL(), ['stream' => true]);
$parser = new Parser();
$parser->parseAsObjects(
$this->stream,
function () use ($callback) {
$response->getBody(),
function (object $item) use ($callback) {
$this->received++;

$callback(...)->call($this, ...func_get_args());
dump('here');

$callback(...)->call($this, $item);
}
);
}
Expand Down
21 changes: 9 additions & 12 deletions tinker.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,17 @@

$connection = new Connection($bearerToken);
$rule = new RuleManager($connection);
foreach($rule->all() as $r) {
$rule->delete($r->id);
}
$res = $rule->new('cat videos but no images')
->group(fn (RuleBuilder $builder) => $builder->query('cats'))
->hasVideos()
->query('cat')
->save();

dd($res->getBody()->getContents(), $res->getStatusCode());


//
// $stream
// ->listen(new Connection($bearerToken), function (object $tweet) {
// dump($tweet);
// });
//
// foreach ($twitterStream->filteredTweets() as $tweet) {
// dump($tweet);
// }

$stream
->listen(new Connection($bearerToken), function (object $tweet) {
dump($tweet);
});

0 comments on commit 7cba036

Please sign in to comment.