Replies: 3 comments 11 replies
-
You can do distributed synchronization and locked with NATS using the The subject is the "key" that you want to lock, if you succeed to publish on the subject then you have acquired the lock, which you release by removing the message from the stream. If the publish fails then you could not get the lock. This is very similar to what is described in this blog: https://nats.io/blog/new-per-subject-discard-policy/ (basically a variation on the same theme). And yes you could also use the atomic compare and set features of JetStream (eg do an "insert" rather than "upsert") to the same effect. |
Beta Was this translation helpful? Give feedback.
-
the previous comment describes the solution to building a lock with NATS. it is roughly: nats stream add work --defaults --discard-per-subject --subjects='locks.*' --storage=memory --discard=new --max-msgs-per-subject=1
nats req locks.bob red # ok
nats req locks.bob red # will fail
nats stream rmm locks 1 #clear the lock
however, with the help of the maintainers i discovered a much smarter method of ensuring only one worker runs on one item writeup with more context is on my blog https://docs.kraudcloud.com/blog/2023/11/22/exclusive-worker-tokens-with-nats/ the basic idea is that the same DiscardNewPerSubject construct can be used to push a notification and the lock to the workers. nats will hold the item in the queue until the worker acks it, leading to an exclusive lock on the item. package main
import (
"fmt"
"github.com/nats-io/nats.go"
"time"
)
func InitNats() {
nc, err := nats.Connect("localhost")
if err != nil {
panic(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
panic(err)
}
// first create an inbox queue holding the latest state of work to be done
// values in here are replaced when new work on the same topic is submitted
_, err = js.AddStream(&nats.StreamConfig{
Name: "inbox",
Subjects: []string{"inbox.*"},
MaxMsgsPerSubject: 1,
Discard: nats.DiscardNew,
})
if err != nil {
panic(fmt.Sprintf("Error creating jetstream [needs a nats-server with -js] : %v", err))
}
// items are moved from the inbox into a token lock.
// these are held by a worker until its done and only THEN a new value is pulled from the inbox.
// if the worker fails to ack the item, it is resent to a different worker
_, err = js.AddStream(&nats.StreamConfig{
Name: "work",
Sources: []*nats.StreamSource{
{
Name: "inbox",
},
},
MaxMsgsPerSubject: 1,
Discard: nats.DiscardNew,
// this means you cant update a running token
DiscardNewPerSubject: true,
// an ack deletes the message and frees the topic for new work
Retention: nats.WorkQueuePolicy,
})
if err != nil {
panic(fmt.Sprintf("Error creating jetstream [needs a nats-server with -js] : %v", err))
}
// push the token into a delivery group
_, err = js.AddConsumer("work", &nats.ConsumerConfig{
Durable: "work",
DeliverSubject: "work",
DeliverGroup: "workers",
DeliverPolicy: nats.DeliverAllPolicy,
AckPolicy: nats.AckExplicitPolicy,
AckWait: 30 * time.Second,
Heartbeat: time.Second,
})
if err != nil {
panic(fmt.Sprintf("Error creating jetstream consumer : %v", err))
}
ch := make(chan *nats.Msg, 64)
nc.ChanQueueSubscribe("work", "workers", ch)
for msg := range ch {
if len(msg.Reply) == 0 {
// not jetstream, probably keepalive
continue
}
fmt.Println(msg.Reply)
fmt.Printf("Received a message on %s: %s\n", msg.Subject, string(msg.Data))
go func(msg *nats.Msg) {
for i := 0; i < 60; i++ {
fmt.Println("working...")
rsp, err := nc.Request(msg.Reply, []byte("+WPI"), time.Second)
if err != nil {
// lost lock, stop immediately or we risk working in parallel
panic(err)
}
fmt.Println("got in progress response", string(rsp.Data))
time.Sleep(1 * time.Second)
}
fmt.Println("done")
msg.Ack()
}(msg)
}
} |
Beta Was this translation helpful? Give feedback.
-
@aep have you tried this in production? |
Beta Was this translation helpful? Give feedback.
-
with nats having KV now, i wonder if we can just drop redis entirely.
however, we use redis SETNX to build locks, (or leases in k8s terms)
the algo is well described here https://redis.io/docs/manual/patterns/distributed-locks/
and has been battle tested over the years.
has anyone done similar work with nats yet?
I guess we can somehow use Create() and that should be atomic?
but there's no TTL, so its unclear how you would expire the lock if the locker is dead.
Beta Was this translation helpful? Give feedback.
All reactions