1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2025-11-22 12:33:38 +00:00

Gateway: Deprecated Wait(), fixed ReconnectOP bug

This commit is contained in:
diamondburned (Forefront) 2020-04-12 11:06:43 -07:00
parent 320e8a52f1
commit 4e4ebde93a
8 changed files with 58 additions and 94 deletions

View file

@ -28,6 +28,7 @@ func main() {
if err := s.Open(); err != nil { if err := s.Open(); err != nil {
log.Fatalln("Failed to connect:", err) log.Fatalln("Failed to connect:", err)
} }
defer s.Close()
u, err := s.Me() u, err := s.Me()
if err != nil { if err != nil {
@ -36,8 +37,6 @@ func main() {
log.Println("Started as", u.Username) log.Println("Started as", u.Username)
// Block until a fatal error or SIGINT. Wait also calls Close(). // Block forever.
if err := s.Wait(); err != nil { select {}
log.Fatalln("Gateway fatal error:", err)
}
} }

View file

@ -38,6 +38,7 @@ func main() {
if err := s.Open(); err != nil { if err := s.Open(); err != nil {
log.Fatalln("Failed to connect:", err) log.Fatalln("Failed to connect:", err)
} }
defer s.Close()
u, err := s.Me() u, err := s.Me()
if err != nil { if err != nil {
@ -46,8 +47,6 @@ func main() {
log.Println("Started as", u.Username) log.Println("Started as", u.Username)
// Block until a fatal error or SIGINT. Wait also calls Close(). // Block forever.
if err := s.Wait(); err != nil { select {}
log.Fatalln("Gateway fatal error:", err)
}
} }

View file

@ -140,13 +140,15 @@ func Start(token string, cmd interface{},
} }
return func() error { return func() error {
// Run cancel() last to remove handlers when the context exits. Wait()
defer cancel() // remove handler first
return s.Wait() cancel()
// then finish closing session
return s.Close()
}, nil }, nil
} }
// Wait is deprecated. Use (*Context).Wait(). // Wait blocks until SIGINT.
func Wait() { func Wait() {
sigs := make(chan os.Signal) sigs := make(chan os.Signal)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)

View file

@ -42,8 +42,6 @@ var (
// WSExtraReadTimeout is the duration to be added to Hello, as a read // WSExtraReadTimeout is the duration to be added to Hello, as a read
// timeout for the websocket. // timeout for the websocket.
WSExtraReadTimeout = time.Second WSExtraReadTimeout = time.Second
// WSRetries controls the number of Reconnects before erroring out.
WSRetries = 3
// WSDebug is used for extra debug logging. This is expected to behave // WSDebug is used for extra debug logging. This is expected to behave
// similarly to log.Println(). // similarly to log.Println().
WSDebug = func(v ...interface{}) {} WSDebug = func(v ...interface{}) {}
@ -90,12 +88,6 @@ type Gateway struct {
// reconnections or any type of connection interruptions. // reconnections or any type of connection interruptions.
AfterClose func(err error) // noop by default AfterClose func(err error) // noop by default
// FatalError is where Reconnect errors will go to. When an error is sent
// here, the Gateway is already dead, so Close() shouldn't be called.
// This channel is buffered once.
FatalError <-chan error
fatalError chan error
// Only use for debugging // Only use for debugging
// If this channel is non-nil, all incoming OP packets will also be sent // If this channel is non-nil, all incoming OP packets will also be sent
@ -133,9 +125,7 @@ func NewGatewayWithDriver(token string, driver json.Driver) (*Gateway, error) {
Sequence: NewSequence(), Sequence: NewSequence(),
ErrorLog: WSError, ErrorLog: WSError,
AfterClose: func(error) {}, AfterClose: func(error) {},
fatalError: make(chan error, 1),
} }
g.FatalError = g.fatalError
// Parameters for the gateway // Parameters for the gateway
param := url.Values{ param := url.Values{
@ -173,10 +163,6 @@ func (g *Gateway) Close() error {
WSDebug("Stopped pacemaker.") WSDebug("Stopped pacemaker.")
} }
WSDebug("Closing the websocket.")
err := g.WS.Close()
g.AfterClose(err)
WSDebug("Waiting for WaitGroup to be done.") WSDebug("Waiting for WaitGroup to be done.")
// This should work, since Pacemaker should signal its loop to stop, which // This should work, since Pacemaker should signal its loop to stop, which
@ -186,21 +172,23 @@ func (g *Gateway) Close() error {
// Mark g.waitGroup as empty: // Mark g.waitGroup as empty:
g.waitGroup = nil g.waitGroup = nil
WSDebug("WaitGroup is done.") WSDebug("WaitGroup is done. Closing the websocket.")
err := g.WS.Close()
g.AfterClose(err)
return err return err
} }
// Reconnects and resumes. // Reconnect tries to reconnect forever. It will resume the connection if
func (g *Gateway) Reconnect() error { // possible. If an Invalid Session is received, it will start a fresh one.
func (g *Gateway) Reconnect() {
WSDebug("Reconnecting...") WSDebug("Reconnecting...")
// Guarantee the gateway is already closed: // Guarantee the gateway is already closed. Ignore its error, as we're
if err := g.Close(); err != nil { // redialing anyway.
return errors.Wrap(err, "Failed to close Gateway before reconnecting") g.Close()
}
for i := 0; WSRetries < 0 || i < WSRetries; i++ { for i := 1; ; i++ {
WSDebug("Trying to dial, attempt", i) WSDebug("Trying to dial, attempt", i)
// Condition: err == ErrInvalidSession: // Condition: err == ErrInvalidSession:
@ -213,10 +201,8 @@ func (g *Gateway) Reconnect() error {
} }
WSDebug("Started after attempt:", i) WSDebug("Started after attempt:", i)
return nil return
} }
return ErrWSMaxTries
} }
// Open connects to the Websocket and authenticate it. You should usually use // Open connects to the Websocket and authenticate it. You should usually use
@ -261,11 +247,10 @@ func (g *Gateway) Start() error {
return nil return nil
} }
// Wait blocks until the Gateway fatally exits when it couldn't reconnect // Wait is deprecated. The gateway will reconnect forever. This function will
// anymore. To use this withh other channels, check out g.FatalError. If a // panic.
// non-nil error is returned, Close() shouldn't be called again.
func (g *Gateway) Wait() error { func (g *Gateway) Wait() error {
return <-g.FatalError panic("Wait is deprecated. defer (*Gateway).Close() is required.")
} }
func (g *Gateway) start() error { func (g *Gateway) start() error {
@ -319,12 +304,11 @@ func (g *Gateway) start() error {
g.Pacemaker = &Pacemaker{ g.Pacemaker = &Pacemaker{
Heartrate: hello.HeartbeatInterval.Duration(), Heartrate: hello.HeartbeatInterval.Duration(),
Pace: g.Heartbeat, Pace: g.Heartbeat,
OnDead: g.Reconnect,
} }
// Pacemaker dies here, only when it's fatal. // Pacemaker dies here, only when it's fatal.
g.paceDeath = g.Pacemaker.StartAsync(g.waitGroup) g.paceDeath = g.Pacemaker.StartAsync(g.waitGroup)
// Start the event handler // Start the event handler, which also handles the pacemaker death signal.
g.waitGroup.Add(1) g.waitGroup.Add(1)
go g.handleWS() go g.handleWS()
@ -336,13 +320,12 @@ func (g *Gateway) start() error {
// handleWS uses the Websocket and parses them into g.Events. // handleWS uses the Websocket and parses them into g.Events.
func (g *Gateway) handleWS() { func (g *Gateway) handleWS() {
err := g.eventLoop() err := g.eventLoop()
g.waitGroup.Done() g.waitGroup.Done() // mark so Close() can exit.
WSDebug("Event loop stopped.") WSDebug("Event loop stopped.")
if err != nil { if err != nil {
g.ErrorLog(err) g.ErrorLog(err)
g.Reconnect()
g.fatalError <- errors.Wrap(g.Reconnect(), "Failed to reconnect")
// Reconnect should spawn another eventLoop in its Start function. // Reconnect should spawn another eventLoop in its Start function.
} }
} }

View file

@ -76,28 +76,21 @@ func TestIntegration(t *testing.T) {
// Sleep past the rate limiter before reconnecting: // Sleep past the rate limiter before reconnecting:
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// Try and reconnect // Try and reconnect forever:
if err := gateway.Reconnect(); err != nil { gotimeout(t, gateway.Reconnect)
t.Fatal("Failed to reconnect:", err)
}
timeout := time.After(10 * time.Second) // Wait for the desired event:
gotimeout(t, func() {
Main: for ev := range gateway.Events {
for {
select {
case ev := <-gateway.Events:
switch ev.(type) { switch ev.(type) {
// Accept only a Resumed event. // Accept only a Resumed event.
case *ResumedEvent: case *ResumedEvent:
break Main return // exit
case *ReadyEvent: case *ReadyEvent:
t.Fatal("Ready event received instead of Resumed.") t.Fatal("Ready event received instead of Resumed.")
} }
case <-timeout:
t.Fatal("Timed out waiting for ResumedEvent")
}
} }
})
if err := g.Close(); err != nil { if err := g.Close(); err != nil {
t.Fatal("Failed to close Gateway:", err) t.Fatal("Failed to close Gateway:", err)
@ -113,3 +106,18 @@ func wait(t *testing.T, evCh chan interface{}) interface{} {
return nil return nil
} }
} }
func gotimeout(t *testing.T, fn func()) {
var done = make(chan struct{})
go func() {
fn()
done <- struct{}{}
}()
select {
case <-time.After(10 * time.Second):
t.Fatal("Timed out waiting for function.")
case <-done:
return
}
}

View file

@ -146,7 +146,12 @@ func HandleOP(g *Gateway, op *OP) error {
case ReconnectOP: case ReconnectOP:
// Server requests to reconnect, die and retry. // Server requests to reconnect, die and retry.
WSDebug("ReconnectOP received.") WSDebug("ReconnectOP received.")
return g.Reconnect() // We must reconnect in another goroutine, as running Reconnect
// synchronously would prevent the main event loop from exiting.
go g.Reconnect()
// Gracefully exit with a nil let the event handler take the signal from
// the pacemaker.
return nil
case InvalidSessionOP: case InvalidSessionOP:
// Discord expects us to sleep for no reason // Discord expects us to sleep for no reason

View file

@ -23,8 +23,6 @@ type Pacemaker struct {
// Any callback that returns an error will stop the pacer. // Any callback that returns an error will stop the pacer.
Pace func() error Pace func() error
// Event
OnDead func() error
stop chan struct{} stop chan struct{}
death chan error death chan error
@ -69,19 +67,7 @@ func (p *Pacemaker) Stop() {
func (p *Pacemaker) start() error { func (p *Pacemaker) start() error {
tick := time.NewTicker(p.Heartrate) tick := time.NewTicker(p.Heartrate)
defer tick.Stop()
defer func() {
// Flush the ticker:
select {
case <-tick.C:
WSDebug("Flushed a tick.")
default:
WSDebug("No tick flushed.")
}
// Then close the ticker:
tick.Stop()
}()
// Echo at least once // Echo at least once
p.Echo() p.Echo()

View file

@ -4,9 +4,6 @@
package session package session
import ( import (
"os"
"os/signal"
"github.com/diamondburned/arikawa/api" "github.com/diamondburned/arikawa/api"
"github.com/diamondburned/arikawa/gateway" "github.com/diamondburned/arikawa/gateway"
"github.com/diamondburned/arikawa/handler" "github.com/diamondburned/arikawa/handler"
@ -137,21 +134,6 @@ func (s *Session) Close() error {
return s.Gateway.Close() return s.Gateway.Close()
} }
// Wait blocks until either a SIGINT or a Gateway fatal error is received.
// Check the Gateway documentation for more information.
func (s *Session) Wait() error {
sigint := make(chan os.Signal)
signal.Notify(sigint, os.Interrupt)
select {
case <-sigint:
return s.Close()
case err := <-s.Gateway.FatalError:
s.close()
return err
}
}
func (s *Session) close() { func (s *Session) close() {
if s.hstop != nil { if s.hstop != nil {
close(s.hstop) close(s.hstop)