Skip to content

Commit

Permalink
Merge 382f948 into 17aec1e
Browse files Browse the repository at this point in the history
  • Loading branch information
Prashanth Govindarajan authored Mar 18, 2021
2 parents 17aec1e + 382f948 commit 79dcab6
Show file tree
Hide file tree
Showing 9 changed files with 1,818 additions and 188 deletions.
203 changes: 100 additions & 103 deletions src/Microsoft.Data.Analysis/DataFrame.IO.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private static DataFrameColumn CreateColumn(Type kind, string[] columnNames, int
return ret;
}

private static DataFrame ReadCsvLinesIntoDataFrame(IEnumerable<string> lines,
private static DataFrame ReadCsvLinesIntoDataFrame(WrappedStreamReaderOrStringReader wrappedReader,
char separator = ',', bool header = true,
string[] columnNames = null, Type[] dataTypes = null,
long numberOfRowsToRead = -1, int guessRows = 10, bool addIndexColumn = false
Expand All @@ -183,140 +183,141 @@ private static DataFrame ReadCsvLinesIntoDataFrame(IEnumerable<string> lines,
throw new ArgumentException(string.Format(Strings.ExpectedEitherGuessRowsOrDataTypes, nameof(guessRows), nameof(dataTypes)));
}

var linesForGuessType = new List<string[]>();
long rowline = 0;
int numberOfColumns = dataTypes?.Length ?? 0;

if (header == true && numberOfRowsToRead != -1)
List<DataFrameColumn> columns;
string[] fields;
using (var textReader = wrappedReader.GetTextReader())
{
numberOfRowsToRead++;
}
TextFieldParser parser = new TextFieldParser(textReader);
parser.SetDelimiters(separator.ToString());

List<DataFrameColumn> columns;
// First pass: schema and number of rows.
string line = null;
var linesForGuessType = new List<string[]>();
long rowline = 0;
int numberOfColumns = dataTypes?.Length ?? 0;

var enumerator = lines.GetEnumerator();
while (enumerator.MoveNext())
{
line = enumerator.Current;
if ((numberOfRowsToRead == -1) || rowline < numberOfRowsToRead)
if (header == true && numberOfRowsToRead != -1)
{
numberOfRowsToRead++;
}

// First pass: schema and number of rows.
while ((fields = parser.ReadFields()) != null)
{
if (linesForGuessType.Count < guessRows || (header && rowline == 0))
if ((numberOfRowsToRead == -1) || rowline < numberOfRowsToRead)
{
var spl = line.Split(separator);
if (header && rowline == 0)
if (linesForGuessType.Count < guessRows || (header && rowline == 0))
{
if (columnNames == null)
string[] spl = fields;
if (header && rowline == 0)
{
columnNames = spl;
if (columnNames == null)
{
columnNames = spl;
}
}
else
{
linesForGuessType.Add(spl);
numberOfColumns = Math.Max(numberOfColumns, spl.Length);
}
}
else
{
linesForGuessType.Add(spl);
numberOfColumns = Math.Max(numberOfColumns, spl.Length);
}
}
++rowline;
if (rowline == guessRows || guessRows == 0)
{
break;
}
}
++rowline;
if (rowline == guessRows || guessRows == 0)

if (rowline == 0)
{
break;
throw new FormatException(Strings.EmptyFile);
}
}

if (rowline == 0)
{
throw new FormatException(Strings.EmptyFile);
}

columns = new List<DataFrameColumn>(numberOfColumns);
// Guesses types or looks up dataTypes and adds columns.
for (int i = 0; i < numberOfColumns; ++i)
{
Type kind = dataTypes == null ? GuessKind(i, linesForGuessType) : dataTypes[i];
columns.Add(CreateColumn(kind, columnNames, i));
columns = new List<DataFrameColumn>(numberOfColumns);
// Guesses types or looks up dataTypes and adds columns.
for (int i = 0; i < numberOfColumns; ++i)
{
Type kind = dataTypes == null ? GuessKind(i, linesForGuessType) : dataTypes[i];
columns.Add(CreateColumn(kind, columnNames, i));
}
}

DataFrame ret = new DataFrame(columns);
line = null;

// Fill values.
enumerator.Reset();
rowline = 0;
while (enumerator.MoveNext() && (numberOfRowsToRead == -1 || rowline < numberOfRowsToRead))
using (var textReader = wrappedReader.GetTextReader())
{
line = enumerator.Current;
var spl = line.Split(separator);
if (header && rowline == 0)
{
// Skips.
}
else
TextFieldParser parser = new TextFieldParser(textReader);
parser.SetDelimiters(separator.ToString());

long rowline = 0;
while ((fields = parser.ReadFields()) != null && (numberOfRowsToRead == -1 || rowline < numberOfRowsToRead))
{
ret.Append(spl, inPlace: true);
string[] spl = fields;
if (header && rowline == 0)
{
// Skips.
}
else
{
ret.Append(spl, inPlace: true);
}
++rowline;
}
++rowline;
}

if (addIndexColumn)
{
PrimitiveDataFrameColumn<int> indexColumn = new PrimitiveDataFrameColumn<int>("IndexColumn", columns[0].Length);
for (int i = 0; i < columns[0].Length; i++)
if (addIndexColumn)
{
indexColumn[i] = i;
PrimitiveDataFrameColumn<int> indexColumn = new PrimitiveDataFrameColumn<int>("IndexColumn", columns[0].Length);
for (int i = 0; i < columns[0].Length; i++)
{
indexColumn[i] = i;
}
columns.Insert(0, indexColumn);
}
columns.Insert(0, indexColumn);
}
return ret;
}

private class CsvLines : IEnumerable<string>
{
private CsvLineEnumerator enumerator;
public CsvLines(CsvLineEnumerator csvLineEnumerator)
{
enumerator = csvLineEnumerator;
}

public IEnumerator<string> GetEnumerator() => enumerator;

IEnumerator IEnumerable.GetEnumerator() => enumerator;
return ret;
}

private class CsvLineEnumerator : IEnumerator<string>
private class WrappedStreamReaderOrStringReader
{
private StreamReader streamReader;
private string currentLine;
private long streamStartPosition;
public CsvLineEnumerator(StreamReader csvStream)
{
streamStartPosition = csvStream.BaseStream.Position;
streamReader = csvStream;
currentLine = null;
}

public string Current => currentLine;

object IEnumerator.Current => currentLine;
private Stream _stream;
private long _initialPosition;
private Encoding _encoding;
private string _csvString;

public void Dispose()
public WrappedStreamReaderOrStringReader(Stream stream, Encoding encoding)
{
throw new NotImplementedException();
_stream = stream;
_initialPosition = stream.Position;
_encoding = encoding;
_csvString = null;
}

public bool MoveNext()
public WrappedStreamReaderOrStringReader(string csvString)
{
currentLine = streamReader.ReadLine();
return currentLine != null;
_csvString = csvString;
_initialPosition = 0;
_encoding = null;
_stream = null;
}

public void Reset()
// Returns a new TextReader. If the wrapped object is a stream, the stream is reset to its initial position.
public TextReader GetTextReader()
{
streamReader.DiscardBufferedData();
streamReader.BaseStream.Seek(streamStartPosition, SeekOrigin.Begin);
if (_stream != null)
{
_stream.Seek(_initialPosition, SeekOrigin.Begin);
return new StreamReader(_stream, _encoding, detectEncodingFromByteOrderMarks: true, DefaultStreamReaderBufferSize, leaveOpen: true);
}
else
{
return new StringReader(_csvString);
}

}

}

/// <summary>
Expand All @@ -336,8 +337,8 @@ public static DataFrame LoadCsvFromString(string csvString,
string[] columnNames = null, Type[] dataTypes = null,
long numberOfRowsToRead = -1, int guessRows = 10, bool addIndexColumn = false)
{
string[] lines = csvString.Split(new[] { Environment.NewLine }, StringSplitOptions.None);
return ReadCsvLinesIntoDataFrame(lines, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
WrappedStreamReaderOrStringReader wrappedStreamReaderOrStringReader = new WrappedStreamReaderOrStringReader(csvString);
return ReadCsvLinesIntoDataFrame(wrappedStreamReaderOrStringReader, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
}

/// <summary>
Expand Down Expand Up @@ -369,12 +370,8 @@ public static DataFrame LoadCsv(Stream csvStream,
throw new ArgumentException(string.Format(Strings.ExpectedEitherGuessRowsOrDataTypes, nameof(guessRows), nameof(dataTypes)));
}

using (var streamReader = new StreamReader(csvStream, encoding ?? Encoding.UTF8, detectEncodingFromByteOrderMarks: true, DefaultStreamReaderBufferSize, leaveOpen: true))
{
CsvLineEnumerator linesEnumerator = new CsvLineEnumerator(streamReader);
IEnumerable<string> lines = new CsvLines(linesEnumerator);
return ReadCsvLinesIntoDataFrame(lines, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
}
WrappedStreamReaderOrStringReader wrappedStreamReaderOrStringReader = new WrappedStreamReaderOrStringReader(csvStream, encoding ?? Encoding.UTF8);
return ReadCsvLinesIntoDataFrame(wrappedStreamReaderOrStringReader, separator, header, columnNames, dataTypes, numberOfRowsToRead, guessRows, addIndexColumn);
}

/// <summary>
Expand Down
Loading

0 comments on commit 79dcab6

Please sign in to comment.