Websocket: Replaced Conn's mutices with channels

This commit is contained in:
diamondburned (Forefront) 2020-04-10 20:03:52 -07:00
parent aa53661b60
commit 06136b7d5f
5 changed files with 47 additions and 56 deletions

View File

@ -117,6 +117,9 @@ type CommandContext struct {
MethodName string
Command string // empty if Plumb
// Hidden is true if the method has a hidden nameflag.
Hidden bool
value reflect.Value // Func
event reflect.Type // gateway.*Event
method reflect.Method

View File

@ -145,6 +145,7 @@ func HandleOP(g *Gateway, op *OP) error {
case ReconnectOP:
// Server requests to reconnect, die and retry.
WSDebug("ReconnectOP received.")
return g.Reconnect()
case InvalidSessionOP:

View File

@ -104,6 +104,8 @@ func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) {
go func() {
p.death <- p.start()
// Debug.
WSDebug("Pacemaker returned.")
// Mark the stop channel as nil, so later Close() calls won't block forever.
p.stop = nil
// Mark the pacemaker loop as done.

View File

@ -6,7 +6,6 @@ import (
"context"
"io"
"net/http"
"sync"
"time"
"github.com/diamondburned/arikawa/utils/json"
@ -35,8 +34,8 @@ type Connection interface {
Send([]byte) error
// Close should close the websocket connection. The connection will not be
// reused. Code should be sent as the status code for the close frame.
Close(code int) error
// reused.
Close() error
}
// Conn is the default Websocket connection. It compresses all payloads using
@ -46,7 +45,7 @@ type Conn struct {
json.Driver
dialer *websocket.Dialer
mut sync.RWMutex
// mut sync.RWMutex
events chan Event
// write channels
@ -87,9 +86,6 @@ func (c *Conn) Dial(ctx context.Context, addr string) error {
// "compress": {"zlib-stream"},
// })
c.mut.Lock()
defer c.mut.Unlock()
c.Conn, _, err = c.dialer.DialContext(ctx, addr, headers)
if err != nil {
return errors.Wrap(err, "Failed to dial WS")
@ -113,8 +109,8 @@ func (c *Conn) readLoop() {
// Acquire the read lock throughout the span of the loop. This would still
// allow Send to acquire another RLock, but wouldn't allow Close to
// prematurely exit, as Close acquires a write lock.
c.mut.RLock()
defer c.mut.RUnlock()
// c.mut.RLock()
// defer c.mut.RUnlock()
// Clean up the events channel in the end.
defer close(c.events)
@ -148,12 +144,24 @@ func (c *Conn) readLoop() {
}
func (c *Conn) writeLoop() {
c.mut.RLock()
defer c.mut.RUnlock()
// Closig c.writes would break the loop immediately.
for bytes := range c.writes {
c.errors <- c.Conn.WriteMessage(websocket.TextMessage, bytes)
}
// Quick deadline:
deadline := time.Now().Add(CloseDeadline)
// Make a closure message:
msg := websocket.FormatCloseMessage(websocket.CloseGoingAway, "")
// Send a close message before closing the connection. We're not error
// checking this because it's not important.
c.Conn.WriteControl(websocket.TextMessage, msg, deadline)
// Safe to close now.
c.errors <- c.Conn.Close()
close(c.errors)
}
func (c *Conn) handle() ([]byte, error) {
@ -201,10 +209,13 @@ func (c *Conn) handle() ([]byte, error) {
}
func (c *Conn) Send(b []byte) error {
c.mut.RLock()
defer c.mut.RUnlock()
// Don't send a nil byte slice. That would confuse the write loop.
if b == nil {
return nil
}
if c.Conn == nil {
// If websocket is already closed.
if c.writes == nil {
return errors.New("Websocket is closed.")
}
@ -212,52 +223,27 @@ func (c *Conn) Send(b []byte) error {
return <-c.errors
}
func (c *Conn) Close(code int) error {
// Wait for the read loop to exit at the end.
err := c.writeClose(code)
c.close()
return err
}
func (c *Conn) Close() error {
// Close c.writes. This should trigger the websocket to close itself.
close(c.writes)
func (c *Conn) writeClose(code int) error {
// Acquire a read lock instead, as the read and write loops are still alive.
c.mut.RLock()
defer c.mut.RUnlock()
// Wait for the write loop to exit by flusing the errors channel.
var err = <-c.errors
for range c.errors {
}
// Keep the current write channel so we can close them when we're done.
wr := c.writes
defer close(wr)
// Stop future sends before closing. Nil channels block forever.
c.writes = nil
c.errors = nil
// Quick deadline:
deadline := time.Now().Add(CloseDeadline)
// Make a closure message:
msg := websocket.FormatCloseMessage(code, "")
// Send a close message before closing the connection. We're not error
// checking this because it's not important.
c.Conn.WriteControl(websocket.TextMessage, msg, deadline)
// Safe to close now.
return c.Conn.Close()
}
func (c *Conn) close() {
// Flush all events:
// Flush all events before closing the channel. This will return as soon as
// c.events is closed, or after closed.
for range c.events {
}
// This blocks until the events channel is dead.
c.mut.Lock()
defer c.mut.Unlock()
// Clean up.
// Mark c.events as empty.
c.events = nil
// Mark c.Conn as empty.
c.Conn = nil
return err
}
// readAll reads bytes into an existing buffer, copy it over, then wipe the old

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/diamondburned/arikawa/utils/json"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
@ -74,7 +73,7 @@ func (ws *Websocket) Send(b []byte) error {
}
func (ws *Websocket) Close() error {
return ws.Conn.Close(websocket.CloseGoingAway)
return ws.Conn.Close()
}
func InjectValues(rawurl string, values url.Values) string {