diff --git a/gateway/commands.go b/gateway/commands.go index 2116a53..37f6cd6 100644 --- a/gateway/commands.go +++ b/gateway/commands.go @@ -48,7 +48,7 @@ func (g *Gateway) Resume() error { // from a dead connection. Start() resumes from a dead connection. func (g *Gateway) ResumeCtx(ctx context.Context) error { var ( - ses = g.SessionID + ses = g.SessionID() seq = g.Sequence.Get() ) diff --git a/gateway/gateway.go b/gateway/gateway.go index fe407a1..0a1a6b9 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -96,9 +96,8 @@ type Gateway struct { // Session. Events chan Event - // SessionID is used to store the session ID received after Ready. It is not - // thread-safe. - SessionID string + sessionMu sync.RWMutex + sessionID string Identifier *Identifier Sequence *moreatomic.Int64 @@ -216,6 +215,15 @@ func (g *Gateway) Close() error { return err } +// SessionID returns the session ID received after Ready. This function is +// concurrently safe. +func (g *Gateway) SessionID() string { + g.sessionMu.RLock() + defer g.sessionMu.RUnlock() + + return g.sessionID +} + // Reconnect tries to reconnect until the ReconnectTimeout is reached, or if // set to 0 reconnects indefinitely. func (g *Gateway) Reconnect() { @@ -344,9 +352,25 @@ func (g *Gateway) start(ctx context.Context) error { wsutil.WSDebug("Hello received; duration:", hello.HeartbeatInterval) + // Start the event handler, which also handles the pacemaker death signal. + g.waitGroup.Add(1) + + // Use the pacemaker loop. + g.PacerLoop.StartBeating(hello.HeartbeatInterval.Duration(), g, func(err error) { + g.waitGroup.Done() // mark so Close() can exit. + wsutil.WSDebug("Event loop stopped with error:", err) + + // Only attempt to reconnect if we have a session ID at all. We may not + // have one if we haven't even connected successfully once. + if err != nil && g.SessionID() != "" { + g.ErrorLog(err) + g.Reconnect() + } + }) + // Send Discord either the Identify packet (if it's a fresh connection), or // a Resume packet (if it's a dead connection). - if g.SessionID == "" { + if g.SessionID() == "" { // SessionID is empty, so this is a completely new session. if err := g.IdentifyCtx(ctx); err != nil { return errors.Wrap(err, "failed to identify") @@ -377,19 +401,8 @@ func (g *Gateway) start(ctx context.Context) error { return errors.Wrap(err, "first error") } - // Start the event handler, which also handles the pacemaker death signal. - g.waitGroup.Add(1) - - // Use the pacemaker loop. - g.PacerLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, g, func(err error) { - g.waitGroup.Done() // mark so Close() can exit. - wsutil.WSDebug("Event loop stopped with error:", err) - - if err != nil { - g.ErrorLog(err) - g.Reconnect() - } - }) + // Bind the event channel to the pacemaker loop. + g.PacerLoop.SetEventChannel(ch) wsutil.WSDebug("Started successfully.") diff --git a/gateway/integration_test.go b/gateway/integration_test.go index 661ea3c..b7567b7 100644 --- a/gateway/integration_test.go +++ b/gateway/integration_test.go @@ -72,12 +72,15 @@ func TestIntegration(t *testing.T) { t.Fatal("Event received is not of type Ready:", ev) } - if gateway.SessionID == "" { + if gateway.SessionID() == "" { t.Fatal("Session ID is empty") } log.Println("Bot's username is", ready.User.Username) + // Send a faster heartbeat every second for testing. + g.PacerLoop.SetPace(time.Second) + // Sleep past the rate limiter before reconnecting: time.Sleep(5 * time.Second) diff --git a/gateway/op.go b/gateway/op.go index 55babed..f41db46 100644 --- a/gateway/op.go +++ b/gateway/op.go @@ -101,7 +101,9 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error { // If the event is a ready, we'll want its sessionID if ev, ok := ev.(*ReadyEvent); ok { - g.SessionID = ev.SessionID + g.sessionMu.Lock() + g.sessionID = ev.SessionID + g.sessionMu.Unlock() } // Throw the event into a channel; it's valid now. diff --git a/internal/heart/heart.go b/internal/heart/heart.go index 73ae4e4..4fb706a 100644 --- a/internal/heart/heart.go +++ b/internal/heart/heart.go @@ -31,9 +31,24 @@ func (t *AtomicTime) Time() time.Time { return time.Unix(0, t.Get()) } +// AtomicDuration is a thread-safe Duration guarded by atomic. +type AtomicDuration struct { + duration int64 +} + +func (d *AtomicDuration) Get() time.Duration { + return time.Duration(atomic.LoadInt64(&d.duration)) +} + +func (d *AtomicDuration) Set(dura time.Duration) { + atomic.StoreInt64(&d.duration, int64(dura)) +} + +// Pacemaker is the internal pacemaker state. All fields are not thread-safe +// unless they're atomic. type Pacemaker struct { // Heartrate is the received duration between heartbeats. - Heartrate time.Duration + Heartrate AtomicDuration ticker time.Ticker Ticks <-chan time.Time @@ -48,7 +63,7 @@ type Pacemaker struct { func NewPacemaker(heartrate time.Duration, pacer func(context.Context) error) Pacemaker { p := Pacemaker{ - Heartrate: heartrate, + Heartrate: AtomicDuration{int64(heartrate)}, Pacer: pacer, ticker: *time.NewTicker(heartrate), } @@ -77,7 +92,19 @@ func (p *Pacemaker) Dead() bool { return false } - return sent-echo > int64(p.Heartrate)*2 + return sent-echo > int64(p.Heartrate.Get())*2 +} + +// SetHeartRate sets the ticker's heart rate. +func (p *Pacemaker) SetPace(heartrate time.Duration) { + p.Heartrate.Set(heartrate) + + // To uncomment when 1.16 releases and we drop support for 1.14. + // p.ticker.Reset(heartrate) + + p.ticker.Stop() + p.ticker = *time.NewTicker(heartrate) + p.Ticks = p.ticker.C } // Stop stops the pacemaker, or it does nothing if the pacemaker is not started. @@ -87,7 +114,7 @@ func (p *Pacemaker) StopTicker() { // pace sends a heartbeat with the appropriate timeout for the context. func (p *Pacemaker) Pace() error { - ctx, cancel := context.WithTimeout(context.Background(), p.Heartrate) + ctx, cancel := context.WithTimeout(context.Background(), p.Heartrate.Get()) defer cancel() return p.PaceCtx(ctx) diff --git a/utils/wsutil/heart.go b/utils/wsutil/heart.go index 75f5772..af9a178 100644 --- a/utils/wsutil/heart.go +++ b/utils/wsutil/heart.go @@ -51,6 +51,7 @@ type PacemakerLoop struct { ErrorLog func(error) events <-chan Event + control chan func() handler func(*OP) error } @@ -68,18 +69,32 @@ func (p *PacemakerLoop) Pace(ctx context.Context) error { return p.Pacemaker.PaceCtx(ctx) } -func (p *PacemakerLoop) RunAsync( - heartrate time.Duration, evs <-chan Event, evl EventLoopHandler, exit func(error)) { - +// StartBeating asynchronously starts the pacemaker loop. +func (p *PacemakerLoop) StartBeating(pace time.Duration, evl EventLoopHandler, exit func(error)) { WSDebug("Starting the pacemaker loop.") - p.Pacemaker = heart.NewPacemaker(heartrate, evl.HeartbeatCtx) + p.Pacemaker = heart.NewPacemaker(pace, evl.HeartbeatCtx) + p.control = make(chan func()) p.handler = evl.HandleOP - p.events = evs + p.events = nil // block forever go func() { exit(p.startLoop()) }() } +// SetEventChannel sets the event channel inside the event loop. There is no +// guarantee that the channel is set when the function returns. This function is +// concurrently safe. +func (p *PacemakerLoop) SetEventChannel(evCh <-chan Event) { + p.control <- func() { p.events = evCh } +} + +// SetPace (re)sets the pace duration. As with SetEventChannel, there is no +// guarantee that the pacer is reset when the function returns. This function is +// concurrently safe. +func (p *PacemakerLoop) SetPace(pace time.Duration) { + p.control <- func() { p.Pacemaker.SetPace(pace) } +} + func (p *PacemakerLoop) startLoop() error { defer WSDebug("Pacemaker loop has exited.") defer p.Pacemaker.StopTicker() @@ -91,6 +106,9 @@ func (p *PacemakerLoop) startLoop() error { return errors.Wrap(err, "pace failed, reconnecting") } + case fn := <-p.control: + fn() + case ev, ok := <-p.events: if !ok { WSDebug("Events channel closed, stopping pacemaker.") diff --git a/voice/voicegateway/gateway.go b/voice/voicegateway/gateway.go index 768cace..565ebc4 100644 --- a/voice/voicegateway/gateway.go +++ b/voice/voicegateway/gateway.go @@ -166,7 +166,25 @@ func (c *Gateway) __start(ctx context.Context) error { wsutil.WSDebug("VoiceGateway: Received Hello") - // https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection + // Start the event handler, which also handles the pacemaker death signal. + c.waitGroup.Add(1) + + c.EventLoop.StartBeating(hello.HeartbeatInterval.Duration(), c, func(err error) { + c.waitGroup.Done() // mark so Close() can exit. + wsutil.WSDebug("VoiceGateway: Event loop stopped.") + + if err != nil { + c.ErrorLog(err) + + if err := c.Reconnect(); err != nil { + c.ErrorLog(errors.Wrap(err, "failed to reconnect voice")) + } + + // Reconnect should spawn another eventLoop in its Start function. + } + }) + + // https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection // Turns out Hello is sent right away on connection start. if !c.reconnect.Get() { if err := c.IdentifyCtx(ctx); err != nil { @@ -188,23 +206,8 @@ func (c *Gateway) __start(ctx context.Context) error { return errors.Wrap(err, "failed to wait for Ready or Resumed") } - // Start the event handler, which also handles the pacemaker death signal. - c.waitGroup.Add(1) - - c.EventLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, c, func(err error) { - c.waitGroup.Done() // mark so Close() can exit. - wsutil.WSDebug("VoiceGateway: Event loop stopped.") - - if err != nil { - c.ErrorLog(err) - - if err := c.Reconnect(); err != nil { - c.ErrorLog(errors.Wrap(err, "failed to reconnect voice")) - } - - // Reconnect should spawn another eventLoop in its Start function. - } - }) + // Bind the event channel away. + c.EventLoop.SetEventChannel(ch) wsutil.WSDebug("VoiceGateway: Started successfully.")