Skip to content

Commit

Permalink
Merge branch 'release/2024.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexa committed Mar 6, 2024
2 parents 4260b46 + f99d46a commit 46b8f77
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 64 deletions.
18 changes: 18 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@
## develop


## 2024.2.0

- [CHANGE] retry 設定を削除し、リトライ回数を指定する max_retry 設定を追加する
- リトライしない場合は、max_retry を設定ファイルから削除するか、または、max_retry = 0 を設定する
- デフォルト値: 0 (リトライ無し)
- @Hexa
- [ADD] サービス接続時のエラーによるリトライまでの時間間隔を指定する retry_interval_ms 設定(ミリ秒間隔)を追加する
- デフォルト値: 100 (100 ms)
- @Hexa
- [ADD] サービス接続時の特定のエラー発生時に、リトライする仕組みを追加する
- @Hexa
- [ADD] ハンドラーにリトライ回数を管理するメソッドを追加する
- @Hexa
- [CHANGE] aws への接続時に、時間をおいて再接続できる可能性がある HTTP ステータスコードが 429 の応答の場合は、指定されたリトライ設定に応じて、再接続を試みるように変更する
- @Hexa
- [CHANGE] aws、または、gcp への接続後にリトライ回数が max_retry を超えた場合は、{"type": "error", "reason": string} をクライアントへ送信する
- @Hexa

## 2024.1.0

- [UPDATE] go.mod の Go のバージョンを 1.22.0 にあげる
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2024.1.0
2024.2.0
7 changes: 7 additions & 0 deletions amazon_transcribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,16 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe
if reqErr, ok := err.(awserr.RequestFailure); ok {
code := reqErr.StatusCode()
message := reqErr.Message()

var retry bool
if code == http.StatusTooManyRequests {
retry = true
}

return nil, &SuzuError{
Code: code,
Message: message,
Retry: retry,
}
}
return nil, err
Expand Down
39 changes: 31 additions & 8 deletions amazon_transcribe_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"io"
"sync"

"github.com/aws/aws-sdk-go/service/transcribestreamingservice"
zlog "github.com/rs/zerolog/log"
Expand All @@ -22,6 +23,8 @@ type AmazonTranscribeHandler struct {
SampleRate uint32
ChannelCount uint16
LanguageCode string
RetryCount int
mu sync.Mutex

OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error
}
Expand All @@ -34,6 +37,7 @@ func NewAmazonTranscribeHandler(config Config, channelID, connectionID string, s
SampleRate: sampleRate,
ChannelCount: channelCount,
LanguageCode: languageCode,
RetryCount: 0,
OnResultFunc: onResultFunc.(func(context.Context, io.WriteCloser, string, string, string, any) error),
}
}
Expand Down Expand Up @@ -67,6 +71,24 @@ func (ar *AwsResult) SetMessage(message string) *AwsResult {
return ar
}

func (h *AmazonTranscribeHandler) UpdateRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount++
return h.RetryCount
}

func (h *AmazonTranscribeHandler) GetRetryCount() int {
return h.RetryCount
}

func (h *AmazonTranscribeHandler) ResetRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount = 0
return h.RetryCount
}

func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount))

Expand Down Expand Up @@ -153,17 +175,18 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader)
}

if err := stream.Err(); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Int("retry_count", h.GetRetryCount()).
Send()

// 復帰が不可能なエラー以外は再接続を試みる
switch err.(type) {
case *transcribestreamingservice.LimitExceededException:
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()

// リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if !*at.Config.Retry {
// リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) {
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Expand Down
15 changes: 10 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (

// 100ms
DefaultTimeToWaitForOpusPacketMs = 100

// リトライ間隔 100ms
DefaultRetryIntervalMs = 100
)

type Config struct {
Expand All @@ -46,7 +49,8 @@ type Config struct {
HTTP2MaxReadFrameSize uint32 `ini:"http2_max_read_frame_size"`
HTTP2IdleTimeout uint32 `ini:"http2_idle_timeout"`

Retry *bool `ini:"retry"`
MaxRetry int `ini:"max_retry"`
RetryIntervalMs int `ini:"retry_interval_ms"`

ExporterHTTPS bool `ini:"exporter_https"`
ExporterListenAddr string `ini:"exporter_listen_addr"`
Expand Down Expand Up @@ -160,12 +164,11 @@ func setDefaultsConfig(config *Config) {
config.TimeToWaitForOpusPacketMs = DefaultTimeToWaitForOpusPacketMs
}

// 未指定の場合は true
if config.Retry == nil {
defaultRetry := true
config.Retry = &defaultRetry
if config.RetryIntervalMs == 0 {
config.RetryIntervalMs = DefaultRetryIntervalMs
}
}

func validateConfig(config *Config) error {
var err error
// アドレスとして正しいことを確認する
Expand Down Expand Up @@ -213,4 +216,6 @@ func ShowConfig(config *Config) {
zlog.Info().Str("exporter_listen_addr", config.ExporterListenAddr).Msg("CONF")
zlog.Info().Int("exporter_listen_port", config.ExporterListenPort).Msg("CONF")

zlog.Info().Int("max_retry", config.MaxRetry).Msg("CONF")
zlog.Info().Int("retry_interval_ms", config.RetryIntervalMs).Msg("CONF")
}
6 changes: 4 additions & 2 deletions config_example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ audio_channel_count = 1
# 受信した音声データの保存先ファイルです
dump_file = ./dump.jsonl

# サーバからの切断時に再接続を試みます
retry = true
# サーバからの切断時またはハンドラー個別で指定した条件でのリトライ回数を指定します
max_retry = 0
# リトライ間隔(ミリ秒)です
retry_interval_ms = 100

# aws の場合は IsPartial が false, gcp の場合は IsFinal が true の場合の最終的な結果のみを返す指定
final_result_only = true
Expand Down
5 changes: 5 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package suzu
type SuzuError struct {
Code int
Message string
Retry bool
}

func (e *SuzuError) Error() string {
return e.Message
}

func (e *SuzuError) IsRetry() bool {
return e.Retry
}
28 changes: 14 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,29 @@ module github.com/shiguredo/suzu
go 1.22.0

require (
cloud.google.com/go/speech v1.21.1
github.com/aws/aws-sdk-go v1.50.20
cloud.google.com/go/speech v1.22.0
github.com/aws/aws-sdk-go v1.50.30
github.com/labstack/echo-contrib v0.15.0
github.com/labstack/echo/v4 v4.11.4
github.com/pion/randutil v0.1.0
github.com/pion/rtp v1.8.3
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/net v0.21.0
golang.org/x/sync v0.6.0
google.golang.org/api v0.165.0
google.golang.org/grpc v1.61.1
google.golang.org/api v0.167.0
google.golang.org/grpc v1.62.0
google.golang.org/protobuf v1.32.0
gopkg.in/ini.v1 v1.67.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

require (
cloud.google.com/go v0.112.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute v1.24.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/longrunning v0.5.4 // indirect
cloud.google.com/go/longrunning v0.5.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -37,7 +37,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
Expand All @@ -52,8 +52,8 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0 // indirect
go.opentelemetry.io/otel v1.23.0 // indirect
go.opentelemetry.io/otel/metric v1.23.0 // indirect
go.opentelemetry.io/otel/trace v1.23.0 // indirect
Expand All @@ -63,8 +63,8 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 46b8f77

Please sign in to comment.