Skip to content

Commit

Permalink
sql: fix COPY CSV so it handles multiple records at a time
Browse files Browse the repository at this point in the history
This was broken because Go's csv.Reader reads the entire input all at
once, so the underlying buffer is already consumed after the first record
is read. The loop that reads each record only continued while that buffer
was non-empty, so it terminated early.

Release note (bug fix): Fixed the COPY CSV command so that it correctly
handles multiple records, separated by newline characters, that are sent
at the same time.
  • Loading branch information
rafiss committed Aug 7, 2021
1 parent 5a43a7e commit 00311d9
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,17 +323,25 @@ func (c *copyMachine) processCopyData(ctx context.Context, data string, final bo
}
c.buf.WriteString(data)
var readFn func(ctx context.Context, final bool) (brk bool, err error)
var checkLoopFn func() bool
switch c.format {
case tree.CopyFormatText:
readFn = c.readTextData
checkLoopFn = func() bool { return c.buf.Len() > 0 }
case tree.CopyFormatBinary:
readFn = c.readBinaryData
checkLoopFn = func() bool { return c.buf.Len() > 0 }
case tree.CopyFormatCSV:
readFn = c.readCSVData
// Never exit the loop from this check. Instead, it's up to the readCSVData
// function to break when it's done reading. This is because the csv.Reader
// consumes all of c.buf in one shot, so checking if c.buf is empty would
// cause us to exit the loop early.
checkLoopFn = func() bool { return true }
default:
panic("unknown copy format")
}
for c.buf.Len() > 0 {
for checkLoopFn() {
brk, err := readFn(ctx, final)
if err != nil {
return err
Expand Down Expand Up @@ -378,6 +386,9 @@ func (c *copyMachine) readCSVData(ctx context.Context, final bool) (brk bool, er
record, err := c.csvReader.Read()
// Look for end of data before checking for errors, since a field count
// error will still return record data.
if record == nil && err == io.EOF {
return true, nil
}
if len(record) == 1 && record[0] == endOfData && c.buf.Len() == 0 {
return true, nil
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/copy
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,51 @@ ReadyForQuery
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"ErrorResponse","Code":"22P04"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "DELETE FROM t"}
Query {"String": "COPY t FROM STDIN CSV"}
CopyData {"Data": "1,one\n2,two\n3,three"}
CopyDone
Query {"String": "SELECT * FROM t ORDER BY i"}
----

until ignore=RowDescription
ReadyForQuery
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DELETE 3"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"CommandComplete","CommandTag":"COPY 3"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"one"}]}
{"Type":"DataRow","Values":[{"text":"2"},{"text":"two"}]}
{"Type":"DataRow","Values":[{"text":"3"},{"text":"three"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 3"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "DELETE FROM t"}
Query {"String": "COPY t FROM STDIN DELIMITER ',' NULL ''"}
CopyData {"Data": "1,one\n2,two\n3,three"}
CopyDone
Query {"String": "SELECT * FROM t ORDER BY i"}
----

until ignore=RowDescription
ReadyForQuery
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DELETE 3"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"CommandComplete","CommandTag":"COPY 3"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"one"}]}
{"Type":"DataRow","Values":[{"text":"2"},{"text":"two"}]}
{"Type":"DataRow","Values":[{"text":"3"},{"text":"three"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 3"}
{"Type":"ReadyForQuery","TxStatus":"I"}

0 comments on commit 00311d9

Please sign in to comment.