diff --git a/examples/movie_view_ratings/run_all_frameworks.py b/examples/movie_view_ratings/run_all_frameworks.py
index 7878aa89..5c40c6f5 100644
--- a/examples/movie_view_ratings/run_all_frameworks.py
+++ b/examples/movie_view_ratings/run_all_frameworks.py
@@ -50,6 +50,8 @@
     'contribution_bounds_already_enforced', False,
     'Assume the input dataset already enforces the hard-coded contribution'
     'bounds. Ignore the user identifiers.')
+flags.DEFINE_boolean('vector_metrics', False,
+                     'Compute DP vector metrics for rating values.')
 
 
 def calculate_private_result(movie_views, pipeline_backend):
@@ -70,7 +72,6 @@ def calc_dp_rating_metrics(movie_views, backend, public_partitions):
     # Create a DPEngine instance.
     dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
 
-    # Specify which DP aggregated metrics to compute.
     params = pipeline_dp.AggregateParams(
         noise_kind=pipeline_dp.NoiseKind.LAPLACE,
         metrics=[
@@ -85,13 +86,23 @@ def calc_dp_rating_metrics(movie_views, backend, public_partitions):
         contribution_bounds_already_enforced=FLAGS.
         contribution_bounds_already_enforced)
 
+    value_extractor = lambda mv: mv.rating
+
+    if FLAGS.vector_metrics:
+        # Specify which DP aggregated metrics to compute for vector values.
+        params.metrics = [pipeline_dp.Metrics.VECTOR_SUM]
+        params.vector_size = 5  # Size of the one-hot ratings vector.
+        params.vector_max_norm = 1
+        value_extractor = lambda mv: encode_one_hot(mv.rating - 1, params.
+                                                    vector_size)
+
     # Specify how to extract privacy_id, partition_key and value from an
     # element of movie view collection.
     data_extractors = pipeline_dp.DataExtractors(
         partition_extractor=lambda mv: mv.movie_id,
         privacy_id_extractor=(lambda mv: mv.user_id)
         if not FLAGS.contribution_bounds_already_enforced else None,
-        value_extractor=lambda mv: mv.rating)
+        value_extractor=value_extractor)
 
     # Run aggregation.
     dp_result = dp_engine.aggregate(movie_views, params, data_extractors,
@@ -170,6 +181,12 @@ def compute_on_local_backend():
     write_to_file(dp_result, FLAGS.output_file)
 
 
+def encode_one_hot(value, vector_size):
+    vec = [0] * vector_size
+    vec[value] = 1
+    return vec
+
+
 def main(unused_argv):
     if FLAGS.framework == 'beam':
         compute_on_beam()
diff --git a/pipeline_dp/__init__.py b/pipeline_dp/__init__.py
index 1c264b31..765ac712 100644
--- a/pipeline_dp/__init__.py
+++ b/pipeline_dp/__init__.py
@@ -27,5 +27,6 @@
 from pipeline_dp.pipeline_backend import BeamBackend
 from pipeline_dp.pipeline_backend import LocalBackend
 from pipeline_dp.pipeline_backend import SparkRDDBackend
+from pipeline_dp.aggregate_params import NormKind
 
 __version__ = '0.1.1'
diff --git a/pipeline_dp/aggregate_params.py b/pipeline_dp/aggregate_params.py
index 9458ba57..26e4ac78 100644
--- a/pipeline_dp/aggregate_params.py
+++ b/pipeline_dp/aggregate_params.py
@@ -26,6 +26,7 @@ class Metrics(Enum):
     SUM = 'sum'
     MEAN = 'mean'
    VARIANCE = 'variance'
+    VECTOR_SUM = 'vector_sum'
 
 
 class NoiseKind(Enum):
@@ -69,13 +70,15 @@ class AggregateParams:
         max_value: Upper bound on each value.
         custom_combiners: Warning: experimental@ Combiners for computing custom metrics.
+        vector_norm_kind: The type of norm. Used only for VECTOR_SUM metric calculations.
+        vector_max_norm: Bound on the norm of each vector (in the norm given by vector_norm_kind). Used only for VECTOR_SUM metric calculations.
+        vector_size: Number of coordinates in a vector. Used only for VECTOR_SUM metric calculations.
         contribution_bounds_already_enforced: assume that the input dataset
             complies with the bounds provided in max_partitions_contributed
            and max_contributions_per_partition.
            This option can be used if the dataset does not contain any
            identifiers that can be used to enforce contribution bounds
            automatically.
     """
-
     metrics: Iterable[Metrics]
     max_partitions_contributed: int
     max_contributions_per_partition: int
@@ -87,6 +90,9 @@ class AggregateParams:
     public_partitions: Any = None  # deprecated
     noise_kind: NoiseKind = NoiseKind.LAPLACE
     custom_combiners: Iterable['CustomCombiner'] = None
+    vector_norm_kind: NormKind = NormKind.Linf
+    vector_max_norm: float = None
+    vector_size: int = None
     contribution_bounds_already_enforced: bool = False
 
     def __post_init__(self):
@@ -123,6 +129,13 @@ def __post_init__(self):
             raise ValueError(
                 "params.max_value must be equal to or greater than params.min_value"
             )
+        if Metrics.VECTOR_SUM in self.metrics and \
+            (Metrics.SUM in self.metrics or \
+             Metrics.MEAN in self.metrics or \
+             Metrics.VARIANCE in self.metrics):
+            raise ValueError(
+                "Vector sum cannot be computed together with scalar metrics such as sum, mean, etc."
+            )
         if self.contribution_bounds_already_enforced and Metrics.PRIVACY_ID_COUNT in self.metrics:
             raise ValueError(
                 "Cannot calculate PRIVACY_ID_COUNT when "
diff --git a/pipeline_dp/combiners.py b/pipeline_dp/combiners.py
index 56b13083..58dffe10 100644
--- a/pipeline_dp/combiners.py
+++ b/pipeline_dp/combiners.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 import abc
 import copy
-from typing import Iterable, Sized, Tuple, List
+from typing import Iterable, Sized, Tuple, List, Union
 
 import pipeline_dp
 from pipeline_dp import dp_computations
@@ -21,6 +21,8 @@
 import numpy as np
 import collections
 
+ArrayLike = Union[np.ndarray, List[float]]
+
 
 class Combiner(abc.ABC):
     """Base class for all combiners.
@@ -148,6 +150,19 @@ def mean_var_params(self):
             self.aggregate_params.max_contributions_per_partition,
             self.aggregate_params.noise_kind)
 
+    @property
+    def additive_vector_noise_params(
+            self) -> dp_computations.AdditiveVectorNoiseParams:
+        return dp_computations.AdditiveVectorNoiseParams(
+            eps_per_coordinate=self.eps / self.aggregate_params.vector_size,
+            delta_per_coordinate=self.delta / self.aggregate_params.vector_size,
+            max_norm=self.aggregate_params.vector_max_norm,
+            l0_sensitivity=self.aggregate_params.max_partitions_contributed,
+            linf_sensitivity=self.aggregate_params.
+            max_contributions_per_partition,
+            norm_kind=self.aggregate_params.vector_norm_kind,
+            noise_kind=self.aggregate_params.noise_kind)
+
 
 class CountCombiner(Combiner):
     """Combiner for computing DP Count.
@@ -460,6 +475,48 @@ def metrics_names(self) -> List[str]:
         return self._metrics_to_compute
 
 
+class VectorSumCombiner(Combiner):
+    """Combiner for computing DP vector sum.
+
+    The type of the accumulator is np.ndarray, which represents the sum of
+    the vectors of the same size for which this accumulator is computed.
+    """
+    AccumulatorType = np.ndarray
+
+    def __init__(self, params: CombinerParams):
+        self._params = params
+
+    def create_accumulator(self,
+                           values: Iterable[ArrayLike]) -> AccumulatorType:
+        array_sum = None
+        for val in values:
+            if not isinstance(val, np.ndarray):
+                val = np.array(val)
+            if val.shape != (self._params.aggregate_params.vector_size,):
+                raise TypeError(
+                    f"Shape mismatch: {val.shape} != {(self._params.aggregate_params.vector_size,)}"
+                )
+            if array_sum is None:
+                array_sum = val
+            else:
+                array_sum += val
+        return array_sum
+
+    def merge_accumulators(self, array_sum1: AccumulatorType,
+                           array_sum2: AccumulatorType):
+        return array_sum1 + array_sum2
+
+    def compute_metrics(self, array_sum: AccumulatorType) -> dict:
+        return {
+            'vector_sum':
+                dp_computations.add_noise_vector(
+                    array_sum, self._params.additive_vector_noise_params)
+        }
+
+    def metrics_names(self) -> List[str]:
+        return ['vector_sum']
+
+
 def create_compound_combiner(
         aggregate_params: pipeline_dp.AggregateParams,
         budget_accountant: budget_accounting.BudgetAccountant
@@ -508,6 +565,12 @@ def create_compound_combiner(
         combiners.append(
             PrivacyIdCountCombiner(
                 CombinerParams(budget_privacy_id_count, aggregate_params)))
+    if pipeline_dp.Metrics.VECTOR_SUM in aggregate_params.metrics:
+        budget_vector_sum = budget_accountant.request_budget(
+            mechanism_type, weight=aggregate_params.budget_weight)
+        combiners.append(
+            VectorSumCombiner(
+                CombinerParams(budget_vector_sum, aggregate_params)))
 
     return CompoundCombiner(combiners, return_named_tuple=True)
 
diff --git a/tests/combiners_test.py b/tests/combiners_test.py
index 08cb8f54..2a0458f9 100644
--- a/tests/combiners_test.py
+++ b/tests/combiners_test.py
@@ -33,14 +33,16 @@ def _create_mechanism_spec(no_noise):
     return ba.MechanismSpec(ba.MechanismType.GAUSSIAN, None, eps, delta)
 
 
-def _create_aggregate_params(max_value: float = 1):
+def _create_aggregate_params(max_value: float = 1, vector_size: int = 1):
     return pipeline_dp.AggregateParams(
         min_value=0,
         max_value=max_value,
         max_partitions_contributed=1,
         max_contributions_per_partition=3,
         noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
-        metrics=[pipeline_dp.Metrics.COUNT])
+        metrics=[pipeline_dp.Metrics.COUNT],
+        vector_max_norm=5,
+        vector_size=vector_size)
 
 
 class CreateCompoundCombinersTest(parameterized.TestCase):
@@ -86,6 +88,9 @@ def _create_aggregate_params(self, metrics: typing.Optional[typing.List]):
                  pipeline_dp.Metrics.MEAN, pipeline_dp.Metrics.VARIANCE
             ],
             expected_combiner_types=[dp_combiners.VarianceCombiner]),
+        dict(testcase_name='vector_sum',
+             metrics=[pipeline_dp.Metrics.VECTOR_SUM],
+             expected_combiner_types=[dp_combiners.VectorSumCombiner]),
     )
     def test_create_compound_combiner(self, metrics, expected_combiner_types):
         # Arrange.
@@ -458,5 +463,61 @@ def test_compute_metrics_with_noise(self):
         self.assertTrue(np.var(noised_sum) > 1)  # check that noise is added
 
 
+class VectorSumCombinerTest(parameterized.TestCase):
+
+    def _create_combiner(self, no_noise, vector_size):
+        mechanism_spec = _create_mechanism_spec(no_noise)
+        aggregate_params = _create_aggregate_params(vector_size=vector_size)
+        params = dp_combiners.CombinerParams(mechanism_spec, aggregate_params)
+        return dp_combiners.VectorSumCombiner(params)
+
+    @parameterized.named_parameters(
+        dict(testcase_name='no_noise', no_noise=True),
+        dict(testcase_name='noise', no_noise=False),
+    )
+    def test_create_accumulator(self, no_noise):
+        combiner = self._create_combiner(no_noise, vector_size=1)
+        self.assertEqual(np.array([0.]), combiner.create_accumulator([[0.]]))
+        self.assertEqual(
+            np.array([2.]),
+            combiner.create_accumulator([np.array([1.]),
+                                         np.array([1.])]))
+
+    @parameterized.named_parameters(
+        dict(testcase_name='no_noise', no_noise=True),
+        dict(testcase_name='noise', no_noise=False),
+    )
+    def test_merge_accumulators(self, no_noise):
+        combiner = self._create_combiner(no_noise, vector_size=1)
+        self.assertEqual(
+            np.array([0.]),
+            combiner.merge_accumulators(np.array([0.]), np.array([0.])))
+        combiner = self._create_combiner(no_noise, vector_size=2)
+        merge_result = combiner.merge_accumulators(np.array([1., 1.]),
+                                                   np.array([1., 4.]))
+        self.assertTrue(np.array_equal(np.array([2., 5.]), merge_result))
+
+    def test_compute_metrics_no_noise(self):
+        combiner = self._create_combiner(no_noise=True, vector_size=1)
+        self.assertAlmostEqual(5,
+                               combiner.compute_metrics(np.array(
+                                   [5]))['vector_sum'],
+                               delta=1e-5)
+
+    def test_compute_metrics_with_noise(self):
+        combiner = self._create_combiner(no_noise=False, vector_size=2)
+        accumulator = np.array([1, 3])
+        noisy_values = [
+            combiner.compute_metrics(accumulator)['vector_sum']
+            for _ in range(1000)
+        ]
+        # Standard deviation for the noise is about 1.37, so we allow a wide
+        # delta here.
+        mean_array = np.mean(noisy_values, axis=0)
+        self.assertAlmostEqual(accumulator[0], mean_array[0], delta=0.5)
+        self.assertAlmostEqual(accumulator[1], mean_array[1], delta=0.5)
+        self.assertTrue(np.var(noisy_values) > 1)  # check that noise is added
+
+
 if __name__ == '__main__':
     absltest.main()
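
Usage sketch (editorial addition, not part of the commit): the snippet below illustrates how the VECTOR_SUM metric introduced in this diff might be used end to end, assuming only the pipeline_dp names shown above. The input collection `rows`, the partition names, the privacy budget, and the contribution bounds are hypothetical placeholder values.

# Illustrative sketch only. Assumes `rows` is a small in-memory collection of
# (user_id, partition_key, vector) tuples; all numeric values are examples.
import pipeline_dp

rows = [('user1', 'movie_a', [1., 0., 0., 0., 0.]),
        ('user2', 'movie_a', [0., 0., 0., 0., 1.])]

backend = pipeline_dp.LocalBackend()
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
                                                       total_delta=1e-6)
dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.VECTOR_SUM],
    vector_size=5,  # length of each contributed vector
    vector_max_norm=1,  # bound on the norm of each contributed vector
    vector_norm_kind=pipeline_dp.NormKind.Linf,
    max_partitions_contributed=2,
    max_contributions_per_partition=1)

data_extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1],
    value_extractor=lambda row: row[2])

# Public partitions are used here so the tiny example dataset is not dropped
# by private partition selection.
dp_result = dp_engine.aggregate(rows, params, data_extractors,
                                public_partitions=['movie_a'])
budget_accountant.compute_budgets()
print(list(dp_result))  # per-partition noisy vector sums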