Gateway: Fixed a race condition related to concurrent WS writes

This commit is contained in:
diamondburned (Forefront) 2020-04-06 14:03:08 -07:00
parent 9fefba18f7
commit 5750876348
1 changed files with 31 additions and 1 deletions

View File

@ -51,6 +51,10 @@ type Conn struct {
mut sync.RWMutex mut sync.RWMutex
events chan Event events chan Event
// write channels
writes chan []byte
errors chan error
buf bytes.Buffer buf bytes.Buffer
// zlib *zlib.Inflator // zlib.NewReader // zlib *zlib.Inflator // zlib.NewReader
@ -68,6 +72,8 @@ func NewConn(driver json.Driver) *Conn {
EnableCompression: true, EnableCompression: true,
}, },
events: make(chan Event), events: make(chan Event),
writes: make(chan []byte),
errors: make(chan error),
// zlib: zlib.NewInflator(), // zlib: zlib.NewInflator(),
// buf: make([]byte, CopyBufferSize), // buf: make([]byte, CopyBufferSize),
} }
@ -96,6 +102,11 @@ func (c *Conn) Dial(ctx context.Context, addr string) error {
c.events = make(chan Event) c.events = make(chan Event)
go c.readLoop() go c.readLoop()
c.writes = make(chan []byte)
c.errors = make(chan error)
go c.writeLoop()
return err return err
} }
@ -141,6 +152,15 @@ func (c *Conn) readLoop() {
} }
} }
func (c *Conn) writeLoop() {
c.mut.RLock()
defer c.mut.RUnlock()
for bytes := range c.writes {
c.errors <- c.Conn.WriteMessage(websocket.TextMessage, bytes)
}
}
func (c *Conn) handle() ([]byte, error) { func (c *Conn) handle() ([]byte, error) {
// skip message type // skip message type
t, r, err := c.Conn.NextReader() t, r, err := c.Conn.NextReader()
@ -193,7 +213,8 @@ func (c *Conn) Send(b []byte) error {
return errors.New("Websocket is closed.") return errors.New("Websocket is closed.")
} }
return c.Conn.WriteMessage(websocket.TextMessage, b) c.writes <- b
return <-c.errors
} }
func (c *Conn) Close(code int) error { func (c *Conn) Close(code int) error {
@ -204,9 +225,18 @@ func (c *Conn) Close(code int) error {
} }
func (c *Conn) writeClose(code int) error { func (c *Conn) writeClose(code int) error {
// Acquire a read lock instead, as the read and write loops are still alive.
c.mut.RLock() c.mut.RLock()
defer c.mut.RUnlock() defer c.mut.RUnlock()
// Keep the current write channel so we can close them when we're done.
wr := c.writes
defer close(wr)
// Stop future sends before closing. Nil channels block forever.
c.writes = nil
c.errors = nil
// Quick deadline: // Quick deadline:
deadline := time.Now().Add(CloseDeadline) deadline := time.Now().Add(CloseDeadline)