-
Notifications
You must be signed in to change notification settings - Fork 2
/
chans.go
89 lines (83 loc) · 1.72 KB
/
chans.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package websocket
import (
"time"
)
type ChannelHandler func(rc <-chan *Message, wc chan<- *Message) error
func reader(wsc *Connection, rc chan *Message, wc chan *Message) {
for {
msg, err := wsc.Recv()
if err != nil {
//wsc.LogError(err.Error())
wsc.LogError("1) %T %v %s", err, err, err.Error())
wsc.LogInfo("closing rc")
close(rc)
wc <- &Message{OPCODE_CLOSE, BuildCloseBodyError(err)}
return
}
switch msg.Opcode {
case OPCODE_PING:
wc <- &Message{OPCODE_PONG, msg.Body}
case OPCODE_PONG:
// okay, ignore it
case OPCODE_CLOSE:
// echo close frame back to client
close(rc)
wc <- msg
return
default:
// pass to handler
rc <- msg
}
}
}
func writer(wsc *Connection, rc chan *Message, wc chan *Message) {
for {
msg, ok := <-wc
if !ok {
wsc.LogError("wc unexpectedly closed")
wsc.Close()
return
}
err := wsc.Send(msg)
if err != nil {
wsc.LogError(err.Error())
break
}
if msg.Opcode == OPCODE_CLOSE {
break
}
}
serverClose := wsc.RcvdClose == nil
t := time.NewTimer(wsc.server.Config.CloseTimeout)
OUT:
for {
select {
case msg := <-wc:
if serverClose {
if wsc.RcvdClose != nil {
break OUT
}
} else {
if msg == nil || msg.Opcode == OPCODE_CLOSE {
break OUT
}
}
case <-t.C:
wsc.LogWarn("timeout while closing connection (serverClose=%s)", serverClose)
break OUT
}
}
wsc.Close()
return
}
func WrapChannelHandler(handler ChannelHandler, l int) HandlerFunc {
return func(wsc *Connection) error {
rc := make(chan *Message, l)
wc := make(chan *Message, l)
go reader(wsc, rc, wc)
go writer(wsc, rc, wc)
err := handler(rc, wc)
wc <- &Message{OPCODE_CLOSE, BuildCloseBodyError(err)}
return err
}
}