1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2024-11-17 12:23:08 +00:00

Gateway/Voice: Improved the event loop, fixed race conditions

This commit is contained in:
diamondburned (Forefront) 2020-04-25 00:13:07 -07:00
parent 5305600187
commit 51e88a47b2
5 changed files with 70 additions and 49 deletions

View file

@ -143,6 +143,8 @@ func NewCustomGateway(gatewayURL, token string) *Gateway {
// Close closes the underlying Websocket connection.
func (g *Gateway) Close() error {
wsutil.WSDebug("Trying to close.")
// Check if the WS is already closed:
if g.waitGroup == nil && g.PacerLoop.Stopped() {
wsutil.WSDebug("Gateway is already closed.")
@ -301,26 +303,22 @@ func (g *Gateway) start() error {
// Start the event handler, which also handles the pacemaker death signal.
g.waitGroup.Add(1)
go g.handleWS()
g.PacerLoop.RunAsync(func(err error) {
g.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped with error:", err)
if err != nil {
g.ErrorLog(err)
g.Reconnect()
}
})
wsutil.WSDebug("Started successfully.")
return nil
}
// handleWS uses the Websocket and parses them into g.Events.
func (g *Gateway) handleWS() {
err := g.PacerLoop.Run()
g.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped.")
if err != nil {
g.ErrorLog(err)
g.Reconnect()
// Reconnect should spawn another eventLoop in its Start function.
}
}
func (g *Gateway) Send(code OPCode, v interface{}) error {
var op = wsutil.OP{
Code: code,

View file

@ -31,6 +31,29 @@ func (t *AtomicTime) Time() time.Time {
return time.Unix(0, t.Get())
}
type atomicStop atomic.Value
func (s *atomicStop) Stop() bool {
if v := (*atomic.Value)(s).Load(); v != nil {
ch := v.(chan struct{})
close(ch)
return true
}
return false
}
func (s *atomicStop) Recv() <-chan struct{} {
if v := (*atomic.Value)(s).Load(); v != nil {
return v.(chan struct{})
}
return nil
}
func (s *atomicStop) SetNil() {
(*atomic.Value)(s).Store((chan struct{})(nil))
}
func (s *atomicStop) Reset() {
(*atomic.Value)(s).Store(make(chan struct{}))
}
type Pacemaker struct {
// Heartrate is the received duration between heartbeats.
Heartrate time.Duration
@ -42,7 +65,7 @@ type Pacemaker struct {
// Any callback that returns an error will stop the pacer.
Pace func() error
stop chan struct{}
stop atomicStop
death chan error
}
@ -82,8 +105,7 @@ func (p *Pacemaker) Dead() bool {
}
func (p *Pacemaker) Stop() {
if p.stop != nil {
p.stop <- struct{}{}
if p.stop.Stop() {
Debug("(*Pacemaker).stop was sent a stop signal.")
} else {
Debug("(*Pacemaker).stop is nil, skipping.")
@ -119,7 +141,7 @@ func (p *Pacemaker) start() error {
}
select {
case <-p.stop:
case <-p.stop.Recv():
Debug("Received stop signal.")
return nil
@ -132,7 +154,7 @@ func (p *Pacemaker) start() error {
// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
p.death = make(chan error)
p.stop = make(chan struct{})
p.stop.Reset()
if wg != nil {
wg.Add(1)
@ -143,7 +165,7 @@ func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
// Debug.
Debug("Pacemaker returned.")
// Mark the stop channel as nil, so later Close() calls won't block forever.
p.stop = nil
p.stop.SetNil()
// Mark the pacemaker loop as done.
if wg != nil {

View file

@ -7,7 +7,7 @@ type Bool struct {
}
func (b *Bool) Get() bool {
return atomic.LoadUint32(&b.val) == 1
return atomic.LoadUint32(&b.val) > 0
}
func (b *Bool) Set(val bool) {

View file

@ -4,6 +4,7 @@ import (
"time"
"github.com/diamondburned/arikawa/utils/heart"
"github.com/diamondburned/arikawa/utils/moreatomic"
"github.com/pkg/errors"
)
@ -19,6 +20,8 @@ type PacemakerLoop struct {
pacemaker *heart.Pacemaker // let's not copy this
pacedeath chan error
running moreatomic.Bool
events <-chan Event
handler func(*OP) error
@ -59,27 +62,29 @@ func (p *PacemakerLoop) Stop() {
}
func (p *PacemakerLoop) Stopped() bool {
return p == nil || p.pacedeath == nil
return p == nil || !p.running.Get()
}
func (p *PacemakerLoop) Run() error {
// If the event loop is already running.
if p.pacedeath != nil {
return nil
}
func (p *PacemakerLoop) RunAsync(exit func(error)) {
WSDebug("Starting the pacemaker loop.")
// callers should explicitly handle waitgroups.
p.pacedeath = p.pacemaker.StartAsync(nil)
p.running.Set(true)
defer func() {
// mark pacedeath once done
p.pacedeath = nil
WSDebug("Pacemaker loop has exited.")
go func() {
exit(p.startLoop())
}()
}
func (p *PacemakerLoop) startLoop() error {
defer WSDebug("Pacemaker loop has exited.")
defer p.running.Set(false)
for {
select {
case err := <-p.pacedeath:
// return nil if err == nil
return errors.Wrap(err, "Pacemaker died, reconnecting")
case ev, ok := <-p.events:

View file

@ -168,11 +168,22 @@ func (c *Gateway) __start() error {
return errors.Wrap(err, "Failed to wait for Ready or Resumed")
}
// Create an event loop executor.
c.EventLoop = wsutil.NewLoop(hello.HeartbeatInterval.Duration(), ch, c)
// Start the event handler, which also handles the pacemaker death signal.
c.waitGroup.Add(1)
// Start the websocket handler.
go c.handleWS(wsutil.NewLoop(hello.HeartbeatInterval.Duration(), ch, c))
c.EventLoop.RunAsync(func(err error) {
c.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped.")
if err != nil {
c.ErrorLog(err)
c.Reconnect()
// Reconnect should spawn another eventLoop in its Start function.
}
})
wsutil.WSDebug("Started successfully.")
@ -258,21 +269,6 @@ func (c *Gateway) SessionDescription(sp SelectProtocol) (*SessionDescriptionEven
return sesdesc, nil
}
// handleWS .
func (c *Gateway) handleWS(evl *wsutil.PacemakerLoop) {
c.EventLoop = evl
err := c.EventLoop.Run()
c.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped.")
if err != nil {
c.ErrorLog(err)
c.Reconnect()
// Reconnect should spawn another eventLoop in its Start function.
}
}
// Send .
func (c *Gateway) Send(code OPCode, v interface{}) error {
return c.send(code, v)