1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2025-01-05 19:57:02 +00:00

Gateway: Sharding callback, proper closing (#190)

* Gateway: Fix gateway reconnect

This commit uses the correct timeout, Gateway.ReconnectTimeout, when reconnecting. Furthermore, it adds a delay between consecutive, failed reconnects.

* Gateway: Stop pacemaker when calling Gateway.CloseGracefully

* API: remove unnecessary leading/trailing whitespaces

* Gateway: Add Gateway.OnScalingRequired callback

* Gateway: Make all user initiated user closures graceful and ensure that closures are respected during reconnects

* Gateway: Fix typo

* Gateway: Add Gateway.ReconnectAttempts and deprecate .ReconnectTimeout

* Gateway: Add Gateway.Pause and reexport .Reconnect and .ReconnectCtx

* Gateway: Improve the Gateway.OnShardingRequired docs

* Wsutil: Code cleanup
This commit is contained in:
Maximilian von Lindern 2021-04-07 20:38:26 +02:00 committed by GitHub
parent 37d285184a
commit ebc74e3168
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 150 additions and 85 deletions

View file

@ -210,10 +210,10 @@ func (c *Client) SendText(channelID discord.ChannelID, content string) (*discord
//
// Fires a Message Create Gateway event.
func (c *Client) SendTextReply(
channelID discord.ChannelID,
content string,
channelID discord.ChannelID,
content string,
referenceID discord.MessageID) (*discord.Message, error) {
return c.SendMessageComplex(channelID, SendMessageData{
Content: content,
Reference: &discord.MessageReference{MessageID: referenceID},
@ -241,8 +241,8 @@ func (c *Client) SendEmbed(
//
// Fires a Message Create Gateway event.
func (c *Client) SendEmbedReply(
channelID discord.ChannelID,
e discord.Embed,
channelID discord.ChannelID,
e discord.Embed,
referenceID discord.MessageID) (*discord.Message, error) {
return c.SendMessageComplex(channelID, SendMessageData{
@ -277,7 +277,7 @@ func (c *Client) SendMessageReply(
content string,
embed *discord.Embed,
referenceID discord.MessageID) (*discord.Message, error) {
return c.SendMessageComplex(channelID, SendMessageData{
Content: content,
Embed: embed,

View file

@ -21,6 +21,7 @@ import (
"github.com/diamondburned/arikawa/v2/utils/httputil"
"github.com/diamondburned/arikawa/v2/utils/json"
"github.com/diamondburned/arikawa/v2/utils/wsutil"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
)
@ -34,9 +35,15 @@ var (
var (
ErrMissingForResume = errors.New("missing session ID or sequence for resuming")
ErrWSMaxTries = errors.New("max tries reached")
ErrWSMaxTries = errors.New(
"could not connect to the Discord gateway before reaching the timeout")
ErrClosed = errors.New("the gateway is closed and cannot reconnect")
)
// see
// https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-close-event-codes
const errCodeShardingRequired = 4011
// BotData contains the GatewayURL as well as extra metadata on how to
// shard bots.
type BotData struct {
@ -87,13 +94,19 @@ type Gateway struct {
// timeout for Start and the timeout for sending each Gateway command
// independently.
WSTimeout time.Duration
// ReconnectTimeout is the timeout used during reconnection.
// If the a connection to the gateway can't be established before the
// duration passes, the Gateway will be closed and FatalErrorCallback will
// be called.
//
// Setting this to 0 is equivalent to no timeout.
//
// Deprecated: It is recommended to use ReconnectAttempts instead.
ReconnectTimeout time.Duration
// ReconnectAttempts are the amount of attempts made to Reconnect, before
// aborting. If this set to 0, unlimited attempts will be made.
ReconnectAttempts uint
// All events sent over are pointers to Event structs (structs suffixed with
// "Event"). This shouldn't be accessed if the Gateway is created with a
@ -120,12 +133,22 @@ type Gateway struct {
// Defaults to noop.
FatalErrorCallback func(err error)
// AfterClose is called after each close. Error can be non-nil, as this is
// called even when the Gateway is gracefully closed. It's used mainly for
// OnScalingRequired is the function called, if Discord closes with error
// code 4011 aka Scaling Required. At the point of calling, the Gateway
// will be closed, and can, after increasing the number of shards, be
// reopened using Open. Reconnect or ReconnectCtx, however, will not be
// available as the session is invalidated.
OnScalingRequired func()
// AfterClose is called after each close or pause. It is used mainly for
// reconnections or any type of connection interruptions.
AfterClose func(err error) // noop by default
//
// Constructors will use a no-op function by default.
AfterClose func(err error)
waitGroup sync.WaitGroup
closed chan struct{}
}
// NewGatewayWithIntents creates a new Gateway with the given intents and the
@ -238,12 +261,47 @@ func (g *Gateway) HasIntents(intents Intents) bool {
return g.Identifier.Intents.Has(intents)
}
// Close closes the underlying Websocket connection.
// Close closes the underlying Websocket connection, invalidating the session
// ID. A new gateway connection can be established, by calling Open again.
//
// If the wsutil.Connection of the Gateway's WS implements
// wsutil.GracefulCloser, such as the default one, Close will send a closing
// frame before ending the connection, closing it gracefully. This will cause
// the bot to appear as offline instantly.
func (g *Gateway) Close() error {
return g.close(true)
}
// CloseGracefully attempts to close the gateway connection gracefully, by
// sending a closing frame before ending the connection. This will cause the
// gateway's session id to be rendered invalid.
//
// Note that a graceful closure is only possible, if the wsutil.Connection of
// the Gateway's Websocket implements wsutil.GracefulCloser.
//
// Deprecated: Close behaves identically to CloseGracefully, and should be used
// instead.
func (g *Gateway) CloseGracefully() error {
return g.Close()
}
// Pause pauses the Gateway connection, by ending the connection without
// sending a closing frame. This allows the connection to be resumed at a later
// point, by calling Reconnect or ReconnectCtx.
func (g *Gateway) Pause() error {
return g.close(false)
}
func (g *Gateway) close(graceful bool) (err error) {
wsutil.WSDebug("Trying to close. Pacemaker check skipped.")
wsutil.WSDebug("Closing the Websocket...")
err := g.WS.Close()
if graceful {
err = g.WS.CloseGracefully()
} else {
err = g.WS.Close()
}
if errors.Is(err, wsutil.ErrWebsocketClosed) {
wsutil.WSDebug("Websocket already closed.")
return nil
@ -262,25 +320,16 @@ func (g *Gateway) Close() error {
g.AfterClose(err)
wsutil.WSDebug("AfterClose callback finished.")
return err
}
if graceful {
// If a Reconnect is in progress, signal to cancel.
close(g.closed)
// CloseGracefully attempts to close the gateway connection gracefully, by
// sending a closing frame before ending the connection. This will cause the
// gateway's session id to be rendered invalid.
//
// Note that a graceful closure is only possible, if the wsutil.Connection of
// the Gateway's Websocket implements wsutil.GracefulCloser.
func (g *Gateway) CloseGracefully() error {
err := g.WS.CloseGracefully()
if errors.Is(err, wsutil.ErrWebsocketClosed) {
wsutil.WSDebug("Websocket already closed.")
return nil
// Delete our session id, as we just invalidated it.
g.sessionMu.Lock()
g.sessionID = ""
g.sessionMu.Unlock()
}
// Stop the pacemaker loop; This shouldn't error, so return is ignored
g.WS.Close()
return err
}
@ -293,23 +342,22 @@ func (g *Gateway) SessionID() string {
return g.sessionID
}
// Reconnect tries to reconnect until the ReconnectTimeout is reached, or if
// set to 0 reconnects indefinitely.
// Reconnect tries to reconnect to the Gateway until the ReconnectAttempts or
// ReconnectTimeout is reached.
func (g *Gateway) Reconnect() {
ctx := context.Background()
if g.ReconnectTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(context.Background(), g.WSTimeout)
ctx, cancel = context.WithTimeout(context.Background(), g.ReconnectTimeout)
defer cancel()
}
// ignore the error, it is already logged and FatalErrorCallback was called
g.ReconnectCtx(ctx)
}
// ReconnectCtx attempts to reconnect until context expires.
// ReconnectCtx attempts to Reconnect until context expires.
// If the context expires FatalErrorCallback will be called with ErrWSMaxTries,
// and the last error returned by Open will be returned.
func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
@ -317,32 +365,42 @@ func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
// Guarantee the gateway is already closed. Ignore its error, as we're
// redialing anyway.
g.Close()
g.Pause()
for i := 1; ; i++ {
for try := uint(1); g.ReconnectAttempts == 0 || g.ReconnectAttempts >= try; try++ {
select {
case <-g.closed:
g.ErrorLog(ErrClosed)
return ErrClosed
case <-ctx.Done():
wsutil.WSDebug("Unable to Reconnect after", try, "attempts, aborting")
g.FatalErrorCallback(ErrWSMaxTries)
return err
default:
}
wsutil.WSDebug("Trying to dial, attempt", i)
wsutil.WSDebug("Trying to dial, attempt", try)
// Condition: err == ErrInvalidSession:
// If the connection is rate limited (documented behavior):
// https://discord.com/developers/docs/topics/gateway#rate-limiting
// if we encounter an error, make sure we return it, and not nil
if oerr := g.OpenContext(ctx); oerr != nil {
err = oerr
g.ErrorLog(oerr)
// make sure we don't overwrite our last error
if err = g.OpenContext(ctx); err != nil {
g.ErrorLog(err)
wait := time.Duration(4+2*try) * time.Second
if wait > 60*time.Second {
wait = 60 * time.Second
}
time.Sleep(wait)
continue
}
wsutil.WSDebug("Started after attempt:", i)
return
wsutil.WSDebug("Started after attempt:", try)
return nil
}
wsutil.WSDebug("Unable to Reconnect after", g.ReconnectAttempts, "attempts, aborting")
return err
}
// Open connects to the Websocket and authenticate it. You should usually use
@ -360,7 +418,7 @@ func (g *Gateway) Open() error {
func (g *Gateway) OpenContext(ctx context.Context) error {
// Reconnect to the Gateway
if err := g.WS.Dial(ctx); err != nil {
return errors.Wrap(err, "failed to reconnect")
return errors.Wrap(err, "failed to Reconnect")
}
wsutil.WSDebug("Trying to start...")
@ -386,6 +444,8 @@ func (g *Gateway) Start() error {
// StartCtx authenticates with the websocket, or resume from a dead Websocket
// connection. You wouldn't usually use this function, but OpenCtx() instead.
func (g *Gateway) StartCtx(ctx context.Context) error {
g.closed = make(chan struct{})
if err := g.start(ctx); err != nil {
wsutil.WSDebug("Start failed:", err)
@ -429,13 +489,28 @@ func (g *Gateway) start(ctx context.Context) error {
g.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped with error:", err)
// If Discord signals us sharding is required, do not attempt to
// Reconnect. Instead invalidate our session id, as we cannot resume,
// call OnShardingRequired, and exit.
var cerr *websocket.CloseError
if errors.As(err, &cerr) && cerr != nil && cerr.Code == errCodeShardingRequired {
g.ErrorLog(cerr)
g.sessionMu.Lock()
g.sessionID = ""
g.sessionMu.Unlock()
g.OnScalingRequired()
return
}
// Bail if there is no error or if the error is an explicit close, as
// there might be an ongoing reconnection.
if err == nil || errors.Is(err, wsutil.ErrWebsocketClosed) {
return
}
// Only attempt to reconnect if we have a session ID at all. We may not
// Only attempt to Reconnect if we have a session ID at all. We may not
// have one if we haven't even connected successfully once.
if g.SessionID() != "" {
g.ErrorLog(err)

View file

@ -99,11 +99,15 @@ func TestIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := gateway.ReconnectCtx(ctx); err != nil {
g.ErrorLog = func(err error) {
t.Fatal("Unexpected error while reconnecting:", err)
}
gateway.ReconnectCtx(ctx)
})
g.ErrorLog = func(err error) { log.Println(err) }
// Wait for the desired event:
gotimeout(t, func() {
for ev := range gateway.Events {

View file

@ -51,11 +51,11 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error {
}
case ReconnectOP:
// Server requests to reconnect, die and retry.
// Server requests to Reconnect, die and retry.
wsutil.WSDebug("ReconnectOP received.")
// Exit with the ReconnectOP error to force the heartbeat event loop to
// reconnect synchronously. Not really a fatal error.
// Reconnect synchronously. Not really a fatal error.
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
case InvalidSessionOP:
@ -67,7 +67,7 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error {
// Invalid session, try and Identify.
if err := g.IdentifyCtx(ctx); err != nil {
// Can't identify, reconnect.
// Can't identify, Reconnect.
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
}

View file

@ -147,7 +147,7 @@ func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error {
if err := ws.conn.Send(ctx, b); err != nil {
// We need to clean up ourselves if things are erroring out.
WSDebug("Conn: Error while sending; closing the connection. Error:", err)
ws.close()
ws.close(false)
return err
}
@ -157,50 +157,36 @@ func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error {
// Close closes the websocket connection. It assumes that the Websocket is
// closed even when it returns an error. If the Websocket was already closed
// before, ErrWebsocketClosed will be returned.
func (ws *Websocket) Close() error {
func (ws *Websocket) Close() error { return ws.close(false) }
func (ws *Websocket) CloseGracefully() error { return ws.close(true) }
// close closes the Websocket without acquiring the mutex. Refer to Close for
// more information.
func (ws *Websocket) close(graceful bool) error {
WSDebug("Conn: Acquiring mutex lock to close...")
ws.mutex.Lock()
defer ws.mutex.Unlock()
WSDebug("Conn: Write mutex acquired; closing.")
return ws.close()
}
func (ws *Websocket) CloseGracefully() error {
WSDebug("Conn: Acquiring mutex lock to close gracefully...")
ws.mutex.Lock()
defer ws.mutex.Unlock()
WSDebug("Conn: Write mutex acquired")
if gc, ok := ws.conn.(GracefulCloser); ok {
if ws.closed {
WSDebug("Conn: Websocket is already closed.")
return ErrWebsocketClosed
}
WSDebug("Conn: closing gracefully")
ws.closed = true
return gc.CloseGracefully()
} else {
WSDebug("Conn: The Websocket's Connection does not provide graceful closure. Closing normally instead.")
return ws.close()
}
}
// close closes the Websocket without acquiring the mutex. Refer to Close for
// more information.
func (ws *Websocket) close() error {
if ws.closed {
WSDebug("Conn: Websocket is already closed.")
return ErrWebsocketClosed
}
err := ws.conn.Close()
ws.closed = true
return err
if graceful {
if gc, ok := ws.conn.(GracefulCloser); ok {
WSDebug("Conn: Closing gracefully")
return gc.CloseGracefully()
}
WSDebug("Conn: The Websocket's Connection does not support graceful closure.")
}
WSDebug("Conn: Closing")
return ws.conn.Close()
}