Skip to content

Commit

Permalink
Merge pull request #15 from ChannexIO/features/CNX-1786_live_feed_pro…
Browse files Browse the repository at this point in the history
…ducer

CNX-1786 Implement logic for creating custom exchange
  • Loading branch information
EevanW authored Dec 11, 2020
2 parents 3d2a584 + 88833f6 commit 2e96b16
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
55 changes: 44 additions & 11 deletions lib/message_queue/adapters/rabbitmq/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,57 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
end
end

defp get_exchange_name(queues, _option) when is_list(queues), do: {:ok, "amq.fanout"}
defp get_exchange_type(queue, options) do
{_exchange_name, exchange_type} = get_exchange(queue, options)
{:ok, exchange_type}
end

defp get_exchange_name(queue, options) do
{exchange_name, _exchange_type} = get_exchange(queue, options)
{:ok, exchange_name}
end

defp get_exchange_name("" = _queue, options) do
if match?([_ | _], options[:headers]), do: {:ok, "amq.headers"}, else: {:ok, "amq.direct"}
defp get_exchange(queues, options) when is_list(queues) do
exchange_type = options[:exchange_type] || :fanout
exchange_name = options[:exchange] || "amq.#{exchange_type}"
{exchange_name, exchange_type}
end

defp get_exchange_name(_queue, _options), do: {:ok, "amq.direct"}
defp get_exchange("" = _queue, options) do
default_type = if match?([_ | _], options[:headers]), do: :headers, else: :direct
exchange_type = options[:exchange_type] || default_type
exchange_name = options[:exchange] || "amq.#{exchange_type}"
{exchange_name, exchange_type}
end

defp get_exchange(_queue, options) do
exchange_type = options[:exchange_type] || :direct
exchange_name = options[:exchange] || "amq.#{exchange_type}"
{exchange_name, exchange_type}
end

defp declare_and_bind_queue("amq.headers", _channel, _queue, _options), do: {:ok, ""}
defp declare_and_bind_queue("amq.headers", channel, _queue, _options) do
{:ok, %{routing_key: "", channel: channel}}
end

defp declare_and_bind_queue(exchange, channel, queues, options) do
if match?([_ | _], options[:headers]) do
{:ok, %{routing_key: "", channel: channel}}
else
declare_and_bind(exchange, channel, queues, options)
end
end

defp declare_and_bind_queue(exchange, channel, queues, options) when is_list(queues) do
Enum.reduce_while(queues, {:ok, ""}, fn queue, acc ->
case declare_and_bind_queue(exchange, channel, queue, options) do
{:ok, _} -> {:cont, acc}
defp declare_and_bind(exchange, channel, queues, options) when is_list(queues) do
Enum.reduce_while(queues, {:ok, %{routing_key: "", channel: channel}}, fn queue, acc ->
case declare_and_bind(exchange, channel, queue, options) do
{:ok, _} = result -> {:cont, result}
error -> {:halt, error}
end
end)
end

defp declare_and_bind_queue(exchange, channel, queue, options) do
defp declare_and_bind(exchange, channel, queue, options) do
options = Keyword.put_new(options, :durable, true)

with {:ok, %{queue: queue}} <- Queue.declare(channel, queue, [{:passive, true} | options]),
Expand All @@ -116,7 +147,9 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
defp redeclare_and_bind_queue(exchange, channel, queue, options) do
case Channel.open(channel.conn) do
{:ok, channel} ->
with {:ok, %{queue: queue}} <- Queue.declare(channel, queue, options),
with {:ok, exchange_type} <- get_exchange_type(queue, options),
:ok <- Exchange.declare(channel, exchange, exchange_type, options),
{:ok, %{queue: queue}} <- Queue.declare(channel, queue, options),
routing_key <- Keyword.get(options, :routing_key, queue),
:ok <- Queue.bind(channel, queue, exchange, routing_key: routing_key) do
{:ok, %{routing_key: routing_key, channel: channel}}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule MessageQueue.MixProject do
use Mix.Project

@name "MessageQueue"
@version "0.1.8"
@version "0.1.9"
@repo_url "https://github.com/ChannexIO/message_queue"

def project do
Expand Down

0 comments on commit 2e96b16

Please sign in to comment.