diff --git a/_example/simple/main.go b/_example/simple/main.go index e27a212..b08aaf7 100644 --- a/_example/simple/main.go +++ b/_example/simple/main.go @@ -28,6 +28,7 @@ func main() { if err := s.Open(); err != nil { log.Fatalln("Failed to connect:", err) } + defer s.Close() u, err := s.Me() if err != nil { @@ -36,8 +37,6 @@ func main() { log.Println("Started as", u.Username) - // Block until a fatal error or SIGINT. Wait also calls Close(). - if err := s.Wait(); err != nil { - log.Fatalln("Gateway fatal error:", err) - } + // Block forever. + select {} } diff --git a/_example/undeleter/main.go b/_example/undeleter/main.go index ab1d7ff..d2641ef 100644 --- a/_example/undeleter/main.go +++ b/_example/undeleter/main.go @@ -38,6 +38,7 @@ func main() { if err := s.Open(); err != nil { log.Fatalln("Failed to connect:", err) } + defer s.Close() u, err := s.Me() if err != nil { @@ -46,8 +47,6 @@ func main() { log.Println("Started as", u.Username) - // Block until a fatal error or SIGINT. Wait also calls Close(). - if err := s.Wait(); err != nil { - log.Fatalln("Gateway fatal error:", err) - } + // Block forever. + select {} } diff --git a/bot/ctx.go b/bot/ctx.go index 82e709f..c6cd92a 100644 --- a/bot/ctx.go +++ b/bot/ctx.go @@ -140,13 +140,15 @@ func Start(token string, cmd interface{}, } return func() error { - // Run cancel() last to remove handlers when the context exits. - defer cancel() - return s.Wait() + Wait() + // remove handler first + cancel() + // then finish closing session + return s.Close() }, nil } -// Wait is deprecated. Use (*Context).Wait(). +// Wait blocks until SIGINT. func Wait() { sigs := make(chan os.Signal) signal.Notify(sigs, os.Interrupt) diff --git a/gateway/gateway.go b/gateway/gateway.go index 79df16e..ebd992c 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -42,8 +42,6 @@ var ( // WSExtraReadTimeout is the duration to be added to Hello, as a read // timeout for the websocket. WSExtraReadTimeout = time.Second - // WSRetries controls the number of Reconnects before erroring out. - WSRetries = 3 // WSDebug is used for extra debug logging. This is expected to behave // similarly to log.Println(). WSDebug = func(v ...interface{}) {} @@ -90,12 +88,6 @@ type Gateway struct { // reconnections or any type of connection interruptions. AfterClose func(err error) // noop by default - // FatalError is where Reconnect errors will go to. When an error is sent - // here, the Gateway is already dead, so Close() shouldn't be called. - // This channel is buffered once. - FatalError <-chan error - fatalError chan error - // Only use for debugging // If this channel is non-nil, all incoming OP packets will also be sent @@ -133,9 +125,7 @@ func NewGatewayWithDriver(token string, driver json.Driver) (*Gateway, error) { Sequence: NewSequence(), ErrorLog: WSError, AfterClose: func(error) {}, - fatalError: make(chan error, 1), } - g.FatalError = g.fatalError // Parameters for the gateway param := url.Values{ @@ -173,10 +163,6 @@ func (g *Gateway) Close() error { WSDebug("Stopped pacemaker.") } - WSDebug("Closing the websocket.") - err := g.WS.Close() - g.AfterClose(err) - WSDebug("Waiting for WaitGroup to be done.") // This should work, since Pacemaker should signal its loop to stop, which @@ -186,21 +172,23 @@ func (g *Gateway) Close() error { // Mark g.waitGroup as empty: g.waitGroup = nil - WSDebug("WaitGroup is done.") + WSDebug("WaitGroup is done. Closing the websocket.") + err := g.WS.Close() + g.AfterClose(err) return err } -// Reconnects and resumes. -func (g *Gateway) Reconnect() error { +// Reconnect tries to reconnect forever. It will resume the connection if +// possible. If an Invalid Session is received, it will start a fresh one. +func (g *Gateway) Reconnect() { WSDebug("Reconnecting...") - // Guarantee the gateway is already closed: - if err := g.Close(); err != nil { - return errors.Wrap(err, "Failed to close Gateway before reconnecting") - } + // Guarantee the gateway is already closed. Ignore its error, as we're + // redialing anyway. + g.Close() - for i := 0; WSRetries < 0 || i < WSRetries; i++ { + for i := 1; ; i++ { WSDebug("Trying to dial, attempt", i) // Condition: err == ErrInvalidSession: @@ -213,10 +201,8 @@ func (g *Gateway) Reconnect() error { } WSDebug("Started after attempt:", i) - return nil + return } - - return ErrWSMaxTries } // Open connects to the Websocket and authenticate it. You should usually use @@ -261,11 +247,10 @@ func (g *Gateway) Start() error { return nil } -// Wait blocks until the Gateway fatally exits when it couldn't reconnect -// anymore. To use this withh other channels, check out g.FatalError. If a -// non-nil error is returned, Close() shouldn't be called again. +// Wait is deprecated. The gateway will reconnect forever. This function will +// panic. func (g *Gateway) Wait() error { - return <-g.FatalError + panic("Wait is deprecated. defer (*Gateway).Close() is required.") } func (g *Gateway) start() error { @@ -319,12 +304,11 @@ func (g *Gateway) start() error { g.Pacemaker = &Pacemaker{ Heartrate: hello.HeartbeatInterval.Duration(), Pace: g.Heartbeat, - OnDead: g.Reconnect, } // Pacemaker dies here, only when it's fatal. g.paceDeath = g.Pacemaker.StartAsync(g.waitGroup) - // Start the event handler + // Start the event handler, which also handles the pacemaker death signal. g.waitGroup.Add(1) go g.handleWS() @@ -336,13 +320,12 @@ func (g *Gateway) start() error { // handleWS uses the Websocket and parses them into g.Events. func (g *Gateway) handleWS() { err := g.eventLoop() - g.waitGroup.Done() + g.waitGroup.Done() // mark so Close() can exit. WSDebug("Event loop stopped.") if err != nil { g.ErrorLog(err) - - g.fatalError <- errors.Wrap(g.Reconnect(), "Failed to reconnect") + g.Reconnect() // Reconnect should spawn another eventLoop in its Start function. } } diff --git a/gateway/integration_test.go b/gateway/integration_test.go index 4678dc8..5c2460e 100644 --- a/gateway/integration_test.go +++ b/gateway/integration_test.go @@ -76,28 +76,21 @@ func TestIntegration(t *testing.T) { // Sleep past the rate limiter before reconnecting: time.Sleep(5 * time.Second) - // Try and reconnect - if err := gateway.Reconnect(); err != nil { - t.Fatal("Failed to reconnect:", err) - } + // Try and reconnect forever: + gotimeout(t, gateway.Reconnect) - timeout := time.After(10 * time.Second) - -Main: - for { - select { - case ev := <-gateway.Events: + // Wait for the desired event: + gotimeout(t, func() { + for ev := range gateway.Events { switch ev.(type) { // Accept only a Resumed event. case *ResumedEvent: - break Main + return // exit case *ReadyEvent: t.Fatal("Ready event received instead of Resumed.") } - case <-timeout: - t.Fatal("Timed out waiting for ResumedEvent") } - } + }) if err := g.Close(); err != nil { t.Fatal("Failed to close Gateway:", err) @@ -113,3 +106,18 @@ func wait(t *testing.T, evCh chan interface{}) interface{} { return nil } } + +func gotimeout(t *testing.T, fn func()) { + var done = make(chan struct{}) + go func() { + fn() + done <- struct{}{} + }() + + select { + case <-time.After(10 * time.Second): + t.Fatal("Timed out waiting for function.") + case <-done: + return + } +} diff --git a/gateway/op.go b/gateway/op.go index 00b12cb..30fe00d 100644 --- a/gateway/op.go +++ b/gateway/op.go @@ -146,7 +146,12 @@ func HandleOP(g *Gateway, op *OP) error { case ReconnectOP: // Server requests to reconnect, die and retry. WSDebug("ReconnectOP received.") - return g.Reconnect() + // We must reconnect in another goroutine, as running Reconnect + // synchronously would prevent the main event loop from exiting. + go g.Reconnect() + // Gracefully exit with a nil let the event handler take the signal from + // the pacemaker. + return nil case InvalidSessionOP: // Discord expects us to sleep for no reason diff --git a/gateway/pacemaker.go b/gateway/pacemaker.go index 2051460..db3ed2e 100644 --- a/gateway/pacemaker.go +++ b/gateway/pacemaker.go @@ -23,8 +23,6 @@ type Pacemaker struct { // Any callback that returns an error will stop the pacer. Pace func() error - // Event - OnDead func() error stop chan struct{} death chan error @@ -69,19 +67,7 @@ func (p *Pacemaker) Stop() { func (p *Pacemaker) start() error { tick := time.NewTicker(p.Heartrate) - - defer func() { - // Flush the ticker: - select { - case <-tick.C: - WSDebug("Flushed a tick.") - default: - WSDebug("No tick flushed.") - } - - // Then close the ticker: - tick.Stop() - }() + defer tick.Stop() // Echo at least once p.Echo() diff --git a/session/session.go b/session/session.go index 35ee8ce..e4e251d 100644 --- a/session/session.go +++ b/session/session.go @@ -4,9 +4,6 @@ package session import ( - "os" - "os/signal" - "github.com/diamondburned/arikawa/api" "github.com/diamondburned/arikawa/gateway" "github.com/diamondburned/arikawa/handler" @@ -137,21 +134,6 @@ func (s *Session) Close() error { return s.Gateway.Close() } -// Wait blocks until either a SIGINT or a Gateway fatal error is received. -// Check the Gateway documentation for more information. -func (s *Session) Wait() error { - sigint := make(chan os.Signal) - signal.Notify(sigint, os.Interrupt) - - select { - case <-sigint: - return s.Close() - case err := <-s.Gateway.FatalError: - s.close() - return err - } -} - func (s *Session) close() { if s.hstop != nil { close(s.hstop)