From c0c17085baba27c8581707ecb5ac6a0693c69e44 Mon Sep 17 00:00:00 2001 From: "diamondburned (Forefront)" Date: Thu, 23 Apr 2020 20:32:48 -0700 Subject: [PATCH] Heart: Migrated Voice to PacemakerLoop --- utils/heart/heart.go | 245 +++++++++++++++++++++++++++++++++++++++++++ voice/connection.go | 95 ++++++++++------- voice/op.go | 22 +++- 3 files changed, 322 insertions(+), 40 deletions(-) create mode 100644 utils/heart/heart.go diff --git a/utils/heart/heart.go b/utils/heart/heart.go new file mode 100644 index 0000000..c1f91ae --- /dev/null +++ b/utils/heart/heart.go @@ -0,0 +1,245 @@ +// Package heart implements a general purpose pacemaker. +package heart + +import ( + "log" + "sync" + "sync/atomic" + "time" + + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/pkg/errors" +) + +// Debug is the default logger that Pacemaker uses. +var Debug = func(v ...interface{}) {} + +var ErrDead = errors.New("no heartbeat replied") + +// AtomicTime is a thread-safe UnixNano timestamp guarded by atomic. +type AtomicTime struct { + unixnano int64 +} + +func (t *AtomicTime) Get() int64 { + return atomic.LoadInt64(&t.unixnano) +} + +func (t *AtomicTime) Set(time time.Time) { + atomic.StoreInt64(&t.unixnano, time.UnixNano()) +} + +func (t *AtomicTime) Time() time.Time { + return time.Unix(0, t.Get()) +} + +type Pacemaker struct { + // Heartrate is the received duration between heartbeats. + Heartrate time.Duration + + // Time in nanoseconds, guarded by atomic read/writes. + SentBeat AtomicTime + EchoBeat AtomicTime + + // Any callback that returns an error will stop the pacer. + Pace func() error + + stop chan struct{} + death chan error +} + +func NewPacemaker(heartrate time.Duration, pacer func() error) *Pacemaker { + return &Pacemaker{ + Heartrate: heartrate, + Pace: pacer, + } +} + +func (p *Pacemaker) Echo() { + // Swap our received heartbeats + // p.LastBeat[0], p.LastBeat[1] = time.Now(), p.LastBeat[0] + p.EchoBeat.Set(time.Now()) +} + +// Dead, if true, will have Pace return an ErrDead. +func (p *Pacemaker) Dead() bool { + /* Deprecated + if p.LastBeat[0].IsZero() || p.LastBeat[1].IsZero() { + return false + } + + return p.LastBeat[0].Sub(p.LastBeat[1]) > p.Heartrate*2 + */ + + var ( + echo = p.EchoBeat.Get() + sent = p.SentBeat.Get() + ) + + if echo == 0 || sent == 0 { + return false + } + + return sent-echo > int64(p.Heartrate)*2 +} + +func (p *Pacemaker) Stop() { + if p.stop != nil { + p.stop <- struct{}{} + Debug("(*Pacemaker).stop was sent a stop signal.") + } else { + Debug("(*Pacemaker).stop is nil, skipping.") + } +} + +func (p *Pacemaker) start() error { + log.Println("HR:", p.Heartrate) + tick := time.NewTicker(p.Heartrate) + defer tick.Stop() + + // Echo at least once + p.Echo() + + for { + Debug("Pacemaker loop restarted.") + + if err := p.Pace(); err != nil { + return err + } + + Debug("Paced.") + + // Paced, save: + p.SentBeat.Set(time.Now()) + + if p.Dead() { + return ErrDead + } + + select { + case <-p.stop: + Debug("Received stop signal.") + return nil + + case <-tick.C: + Debug("Ticked. Restarting.") + } + } +} + +// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional. +func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) { + p.death = make(chan error) + p.stop = make(chan struct{}) + + if wg != nil { + wg.Add(1) + } + + go func() { + p.death <- p.start() + // Debug. + Debug("Pacemaker returned.") + // Mark the stop channel as nil, so later Close() calls won't block forever. + p.stop = nil + + // Mark the pacemaker loop as done. + if wg != nil { + wg.Done() + } + }() + + return p.death +} + +// TODO API +type EventLoop interface { + Heartbeat() error + HandleEvent(ev wsutil.Event) error +} + +// PacemakerLoop provides an event loop with a pacemaker. +type PacemakerLoop struct { + pacemaker *Pacemaker // let's not copy this + pacedeath chan error + + events <-chan wsutil.Event + handler func(wsutil.Event) error + + ErrorLog func(error) +} + +func NewLoop(heartrate time.Duration, evs <-chan wsutil.Event, evl EventLoop) *PacemakerLoop { + pacemaker := NewPacemaker(heartrate, evl.Heartbeat) + + return &PacemakerLoop{ + pacemaker: pacemaker, + events: evs, + handler: evl.HandleEvent, + } +} + +func (p *PacemakerLoop) errorLog(err error) { + if p.ErrorLog == nil { + Debug("Uncaught error:", err) + return + } + + p.ErrorLog(err) +} + +func (p *PacemakerLoop) Echo() { + p.pacemaker.Echo() +} + +func (p *PacemakerLoop) Stop() { + p.pacemaker.Stop() +} + +func (p *PacemakerLoop) Stopped() bool { + return p.pacedeath == nil +} + +func (p *PacemakerLoop) Run() error { + // If the event loop is already running. + if p.pacedeath != nil { + return nil + } + // callers should explicitly handle waitgroups. + p.pacedeath = p.pacemaker.StartAsync(nil) + + defer func() { + // mark pacedeath once done + p.pacedeath = nil + + Debug("Pacemaker loop has exited.") + }() + + for { + select { + case err := <-p.pacedeath: + // Got a paceDeath, we're exiting from here on out. + p.pacedeath = nil // mark + + if err == nil { + // No error, just exit normally. + return nil + } + + return errors.Wrap(err, "Pacemaker died, reconnecting") + + case ev, ok := <-p.events: + if !ok { + // Events channel is closed. Kill the pacemaker manually and + // die. + p.pacemaker.Stop() + return <-p.pacedeath + } + + // Handle the event + if err := p.handler(ev); err != nil { + p.errorLog(errors.Wrap(err, "WS handler error")) + } + } + } +} diff --git a/voice/connection.go b/voice/connection.go index c223d76..dc757c8 100644 --- a/voice/connection.go +++ b/voice/connection.go @@ -18,6 +18,7 @@ import ( "github.com/diamondburned/arikawa/discord" "github.com/diamondburned/arikawa/gateway" + "github.com/diamondburned/arikawa/utils/heart" "github.com/diamondburned/arikawa/utils/json" "github.com/diamondburned/arikawa/utils/wsutil" "github.com/pkg/errors" @@ -48,7 +49,8 @@ type Connection struct { WS *wsutil.Websocket WSTimeout time.Duration - Pacemaker *gateway.Pacemaker + EventLoop *heart.PacemakerLoop + // Pacemaker *gateway.Pacemaker udpConn *net.UDPConn OpusSend chan []byte @@ -174,27 +176,35 @@ func (c *Connection) start() error { // Make a new WaitGroup for use in background loops: c.waitGroup = new(sync.WaitGroup) - // Start the websocket handler. - go c.handleWS() - // Wait for hello. WSDebug("Waiting for Hello..") - <-c.helloChan + + _, err := AssertEvent(c, <-c.WS.Listen(), HelloOP, &c.hello) + if err != nil { + return errors.Wrap(err, "Error at Hello") + } + WSDebug("Received Hello") - // Start the pacemaker with the heartrate received from Hello, after - // initializing everything. This ensures we only heartbeat if the websocket - // is authenticated. - c.Pacemaker = &gateway.Pacemaker{ - Heartrate: time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond, - Pace: c.Heartbeat, - } - // Pacemaker dies here, only when it's fatal. - c.paceDeath = c.Pacemaker.StartAsync(c.waitGroup) + // // Start the pacemaker with the heartrate received from Hello, after + // // initializing everything. This ensures we only heartbeat if the websocket + // // is authenticated. + // c.Pacemaker = &gateway.Pacemaker{ + // Heartrate: time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond, + // Pace: c.Heartbeat, + // } + // // Pacemaker dies here, only when it's fatal. + // c.paceDeath = c.Pacemaker.StartAsync(c.waitGroup) // Start the event handler, which also handles the pacemaker death signal. c.waitGroup.Add(1) + // Calculate the heartrate. + heartrate := time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond + + // Start the websocket handler. + go c.handleWS(heart.NewLoop(heartrate, c.WS.Listen(), c)) + WSDebug("Started successfully.") return nil @@ -220,7 +230,8 @@ func (c *Connection) Close() error { WSDebug("Stopping pacemaker...") // Stop the pacemaker and the event handler - c.Pacemaker.Stop() + // c.Pacemaker.Stop() + c.EventLoop.Stop() WSDebug("Stopped pacemaker.") } @@ -288,9 +299,15 @@ func (c *Connection) Disconnect(g *gateway.Gateway) (err error) { return } +func (c *Connection) HandleEvent(ev wsutil.Event) error { + return HandleEvent(c, ev) +} + // handleWS . -func (c *Connection) handleWS() { - err := c.eventLoop() +func (c *Connection) handleWS(evl *heart.PacemakerLoop) { + c.EventLoop = evl + err := c.EventLoop.Run() + c.waitGroup.Done() // mark so Close() can exit. WSDebug("Event loop stopped.") @@ -301,32 +318,32 @@ func (c *Connection) handleWS() { } } -// eventLoop . -func (c *Connection) eventLoop() error { - ch := c.WS.Listen() +// // eventLoop . +// func (c *Connection) eventLoop() error { +// ch := c.WS.Listen() - for { - select { - case err := <-c.paceDeath: - // Got a paceDeath, we're exiting from here on out. - c.paceDeath = nil // mark +// for { +// select { +// case err := <-c.paceDeath: +// // Got a paceDeath, we're exiting from here on out. +// c.paceDeath = nil // mark - if err == nil { - WSDebug("Pacemaker stopped without errors.") - // No error, just exit normally. - return nil - } +// if err == nil { +// WSDebug("Pacemaker stopped without errors.") +// // No error, just exit normally. +// return nil +// } - return errors.Wrap(err, "Pacemaker died, reconnecting") +// return errors.Wrap(err, "Pacemaker died, reconnecting") - case ev := <-ch: - // Handle the event - if err := HandleEvent(c, ev); err != nil { - c.ErrorLog(errors.Wrap(err, "WS handler error")) - } - } - } -} +// case ev := <-ch: +// // Handle the event +// if err := HandleEvent(c, ev); err != nil { +// c.ErrorLog(errors.Wrap(err, "WS handler error")) +// } +// } +// } +// } // Send . func (c *Connection) Send(code OPCode, v interface{}) error { diff --git a/voice/op.go b/voice/op.go index 4660391..8270036 100644 --- a/voice/op.go +++ b/voice/op.go @@ -40,6 +40,26 @@ func HandleEvent(c *Connection, ev wsutil.Event) error { return HandleOP(c, o) } +func AssertEvent(driver json.Driver, ev wsutil.Event, code OPCode, v interface{}) (*OP, error) { + op, err := DecodeOP(driver, ev) + if err != nil { + return nil, err + } + + if op.Code != code { + return op, fmt.Errorf( + "Unexpected OP Code: %d, expected %d (%s)", + op.Code, code, op.Data, + ) + } + + if err := driver.Unmarshal(op.Data, v); err != nil { + return op, errors.Wrap(err, "Failed to decode data") + } + + return op, nil +} + func DecodeOP(driver json.Driver, ev wsutil.Event) (*OP, error) { if ev.Error != nil { return nil, ev.Error @@ -81,7 +101,7 @@ func HandleOP(c *Connection, op *OP) error { // Heartbeat response from the server case HeartbeatAckOP: - c.Pacemaker.Echo() + c.EventLoop.Echo() // Hello server, we hear you! :) case HelloOP: