diff --git a/Console/CommandConsumerCommand.php b/Console/CommandConsumerCommand.php index e5b910d..0991750 100644 --- a/Console/CommandConsumerCommand.php +++ b/Console/CommandConsumerCommand.php @@ -18,7 +18,8 @@ use Drift\CommandBus\Async\AsyncAdapter; use Drift\CommandBus\Bus\InlineCommandBus; use Drift\Console\OutputPrinter; -use React\EventLoop\LoopInterface; +use Drift\EventBus\Bus\EventBus; +use Drift\EventBus\Subscriber\EventBusSubscriber; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; @@ -31,22 +32,25 @@ class CommandConsumerCommand extends Command { private AsyncAdapter $asyncAdapter; private InlineCommandBus $commandBus; + private ?EventBusSubscriber $eventBusSubscriber; /** * ConsumeCommand constructor. * - * @param AsyncAdapter $asyncAdapter - * @param InlineCommandBus $commandBus - * @param LoopInterface $loop + * @param AsyncAdapter $asyncAdapter + * @param InlineCommandBus $commandBus + * @param EventBusSubscriber|null $eventBusSubscriber */ public function __construct( AsyncAdapter $asyncAdapter, - InlineCommandBus $commandBus + InlineCommandBus $commandBus, + ?EventBusSubscriber $eventBusSubscriber ) { parent::__construct(); $this->asyncAdapter = $asyncAdapter; $this->commandBus = $commandBus; + $this->eventBusSubscriber = $eventBusSubscriber; } /** @@ -62,6 +66,18 @@ protected function configure() 'Number of jobs to handle before dying', 0 ); + + /* + * If we have the EventBus loaded, we can add listeners as well + */ + if (class_exists(EventBus::class)) { + $this->addOption( + 'exchange', + null, + InputOption::VALUE_IS_ARRAY | InputOption::VALUE_REQUIRED, + 'Exchanges to listen' + ); + } } /** @@ -82,6 +98,21 @@ protected function execute(InputInterface $input, OutputInterface $output) (new CommandBusHeaderMessage('', 'Using adapter '.$adapterName))->print($outputPrinter); (new CommandBusHeaderMessage('', 'Started listening...'))->print($outputPrinter); + $exchanges = self::buildQueueArray($input); + if ( + class_exists(EventBusSubscriber::class) && + !empty($exchanges) && + !is_null($this->eventBusSubscriber) + ) { + (new CommandBusHeaderMessage('', 'Kernel connected to exchanges.'))->print($outputPrinter); + $this + ->eventBusSubscriber + ->subscribeToExchanges( + $exchanges, + $outputPrinter + ); + } + $this ->asyncAdapter ->consume( @@ -92,4 +123,26 @@ protected function execute(InputInterface $input, OutputInterface $output) return 0; } + + /** + * Build queue architecture from array of strings. + * + * @param InputInterface $input + * + * @return array + */ + private static function buildQueueArray(InputInterface $input): array + { + if (!$input->hasOption('exchange')) { + return []; + } + + $exchanges = []; + foreach ($input->getOption('exchange') as $exchange) { + $exchangeParts = explode(':', $exchange, 2); + $exchanges[$exchangeParts[0]] = $exchangeParts[1] ?? ''; + } + + return $exchanges; + } } diff --git a/DependencyInjection/CompilerPass/BusCompilerPass.php b/DependencyInjection/CompilerPass/BusCompilerPass.php index 2facfac..be450ec 100644 --- a/DependencyInjection/CompilerPass/BusCompilerPass.php +++ b/DependencyInjection/CompilerPass/BusCompilerPass.php @@ -34,6 +34,7 @@ use Drift\CommandBus\Middleware\AsyncMiddleware; use Drift\CommandBus\Middleware\HandlerMiddleware; use Drift\CommandBus\Middleware\Middleware; +use Drift\EventBus\Subscriber\EventBusSubscriber; use Drift\Postgresql\DependencyInjection\CompilerPass\PostgresqlCompilerPass; use Drift\Redis\DependencyInjection\CompilerPass\RedisCompilerPass; use Exception; @@ -42,6 +43,7 @@ use ReflectionException; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; +use Symfony\Component\DependencyInjection\ContainerInterface; use Symfony\Component\DependencyInjection\Definition; use Symfony\Component\DependencyInjection\Reference; @@ -424,7 +426,7 @@ private static function createCommandConsumer(ContainerBuilder $container) $consumer = new Definition(CommandConsumerCommand::class, [ new Reference(AsyncAdapter::class), new Reference('drift.inline_command_bus'), - new Reference('reactphp.event_loop'), + new Reference(EventBusSubscriber::class, ContainerInterface::NULL_ON_INVALID_REFERENCE), ]); $consumer->addTag('console.command', [