Skip to content

Commit

Permalink
Refactor worker messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 23, 2023
1 parent 667cae7 commit bc3d248
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 136 deletions.
11 changes: 10 additions & 1 deletion bin/cluster
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,13 @@ try {

$watcher->start($workers);

$watcher->join();
foreach ($watcher->getMessageIterator() as $message) {
$data = $message->getData();
$id = $message->getWorker()->getId();

if (is_scalar($data) || $data instanceof \Stringable) {
$logger->info(sprintf('Received message from worker %d: %s', $id, $data));
} else {
$logger->notice(sprintf('Received non-printable message from worker %d of type %s', $id, get_debug_type($data)));
}
}
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"amphp/byte-stream": "^2",
"amphp/log": "^2",
"amphp/parallel": "^2.2",
"amphp/pipeline": "^1",
"amphp/pipeline": "dev-merge as 1.1",
"amphp/process": "^2",
"amphp/serialization": "^1",
"amphp/socket": "^2",
Expand Down
18 changes: 11 additions & 7 deletions src/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Amp\ByteStream\ResourceStream;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\Cluster\Internal\ClusterLogHandler;
use Amp\Cluster\Internal\ClusterMessage;
use Amp\Cluster\Internal\ClusterMessageType;
Expand Down Expand Up @@ -37,11 +36,11 @@ public static function isWorker(): bool
}

/**
* @return int Returns the amphp context ID of the execution context.
* @return int<0, max> Returns the context ID of the execution context or 0 if running as an independent script.
*/
public static function getContextId(): int
{
return \defined("AMP_CONTEXT_ID") ? \AMP_CONTEXT_ID : \getmypid();
return self::$cluster?->contextId ?? 0;
}

public static function getServerSocketFactory(): ServerSocketFactory
Expand Down Expand Up @@ -116,9 +115,12 @@ public static function awaitTermination(?Cancellation $cancellation = null): voi
}
}

private static function run(Channel $channel, Socket&ResourceStream $transferSocket): void
/**
* @param positive-int $contextId
*/
private static function run(int $contextId, Channel $channel, Socket&ResourceStream $transferSocket): void
{
self::$cluster = new self($channel, new ClusterServerSocketFactory($transferSocket));
self::$cluster = new self($contextId, $channel, new ClusterServerSocketFactory($transferSocket));
self::$cluster->loop();
}

Expand All @@ -136,9 +138,11 @@ public static function shutdown(): void
private readonly DeferredCancellation $loopCancellation;

/**
* @param int<0, max> $contextId
* @param Channel<ClusterMessage, ClusterMessage> $ipcChannel
*/
private function __construct(
private readonly int $contextId,
private readonly Channel $ipcChannel,
private readonly ClusterServerSocketFactory $serverSocketFactory,
) {
Expand Down Expand Up @@ -197,13 +201,13 @@ private function loop(): void
new ClusterMessage(ClusterMessageType::Pong, $message->data),
),

ClusterMessageType::Data => $this->queue->push($message->data),
ClusterMessageType::Data => $this->queue->pushAsync($message->data)->ignore(),

