Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading from BytesIO #125

Merged
merged 15 commits into from
Sep 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions nrrd/reader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import bz2
import io
import os
import re
import shlex
Expand Down Expand Up @@ -401,9 +402,16 @@ def read_data(header, fh=None, filename=None, index_order='F'):

# If a compression encoding is used, then byte skip AFTER decompressing
if header['encoding'] == 'raw':
data = np.fromfile(fh, dtype)
if isinstance(fh, io.BytesIO):
raw_data = bytearray(fh.read(total_data_points * dtype.itemsize))
data = np.frombuffer(raw_data, dtype)
else:
data = np.fromfile(fh, dtype)
elif header['encoding'] in ['ASCII', 'ascii', 'text', 'txt']:
data = np.fromfile(fh, dtype, sep=' ')
if isinstance(fh, io.BytesIO):
data = np.fromstring(fh.read(), dtype, sep=' ')
else:
data = np.fromfile(fh, dtype, sep=' ')
else:
# Handle compressed data now
# Construct the decompression object based on encoding
Expand Down
36 changes: 36 additions & 0 deletions nrrd/tests/test_reading.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import unittest
from typing import ClassVar

Expand Down Expand Up @@ -465,6 +466,41 @@ def test_read_quoted_string_header_no_quotes(self):
self.assertEqual(['mm', 'cm', 'in'], header['space units'])
self.assertEqual(['X', 'Y', 'f(log(X,10),Y)'], header['labels'])

def test_read_memory(self):
def test(filename: str):
with open(filename, 'rb') as fh:
# Read into BytesIO and test that
x = fh.read()
memory_file = io.BytesIO(x)
memory_header = nrrd.read_header(memory_file)
memory_data = nrrd.read_data(memory_header, memory_file)

# Read normally via file handle
fh.seek(0)
expected_header = nrrd.read_header(fh)
expected_data = nrrd.read_data(expected_header, fh)

np.testing.assert_equal(expected_header, memory_header)
np.testing.assert_equal(expected_data, memory_data)

# Test that the data read is able to be edited
self.assertTrue(memory_data.flags['WRITEABLE'])

paths = [
RAW_NRRD_FILE_PATH,
GZ_NRRD_FILE_PATH,
GZ_BYTESKIP_NRRD_FILE_PATH,
GZ_LINESKIP_NRRD_FILE_PATH,
BZ2_NRRD_FILE_PATH,
ASCII_1D_NRRD_FILE_PATH,
ASCII_2D_NRRD_FILE_PATH,
RAW_4D_NRRD_FILE_PATH,
]

for filename in paths:
with self.subTest(filename):
test(filename)


class TestReadingFunctionsFortran(Abstract.TestReadingFunctions):
index_order = 'F'
Expand Down
158 changes: 119 additions & 39 deletions nrrd/tests/test_writing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,66 @@ def setUp(self):
with open(RAW_DATA_FILE_PATH, 'rb') as fh:
self.expected_data = fh.read()

def write_and_read_back(self, encoding=None, level=9):
output_filename = os.path.join(self.temp_write_dir, f'testfile_{encoding}_{str(level)}.nrrd')
headers = {}
if encoding is not None:
headers['encoding'] = encoding
nrrd.write(output_filename, self.data_input, headers, compression_level=level,
index_order=self.index_order)
def test_write_default_header(self):
output_filename = os.path.join(self.temp_write_dir, 'testfile_default.nrrd')
nrrd.write(output_filename, self.data_input, {}, index_order=self.index_order)

# Read back the same file
data, header = nrrd.read(output_filename, index_order=self.index_order)
self.assertEqual(self.expected_data, data.tobytes(order=self.index_order))
self.assertEqual(header.get('encoding'), encoding or 'gzip') # default is gzip is not specified

return output_filename

def test_write_default_header(self):
self.write_and_read_back()
self.assertEqual(header.get('encoding'), 'gzip') # default is gzip if not specified

def test_write_raw(self):
self.write_and_read_back('raw')
output_filename = os.path.join(self.temp_write_dir, 'testfile_raw.nrrd')
nrrd.write(output_filename, self.data_input, {'encoding': 'raw'}, index_order=self.index_order)

# Read back the same file
data, header = nrrd.read(output_filename, index_order=self.index_order)
self.assertEqual(self.expected_data, data.tobytes(order=self.index_order))
self.assertEqual(header.get('encoding'), 'raw')

def test_write_gz(self):
self.write_and_read_back('gzip')
output_filename = os.path.join(self.temp_write_dir, 'testfile_gzip.nrrd')
nrrd.write(output_filename, self.data_input, {'encoding': 'gzip'}, index_order=self.index_order)

# Read back the same file
data, header = nrrd.read(output_filename, index_order=self.index_order)
self.assertEqual(self.expected_data, data.tobytes(order=self.index_order))
self.assertEqual(header.get('encoding'), 'gzip')

def test_write_bzip2(self):
self.write_and_read_back('bzip2')
output_filename = os.path.join(self.temp_write_dir, 'testfile_bzip2.nrrd')
nrrd.write(output_filename, self.data_input, {'encoding': 'bzip2'}, index_order=self.index_order)

# Read back the same file
data, header = nrrd.read(output_filename, index_order=self.index_order)
self.assertEqual(self.expected_data, data.tobytes(order=self.index_order))
self.assertEqual(header.get('encoding'), 'bzip2')

def test_write_gz_level1(self):
filename = self.write_and_read_back('gzip', level=1)
output_filename = os.path.join(self.temp_write_dir, 'testfile_gzip_1.nrrd')
nrrd.write(output_filename, self.data_input, {'encoding': 'gzip'}, compression_level=1,
index_order=self.index_order)

