1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2024-11-30 18:53:30 +00:00

Gateway: Migrated to wsutil.PacemakerLoop

This commit is contained in:
diamondburned (Forefront) 2020-04-24 15:30:15 -07:00
parent 443ec791af
commit 54ac0a6951
5 changed files with 21 additions and 135 deletions

View file

@ -135,10 +135,13 @@ func NewGateway(token string) (*Gateway, error) {
func NewCustomGateway(gatewayURL, token string) *Gateway { func NewCustomGateway(gatewayURL, token string) *Gateway {
return &Gateway{ return &Gateway{
WS: wsutil.NewCustom(wsutil.NewConn(), gatewayURL), WS: wsutil.NewCustom(wsutil.NewConn(), gatewayURL),
WSTimeout: wsutil.WSTimeout,
Events: make(chan Event, wsutil.WSBuffer), Events: make(chan Event, wsutil.WSBuffer),
Identifier: DefaultIdentifier(token), Identifier: DefaultIdentifier(token),
Sequence: NewSequence(), Sequence: NewSequence(),
ErrorLog: wsutil.WSError, ErrorLog: wsutil.WSError,
AfterClose: func(error) {}, AfterClose: func(error) {},
} }

View file

@ -8,10 +8,12 @@ import (
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/diamondburned/arikawa/utils/wsutil"
) )
func init() { func init() {
WSDebug = func(v ...interface{}) { wsutil.WSDebug = func(v ...interface{}) {
log.Println(append([]interface{}{"Debug:"}, v...)...) log.Println(append([]interface{}{"Debug:"}, v...)...)
} }
} }
@ -41,7 +43,7 @@ func TestIntegration(t *testing.T) {
t.Fatal("Missing $BOT_TOKEN") t.Fatal("Missing $BOT_TOKEN")
} }
WSError = func(err error) { wsutil.WSError = func(err error) {
t.Fatal(err) t.Fatal(err)
} }
@ -77,7 +79,11 @@ func TestIntegration(t *testing.T) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// Try and reconnect forever: // Try and reconnect forever:
gotimeout(t, gateway.Reconnect) gotimeout(t, func() {
if err := gateway.Reconnect(); err != nil {
t.Fatal("Unexpected error while reconnecting:", err)
}
})
// Wait for the desired event: // Wait for the desired event:
gotimeout(t, func() { gotimeout(t, func() {

View file

@ -1,121 +0,0 @@
package gateway
import (
"sync"
"sync/atomic"
"time"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
)
var ErrDead = errors.New("no heartbeat replied")
// Time is a UnixNano timestamp.
type Time = int64
type Pacemaker struct {
// Heartrate is the received duration between heartbeats.
Heartrate time.Duration
// Time in nanoseconds, guarded by atomic read/writes.
SentBeat Time
EchoBeat Time
// Any callback that returns an error will stop the pacer.
Pace func() error
stop chan struct{}
death chan error
}
func (p *Pacemaker) Echo() {
// Swap our received heartbeats
// p.LastBeat[0], p.LastBeat[1] = time.Now(), p.LastBeat[0]
atomic.StoreInt64(&p.EchoBeat, time.Now().UnixNano())
}
// Dead, if true, will have Pace return an ErrDead.
func (p *Pacemaker) Dead() bool {
/* Deprecated
if p.LastBeat[0].IsZero() || p.LastBeat[1].IsZero() {
return false
}
return p.LastBeat[0].Sub(p.LastBeat[1]) > p.Heartrate*2
*/
var (
echo = atomic.LoadInt64(&p.EchoBeat)
sent = atomic.LoadInt64(&p.SentBeat)
)
if echo == 0 || sent == 0 {
return false
}
return sent-echo > int64(p.Heartrate)*2
}
func (p *Pacemaker) Stop() {
if p.stop != nil {
p.stop <- struct{}{}
wsutil.WSDebug("(*Pacemaker).stop was sent a stop signal.")
} else {
wsutil.WSDebug("(*Pacemaker).stop is nil, skipping.")
}
}
func (p *Pacemaker) start() error {
tick := time.NewTicker(p.Heartrate)
defer tick.Stop()
// Echo at least once
p.Echo()
for {
wsutil.WSDebug("Pacemaker loop restarted.")
if err := p.Pace(); err != nil {
return err
}
wsutil.WSDebug("Paced.")
// Paced, save:
atomic.StoreInt64(&p.SentBeat, time.Now().UnixNano())
if p.Dead() {
return ErrDead
}
select {
case <-p.stop:
wsutil.WSDebug("Received stop signal.")
return nil
case <-tick.C:
wsutil.WSDebug("Ticked. Restarting.")
}
}
}
// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
p.death = make(chan error)
p.stop = make(chan struct{})
wg.Add(1)
go func() {
p.death <- p.start()
// Debug.
wsutil.WSDebug("Pacemaker returned.")
// Mark the stop channel as nil, so later Close() calls won't block forever.
p.stop = nil
// Mark the pacemaker loop as done.
wg.Done()
}()
return p.death
}

View file

@ -92,7 +92,13 @@ func (p *Pacemaker) Stop() {
} }
func (p *Pacemaker) start() error { func (p *Pacemaker) start() error {
log.Println("HR:", p.Heartrate) log.Println("Heartbeat interval:", p.Heartrate)
// Reset states to its old position.
p.EchoBeat.Set(time.Time{})
p.SentBeat.Set(time.Time{})
// Create a new ticker.
tick := time.NewTicker(p.Heartrate) tick := time.NewTicker(p.Heartrate)
defer tick.Stop() defer tick.Stop()

View file

@ -59,7 +59,7 @@ func (p *PacemakerLoop) Stop() {
} }
func (p *PacemakerLoop) Stopped() bool { func (p *PacemakerLoop) Stopped() bool {
return p.pacedeath == nil return p == nil || p.pacedeath == nil
} }
func (p *PacemakerLoop) Run() error { func (p *PacemakerLoop) Run() error {
@ -80,14 +80,6 @@ func (p *PacemakerLoop) Run() error {
for { for {
select { select {
case err := <-p.pacedeath: case err := <-p.pacedeath:
// Got a paceDeath, we're exiting from here on out.
p.pacedeath = nil // mark
if err == nil {
// No error, just exit normally.
return nil
}
return errors.Wrap(err, "Pacemaker died, reconnecting") return errors.Wrap(err, "Pacemaker died, reconnecting")
case ev, ok := <-p.events: case ev, ok := <-p.events: