Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Withinboredom/bot 20 go consumers #14

Merged
merged 7 commits into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Empty file added .gitattributes
Empty file.
1 change: 1 addition & 0 deletions .idea/durable-php.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions .idea/watcherTasks.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,66 @@
FROM golang:1.22-alpine AS cli-base-alpine

SHELL ["/bin/ash", "-eo", "pipefail", "-c"]

RUN apk update; \
apk add --no-cache \
autoconf \
automake \
bash \
binutils \
binutils-gold \
bison \
build-base \
cmake \
composer \
curl \
file \
flex \
g++ \
gcc \
git \
jq \
libgcc \
libstdc++ \
libtool \
linux-headers \
m4 \
make \
pkgconfig \
php83 \
php83-common \
php83-ctype \
php83-curl \
php83-dom \
php83-mbstring \
php83-openssl \
php83-pcntl \
php83-phar \
php83-posix \
php83-session \
php83-sodium \
php83-tokenizer \
php83-xml \
php83-xmlwriter \
upx \
wget \
xz ; \
ln -sf /usr/bin/php83 /usr/bin/php

ENV COMPOSER_ALLOW_SUPERUSER=1

WORKDIR /go/src/app
COPY cli/go.mod cli/go.sum ./
RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get
COPY cli/build.sh .
COPY cli/build-php.sh .

RUN ./build-php.sh

COPY cli/cli.go .
COPY cli/vendor vendor
RUN ./build.sh

FROM php:8-zts AS base

