Skip to content

Commit

Permalink
Merge pull request #1 from driftphp/feature/using-console-bridge
Browse files Browse the repository at this point in the history
Using console bridge for messages
  • Loading branch information
mmoreram authored Dec 31, 2019
2 parents 4ec2243 + d831819 commit b16f15c
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 73 deletions.
18 changes: 9 additions & 9 deletions Async/AMQPAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
use Bunny\Message;
use Drift\Bus\Bus\CommandBus;
use Drift\Bus\Exception\InvalidCommandException;
use Drift\Console\OutputPrinter;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Class AMQPAdapter.
Expand Down Expand Up @@ -87,34 +87,34 @@ public function enqueue($command): PromiseInterface
/**
* Consume.
*
* @param CommandBus $bus
* @param int $limit
* @param OutputInterface $output
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
*
* @throws InvalidCommandException
*/
public function consume(
CommandBus $bus,
int $limit,
OutputInterface $output
OutputPrinter $outputPrinter
) {
$this->resetIterations($limit);

$this
->prepare()
->then(function () use ($bus, $output) {
->then(function () use ($bus, $outputPrinter) {
return $this
->channel
->qos(0, 1, true)
->then(function () use ($bus, $output) {
->then(function () use ($bus, $outputPrinter) {
return $this
->channel
->consume(function (Message $message, Channel $channel) use ($bus, $output) {
->consume(function (Message $message, Channel $channel) use ($bus, $outputPrinter) {
return $this
->executeCommand(
$bus,
unserialize($message->content),
$output,
$outputPrinter,
function () use ($message, $channel) {
return $channel->ack($message);
},
Expand Down
70 changes: 34 additions & 36 deletions Async/AsyncAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
namespace Drift\Bus\Async;

use Drift\Bus\Bus\CommandBus;
use Drift\Bus\Console\ConsumerLineMessage;
use Drift\Bus\Exception\InvalidCommandException;
use Drift\Bus\Exception\MissingHandlerException;
use Drift\Console\OutputPrinter;
use Drift\Console\TimeFormatter;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Interface AsyncAdapter.
Expand Down Expand Up @@ -64,43 +66,51 @@ abstract public function enqueue($command): PromiseInterface;
/**
* Consume.
*
* @param CommandBus $bus
* @param int $limit
* @param OutputInterface $output
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
*
* @throws InvalidCommandException
*/
abstract public function consume(
CommandBus $bus,
int $limit,
OutputInterface $output
OutputPrinter $outputPrinter
);

/**
* Execute command.
*
* @param CommandBus $bus
* @param object $command
* @param OutputInterface $output
* @param callable $ok
* @param callable $ko
* @param callable $finish
* @param CommandBus $bus
* @param object $command
* @param OutputPrinter $outputPrinter
* @param callable $ok
* @param callable $ko
* @param callable $finish
*
* @return PromiseInterface
*/
protected function executeCommand(
CommandBus $bus,
$command,
OutputInterface $output,
OutputPrinter $outputPrinter,

callable $ok,
callable $ko,
callable $finish
): PromiseInterface {
$from = microtime(true);

return $bus
->execute($command)
->then(function () use ($output, $command, $ok, $finish) {
$this->printCommandMessage($command, $output, 'consumed');
->then(function () use ($from, $outputPrinter, $command, $ok, $finish) {
$to = microtime(true);

(new ConsumerLineMessage(
$command,
TimeFormatter::formatTime($to - $from),
ConsumerLineMessage::CONSUMED
))->print($outputPrinter);

return (new FulfilledPromise())
->then(function () use ($ok) {
Expand All @@ -119,11 +129,17 @@ protected function executeCommand(

return false;
});
}, function (\Exception $exception) use ($output, $command, $ok, $ko) {
}, function (\Exception $exception) use ($from, $outputPrinter, $command, $ok, $ko) {
$to = microtime(true);
$ignorable = $exception instanceof MissingHandlerException;
$ignorable
? $this->printCommandMessage($command, $output, 'ignored')
: $this->printCommandMessage($command, $output, 'failed');

(new ConsumerLineMessage(
$command,
TimeFormatter::formatTime($to - $from),
$ignorable
? ConsumerLineMessage::IGNORED
: ConsumerLineMessage::REJECTED
))->print($outputPrinter);

return (
$ignorable
Expand Down Expand Up @@ -171,22 +187,4 @@ public function canConsumeAnotherOne(): bool

return true;
}

/**
* Print command consumed.
*
* @param object $command
* @param OutputInterface $output
* @param string $status
*/
private function printCommandMessage(
$command,
OutputInterface $output,
string $status
) {
$commandNamespace = get_class($command);
$commandParts = explode('\\', $commandNamespace);
$commandClass = end($commandParts);
$output->writeln(sprintf('Command <%s> %s', $commandClass, $status));
}
}
12 changes: 6 additions & 6 deletions Async/InMemoryAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

use Drift\Bus\Bus\CommandBus;
use Drift\Bus\Exception\InvalidCommandException;
use Drift\Console\OutputPrinter;
use function Clue\React\Block\await;
use React\EventLoop\LoopInterface;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Class DummyAdapter.
Expand Down Expand Up @@ -69,16 +69,16 @@ public function enqueue($command): PromiseInterface
/**
* Consume.
*
* @param CommandBus $bus
* @param int $limit
* @param OutputInterface $output
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
*
* @throws InvalidCommandException
*/
public function consume(
CommandBus $bus,
int $limit,
OutputInterface $output
OutputPrinter $outputPrinter
) {
$this->resetIterations($limit);

Expand All @@ -87,7 +87,7 @@ public function consume(
->executeCommand(
$bus,
$command,
$output,
$outputPrinter,
function () use ($key) {
unset($this->queue[$key]);
},
Expand Down
15 changes: 8 additions & 7 deletions Async/RedisAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
use Clue\React\Redis\Client;
use Drift\Bus\Bus\CommandBus;
use Drift\Bus\Exception\InvalidCommandException;
use Drift\Console\OutputPrinter;
use function Clue\React\Block\await;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Class RedisAdapter.
Expand Down Expand Up @@ -73,35 +73,36 @@ public function enqueue($command): PromiseInterface
/**
* Consume.
*
* @param CommandBus $bus
* @param int $limit
* @param OutputInterface $output
* @param CommandBus $bus
* @param int $limit
* @param OutputPrinter $outputPrinter
*
* @throws InvalidCommandException
*/
public function consume(
CommandBus $bus,
int $limit,
OutputInterface $output
OutputPrinter $outputPrinter
) {
$this->resetIterations($limit);

while (true) {
$promise = $this
->redis
->blPop($this->key, 0)
->then(function (array $job) use ($bus, $output) {
->then(function (array $job) use ($bus, $outputPrinter) {
return $this->executeCommand(
$bus,
unserialize($job[1]),
$output,
$outputPrinter,
function () {},
function () {},
function () {}
);
});

$wasLastOne = await($promise, $this->loop);

if ($wasLastOne) {
return;
}
Expand Down
7 changes: 6 additions & 1 deletion Console/CommandConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use Drift\Bus\Async\AsyncAdapter;
use Drift\Bus\Bus\CommandBus;
use Drift\Console\OutputPrinter;
use React\EventLoop\LoopInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
Expand Down Expand Up @@ -88,12 +89,16 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$outputPrinter = new OutputPrinter($output);
(new ConsumerHeaderMessage('', 'Consumer built'))->print($outputPrinter);
(new ConsumerHeaderMessage('', 'Started listening...'))->print($outputPrinter);

$this
->asyncAdapter
->consume(
$this->commandBus,
\intval($input->getOption('limit')),
$output
$outputPrinter
);

return 0;
Expand Down
56 changes: 56 additions & 0 deletions Console/ConsumerHeaderMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

/*
* This file is part of the DriftPHP Project
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Feel free to edit as you please, and have fun.
*
* @author Marc Morera <[email protected]>
*/

declare(strict_types=1);

namespace Drift\Bus\Console;

use Drift\Console\OutputPrinter;

/**
* Class ConsumerHeaderMessage.
*/
final class ConsumerHeaderMessage
{
private $elapsedTime;
private $message;

/**
* ConsumerMessage constructor.
*
* @param string $elapsedTime
* @param string $message
*/
public function __construct(
string $elapsedTime,
string $message
) {
$this->elapsedTime = $elapsedTime;
$this->message = $message;
}

/**
* Print.
*
* @param OutputPrinter $outputPrinter
*/
public function print(OutputPrinter $outputPrinter)
{
$color = '32';

$outputPrinter->print("\033[01;{$color}mCONSM\033[0m ");
$outputPrinter->print("(\e[00;37m".$this->elapsedTime.' | '.((int) (memory_get_usage() / 1000000))." MB\e[0m)");
$outputPrinter->print(" {$this->message}");
$outputPrinter->printLine();
}
}
Loading

0 comments on commit b16f15c

Please sign in to comment.