Voice: Separated UDP and WS into packages, added io.Writer API

This commit is contained in:
diamondburned (Forefront) 2020-04-24 19:36:33 -07:00
parent 95624292f7
commit 3005c7b44a
18 changed files with 993 additions and 894 deletions

View File

@ -109,7 +109,9 @@ func (s Seconds) Duration() time.Duration {
//
type Milliseconds int
// Milliseconds is in float64 because some Discord events return time with a
// trailing decimal.
type Milliseconds float64
func DurationToMilliseconds(dura time.Duration) Milliseconds {
return Milliseconds(dura.Milliseconds())
@ -120,5 +122,6 @@ func (ms Milliseconds) String() string {
}
func (ms Milliseconds) Duration() time.Duration {
return time.Duration(ms) * time.Millisecond
const f64ms = Milliseconds(time.Millisecond)
return time.Duration(ms * f64ms)
}

2
go.mod
View File

@ -5,8 +5,10 @@ go 1.13
require (
github.com/gorilla/schema v1.1.0
github.com/gorilla/websocket v1.4.2
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pkg/errors v0.9.1
github.com/sasha-s/go-csync v0.0.0-20160729053059-3bc6c8bdb3fa
github.com/sasha-s/go-deadlock v0.2.0
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0

4
go.sum
View File

@ -2,10 +2,14 @@ github.com/gorilla/schema v1.1.0 h1:CamqUDOFUBqzrvxuz2vEwo8+SUdwsluFh7IlzJh30LY=
github.com/gorilla/schema v1.1.0/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/sasha-s/go-csync v0.0.0-20160729053059-3bc6c8bdb3fa h1:xiD6U6h+QMkAwI195dFwdku2N+enlCy9XwFTnEXaCQo=
github.com/sasha-s/go-csync v0.0.0-20160729053059-3bc6c8bdb3fa/go.mod h1:KKzWrLiWu6EpzxZBPmPisPgq6oL+do2yLa0C0BTx5fA=
github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=

View File

@ -2,7 +2,6 @@
package heart
import (
"log"
"sync"
"sync/atomic"
"time"
@ -92,8 +91,6 @@ func (p *Pacemaker) Stop() {
}
func (p *Pacemaker) start() error {
log.Println("Heartbeat interval:", p.Heartrate)
// Reset states to its old position.
p.EchoBeat.Set(time.Time{})
p.SentBeat.Set(time.Time{})

View File

@ -93,7 +93,7 @@ func (p *PacemakerLoop) Run() error {
o, err := DecodeOP(ev)
if err != nil {
p.errorLog(errors.Wrap(err, "Failed to decode OP"))
return err
continue // ignore
}
// Check the events before handling.

View File

@ -125,6 +125,10 @@ func (ex *ExtraHandlers) Add(check func(*OP) bool) (<-chan *OP, func()) {
ex.mutex.Lock()
defer ex.mutex.Unlock()
if ex.handlers == nil {
ex.handlers = make(map[uint32]*ExtraHandler, 1)
}
i := ex.serial
ex.serial++

View File

@ -1,383 +0,0 @@
//
// For the brave souls who get this far: You are the chosen ones,
// the valiant knights of programming who toil away, without rest,
// fixing our most awful code. To you, true saviors, kings of men,
// I say this: never gonna give you up, never gonna let you down,
// never gonna run around and desert you. Never gonna make you cry,
// never gonna say goodbye. Never gonna tell a lie and hurt you.
//
package voice
import (
"context"
"net"
"strings"
"sync"
"time"
"github.com/diamondburned/arikawa/discord"
"github.com/diamondburned/arikawa/gateway"
"github.com/diamondburned/arikawa/utils/heart"
"github.com/diamondburned/arikawa/utils/json"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
)
var (
ErrNoSessionID = errors.New("no SessionID was received after 1 second")
)
// Connection represents a Discord Voice Gateway connection.
type Connection struct {
json.Driver
mut sync.RWMutex
SessionID string
Token string
Endpoint string
reconnection bool
UserID discord.Snowflake
GuildID discord.Snowflake
ChannelID discord.Snowflake
muted bool
deafened bool
speaking bool
WS *wsutil.Websocket
WSTimeout time.Duration
EventLoop *heart.PacemakerLoop
// Pacemaker *gateway.Pacemaker
udpConn *net.UDPConn
OpusSend chan []byte
close chan struct{}
// ErrorLog will be called when an error occurs (defaults to log.Println)
ErrorLog func(err error)
// AfterClose is called after each close. Error can be non-nil, as this is
// called even when the Gateway is gracefully closed. It's used mainly for
// reconnections or any type of connection interruptions. (defaults to noop)
AfterClose func(err error)
// Stored Operations
hello HelloEvent
ready ReadyEvent
sessionDescription SessionDescriptionEvent
// Operation Channels
helloChan chan bool
readyChan chan bool
sessionDescChan chan bool
// Filled by methods, internal use
paceDeath chan error
waitGroup *sync.WaitGroup
}
// newConnection .
func newConnection() *Connection {
return &Connection{
Driver: json.Default{},
WSTimeout: WSTimeout,
close: make(chan struct{}),
ErrorLog: defaultErrorHandler,
AfterClose: func(error) {},
helloChan: make(chan bool),
readyChan: make(chan bool),
sessionDescChan: make(chan bool),
}
}
// Open .
func (c *Connection) Open() error {
// Having this acquire a lock might cause a problem if the `onVoiceStateUpdate`
// does not set a session id in time :/
c.mut.Lock()
defer c.mut.Unlock()
// Check if the connection already has a websocket.
if c.WS != nil {
WSDebug("Connection already has an active websocket")
return nil
}
// I doubt this would happen from my testing, but you never know.
if c.SessionID == "" {
return ErrNoSessionID
}
WSDebug("Connection has a session id")
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
endpoint := "wss://" + strings.TrimSuffix(c.Endpoint, ":80") + "/?v=" + Version
WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")")
c.WS = wsutil.NewCustom(wsutil.NewConn(c.Driver), endpoint)
// Create a new context with a timeout for the connection.
ctx, cancel := context.WithTimeout(context.Background(), c.WSTimeout)
defer cancel()
// Connect to the Voice Gateway.
if err := c.WS.Dial(ctx); err != nil {
return errors.Wrap(err, "Failed to connect to Voice Gateway")
}
WSDebug("Trying to start...")
// Try to resume the connection
if err := c.Start(); err != nil {
return err
}
return nil
}
// Start .
func (c *Connection) Start() error {
if err := c.start(); err != nil {
WSDebug("Start failed: ", err)
// Close can be called with the mutex still acquired here, as the
// pacemaker hasn't started yet.
if err := c.Close(); err != nil {
WSDebug("Failed to close after start fail: ", err)
}
return err
}
return nil
}
// start .
func (c *Connection) start() error {
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
// Apparently we send an Identify or Resume once we are connected unlike the other gateway that
// waits for a Hello then sends an Identify or Resume.
if !c.reconnection {
if err := c.Identify(); err != nil {
return errors.Wrap(err, "Failed to identify")
}
} else {
if err := c.Resume(); err != nil {
return errors.Wrap(err, "Failed to resume")
}
}
c.reconnection = false
// Make a new WaitGroup for use in background loops:
c.waitGroup = new(sync.WaitGroup)
// Wait for hello.
WSDebug("Waiting for Hello..")
_, err := AssertEvent(c, <-c.WS.Listen(), HelloOP, &c.hello)
if err != nil {
return errors.Wrap(err, "Error at Hello")
}
WSDebug("Received Hello")
// // Start the pacemaker with the heartrate received from Hello, after
// // initializing everything. This ensures we only heartbeat if the websocket
// // is authenticated.
// c.Pacemaker = &gateway.Pacemaker{
// Heartrate: time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond,
// Pace: c.Heartbeat,
// }
// // Pacemaker dies here, only when it's fatal.
// c.paceDeath = c.Pacemaker.StartAsync(c.waitGroup)
// Start the event handler, which also handles the pacemaker death signal.
c.waitGroup.Add(1)
// Calculate the heartrate.
heartrate := time.Duration(int(c.hello.HeartbeatInterval)) * time.Millisecond
// Start the websocket handler.
go c.handleWS(heart.NewLoop(heartrate, c.WS.Listen(), c))
WSDebug("Started successfully.")
return nil
}
// Close .
func (c *Connection) Close() error {
if c.udpConn != nil {
WSDebug("Closing udp connection.")
close(c.close)
}
// Check if the WS is already closed:
if c.waitGroup == nil && c.paceDeath == nil {
WSDebug("Gateway is already closed.")
c.AfterClose(nil)
return nil
}
// If the pacemaker is running:
if c.paceDeath != nil {
WSDebug("Stopping pacemaker...")
// Stop the pacemaker and the event handler
// c.Pacemaker.Stop()
c.EventLoop.Stop()
WSDebug("Stopped pacemaker.")
}
WSDebug("Waiting for WaitGroup to be done.")
// This should work, since Pacemaker should signal its loop to stop, which
// would also exit our event loop. Both would be 2.
c.waitGroup.Wait()
// Mark g.waitGroup as empty:
c.waitGroup = nil
WSDebug("WaitGroup is done. Closing the websocket.")
err := c.WS.Close()
c.AfterClose(err)
return err
}
func (c *Connection) Reconnect() {
WSDebug("Reconnecting...")
// Guarantee the gateway is already closed. Ignore its error, as we're
// redialing anyway.
c.Close()
c.mut.Lock()
c.reconnection = true
c.mut.Unlock()
for i := 1; ; i++ {
WSDebug("Trying to dial, attempt #", i)
// Condition: err == ErrInvalidSession:
// If the connection is rate limited (documented behavior):
// https://discordapp.com/developers/docs/topics/gateway#rate-limiting
if err := c.Open(); err != nil {
c.ErrorLog(errors.Wrap(err, "Failed to open gateway"))
continue
}
WSDebug("Started after attempt: ", i)
return
}
}
func (c *Connection) Disconnect(g *gateway.Gateway) (err error) {
if c.SessionID != "" {
err = g.UpdateVoiceState(gateway.UpdateVoiceStateData{
GuildID: c.GuildID,
ChannelID: nil,
SelfMute: true,
SelfDeaf: true,
})
c.SessionID = ""
}
// We might want this error and the update voice state error
// but for now we will prioritize the voice state error
_ = c.Close()
return
}
func (c *Connection) HandleEvent(ev wsutil.Event) error {
return HandleEvent(c, ev)
}
// handleWS .
func (c *Connection) handleWS(evl *heart.PacemakerLoop) {
c.EventLoop = evl
err := c.EventLoop.Run()
c.waitGroup.Done() // mark so Close() can exit.
WSDebug("Event loop stopped.")
if err != nil {
c.ErrorLog(err)
c.Reconnect()
// Reconnect should spawn another eventLoop in its Start function.
}
}
// // eventLoop .
// func (c *Connection) eventLoop() error {
// ch := c.WS.Listen()
// for {
// select {
// case err := <-c.paceDeath:
// // Got a paceDeath, we're exiting from here on out.
// c.paceDeath = nil // mark
// if err == nil {
// WSDebug("Pacemaker stopped without errors.")
// // No error, just exit normally.
// return nil
// }
// return errors.Wrap(err, "Pacemaker died, reconnecting")
// case ev := <-ch:
// // Handle the event
// if err := HandleEvent(c, ev); err != nil {
// c.ErrorLog(errors.Wrap(err, "WS handler error"))
// }
// }
// }
// }
// Send .
func (c *Connection) Send(code OPCode, v interface{}) error {
return c.send(code, v)
}
// send .
func (c *Connection) send(code OPCode, v interface{}) error {
if c.WS == nil {
return errors.New("tried to send data to a connection without a Websocket")
}
if c.WS.Conn == nil {
return errors.New("tried to send data to a connection with a closed Websocket")
}
var op = OP{
Code: code,
}
if v != nil {
b, err := c.Driver.Marshal(v)
if err != nil {
return errors.Wrap(err, "Failed to encode v")
}
op.Data = b
}
b, err := c.Driver.Marshal(op)
if err != nil {
return errors.Wrap(err, "Failed to encode payload")
}
// WS should already be thread-safe.
return c.WS.Send(b)
}

View File

@ -1,74 +0,0 @@
package voice
import (
"github.com/diamondburned/arikawa/gateway"
)
// onVoiceStateUpdate receives VoiceStateUpdateEvents from the gateway
// to keep track of the current user's voice state.
func (v *Voice) onVoiceStateUpdate(e *gateway.VoiceStateUpdateEvent) {
// Get the current user.
me, err := v.state.Me()
if err != nil {
v.ErrorLog(err)
return
}
// Ignore the event if it is an update from another user.
if me.ID != e.UserID {
return
}
// Get the stored voice connection for the given guild.
conn, ok := v.GetConnection(e.GuildID)
// Ignore if there is no connection for that guild.
if !ok {
return
}
// Remove the connection if the current user has disconnected.
if e.ChannelID == 0 {
// TODO: Make sure connection is closed?
v.RemoveConnection(e.GuildID)
return
}
// Update values on the connection.
conn.mut.Lock()
defer conn.mut.Unlock()
conn.SessionID = e.SessionID
conn.UserID = e.UserID
conn.ChannelID = e.ChannelID
}
// onVoiceServerUpdate receives VoiceServerUpdateEvents from the gateway
// to manage the current user's voice connections.
func (v *Voice) onVoiceServerUpdate(e *gateway.VoiceServerUpdateEvent) {
// Get the stored voice connection for the given guild.
conn, ok := v.GetConnection(e.GuildID)
// Ignore if there is no connection for that guild.
if !ok {
return
}
// Ensure the connection is closed (has no effect if the connection is already closed)
conn.Close()
// Update values on the connection.
conn.mut.Lock()
conn.Token = e.Token
conn.Endpoint = e.Endpoint
conn.GuildID = e.GuildID
conn.mut.Unlock()
// Open the voice connection.
if err := conn.Open(); err != nil {
v.ErrorLog(err)
return
}
}

View File

@ -7,10 +7,15 @@ import (
"io"
"log"
"os"
"runtime"
"strconv"
"testing"
"time"
"github.com/diamondburned/arikawa/discord"
"github.com/diamondburned/arikawa/state"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/diamondburned/arikawa/voice/voicegateway"
)
type testConfig struct {
@ -41,7 +46,7 @@ func mustConfig(t *testing.T) testConfig {
}
// file is only a few bytes lolmao
func nicoReader(t *testing.T) (read func() []byte) {
func nicoReadTo(t *testing.T, dst io.Writer) {
f, err := os.Open("testdata/nico.dca")
if err != nil {
t.Fatal("Failed to open nico.dca:", err)
@ -53,22 +58,18 @@ func nicoReader(t *testing.T) (read func() []byte) {
var lenbuf [4]byte
return func() []byte {
for {
if _, err := io.ReadFull(f, lenbuf[:]); !catchRead(t, err) {
return nil
return
}
// Read the integer
framelen := int(binary.LittleEndian.Uint32(lenbuf[:]))
framelen := int64(binary.LittleEndian.Uint32(lenbuf[:]))
// Read exactly frame
frame := make([]byte, framelen)
if _, err := io.ReadFull(f, frame); !catchRead(t, err) {
return nil
// Copy the frame.
if _, err := io.CopyN(dst, f, framelen); !catchRead(t, err) {
return
}
return frame
}
}
@ -87,9 +88,12 @@ func catchRead(t *testing.T, err error) bool {
func TestIntegration(t *testing.T) {
config := mustConfig(t)
WSDebug = func(v ...interface{}) {
log.Println(v...)
wsutil.WSDebug = func(v ...interface{}) {
_, file, line, _ := runtime.Caller(1)
caller := file + ":" + strconv.Itoa(line)
log.Println(append([]interface{}{caller}, v...)...)
}
// heart.Debug = func(v ...interface{}) {
// log.Println(append([]interface{}{"Pacemaker:"}, v...)...)
// }
@ -104,6 +108,7 @@ func TestIntegration(t *testing.T) {
if err := s.Open(); err != nil {
t.Fatal("Failed to connect:", err)
}
defer s.Close()
// Validate the given voice channel.
c, err := s.Channel(config.VoiceChID)
@ -114,31 +119,49 @@ func TestIntegration(t *testing.T) {
t.Fatal("Channel isn't a guild voice channel.")
}
conn, err := v.JoinChannel(c.GuildID, c.ID, false, false)
// Grab a timer to benchmark things.
finish := timer()
// Join the voice channel.
vs, err := v.JoinChannel(c.GuildID, c.ID, false, false)
if err != nil {
t.Fatal("Failed to join channel:", err)
}
defer func() {
log.Println("Disconnecting from the voice channel.")
if err := vs.Disconnect(); err != nil {
t.Fatal("Failed to disconnect:", err)
}
}()
// Grab the file in the local test data.
read := nicoReader(t)
finish("joining the voice channel")
// Trigger speaking.
if err := conn.Speaking(Microphone); err != nil {
if err := vs.Speaking(voicegateway.Microphone); err != nil {
t.Fatal("Failed to start speaking:", err)
}
defer func() {
log.Println("Stopping speaking.") // sounds grammatically wrong
if err := vs.StopSpeaking(); err != nil {
t.Fatal("Failed to stop speaking:", err)
}
}()
finish("sending the speaking command")
// Copy the audio?
for bytes := read(); bytes != nil; bytes = read() {
conn.OpusSend <- bytes
// conn.Write(bytes)
}
nicoReadTo(t, vs)
// Finish speaking.
if err := conn.StopSpeaking(); err != nil {
t.Fatal("Failed to stop speaking:", err)
}
finish("copying the audio")
}
if err := conn.Disconnect(s.Gateway); err != nil {
t.Fatal("Failed to disconnect:", err)
// simple shitty benchmark thing
func timer() func(finished string) {
var then = time.Now()
return func(finished string) {
now := time.Now()
log.Println("Finished", finished+", took", now.Sub(then))
then = now
}
}

View File

@ -1,124 +0,0 @@
package voice
import (
"fmt"
"github.com/diamondburned/arikawa/utils/json"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
)
// OPCode represents a Discord Voice Gateway operation code.
type OPCode uint8
const (
IdentifyOP OPCode = 0 // send
SelectProtocolOP OPCode = 1 // send
ReadyOP OPCode = 2 // receive
HeartbeatOP OPCode = 3 // send
SessionDescriptionOP OPCode = 4 // receive
SpeakingOP OPCode = 5 // send/receive
HeartbeatAckOP OPCode = 6 // receive
ResumeOP OPCode = 7 // send
HelloOP OPCode = 8 // receive
ResumedOP OPCode = 9 // receive
// ClientDisconnectOP OPCode = 13 // receive
)
// OP represents a Discord Voice Gateway operation.
type OP struct {
Code OPCode `json:"op"`
Data json.Raw `json:"d,omitempty"`
}
func HandleEvent(c *Connection, ev wsutil.Event) error {
o, err := DecodeOP(c.Driver, ev)
if err != nil {
return err
}
return HandleOP(c, o)
}
func AssertEvent(driver json.Driver, ev wsutil.Event, code OPCode, v interface{}) (*OP, error) {
op, err := DecodeOP(driver, ev)
if err != nil {
return nil, err
}
if op.Code != code {
return op, fmt.Errorf(
"Unexpected OP Code: %d, expected %d (%s)",
op.Code, code, op.Data,
)
}
if err := driver.Unmarshal(op.Data, v); err != nil {
return op, errors.Wrap(err, "Failed to decode data")
}
return op, nil
}
func DecodeOP(driver json.Driver, ev wsutil.Event) (*OP, error) {
if ev.Error != nil {
return nil, ev.Error
}
if len(ev.Data) == 0 {
return nil, errors.New("Empty payload")
}
var op *OP
if err := driver.Unmarshal(ev.Data, &op); err != nil {
return nil, errors.Wrap(err, "OP error: "+string(ev.Data))
}
return op, nil
}
func HandleOP(c *Connection, op *OP) error {
switch op.Code {
// Gives information required to make a UDP connection
case ReadyOP:
if err := c.Driver.Unmarshal(op.Data, &c.ready); err != nil {
return errors.Wrap(err, "Failed to parse READY event")
}
c.readyChan <- true
// Gives information about the encryption mode and secret key for sending voice packets
case SessionDescriptionOP:
if err := c.Driver.Unmarshal(op.Data, &c.sessionDescription); err != nil {
c.ErrorLog(errors.Wrap(err, "Failed to parse SESSION_DESCRIPTION"))
}
c.sessionDescChan <- true
// Someone started or stopped speaking.
case SpeakingOP:
// ?
// Heartbeat response from the server
case HeartbeatAckOP:
c.EventLoop.Echo()
// Hello server, we hear you! :)
case HelloOP:
// Decode the data.
if err := c.Driver.Unmarshal(op.Data, &c.hello); err != nil {
c.ErrorLog(errors.Wrap(err, "Failed to parse HELLO"))
}
c.helloChan <- true
// Server is saying the connection was resumed, no data here.
case ResumedOP:
WSDebug("Voice connection has been resumed")
default:
return fmt.Errorf("unknown OP code %d", op.Code)
}
return nil
}

258
voice/session.go Normal file
View File

@ -0,0 +1,258 @@
package voice
import (
"sync"
"github.com/diamondburned/arikawa/discord"
"github.com/diamondburned/arikawa/gateway"
"github.com/diamondburned/arikawa/session"
"github.com/diamondburned/arikawa/utils/moreatomic"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/diamondburned/arikawa/voice/udp"
"github.com/diamondburned/arikawa/voice/voicegateway"
"github.com/pkg/errors"
)
const Protocol = "xsalsa20_poly1305"
var OpusSilence = [...]byte{0xF8, 0xFF, 0xFE}
type Session struct {
session *session.Session
state voicegateway.State
ErrorLog func(err error)
// Filled by events.
// sessionID string
// token string
// endpoint string
// joining determines the behavior of incoming event callbacks (Update).
// If this is true, incoming events will just send into Updated channels. If
// false, events will trigger a reconnection.
joining moreatomic.Bool
incoming chan struct{} // used only when joining == true
mut sync.RWMutex
// TODO: expose getters mutex-guarded.
gateway *voicegateway.Gateway
voiceUDP *udp.Connection
muted bool
deafened bool
speaking bool
}
func NewSession(ses *session.Session, userID discord.Snowflake) *Session {
return &Session{
session: ses,
state: voicegateway.State{
UserID: userID,
},
ErrorLog: func(err error) {},
incoming: make(chan struct{}),
}
}
func (s *Session) UpdateServer(ev *gateway.VoiceServerUpdateEvent) {
// If this is true, then mutex is acquired already.
if s.joining.Get() {
s.state.Endpoint = ev.Endpoint
s.state.Token = ev.Token
s.incoming <- struct{}{}
return
}
// Reconnect.
s.mut.Lock()
defer s.mut.Unlock()
s.state.Endpoint = ev.Endpoint
s.state.Token = ev.Token
if err := s.reconnect(); err != nil {
s.ErrorLog(errors.Wrap(err, "Failed to reconnect after voice server update"))
}
}
func (s *Session) UpdateState(ev *gateway.VoiceStateUpdateEvent) {
if s.state.UserID != ev.UserID {
// Not our state.
return
}
// If this is true, then mutex is acquired already.
if s.joining.Get() {
s.state.SessionID = ev.SessionID
s.state.ChannelID = ev.ChannelID
s.incoming <- struct{}{}
return
}
}
func (s *Session) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) error {
// Acquire the mutex during join, locking during IO as well.
s.mut.Lock()
defer s.mut.Unlock()
// Set that we're joining.
s.joining.Set(true)
defer s.joining.Set(false) // reset when done
// ensure gateeway and voiceUDP is already closed.
s.ensureClosed()
// Set the state.
s.state.ChannelID = cID
s.state.GuildID = gID
s.muted = muted
s.deafened = deafened
s.speaking = false
// Ensure that if `cID` is zero that it passes null to the update event.
var channelID *discord.Snowflake
if cID.Valid() {
channelID = &cID
}
// https://discordapp.com/developers/docs/topics/voice-connections#retrieving-voice-server-information
// Send a Voice State Update event to the gateway.
err := s.session.Gateway.UpdateVoiceState(gateway.UpdateVoiceStateData{
GuildID: gID,
ChannelID: channelID,
SelfMute: muted,
SelfDeaf: deafened,
})
if err != nil {
return errors.Wrap(err, "Failed to send Voice State Update event")
}
// Wait for replies. The above command should reply with these 2 events.
<-s.incoming
<-s.incoming
// These 2 methods should've updated s.state before sending into these
// channels. Since s.state is already filled, we can go ahead and connect.
return s.reconnect()
}
// reconnect uses the current state to reconnect to a new gateway and UDP
// connection.
func (s *Session) reconnect() (err error) {
s.gateway = voicegateway.New(s.state)
// Open the voice gateway. The function will block until Ready is received.
if err := s.gateway.Open(); err != nil {
return errors.Wrap(err, "Failed to open voice gateway")
}
// Get the Ready event.
voiceReady := s.gateway.Ready()
// Prepare the UDP voice connection.
s.voiceUDP, err = udp.DialConnection(voiceReady.Addr(), voiceReady.SSRC)
if err != nil {
return errors.Wrap(err, "Failed to open voice UDP connection")
}
// Get the session description from the voice gateway.
d, err := s.gateway.SessionDescription(voicegateway.SelectProtocol{
Protocol: "udp",
Data: voicegateway.SelectProtocolData{
Address: s.voiceUDP.GatewayIP,
Port: s.voiceUDP.GatewayPort,
Mode: Protocol,
},
})
if err != nil {
return errors.Wrap(err, "Failed to select protocol")
}
// Start the UDP loop.
go s.voiceUDP.Start(&d.SecretKey)
return nil
}
// Speaking tells Discord we're speaking. This calls
// (*voicegateway.Gateway).Speaking().
func (s *Session) Speaking(flag voicegateway.SpeakingFlag) error {
// TODO: maybe we don't need to mutex protect IO.
s.mut.RLock()
defer s.mut.RUnlock()
return s.gateway.Speaking(flag)
}
func (s *Session) StopSpeaking() error {
// Send 5 frames of silence.
for i := 0; i < 5; i++ {
if _, err := s.Write(OpusSilence[:]); err != nil {
return errors.Wrapf(err, "Failed to send frame %d", i)
}
}
return nil
}
func (s *Session) Write(b []byte) (int, error) {
s.mut.RLock()
defer s.mut.RUnlock()
if s.voiceUDP == nil {
return 0, ErrCannotSend
}
return s.voiceUDP.Write(b)
}
func (s *Session) Disconnect() error {
s.mut.Lock()
defer s.mut.Unlock()
// If we're already closed.
if s.gateway == nil && s.voiceUDP == nil {
return nil
}
// Notify Discord that we're leaving. This will send a
// VoiceStateUpdateEvent, in which our handler will promptly remove the
// session from the map.
err := s.session.Gateway.UpdateVoiceState(gateway.UpdateVoiceStateData{
GuildID: s.state.GuildID,
ChannelID: nil,
SelfMute: true,
SelfDeaf: true,
})
s.ensureClosed()
// wrap returns nil if err is nil
return errors.Wrap(err, "Failed to update voice state")
}
// close ensures everything is closed. It does not acquire the mutex.
func (s *Session) ensureClosed() {
// If we're already closed.
if s.gateway == nil && s.voiceUDP == nil {
return
}
// Disconnect the UDP connection.
if s.voiceUDP != nil {
s.voiceUDP.Close()
s.voiceUDP = nil
}
// Disconnect the voice gateway, ignoring the error.
if s.gateway != nil {
if err := s.gateway.Close(); err != nil {
wsutil.WSDebug("Uncaught voice gateway close error:", err)
}
s.gateway = nil
}
}

View File

@ -1,143 +0,0 @@
package voice
import (
"encoding/binary"
"net"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"golang.org/x/crypto/nacl/secretbox"
)
// udpOpen .
func (c *Connection) udpOpen() error {
c.mut.Lock()
defer c.mut.Unlock()
// As a wise man once said: "You always gotta check for stupidity"
if c.WS == nil {
return errors.New("connection does not have a websocket")
}
// Check if a UDP connection is already open.
if c.udpConn != nil {
return errors.New("udp connection is already open")
}
// Format the connection host.
host := c.ready.IP + ":" + strconv.Itoa(c.ready.Port)
// Resolve the host.
addr, err := net.ResolveUDPAddr("udp", host)
if err != nil {
return errors.Wrap(err, "Failed to resolve host")
}
// Create a new UDP connection.
c.udpConn, err = net.DialUDP("udp", nil, addr)
if err != nil {
return errors.Wrap(err, "Failed to dial host")
}
// https://discordapp.com/developers/docs/topics/voice-connections#ip-discovery
ssrcBuffer := make([]byte, 70)
ssrcBuffer[0] = 0x1
ssrcBuffer[1] = 0x2
binary.BigEndian.PutUint16(ssrcBuffer[2:4], 70)
binary.BigEndian.PutUint32(ssrcBuffer[4:8], c.ready.SSRC)
_, err = c.udpConn.Write(ssrcBuffer)
if err != nil {
return errors.Wrap(err, "Failed to write")
}
ipBuffer := make([]byte, 70)
var n int
n, err = c.udpConn.Read(ipBuffer)
if err != nil {
return errors.Wrap(err, "Failed to write")
}
if n < 70 {
return errors.New("udp packet received from discord is not the required 70 bytes")
}
ipb := string(ipBuffer[4:68])
nullPos := strings.Index(ipb, "\x00")
if nullPos < 0 {
return errors.New("udp ip discovery did not contain a null terminator")
}
ip := ipb[:nullPos]
port := binary.LittleEndian.Uint16(ipBuffer[68:70])
// Send a Select Protocol operation to the Discord Voice Gateway.
err = c.SelectProtocol(SelectProtocol{
Protocol: "udp",
Data: SelectProtocolData{
Address: ip,
Port: port,
Mode: "xsalsa20_poly1305",
},
})
if err != nil {
return err
}
// Wait for session description.
WSDebug("Waiting for Session Description..")
<-c.sessionDescChan
WSDebug("Received Session Description")
return nil
}
// https://discordapp.com/developers/docs/topics/voice-connections#encrypting-and-sending-voice
func (c *Connection) opusSendLoop() {
header := make([]byte, 12)
header[0] = 0x80 // Version + Flags
header[1] = 0x78 // Payload Type
// header[2:4] // Sequence
// header[4:8] // Timestamp
binary.BigEndian.PutUint32(header[8:12], c.ready.SSRC) // SSRC
var (
sequence uint16
timestamp uint32
nonce [24]byte
msg []byte
open bool
)
// 50 sends per second, 960 samples each at 48kHz
frequency := time.NewTicker(time.Millisecond * 20)
defer frequency.Stop()
for {
select {
case msg, open = <-c.OpusSend:
if !open {
return
}
case <-c.close:
return
}
binary.BigEndian.PutUint16(header[2:4], sequence)
sequence++
binary.BigEndian.PutUint32(header[4:8], timestamp)
timestamp += 960 // Samples
copy(nonce[:], header)
toSend := secretbox.Seal(header, msg, &nonce, &c.sessionDescription.SecretKey)
select {
case <-frequency.C:
case <-c.close:
return
}
_, _ = c.udpConn.Write(toSend)
}
}

158
voice/udp/udp.go Normal file
View File

@ -0,0 +1,158 @@
package udp
import (
"bytes"
"encoding/binary"
"io"
"net"
"time"
"github.com/pkg/errors"
"golang.org/x/crypto/nacl/secretbox"
)
type Connection struct {
GatewayIP string
GatewayPort uint16
ssrc uint32
sequence uint16
timestamp uint32
nonce [24]byte
conn *net.UDPConn
close chan struct{}
closed chan struct{}
send chan []byte
reply chan error
}
func DialConnection(addr string, ssrc uint32) (*Connection, error) {
// Resolve the host.
a, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, errors.Wrap(err, "Failed to resolve host")
}
// Create a new UDP connection.
conn, err := net.DialUDP("udp", nil, a)
if err != nil {
return nil, errors.Wrap(err, "Failed to dial host")
}
// https://discordapp.com/developers/docs/topics/voice-connections#ip-discovery
ssrcBuffer := [70]byte{
0x1, 0x2,
}
binary.BigEndian.PutUint16(ssrcBuffer[2:4], 70)
binary.BigEndian.PutUint32(ssrcBuffer[4:8], ssrc)
_, err = conn.Write(ssrcBuffer[:])
if err != nil {
return nil, errors.Wrap(err, "Failed to write SSRC buffer")
}
var ipBuffer [70]byte
// ReadFull makes sure to read all 70 bytes.
_, err = io.ReadFull(conn, ipBuffer[:])
if err != nil {
return nil, errors.Wrap(err, "Failed to read IP buffer")
}
ipbody := ipBuffer[4:68]
nullPos := bytes.Index(ipbody, []byte{'\x00'})
if nullPos < 0 {
return nil, errors.New("UDP IP discovery did not contain a null terminator")
}
ip := ipbody[:nullPos]
port := binary.LittleEndian.Uint16(ipBuffer[68:70])
return &Connection{
GatewayIP: string(ip),
GatewayPort: port,
ssrc: ssrc,
conn: conn,
send: make(chan []byte),
reply: make(chan error),
close: make(chan struct{}),
closed: make(chan struct{}),
}, nil
}
func (c *Connection) Start(secret *[32]byte) {
header := [12]byte{
0: 0x80, // Version + Flags
1: 0x78, // Payload Type
// [2:4] // Sequence
// [4:8] // Timestamp
}
// Write SSRC to the header.
binary.BigEndian.PutUint32(header[8:12], c.ssrc) // SSRC
// 50 sends per second, 960 samples each at 48kHz
frequency := time.NewTicker(time.Millisecond * 20)
defer frequency.Stop()
var b []byte
var ok bool
// Close these channels at the end so Write() doesn't block.
defer func() {
close(c.send)
close(c.closed)
}()
for {
select {
case b, ok = <-c.send:
if !ok {
return
}
case <-c.close:
return
}
// Write a new sequence.
binary.BigEndian.PutUint16(header[2:4], c.sequence)
c.sequence++
binary.BigEndian.PutUint32(header[4:8], c.timestamp)
c.timestamp += 960 // Samples
copy(c.nonce[:], header[:])
toSend := secretbox.Seal(header[:], b, &c.nonce, secret)
select {
case <-frequency.C:
case <-c.close:
return
}
_, err := c.conn.Write(toSend)
c.reply <- err
}
}
func (c *Connection) Close() error {
close(c.close)
<-c.closed
return c.conn.Close()
}
// Write sends bytes into the voice UDP connection.
func (c *Connection) Write(b []byte) (int, error) {
c.send <- b
if err := <-c.reply; err != nil {
return 0, err
}
return len(b), nil
}

View File

@ -8,57 +8,34 @@ import (
"github.com/diamondburned/arikawa/discord"
"github.com/diamondburned/arikawa/gateway"
"github.com/diamondburned/arikawa/state"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
)
const (
// Version represents the current version of the Discord Voice Gateway this package uses.
Version = "4"
// WSTimeout is the timeout for connecting and writing to the Websocket,
// before Gateway cancels and fails.
WSTimeout = wsutil.DefaultTimeout
)
var (
// defaultErrorHandler is the default error handler
defaultErrorHandler = func(err error) { log.Println("Voice gateway error:", err) }
// WSDebug is used for extra debug logging. This is expected to behave
// similarly to log.Println().
WSDebug = func(v ...interface{}) {}
// ErrMissingForIdentify is an error when we are missing information to identify.
ErrMissingForIdentify = errors.New("missing GuildID, UserID, SessionID, or Token for identify")
// ErrMissingForResume is an error when we are missing information to resume.
ErrMissingForResume = errors.New("missing GuildID, SessionID, or Token for resuming")
// ErrCannotSend is an error when audio is sent to a closed channel.
ErrCannotSend = errors.New("cannot send audio to closed channel")
)
// Voice represents a Voice Repository used for managing voice connections.
// Voice represents a Voice Repository used for managing voice sessions.
type Voice struct {
mut sync.RWMutex
*state.State
state *state.State
// Connections holds all of the active voice connections.
connections map[discord.Snowflake]*Connection
// Session holds all of the active voice sessions.
mapmutex sync.Mutex
sessions map[discord.Snowflake]*Session // guildID:Session
// ErrorLog will be called when an error occurs (defaults to log.Println)
ErrorLog func(err error)
}
// NewVoice creates a new Voice Repository.
// NewVoice creates a new Voice repository wrapped around a state.
func NewVoice(s *state.State) *Voice {
v := &Voice{
state: s,
connections: make(map[discord.Snowflake]*Connection),
State: s,
sessions: make(map[discord.Snowflake]*Session),
ErrorLog: defaultErrorHandler,
}
@ -69,82 +46,91 @@ func NewVoice(s *state.State) *Voice {
return v
}
// GetConnection gets a connection for a guild with a read lock.
func (v *Voice) GetConnection(guildID discord.Snowflake) (*Connection, bool) {
v.mut.RLock()
defer v.mut.RUnlock()
// onVoiceStateUpdate receives VoiceStateUpdateEvents from the gateway
// to keep track of the current user's voice state.
func (v *Voice) onVoiceStateUpdate(e *gateway.VoiceStateUpdateEvent) {
// Get the current user.
me, err := v.Me()
if err != nil {
v.ErrorLog(err)
return
}
// For some reason you cannot just put `return v.connections[]` and return a bool D:
conn, ok := v.connections[guildID]
// Ignore the event if it is an update from another user.
if me.ID != e.UserID {
return
}
// Get the stored voice session for the given guild.
vs, ok := v.GetSession(e.GuildID)
if !ok {
return
}
// Do what we must.
vs.UpdateState(e)
// Remove the connection if the current user has disconnected.
if e.ChannelID == 0 {
v.RemoveSession(e.GuildID)
}
}
// onVoiceServerUpdate receives VoiceServerUpdateEvents from the gateway
// to manage the current user's voice connections.
func (v *Voice) onVoiceServerUpdate(e *gateway.VoiceServerUpdateEvent) {
// Get the stored voice session for the given guild.
vs, ok := v.GetSession(e.GuildID)
if !ok {
return
}
// Do what we must.
vs.UpdateServer(e)
}
// GetSession gets a session for a guild with a read lock.
func (v *Voice) GetSession(guildID discord.Snowflake) (*Session, bool) {
v.mapmutex.Lock()
defer v.mapmutex.Unlock()
// For some reason you cannot just put `return v.sessions[]` and return a bool D:
conn, ok := v.sessions[guildID]
return conn, ok
}
// RemoveConnection removes a connection.
func (v *Voice) RemoveConnection(guildID discord.Snowflake) {
v.mut.Lock()
defer v.mut.Unlock()
// RemoveSession removes a session.
func (v *Voice) RemoveSession(guildID discord.Snowflake) {
v.mapmutex.Lock()
defer v.mapmutex.Unlock()
delete(v.connections, guildID)
// Ensure that the session is disconnected.
if ses, ok := v.sessions[guildID]; ok {
ses.Disconnect()
}
delete(v.sessions, guildID)
}
// JoinChannel joins the specified channel in the specified guild.
func (v *Voice) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) (*Connection, error) {
// Get the stored voice connection for the given guild.
conn, ok := v.GetConnection(gID)
func (v *Voice) JoinChannel(gID, cID discord.Snowflake, muted, deafened bool) (*Session, error) {
// Get the stored voice session for the given guild.
conn, ok := v.GetSession(gID)
// Create a new voice connection if one does not exist.
// Create a new voice session if one does not exist.
if !ok {
conn = newConnection()
u, err := v.Me()
if err != nil {
return nil, errors.Wrap(err, "Failed to get self")
}
v.mut.Lock()
v.connections[gID] = conn
v.mut.Unlock()
conn = NewSession(v.Session, u.ID)
v.mapmutex.Lock()
v.sessions[gID] = conn
v.mapmutex.Unlock()
}
// Update values on the connection.
conn.mut.Lock()
conn.GuildID = gID
conn.ChannelID = cID
conn.muted = muted
conn.deafened = deafened
conn.mut.Unlock()
// Ensure that if `cID` is zero that it passes null to the update event.
var channelID *discord.Snowflake
if cID != 0 {
channelID = &cID
}
// https://discordapp.com/developers/docs/topics/voice-connections#retrieving-voice-server-information
// Send a Voice State Update event to the gateway.
err := v.state.Gateway.UpdateVoiceState(gateway.UpdateVoiceStateData{
GuildID: gID,
ChannelID: channelID,
SelfMute: muted,
SelfDeaf: deafened,
})
if err != nil {
return nil, errors.Wrap(err, "Failed to send Voice State Update event")
}
// Wait for ready event.
WSDebug("Waiting for READY.")
<-conn.readyChan
WSDebug("Received READY.")
// Open the UDP connection.
if err := conn.udpOpen(); err != nil {
return nil, errors.Wrap(err, "Failed to open UDP connection")
}
// Make sure the OpusSend channel is set
if conn.OpusSend == nil {
conn.OpusSend = make(chan []byte)
}
// Run the opus send loop.
go conn.opusSendLoop()
return conn, nil
// Connect.
return conn, conn.JoinChannel(gID, cID, muted, deafened)
}

View File

@ -1,9 +1,18 @@
package voice
package voicegateway
import (
"time"
"github.com/diamondburned/arikawa/discord"
"github.com/pkg/errors"
)
var (
// ErrMissingForIdentify is an error when we are missing information to identify.
ErrMissingForIdentify = errors.New("missing GuildID, UserID, SessionID, or Token for identify")
// ErrMissingForResume is an error when we are missing information to resume.
ErrMissingForResume = errors.New("missing GuildID, SessionID, or Token for resuming")
)
// OPCode 0
@ -15,12 +24,12 @@ type IdentifyData struct {
Token string `json:"token"`
}
// Identify sends an Identify operation (opcode 0) to the Voice Gateway.
func (c *Connection) Identify() error {
guildID := c.GuildID
userID := c.UserID
sessionID := c.SessionID
token := c.Token
// Identify sends an Identify operation (opcode 0) to the Gateway Gateway.
func (c *Gateway) Identify() error {
guildID := c.state.GuildID
userID := c.state.UserID
sessionID := c.state.SessionID
token := c.state.Token
if guildID == 0 || userID == 0 || sessionID == "" || token == "" {
return ErrMissingForIdentify
@ -47,8 +56,8 @@ type SelectProtocolData struct {
Mode string `json:"mode"`
}
// SelectProtocol sends a Select Protocol operation (opcode 1) to the Voice Gateway.
func (c *Connection) SelectProtocol(data SelectProtocol) error {
// SelectProtocol sends a Select Protocol operation (opcode 1) to the Gateway Gateway.
func (c *Gateway) SelectProtocol(data SelectProtocol) error {
return c.Send(SelectProtocolOP, data)
}
@ -56,16 +65,16 @@ func (c *Connection) SelectProtocol(data SelectProtocol) error {
// https://discordapp.com/developers/docs/topics/voice-connections#heartbeating-example-heartbeat-payload
type Heartbeat uint64
// Heartbeat sends a Heartbeat operation (opcode 3) to the Voice Gateway.
func (c *Connection) Heartbeat() error {
// Heartbeat sends a Heartbeat operation (opcode 3) to the Gateway Gateway.
func (c *Gateway) Heartbeat() error {
return c.Send(HeartbeatOP, time.Now().UnixNano())
}
// https://discordapp.com/developers/docs/topics/voice-connections#speaking
type Speaking uint64
type SpeakingFlag uint64
const (
Microphone Speaking = 1 << iota
Microphone SpeakingFlag = 1 << iota
Soundshare
Priority
)
@ -73,37 +82,23 @@ const (
// OPCode 5
// https://discordapp.com/developers/docs/topics/voice-connections#speaking-example-speaking-payload
type SpeakingData struct {
Speaking Speaking `json:"speaking"`
Delay int `json:"delay"`
SSRC uint32 `json:"ssrc"`
Speaking SpeakingFlag `json:"speaking"`
Delay int `json:"delay"`
SSRC uint32 `json:"ssrc"`
}
// Speaking sends a Speaking operation (opcode 5) to the Voice Gateway.
func (c *Connection) Speaking(s Speaking) error {
// Speaking sends a Speaking operation (opcode 5) to the Gateway Gateway.
func (c *Gateway) Speaking(flag SpeakingFlag) error {
// How do we allow a user to stop speaking?
// Also: https://discordapp.com/developers/docs/topics/voice-connections#voice-data-interpolation
return c.Send(SpeakingOP, SpeakingData{
Speaking: s,
Speaking: flag,
Delay: 0,
SSRC: c.ready.SSRC,
})
}
// StopSpeaking stops speaking.
// https://discordapp.com/developers/docs/topics/voice-connections#voice-data-interpolation
func (c *Connection) StopSpeaking() error {
if c.OpusSend == nil {
return ErrCannotSend
}
for i := 0; i < 5; i++ {
c.OpusSend <- []byte{0xF8, 0xFF, 0xFE}
}
return nil
}
// OPCode 7
// https://discordapp.com/developers/docs/topics/voice-connections#resuming-voice-connection-example-resume-connection-payload
type ResumeData struct {
@ -112,13 +107,13 @@ type ResumeData struct {
Token string `json:"token"`
}
// Resume sends a Resume operation (opcode 7) to the Voice Gateway.
func (c *Connection) Resume() error {
guildID := c.GuildID
sessionID := c.SessionID
token := c.Token
// Resume sends a Resume operation (opcode 7) to the Gateway Gateway.
func (c *Gateway) Resume() error {
guildID := c.state.GuildID
sessionID := c.state.SessionID
token := c.state.Token
if guildID == 0 || sessionID == "" || token == "" {
if !guildID.Valid() || sessionID == "" || token == "" {
return ErrMissingForResume
}

View File

@ -1,4 +1,10 @@
package voice
package voicegateway
import (
"strconv"
"github.com/diamondburned/arikawa/discord"
)
// OPCode 2
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-ready-payload
@ -17,6 +23,10 @@ type ReadyEvent struct {
// HeartbeatInterval discord.Milliseconds `json:"heartbeat_interval"`
}
func (r ReadyEvent) Addr() string {
return r.IP + ":" + strconv.Itoa(r.Port)
}
// OPCode 4
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-udp-connection-example-session-description-payload
type SessionDescriptionEvent struct {
@ -34,7 +44,7 @@ type HeartbeatACKEvent uint64
// OPCode 8
// https://discordapp.com/developers/docs/topics/voice-connections#heartbeating-example-hello-payload-since-v3
type HelloEvent struct {
HeartbeatInterval float64 `json:"heartbeat_interval"`
HeartbeatInterval discord.Milliseconds `json:"heartbeat_interval"`
}
// OPCode 9

View File

@ -0,0 +1,311 @@
//
// For the brave souls who get this far: You are the chosen ones,
// the valiant knights of programming who toil away, without rest,
// fixing our most awful code. To you, true saviors, kings of men,
// I say this: never gonna give you up, never gonna let you down,
// never gonna run around and desert you. Never gonna make you cry,
// never gonna say goodbye. Never gonna tell a lie and hurt you.
//
package voicegateway
import (
"context"
"strings"
"sync"
"time"
"github.com/diamondburned/arikawa/discord"
"github.com/diamondburned/arikawa/utils/json"
"github.com/diamondburned/arikawa/utils/moreatomic"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
)
const (
// Version represents the current version of the Discord Gateway Gateway this package uses.
Version = "4"
)
var (
ErrNoSessionID = errors.New("no sessionID was received")
ErrNoEndpoint = errors.New("no endpoint was received")
)
// State contains state information of a voice gateway.
type State struct {
GuildID discord.Snowflake
ChannelID discord.Snowflake
UserID discord.Snowflake
SessionID string
Token string
Endpoint string
}
// Gateway represents a Discord Gateway Gateway connection.
type Gateway struct {
state State // constant
mutex sync.RWMutex
ready ReadyEvent
ws *wsutil.Websocket
Timeout time.Duration
reconnect moreatomic.Bool
EventLoop *wsutil.PacemakerLoop
// ErrorLog will be called when an error occurs (defaults to log.Println)
ErrorLog func(err error)
// AfterClose is called after each close. Error can be non-nil, as this is
// called even when the Gateway is gracefully closed. It's used mainly for
// reconnections or any type of connection interruptions. (defaults to noop)
AfterClose func(err error)
// Filled by methods, internal use
waitGroup *sync.WaitGroup
}
func New(state State) *Gateway {
return &Gateway{
state: state,
Timeout: wsutil.WSTimeout,
ErrorLog: wsutil.WSError,
AfterClose: func(error) {},
}
}
// TODO: get rid of
func (c *Gateway) Ready() ReadyEvent {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.ready
}
// Open shouldn't be used, but JoinServer instead.
func (c *Gateway) Open() error {
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
var endpoint = "wss://" + strings.TrimSuffix(c.state.Endpoint, ":80") + "/?v=" + Version
wsutil.WSDebug("Connecting to voice endpoint (endpoint=" + endpoint + ")")
c.ws = wsutil.New(endpoint)
// Create a new context with a timeout for the connection.
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()
// Connect to the Gateway Gateway.
if err := c.ws.Dial(ctx); err != nil {
return errors.Wrap(err, "Failed to connect to voice gateway")
}
wsutil.WSDebug("Trying to start...")
// Try to start or resume the connection.
if err := c.start(); err != nil {
return err
}
return nil
}
// Start .
func (c *Gateway) start() error {
if err := c.__start(); err != nil {
wsutil.WSDebug("Start failed: ", err)
// Close can be called with the mutex still acquired here, as the
// pacemaker hasn't started yet.
if err := c.Close(); err != nil {
wsutil.WSDebug("Failed to close after start fail: ", err)
}
return err
}
return nil
}
// this function blocks until READY.
func (c *Gateway) __start() error {
// Make a new WaitGroup for use in background loops:
c.waitGroup = new(sync.WaitGroup)
ch := c.ws.Listen()
// Wait for hello.
wsutil.WSDebug("Waiting for Hello..")
var hello *HelloEvent
_, err := wsutil.AssertEvent(<-ch, HelloOP, &hello)
if err != nil {
return errors.Wrap(err, "Error at Hello")
}
wsutil.WSDebug("Received Hello")
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
// Turns out Hello is sent right away on connection start.
if !c.reconnect.Get() {
if err := c.Identify(); err != nil {
return errors.Wrap(err, "Failed to identify")
}
} else {
if err := c.Resume(); err != nil {
return errors.Wrap(err, "Failed to resume")
}
}
// This bool is because we should only try and Resume once.
c.reconnect.Set(false)
// Wait for either Ready or Resumed.
err = wsutil.WaitForEvent(c, ch, func(op *wsutil.OP) bool {
return op.Code == ReadyOP || op.Code == ResumedOP
})
if err != nil {
return errors.Wrap(err, "Failed to wait for Ready or Resumed")
}
// Start the event handler, which also handles the pacemaker death signal.
c.waitGroup.Add(1)
// Start the websocket handler.
go c.handleWS(wsutil.NewLoop(hello.HeartbeatInterval.Duration(), ch, c))
wsutil.WSDebug("Started successfully.")
return nil
}
// Close .
func (c *Gateway) Close() error {
// Check if the WS is already closed:
if c.waitGroup == nil && c.EventLoop.Stopped() {
wsutil.WSDebug("Gateway is already closed.")
c.AfterClose(nil)
return nil
}
// If the pacemaker is running:
if !c.EventLoop.Stopped() {
wsutil.WSDebug("Stopping pacemaker...")
// Stop the pacemaker and the event handler
c.EventLoop.Stop()
wsutil.WSDebug("Stopped pacemaker.")
}
wsutil.WSDebug("Waiting for WaitGroup to be done.")
// This should work, since Pacemaker should signal its loop to stop, which
// would also exit our event loop. Both would be 2.
c.waitGroup.Wait()
// Mark g.waitGroup as empty:
c.waitGroup = nil
wsutil.WSDebug("WaitGroup is done. Closing the websocket.")
err := c.ws.Close()
c.AfterClose(err)
return err
}
func (c *Gateway) Reconnect() error {
wsutil.WSDebug("Reconnecting...")
// Guarantee the gateway is already closed. Ignore its error, as we're
// redialing anyway.
c.Close()
c.reconnect.Set(true)
// Condition: err == ErrInvalidSession:
// If the connection is rate limited (documented behavior):
// https://discordapp.com/developers/docs/topics/gateway#rate-limiting
if err := c.Open(); err != nil {
return errors.Wrap(err, "Failed to reopen gateway")
}
wsutil.WSDebug("Reconnected successfully.")
return nil
}
func (c *Gateway) SessionDescription(sp SelectProtocol) (*SessionDescriptionEvent, error) {
// Add the handler first.
ch, cancel := c.EventLoop.Extras.Add(func(op *wsutil.OP) bool {
return op.Code == SessionDescriptionOP
})
defer cancel()
if err := c.SelectProtocol(sp); err != nil {
return nil, err
}
var sesdesc *SessionDescriptionEvent
// Wait for SessionDescriptionOP packet.
if err := (<-ch).UnmarshalData(&sesdesc); err != nil {
return nil, errors.Wrap(err, "Failed to unmarshal session description")
}
return sesdesc, nil
}
// handleWS .
func (c *Gateway) handleWS(evl *wsutil.PacemakerLoop) {
c.EventLoop = evl
err := c.EventLoop.Run()
c.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped.")
if err != nil {
c.ErrorLog(err)
c.Reconnect()
// Reconnect should spawn another eventLoop in its Start function.
}
}
// Send .
func (c *Gateway) Send(code OPCode, v interface{}) error {
return c.send(code, v)
}
// send .
func (c *Gateway) send(code OPCode, v interface{}) error {
if c.ws == nil {
return errors.New("tried to send data to a connection without a Websocket")
}
if c.ws.Conn == nil {
return errors.New("tried to send data to a connection with a closed Websocket")
}
var op = wsutil.OP{
Code: code,
}
if v != nil {
b, err := json.Marshal(v)
if err != nil {
return errors.Wrap(err, "Failed to encode v")
}
op.Data = b
}
b, err := json.Marshal(op)
if err != nil {
return errors.Wrap(err, "Failed to encode payload")
}
// WS should already be thread-safe.
return c.ws.Send(b)
}

72
voice/voicegateway/op.go Normal file
View File

@ -0,0 +1,72 @@
package voicegateway
import (
"fmt"
"sync"
"github.com/diamondburned/arikawa/utils/json"
"github.com/diamondburned/arikawa/utils/wsutil"
"github.com/pkg/errors"
)
// OPCode represents a Discord Gateway Gateway operation code.
type OPCode = wsutil.OPCode
const (
IdentifyOP OPCode = 0 // send
SelectProtocolOP OPCode = 1 // send
ReadyOP OPCode = 2 // receive
HeartbeatOP OPCode = 3 // send
SessionDescriptionOP OPCode = 4 // receive
SpeakingOP OPCode = 5 // send/receive
HeartbeatAckOP OPCode = 6 // receive
ResumeOP OPCode = 7 // send
HelloOP OPCode = 8 // receive
ResumedOP OPCode = 9 // receive
// ClientDisconnectOP OPCode = 13 // receive
)
func (c *Gateway) HandleOP(op *wsutil.OP) error {
switch op.Code {
// Gives information required to make a UDP connection
case ReadyOP:
if err := unmarshalMutex(op.Data, &c.ready, &c.mutex); err != nil {
return errors.Wrap(err, "Failed to parse READY event")
}
// Gives information about the encryption mode and secret key for sending voice packets
case SessionDescriptionOP:
// ?
// Already handled by Session.
// Someone started or stopped speaking.
case SpeakingOP:
// ?
// TODO: handler in Session
// Heartbeat response from the server
case HeartbeatAckOP:
c.EventLoop.Echo()
// Hello server, we hear you! :)
case HelloOP:
// ?
// Already handled on initial connection.
// Server is saying the connection was resumed, no data here.
case ResumedOP:
wsutil.WSDebug("Gateway connection has been resumed.")
default:
return fmt.Errorf("unknown OP code %d", op.Code)
}
return nil
}
func unmarshalMutex(d []byte, v interface{}, m *sync.RWMutex) error {
m.Lock()
err := json.Unmarshal(d, v)
m.Unlock()
return err
}