COPY --from=mlocati/php-extension-installer /usr/bin/install-php-extensions /usr/bin/
Expand Down
77 changes: 45 additions & 32 deletions bin/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\Execution;
use Amp\TimeoutCancellation;
use Basis\Nats\AmpClient;
use Bottledcode\DurablePhp\Abstractions\BeanstalkEventSource;
use Bottledcode\DurablePhp\Abstractions\EventHandlerInterface;
use Bottledcode\DurablePhp\Abstractions\EventQueueInterface;
Expand All @@ -40,6 +41,8 @@
use Bottledcode\DurablePhp\Contexts\LoggingContextFactory;
use Bottledcode\DurablePhp\DurableLogger;
use Bottledcode\DurablePhp\Events\Event;
use Bottledcode\DurablePhp\JetStream\Consumer;
use Bottledcode\DurablePhp\Nats\EnvConfiguration;
use Bottledcode\DurablePhp\QueueTask;
use Bottledcode\DurablePhp\State\Serializer;
use Bottledcode\DurablePhp\WorkerTask;
Expand All @@ -54,16 +57,18 @@ class RunCommand extends Command
{
use ProviderTrait;

private ProjectorInterface|null $projector = null;
private Semaphore|null $semaphore = null;
private ?ProjectorInterface $projector = null;

private string|null $namespace = null;
private ?Semaphore $semaphore = null;

private ?string $namespace = null;

private array $beanstalkConnectionParams = [];

private ContextWorkerPool $workerPool;

private int $workerTimeout = 60;

private string $bootstrap;

private array $providers;
Expand All @@ -75,29 +80,30 @@ class RunCommand extends Command
private string $semaphoreProvider;

private int $backpressure = 0;

private int $maxPressure = 0;

private Execution $q;

public function __construct()
{
parent::__construct("run", "Run your application");
$this->option("-b|--bootstrap", "A file to load before execution", default: 'bootstrap.php')
->option("-n|--namespace", "A short name for isolation", default: 'dphp')
->option("--nats", "host:port of a nats server to connect to", default: '127.0.0.1:4222')
->option("--max-workers", "maximum number of workers to run", default: "32")
->option("--execution-timeout", "maximum amount of time allowed to run code", default: '60')
->option("-m|--migrate", "migrate the db", default: true)
parent::__construct('run', 'Run your application');
$this->option('-b|--bootstrap', 'A file to load before execution', default: 'bootstrap.php')
->option('-n|--namespace', 'A short name for isolation', default: 'dphp')
->option('--nats', 'host:port of a nats server to connect to', default: '127.0.0.1:4222')
->option('--max-workers', 'maximum number of workers to run', default: '32')
->option('--execution-timeout', 'maximum amount of time allowed to run code', default: '60')
->option('-m|--migrate', 'migrate the db', default: true)
->option(
"-p|--projector",
"the projector to use",
'-p|--projector',
'the projector to use',
default: RethinkDbProjector::class
)
->option('-l|--distributed-lock', 'The distributed lock implementation to use', default: RethinkDbProjector::class)
->option(
"--monitor",
"what queues to monitor for more fine-grained scaling",
default: "activities,entities,orchestrations"
'--monitor',
'what queues to monitor for more fine-grained scaling',
default: 'activities,entities,orchestrations'
)
->onExit($this->exit(...));

Expand All @@ -108,14 +114,21 @@ public function __construct()
public function execute(
string $bootstrap,
string $namespace,
string $beanstalk,
string $nats,
string $projector,
int $maxWorkers,
int $executionTimeout,
string $distributedLock,
string $monitor,
bool $migrate
): int {

$client = new AmpClient(new EnvConfiguration(), new DurableLogger());
$client->background(true, 1);
$consumer = new Consumer($client, 'test', 'test', true, [], []);
var_dump($consumer->info());
exit();

$this->maxPressure = $maxWorkers * 3;
$this->namespace = $namespace;
$this->workerTimeout = $executionTimeout;
Expand All @@ -126,29 +139,29 @@ public function execute(
$this->configureBeanstalk($host, $port);
assert($this->beanstalkClient !== null);

$this->logger->debug("Connected to beanstalkd");
$this->logger->debug('Connected to beanstalkd');

$projectors = explode('->', $projector);

$this->logger->debug("Configuring projectors and semaphore providers", ['projectors' => $projectors, 'semaphores' => $distributedLock]);
$this->logger->debug('Configuring projectors and semaphore providers', ['projectors' => $projectors, 'semaphores' => $distributedLock]);

$this->providers = $projectors;
$this->semaphoreProvider = $distributedLock;

$this->configureProviders($projectors, $distributedLock, $migrate);

if (str_contains($monitor, 'activities')) {
$this->logger->debug("Subscribing to activity feed...");
$this->logger->debug('Subscribing to activity feed...');
$this->beanstalkClient->subscribe(QueueType::Activities);
}

if (str_contains($monitor, 'entities')) {
$this->logger->debug("Subscribing to entities feed...");
$this->logger->debug('Subscribing to entities feed...');
$this->beanstalkClient->subscribe(QueueType::Entities);
}

if (str_contains($monitor, 'orchestrations')) {
$this->logger->debug("Subscribing to orchestration feed...");
$this->logger->debug('Subscribing to orchestration feed...');
$this->beanstalkClient->subscribe(QueueType::Orchestrations);
}

Expand All @@ -160,11 +173,11 @@ public function execute(

EventLoop::setErrorHandler($this->exit(...));

$this->logger->alert("Ready");
$this->logger->alert('Ready');

/** @var Job $bEvent */
while($bEvent = $this->q->getChannel()->receive()) {
$this->logger->info("Processing event", ['bEventId' => $bEvent->getId()]);
while ($bEvent = $this->q->getChannel()->receive()) {
$this->logger->info('Processing event', ['bEventId' => $bEvent->getId()]);
$event = Serializer::deserialize(json_decode($bEvent->getData(), true), Event::class);
$this->handleEvent($event, $bEvent);
}
Expand All @@ -181,40 +194,40 @@ private function configureBeanstalk(string $host, int $port): void

private function handleEvent(Event $event, JobIdInterface $bEvent): void
{
$this->logger->info("Sending to worker", ['event' => $event, 'bEventId' => $bEvent->getId(),
$this->logger->info('Sending to worker', ['event' => $event, 'bEventId' => $bEvent->getId(),
'idle' => $this->workerPool->getIdleWorkerCount(), 'running' => $this->workerPool->isRunning()]);
$task = new WorkerTask($this->bootstrap, $event, $this->providers, $this->semaphoreProvider);
$execution = $this->workerPool->submit($task, new TimeoutCancellation($this->workerTimeout));
$execution->getFuture()->catch(function ($e) use ($bEvent) {
$this->logger->error("Unable to process job", ['bEventId' => $bEvent->getId(), 'exception' => $e]);
$this->logger->error('Unable to process job', ['bEventId' => $bEvent->getId(), 'exception' => $e]);
$this->q->getChannel()->send(['dead', $bEvent->getId()]);
})->map($this->handleTaskResult(new JobId($bEvent->getId())));
}

private function handleTaskResult(JobIdInterface $bEvent): Closure
{
return function (array|null $result) use ($bEvent) {
return function (?array $result) use ($bEvent) {
// mark event as successful
$this->logger->info("Acknowledge", ['bEventId' => $bEvent->getId(), 'result' => $result]);
$this->logger->info('Acknowledge', ['bEventId' => $bEvent->getId(), 'result' => $result]);
$this->q->getChannel()->send(['ack', $bEvent->getId()]);
//$this->beanstalkClient->ack($bEvent);

$this->logger->info("Firing " . count($result ?? []) . " events");
$this->logger->info('Firing ' . count($result ?? []) . ' events');
// dispatch events
foreach ($result ?? [] as $event) {
$this->beanstalkClient->fire($event);
}
};
}

private function exit(string|Throwable $reason = "exit")
private function exit(string|Throwable $reason = 'exit')
{
if ($this->namespace) {
$this->logger->error("releasing locks", ['reason' => $reason]);
$this->logger->error('releasing locks', ['reason' => $reason]);

EventLoop::queue(function () {
$this->semaphore->signalAll();
$this->logger->critical("Successfully released locks");
$this->logger->critical('Successfully released locks');
exit(1);
});

Expand Down
19 changes: 0 additions & 19 deletions bin/dphp

This file was deleted.

Binary file added bin/dphp-linux-x86_64
Binary file not shown.
2 changes: 2 additions & 0 deletions cli/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/.idea/
/dist/
1 change: 1 addition & 0 deletions cli/.vendor_modified
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1709414729
17 changes: 17 additions & 0 deletions cli/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
TARGET := dphp-linux-x86_64
BIN_PATH := ../bin
DOCKER_IMAGE := builder
DOCKER_TARGET := cli-base-alpine
BUILD_PATH := /go/src/app/dist

${BIN_PATH}/${TARGET}: cli.go go.mod .vendor_modified
cd .. && docker build --pull --target ${DOCKER_TARGET} -t ${DOCKER_IMAGE} .
docker create --name builder builder || ( docker rm -f builder && false )
docker cp ${DOCKER_IMAGE}:${BUILD_PATH}/${TARGET} ${BIN_PATH}/${TARGET} || ( docker rm -f builder && false )
docker rm -f builder

# This will capture the most recently modified time among all files under vendor/ recursively
VENDOR_TIMESTAMP := $(shell find vendor -type f -print0 | xargs -0 stat -c '%Y' | sort -nr | head -n1)

.vendor_modified:
@echo ${VENDOR_TIMESTAMP} > .vendor_modified
Loading