diff --git a/gateway/gateway.go b/gateway/gateway.go index ca88e5c..4e6b46e 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -417,6 +417,14 @@ func (g *gatewayImpl) SendHeartbeat(ctx context.Context) { } } +// LastAcknowledgedBeat returns the last acknowledged beat. +func (g *gatewayImpl) LastAcknowledgedBeat() time.Time { + g.beatMutex.Lock() + defer g.beatMutex.Unlock() + + return g.echoBeat +} + // Close closes the state. func (g *gatewayImpl) Close() error { g.retryTimer.Stop() diff --git a/utils/ws/gateway.go b/utils/ws/gateway.go index 69c06a8..1a81726 100644 --- a/utils/ws/gateway.go +++ b/utils/ws/gateway.go @@ -11,6 +11,10 @@ import ( "github.com/pkg/errors" ) +// ErrHeartbeatTimeout is returned if the server fails to acknowledge our heart +// beat in time. +var ErrHeartbeatTimeout = errors.New("server timed out replying to heartbeat") + // ConnectionError is given to the user if the gateway fails to connect to the // gateway for any reason, including during an initial connection or a // reconnection. To check for this error, use the errors.As function. @@ -97,6 +101,7 @@ type Gateway struct { reconnect chan struct{} heart lazytime.Ticker + heartRate time.Duration srcOp <-chan Op // from WS outer outerState lastError error @@ -125,6 +130,9 @@ type Handler interface { // SendHeartbeat is called by the gateway event loop everytime a heartbeat // needs to be sent over. SendHeartbeat(context.Context) + // LastAcknowledgedBeat returns the last time that the server acknowledged + // our heart beat. + LastAcknowledgedBeat() time.Time // Close closes the handler. Close() error } @@ -263,6 +271,7 @@ func (g *Gateway) QueueReconnect() { // ResetHeartbeat resets the heartbeat to be the given duration. func (g *Gateway) ResetHeartbeat(d time.Duration) { g.heart.Reset(d) + g.heartRate = d } // SendError sends the given error wrapped in a BackgroundErrorEvent into the @@ -327,7 +336,12 @@ func (g *Gateway) spin(ctx context.Context, h Handler) { g.lastError = nil case <-g.heart.C: - h.SendHeartbeat(ctx) + if h.LastAcknowledgedBeat().Add(2 * g.heartRate).Before(time.Now()) { + g.SendError(ErrHeartbeatTimeout) + g.QueueReconnect() + } else { + h.SendHeartbeat(ctx) + } case <-g.reconnect: // Close the previous connection if it's not already. Ignore the diff --git a/voice/voicegateway/gateway.go b/voice/voicegateway/gateway.go index f8d7399..2a06d7b 100644 --- a/voice/voicegateway/gateway.go +++ b/voice/voicegateway/gateway.go @@ -45,8 +45,9 @@ type Gateway struct { gateway *ws.Gateway state State // constant - mutex sync.RWMutex - ready *ReadyEvent + mutex sync.RWMutex + beatAck time.Time + ready *ReadyEvent } // DefaultGatewayOpts contains the default options to be used for connecting to @@ -187,6 +188,10 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool { g.gateway.QueueReconnect() } } + case *HeartbeatAckEvent: + g.mutex.Lock() + g.beatAck = time.Now() + g.mutex.Unlock() case *ReadyEvent: g.mutex.Lock() g.ready = data @@ -204,6 +209,13 @@ func (g *gatewayImpl) SendHeartbeat(ctx context.Context) { } } +func (g *gatewayImpl) LastAcknowledgedBeat() time.Time { + g.mutex.RLock() + defer g.mutex.RUnlock() + + return g.beatAck +} + func (g *gatewayImpl) Close() error { return nil }