If you control your app lifecycle, it is better to use amqplib directly. Also, don't use a message-queue broker for publish/subscribe.
Yet another opinionated RPC library based on RabbitMQ (through rabbot)
- Promise-based interface (thanks rabbot)
- Convention over configuration in exchange, queue, routingKeys naming
Implemented producer/consumer patterns:
- Request / Response
- Publish / Subscribe
- Send / Receive
npm install --save rabrpc
# or
yarn add rabrpc
Important!
`rpc.configure` should be called after binding handlers via `rpc.respond` in the consumer microservice, and must be called before requesting data with `rpc.request` in the provider microservice.
- `config` - rabrpc or rabbot config object
- `transformConfig` - if `config` is a rabbot settings object, `transformConfig` must be `false`; default is `true`
If you need more flexibility, you can pass a valid rabbot configuration into rpc.configure
The only requirement is to name exchanges, queues and bindings according to the convention.
const config = {
connection: {
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
// server: "127.0.0.1, 194.66.82.11",
// server: ["127.0.0.1", "194.66.82.11"],
port: 5672,
timeout: 2000,
vhost: '%2fmyhost'
},
exchanges: [
{ name: 'config-ex.1', type: 'fanout', publishTimeout: 1000 },
{
name: 'config-ex.2',
type: 'topic',
alternate: 'alternate-ex.2',
persistent: true
},
{ name: 'dead-letter-ex.2', type: 'fanout' }
],
queues: [
{ name: 'config-q.1', limit: 100, queueLimit: 1000 },
{ name: 'config-q.2', subscribe: true, deadLetter: 'dead-letter-ex.2' }
],
bindings: [
{ exchange: 'config-ex.1', target: 'config-q.1', keys: ['bob', 'fred'] },
{ exchange: 'config-ex.2', target: 'config-q.2', keys: 'test1' }
]
}
rpc.configure(config, false) // transform config = false
const rpc = require('rabrpc') // singleton
const config = {
// uri
connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
// or object passed to rabbot see https://github.com/arobson/rabbot#configuration-via-json
// connection: {user: 'guest', pass: 'guest', server: 'localhost', ...}
// respond configuration in consumer microservice
// this config will create exchange(s), queue(s) and binding(s)
// request configuration in provider microservice only creates exchange(s)
res: {
// string or object or array of strings or objects
serviceName: 'foo-service-name',
// rabbot queue options, see https://github.com/arobson/rabbot#addqueue-queuename-options-connectionname-
// subscribe: true is default
messageTtl: 30000,
limit: 10
// ...etc
}
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
const rpc = require('rabrpc') // singleton
const config = {
connection: '<URI string>', // see above
// requesting resource configuration
// this config will create only exchange(s)
// respond configuration in consumer microservice will create queue(s) and binding(s)
// req: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
req: 'foo-service-name'
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
| Parameter | Value | Example |
|---|---|---|
| exchange | `req-res.serviceName` | `req-res.foo-service-name` |
| queue | `req-res.serviceName` | `req-res.foo-service-name` |
| routingKey | `serviceName` | `foo-service-name` |
| messageType | `version.serviceName.action` | `v1.foo-service-name.someAction` |
- `messageType` - full path for the service action, e.g. `'v1.images.resize'` or `'v1.users.role.findAll'`, where the second part (`images`, `users`) is a serviceName specified in config (used in rabbot as the type of message)
- `handler` - function which takes `payload` or `message`, `responseActions` and `messageType`
  - `payload` or `message` - `message` if `raw` is `true`, otherwise `message.body`
  - `responseActions` - object with 3 functions: `success`, `fail`, `error`
  - `messageType` - type of rabbot message (useful when listening for types which contain `*` or `#`)
- `raw` - if `true`, the first argument for `handler` will be the rabbot `message` instead of `message.body` (the default)
const rpc = require('rabrpc')
// before initialization
rpc.respond('v1.foo-service-name.someAction', (payload, actions, messageType) =>
actions.success(payload * 2)
)
// handler can also just return a promise, a `.then`able or a value, and the result will be replied with success status
// exception or rejected promise will cause replying error (be sure throw `Error` with message)
rpc.respond('v1.foo-service-name.anotherAction', payload =>
SomeDB.query({
/* ... */
}).then(rows => ({ count: rows.count, data: rows }))
)
rpc.respond('v1.foo-service-name.thirdAction', payload => payload * 2)
rpc.respond(
'v1.foo-service-name.someResource.*',
(payload, actions, messageType) => {
const [version, serviceName, resource, actionName] = messageType.split('.')
switch (actionName) {
case 'find':
return Resource.findAll(payload)
case 'create':
return Resource.create(payload)
case 'destroy':
return Resource.destroy(payload)
default:
throw new Error(`Action '${actionName}' is not supported!`)
}
}
)
// in your service initialization cycle
rpc.configure(config)
- `messageType` - see the `rpc.respond` messageType argument
- `payload` - payload data, which will be passed into the respond handler (see supported payload)
- `options` - rabbot request options (will be merged with defaults: `{replyTimeout: 10000}`)
- `raw` - resolve with the rabbot reply `message` instead of `message.body`

Returns a `Promise`, which is resolved with `body` (or `message` if `raw` is `true`).

`body` or `message`
const rpc = require('rabrpc')
// request allowed only after initialization
rpc
.configure(config)
.then(() => rpc.request('v1.foo-service-name.someAction', 42))
.then(body => {
console.log('response:', body.data) // body = {status: 'success', data: 84}
})
const rpc = require('rabrpc') // singleton
const config = {
connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
sub: {
// string or object or array of strings or objects
serviceName: 'foo-service-name',
limit: 10
// ...etc
}
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
const rpc = require('rabrpc') // singleton
const config = {
connection: '<URI string>', // see above
// pub: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
pub: 'foo-service-name'
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
| Parameter | Value | Example |
|---|---|---|
| exchange | `pub-sub.serviceName` | `pub-sub.foo-service-name` |
| queue | `pub-sub.serviceName.uuid4` | `pub-sub.foo-service-name.110ec58a-a0f2-4ac4-8393-c866d813b8d1` |
| routingKey | `serviceName` | `foo-service-name` |
| messageType | `version.serviceName.action` | `v1.foo-service-name.someAction` |
- `messageType` - full path for the service action, e.g. `'v1.images.archive'` or `'v1.statistics.synchronize'`, where the second part (`images`, `statistics`) is a serviceName specified in config (used in rabbot as the type of message)
- `handler` - function which takes `payload` or `message`, `actions` and `messageType`
  - `payload` or `message` - `message` if `raw` is `true`, otherwise `message.body`
  - `messageType` - type of rabbot message (useful when listening for types which contain `*` or `#`)
- `raw` - if `true`, the first argument for `handler` will be the rabbot `message` instead of `message.body` (the default)
const rpc = require('rabrpc')
// before initialization
rpc.subscribe(
'v1.foo-service-name.someAction',
(payload, actions, messageType) => {}
)
// always auto ack
// in your service initialization cycle
rpc.configure(config)
- `messageType` - see the `rpc.respond` messageType argument
- `payload` - payload data, which will be passed into the respond handler (see supported payload)
- `options` - rabbot publish options (will be merged with defaults: `{replyTimeout: 10000}`)

Returns the rabbot publish `Promise` (see Rabbot Publish).
const rpc = require('rabrpc')
// publish allowed only after initialization
rpc
.configure(config)
.then(() => rpc.publish('v1.foo-service-name.someAction', 42))
.then(() => {
console.log('Message published')
})
const rpc = require('rabrpc') // singleton
const config = {
connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
recv: {
// string or object or array of strings or objects
serviceName: 'foo-service-name',
messageTtl: 30000,
limit: 10
// ...etc
}
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
const rpc = require('rabrpc') // singleton
const config = {
connection: '<URI string>', // see above
// send: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
send: 'foo-service-name'
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
| Parameter | Value | Example |
|---|---|---|
| exchange | `send-recv.serviceName` | `send-recv.foo-service-name` |
| queue | `send-recv.serviceName` | `send-recv.foo-service-name` |
| routingKey | `serviceName` | `foo-service-name` |
| messageType | `version.serviceName.action` | `v1.foo-service-name.someAction` |
- `messageType` - full path for the service action, e.g. `'v1.images.archive'` or `'v1.statistics.synchronize'`, where the second part (`images`, `statistics`) is a serviceName specified in config (used in rabbot as the type of message)
- `handler` - function which takes `payload`, `actions` and `messageType`
  - `payload` or `message` - `message` if `raw` is `true`, otherwise `message.body`
  - `actions` - object with 3 functions: `ack`, `nack`, `reject` (see Rabbot Message API)
  - `messageType` - type of rabbot message (useful when listening for types which contain `*` or `#`)
- `raw` - if `true`, the first argument for `handler` will be the rabbot `message` instead of `message.body` (the default)
const rpc = require('rabrpc')
// before initialization
rpc.receive('v1.foo-service-name.someAction', (payload, actions, messageType) =>
actions.ack()
)
// handler can also just return a promise, a `.then`able or a value, and the message will be ack'ed on promise resolution
// exception or rejected promise will cause nack'ing message
rpc.receive('v1.foo-service-name.anotherAction', payload =>
SomeDB.query({
/* ... */
})
) // auto ack
rpc.receive(
'v1.foo-service-name.someResource.*',
(payload, actions, messageType) => {
// you can manually ack the message if you don't need the default behaviour
actions.ack() // DOES NOT RETURN A PROMISE
const [version, serviceName, resource, actionName] = messageType.split('.')
switch (actionName) {
case 'find':
return Resource.findAll(payload)
case 'create':
return Resource.create(payload)
case 'destroy':
return Resource.destroy(payload)
default:
throw new Error(`Action '${actionName}' is not supported!`) // will not produce nack call
}
}
)
// in your service initialization cycle
rpc.configure(config)
- `messageType` - see the `rpc.respond` messageType argument
- `payload` - payload data, which will be passed into the respond handler (see supported payload)
- `options` - rabbot publish options (will be merged with defaults: `{replyTimeout: 10000}`)

Returns the rabbot publish `Promise` (see Rabbot Publish).
const rpc = require('rabrpc')
// send allowed only after initialization
rpc
.configure(config)
.then(() => rpc.send('v1.foo-service-name.someAction', 42))
.then(() => {
console.log('Message sent')
})
remove all handlers, unsubscribe from queues
- `string`
- `number`
- `null`
- JSON serializable `Object`
- `Buffer`