From 6855b7290cab62a1fc6a26a2b633e0b5bbf248da Mon Sep 17 00:00:00 2001 From: MoofMonkey Date: Thu, 26 May 2022 21:37:13 +0300 Subject: [PATCH] fix: prevent goroutine leak and CPU spinning at websocket transport (#2209) * Added goroutine leak test for chat example * Improved chat example with proper concurrency * Revert "fix: prevents goroutine leak at websocket transport (#2168)" This reverts commit eef7bfaad1b524f9e2fc0c1150fdb321c276069e. * Improved subscription channel usage * Regenerated examples and codegen * Add support for subscription keepalives in websocket client * Update chat example test * if else chain to switch Signed-off-by: Steve Coffman * Revert "Add support for subscription keepalives in websocket client" This reverts commits 64b882c3c9901f25edc0684ce2a1f9b63443416b and 670cf22272b490005d46dc2bee1634de1cd06d68. * Fixed chat example race condition * Fixed chatroom#Messages type Co-authored-by: Steve Coffman --- _examples/chat/chat_test.go | 90 +++++---- _examples/chat/generated.go | 22 ++- _examples/chat/resolvers.go | 104 ++++------- codegen/field.gotpl | 22 ++- .../followschema/schema.generated.go | 176 +++++++++++------- codegen/testserver/singlefile/generated.go | 176 +++++++++++------- graphql/handler/transport/websocket.go | 18 +- 7 files changed, 336 insertions(+), 272 deletions(-) diff --git a/_examples/chat/chat_test.go b/_examples/chat/chat_test.go index 40b37c07f7..6f546dc0f6 100644 --- a/_examples/chat/chat_test.go +++ b/_examples/chat/chat_test.go @@ -1,49 +1,75 @@ package chat import ( - "testing" - "time" - + "fmt" "github.com/99designs/gqlgen/client" "github.com/99designs/gqlgen/graphql/handler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "runtime" + "sync" + "testing" ) func TestChatSubscriptions(t *testing.T) { c := client.New(handler.NewDefaultServer(NewExecutableSchema(New()))) - sub := c.Websocket(`subscription @user(username:"vektah") { messageAdded(roomName:"#gophers") { text createdBy } }`) - defer sub.Close() - - go func() { - var resp interface{} - time.Sleep(10 * time.Millisecond) - err := c.Post(`mutation { - a:post(text:"Hello!", roomName:"#gophers", username:"vektah") { id } - b:post(text:"Hello Vektah!", roomName:"#gophers", username:"andrey") { id } - c:post(text:"Whats up?", roomName:"#gophers", username:"vektah") { id } - }`, &resp) - assert.NoError(t, err) - }() - - var msg struct { - resp struct { - MessageAdded struct { - Text string - CreatedBy string + const batchSize = 128 + var wg sync.WaitGroup + for i := 0; i < batchSize*8; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + sub := c.Websocket(fmt.Sprintf( + `subscription @user(username:"vektah") { messageAdded(roomName:"#gophers%d") { text createdBy } }`, + i, + )) + defer sub.Close() + + var msg struct { + resp struct { + MessageAdded struct { + Text string + CreatedBy string + } + } + err error } + + msg.err = sub.Next(&msg.resp) + require.NoError(t, msg.err, "sub.Next") + require.Equal(t, "You've joined the room", msg.resp.MessageAdded.Text) + require.Equal(t, "system", msg.resp.MessageAdded.CreatedBy) + + go func() { + var resp interface{} + err := c.Post(fmt.Sprintf(`mutation { + a:post(text:"Hello!", roomName:"#gophers%d", username:"vektah") { id } + b:post(text:"Hello Vektah!", roomName:"#gophers%d", username:"andrey") { id } + c:post(text:"Whats up?", roomName:"#gophers%d", username:"vektah") { id } + }`, i, i, i), &resp) + assert.NoError(t, err) + }() + + msg.err = sub.Next(&msg.resp) + require.NoError(t, msg.err, "sub.Next") + require.Equal(t, "Hello!", msg.resp.MessageAdded.Text) + require.Equal(t, "vektah", msg.resp.MessageAdded.CreatedBy) + + msg.err = sub.Next(&msg.resp) + require.NoError(t, msg.err, "sub.Next") + require.Equal(t, "Whats up?", msg.resp.MessageAdded.Text) + require.Equal(t, "vektah", msg.resp.MessageAdded.CreatedBy) + }(i) + // wait for goroutines to finish every N tests to not starve on CPU + if (i+1)%batchSize == 0 { + wg.Wait() } - err error } + wg.Wait() - msg.err = sub.Next(&msg.resp) - require.NoError(t, msg.err, "sub.Next") - require.Equal(t, "Hello!", msg.resp.MessageAdded.Text) - require.Equal(t, "vektah", msg.resp.MessageAdded.CreatedBy) - - msg.err = sub.Next(&msg.resp) - require.NoError(t, msg.err, "sub.Next") - require.Equal(t, "Whats up?", msg.resp.MessageAdded.Text) - require.Equal(t, "vektah", msg.resp.MessageAdded.CreatedBy) + // 1 for the main thread, 1 for the testing package and remainder is reserved for the HTTP server threads + // TODO: use something like runtime.Stack to filter out HTTP server threads, + // TODO: which is required for proper concurrency and leaks testing + require.Less(t, runtime.NumGoroutine(), 1+1+batchSize*2, "goroutine leak") } diff --git a/_examples/chat/generated.go b/_examples/chat/generated.go index 1c766d1613..a9c7fb0ee1 100644 --- a/_examples/chat/generated.go +++ b/_examples/chat/generated.go @@ -1001,17 +1001,21 @@ func (ec *executionContext) _Subscription_messageAdded(ctx context.Context, fiel return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *Message) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *Message): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalNMessage2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋ_examplesᚋchatᚐMessage(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalNMessage2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋ_examplesᚋchatᚐMessage(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } diff --git a/_examples/chat/resolvers.go b/_examples/chat/resolvers.go index baeee1c197..f466fb299e 100644 --- a/_examples/chat/resolvers.go +++ b/_examples/chat/resolvers.go @@ -14,8 +14,7 @@ import ( type ckey string type resolver struct { - Rooms map[string]*Chatroom - mu sync.Mutex // nolint: structcheck + Rooms sync.Map } func (r *resolver) Mutation() MutationResolver { @@ -33,7 +32,7 @@ func (r *resolver) Subscription() SubscriptionResolver { func New() Config { return Config{ Resolvers: &resolver{ - Rooms: map[string]*Chatroom{}, + Rooms: sync.Map{}, }, Directives: DirectiveRoot{ User: func(ctx context.Context, obj interface{}, next graphql.Resolver, username string) (res interface{}, err error) { @@ -50,103 +49,78 @@ func getUsername(ctx context.Context) string { return "" } +type Observer struct { + Username string + Message chan *Message +} + type Chatroom struct { Name string Messages []Message - Observers map[string]struct { - Username string - Message chan *Message - } + Observers sync.Map } type mutationResolver struct{ *resolver } func (r *mutationResolver) Post(ctx context.Context, text string, username string, roomName string) (*Message, error) { - r.mu.Lock() - room := r.Rooms[roomName] - if room == nil { - room = &Chatroom{ - Name: roomName, - Observers: map[string]struct { - Username string - Message chan *Message - }{}, - } - r.Rooms[roomName] = room - } - r.mu.Unlock() + room := r.getRoom(roomName) - message := Message{ + message := &Message{ ID: randString(8), CreatedAt: time.Now(), Text: text, CreatedBy: username, } - room.Messages = append(room.Messages, message) - r.mu.Lock() - for _, observer := range room.Observers { + room.Messages = append(room.Messages, *message) + room.Observers.Range(func(_, v interface{}) bool { + observer := v.(*Observer) if observer.Username == "" || observer.Username == message.CreatedBy { - observer.Message <- &message + observer.Message <- message } - } - r.mu.Unlock() - return &message, nil + return true + }) + return message, nil } type queryResolver struct{ *resolver } -func (r *queryResolver) Room(ctx context.Context, name string) (*Chatroom, error) { - r.mu.Lock() - room := r.Rooms[name] - if room == nil { - room = &Chatroom{ - Name: name, - Observers: map[string]struct { - Username string - Message chan *Message - }{}, - } - r.Rooms[name] = room - } - r.mu.Unlock() +func (r *resolver) getRoom(name string) *Chatroom { + room, _ := r.Rooms.LoadOrStore(name, &Chatroom{ + Name: name, + Observers: sync.Map{}, + }) + return room.(*Chatroom) +} - return room, nil +func (r *queryResolver) Room(ctx context.Context, name string) (*Chatroom, error) { + return r.getRoom(name), nil } type subscriptionResolver struct{ *resolver } func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomName string) (<-chan *Message, error) { - r.mu.Lock() - room := r.Rooms[roomName] - if room == nil { - room = &Chatroom{ - Name: roomName, - Observers: map[string]struct { - Username string - Message chan *Message - }{}, - } - r.Rooms[roomName] = room - } - r.mu.Unlock() + room := r.getRoom(roomName) id := randString(8) events := make(chan *Message, 1) go func() { <-ctx.Done() - r.mu.Lock() - delete(room.Observers, id) - r.mu.Unlock() + room.Observers.Delete(id) }() - r.mu.Lock() - room.Observers[id] = struct { - Username string - Message chan *Message - }{Username: getUsername(ctx), Message: events} - r.mu.Unlock() + room.Observers.Store(id, &Observer{ + Username: getUsername(ctx), + Message: events, + }) + + events <- &Message{ + ID: randString(8), + CreatedAt: time.Now(), + Text: "You've joined the room", + CreatedBy: "system", + } return events, nil } diff --git a/codegen/field.gotpl b/codegen/field.gotpl index 3629d92c7f..e47b958dda 100644 --- a/codegen/field.gotpl +++ b/codegen/field.gotpl @@ -39,17 +39,21 @@ func (ec *executionContext) _{{$object.Name}}_{{$field.Name}}(ctx context.Contex } {{- if $object.Stream }} return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan {{$field.TypeReference.GO | ref}}) - if !ok { + select { + case res, ok := <-resTmp.(<-chan {{$field.TypeReference.GO | ref}}): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.{{ $field.TypeReference.MarshalFunc }}(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.{{ $field.TypeReference.MarshalFunc }}(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } {{- else }} res := resTmp.({{$field.TypeReference.GO | ref}}) diff --git a/codegen/testserver/followschema/schema.generated.go b/codegen/testserver/followschema/schema.generated.go index bb8b19614a..6e0ca5675e 100644 --- a/codegen/testserver/followschema/schema.generated.go +++ b/codegen/testserver/followschema/schema.generated.go @@ -4723,17 +4723,21 @@ func (ec *executionContext) _Subscription_updated(ctx context.Context, field gra return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -4774,17 +4778,21 @@ func (ec *executionContext) _Subscription_initPayload(ctx context.Context, field return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -4822,17 +4830,21 @@ func (ec *executionContext) _Subscription_directiveArg(ctx context.Context, fiel return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -4881,17 +4893,21 @@ func (ec *executionContext) _Subscription_directiveNullableArg(ctx context.Conte return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -4966,17 +4982,21 @@ func (ec *executionContext) _Subscription_directiveDouble(ctx context.Context, f return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -5034,17 +5054,21 @@ func (ec *executionContext) _Subscription_directiveUnimplemented(ctx context.Con return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -5082,17 +5106,21 @@ func (ec *executionContext) _Subscription_issue896b(ctx context.Context, field g return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan []*CheckIssue896) - if !ok { + select { + case res, ok := <-resTmp.(<-chan []*CheckIssue896): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOCheckIssue8962ᚕᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋfollowschemaᚐCheckIssue896(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOCheckIssue8962ᚕᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋfollowschemaᚐCheckIssue896(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -5137,17 +5165,21 @@ func (ec *executionContext) _Subscription_errorRequired(ctx context.Context, fie return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *Error) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *Error): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalNError2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋfollowschemaᚐError(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalNError2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋfollowschemaᚐError(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } diff --git a/codegen/testserver/singlefile/generated.go b/codegen/testserver/singlefile/generated.go index 394c177293..9df81d6dd3 100644 --- a/codegen/testserver/singlefile/generated.go +++ b/codegen/testserver/singlefile/generated.go @@ -10706,17 +10706,21 @@ func (ec *executionContext) _Subscription_updated(ctx context.Context, field gra return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -10757,17 +10761,21 @@ func (ec *executionContext) _Subscription_initPayload(ctx context.Context, field return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalNString2string(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -10805,17 +10813,21 @@ func (ec *executionContext) _Subscription_directiveArg(ctx context.Context, fiel return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -10864,17 +10876,21 @@ func (ec *executionContext) _Subscription_directiveNullableArg(ctx context.Conte return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -10949,17 +10965,21 @@ func (ec *executionContext) _Subscription_directiveDouble(ctx context.Context, f return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -11017,17 +11037,21 @@ func (ec *executionContext) _Subscription_directiveUnimplemented(ctx context.Con return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *string) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *string): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOString2ᚖstring(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -11065,17 +11089,21 @@ func (ec *executionContext) _Subscription_issue896b(ctx context.Context, field g return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan []*CheckIssue896) - if !ok { + select { + case res, ok := <-resTmp.(<-chan []*CheckIssue896): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOCheckIssue8962ᚕᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋsinglefileᚐCheckIssue896(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalOCheckIssue8962ᚕᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋsinglefileᚐCheckIssue896(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } @@ -11120,17 +11148,21 @@ func (ec *executionContext) _Subscription_errorRequired(ctx context.Context, fie return nil } return func(ctx context.Context) graphql.Marshaler { - res, ok := <-resTmp.(<-chan *Error) - if !ok { + select { + case res, ok := <-resTmp.(<-chan *Error): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalNError2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋsinglefileᚐError(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): return nil } - return graphql.WriterFunc(func(w io.Writer) { - w.Write([]byte{'{'}) - graphql.MarshalString(field.Alias).MarshalGQL(w) - w.Write([]byte{':'}) - ec.marshalNError2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋcodegenᚋtestserverᚋsinglefileᚐError(ctx, field.Selections, res).MarshalGQL(w) - w.Write([]byte{'}'}) - }) } } diff --git a/graphql/handler/transport/websocket.go b/graphql/handler/transport/websocket.go index 67afe5d56c..51b1104ccc 100644 --- a/graphql/handler/transport/websocket.go +++ b/graphql/handler/transport/websocket.go @@ -371,20 +371,12 @@ func (c *wsConnection) subscribe(start time.Time, msg *message) { responses, ctx := c.exec.DispatchOperation(ctx, rc) for { - // prevents goroutine leak - select { - case <-ctx.Done(): - return - default: - { - response := responses(ctx) - if response == nil { - break - } - - c.sendResponse(msg.id, response) - } + response := responses(ctx) + if response == nil { + break } + + c.sendResponse(msg.id, response) } // complete and context cancel comes from the defer