diff --git a/src/tape/__init__.py b/src/tape/__init__.py index e2ac94ab..1e9471fa 100644 --- a/src/tape/__init__.py +++ b/src/tape/__init__.py @@ -3,3 +3,4 @@ from .ensemble_frame import * # noqa from .timeseries import * # noqa from .ensemble_readers import * # noqa +from ._version import __version__ # noqa diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 63b25d74..1002dff3 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -13,11 +13,10 @@ from .analysis.feature_extractor import BaseLightCurveFeature, FeatureExtractor from .analysis.structure_function import SF_METHODS from .analysis.structurefunction2 import calc_sf2 -from .ensemble_frame import EnsembleFrame, EnsembleSeries, ObjectFrame, SourceFrame, TapeFrame, TapeSeries +from .ensemble_frame import EnsembleFrame, EnsembleSeries, ObjectFrame, SourceFrame, TapeFrame, TapeObjectFrame, TapeSourceFrame, TapeSeries from .timeseries import TimeSeries from .utils import ColumnMapper -# TODO import from EnsembleFrame...? SOURCE_FRAME_LABEL = "source" OBJECT_FRAME_LABEL = "object" @@ -48,7 +47,6 @@ def __init__(self, client=True, **kwargs): # A unique ID to allocate new result frame labels. self.default_frame_id = 1 - # TODO(wbeebe@uw.edu) Replace self._source and self._object with these self.source = None # Source Table EnsembleFrame self.object = None # Object Table EnsembleFrame @@ -779,40 +777,68 @@ def calc_nobs(self, by_band=False, label="nobs", temporary=True): """ if by_band: - band_counts = ( - self._source.groupby([self._id_col])[self._band_col] # group by each object - .value_counts() # count occurence of each band - .to_frame() # convert series to dataframe - .reset_index() # break up the multiindex - .categorize(columns=[self._band_col]) # retype the band labels as categories - .pivot_table(values=self._band_col, index=self._id_col, columns=self._band_col, aggfunc="sum") - ) # the pivot_table call makes each band_count a column of the id_col row - # repartition the result to align with object if self._object.known_divisions: - self._object.divisions = tuple([None for i in range(self._object.npartitions + 1)]) - band_counts = band_counts.repartition(npartitions=self._object.npartitions) + # Grab these up front to help out the task graph + id_col = self._id_col + band_col = self._band_col + + # Get the band metadata + unq_bands = np.unique(self._source[band_col]) + meta = {band: float for band in unq_bands} + + # Map the groupby to each partition + band_counts = self._source.map_partitions( + lambda x: x.groupby(id_col)[[band_col]] + .value_counts() + .to_frame() + .reset_index() + .pivot_table(values=band_col, index=id_col, columns=band_col, aggfunc="sum"), + meta=meta, + ).repartition(divisions=self._object.divisions) else: + band_counts = ( + self._source.groupby([self._id_col])[self._band_col] # group by each object + .value_counts() # count occurence of each band + .to_frame() # convert series to dataframe + .rename(columns={self._band_col: "counts"}) # rename column + .reset_index() # break up the multiindex + .categorize(columns=[self._band_col]) # retype the band labels as categories + .pivot_table( + values=self._band_col, index=self._id_col, columns=self._band_col, aggfunc="sum" + ) + ) # the pivot_table call makes each band_count a column of the id_col row + band_counts = band_counts.repartition(npartitions=self._object.npartitions) # short-hand for calculating nobs_total band_counts["total"] = band_counts[list(band_counts.columns)].sum(axis=1) bands = band_counts.columns.values - self._object = 
self._object.assign(**{label + "_" + band: band_counts[band] for band in bands}) + self._object = self._object.assign( + **{label + "_" + str(band): band_counts[band] for band in bands} + ) if temporary: - self._object_temp.extend(label + "_" + band for band in bands) + self._object_temp.extend(label + "_" + str(band) for band in bands) else: - counts = self._source.groupby([self._id_col])[[self._band_col]].aggregate("count") - - # repartition the result to align with object - if self._object.known_divisions: - self._object.divisions = tuple([None for i in range(self._object.npartitions + 1)]) - counts = counts.repartition(npartitions=self._object.npartitions) + if self._object.known_divisions and self._source.known_divisions: + # Grab these up front to help out the task graph + id_col = self._id_col + band_col = self._band_col + + # Map the groupby to each partition + counts = self._source.map_partitions( + lambda x: x.groupby([id_col])[[band_col]].aggregate("count") + ).repartition(divisions=self._object.divisions) else: - counts = counts.repartition(npartitions=self._object.npartitions) + # Just do a groupby on all source + counts = ( + self._source.groupby([self._id_col])[[self._band_col]] + .aggregate("count") + .repartition(npartitions=self._object.npartitions) + ) self._object = self._object.assign(**{label + "_total": counts[self._band_col]}) @@ -849,8 +875,7 @@ def prune(self, threshold=50, col_name=None): col_name = "nobs_total" # Mask on object table - mask = self._object[col_name] >= threshold - self.update_frame(self._object[mask]) + self = self.query(f"{col_name} >= {threshold}", table="object") self._object.set_dirty(True) # Object table is now dirty @@ -1134,12 +1159,18 @@ def s2n_inter_quartile_range(flux, err): meta=meta, ) + # Inherit divisions if known from source and the resulting index is the id + # Groupby on index should always return a subset that adheres to the same divisions criteria + if self._source.known_divisions and batch.index.name == self._id_col: + batch.divisions = self._source.divisions + if label is not None: if label == "": label = self._generate_frame_label() print(f"Using generated label, {label}, for a batch result.") # Track the result frame under the provided label self.add_frame(batch, label) + if compute: return batch.compute() else: @@ -1243,8 +1274,6 @@ def from_dask_dataframe( The ensemble object with the Dask dataframe data loaded. """ self._load_column_mapper(column_mapper, **kwargs) - - # TODO(wbeebe@uw.edu): Determine most efficient way to convert to SourceFrame/ObjectFrame source_frame = SourceFrame.from_dask_dataframe(source_frame, self) # Set the index of the source frame and save the resulting table @@ -1255,7 +1284,6 @@ def from_dask_dataframe( self.update_frame(self._generate_object_table()) else: - # TODO(wbeebe@uw.edu): Determine most efficient way to convert to SourceFrame/ObjectFrame self.update_frame(ObjectFrame.from_dask_dataframe(object_frame, ensemble=self)) self.update_frame(self._object.set_index(self._id_col, sorted=sorted, sort=sort)) @@ -1270,6 +1298,12 @@ def from_dask_dataframe( elif partition_size: self._source = self._source.repartition(partition_size=partition_size) + # Check that Divisions are established, warn if not. + for name, table in [("object", self._object), ("source", self._source)]: + if not table.known_divisions: + warnings.warn( + f"Divisions for {name} are not set, certain downstream dask operations may fail as a result. 
We recommend setting the `sort` or `sorted` flags when loading data to establish division information." + ) return self def from_hipscat(self, dir, source_subdir="source", object_subdir="object", column_mapper=None, **kwargs): @@ -1464,7 +1498,10 @@ def from_parquet( columns.append(self._provenance_col) # Read in the source parquet file(s) - source = SourceFrame.from_parquet(source_file, index=self._id_col, columns=columns, ensemble=self) + # Index is set False so that we can set it with a future set_index call + # This has the advantage of letting Dask set partition boundaries based + # on the divisions between the sources of different objects. + source = SourceFrame.from_parquet(source_file, index=False, columns=columns, ensemble=self) # Generate a provenance column if not provided if self._provenance_col is None: @@ -1474,7 +1511,9 @@ def from_parquet( object = None if object_file: # Read in the object file(s) - object = ObjectFrame.from_parquet(object_file, index=self._id_col, ensemble=self) + # Index is False so that we can set it with a future set_index call + # More meaningful for source than object but parity seems good here + object = ObjectFrame.from_parquet(object_file, index=False, ensemble=self) return self.from_dask_dataframe( source_frame=source, object_frame=object, @@ -1660,13 +1699,7 @@ def convert_flux_to_mag(self, zero_point, zp_form="mag", out_col_name=None, flux def _generate_object_table(self): """Generate an empty object table from the source table.""" - sor_idx = self._source.index.unique() - obj_df = pd.DataFrame(index=sor_idx) - - # Convert the resulting dataframe into an ObjectFrame - # TODO(wbeebe): Switch for a cleaner loading fucnction - res = ObjectFrame.from_dask_dataframe( - dd.from_pandas(obj_df, npartitions=int(np.ceil(self._source.npartitions / 100))), ensemble=self) + res = self._source.map_partitions(lambda x: TapeObjectFrame(index=x.index.unique())) return res @@ -1719,9 +1752,20 @@ def _sync_tables(self): if self._object.is_dirty(): # Sync Object to Source; remove any missing objects from source - obj_idx = list(self._object.index.compute()) - self.update_frame(self._source.map_partitions(lambda x: x[x.index.isin(obj_idx)])) - self.update_frame(self._source.persist()) # persist the source frame + + if self._object.known_divisions and self._source.known_divisions: + # Lazily Create an empty object table (just index) for joining + empty_obj = self._object.map_partitions(lambda x: TapeObjectFrame(index=x.index)) + if type(empty_obj) != type(self._object): + raise ValueError("Bad type for empty_obj: " + str(type(empty_obj))) + + # Join source onto the empty object table to align + self.update_frame(self._source.join(empty_obj, how="inner")) + else: + warnings.warn("Divisions are not known, syncing using a non-lazy method.") + obj_idx = list(self._object.index.compute()) + self.update_frame(self._source.map_partitions(lambda x: x[x.index.isin(obj_idx)])) + self.update_frame(self._source.persist()) # persist the source frame # Drop Temporary Source Columns on Sync if len(self._source_temp): @@ -1731,10 +1775,20 @@ def _sync_tables(self): if self._source.is_dirty(): # not elif if not self.keep_empty_objects: - # Sync Source to Object; remove any objects that do not have sources - sor_idx = list(self._source.index.unique().compute()) - self.update_frame(self._object.map_partitions(lambda x: x[x.index.isin(sor_idx)])) - self.update_frame(self._object.persist()) # persist the object frame + if self._object.known_divisions and self._source.known_divisions: + # 
Lazily Create an empty source table (just unique indexes) for joining + empty_src = self._source.map_partitions(lambda x: TapeSourceFrame(index=x.index.unique())) + if type(empty_src) != type(self._source): + raise ValueError("Bad type for empty_src: " + str(type(empty_src))) + + # Join object onto the empty unique source table to align + self.update_frame(self._object.join(empty_src, how="inner")) + else: + warnings.warn("Divisions are not known, syncing using a non-lazy method.") + # Sync Source to Object; remove any objects that do not have sources + sor_idx = list(self._source.index.unique().compute()) + self.update_frame(self._object.map_partitions(lambda x: x[x.index.isin(sor_idx)])) + self.update_frame(self._object.persist()) # persist the object frame # Drop Temporary Object Columns on Sync if len(self._object_temp): @@ -1834,7 +1888,7 @@ def _build_index(self, obj_id, band): index = pd.MultiIndex.from_tuples(tuples, names=["object_id", "band", "index"]) return index - def sf2(self, sf_method="basic", argument_container=None, use_map=True): + def sf2(self, sf_method="basic", argument_container=None, use_map=True, compute=True): """Wrapper interface for calling structurefunction2 on the ensemble Parameters @@ -1876,11 +1930,17 @@ def sf2(self, sf_method="basic", argument_container=None, use_map=True): self._source.index, argument_container=argument_container, ) - return result + else: - result = self.batch(calc_sf2, use_map=use_map, argument_container=argument_container) + result = self.batch( + calc_sf2, use_map=use_map, argument_container=argument_container, compute=compute + ) - return result + # Inherit divisions information if known + if self._source.known_divisions and self._object.known_divisions: + result.divisions = self._source.divisions + + return result def _translate_meta(self, meta): """Translates Dask-style meta into a TapeFrame or TapeSeries object. diff --git a/tests/tape_tests/conftest.py b/tests/tape_tests/conftest.py index e416a04a..c0af84c3 100644 --- a/tests/tape_tests/conftest.py +++ b/tests/tape_tests/conftest.py @@ -270,6 +270,25 @@ def parquet_ensemble(dask_client): return ens +# pylint: disable=redefined-outer-name +@pytest.fixture +def parquet_ensemble_with_divisions(dask_client): + """Create an Ensemble from parquet data.""" + ens = Ensemble(client=dask_client) + ens.from_parquet( + "tests/tape_tests/data/source/test_source.parquet", + "tests/tape_tests/data/object/test_object.parquet", + id_col="ps1_objid", + time_col="midPointTai", + band_col="filterName", + flux_col="psFlux", + err_col="psFluxErr", + sort=True, + ) + + return ens + + # pylint: disable=redefined-outer-name @pytest.fixture def parquet_ensemble_from_source(dask_client): diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 89fb2dbc..c36d5dd9 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -32,6 +32,7 @@ def test_with_client(): "data_fixture", [ "parquet_ensemble", + "parquet_ensemble_with_divisions", "parquet_ensemble_without_client", "parquet_ensemble_from_source", "parquet_ensemble_from_hipscat", @@ -61,6 +62,11 @@ def test_parquet_construction(data_fixture, request): assert parquet_ensemble._source is not None assert parquet_ensemble._object is not None + # Make sure divisions are set + if data_fixture == "parquet_ensemble_with_divisions": + assert parquet_ensemble._source.known_divisions + assert parquet_ensemble._object.known_divisions + # Check that the data is not empty. 
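
A note on the new `parquet_ensemble_with_divisions` fixture and the `index=False` loading change above: both rely on Dask establishing divisions (the partition boundary values of the index) when the tables are sorted on the object ID at load time. Below is a minimal standalone sketch of that behavior with plain `dask.dataframe`; the file name and column values are made up for illustration and are not part of the TAPE test data.

```python
import pandas as pd
import dask.dataframe as dd

# Toy source data, written to parquet only so the example is self-contained.
pd.DataFrame(
    {"ps1_objid": [2, 1, 2, 3], "psFlux": [0.1, 0.2, 0.3, 0.4]}
).to_parquet("toy_source.parquet")

# Mirror the loader change: read with no index, then set and sort the ID column.
source = dd.read_parquet("toy_source.parquet", index=False)
source = source.set_index("ps1_objid", sort=True)

# Sorting lets Dask record the partition boundaries ("divisions"), which is
# what makes the lazy, index-aligned operations elsewhere in this PR possible.
assert source.known_divisions
print(source.divisions)
```
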
obj, source = parquet_ensemble.compute() assert len(source) == 2000 @@ -723,12 +729,21 @@ def test_update_column_map(dask_client): assert cmap_2.map["provenance_col"] == "p" +@pytest.mark.parametrize( + "data_fixture", + [ + "parquet_ensemble", + "parquet_ensemble_with_divisions", + ], +) @pytest.mark.parametrize("legacy", [True, False]) -def test_sync_tables(parquet_ensemble, legacy): +def test_sync_tables(data_fixture, request, legacy): """ Test that _sync_tables works as expected, using Ensemble-level APIs when `legacy` is `True`, and EsnembleFrame APIs when `legacy` is `False`. """ + parquet_ensemble = request.getfixturevalue(data_fixture) + if legacy: assert len(parquet_ensemble.compute("object")) == 15 assert len(parquet_ensemble.compute("source")) == 2000 @@ -744,24 +759,16 @@ def test_sync_tables(parquet_ensemble, legacy): else: assert len(parquet_ensemble.object.compute()) == 5 - # Replace the maximum flux value with a NaN so that we will have a row to drop. - max_flux = max(parquet_ensemble.source[parquet_ensemble._flux_col]) - parquet_ensemble.source[parquet_ensemble._flux_col] = parquet_ensemble.source[ - parquet_ensemble._flux_col].apply( - lambda x: np.nan if x == max_flux else x, meta=pd.Series(dtype=float) - ) if legacy: parquet_ensemble.dropna(table="source") else: parquet_ensemble.source.dropna().update_ensemble() assert parquet_ensemble.source.is_dirty() # Dropna should set the source dirty flag - # Drop a whole object to test that the object is dropped in the object table + # Drop a whole object from Source to test that the object is dropped in the object table + dropped_obj_id = 88472935274829959 if legacy: - parquet_ensemble.query(f"{parquet_ensemble._id_col} != 88472935274829959", table="source") - assert parquet_ensemble.source.is_dirty() - parquet_ensemble.compute() - assert not parquet_ensemble.source.is_dirty() + parquet_ensemble.query(f"{parquet_ensemble._id_col} != {dropped_obj_id}", table="source") else: filtered_src = parquet_ensemble.source.query(f"{parquet_ensemble._id_col} != 88472935274829959") @@ -771,12 +778,16 @@ def test_sync_tables(parquet_ensemble, legacy): filtered_src.compute() assert parquet_ensemble.source.is_dirty() - # After updating the ensemble validate that a sync occurred and the table is no longer dirty. + # Update the ensemble to use the filtered source. filtered_src.update_ensemble() - filtered_src.compute() # Now equivalent to parquet_ensemble.source.compute() - assert not parquet_ensemble.source.is_dirty() - # both tables should have the expected number of rows after a sync + # Verify that the object ID we removed from the source table is present in the object table + assert dropped_obj_id in parquet_ensemble._object.index.compute().values + + # Perform an operation which should trigger syncing both tables. + parquet_ensemble.compute() + + # Both tables should have the expected number of rows after a sync if legacy: assert len(parquet_ensemble.compute("object")) == 4 assert len(parquet_ensemble.compute("source")) == 1063 @@ -784,9 +795,18 @@ def test_sync_tables(parquet_ensemble, legacy): assert len(parquet_ensemble.object.compute()) == 4 assert len(parquet_ensemble.source.compute()) == 1063 - # dirty flags should be unset after sync - assert not parquet_ensemble._object.is_dirty() - assert not parquet_ensemble._source.is_dirty() + # Validate that the filtered object has been removed from both tables. 
+ assert dropped_obj_id not in parquet_ensemble.source.index.compute().values + assert dropped_obj_id not in parquet_ensemble.object.index.compute().values + + # Dirty flags should be unset after sync + assert not parquet_ensemble.object_dirty + assert not parquet_ensemble.source_dirty + + # Make sure that divisions are preserved + if data_fixture == "parquet_ensemble_with_divisions": + assert parquet_ensemble.source.known_divisions + assert parquet_ensemble.object.known_divisions @pytest.mark.parametrize("legacy", [True, False]) @@ -1026,10 +1046,19 @@ def test_temporary_cols(parquet_ensemble): assert "f2" not in ens._source.columns +@pytest.mark.parametrize( + "data_fixture", + [ + "parquet_ensemble", + "parquet_ensemble_with_divisions", + ], +) @pytest.mark.parametrize("legacy", [True, False]) -def test_dropna(parquet_ensemble, legacy): +def test_dropna(data_fixture, request, legacy): """Tests dropna, using Ensemble.dropna when `legacy` is `True`, and EnsembleFrame.dropna when `legacy` is `False`.""" + parquet_ensemble = request.getfixturevalue(data_fixture) + # Try passing in an unrecognized 'table' parameter and verify an exception is thrown with pytest.raises(ValueError): parquet_ensemble.dropna(table="banana") @@ -1062,6 +1091,10 @@ def test_dropna(parquet_ensemble, legacy): parquet_ensemble.source.dropna().update_ensemble() assert len(parquet_ensemble._source.compute().index) == source_length - occurrences_source + if data_fixture == "parquet_ensemble_with_divisions": + # divisions should be preserved + assert parquet_ensemble._source.known_divisions + # Now test dropping na from 'object' table # Sync the tables parquet_ensemble._sync_tables() @@ -1077,10 +1110,8 @@ def test_dropna(parquet_ensemble, legacy): parquet_ensemble.object.dropna().update_ensemble() assert len(parquet_ensemble.object.compute().index) == object_length - # get a valid object id and set at least two occurences of that id in the object table + # select an id from the object table valid_object_id = object_pdf.index.values[1] - object_pdf.index.values[0] = valid_object_id - occurrences_object = len(object_pdf.loc[valid_object_id].values) # Set the nobs_g values for one object to NaN so we can drop it. # We do this on the instantiated object (pdf) and convert it back into a @@ -1088,14 +1119,19 @@ def test_dropna(parquet_ensemble, legacy): object_pdf.loc[valid_object_id, parquet_ensemble._object.columns[0]] = pd.NA parquet_ensemble.update_frame(ObjectFrame.from_tapeframe(TapeObjectFrame(object_pdf), label="object", npartitions=1)) - # Try dropping NaNs from object and confirm that we did. + # Try dropping NaNs from object and confirm that we dropped a row if legacy: parquet_ensemble.dropna(table="object") else: parquet_ensemble.object.dropna().update_ensemble() - assert len(parquet_ensemble.object.compute().index) == object_length - occurrences_object + assert len(parquet_ensemble.object.compute().index) == object_length - 1 + + if data_fixture == "parquet_ensemble_with_divisions": + # divisions should be preserved + assert parquet_ensemble._object.known_divisions + new_objects_pdf = parquet_ensemble.object.compute() - assert len(new_objects_pdf.index) == len(object_pdf.index) - occurrences_object + assert len(new_objects_pdf.index) == len(object_pdf.index) - 1 # Assert the filtered ID is no longer in the objects. 
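
The sync result checked in this test comes from the `_sync_tables` change earlier in the diff: when both tables have known divisions, the source table is filtered by inner-joining it onto an index-only view of the object table, which stays lazy and preserves divisions; otherwise the code falls back to materializing the object index and filtering with `isin`. A rough illustration of the two paths using plain Dask dataframes (toy data, not the TAPE frame subclasses):

```python
import pandas as pd
import dask.dataframe as dd

obj = dd.from_pandas(pd.DataFrame({"nobs": [3, 1]}, index=[1, 2]), npartitions=1)
src = dd.from_pandas(
    pd.DataFrame({"flux": [1.0, 2.0, 3.0, 4.0]}, index=[1, 1, 2, 3]), npartitions=1
)

if obj.known_divisions and src.known_divisions:
    # Lazy path: join source onto an index-only object frame. Rows whose id is
    # missing from the object table (id 3 here) drop out, and divisions are
    # preserved because the join is index-aligned.
    empty_obj = obj.map_partitions(lambda x: pd.DataFrame(index=x.index))
    synced = src.join(empty_obj, how="inner")
else:
    # Fallback path: materialize the object index and filter each partition.
    obj_idx = list(obj.index.compute())
    synced = src.map_partitions(lambda x: x[x.index.isin(obj_idx)])

print(synced.compute())  # only the rows for ids 1 and 2 remain
```
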
assert valid_source_id not in new_objects_pdf.index.values @@ -1136,18 +1172,29 @@ def test_keep_zeros(parquet_ensemble, legacy): assert parquet_ensemble._object.npartitions == prev_npartitions +@pytest.mark.parametrize( + "data_fixture", + [ + "parquet_ensemble", + "parquet_ensemble_with_divisions", + ], +) @pytest.mark.parametrize("by_band", [True, False]) -@pytest.mark.parametrize("know_divisions", [True, False]) -def test_calc_nobs(parquet_ensemble, by_band, know_divisions): - ens = parquet_ensemble - ens._object = ens._object.drop(["nobs_g", "nobs_r", "nobs_total"], axis=1) +@pytest.mark.parametrize("multi_partition", [True, False]) +def test_calc_nobs(data_fixture, request, by_band, multi_partition): + # Get the Ensemble from a fixture + ens = request.getfixturevalue(data_fixture) - if know_divisions: - ens._object = ens._object.reset_index().set_index(ens._id_col) - assert ens._object.known_divisions + if multi_partition: + ens._source = ens._source.repartition(3) + + # Drop the existing nobs columns + ens._object = ens._object.drop(["nobs_g", "nobs_r", "nobs_total"], axis=1) + # Calculate nobs ens.calc_nobs(by_band) + # Check that things turned out as we expect lc = ens._object.loc[88472935274829959].compute() if by_band: @@ -1158,16 +1205,46 @@ def test_calc_nobs(parquet_ensemble, by_band, know_divisions): assert "nobs_total" in ens._object.columns assert lc["nobs_total"].values[0] == 499 + # Make sure that if divisions were set previously, they are preserved + if data_fixture == "parquet_ensemble_with_divisions": + assert ens._object.known_divisions + assert ens._source.known_divisions + -def test_prune(parquet_ensemble): +@pytest.mark.parametrize( + "data_fixture", + [ + "parquet_ensemble", + "parquet_ensemble_with_divisions", + ], +) +@pytest.mark.parametrize("generate_nobs", [False, True]) +def test_prune(data_fixture, request, generate_nobs): """ Test that ensemble.prune() appropriately filters the dataframe """ + + # Get the Ensemble from a fixture + parquet_ensemble = request.getfixturevalue(data_fixture) + threshold = 10 - parquet_ensemble.prune(threshold) + # Generate the nobs cols from within prune + if generate_nobs: + # Drop the existing nobs columns + parquet_ensemble._object = parquet_ensemble._object.drop(["nobs_g", "nobs_r", "nobs_total"], axis=1) + parquet_ensemble.prune(threshold) + + # Use an existing column + else: + parquet_ensemble.prune(threshold, col_name="nobs_total") assert not np.any(parquet_ensemble._object["nobs_total"].values < threshold) + # Make sure that if divisions were set previously, they are preserved + if data_fixture == "parquet_ensemble_with_divisions": + assert parquet_ensemble._source.known_divisions + assert parquet_ensemble._object.known_divisions + def test_query(dask_client): ens = Ensemble(client=dask_client) @@ -1517,6 +1594,7 @@ def test_bin_sources_two_days(dask_client): "data_fixture", [ "parquet_ensemble", + "parquet_ensemble_with_divisions", "parquet_ensemble_without_client", ], ) @@ -1547,6 +1625,10 @@ def test_batch(data_fixture, request, use_map, on): assert isinstance(tracked_result, EnsembleSeries) assert result is tracked_result + # Make sure that divisions information is propagated if known + if parquet_ensemble._source.known_divisions and parquet_ensemble._object.known_divisions: + assert result.known_divisions + result = result.compute() if on is None: @@ -1681,25 +1763,41 @@ def test_build_index(dask_client): assert result_ids == target +@pytest.mark.parametrize( + "data_fixture", + [ + "parquet_ensemble", + 
"parquet_ensemble_with_divisions", + ], +) @pytest.mark.parametrize("method", ["size", "length", "loglength"]) @pytest.mark.parametrize("combine", [True, False]) @pytest.mark.parametrize("sthresh", [50, 100]) -def test_sf2(parquet_ensemble, method, combine, sthresh, use_map=False): +def test_sf2(data_fixture, request, method, combine, sthresh, use_map=False): """ Test calling sf2 from the ensemble """ + parquet_ensemble = request.getfixturevalue(data_fixture) arg_container = StructureFunctionArgumentContainer() arg_container.bin_method = method arg_container.combine = combine arg_container.bin_count_target = sthresh - res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map) + if not combine: + res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map, compute=False) + else: + res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map) res_batch = parquet_ensemble.batch(calc_sf2, use_map=use_map, argument_container=arg_container) + if parquet_ensemble._source.known_divisions and parquet_ensemble._object.known_divisions: + if not combine: + assert res_sf2.known_divisions + if combine: assert not res_sf2.equals(res_batch) # output should be different else: + res_sf2 = res_sf2.compute() assert res_sf2.equals(res_batch) # output should be identical diff --git a/tests/tape_tests/test_packaging.py b/tests/tape_tests/test_packaging.py new file mode 100644 index 00000000..ef36cc82 --- /dev/null +++ b/tests/tape_tests/test_packaging.py @@ -0,0 +1,6 @@ +import tape + + +def test_version(): + """Check to see that the version property returns something""" + assert tape.__version__ is not None