1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2024-09-28 13:19:06 +00:00
arikawa/utils/ws/ws.go
diamondburned c8f72547f7 voice: Refactor and fix up
This commit refactors a lot of voice's internals to be more stable and
handle more edge cases from Discord's voice servers. It should result in
an overall more stable voice connection.

A few helper functions have been added into voice.Session. Some fields
will have been broken and changed to accommodate the refactor, as
well.

Below are some commits that have been squashed in:

    voice: Fix Speaking() panic on closed
    voice: StopSpeaking should not error out
        The rationale is added as a comment into the Speaking() method.
    voice: Add TestKickedOut
    voice: Fix region change disconnecting
2022-01-18 21:35:55 -08:00

118 lines
3.1 KiB
Go

// Package ws provides abstractions around the Websocket, including rate
// limits.
package ws
import (
"context"
"log"
"sync"
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
// WSError is the package-wide error handler. By default it prints the error
// through the standard logger, prefixed with "Gateway error:".
var WSError = func(err error) {
	log.Println("Gateway error:", err)
}

// WSDebug receives extra debug logging. It is a no-op by default; a
// replacement is expected to behave like log.Println().
var WSDebug = func(v ...interface{}) {}
// Websocket wraps a Connection and layers locking and rate limiting on top of
// it: dialing and sending each wait on their own limiter first.
type Websocket struct {
	// mutex is held by Dial, Close and CloseGracefully for their whole
	// duration, and briefly by Send while it reads conn and sendLimiter.
	mutex sync.Mutex

	conn Connection // underlying connection that is dialed, written to and closed
	addr string     // address handed to conn.Dial

	// The limiters are deliberately unexported. If you ever need to reach them
	// from outside the package, please open an issue; it might be worth
	// refactoring them out for distributed sharding.
	//
	// sendLimiter is waited on before every Send and replaced on every Dial;
	// dialLimiter is waited on before every Dial.
	sendLimiter, dialLimiter *rate.Limiter
}
// NewWebsocket builds a Websocket for addr using the default connection
// returned by NewConn for the codec c. The returned Websocket is created via
// NewCustomWebsocket.
func NewWebsocket(c Codec, addr string) *Websocket {
	defaultConn := NewConn(c)
	return NewCustomWebsocket(defaultConn, addr)
}
// NewCustomWebsocket returns a Websocket that will use conn to reach addr. It
// does not dial; call Dial for that. Fresh limiters are taken from
// NewSendLimiter and NewDialLimiter.
func NewCustomWebsocket(conn Connection, addr string) *Websocket {
	socket := new(Websocket)
	socket.conn = conn
	socket.addr = addr
	socket.sendLimiter = NewSendLimiter()
	socket.dialLimiter = NewDialLimiter()
	return socket
}
// Dial blocks until the dial rate limiter lets the call through (or ctx
// expires), then dials the configured address on the underlying connection.
// It returns whatever conn.Dial returns: a receive-only channel of Op and an
// error.
func (ws *Websocket) Dial(ctx context.Context) (<-chan Op, error) {
	waitErr := ws.dialLimiter.Wait(ctx)
	if waitErr != nil {
		// The context expired while waiting; treat this as fatal.
		return nil, errors.Wrap(waitErr, "failed to wait for dial rate limiter")
	}

	// The lock is held for the entire dial.
	ws.mutex.Lock()
	defer ws.mutex.Unlock()

	// Every dial starts with a fresh send limiter.
	// TODO: see if each limit only applies to one connection or not.
	ws.sendLimiter = NewSendLimiter()

	ops, err := ws.conn.Dial(ctx, ws.addr)
	return ops, err
}
// Send waits for the send rate limiter and then hands b to the underlying
// connection. ctx is passed to both the limiter wait and conn.Send. If the
// wait fails, the error is returned wrapped and nothing is sent.
//
// NOTE(review): earlier documentation stated that the internal Websocket is
// closed when sending fails. Nothing in this method closes it, so that would
// have to happen inside conn.Send; confirm against the Connection in use.
func (ws *Websocket) Send(ctx context.Context, b []byte) error {
	WSDebug("Acquiring the websocket mutex for sending.")

	// Snapshot the limiter and connection under the lock, then release it so
	// the wait and the write below happen without holding the mutex.
	limiter, target := func() (*rate.Limiter, Connection) {
		ws.mutex.Lock()
		defer ws.mutex.Unlock()
		return ws.sendLimiter, ws.conn
	}()

	WSDebug("Waiting for the send rate limiter...")

	waitErr := limiter.Wait(ctx)
	if waitErr != nil {
		WSDebug("Send rate limiter timed out.")
		return errors.Wrap(waitErr, "SendLimiter failed")
	}

	WSDebug("Send has passed the rate limiting.")

	return target.Send(ctx, b)
}
// Close closes the underlying connection by calling conn.Close(false) while
// holding the mutex. Callers should treat the Websocket as closed even if an
// error is returned.
//
// NOTE(review): the documented contract is that ErrWebsocketClosed is returned
// when the Websocket was already closed. That error would originate in the
// Connection implementation, not in this method; confirm there.
func (ws *Websocket) Close() error {
	const graceful = false

	WSDebug("Conn: Acquiring mutex lock to close...")

	ws.mutex.Lock()
	defer ws.mutex.Unlock()

	WSDebug("Conn: Write mutex acquired")

	err := ws.conn.Close(graceful)
	return err
}
// CloseGracefully works like Close but calls conn.Close(true) instead. The
// intent is that a proper close frame is sent to Discord, which invalidates
// the internal session ID and voids resumes.
func (ws *Websocket) CloseGracefully() error {
	const graceful = true

	WSDebug("Conn: Acquiring mutex lock to close...")

	ws.mutex.Lock()
	defer ws.mutex.Unlock()

	WSDebug("Conn: Write mutex acquired")

	err := ws.conn.Close(graceful)
	return err
}