Skip to content

Commit

Permalink
sql: replace WaitGroup in CopyIn with a channel
Browse files Browse the repository at this point in the history
We've recently seen "negative WaitGroup counter" server crash during
COPY FROM execution a few times, but we have been unable to understand
the root cause. It appears that the problem can happen right after the
COPY execution is canceled due to `statement_timeout`. The
synchronization setup is the following:
- the network-handling goroutine calls `wg.Add(1)`, pushes CopyIn
command onto the stmt buf, and then blocks via `wg.Wait()`
- the copy-handling connExecutor calls `wg.Done()` in the defer of
`execCopyIn`. It must be the case that that defer is executed at least
twice, but it's unclear to me how that can happen.

In the absence of understanding of how this can happen and with no
reproduction, this commit attempts to mitigate the problem by switching
from the wait group to a channel. In particular, now:
- the network-handling goroutine will block until something is sent on
the channel
- the copy-handling connExecutor goroutine will send on the channel in
the defer in `execCopyIn`. The channel is buffered, so up to 4 sends on
the channel are allowed even though the network-handling goroutine will
be unblocked on the very first one.

The risk of this change is that with multiple sends on the channel we
enter into an undefined territory. In particular, the contract is such
that `execCopyIn` takes over the connection, and now it seems possible
that the network-handling goroutine wakes up after the first send while
the copy-handling connExecutor doesn't exit, so the latter could
continue reading from the connection. In other words, we replace
a server crash (which is very bad) with an undefined behavior (which
could be very bad too).

Release note: None
  • Loading branch information
yuzefovich committed Dec 6, 2023
1 parent 51c554b commit 396f7b2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
4 changes: 3 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3006,7 +3006,9 @@ func (ex *connExecutor) execCopyIn(
}()

// When we're done, unblock the network connection.
defer cmd.CopyDone.Done()
defer func() {
cmd.CopyDone <- struct{}{}
}()

// The connExecutor state machine has already set us up with a txn at this
// point.
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,14 @@ type CopyIn struct {
// Conn is the network connection. Execution of the CopyFrom statement takes
// control of the connection.
Conn pgwirebase.Conn
// CopyDone is decremented once execution finishes, signaling that control of
// CopyDone is sent on once execution finishes, signaling that control of
// the connection is being handed back to the network routine.
CopyDone *sync.WaitGroup
//
// Ideally, this would have been a sync.WaitGroup, but we've seen a few
// times when the counter became negative leading to a server crash
// (#112095), so the wait group was replaced with a buffered channel that is
// never closed.
CopyDone chan<- struct{}
// TimeReceived is the time at which the message was received
// from the client. Used to compute the service latency.
TimeReceived time.Time
Expand Down
15 changes: 10 additions & 5 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"io"
"net"
"strconv"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -409,23 +408,29 @@ func (c *conn) handleSimpleQuery(
"COPY together with other statements in a query string is not supported"),
})
}
copyDone := sync.WaitGroup{}
copyDone.Add(1)
// copyDone should act as a wait group to block this goroutine until
// the copy machine returns control to us. That is done by sending
// on the channel, so we could have used an unbuffered channel here.
// However, due to not fully understood circumstances (see #112095),
// sometimes the send can happen more than once, so we create a
// buffered channel with a buffer of 4 to make it extremely unlikely
// that the connExecutor for the COPY will get stuck.
copyDone := make(chan struct{}, 4)
if err := c.stmtBuf.Push(
ctx,
sql.CopyIn{
Conn: c,
ParsedStmt: stmts[i],
Stmt: cp,
CopyDone: &copyDone,
CopyDone: copyDone,
TimeReceived: timeReceived,
ParseStart: startParse,
ParseEnd: endParse,
},
); err != nil {
return err
}
copyDone.Wait()
<-copyDone
return nil
}
if cp, ok := stmts[i].AST.(*tree.CopyTo); ok {
Expand Down

0 comments on commit 396f7b2

Please sign in to comment.