Skip to content

Commit

Permalink
Move headers and delays into requestoption/ send option (#34)
Browse files Browse the repository at this point in the history
* Move headers and delays into requestoption/ send option

But keep codec as a property of the client, as this is a per method
option and you might want to store a client and use it repeatedly with
the codec set.

* Rename CallOptions to ClientOptions
  • Loading branch information
jackkleeman authored Aug 21, 2024
1 parent 16fb256 commit eb778c7
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 83 deletions.
10 changes: 5 additions & 5 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ type Context interface {

// Service gets a Service accessor by service and method name
// Note: use the CallAs helper function to deserialise return values
Service(service, method string, opts ...options.CallOption) CallClient
Service(service, method string, opts ...options.ClientOption) CallClient

// Object gets a Object accessor by name, key and method name
// Note: use the CallAs helper function to receive serialised values
Object(object, key, method string, opts ...options.CallOption) CallClient
Object(object, key, method string, opts ...options.ClientOption) CallClient

// Run runs the function (fn), storing final results (including terminal errors)
// durably in the journal, or otherwise for transient errors stopping execution
Expand Down Expand Up @@ -81,11 +81,11 @@ type Awakeable interface {
// CallClient represents all the different ways you can invoke a particular service/key/method tuple.
type CallClient interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input any) ResponseFuture
RequestFuture(input any, opts ...options.RequestOption) ResponseFuture
// Request makes a call and blocks on getting the response which is stored in output
Request(input any, output any) error
Request(input any, output any, opts ...options.RequestOption) error
// Send makes a one-way call which is executed in the background
Send(input any, delay time.Duration)
Send(input any, opts ...options.SendOption)
}

// ResponseFuture is a handle on a potentially not-yet completed outbound call.
Expand Down
7 changes: 1 addition & 6 deletions examples/codegen/buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@ plugins:
- remote: buf.build/protocolbuffers/go:v1.34.2
out: .
opt: paths=source_relative
- local:
- docker
- run
- --pull=always
- -i
- ghcr.io/restatedev/protoc-gen-go-restate:latest
- local: protoc-gen-go-restate
out: .
opt: paths=source_relative
inputs:
Expand Down
42 changes: 21 additions & 21 deletions examples/codegen/proto/helloworld_restate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions examples/ticketreservation/user_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (u *userSession) AddTicket(ctx restate.ObjectContext, ticketId string) (boo
tickets = append(tickets, ticketId)

ctx.Set("tickets", tickets)
ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, 15*time.Minute)
ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, restate.WithDelay(15*time.Minute))

return true, nil
}
Expand All @@ -60,7 +60,7 @@ func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) (
}

ctx.Set("tickets", tickets)
ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(nil, 0)
ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(restate.Void{})

return void, nil
}
Expand Down Expand Up @@ -101,8 +101,7 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,
ctx.Log().Info("payment details", "id", response.ID, "price", response.Price)

for _, ticket := range tickets {
call := ctx.Object(TicketServiceName, ticket, "MarkAsSold")
call.Send(nil, 0)
ctx.Object(TicketServiceName, ticket, "MarkAsSold").Send(restate.Void{})
}

ctx.Clear("tickets")
Expand Down
19 changes: 9 additions & 10 deletions facilitators.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package restate

import (
"errors"
"time"

"github.com/restatedev/sdk-go/internal/options"
)
Expand Down Expand Up @@ -58,11 +57,11 @@ func AwakeableAs[T any](ctx Context, options ...options.AwakeableOption) TypedAw
// TypedCallClient is an extension of [CallClient] which deals in typed values
type TypedCallClient[I any, O any] interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input I) TypedResponseFuture[O]
RequestFuture(input I, opts ...options.RequestOption) TypedResponseFuture[O]
// Request makes a call and blocks on getting the response
Request(input I) (O, error)
Request(input I, opts ...options.RequestOption) (O, error)
// Send makes a one-way call which is executed in the background
Send(input I, delay time.Duration)
Send(input I, opts ...options.SendOption)
}

