mirror of
https://github.com/diamondburned/arikawa.git
synced 2025-01-08 04:57:31 +00:00
485 lines
14 KiB
Go
485 lines
14 KiB
Go
// Package gateway handles the Discord gateway (or Websocket) connection, its
|
|
// events, and everything related to it. This includes logging into the
|
|
// Websocket.
|
|
//
|
|
// This package does not abstract events and function handlers; instead, it
|
|
// leaves that to the session package. This package exposes only a single Events
|
|
// channel.
|
|
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/diamondburned/arikawa/v3/api"
|
|
"github.com/diamondburned/arikawa/v3/internal/lazytime"
|
|
"github.com/diamondburned/arikawa/v3/utils/ws"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
var (
|
|
Version = api.Version
|
|
Encoding = "json"
|
|
)
|
|
|
|
// deadbeatDuration is the duration that limits whether the gateway should
|
|
// resume or restart entirely. If it's less than this duration, then it's deemed
|
|
// resumable.
|
|
const deadbeatDuration = 15 * time.Minute
|
|
|
|
// CodeInvalidSequence is the code returned by Discord to signal that the given
|
|
// sequence number is invalid.
|
|
const CodeInvalidSequence = 4007
|
|
|
|
// CodeShardingRequired is the code returned by Discord to signal that the bot
|
|
// must reshard before proceeding. For more information, see
|
|
// https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-close-event-codes.
|
|
const CodeShardingRequired = 4011
|
|
|
|
// URL asks Discord for a Websocket URL to the Gateway.
|
|
func URL(ctx context.Context) (string, error) {
|
|
return api.GatewayURL(ctx)
|
|
}
|
|
|
|
// BotURL fetches the Gateway URL along with extra metadata. The token
|
|
// passed in will NOT be prefixed with Bot.
|
|
func BotURL(ctx context.Context, token string) (*api.BotData, error) {
|
|
return api.NewClient(token).WithContext(ctx).BotURL()
|
|
}
|
|
|
|
// AddGatewayParams appends into the given URL string the gateway URL
|
|
// parameters.
|
|
func AddGatewayParams(baseURL string) string {
|
|
param := url.Values{
|
|
"v": {Version},
|
|
"encoding": {Encoding},
|
|
}
|
|
|
|
return baseURL + "?" + param.Encode()
|
|
}
|
|
|
|
// State contains the gateway state. It is a piece of data that can be shared
|
|
// across gateways during construction to be used for resuming a connection or
|
|
// starting a new one with the previous data.
|
|
//
|
|
// The data structure itself is not thread-safe, so they may only be pulled from
|
|
// the gateway after it's done and set before it's done.
|
|
type State struct {
|
|
Identifier Identifier
|
|
SessionID string
|
|
Sequence int64
|
|
}
|
|
|
|
// Gateway describes an instance that handles the Discord gateway. It is
|
|
// basically an abstracted concurrent event loop that the user could signal to
|
|
// start connecting to the Discord gateway server.
|
|
type Gateway struct {
|
|
gateway *ws.Gateway
|
|
state State
|
|
|
|
// non-mutex-guarded states
|
|
// TODO: make lastBeat part of ws.Gateway so it can keep track of whether or
|
|
// not the websocket is dead.
|
|
beatMutex sync.Mutex
|
|
sentBeat time.Time
|
|
echoBeat time.Time
|
|
retryTimer lazytime.Timer
|
|
}
|
|
|
|
// NewWithIntents creates a new Gateway with the given intents and the default
|
|
// stdlib JSON driver. Refer to NewGatewayWithDriver and AddIntents.
|
|
func NewWithIntents(ctx context.Context, token string, intents ...Intents) (*Gateway, error) {
|
|
var allIntents Intents
|
|
for _, intent := range intents {
|
|
allIntents |= intent
|
|
}
|
|
|
|
g, err := New(ctx, token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
g.AddIntents(allIntents)
|
|
return g, nil
|
|
}
|
|
|
|
// New creates a new Gateway to the default Discord server.
|
|
func New(ctx context.Context, token string) (*Gateway, error) {
|
|
return NewWithIdentifier(ctx, DefaultIdentifier(token))
|
|
}
|
|
|
|
// NewWithIdentifier creates a new Gateway with the given gateway identifier and
|
|
// the default everything. Sharded bots should prefer this function for the
|
|
// shared identifier. The given Identifier will be modified.
|
|
func NewWithIdentifier(ctx context.Context, id Identifier) (*Gateway, error) {
|
|
gatewayURL, err := id.QueryGateway(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
gatewayURL = AddGatewayParams(gatewayURL)
|
|
gateway := NewCustomWithIdentifier(gatewayURL, id, nil)
|
|
|
|
return gateway, nil
|
|
}
|
|
|
|
// NewCustom creates a new Gateway with a custom gateway URL and a new
|
|
// Identifier. Most bots connecting to the official server should not use these
|
|
// custom functions.
|
|
func NewCustom(gatewayURL, token string) *Gateway {
|
|
return NewCustomWithIdentifier(gatewayURL, DefaultIdentifier(token), nil)
|
|
}
|
|
|
|
// DefaultGatewayOpts contains the default options to be used for connecting to
|
|
// the gateway.
|
|
var DefaultGatewayOpts = ws.GatewayOpts{
|
|
ReconnectDelay: func(try int) time.Duration {
|
|
// minimum 4 seconds
|
|
return time.Duration(4+(2*try)) * time.Second
|
|
},
|
|
// FatalCloseCodes contains the default gateway close codes that will cause
|
|
// the gateway to exit. In other words, it's a list of unrecoverable close
|
|
// codes.
|
|
FatalCloseCodes: []int{
|
|
4004, // authentication failed
|
|
4010, // invalid shard sent
|
|
4011, // sharding required
|
|
4012, // invalid API version
|
|
4013, // invalid intents
|
|
4014, // disallowed intents
|
|
},
|
|
DialTimeout: 0,
|
|
ReconnectAttempt: 0,
|
|
AlwaysCloseGracefully: true,
|
|
}
|
|
|
|
// NewCustomWithIdentifier creates a new Gateway with a custom gateway URL and a
|
|
// pre-existing Identifier. If opts is nil, then DefaultGatewayOpts is used.
|
|
func NewCustomWithIdentifier(gatewayURL string, id Identifier, opts *ws.GatewayOpts) *Gateway {
|
|
return NewFromState(gatewayURL, State{Identifier: id}, opts)
|
|
}
|
|
|
|
// NewFromState creates a new gateway from the given state and optionally
|
|
// gateway options. If opts is nil, then DefaultGatewayOpts is used.
|
|
func NewFromState(gatewayURL string, state State, opts *ws.GatewayOpts) *Gateway {
|
|
if opts == nil {
|
|
opts = &DefaultGatewayOpts
|
|
}
|
|
|
|
gw := ws.NewGateway(ws.NewWebsocket(ws.NewCodec(OpUnmarshalers), gatewayURL), opts)
|
|
return &Gateway{
|
|
gateway: gw,
|
|
state: state,
|
|
}
|
|
}
|
|
|
|
// Opts returns a copy of the gateway options that are being used.
|
|
func (g *Gateway) Opts() *ws.GatewayOpts {
|
|
return g.gateway.Opts()
|
|
}
|
|
|
|
// State returns a copy of the gateway's internal state. It panics if the
|
|
// gateway is currently running.
|
|
func (g *Gateway) State() State {
|
|
g.gateway.AssertIsNotRunning()
|
|
return g.state
|
|
}
|
|
|
|
// SetState sets the gateway's state.
|
|
func (g *Gateway) SetState(state State) {
|
|
g.gateway.AssertIsNotRunning()
|
|
g.state = state
|
|
}
|
|
|
|
// AddIntents adds a Gateway Intent before connecting to the Gateway. This
|
|
// function will only work before Connect() is called. Calling it once Connect()
|
|
// is called will result in a panic.
|
|
func (g *Gateway) AddIntents(i Intents) {
|
|
g.gateway.AssertIsNotRunning()
|
|
g.state.Identifier.AddIntents(i)
|
|
}
|
|
|
|
// SentBeat returns the last time that the heart was beaten. If the gateway has
|
|
// never connected, then a zero-value time is returned.
|
|
func (g *Gateway) SentBeat() time.Time {
|
|
g.beatMutex.Lock()
|
|
defer g.beatMutex.Unlock()
|
|
|
|
return g.sentBeat
|
|
}
|
|
|
|
// EchoBeat returns the last time that the heartbeat was acknowledged. It is
|
|
// similar to SentBeat.
|
|
func (g *Gateway) EchoBeat() time.Time {
|
|
g.beatMutex.Lock()
|
|
defer g.beatMutex.Unlock()
|
|
|
|
return g.echoBeat
|
|
}
|
|
|
|
// Latency is a convenient function around SentBeat and EchoBeat. It subtracts
|
|
// the EchoBeat with the SentBeat.
|
|
func (g *Gateway) Latency() time.Duration {
|
|
g.beatMutex.Lock()
|
|
defer g.beatMutex.Unlock()
|
|
|
|
return g.echoBeat.Sub(g.sentBeat)
|
|
}
|
|
|
|
// LastError returns the last error that the gateway has received. It only
|
|
// returns a valid error if the gateway's event loop as exited. If the event
|
|
// loop hasn't been started AND stopped, the function will panic.
|
|
func (g *Gateway) LastError() error {
|
|
return g.gateway.LastError()
|
|
}
|
|
|
|
// Send is a function to send an Op payload to the Gateway.
|
|
func (g *Gateway) Send(ctx context.Context, data ws.Event) error {
|
|
return g.gateway.Send(ctx, data)
|
|
}
|
|
|
|
// Connect starts the background goroutine that tries its best to maintain a
|
|
// stable connection to the Discord gateway. To the user, the gateway should
|
|
// appear to be working seamlessly.
|
|
//
|
|
// Behaviors
|
|
//
|
|
// There are several behaviors that the gateway will overload onto the channel.
|
|
//
|
|
// Once the gateway has exited, fatally or not, the event channel returned by
|
|
// Connect will be closed. The user should therefore know whether or not the
|
|
// gateway has exited by spinning on the channel until it is closed.
|
|
//
|
|
// If Connect is called twice, the second call will return the same exact
|
|
// channel that the first call has made without starting any new goroutines,
|
|
// except if the gateway is already closed, then a new gateway will spin up with
|
|
// the existing gateway state.
|
|
//
|
|
// If the gateway stumbles upon any background errors, it will do its best to
|
|
// recover from it, but errors will be notified to the user using the
|
|
// BackgroundErrorEvent event. The user can type-assert the Op's data field,
|
|
// like so:
|
|
//
|
|
// switch data := ev.Data.(type) {
|
|
// case *gateway.BackgroundErrorEvent:
|
|
// log.Println("gateway error:", data.Error)
|
|
// }
|
|
//
|
|
// Closing
|
|
//
|
|
// As outlined in the first paragraph, closing the gateway would involve
|
|
// cancelling the context that's given to gateway. If AlwaysCloseGracefully is
|
|
// true (which it is by default), then the gateway is closed gracefully, and the
|
|
// session ID is invalidated.
|
|
//
|
|
// To wait until the gateway has completely successfully exited, the user can
|
|
// keep spinning on the event loop:
|
|
//
|
|
// for op := range ch {
|
|
// select op.Data.(type) {
|
|
// case *gateway.ReadyEvent:
|
|
// // Close the gateway on READY.
|
|
// cancel()
|
|
// }
|
|
// }
|
|
//
|
|
// // Gateway is now completely closed.
|
|
//
|
|
// To capture the final close errors, the user can use the Error method once the
|
|
// event channel is closed, like so:
|
|
//
|
|
// var err error
|
|
//
|
|
// for op := range ch {
|
|
// switch data := op.Data.(type) {
|
|
// case *gateway.ReadyEvent:
|
|
// cancel()
|
|
// }
|
|
// }
|
|
//
|
|
// // Gateway is now completely closed.
|
|
// if gateway.LastError() != nil {
|
|
// return gateway.LastError()
|
|
// }
|
|
//
|
|
func (g *Gateway) Connect(ctx context.Context) <-chan ws.Op {
|
|
return g.gateway.Connect(ctx, &gatewayImpl{Gateway: g})
|
|
}
|
|
|
|
type gatewayImpl struct {
|
|
*Gateway
|
|
heartrate time.Duration
|
|
lastSentBeat time.Time
|
|
}
|
|
|
|
func (g *gatewayImpl) invalidate() {
|
|
g.state.SessionID = ""
|
|
g.state.Sequence = 0
|
|
}
|
|
|
|
// sendIdentify sends off the Identify command with the Gateway's IdentifyData
|
|
// with the given context for timeout.
|
|
func (g *gatewayImpl) sendIdentify(ctx context.Context) error {
|
|
if err := g.state.Identifier.Wait(ctx); err != nil {
|
|
return errors.Wrap(err, "can't wait for identify()")
|
|
}
|
|
|
|
return g.gateway.Send(ctx, &g.state.Identifier.IdentifyCommand)
|
|
}
|
|
|
|
func (g *gatewayImpl) sendResume(ctx context.Context) error {
|
|
return g.gateway.Send(ctx, &ResumeCommand{
|
|
Token: g.state.Identifier.Token,
|
|
SessionID: g.state.SessionID,
|
|
Sequence: g.state.Sequence,
|
|
})
|
|
}
|
|
|
|
func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
|
|
if op.Code == dispatchOp {
|
|
g.state.Sequence = op.Sequence
|
|
}
|
|
|
|
switch data := op.Data.(type) {
|
|
case *ws.CloseEvent:
|
|
if data.Code == CodeInvalidSequence {
|
|
// Invalid sequence.
|
|
g.invalidate()
|
|
}
|
|
|
|
g.gateway.QueueReconnect()
|
|
|
|
case *HelloEvent:
|
|
g.heartrate = data.HeartbeatInterval.Duration()
|
|
g.gateway.ResetHeartbeat(g.heartrate)
|
|
|
|
now := time.Now()
|
|
|
|
g.beatMutex.Lock()
|
|
// Determine that we shouldn't reconnect if the last time we've received
|
|
// a heart beat was over (deadbeatDuration) ago.
|
|
resumable := g.echoBeat.IsZero() || time.Since(g.echoBeat) < deadbeatDuration
|
|
// Reset gateway times.
|
|
g.echoBeat = time.Time{}
|
|
g.sentBeat = time.Time{}
|
|
// Set the last sent beat time so we can treat sending an Identify or
|
|
// Resume as sending a heartbeat.
|
|
g.lastSentBeat = now
|
|
g.beatMutex.Unlock()
|
|
|
|
// Send Discord either the Identify packet (if it's a fresh
|
|
// connection), or a Resume packet (if it's a dead connection).
|
|
if !resumable || g.state.SessionID == "" || g.state.Sequence == 0 {
|
|
// SessionID is empty, so this is a completely new session.
|
|
if err := g.sendIdentify(ctx); err != nil {
|
|
g.gateway.SendErrorWrap(err, "failed to send identify")
|
|
g.gateway.QueueReconnect()
|
|
}
|
|
} else {
|
|
if err := g.sendResume(ctx); err != nil {
|
|
g.gateway.SendErrorWrap(err, "failed to send resume")
|
|
g.gateway.QueueReconnect()
|
|
}
|
|
}
|
|
|
|
case *InvalidSessionEvent:
|
|
// Wipe the session state.
|
|
g.invalidate()
|
|
|
|
if !*data {
|
|
g.gateway.QueueReconnect()
|
|
break
|
|
}
|
|
|
|
// Discord expects us to wait before reconnecting.
|
|
g.retryTimer.Reset(time.Duration(rand.Intn(5)+1) * time.Second)
|
|
if err := g.retryTimer.Wait(ctx); err != nil {
|
|
g.gateway.SendErrorWrap(err, "failed to wait before identifying")
|
|
g.gateway.QueueReconnect()
|
|
break
|
|
}
|
|
|
|
// If we fail to identify, then the gateway cannot continue with
|
|
// a bad identification, since it's likely a user error.
|
|
if err := g.sendIdentify(ctx); err != nil {
|
|
g.gateway.SendErrorWrap(err, "failed to identify")
|
|
g.gateway.QueueReconnect()
|
|
break
|
|
}
|
|
|
|
case *HeartbeatCommand:
|
|
g.SendHeartbeat(ctx)
|
|
|
|
case *HeartbeatAckEvent:
|
|
g.useLastSentBeat()
|
|
|
|
case *ReconnectEvent:
|
|
g.gateway.QueueReconnect()
|
|
|
|
case *ReadyEvent:
|
|
g.state.SessionID = data.SessionID
|
|
g.useLastSentBeat()
|
|
|
|
case *ResumedEvent:
|
|
g.useLastSentBeat()
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (g *gatewayImpl) useLastSentBeat() {
|
|
now := time.Now()
|
|
|
|
g.beatMutex.Lock()
|
|
g.sentBeat = g.lastSentBeat
|
|
g.echoBeat = now
|
|
g.beatMutex.Unlock()
|
|
}
|
|
|
|
func (g *gatewayImpl) isDead() bool {
|
|
if g.heartrate == 0 {
|
|
return false
|
|
}
|
|
|
|
g.beatMutex.Lock()
|
|
defer g.beatMutex.Unlock()
|
|
|
|
if g.echoBeat.IsZero() {
|
|
// No ack received yet. We wait for a bit.
|
|
return false
|
|
}
|
|
|
|
// Allow 2 beats to miss before we declare dead.
|
|
return g.lastSentBeat.Sub(g.echoBeat) > 2*g.heartrate
|
|
}
|
|
|
|
// SendHeartbeat sends a heartbeat with the gateway's current sequence.
|
|
func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
|
|
g.lastSentBeat = time.Now()
|
|
|
|
// TODO: move this to ws.Gateway
|
|
if g.isDead() {
|
|
g.gateway.SendError(fmt.Errorf("heartbeat timed out"))
|
|
g.gateway.QueueReconnect()
|
|
return
|
|
}
|
|
|
|
sequence := HeartbeatCommand(g.state.Sequence)
|
|
if err := g.gateway.Send(ctx, &sequence); err != nil {
|
|
g.gateway.SendErrorWrap(err, "heartbeat error")
|
|
g.gateway.QueueReconnect()
|
|
return
|
|
}
|
|
}
|
|
|
|
// Close closes the state.
|
|
func (g *gatewayImpl) Close() error {
|
|
g.retryTimer.Stop()
|
|
g.invalidate()
|
|
return nil
|
|
}
|