1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2025-07-23 13:20:51 +00:00

Compare commits

...

6 commits

Author SHA1 Message Date
diamondburned 63310fdc95 wsutil: Added graceful close checking 2020-10-30 13:54:35 -07:00
Maximilian von Lindern 607250ae55
Gateway: Added reconnect timeout; fixed UA (#154)
* Gateway: use gateway version 8

* API: remove old v0.0.1 version tag

* Discord: fix typos

* Gateway: add timeout

* Gateway: revert to returning errors on ReconnectCtx
2020-10-30 13:41:04 -07:00
diamondburned f3372e016a Gateway: Added missing intents in tests 2020-10-30 12:00:41 -07:00
diamondburned 2a2244c965 Gateway: Use API v8 version instead of its own v6 2020-10-30 11:24:10 -07:00
diamondburned 0ead315aa3 Gateway: Fixed race in test Fatal 2020-10-30 11:15:58 -07:00
diamondburned 16c1658163 {,Voice}Gateway: Refactored Closing
This commit refactors both wsutil, the normal Gateway and the Voice
Gateway to have better closing behavior, which should assume less and
cover edge cases completely.
2020-10-30 11:02:37 -07:00
9 changed files with 107 additions and 192 deletions

View file

@ -21,7 +21,7 @@ var (
EndpointGatewayBot = EndpointGateway + "/bot"
)
var UserAgent = "DiscordBot (https://github.com/diamondburned/arikawa/v2, v0.0.1)"
var UserAgent = "DiscordBot (https://github.com/diamondburned/arikawa/v2)"
type Client struct {
*httputil.Client

View file

@ -128,7 +128,7 @@ const (
OverwriteMember
)
// UnmarshalJSON unmarshals both a string-quoteed number and a regular number
// UnmarshalJSON unmarshalls both a string-quoted number and a regular number
// into OverwriteType. We need to do this because Discord is so bad that they
// can't even handle 1s and 0s properly.
func (otype *OverwriteType) UnmarshalJSON(b []byte) error {

View file

@ -26,9 +26,8 @@ var (
EndpointGateway = api.Endpoint + "gateway"
EndpointGatewayBot = api.EndpointGateway + "/bot"
Version = "6"
Version = api.Version
Encoding = "json"
// Compress = "zlib-stream"
)
var (
@ -79,6 +78,13 @@ func BotURL(token string) (*BotData, error) {
type Gateway struct {
WS *wsutil.Websocket
WSTimeout time.Duration
// ReconnectTimeout is the timeout used during reconnection.
// If the a connection to the gateway can't be established before the
// duration passes, the Gateway will be closed and FatalErrorCallback will
// be called.
//
// Setting this to 0 is equivalent to no timeout.
ReconnectTimeout time.Duration
// All events sent over are pointers to Event structs (structs suffixed with
// "Event"). This shouldn't be accessed if the Gateway is created with a
@ -95,14 +101,23 @@ type Gateway struct {
PacerLoop wsutil.PacemakerLoop
ErrorLog func(err error) // default to log.Println
// FatalErrorCallback is called, if the Gateway exits fatally. At the point
// of calling, the gateway will be already closed.
//
// Currently this will only be called, if the ReconnectTimeout was changed
// to a definite timeout, and connection could not be established during
// that time.
// err will be ErrWSMaxTries in that case.
//
// Defaults to noop.
FatalErrorCallback func(err error)
// AfterClose is called after each close. Error can be non-nil, as this is
// called even when the Gateway is gracefully closed. It's used mainly for
// reconnections or any type of connection interruptions.
AfterClose func(err error) // noop by default
// Filled by methods, internal use
waitGroup *sync.WaitGroup
waitGroup sync.WaitGroup
}
// NewGatewayWithIntents creates a new Gateway with the given intents and the
@ -161,55 +176,48 @@ func (g *Gateway) AddIntents(i Intents) {
}
// Close closes the underlying Websocket connection.
func (g *Gateway) Close() (err error) {
wsutil.WSDebug("Trying to close.")
func (g *Gateway) Close() error {
wsutil.WSDebug("Trying to close. Pacemaker check skipped.")
// Check if the WS is already closed:
if g.PacerLoop.Stopped() {
wsutil.WSDebug("Gateway is already closed.")
return err
wsutil.WSDebug("Closing the Websocket...")
err := g.WS.Close()
if errors.Is(err, wsutil.ErrWebsocketClosed) {
wsutil.WSDebug("Websocket already closed.")
return nil
}
// Trigger the close callback on exit.
defer func() { g.AfterClose(err) }()
wsutil.WSDebug("Websocket closed; error:", err)
// If the pacemaker is running:
if !g.PacerLoop.Stopped() {
wsutil.WSDebug("Stopping pacemaker...")
// Stop the pacemaker and the event handler.
g.PacerLoop.Stop()
wsutil.WSDebug("Stopped pacemaker.")
}
wsutil.WSDebug("Closing the websocket...")
err = g.WS.Close()
wsutil.WSDebug("Waiting for WaitGroup to be done.")
// This should work, since Pacemaker should signal its loop to stop, which
// would also exit our event loop. Both would be 2.
wsutil.WSDebug("Waiting for the Pacemaker loop to exit.")
g.waitGroup.Wait()
wsutil.WSDebug("Pacemaker loop exited.")
g.AfterClose(err)
wsutil.WSDebug("AfterClose callback finished.")
wsutil.WSDebug("WaitGroup is done. Closing the websocket.")
return err
}
// Reconnect tries to reconnect forever. It will resume the connection if
// possible. If an Invalid Session is received, it will start a fresh one.
// Reconnect tries to reconnect until the ReconnectTimeout is reached, or if
// set to 0 reconnects indefinitely.
func (g *Gateway) Reconnect() {
for {
if err := g.ReconnectCtx(context.Background()); err != nil {
g.ErrorLog(err)
} else {
return
}
ctx := context.Background()
if g.ReconnectTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(context.Background(), g.WSTimeout)
defer cancel()
}
// ignore the error, it is already logged and FatalErrorCallback was called
g.ReconnectCtx(ctx)
}
// ReconnectCtx attempts to reconnect until context expires. If context cannot
// expire, then the gateway will try to reconnect forever.
// ReconnectCtx attempts to reconnect until context expires.
// If the context expires FatalErrorCallback will be called with ErrWSMaxTries,
// and the last error returned by Open will be returned.
func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
wsutil.WSDebug("Reconnecting...")
@ -220,6 +228,7 @@ func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
for i := 1; ; i++ {
select {
case <-ctx.Done():
g.FatalErrorCallback(ErrWSMaxTries)
return err
default:
}
@ -301,9 +310,6 @@ func (g *Gateway) start(ctx context.Context) error {
// This is where we'll get our events
ch := g.WS.Listen()
// Make a new WaitGroup for use in background loops:
g.waitGroup = new(sync.WaitGroup)
// Create a new Hello event and wait for it.
var hello HelloEvent
// Wait for an OP 10 Hello.

View file

@ -49,7 +49,7 @@ func TestIntegration(t *testing.T) {
}
wsutil.WSError = func(err error) {
t.Fatal(err)
t.Error(err)
}
var gateway *Gateway
@ -59,6 +59,7 @@ func TestIntegration(t *testing.T) {
if err != nil {
t.Fatal("Failed to make a Gateway:", err)
}
g.AddIntents(IntentGuilds)
g.AfterClose = func(err error) {
log.Println("Closed.")
}

View file

@ -81,7 +81,7 @@ func (p *Pacemaker) Dead() bool {
}
// Stop stops the pacemaker, or it does nothing if the pacemaker is not started.
func (p *Pacemaker) Stop() {
func (p *Pacemaker) StopTicker() {
p.ticker.Stop()
}
@ -106,60 +106,3 @@ func (p *Pacemaker) PaceCtx(ctx context.Context) error {
return nil
}
// func (p *Pacemaker) start() error {
// // Reset states to its old position.
// p.EchoBeat.Set(time.Time{})
// p.SentBeat.Set(time.Time{})
// // Create a new ticker.
// tick := time.NewTicker(p.Heartrate)
// defer tick.Stop()
// // Echo at least once
// p.Echo()
// for {
// if err := p.pace(); err != nil {
// return errors.Wrap(err, "failed to pace")
// }
// // Paced, save:
// p.SentBeat.Set(time.Now())
// if p.Dead() {
// return ErrDead
// }
// select {
// case <-p.stop:
// return nil
// case <-tick.C:
// }
// }
// }
// // StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
// func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
// p.death = make(chan error)
// p.stop = make(chan struct{})
// p.once = sync.Once{}
// if wg != nil {
// wg.Add(1)
// }
// go func() {
// p.death <- p.start()
// // Debug.
// Debug("Pacemaker returned.")
// // Mark the pacemaker loop as done.
// if wg != nil {
// wg.Done()
// }
// }()
// return p.death
// }

View file

@ -6,6 +6,7 @@ import (
"context"
"io"
"net/http"
"strings"
"time"
"github.com/gorilla/websocket"
@ -52,7 +53,8 @@ type Connection interface {
// Conn is the default Websocket connection. It tries to compresses all payloads
// using zlib.
type Conn struct {
Dialer *websocket.Dialer
Dialer websocket.Dialer
Header http.Header
Conn *websocket.Conn
events chan Event
}
@ -61,7 +63,7 @@ var _ Connection = (*Conn)(nil)
// NewConn creates a new default websocket connection with a default dialer.
func NewConn() *Conn {
return NewConnWithDialer(&websocket.Dialer{
return NewConnWithDialer(websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: WSTimeout,
ReadBufferSize: CopyBufferSize,
@ -71,20 +73,20 @@ func NewConn() *Conn {
}
// NewConn creates a new default websocket connection with a custom dialer.
func NewConnWithDialer(dialer *websocket.Dialer) *Conn {
return &Conn{Dialer: dialer}
func NewConnWithDialer(dialer websocket.Dialer) *Conn {
return &Conn{
Dialer: dialer,
Header: http.Header{
"Accept-Encoding": {"zlib"},
},
}
}
func (c *Conn) Dial(ctx context.Context, addr string) (err error) {
// BUG which prevents stream compression.
// See https://github.com/golang/go/issues/31514.
// Enable compression:
headers := http.Header{
"Accept-Encoding": {"zlib"},
}
c.Conn, _, err = c.Dialer.DialContext(ctx, addr, headers)
c.Conn, _, err = c.Dialer.DialContext(ctx, addr, c.Header)
if err != nil {
return errors.Wrap(err, "failed to dial WS")
}
@ -120,6 +122,10 @@ func (c *Conn) Send(ctx context.Context, b []byte) error {
}
func (c *Conn) Close() error {
// Have a deadline before closing.
var deadline = time.Now().Add(5 * time.Second)
c.Conn.SetWriteDeadline(deadline)
// Close the WS.
err := c.Conn.Close()
@ -162,6 +168,12 @@ func startReadLoop(conn *websocket.Conn, eventCh chan<- Event) {
return
}
// Is the error an intentional close call? Go 1.16 exposes
// ErrClosing, but we have to do this for now.
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
// Check if the error is a normal one:
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return

View file

@ -2,13 +2,11 @@ package wsutil
import (
"context"
"runtime/debug"
"time"
"github.com/pkg/errors"
"github.com/diamondburned/arikawa/v2/internal/heart"
"github.com/diamondburned/arikawa/v2/internal/moreatomic"
)
type errBrokenConnection struct {
@ -49,17 +47,11 @@ type EventLoopHandler interface {
// is a valid instance only when RunAsync is called first.
type PacemakerLoop struct {
heart.Pacemaker
running moreatomic.Bool
Extras ExtraHandlers
ErrorLog func(error)
stop chan struct{}
events <-chan Event
handler func(*OP) error
stack []byte
Extras ExtraHandlers
ErrorLog func(error)
}
func (p *PacemakerLoop) errorLog(err error) {
@ -76,22 +68,6 @@ func (p *PacemakerLoop) Pace(ctx context.Context) error {
return p.Pacemaker.PaceCtx(ctx)
}
// Stop stops the pacer loop. It does nothing if the loop is already stopped.
func (p *PacemakerLoop) Stop() {
if p.Stopped() {
return
}
// Despite p.running and p.stop being thread-safe on their own, this entire
// block is actually not thread-safe.
p.Pacemaker.Stop()
close(p.stop)
}
func (p *PacemakerLoop) Stopped() bool {
return p == nil || !p.running.Get()
}
func (p *PacemakerLoop) RunAsync(
heartrate time.Duration, evs <-chan Event, evl EventLoopHandler, exit func(error)) {
@ -100,27 +76,16 @@ func (p *PacemakerLoop) RunAsync(
p.Pacemaker = heart.NewPacemaker(heartrate, evl.HeartbeatCtx)
p.handler = evl.HandleOP
p.events = evs
p.stack = debug.Stack()
p.stop = make(chan struct{})
p.running.Set(true)
go func() {
exit(p.startLoop())
}()
go func() { exit(p.startLoop()) }()
}
func (p *PacemakerLoop) startLoop() error {
defer WSDebug("Pacemaker loop has exited.")
defer p.running.Set(false)
defer p.Pacemaker.Stop()
defer p.Pacemaker.StopTicker()
for {
select {
case <-p.stop:
WSDebug("Stop requested; exiting.")
return nil
case <-p.Pacemaker.Ticks:
if err := p.Pacemaker.Pace(); err != nil {
return errors.Wrap(err, "pace failed, reconnecting")

View file

@ -144,7 +144,7 @@ func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error {
// Close closes the websocket connection. It assumes that the Websocket is
// closed even when it returns an error. If the Websocket was already closed
// before, nil will be returned.
// before, ErrWebsocketClosed will be returned.
func (ws *Websocket) Close() error {
WSDebug("Conn: Acquiring mutex lock to close...")
@ -160,7 +160,7 @@ func (ws *Websocket) Close() error {
// more information.
func (ws *Websocket) close() error {
if ws.closed {
return nil
return ErrWebsocketClosed
}
err := ws.conn.Close()

View file

@ -99,7 +99,7 @@ func (c *Gateway) OpenCtx(ctx context.Context) error {
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
var endpoint = "wss://" + strings.TrimSuffix(c.state.Endpoint, ":80") + "/?v=" + Version
wsutil.WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")")
wsutil.WSDebug("VoiceGateway: Connecting to voice endpoint (endpoint=" + endpoint + ")")
// Create a new context with a timeout for the connection.
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
@ -110,7 +110,7 @@ func (c *Gateway) OpenCtx(ctx context.Context) error {
return errors.Wrap(err, "failed to connect to voice gateway")
}
wsutil.WSDebug("Trying to start...")
wsutil.WSDebug("VoiceGateway: Trying to start...")
// Try to start or resume the connection.
if err := c.start(ctx); err != nil {
@ -123,12 +123,12 @@ func (c *Gateway) OpenCtx(ctx context.Context) error {
// Start .
func (c *Gateway) start(ctx context.Context) error {
if err := c.__start(ctx); err != nil {
wsutil.WSDebug("Start failed: ", err)
wsutil.WSDebug("VoiceGateway: Start failed: ", err)
// Close can be called with the mutex still acquired here, as the
// pacemaker hasn't started yet.
if err := c.Close(); err != nil {
wsutil.WSDebug("Failed to close after start fail: ", err)
wsutil.WSDebug("VoiceGateway: Failed to close after start fail: ", err)
}
return err
}
@ -144,7 +144,7 @@ func (c *Gateway) __start(ctx context.Context) error {
ch := c.WS.Listen()
// Wait for hello.
wsutil.WSDebug("Waiting for Hello..")
wsutil.WSDebug("VoiceGateway: Waiting for Hello..")
var hello *HelloEvent
// Wait for the Hello event; return if it times out.
@ -160,7 +160,7 @@ func (c *Gateway) __start(ctx context.Context) error {
return errors.Wrap(ctx.Err(), "failed to wait for Hello event")
}
wsutil.WSDebug("Received Hello")
wsutil.WSDebug("VoiceGateway: Received Hello")
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
// Turns out Hello is sent right away on connection start.
@ -189,7 +189,7 @@ func (c *Gateway) __start(ctx context.Context) error {
c.EventLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, c, func(err error) {
c.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped.")
wsutil.WSDebug("VoiceGateway: Event loop stopped.")
if err != nil {
c.ErrorLog(err)
@ -202,44 +202,32 @@ func (c *Gateway) __start(ctx context.Context) error {
}
})
wsutil.WSDebug("Started successfully.")
wsutil.WSDebug("VoiceGateway: Started successfully.")
return nil
}
// Close .
func (c *Gateway) Close() (err error) {
wsutil.WSDebug("Trying to close.")
// Close closes the underlying Websocket connection.
func (g *Gateway) Close() error {
wsutil.WSDebug("VoiceGateway: Trying to close. Pacemaker check skipped.")
// Check if the WS is already closed:
if c.EventLoop.Stopped() {
wsutil.WSDebug("Gateway is already closed.")
return err
wsutil.WSDebug("VoiceGateway: Closing the Websocket...")
err := g.WS.Close()
if errors.Is(err, wsutil.ErrWebsocketClosed) {
wsutil.WSDebug("VoiceGateway: Websocket already closed.")
return nil
}
// Trigger the close callback on exit.
defer func() { c.AfterClose(err) }()
wsutil.WSDebug("VoiceGateway: Websocket closed; error:", err)
// If the pacemaker is running:
if !c.EventLoop.Stopped() {
wsutil.WSDebug("Stopping pacemaker...")
wsutil.WSDebug("VoiceGateway: Waiting for the Pacemaker loop to exit.")
g.waitGroup.Wait()
wsutil.WSDebug("VoiceGateway: Pacemaker loop exited.")
// Stop the pacemaker and the event handler.
c.EventLoop.Stop()
g.AfterClose(err)
wsutil.WSDebug("VoiceGateway: AfterClose callback finished.")
wsutil.WSDebug("Stopped pacemaker.")
}
wsutil.WSDebug("Closing the websocket...")
err = c.WS.Close()
wsutil.WSDebug("Waiting for WaitGroup to be done.")
// This should work, since Pacemaker should signal its loop to stop, which
// would also exit our event loop. Both would be 2.
c.waitGroup.Wait()
wsutil.WSDebug("WaitGroup is done. Closing the websocket.")
return err
}
@ -248,7 +236,7 @@ func (c *Gateway) Reconnect() error {
}
func (c *Gateway) ReconnectCtx(ctx context.Context) error {
wsutil.WSDebug("Reconnecting...")
wsutil.WSDebug("VoiceGateway: Reconnecting...")
// TODO: implement a reconnect loop
@ -266,7 +254,7 @@ func (c *Gateway) ReconnectCtx(ctx context.Context) error {
return errors.Wrap(err, "failed to reopen gateway")
}
wsutil.WSDebug("Reconnected successfully.")
wsutil.WSDebug("VoiceGateway: Reconnected successfully.")
return nil
}