From 54ac0a6951c28e937e9e26c96250b61b7e42fb6c Mon Sep 17 00:00:00 2001 From: "diamondburned (Forefront)" Date: Fri, 24 Apr 2020 15:30:15 -0700 Subject: [PATCH] Gateway: Migrated to wsutil.PacemakerLoop --- gateway/gateway.go | 5 +- gateway/integration_test.go | 12 +++- gateway/pacemaker.go | 121 ------------------------------------ utils/heart/heart.go | 8 ++- utils/wsutil/heart.go | 10 +-- 5 files changed, 21 insertions(+), 135 deletions(-) delete mode 100644 gateway/pacemaker.go diff --git a/gateway/gateway.go b/gateway/gateway.go index 889e2e2..98e2f6f 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -135,10 +135,13 @@ func NewGateway(token string) (*Gateway, error) { func NewCustomGateway(gatewayURL, token string) *Gateway { return &Gateway{ - WS: wsutil.NewCustom(wsutil.NewConn(), gatewayURL), + WS: wsutil.NewCustom(wsutil.NewConn(), gatewayURL), + WSTimeout: wsutil.WSTimeout, + Events: make(chan Event, wsutil.WSBuffer), Identifier: DefaultIdentifier(token), Sequence: NewSequence(), + ErrorLog: wsutil.WSError, AfterClose: func(error) {}, } diff --git a/gateway/integration_test.go b/gateway/integration_test.go index 5c2460e..b845faa 100644 --- a/gateway/integration_test.go +++ b/gateway/integration_test.go @@ -8,10 +8,12 @@ import ( "strings" "testing" "time" + + "github.com/diamondburned/arikawa/utils/wsutil" ) func init() { - WSDebug = func(v ...interface{}) { + wsutil.WSDebug = func(v ...interface{}) { log.Println(append([]interface{}{"Debug:"}, v...)...) } } @@ -41,7 +43,7 @@ func TestIntegration(t *testing.T) { t.Fatal("Missing $BOT_TOKEN") } - WSError = func(err error) { + wsutil.WSError = func(err error) { t.Fatal(err) } @@ -77,7 +79,11 @@ func TestIntegration(t *testing.T) { time.Sleep(5 * time.Second) // Try and reconnect forever: - gotimeout(t, gateway.Reconnect) + gotimeout(t, func() { + if err := gateway.Reconnect(); err != nil { + t.Fatal("Unexpected error while reconnecting:", err) + } + }) // Wait for the desired event: gotimeout(t, func() { diff --git a/gateway/pacemaker.go b/gateway/pacemaker.go deleted file mode 100644 index f51b0d6..0000000 --- a/gateway/pacemaker.go +++ /dev/null @@ -1,121 +0,0 @@ -package gateway - -import ( - "sync" - "sync/atomic" - "time" - - "github.com/diamondburned/arikawa/utils/wsutil" - "github.com/pkg/errors" -) - -var ErrDead = errors.New("no heartbeat replied") - -// Time is a UnixNano timestamp. -type Time = int64 - -type Pacemaker struct { - // Heartrate is the received duration between heartbeats. - Heartrate time.Duration - - // Time in nanoseconds, guarded by atomic read/writes. - SentBeat Time - EchoBeat Time - - // Any callback that returns an error will stop the pacer. - Pace func() error - - stop chan struct{} - death chan error -} - -func (p *Pacemaker) Echo() { - // Swap our received heartbeats - // p.LastBeat[0], p.LastBeat[1] = time.Now(), p.LastBeat[0] - atomic.StoreInt64(&p.EchoBeat, time.Now().UnixNano()) -} - -// Dead, if true, will have Pace return an ErrDead. -func (p *Pacemaker) Dead() bool { - /* Deprecated - if p.LastBeat[0].IsZero() || p.LastBeat[1].IsZero() { - return false - } - - return p.LastBeat[0].Sub(p.LastBeat[1]) > p.Heartrate*2 - */ - - var ( - echo = atomic.LoadInt64(&p.EchoBeat) - sent = atomic.LoadInt64(&p.SentBeat) - ) - - if echo == 0 || sent == 0 { - return false - } - - return sent-echo > int64(p.Heartrate)*2 -} - -func (p *Pacemaker) Stop() { - if p.stop != nil { - p.stop <- struct{}{} - wsutil.WSDebug("(*Pacemaker).stop was sent a stop signal.") - } else { - wsutil.WSDebug("(*Pacemaker).stop is nil, skipping.") - } -} - -func (p *Pacemaker) start() error { - tick := time.NewTicker(p.Heartrate) - defer tick.Stop() - - // Echo at least once - p.Echo() - - for { - wsutil.WSDebug("Pacemaker loop restarted.") - - if err := p.Pace(); err != nil { - return err - } - - wsutil.WSDebug("Paced.") - - // Paced, save: - atomic.StoreInt64(&p.SentBeat, time.Now().UnixNano()) - - if p.Dead() { - return ErrDead - } - - select { - case <-p.stop: - wsutil.WSDebug("Received stop signal.") - return nil - - case <-tick.C: - wsutil.WSDebug("Ticked. Restarting.") - } - } -} - -// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional. -func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) { - p.death = make(chan error) - p.stop = make(chan struct{}) - - wg.Add(1) - - go func() { - p.death <- p.start() - // Debug. - wsutil.WSDebug("Pacemaker returned.") - // Mark the stop channel as nil, so later Close() calls won't block forever. - p.stop = nil - // Mark the pacemaker loop as done. - wg.Done() - }() - - return p.death -} diff --git a/utils/heart/heart.go b/utils/heart/heart.go index c21f39b..218dc4f 100644 --- a/utils/heart/heart.go +++ b/utils/heart/heart.go @@ -92,7 +92,13 @@ func (p *Pacemaker) Stop() { } func (p *Pacemaker) start() error { - log.Println("HR:", p.Heartrate) + log.Println("Heartbeat interval:", p.Heartrate) + + // Reset states to its old position. + p.EchoBeat.Set(time.Time{}) + p.SentBeat.Set(time.Time{}) + + // Create a new ticker. tick := time.NewTicker(p.Heartrate) defer tick.Stop() diff --git a/utils/wsutil/heart.go b/utils/wsutil/heart.go index 432b5a4..637ba48 100644 --- a/utils/wsutil/heart.go +++ b/utils/wsutil/heart.go @@ -59,7 +59,7 @@ func (p *PacemakerLoop) Stop() { } func (p *PacemakerLoop) Stopped() bool { - return p.pacedeath == nil + return p == nil || p.pacedeath == nil } func (p *PacemakerLoop) Run() error { @@ -80,14 +80,6 @@ func (p *PacemakerLoop) Run() error { for { select { case err := <-p.pacedeath: - // Got a paceDeath, we're exiting from here on out. - p.pacedeath = nil // mark - - if err == nil { - // No error, just exit normally. - return nil - } - return errors.Wrap(err, "Pacemaker died, reconnecting") case ev, ok := <-p.events: