From 6a029e0c8079f85212d63e1b742cd994dd37d110 Mon Sep 17 00:00:00 2001 From: Matthew Penner Date: Mon, 20 Apr 2020 11:52:07 -0600 Subject: [PATCH] Improvements to the way voice waits for events --- voice/commands.go | 8 +++++- voice/connection.go | 33 ++++++++++-------------- voice/op.go | 18 +++++-------- voice/udp.go | 8 +++--- voice/voice.go | 62 +++++++++++++++++---------------------------- 5 files changed, 54 insertions(+), 75 deletions(-) diff --git a/voice/commands.go b/voice/commands.go index 3bebf7e..bb94d3e 100644 --- a/voice/commands.go +++ b/voice/commands.go @@ -92,10 +92,16 @@ func (c *Connection) Speaking(s Speaking) error { // StopSpeaking stops speaking. // https://discordapp.com/developers/docs/topics/voice-connections#voice-data-interpolation -func (c *Connection) StopSpeaking() { +func (c *Connection) StopSpeaking() error { + if c.OpusSend == nil { + return ErrCannotSend + } + for i := 0; i < 5; i++ { c.OpusSend <- []byte{0xF8, 0xFF, 0xFE} } + + return nil } // OPCode 7 diff --git a/voice/connection.go b/voice/connection.go index 958cbd6..c223d76 100644 --- a/voice/connection.go +++ b/voice/connection.go @@ -66,6 +66,11 @@ type Connection struct { ready ReadyEvent sessionDescription SessionDescriptionEvent + // Operation Channels + helloChan chan bool + readyChan chan bool + sessionDescChan chan bool + // Filled by methods, internal use paceDeath chan error waitGroup *sync.WaitGroup @@ -82,11 +87,17 @@ func newConnection() *Connection { ErrorLog: defaultErrorHandler, AfterClose: func(error) {}, + + helloChan: make(chan bool), + readyChan: make(chan bool), + sessionDescChan: make(chan bool), } } // Open . func (c *Connection) Open() error { + // Having this acquire a lock might cause a problem if the `onVoiceStateUpdate` + // does not set a session id in time :/ c.mut.Lock() defer c.mut.Unlock() @@ -96,19 +107,11 @@ func (c *Connection) Open() error { return nil } - // Wait for the SessionID. - // TODO: Find better way to wait. - for i := 0; i < 20; i++ { - if c.SessionID != "" { - break - } - - time.Sleep(50 * time.Millisecond) - } - + // I doubt this would happen from my testing, but you never know. if c.SessionID == "" { return ErrNoSessionID } + WSDebug("Connection has a session id") // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection endpoint := "wss://" + strings.TrimSuffix(c.Endpoint, ":80") + "/?v=" + Version @@ -175,16 +178,8 @@ func (c *Connection) start() error { go c.handleWS() // Wait for hello. - // TODO: Find better way to wait WSDebug("Waiting for Hello..") - for { - if c.hello.HeartbeatInterval == 0 { - time.Sleep(50 * time.Millisecond) - continue - } - - break - } + <-c.helloChan WSDebug("Received Hello") // Start the pacemaker with the heartrate received from Hello, after diff --git a/voice/op.go b/voice/op.go index d91e938..4660391 100644 --- a/voice/op.go +++ b/voice/op.go @@ -65,15 +65,7 @@ func HandleOP(c *Connection, op *OP) error { return errors.Wrap(err, "Failed to parse READY event") } - if err := c.udpOpen(); err != nil { - return errors.Wrap(err, "Failed to open UDP connection") - } - - if c.OpusSend == nil { - c.OpusSend = make(chan []byte) - } - - go c.opusSendLoop() + c.readyChan <- true // Gives information about the encryption mode and secret key for sending voice packets case SessionDescriptionOP: @@ -81,10 +73,11 @@ func HandleOP(c *Connection, op *OP) error { c.ErrorLog(errors.Wrap(err, "Failed to parse SESSION_DESCRIPTION")) } + c.sessionDescChan <- true + // Someone started or stopped speaking. case SpeakingOP: // ? - return nil // Heartbeat response from the server case HeartbeatAckOP: @@ -94,9 +87,10 @@ func HandleOP(c *Connection, op *OP) error { case HelloOP: // Decode the data. if err := c.Driver.Unmarshal(op.Data, &c.hello); err != nil { - c.ErrorLog(errors.Wrap(err, "Failed to parse SESSION_DESCRIPTION")) + c.ErrorLog(errors.Wrap(err, "Failed to parse HELLO")) } - return nil + + c.helloChan <- true // Server is saying the connection was resumed, no data here. case ResumedOP: diff --git a/voice/udp.go b/voice/udp.go index e666c36..88c86c0 100644 --- a/voice/udp.go +++ b/voice/udp.go @@ -83,10 +83,10 @@ func (c *Connection) udpOpen() error { return err } - // TODO: Wait until OP4 is received - // side note: you cannot just do a blocking loop as I've done before - // as this method is currently called inside of the event loop - // so for as long as it blocks no other events can be received + // Wait for session description. + WSDebug("Waiting for Session Description..") + <-c.sessionDescChan + WSDebug("Received Session Description") return nil } diff --git a/voice/voice.go b/voice/voice.go index d90c43f..cfcbdfb 100644 --- a/voice/voice.go +++ b/voice/voice.go @@ -4,7 +4,6 @@ package voice import ( "log" "sync" - "time" "github.com/diamondburned/arikawa/discord" "github.com/diamondburned/arikawa/gateway" @@ -30,14 +29,17 @@ var ( // similarly to log.Println(). WSDebug = func(v ...interface{}) {} - // ErrMissingForIdentify . + // ErrMissingForIdentify is an error when we are missing information to identify. ErrMissingForIdentify = errors.New("missing GuildID, UserID, SessionID, or Token for identify") - // ErrMissingForResume . + // ErrMissingForResume is an error when we are missing information to resume. ErrMissingForResume = errors.New("missing GuildID, SessionID, or Token for resuming") + + // ErrCannotSend is an error when audio is sent to a closed channel. + ErrCannotSend = errors.New("cannot send audio to closed channel") ) -// Voice . +// Voice represents a Voice Repository used for managing voice connections. type Voice struct { mut sync.RWMutex @@ -50,7 +52,7 @@ type Voice struct { ErrorLog func(err error) } -// NewVoice creates a new Voice repository. +// NewVoice creates a new Voice Repository. func NewVoice(s *state.State) *Voice { v := &Voice{ state: s, @@ -85,7 +87,7 @@ func (v *Voice) RemoveConnection(guildID discord.Snowflake) { delete(v.connections, guildID) } -// JoinChannel . +// JoinChannel joins the specified channel in the specified guild. func (v *Voice) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) (*Connection, error) { // Get the stored voice connection for the given guild. conn, ok := v.GetConnection(gID) @@ -126,41 +128,23 @@ func (v *Voice) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) (* return nil, errors.Wrap(err, "Failed to send Voice State Update event") } - // Wait until we are connected. + // Wait for ready event. WSDebug("Waiting for READY.") - - // TODO: Find better way to wait for ready event. - - // Check if the connection is opened. - for i := 0; i < 50; i++ { - if conn.WS != nil && conn.WS.Conn != nil { - break - } - - time.Sleep(50 * time.Millisecond) - } - - if conn.WS == nil || conn.WS.Conn == nil { - return nil, errors.Wrap(err, "Failed to wait for connection to open") - } - - for i := 0; i < 50; i++ { - if conn.ready.IP != "" { - break - } - - time.Sleep(50 * time.Millisecond) - } - - if conn.ready.IP == "" { - return nil, errors.New("failed to wait for ready event") - } - - if conn.udpConn == nil { - return nil, errors.New("udp connection is not open") - } - + <-conn.readyChan WSDebug("Received READY.") + // Open the UDP connection. + if err := conn.udpOpen(); err != nil { + return nil, errors.Wrap(err, "Failed to open UDP connection") + } + + // Make sure the OpusSend channel is set + if conn.OpusSend == nil { + conn.OpusSend = make(chan []byte) + } + + // Run the opus send loop. + go conn.opusSendLoop() + return conn, nil }