Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Write From Multi-Source (KeepAlive, etc) #228

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/client/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (c *Client) ProcessRequest(ctx context.Context, req *http.Request, result i
// HandleResponse processes the HTTP response for both streaming and URL-based API requests.
func (c *Client) HandleResponse(res *http.Response, keys []string, resBody interface{}) (map[string]string, error) {
klog.V(6).Infof("Handle HTTP response\n")
fmt.Printf("keys: %s\v", keys)
switch res.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent:
return decodeResponseBody(res, keys, resBody)
Expand Down
75 changes: 51 additions & 24 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,21 +352,26 @@ func (c *Client) Stream(r io.Reader) error {
return nil
default:
bytesRead, err := r.Read(chunk)
switch {
case err == nil:
// do nothing
case strings.Contains(err.Error(), FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
klog.V(6).Infof("live.listen() LEAVE\n")
return nil
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return nil
case err != nil:
klog.V(1).Infof("r.Read encountered EOF. Err: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
if err != nil {
errStr := err.Error()
switch {
case strings.Contains(errStr, SuccessfulSocketErr):
klog.V(3).Infof("Graceful websocket close\n")
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case err != nil:
klog.V(1).Infof("r.Read encountered EOF. Err: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
}
}

if bytesRead == 0 {
Expand All @@ -391,11 +396,17 @@ func (c *Client) WriteBinary(byData []byte) error {
// doing a write, need to lock
ws := c.Connect()
if ws == nil {
klog.V(1).Infof("WriteBinary Connection is not valid\n")
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
return ErrInvalidConnection

return err
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()

if err := ws.WriteMessage(
websocket.BinaryMessage,
byData,
Expand All @@ -422,9 +433,11 @@ func (c *Client) WriteJSON(payload interface{}) error {
// doing a write, need to lock
ws := c.Connect()
if ws == nil {
klog.V(1).Infof("WriteJSON Connection is not valid\n")
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteJSON() LEAVE\n")
return ErrInvalidConnection

return err
}

byData, err := json.Marshal(payload)
Expand All @@ -434,6 +447,10 @@ func (c *Client) WriteJSON(payload interface{}) error {
return err
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()

if err := ws.WriteMessage(
websocket.TextMessage,
byData,
Expand Down Expand Up @@ -472,15 +489,20 @@ func (c *Client) Write(p []byte) (int, error) {
func (c *Client) Finalize() error {
klog.V(7).Infof("live.Finalize() ENTER\n")

if c.wsconn == nil {
// doing a write, need to lock
ws := c.Connect()
if ws == nil {
err := ErrInvalidConnection

klog.V(4).Infof("Finalize Failed. Err: %v\n", err)
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.Finalize() LEAVE\n")

return err
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()

err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Finalize\" }"))

klog.V(4).Infof("Finalize Succeeded\n")
Expand Down Expand Up @@ -567,15 +589,20 @@ func (c *Client) ping() {
return
}

// doing a write, need to lock
c.mu.Lock()

// deepgram keepalive message
klog.V(5).Infof("Sending Deepgram KeepAlive message...\n")

err := c.WriteJSON(map[string]string{"type": "KeepAlive"})
err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"KeepAlive\" }"))
if err == nil {
klog.V(5).Infof("Ping sent!")
} else {
klog.V(1).Infof("Failed to send Deepgram KeepAlive. Err: %v\n", err)
}

// release
c.mu.Unlock()
}
}
}
Expand Down
Loading