Skip to content

Commit

Permalink
sql/pgwire: fix service latency for COPY
Browse files Browse the repository at this point in the history
Release note (bug fix): Fixed the latency that is reported for
COPY comands in the CLI and stats reporting.

Release justification: low risk bug fix
  • Loading branch information
rafiss committed Aug 29, 2022
1 parent db1554b commit b6f4a3b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 2 deletions.
10 changes: 10 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2047,9 +2047,19 @@ func (ex *connExecutor) execCmd() error {
}
}
case CopyIn:
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionQueryReceived, tcmd.TimeReceived)
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionStartParse, tcmd.ParseStart)
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndParse, tcmd.ParseEnd)
res = ex.clientComm.CreateCopyInResult(pos)
var err error
ev, payload, err = ex.execCopyIn(ctx, tcmd)
// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
// because:
// - stats use ex.statsCollector, not ex.phasetimes.
// - ex.statsCollector merely contains a copy of the times, that
// was created when the statement started executing (via the
// reset() method).
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionQueryServiced, timeutil.Now())
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ type CopyIn struct {
// CopyDone is decremented once execution finishes, signaling that control of
// the connection is being handed back to the network routine.
CopyDone *sync.WaitGroup
// TimeReceived is the time at which the message was received
// from the client. Used to compute the service latency.
TimeReceived time.Time
// ParseStart/ParseEnd are the timing info for parsing of the query. Used for
// stats reporting.
ParseStart time.Time
ParseEnd time.Time
}

// command implements the Command interface.
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,17 @@ func (c *conn) handleSimpleQuery(
}
copyDone := sync.WaitGroup{}
copyDone.Add(1)
if err := c.stmtBuf.Push(ctx, sql.CopyIn{Conn: c, Stmt: cp, CopyDone: &copyDone}); err != nil {
if err := c.stmtBuf.Push(
ctx,
sql.CopyIn{
Conn: c,
Stmt: cp,
CopyDone: &copyDone,
TimeReceived: timeReceived,
ParseStart: startParse,
ParseEnd: endParse,
},
); err != nil {
return err
}
copyDone.Wait()
Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/copy
Original file line number Diff line number Diff line change
Expand Up @@ -925,3 +925,37 @@ ReadyForQuery
{"Type":"DataRow","Values":[{"text":"{2,3,4,5,6}"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 2"}
{"Type":"ReadyForQuery","TxStatus":"I"}


send crdb_only
Query {"String": "COPY c(d) FROM STDIN WITH CSV"}
CopyData {"Data": "\"{0,1}\"\n"}
CopyData {"Data": "\"{2,3,4,5,6}\"\n"}
CopyData {"Data": "\\.\n"}
CopyDone
----


until crdb_only ignore=RowDescription
ReadyForQuery
----
{"Type":"CopyInResponse","ColumnFormatCodes":[0]}
{"Type":"CommandComplete","CommandTag":"COPY 2"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# SHOW LAST QUERY STATISTICS cannot be used as a statement source, so we
# store the result as a variable in the test.
let $copy_service_latency crdb_only
Query {"String": "SHOW LAST QUERY STATISTICS RETURNING service_latency"}
----

send crdb_only
Query {"String": "SELECT '$copy_service_latency'::INTERVAL > '0 seconds'::INTERVAL"}
----

until crdb_only ignore=RowDescription
ReadyForQuery
----
{"Type":"DataRow","Values":[{"text":"t"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}
6 changes: 5 additions & 1 deletion pkg/testutils/pgtest/datadriven.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func RunTest(t *testing.T, path, addr, user string) {
return d.Expected

case "let":
require.Len(t, d.CmdArgs, 1, "only one argument permitted for let")
if (d.HasArg("crdb_only") && !p.isCockroachDB) ||
(d.HasArg("noncrdb_only") && p.isCockroachDB) {
return d.Expected
}
require.GreaterOrEqual(t, len(d.CmdArgs), 1, "at least one argument required for let")
require.Truef(t, strings.HasPrefix(d.CmdArgs[0].Key, "$"), "let argument must begin with '$'")
lines := strings.Split(d.Input, "\n")
require.Len(t, lines, 1, "only one input command permitted for let")
Expand Down

0 comments on commit b6f4a3b

Please sign in to comment.