mirror of
https://github.com/diamondburned/arikawa.git
synced 2024-11-14 10:53:06 +00:00
119 lines
3.1 KiB
Go
119 lines
3.1 KiB
Go
|
// Package wsutil provides abstractions around the Websocket, including rate
|
||
|
// limits.
|
||
|
package ws
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"log"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/pkg/errors"
|
||
|
"golang.org/x/time/rate"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// WSError is the default error handler
|
||
|
WSError = func(err error) { log.Println("Gateway error:", err) }
|
||
|
// WSDebug is used for extra debug logging. This is expected to behave
|
||
|
// similarly to log.Println().
|
||
|
WSDebug = func(v ...interface{}) {}
|
||
|
)
|
||
|
|
||
|
// Websocket is a wrapper around a websocket Conn with thread safety and rate
|
||
|
// limiting for sending and throttling.
|
||
|
type Websocket struct {
|
||
|
mutex sync.Mutex
|
||
|
conn Connection
|
||
|
addr string
|
||
|
|
||
|
// If you ever need access to these fields from outside the package, please
|
||
|
// open an issue. It might be worth it to refactor these out for distributed
|
||
|
// sharding.
|
||
|
|
||
|
sendLimiter *rate.Limiter
|
||
|
dialLimiter *rate.Limiter
|
||
|
}
|
||
|
|
||
|
// NewWebsocket creates a default Websocket with the given address.
|
||
|
func NewWebsocket(c Codec, addr string) *Websocket {
|
||
|
return NewCustomWebsocket(NewConn(c), addr)
|
||
|
}
|
||
|
|
||
|
// NewCustomWebsocket creates a new undialed Websocket.
|
||
|
func NewCustomWebsocket(conn Connection, addr string) *Websocket {
|
||
|
return &Websocket{
|
||
|
conn: conn,
|
||
|
addr: addr,
|
||
|
|
||
|
sendLimiter: NewSendLimiter(),
|
||
|
dialLimiter: NewDialLimiter(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Dial waits until the rate limiter allows then dials the websocket.
|
||
|
func (ws *Websocket) Dial(ctx context.Context) (<-chan Op, error) {
|
||
|
if err := ws.dialLimiter.Wait(ctx); err != nil {
|
||
|
// Expired, fatal error
|
||
|
return nil, errors.Wrap(err, "failed to wait for dial rate limiter")
|
||
|
}
|
||
|
|
||
|
ws.mutex.Lock()
|
||
|
defer ws.mutex.Unlock()
|
||
|
|
||
|
// Reset the send limiter.
|
||
|
// TODO: see if each limit only applies to one connection or not.
|
||
|
ws.sendLimiter = NewSendLimiter()
|
||
|
|
||
|
return ws.conn.Dial(ctx, ws.addr)
|
||
|
}
|
||
|
|
||
|
// Send sends b over the Websocket with a deadline. It closes the internal
|
||
|
// Websocket if the Send method errors out.
|
||
|
func (ws *Websocket) Send(ctx context.Context, b []byte) error {
|
||
|
WSDebug("Acquiring the websoccket mutex for sending.")
|
||
|
|
||
|
ws.mutex.Lock()
|
||
|
WSDebug("Mutex lock acquired.")
|
||
|
sendLimiter := ws.sendLimiter
|
||
|
conn := ws.conn
|
||
|
ws.mutex.Unlock()
|
||
|
|
||
|
WSDebug("Waiting for the send rate limiter...")
|
||
|
|
||
|
if err := sendLimiter.Wait(ctx); err != nil {
|
||
|
WSDebug("Send rate limiter timed out.")
|
||
|
return errors.Wrap(err, "SendLimiter failed")
|
||
|
}
|
||
|
|
||
|
WSDebug("Send has passed the rate limiting. Waiting on mutex.")
|
||
|
|
||
|
return conn.Send(ctx, b)
|
||
|
}
|
||
|
|
||
|
// Close closes the websocket connection. It assumes that the Websocket is
|
||
|
// closed even when it returns an error. If the Websocket was already closed
|
||
|
// before, ErrWebsocketClosed will be returned.
|
||
|
func (ws *Websocket) Close() error {
|
||
|
WSDebug("Conn: Acquiring mutex lock to close...")
|
||
|
|
||
|
ws.mutex.Lock()
|
||
|
defer ws.mutex.Unlock()
|
||
|
|
||
|
WSDebug("Conn: Write mutex acquired")
|
||
|
|
||
|
return ws.conn.Close(false)
|
||
|
}
|
||
|
|
||
|
// CloseGracefully is similar to Close, but a proper close frame is sent to
|
||
|
// Discord, invalidating the internal session ID and voiding resumes.
|
||
|
func (ws *Websocket) CloseGracefully() error {
|
||
|
WSDebug("Conn: Acquiring mutex lock to close...")
|
||
|
|
||
|
ws.mutex.Lock()
|
||
|
defer ws.mutex.Unlock()
|
||
|
|
||
|
WSDebug("Conn: Write mutex acquired")
|
||
|
|
||
|
return ws.conn.Close(true)
|
||
|
}
|