mirror of
https://github.com/diamondburned/arikawa.git
synced 2025-07-23 05:11:44 +00:00
Compare commits
2 commits
37d285184a
...
728bc5c472
Author | SHA1 | Date | |
---|---|---|---|
|
728bc5c472 | ||
|
ebc74e3168 |
|
@ -21,6 +21,7 @@ import (
|
||||||
"github.com/diamondburned/arikawa/v2/utils/httputil"
|
"github.com/diamondburned/arikawa/v2/utils/httputil"
|
||||||
"github.com/diamondburned/arikawa/v2/utils/json"
|
"github.com/diamondburned/arikawa/v2/utils/json"
|
||||||
"github.com/diamondburned/arikawa/v2/utils/wsutil"
|
"github.com/diamondburned/arikawa/v2/utils/wsutil"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,9 +35,15 @@ var (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrMissingForResume = errors.New("missing session ID or sequence for resuming")
|
ErrMissingForResume = errors.New("missing session ID or sequence for resuming")
|
||||||
ErrWSMaxTries = errors.New("max tries reached")
|
ErrWSMaxTries = errors.New(
|
||||||
|
"could not connect to the Discord gateway before reaching the timeout")
|
||||||
|
ErrClosed = errors.New("the gateway is closed and cannot reconnect")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// see
|
||||||
|
// https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-close-event-codes
|
||||||
|
const errCodeShardingRequired = 4011
|
||||||
|
|
||||||
// BotData contains the GatewayURL as well as extra metadata on how to
|
// BotData contains the GatewayURL as well as extra metadata on how to
|
||||||
// shard bots.
|
// shard bots.
|
||||||
type BotData struct {
|
type BotData struct {
|
||||||
|
@ -87,13 +94,19 @@ type Gateway struct {
|
||||||
// timeout for Start and the timeout for sending each Gateway command
|
// timeout for Start and the timeout for sending each Gateway command
|
||||||
// independently.
|
// independently.
|
||||||
WSTimeout time.Duration
|
WSTimeout time.Duration
|
||||||
|
|
||||||
// ReconnectTimeout is the timeout used during reconnection.
|
// ReconnectTimeout is the timeout used during reconnection.
|
||||||
// If the a connection to the gateway can't be established before the
|
// If the a connection to the gateway can't be established before the
|
||||||
// duration passes, the Gateway will be closed and FatalErrorCallback will
|
// duration passes, the Gateway will be closed and FatalErrorCallback will
|
||||||
// be called.
|
// be called.
|
||||||
//
|
//
|
||||||
// Setting this to 0 is equivalent to no timeout.
|
// Setting this to 0 is equivalent to no timeout.
|
||||||
|
//
|
||||||
|
// Deprecated: It is recommended to use ReconnectAttempts instead.
|
||||||
ReconnectTimeout time.Duration
|
ReconnectTimeout time.Duration
|
||||||
|
// ReconnectAttempts are the amount of attempts made to Reconnect, before
|
||||||
|
// aborting. If this set to 0, unlimited attempts will be made.
|
||||||
|
ReconnectAttempts uint
|
||||||
|
|
||||||
// All events sent over are pointers to Event structs (structs suffixed with
|
// All events sent over are pointers to Event structs (structs suffixed with
|
||||||
// "Event"). This shouldn't be accessed if the Gateway is created with a
|
// "Event"). This shouldn't be accessed if the Gateway is created with a
|
||||||
|
@ -120,12 +133,22 @@ type Gateway struct {
|
||||||
// Defaults to noop.
|
// Defaults to noop.
|
||||||
FatalErrorCallback func(err error)
|
FatalErrorCallback func(err error)
|
||||||
|
|
||||||
// AfterClose is called after each close. Error can be non-nil, as this is
|
// OnScalingRequired is the function called, if Discord closes with error
|
||||||
// called even when the Gateway is gracefully closed. It's used mainly for
|
// code 4011 aka Scaling Required. At the point of calling, the Gateway
|
||||||
|
// will be closed, and can, after increasing the number of shards, be
|
||||||
|
// reopened using Open. Reconnect or ReconnectCtx, however, will not be
|
||||||
|
// available as the session is invalidated.
|
||||||
|
OnScalingRequired func()
|
||||||
|
|
||||||
|
// AfterClose is called after each close or pause. It is used mainly for
|
||||||
// reconnections or any type of connection interruptions.
|
// reconnections or any type of connection interruptions.
|
||||||
AfterClose func(err error) // noop by default
|
//
|
||||||
|
// Constructors will use a no-op function by default.
|
||||||
|
AfterClose func(err error)
|
||||||
|
|
||||||
waitGroup sync.WaitGroup
|
waitGroup sync.WaitGroup
|
||||||
|
|
||||||
|
closed chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGatewayWithIntents creates a new Gateway with the given intents and the
|
// NewGatewayWithIntents creates a new Gateway with the given intents and the
|
||||||
|
@ -238,12 +261,47 @@ func (g *Gateway) HasIntents(intents Intents) bool {
|
||||||
return g.Identifier.Intents.Has(intents)
|
return g.Identifier.Intents.Has(intents)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the underlying Websocket connection.
|
// Close closes the underlying Websocket connection, invalidating the session
|
||||||
|
// ID. A new gateway connection can be established, by calling Open again.
|
||||||
|
//
|
||||||
|
// If the wsutil.Connection of the Gateway's WS implements
|
||||||
|
// wsutil.GracefulCloser, such as the default one, Close will send a closing
|
||||||
|
// frame before ending the connection, closing it gracefully. This will cause
|
||||||
|
// the bot to appear as offline instantly.
|
||||||
func (g *Gateway) Close() error {
|
func (g *Gateway) Close() error {
|
||||||
|
return g.close(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseGracefully attempts to close the gateway connection gracefully, by
|
||||||
|
// sending a closing frame before ending the connection. This will cause the
|
||||||
|
// gateway's session id to be rendered invalid.
|
||||||
|
//
|
||||||
|
// Note that a graceful closure is only possible, if the wsutil.Connection of
|
||||||
|
// the Gateway's Websocket implements wsutil.GracefulCloser.
|
||||||
|
//
|
||||||
|
// Deprecated: Close behaves identically to CloseGracefully, and should be used
|
||||||
|
// instead.
|
||||||
|
func (g *Gateway) CloseGracefully() error {
|
||||||
|
return g.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pause pauses the Gateway connection, by ending the connection without
|
||||||
|
// sending a closing frame. This allows the connection to be resumed at a later
|
||||||
|
// point, by calling Reconnect or ReconnectCtx.
|
||||||
|
func (g *Gateway) Pause() error {
|
||||||
|
return g.close(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Gateway) close(graceful bool) (err error) {
|
||||||
wsutil.WSDebug("Trying to close. Pacemaker check skipped.")
|
wsutil.WSDebug("Trying to close. Pacemaker check skipped.")
|
||||||
wsutil.WSDebug("Closing the Websocket...")
|
wsutil.WSDebug("Closing the Websocket...")
|
||||||
|
|
||||||
err := g.WS.Close()
|
if graceful {
|
||||||
|
err = g.WS.CloseGracefully()
|
||||||
|
} else {
|
||||||
|
err = g.WS.Close()
|
||||||
|
}
|
||||||
|
|
||||||
if errors.Is(err, wsutil.ErrWebsocketClosed) {
|
if errors.Is(err, wsutil.ErrWebsocketClosed) {
|
||||||
wsutil.WSDebug("Websocket already closed.")
|
wsutil.WSDebug("Websocket already closed.")
|
||||||
return nil
|
return nil
|
||||||
|
@ -262,25 +320,16 @@ func (g *Gateway) Close() error {
|
||||||
g.AfterClose(err)
|
g.AfterClose(err)
|
||||||
wsutil.WSDebug("AfterClose callback finished.")
|
wsutil.WSDebug("AfterClose callback finished.")
|
||||||
|
|
||||||
return err
|
if graceful {
|
||||||
}
|
// If a Reconnect is in progress, signal to cancel.
|
||||||
|
close(g.closed)
|
||||||
|
|
||||||
// CloseGracefully attempts to close the gateway connection gracefully, by
|
// Delete our session id, as we just invalidated it.
|
||||||
// sending a closing frame before ending the connection. This will cause the
|
g.sessionMu.Lock()
|
||||||
// gateway's session id to be rendered invalid.
|
g.sessionID = ""
|
||||||
//
|
g.sessionMu.Unlock()
|
||||||
// Note that a graceful closure is only possible, if the wsutil.Connection of
|
|
||||||
// the Gateway's Websocket implements wsutil.GracefulCloser.
|
|
||||||
func (g *Gateway) CloseGracefully() error {
|
|
||||||
err := g.WS.CloseGracefully()
|
|
||||||
if errors.Is(err, wsutil.ErrWebsocketClosed) {
|
|
||||||
wsutil.WSDebug("Websocket already closed.")
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the pacemaker loop; This shouldn't error, so return is ignored
|
|
||||||
g.WS.Close()
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,23 +342,22 @@ func (g *Gateway) SessionID() string {
|
||||||
return g.sessionID
|
return g.sessionID
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reconnect tries to reconnect until the ReconnectTimeout is reached, or if
|
// Reconnect tries to reconnect to the Gateway until the ReconnectAttempts or
|
||||||
// set to 0 reconnects indefinitely.
|
// ReconnectTimeout is reached.
|
||||||
func (g *Gateway) Reconnect() {
|
func (g *Gateway) Reconnect() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
if g.ReconnectTimeout > 0 {
|
if g.ReconnectTimeout > 0 {
|
||||||
var cancel func()
|
var cancel func()
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), g.WSTimeout)
|
ctx, cancel = context.WithTimeout(context.Background(), g.ReconnectTimeout)
|
||||||
|
|
||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ignore the error, it is already logged and FatalErrorCallback was called
|
|
||||||
g.ReconnectCtx(ctx)
|
g.ReconnectCtx(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReconnectCtx attempts to reconnect until context expires.
|
// ReconnectCtx attempts to Reconnect until context expires.
|
||||||
// If the context expires FatalErrorCallback will be called with ErrWSMaxTries,
|
// If the context expires FatalErrorCallback will be called with ErrWSMaxTries,
|
||||||
// and the last error returned by Open will be returned.
|
// and the last error returned by Open will be returned.
|
||||||
func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
|
func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
|
||||||
|
@ -317,32 +365,42 @@ func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
|
||||||
|
|
||||||
// Guarantee the gateway is already closed. Ignore its error, as we're
|
// Guarantee the gateway is already closed. Ignore its error, as we're
|
||||||
// redialing anyway.
|
// redialing anyway.
|
||||||
g.Close()
|
g.Pause()
|
||||||
|
|
||||||
for i := 1; ; i++ {
|
for try := uint(1); g.ReconnectAttempts == 0 || g.ReconnectAttempts >= try; try++ {
|
||||||
select {
|
select {
|
||||||
|
case <-g.closed:
|
||||||
|
g.ErrorLog(ErrClosed)
|
||||||
|
return ErrClosed
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
wsutil.WSDebug("Unable to Reconnect after", try, "attempts, aborting")
|
||||||
g.FatalErrorCallback(ErrWSMaxTries)
|
g.FatalErrorCallback(ErrWSMaxTries)
|
||||||
return err
|
return err
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
wsutil.WSDebug("Trying to dial, attempt", i)
|
wsutil.WSDebug("Trying to dial, attempt", try)
|
||||||
|
|
||||||
// Condition: err == ErrInvalidSession:
|
// if we encounter an error, make sure we return it, and not nil
|
||||||
// If the connection is rate limited (documented behavior):
|
if oerr := g.OpenContext(ctx); oerr != nil {
|
||||||
// https://discord.com/developers/docs/topics/gateway#rate-limiting
|
err = oerr
|
||||||
|
g.ErrorLog(oerr)
|
||||||
|
|
||||||
// make sure we don't overwrite our last error
|
wait := time.Duration(4+2*try) * time.Second
|
||||||
if err = g.OpenContext(ctx); err != nil {
|
if wait > 60*time.Second {
|
||||||
g.ErrorLog(err)
|
wait = 60 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(wait)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
wsutil.WSDebug("Started after attempt:", i)
|
wsutil.WSDebug("Started after attempt:", try)
|
||||||
|
return nil
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wsutil.WSDebug("Unable to Reconnect after", g.ReconnectAttempts, "attempts, aborting")
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open connects to the Websocket and authenticate it. You should usually use
|
// Open connects to the Websocket and authenticate it. You should usually use
|
||||||
|
@ -360,7 +418,7 @@ func (g *Gateway) Open() error {
|
||||||
func (g *Gateway) OpenContext(ctx context.Context) error {
|
func (g *Gateway) OpenContext(ctx context.Context) error {
|
||||||
// Reconnect to the Gateway
|
// Reconnect to the Gateway
|
||||||
if err := g.WS.Dial(ctx); err != nil {
|
if err := g.WS.Dial(ctx); err != nil {
|
||||||
return errors.Wrap(err, "failed to reconnect")
|
return errors.Wrap(err, "failed to Reconnect")
|
||||||
}
|
}
|
||||||
|
|
||||||
wsutil.WSDebug("Trying to start...")
|
wsutil.WSDebug("Trying to start...")
|
||||||
|
@ -386,6 +444,8 @@ func (g *Gateway) Start() error {
|
||||||
// StartCtx authenticates with the websocket, or resume from a dead Websocket
|
// StartCtx authenticates with the websocket, or resume from a dead Websocket
|
||||||
// connection. You wouldn't usually use this function, but OpenCtx() instead.
|
// connection. You wouldn't usually use this function, but OpenCtx() instead.
|
||||||
func (g *Gateway) StartCtx(ctx context.Context) error {
|
func (g *Gateway) StartCtx(ctx context.Context) error {
|
||||||
|
g.closed = make(chan struct{})
|
||||||
|
|
||||||
if err := g.start(ctx); err != nil {
|
if err := g.start(ctx); err != nil {
|
||||||
wsutil.WSDebug("Start failed:", err)
|
wsutil.WSDebug("Start failed:", err)
|
||||||
|
|
||||||
|
@ -429,13 +489,28 @@ func (g *Gateway) start(ctx context.Context) error {
|
||||||
g.waitGroup.Done() // mark so Close() can exit.
|
g.waitGroup.Done() // mark so Close() can exit.
|
||||||
wsutil.WSDebug("Event loop stopped with error:", err)
|
wsutil.WSDebug("Event loop stopped with error:", err)
|
||||||
|
|
||||||
|
// If Discord signals us sharding is required, do not attempt to
|
||||||
|
// Reconnect. Instead invalidate our session id, as we cannot resume,
|
||||||
|
// call OnShardingRequired, and exit.
|
||||||
|
var cerr *websocket.CloseError
|
||||||
|
if errors.As(err, &cerr) && cerr != nil && cerr.Code == errCodeShardingRequired {
|
||||||
|
g.ErrorLog(cerr)
|
||||||
|
|
||||||
|
g.sessionMu.Lock()
|
||||||
|
g.sessionID = ""
|
||||||
|
g.sessionMu.Unlock()
|
||||||
|
|
||||||
|
g.OnScalingRequired()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Bail if there is no error or if the error is an explicit close, as
|
// Bail if there is no error or if the error is an explicit close, as
|
||||||
// there might be an ongoing reconnection.
|
// there might be an ongoing reconnection.
|
||||||
if err == nil || errors.Is(err, wsutil.ErrWebsocketClosed) {
|
if err == nil || errors.Is(err, wsutil.ErrWebsocketClosed) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only attempt to reconnect if we have a session ID at all. We may not
|
// Only attempt to Reconnect if we have a session ID at all. We may not
|
||||||
// have one if we haven't even connected successfully once.
|
// have one if we haven't even connected successfully once.
|
||||||
if g.SessionID() != "" {
|
if g.SessionID() != "" {
|
||||||
g.ErrorLog(err)
|
g.ErrorLog(err)
|
||||||
|
|
|
@ -39,16 +39,16 @@ func TestURL(t *testing.T) {
|
||||||
func TestInvalidToken(t *testing.T) {
|
func TestInvalidToken(t *testing.T) {
|
||||||
g, err := NewGateway("bad token")
|
g, err := NewGateway("bad token")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to make a Gateway:", err)
|
t.Fatal("failed to make a Gateway:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = g.Open(); err == nil {
|
if err = g.Open(); err == nil {
|
||||||
t.Fatal("Unexpected success while opening with a bad token.")
|
t.Fatal("unexpected success while opening with a bad token.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4004 Authentication Failed.
|
// 4004 Authentication Failed.
|
||||||
if !strings.Contains(err.Error(), "4004") {
|
if !strings.Contains(err.Error(), "4004") {
|
||||||
t.Fatal("Unexpected error:", err)
|
t.Fatal("unexpected error:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,26 +64,26 @@ func TestIntegration(t *testing.T) {
|
||||||
// NewGateway should call Start for us.
|
// NewGateway should call Start for us.
|
||||||
g, err := NewGateway("Bot " + config.BotToken)
|
g, err := NewGateway("Bot " + config.BotToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to make a Gateway:", err)
|
t.Fatal("failed to make a Gateway:", err)
|
||||||
}
|
}
|
||||||
g.AddIntents(IntentGuilds)
|
g.AddIntents(IntentGuilds)
|
||||||
g.AfterClose = func(err error) {
|
g.AfterClose = func(err error) {
|
||||||
t.Log("Closed.")
|
t.Log("closed.")
|
||||||
}
|
}
|
||||||
gateway = g
|
gateway = g
|
||||||
|
|
||||||
if err := g.Open(); err != nil {
|
if err := g.Open(); err != nil {
|
||||||
t.Fatal("Failed to authenticate with Discord:", err)
|
t.Fatal("failed to authenticate with Discord:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ev := wait(t, gateway.Events)
|
ev := wait(t, gateway.Events)
|
||||||
ready, ok := ev.(*ReadyEvent)
|
ready, ok := ev.(*ReadyEvent)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatal("Event received is not of type Ready:", ev)
|
t.Fatal("event received is not of type Ready:", ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
if gateway.SessionID() == "" {
|
if gateway.SessionID() == "" {
|
||||||
t.Fatal("Session ID is empty")
|
t.Fatal("session ID is empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("Bot's username is", ready.User.Username)
|
log.Println("Bot's username is", ready.User.Username)
|
||||||
|
@ -99,11 +99,17 @@ func TestIntegration(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
g.ErrorLog = func(err error) {
|
||||||
|
t.Error("unexpected error while reconnecting:", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := gateway.ReconnectCtx(ctx); err != nil {
|
if err := gateway.ReconnectCtx(ctx); err != nil {
|
||||||
t.Fatal("Unexpected error while reconnecting:", err)
|
t.Error("failed to reconnect Gateway:", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
g.ErrorLog = func(err error) { log.Println(err) }
|
||||||
|
|
||||||
// Wait for the desired event:
|
// Wait for the desired event:
|
||||||
gotimeout(t, func() {
|
gotimeout(t, func() {
|
||||||
for ev := range gateway.Events {
|
for ev := range gateway.Events {
|
||||||
|
@ -118,7 +124,7 @@ func TestIntegration(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := g.Close(); err != nil {
|
if err := g.Close(); err != nil {
|
||||||
t.Fatal("Failed to close Gateway:", err)
|
t.Fatal("failed to close Gateway:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,7 +133,7 @@ func wait(t *testing.T, evCh chan interface{}) interface{} {
|
||||||
case ev := <-evCh:
|
case ev := <-evCh:
|
||||||
return ev
|
return ev
|
||||||
case <-time.After(20 * time.Second):
|
case <-time.After(20 * time.Second):
|
||||||
t.Fatal("Timed out waiting for event")
|
t.Fatal("timed out waiting for event")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -143,7 +149,7 @@ func gotimeout(t *testing.T, fn func()) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(20 * time.Second):
|
case <-time.After(20 * time.Second):
|
||||||
t.Fatal("Timed out waiting for function.")
|
t.Fatal("timed out waiting for function.")
|
||||||
case <-done:
|
case <-done:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,11 +51,11 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
case ReconnectOP:
|
case ReconnectOP:
|
||||||
// Server requests to reconnect, die and retry.
|
// Server requests to Reconnect, die and retry.
|
||||||
wsutil.WSDebug("ReconnectOP received.")
|
wsutil.WSDebug("ReconnectOP received.")
|
||||||
|
|
||||||
// Exit with the ReconnectOP error to force the heartbeat event loop to
|
// Exit with the ReconnectOP error to force the heartbeat event loop to
|
||||||
// reconnect synchronously. Not really a fatal error.
|
// Reconnect synchronously. Not really a fatal error.
|
||||||
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
|
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
|
||||||
|
|
||||||
case InvalidSessionOP:
|
case InvalidSessionOP:
|
||||||
|
@ -67,7 +67,7 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error {
|
||||||
|
|
||||||
// Invalid session, try and Identify.
|
// Invalid session, try and Identify.
|
||||||
if err := g.IdentifyCtx(ctx); err != nil {
|
if err := g.IdentifyCtx(ctx); err != nil {
|
||||||
// Can't identify, reconnect.
|
// Can't identify, Reconnect.
|
||||||
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
|
return wsutil.ErrBrokenConnection(ErrReconnectRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -147,7 +147,7 @@ func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error {
|
||||||
if err := ws.conn.Send(ctx, b); err != nil {
|
if err := ws.conn.Send(ctx, b); err != nil {
|
||||||
// We need to clean up ourselves if things are erroring out.
|
// We need to clean up ourselves if things are erroring out.
|
||||||
WSDebug("Conn: Error while sending; closing the connection. Error:", err)
|
WSDebug("Conn: Error while sending; closing the connection. Error:", err)
|
||||||
ws.close()
|
ws.close(false)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,50 +157,36 @@ func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error {
|
||||||
// Close closes the websocket connection. It assumes that the Websocket is
|
// Close closes the websocket connection. It assumes that the Websocket is
|
||||||
// closed even when it returns an error. If the Websocket was already closed
|
// closed even when it returns an error. If the Websocket was already closed
|
||||||
// before, ErrWebsocketClosed will be returned.
|
// before, ErrWebsocketClosed will be returned.
|
||||||
func (ws *Websocket) Close() error {
|
func (ws *Websocket) Close() error { return ws.close(false) }
|
||||||
|
|
||||||
|
func (ws *Websocket) CloseGracefully() error { return ws.close(true) }
|
||||||
|
|
||||||
|
// close closes the Websocket without acquiring the mutex. Refer to Close for
|
||||||
|
// more information.
|
||||||
|
func (ws *Websocket) close(graceful bool) error {
|
||||||
WSDebug("Conn: Acquiring mutex lock to close...")
|
WSDebug("Conn: Acquiring mutex lock to close...")
|
||||||
|
|
||||||
ws.mutex.Lock()
|
ws.mutex.Lock()
|
||||||
defer ws.mutex.Unlock()
|
defer ws.mutex.Unlock()
|
||||||
|
|
||||||
WSDebug("Conn: Write mutex acquired; closing.")
|
|
||||||
|
|
||||||
return ws.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *Websocket) CloseGracefully() error {
|
|
||||||
WSDebug("Conn: Acquiring mutex lock to close gracefully...")
|
|
||||||
|
|
||||||
ws.mutex.Lock()
|
|
||||||
defer ws.mutex.Unlock()
|
|
||||||
|
|
||||||
WSDebug("Conn: Write mutex acquired")
|
WSDebug("Conn: Write mutex acquired")
|
||||||
|
|
||||||
if gc, ok := ws.conn.(GracefulCloser); ok {
|
|
||||||
if ws.closed {
|
|
||||||
WSDebug("Conn: Websocket is already closed.")
|
|
||||||
return ErrWebsocketClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
WSDebug("Conn: closing gracefully")
|
|
||||||
|
|
||||||
ws.closed = true
|
|
||||||
return gc.CloseGracefully()
|
|
||||||
} else {
|
|
||||||
WSDebug("Conn: The Websocket's Connection does not provide graceful closure. Closing normally instead.")
|
|
||||||
return ws.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// close closes the Websocket without acquiring the mutex. Refer to Close for
|
|
||||||
// more information.
|
|
||||||
func (ws *Websocket) close() error {
|
|
||||||
if ws.closed {
|
if ws.closed {
|
||||||
WSDebug("Conn: Websocket is already closed.")
|
WSDebug("Conn: Websocket is already closed.")
|
||||||
return ErrWebsocketClosed
|
return ErrWebsocketClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ws.conn.Close()
|
|
||||||
ws.closed = true
|
ws.closed = true
|
||||||
return err
|
|
||||||
|
if graceful {
|
||||||
|
if gc, ok := ws.conn.(GracefulCloser); ok {
|
||||||
|
WSDebug("Conn: Closing gracefully")
|
||||||
|
return gc.CloseGracefully()
|
||||||
|
}
|
||||||
|
|
||||||
|
WSDebug("Conn: The Websocket's Connection does not support graceful closure.")
|
||||||
|
}
|
||||||
|
|
||||||
|
WSDebug("Conn: Closing")
|
||||||
|
return ws.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue