From 396f7b2a041f25ccd37241f4b7eeacddb4a84e4d Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 6 Dec 2023 10:08:05 -0800 Subject: [PATCH] sql: replace WaitGroup in CopyIn with a channel We've recently seen "negative WaitGroup counter" server crash during COPY FROM execution a few times, but we have been unable to understand the root cause. It appears that the problem can happen right after the COPY execution is canceled due to `statement_timeout`. The synchronization setup is the following: - the network-handling goroutine calls `wg.Add(1)`, pushes CopyIn command onto the stmt buf, and then blocks via `wg.Wait()` - the copy-handling connExecutor calls `wg.Done()` in the defer of `execCopyIn`. It must be the case that that defer is executed at least twice, but it's unclear to me how that can happen. In the absence of understanding of how this can happen and with no reproduction, this commit attempts to mitigate the problem by switching from the wait group to a channel. In particular, now: - the network-handling goroutine will block until something is sent on the channel - the copy-handling connExecutor goroutine will send on the channel in the defer in `execCopyIn`. The channel is buffered, so up to 4 sends on the channel are allowed even though the network-handling goroutine will be unblocked on the very first one. The risk of this change is that with multiple sends on the channel we enter into an undefined territory. In particular, the contract is such that `execCopyIn` takes over the connection, and now it seems possible that the network-handling goroutine wakes up after the first send while the copy-handling connExecutor doesn't exit, so the latter could continue reading from the connection. In other words, we replace a server crash (which is very bad) with an undefined behavior (which could be very bad too). Release note: None --- pkg/sql/conn_executor.go | 4 +++- pkg/sql/conn_io.go | 9 +++++++-- pkg/sql/pgwire/conn.go | 15 ++++++++++----- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index a8c87a3d8ec0..caf173b62cb5 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -3006,7 +3006,9 @@ func (ex *connExecutor) execCopyIn( }() // When we're done, unblock the network connection. - defer cmd.CopyDone.Done() + defer func() { + cmd.CopyDone <- struct{}{} + }() // The connExecutor state machine has already set us up with a txn at this // point. diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index 2811ad65f4ed..e784ae483e48 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -367,9 +367,14 @@ type CopyIn struct { // Conn is the network connection. Execution of the CopyFrom statement takes // control of the connection. Conn pgwirebase.Conn - // CopyDone is decremented once execution finishes, signaling that control of + // CopyDone is sent on once execution finishes, signaling that control of // the connection is being handed back to the network routine. - CopyDone *sync.WaitGroup + // + // Ideally, this would have been a sync.WaitGroup, but we've seen a few + // times when the counter became negative leading to a server crash + // (#112095), so the wait group was replaced with a buffered channel that is + // never closed. + CopyDone chan<- struct{} // TimeReceived is the time at which the message was received // from the client. Used to compute the service latency. TimeReceived time.Time diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 0f054d07875e..f7507fbb146d 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -18,7 +18,6 @@ import ( "io" "net" "strconv" - "sync" "sync/atomic" "time" @@ -409,15 +408,21 @@ func (c *conn) handleSimpleQuery( "COPY together with other statements in a query string is not supported"), }) } - copyDone := sync.WaitGroup{} - copyDone.Add(1) + // copyDone should act as a wait group to block this goroutine until + // the copy machine returns control to us. That is done by sending + // on the channel, so we could have used an unbuffered channel here. + // However, due to not fully understood circumstances (see #112095), + // sometimes the send can happen more than once, so we create a + // buffered channel with a buffer of 4 to make it extremely unlikely + // that the connExecutor for the COPY will get stuck. + copyDone := make(chan struct{}, 4) if err := c.stmtBuf.Push( ctx, sql.CopyIn{ Conn: c, ParsedStmt: stmts[i], Stmt: cp, - CopyDone: ©Done, + CopyDone: copyDone, TimeReceived: timeReceived, ParseStart: startParse, ParseEnd: endParse, @@ -425,7 +430,7 @@ func (c *conn) handleSimpleQuery( ); err != nil { return err } - copyDone.Wait() + <-copyDone return nil } if cp, ok := stmts[i].AST.(*tree.CopyTo); ok {