Gateway: Fix race conditions and fatal reconnecting (#7)

This commit is contained in:
diamondburned 2020-02-07 22:17:27 -08:00 committed by GitHub
parent ce752a0230
commit 3288f2d19c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 60 deletions

View File

@ -12,6 +12,7 @@ import (
"log"
"net/url"
"runtime"
"sync"
"time"
"github.com/diamondburned/arikawa/api"
@ -36,9 +37,6 @@ var (
// WSBuffer is the size of the Event channel. This has to be at least 1 to
// make space for the first Event: Ready or Resumed.
WSBuffer = 10
// WSRetries is the times Gateway would try and connect or reconnect to the
// gateway.
WSRetries = uint(5)
// WSError is the default error handler
WSError = func(err error) { log.Println("Gateway error:", err) }
// WSFatal is the default fatal handler, which is called when the Gateway
@ -47,6 +45,8 @@ var (
// WSExtraReadTimeout is the duration to be added to Hello, as a read
// timeout for the websocket.
WSExtraReadTimeout = time.Second
WSDebug = func(v ...interface{}) {}
)
var (
@ -78,8 +78,6 @@ type Gateway struct {
// Timeout for connecting and writing to the Websocket, uses default
// WSTimeout (global).
WSTimeout time.Duration
// Retries on connect and reconnect.
WSRetries uint // 3
// All events sent over are pointers to Event structs (structs suffixed with
// "Event"). This shouldn't be accessed if the Gateway is created with a
@ -102,8 +100,8 @@ type Gateway struct {
OP chan Event
// Filled by methods, internal use
done chan struct{}
paceDeath chan error
waitGroup *sync.WaitGroup
}
// NewGateway starts a new Gateway with the default stdlib JSON driver. For more
@ -122,7 +120,6 @@ func NewGatewayWithDriver(token string, driver json.Driver) (*Gateway, error) {
g := &Gateway{
Driver: driver,
WSTimeout: WSTimeout,
WSRetries: WSRetries,
Events: make(chan Event, WSBuffer),
Identifier: DefaultIdentifier(token),
Sequence: NewSequence(),
@ -153,15 +150,20 @@ func NewGatewayWithDriver(token string, driver json.Driver) (*Gateway, error) {
// Close closes the underlying Websocket connection.
func (g *Gateway) Close() error {
WSDebug("Stopping pacemaker...")
// If the pacemaker is running:
// Stop the pacemaker and the event handler
g.Pacemaker.Stop()
if g.done != nil {
// Wait for the event handler to fully exit
<-g.done
g.done = nil
}
WSDebug("Stopped pacemaker. Waiting for WaitGroup to be done.")
// This should work, since Pacemaker should signal its loop to stop, which
// would also exit our event loop. Both would be 2.
g.waitGroup.Wait()
// Mark g.waitGroup as empty:
g.waitGroup = nil
// Stop the Websocket
return g.WS.Close(nil)
@ -169,9 +171,13 @@ func (g *Gateway) Close() error {
// Reconnects and resumes.
func (g *Gateway) Reconnect() error {
WSDebug("Reconnecting...")
// If the event loop is not dead:
if g.done != nil {
if g.paceDeath != nil {
WSDebug("Gateway is not closed, closing before reconnecting...")
g.Close()
WSDebug("Gateway is closed asynchronously. Goroutine may not be exited.")
}
// Actually a reconnect at this point.
@ -180,12 +186,15 @@ func (g *Gateway) Reconnect() error {
func (g *Gateway) Open() error {
// Reconnect timeout
ctx, cancel := context.WithTimeout(context.Background(), g.WSTimeout)
defer cancel()
// ctx, cancel := context.WithTimeout(context.Background(), g.WSTimeout)
// defer cancel()
var Lerr error
// TODO: this could be of some use.
ctx := context.Background()
for i := 0; ; i++ {
/* Context doesn't time out.
for i := uint(0); i < g.WSRetries; i++ {
// Check if context is expired
if err := ctx.Err(); err != nil {
// Close the connection
@ -195,14 +204,19 @@ func (g *Gateway) Open() error {
return err
}
*/
WSDebug("Trying to dial...", i)
// Reconnect to the Gateway
if err := g.WS.Dial(ctx); err != nil {
// Save the error, retry again
Lerr = errors.Wrap(err, "Failed to reconnect")
g.ErrorLog(err)
g.ErrorLog(errors.Wrap(err, "Failed to reconnect"))
continue
}
WSDebug("Trying to start...", i)
// Try to resume the connection
if err := g.Start(); err != nil {
// If the connection is rate limited (documented behavior):
@ -216,24 +230,20 @@ func (g *Gateway) Open() error {
continue
}
WSDebug("Started after attempt:", i)
// Started successfully, return
return nil
}
// Check if any earlier errors are fatal
if Lerr != nil {
return Lerr
}
// We tried.
return ErrWSMaxTries
}
// Start authenticates with the websocket, or resume from a dead Websocket
// connection. This function doesn't block.
func (g *Gateway) Start() error {
if err := g.start(); err != nil {
g.Close()
WSDebug("Start failed:", err)
if err := g.Close(); err != nil {
WSDebug("Failed to close after start fail:", err)
}
return err
}
return nil
@ -249,14 +259,17 @@ func (g *Gateway) start() error {
return errors.Wrap(err, "Error at Hello")
}
// Start the pacemaker with the heartrate received from Hello
// Make a new WaitGroup for use in background loops:
g.waitGroup = new(sync.WaitGroup)
// Start the pacemaker with the heartrate received from Hello:
g.Pacemaker = &Pacemaker{
Heartrate: hello.HeartbeatInterval.Duration(),
Pace: g.Heartbeat,
OnDead: g.Reconnect,
}
// Pacemaker dies here, only when it's fatal.
g.paceDeath = g.Pacemaker.StartAsync()
g.paceDeath = g.Pacemaker.StartAsync(g.waitGroup)
// Send Discord either the Identify packet (if it's a fresh connection), or
// a Resume packet (if it's a dead connection).
@ -285,38 +298,44 @@ func (g *Gateway) start() error {
}
// Start the event handler
g.done = make(chan struct{})
g.waitGroup.Add(1)
go g.handleWS()
WSDebug("Started successfully.")
return nil
}
// handleWS uses the Websocket and parses them into g.Events.
func (g *Gateway) handleWS() {
ch := g.WS.Listen()
err := g.eventLoop()
g.waitGroup.Done()
defer func() {
g.done <- struct{}{}
}()
if err != nil {
if err := g.Reconnect(); err != nil {
g.FatalLog(errors.Wrap(err, "Failed to reconnect"))
}
// Reconnect should spawn another eventLoop in its Start function.
}
}
func (g *Gateway) eventLoop() error {
ch := g.WS.Listen()
for {
select {
case err := <-g.paceDeath:
// Got a paceDeath, we're exiting from here on out.
g.paceDeath = nil // mark
if err == nil {
WSDebug("Pacemaker stopped without errors.")
// No error, just exit normally.
return
return nil
}
g.ErrorLog(errors.Wrap(err, "Pacemaker died"))
// Pacemaker died, pretty fatal. We'll reconnect though.
if err := g.Reconnect(); err != nil {
// Very fatal if this fails. We'll warn the user.
g.FatalLog(errors.Wrap(err, "Failed to reconnect"))
// Then, we'll take the safe way and exit.
return
}
return errors.New("Pacemaker died, reconnecting.")
case ev := <-ch:
// Check for error
@ -325,6 +344,10 @@ func (g *Gateway) handleWS() {
continue
}
if len(ev.Data) == 0 {
return errors.New("Event data is empty, reconnecting.")
}
// Handle the event
if err := HandleEvent(g, ev.Data); err != nil {
g.ErrorLog(errors.Wrap(err, "WS handler error"))

View File

@ -1,6 +1,7 @@
package gateway
import (
"sync"
"sync/atomic"
"time"
@ -64,10 +65,14 @@ func (p *Pacemaker) Stop() {
}
}
func (p *Pacemaker) start(stop chan struct{}) error {
func (p *Pacemaker) start(stop chan struct{}, wg *sync.WaitGroup) error {
tick := time.NewTicker(p.Heartrate)
defer tick.Stop()
if wg != nil {
defer wg.Done()
}
// Echo at least once
p.Echo()
@ -91,14 +96,19 @@ func (p *Pacemaker) start(stop chan struct{}) error {
}
}
func (p *Pacemaker) StartAsync() (death chan error) {
// StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
p.death = make(chan error)
stop := make(chan struct{})
p.stop = stop
if wg != nil {
wg.Add(1)
}
go func() {
p.death <- p.start(stop)
p.death <- p.start(stop, wg)
}()
return p.death

View File

@ -149,16 +149,7 @@ func (c *Conn) Send(ctx context.Context, b []byte) error {
func (c *Conn) Close(err error) error {
// Wait for the read loop to exit after exiting.
defer func() {
c.mut.Lock()
defer c.mut.Unlock()
<-c.events
c.events = nil
// Set the connection to nil.
c.Conn = nil
}()
defer c.close()
if err == nil {
return c.Conn.Close(websocket.StatusNormalClosure, "")
@ -171,3 +162,14 @@ func (c *Conn) Close(err error) error {
return c.Conn.Close(websocket.StatusProtocolError, msg)
}
func (c *Conn) close() {
c.mut.Lock()
defer c.mut.Unlock()
<-c.events
c.events = nil
// Set the connection to nil.
c.Conn = nil
}

View File

@ -7,7 +7,8 @@ import (
)
func NewSendLimiter() *rate.Limiter {
return rate.NewLimiter(rate.Every(time.Minute), 120)
// return rate.NewLimiter(rate.Every(time.Minute), 120)
return rate.NewLimiter(rate.Every(time.Second), 2)
}
func NewDialLimiter() *rate.Limiter {