self.assertLess(os.path.getsize(GZ_NRRD_FILE_PATH), os.path.getsize(filename))
# Read back the same file
data, header = nrrd.read(output_filename, index_order=self.index_order)
self.assertEqual(self.expected_data, data.tobytes(order=self.index_order))
self.assertEqual(header.get('encoding'), 'gzip')
self.assertLess(os.path.getsize(GZ_NRRD_FILE_PATH), os.path.getsize(output_filename))

def test_write_bzip2_level1(self):
_ = self.write_and_read_back('bzip2', level=1)
output_filename = os.path.join(self.temp_write_dir, 'testfile_bzip2_1.nrrd')
nrrd.write(output_filename, self.data_input, {'encoding': 'bzip2'}, compression_level=1,
index_order=self.index_order)

# Read back the same file
data, header = nrrd.read(output_filename, index_order=self.index_order)
self.assertEqual(self.expected_data, data.tobytes(order=self.index_order))
self.assertEqual(header.get('encoding'), 'bzip2')

# note: we don't currently assert reduction here, because with the binary ball test data,
# the output size does not change at different bz2 levels.
# self.assertLess(os.path.getsize(BZ2_NRRD_FILE_PATH), os.path.getsize(fn))
# self.assertLess(os.path.getsize(BZ2_NRRD_FILE_PATH), os.path.getsize(output_filename))

def test_write_ascii_1d(self):
output_filename = os.path.join(self.temp_write_dir, 'testfile_ascii_1d.nrrd')
Expand Down Expand Up @@ -357,25 +379,83 @@ def test_write_check_remove_datafile(self):
data, header = nrrd.read(output_filename, index_order=self.index_order)
self.assertFalse('data file' in header)

def test_write_memory(self):
default_output_filename = os.path.join(self.temp_write_dir, 'testfile_default_filename.nrrd')
nrrd.write(default_output_filename, self.data_input, {}, index_order=self.index_order)

memory_nrrd = io.BytesIO()

nrrd.write(memory_nrrd, self.data_input, {}, index_order=self.index_order)

memory_nrrd.seek(0)

data, header = nrrd.read(default_output_filename, index_order=self.index_order)
memory_header = nrrd.read_header(memory_nrrd)
memory_data = nrrd.read_data(header=memory_header, fh=memory_nrrd, filename=None,
index_order=self.index_order)

self.assertEqual(self.expected_data, data.tobytes(order=self.index_order))
self.assertEqual(self.expected_data, memory_data.tobytes(order=self.index_order))
self.assertEqual(header.pop('sizes').all(), memory_header.pop('sizes').all())
self.assertSequenceEqual(header, memory_header)
def test_write_memory_default(self):
kwargs = {
'header': {},
'index_order': self.index_order
}

memory_nrrd_file = io.BytesIO()
nrrd.write(memory_nrrd_file, self.data_input, **kwargs)
memory_nrrd_file.seek(0)
memory_nrrd = memory_nrrd_file.readlines()

expected_filename = os.path.join(self.temp_write_dir, 'testfile_expected.nrrd')
nrrd.write(expected_filename, self.data_input, **kwargs)
with open(expected_filename, 'rb') as fh:
expected_nrrd = fh.readlines()

self.assertEqual(expected_nrrd, memory_nrrd)

def test_write_memory_raw(self):
kwargs = {
'header': {
'encoding': 'raw'
},
'index_order': self.index_order
}

memory_nrrd_file = io.BytesIO()
nrrd.write(memory_nrrd_file, self.data_input, **kwargs)
memory_nrrd_file.seek(0)
memory_nrrd = memory_nrrd_file.readlines()

expected_filename = os.path.join(self.temp_write_dir, 'testfile_expected.nrrd')
nrrd.write(expected_filename, self.data_input, **kwargs)
with open(expected_filename, 'rb') as fh:
expected_nrrd = fh.readlines()

self.assertEqual(expected_nrrd, memory_nrrd)

def test_write_memory_gzip(self):
kwargs = {
'header': {
'encoding': 'gzip'
},
'index_order': self.index_order
}

memory_nrrd_file = io.BytesIO()
nrrd.write(memory_nrrd_file, self.data_input, **kwargs)
memory_nrrd_file.seek(0)
memory_nrrd = memory_nrrd_file.readlines()

expected_filename = os.path.join(self.temp_write_dir, 'testfile_expected.nrrd')
nrrd.write(expected_filename, self.data_input, **kwargs)
with open(expected_filename, 'rb') as fh:
expected_nrrd = fh.readlines()

self.assertEqual(expected_nrrd, memory_nrrd)

def test_write_memory_bzip2(self):
kwargs = {
'header': {
'encoding': 'bzip2'
},
'index_order': self.index_order
}

memory_nrrd_file = io.BytesIO()
nrrd.write(memory_nrrd_file, self.data_input, **kwargs)
memory_nrrd_file.seek(0)
memory_nrrd = memory_nrrd_file.readlines()

expected_filename = os.path.join(self.temp_write_dir, 'testfile_expected.nrrd')
nrrd.write(expected_filename, self.data_input, **kwargs)
with open(expected_filename, 'rb') as fh:
expected_nrrd = fh.readlines()

self.assertEqual(expected_nrrd, memory_nrrd)

def test_write_memory_file_handle(self):
default_output_filename = os.path.join(self.temp_write_dir, 'testfile_default_filename.nrrd')
Expand Down