From f429010dedadbb11cd5dfb0568197c9e9fa456df Mon Sep 17 00:00:00 2001 From: Matthew Penner Date: Sun, 19 Apr 2020 19:21:10 -0600 Subject: [PATCH] Add janky voice support --- gateway/commands.go | 8 +- voice/commands.go | 124 +++++++++++++++ voice/connection.go | 371 ++++++++++++++++++++++++++++++++++++++++++++ voice/events.go | 74 +++++++++ voice/op.go | 110 +++++++++++++ voice/ops.go | 42 +++++ voice/packet.go | 11 ++ voice/udp.go | 143 +++++++++++++++++ voice/voice.go | 166 ++++++++++++++++++++ 9 files changed, 1045 insertions(+), 4 deletions(-) create mode 100644 voice/commands.go create mode 100644 voice/connection.go create mode 100644 voice/events.go create mode 100644 voice/op.go create mode 100644 voice/ops.go create mode 100644 voice/packet.go create mode 100644 voice/udp.go create mode 100644 voice/voice.go diff --git a/gateway/commands.go b/gateway/commands.go index bfe0d42..0aafbde 100644 --- a/gateway/commands.go +++ b/gateway/commands.go @@ -68,10 +68,10 @@ func (g *Gateway) RequestGuildMembers(data RequestGuildMembersData) error { } type UpdateVoiceStateData struct { - GuildID discord.Snowflake `json:"guild_id"` - ChannelID discord.Snowflake `json:"channel_id"` - SelfMute bool `json:"self_mute"` - SelfDeaf bool `json:"self_deaf"` + GuildID discord.Snowflake `json:"guild_id"` + ChannelID *discord.Snowflake `json:"channel_id"` // nullable + SelfMute bool `json:"self_mute"` + SelfDeaf bool `json:"self_deaf"` } func (g *Gateway) UpdateVoiceState(data UpdateVoiceStateData) error { diff --git a/voice/commands.go b/voice/commands.go new file mode 100644 index 0000000..3bebf7e --- /dev/null +++ b/voice/commands.go @@ -0,0 +1,124 @@ +package voice + +import ( + "time" + + "github.com/diamondburned/arikawa/discord" +) + +// OPCode 0 +// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-identify-payload +type IdentifyData struct { + GuildID discord.Snowflake `json:"server_id"` // yes, this should be "server_id" + UserID discord.Snowflake `json:"user_id"` + SessionID string `json:"session_id"` + Token string `json:"token"` +} + +// Identify sends an Identify operation (opcode 0) to the Voice Gateway. +func (c *Connection) Identify() error { + guildID := c.GuildID + userID := c.UserID + sessionID := c.SessionID + token := c.Token + + if guildID == 0 || userID == 0 || sessionID == "" || token == "" { + return ErrMissingForIdentify + } + + return c.Send(IdentifyOP, IdentifyData{ + GuildID: guildID, + UserID: userID, + SessionID: sessionID, + Token: token, + }) +} + +// OPCode 1 +// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-udp-connection-example-select-protocol-payload +type SelectProtocol struct { + Protocol string `json:"protocol"` + Data SelectProtocolData `json:"data"` +} + +type SelectProtocolData struct { + Address string `json:"address"` + Port uint16 `json:"port"` + Mode string `json:"mode"` +} + +// SelectProtocol sends a Select Protocol operation (opcode 1) to the Voice Gateway. +func (c *Connection) SelectProtocol(data SelectProtocol) error { + return c.Send(SelectProtocolOP, data) +} + +// OPCode 3 +// https://discordapp.com/developers/docs/topics/voice-connections#heartbeating-example-heartbeat-payload +type Heartbeat uint64 + +// Heartbeat sends a Heartbeat operation (opcode 3) to the Voice Gateway. +func (c *Connection) Heartbeat() error { + return c.Send(HeartbeatOP, time.Now().UnixNano()) +} + +// https://discordapp.com/developers/docs/topics/voice-connections#speaking +type Speaking uint64 + +const ( + Microphone Speaking = 1 << iota + Soundshare + Priority +) + +// OPCode 5 +// https://discordapp.com/developers/docs/topics/voice-connections#speaking-example-speaking-payload +type SpeakingData struct { + Speaking Speaking `json:"speaking"` + Delay int `json:"delay"` + SSRC uint32 `json:"ssrc"` +} + +// Speaking sends a Speaking operation (opcode 5) to the Voice Gateway. +func (c *Connection) Speaking(s Speaking) error { + // How do we allow a user to stop speaking? + // Also: https://discordapp.com/developers/docs/topics/voice-connections#voice-data-interpolation + + return c.Send(SpeakingOP, SpeakingData{ + Speaking: s, + Delay: 0, + SSRC: c.ready.SSRC, + }) +} + +// StopSpeaking stops speaking. +// https://discordapp.com/developers/docs/topics/voice-connections#voice-data-interpolation +func (c *Connection) StopSpeaking() { + for i := 0; i < 5; i++ { + c.OpusSend <- []byte{0xF8, 0xFF, 0xFE} + } +} + +// OPCode 7 +// https://discordapp.com/developers/docs/topics/voice-connections#resuming-voice-connection-example-resume-connection-payload +type ResumeData struct { + GuildID discord.Snowflake `json:"server_id"` // yes, this should be "server_id" + SessionID string `json:"session_id"` + Token string `json:"token"` +} + +// Resume sends a Resume operation (opcode 7) to the Voice Gateway. +func (c *Connection) Resume() error { + guildID := c.GuildID + sessionID := c.SessionID + token := c.Token + + if guildID == 0 || sessionID == "" || token == "" { + return ErrMissingForResume + } + + return c.Send(ResumeOP, ResumeData{ + GuildID: guildID, + SessionID: sessionID, + Token: token, + }) +} diff --git a/voice/connection.go b/voice/connection.go new file mode 100644 index 0000000..958cbd6 --- /dev/null +++ b/voice/connection.go @@ -0,0 +1,371 @@ +// +// For the brave souls who get this far: You are the chosen ones, +// the valiant knights of programming who toil away, without rest, +// fixing our most awful code. To you, true saviors, kings of men, +// I say this: never gonna give you up, never gonna let you down, +// never gonna run around and desert you. Never gonna make you cry, +// never gonna say goodbye. Never gonna tell a lie and hurt you. +// + +package voice + +import ( + "context" + "net" + "strings" + "sync" + "time" + + "github.com/diamondburned/arikawa/discord" + "github.com/diamondburned/arikawa/gateway" + "github.com/diamondburned/arikawa/utils/json" + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/pkg/errors" +) + +var ( + ErrNoSessionID = errors.New("no SessionID was received after 1 second") +) + +// Connection represents a Discord Voice Gateway connection. +type Connection struct { + json.Driver + mut sync.RWMutex + + SessionID string + Token string + Endpoint string + reconnection bool + + UserID discord.Snowflake + GuildID discord.Snowflake + ChannelID discord.Snowflake + + muted bool + deafened bool + speaking bool + + WS *wsutil.Websocket + WSTimeout time.Duration + + Pacemaker *gateway.Pacemaker + + udpConn *net.UDPConn + OpusSend chan []byte + close chan struct{} + + // ErrorLog will be called when an error occurs (defaults to log.Println) + ErrorLog func(err error) + // AfterClose is called after each close. Error can be non-nil, as this is + // called even when the Gateway is gracefully closed. It's used mainly for + // reconnections or any type of connection interruptions. (defaults to noop) + AfterClose func(err error) + + // Stored Operations + hello HelloEvent + ready ReadyEvent + sessionDescription SessionDescriptionEvent + + // Filled by methods, internal use + paceDeath chan error + waitGroup *sync.WaitGroup +} + +// newConnection . +func newConnection() *Connection { + return &Connection{ + Driver: json.Default{}, + + WSTimeout: WSTimeout, + + close: make(chan struct{}), + + ErrorLog: defaultErrorHandler, + AfterClose: func(error) {}, + } +} + +// Open . +func (c *Connection) Open() error { + c.mut.Lock() + defer c.mut.Unlock() + + // Check if the connection already has a websocket. + if c.WS != nil { + WSDebug("Connection already has an active websocket") + return nil + } + + // Wait for the SessionID. + // TODO: Find better way to wait. + for i := 0; i < 20; i++ { + if c.SessionID != "" { + break + } + + time.Sleep(50 * time.Millisecond) + } + + if c.SessionID == "" { + return ErrNoSessionID + } + + // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection + endpoint := "wss://" + strings.TrimSuffix(c.Endpoint, ":80") + "/?v=" + Version + + WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")") + + c.WS = wsutil.NewCustom(wsutil.NewConn(c.Driver), endpoint) + + // Create a new context with a timeout for the connection. + ctx, cancel := context.WithTimeout(context.Background(), c.WSTimeout) + defer cancel() + + // Connect to the Voice Gateway. + if err := c.WS.Dial(ctx); err != nil { + return errors.Wrap(err, "Failed to connect to Voice Gateway") + } + + WSDebug("Trying to start...") + + // Try to resume the connection + if err := c.Start(); err != nil { + return err + } + + return nil +} + +// Start . +func (c *Connection) Start() error { + if err := c.start(); err != nil { + WSDebug("Start failed: ", err) + + // Close can be called with the mutex still acquired here, as the + // pacemaker hasn't started yet. + if err := c.Close(); err != nil { + WSDebug("Failed to close after start fail: ", err) + } + return err + } + + return nil +} + +// start . +func (c *Connection) start() error { + // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection + // Apparently we send an Identify or Resume once we are connected unlike the other gateway that + // waits for a Hello then sends an Identify or Resume. + if !c.reconnection { + if err := c.Identify(); err != nil { + return errors.Wrap(err, "Failed to identify") + } + } else { + if err := c.Resume(); err != nil { + return errors.Wrap(err, "Failed to resume") + } + } + c.reconnection = false + + // Make a new WaitGroup for use in background loops: + c.waitGroup = new(sync.WaitGroup) + + // Start the websocket handler. + go c.handleWS() + + // Wait for hello. + // TODO: Find better way to wait + WSDebug("Waiting for Hello..") + for { + if c.hello.HeartbeatInterval == 0 { + time.Sleep(50 * time.Millisecond) + continue + } + + break + } + WSDebug("Received Hello") + + // Start the pacemaker with the heartrate received from Hello, after + // initializing everything. This ensures we only heartbeat if the websocket + // is authenticated. + c.Pacemaker = &gateway.Pacemaker{ + Heartrate: time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond, + Pace: c.Heartbeat, + } + // Pacemaker dies here, only when it's fatal. + c.paceDeath = c.Pacemaker.StartAsync(c.waitGroup) + + // Start the event handler, which also handles the pacemaker death signal. + c.waitGroup.Add(1) + + WSDebug("Started successfully.") + + return nil +} + +// Close . +func (c *Connection) Close() error { + if c.udpConn != nil { + WSDebug("Closing udp connection.") + close(c.close) + } + + // Check if the WS is already closed: + if c.waitGroup == nil && c.paceDeath == nil { + WSDebug("Gateway is already closed.") + + c.AfterClose(nil) + return nil + } + + // If the pacemaker is running: + if c.paceDeath != nil { + WSDebug("Stopping pacemaker...") + + // Stop the pacemaker and the event handler + c.Pacemaker.Stop() + + WSDebug("Stopped pacemaker.") + } + + WSDebug("Waiting for WaitGroup to be done.") + + // This should work, since Pacemaker should signal its loop to stop, which + // would also exit our event loop. Both would be 2. + c.waitGroup.Wait() + + // Mark g.waitGroup as empty: + c.waitGroup = nil + + WSDebug("WaitGroup is done. Closing the websocket.") + + err := c.WS.Close() + c.AfterClose(err) + return err +} + +func (c *Connection) Reconnect() { + WSDebug("Reconnecting...") + + // Guarantee the gateway is already closed. Ignore its error, as we're + // redialing anyway. + c.Close() + + c.mut.Lock() + c.reconnection = true + c.mut.Unlock() + + for i := 1; ; i++ { + WSDebug("Trying to dial, attempt #", i) + + // Condition: err == ErrInvalidSession: + // If the connection is rate limited (documented behavior): + // https://discordapp.com/developers/docs/topics/gateway#rate-limiting + + if err := c.Open(); err != nil { + c.ErrorLog(errors.Wrap(err, "Failed to open gateway")) + continue + } + + WSDebug("Started after attempt: ", i) + return + } +} + +func (c *Connection) Disconnect(g *gateway.Gateway) (err error) { + if c.SessionID != "" { + err = g.UpdateVoiceState(gateway.UpdateVoiceStateData{ + GuildID: c.GuildID, + ChannelID: nil, + SelfMute: true, + SelfDeaf: true, + }) + + c.SessionID = "" + } + + // We might want this error and the update voice state error + // but for now we will prioritize the voice state error + _ = c.Close() + + return +} + +// handleWS . +func (c *Connection) handleWS() { + err := c.eventLoop() + c.waitGroup.Done() // mark so Close() can exit. + WSDebug("Event loop stopped.") + + if err != nil { + c.ErrorLog(err) + c.Reconnect() + // Reconnect should spawn another eventLoop in its Start function. + } +} + +// eventLoop . +func (c *Connection) eventLoop() error { + ch := c.WS.Listen() + + for { + select { + case err := <-c.paceDeath: + // Got a paceDeath, we're exiting from here on out. + c.paceDeath = nil // mark + + if err == nil { + WSDebug("Pacemaker stopped without errors.") + // No error, just exit normally. + return nil + } + + return errors.Wrap(err, "Pacemaker died, reconnecting") + + case ev := <-ch: + // Handle the event + if err := HandleEvent(c, ev); err != nil { + c.ErrorLog(errors.Wrap(err, "WS handler error")) + } + } + } +} + +// Send . +func (c *Connection) Send(code OPCode, v interface{}) error { + return c.send(code, v) +} + +// send . +func (c *Connection) send(code OPCode, v interface{}) error { + if c.WS == nil { + return errors.New("tried to send data to a connection without a Websocket") + } + + if c.WS.Conn == nil { + return errors.New("tried to send data to a connection with a closed Websocket") + } + + var op = OP{ + Code: code, + } + + if v != nil { + b, err := c.Driver.Marshal(v) + if err != nil { + return errors.Wrap(err, "Failed to encode v") + } + + op.Data = b + } + + b, err := c.Driver.Marshal(op) + if err != nil { + return errors.Wrap(err, "Failed to encode payload") + } + + // WS should already be thread-safe. + return c.WS.Send(b) +} diff --git a/voice/events.go b/voice/events.go new file mode 100644 index 0000000..9e9b416 --- /dev/null +++ b/voice/events.go @@ -0,0 +1,74 @@ +package voice + +import ( + "github.com/diamondburned/arikawa/gateway" +) + +// onVoiceStateUpdate receives VoiceStateUpdateEvents from the gateway +// to keep track of the current user's voice state. +func (v *Voice) onVoiceStateUpdate(e *gateway.VoiceStateUpdateEvent) { + // Get the current user. + me, err := v.state.Me() + if err != nil { + v.ErrorLog(err) + return + } + + // Ignore the event if it is an update from another user. + if me.ID != e.UserID { + return + } + + // Get the stored voice connection for the given guild. + conn, ok := v.GetConnection(e.GuildID) + + // Ignore if there is no connection for that guild. + if !ok { + return + } + + // Remove the connection if the current user has disconnected. + if e.ChannelID == 0 { + // TODO: Make sure connection is closed? + v.RemoveConnection(e.GuildID) + return + } + + // Update values on the connection. + conn.mut.Lock() + defer conn.mut.Unlock() + + conn.SessionID = e.SessionID + + conn.UserID = e.UserID + conn.ChannelID = e.ChannelID +} + +// onVoiceServerUpdate receives VoiceServerUpdateEvents from the gateway +// to manage the current user's voice connections. +func (v *Voice) onVoiceServerUpdate(e *gateway.VoiceServerUpdateEvent) { + // Get the stored voice connection for the given guild. + conn, ok := v.GetConnection(e.GuildID) + + // Ignore if there is no connection for that guild. + if !ok { + return + } + + // Ensure the connection is closed (has no effect if the connection is already closed) + conn.Close() + + // Update values on the connection. + conn.mut.Lock() + conn.Token = e.Token + conn.Endpoint = e.Endpoint + + conn.GuildID = e.GuildID + conn.mut.Unlock() + + // Open the voice connection. + if err := conn.Open(); err != nil { + v.ErrorLog(err) + return + } +} diff --git a/voice/op.go b/voice/op.go new file mode 100644 index 0000000..d91e938 --- /dev/null +++ b/voice/op.go @@ -0,0 +1,110 @@ +package voice + +import ( + "fmt" + + "github.com/diamondburned/arikawa/utils/json" + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/pkg/errors" +) + +// OPCode represents a Discord Voice Gateway operation code. +type OPCode uint8 + +const ( + IdentifyOP OPCode = 0 // send + SelectProtocolOP OPCode = 1 // send + ReadyOP OPCode = 2 // receive + HeartbeatOP OPCode = 3 // send + SessionDescriptionOP OPCode = 4 // receive + SpeakingOP OPCode = 5 // send/receive + HeartbeatAckOP OPCode = 6 // receive + ResumeOP OPCode = 7 // send + HelloOP OPCode = 8 // receive + ResumedOP OPCode = 9 // receive + // ClientDisconnectOP OPCode = 13 // receive +) + +// OP represents a Discord Voice Gateway operation. +type OP struct { + Code OPCode `json:"op"` + Data json.Raw `json:"d,omitempty"` +} + +func HandleEvent(c *Connection, ev wsutil.Event) error { + o, err := DecodeOP(c.Driver, ev) + if err != nil { + return err + } + + return HandleOP(c, o) +} + +func DecodeOP(driver json.Driver, ev wsutil.Event) (*OP, error) { + if ev.Error != nil { + return nil, ev.Error + } + + if len(ev.Data) == 0 { + return nil, errors.New("Empty payload") + } + + var op *OP + if err := driver.Unmarshal(ev.Data, &op); err != nil { + return nil, errors.Wrap(err, "OP error: "+string(ev.Data)) + } + + return op, nil +} + +func HandleOP(c *Connection, op *OP) error { + switch op.Code { + // Gives information required to make a UDP connection + case ReadyOP: + if err := c.Driver.Unmarshal(op.Data, &c.ready); err != nil { + return errors.Wrap(err, "Failed to parse READY event") + } + + if err := c.udpOpen(); err != nil { + return errors.Wrap(err, "Failed to open UDP connection") + } + + if c.OpusSend == nil { + c.OpusSend = make(chan []byte) + } + + go c.opusSendLoop() + + // Gives information about the encryption mode and secret key for sending voice packets + case SessionDescriptionOP: + if err := c.Driver.Unmarshal(op.Data, &c.sessionDescription); err != nil { + c.ErrorLog(errors.Wrap(err, "Failed to parse SESSION_DESCRIPTION")) + } + + // Someone started or stopped speaking. + case SpeakingOP: + // ? + return nil + + // Heartbeat response from the server + case HeartbeatAckOP: + c.Pacemaker.Echo() + + // Hello server, we hear you! :) + case HelloOP: + // Decode the data. + if err := c.Driver.Unmarshal(op.Data, &c.hello); err != nil { + c.ErrorLog(errors.Wrap(err, "Failed to parse SESSION_DESCRIPTION")) + } + return nil + + // Server is saying the connection was resumed, no data here. + case ResumedOP: + WSDebug("Voice connection has been resumed") + + default: + return fmt.Errorf("unknown OP code %d", op.Code) + } + + return nil +} diff --git a/voice/ops.go b/voice/ops.go new file mode 100644 index 0000000..95cd8d1 --- /dev/null +++ b/voice/ops.go @@ -0,0 +1,42 @@ +package voice + +// OPCode 2 +// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-ready-payload +type ReadyEvent struct { + SSRC uint32 `json:"ssrc"` + IP string `json:"ip"` + Port int `json:"port"` + Modes []string `json:"modes"` + Experiments []string `json:"experiments"` + + // From Discord's API Docs: + // + // `heartbeat_interval` here is an erroneous field and should be ignored. + // The correct `heartbeat_interval` value comes from the Hello payload. + + // HeartbeatInterval discord.Milliseconds `json:"heartbeat_interval"` +} + +// OPCode 4 +// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-udp-connection-example-session-description-payload +type SessionDescriptionEvent struct { + Mode string `json:"mode"` + SecretKey [32]byte `json:"secret_key"` +} + +// OPCode 5 +type SpeakingEvent SpeakingData + +// OPCode 6 +// https://discordapp.com/developers/docs/topics/voice-connections#heartbeating-example-heartbeat-ack-payload +type HeartbeatACKEvent uint64 + +// OPCode 8 +// https://discordapp.com/developers/docs/topics/voice-connections#heartbeating-example-hello-payload-since-v3 +type HelloEvent struct { + HeartbeatInterval float64 `json:"heartbeat_interval"` +} + +// OPCode 9 +// https://discordapp.com/developers/docs/topics/voice-connections#resuming-voice-connection-example-resumed-payload +type ResumedEvent struct{} diff --git a/voice/packet.go b/voice/packet.go new file mode 100644 index 0000000..e3ff58a --- /dev/null +++ b/voice/packet.go @@ -0,0 +1,11 @@ +package voice + +// https://discordapp.com/developers/docs/topics/voice-connections#encrypting-and-sending-voice +type Packet struct { + Version byte // Single byte value of 0x80 - 1 byte + Type byte // Single byte value of 0x78 - 1 byte + Sequence uint16 // Unsigned short (big endian) - 4 bytes + Timestamp uint32 // Unsigned integer (big endian) - 4 bytes + SSRC uint32 // Unsigned integer (big endian) - 4 bytes + Opus []byte // Binary data +} diff --git a/voice/udp.go b/voice/udp.go new file mode 100644 index 0000000..e666c36 --- /dev/null +++ b/voice/udp.go @@ -0,0 +1,143 @@ +package voice + +import ( + "encoding/binary" + "net" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + "golang.org/x/crypto/nacl/secretbox" +) + +// udpOpen . +func (c *Connection) udpOpen() error { + c.mut.Lock() + defer c.mut.Unlock() + + // As a wise man once said: "You always gotta check for stupidity" + if c.WS == nil { + return errors.New("connection does not have a websocket") + } + + // Check if a UDP connection is already open. + if c.udpConn != nil { + return errors.New("udp connection is already open") + } + + // Format the connection host. + host := c.ready.IP + ":" + strconv.Itoa(c.ready.Port) + + // Resolve the host. + addr, err := net.ResolveUDPAddr("udp", host) + if err != nil { + return errors.Wrap(err, "Failed to resolve host") + } + + // Create a new UDP connection. + c.udpConn, err = net.DialUDP("udp", nil, addr) + if err != nil { + return errors.Wrap(err, "Failed to dial host") + } + + // https://discordapp.com/developers/docs/topics/voice-connections#ip-discovery + ssrcBuffer := make([]byte, 70) + ssrcBuffer[0] = 0x1 + ssrcBuffer[1] = 0x2 + binary.BigEndian.PutUint16(ssrcBuffer[2:4], 70) + binary.BigEndian.PutUint32(ssrcBuffer[4:8], c.ready.SSRC) + _, err = c.udpConn.Write(ssrcBuffer) + if err != nil { + return errors.Wrap(err, "Failed to write") + } + + ipBuffer := make([]byte, 70) + var n int + n, err = c.udpConn.Read(ipBuffer) + if err != nil { + return errors.Wrap(err, "Failed to write") + } + if n < 70 { + return errors.New("udp packet received from discord is not the required 70 bytes") + } + + ipb := string(ipBuffer[4:68]) + nullPos := strings.Index(ipb, "\x00") + if nullPos < 0 { + return errors.New("udp ip discovery did not contain a null terminator") + } + ip := ipb[:nullPos] + port := binary.LittleEndian.Uint16(ipBuffer[68:70]) + + // Send a Select Protocol operation to the Discord Voice Gateway. + err = c.SelectProtocol(SelectProtocol{ + Protocol: "udp", + Data: SelectProtocolData{ + Address: ip, + Port: port, + Mode: "xsalsa20_poly1305", + }, + }) + if err != nil { + return err + } + + // TODO: Wait until OP4 is received + // side note: you cannot just do a blocking loop as I've done before + // as this method is currently called inside of the event loop + // so for as long as it blocks no other events can be received + + return nil +} + +// https://discordapp.com/developers/docs/topics/voice-connections#encrypting-and-sending-voice +func (c *Connection) opusSendLoop() { + header := make([]byte, 12) + header[0] = 0x80 // Version + Flags + header[1] = 0x78 // Payload Type + // header[2:4] // Sequence + // header[4:8] // Timestamp + binary.BigEndian.PutUint32(header[8:12], c.ready.SSRC) // SSRC + + var ( + sequence uint16 + timestamp uint32 + nonce [24]byte + + msg []byte + open bool + ) + + // 50 sends per second, 960 samples each at 48kHz + frequency := time.NewTicker(time.Millisecond * 20) + defer frequency.Stop() + + for { + select { + case msg, open = <-c.OpusSend: + if !open { + return + } + case <-c.close: + return + } + + binary.BigEndian.PutUint16(header[2:4], sequence) + sequence++ + + binary.BigEndian.PutUint32(header[4:8], timestamp) + timestamp += 960 // Samples + + copy(nonce[:], header) + + toSend := secretbox.Seal(header, msg, &nonce, &c.sessionDescription.SecretKey) + select { + case <-frequency.C: + case <-c.close: + return + } + + _, _ = c.udpConn.Write(toSend) + } +} diff --git a/voice/voice.go b/voice/voice.go new file mode 100644 index 0000000..d90c43f --- /dev/null +++ b/voice/voice.go @@ -0,0 +1,166 @@ +// Package voice is coming soon to an arikawa near you! +package voice + +import ( + "log" + "sync" + "time" + + "github.com/diamondburned/arikawa/discord" + "github.com/diamondburned/arikawa/gateway" + "github.com/diamondburned/arikawa/state" + "github.com/diamondburned/arikawa/utils/wsutil" + "github.com/pkg/errors" +) + +const ( + // Version represents the current version of the Discord Voice Gateway this package uses. + Version = "4" + + // WSTimeout is the timeout for connecting and writing to the Websocket, + // before Gateway cancels and fails. + WSTimeout = wsutil.DefaultTimeout +) + +var ( + // defaultErrorHandler is the default error handler + defaultErrorHandler = func(err error) { log.Println("Voice gateway error:", err) } + + // WSDebug is used for extra debug logging. This is expected to behave + // similarly to log.Println(). + WSDebug = func(v ...interface{}) {} + + // ErrMissingForIdentify . + ErrMissingForIdentify = errors.New("missing GuildID, UserID, SessionID, or Token for identify") + + // ErrMissingForResume . + ErrMissingForResume = errors.New("missing GuildID, SessionID, or Token for resuming") +) + +// Voice . +type Voice struct { + mut sync.RWMutex + + state *state.State + + // Connections holds all of the active voice connections. + connections map[discord.Snowflake]*Connection + + // ErrorLog will be called when an error occurs (defaults to log.Println) + ErrorLog func(err error) +} + +// NewVoice creates a new Voice repository. +func NewVoice(s *state.State) *Voice { + v := &Voice{ + state: s, + + connections: make(map[discord.Snowflake]*Connection), + + ErrorLog: defaultErrorHandler, + } + + // Add the required event handlers to the session. + s.AddHandler(v.onVoiceStateUpdate) + s.AddHandler(v.onVoiceServerUpdate) + + return v +} + +// GetConnection gets a connection for a guild with a read lock. +func (v *Voice) GetConnection(guildID discord.Snowflake) (*Connection, bool) { + v.mut.RLock() + defer v.mut.RUnlock() + + // For some reason you cannot just put `return v.connections[]` and return a bool D: + conn, ok := v.connections[guildID] + return conn, ok +} + +// RemoveConnection removes a connection. +func (v *Voice) RemoveConnection(guildID discord.Snowflake) { + v.mut.Lock() + defer v.mut.Unlock() + + delete(v.connections, guildID) +} + +// JoinChannel . +func (v *Voice) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) (*Connection, error) { + // Get the stored voice connection for the given guild. + conn, ok := v.GetConnection(gID) + + // Create a new voice connection if one does not exist. + if !ok { + conn = newConnection() + + v.mut.Lock() + v.connections[gID] = conn + v.mut.Unlock() + } + + // Update values on the connection. + conn.mut.Lock() + conn.GuildID = gID + conn.ChannelID = cID + + conn.muted = muted + conn.deafened = deafened + conn.mut.Unlock() + + // Ensure that if `cID` is zero that it passes null to the update event. + var channelID *discord.Snowflake + if cID != 0 { + channelID = &cID + } + + // https://discordapp.com/developers/docs/topics/voice-connections#retrieving-voice-server-information + // Send a Voice State Update event to the gateway. + err := v.state.Gateway.UpdateVoiceState(gateway.UpdateVoiceStateData{ + GuildID: gID, + ChannelID: channelID, + SelfMute: muted, + SelfDeaf: deafened, + }) + if err != nil { + return nil, errors.Wrap(err, "Failed to send Voice State Update event") + } + + // Wait until we are connected. + WSDebug("Waiting for READY.") + + // TODO: Find better way to wait for ready event. + + // Check if the connection is opened. + for i := 0; i < 50; i++ { + if conn.WS != nil && conn.WS.Conn != nil { + break + } + + time.Sleep(50 * time.Millisecond) + } + + if conn.WS == nil || conn.WS.Conn == nil { + return nil, errors.Wrap(err, "Failed to wait for connection to open") + } + + for i := 0; i < 50; i++ { + if conn.ready.IP != "" { + break + } + + time.Sleep(50 * time.Millisecond) + } + + if conn.ready.IP == "" { + return nil, errors.New("failed to wait for ready event") + } + + if conn.udpConn == nil { + return nil, errors.New("udp connection is not open") + } + + WSDebug("Received READY.") + + return conn, nil +}