type typedCallClient[I any, O any] struct {
Expand All @@ -76,17 +75,17 @@ func NewTypedCallClient[I any, O any](client CallClient) TypedCallClient[I, O] {
return typedCallClient[I, O]{client}
}

func (t typedCallClient[I, O]) Request(input I) (output O, err error) {
err = t.inner.RequestFuture(input).Response(&output)
func (t typedCallClient[I, O]) Request(input I, opts ...options.RequestOption) (output O, err error) {
err = t.inner.RequestFuture(input, opts...).Response(&output)
return
}

func (t typedCallClient[I, O]) RequestFuture(input I) TypedResponseFuture[O] {
return typedResponseFuture[O]{t.inner.RequestFuture(input)}
func (t typedCallClient[I, O]) RequestFuture(input I, opts ...options.RequestOption) TypedResponseFuture[O] {
return typedResponseFuture[O]{t.inner.RequestFuture(input, opts...)}
}

func (t typedCallClient[I, O]) Send(input I, delay time.Duration) {
t.inner.Send(input, delay)
func (t typedCallClient[I, O]) Send(input I, opts ...options.SendOption) {
t.inner.Send(input, opts...)
}

// TypedResponseFuture is an extension of [ResponseFuture] which returns typed responses instead of accepting a pointer
Expand Down
30 changes: 25 additions & 5 deletions internal/options/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package options

import "github.com/restatedev/sdk-go/encoding"
import (
"time"

"github.com/restatedev/sdk-go/encoding"
)

type AwakeableOptions struct {
Codec encoding.Codec
Expand Down Expand Up @@ -34,13 +38,29 @@ type SetOption interface {
BeforeSet(*SetOptions)
}

type CallOptions struct {
Codec encoding.Codec
type ClientOptions struct {
Codec encoding.Codec
}

type ClientOption interface {
BeforeClient(*ClientOptions)
}

type RequestOptions struct {
Headers map[string]string
}

type RequestOption interface {
BeforeRequest(*RequestOptions)
}

type SendOptions struct {
Headers map[string]string
Delay time.Duration
}

type CallOption interface {
BeforeCall(*CallOptions)
type SendOption interface {
BeforeSend(*SendOptions)
}

type RunOptions struct {
Expand Down
26 changes: 18 additions & 8 deletions internal/state/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,26 @@ import (
)

type serviceCall struct {
options options.CallOptions
options options.ClientOptions
machine *Machine
service string
key string
method string
}

// RequestFuture makes a call and returns a handle on the response
func (c *serviceCall) RequestFuture(input any) restate.ResponseFuture {
func (c *serviceCall) RequestFuture(input any, opts ...options.RequestOption) restate.ResponseFuture {
o := options.RequestOptions{}
for _, opt := range opts {
opt.BeforeRequest(&o)
}

bytes, err := encoding.Marshal(c.options.Codec, input)
if err != nil {
panic(c.machine.newCodecFailure(fmt.Errorf("failed to marshal RequestFuture input: %w", err)))
}

entry, entryIndex := c.machine.doCall(c.service, c.key, c.method, c.options.Headers, bytes)
entry, entryIndex := c.machine.doCall(c.service, c.key, c.method, o.Headers, bytes)

return decodingResponseFuture{
futures.NewResponseFuture(c.machine.suspensionCtx, entry, entryIndex, func(err error) any { return c.machine.newProtocolViolation(entry, err) }),
Expand All @@ -42,7 +47,7 @@ func (c *serviceCall) RequestFuture(input any) restate.ResponseFuture {
type decodingResponseFuture struct {
*futures.ResponseFuture
machine *Machine
options options.CallOptions
options options.ClientOptions
}

func (d decodingResponseFuture) Response(output any) (err error) {
Expand All @@ -59,17 +64,22 @@ func (d decodingResponseFuture) Response(output any) (err error) {
}

// Request makes a call and blocks on the response
func (c *serviceCall) Request(input any, output any) error {
return c.RequestFuture(input).Response(output)
func (c *serviceCall) Request(input any, output any, opts ...options.RequestOption) error {
return c.RequestFuture(input, opts...).Response(output)
}

// Send runs a call in the background after delay duration
func (c *serviceCall) Send(input any, delay time.Duration) {
func (c *serviceCall) Send(input any, opts ...options.SendOption) {
o := options.SendOptions{}
for _, opt := range opts {
opt.BeforeSend(&o)
}

bytes, err := encoding.Marshal(c.options.Codec, input)
if err != nil {
panic(c.machine.newCodecFailure(fmt.Errorf("failed to marshal Send input: %w", err)))
}
c.machine.sendCall(c.service, c.key, c.method, c.options.Headers, bytes, delay)
c.machine.sendCall(c.service, c.key, c.method, o.Headers, bytes, o.Delay)
return
}

Expand Down
12 changes: 6 additions & 6 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ func (c *Context) After(d time.Duration) restate.After {
return c.machine.after(d)
}

func (c *Context) Service(service, method string, opts ...options.CallOption) restate.CallClient {
o := options.CallOptions{}
func (c *Context) Service(service, method string, opts ...options.ClientOption) restate.CallClient {
o := options.ClientOptions{}
for _, opt := range opts {
opt.BeforeCall(&o)
opt.BeforeClient(&o)
}
if o.Codec == nil {
o.Codec = encoding.JSONCodec
Expand All @@ -134,10 +134,10 @@ func (c *Context) Service(service, method string, opts ...options.CallOption) re
}
}

func (c *Context) Object(service, key, method string, opts ...options.CallOption) restate.CallClient {
o := options.CallOptions{}
func (c *Context) Object(service, key, method string, opts ...options.ClientOption) restate.CallClient {
o := options.ClientOptions{}
for _, opt := range opts {
opt.BeforeCall(&o)
opt.BeforeClient(&o)
}
if o.Codec == nil {
o.Codec = encoding.JSONCodec
Expand Down
Loading

0 comments on commit eb778c7

Please sign in to comment.