How to do Change Stream and Add in Laravel queue #2751
Replies: 1 comment
-
Use Supervisor to keep it running: <?php
namespace App\Console\Commands;
use App\Jobs\ProcessTrending;
use Illuminate\Console\Command;
use MongoDB\Client;
class StartChangeStreamCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'app:start-change-stream-command';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Starts Change Stream';

    /**
     * Execute the console command.
     *
     * Opens a change stream on the "violations" collection and hands every
     * event it yields to handleEvent(). The loop has no exit condition, so
     * this method does not return on its own; run the command under a
     * process monitor that restarts it if it stops.
     */
    public function handle()
    {
        $client = new Client(config('database.connections.mongodb.dsn'));
        $collection = $client->selectCollection(
            config('database.connections.mongodb.database'),
            'violations'
        );

        // Only insert events pass this filter. Widen the $match stage if the
        // delete / replace / update branches in handleEvent() should fire.
        $pipeline = [['$match' => ['operationType' => 'insert']]];
        $options = ['fullDocument' => 'updateLookup'];

        $changeStream = $collection->watch($pipeline, $options);

        for ($changeStream->rewind(); true; $changeStream->next()) {
            // valid() is false when the last poll produced no event; keep
            // iterating so next() can ask the server again.
            if (!$changeStream->valid()) {
                continue;
            }

            $this->handleEvent($changeStream->current());
        }
    }

    /**
     * Report a single change event and queue a job for qualifying inserts.
     *
     * @param array|object $event Change event as returned by the change stream's current();
     *                            read with array access ('ns', 'documentKey', 'operationType', ...).
     */
    private function handleEvent($event): void
    {
        $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']);
        $id = json_encode($event['documentKey']['_id']);

        switch ($event['operationType']) {
            case 'delete':
                $this->info(sprintf('Deleted document in %s with _id: %s', $ns, $id));
                $this->newLine();
                break;

            case 'insert':
                $this->info(sprintf('Inserted new document in %s', $ns));

                // A document without a "type" field counts as "not a Movie"
                // and is queued as well.
                if (($event['fullDocument']['type'] ?? null) !== 'Movie') {
                    // ProcessTrending is the job class imported at the top of
                    // this file. To queue a different job, add a matching
                    // `use App\Jobs\...;` line; an unimported name would
                    // resolve inside App\Console\Commands instead of App\Jobs.
                    ProcessTrending::dispatch($event['fullDocument']);
                }

                $this->line(json_encode($event['fullDocument']));
                $this->newLine();
                break;

            case 'replace':
                $this->info(sprintf('Replaced new document in %s with _id: %s', $ns, $id));
                $this->line(json_encode($event['fullDocument']));
                $this->newLine();
                break;

            case 'update':
                $this->info(sprintf('Updated document in %s with _id: %s', $ns, $id));
                $this->line(json_encode($event['updateDescription']));
                $this->newLine();
                break;
        }
    }
}
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
So I want to watch for changes in my collection and push each one onto a Laravel queue as a retriable job. Can someone share how they achieved this?
I got this far, but where is the best place to add this in Laravel?
https://www.mongodb.com/docs/php-library/current/reference/method/MongoDBChangeStream-current/
Beta Was this translation helpful? Give feedback.
All reactions