Wrapper of rabbitmq/amqp091-go that provides some features.
go get github.com/Almazatun/rmqgo
import (rmqgo "github.com/Almazatun/rmqgo")
rmq := rmqgo.New()
config := rmqgo.ConnectConfig{
User: "user",
Pass: "pass",
Host: "host",
Port: "port",
}
err := rmq.Connect(config)
if err != nil {
// some action
}
With RPC mode for request and replay pattern
rmqgo.New(rmqgo.WithRpc(replayQueueName, exchangeType))
With topic RPC
rmqgo.New(rmqgo.WithTopicRpc(replayQueueName, exchangeType, routingKey))
ch, err := rmq.CreateChannel()
if err != nil {
// some action
}
args := make(map[string]interface{})
q, err := rmq.CreateQueue(rmqgo.CreateQueueConfig{
Name: "some_name",
DeleteUnused: false,
Exclusive: false,
NoWait: false,
Durable: true,
Args: &args,
})
if err != nil {
// some action
}
Exchanges
import (rmqgo "github.com/Almazatun/rmqgo")
rmqgo.Exchanges.Direct()
rmqgo.Exchanges.Topic()
rmqgo.Exchanges.Fanout()
rmqgo.Exchanges.Headers()
Exchange types
import (rmqgo "github.com/Almazatun/rmqgo")
rmqgo.ExchangeType.Direct()
rmqgo.ExchangeType.Topic()
rmqgo.ExchangeType.Fanout()
args := make(map[string]interface{})
err := rmq.CreateExchange(rmqgo.CreateExchangeConfig{
Name: rmqgo.Exchanges.RmqDirect,
Type: rmqgo.ExchangeType.Direct,
Durable: true,
AutoDelete: false,
Internal: false,
NoWait: false,
Args: &args,
})
if err != nil {
// some action
}
args := make(map[string]interface{})
err := rmq.BindQueueByExchange(rmqgo.BindQueueByExgConfig{
QueueName: "some_name",
RoutingKey: "some_key",
ExchangeName: Exchanges.RmqDirect,
NoWait: false,
Args: &args,
})
if err != nil {
// some action
}
producer = rmqgo.NewProducer(&rmq)
err := producer.Send(Exchanges.RmqDirect, routingKey, msg, method)
if err != nil {
// some action
}
b, err := producer.SendReply(Exchanges.RmqDirect, routingKey, msg, method)
if err != nil {
// some action
}
// msg - is your own type SomeName struct { someFields:... }
err = json.Unmarshal(*b, &msg)
if err != nil {
// some action
}
consumer := rmqgo.NewConsumer(
&rmq,
rmqgo.WithConsumerConfig(rmqgo.CreateConsumerConfig{
NameQueue: "some_name",
Consumer: "some_value",
AutoAck: false,
Exclusive: false,
NoWait: false,
NoLocal: false,
}),
)
consumer.Listen()
Consuming messages from queues
// Bytes - <- chan []byte
<- rmq.ReceiveMessages()
consumer.Listen()
With HttpConsumer
rmqgo.NewConsumer(*rmq, rmqgo.WithHttpConsumer())
With Consumer Args
rmqgo.NewConsumer(*rmq, rmqgo.WithConsumerArgs(rmqgo.ConsumerArgs{
XDeadLetterExc *""
XDeadLetterRoutingKey *""
Ttl *int
XExpires *int
XMaxPriority *int
}))