From 77b1b08bce7092e11eb4974ab32113845e9b53fd Mon Sep 17 00:00:00 2001 From: diamondburned Date: Thu, 30 Jul 2020 12:44:07 -0700 Subject: [PATCH] Heart: Better synchronization on close methods --- gateway/integration_test.go | 4 ++++ internal/heart/heart.go | 45 +++++++++---------------------------- 2 files changed, 15 insertions(+), 34 deletions(-) diff --git a/gateway/integration_test.go b/gateway/integration_test.go index 04645fe..2af97e3 100644 --- a/gateway/integration_test.go +++ b/gateway/integration_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/diamondburned/arikawa/internal/heart" "github.com/diamondburned/arikawa/utils/wsutil" ) @@ -17,6 +18,9 @@ func init() { wsutil.WSDebug = func(v ...interface{}) { log.Println(append([]interface{}{"Debug:"}, v...)...) } + heart.Debug = func(v ...interface{}) { + log.Println(append([]interface{}{"Heart:"}, v...)...) + } } func TestInvalidToken(t *testing.T) { diff --git a/internal/heart/heart.go b/internal/heart/heart.go index b9d3ba7..50b2be3 100644 --- a/internal/heart/heart.go +++ b/internal/heart/heart.go @@ -32,29 +32,6 @@ func (t *AtomicTime) Time() time.Time { return time.Unix(0, t.Get()) } -type atomicStop atomic.Value - -func (s *atomicStop) Stop() bool { - if v := (*atomic.Value)(s).Load(); v != nil { - ch := v.(chan struct{}) - close(ch) - return true - } - return false -} -func (s *atomicStop) Recv() <-chan struct{} { - if v := (*atomic.Value)(s).Load(); v != nil { - return v.(chan struct{}) - } - return nil -} -func (s *atomicStop) SetNil() { - (*atomic.Value)(s).Store((chan struct{})(nil)) -} -func (s *atomicStop) Reset() { - (*atomic.Value)(s).Store(make(chan struct{})) -} - type Pacemaker struct { // Heartrate is the received duration between heartbeats. Heartrate time.Duration @@ -66,7 +43,8 @@ type Pacemaker struct { // Any callback that returns an error will stop the pacer. Pace func(context.Context) error - stop atomicStop + stop chan struct{} + once sync.Once death chan error } @@ -79,7 +57,6 @@ func NewPacemaker(heartrate time.Duration, pacer func(context.Context) error) *P func (p *Pacemaker) Echo() { // Swap our received heartbeats - // p.LastBeat[0], p.LastBeat[1] = time.Now(), p.LastBeat[0] p.EchoBeat.Set(time.Now()) } @@ -107,11 +84,12 @@ func (p *Pacemaker) Dead() bool { // Stop stops the pacemaker, or it does nothing if the pacemaker is not started. func (p *Pacemaker) Stop() { - if p.stop.Stop() { - Debug("(*Pacemaker).stop was sent a stop signal.") - } else { - Debug("(*Pacemaker).stop is nil, skipping.") - } + Debug("(*Pacemaker).stop is trying sync.Once.") + + p.once.Do(func() { + Debug("(*Pacemaker).stop closed the channel.") + close(p.stop) + }) } // pace sends a heartbeat with the appropriate timeout for the context. @@ -147,7 +125,7 @@ func (p *Pacemaker) start() error { } select { - case <-p.stop.Recv(): + case <-p.stop: return nil case <-tick.C: @@ -158,7 +136,8 @@ func (p *Pacemaker) start() error { // StartAsync starts the pacemaker asynchronously. The WaitGroup is optional. func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) { p.death = make(chan error) - p.stop.Reset() + p.stop = make(chan struct{}) + p.once = sync.Once{} if wg != nil { wg.Add(1) @@ -168,8 +147,6 @@ func (p *Pacemaker) StartAsync(wg *sync.WaitGroup) (death chan error) { p.death <- p.start() // Debug. Debug("Pacemaker returned.") - // Mark the stop channel as nil, so later Close() calls won't block forever. - p.stop.SetNil() // Mark the pacemaker loop as done. if wg != nil {