Skip to content

Commit

Permalink
go/runtime/host/protocol/connection: Wait for request handlers to finish
Browse files Browse the repository at this point in the history
  • Loading branch information
peternose committed Nov 13, 2023
1 parent 82e0afd commit 8cb671e
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions go/runtime/host/protocol/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,6 @@ func (c *connection) readResponse(ctx context.Context, respCh <-chan *Body) (*Bo
}

func (c *connection) workerOutgoing() {
defer c.quitWg.Done()

for {
select {
case msg := <-c.outCh:
Expand Down Expand Up @@ -462,16 +460,18 @@ func (c *connection) handleMessage(ctx context.Context, message *Message) {
}

func (c *connection) workerIncoming() {
// Wait for request handlers to finish.
var wg sync.WaitGroup
defer wg.Wait()

// Cancel all request handlers.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer func() {
// Close connection and signal that connection is closed.
_ = c.conn.Close()
close(c.closeCh)

// Cancel all request handlers.
cancel()

c.quitWg.Done()
}()

for {
Expand All @@ -486,7 +486,11 @@ func (c *connection) workerIncoming() {
}

// Handle message in a separate goroutine.
go c.handleMessage(ctx, &message)
wg.Add(1)
go func() {
defer wg.Done()
c.handleMessage(ctx, &message)
}()
}
}

Expand All @@ -502,8 +506,14 @@ func (c *connection) initConn(conn net.Conn) {
c.codec = cbor.NewMessageCodec(conn, moduleName)

c.quitWg.Add(2)
go c.workerIncoming()
go c.workerOutgoing()
go func() {
defer c.quitWg.Done()
c.workerIncoming()
}()
go func() {
defer c.quitWg.Done()
c.workerOutgoing()
}()

// Change protocol state to Initializing so that some of the requests are allowed.
c.setStateLocked(stateInitializing)
Expand Down

0 comments on commit 8cb671e

Please sign in to comment.