1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2024-12-11 16:05:00 +00:00
arikawa/gateway/gateway.go
diamondburned 54cadd2f45 gateway: Refactor for a better concurrent API
This commit refactors the whole package gateway as well as utils/ws
(formerly utils/wsutil) and voice/voicegateway. The new refactor
utilizes a design pattern involving a concurrent loop and an arriving
event channel.

An additional change was made to the way gateway events are typed.
Before, pretty much any type will satisfy a gateway event type, since
the actual type was just interface{}. The new refactor defines a
concrete interface that events can implement:

    type Event interface {
        Op() OpCode
        EventType() EventType
    }

Using this interface, the user can easily add custom gateway events
independently of the library without relying on string maps. This adds a
lot of type safety into the library and makes type-switching on Event
types much more reasonable.

Gateway error callbacks are also almost entirely removed in favor of
custom gateway events. A catch-all can easily be added like this:

    s.AddHandler(func(err error) {
        log.Println("gateway error:, err")
    })
2021-12-14 13:49:34 -08:00

427 lines
12 KiB
Go

// Package gateway handles the Discord gateway (or Websocket) connection, its
// events, and everything related to it. This includes logging into the
// Websocket.
//
// This package does not abstract events and function handlers; instead, it
// leaves that to the session package. This package exposes only a single Events
// channel.
package gateway
import (
"context"
"math/rand"
"net/url"
"sync"
"time"
"github.com/diamondburned/arikawa/v3/api"
"github.com/diamondburned/arikawa/v3/internal/lazytime"
"github.com/diamondburned/arikawa/v3/utils/ws"
"github.com/pkg/errors"
)
var (
Version = api.Version
Encoding = "json"
)
// CodeInvalidSequence is the code returned by Discord to signal that the given
// sequence number is invalid.
const CodeInvalidSequence = 4007
// CodeShardingRequired is the code returned by Discord to signal that the bot
// must reshard before proceeding. For more information, see
// https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-close-event-codes.
const CodeShardingRequired = 4011
// URL asks Discord for a Websocket URL to the Gateway.
func URL(ctx context.Context) (string, error) {
return api.GatewayURL(ctx)
}
// BotURL fetches the Gateway URL along with extra metadata. The token
// passed in will NOT be prefixed with Bot.
func BotURL(ctx context.Context, token string) (*api.BotData, error) {
return api.NewClient(token).WithContext(ctx).BotURL()
}
// AddGatewayParams appends into the given URL string the gateway URL
// parameters.
func AddGatewayParams(baseURL string) string {
param := url.Values{
"v": {Version},
"encoding": {Encoding},
}
return baseURL + "?" + param.Encode()
}
// State contains the gateway state. It is a piece of data that can be shared
// across gateways during construction to be used for resuming a connection or
// starting a new one with the previous data.
//
// The data structure itself is not thread-safe, so they may only be pulled from
// the gateway after it's done and set before it's done.
type State struct {
Identifier Identifier
SessionID string
Sequence int64
}
// Gateway describes an instance that handles the Discord gateway. It is
// basically an abstracted concurrent event loop that the user could signal to
// start connecting to the Discord gateway server.
type Gateway struct {
gateway *ws.Gateway
state State
// non-mutex-guarded states
// TODO: make lastBeat part of ws.Gateway so it can keep track of whether or
// not the websocket is dead.
beatMutex sync.Mutex
sentBeat time.Time
echoBeat time.Time
retryTimer lazytime.Timer
}
// NewWithIntents creates a new Gateway with the given intents and the default
// stdlib JSON driver. Refer to NewGatewayWithDriver and AddIntents.
func NewWithIntents(ctx context.Context, token string, intents ...Intents) (*Gateway, error) {
var allIntents Intents
for _, intent := range intents {
allIntents |= intent
}
g, err := New(ctx, token)
if err != nil {
return nil, err
}
g.AddIntents(allIntents)
return g, nil
}
// New creates a new Gateway to the default Discord server.
func New(ctx context.Context, token string) (*Gateway, error) {
return NewWithIdentifier(ctx, DefaultIdentifier(token))
}
// NewWithIdentifier creates a new Gateway with the given gateway identifier and
// the default everything. Sharded bots should prefer this function for the
// shared identifier. The given Identifier will be modified.
func NewWithIdentifier(ctx context.Context, id Identifier) (*Gateway, error) {
gatewayURL, err := id.QueryGateway(ctx)
if err != nil {
return nil, err
}
gatewayURL = AddGatewayParams(gatewayURL)
gateway := NewCustomWithIdentifier(gatewayURL, id, nil)
return gateway, nil
}
// NewCustom creates a new Gateway with a custom gateway URL and a new
// Identifier. Most bots connecting to the official server should not use these
// custom functions.
func NewCustom(gatewayURL, token string) *Gateway {
return NewCustomWithIdentifier(gatewayURL, DefaultIdentifier(token), nil)
}
// DefaultGatewayOpts contains the default options to be used for connecting to
// the gateway.
var DefaultGatewayOpts = ws.GatewayOpts{
ReconnectDelay: func(try int) time.Duration {
// minimum 4 seconds
return time.Duration(4+(2*try)) * time.Second
},
// FatalCloseCodes contains the default gateway close codes that will cause
// the gateway to exit. In other words, it's a list of unrecoverable close
// codes.
FatalCloseCodes: []int{
4003, // not authenticated
4004, // authentication failed
4010, // invalid shard sent
4011, // sharding required
4012, // invalid API version
4013, // invalid intents
4014, // disallowed intents
},
DialTimeout: 0,
ReconnectAttempt: 0,
AlwaysCloseGracefully: true,
}
// NewCustomWithIdentifier creates a new Gateway with a custom gateway URL and a
// pre-existing Identifier. If opts is nil, then DefaultGatewayOpts is used.
func NewCustomWithIdentifier(gatewayURL string, id Identifier, opts *ws.GatewayOpts) *Gateway {
return NewFromState(gatewayURL, State{Identifier: id}, opts)
}
// NewFromState creates a new gateway from the given state and optionally
// gateway options. If opts is nil, then DefaultGatewayOpts is used.
func NewFromState(gatewayURL string, state State, opts *ws.GatewayOpts) *Gateway {
if opts == nil {
opts = &DefaultGatewayOpts
}
gw := ws.NewGateway(ws.NewWebsocket(ws.NewCodec(OpUnmarshalers), gatewayURL), opts)
return &Gateway{
gateway: gw,
state: state,
}
}
// State returns a copy of the gateway's internal state. It panics if the
// gateway is currently running.
func (g *Gateway) State() State {
g.gateway.AssertIsNotRunning()
return g.state
}
// SetState sets the gateway's state.
func (g *Gateway) SetState(state State) {
g.gateway.AssertIsNotRunning()
g.state = state
}
// AddIntents adds a Gateway Intent before connecting to the Gateway. This
// function will only work before Connect() is called. Calling it once Connect()
// is called will result in a panic.
func (g *Gateway) AddIntents(i Intents) {
g.gateway.AssertIsNotRunning()
g.state.Identifier.AddIntents(i)
}
// SentBeat returns the last time that the heart was beaten. If the gateway has
// never connected, then a zero-value time is returned.
func (g *Gateway) SentBeat() time.Time {
g.beatMutex.Lock()
defer g.beatMutex.Unlock()
return g.sentBeat
}
// EchoBeat returns the last time that the heartbeat was acknowledged. It is
// similar to SentBeat.
func (g *Gateway) EchoBeat() time.Time {
g.beatMutex.Lock()
defer g.beatMutex.Unlock()
return g.echoBeat
}
// Latency is a convenient function around SentBeat and EchoBeat. It subtracts
// the EchoBeat with the SentBeat.
func (g *Gateway) Latency() time.Duration {
g.beatMutex.Lock()
defer g.beatMutex.Unlock()
return g.echoBeat.Sub(g.sentBeat)
}
// LastError returns the last error that the gateway has received. It only
// returns a valid error if the gateway's event loop as exited. If the event
// loop hasn't been started AND stopped, the function will panic.
func (g *Gateway) LastError() error {
return g.gateway.LastError()
}
// Send is a function to send an Op payload to the Gateway.
func (g *Gateway) Send(ctx context.Context, data ws.Event) error {
return g.gateway.Send(ctx, data)
}
// Connect starts the background goroutine that tries its best to maintain a
// stable connection to the Discord gateway. To the user, the gateway should
// appear to be working seamlessly.
//
// Behaviors
//
// There are several behaviors that the gateway will overload onto the channel.
//
// Once the gateway has exited, fatally or not, the event channel returned by
// Connect will be closed. The user should therefore know whether or not the
// gateway has exited by spinning on the channel until it is closed.
//
// If Connect is called twice, the second call will return the same exact
// channel that the first call has made without starting any new goroutines,
// except if the gateway is already closed, then a new gateway will spin up with
// the existing gateway state.
//
// If the gateway stumbles upon any background errors, it will do its best to
// recover from it, but errors will be notified to the user using the
// BackgroundErrorEvent event. The user can type-assert the Op's data field,
// like so:
//
// switch data := ev.Data.(type) {
// case *gateway.BackgroundErrorEvent:
// log.Println("gateway error:", data.Error)
// }
//
// Closing
//
// As outlined in the first paragraph, closing the gateway would involve
// cancelling the context that's given to gateway. If AlwaysCloseGracefully is
// true (which it is by default), then the gateway is closed gracefully, and the
// session ID is invalidated.
//
// To wait until the gateway has completely successfully exited, the user can
// keep spinning on the event loop:
//
// for op := range ch {
// select op.Data.(type) {
// case *gateway.ReadyEvent:
// // Close the gateway on READY.
// cancel()
// }
// }
//
// // Gateway is now completely closed.
//
// To capture the final close errors, the user can use the Error method once the
// event channel is closed, like so:
//
// var err error
//
// for op := range ch {
// switch data := op.Data.(type) {
// case *gateway.ReadyEvent:
// cancel()
// }
// }
//
// // Gateway is now completely closed.
// if gateway.LastError() != nil {
// return gateway.LastError()
// }
//
func (g *Gateway) Connect(ctx context.Context) <-chan ws.Op {
return g.gateway.Connect(ctx, &gatewayImpl{Gateway: g})
}
type gatewayImpl struct {
*Gateway
lastSentBeat time.Time
}
func (g *gatewayImpl) invalidate() {
g.state.SessionID = ""
g.state.Sequence = 0
}
// sendIdentify sends off the Identify command with the Gateway's IdentifyData
// with the given context for timeout.
func (g *gatewayImpl) sendIdentify(ctx context.Context) error {
if err := g.state.Identifier.Wait(ctx); err != nil {
return errors.Wrap(err, "can't wait for identify()")
}
return g.gateway.Send(ctx, &g.state.Identifier.IdentifyCommand)
}
func (g *gatewayImpl) sendResume(ctx context.Context) error {
return g.gateway.Send(ctx, &ResumeCommand{
Token: g.state.Identifier.Token,
SessionID: g.state.SessionID,
Sequence: g.state.Sequence,
})
}
func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
if op.Code == dispatchOp {
g.state.Sequence = op.Sequence
}
switch data := op.Data.(type) {
case *ws.CloseEvent:
if data.Code == CodeInvalidSequence {
// Invalid sequence.
g.invalidate()
}
g.gateway.QueueReconnect()
case *HelloEvent:
g.gateway.ResetHeartbeat(data.HeartbeatInterval.Duration())
// Send Discord either the Identify packet (if it's a fresh
// connection), or a Resume packet (if it's a dead connection).
if g.state.SessionID == "" || g.state.Sequence == 0 {
// SessionID is empty, so this is a completely new session.
if err := g.sendIdentify(ctx); err != nil {
g.gateway.SendErrorWrap(err, "failed to send identify")
g.gateway.QueueReconnect()
}
} else {
if err := g.sendResume(ctx); err != nil {
g.gateway.SendErrorWrap(err, "failed to send resume")
g.gateway.QueueReconnect()
}
}
case *InvalidSessionEvent:
// Wipe the session state.
g.invalidate()
if !*data {
g.gateway.QueueReconnect()
break
}
// Discord expects us to wait before reconnecting.
g.retryTimer.Reset(time.Duration(rand.Intn(5)+1) * time.Second)
if err := g.retryTimer.Wait(ctx); err != nil {
g.gateway.SendErrorWrap(err, "failed to wait before identifying")
g.gateway.QueueReconnect()
break
}
// If we fail to identify, then the gateway cannot continue with
// a bad identification, since it's likely a user error.
if err := g.sendIdentify(ctx); err != nil {
g.gateway.SendErrorWrap(err, "failed to identify")
g.gateway.QueueReconnect()
break
}
case *HeartbeatCommand:
g.SendHeartbeat(ctx)
case *HeartbeatAckEvent:
now := time.Now()
g.beatMutex.Lock()
g.sentBeat = g.lastSentBeat
g.echoBeat = now
g.beatMutex.Unlock()
case *ReconnectEvent:
g.gateway.QueueReconnect()
case *ReadyEvent:
g.state.SessionID = data.SessionID
}
return true
}
// SendHeartbeat sends a heartbeat with the gateway's current sequence.
func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
g.lastSentBeat = time.Now()
sequence := HeartbeatCommand(g.state.Sequence)
if err := g.gateway.Send(ctx, &sequence); err != nil {
g.gateway.SendErrorWrap(err, "heartbeat error")
g.gateway.QueueReconnect()
}
}
// Close closes the state.
func (g *gatewayImpl) Close() error {
g.retryTimer.Stop()
g.invalidate()
return nil
}