ClusterMessageType::Pong,
ClusterMessageType::Log => throw new \RuntimeException('Unexpected message type received'),
};
}
} catch (CancelledException|ChannelException) {
} catch (\Throwable) {
// IPC Channel manually closed
} finally {
$this->loopCancellation->cancel();
Expand Down
6 changes: 5 additions & 1 deletion src/ClusterServerSocketFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ public function listen(SocketAddress|string $address, ?BindContext $bindContext
\socket_listen($socket, $context["socket"]["backlog"] ?? 0);

$stream = \socket_export_stream($socket);
\stream_context_set_option($stream, $context);
if (PHP_VERSION_ID >= 80300) {
\stream_context_set_options($stream, $context);
} else {
\stream_context_set_option($stream, $context);
}

return new ResourceServerSocket($stream, $bindContext);
}
Expand Down
1 change: 1 addition & 0 deletions src/Internal/ClusterLogHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Monolog\LogRecord;
use Psr\Log\LogLevel;

/** @internal */
final class ClusterLogHandler extends AbstractProcessingHandler
{
/**
Expand Down
1 change: 1 addition & 0 deletions src/Internal/ClusterMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Cluster\Internal;

/** @internal */
final class ClusterMessage
{
public function __construct(
Expand Down
1 change: 1 addition & 0 deletions src/Internal/ClusterMessageType.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Cluster\Internal;

/** @internal */
enum ClusterMessageType
{
case Ping;
Expand Down
31 changes: 13 additions & 18 deletions src/Internal/ContextWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
use Amp\CancelledException;
use Amp\Cluster\Watcher;
use Amp\Cluster\Worker;
use Amp\Cluster\WorkerMessage;
use Amp\DeferredCancellation;
use Amp\Future;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ProcessContext;
use Amp\Pipeline\Queue;
use Amp\Socket\Socket;
use Amp\Sync\ChannelException;
use Amp\TimeoutCancellation;
Expand All @@ -21,10 +23,10 @@
use function Amp\weakClosure;

/**
* @template TReceive
* @template-covariant TReceive
* @template TSend
*
* @implements Worker<TReceive, TSend>
* @implements Worker<TSend>
*
* @internal
*/
Expand All @@ -34,18 +36,18 @@ final class ContextWorker extends AbstractLogger implements Worker

private int $lastActivity;

/** @var list<\Closure(TReceive):void> */
private array $onMessage = [];

private readonly Future $joinFuture;

/**
* @param positive-int $id
* @param Context<mixed, ClusterMessage|null, ClusterMessage|null> $context
* @param Queue<WorkerMessage<TReceive, TSend>> $queue
*/
public function __construct(
private readonly int $id,
private readonly Context $context,
private readonly Socket $socket,
private readonly Queue $queue,
private readonly DeferredCancellation $deferredCancellation,
private readonly Logger $logger,
) {
Expand All @@ -58,11 +60,6 @@ public function getId(): int
return $this->id;
}

public function onMessage(\Closure $onMessage): void
{
$this->onMessage[] = $onMessage;
}

public function send(mixed $data): void
{
$this->context->send(new ClusterMessage(ClusterMessageType::Data, $data));
Expand Down Expand Up @@ -96,7 +93,9 @@ public function run(): void
match ($message->type) {
ClusterMessageType::Pong => null,

ClusterMessageType::Data => $this->handleMessage($message->data),
ClusterMessageType::Data => $this->queue
->pushAsync(new WorkerMessage($this, $message->data))
->ignore(),

ClusterMessageType::Log => \array_map(
static fn (MonologHandler $handler) => $handler->handle($message->data),
Expand All @@ -108,6 +107,9 @@ public function run(): void
}

$this->joinFuture->await(new TimeoutCancellation(Watcher::WORKER_TIMEOUT));
} catch (\Throwable $exception) {
$this->joinFuture->ignore();
throw $exception;
} finally {
EventLoop::cancel($watcher);
$this->close();
Expand Down Expand Up @@ -152,11 +154,4 @@ public function log($level, $message, array $context = []): void

$this->logger->log($level, $message, $context);
}

private function handleMessage(mixed $data): void
{
foreach ($this->onMessage as $onMessage) {
async($onMessage, $data);
}
}
}
4 changes: 2 additions & 2 deletions src/Internal/cluster-runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

try {
// Read random IPC hub URI and associated key from process channel.
['uri' => $uri, 'key' => $key] = $channel->receive();
['id' => $id, 'uri' => $uri, 'key' => $key] = $channel->receive();

$transferSocket = Ipc\connect($uri, $key, new TimeoutCancellation(Watcher::WORKER_TIMEOUT));
} catch (\Throwable $exception) {
Expand All @@ -51,7 +51,7 @@

/** @psalm-suppress InvalidArgument */
Future\await([
async((static fn () => Cluster::run($channel, $transferSocket))->bindTo(null, Cluster::class)
async((static fn () => Cluster::run($id, $channel, $transferSocket))->bindTo(null, Cluster::class)
?: throw new \RuntimeException('Unable to bind closure')),

/* Protect current scope by requiring script within another function.
Expand Down
Loading

0 comments on commit bc3d248

Please sign in to comment.