diff --git a/gateway/gateway.go b/gateway/gateway.go index 3c3664d..c7121cc 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -101,8 +101,7 @@ type Gateway struct { // reconnections or any type of connection interruptions. AfterClose func(err error) // noop by default - // Filled by methods, internal use - waitGroup *sync.WaitGroup + waitGroup sync.WaitGroup } // NewGatewayWithIntents creates a new Gateway with the given intents and the @@ -161,38 +160,26 @@ func (g *Gateway) AddIntents(i Intents) { } // Close closes the underlying Websocket connection. -func (g *Gateway) Close() (err error) { - wsutil.WSDebug("Trying to close.") +func (g *Gateway) Close() error { + wsutil.WSDebug("Trying to close. Pacemaker check skipped.") - // Check if the WS is already closed: - if g.PacerLoop.Stopped() { - wsutil.WSDebug("Gateway is already closed.") - return err + wsutil.WSDebug("Closing the Websocket...") + err := g.WS.Close() + + if errors.Is(err, wsutil.ErrWebsocketClosed) { + wsutil.WSDebug("Websocket already closed.") + return nil } - // Trigger the close callback on exit. - defer func() { g.AfterClose(err) }() + wsutil.WSDebug("Websocket closed; error:", err) - // If the pacemaker is running: - if !g.PacerLoop.Stopped() { - wsutil.WSDebug("Stopping pacemaker...") - - // Stop the pacemaker and the event handler. - g.PacerLoop.Stop() - - wsutil.WSDebug("Stopped pacemaker.") - } - - wsutil.WSDebug("Closing the websocket...") - err = g.WS.Close() - - wsutil.WSDebug("Waiting for WaitGroup to be done.") - - // This should work, since Pacemaker should signal its loop to stop, which - // would also exit our event loop. Both would be 2. + wsutil.WSDebug("Waiting for the Pacemaker loop to exit.") g.waitGroup.Wait() + wsutil.WSDebug("Pacemaker loop exited.") + + g.AfterClose(err) + wsutil.WSDebug("AfterClose callback finished.") - wsutil.WSDebug("WaitGroup is done. Closing the websocket.") return err } @@ -301,9 +288,6 @@ func (g *Gateway) start(ctx context.Context) error { // This is where we'll get our events ch := g.WS.Listen() - // Make a new WaitGroup for use in background loops: - g.waitGroup = new(sync.WaitGroup) - // Create a new Hello event and wait for it. var hello HelloEvent // Wait for an OP 10 Hello. diff --git a/internal/heart/heart.go b/internal/heart/heart.go index ec43eb0..73ae4e4 100644 --- a/internal/heart/heart.go +++ b/internal/heart/heart.go @@ -81,7 +81,7 @@ func (p *Pacemaker) Dead() bool { } // Stop stops the pacemaker, or it does nothing if the pacemaker is not started. -func (p *Pacemaker) Stop() { +func (p *Pacemaker) StopTicker() { p.ticker.Stop() } @@ -106,60 +106,3 @@ func (p *Pacemaker) PaceCtx(ctx context.Context) error { return nil } - -// func (p *Pacemaker) start() error { -// // Reset states to its old position. -// p.EchoBeat.Set(time.Time{}) -// p.SentBeat.Set(time.Time{}) - -// // Create a new ticker. -// tick := time.NewTicker(p.Heartrate) -// defer tick.Stop() - -// // Echo at least once -// p.Echo() - -// for { -// if err := p.pace(); err != nil { -// return errors.Wrap(err, "failed to pace") -// } - -// // Paced, save: -// p.SentBeat.Set(time.Now()) - -// if p.Dead() { -// return ErrDead -// } - -// select { -// case <-p.stop: -// return nil - -// case <-tick.C: -// } -// } -// } - -// // StartAsync starts the pacemaker asynchronously. The WaitGroup is optional. -// func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) { -// p.death = make(chan error) -// p.stop = make(chan struct{}) -// p.once = sync.Once{} - -// if wg != nil { -// wg.Add(1) -// } - -// go func() { -// p.death <- p.start() -// // Debug. -// Debug("Pacemaker returned.") - -// // Mark the pacemaker loop as done. -// if wg != nil { -// wg.Done() -// } -// }() - -// return p.death -// } diff --git a/utils/wsutil/conn.go b/utils/wsutil/conn.go index 750a742..baf65fa 100644 --- a/utils/wsutil/conn.go +++ b/utils/wsutil/conn.go @@ -52,7 +52,8 @@ type Connection interface { // Conn is the default Websocket connection. It tries to compresses all payloads // using zlib. type Conn struct { - Dialer *websocket.Dialer + Dialer websocket.Dialer + Header http.Header Conn *websocket.Conn events chan Event } @@ -61,7 +62,7 @@ var _ Connection = (*Conn)(nil) // NewConn creates a new default websocket connection with a default dialer. func NewConn() *Conn { - return NewConnWithDialer(&websocket.Dialer{ + return NewConnWithDialer(websocket.Dialer{ Proxy: http.ProxyFromEnvironment, HandshakeTimeout: WSTimeout, ReadBufferSize: CopyBufferSize, @@ -71,20 +72,20 @@ func NewConn() *Conn { } // NewConn creates a new default websocket connection with a custom dialer. -func NewConnWithDialer(dialer *websocket.Dialer) *Conn { - return &Conn{Dialer: dialer} +func NewConnWithDialer(dialer websocket.Dialer) *Conn { + return &Conn{ + Dialer: dialer, + Header: http.Header{ + "Accept-Encoding": {"zlib"}, + }, + } } func (c *Conn) Dial(ctx context.Context, addr string) (err error) { // BUG which prevents stream compression. // See https://github.com/golang/go/issues/31514. - // Enable compression: - headers := http.Header{ - "Accept-Encoding": {"zlib"}, - } - - c.Conn, _, err = c.Dialer.DialContext(ctx, addr, headers) + c.Conn, _, err = c.Dialer.DialContext(ctx, addr, c.Header) if err != nil { return errors.Wrap(err, "failed to dial WS") } @@ -120,6 +121,10 @@ func (c *Conn) Send(ctx context.Context, b []byte) error { } func (c *Conn) Close() error { + // Have a deadline before closing. + var deadline = time.Now().Add(5 * time.Second) + c.Conn.SetWriteDeadline(deadline) + // Close the WS. err := c.Conn.Close() diff --git a/utils/wsutil/heart.go b/utils/wsutil/heart.go index 9020471..75f5772 100644 --- a/utils/wsutil/heart.go +++ b/utils/wsutil/heart.go @@ -2,13 +2,11 @@ package wsutil import ( "context" - "runtime/debug" "time" "github.com/pkg/errors" "github.com/diamondburned/arikawa/v2/internal/heart" - "github.com/diamondburned/arikawa/v2/internal/moreatomic" ) type errBrokenConnection struct { @@ -49,17 +47,11 @@ type EventLoopHandler interface { // is a valid instance only when RunAsync is called first. type PacemakerLoop struct { heart.Pacemaker - running moreatomic.Bool + Extras ExtraHandlers + ErrorLog func(error) - stop chan struct{} events <-chan Event handler func(*OP) error - - stack []byte - - Extras ExtraHandlers - - ErrorLog func(error) } func (p *PacemakerLoop) errorLog(err error) { @@ -76,22 +68,6 @@ func (p *PacemakerLoop) Pace(ctx context.Context) error { return p.Pacemaker.PaceCtx(ctx) } -// Stop stops the pacer loop. It does nothing if the loop is already stopped. -func (p *PacemakerLoop) Stop() { - if p.Stopped() { - return - } - - // Despite p.running and p.stop being thread-safe on their own, this entire - // block is actually not thread-safe. - p.Pacemaker.Stop() - close(p.stop) -} - -func (p *PacemakerLoop) Stopped() bool { - return p == nil || !p.running.Get() -} - func (p *PacemakerLoop) RunAsync( heartrate time.Duration, evs <-chan Event, evl EventLoopHandler, exit func(error)) { @@ -100,27 +76,16 @@ func (p *PacemakerLoop) RunAsync( p.Pacemaker = heart.NewPacemaker(heartrate, evl.HeartbeatCtx) p.handler = evl.HandleOP p.events = evs - p.stack = debug.Stack() - p.stop = make(chan struct{}) - p.running.Set(true) - - go func() { - exit(p.startLoop()) - }() + go func() { exit(p.startLoop()) }() } func (p *PacemakerLoop) startLoop() error { defer WSDebug("Pacemaker loop has exited.") - defer p.running.Set(false) - defer p.Pacemaker.Stop() + defer p.Pacemaker.StopTicker() for { select { - case <-p.stop: - WSDebug("Stop requested; exiting.") - return nil - case <-p.Pacemaker.Ticks: if err := p.Pacemaker.Pace(); err != nil { return errors.Wrap(err, "pace failed, reconnecting") diff --git a/utils/wsutil/ws.go b/utils/wsutil/ws.go index dfaa315..45f6341 100644 --- a/utils/wsutil/ws.go +++ b/utils/wsutil/ws.go @@ -144,7 +144,7 @@ func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error { // Close closes the websocket connection. It assumes that the Websocket is // closed even when it returns an error. If the Websocket was already closed -// before, nil will be returned. +// before, ErrWebsocketClosed will be returned. func (ws *Websocket) Close() error { WSDebug("Conn: Acquiring mutex lock to close...") @@ -160,7 +160,7 @@ func (ws *Websocket) Close() error { // more information. func (ws *Websocket) close() error { if ws.closed { - return nil + return ErrWebsocketClosed } err := ws.conn.Close() diff --git a/voice/voicegateway/gateway.go b/voice/voicegateway/gateway.go index 6ba98e4..f7b83b0 100644 --- a/voice/voicegateway/gateway.go +++ b/voice/voicegateway/gateway.go @@ -99,7 +99,7 @@ func (c *Gateway) OpenCtx(ctx context.Context) error { // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection var endpoint = "wss://" + strings.TrimSuffix(c.state.Endpoint, ":80") + "/?v=" + Version - wsutil.WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")") + wsutil.WSDebug("VoiceGateway: Connecting to voice endpoint (endpoint=" + endpoint + ")") // Create a new context with a timeout for the connection. ctx, cancel := context.WithTimeout(ctx, c.Timeout) @@ -110,7 +110,7 @@ func (c *Gateway) OpenCtx(ctx context.Context) error { return errors.Wrap(err, "failed to connect to voice gateway") } - wsutil.WSDebug("Trying to start...") + wsutil.WSDebug("VoiceGateway: Trying to start...") // Try to start or resume the connection. if err := c.start(ctx); err != nil { @@ -123,12 +123,12 @@ func (c *Gateway) OpenCtx(ctx context.Context) error { // Start . func (c *Gateway) start(ctx context.Context) error { if err := c.__start(ctx); err != nil { - wsutil.WSDebug("Start failed: ", err) + wsutil.WSDebug("VoiceGateway: Start failed: ", err) // Close can be called with the mutex still acquired here, as the // pacemaker hasn't started yet. if err := c.Close(); err != nil { - wsutil.WSDebug("Failed to close after start fail: ", err) + wsutil.WSDebug("VoiceGateway: Failed to close after start fail: ", err) } return err } @@ -144,7 +144,7 @@ func (c *Gateway) __start(ctx context.Context) error { ch := c.WS.Listen() // Wait for hello. - wsutil.WSDebug("Waiting for Hello..") + wsutil.WSDebug("VoiceGateway: Waiting for Hello..") var hello *HelloEvent // Wait for the Hello event; return if it times out. @@ -160,7 +160,7 @@ func (c *Gateway) __start(ctx context.Context) error { return errors.Wrap(ctx.Err(), "failed to wait for Hello event") } - wsutil.WSDebug("Received Hello") + wsutil.WSDebug("VoiceGateway: Received Hello") // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection // Turns out Hello is sent right away on connection start. @@ -189,7 +189,7 @@ func (c *Gateway) __start(ctx context.Context) error { c.EventLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, c, func(err error) { c.waitGroup.Done() // mark so Close() can exit. - wsutil.WSDebug("Event loop stopped.") + wsutil.WSDebug("VoiceGateway: Event loop stopped.") if err != nil { c.ErrorLog(err) @@ -202,44 +202,32 @@ func (c *Gateway) __start(ctx context.Context) error { } }) - wsutil.WSDebug("Started successfully.") + wsutil.WSDebug("VoiceGateway: Started successfully.") return nil } -// Close . -func (c *Gateway) Close() (err error) { - wsutil.WSDebug("Trying to close.") +// Close closes the underlying Websocket connection. +func (g *Gateway) Close() error { + wsutil.WSDebug("VoiceGateway: Trying to close. Pacemaker check skipped.") - // Check if the WS is already closed: - if c.EventLoop.Stopped() { - wsutil.WSDebug("Gateway is already closed.") - return err + wsutil.WSDebug("VoiceGateway: Closing the Websocket...") + err := g.WS.Close() + + if errors.Is(err, wsutil.ErrWebsocketClosed) { + wsutil.WSDebug("VoiceGateway: Websocket already closed.") + return nil } - // Trigger the close callback on exit. - defer func() { c.AfterClose(err) }() + wsutil.WSDebug("VoiceGateway: Websocket closed; error:", err) - // If the pacemaker is running: - if !c.EventLoop.Stopped() { - wsutil.WSDebug("Stopping pacemaker...") + wsutil.WSDebug("VoiceGateway: Waiting for the Pacemaker loop to exit.") + g.waitGroup.Wait() + wsutil.WSDebug("VoiceGateway: Pacemaker loop exited.") - // Stop the pacemaker and the event handler. - c.EventLoop.Stop() + g.AfterClose(err) + wsutil.WSDebug("VoiceGateway: AfterClose callback finished.") - wsutil.WSDebug("Stopped pacemaker.") - } - - wsutil.WSDebug("Closing the websocket...") - err = c.WS.Close() - - wsutil.WSDebug("Waiting for WaitGroup to be done.") - - // This should work, since Pacemaker should signal its loop to stop, which - // would also exit our event loop. Both would be 2. - c.waitGroup.Wait() - - wsutil.WSDebug("WaitGroup is done. Closing the websocket.") return err } @@ -248,7 +236,7 @@ func (c *Gateway) Reconnect() error { } func (c *Gateway) ReconnectCtx(ctx context.Context) error { - wsutil.WSDebug("Reconnecting...") + wsutil.WSDebug("VoiceGateway: Reconnecting...") // TODO: implement a reconnect loop @@ -266,7 +254,7 @@ func (c *Gateway) ReconnectCtx(ctx context.Context) error { return errors.Wrap(err, "failed to reopen gateway") } - wsutil.WSDebug("Reconnected successfully.") + wsutil.WSDebug("VoiceGateway: Reconnected successfully.") return nil }