Heart: Migrated Voice to PacemakerLoop

This commit is contained in:
diamondburned (Forefront) 2020-04-23 20:32:48 -07:00
parent 4cfde32531
commit c0c17085ba
3 changed files with 322 additions and 40 deletions

245
utils/heart/heart.go Normal file
View File

@ -0,0 +1,245 @@
// Package heart implements a general purpose pacemaker.
package heart
import (
"log"
"sync"
"sync/atomic"
"time"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
)
// Debug is the default logger that Pacemaker uses.
var Debug = func(v ...interface{}) {}
var ErrDead = errors.New("no heartbeat replied")
// AtomicTime is a thread-safe UnixNano timestamp guarded by atomic.
type AtomicTime struct {
unixnano int64
}
func (t *AtomicTime) Get() int64 {
return atomic.LoadInt64(&t.unixnano)
}
func (t *AtomicTime) Set(time time.Time) {
atomic.StoreInt64(&t.unixnano, time.UnixNano())
}
func (t *AtomicTime) Time() time.Time {
return time.Unix(0, t.Get())
}
type Pacemaker struct {
// Heartrate is the received duration between heartbeats.
Heartrate time.Duration
// Time in nanoseconds, guarded by atomic read/writes.
SentBeat AtomicTime
EchoBeat AtomicTime
// Any callback that returns an error will stop the pacer.
Pace func() error
stop chan struct{}
death chan error
}
func NewPacemaker(heartrate time.Duration, pacer func() error) *Pacemaker {
return &Pacemaker{
Heartrate: heartrate,
Pace: pacer,
}
}
func (p *Pacemaker) Echo() {
// Swap our received heartbeats
// p.LastBeat[0], p.LastBeat[1] = time.Now(), p.LastBeat[0]
p.EchoBeat.Set(time.Now())
}
// Dead, if true, will have Pace return an ErrDead.
func (p *Pacemaker) Dead() bool {
/* Deprecated
if p.LastBeat[0].IsZero() || p.LastBeat[1].IsZero() {
return false
}
return p.LastBeat[0].Sub(p.LastBeat[1]) > p.Heartrate*2
*/
var (
echo = p.EchoBeat.Get()
sent = p.SentBeat.Get()
)
if echo == 0 || sent == 0 {
return false
}
return sent-echo > int64(p.Heartrate)*2
}
func (p *Pacemaker) Stop() {
if p.stop != nil {
p.stop <- struct{}{}
Debug("(*Pacemaker).stop was sent a stop signal.")
} else {
Debug("(*Pacemaker).stop is nil, skipping.")
}
}
func (p *Pacemaker) start() error {
log.Println("HR:", p.Heartrate)
tick := time.NewTicker(p.Heartrate)
defer tick.Stop()
// Echo at least once
p.Echo()
for {
Debug("Pacemaker loop restarted.")
if err := p.Pace(); err != nil {
return err
}
Debug("Paced.")
// Paced, save:
p.SentBeat.Set(time.Now())
if p.Dead() {
return ErrDead
}
select {
case <-p.stop:
Debug("Received stop signal.")
return nil
case <-tick.C:
Debug("Ticked. Restarting.")
}
}
}
// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
p.death = make(chan error)
p.stop = make(chan struct{})
if wg != nil {
wg.Add(1)
}
go func() {
p.death <- p.start()
// Debug.
Debug("Pacemaker returned.")
// Mark the stop channel as nil, so later Close() calls won't block forever.
p.stop = nil
// Mark the pacemaker loop as done.
if wg != nil {
wg.Done()
}
}()
return p.death
}
// TODO API
type EventLoop interface {
Heartbeat() error
HandleEvent(ev wsutil.Event) error
}
// PacemakerLoop provides an event loop with a pacemaker.
type PacemakerLoop struct {
pacemaker *Pacemaker // let's not copy this
pacedeath chan error
events <-chan wsutil.Event
handler func(wsutil.Event) error
ErrorLog func(error)
}
func NewLoop(heartrate time.Duration, evs <-chan wsutil.Event, evl EventLoop) *PacemakerLoop {
pacemaker := NewPacemaker(heartrate, evl.Heartbeat)
return &PacemakerLoop{
pacemaker: pacemaker,
events: evs,
handler: evl.HandleEvent,
}
}
func (p *PacemakerLoop) errorLog(err error) {
if p.ErrorLog == nil {
Debug("Uncaught error:", err)
return
}
p.ErrorLog(err)
}
func (p *PacemakerLoop) Echo() {
p.pacemaker.Echo()
}
func (p *PacemakerLoop) Stop() {
p.pacemaker.Stop()
}
func (p *PacemakerLoop) Stopped() bool {
return p.pacedeath == nil
}
func (p *PacemakerLoop) Run() error {
// If the event loop is already running.
if p.pacedeath != nil {
return nil
}
// callers should explicitly handle waitgroups.
p.pacedeath = p.pacemaker.StartAsync(nil)
defer func() {
// mark pacedeath once done
p.pacedeath = nil
Debug("Pacemaker loop has exited.")
}()
for {
select {
case err := <-p.pacedeath:
// Got a paceDeath, we're exiting from here on out.
p.pacedeath = nil // mark
if err == nil {
// No error, just exit normally.
return nil
}
return errors.Wrap(err, "Pacemaker died, reconnecting")
case ev, ok := <-p.events:
if !ok {
// Events channel is closed. Kill the pacemaker manually and
// die.
p.pacemaker.Stop()
return <-p.pacedeath
}
// Handle the event
if err := p.handler(ev); err != nil {
p.errorLog(errors.Wrap(err, "WS handler error"))
}
}
}
}

View File

@ -18,6 +18,7 @@ import (
"github.com/diamondburned/arikawa/discord"
"github.com/diamondburned/arikawa/gateway"
"github.com/diamondburned/arikawa/utils/heart"
"github.com/diamondburned/arikawa/utils/json"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
@ -48,7 +49,8 @@ type Connection struct {
WS *wsutil.Websocket
WSTimeout time.Duration
Pacemaker *gateway.Pacemaker
EventLoop *heart.PacemakerLoop
// Pacemaker *gateway.Pacemaker
udpConn *net.UDPConn
OpusSend chan []byte
@ -174,27 +176,35 @@ func (c *Connection) start() error {
// Make a new WaitGroup for use in background loops:
c.waitGroup = new(sync.WaitGroup)
// Start the websocket handler.
go c.handleWS()
// Wait for hello.
WSDebug("Waiting for Hello..")
<-c.helloChan
_, err := AssertEvent(c, <-c.WS.Listen(), HelloOP, &c.hello)
if err != nil {
return errors.Wrap(err, "Error at Hello")
}
WSDebug("Received Hello")
// Start the pacemaker with the heartrate received from Hello, after
// initializing everything. This ensures we only heartbeat if the websocket
// is authenticated.
c.Pacemaker = &gateway.Pacemaker{
Heartrate: time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond,
Pace: c.Heartbeat,
}
// Pacemaker dies here, only when it's fatal.
c.paceDeath = c.Pacemaker.StartAsync(c.waitGroup)
// // Start the pacemaker with the heartrate received from Hello, after
// // initializing everything. This ensures we only heartbeat if the websocket
// // is authenticated.
// c.Pacemaker = &gateway.Pacemaker{
// Heartrate: time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond,
// Pace: c.Heartbeat,
// }
// // Pacemaker dies here, only when it's fatal.
// c.paceDeath = c.Pacemaker.StartAsync(c.waitGroup)
// Start the event handler, which also handles the pacemaker death signal.
c.waitGroup.Add(1)
// Calculate the heartrate.
heartrate := time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond
// Start the websocket handler.
go c.handleWS(heart.NewLoop(heartrate, c.WS.Listen(), c))
WSDebug("Started successfully.")
return nil
@ -220,7 +230,8 @@ func (c *Connection) Close() error {
WSDebug("Stopping pacemaker...")
// Stop the pacemaker and the event handler
c.Pacemaker.Stop()
// c.Pacemaker.Stop()
c.EventLoop.Stop()
WSDebug("Stopped pacemaker.")
}
@ -288,9 +299,15 @@ func (c *Connection) Disconnect(g *gateway.Gateway) (err error) {
return
}
func (c *Connection) HandleEvent(ev wsutil.Event) error {
return HandleEvent(c, ev)
}
// handleWS .
func (c *Connection) handleWS() {
err := c.eventLoop()
func (c *Connection) handleWS(evl *heart.PacemakerLoop) {
c.EventLoop = evl
err := c.EventLoop.Run()
c.waitGroup.Done() // mark so Close() can exit.
WSDebug("Event loop stopped.")
@ -301,32 +318,32 @@ func (c *Connection) handleWS() {
}
}
// eventLoop .
func (c *Connection) eventLoop() error {
ch := c.WS.Listen()
// // eventLoop .
// func (c *Connection) eventLoop() error {
// ch := c.WS.Listen()
for {
select {
case err := <-c.paceDeath:
// Got a paceDeath, we're exiting from here on out.
c.paceDeath = nil // mark
// for {
// select {
// case err := <-c.paceDeath:
// // Got a paceDeath, we're exiting from here on out.
// c.paceDeath = nil // mark
if err == nil {
WSDebug("Pacemaker stopped without errors.")
// No error, just exit normally.
return nil
}
// if err == nil {
// WSDebug("Pacemaker stopped without errors.")
// // No error, just exit normally.
// return nil
// }
return errors.Wrap(err, "Pacemaker died, reconnecting")
// return errors.Wrap(err, "Pacemaker died, reconnecting")
case ev := <-ch:
// Handle the event
if err := HandleEvent(c, ev); err != nil {
c.ErrorLog(errors.Wrap(err, "WS handler error"))
}
}
}
}
// case ev := <-ch:
// // Handle the event
// if err := HandleEvent(c, ev); err != nil {
// c.ErrorLog(errors.Wrap(err, "WS handler error"))
// }
// }
// }
// }
// Send .
func (c *Connection) Send(code OPCode, v interface{}) error {

View File

@ -40,6 +40,26 @@ func HandleEvent(c *Connection, ev wsutil.Event) error {
return HandleOP(c, o)
}
func AssertEvent(driver json.Driver, ev wsutil.Event, code OPCode, v interface{}) (*OP, error) {
op, err := DecodeOP(driver, ev)
if err != nil {
return nil, err
}
if op.Code != code {
return op, fmt.Errorf(
"Unexpected OP Code: %d, expected %d (%s)",
op.Code, code, op.Data,
)
}
if err := driver.Unmarshal(op.Data, v); err != nil {
return op, errors.Wrap(err, "Failed to decode data")
}
return op, nil
}
func DecodeOP(driver json.Driver, ev wsutil.Event) (*OP, error) {
if ev.Error != nil {
return nil, ev.Error
@ -81,7 +101,7 @@ func HandleOP(c *Connection, op *OP) error {
// Heartbeat response from the server
case HeartbeatAckOP:
c.Pacemaker.Echo()
c.EventLoop.Echo()
// Hello server, we hear you! :)
case HelloOP: