From 084e9705d2025df5f0bf68199bd864a2af48b314 Mon Sep 17 00:00:00 2001 From: James Harris Date: Wed, 3 Apr 2024 16:52:44 +1000 Subject: [PATCH] Rename `ExchangeQueue.Exchange()` to `Do()`. --- internal/eventstream/append_test.go | 4 +-- internal/integration/executor.go | 2 +- internal/integration/supervisor_test.go | 4 +-- internal/messaging/exchange.go | 40 +++++++++++++------------ 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/internal/eventstream/append_test.go b/internal/eventstream/append_test.go index b3844016..4a791440 100644 --- a/internal/eventstream/append_test.go +++ b/internal/eventstream/append_test.go @@ -107,7 +107,7 @@ func TestAppend(t *testing.T) { RunInBackground(t, "supervisor", deps.Supervisor.Run). UntilStopped() - res, err := deps.Supervisor.AppendQueue.Exchange( + res, err := deps.Supervisor.AppendQueue.Do( tctx, AppendRequest{ StreamID: streamID, @@ -144,7 +144,7 @@ func TestAppend(t *testing.T) { t.Logf("append an event, attempt #%d", attempt) attempt++ - _, err := deps.Supervisor.AppendQueue.Exchange(tctx, req) + _, err := deps.Supervisor.AppendQueue.Do(tctx, req) if err == nil { break } diff --git a/internal/integration/executor.go b/internal/integration/executor.go index 33f20bab..afef29b4 100644 --- a/internal/integration/executor.go +++ b/internal/integration/executor.go @@ -21,7 +21,7 @@ func (e *CommandExecutor) ExecuteCommand( c dogma.Command, _ ...dogma.ExecuteCommandOption, ) error { - _, err := e.ExecuteQueue.Exchange( + _, err := e.ExecuteQueue.Do( ctx, ExecuteRequest{ Command: e.Packer.Pack(c), }, diff --git a/internal/integration/supervisor_test.go b/internal/integration/supervisor_test.go index 2181e823..40e7e707 100644 --- a/internal/integration/supervisor_test.go +++ b/internal/integration/supervisor_test.go @@ -339,7 +339,7 @@ func TestSupervisor(t *testing.T) { } for { - _, err := deps.Supervisor.ExecuteQueue.Exchange(tctx, req) + _, err := deps.Supervisor.ExecuteQueue.Do(tctx, req) if tctx.Err() != nil { t.Fatal(tctx.Err()) @@ -461,7 +461,7 @@ func TestSupervisor(t *testing.T) { RunInBackground(t, "supervisor", secondSupervisor.Run). BeforeTestEnds() - if _, err := secondSupervisor.ExecuteQueue.Exchange(tctx, req); err != nil { + if _, err := secondSupervisor.ExecuteQueue.Do(tctx, req); err != nil { t.Fatal(err) } diff --git a/internal/messaging/exchange.go b/internal/messaging/exchange.go index 690b5140..35f517e7 100644 --- a/internal/messaging/exchange.go +++ b/internal/messaging/exchange.go @@ -6,59 +6,61 @@ import ( ) // Exchange encapsulates a request/response pair. -type Exchange[Request, Response any] struct { +type Exchange[Req, Res any] struct { Context context.Context - Request Request - Response chan<- Failable[Response] + Request Req + Response chan<- Failable[Res] } // Ok sends a successful response. -func (e Exchange[Request, Response]) Ok(res Response) { - e.Response <- Failable[Response]{value: res} +func (e Exchange[Req, Res]) Ok(res Res) { + e.Response <- Failable[Res]{value: res} } // Err sends an error response. -func (e Exchange[Request, Response]) Err(err error) { - e.Response <- Failable[Response]{err: err} +func (e Exchange[Req, Res]) Err(err error) { + e.Response <- Failable[Res]{err: err} } // ExchangeQueue is a queue of request/response exchanges. -type ExchangeQueue[Request, Response any] struct { +type ExchangeQueue[Req, Res any] struct { init sync.Once - queue chan Exchange[Request, Response] + queue chan Exchange[Req, Res] } // Recv returns a channel that, when read, dequeues the next exchange. -func (q *ExchangeQueue[Request, Response]) Recv() <-chan Exchange[Request, Response] { +func (q *ExchangeQueue[Req, Res]) Recv() <-chan Exchange[Req, Res] { return q.getQueue() } // Send returns a channel that, when written, enqueues an exchange. -func (q *ExchangeQueue[Request, Response]) Send() chan<- Exchange[Request, Response] { +func (q *ExchangeQueue[Req, Res]) Send() chan<- Exchange[Req, Res] { return q.getQueue() } -// Exchange performs a synchronous request/response exchange. -func (q *ExchangeQueue[Request, Response]) Exchange(ctx context.Context, req Request) (res Response, err error) { - response := make(chan Failable[Response], 1) +// Do performs a synchronous request/response exchange. +func (q *ExchangeQueue[Req, Res]) Do(ctx context.Context, req Req) (Res, error) { + response := make(chan Failable[Res], 1) select { case <-ctx.Done(): - return res, ctx.Err() - case q.Send() <- Exchange[Request, Response]{ctx, req, response}: + var zero Res + return zero, ctx.Err() + case q.Send() <- Exchange[Req, Res]{ctx, req, response}: } select { case <-ctx.Done(): - return res, ctx.Err() + var zero Res + return zero, ctx.Err() case f := <-response: return f.Get() } } -func (q *ExchangeQueue[Request, Response]) getQueue() chan Exchange[Request, Response] { +func (q *ExchangeQueue[Req, Res]) getQueue() chan Exchange[Req, Res] { q.init.Do(func() { - q.queue = make(chan Exchange[Request, Response]) + q.queue = make(chan Exchange[Req, Res]) }) return q.queue }