1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2025-01-07 12:38:05 +00:00

Heart: Improve thread-safety and pace responsiveness

This commit consists of these smaller commits:

    Gateway: SessionID to be a method for thread safety

	This commit breaks the SessionID field of the Gateway struct to
	be thread-safe by wrapping its access with a read-write mutex.
	As this is a bug fix, it is reasonable of a breaking change

    Heart: Allow later binding of event channel

    Voice: Use the new Heart API

    Heart: Fixed data races

    Heart: Allow changing pace, thread-safe Heartbeat
This commit is contained in:
diamondburned 2020-12-31 00:35:37 -08:00
parent 700b55102d
commit f1f052180b
7 changed files with 113 additions and 47 deletions

View file

@ -48,7 +48,7 @@ func (g *Gateway) Resume() error {
// from a dead connection. Start() resumes from a dead connection.
func (g *Gateway) ResumeCtx(ctx context.Context) error {
var (
ses = g.SessionID
ses = g.SessionID()
seq = g.Sequence.Get()
)

View file

@ -96,9 +96,8 @@ type Gateway struct {
// Session.
Events chan Event
// SessionID is used to store the session ID received after Ready. It is not
// thread-safe.
SessionID string
sessionMu sync.RWMutex
sessionID string
Identifier *Identifier
Sequence *moreatomic.Int64
@ -216,6 +215,15 @@ func (g *Gateway) Close() error {
return err
}
// SessionID returns the session ID received after Ready. This function is
// concurrently safe.
func (g *Gateway) SessionID() string {
g.sessionMu.RLock()
defer g.sessionMu.RUnlock()
return g.sessionID
}
// Reconnect tries to reconnect until the ReconnectTimeout is reached, or if
// set to 0 reconnects indefinitely.
func (g *Gateway) Reconnect() {
@ -344,9 +352,25 @@ func (g *Gateway) start(ctx context.Context) error {
wsutil.WSDebug("Hello received; duration:", hello.HeartbeatInterval)
// Start the event handler, which also handles the pacemaker death signal.
g.waitGroup.Add(1)
// Use the pacemaker loop.
g.PacerLoop.StartBeating(hello.HeartbeatInterval.Duration(), g, func(err error) {
g.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped with error:", err)
// Only attempt to reconnect if we have a session ID at all. We may not
// have one if we haven't even connected successfully once.
if err != nil && g.SessionID() != "" {
g.ErrorLog(err)
g.Reconnect()
}
})
// Send Discord either the Identify packet (if it's a fresh connection), or
// a Resume packet (if it's a dead connection).
if g.SessionID == "" {
if g.SessionID() == "" {
// SessionID is empty, so this is a completely new session.
if err := g.IdentifyCtx(ctx); err != nil {
return errors.Wrap(err, "failed to identify")
@ -377,19 +401,8 @@ func (g *Gateway) start(ctx context.Context) error {
return errors.Wrap(err, "first error")
}
// Start the event handler, which also handles the pacemaker death signal.
g.waitGroup.Add(1)
// Use the pacemaker loop.
g.PacerLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, g, func(err error) {
g.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("Event loop stopped with error:", err)
if err != nil {
g.ErrorLog(err)
g.Reconnect()
}
})
// Bind the event channel to the pacemaker loop.
g.PacerLoop.SetEventChannel(ch)
wsutil.WSDebug("Started successfully.")

View file

@ -72,12 +72,15 @@ func TestIntegration(t *testing.T) {
t.Fatal("Event received is not of type Ready:", ev)
}
if gateway.SessionID == "" {
if gateway.SessionID() == "" {
t.Fatal("Session ID is empty")
}
log.Println("Bot's username is", ready.User.Username)
// Send a faster heartbeat every second for testing.
g.PacerLoop.SetPace(time.Second)
// Sleep past the rate limiter before reconnecting:
time.Sleep(5 * time.Second)

View file

@ -101,7 +101,9 @@ func (g *Gateway) HandleOP(op *wsutil.OP) error {
// If the event is a ready, we'll want its sessionID
if ev, ok := ev.(*ReadyEvent); ok {
g.SessionID = ev.SessionID
g.sessionMu.Lock()
g.sessionID = ev.SessionID
g.sessionMu.Unlock()
}
// Throw the event into a channel; it's valid now.

View file

@ -31,9 +31,24 @@ func (t *AtomicTime) Time() time.Time {
return time.Unix(0, t.Get())
}
// AtomicDuration is a thread-safe Duration guarded by atomic.
type AtomicDuration struct {
duration int64
}
func (d *AtomicDuration) Get() time.Duration {
return time.Duration(atomic.LoadInt64(&d.duration))
}
func (d *AtomicDuration) Set(dura time.Duration) {
atomic.StoreInt64(&d.duration, int64(dura))
}
// Pacemaker is the internal pacemaker state. All fields are not thread-safe
// unless they're atomic.
type Pacemaker struct {
// Heartrate is the received duration between heartbeats.
Heartrate time.Duration
Heartrate AtomicDuration
ticker time.Ticker
Ticks <-chan time.Time
@ -48,7 +63,7 @@ type Pacemaker struct {
func NewPacemaker(heartrate time.Duration, pacer func(context.Context) error) Pacemaker {
p := Pacemaker{
Heartrate: heartrate,
Heartrate: AtomicDuration{int64(heartrate)},
Pacer: pacer,
ticker: *time.NewTicker(heartrate),
}
@ -77,7 +92,19 @@ func (p *Pacemaker) Dead() bool {
return false
}
return sent-echo > int64(p.Heartrate)*2
return sent-echo > int64(p.Heartrate.Get())*2
}
// SetHeartRate sets the ticker's heart rate.
func (p *Pacemaker) SetPace(heartrate time.Duration) {
p.Heartrate.Set(heartrate)
// To uncomment when 1.16 releases and we drop support for 1.14.
// p.ticker.Reset(heartrate)
p.ticker.Stop()
p.ticker = *time.NewTicker(heartrate)
p.Ticks = p.ticker.C
}
// Stop stops the pacemaker, or it does nothing if the pacemaker is not started.
@ -87,7 +114,7 @@ func (p *Pacemaker) StopTicker() {
// pace sends a heartbeat with the appropriate timeout for the context.
func (p *Pacemaker) Pace() error {
ctx, cancel := context.WithTimeout(context.Background(), p.Heartrate)
ctx, cancel := context.WithTimeout(context.Background(), p.Heartrate.Get())
defer cancel()
return p.PaceCtx(ctx)

View file

@ -51,6 +51,7 @@ type PacemakerLoop struct {
ErrorLog func(error)
events <-chan Event
control chan func()
handler func(*OP) error
}
@ -68,18 +69,32 @@ func (p *PacemakerLoop) Pace(ctx context.Context) error {
return p.Pacemaker.PaceCtx(ctx)
}
func (p *PacemakerLoop) RunAsync(
heartrate time.Duration, evs <-chan Event, evl EventLoopHandler, exit func(error)) {
// StartBeating asynchronously starts the pacemaker loop.
func (p *PacemakerLoop) StartBeating(pace time.Duration, evl EventLoopHandler, exit func(error)) {
WSDebug("Starting the pacemaker loop.")
p.Pacemaker = heart.NewPacemaker(heartrate, evl.HeartbeatCtx)
p.Pacemaker = heart.NewPacemaker(pace, evl.HeartbeatCtx)
p.control = make(chan func())
p.handler = evl.HandleOP
p.events = evs
p.events = nil // block forever
go func() { exit(p.startLoop()) }()
}
// SetEventChannel sets the event channel inside the event loop. There is no
// guarantee that the channel is set when the function returns. This function is
// concurrently safe.
func (p *PacemakerLoop) SetEventChannel(evCh <-chan Event) {
p.control <- func() { p.events = evCh }
}
// SetPace (re)sets the pace duration. As with SetEventChannel, there is no
// guarantee that the pacer is reset when the function returns. This function is
// concurrently safe.
func (p *PacemakerLoop) SetPace(pace time.Duration) {
p.control <- func() { p.Pacemaker.SetPace(pace) }
}
func (p *PacemakerLoop) startLoop() error {
defer WSDebug("Pacemaker loop has exited.")
defer p.Pacemaker.StopTicker()
@ -91,6 +106,9 @@ func (p *PacemakerLoop) startLoop() error {
return errors.Wrap(err, "pace failed, reconnecting")
}
case fn := <-p.control:
fn()
case ev, ok := <-p.events:
if !ok {
WSDebug("Events channel closed, stopping pacemaker.")

View file

@ -166,7 +166,25 @@ func (c *Gateway) __start(ctx context.Context) error {
wsutil.WSDebug("VoiceGateway: Received Hello")
// https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
// Start the event handler, which also handles the pacemaker death signal.
c.waitGroup.Add(1)
c.EventLoop.StartBeating(hello.HeartbeatInterval.Duration(), c, func(err error) {
c.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("VoiceGateway: Event loop stopped.")
if err != nil {
c.ErrorLog(err)
if err := c.Reconnect(); err != nil {
c.ErrorLog(errors.Wrap(err, "failed to reconnect voice"))
}
// Reconnect should spawn another eventLoop in its Start function.
}
})
// https://discordapp.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection
// Turns out Hello is sent right away on connection start.
if !c.reconnect.Get() {
if err := c.IdentifyCtx(ctx); err != nil {
@ -188,23 +206,8 @@ func (c *Gateway) __start(ctx context.Context) error {
return errors.Wrap(err, "failed to wait for Ready or Resumed")
}
// Start the event handler, which also handles the pacemaker death signal.
c.waitGroup.Add(1)
c.EventLoop.RunAsync(hello.HeartbeatInterval.Duration(), ch, c, func(err error) {
c.waitGroup.Done() // mark so Close() can exit.
wsutil.WSDebug("VoiceGateway: Event loop stopped.")
if err != nil {
c.ErrorLog(err)
if err := c.Reconnect(); err != nil {
c.ErrorLog(errors.Wrap(err, "failed to reconnect voice"))
}
// Reconnect should spawn another eventLoop in its Start function.
}
})
// Bind the event channel away.
c.EventLoop.SetEventChannel(ch)
wsutil.WSDebug("VoiceGateway: Started successfully.")