1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2024-11-19 21:32:49 +00:00
arikawa/gateway/gateway.go

509 lines
14 KiB
Go
Raw Normal View History

2020-01-15 04:56:50 +00:00
// Package gateway handles the Discord gateway (or Websocket) connection, its
// events, and everything related to it. This includes logging into the
// Websocket.
//
// This package does not abstract events and function handlers; instead, it
// leaves that to the session package. This package exposes only a single Events
// channel.
2020-01-15 04:43:34 +00:00
package gateway
import (
"context"
2020-04-19 16:17:04 +00:00
"net/http"
2020-01-15 04:43:34 +00:00
"net/url"
"strings"
"sync"
2020-01-15 04:43:34 +00:00
"time"
2020-10-28 22:39:59 +00:00
"github.com/diamondburned/arikawa/v2/api"
"github.com/diamondburned/arikawa/v2/discord"
"github.com/diamondburned/arikawa/v2/internal/moreatomic"
2020-10-28 22:39:59 +00:00
"github.com/diamondburned/arikawa/v2/utils/httputil"
"github.com/diamondburned/arikawa/v2/utils/json"
"github.com/diamondburned/arikawa/v2/utils/wsutil"
2020-01-15 04:43:34 +00:00
"github.com/pkg/errors"
)
var (
2020-01-15 04:43:34 +00:00
EndpointGateway = api.Endpoint + "gateway"
EndpointGatewayBot = api.EndpointGateway + "/bot"
Version = api.Version
2020-01-15 04:43:34 +00:00
Encoding = "json"
)
var (
2020-04-19 16:17:04 +00:00
ErrMissingForResume = errors.New("missing session ID or sequence for resuming")
ErrWSMaxTries = errors.New("max tries reached")
2020-01-15 04:43:34 +00:00
)
// BotData contains the GatewayURL as well as extra metadata on how to
2020-04-19 16:17:04 +00:00
// shard bots.
type BotData struct {
2020-04-19 16:17:04 +00:00
URL string `json:"url"`
Shards int `json:"shards,omitempty"`
StartLimit *SessionStartLimit `json:"session_start_limit"`
}
// SessionStartLimit is the information on the current session start limit. It's
// used in BotData.
2020-04-19 16:17:04 +00:00
type SessionStartLimit struct {
Total int `json:"total"`
Remaining int `json:"remaining"`
ResetAfter discord.Milliseconds `json:"reset_after"`
MaxConcurrency int `json:"max_concurrency"`
2020-04-19 16:17:04 +00:00
}
2020-04-19 21:53:53 +00:00
// URL asks Discord for a Websocket URL to the Gateway.
func URL() (string, error) {
var g BotData
2020-04-19 16:17:04 +00:00
2020-04-19 21:53:53 +00:00
return g.URL, httputil.NewClient().RequestJSON(
2020-04-19 16:17:04 +00:00
&g, "GET",
EndpointGateway,
)
}
2020-01-15 04:43:34 +00:00
2020-04-19 21:53:53 +00:00
// BotURL fetches the Gateway URL along with extra metadata. The token
2020-04-19 16:17:04 +00:00
// passed in will NOT be prefixed with Bot.
func BotURL(token string) (*BotData, error) {
var g *BotData
2020-04-19 16:17:04 +00:00
2020-04-19 21:53:53 +00:00
return g, httputil.NewClient().RequestJSON(
2020-04-19 16:17:04 +00:00
&g, "GET",
EndpointGatewayBot,
httputil.WithHeaders(http.Header{
"Authorization": {token},
}),
)
2020-01-15 04:43:34 +00:00
}
type Gateway struct {
WS *wsutil.Websocket
// WSTimeout is a timeout for an arbitrary action. An example of this is the
// timeout for Start and the timeout for sending each Gateway command
// independently.
2020-01-15 04:43:34 +00:00
WSTimeout time.Duration
// ReconnectTimeout is the timeout used during reconnection.
// If the a connection to the gateway can't be established before the
// duration passes, the Gateway will be closed and FatalErrorCallback will
// be called.
//
// Setting this to 0 is equivalent to no timeout.
ReconnectTimeout time.Duration
2020-01-15 04:43:34 +00:00
2020-01-15 04:56:50 +00:00
// All events sent over are pointers to Event structs (structs suffixed with
// "Event"). This shouldn't be accessed if the Gateway is created with a
// Session.
2020-01-15 04:56:50 +00:00
Events chan Event
2020-01-15 04:43:34 +00:00
sessionMu sync.RWMutex
sessionID string
2020-01-15 04:43:34 +00:00
2020-01-15 07:34:18 +00:00
Identifier *Identifier
Sequence *moreatomic.Int64
PacerLoop wsutil.PacemakerLoop
2020-01-15 04:43:34 +00:00
ErrorLog func(err error) // default to log.Println
// FatalErrorCallback is called, if the Gateway exits fatally. At the point
// of calling, the gateway will be already closed.
//
// Currently this will only be called, if the ReconnectTimeout was changed
// to a definite timeout, and connection could not be established during
// that time.
// err will be ErrWSMaxTries in that case.
//
// Defaults to noop.
FatalErrorCallback func(err error)
// AfterClose is called after each close. Error can be non-nil, as this is
// called even when the Gateway is gracefully closed. It's used mainly for
// reconnections or any type of connection interruptions.
AfterClose func(err error) // noop by default
waitGroup sync.WaitGroup
2020-01-15 04:43:34 +00:00
}
// NewGatewayWithIntents creates a new Gateway with the given intents and the
// default stdlib JSON driver. Refer to NewGatewayWithDriver and AddIntents.
func NewGatewayWithIntents(token string, intents ...Intents) (*Gateway, error) {
g, err := NewGateway(token)
if err != nil {
return nil, err
}
for _, intent := range intents {
g.AddIntents(intent)
}
return g, nil
}
// NewGateway creates a new Gateway to the default Discord server.
2020-01-15 04:43:34 +00:00
func NewGateway(token string) (*Gateway, error) {
return NewIdentifiedGateway(DefaultIdentifier(token))
}
// NewIdentifiedGateway creates a new Gateway with the given gateway identifier
// and the default everything. Sharded bots should prefer this function for the
// shared identifier.
func NewIdentifiedGateway(id *Identifier) (*Gateway, error) {
var gatewayURL string
var botData *BotData
var err error
if strings.HasPrefix(id.Token, "Bot ") {
botData, err = BotURL(id.Token)
if err != nil {
return nil, errors.Wrap(err, "failed to get bot data")
}
gatewayURL = botData.URL
} else {
gatewayURL, err = URL()
if err != nil {
return nil, errors.Wrap(err, "failed to get gateway endpoint")
}
2020-01-15 04:43:34 +00:00
}
// Parameters for the gateway
param := url.Values{
"v": {Version},
"encoding": {Encoding},
}
2020-01-15 04:43:34 +00:00
// Append the form to the URL
gatewayURL += "?" + param.Encode()
gateway := NewCustomIdentifiedGateway(gatewayURL, id)
// Use the supplied connect rate limit, if any.
if botData != nil && botData.StartLimit != nil {
resetAt := time.Now().Add(botData.StartLimit.ResetAfter.Duration())
limiter := gateway.Identifier.IdentifyGlobalLimit
// Update the burst to be the current given time and reset it back to
// the default when the given time is reached.
limiter.SetBurst(botData.StartLimit.Remaining)
limiter.SetBurstAt(resetAt, botData.StartLimit.Total)
2020-01-15 04:43:34 +00:00
// Update the maximum number of identify requests allowed per 5s.
gateway.Identifier.IdentifyShortLimit.SetBurst(botData.StartLimit.MaxConcurrency)
}
return gateway, nil
2020-04-19 16:17:04 +00:00
}
2020-01-15 04:43:34 +00:00
// NewCustomGateway creates a new Gateway with a custom gateway URL and a new
// Identifier. Most bots connecting to the official server should not use these
// custom functions.
func NewCustomGateway(gatewayURL, token string) *Gateway {
return NewCustomIdentifiedGateway(gatewayURL, DefaultIdentifier(token))
}
// NewCustomIdentifiedGateway creates a new Gateway with a custom gateway URL
// and a pre-existing Identifier. Refer to NewCustomGateway.
func NewCustomIdentifiedGateway(gatewayURL string, id *Identifier) *Gateway {
2020-04-19 16:17:04 +00:00
return &Gateway{
WS: wsutil.NewCustom(wsutil.NewConn(), gatewayURL),
WSTimeout: wsutil.WSTimeout,
Events: make(chan Event, wsutil.WSBuffer),
Identifier: id,
Sequence: moreatomic.NewInt64(0),
ErrorLog: wsutil.WSError,
2020-04-19 16:17:04 +00:00
AfterClose: func(error) {},
}
2020-01-15 04:43:34 +00:00
}
// AddIntents adds a Gateway Intent before connecting to the Gateway. As such,
// this function will only work before Open() is called.
func (g *Gateway) AddIntents(i Intents) {
g.Identifier.Intents |= i
}
// HasIntents reports if the Gateway has the passed Intents.
//
// If no intents are set, i.e. if using a user account HasIntents will always
// return true.
func (g *Gateway) HasIntents(intents Intents) bool {
if g.Identifier.Intents == 0 {
return true
}
return g.Identifier.Intents.Has(intents)
}
2020-01-15 04:43:34 +00:00
// Close closes the underlying Websocket connection.
func (g *Gateway) Close() error {
wsutil.WSDebug("Trying to close. Pacemaker check skipped.")
wsutil.WSDebug("Closing the Websocket...")
err := g.WS.Close()
if errors.Is(err, wsutil.ErrWebsocketClosed) {
wsutil.WSDebug("Websocket already closed.")
return nil
}
2021-01-01 07:48:29 +00:00
// Explicitly signal the pacemaker loop to stop. We should do this in case
// the Start function exited before it could bind the event channel into the
// loop.
g.PacerLoop.Stop()
wsutil.WSDebug("Websocket closed; error:", err)
wsutil.WSDebug("Waiting for the Pacemaker loop to exit.")
g.waitGroup.Wait()
wsutil.WSDebug("Pacemaker loop exited.")
g.AfterClose(err)
wsutil.WSDebug("AfterClose callback finished.")
return err
2020-01-15 04:43:34 +00:00
}
// CloseGracefully attempts to close the gateway connection gracefully, by
// sending a closing frame before ending the connection. This will cause the
// gateway's session id to be rendered invalid.
//
// Note that a graceful closure is only possible, if the wsutil.Connection of
// the Gateway's Websocket implements wsutil.GracefulCloser.
func (g *Gateway) CloseGracefully() error {
err := g.WS.CloseGracefully()
if errors.Is(err, wsutil.ErrWebsocketClosed) {
wsutil.WSDebug("Websocket already closed.")
return nil
}
// Stop the pacemaker loop; This shouldn't error, so return is ignored
g.WS.Close()
return err
}
// SessionID returns the session ID received after Ready. This function is
// concurrently safe.
func (g *Gateway) SessionID() string {
g.sessionMu.RLock()
defer g.sessionMu.RUnlock()
return g.sessionID
}
// Reconnect tries to reconnect until the ReconnectTimeout is reached, or if
// set to 0 reconnects indefinitely.
func (g *Gateway) Reconnect() {
ctx := context.Background()
if g.ReconnectTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(context.Background(), g.WSTimeout)
defer cancel()
}
// ignore the error, it is already logged and FatalErrorCallback was called
g.ReconnectCtx(ctx)
}
// ReconnectCtx attempts to reconnect until context expires.
// If the context expires FatalErrorCallback will be called with ErrWSMaxTries,
// and the last error returned by Open will be returned.
func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
wsutil.WSDebug("Reconnecting...")
// Guarantee the gateway is already closed. Ignore its error, as we're
// redialing anyway.
g.Close()
2020-01-29 03:54:22 +00:00
for i := 1; ; i++ {
select {
case <-ctx.Done():
g.FatalErrorCallback(ErrWSMaxTries)
return err
default:
}
wsutil.WSDebug("Trying to dial, attempt", i)
2020-01-17 22:29:13 +00:00
// Condition: err == ErrInvalidSession:
// If the connection is rate limited (documented behavior):
// https://discord.com/developers/docs/topics/gateway#rate-limiting
2020-01-19 06:34:52 +00:00
// make sure we don't overwrite our last error
if err = g.OpenContext(ctx); err != nil {
g.ErrorLog(err)
continue
2020-01-17 22:29:13 +00:00
}
wsutil.WSDebug("Started after attempt:", i)
return
}
}
2020-01-17 22:29:13 +00:00
// Open connects to the Websocket and authenticate it. You should usually use
// this function over Start().
func (g *Gateway) Open() error {
ctx, cancel := context.WithTimeout(context.Background(), g.WSTimeout)
defer cancel()
return g.OpenContext(ctx)
}
// OpenContext connects to the Websocket and authenticates it. You should
// usually use this function over Start(). The given context provides
// cancellation and timeout.
func (g *Gateway) OpenContext(ctx context.Context) error {
// Reconnect to the Gateway
if err := g.WS.Dial(ctx); err != nil {
2020-05-16 21:14:49 +00:00
return errors.Wrap(err, "failed to reconnect")
}
2020-01-17 22:29:13 +00:00
wsutil.WSDebug("Trying to start...")
2020-01-17 22:29:13 +00:00
// Try to resume the connection
if err := g.StartCtx(ctx); err != nil {
return err
2020-01-17 22:29:13 +00:00
}
// Started successfully, return
return nil
2020-01-15 04:43:34 +00:00
}
// Start calls StartCtx with a background context. You wouldn't usually use this
// function, but Open() instead.
2020-01-15 04:43:34 +00:00
func (g *Gateway) Start() error {
ctx, cancel := context.WithTimeout(context.Background(), g.WSTimeout)
defer cancel()
return g.StartCtx(ctx)
}
// StartCtx authenticates with the websocket, or resume from a dead Websocket
// connection. You wouldn't usually use this function, but OpenCtx() instead.
func (g *Gateway) StartCtx(ctx context.Context) error {
if err := g.start(ctx); err != nil {
wsutil.WSDebug("Start failed:", err)
// Close can be called with the mutex still acquired here, as the
// pacemaker hasn't started yet.
if err := g.Close(); err != nil {
wsutil.WSDebug("Failed to close after start fail:", err)
}
return err
}
return nil
}
func (g *Gateway) start(ctx context.Context) error {
2020-01-15 04:43:34 +00:00
// This is where we'll get our events
ch := g.WS.Listen()
// Create a new Hello event and wait for it.
2020-01-15 04:43:34 +00:00
var hello HelloEvent
// Wait for an OP 10 Hello.
select {
case e, ok := <-ch:
if !ok {
return errors.New("unexpected ws close while waiting for Hello")
}
if _, err := wsutil.AssertEvent(e, HelloOP, &hello); err != nil {
return errors.Wrap(err, "error at Hello")
}
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "failed to wait for Hello event")
2020-01-15 04:43:34 +00:00
}
wsutil.WSDebug("Hello received; duration:", hello.HeartbeatInterval)
// Start the event handler, which also handles the pacemaker death signal.
g.waitGroup.Add(1)
// Use the pacemaker loop.
g.PacerLoop.StartBeating(hello.HeartbeatInterval.Duration(), g, func(err error) {
g.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped with error:", err)
// Bail if there is no error or if the error is an explicit close, as
// there might be an ongoing reconnection.
if err == nil || errors.Is(err, wsutil.ErrWebsocketClosed) {
return
}
// Only attempt to reconnect if we have a session ID at all. We may not
// have one if we haven't even connected successfully once.
if g.SessionID() != "" {
g.ErrorLog(err)
g.Reconnect()
}
})
2020-01-15 04:43:34 +00:00
// Send Discord either the Identify packet (if it's a fresh connection), or
// a Resume packet (if it's a dead connection).
if g.SessionID() == "" {
2020-01-15 04:43:34 +00:00
// SessionID is empty, so this is a completely new session.
if err := g.IdentifyCtx(ctx); err != nil {
2020-05-16 21:14:49 +00:00
return errors.Wrap(err, "failed to identify")
2020-01-15 04:43:34 +00:00
}
} else {
if err := g.ResumeCtx(ctx); err != nil {
2020-05-16 21:14:49 +00:00
return errors.Wrap(err, "failed to resume")
2020-01-15 04:43:34 +00:00
}
}
// Expect either READY or RESUMED before continuing.
wsutil.WSDebug("Waiting for either READY or RESUMED.")
2020-04-19 16:17:04 +00:00
// WaitForEvent should
err := wsutil.WaitForEvent(ctx, g, ch, func(op *wsutil.OP) bool {
switch op.EventName {
case "READY":
wsutil.WSDebug("Found READY event.")
return true
case "RESUMED":
wsutil.WSDebug("Found RESUMED event.")
return true
}
return false
})
if err != nil {
2020-05-16 21:14:49 +00:00
return errors.Wrap(err, "first error")
}
// Bind the event channel to the pacemaker loop.
g.PacerLoop.SetEventChannel(ch)
2020-01-15 04:43:34 +00:00
wsutil.WSDebug("Started successfully.")
return nil
}
2020-01-15 04:43:34 +00:00
// SendCtx is a low-level function to send an OP payload to the Gateway. Most
// users shouldn't touch this, unless they know what they're doing.
func (g *Gateway) SendCtx(ctx context.Context, code OPCode, v interface{}) error {
var op = wsutil.OP{
2020-01-15 04:43:34 +00:00
Code: code,
}
if v != nil {
b, err := json.Marshal(v)
2020-01-15 04:43:34 +00:00
if err != nil {
2020-05-16 21:14:49 +00:00
return errors.Wrap(err, "failed to encode v")
2020-01-15 04:43:34 +00:00
}
op.Data = b
}
b, err := json.Marshal(op)
2020-01-15 04:43:34 +00:00
if err != nil {
2020-05-16 21:14:49 +00:00
return errors.Wrap(err, "failed to encode payload")
2020-01-15 04:43:34 +00:00
}
// WS should already be thread-safe.
return g.WS.SendCtx(ctx, b)
2020-01-15 04:43:34 +00:00
}