-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
adapter.go
119 lines (91 loc) · 2.47 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package socketio
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
)
// AdapterSender is the sending side of an event adapter. RedisAdapter is the
// implementation provided by this package.
type AdapterSender interface {
	// Broadcast sends event with the given payload without addressing a
	// particular user.
	Broadcast(ctx context.Context, event string, data any) error

	// EmitForUser sends event with the given payload addressed to userID.
	EmitForUser(ctx context.Context, userID string, event string, data any) error
}
// AdapterReceiver consumes events that an adapter has received.
type AdapterReceiver interface {
	// ReceivedNew is invoked for every event that arrives.
	//
	// userID may be empty, which means the event is a broadcast.
	//
	// No filtering is applied before the call: broadcast and user-targeted
	// events are both delivered here, regardless of whether the targeted
	// user is connected to this server.
	ReceivedNew(ctx context.Context, userID string, event string, data json.RawMessage)
}
// Compile-time check that a RedisAdapter value satisfies AdapterSender.
var _ AdapterSender = RedisAdapter{}
// PushData is the envelope that RedisAdapter JSON-encodes and publishes for
// every event, and decodes again on the listening side.
//
// The JSON keys are spelled out explicitly; they are identical to the Go
// field names, so the encoded form is the same as without tags.
type PushData struct {
	// UserID is the addressed user; it is empty for broadcast events.
	UserID string `json:"UserID"`
	// Event is the event name.
	Event string `json:"Event"`
	// Data is the already-encoded event payload, carried through verbatim.
	Data json.RawMessage `json:"Data"`
}
// RedisAdapter is an AdapterSender that publishes events on a Redis pub/sub
// channel; its Listen method passes events received on that channel to an
// AdapterReceiver.
type RedisAdapter struct {
// r is the Redis client used for both publishing and subscribing.
r redis.UniversalClient
// recvr is called by Listen for every event it decodes.
recvr AdapterReceiver
// codec encodes event payloads before they are published.
codec DataCodec
// eventsChannel is the name of the Redis channel events are sent over.
eventsChannel string
}
// NewRedisAdapter builds a RedisAdapter and returns it as an AdapterSender.
//
// Defaults: a nil codec is replaced with NewJSONCodec(), and an empty
// eventsChannel with "events:websocket".
//
// When recvr is non-nil, a goroutine running Listen is started so that events
// published on eventsChannel are delivered to recvr. That goroutine is started
// with context.Background() and the error returned by Listen is discarded, so
// the caller can neither stop it nor observe its failure.
//
// When recvr is nil no listener is started and the adapter is send-only.
func NewRedisAdapter(r redis.UniversalClient, recvr AdapterReceiver, codec DataCodec, eventsChannel string) AdapterSender {
	if codec == nil {
		codec = NewJSONCodec()
	}
	if eventsChannel == "" {
		eventsChannel = "events:websocket"
	}

	adapter := RedisAdapter{
		r:             r,
		recvr:         recvr,
		codec:         codec,
		eventsChannel: eventsChannel,
	}

	// A nil receiver cannot accept events, so subscribing would be pointless
	// at best; skip the listener entirely in that case.
	if recvr != nil {
		go adapter.Listen(context.Background())
	}

	return adapter
}
// Broadcast encodes data with the adapter's codec and publishes it under the
// given event name with an empty UserID, which marks the event as a
// broadcast.
//
// It returns an error if the payload cannot be encoded or if publishing
// fails; nothing is published when encoding fails.
func (a RedisAdapter) Broadcast(ctx context.Context, event string, data any) error {
	// MarashalJSON (sic) is the method name used by the codec.
	dataBytes, err := a.codec.MarashalJSON(data)
	if err != nil {
		return fmt.Errorf("broadcast: marshal data: %w", err)
	}

	if err := a.send(ctx, PushData{
		Event: event,
		Data:  dataBytes,
	}); err != nil {
		return fmt.Errorf("broadcast: %w", err)
	}

	return nil
}
// EmitForUser encodes data with the adapter's codec and publishes it under
// the given event name, addressed to userID.
//
// It returns an error if the payload cannot be encoded or if publishing
// fails; nothing is published when encoding fails.
func (a RedisAdapter) EmitForUser(ctx context.Context, userID string, event string, data any) error {
	// MarashalJSON (sic) is the method name used by the codec.
	dataBytes, err := a.codec.MarashalJSON(data)
	if err != nil {
		return fmt.Errorf("emitForUser: marshal data: %w", err)
	}

	if err := a.send(ctx, PushData{
		UserID: userID,
		Event:  event,
		Data:   dataBytes,
	}); err != nil {
		return fmt.Errorf("emitForUser: %w", err)
	}

	return nil
}
// send JSON-encodes data and publishes it on the adapter's events channel.
//
// It returns an error if the envelope cannot be encoded (for example when
// data.Data does not hold valid JSON) or if the Redis publish fails.
func (a RedisAdapter) send(ctx context.Context, data PushData) error {
	bts, err := json.Marshal(data)
	if err != nil {
		// Publishing an empty payload would only produce an undecodable
		// message on the other side, so stop here.
		return fmt.Errorf("send websocket event from adapter: marshal: %w", err)
	}

	if err := a.r.Publish(ctx, a.eventsChannel, bts).Err(); err != nil {
		return fmt.Errorf("send websocket event from adapter: %w", err)
	}

	return nil
}
// Listen subscribes to the adapter's events channel and passes every decoded
// event to the adapter's receiver. It blocks until ctx is cancelled or the
// subscription's message channel is closed.
//
// Returns:
//   - an error, before subscribing, if the adapter has no receiver;
//   - ctx.Err() once ctx is cancelled;
//   - nil when the message channel is closed.
//
// Messages whose payload is not a valid PushData JSON document are skipped.
func (a RedisAdapter) Listen(ctx context.Context) error {
	if a.recvr == nil {
		return fmt.Errorf("listen on %q: receiver is nil", a.eventsChannel)
	}

	s := a.r.Subscribe(ctx, a.eventsChannel)
	defer s.Close()

	evCh := s.Channel()
	for {
		select {
		case <-ctx.Done():
			// Ranging over evCh alone would never notice cancellation;
			// returning here lets the deferred Close release the subscription.
			return ctx.Err()
		case msg, ok := <-evCh:
			if !ok {
				return nil
			}

			var d PushData
			if err := json.Unmarshal([]byte(msg.Payload), &d); err != nil {
				// Best effort: one malformed message must not stop delivery
				// of the following ones.
				continue
			}

			a.recvr.ReceivedNew(ctx, d.UserID, d.Event, d.Data)
		}
	}
}