1
0
Fork 0
mirror of https://github.com/diamondburned/arikawa.git synced 2024-10-02 23:58:52 +00:00

ws: Add RawEvent for debugging

This commit is contained in:
diamondburned 2022-04-01 04:41:22 -07:00
parent fd59b91de1
commit aee547fa1f
No known key found for this signature in database
GPG key ID: D78C4471CE776659
4 changed files with 80 additions and 24 deletions

View file

@ -39,20 +39,28 @@ func New() *Handler {
// Call calls all handlers with the given event. This is an internal method; use
// with care.
func (h *Handler) Call(ev interface{}) {
v := reflect.ValueOf(ev)
t := v.Type()
t := reflect.TypeOf(ev)
h.mutex.RLock()
defer h.mutex.RUnlock()
for _, entry := range h.events[t].Entries {
typedHandlers := h.events[t].Entries
anyHandlers := h.events[nil].Entries
if len(typedHandlers) == 0 && len(anyHandlers) == 0 {
return
}
v := reflect.ValueOf(ev)
for _, entry := range typedHandlers {
if entry.isInvalid() {
continue
}
entry.Call(v)
}
for _, entry := range h.events[nil].Entries {
for _, entry := range anyHandlers {
if entry.isInvalid() || entry.not(t) {
continue
}

View file

@ -1,6 +1,7 @@
package ws
import (
"context"
"io"
"net/http"
@ -50,15 +51,26 @@ func NewDecodeBuffer(cap int) DecodeBuffer {
}
}
// DecodeFrom reads the given reader and decodes it into an Op.
// DecodeInto reads the given reader and decodes it into the Op out channel.
//
// buf is optional.
func (c Codec) DecodeFrom(r io.Reader, buf *DecodeBuffer) Op {
func (c Codec) DecodeInto(ctx context.Context, r io.Reader, buf *DecodeBuffer, out chan<- Op) error {
var op codecOp
op.Data = json.Raw(buf.buf)
if err := json.DecodeStream(r, &op); err != nil {
return newErrOp(err, "cannot read JSON stream")
return c.send(ctx, out, newErrOp(err, "cannot read JSON stream"))
}
if EnableRawEvents {
dt := op.Data
op := op.Op
op.Data = &RawEvent{
Raw: dt,
OriginalCode: op.Code,
OriginalType: op.Type,
}
c.send(ctx, out, op)
}
// buf isn't grown from here out. Set it back right now. If Data hasn't been
@ -73,15 +85,24 @@ func (c Codec) DecodeFrom(r io.Reader, buf *DecodeBuffer) Op {
Op: op.Code,
Type: op.Type,
}
return newErrOp(err, "")
return c.send(ctx, out, newErrOp(err, ""))
}
op.Op.Data = fn()
if err := op.Data.UnmarshalTo(op.Op.Data); err != nil {
return newErrOp(err, "cannot unmarshal JSON data from gateway")
return c.send(ctx, out, newErrOp(err, "cannot unmarshal JSON data from gateway"))
}
return op.Op
return c.send(ctx, out, op.Op)
}
func (c *Codec) send(ctx context.Context, ch chan<- Op, op Op) error {
select {
case ch <- op:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func newErrOp(err error, wrap string) Op {

View file

@ -55,8 +55,9 @@ type Conn struct {
}
type connMutex struct {
wrmut chan struct{}
*websocket.Conn
wrmut chan struct{}
cancel context.CancelFunc
}
var _ Connection = (*Conn)(nil)
@ -101,12 +102,15 @@ func (c *Conn) Dial(ctx context.Context, addr string) (<-chan Op, error) {
return nil, errors.Wrap(err, "failed to dial WS")
}
ctx, cancel := context.WithCancel(context.Background())
events := make(chan Op, 1)
go readLoop(conn, c.codec, events)
go readLoop(ctx, conn, c.codec, events)
c.conn = &connMutex{
wrmut: make(chan struct{}, 1),
Conn: conn,
wrmut: make(chan struct{}, 1),
Conn: conn,
cancel: cancel,
}
return events, err
@ -168,6 +172,10 @@ func (c *connMutex) close(timeout time.Duration, gracefully bool) error {
}
c.Conn = nil
c.cancel()
c.cancel = nil
return err
}
@ -212,7 +220,7 @@ type loopState struct {
buf DecodeBuffer
}
func readLoop(conn *websocket.Conn, codec Codec, opCh chan<- Op) {
func readLoop(ctx context.Context, conn *websocket.Conn, codec Codec, opCh chan<- Op) {
// Clean up the events channel in the end.
defer close(opCh)
@ -224,8 +232,7 @@ func readLoop(conn *websocket.Conn, codec Codec, opCh chan<- Op) {
}
for {
b, err := state.handle()
if err != nil {
if err := state.handle(ctx, opCh); err != nil {
WSDebug("Conn: fatal Conn error:", err)
closeEv := &CloseEvent{
@ -247,16 +254,14 @@ func readLoop(conn *websocket.Conn, codec Codec, opCh chan<- Op) {
return
}
opCh <- b
}
}
func (state *loopState) handle() (Op, error) {
func (state *loopState) handle(ctx context.Context, opCh chan<- Op) error {
// skip message type
t, r, err := state.conn.NextReader()
if err != nil {
return Op{}, err
return err
}
if t == websocket.BinaryMessage {
@ -265,12 +270,12 @@ func (state *loopState) handle() (Op, error) {
if state.zlib == nil {
z, err := zlib.NewReader(r)
if err != nil {
return Op{}, errors.Wrap(err, "failed to create a zlib reader")
return errors.Wrap(err, "failed to create a zlib reader")
}
state.zlib = z
} else {
if err := state.zlib.(zlib.Resetter).Reset(r, nil); err != nil {
return Op{}, errors.Wrap(err, "failed to reset zlib reader")
return errors.Wrap(err, "failed to reset zlib reader")
}
}
@ -278,5 +283,9 @@ func (state *loopState) handle() (Op, error) {
r = state.zlib
}
return state.codec.DecodeFrom(r, &state.buf), nil
if err := state.codec.DecodeInto(ctx, r, &state.buf, opCh); err != nil {
return errors.Wrap(err, "error distributing event")
}
return nil
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"github.com/diamondburned/arikawa/v3/utils/json"
"github.com/pkg/errors"
)
@ -35,6 +36,23 @@ func (e *CloseEvent) Op() OpCode { return -1 }
// EventType implements Event. It returns an emty string.
func (e *CloseEvent) EventType() EventType { return "__ws.CloseEvent" }
// EnableRawEvents, if true, will cause ws to generate a RawEvent for each
// regular Event. It should only be used for debugging.
var EnableRawEvents = false
// RawEvent is used if EnableRawEvents is true.
type RawEvent struct {
json.Raw
OriginalCode OpCode `json:"-"`
OriginalType EventType `json:"-"`
}
// Op implements Event. It returns -1.
func (e *RawEvent) Op() OpCode { return -1 }
// EventType implements Event. It returns an emty string.
func (e *RawEvent) EventType() EventType { return "__ws.RawEvent" }
// EventType is a type for event types, which is the "t" field in the payload.
type EventType string