Reactive extensions for PHP. The reactive extensions for PHP are a set of libraries to compose asynchronous and event-based programs using observable streams.
$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$source->subscribe(
    function ($x) {
        echo 'Next: ', $x, PHP_EOL;
    },
    function (Exception $ex) {
        echo 'Error: ', $ex->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }
);

//Next: 1
//Next: 2
//Next: 3
//Next: 4
//Completed
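Streams become more useful once operators are chained between the source and the subscriber. The following is a minimal sketch of that composition, not one of the bundled demos. It assumes `reactivex/rxphp` and `react/event-loop` are installed through Composer and that the script sits next to the `vendor` directory. The `$results` array is a hypothetical name introduced only to collect the emitted values.

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use React\EventLoop\Factory;
use Rx\Observable;
use Rx\Scheduler;

// Assumption: the ReactPHP event loop backs the default scheduler.
$loop = Factory::create();
Scheduler::setDefaultFactory(function () use ($loop) {
    return new Scheduler\EventLoopScheduler($loop);
});

$results = []; // hypothetical collector, for illustration only

Observable::fromArray([1, 2, 3, 4])
    ->filter(function ($x) {
        return $x % 2 === 0; // keep even values only
    })
    ->map(function ($x) {
        return $x * 10; // scale each remaining value
    })
    ->subscribe(function ($x) use (&$results) {
        $results[] = $x;
        echo 'Next: ', $x, PHP_EOL;
    });

// Nothing is emitted until the event loop runs.
$loop->run();
```

With the input above, only 2 and 4 pass the filter, so the subscriber receives 20 and 40.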
$ git clone
$ cd RxPHP
$ composer install
$ php demo/interval/interval.php
Have fun running the demos in /demo.
Note: When running the demos, the scheduler is automatically bootstrapped. When using RxPHP within your own project, you'll need to set the default scheduler yourself.
- Install an event loop. Any event loop should work, but the ReactPHP event loop is recommended.
$ composer require react/event-loop
- Install RxPHP using composer.
$ composer require reactivex/rxphp
- Write some code.
require_once __DIR__ . '/vendor/autoload.php';

use Rx\Observable;
use React\EventLoop\Factory;
use Rx\Scheduler;

$loop = Factory::create();

//You only need to set the default scheduler once
Scheduler::setDefaultFactory(function() use($loop){
    return new Scheduler\EventLoopScheduler($loop);
});

// Assumed source: emit an incrementing integer every second, five times
Observable::interval(1000)
    ->take(5)
    ->flatMap(function ($i) {
        return Observable::of($i + 1);
    })
    ->subscribe(function ($e) {
        echo $e, PHP_EOL;
    });

$loop->run(); // start the event loop so the scheduled work runs
Some async PHP frameworks have yet to fully embrace the awesome power of observables. To help ease the transition, RxPHP has built-in support for ReactPHP promises.
Mixing a promise into an observable stream:
Observable::interval(1000)
    ->flatMap(function ($i) {
        return Observable::fromPromise(\React\Promise\resolve(42 + $i));
    })
    ->subscribe(function ($v) {
        echo $v . PHP_EOL;
    });
Converting an observable into a promise (useful for libraries that use generators and coroutines):
$observable = Observable::interval(1000)
    ->take(1); // complete after one value so the promise can resolve
$promise = $observable->toPromise();
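A promise obtained this way is consumed like any other ReactPHP promise. The sketch below is illustrative only. It assumes the Composer setup and default-scheduler bootstrap described earlier, and `$resolved` is a hypothetical variable used to capture the result.

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use React\EventLoop\Factory;
use Rx\Observable;
use Rx\Scheduler;

// Assumption: the ReactPHP event loop backs the default scheduler.
$loop = Factory::create();
Scheduler::setDefaultFactory(function () use ($loop) {
    return new Scheduler\EventLoopScheduler($loop);
});

$resolved = null; // hypothetical holder for the resolved value

// take(1) makes the stream complete, which lets the promise settle.
$promise = Observable::interval(1000)
    ->take(1)
    ->toPromise();

$promise->then(
    function ($value) use (&$resolved) {
        $resolved = $value;
        echo 'Resolved: ', $value, PHP_EOL;
    },
    function (\Throwable $e) {
        echo 'Rejected: ', $e->getMessage(), PHP_EOL;
    }
);

// The promise settles only while the event loop is running.
$loop->run();
```

As far as the library's behavior goes, `toPromise()` settles when the observable completes, so an unbounded `interval` without `take()` would leave the promise pending.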
RxPHP is licensed under the MIT License. See the LICENSE file for details.