Skip to content

Commit

Permalink
Withinboredom/bot 20 go consumers (#14)
Browse files Browse the repository at this point in the history
* start refactoring history

* add go cli

* do not fail

* fix typo

* update cli

* do not track with lfs

* commit actual binary
  • Loading branch information
withinboredom authored Mar 3, 2024
1 parent 5ed0e4b commit e954d9f
Show file tree
Hide file tree
Showing 383 changed files with 139,833 additions and 59 deletions.
Empty file added .gitattributes
Empty file.
1 change: 1 addition & 0 deletions .idea/durable-php.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions .idea/watcherTasks.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,66 @@
FROM golang:1.22-alpine AS cli-base-alpine

SHELL ["/bin/ash", "-eo", "pipefail", "-c"]

RUN apk update; \
apk add --no-cache \
autoconf \
automake \
bash \
binutils \
binutils-gold \
bison \
build-base \
cmake \
composer \
curl \
file \
flex \
g++ \
gcc \
git \
jq \
libgcc \
libstdc++ \
libtool \
linux-headers \
m4 \
make \
pkgconfig \
php83 \
php83-common \
php83-ctype \
php83-curl \
php83-dom \
php83-mbstring \
php83-openssl \
php83-pcntl \
php83-phar \
php83-posix \
php83-session \
php83-sodium \
php83-tokenizer \
php83-xml \
php83-xmlwriter \
upx \
wget \
xz ; \
ln -sf /usr/bin/php83 /usr/bin/php

ENV COMPOSER_ALLOW_SUPERUSER=1

WORKDIR /go/src/app
COPY cli/go.mod cli/go.sum ./
RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get
COPY cli/build.sh .
COPY cli/build-php.sh .

RUN ./build-php.sh

COPY cli/cli.go .
COPY cli/vendor vendor
RUN ./build.sh

FROM php:8-zts AS base

COPY --from=mlocati/php-extension-installer /usr/bin/install-php-extensions /usr/bin/
Expand Down
77 changes: 45 additions & 32 deletions bin/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\Execution;
use Amp\TimeoutCancellation;
use Basis\Nats\AmpClient;
use Bottledcode\DurablePhp\Abstractions\BeanstalkEventSource;
use Bottledcode\DurablePhp\Abstractions\EventHandlerInterface;
use Bottledcode\DurablePhp\Abstractions\EventQueueInterface;
Expand All @@ -40,6 +41,8 @@
use Bottledcode\DurablePhp\Contexts\LoggingContextFactory;
use Bottledcode\DurablePhp\DurableLogger;
use Bottledcode\DurablePhp\Events\Event;
use Bottledcode\DurablePhp\JetStream\Consumer;
use Bottledcode\DurablePhp\Nats\EnvConfiguration;
use Bottledcode\DurablePhp\QueueTask;
use Bottledcode\DurablePhp\State\Serializer;
use Bottledcode\DurablePhp\WorkerTask;
Expand All @@ -54,16 +57,18 @@ class RunCommand extends Command
{
use ProviderTrait;

private ProjectorInterface|null $projector = null;
private Semaphore|null $semaphore = null;
private ?ProjectorInterface $projector = null;

private string|null $namespace = null;
private ?Semaphore $semaphore = null;

private ?string $namespace = null;

private array $beanstalkConnectionParams = [];

private ContextWorkerPool $workerPool;

private int $workerTimeout = 60;

private string $bootstrap;

private array $providers;
Expand All @@ -75,29 +80,30 @@ class RunCommand extends Command
private string $semaphoreProvider;

private int $backpressure = 0;

private int $maxPressure = 0;

private Execution $q;

public function __construct()
{
parent::__construct("run", "Run your application");
$this->option("-b|--bootstrap", "A file to load before execution", default: 'bootstrap.php')
->option("-n|--namespace", "A short name for isolation", default: 'dphp')
->option("--nats", "host:port of a nats server to connect to", default: '127.0.0.1:4222')
->option("--max-workers", "maximum number of workers to run", default: "32")
->option("--execution-timeout", "maximum amount of time allowed to run code", default: '60')
->option("-m|--migrate", "migrate the db", default: true)
parent::__construct('run', 'Run your application');
$this->option('-b|--bootstrap', 'A file to load before execution', default: 'bootstrap.php')
->option('-n|--namespace', 'A short name for isolation', default: 'dphp')
->option('--nats', 'host:port of a nats server to connect to', default: '127.0.0.1:4222')
->option('--max-workers', 'maximum number of workers to run', default: '32')
->option('--execution-timeout', 'maximum amount of time allowed to run code', default: '60')
->option('-m|--migrate', 'migrate the db', default: true)
->option(
"-p|--projector",
"the projector to use",
'-p|--projector',
'the projector to use',
default: RethinkDbProjector::class
)
->option('-l|--distributed-lock', 'The distributed lock implementation to use', default: RethinkDbProjector::class)
->option(
"--monitor",
"what queues to monitor for more fine-grained scaling",
default: "activities,entities,orchestrations"
'--monitor',
'what queues to monitor for more fine-grained scaling',
default: 'activities,entities,orchestrations'
)
->onExit($this->exit(...));

Expand All @@ -108,14 +114,21 @@ public function __construct()
public function execute(
string $bootstrap,
string $namespace,
string $beanstalk,
string $nats,
string $projector,
int $maxWorkers,
int $executionTimeout,
string $distributedLock,
string $monitor,
bool $migrate
): int {

$client = new AmpClient(new EnvConfiguration(), new DurableLogger());
$client->background(true, 1);
$consumer = new Consumer($client, 'test', 'test', true, [], []);
var_dump($consumer->info());
exit();

$this->maxPressure = $maxWorkers * 3;
$this->namespace = $namespace;
$this->workerTimeout = $executionTimeout;
Expand All @@ -126,29 +139,29 @@ public function execute(
$this->configureBeanstalk($host, $port);
assert($this->beanstalkClient !== null);

$this->logger->debug("Connected to beanstalkd");
$this->logger->debug('Connected to beanstalkd');

$projectors = explode('->', $projector);

$this->logger->debug("Configuring projectors and semaphore providers", ['projectors' => $projectors, 'semaphores' => $distributedLock]);
$this->logger->debug('Configuring projectors and semaphore providers', ['projectors' => $projectors, 'semaphores' => $distributedLock]);

$this->providers = $projectors;
$this->semaphoreProvider = $distributedLock;

$this->configureProviders($projectors, $distributedLock, $migrate);

if (str_contains($monitor, 'activities')) {
$this->logger->debug("Subscribing to activity feed...");
$this->logger->debug('Subscribing to activity feed...');
$this->beanstalkClient->subscribe(QueueType::Activities);
}

if (str_contains($monitor, 'entities')) {
$this->logger->debug("Subscribing to entities feed...");
$this->logger->debug('Subscribing to entities feed...');
$this->beanstalkClient->subscribe(QueueType::Entities);
}

if (str_contains($monitor, 'orchestrations')) {
$this->logger->debug("Subscribing to orchestration feed...");
$this->logger->debug('Subscribing to orchestration feed...');
$this->beanstalkClient->subscribe(QueueType::Orchestrations);
}

Expand All @@ -160,11 +173,11 @@ public function execute(

EventLoop::setErrorHandler($this->exit(...));

$this->logger->alert("Ready");
$this->logger->alert('Ready');

/** @var Job $bEvent */
while($bEvent = $this->q->getChannel()->receive()) {
$this->logger->info("Processing event", ['bEventId' => $bEvent->getId()]);
while ($bEvent = $this->q->getChannel()->receive()) {
$this->logger->info('Processing event', ['bEventId' => $bEvent->getId()]);
$event = Serializer::deserialize(json_decode($bEvent->getData(), true), Event::class);
$this->handleEvent($event, $bEvent);
}
Expand All @@ -181,40 +194,40 @@ private function configureBeanstalk(string $host, int $port): void

private function handleEvent(Event $event, JobIdInterface $bEvent): void
{
$this->logger->info("Sending to worker", ['event' => $event, 'bEventId' => $bEvent->getId(),
$this->logger->info('Sending to worker', ['event' => $event, 'bEventId' => $bEvent->getId(),
'idle' => $this->workerPool->getIdleWorkerCount(), 'running' => $this->workerPool->isRunning()]);
$task = new WorkerTask($this->bootstrap, $event, $this->providers, $this->semaphoreProvider);
$execution = $this->workerPool->submit($task, new TimeoutCancellation($this->workerTimeout));
$execution->getFuture()->catch(function ($e) use ($bEvent) {
$this->logger->error("Unable to process job", ['bEventId' => $bEvent->getId(), 'exception' => $e]);
$this->logger->error('Unable to process job', ['bEventId' => $bEvent->getId(), 'exception' => $e]);
$this->q->getChannel()->send(['dead', $bEvent->getId()]);
})->map($this->handleTaskResult(new JobId($bEvent->getId())));
}

private function handleTaskResult(JobIdInterface $bEvent): Closure
{
return function (array|null $result) use ($bEvent) {
return function (?array $result) use ($bEvent) {
// mark event as successful
$this->logger->info("Acknowledge", ['bEventId' => $bEvent->getId(), 'result' => $result]);
$this->logger->info('Acknowledge', ['bEventId' => $bEvent->getId(), 'result' => $result]);
$this->q->getChannel()->send(['ack', $bEvent->getId()]);
//$this->beanstalkClient->ack($bEvent);

$this->logger->info("Firing " . count($result ?? []) . " events");
$this->logger->info('Firing ' . count($result ?? []) . ' events');
// dispatch events
foreach ($result ?? [] as $event) {
$this->beanstalkClient->fire($event);
}
};
}

private function exit(string|Throwable $reason = "exit")
private function exit(string|Throwable $reason = 'exit')
{
if ($this->namespace) {
$this->logger->error("releasing locks", ['reason' => $reason]);
$this->logger->error('releasing locks', ['reason' => $reason]);

EventLoop::queue(function () {
$this->semaphore->signalAll();
$this->logger->critical("Successfully released locks");
$this->logger->critical('Successfully released locks');
exit(1);
});

Expand Down
19 changes: 0 additions & 19 deletions bin/dphp

This file was deleted.

2 changes: 2 additions & 0 deletions cli/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/.idea/
/dist/
1 change: 1 addition & 0 deletions cli/.vendor_modified
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1709414729
17 changes: 17 additions & 0 deletions cli/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
TARGET := dphp-linux-x86_64
BIN_PATH := ../bin
DOCKER_IMAGE := builder
DOCKER_TARGET := cli-base-alpine
BUILD_PATH := /go/src/app/dist

${BIN_PATH}/${TARGET}: cli.go go.mod .vendor_modified
cd .. && docker build --pull --target ${DOCKER_TARGET} -t ${DOCKER_IMAGE} .
docker create --name builder builder || ( docker rm -f builder && false )
docker cp ${DOCKER_IMAGE}:${BUILD_PATH}/${TARGET} ${BIN_PATH}/${TARGET} || ( docker rm -f builder && false )
docker rm -f builder

# This will capture the most recently modified time among all files under vendor/ recursively
VENDOR_TIMESTAMP := $(shell find vendor -type f -print0 | xargs -0 stat -c '%Y' | sort -nr | head -n1)

.vendor_modified:
@echo ${VENDOR_TIMESTAMP} > .vendor_modified
Loading

0 comments on commit e954d9f

Please sign in to comment.