[bugfix] Fix occasionally streaming empty messages (#3456)

This commit is contained in:
tobi 2024-10-18 15:43:09 +02:00 committed by GitHub
parent 9f6a1f7e79
commit ffc86f9092
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 39 additions and 23 deletions

View File

@ -35,6 +35,8 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
var pingMsg = []byte("ping!")
// StreamGETHandler swagger:operation GET /api/v1/streaming streamGet // StreamGETHandler swagger:operation GET /api/v1/streaming streamGet
// //
// Initiate a websocket connection for live streaming of statuses and notifications. // Initiate a websocket connection for live streaming of statuses and notifications.
@ -389,40 +391,54 @@ func (m *Module) writeToWSConn(
) { ) {
for { for {
// Wrap context with timeout to send a ping. // Wrap context with timeout to send a ping.
pingctx, cncl := context.WithTimeout(ctx, ping) pingCtx, cncl := context.WithTimeout(ctx, ping)
// Block on receipt of msg. // Block and wait for
msg, ok := stream.Recv(pingctx) // one of the following:
//
// - receipt of msg
// - timeout of pingCtx
// - stream closed.
msg, gotMsg := stream.Recv(pingCtx)
// Check if cancel because ping. // If ping context has timed
pinged := (pingctx.Err() != nil) // out, we should send a ping.
//
// In any case cancel pingCtx
// as we're done with it.
shouldPing := (pingCtx.Err() != nil)
cncl() cncl()
switch { switch {
case !ok && pinged:
// The ping context timed out!
l.Trace("writing websocket ping")
// Wrapped context time-out, send a keep-alive "ping". // We have a message to stream.
if err := wsConn.WriteControl(websocket.PingMessage, nil, time.Time{}); err != nil { case gotMsg:
l.Tracef("writing websocket message: %+v", msg)
if err := wsConn.WriteJSON(msg); err != nil {
// If there's an error writing then drop the
// connection, as client may have disappeared
// suddenly; they can reconnect if necessary.
l.Debugf("error writing websocket message: %v", err)
break
}
// We have no message but we
// need to send a keep-alive ping.
case shouldPing:
l.Trace("writing websocket ping")
if err := wsConn.WriteControl(websocket.PingMessage, pingMsg, time.Time{}); err != nil {
// If there's an error writing then drop the
// connection, as client may have disappeared
// suddenly; they can reconnect if necessary.
l.Debugf("error writing websocket ping: %v", err) l.Debugf("error writing websocket ping: %v", err)
break break
} }
case !ok: // We have no message and we shouldn't
// Stream was // send a ping; this means the stream
// closed. // has been closed from the client's end.
default:
return return
} }
l.Trace("writing websocket message: %+v", msg)
// Received a new message from the processor.
if err := wsConn.WriteJSON(msg); err != nil {
l.Debugf("error writing websocket message: %v", err)
break
}
} }
l.Debug("finished websocket write")
} }