Skip to content

🐰 A wrapper of rabbitmq/amqp091-go that provides some features to use rabbitMQ easily

License

Notifications You must be signed in to change notification settings

Almazatun/rmqgo

Repository files navigation

🐰 rmqgo

Go Version License

Wrapper of rabbitmq/amqp091-go that provides some features.

Installation

go get github.com/Almazatun/rmqgo

Connect to RabbitMQ

import (rmqgo "github.com/Almazatun/rmqgo")

rmq := rmqgo.New()

config := rmqgo.ConnectConfig{
 User: "user",
 Pass: "pass",
 Host: "host",
 Port: "port",
}

err := rmq.Connect(config)

if err != nil {
 // some action
}

Optional parameters when initializing rmqgo.New()

With RPC mode for the request/reply pattern

rmqgo.New(rmqgo.WithRpc(replayQueueName, exchangeType))

With topic RPC

rmqgo.New(rmqgo.WithTopicRpc(replayQueueName, exchangeType, routingKey))

Create channel

ch, err := rmq.CreateChannel()

if err != nil {
 // some action
}

Create queue

args := make(map[string]interface{})

q, err := rmq.CreateQueue(rmqgo.CreateQueueConfig{
 Name:         "some_name",
 DeleteUnused: false,
 Exclusive:    false,
 NoWait:       false,
 Durable:      true,
 Args:         &args,
})

if err != nil {
 // some action
}

Create exchange

Exchanges

import (rmqgo "github.com/Almazatun/rmqgo")

rmqgo.Exchanges.Direct()
rmqgo.Exchanges.Topic()
rmqgo.Exchanges.Fanout()
rmqgo.Exchanges.Headers()

Exchange types

import (rmqgo "github.com/Almazatun/rmqgo")

rmqgo.ExchangeType.Direct()
rmqgo.ExchangeType.Topic()
rmqgo.ExchangeType.Fanout()
args := make(map[string]interface{})

err := rmq.CreateExchange(rmqgo.CreateExchangeConfig{
 Name:       rmqgo.Exchanges.RmqDirect,
 Type:       rmqgo.ExchangeType.Direct,
 Durable:    true,
 AutoDelete: false,
 Internal:   false,
 NoWait:     false,
 Args:       &args,
})

if err != nil {
 // some action
}

Bind the created queue to an exchange

args := make(map[string]interface{})

err := rmq.BindQueueByExchange(rmqgo.BindQueueByExgConfig{
 QueueName:    "some_name",
 RoutingKey:   "some_key",
 ExchangeName: Exchanges.RmqDirect,
 NoWait:       false,
 Args:         &args,
})

if err != nil {
 // some action
}

Create producer

producer = rmqgo.NewProducer(&rmq)

Send message

err := producer.Send(Exchanges.RmqDirect, routingKey, msg, method)

if err != nil {
 // some action
}

Send message with reply

b, err := producer.SendReply(Exchanges.RmqDirect, routingKey, msg, method)

if err != nil {
 // some action
}

// msg is a value of your own type, e.g. type SomeName struct { someFields:... }

err = json.Unmarshal(*b, &msg)

if err != nil {
 // some action
}

Create consumer

consumer := rmqgo.NewConsumer(
  &rmq,
  rmqgo.WithConsumerConfig(rmqgo.CreateConsumerConfig{
   NameQueue: "some_name",
   Consumer:  "some_value",
   AutoAck:   false,
   Exclusive: false,
   NoWait:    false,
   NoLocal:   false,
  }),
 )

consumer.Listen()

Consuming messages from queues

// Bytes - <- chan []byte
<- rmq.ReceiveMessages()
consumer.Listen()

Optional parameters when initializing rmqgo.NewConsumer(...)

With HttpConsumer

rmqgo.NewConsumer(*rmq, rmqgo.WithHttpConsumer())

With Consumer Args

rmqgo.NewConsumer(*rmq, rmqgo.WithConsumerArgs(rmqgo.ConsumerArgs{
 XDeadLetterExc        *""
 XDeadLetterRoutingKey *""
 Ttl                   *int
 XExpires              *int
 XMaxPriority          *int
}))

About

🐰 A wrapper of rabbitmq/amqp091-go that provides some features to use rabbitMQ easily

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published