diff --git a/discord/time.go b/discord/time.go index cc6c8f1..7648316 100644 --- a/discord/time.go +++ b/discord/time.go @@ -109,7 +109,9 @@ func (s Seconds) Duration() time.Duration { // -type Milliseconds int +// Milliseconds is in float64 because some Discord events return time with a +// trailing decimal. +type Milliseconds float64 func DurationToMilliseconds(dura time.Duration) Milliseconds { return Milliseconds(dura.Milliseconds()) @@ -120,5 +122,6 @@ func (ms Milliseconds) String() string { } func (ms Milliseconds) Duration() time.Duration { - return time.Duration(ms) * time.Millisecond + const f64ms = Milliseconds(time.Millisecond) + return time.Duration(ms * f64ms) } diff --git a/go.mod b/go.mod index 795de84..5ddaccf 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.13 require ( github.com/gorilla/schema v1.1.0 github.com/gorilla/websocket v1.4.2 + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pkg/errors v0.9.1 github.com/sasha-s/go-csync v0.0.0-20160729053059-3bc6c8bdb3fa + github.com/sasha-s/go-deadlock v0.2.0 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect golang.org/x/time v0.0.0-20191024005414-555d28b269f0 diff --git a/go.sum b/go.sum index 2f6ff15..eb097f4 100644 --- a/go.sum +++ b/go.sum @@ -2,10 +2,14 @@ github.com/gorilla/schema v1.1.0 h1:CamqUDOFUBqzrvxuz2vEwo8+SUdwsluFh7IlzJh30LY= github.com/gorilla/schema v1.1.0/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/sasha-s/go-csync v0.0.0-20160729053059-3bc6c8bdb3fa h1:xiD6U6h+QMkAwI195dFwdku2N+enlCy9XwFTnEXaCQo= github.com/sasha-s/go-csync v0.0.0-20160729053059-3bc6c8bdb3fa/go.mod h1:KKzWrLiWu6EpzxZBPmPisPgq6oL+do2yLa0C0BTx5fA= +github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y= +github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= diff --git a/utils/heart/heart.go b/utils/heart/heart.go index 218dc4f..7b26972 100644 --- a/utils/heart/heart.go +++ b/utils/heart/heart.go @@ -2,7 +2,6 @@ package heart import ( - "log" "sync" "sync/atomic" "time" @@ -92,8 +91,6 @@ func (p *Pacemaker) Stop() { } func (p *Pacemaker) start() error { - log.Println("Heartbeat interval:", p.Heartrate) - // Reset states to its old position. p.EchoBeat.Set(time.Time{}) p.SentBeat.Set(time.Time{}) diff --git a/utils/wsutil/heart.go b/utils/wsutil/heart.go index 637ba48..aab4796 100644 --- a/utils/wsutil/heart.go +++ b/utils/wsutil/heart.go @@ -93,7 +93,7 @@ func (p *PacemakerLoop) Run() error { o, err := DecodeOP(ev) if err != nil { p.errorLog(errors.Wrap(err, "Failed to decode OP")) - return err + continue // ignore } // Check the events before handling. diff --git a/utils/wsutil/op.go b/utils/wsutil/op.go index ed080dc..09ee868 100644 --- a/utils/wsutil/op.go +++ b/utils/wsutil/op.go @@ -125,6 +125,10 @@ func (ex *ExtraHandlers) Add(check func(*OP) bool) (<-chan *OP, func()) { ex.mutex.Lock() defer ex.mutex.Unlock() + if ex.handlers == nil { + ex.handlers = make(map[uint32]*ExtraHandler, 1) + } + i := ex.serial ex.serial++ diff --git a/voice/connection.go b/voice/connection.go deleted file mode 100644 index dc757c8..0000000 --- a/voice/connection.go +++ /dev/null @@ -1,383 +0,0 @@ -// -// For the brave souls who get this far: You are the chosen ones, -// the valiant knights of programming who toil away, without rest, -// fixing our most awful code. To you, true saviors, kings of men, -// I say this: never gonna give you up, never gonna let you down, -// never gonna run around and desert you. Never gonna make you cry, -// never gonna say goodbye. Never gonna tell a lie and hurt you. -// - -package voice - -import ( - "context" - "net" - "strings" - "sync" - "time" - - "github.com/diamondburned/arikawa/discord" - "github.com/diamondburned/arikawa/gateway" - "github.com/diamondburned/arikawa/utils/heart" - "github.com/diamondburned/arikawa/utils/json" - "github.com/diamondburned/arikawa/utils/wsutil" - "github.com/pkg/errors" -) - -var ( - ErrNoSessionID = errors.New("no SessionID was received after 1 second") -) - -// Connection represents a Discord Voice Gateway connection. -type Connection struct { - json.Driver - mut sync.RWMutex - - SessionID string - Token string - Endpoint string - reconnection bool - - UserID discord.Snowflake - GuildID discord.Snowflake - ChannelID discord.Snowflake - - muted bool - deafened bool - speaking bool - - WS *wsutil.Websocket - WSTimeout time.Duration - - EventLoop *heart.PacemakerLoop - // Pacemaker *gateway.Pacemaker - - udpConn *net.UDPConn - OpusSend chan []byte - close chan struct{} - - // ErrorLog will be called when an error occurs (defaults to log.Println) - ErrorLog func(err error) - // AfterClose is called after each close. Error can be non-nil, as this is - // called even when the Gateway is gracefully closed. It's used mainly for - // reconnections or any type of connection interruptions. (defaults to noop) - AfterClose func(err error) - - // Stored Operations - hello HelloEvent - ready ReadyEvent - sessionDescription SessionDescriptionEvent - - // Operation Channels - helloChan chan bool - readyChan chan bool - sessionDescChan chan bool - - // Filled by methods, internal use - paceDeath chan error - waitGroup *sync.WaitGroup -} - -// newConnection . -func newConnection() *Connection { - return &Connection{ - Driver: json.Default{}, - - WSTimeout: WSTimeout, - - close: make(chan struct{}), - - ErrorLog: defaultErrorHandler, - AfterClose: func(error) {}, - - helloChan: make(chan bool), - readyChan: make(chan bool), - sessionDescChan: make(chan bool), - } -} - -// Open . -func (c *Connection) Open() error { - // Having this acquire a lock might cause a problem if the `onVoiceStateUpdate` - // does not set a session id in time :/ - c.mut.Lock() - defer c.mut.Unlock() - - // Check if the connection already has a websocket. - if c.WS != nil { - WSDebug("Connection already has an active websocket") - return nil - } - - // I doubt this would happen from my testing, but you never know. - if c.SessionID == "" { - return ErrNoSessionID - } - WSDebug("Connection has a session id") - - // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection - endpoint := "wss://" + strings.TrimSuffix(c.Endpoint, ":80") + "/?v=" + Version - - WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")") - - c.WS = wsutil.NewCustom(wsutil.NewConn(c.Driver), endpoint) - - // Create a new context with a timeout for the connection. - ctx, cancel := context.WithTimeout(context.Background(), c.WSTimeout) - defer cancel() - - // Connect to the Voice Gateway. - if err := c.WS.Dial(ctx); err != nil { - return errors.Wrap(err, "Failed to connect to Voice Gateway") - } - - WSDebug("Trying to start...") - - // Try to resume the connection - if err := c.Start(); err != nil { - return err - } - - return nil -} - -// Start . -func (c *Connection) Start() error { - if err := c.start(); err != nil { - WSDebug("Start failed: ", err) - - // Close can be called with the mutex still acquired here, as the - // pacemaker hasn't started yet. - if err := c.Close(); err != nil { - WSDebug("Failed to close after start fail: ", err) - } - return err - } - - return nil -} - -// start . -func (c *Connection) start() error { - // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection - // Apparently we send an Identify or Resume once we are connected unlike the other gateway that - // waits for a Hello then sends an Identify or Resume. - if !c.reconnection { - if err := c.Identify(); err != nil { - return errors.Wrap(err, "Failed to identify") - } - } else { - if err := c.Resume(); err != nil { - return errors.Wrap(err, "Failed to resume") - } - } - c.reconnection = false - - // Make a new WaitGroup for use in background loops: - c.waitGroup = new(sync.WaitGroup) - - // Wait for hello. - WSDebug("Waiting for Hello..") - - _, err := AssertEvent(c, <-c.WS.Listen(), HelloOP, &c.hello) - if err != nil { - return errors.Wrap(err, "Error at Hello") - } - - WSDebug("Received Hello") - - // // Start the pacemaker with the heartrate received from Hello, after - // // initializing everything. This ensures we only heartbeat if the websocket - // // is authenticated. - // c.Pacemaker = &gateway.Pacemaker{ - // Heartrate: time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond, - // Pace: c.Heartbeat, - // } - // // Pacemaker dies here, only when it's fatal. - // c.paceDeath = c.Pacemaker.StartAsync(c.waitGroup) - - // Start the event handler, which also handles the pacemaker death signal. - c.waitGroup.Add(1) - - // Calculate the heartrate. - heartrate := time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond - - // Start the websocket handler. - go c.handleWS(heart.NewLoop(heartrate, c.WS.Listen(), c)) - - WSDebug("Started successfully.") - - return nil -} - -// Close . -func (c *Connection) Close() error { - if c.udpConn != nil { - WSDebug("Closing udp connection.") - close(c.close) - } - - // Check if the WS is already closed: - if c.waitGroup == nil && c.paceDeath == nil { - WSDebug("Gateway is already closed.") - - c.AfterClose(nil) - return nil - } - - // If the pacemaker is running: - if c.paceDeath != nil { - WSDebug("Stopping pacemaker...") - - // Stop the pacemaker and the event handler - // c.Pacemaker.Stop() - c.EventLoop.Stop() - - WSDebug("Stopped pacemaker.") - } - - WSDebug("Waiting for WaitGroup to be done.") - - // This should work, since Pacemaker should signal its loop to stop, which - // would also exit our event loop. Both would be 2. - c.waitGroup.Wait() - - // Mark g.waitGroup as empty: - c.waitGroup = nil - - WSDebug("WaitGroup is done. Closing the websocket.") - - err := c.WS.Close() - c.AfterClose(err) - return err -} - -func (c *Connection) Reconnect() { - WSDebug("Reconnecting...") - - // Guarantee the gateway is already closed. Ignore its error, as we're - // redialing anyway. - c.Close() - - c.mut.Lock() - c.reconnection = true - c.mut.Unlock() - - for i := 1; ; i++ { - WSDebug("Trying to dial, attempt #", i) - - // Condition: err == ErrInvalidSession: - // If the connection is rate limited (documented behavior): - // https://discordapp.com/developers/docs/topics/gateway#rate-limiting - - if err := c.Open(); err != nil { - c.ErrorLog(errors.Wrap(err, "Failed to open gateway")) - continue - } - - WSDebug("Started after attempt: ", i) - return - } -} - -func (c *Connection) Disconnect(g *gateway.Gateway) (err error) { - if c.SessionID != "" { - err = g.UpdateVoiceState(gateway.UpdateVoiceStateData{ - GuildID: c.GuildID, - ChannelID: nil, - SelfMute: true, - SelfDeaf: true, - }) - - c.SessionID = "" - } - - // We might want this error and the update voice state error - // but for now we will prioritize the voice state error - _ = c.Close() - - return -} - -func (c *Connection) HandleEvent(ev wsutil.Event) error { - return HandleEvent(c, ev) -} - -// handleWS . -func (c *Connection) handleWS(evl *heart.PacemakerLoop) { - c.EventLoop = evl - err := c.EventLoop.Run() - - c.waitGroup.Done() // mark so Close() can exit. - WSDebug("Event loop stopped.") - - if err != nil { - c.ErrorLog(err) - c.Reconnect() - // Reconnect should spawn another eventLoop in its Start function. - } -} - -// // eventLoop . -// func (c *Connection) eventLoop() error { -// ch := c.WS.Listen() - -// for { -// select { -// case err := <-c.paceDeath: -// // Got a paceDeath, we're exiting from here on out. -// c.paceDeath = nil // mark - -// if err == nil { -// WSDebug("Pacemaker stopped without errors.") -// // No error, just exit normally. -// return nil -// } - -// return errors.Wrap(err, "Pacemaker died, reconnecting") - -// case ev := <-ch: -// // Handle the event -// if err := HandleEvent(c, ev); err != nil { -// c.ErrorLog(errors.Wrap(err, "WS handler error")) -// } -// } -// } -// } - -// Send . -func (c *Connection) Send(code OPCode, v interface{}) error { - return c.send(code, v) -} - -// send . -func (c *Connection) send(code OPCode, v interface{}) error { - if c.WS == nil { - return errors.New("tried to send data to a connection without a Websocket") - } - - if c.WS.Conn == nil { - return errors.New("tried to send data to a connection with a closed Websocket") - } - - var op = OP{ - Code: code, - } - - if v != nil { - b, err := c.Driver.Marshal(v) - if err != nil { - return errors.Wrap(err, "Failed to encode v") - } - - op.Data = b - } - - b, err := c.Driver.Marshal(op) - if err != nil { - return errors.Wrap(err, "Failed to encode payload") - } - - // WS should already be thread-safe. - return c.WS.Send(b) -} diff --git a/voice/events.go b/voice/events.go deleted file mode 100644 index 9e9b416..0000000 --- a/voice/events.go +++ /dev/null @@ -1,74 +0,0 @@ -package voice - -import ( - "github.com/diamondburned/arikawa/gateway" -) - -// onVoiceStateUpdate receives VoiceStateUpdateEvents from the gateway -// to keep track of the current user's voice state. -func (v *Voice) onVoiceStateUpdate(e *gateway.VoiceStateUpdateEvent) { - // Get the current user. - me, err := v.state.Me() - if err != nil { - v.ErrorLog(err) - return - } - - // Ignore the event if it is an update from another user. - if me.ID != e.UserID { - return - } - - // Get the stored voice connection for the given guild. - conn, ok := v.GetConnection(e.GuildID) - - // Ignore if there is no connection for that guild. - if !ok { - return - } - - // Remove the connection if the current user has disconnected. - if e.ChannelID == 0 { - // TODO: Make sure connection is closed? - v.RemoveConnection(e.GuildID) - return - } - - // Update values on the connection. - conn.mut.Lock() - defer conn.mut.Unlock() - - conn.SessionID = e.SessionID - - conn.UserID = e.UserID - conn.ChannelID = e.ChannelID -} - -// onVoiceServerUpdate receives VoiceServerUpdateEvents from the gateway -// to manage the current user's voice connections. -func (v *Voice) onVoiceServerUpdate(e *gateway.VoiceServerUpdateEvent) { - // Get the stored voice connection for the given guild. - conn, ok := v.GetConnection(e.GuildID) - - // Ignore if there is no connection for that guild. - if !ok { - return - } - - // Ensure the connection is closed (has no effect if the connection is already closed) - conn.Close() - - // Update values on the connection. - conn.mut.Lock() - conn.Token = e.Token - conn.Endpoint = e.Endpoint - - conn.GuildID = e.GuildID - conn.mut.Unlock() - - // Open the voice connection. - if err := conn.Open(); err != nil { - v.ErrorLog(err) - return - } -} diff --git a/voice/integration_test.go b/voice/integration_test.go index dd2c2e5..2805df0 100644 --- a/voice/integration_test.go +++ b/voice/integration_test.go @@ -7,10 +7,15 @@ import ( "io" "log" "os" + "runtime" + "strconv" "testing" + "time" "github.com/diamondburned/arikawa/discord" "github.com/diamondburned/arikawa/state" + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/diamondburned/arikawa/voice/voicegateway" ) type testConfig struct { @@ -41,7 +46,7 @@ func mustConfig(t *testing.T) testConfig { } // file is only a few bytes lolmao -func nicoReader(t *testing.T) (read func() []byte) { +func nicoReadTo(t *testing.T, dst io.Writer) { f, err := os.Open("testdata/nico.dca") if err != nil { t.Fatal("Failed to open nico.dca:", err) @@ -53,22 +58,18 @@ func nicoReader(t *testing.T) (read func() []byte) { var lenbuf [4]byte - return func() []byte { + for { if _, err := io.ReadFull(f, lenbuf[:]); !catchRead(t, err) { - return nil + return } // Read the integer - framelen := int(binary.LittleEndian.Uint32(lenbuf[:])) + framelen := int64(binary.LittleEndian.Uint32(lenbuf[:])) - // Read exactly frame - frame := make([]byte, framelen) - - if _, err := io.ReadFull(f, frame); !catchRead(t, err) { - return nil + // Copy the frame. + if _, err := io.CopyN(dst, f, framelen); !catchRead(t, err) { + return } - - return frame } } @@ -87,9 +88,12 @@ func catchRead(t *testing.T, err error) bool { func TestIntegration(t *testing.T) { config := mustConfig(t) - WSDebug = func(v ...interface{}) { - log.Println(v...) + wsutil.WSDebug = func(v ...interface{}) { + _, file, line, _ := runtime.Caller(1) + caller := file + ":" + strconv.Itoa(line) + log.Println(append([]interface{}{caller}, v...)...) } + // heart.Debug = func(v ...interface{}) { // log.Println(append([]interface{}{"Pacemaker:"}, v...)...) // } @@ -104,6 +108,7 @@ func TestIntegration(t *testing.T) { if err := s.Open(); err != nil { t.Fatal("Failed to connect:", err) } + defer s.Close() // Validate the given voice channel. c, err := s.Channel(config.VoiceChID) @@ -114,31 +119,49 @@ func TestIntegration(t *testing.T) { t.Fatal("Channel isn't a guild voice channel.") } - conn, err := v.JoinChannel(c.GuildID, c.ID, false, false) + // Grab a timer to benchmark things. + finish := timer() + + // Join the voice channel. + vs, err := v.JoinChannel(c.GuildID, c.ID, false, false) if err != nil { t.Fatal("Failed to join channel:", err) } + defer func() { + log.Println("Disconnecting from the voice channel.") + if err := vs.Disconnect(); err != nil { + t.Fatal("Failed to disconnect:", err) + } + }() - // Grab the file in the local test data. - read := nicoReader(t) + finish("joining the voice channel") // Trigger speaking. - if err := conn.Speaking(Microphone); err != nil { + if err := vs.Speaking(voicegateway.Microphone); err != nil { t.Fatal("Failed to start speaking:", err) } + defer func() { + log.Println("Stopping speaking.") // sounds grammatically wrong + if err := vs.StopSpeaking(); err != nil { + t.Fatal("Failed to stop speaking:", err) + } + }() + + finish("sending the speaking command") // Copy the audio? - for bytes := read(); bytes != nil; bytes = read() { - conn.OpusSend <- bytes - // conn.Write(bytes) - } + nicoReadTo(t, vs) - // Finish speaking. - if err := conn.StopSpeaking(); err != nil { - t.Fatal("Failed to stop speaking:", err) - } + finish("copying the audio") +} - if err := conn.Disconnect(s.Gateway); err != nil { - t.Fatal("Failed to disconnect:", err) +// simple shitty benchmark thing +func timer() func(finished string) { + var then = time.Now() + + return func(finished string) { + now := time.Now() + log.Println("Finished", finished+", took", now.Sub(then)) + then = now } } diff --git a/voice/op.go b/voice/op.go deleted file mode 100644 index 8270036..0000000 --- a/voice/op.go +++ /dev/null @@ -1,124 +0,0 @@ -package voice - -import ( - "fmt" - - "github.com/diamondburned/arikawa/utils/json" - "github.com/diamondburned/arikawa/utils/wsutil" - "github.com/pkg/errors" -) - -// OPCode represents a Discord Voice Gateway operation code. -type OPCode uint8 - -const ( - IdentifyOP OPCode = 0 // send - SelectProtocolOP OPCode = 1 // send - ReadyOP OPCode = 2 // receive - HeartbeatOP OPCode = 3 // send - SessionDescriptionOP OPCode = 4 // receive - SpeakingOP OPCode = 5 // send/receive - HeartbeatAckOP OPCode = 6 // receive - ResumeOP OPCode = 7 // send - HelloOP OPCode = 8 // receive - ResumedOP OPCode = 9 // receive - // ClientDisconnectOP OPCode = 13 // receive -) - -// OP represents a Discord Voice Gateway operation. -type OP struct { - Code OPCode `json:"op"` - Data json.Raw `json:"d,omitempty"` -} - -func HandleEvent(c *Connection, ev wsutil.Event) error { - o, err := DecodeOP(c.Driver, ev) - if err != nil { - return err - } - - return HandleOP(c, o) -} - -func AssertEvent(driver json.Driver, ev wsutil.Event, code OPCode, v interface{}) (*OP, error) { - op, err := DecodeOP(driver, ev) - if err != nil { - return nil, err - } - - if op.Code != code { - return op, fmt.Errorf( - "Unexpected OP Code: %d, expected %d (%s)", - op.Code, code, op.Data, - ) - } - - if err := driver.Unmarshal(op.Data, v); err != nil { - return op, errors.Wrap(err, "Failed to decode data") - } - - return op, nil -} - -func DecodeOP(driver json.Driver, ev wsutil.Event) (*OP, error) { - if ev.Error != nil { - return nil, ev.Error - } - - if len(ev.Data) == 0 { - return nil, errors.New("Empty payload") - } - - var op *OP - if err := driver.Unmarshal(ev.Data, &op); err != nil { - return nil, errors.Wrap(err, "OP error: "+string(ev.Data)) - } - - return op, nil -} - -func HandleOP(c *Connection, op *OP) error { - switch op.Code { - // Gives information required to make a UDP connection - case ReadyOP: - if err := c.Driver.Unmarshal(op.Data, &c.ready); err != nil { - return errors.Wrap(err, "Failed to parse READY event") - } - - c.readyChan <- true - - // Gives information about the encryption mode and secret key for sending voice packets - case SessionDescriptionOP: - if err := c.Driver.Unmarshal(op.Data, &c.sessionDescription); err != nil { - c.ErrorLog(errors.Wrap(err, "Failed to parse SESSION_DESCRIPTION")) - } - - c.sessionDescChan <- true - - // Someone started or stopped speaking. - case SpeakingOP: - // ? - - // Heartbeat response from the server - case HeartbeatAckOP: - c.EventLoop.Echo() - - // Hello server, we hear you! :) - case HelloOP: - // Decode the data. - if err := c.Driver.Unmarshal(op.Data, &c.hello); err != nil { - c.ErrorLog(errors.Wrap(err, "Failed to parse HELLO")) - } - - c.helloChan <- true - - // Server is saying the connection was resumed, no data here. - case ResumedOP: - WSDebug("Voice connection has been resumed") - - default: - return fmt.Errorf("unknown OP code %d", op.Code) - } - - return nil -} diff --git a/voice/session.go b/voice/session.go new file mode 100644 index 0000000..433b763 --- /dev/null +++ b/voice/session.go @@ -0,0 +1,258 @@ +package voice + +import ( + "sync" + + "github.com/diamondburned/arikawa/discord" + "github.com/diamondburned/arikawa/gateway" + "github.com/diamondburned/arikawa/session" + "github.com/diamondburned/arikawa/utils/moreatomic" + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/diamondburned/arikawa/voice/udp" + "github.com/diamondburned/arikawa/voice/voicegateway" + "github.com/pkg/errors" +) + +const Protocol = "xsalsa20_poly1305" + +var OpusSilence = [...]byte{0xF8, 0xFF, 0xFE} + +type Session struct { + session *session.Session + state voicegateway.State + + ErrorLog func(err error) + + // Filled by events. + // sessionID string + // token string + // endpoint string + + // joining determines the behavior of incoming event callbacks (Update). + // If this is true, incoming events will just send into Updated channels. If + // false, events will trigger a reconnection. + joining moreatomic.Bool + incoming chan struct{} // used only when joining == true + + mut sync.RWMutex + + // TODO: expose getters mutex-guarded. + gateway *voicegateway.Gateway + voiceUDP *udp.Connection + + muted bool + deafened bool + speaking bool +} + +func NewSession(ses *session.Session, userID discord.Snowflake) *Session { + return &Session{ + session: ses, + state: voicegateway.State{ + UserID: userID, + }, + ErrorLog: func(err error) {}, + incoming: make(chan struct{}), + } +} + +func (s *Session) UpdateServer(ev *gateway.VoiceServerUpdateEvent) { + // If this is true, then mutex is acquired already. + if s.joining.Get() { + s.state.Endpoint = ev.Endpoint + s.state.Token = ev.Token + + s.incoming <- struct{}{} + return + } + + // Reconnect. + s.mut.Lock() + defer s.mut.Unlock() + + s.state.Endpoint = ev.Endpoint + s.state.Token = ev.Token + + if err := s.reconnect(); err != nil { + s.ErrorLog(errors.Wrap(err, "Failed to reconnect after voice server update")) + } +} + +func (s *Session) UpdateState(ev *gateway.VoiceStateUpdateEvent) { + if s.state.UserID != ev.UserID { + // Not our state. + return + } + + // If this is true, then mutex is acquired already. + if s.joining.Get() { + s.state.SessionID = ev.SessionID + s.state.ChannelID = ev.ChannelID + + s.incoming <- struct{}{} + return + } +} + +func (s *Session) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) error { + // Acquire the mutex during join, locking during IO as well. + s.mut.Lock() + defer s.mut.Unlock() + + // Set that we're joining. + s.joining.Set(true) + defer s.joining.Set(false) // reset when done + + // ensure gateeway and voiceUDP is already closed. + s.ensureClosed() + + // Set the state. + s.state.ChannelID = cID + s.state.GuildID = gID + + s.muted = muted + s.deafened = deafened + s.speaking = false + + // Ensure that if `cID` is zero that it passes null to the update event. + var channelID *discord.Snowflake + if cID.Valid() { + channelID = &cID + } + + // https://discordapp.com/developers/docs/topics/voice-connections#retrieving-voice-server-information + // Send a Voice State Update event to the gateway. + err := s.session.Gateway.UpdateVoiceState(gateway.UpdateVoiceStateData{ + GuildID: gID, + ChannelID: channelID, + SelfMute: muted, + SelfDeaf: deafened, + }) + if err != nil { + return errors.Wrap(err, "Failed to send Voice State Update event") + } + + // Wait for replies. The above command should reply with these 2 events. + <-s.incoming + <-s.incoming + + // These 2 methods should've updated s.state before sending into these + // channels. Since s.state is already filled, we can go ahead and connect. + + return s.reconnect() +} + +// reconnect uses the current state to reconnect to a new gateway and UDP +// connection. +func (s *Session) reconnect() (err error) { + s.gateway = voicegateway.New(s.state) + + // Open the voice gateway. The function will block until Ready is received. + if err := s.gateway.Open(); err != nil { + return errors.Wrap(err, "Failed to open voice gateway") + } + + // Get the Ready event. + voiceReady := s.gateway.Ready() + + // Prepare the UDP voice connection. + s.voiceUDP, err = udp.DialConnection(voiceReady.Addr(), voiceReady.SSRC) + if err != nil { + return errors.Wrap(err, "Failed to open voice UDP connection") + } + + // Get the session description from the voice gateway. + d, err := s.gateway.SessionDescription(voicegateway.SelectProtocol{ + Protocol: "udp", + Data: voicegateway.SelectProtocolData{ + Address: s.voiceUDP.GatewayIP, + Port: s.voiceUDP.GatewayPort, + Mode: Protocol, + }, + }) + if err != nil { + return errors.Wrap(err, "Failed to select protocol") + } + + // Start the UDP loop. + go s.voiceUDP.Start(&d.SecretKey) + + return nil +} + +// Speaking tells Discord we're speaking. This calls +// (*voicegateway.Gateway).Speaking(). +func (s *Session) Speaking(flag voicegateway.SpeakingFlag) error { + // TODO: maybe we don't need to mutex protect IO. + s.mut.RLock() + defer s.mut.RUnlock() + + return s.gateway.Speaking(flag) +} + +func (s *Session) StopSpeaking() error { + // Send 5 frames of silence. + for i := 0; i < 5; i++ { + if _, err := s.Write(OpusSilence[:]); err != nil { + return errors.Wrapf(err, "Failed to send frame %d", i) + } + } + return nil +} + +func (s *Session) Write(b []byte) (int, error) { + s.mut.RLock() + defer s.mut.RUnlock() + + if s.voiceUDP == nil { + return 0, ErrCannotSend + } + return s.voiceUDP.Write(b) +} + +func (s *Session) Disconnect() error { + s.mut.Lock() + defer s.mut.Unlock() + + // If we're already closed. + if s.gateway == nil && s.voiceUDP == nil { + return nil + } + + // Notify Discord that we're leaving. This will send a + // VoiceStateUpdateEvent, in which our handler will promptly remove the + // session from the map. + + err := s.session.Gateway.UpdateVoiceState(gateway.UpdateVoiceStateData{ + GuildID: s.state.GuildID, + ChannelID: nil, + SelfMute: true, + SelfDeaf: true, + }) + + s.ensureClosed() + // wrap returns nil if err is nil + return errors.Wrap(err, "Failed to update voice state") +} + +// close ensures everything is closed. It does not acquire the mutex. +func (s *Session) ensureClosed() { + // If we're already closed. + if s.gateway == nil && s.voiceUDP == nil { + return + } + + // Disconnect the UDP connection. + if s.voiceUDP != nil { + s.voiceUDP.Close() + s.voiceUDP = nil + } + + // Disconnect the voice gateway, ignoring the error. + if s.gateway != nil { + if err := s.gateway.Close(); err != nil { + wsutil.WSDebug("Uncaught voice gateway close error:", err) + } + s.gateway = nil + } +} diff --git a/voice/udp.go b/voice/udp.go deleted file mode 100644 index 88c86c0..0000000 --- a/voice/udp.go +++ /dev/null @@ -1,143 +0,0 @@ -package voice - -import ( - "encoding/binary" - "net" - "strconv" - "strings" - "time" - - "github.com/pkg/errors" - "golang.org/x/crypto/nacl/secretbox" -) - -// udpOpen . -func (c *Connection) udpOpen() error { - c.mut.Lock() - defer c.mut.Unlock() - - // As a wise man once said: "You always gotta check for stupidity" - if c.WS == nil { - return errors.New("connection does not have a websocket") - } - - // Check if a UDP connection is already open. - if c.udpConn != nil { - return errors.New("udp connection is already open") - } - - // Format the connection host. - host := c.ready.IP + ":" + strconv.Itoa(c.ready.Port) - - // Resolve the host. - addr, err := net.ResolveUDPAddr("udp", host) - if err != nil { - return errors.Wrap(err, "Failed to resolve host") - } - - // Create a new UDP connection. - c.udpConn, err = net.DialUDP("udp", nil, addr) - if err != nil { - return errors.Wrap(err, "Failed to dial host") - } - - // https://discordapp.com/developers/docs/topics/voice-connections#ip-discovery - ssrcBuffer := make([]byte, 70) - ssrcBuffer[0] = 0x1 - ssrcBuffer[1] = 0x2 - binary.BigEndian.PutUint16(ssrcBuffer[2:4], 70) - binary.BigEndian.PutUint32(ssrcBuffer[4:8], c.ready.SSRC) - _, err = c.udpConn.Write(ssrcBuffer) - if err != nil { - return errors.Wrap(err, "Failed to write") - } - - ipBuffer := make([]byte, 70) - var n int - n, err = c.udpConn.Read(ipBuffer) - if err != nil { - return errors.Wrap(err, "Failed to write") - } - if n < 70 { - return errors.New("udp packet received from discord is not the required 70 bytes") - } - - ipb := string(ipBuffer[4:68]) - nullPos := strings.Index(ipb, "\x00") - if nullPos < 0 { - return errors.New("udp ip discovery did not contain a null terminator") - } - ip := ipb[:nullPos] - port := binary.LittleEndian.Uint16(ipBuffer[68:70]) - - // Send a Select Protocol operation to the Discord Voice Gateway. - err = c.SelectProtocol(SelectProtocol{ - Protocol: "udp", - Data: SelectProtocolData{ - Address: ip, - Port: port, - Mode: "xsalsa20_poly1305", - }, - }) - if err != nil { - return err - } - - // Wait for session description. - WSDebug("Waiting for Session Description..") - <-c.sessionDescChan - WSDebug("Received Session Description") - - return nil -} - -// https://discordapp.com/developers/docs/topics/voice-connections#encrypting-and-sending-voice -func (c *Connection) opusSendLoop() { - header := make([]byte, 12) - header[0] = 0x80 // Version + Flags - header[1] = 0x78 // Payload Type - // header[2:4] // Sequence - // header[4:8] // Timestamp - binary.BigEndian.PutUint32(header[8:12], c.ready.SSRC) // SSRC - - var ( - sequence uint16 - timestamp uint32 - nonce [24]byte - - msg []byte - open bool - ) - - // 50 sends per second, 960 samples each at 48kHz - frequency := time.NewTicker(time.Millisecond * 20) - defer frequency.Stop() - - for { - select { - case msg, open = <-c.OpusSend: - if !open { - return - } - case <-c.close: - return - } - - binary.BigEndian.PutUint16(header[2:4], sequence) - sequence++ - - binary.BigEndian.PutUint32(header[4:8], timestamp) - timestamp += 960 // Samples - - copy(nonce[:], header) - - toSend := secretbox.Seal(header, msg, &nonce, &c.sessionDescription.SecretKey) - select { - case <-frequency.C: - case <-c.close: - return - } - - _, _ = c.udpConn.Write(toSend) - } -} diff --git a/voice/udp/udp.go b/voice/udp/udp.go new file mode 100644 index 0000000..c8a9116 --- /dev/null +++ b/voice/udp/udp.go @@ -0,0 +1,158 @@ +package udp + +import ( + "bytes" + "encoding/binary" + "io" + "net" + "time" + + "github.com/pkg/errors" + "golang.org/x/crypto/nacl/secretbox" +) + +type Connection struct { + GatewayIP string + GatewayPort uint16 + + ssrc uint32 + + sequence uint16 + timestamp uint32 + nonce [24]byte + + conn *net.UDPConn + close chan struct{} + closed chan struct{} + + send chan []byte + reply chan error +} + +func DialConnection(addr string, ssrc uint32) (*Connection, error) { + // Resolve the host. + a, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, errors.Wrap(err, "Failed to resolve host") + } + + // Create a new UDP connection. + conn, err := net.DialUDP("udp", nil, a) + if err != nil { + return nil, errors.Wrap(err, "Failed to dial host") + } + + // https://discordapp.com/developers/docs/topics/voice-connections#ip-discovery + ssrcBuffer := [70]byte{ + 0x1, 0x2, + } + binary.BigEndian.PutUint16(ssrcBuffer[2:4], 70) + binary.BigEndian.PutUint32(ssrcBuffer[4:8], ssrc) + + _, err = conn.Write(ssrcBuffer[:]) + if err != nil { + return nil, errors.Wrap(err, "Failed to write SSRC buffer") + } + + var ipBuffer [70]byte + + // ReadFull makes sure to read all 70 bytes. + _, err = io.ReadFull(conn, ipBuffer[:]) + if err != nil { + return nil, errors.Wrap(err, "Failed to read IP buffer") + } + + ipbody := ipBuffer[4:68] + + nullPos := bytes.Index(ipbody, []byte{'\x00'}) + if nullPos < 0 { + return nil, errors.New("UDP IP discovery did not contain a null terminator") + } + + ip := ipbody[:nullPos] + port := binary.LittleEndian.Uint16(ipBuffer[68:70]) + + return &Connection{ + GatewayIP: string(ip), + GatewayPort: port, + + ssrc: ssrc, + conn: conn, + send: make(chan []byte), + reply: make(chan error), + close: make(chan struct{}), + closed: make(chan struct{}), + }, nil +} + +func (c *Connection) Start(secret *[32]byte) { + header := [12]byte{ + 0: 0x80, // Version + Flags + 1: 0x78, // Payload Type + // [2:4] // Sequence + // [4:8] // Timestamp + } + + // Write SSRC to the header. + binary.BigEndian.PutUint32(header[8:12], c.ssrc) // SSRC + + // 50 sends per second, 960 samples each at 48kHz + frequency := time.NewTicker(time.Millisecond * 20) + defer frequency.Stop() + + var b []byte + var ok bool + + // Close these channels at the end so Write() doesn't block. + defer func() { + close(c.send) + close(c.closed) + }() + + for { + select { + case b, ok = <-c.send: + if !ok { + return + } + case <-c.close: + return + } + + // Write a new sequence. + binary.BigEndian.PutUint16(header[2:4], c.sequence) + c.sequence++ + + binary.BigEndian.PutUint32(header[4:8], c.timestamp) + c.timestamp += 960 // Samples + + copy(c.nonce[:], header[:]) + + toSend := secretbox.Seal(header[:], b, &c.nonce, secret) + + select { + case <-frequency.C: + case <-c.close: + return + } + + _, err := c.conn.Write(toSend) + c.reply <- err + } +} + +func (c *Connection) Close() error { + close(c.close) + <-c.closed + + return c.conn.Close() +} + +// Write sends bytes into the voice UDP connection. +func (c *Connection) Write(b []byte) (int, error) { + c.send <- b + if err := <-c.reply; err != nil { + return 0, err + } + return len(b), nil +} diff --git a/voice/voice.go b/voice/voice.go index cfcbdfb..d4def8e 100644 --- a/voice/voice.go +++ b/voice/voice.go @@ -8,57 +8,34 @@ import ( "github.com/diamondburned/arikawa/discord" "github.com/diamondburned/arikawa/gateway" "github.com/diamondburned/arikawa/state" - "github.com/diamondburned/arikawa/utils/wsutil" "github.com/pkg/errors" ) -const ( - // Version represents the current version of the Discord Voice Gateway this package uses. - Version = "4" - - // WSTimeout is the timeout for connecting and writing to the Websocket, - // before Gateway cancels and fails. - WSTimeout = wsutil.DefaultTimeout -) - var ( // defaultErrorHandler is the default error handler defaultErrorHandler = func(err error) { log.Println("Voice gateway error:", err) } - // WSDebug is used for extra debug logging. This is expected to behave - // similarly to log.Println(). - WSDebug = func(v ...interface{}) {} - - // ErrMissingForIdentify is an error when we are missing information to identify. - ErrMissingForIdentify = errors.New("missing GuildID, UserID, SessionID, or Token for identify") - - // ErrMissingForResume is an error when we are missing information to resume. - ErrMissingForResume = errors.New("missing GuildID, SessionID, or Token for resuming") - // ErrCannotSend is an error when audio is sent to a closed channel. ErrCannotSend = errors.New("cannot send audio to closed channel") ) -// Voice represents a Voice Repository used for managing voice connections. +// Voice represents a Voice Repository used for managing voice sessions. type Voice struct { - mut sync.RWMutex + *state.State - state *state.State - - // Connections holds all of the active voice connections. - connections map[discord.Snowflake]*Connection + // Session holds all of the active voice sessions. + mapmutex sync.Mutex + sessions map[discord.Snowflake]*Session // guildID:Session // ErrorLog will be called when an error occurs (defaults to log.Println) ErrorLog func(err error) } -// NewVoice creates a new Voice Repository. +// NewVoice creates a new Voice repository wrapped around a state. func NewVoice(s *state.State) *Voice { v := &Voice{ - state: s, - - connections: make(map[discord.Snowflake]*Connection), - + State: s, + sessions: make(map[discord.Snowflake]*Session), ErrorLog: defaultErrorHandler, } @@ -69,82 +46,91 @@ func NewVoice(s *state.State) *Voice { return v } -// GetConnection gets a connection for a guild with a read lock. -func (v *Voice) GetConnection(guildID discord.Snowflake) (*Connection, bool) { - v.mut.RLock() - defer v.mut.RUnlock() +// onVoiceStateUpdate receives VoiceStateUpdateEvents from the gateway +// to keep track of the current user's voice state. +func (v *Voice) onVoiceStateUpdate(e *gateway.VoiceStateUpdateEvent) { + // Get the current user. + me, err := v.Me() + if err != nil { + v.ErrorLog(err) + return + } - // For some reason you cannot just put `return v.connections[]` and return a bool D: - conn, ok := v.connections[guildID] + // Ignore the event if it is an update from another user. + if me.ID != e.UserID { + return + } + + // Get the stored voice session for the given guild. + vs, ok := v.GetSession(e.GuildID) + if !ok { + return + } + + // Do what we must. + vs.UpdateState(e) + + // Remove the connection if the current user has disconnected. + if e.ChannelID == 0 { + v.RemoveSession(e.GuildID) + } +} + +// onVoiceServerUpdate receives VoiceServerUpdateEvents from the gateway +// to manage the current user's voice connections. +func (v *Voice) onVoiceServerUpdate(e *gateway.VoiceServerUpdateEvent) { + // Get the stored voice session for the given guild. + vs, ok := v.GetSession(e.GuildID) + if !ok { + return + } + + // Do what we must. + vs.UpdateServer(e) +} + +// GetSession gets a session for a guild with a read lock. +func (v *Voice) GetSession(guildID discord.Snowflake) (*Session, bool) { + v.mapmutex.Lock() + defer v.mapmutex.Unlock() + + // For some reason you cannot just put `return v.sessions[]` and return a bool D: + conn, ok := v.sessions[guildID] return conn, ok } -// RemoveConnection removes a connection. -func (v *Voice) RemoveConnection(guildID discord.Snowflake) { - v.mut.Lock() - defer v.mut.Unlock() +// RemoveSession removes a session. +func (v *Voice) RemoveSession(guildID discord.Snowflake) { + v.mapmutex.Lock() + defer v.mapmutex.Unlock() - delete(v.connections, guildID) + // Ensure that the session is disconnected. + if ses, ok := v.sessions[guildID]; ok { + ses.Disconnect() + } + + delete(v.sessions, guildID) } // JoinChannel joins the specified channel in the specified guild. -func (v *Voice) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) (*Connection, error) { - // Get the stored voice connection for the given guild. - conn, ok := v.GetConnection(gID) +func (v *Voice) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) (*Session, error) { + // Get the stored voice session for the given guild. + conn, ok := v.GetSession(gID) - // Create a new voice connection if one does not exist. + // Create a new voice session if one does not exist. if !ok { - conn = newConnection() + u, err := v.Me() + if err != nil { + return nil, errors.Wrap(err, "Failed to get self") + } - v.mut.Lock() - v.connections[gID] = conn - v.mut.Unlock() + conn = NewSession(v.Session, u.ID) + + v.mapmutex.Lock() + v.sessions[gID] = conn + v.mapmutex.Unlock() } - // Update values on the connection. - conn.mut.Lock() - conn.GuildID = gID - conn.ChannelID = cID - - conn.muted = muted - conn.deafened = deafened - conn.mut.Unlock() - - // Ensure that if `cID` is zero that it passes null to the update event. - var channelID *discord.Snowflake - if cID != 0 { - channelID = &cID - } - - // https://discordapp.com/developers/docs/topics/voice-connections#retrieving-voice-server-information - // Send a Voice State Update event to the gateway. - err := v.state.Gateway.UpdateVoiceState(gateway.UpdateVoiceStateData{ - GuildID: gID, - ChannelID: channelID, - SelfMute: muted, - SelfDeaf: deafened, - }) - if err != nil { - return nil, errors.Wrap(err, "Failed to send Voice State Update event") - } - - // Wait for ready event. - WSDebug("Waiting for READY.") - <-conn.readyChan - WSDebug("Received READY.") - - // Open the UDP connection. - if err := conn.udpOpen(); err != nil { - return nil, errors.Wrap(err, "Failed to open UDP connection") - } - - // Make sure the OpusSend channel is set - if conn.OpusSend == nil { - conn.OpusSend = make(chan []byte) - } - - // Run the opus send loop. - go conn.opusSendLoop() - - return conn, nil + // Connect. + return conn, conn.JoinChannel(gID, cID, muted, deafened) } diff --git a/voice/commands.go b/voice/voicegateway/commands.go similarity index 63% rename from voice/commands.go rename to voice/voicegateway/commands.go index bb94d3e..8adb97e 100644 --- a/voice/commands.go +++ b/voice/voicegateway/commands.go @@ -1,9 +1,18 @@ -package voice +package voicegateway import ( "time" "github.com/diamondburned/arikawa/discord" + "github.com/pkg/errors" +) + +var ( + // ErrMissingForIdentify is an error when we are missing information to identify. + ErrMissingForIdentify = errors.New("missing GuildID, UserID, SessionID, or Token for identify") + + // ErrMissingForResume is an error when we are missing information to resume. + ErrMissingForResume = errors.New("missing GuildID, SessionID, or Token for resuming") ) // OPCode 0 @@ -15,12 +24,12 @@ type IdentifyData struct { Token string `json:"token"` } -// Identify sends an Identify operation (opcode 0) to the Voice Gateway. -func (c *Connection) Identify() error { - guildID := c.GuildID - userID := c.UserID - sessionID := c.SessionID - token := c.Token +// Identify sends an Identify operation (opcode 0) to the Gateway Gateway. +func (c *Gateway) Identify() error { + guildID := c.state.GuildID + userID := c.state.UserID + sessionID := c.state.SessionID + token := c.state.Token if guildID == 0 || userID == 0 || sessionID == "" || token == "" { return ErrMissingForIdentify @@ -47,8 +56,8 @@ type SelectProtocolData struct { Mode string `json:"mode"` } -// SelectProtocol sends a Select Protocol operation (opcode 1) to the Voice Gateway. -func (c *Connection) SelectProtocol(data SelectProtocol) error { +// SelectProtocol sends a Select Protocol operation (opcode 1) to the Gateway Gateway. +func (c *Gateway) SelectProtocol(data SelectProtocol) error { return c.Send(SelectProtocolOP, data) } @@ -56,16 +65,16 @@ func (c *Connection) SelectProtocol(data SelectProtocol) error { // https://discordapp.com/developers/docs/topics/voice-connections#heartbeating-example-heartbeat-payload type Heartbeat uint64 -// Heartbeat sends a Heartbeat operation (opcode 3) to the Voice Gateway. -func (c *Connection) Heartbeat() error { +// Heartbeat sends a Heartbeat operation (opcode 3) to the Gateway Gateway. +func (c *Gateway) Heartbeat() error { return c.Send(HeartbeatOP, time.Now().UnixNano()) } // https://discordapp.com/developers/docs/topics/voice-connections#speaking -type Speaking uint64 +type SpeakingFlag uint64 const ( - Microphone Speaking = 1 << iota + Microphone SpeakingFlag = 1 << iota Soundshare Priority ) @@ -73,37 +82,23 @@ const ( // OPCode 5 // https://discordapp.com/developers/docs/topics/voice-connections#speaking-example-speaking-payload type SpeakingData struct { - Speaking Speaking `json:"speaking"` - Delay int `json:"delay"` - SSRC uint32 `json:"ssrc"` + Speaking SpeakingFlag `json:"speaking"` + Delay int `json:"delay"` + SSRC uint32 `json:"ssrc"` } -// Speaking sends a Speaking operation (opcode 5) to the Voice Gateway. -func (c *Connection) Speaking(s Speaking) error { +// Speaking sends a Speaking operation (opcode 5) to the Gateway Gateway. +func (c *Gateway) Speaking(flag SpeakingFlag) error { // How do we allow a user to stop speaking? // Also: https://discordapp.com/developers/docs/topics/voice-connections#voice-data-interpolation return c.Send(SpeakingOP, SpeakingData{ - Speaking: s, + Speaking: flag, Delay: 0, SSRC: c.ready.SSRC, }) } -// StopSpeaking stops speaking. -// https://discordapp.com/developers/docs/topics/voice-connections#voice-data-interpolation -func (c *Connection) StopSpeaking() error { - if c.OpusSend == nil { - return ErrCannotSend - } - - for i := 0; i < 5; i++ { - c.OpusSend <- []byte{0xF8, 0xFF, 0xFE} - } - - return nil -} - // OPCode 7 // https://discordapp.com/developers/docs/topics/voice-connections#resuming-voice-connection-example-resume-connection-payload type ResumeData struct { @@ -112,13 +107,13 @@ type ResumeData struct { Token string `json:"token"` } -// Resume sends a Resume operation (opcode 7) to the Voice Gateway. -func (c *Connection) Resume() error { - guildID := c.GuildID - sessionID := c.SessionID - token := c.Token +// Resume sends a Resume operation (opcode 7) to the Gateway Gateway. +func (c *Gateway) Resume() error { + guildID := c.state.GuildID + sessionID := c.state.SessionID + token := c.state.Token - if guildID == 0 || sessionID == "" || token == "" { + if !guildID.Valid() || sessionID == "" || token == "" { return ErrMissingForResume } diff --git a/voice/ops.go b/voice/voicegateway/events.go similarity index 85% rename from voice/ops.go rename to voice/voicegateway/events.go index 95cd8d1..c0b4c6f 100644 --- a/voice/ops.go +++ b/voice/voicegateway/events.go @@ -1,4 +1,10 @@ -package voice +package voicegateway + +import ( + "strconv" + + "github.com/diamondburned/arikawa/discord" +) // OPCode 2 // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-ready-payload @@ -17,6 +23,10 @@ type ReadyEvent struct { // HeartbeatInterval discord.Milliseconds `json:"heartbeat_interval"` } +func (r ReadyEvent) Addr() string { + return r.IP + ":" + strconv.Itoa(r.Port) +} + // OPCode 4 // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-udp-connection-example-session-description-payload type SessionDescriptionEvent struct { @@ -34,7 +44,7 @@ type HeartbeatACKEvent uint64 // OPCode 8 // https://discordapp.com/developers/docs/topics/voice-connections#heartbeating-example-hello-payload-since-v3 type HelloEvent struct { - HeartbeatInterval float64 `json:"heartbeat_interval"` + HeartbeatInterval discord.Milliseconds `json:"heartbeat_interval"` } // OPCode 9 diff --git a/voice/voicegateway/gateway.go b/voice/voicegateway/gateway.go new file mode 100644 index 0000000..70b8933 --- /dev/null +++ b/voice/voicegateway/gateway.go @@ -0,0 +1,311 @@ +// +// For the brave souls who get this far: You are the chosen ones, +// the valiant knights of programming who toil away, without rest, +// fixing our most awful code. To you, true saviors, kings of men, +// I say this: never gonna give you up, never gonna let you down, +// never gonna run around and desert you. Never gonna make you cry, +// never gonna say goodbye. Never gonna tell a lie and hurt you. +// + +package voicegateway + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/diamondburned/arikawa/discord" + "github.com/diamondburned/arikawa/utils/json" + "github.com/diamondburned/arikawa/utils/moreatomic" + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/pkg/errors" +) + +const ( + // Version represents the current version of the Discord Gateway Gateway this package uses. + Version = "4" +) + +var ( + ErrNoSessionID = errors.New("no sessionID was received") + ErrNoEndpoint = errors.New("no endpoint was received") +) + +// State contains state information of a voice gateway. +type State struct { + GuildID discord.Snowflake + ChannelID discord.Snowflake + UserID discord.Snowflake + + SessionID string + Token string + Endpoint string +} + +// Gateway represents a Discord Gateway Gateway connection. +type Gateway struct { + state State // constant + + mutex sync.RWMutex + ready ReadyEvent + + ws *wsutil.Websocket + + Timeout time.Duration + reconnect moreatomic.Bool + + EventLoop *wsutil.PacemakerLoop + + // ErrorLog will be called when an error occurs (defaults to log.Println) + ErrorLog func(err error) + // AfterClose is called after each close. Error can be non-nil, as this is + // called even when the Gateway is gracefully closed. It's used mainly for + // reconnections or any type of connection interruptions. (defaults to noop) + AfterClose func(err error) + + // Filled by methods, internal use + waitGroup *sync.WaitGroup +} + +func New(state State) *Gateway { + return &Gateway{ + state: state, + Timeout: wsutil.WSTimeout, + ErrorLog: wsutil.WSError, + AfterClose: func(error) {}, + } +} + +// TODO: get rid of +func (c *Gateway) Ready() ReadyEvent { + c.mutex.RLock() + defer c.mutex.RUnlock() + + return c.ready +} + +// Open shouldn't be used, but JoinServer instead. +func (c *Gateway) Open() error { + // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection + var endpoint = "wss://" + strings.TrimSuffix(c.state.Endpoint, ":80") + "/?v=" + Version + + wsutil.WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")") + c.ws = wsutil.New(endpoint) + + // Create a new context with a timeout for the connection. + ctx, cancel := context.WithTimeout(context.Background(), c.Timeout) + defer cancel() + + // Connect to the Gateway Gateway. + if err := c.ws.Dial(ctx); err != nil { + return errors.Wrap(err, "Failed to connect to voice gateway") + } + + wsutil.WSDebug("Trying to start...") + + // Try to start or resume the connection. + if err := c.start(); err != nil { + return err + } + + return nil +} + +// Start . +func (c *Gateway) start() error { + if err := c.__start(); err != nil { + wsutil.WSDebug("Start failed: ", err) + + // Close can be called with the mutex still acquired here, as the + // pacemaker hasn't started yet. + if err := c.Close(); err != nil { + wsutil.WSDebug("Failed to close after start fail: ", err) + } + return err + } + + return nil +} + +// this function blocks until READY. +func (c *Gateway) __start() error { + // Make a new WaitGroup for use in background loops: + c.waitGroup = new(sync.WaitGroup) + + ch := c.ws.Listen() + + // Wait for hello. + wsutil.WSDebug("Waiting for Hello..") + + var hello *HelloEvent + _, err := wsutil.AssertEvent(<-ch, HelloOP, &hello) + if err != nil { + return errors.Wrap(err, "Error at Hello") + } + + wsutil.WSDebug("Received Hello") + + // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection + // Turns out Hello is sent right away on connection start. + if !c.reconnect.Get() { + if err := c.Identify(); err != nil { + return errors.Wrap(err, "Failed to identify") + } + } else { + if err := c.Resume(); err != nil { + return errors.Wrap(err, "Failed to resume") + } + } + // This bool is because we should only try and Resume once. + c.reconnect.Set(false) + + // Wait for either Ready or Resumed. + err = wsutil.WaitForEvent(c, ch, func(op *wsutil.OP) bool { + return op.Code == ReadyOP || op.Code == ResumedOP + }) + if err != nil { + return errors.Wrap(err, "Failed to wait for Ready or Resumed") + } + + // Start the event handler, which also handles the pacemaker death signal. + c.waitGroup.Add(1) + + // Start the websocket handler. + go c.handleWS(wsutil.NewLoop(hello.HeartbeatInterval.Duration(), ch, c)) + + wsutil.WSDebug("Started successfully.") + + return nil +} + +// Close . +func (c *Gateway) Close() error { + // Check if the WS is already closed: + if c.waitGroup == nil && c.EventLoop.Stopped() { + wsutil.WSDebug("Gateway is already closed.") + + c.AfterClose(nil) + return nil + } + + // If the pacemaker is running: + if !c.EventLoop.Stopped() { + wsutil.WSDebug("Stopping pacemaker...") + + // Stop the pacemaker and the event handler + c.EventLoop.Stop() + + wsutil.WSDebug("Stopped pacemaker.") + } + + wsutil.WSDebug("Waiting for WaitGroup to be done.") + + // This should work, since Pacemaker should signal its loop to stop, which + // would also exit our event loop. Both would be 2. + c.waitGroup.Wait() + + // Mark g.waitGroup as empty: + c.waitGroup = nil + + wsutil.WSDebug("WaitGroup is done. Closing the websocket.") + + err := c.ws.Close() + c.AfterClose(err) + return err +} + +func (c *Gateway) Reconnect() error { + wsutil.WSDebug("Reconnecting...") + + // Guarantee the gateway is already closed. Ignore its error, as we're + // redialing anyway. + c.Close() + + c.reconnect.Set(true) + + // Condition: err == ErrInvalidSession: + // If the connection is rate limited (documented behavior): + // https://discordapp.com/developers/docs/topics/gateway#rate-limiting + + if err := c.Open(); err != nil { + return errors.Wrap(err, "Failed to reopen gateway") + } + + wsutil.WSDebug("Reconnected successfully.") + + return nil +} + +func (c *Gateway) SessionDescription(sp SelectProtocol) (*SessionDescriptionEvent, error) { + // Add the handler first. + ch, cancel := c.EventLoop.Extras.Add(func(op *wsutil.OP) bool { + return op.Code == SessionDescriptionOP + }) + defer cancel() + + if err := c.SelectProtocol(sp); err != nil { + return nil, err + } + + var sesdesc *SessionDescriptionEvent + + // Wait for SessionDescriptionOP packet. + if err := (<-ch).UnmarshalData(&sesdesc); err != nil { + return nil, errors.Wrap(err, "Failed to unmarshal session description") + } + + return sesdesc, nil +} + +// handleWS . +func (c *Gateway) handleWS(evl *wsutil.PacemakerLoop) { + c.EventLoop = evl + err := c.EventLoop.Run() + + c.waitGroup.Done() // mark so Close() can exit. + wsutil.WSDebug("Event loop stopped.") + + if err != nil { + c.ErrorLog(err) + c.Reconnect() + // Reconnect should spawn another eventLoop in its Start function. + } +} + +// Send . +func (c *Gateway) Send(code OPCode, v interface{}) error { + return c.send(code, v) +} + +// send . +func (c *Gateway) send(code OPCode, v interface{}) error { + if c.ws == nil { + return errors.New("tried to send data to a connection without a Websocket") + } + + if c.ws.Conn == nil { + return errors.New("tried to send data to a connection with a closed Websocket") + } + + var op = wsutil.OP{ + Code: code, + } + + if v != nil { + b, err := json.Marshal(v) + if err != nil { + return errors.Wrap(err, "Failed to encode v") + } + + op.Data = b + } + + b, err := json.Marshal(op) + if err != nil { + return errors.Wrap(err, "Failed to encode payload") + } + + // WS should already be thread-safe. + return c.ws.Send(b) +} diff --git a/voice/voicegateway/op.go b/voice/voicegateway/op.go new file mode 100644 index 0000000..ee7c49d --- /dev/null +++ b/voice/voicegateway/op.go @@ -0,0 +1,72 @@ +package voicegateway + +import ( + "fmt" + "sync" + + "github.com/diamondburned/arikawa/utils/json" + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/pkg/errors" +) + +// OPCode represents a Discord Gateway Gateway operation code. +type OPCode = wsutil.OPCode + +const ( + IdentifyOP OPCode = 0 // send + SelectProtocolOP OPCode = 1 // send + ReadyOP OPCode = 2 // receive + HeartbeatOP OPCode = 3 // send + SessionDescriptionOP OPCode = 4 // receive + SpeakingOP OPCode = 5 // send/receive + HeartbeatAckOP OPCode = 6 // receive + ResumeOP OPCode = 7 // send + HelloOP OPCode = 8 // receive + ResumedOP OPCode = 9 // receive + // ClientDisconnectOP OPCode = 13 // receive +) + +func (c *Gateway) HandleOP(op *wsutil.OP) error { + switch op.Code { + // Gives information required to make a UDP connection + case ReadyOP: + if err := unmarshalMutex(op.Data, &c.ready, &c.mutex); err != nil { + return errors.Wrap(err, "Failed to parse READY event") + } + + // Gives information about the encryption mode and secret key for sending voice packets + case SessionDescriptionOP: + // ? + // Already handled by Session. + + // Someone started or stopped speaking. + case SpeakingOP: + // ? + // TODO: handler in Session + + // Heartbeat response from the server + case HeartbeatAckOP: + c.EventLoop.Echo() + + // Hello server, we hear you! :) + case HelloOP: + // ? + // Already handled on initial connection. + + // Server is saying the connection was resumed, no data here. + case ResumedOP: + wsutil.WSDebug("Gateway connection has been resumed.") + + default: + return fmt.Errorf("unknown OP code %d", op.Code) + } + + return nil +} + +func unmarshalMutex(d []byte, v interface{}, m *sync.RWMutex) error { + m.Lock() + err := json.Unmarshal(d, v) + m.Unlock() + return err +}