Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve csv parsing #5711

Merged
merged 10 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 98 additions & 103 deletions src/Microsoft.Data.Analysis/DataFrame.IO.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private static DataFrameColumn CreateColumn(Type kind, string[] columnNames, int
return ret;
}

private static DataFrame ReadCsvLinesIntoDataFrame(IEnumerable<string> lines,
private static DataFrame ReadCsvLinesIntoDataFrame(WrappedStreamReaderOrStringReader wrappedReader,
char separator = ',', bool header = true,
string[] columnNames = null, Type[] dataTypes = null,
long numberOfRowsToRead = -1, int guessRows = 10, bool addIndexColumn = false
Expand All @@ -183,140 +183,139 @@ private static DataFrame ReadCsvLinesIntoDataFrame(IEnumerable<string> lines,
throw new ArgumentException(string.Format(Strings.ExpectedEitherGuessRowsOrDataTypes, nameof(guessRows), nameof(dataTypes)));
}

var linesForGuessType = new List<string[]>();
long rowline = 0;
int numberOfColumns = dataTypes?.Length ?? 0;

if (header == true && numberOfRowsToRead != -1)
List<DataFrameColumn> columns;
string[] fields;
using (var textReader = wrappedReader.GetTextReader())
{
numberOfRowsToRead++;
}
TextFieldParser parser = new TextFieldParser(textReader);
parser.SetDelimiters(separator.ToString());

List<DataFrameColumn> columns;
// First pass: schema and number of rows.
string line = null;
var linesForGuessType = new List<string[]>();
long rowline = 0;
int numberOfColumns = dataTypes?.Length ?? 0;

var enumerator = lines.GetEnumerator();
while (enumerator.MoveNext())
{
line = enumerator.Current;
if ((numberOfRowsToRead == -1) || rowline < numberOfRowsToRead)
if (header == true && numberOfRowsToRead != -1)
{
numberOfRowsToRead++;
}

// First pass: schema and number of rows.
while ((fields = parser.ReadFields()) != null)
{
if (linesForGuessType.Count < guessRows || (header && rowline == 0))
if ((numberOfRowsToRead == -1) || rowline < numberOfRowsToRead)
{
var spl = line.Split(separator);
if (header && rowline == 0)
if (linesForGuessType.Count < guessRows || (header && rowline == 0))
{
if (columnNames == null)
if (header && rowline == 0)
{
columnNames = spl;
if (columnNames == null)
{
columnNames = fields;
}
}
else
{
linesForGuessType.Add(fields);
numberOfColumns = Math.Max(numberOfColumns, fields.Length);
}
}
else
{
linesForGuessType.Add(spl);
numberOfColumns = Math.Max(numberOfColumns, spl.Length);
}
}
++rowline;
if (rowline == guessRows || guessRows == 0)
{
break;
}
}
++rowline;
if (rowline == guessRows || guessRows == 0)

if (rowline == 0)
{
break;
throw new FormatException(Strings.EmptyFile);
}
}

if (rowline == 0)
{
throw new FormatException(Strings.EmptyFile);
}

columns = new List<DataFrameColumn>(numberOfColumns);
// Guesses types or looks up dataTypes and adds columns.
for (int i = 0; i < numberOfColumns; ++i)
{
Type kind = dataTypes == null ? GuessKind(i, linesForGuessType) : dataTypes[i];
columns.Add(CreateColumn(kind, columnNames, i));
columns = new List<DataFrameColumn>(numberOfColumns);
// Guesses types or looks up dataTypes and adds columns.
for (int i = 0; i < numberOfColumns; ++i)
{
Type kind = dataTypes == null ? GuessKind(i, linesForGuessType) : dataTypes[i];
columns.Add(CreateColumn(kind, columnNames, i));
}
}

DataFrame ret = new DataFrame(columns);
line = null;

// Fill values.
enumerator.Reset();
rowline = 0;
while (enumerator.MoveNext() && (numberOfRowsToRead == -1 || rowline < numberOfRowsToRead))
using (var textReader = wrappedReader.GetTextReader())
{
line = enumerator.Current;
var spl = line.Split(separator);
if (header && rowline == 0)
{
// Skips.
}
else
TextFieldParser parser = new TextFieldParser(textReader);
parser.SetDelimiters(separator.ToString());

long rowline = 0;
while ((fields = parser.ReadFields()) != null && (numberOfRowsToRead == -1 || rowline < numberOfRowsToRead))
{
ret.Append(spl, inPlace: true);
if (header && rowline == 0)
{
// Skips.
}
else
{
ret.Append(fields, inPlace: true);
}
++rowline;
}
++rowline;
}

if (addIndexColumn)
{
PrimitiveDataFrameColumn<int> indexColumn = new PrimitiveDataFrameColumn<int>("IndexColumn", columns[0].Length);
for (int i = 0; i < columns[0].Length; i++)
if (addIndexColumn)
{
indexColumn[i] = i;
PrimitiveDataFrameColumn<int> indexColumn = new PrimitiveDataFrameColumn<int>("IndexColumn", columns[0].Length);
for (int i = 0; i < columns[0].Length; i++)
{
indexColumn[i] = i;
}
columns.Insert(0, indexColumn);
}
columns.Insert(0, indexColumn);
}
return ret;
}

private class CsvLines : IEnumerable<string>
{
private CsvLineEnumerator enumerator;
public CsvLines(CsvLineEnumerator csvLineEnumerator)
{
enumerator = csvLineEnumerator;
}

public IEnumerator<string> GetEnumerator() => enumerator;

IEnumerator IEnumerable.GetEnumerator() => enumerator;
return ret;
}

private class CsvLineEnumerator : IEnumerator<string>
private class WrappedStreamReaderOrStringReader
{
private StreamReader streamReader;
private string currentLine;
private long streamStartPosition;
public CsvLineEnumerator(StreamReader csvStream)
{
streamStartPosition = csvStream.BaseStream.Position;
streamReader = csvStream;
currentLine = null;
}

public string Current => currentLine;

object IEnumerator.Current => currentLine;
private Stream _stream;
private long _initialPosition;
private Encoding _encoding;
private string _csvString;

public void Dispose()
public WrappedStreamReaderOrStringReader(Stream stream, Encoding encoding)
{
throw new NotImplementedException();
_stream = stream;
_initialPosition = stream.Position;
_encoding = encoding;
_csvString = null;
}

public bool MoveNext()
public WrappedStreamReaderOrStringReader(string csvString)
{
currentLine = streamReader.ReadLine();
return currentLine != null;
_csvString = csvString;
_initialPosition = 0;
_encoding = null;
_stream = null;
}

public void Reset()
// Returns a new TextReader. If the wrapped object is a stream, the stream is reset to its initial position.
public TextReader GetTextReader()
{
streamReader.DiscardBufferedData();
streamReader.BaseStream.Seek(streamStartPosition, SeekOrigin.Begin);
if (_stream != null)
{
pgovind marked this conversation as resolved.
Show resolved Hide resolved
_stream.Seek(_initialPosition, SeekOrigin.Begin);
return new StreamReader(_stream, _encoding, detectEncodingFromByteOrderMarks: true, DefaultStreamReaderBufferSize, leaveOpen: true);
}
else
{
return new StringReader(_csvString);
}

}

}

/// <summary>
Expand All @@ -336,8 +335,8 @@ public static DataFrame LoadCsvFromString(string csvString,
string[] columnNames = null, Type[] dataTypes = null,
long numberOfRowsToRead = -1, int guessRows = 10, bool addIndexColumn = false)
{
string[] lines = csvString.Split(new[] { Environment.NewLine }, StringSplitOptions.None);
return ReadCsvLinesIntoDataFrame(lines, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
WrappedStreamReaderOrStringReader wrappedStreamReaderOrStringReader = new WrappedStreamReaderOrStringReader(csvString);
return ReadCsvLinesIntoDataFrame(wrappedStreamReaderOrStringReader, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
}

/// <summary>
Expand Down Expand Up @@ -369,12 +368,8 @@ public static DataFrame LoadCsv(Stream csvStream,
throw new ArgumentException(string.Format(Strings.ExpectedEitherGuessRowsOrDataTypes, nameof(guessRows), nameof(dataTypes)));
}

using (var streamReader = new StreamReader(csvStream, encoding ?? Encoding.UTF8, detectEncodingFromByteOrderMarks: true, DefaultStreamReaderBufferSize, leaveOpen: true))
{
CsvLineEnumerator linesEnumerator = new CsvLineEnumerator(streamReader);
IEnumerable<string> lines = new CsvLines(linesEnumerator);
return ReadCsvLinesIntoDataFrame(lines, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
}
WrappedStreamReaderOrStringReader wrappedStreamReaderOrStringReader = new WrappedStreamReaderOrStringReader(csvStream, encoding ?? Encoding.UTF8);
return ReadCsvLinesIntoDataFrame(wrappedStreamReaderOrStringReader, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
}

/// <summary>
Expand Down
Loading