diff --git a/README.md b/README.md index 3dbc3df5..9f0fec13 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,11 @@ For documentation relating to Speech-to-Text (and Intelligence) from PreRecorded For documentation relating to Text-to-Speech: +- WebSocket: + - Speak WebSocket Client - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/websocket](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/websocket) + - Speak WebSocket API - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket) + - Speak API Interfaces - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket/interfaces](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket/interfaces) + - REST: - Speak REST Client - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/rest](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/rest) - Speak REST API - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/rest](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/rest) @@ -207,6 +212,11 @@ Speech-to-Text - Live Audio: - From a Microphone - [examples/speech-to-text/websocket/microphone](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/microphone/main.go) - From an HTTP Endpoint - [examples/speech-to-text/websocket/http](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/http/main.go) +Text-to-Speech - WebSocket + +- WebSocket Simple Example - [examples/text-to-speech/websocket/simple](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/websocket/simple/main.go) +- Interactive WebSocket - 
[examples/text-to-speech/websocket/interactive](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/websocket/interactive/main.go) + Text-to-Speech - REST - Save audio to a Path - [examples/text-to-speech/rest/file](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/rest/file/main.go) diff --git a/docs.go b/docs.go index d4262d78..e4c68664 100644 --- a/docs.go +++ b/docs.go @@ -30,5 +30,5 @@ import ( _ "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/websocket" _ "github.com/deepgram/deepgram-go-sdk/pkg/api/manage/v1" _ "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/rest" - // _ "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket" + _ "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket" ) diff --git a/examples/text-to-speech/websocket/interactive/main.go b/examples/text-to-speech/websocket/interactive/main.go new file mode 100644 index 00000000..9d15845b --- /dev/null +++ b/examples/text-to-speech/websocket/interactive/main.go @@ -0,0 +1,159 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package main + +import ( + "bufio" + "context" + "fmt" + "os" + "strings" + "time" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" + speak "github.com/deepgram/deepgram-go-sdk/pkg/client/speak" +) + +const ( + API_KEY = "" + TTS_TEXT = "Hello, this is a text to speech example using Deepgram." 
+ AUDIO_FILE = "output.mp3" +) + +// MyCallback is a sample callback; Binary appends every audio chunk to AUDIO_FILE +type MyCallback struct{} + +func (c MyCallback) Metadata(md *msginterfaces.MetadataResponse) error { + fmt.Printf("\n[Metadata] Received\n") + fmt.Printf("Metadata.RequestID: %s\n", strings.TrimSpace(md.RequestID)) + return nil +} + +func (c MyCallback) Binary(byMsg []byte) error { + fmt.Printf("\n[Binary] Received\n") + + file, err := os.OpenFile(AUDIO_FILE, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o666) + if err != nil { + fmt.Printf("Error creating file %s: %v\n", AUDIO_FILE, err) + return err + } + defer file.Close() + + _, err = file.Write(byMsg) + if err != nil { + fmt.Printf("Error writing audio data to file: %v\n", err) + return err + } + + fmt.Printf("Audio data saved to %s\n", AUDIO_FILE) + return nil +} + +func (c MyCallback) Flush(fl *msginterfaces.FlushedResponse) error { + fmt.Printf("\n[Flushed] Received\n") + fmt.Printf("\n\nPress 'r' and ENTER to reset the buffer, 'f' and ENTER to flush, enter new text to send it, or just ENTER to exit...\n\n> ") + return nil +} + +func (c MyCallback) Warning(wr *msginterfaces.WarningResponse) error { + fmt.Printf("\n[Warning] Received\n") + fmt.Printf("Warning.Code: %s\n", wr.WarnCode) + fmt.Printf("Warning.Description: %s\n\n", wr.WarnMsg) + return nil +} + +func (c MyCallback) Error(er *msginterfaces.ErrorResponse) error { + fmt.Printf("\n[Error] Received\n") + fmt.Printf("Error.Code: %s\n", er.ErrCode) + fmt.Printf("Error.Description: %s\n\n", er.Description) + return nil +} + +func (c MyCallback) Close(cr *msginterfaces.CloseResponse) error { + fmt.Printf("\n[Close] Received\n") + return nil +} + +func (c MyCallback) Open(or *msginterfaces.OpenResponse) error { + fmt.Printf("\n[Open] Received\n") + return nil +} + +func main() { + // init library + speak.InitWithDefault() + + // Go context + ctx := context.Background() + + // print instructions + fmt.Print("\n\nPress ENTER to exit!\n\n") + + // set the TTS options + ttsOptions := 
&interfaces.SpeakOptions{ + Model: "aura-asteria-en", + } + + // set the Client options + cOptions := &interfaces.ClientOptions{} + + // create the callback + callback := MyCallback{} + + // create a new stream using the NewStream function + dgClient, err := speak.NewWebSocket(ctx, "", cOptions, ttsOptions, callback) + if err != nil { + fmt.Println("ERROR creating TTS connection:", err) + return + } + + // connect the websocket to Deepgram + bConnected := dgClient.Connect() + if !bConnected { + fmt.Println("Client.Connect failed") + os.Exit(1) + } + + // Simulate user input to reset the buffer, flush, send new text, or just exit + time.Sleep(2 * time.Second) + fmt.Printf("\n\nPress 'r' and ENTER to reset the buffer, 'f' and ENTER to flush, enter new text to send it, or just ENTER to exit...\n\n> ") + input := bufio.NewScanner(os.Stdin) + for input.Scan() { + switch input.Text() { + case "r": + err = dgClient.Reset() + if err != nil { + fmt.Printf("Error resetting buffer: %v\n", err) + } else { + fmt.Println("Buffer reset successfully.") + } + case "f": + err = dgClient.Flush() + if err != nil { + fmt.Printf("Error flushing buffer: %v\n", err) + } else { + fmt.Println("Buffer flushed successfully.") + } + case "": + goto EXIT + default: + err = dgClient.SpeakWithText(input.Text()) + if err != nil { + fmt.Printf("Error sending text input: %v\n", err) + } else { + fmt.Println("Text sent successfully.") + } + fmt.Printf("\n\nPress 'r' and ENTER to reset the buffer, 'f' and ENTER to flush, enter new text to send it, or just ENTER to exit...\n\n> ") + } + } + +EXIT: + + // close the connection + dgClient.Stop() + + fmt.Printf("Program exiting...\n") +} diff --git a/examples/text-to-speech/websocket/simple/main.go b/examples/text-to-speech/websocket/simple/main.go new file mode 100644 index 00000000..1bbccf78 --- /dev/null +++ b/examples/text-to-speech/websocket/simple/main.go @@ -0,0 +1,138 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. 
+// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package main + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1" + speak "github.com/deepgram/deepgram-go-sdk/pkg/client/speak" +) + +const ( + API_KEY = "" + TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + AUDIO_FILE = "output.mp3" +) + +// Implement your own callback +type MyCallback struct{} + +func (c MyCallback) Metadata(md *msginterfaces.MetadataResponse) error { + fmt.Printf("\n[Metadata] Received\n") + fmt.Printf("Metadata.RequestID: %s\n", strings.TrimSpace(md.RequestID)) + return nil +} + +func (c MyCallback) Binary(byMsg []byte) error { + fmt.Printf("\n[Binary] Received\n") + + file, err := os.OpenFile(AUDIO_FILE, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o666) + if err != nil { + fmt.Printf("Error creating file %s: %v\n", AUDIO_FILE, err) + return err + } + defer file.Close() + + _, err = file.Write(byMsg) + if err != nil { + fmt.Printf("Error writing audio data to file: %v\n", err) + return err + } + + fmt.Printf("Audio data saved to %s\n", AUDIO_FILE) + return nil +} + +func (c MyCallback) Flush(fl *msginterfaces.FlushedResponse) error { + fmt.Printf("\n[Flushed] Received\n") + return nil +} + +func (c MyCallback) Warning(wr *msginterfaces.WarningResponse) error { + fmt.Printf("\n[Warning] Received\n") + fmt.Printf("Warning.Code: %s\n", wr.WarnCode) + fmt.Printf("Warning.Description: %s\n\n", wr.WarnMsg) + return nil +} + +func (c MyCallback) Error(er *msginterfaces.ErrorResponse) error { + fmt.Printf("\n[Error] Received\n") + fmt.Printf("Error.Code: %s\n", er.ErrCode) + fmt.Printf("Error.Description: %s\n\n", er.ErrMsg) + return nil +} + +func (c MyCallback) Close(cr *msginterfaces.CloseResponse) error { + fmt.Printf("\n[Close] 
Received\n") + return nil +} + +func (c MyCallback) Open(or *msginterfaces.OpenResponse) error { + fmt.Printf("\n[Open] Received\n") + return nil +} + +func main() { + // init library + speak.Init(speak.InitLib{ + LogLevel: speak.LogLevelDefault, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace + }) + + // Go context + ctx := context.Background() + + // set the Client options + cOptions := &interfaces.ClientOptions{} + + // set the TTS options + ttsOptions := &interfaces.SpeakOptions{ + Model: "aura-asteria-en", + } + + // create the callback + callback := MyCallback{} + + // create a new websocket client using the NewWebSocket function + dgClient, err := speak.NewWebSocket(ctx, "", cOptions, ttsOptions, callback) + if err != nil { + fmt.Println("ERROR creating TTS connection:", err) + return + } + + // connect the websocket to Deepgram + bConnected := dgClient.Connect() + if !bConnected { + fmt.Println("Client.Connect failed") + os.Exit(1) + } + + // Send the text input + err = dgClient.SpeakWithText(TTS_TEXT) + if err != nil { + fmt.Printf("Error sending text input: %v\n", err) + return + } + + // Flush the text input + err = dgClient.Flush() + if err != nil { + fmt.Printf("Error flushing buffer: %v\n", err) + return + } + + // pause for a fixed 5 seconds before closing the connection + time.Sleep(5 * time.Second) + + // close the connection + dgClient.Stop() + + fmt.Printf("Program exiting...\n") +} diff --git a/pkg/api/speak/v1/websocket/constants.go b/pkg/api/speak/v1/websocket/constants.go new file mode 100644 index 00000000..3b175d19 --- /dev/null +++ b/pkg/api/speak/v1/websocket/constants.go @@ -0,0 +1,25 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. 
+// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "errors" +) + +const ( + PackageVersion string = "v1.0" +) + +// errors +var ( + // ErrInvalidInput required input was not found + ErrInvalidInput = errors.New("required input was not found") + + // ErrInvalidMessageType invalid message type + ErrInvalidMessageType = errors.New("invalid message type") + + // ErrUserCallbackNotDefined user callback object not defined + ErrUserCallbackNotDefined = errors.New("user callback object not defined") +) diff --git a/pkg/api/speak/v1/websocket/default.go b/pkg/api/speak/v1/websocket/default.go new file mode 100644 index 00000000..b2fc33e3 --- /dev/null +++ b/pkg/api/speak/v1/websocket/default.go @@ -0,0 +1,249 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "encoding/json" + "fmt" + "os" + "strings" + + prettyjson "github.com/hokaccha/go-prettyjson" + klog "k8s.io/klog/v2" + + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" +) + +// DefaultCallbackHandler is a default callback handler for text-to-speech connections +type DefaultCallbackHandler struct{} + +// NewDefaultCallbackHandler creates a new DefaultCallbackHandler +func NewDefaultCallbackHandler() DefaultCallbackHandler { + return DefaultCallbackHandler{} +} + +// Open is the callback for when the connection opens +func (dch DefaultCallbackHandler) Open(or *interfaces.OpenResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(or) + if err != nil { + klog.V(1).Infof("Open json.Marshal failed. 
Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nOpen Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\n[OpenResponse]\n\n") + + return nil +} + +// Metadata is the callback for information about the connection +func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(md) + if err != nil { + klog.V(1).Infof("Metadata json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nMetadata Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\nMetadata.RequestID: %s\n", strings.TrimSpace(md.RequestID)) + + return nil +} + +// Flushed is the callback for when the connection flushes +func (dch DefaultCallbackHandler) Flush(fr *interfaces.FlushedResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(fr) + if err != nil { + klog.V(1).Infof("Flush json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. 
Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nFlush Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\nFlushed.SequenceID: %d\n", fr.SequenceID) + + return nil +} + +// Binary is the callback for when the connection receives binary data +func (dch DefaultCallbackHandler) Binary(br []byte) error { + klog.V(3).Infof("Received binary data: %d bytes", len(br)) + return nil +} + +// Close is the callback for when the connection closes +func (dch DefaultCallbackHandler) Close(or *interfaces.CloseResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(or) + if err != nil { + klog.V(1).Infof("Close json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nClose Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\n[CloseResponse]\n\n") + + return nil +} + +// Warning is the callback for warning messages +func (dch DefaultCallbackHandler) Warning(wr *interfaces.WarningResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(wr) + if err != nil { + klog.V(1).Infof("Warning json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. 
Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nWarning Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n[WarningResponse]\n") + fmt.Printf("\nWarning.Code: %s\n", wr.WarnCode) + fmt.Printf("Warning.Message: %s\n", wr.WarnMsg) + + return nil +} + +// Error is the callback for error messages +func (dch DefaultCallbackHandler) Error(er *interfaces.ErrorResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(er) + if err != nil { + klog.V(1).Infof("Error json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n[ErrorResponse]\n") + fmt.Printf("\nError.Type: %s\n", er.ErrCode) + fmt.Printf("Error.Message: %s\n", er.ErrMsg) + fmt.Printf("Error.Description: %s\n\n", er.Description) + fmt.Printf("Error.Variant: %s\n\n", er.Variant) + + return nil +} + +// UnhandledEvent is the callback for unknown messages +func (dch DefaultCallbackHandler) UnhandledEvent(byData []byte) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + prettyJSON, err := prettyjson.Format(byData) + if err != nil { + klog.V(2).Infof("\n\nRaw Data:\n%s\n\n", string(byData)) + } else { + klog.V(2).Infof("\n\nUnhandledEvent Object:\n%s\n\n", prettyJSON) + } + + return nil + } + + // handle the message + fmt.Printf("\n[UnhandledEvent]\n") + fmt.Printf("Dump:\n%s\n\n", string(byData)) + + return nil +} diff --git a/pkg/api/speak/v1/websocket/interfaces/constants.go b/pkg/api/speak/v1/websocket/interfaces/constants.go 
new file mode 100644 index 00000000..63ddcff1 --- /dev/null +++ b/pkg/api/speak/v1/websocket/interfaces/constants.go @@ -0,0 +1,18 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package interfacesv1 + +// These are the message types that can be received from the text-to-speech streaming API +const ( + // message types + TypeOpenResponse string = "Open" + TypeMetadataResponse string = "Metadata" + TypeFlushedResponse string = "Flushed" + TypeCloseResponse string = "Close" + + // "Error" type + TypeWarningResponse string = "Warning" + TypeErrorResponse string = "Error" +) diff --git a/pkg/api/speak/v1/websocket/interfaces/interfaces.go b/pkg/api/speak/v1/websocket/interfaces/interfaces.go new file mode 100644 index 00000000..b6fb1a6c --- /dev/null +++ b/pkg/api/speak/v1/websocket/interfaces/interfaces.go @@ -0,0 +1,21 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +// This package defines interfaces for the live API +package interfacesv1 + +// SpeakMessageCallback is a callback used to receive notifications for platforms messages +type SpeakMessageCallback interface { + // These are WS TextMessage that are used for flow control. 
+ Open(or *OpenResponse) error + Metadata(md *MetadataResponse) error + Flush(fl *FlushedResponse) error + Close(cr *CloseResponse) error + + Warning(er *WarningResponse) error + Error(er *ErrorResponse) error + + // These are WS BinaryMessage that are used to send audio data to the client + Binary(byMsg []byte) error +} diff --git a/pkg/api/speak/v1/websocket/interfaces/types.go b/pkg/api/speak/v1/websocket/interfaces/types.go new file mode 100644 index 00000000..f4c7fb93 --- /dev/null +++ b/pkg/api/speak/v1/websocket/interfaces/types.go @@ -0,0 +1,58 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package interfacesv1 + +import ( + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" +) + +/***********************************/ +// Request/Input structs +/***********************************/ +type SpeakOptions interfaces.SpeakOptions + +/***********************************/ +// MessageType is the header to bootstrap you way unmarshalling other messages +/***********************************/ +/* + Example: + { + "type": "message", + "message": { + ... 
+ } + } +*/ +type MessageType struct { + Type string `json:"type"` +} + +// MetadataResponse is the response from the text-to-speech request which contains metadata about the request +type MetadataResponse struct { + Type string `json:"type,omitempty"` + RequestID string `json:"request_id,omitempty"` +} + +// FlushedResponse is the response which indicates that the server has flushed the buffer and is ready to return audio +type FlushedResponse struct { + Type string `json:"type,omitempty"` + SequenceID int `json:"sequence_id,omitempty"` +} + +// OpenResponse is the response from the connection opening +type OpenResponse struct { + Type string `json:"type,omitempty"` +} + +// CloseResponse is the response from the connection closing +type CloseResponse struct { + Type string `json:"type,omitempty"` +} + +// WarningResponse is the Deepgram specific response warning +type WarningResponse interfaces.DeepgramWarning + +// ErrorResponse is the Deepgram specific response error +type ErrorResponse interfaces.DeepgramError diff --git a/pkg/api/speak/v1/websocket/router.go b/pkg/api/speak/v1/websocket/router.go new file mode 100644 index 00000000..a0e15116 --- /dev/null +++ b/pkg/api/speak/v1/websocket/router.go @@ -0,0 +1,195 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. 
+// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "encoding/json" + "os" + "strings" + + prettyjson "github.com/hokaccha/go-prettyjson" + klog "k8s.io/klog/v2" + + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" +) + +// MessageRouter routes speak websocket messages to a SpeakMessageCallback +type MessageRouter struct { + callback interfaces.SpeakMessageCallback + debugWebsocket bool +} + +// NewWithDefault creates a MessageRouter with the default callback handler +func NewWithDefault() *MessageRouter { + return NewStream(NewDefaultCallbackHandler()) +} + +// NewStream creates a MessageRouter with a user-defined callback; DEEPGRAM_DEBUG=true (any case) enables raw message logging +func NewStream(callback interfaces.SpeakMessageCallback) *MessageRouter { + debugStr := os.Getenv("DEEPGRAM_DEBUG") + return &MessageRouter{ + callback: callback, + debugWebsocket: strings.EqualFold(debugStr, "true"), + } +} + +// OpenHelper handles the OpenResponse message +func (r *MessageRouter) OpenHelper(or *interfaces.OpenResponse) error { + return r.callback.Open(or) +} + +// CloseHelper handles the CloseResponse message +func (r *MessageRouter) CloseHelper(or *interfaces.CloseResponse) error { + return r.callback.Close(or) +} + +// ErrorHelper handles the ErrorResponse message +func (r *MessageRouter) ErrorHelper(er *interfaces.ErrorResponse) error { + return r.callback.Error(er) +} + +// processGeneric generalizes the handling of all message types +func (r *MessageRouter) processGeneric(msgType string, byMsg []byte, action func(data *interface{}) error, data interface{}) error { + klog.V(6).Infof("router.%s ENTER\n", msgType) + + r.printDebugMessages(5, msgType, byMsg) + + var err error + if err = action(&data); err != nil { + klog.V(1).Infof("callback.%s failed. 
Err: %v\n", msgType, err) + } else { + klog.V(5).Infof("callback.%s succeeded\n", msgType) + } + klog.V(6).Infof("router.%s LEAVE\n", msgType) + + return err +} + +func (r *MessageRouter) processFlushed(byMsg []byte) error { + var msg interfaces.FlushedResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Flush(&msg) + } + + return r.processGeneric("FlushedResponse", byMsg, action, msg) +} + +func (r *MessageRouter) processMetadata(byMsg []byte) error { + var msg interfaces.MetadataResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Metadata(&msg) + } + + return r.processGeneric("MetadataResponse", byMsg, action, msg) +} + +func (r *MessageRouter) processWarningResponse(byMsg []byte) error { + var msg interfaces.WarningResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Warning(&msg) + } + + return r.processGeneric("WarningResponse", byMsg, action, msg) +} + +func (r *MessageRouter) processErrorResponse(byMsg []byte) error { + var msg interfaces.ErrorResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Error(&msg) + } + + return r.processGeneric("ErrorResponse", byMsg, action, msg) +} + +// Message handles platform messages and routes them appropriately based on the MessageType +func (r *MessageRouter) Message(byMsg []byte) error { + klog.V(6).Infof("router.Message ENTER\n") + + if r.debugWebsocket { + klog.V(5).Infof("Raw Message:\n%s\n", string(byMsg)) + } + + var mt interfaces.MessageType + if err := json.Unmarshal(byMsg, &mt); err != nil { + klog.V(1).Infof("json.Unmarshal(MessageType) failed. 
Err: %v\n", err) + klog.V(6).Infof("router.Message LEAVE\n") + return err + } + + var err error + switch mt.Type { + case interfaces.TypeFlushedResponse: + err = r.processFlushed(byMsg) + case interfaces.TypeMetadataResponse: + err = r.processMetadata(byMsg) + case interfaces.TypeWarningResponse: + err = r.processWarningResponse(byMsg) + case interfaces.TypeErrorResponse: + err = r.processErrorResponse(byMsg) + default: + err = r.UnhandledMessage(byMsg) + klog.V(1).Infof("Message type %s is unhandled\n", mt.Type) + } + + if err == nil { + klog.V(6).Infof("MessageType(%s) after - Result: succeeded\n", mt.Type) + } else { + klog.V(5).Infof("MessageType(%s) after - Result: %v\n", mt.Type, err) + } + klog.V(6).Infof("router.Message LEAVE\n") + return err +} + +// Binary handles binary messages +func (r *MessageRouter) Binary(byMsg []byte) error { + klog.V(6).Infof("router.Binary ENTER\n") + + err := r.callback.Binary(byMsg) + if err != nil { + klog.V(1).Infof("callback.Binary failed. Err: %v\n", err) + } else { + klog.V(5).Infof("callback.Binary succeeded\n") + } + + klog.V(6).Infof("router.Binary LEAVE\n") + return err +} + +// UnhandledMessage logs and handles any unexpected message types +func (r *MessageRouter) UnhandledMessage(byMsg []byte) error { + klog.V(6).Infof("router.UnhandledMessage ENTER\n") + r.printDebugMessages(3, "UnhandledMessage", byMsg) + klog.V(1).Infof("Unknown Event was received\n") + klog.V(6).Infof("router.UnhandledMessage LEAVE\n") + return ErrInvalidMessageType +} + +// printDebugMessages formats and logs debugging messages +func (r *MessageRouter) printDebugMessages(level klog.Level, function string, byMsg []byte) { + prettyJSON, err := prettyjson.Format(byMsg) + if err != nil { + klog.V(1).Infof("prettyjson.Format failed. 
Err: %v\n", err) + return + } + klog.V(level).Infof("\n\n-----------------------------------------------\n") + klog.V(level).Infof("%s RAW:\n%s\n", function, prettyJSON) + klog.V(level).Infof("-----------------------------------------------\n\n\n") +} diff --git a/pkg/client/speak/client.go b/pkg/client/speak/client.go index b87d5308..f3968f79 100644 --- a/pkg/client/speak/client.go +++ b/pkg/client/speak/client.go @@ -8,10 +8,12 @@ This package provides the speak client implementation for the Deepgram API package speak import ( - // msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + "context" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1" speakv1rest "github.com/deepgram/deepgram-go-sdk/pkg/client/speak/v1/rest" - // speakv1ws "github.com/deepgram/deepgram-go-sdk/pkg/client/speak/v1/websocket" + speakv1ws "github.com/deepgram/deepgram-go-sdk/pkg/client/speak/v1/websocket" ) /***********************************/ @@ -77,68 +79,68 @@ func NewREST(apiKey string, options *interfaces.ClientOptions) *speakv1rest.Clie return speakv1rest.New(apiKey, options) } -// /***********************************/ -// // WebSocket Client -// /***********************************/ -// const ( -// WebSocketPackageVersion = speakv1ws.PackageVersion -// ) - -// type WebSocketClient = speakv1ws.Client - -// /* -// NewWebSocketForDemo creates a new websocket connection with all default options - -// Notes: -// - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY -// */ -// func NewWebSocketForDemo(ctx context.Context, options *interfaces.SpeakOptions) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocketForDemo(ctx, options) -// } - -// /* -// NewStreamWithDefaults creates a new websocket connection with all default options - -// Notes: -// - The callback handler is set to the default handler -// */ 
-// func NewWebSocketWithDefaults(ctx context.Context, options *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocketWithDefaults(ctx, options, callback) -// } - -// /* -// NewStream creates a new websocket connection with the specified options - -// Input parameters: -// - ctx: context.Context object -// - apiKey: string containing the Deepgram API key -// - cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. -// - sOptions: SpeakOptions which allows overriding things like model, etc. -// - callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages - -// Notes: -// - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY -// - The callback handler is set to the default handler -// */ -// func NewWebSocket(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocket(ctx, apiKey, cOptions, sOptions, callback) -// } - -// /* -// NewWebSocketWithCancel creates a new websocket connection but has facilities to BYOC (Bring Your Own Cancel) - -// Input parameters: -// - ctx: context.Context object -// - ctxCancel: allow passing in own cancel -// - apiKey: string containing the Deepgram API key -// - cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. -// - sOptions: SpeakOptions which allows overriding things like model, etc. 
-// - callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages - -// Notes: -// - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY -// - The callback handler is set to the default handler -// */ -// func NewWebSocketWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocketWithCancel(ctx, ctxCancel, apiKey, cOptions, sOptions, callback) -// } +/***********************************/ +// WebSocket Client +/***********************************/ +const ( + WebSocketPackageVersion = speakv1ws.PackageVersion +) + +type WebSocketClient = speakv1ws.Client + +/* +NewWebSocketForDemo creates a new websocket connection with all default options + +Notes: + - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY +*/ +func NewWebSocketForDemo(ctx context.Context, options *interfaces.SpeakOptions) (*speakv1ws.Client, error) { + return speakv1ws.NewWebSocketForDemo(ctx, options) +} + +/* +NewWebSocketWithDefaults creates a new websocket connection with default client options + +Notes: + - The callback handler is the one passed in by the caller +*/ +func NewWebSocketWithDefaults(ctx context.Context, options *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { + return speakv1ws.NewWebSocketWithDefaults(ctx, options, callback) +} + +/* +NewWebSocket creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- sOptions: SpeakOptions which allows overriding things like model, etc. 
+- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages + +Notes: + - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The callback handler is set to the default handler +*/ +func NewWebSocket(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { + return speakv1ws.NewWebSocket(ctx, apiKey, cOptions, sOptions, callback) +} + +/* +NewWebSocketWithCancel creates a new websocket connection but has facilities to BYOC (Bring Your Own Cancel) + +Input parameters: +- ctx: context.Context object +- ctxCancel: allow passing in own cancel +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- sOptions: SpeakOptions which allows overriding things like model, etc. +- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages + +Notes: + - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The callback handler is set to the default handler +*/ +func NewWebSocketWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { + return speakv1ws.NewWebSocketWithCancel(ctx, ctxCancel, apiKey, cOptions, sOptions, callback) +} diff --git a/pkg/client/speak/v1/websocket/client.go b/pkg/client/speak/v1/websocket/client.go new file mode 100644 index 00000000..c803ffd6 --- /dev/null +++ b/pkg/client/speak/v1/websocket/client.go @@ -0,0 +1,745 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. 
+// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
+// SPDX-License-Identifier: MIT
+
+// Package websocketv1 provides the speak/streaming client implementation for the Deepgram API
+package websocketv1
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/dvonthenen/websocket"
+	klog "k8s.io/klog/v2"
+
+	speak "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket"
+	msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces"
+	version "github.com/deepgram/deepgram-go-sdk/pkg/api/version"
+	interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1"
+)
+
+type controlMessage struct { // JSON envelope for control messages such as Flush and Reset
+	Type string `json:"type"`
+}
+type TextSource struct { // JSON envelope sent by SpeakWithText
+	Type string `json:"type"`
+	Text string `json:"text"`
+}
+
+/*
+NewWebSocketForDemo creates a new websocket connection with all default options
+
+Notes:
+  - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY
+*/
+func NewWebSocketForDemo(ctx context.Context, options *interfaces.SpeakOptions) (*Client, error) {
+	return NewWebSocket(ctx, "", &interfaces.ClientOptions{}, options, nil)
+}
+
+/*
+NewWebSocketWithDefaults creates a new websocket connection with default client options
+
+Notes:
+  - If callback is nil, the default callback handler is used
+*/
+func NewWebSocketWithDefaults(ctx context.Context, options *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*Client, error) {
+	return NewWebSocket(ctx, "", &interfaces.ClientOptions{}, options, callback)
+}
+
+/*
+NewWebSocket creates a new websocket connection with the specified options
+
+Input parameters:
+- ctx: context.Context object
+- apiKey: string containing the Deepgram API key
+- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc.
+- sOptions: SpeakOptions which allows overriding things like model, etc.
+- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages
+
+Notes:
+  - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY
+  - If callback is nil, the default callback handler is used
+*/
+func NewWebSocket(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*Client, error) {
+	ctx, ctxCancel := context.WithCancel(ctx) // the derived cancel func is owned by the returned Client (see Stop)
+	return NewWebSocketWithCancel(ctx, ctxCancel, apiKey, cOptions, sOptions, callback)
+}
+
+/*
+NewWebSocketWithCancel creates a new websocket connection with the specified options
+
+Input parameters:
+- ctx: context.Context object
+- ctxCancel: allow passing in own cancel
+- apiKey: string containing the Deepgram API key
+- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc.
+- sOptions: SpeakOptions which allows overriding things like model, etc.
+- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages
+
+Notes:
+  - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY
+  - If callback is nil, the default callback handler is used
+*/
+func NewWebSocketWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*Client, error) {
+	klog.V(6).Infof("speak.New() ENTER\n")
+
+	if apiKey != "" {
+		cOptions.APIKey = apiKey // an explicit key overrides whatever cOptions carried; cOptions must be non-nil
+	}
+	err := cOptions.Parse()
+	if err != nil {
+		klog.V(1).Infof("ClientOptions.Parse() failed. Err: %v\n", err)
+		return nil, err
+	}
+	err = sOptions.Check()
+	if err != nil {
+		klog.V(1).Infof("SpeakOptions.Check() failed. Err: %v\n", err)
+		return nil, err
+	}
+
+	if callback == nil {
+		klog.V(2).Infof("Using DefaultCallbackHandler.\n")
+		callback = speak.NewDefaultCallbackHandler()
+	}
+
+	// init
+	conn := Client{
+		cOptions:  cOptions,
+		sOptions:  sOptions,
+		sendBuf:   make(chan []byte, 1),
+		callback:  callback,
+		router:    speak.NewStream(callback),
+		ctx:       ctx,
+		ctxCancel: ctxCancel,
+		retry:     true,
+	}
+
+	klog.V(3).Infof("NewDeepGramWSClient Succeeded\n")
+	klog.V(6).Infof("speak.New() LEAVE\n")
+
+	return &conn, nil
+}
+
+// Connect performs a websocket connection with "DefaultConnectRetry" number of retries.
+func (c *Client) Connect() bool {
+	// set the retry count
+	if c.retryCnt == 0 {
+		c.retryCnt = DefaultConnectRetry
+	}
+	return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), true) != nil
+}
+
+// ConnectWithCancel performs a websocket connection with specified number of retries and providing a
+// cancel function to stop the connection
+func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool {
+	return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt, true) != nil
+}
+
+// AttemptReconnect performs a reconnect after failing retries
+func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool {
+	c.retry = true
+	c.ctx, c.ctxCancel = context.WithCancel(ctx)
+	return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries), true) != nil
+}
+
+// AttemptReconnectWithCancel performs a reconnect after failing retries and providing a cancel function
+func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool {
+	c.retry = true
+	return c.internalConnectWithCancel(ctx, ctxCancel, int(retries), true) != nil
+}
+
+func (c *Client) internalConnect() *websocket.Conn { // callers must already hold c.muConn
+	return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false)
+}
+
+//nolint:funlen // this is a complex function. keep as is
+func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn {
+	klog.V(7).Infof("speak.internalConnectWithCancel() ENTER\n")
+
+	// set the context
+	c.ctx = ctx
+	c.ctxCancel = ctxCancel
+	c.retryCnt = int64(retryCnt)
+
+	// we explicitly stopped and should not attempt to reconnect
+	if !c.retry {
+		klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocket.")
+		klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n")
+		return nil
+	}
+
+	// lock conn access
+	if lock {
+		klog.V(3).Infof("Locking connection mutex\n")
+		c.muConn.Lock()
+		defer c.muConn.Unlock()
+	}
+
+	// if the connection is good, return it otherwise, attempt reconnect
+	if c.wsconn != nil {
+		select {
+		case <-c.ctx.Done():
+			klog.V(1).Infof("Connection is not valid\n")
+			klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n")
+			return nil
+		default:
+			klog.V(7).Infof("Connection is good. Return object.")
+			klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n")
+			return c.wsconn
+		}
+	} else {
+		select {
+		case <-c.ctx.Done():
+			klog.V(1).Infof("Context is not valid. Has been canceled.\n")
+			klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n")
+			return nil
+		default:
+			klog.V(3).Infof("Context is still valid. Retry...\n")
+		}
+	}
+
+	dialer := websocket.Dialer{
+		HandshakeTimeout: 45 * time.Second,
+		/* #nosec G402 */
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth},
+		RedirectService: c.cOptions.RedirectService,
+		SkipServerAuth:  c.cOptions.SkipServerAuth,
+	}
+
+	// set websocket headers
+	myHeader := http.Header{}
+
+	// restore application options to HTTP header
+	if headers, ok := c.ctx.Value(interfaces.HeadersContext{}).(http.Header); ok {
+		for k, vals := range headers {
+			for _, v := range vals {
+				klog.V(3).Infof("internalConnectWithCancel RESTORE Header: %s = %s\n", k, v)
+				myHeader.Add(k, v)
+			}
+		}
+	}
+
+	// sets the API key
+	myHeader.Set("Host", c.cOptions.Host)
+	myHeader.Set("Authorization", "token "+c.cOptions.APIKey)
+	myHeader.Set("User-Agent", interfaces.DgAgent)
+
+	// attempt to establish connection
+	i := int64(0)
+	for {
+		if i >= c.retryCnt {
+			klog.V(3).Infof("Connect timeout... exiting!\n")
+			c.retry = false
+			break
+		}
+
+		// delay on subsequent calls
+		if i > 0 {
+			klog.V(2).Infof("Sleep for retry #%d...\n", i)
+			time.Sleep(time.Second * time.Duration(defaultDelayBetweenRetry))
+		}
+
+		i++
+
+		// create new connection
+		url, err := version.GetSpeakStreamAPI(c.ctx, c.cOptions.Host, c.cOptions.APIVersion, c.cOptions.Path, c.sOptions)
+		if err != nil {
+			klog.V(1).Infof("version.GetSpeakStreamAPI failed. Err: %v\n", err)
+			klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n")
+			return nil // no point in retrying because this is going to fail on every retry
+		}
+		klog.V(5).Infof("Connecting to %s\n", url)
+
+		// perform the websocket connection
+		ws, res, err := dialer.DialContext(c.ctx, url, myHeader)
+		if res != nil {
+			klog.V(3).Infof("HTTP Response: %s\n", res.Status)
+			res.Body.Close()
+		}
+		if err != nil {
+			klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host)
+			klog.V(1).Infof("Dialer failed. Err: %v\n", err)
+			continue
+		}
+
+		// set the object to allow threads to function
+		c.wsconn = ws
+		c.retry = true
+
+		// kick off threads to listen for messages
+		go c.listen()
+
+		// fire off open connection
+		err = c.router.OpenHelper(&msginterfaces.OpenResponse{
+			Type: msginterfaces.TypeOpenResponse,
+		})
+		if err != nil {
+			klog.V(1).Infof("router.OpenHelper failed. Err: %v\n", err)
+		}
+
+		klog.V(3).Infof("WebSocket Connection Successful!")
+		klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n")
+
+		return c.wsconn
+	}
+
+	// if we get here, we failed to connect
+	klog.V(1).Infof("Failed to connect to websocket: %s\n", c.cOptions.Host)
+	klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n")
+
+	return nil
+}
+
+//nolint:funlen,gocyclo // this is a complex function. keep as is
+func (c *Client) listen() {
+	klog.V(6).Infof("speak.listen() ENTER\n")
+
+	defer func() {
+		if r := recover(); r != nil {
+			klog.V(1).Infof("Panic triggered\n")
+
+			// send error on callback
+			err := ErrFatalPanicRecovered
+			sendErr := c.sendError(err)
+			if sendErr != nil {
+				klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
+			}
+
+			// fatal close
+			c.closeWs(true)
+
+			klog.V(6).Infof("speak.listen() LEAVE\n")
+			return
+		}
+	}()
+
+	for {
+		// doing a read, need to lock
+		c.muConn.Lock()
+
+		// get the connection
+		ws := c.internalConnect()
+		if ws == nil {
+			// release
+			c.muConn.Unlock()
+
+			klog.V(3).Infof("listen: Connection is not valid\n")
+			klog.V(6).Infof("speak.listen() LEAVE\n")
+			return
+		}
+
+		// release the lock
+		c.muConn.Unlock()
+
+		// msgType can be binary or text
+		msgType, byMsg, err := ws.ReadMessage()
+
+		if err != nil {
+			errStr := err.Error()
+			switch {
+			case strings.Contains(errStr, SuccessfulSocketErr):
+				klog.V(3).Infof("Graceful websocket close\n")
+
+				// graceful close
+				c.closeWs(false)
+
+				klog.V(6).Infof("speak.listen() LEAVE\n")
+				return
+			case strings.Contains(errStr, UseOfClosedSocket):
+				klog.V(3).Infof("Probable graceful websocket close: %v\n", err)
+
+				// socket was already closed on our side; shut down without reporting an error
+				c.closeWs(false)
+
+				klog.V(6).Infof("speak.listen() LEAVE\n")
+				return
+			case strings.Contains(errStr, FatalReadSocketErr):
+				klog.V(1).Infof("Fatal socket error: %v\n", err)
+
+				// send error on callback
+				sendErr := c.sendError(err)
+				if sendErr != nil {
+					klog.V(1).Infof("speak.listen(): Fatal socket error. Err: %v\n", sendErr)
+				}
+
+				// fatal close
+				c.closeWs(true)
+
+				klog.V(6).Infof("speak.listen() LEAVE\n")
+				return
+			case strings.Contains(errStr, "Deepgram"):
+				klog.V(1).Infof("speak.listen(): Deepgram error. Err: %v\n", err)
+
+				// send error on callback
+				sendErr := c.sendError(err)
+				if sendErr != nil {
+					klog.V(1).Infof("speak.listen(): Deepgram ErrorMsg. Err: %v\n", sendErr)
+				}
+
+				// close the connection
+				c.closeWs(false)
+
+				klog.V(6).Infof("speak.listen() LEAVE\n")
+				return
+			case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
+				klog.V(3).Infof("Client object EOF\n")
+
+				// send error on callback
+				sendErr := c.sendError(err)
+				if sendErr != nil {
+					klog.V(1).Infof("speak.listen(): EOF error. Err: %v\n", sendErr)
+				}
+
+				// close the connection
+				c.closeWs(true)
+
+				klog.V(6).Infof("speak.listen() LEAVE\n")
+				return
+			default:
+				klog.V(1).Infof("speak.listen(): Cannot read websocket message. Err: %v\n", err)
+
+				// send error on callback
+				sendErr := c.sendError(err)
+				if sendErr != nil {
+					klog.V(1).Infof("speak.listen(): EOF error. Err: %v\n", sendErr)
+				}
+
+				// close the connection
+				c.closeWs(true)
+
+				klog.V(6).Infof("speak.listen() LEAVE\n")
+				return
+			}
+		}
+
+		if len(byMsg) == 0 {
+			klog.V(7).Infof("listen(): message empty")
+			continue
+		}
+
+		// inspect the message
+		// if c.cOptions.InspectMessage() {
+		// 	err := c.inspect(byMsg)
+		// 	if err != nil {
+		// 		klog.V(1).Infof("speak.listen(): inspect failed. Err: %v\n", err)
+		// 	}
+		// }
+
+		switch msgType {
+		case websocket.TextMessage:
+			err := c.router.Message(byMsg)
+			if err != nil {
+				klog.V(1).Infof("speak.listen(): router.Message failed. Err: %v\n", err)
+			}
+		case websocket.BinaryMessage:
+			err := c.router.Binary(byMsg)
+			if err != nil {
+				klog.V(1).Infof("speak.listen(): router.Binary failed. Err: %v\n", err)
+			}
+		default:
+			klog.V(7).Infof("speak.listen(): msg recv: type %d, len: %d\n", msgType, len(byMsg))
+		}
+	}
+}
+
+// SpeakWithText sends text to the websocket server as a JSON "Speak" message
+func (c *Client) SpeakWithText(text string) error {
+	klog.V(6).Infof("speak.SpeakText() ENTER\n")
+	klog.V(4).Infof("text: %s\n", text)
+
+	err := c.WriteJSON(TextSource{
+		Type: MessageTypeSpeak,
+		Text: text,
+	})
+	if err == nil {
+		klog.V(4).Infof("SpeakText Succeeded\n")
+	} else {
+		klog.V(1).Infof("SpeakText failed. Err: %v\n", err)
+	}
+
+	klog.V(6).Infof("speak.SpeakText() LEAVE\n")
+
+	return err
+}
+
+// SpeakWithStream writes binary data to the websocket server
+// NOTE: This is unimplemented on the server side
+func (c *Client) SpeakWithStream(byData []byte) error {
+	klog.V(6).Infof("speak.SpeakWithStream() ENTER\n")
+
+	err := c.WriteBinary(byData)
+	if err == nil {
+		klog.V(4).Infof("SpeakWithStream Succeeded\n")
+	} else {
+		klog.V(1).Infof("SpeakWithStream failed. Err: %v\n", err)
+	}
+
+	klog.V(6).Infof("speak.SpeakWithStream() LEAVE\n")
+
+	return err
+}
+
+// WriteBinary writes binary data to the websocket server
+func (c *Client) WriteBinary(byData []byte) error {
+	klog.V(6).Infof("speak.WriteBinary() ENTER\n")
+
+	// doing a write, need to lock
+	c.muConn.Lock()
+	defer c.muConn.Unlock()
+
+	// get the connection
+	ws := c.internalConnect()
+	if ws == nil {
+		err := ErrInvalidConnection
+		klog.V(1).Infof("c.Connect() is nil. Err: %v\n", err)
+		klog.V(6).Infof("speak.WriteBinary() LEAVE\n")
+
+		return err
+	}
+
+	if err := ws.WriteMessage(
+		websocket.BinaryMessage,
+		byData,
+	); err != nil {
+		klog.V(1).Infof("WriteBinary WriteMessage failed. Err: %v\n", err)
+		klog.V(6).Infof("speak.WriteBinary() LEAVE\n")
+		return err
+	}
+
+	klog.V(6).Infof("WriteBinary Successful\n")
+	klog.V(7).Infof("payload: %x\n", byData)
+	klog.V(6).Infof("speak.WriteBinary() LEAVE\n")
+
+	return nil
+}
+
+/*
+WriteJSON writes a JSON control payload to the websocket server. These are control messages for
+managing the text-to-speech session on the Deepgram server.
+*/
+func (c *Client) WriteJSON(payload interface{}) error {
+	klog.V(6).Infof("speak.WriteJSON() ENTER\n")
+
+	byData, err := json.Marshal(payload)
+	if err != nil {
+		klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err)
+		klog.V(6).Infof("speak.WriteJSON() LEAVE\n")
+		return err
+	}
+
+	// doing a write, need to lock
+	c.muConn.Lock()
+	defer c.muConn.Unlock()
+
+	// get the connection
+	ws := c.internalConnect()
+	if ws == nil {
+		err := ErrInvalidConnection
+		klog.V(1).Infof("c.internalConnect() is nil. Err: %v\n", err)
+		klog.V(6).Infof("speak.WriteJSON() LEAVE\n")
+
+		return err
+	}
+	if err := ws.WriteMessage(
+		websocket.TextMessage,
+		byData,
+	); err != nil {
+		klog.V(1).Infof("WriteJSON WriteMessage failed. Err: %v\n", err)
+		klog.V(6).Infof("speak.WriteJSON() LEAVE\n")
+		return err
+	}
+
+	klog.V(4).Infof("WriteJSON succeeded.\n")
+	klog.V(7).Infof("payload: %s\n", string(byData))
+	klog.V(6).Infof("speak.WriteJSON() LEAVE\n")
+
+	return nil
+}
+
+// Flush will instruct the server to flush the current text buffer
+func (c *Client) Flush() error {
+	klog.V(6).Infof("speak.Flush() ENTER\n")
+
+	err := c.WriteJSON(controlMessage{Type: MessageTypeFlush})
+	if err != nil {
+		klog.V(1).Infof("Flush failed. Err: %v\n", err)
+		klog.V(6).Infof("speak.Flush() LEAVE\n")
+
+		return err
+	}
+
+	klog.V(4).Infof("Flush Succeeded\n")
+	klog.V(6).Infof("speak.Flush() LEAVE\n")
+
+	return nil
+}
+
+// Reset will instruct the server to reset the current buffer
+func (c *Client) Reset() error {
+	klog.V(6).Infof("speak.Reset() ENTER\n")
+
+	err := c.WriteJSON(controlMessage{Type: MessageTypeReset})
+	if err != nil {
+		klog.V(1).Infof("Reset failed. Err: %v\n", err)
+		klog.V(6).Infof("speak.Reset() LEAVE\n")
+
+		return err
+	}
+
+	klog.V(4).Infof("Reset Succeeded\n")
+	klog.V(6).Infof("speak.Reset() LEAVE\n")
+	return nil
+}
+
+// closeStream sends an application level "Close" message to Deepgram; c.wsconn must be non-nil
+func (c *Client) closeStream(lock bool) error {
+	klog.V(6).Infof("speak.closeStream() ENTER\n")
+
+	// doing a write, need to lock
+	if lock {
+		c.muConn.Lock()
+		defer c.muConn.Unlock()
+	}
+
+	err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Close\" }"))
+	if err != nil {
+		klog.V(1).Infof("WriteMessage failed. Err: %v\n", err)
+		klog.V(6).Infof("speak.closeStream() LEAVE\n")
+
+		return err
+	}
+
+	klog.V(4).Infof("closeStream Succeeded\n")
+	klog.V(6).Infof("speak.closeStream() LEAVE\n")
+
+	return nil
+}
+
+// normalClosure sends a websocket normal-closure (1000) frame on the current connection
+func (c *Client) normalClosure(lock bool) error {
+	klog.V(6).Infof("speak.normalClosure() ENTER\n")
+
+	// doing a write, need to lock
+	if lock {
+		c.muConn.Lock()
+		defer c.muConn.Unlock()
+	}
+
+	ws := c.wsconn // not internalConnect(): it returns nil once Stop() has cleared c.retry, so the close frame was never sent
+	if ws == nil {
+		err := ErrInvalidConnection
+		klog.V(4).Infof("c.wsconn is nil. Err: %v\n", err)
+		klog.V(6).Infof("speak.normalClosure() LEAVE\n")
+
+		return err
+	}
+
+	err := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
+	switch err {
+	case websocket.ErrCloseSent:
+		klog.V(3).Infof("ErrCloseSent was sent. Err: %v\n", err)
+	case nil:
+		klog.V(4).Infof("normalClosure Succeeded\n")
+	default:
+		klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err)
+	}
+
+	klog.V(6).Infof("speak.normalClosure() LEAVE\n")
+
+	return err
+}
+
+// Stop will send close message and shutdown websocket connection
+func (c *Client) Stop() {
+	klog.V(3).Infof("Stopping...\n")
+	c.retry = false // blocks any further (re)connect attempts on this Client
+
+	// exit gracefully
+	c.ctxCancel()
+	c.closeWs(false)
+}
+
+// closeWs closes the websocket connection; fatal skips the closing handshake
+func (c *Client) closeWs(fatal bool) {
+	klog.V(6).Infof("speak.closeWs() closing channels...\n")
+
+	// doing a write, need to lock
+	c.muConn.Lock()
+	defer c.muConn.Unlock()
+
+	if c.wsconn != nil && !fatal {
+		// calling this even though it's apart of the TTS WS protocol, causes a websocket: close 1005 (no status)
+		// // deepgram requires a close message to be sent
+		// _ = c.closeStream(false)
+		// time.Sleep(TerminationSleep) // allow time for server to register closure
+
+		// websocket protocol message
+		_ = c.normalClosure(false)
+		time.Sleep(TerminationSleep) // allow time for server to register closure
+	}
+
+	if fatal || c.wsconn != nil {
+		// fire off close connection
+		err := c.router.CloseHelper(&msginterfaces.CloseResponse{
+			Type: msginterfaces.TypeCloseResponse,
+		})
+		if err != nil {
+			klog.V(1).Infof("router.CloseHelper failed. Err: %v\n", err)
+		}
+	}
+
+	// close the connection
+	if c.wsconn != nil {
+		c.wsconn.Close()
+		c.wsconn = nil
+	}
+
+	klog.V(4).Infof("speak.closeWs() Succeeded\n")
+	klog.V(6).Infof("speak.closeWs() LEAVE\n")
+}
+
+// sendError forwards err to the callback handler and returns the delivery error, if any
+func (c *Client) sendError(err error) error {
+	response := c.errorToResponse(err)
+	sendErr := c.router.ErrorHelper(response)
+	if sendErr != nil {
+		klog.V(1).Infof("speak.sendError(): router.ErrorHelper failed. Err: %v\n", sendErr)
+	}
+
+	return sendErr
+}
+
+// errorToResponse converts an error into a Deepgram error response; err must be non-nil
+func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse {
+	r := regexp.MustCompile(`websocket: ([a-z]+) (\d+) .+: (.+)`)
+
+	var errorCode string
+	var errorNum string
+	var errorDesc string
+
+	matches := r.FindStringSubmatch(err.Error())
+	if len(matches) > 3 {
+		errorCode = matches[1]
+		errorNum = matches[2]
+		errorDesc = matches[3]
+	} else {
+		errorCode = UnknownDeepgramErr
+		errorNum = UnknownDeepgramErr
+		errorDesc = err.Error()
+	}
+
+	response := &msginterfaces.ErrorResponse{
+		Type:        msginterfaces.TypeErrorResponse,
+		ErrMsg:      strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)),
+		Description: strings.TrimSpace(errorDesc),
+		Variant:     errorNum,
+	}
+	return response
+}
diff --git a/pkg/client/speak/v1/websocket/constants.go b/pkg/client/speak/v1/websocket/constants.go
new file mode 100644
index 00000000..d5ad627e
--- /dev/null
+++ b/pkg/client/speak/v1/websocket/constants.go
@@ -0,0 +1,62 @@
+// Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
+// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
+// SPDX-License-Identifier: MIT
+
+package websocketv1
+
+import (
+	"errors"
+	"time"
+)
+
+const (
+	PackageVersion string = "v1.0"
+)
+
+// external constants
+const (
+	DefaultConnectRetry int64 = 3 // connect attempts used by Connect when no retry count has been set
+
+	ChunkSize        = 1024 * 2
+	TerminationSleep = 100 * time.Millisecond // pause after sending the websocket close frame
+
+	// socket errors
+	FatalReadSocketErr  string = "read: can't assign requested address"
+	FatalWriteSocketErr string = "write: broken pipe"
+	UseOfClosedSocket   string = "use of closed network connection"
+	UnknownDeepgramErr  string = "unknown deepgram error"
+
+	// substring of the read error that indicates a normal (code 1000) websocket close
+	SuccessfulSocketErr string = "close 1000"
+)
+
+const (
+	// MessageTypeSpeak sends text to the server to be spoken
+	MessageTypeSpeak string = "Speak"
+
+	// MessageTypeFlush flushes the audio from the server
+	MessageTypeFlush string = "Flush"
+
+	// MessageTypeReset resets the text buffer
+	MessageTypeReset string = "Reset"
+
+	// MessageTypeClose closes the stream
+	MessageTypeClose string = "Close"
+)
+
+// errors
+var (
+	// ErrInvalidInput required input was not found
+	ErrInvalidInput = errors.New("required input was not found")
+
+	// ErrInvalidConnection connection is not valid
+	ErrInvalidConnection = errors.New("connection is not valid")
+
+	// ErrFatalPanicRecovered fatal panic recovered
+	ErrFatalPanicRecovered = errors.New("fatal panic - attempt to recover")
+)
+
+// internal constants for retry, waits, back-off, etc.
+const (
+	defaultDelayBetweenRetry int64 = 2 // seconds to sleep between connect attempts
+)
diff --git a/pkg/client/speak/v1/websocket/types.go b/pkg/client/speak/v1/websocket/types.go
new file mode 100644
index 00000000..e0886260
--- /dev/null
+++ b/pkg/client/speak/v1/websocket/types.go
@@ -0,0 +1,34 @@
+// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
+// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
+// SPDX-License-Identifier: MIT
+
+package websocketv1
+
+import (
+	"context"
+	"sync"
+
+	"github.com/dvonthenen/websocket"
+
+	speak "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket"
+	msginterface "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces"
+	interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1"
+)
+
+// Client is a struct representing the websocket client connection
+type Client struct {
+	cOptions *interfaces.ClientOptions // client settings (host, API key, path, ...) read when dialing
+	sOptions *interfaces.SpeakOptions // speak settings used to build the connection URL
+
+	sendBuf   chan []byte // created with capacity 1 by the constructor
+	ctx       context.Context // context the connection is dialed and checked against
+	ctxCancel context.CancelFunc // cancels ctx; invoked by Stop
+
+	muConn   sync.RWMutex // guards wsconn
+	wsconn   *websocket.Conn // current connection; nil when not connected
+	retry    bool // false once stopped or retries are exhausted; blocks reconnects
+	retryCnt int64 // maximum number of connect attempts
+
+	callback msginterface.SpeakMessageCallback // handler for server messages
+	router   *speak.MessageRouter // routes received messages to callback
+}