mirror of
https://github.com/diamondburned/arikawa.git
synced 2025-07-23 13:20:51 +00:00
Compare commits
6 commits
9c1088bf7c
...
63310fdc95
Author | SHA1 | Date | |
---|---|---|---|
|
63310fdc95 | ||
|
607250ae55 | ||
|
f3372e016a | ||
|
2a2244c965 | ||
|
0ead315aa3 | ||
|
16c1658163 |
|
@ -21,7 +21,7 @@ var (
|
||||||
EndpointGatewayBot = EndpointGateway + "/bot"
|
EndpointGatewayBot = EndpointGateway + "/bot"
|
||||||
)
|
)
|
||||||
|
|
||||||
var UserAgent = "DiscordBot (https://github.com/diamondburned/arikawa/v2, v0.0.1)"
|
var UserAgent = "DiscordBot (https://github.com/diamondburned/arikawa/v2)"
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
*httputil.Client
|
*httputil.Client
|
||||||
|
|
|
@ -128,7 +128,7 @@ const (
|
||||||
OverwriteMember
|
OverwriteMember
|
||||||
)
|
)
|
||||||
|
|
||||||
// UnmarshalJSON unmarshals both a string-quoteed number and a regular number
|
// UnmarshalJSON unmarshalls both a string-quoted number and a regular number
|
||||||
// into OverwriteType. We need to do this because Discord is so bad that they
|
// into OverwriteType. We need to do this because Discord is so bad that they
|
||||||
// can't even handle 1s and 0s properly.
|
// can't even handle 1s and 0s properly.
|
||||||
func (otype *OverwriteType) UnmarshalJSON(b []byte) error {
|
func (otype *OverwriteType) UnmarshalJSON(b []byte) error {
|
||||||
|
|
|
@ -26,9 +26,8 @@ var (
|
||||||
EndpointGateway = api.Endpoint + "gateway"
|
EndpointGateway = api.Endpoint + "gateway"
|
||||||
EndpointGatewayBot = api.EndpointGateway + "/bot"
|
EndpointGatewayBot = api.EndpointGateway + "/bot"
|
||||||
|
|
||||||
Version = "6"
|
Version = api.Version
|
||||||
Encoding = "json"
|
Encoding = "json"
|
||||||
// Compress = "zlib-stream"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -79,6 +78,13 @@ func BotURL(token string) (*BotData, error) {
|
||||||
type Gateway struct {
|
type Gateway struct {
|
||||||
WS *wsutil.Websocket
|
WS *wsutil.Websocket
|
||||||
WSTimeout time.Duration
|
WSTimeout time.Duration
|
||||||
|
// ReconnectTimeout is the timeout used during reconnection.
|
||||||
|
// If the a connection to the gateway can't be established before the
|
||||||
|
// duration passes, the Gateway will be closed and FatalErrorCallback will
|
||||||
|
// be called.
|
||||||
|
//
|
||||||
|
// Setting this to 0 is equivalent to no timeout.
|
||||||
|
ReconnectTimeout time.Duration
|
||||||
|
|
||||||
// All events sent over are pointers to Event structs (structs suffixed with
|
// All events sent over are pointers to Event structs (structs suffixed with
|
||||||
// "Event"). This shouldn't be accessed if the Gateway is created with a
|
// "Event"). This shouldn't be accessed if the Gateway is created with a
|
||||||
|
@ -95,14 +101,23 @@ type Gateway struct {
|
||||||
PacerLoop wsutil.PacemakerLoop
|
PacerLoop wsutil.PacemakerLoop
|
||||||
|
|
||||||
ErrorLog func(err error) // default to log.Println
|
ErrorLog func(err error) // default to log.Println
|
||||||
|
// FatalErrorCallback is called, if the Gateway exits fatally. At the point
|
||||||
|
// of calling, the gateway will be already closed.
|
||||||
|
//
|
||||||
|
// Currently this will only be called, if the ReconnectTimeout was changed
|
||||||
|
// to a definite timeout, and connection could not be established during
|
||||||
|
// that time.
|
||||||
|
// err will be ErrWSMaxTries in that case.
|
||||||
|
//
|
||||||
|
// Defaults to noop.
|
||||||
|
FatalErrorCallback func(err error)
|
||||||
|
|
||||||
// AfterClose is called after each close. Error can be non-nil, as this is
|
// AfterClose is called after each close. Error can be non-nil, as this is
|
||||||
// called even when the Gateway is gracefully closed. It's used mainly for
|
// called even when the Gateway is gracefully closed. It's used mainly for
|
||||||
// reconnections or any type of connection interruptions.
|
// reconnections or any type of connection interruptions.
|
||||||
AfterClose func(err error) // noop by default
|
AfterClose func(err error) // noop by default
|
||||||
|
|
||||||
// Filled by methods, internal use
|
waitGroup sync.WaitGroup
|
||||||
waitGroup *sync.WaitGroup
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGatewayWithIntents creates a new Gateway with the given intents and the
|
// NewGatewayWithIntents creates a new Gateway with the given intents and the
|
||||||
|
@ -161,55 +176,48 @@ func (g *Gateway) AddIntents(i Intents) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the underlying Websocket connection.
|
// Close closes the underlying Websocket connection.
|
||||||
func (g *Gateway) Close() (err error) {
|
func (g *Gateway) Close() error {
|
||||||
wsutil.WSDebug("Trying to close.")
|
wsutil.WSDebug("Trying to close. Pacemaker check skipped.")
|
||||||
|
|
||||||
// Check if the WS is already closed:
|
wsutil.WSDebug("Closing the Websocket...")
|
||||||
if g.PacerLoop.Stopped() {
|
err := g.WS.Close()
|
||||||
wsutil.WSDebug("Gateway is already closed.")
|
|
||||||
return err
|
if errors.Is(err, wsutil.ErrWebsocketClosed) {
|
||||||
|
wsutil.WSDebug("Websocket already closed.")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trigger the close callback on exit.
|
wsutil.WSDebug("Websocket closed; error:", err)
|
||||||
defer func() { g.AfterClose(err) }()
|
|
||||||
|
|
||||||
// If the pacemaker is running:
|
wsutil.WSDebug("Waiting for the Pacemaker loop to exit.")
|
||||||
if !g.PacerLoop.Stopped() {
|
|
||||||
wsutil.WSDebug("Stopping pacemaker...")
|
|
||||||
|
|
||||||
// Stop the pacemaker and the event handler.
|
|
||||||
g.PacerLoop.Stop()
|
|
||||||
|
|
||||||
wsutil.WSDebug("Stopped pacemaker.")
|
|
||||||
}
|
|
||||||
|
|
||||||
wsutil.WSDebug("Closing the websocket...")
|
|
||||||
err = g.WS.Close()
|
|
||||||
|
|
||||||
wsutil.WSDebug("Waiting for WaitGroup to be done.")
|
|
||||||
|
|
||||||
// This should work, since Pacemaker should signal its loop to stop, which
|
|
||||||
// would also exit our event loop. Both would be 2.
|
|
||||||
g.waitGroup.Wait()
|
g.waitGroup.Wait()
|
||||||
|
wsutil.WSDebug("Pacemaker loop exited.")
|
||||||
|
|
||||||
|
g.AfterClose(err)
|
||||||
|
wsutil.WSDebug("AfterClose callback finished.")
|
||||||
|
|
||||||
wsutil.WSDebug("WaitGroup is done. Closing the websocket.")
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reconnect tries to reconnect forever. It will resume the connection if
|
// Reconnect tries to reconnect until the ReconnectTimeout is reached, or if
|
||||||
// possible. If an Invalid Session is received, it will start a fresh one.
|
// set to 0 reconnects indefinitely.
|
||||||
func (g *Gateway) Reconnect() {
|
func (g *Gateway) Reconnect() {
|
||||||
for {
|
ctx := context.Background()
|
||||||
if err := g.ReconnectCtx(context.Background()); err != nil {
|
|
||||||
g.ErrorLog(err)
|
if g.ReconnectTimeout > 0 {
|
||||||
} else {
|
var cancel func()
|
||||||
return
|
ctx, cancel = context.WithTimeout(context.Background(), g.WSTimeout)
|
||||||
}
|
|
||||||
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ignore the error, it is already logged and FatalErrorCallback was called
|
||||||
|
g.ReconnectCtx(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReconnectCtx attempts to reconnect until context expires. If context cannot
|
// ReconnectCtx attempts to reconnect until context expires.
|
||||||
// expire, then the gateway will try to reconnect forever.
|
// If the context expires FatalErrorCallback will be called with ErrWSMaxTries,
|
||||||
|
// and the last error returned by Open will be returned.
|
||||||
func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
|
func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
|
||||||
wsutil.WSDebug("Reconnecting...")
|
wsutil.WSDebug("Reconnecting...")
|
||||||
|
|
||||||
|
@ -220,6 +228,7 @@ func (g *Gateway) ReconnectCtx(ctx context.Context) (err error) {
|
||||||
for i := 1; ; i++ {
|
for i := 1; ; i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
g.FatalErrorCallback(ErrWSMaxTries)
|
||||||
return err
|
return err
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
@ -301,9 +310,6 @@ func (g *Gateway) start(ctx context.Context) error {
|
||||||
// This is where we'll get our events
|
// This is where we'll get our events
|
||||||
ch := g.WS.Listen()
|
ch := g.WS.Listen()
|
||||||
|
|
||||||
// Make a new WaitGroup for use in background loops:
|
|
||||||
g.waitGroup = new(sync.WaitGroup)
|
|
||||||
|
|
||||||
// Create a new Hello event and wait for it.
|
// Create a new Hello event and wait for it.
|
||||||
var hello HelloEvent
|
var hello HelloEvent
|
||||||
// Wait for an OP 10 Hello.
|
// Wait for an OP 10 Hello.
|
||||||
|
|
|
@ -49,7 +49,7 @@ func TestIntegration(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wsutil.WSError = func(err error) {
|
wsutil.WSError = func(err error) {
|
||||||
t.Fatal(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var gateway *Gateway
|
var gateway *Gateway
|
||||||
|
@ -59,6 +59,7 @@ func TestIntegration(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Failed to make a Gateway:", err)
|
t.Fatal("Failed to make a Gateway:", err)
|
||||||
}
|
}
|
||||||
|
g.AddIntents(IntentGuilds)
|
||||||
g.AfterClose = func(err error) {
|
g.AfterClose = func(err error) {
|
||||||
log.Println("Closed.")
|
log.Println("Closed.")
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ func (p *Pacemaker) Dead() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the pacemaker, or it does nothing if the pacemaker is not started.
|
// Stop stops the pacemaker, or it does nothing if the pacemaker is not started.
|
||||||
func (p *Pacemaker) Stop() {
|
func (p *Pacemaker) StopTicker() {
|
||||||
p.ticker.Stop()
|
p.ticker.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,60 +106,3 @@ func (p *Pacemaker) PaceCtx(ctx context.Context) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (p *Pacemaker) start() error {
|
|
||||||
// // Reset states to its old position.
|
|
||||||
// p.EchoBeat.Set(time.Time{})
|
|
||||||
// p.SentBeat.Set(time.Time{})
|
|
||||||
|
|
||||||
// // Create a new ticker.
|
|
||||||
// tick := time.NewTicker(p.Heartrate)
|
|
||||||
// defer tick.Stop()
|
|
||||||
|
|
||||||
// // Echo at least once
|
|
||||||
// p.Echo()
|
|
||||||
|
|
||||||
// for {
|
|
||||||
// if err := p.pace(); err != nil {
|
|
||||||
// return errors.Wrap(err, "failed to pace")
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Paced, save:
|
|
||||||
// p.SentBeat.Set(time.Now())
|
|
||||||
|
|
||||||
// if p.Dead() {
|
|
||||||
// return ErrDead
|
|
||||||
// }
|
|
||||||
|
|
||||||
// select {
|
|
||||||
// case <-p.stop:
|
|
||||||
// return nil
|
|
||||||
|
|
||||||
// case <-tick.C:
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // StartAsync starts the pacemaker asynchronously. The WaitGroup is optional.
|
|
||||||
// func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
|
|
||||||
// p.death = make(chan error)
|
|
||||||
// p.stop = make(chan struct{})
|
|
||||||
// p.once = sync.Once{}
|
|
||||||
|
|
||||||
// if wg != nil {
|
|
||||||
// wg.Add(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// go func() {
|
|
||||||
// p.death <- p.start()
|
|
||||||
// // Debug.
|
|
||||||
// Debug("Pacemaker returned.")
|
|
||||||
|
|
||||||
// // Mark the pacemaker loop as done.
|
|
||||||
// if wg != nil {
|
|
||||||
// wg.Done()
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
|
|
||||||
// return p.death
|
|
||||||
// }
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
@ -52,7 +53,8 @@ type Connection interface {
|
||||||
// Conn is the default Websocket connection. It tries to compresses all payloads
|
// Conn is the default Websocket connection. It tries to compresses all payloads
|
||||||
// using zlib.
|
// using zlib.
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
Dialer *websocket.Dialer
|
Dialer websocket.Dialer
|
||||||
|
Header http.Header
|
||||||
Conn *websocket.Conn
|
Conn *websocket.Conn
|
||||||
events chan Event
|
events chan Event
|
||||||
}
|
}
|
||||||
|
@ -61,7 +63,7 @@ var _ Connection = (*Conn)(nil)
|
||||||
|
|
||||||
// NewConn creates a new default websocket connection with a default dialer.
|
// NewConn creates a new default websocket connection with a default dialer.
|
||||||
func NewConn() *Conn {
|
func NewConn() *Conn {
|
||||||
return NewConnWithDialer(&websocket.Dialer{
|
return NewConnWithDialer(websocket.Dialer{
|
||||||
Proxy: http.ProxyFromEnvironment,
|
Proxy: http.ProxyFromEnvironment,
|
||||||
HandshakeTimeout: WSTimeout,
|
HandshakeTimeout: WSTimeout,
|
||||||
ReadBufferSize: CopyBufferSize,
|
ReadBufferSize: CopyBufferSize,
|
||||||
|
@ -71,20 +73,20 @@ func NewConn() *Conn {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConn creates a new default websocket connection with a custom dialer.
|
// NewConn creates a new default websocket connection with a custom dialer.
|
||||||
func NewConnWithDialer(dialer *websocket.Dialer) *Conn {
|
func NewConnWithDialer(dialer websocket.Dialer) *Conn {
|
||||||
return &Conn{Dialer: dialer}
|
return &Conn{
|
||||||
|
Dialer: dialer,
|
||||||
|
Header: http.Header{
|
||||||
|
"Accept-Encoding": {"zlib"},
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Dial(ctx context.Context, addr string) (err error) {
|
func (c *Conn) Dial(ctx context.Context, addr string) (err error) {
|
||||||
// BUG which prevents stream compression.
|
// BUG which prevents stream compression.
|
||||||
// See https://github.com/golang/go/issues/31514.
|
// See https://github.com/golang/go/issues/31514.
|
||||||
|
|
||||||
// Enable compression:
|
c.Conn, _, err = c.Dialer.DialContext(ctx, addr, c.Header)
|
||||||
headers := http.Header{
|
|
||||||
"Accept-Encoding": {"zlib"},
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Conn, _, err = c.Dialer.DialContext(ctx, addr, headers)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "failed to dial WS")
|
return errors.Wrap(err, "failed to dial WS")
|
||||||
}
|
}
|
||||||
|
@ -120,6 +122,10 @@ func (c *Conn) Send(ctx context.Context, b []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Close() error {
|
func (c *Conn) Close() error {
|
||||||
|
// Have a deadline before closing.
|
||||||
|
var deadline = time.Now().Add(5 * time.Second)
|
||||||
|
c.Conn.SetWriteDeadline(deadline)
|
||||||
|
|
||||||
// Close the WS.
|
// Close the WS.
|
||||||
err := c.Conn.Close()
|
err := c.Conn.Close()
|
||||||
|
|
||||||
|
@ -162,6 +168,12 @@ func startReadLoop(conn *websocket.Conn, eventCh chan<- Event) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Is the error an intentional close call? Go 1.16 exposes
|
||||||
|
// ErrClosing, but we have to do this for now.
|
||||||
|
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Check if the error is a normal one:
|
// Check if the error is a normal one:
|
||||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
return
|
return
|
||||||
|
|
|
@ -2,13 +2,11 @@ package wsutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"runtime/debug"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/diamondburned/arikawa/v2/internal/heart"
|
"github.com/diamondburned/arikawa/v2/internal/heart"
|
||||||
"github.com/diamondburned/arikawa/v2/internal/moreatomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type errBrokenConnection struct {
|
type errBrokenConnection struct {
|
||||||
|
@ -49,17 +47,11 @@ type EventLoopHandler interface {
|
||||||
// is a valid instance only when RunAsync is called first.
|
// is a valid instance only when RunAsync is called first.
|
||||||
type PacemakerLoop struct {
|
type PacemakerLoop struct {
|
||||||
heart.Pacemaker
|
heart.Pacemaker
|
||||||
running moreatomic.Bool
|
Extras ExtraHandlers
|
||||||
|
ErrorLog func(error)
|
||||||
|
|
||||||
stop chan struct{}
|
|
||||||
events <-chan Event
|
events <-chan Event
|
||||||
handler func(*OP) error
|
handler func(*OP) error
|
||||||
|
|
||||||
stack []byte
|
|
||||||
|
|
||||||
Extras ExtraHandlers
|
|
||||||
|
|
||||||
ErrorLog func(error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PacemakerLoop) errorLog(err error) {
|
func (p *PacemakerLoop) errorLog(err error) {
|
||||||
|
@ -76,22 +68,6 @@ func (p *PacemakerLoop) Pace(ctx context.Context) error {
|
||||||
return p.Pacemaker.PaceCtx(ctx)
|
return p.Pacemaker.PaceCtx(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the pacer loop. It does nothing if the loop is already stopped.
|
|
||||||
func (p *PacemakerLoop) Stop() {
|
|
||||||
if p.Stopped() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Despite p.running and p.stop being thread-safe on their own, this entire
|
|
||||||
// block is actually not thread-safe.
|
|
||||||
p.Pacemaker.Stop()
|
|
||||||
close(p.stop)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PacemakerLoop) Stopped() bool {
|
|
||||||
return p == nil || !p.running.Get()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PacemakerLoop) RunAsync(
|
func (p *PacemakerLoop) RunAsync(
|
||||||
heartrate time.Duration, evs <-chan Event, evl EventLoopHandler, exit func(error)) {
|
heartrate time.Duration, evs <-chan Event, evl EventLoopHandler, exit func(error)) {
|
||||||
|
|
||||||
|
@ -100,27 +76,16 @@ func (p *PacemakerLoop) RunAsync(
|
||||||
p.Pacemaker = heart.NewPacemaker(heartrate, evl.HeartbeatCtx)
|
p.Pacemaker = heart.NewPacemaker(heartrate, evl.HeartbeatCtx)
|
||||||
p.handler = evl.HandleOP
|
p.handler = evl.HandleOP
|
||||||
p.events = evs
|
p.events = evs
|
||||||
p.stack = debug.Stack()
|
|
||||||
p.stop = make(chan struct{})
|
|
||||||
|
|
||||||
p.running.Set(true)
|
go func() { exit(p.startLoop()) }()
|
||||||
|
|
||||||
go func() {
|
|
||||||
exit(p.startLoop())
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PacemakerLoop) startLoop() error {
|
func (p *PacemakerLoop) startLoop() error {
|
||||||
defer WSDebug("Pacemaker loop has exited.")
|
defer WSDebug("Pacemaker loop has exited.")
|
||||||
defer p.running.Set(false)
|
defer p.Pacemaker.StopTicker()
|
||||||
defer p.Pacemaker.Stop()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-p.stop:
|
|
||||||
WSDebug("Stop requested; exiting.")
|
|
||||||
return nil
|
|
||||||
|
|
||||||
case <-p.Pacemaker.Ticks:
|
case <-p.Pacemaker.Ticks:
|
||||||
if err := p.Pacemaker.Pace(); err != nil {
|
if err := p.Pacemaker.Pace(); err != nil {
|
||||||
return errors.Wrap(err, "pace failed, reconnecting")
|
return errors.Wrap(err, "pace failed, reconnecting")
|
||||||
|
|
|
@ -144,7 +144,7 @@ func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error {
|
||||||
|
|
||||||
// Close closes the websocket connection. It assumes that the Websocket is
|
// Close closes the websocket connection. It assumes that the Websocket is
|
||||||
// closed even when it returns an error. If the Websocket was already closed
|
// closed even when it returns an error. If the Websocket was already closed
|
||||||
// before, nil will be returned.
|
// before, ErrWebsocketClosed will be returned.
|
||||||
func (ws *Websocket) Close() error {
|
func (ws *Websocket) Close() error {
|
||||||
WSDebug("Conn: Acquiring mutex lock to close...")
|
WSDebug("Conn: Acquiring mutex lock to close...")
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ func (ws *Websocket) Close() error {
|
||||||
// more information.
|
// more information.
|
||||||
func (ws *Websocket) close() error {
|
func (ws *Websocket) close() error {
|
||||||
if ws.closed {
|
if ws.closed {
|
||||||
return nil
|
return ErrWebsocketClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ws.conn.Close()
|
err := ws.conn.Close()
|
||||||
|
|
|
@ -99,7 +99,7 @@ func (c *Gateway) OpenCtx(ctx context.Context) error {
|
||||||
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
|
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
|
||||||
var endpoint = "wss://" + strings.TrimSuffix(c.state.Endpoint, ":80") + "/?v=" + Version
|
var endpoint = "wss://" + strings.TrimSuffix(c.state.Endpoint, ":80") + "/?v=" + Version
|
||||||
|
|
||||||
wsutil.WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")")
|
wsutil.WSDebug("VoiceGateway: Connecting to voice endpoint (endpoint=" + endpoint + ")")
|
||||||
|
|
||||||
// Create a new context with a timeout for the connection.
|
// Create a new context with a timeout for the connection.
|
||||||
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
|
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
|
||||||
|
@ -110,7 +110,7 @@ func (c *Gateway) OpenCtx(ctx context.Context) error {
|
||||||
return errors.Wrap(err, "failed to connect to voice gateway")
|
return errors.Wrap(err, "failed to connect to voice gateway")
|
||||||
}
|
}
|
||||||
|
|
||||||
wsutil.WSDebug("Trying to start...")
|
wsutil.WSDebug("VoiceGateway: Trying to start...")
|
||||||
|
|
||||||
// Try to start or resume the connection.
|
// Try to start or resume the connection.
|
||||||
if err := c.start(ctx); err != nil {
|
if err := c.start(ctx); err != nil {
|
||||||
|
@ -123,12 +123,12 @@ func (c *Gateway) OpenCtx(ctx context.Context) error {
|
||||||
// Start .
|
// Start .
|
||||||
func (c *Gateway) start(ctx context.Context) error {
|
func (c *Gateway) start(ctx context.Context) error {
|
||||||
if err := c.__start(ctx); err != nil {
|
if err := c.__start(ctx); err != nil {
|
||||||
wsutil.WSDebug("Start failed: ", err)
|
wsutil.WSDebug("VoiceGateway: Start failed: ", err)
|
||||||
|
|
||||||
// Close can be called with the mutex still acquired here, as the
|
// Close can be called with the mutex still acquired here, as the
|
||||||
// pacemaker hasn't started yet.
|
// pacemaker hasn't started yet.
|
||||||
if err := c.Close(); err != nil {
|
if err := c.Close(); err != nil {
|
||||||
wsutil.WSDebug("Failed to close after start fail: ", err)
|
wsutil.WSDebug("VoiceGateway: Failed to close after start fail: ", err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ func (c *Gateway) __start(ctx context.Context) error {
|
||||||
ch := c.WS.Listen()
|
ch := c.WS.Listen()
|
||||||
|
|
||||||
// Wait for hello.
|
// Wait for hello.
|
||||||
wsutil.WSDebug("Waiting for Hello..")
|
wsutil.WSDebug("VoiceGateway: Waiting for Hello..")
|
||||||
|
|
||||||
var hello *HelloEvent
|
var hello *HelloEvent
|
||||||
// Wait for the Hello event; return if it times out.
|
// Wait for the Hello event; return if it times out.
|
||||||
|
@ -160,7 +160,7 @@ func (c *Gateway) __start(ctx context.Context) error {
|
||||||
return errors.Wrap(ctx.Err(), "failed to wait for Hello event")
|
return errors.Wrap(ctx.Err(), "failed to wait for Hello event")
|
||||||
}
|
}
|
||||||
|
|
||||||
wsutil.WSDebug("Received Hello")
|
wsutil.WSDebug("VoiceGateway: Received Hello")
|
||||||
|
|
||||||
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
|
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
|
||||||
// Turns out Hello is sent right away on connection start.
|
// Turns out Hello is sent right away on connection start.
|
||||||
|
@ -189,7 +189,7 @@ func (c *Gateway) __start(ctx context.Context) error {
|
||||||
|
|
||||||
c.EventLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, c, func(err error) {
|
c.EventLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, c, func(err error) {
|
||||||
c.waitGroup.Done() // mark so Close() can exit.
|
c.waitGroup.Done() // mark so Close() can exit.
|
||||||
wsutil.WSDebug("Event loop stopped.")
|
wsutil.WSDebug("VoiceGateway: Event loop stopped.")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.ErrorLog(err)
|
c.ErrorLog(err)
|
||||||
|
@ -202,44 +202,32 @@ func (c *Gateway) __start(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
wsutil.WSDebug("Started successfully.")
|
wsutil.WSDebug("VoiceGateway: Started successfully.")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close .
|
// Close closes the underlying Websocket connection.
|
||||||
func (c *Gateway) Close() (err error) {
|
func (g *Gateway) Close() error {
|
||||||
wsutil.WSDebug("Trying to close.")
|
wsutil.WSDebug("VoiceGateway: Trying to close. Pacemaker check skipped.")
|
||||||
|
|
||||||
// Check if the WS is already closed:
|
wsutil.WSDebug("VoiceGateway: Closing the Websocket...")
|
||||||
if c.EventLoop.Stopped() {
|
err := g.WS.Close()
|
||||||
wsutil.WSDebug("Gateway is already closed.")
|
|
||||||
return err
|
if errors.Is(err, wsutil.ErrWebsocketClosed) {
|
||||||
|
wsutil.WSDebug("VoiceGateway: Websocket already closed.")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trigger the close callback on exit.
|
wsutil.WSDebug("VoiceGateway: Websocket closed; error:", err)
|
||||||
defer func() { c.AfterClose(err) }()
|
|
||||||
|
|
||||||
// If the pacemaker is running:
|
wsutil.WSDebug("VoiceGateway: Waiting for the Pacemaker loop to exit.")
|
||||||
if !c.EventLoop.Stopped() {
|
g.waitGroup.Wait()
|
||||||
wsutil.WSDebug("Stopping pacemaker...")
|
wsutil.WSDebug("VoiceGateway: Pacemaker loop exited.")
|
||||||
|
|
||||||
// Stop the pacemaker and the event handler.
|
g.AfterClose(err)
|
||||||
c.EventLoop.Stop()
|
wsutil.WSDebug("VoiceGateway: AfterClose callback finished.")
|
||||||
|
|
||||||
wsutil.WSDebug("Stopped pacemaker.")
|
|
||||||
}
|
|
||||||
|
|
||||||
wsutil.WSDebug("Closing the websocket...")
|
|
||||||
err = c.WS.Close()
|
|
||||||
|
|
||||||
wsutil.WSDebug("Waiting for WaitGroup to be done.")
|
|
||||||
|
|
||||||
// This should work, since Pacemaker should signal its loop to stop, which
|
|
||||||
// would also exit our event loop. Both would be 2.
|
|
||||||
c.waitGroup.Wait()
|
|
||||||
|
|
||||||
wsutil.WSDebug("WaitGroup is done. Closing the websocket.")
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,7 +236,7 @@ func (c *Gateway) Reconnect() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Gateway) ReconnectCtx(ctx context.Context) error {
|
func (c *Gateway) ReconnectCtx(ctx context.Context) error {
|
||||||
wsutil.WSDebug("Reconnecting...")
|
wsutil.WSDebug("VoiceGateway: Reconnecting...")
|
||||||
|
|
||||||
// TODO: implement a reconnect loop
|
// TODO: implement a reconnect loop
|
||||||
|
|
||||||
|
@ -266,7 +254,7 @@ func (c *Gateway) ReconnectCtx(ctx context.Context) error {
|
||||||
return errors.Wrap(err, "failed to reopen gateway")
|
return errors.Wrap(err, "failed to reopen gateway")
|
||||||
}
|
}
|
||||||
|
|
||||||
wsutil.WSDebug("Reconnected successfully.")
|
wsutil.WSDebug("VoiceGateway: Reconnected successfully.")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue