Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/gobot update eventer #980

Draft
wants to merge 2 commits into
base: release
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 49 additions & 12 deletions eventer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gobot

import "sync"
import (
"errors"
"sync"
)

type eventChannel chan *Event

Expand All @@ -14,11 +17,19 @@ type eventer struct {
// map of out channels used by subscribers
outs map[eventChannel]eventChannel

// the in/out channel length
bufferSize int

// mutex to protect the eventChannel map
eventsMutex sync.Mutex
}

const eventChanBufferSize = 10
const (
eventChanBufferSize = 10
maxEventChanBufferSize = 10240
maxChanWorkerCount = 128
EventCrash = "crash"
)

// Eventer is the interface which describes how a Driver or Adaptor
// handles events.
Expand Down Expand Up @@ -54,10 +65,19 @@ type Eventer interface {

// NewEventer returns a new Eventer.
func NewEventer() Eventer {
return NewEventerWithBufferSize(eventChanBufferSize)
}

func NewEventerWithBufferSize(bufferSize int) Eventer {
if bufferSize >= maxEventChanBufferSize {
bufferSize = maxEventChanBufferSize
}

evtr := &eventer{
eventnames: make(map[string]string),
in: make(eventChannel, eventChanBufferSize),
in: make(eventChannel, bufferSize),
outs: make(map[eventChannel]eventChannel),
bufferSize: bufferSize,
}

// goroutine to cascade "in" events to all "out" event channels
Expand Down Expand Up @@ -108,7 +128,7 @@ func (e *eventer) Publish(name string, data interface{}) {
func (e *eventer) Subscribe() eventChannel {
e.eventsMutex.Lock()
defer e.eventsMutex.Unlock()
out := make(eventChannel, eventChanBufferSize)
out := make(eventChannel, e.bufferSize)
e.outs[out] = out
return out
}
Expand All @@ -122,17 +142,34 @@ func (e *eventer) Unsubscribe(events eventChannel) {

// On executes the event handler f when e is Published to.
func (e *eventer) On(n string, f func(s interface{})) (err error) {
return e.OnWithParallel(n, 1, f)
}

// Multi goroutines On executes the event handler f when e is Published to.
func (e *eventer) OnWithParallel(n string, workerCnt int, f func(s interface{})) (err error) {
if workerCnt > maxChanWorkerCount {
return errors.New("out of max worker count")
}
out := e.Subscribe()
go func() {
for {
select {
case evt := <-out:
if evt.Name == n {
f(evt.Data)
for i := 0; i < workerCnt; i++ {
go func() {
// 增加goroutine的panic处理,防止因回调f引起panic
defer func() {
if r := recover(); r != nil {
e.Publish(EventCrash, r)
}
}()

for {
select {
case evt := <-out:
if evt.Name == n {
f(evt.Data)
}
}
}
}
}()
}()
}

return
}
Expand Down