Skip to content

Commit

Permalink
values() operator can extract multiple attributes. Closes #29.
Browse files Browse the repository at this point in the history
  • Loading branch information
tshead2 committed Aug 2, 2013
1 parent 3e9a65b commit 0a068a2
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 45 deletions.
21 changes: 14 additions & 7 deletions analysis-server/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,18 +529,25 @@ def test_scan_null():

def test_values_default():
    """With no attribute argument, values() returns every attribute.

    The source array has two attributes, so the result is expected to be a
    2-element sequence whose entries match the per-index results.
    """
    array1 = random(5, attributes=["foo", "bar"])
    # Fetch the default (all-attribute) result once and assert on it, instead
    # of binding it to an unused local and re-fetching it for every check.
    all_values = values(array1)
    numpy.testing.assert_equal(len(all_values), 2)
    numpy.testing.assert_array_equal(all_values[0], values(array1, 0))
    numpy.testing.assert_array_equal(all_values[1], values(array1, 1))

def test_values_index():
    """Smoke test: values() accepts an integer attribute index."""
    array1 = random(5, attributes=["foo", "bar"])
    # No assertions here; the test only checks that both lookups complete
    # without raising.
    for index in (0, 1):
        values(array1, index)

def test_values_index_name():
    """Looking an attribute up by name matches looking it up by index."""
    array1 = random(5, attributes=["foo", "bar"])
    numpy.testing.assert_array_equal(values(array1, "foo"), values(array1, 0))
    numpy.testing.assert_array_equal(values(array1, "bar"), values(array1, 1))

def test_values_indexes():
    """A list of indices yields one result per index, in the requested order."""
    array1 = random(5, attributes=["foo", "bar"])
    requested = [1, 0]
    for position, index in enumerate(requested):
        # Entry `position` of the multi-attribute result must equal the
        # single-attribute result for the index requested at that position.
        numpy.testing.assert_array_equal(
            values(array1, [1, 0])[position],
            values(array1, index),
        )

def test_values_names():
    """A list of names yields one result per name, in the requested order."""
    array1 = random(5, attributes=["foo", "bar"])
    # "bar" is attribute 1 and "foo" is attribute 0 in the source array.
    expected_indexes = (1, 0)
    for position, index in enumerate(expected_indexes):
        numpy.testing.assert_array_equal(
            values(array1, ["bar", "foo"])[position],
            values(array1, index),
        )

def test_zeros_1d():
array1 = zeros(5)
require_array_schema(array1, [("d0", "int64", 0, 5, 5)], [("val", "float64")])
Expand Down
113 changes: 75 additions & 38 deletions packages/slycat/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,46 +285,81 @@ def value(self, source, attribute=0):
iterator.release()
raise

def values(self, source, attributes=None):
    """Convert array attributes into numpy arrays.

    Attributes can be specified by-index or by-name, or any mixture of the two.

    If the attributes parameter is None (the default), values() will return
    every attribute in the array.  If the array has a single attribute, it will
    be returned as a single numpy array.  If the array has multiple attributes,
    they will be returned as a tuple of numpy arrays.

    If the attributes parameter is a single integer or string, a single numpy
    array will be returned.

    If the attributes parameter is a sequence (list or tuple) of integers /
    strings, a tuple of numpy arrays will be returned.

    Note that converting an attribute from an array means moving all the
    attribute data to the client, which may be impractical or exceed client
    memory for large arrays.

    Raises InvalidArgument if an attribute name is unknown, an attribute index
    is out-of-bounds, or an attribute is neither an integer nor a string.
    """
    def materialize_attribute(attribute, source_attributes):
        """Materializes one attribute into a numpy array."""
        # Convert attribute names into indices ...
        if isinstance(attribute, basestring):
            for index, source_attribute in enumerate(source_attributes):
                if source_attribute["name"] == attribute:
                    attribute = index
                    break
            else:
                raise InvalidArgument("Unknown attribute name: {}".format(attribute))
        elif isinstance(attribute, int):
            # NOTE(review): negative indices pass this check and are forwarded
            # to the iterator unchanged - confirm whether that is intended.
            if attribute >= len(source_attributes):
                raise InvalidArgument("Attribute index out-of-bounds: {}".format(attribute))
        else:
            raise InvalidArgument("Attribute must be an integer index or a name: {}".format(attribute))

        # Materialize every chunk into memory ...
        chunk_coordinates = []
        chunk_values = []
        iterator = source.proxy.iterator()
        try:
            while True:
                iterator.next()
                chunk_coordinates.append(iterator.coordinates())
                chunk_values.append(iterator.values(attribute))
        except StopIteration:
            # Normal termination: the iterator has visited every chunk.
            pass
        finally:
            # Release the iterator on both the normal and the error path.
            iterator.release()

        # Calculate a compatible dtype for the result array (this would be easy,
        # except we have to handle string arrays, where each chunk may contain
        # different fixed-width string dtypes).
        result_type = numpy.result_type(*[chunk.dtype for chunk in chunk_values])

        # Create the result array and populate it ...
        result = numpy.empty(source.shape, dtype=result_type)
        for coordinates, chunk in zip(chunk_coordinates, chunk_values):
            # A multidimensional index must be a tuple of slices; numpy treats
            # a list as a fancy index, so a list of slices is not accepted.
            hyperslice = tuple(
                slice(coordinate, coordinate + chunk.shape[index])
                for index, coordinate in enumerate(coordinates)
            )
            result[hyperslice] = chunk
        return result

    start_time = time.time()

    source_attributes = source.attributes
    if attributes is None:
        if len(source_attributes) == 1:
            result = materialize_attribute(0, source_attributes)
        else:
            result = tuple(
                materialize_attribute(index, source_attributes)
                for index in range(len(source_attributes))
            )
    elif isinstance(attributes, (list, tuple)):
        result = tuple(
            materialize_attribute(attribute, source_attributes)
            for attribute in attributes
        )
    else:
        result = materialize_attribute(attributes, source_attributes)

    # Log after the work is done; returning from the branches above would make
    # this statement unreachable.
    log.info("elapsed time: %s seconds" % (time.time() - start_time))
    return result
Expand Down Expand Up @@ -622,9 +657,11 @@ def shutdown():
def value(source, attribute=0):
    # Module-level convenience wrapper: forward the call to the coordinator
    # returned by get_coordinator().
    active_coordinator = get_coordinator()
    return active_coordinator.value(source, attribute)
# Reuse the coordinator method's docstring for the wrapper.
value.__doc__ = coordinator.value.__doc__
def values(source, attributes=None):
    # Module-level convenience wrapper: forward the call to the coordinator
    # returned by get_coordinator().  `attributes` may be None, a single
    # index / name, or a list / tuple of them; it is passed through unchanged.
    return get_coordinator().values(source, attributes)
# Reuse the coordinator method's docstring for the wrapper.
values.__doc__ = coordinator.values.__doc__

def workers():
    # Module-level convenience wrapper: forward the call to the coordinator
    # returned by get_coordinator().
    active_coordinator = get_coordinator()
    return active_coordinator.workers()
# Reuse the coordinator method's docstring for the wrapper.
workers.__doc__ = coordinator.workers.__doc__
Expand Down

0 comments on commit 0a068a2

Please sign in to comment.