From 575087634862d0d6348b578668ae144a830db091 Mon Sep 17 00:00:00 2001 From: "diamondburned (Forefront)" Date: Mon, 6 Apr 2020 14:03:08 -0700 Subject: [PATCH] Gateway: Fixed a race condition related to concurrent WS writes --- internal/wsutil/conn.go | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/internal/wsutil/conn.go b/internal/wsutil/conn.go index 3fc9342..1f8df41 100644 --- a/internal/wsutil/conn.go +++ b/internal/wsutil/conn.go @@ -51,6 +51,10 @@ type Conn struct { mut sync.RWMutex events chan Event + // write channels + writes chan []byte + errors chan error + buf bytes.Buffer // zlib *zlib.Inflator // zlib.NewReader @@ -68,6 +72,8 @@ func NewConn(driver json.Driver) *Conn { EnableCompression: true, }, events: make(chan Event), + writes: make(chan []byte), + errors: make(chan error), // zlib: zlib.NewInflator(), // buf: make([]byte, CopyBufferSize), } @@ -96,6 +102,11 @@ func (c *Conn) Dial(ctx context.Context, addr string) error { c.events = make(chan Event) go c.readLoop() + + c.writes = make(chan []byte) + c.errors = make(chan error) + go c.writeLoop() + return err } @@ -141,6 +152,15 @@ func (c *Conn) readLoop() { } } +func (c *Conn) writeLoop() { + c.mut.RLock() + defer c.mut.RUnlock() + + for bytes := range c.writes { + c.errors <- c.Conn.WriteMessage(websocket.TextMessage, bytes) + } +} + func (c *Conn) handle() ([]byte, error) { // skip message type t, r, err := c.Conn.NextReader() @@ -193,7 +213,8 @@ func (c *Conn) Send(b []byte) error { return errors.New("Websocket is closed.") } - return c.Conn.WriteMessage(websocket.TextMessage, b) + c.writes <- b + return <-c.errors } func (c *Conn) Close(code int) error { @@ -204,9 +225,18 @@ func (c *Conn) Close(code int) error { } func (c *Conn) writeClose(code int) error { + // Acquire a read lock instead, as the read and write loops are still alive. c.mut.RLock() defer c.mut.RUnlock() + // Keep the current write channel so we can close them when we're done. + wr := c.writes + defer close(wr) + + // Stop future sends before closing. Nil channels block forever. + c.writes = nil + c.errors = nil + // Quick deadline: deadline := time.Now().Add(CloseDeadline)