Implementing VectorSumCombiner (#276)
rialg authored May 20, 2022
1 parent 14b4c49 commit bb40462
Showing 5 changed files with 161 additions and 6 deletions.
21 changes: 19 additions & 2 deletions examples/movie_view_ratings/run_all_frameworks.py
@@ -50,6 +50,8 @@
    'contribution_bounds_already_enforced', False,
    'Assume the input dataset already enforces the hard-coded contribution '
    'bounds. Ignore the user identifiers.')
flags.DEFINE_boolean('vector_metrics', False,
                     'Compute DP vector metrics for rating values')
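For reference, a hypothetical invocation of the example with the new flag (other required flags, such as the input and output paths, are elided here):

python run_all_frameworks.py --framework=local --vector_metrics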


def calculate_private_result(movie_views, pipeline_backend):
@@ -70,7 +72,6 @@ def calc_dp_rating_metrics(movie_views, backend, public_partitions):
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)

    # Specify which DP aggregated metrics to compute.
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
@@ -85,13 +86,23 @@ def calc_dp_rating_metrics(movie_views, backend, public_partitions):
        contribution_bounds_already_enforced=FLAGS.
        contribution_bounds_already_enforced)

    value_extractor = lambda mv: mv.rating

    if FLAGS.vector_metrics:
        # Specify which DP aggregated metrics to compute for vector values.
        params.metrics = [pipeline_dp.Metrics.VECTOR_SUM]
        params.vector_size = 5  # Size of ratings vector
        params.vector_max_norm = 1
        value_extractor = lambda mv: encode_one_hot(mv.rating - 1, params.
                                                    vector_size)

    # Specify how to extract privacy_id, partition_key and value from an
    # element of movie view collection.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda mv: mv.movie_id,
        privacy_id_extractor=(lambda mv: mv.user_id)
        if not FLAGS.contribution_bounds_already_enforced else None,
        value_extractor=lambda mv: mv.rating)
        value_extractor=value_extractor)

    # Run aggregation.
    dp_result = dp_engine.aggregate(movie_views, params, data_extractors,
@@ -170,6 +181,12 @@ def compute_on_local_backend():
    write_to_file(dp_result, FLAGS.output_file)


def encode_one_hot(value, vector_size):
    """Returns a one-hot list of length vector_size with a 1 at index value."""
    vec = [0] * vector_size
    vec[value] = 1
    return vec


def main(unused_argv):
    if FLAGS.framework == 'beam':
        compute_on_beam()
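As a quick illustration of the encode_one_hot helper above, a 1-to-5 star rating r maps to the r-th standard basis vector (these assertions are illustrative, not part of the commit):

assert encode_one_hot(3 - 1, 5) == [0, 0, 1, 0, 0]  # rating 3
assert encode_one_hot(5 - 1, 5) == [0, 0, 0, 0, 1]  # rating 5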
1 change: 1 addition & 0 deletions pipeline_dp/__init__.py
@@ -27,5 +27,6 @@
from pipeline_dp.pipeline_backend import BeamBackend
from pipeline_dp.pipeline_backend import LocalBackend
from pipeline_dp.pipeline_backend import SparkRDDBackend
from pipeline_dp.aggregate_params import NormKind

__version__ = '0.1.1'
15 changes: 14 additions & 1 deletion pipeline_dp/aggregate_params.py
@@ -26,6 +26,7 @@ class Metrics(Enum):
    SUM = 'sum'
    MEAN = 'mean'
    VARIANCE = 'variance'
    VECTOR_SUM = 'vector_sum'


class NoiseKind(Enum):
@@ -69,13 +70,15 @@ class AggregateParams:
        max_value: Upper bound on each value.
        custom_combiners: Warning: experimental. Combiners for computing custom
          metrics.
        vector_norm_kind: The type of norm. Used only for VECTOR_SUM metric
          calculations.
        vector_max_norm: Bound on each value of a vector. Used only for
          VECTOR_SUM metric calculations.
        vector_size: Number of coordinates in a vector. Used only for
          VECTOR_SUM metric calculations.
        contribution_bounds_already_enforced: Assume that the input dataset
          complies with the bounds provided in max_partitions_contributed and
          max_contributions_per_partition. This option can be used if the
          dataset does not contain any identifiers that can be used to enforce
          contribution bounds automatically.
    """

    metrics: Iterable[Metrics]
    max_partitions_contributed: int
    max_contributions_per_partition: int
@@ -87,6 +90,9 @@
    public_partitions: Any = None  # deprecated
    noise_kind: NoiseKind = NoiseKind.LAPLACE
    custom_combiners: Iterable['CustomCombiner'] = None
    vector_norm_kind: NormKind = NormKind.Linf
    vector_max_norm: float = None
    vector_size: int = None
    contribution_bounds_already_enforced: bool = False

    def __post_init__(self):
@@ -123,6 +129,13 @@ def __post_init__(self):
            raise ValueError(
                "params.max_value must be equal to or greater than params.min_value"
            )
        if Metrics.VECTOR_SUM in self.metrics and (
                Metrics.SUM in self.metrics or Metrics.MEAN in self.metrics or
                Metrics.VARIANCE in self.metrics):
            raise ValueError(
                "Vector sum cannot be computed together with scalar metrics "
                "such as SUM, MEAN or VARIANCE.")
        if self.contribution_bounds_already_enforced and Metrics.PRIVACY_ID_COUNT in self.metrics:
            raise ValueError(
                "Cannot calculate PRIVACY_ID_COUNT when "
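Putting the new fields together, a minimal sketch of configuring a vector-sum aggregation (parameter values here are illustrative, not from this commit):

import pipeline_dp

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.VECTOR_SUM],  # must not be mixed with SUM/MEAN/VARIANCE
    max_partitions_contributed=2,
    max_contributions_per_partition=1,
    vector_size=5,       # each value is a 5-dimensional vector
    vector_max_norm=1,   # per-vector norm bound
    vector_norm_kind=pipeline_dp.NormKind.Linf)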
65 changes: 64 additions & 1 deletion pipeline_dp/combiners.py
@@ -13,14 +13,16 @@
# limitations under the License.
import abc
import copy
from typing import Iterable, Sized, Tuple, List
from typing import Iterable, Sized, Tuple, List, Union

import pipeline_dp
from pipeline_dp import dp_computations
from pipeline_dp import budget_accounting
import numpy as np
import collections

ArrayLike = Union[np.ndarray, List[float]]


class Combiner(abc.ABC):
"""Base class for all combiners.
@@ -148,6 +150,19 @@ def mean_var_params(self):
            self.aggregate_params.max_contributions_per_partition,
            self.aggregate_params.noise_kind)

    @property
    def additive_vector_noise_params(
            self) -> dp_computations.AdditiveVectorNoiseParams:
        return dp_computations.AdditiveVectorNoiseParams(
            eps_per_coordinate=self.eps / self.aggregate_params.vector_size,
            delta_per_coordinate=self.delta / self.aggregate_params.vector_size,
            max_norm=self.aggregate_params.vector_max_norm,
            l0_sensitivity=self.aggregate_params.max_partitions_contributed,
            linf_sensitivity=self.aggregate_params.
            max_contributions_per_partition,
            norm_kind=self.aggregate_params.vector_norm_kind,
            noise_kind=self.aggregate_params.noise_kind)
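Note how the property above splits the budget evenly across coordinates: with, say, eps = 1.0, delta = 1e-6 and vector_size = 5 (illustrative numbers), each coordinate receives eps_per_coordinate = 0.2 and delta_per_coordinate = 2e-7.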


class CountCombiner(Combiner):
"""Combiner for computing DP Count.
@@ -460,6 +475,48 @@ def metrics_names(self) -> List[str]:
        return self._metrics_to_compute


class VectorSumCombiner(Combiner):
    """Combiner for computing DP vector sum.

    The accumulator type is np.ndarray, which holds the sum of the
    equally-sized vectors for which this accumulator is computed.
    """
    AccumulatorType = np.ndarray

    def __init__(self, params: CombinerParams):
        self._params = params

    def create_accumulator(self,
                           values: Iterable[ArrayLike]) -> AccumulatorType:
        array_sum = None
        for val in values:
            if not isinstance(val, np.ndarray):
                val = np.array(val)
            if val.shape != (self._params.aggregate_params.vector_size,):
                raise TypeError(
                    f"Shape mismatch: {val.shape} != "
                    f"{(self._params.aggregate_params.vector_size,)}")
            if array_sum is None:
                # Copy so that accumulation does not mutate the input array.
                array_sum = val.copy()
            else:
                array_sum += val
        return array_sum

    def merge_accumulators(self, array_sum1: AccumulatorType,
                           array_sum2: AccumulatorType) -> AccumulatorType:
        return array_sum1 + array_sum2

    def compute_metrics(self, array_sum: AccumulatorType) -> dict:
        return {
            'vector_sum':
                dp_computations.add_noise_vector(
                    array_sum, self._params.additive_vector_noise_params)
        }

    def metrics_names(self) -> List[str]:
        return ['vector_sum']
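A sketch of the combiner's lifecycle, assuming combiner is a VectorSumCombiner built with vector_size=3 (construction via CombinerParams is shown in the tests below):

import numpy as np

acc1 = combiner.create_accumulator([np.array([1., 0., 0.]),
                                    np.array([0., 1., 0.])])
acc2 = combiner.create_accumulator([np.array([0., 0., 1.])])
merged = combiner.merge_accumulators(acc1, acc2)        # -> array([1., 1., 1.])
noisy = combiner.compute_metrics(merged)['vector_sum']  # noise added per coordinate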


def create_compound_combiner(
        aggregate_params: pipeline_dp.AggregateParams,
        budget_accountant: budget_accounting.BudgetAccountant
@@ -508,6 +565,12 @@ def create_compound_combiner(
    combiners.append(
        PrivacyIdCountCombiner(
            CombinerParams(budget_privacy_id_count, aggregate_params)))
    if pipeline_dp.Metrics.VECTOR_SUM in aggregate_params.metrics:
        budget_vector_sum = budget_accountant.request_budget(
            mechanism_type, weight=aggregate_params.budget_weight)
        combiners.append(
            VectorSumCombiner(
                CombinerParams(budget_vector_sum, aggregate_params)))

    return CompoundCombiner(combiners, return_named_tuple=True)
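For context, a minimal sketch of how the new branch above gets exercised, with params as in the earlier AggregateParams sketch (the budget values are illustrative):

from pipeline_dp import budget_accounting
from pipeline_dp import combiners

budget_accountant = budget_accounting.NaiveBudgetAccountant(
    total_epsilon=1, total_delta=1e-6)
compound = combiners.create_compound_combiner(params, budget_accountant)
# compound now wraps a single VectorSumCombiner with its own budget share.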

65 changes: 63 additions & 2 deletions tests/combiners_test.py
@@ -33,14 +33,16 @@ def _create_mechanism_spec(no_noise):
    return ba.MechanismSpec(ba.MechanismType.GAUSSIAN, None, eps, delta)


def _create_aggregate_params(max_value: float = 1):
def _create_aggregate_params(max_value: float = 1, vector_size: int = 1):
    return pipeline_dp.AggregateParams(
        min_value=0,
        max_value=max_value,
        max_partitions_contributed=1,
        max_contributions_per_partition=3,
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        metrics=[pipeline_dp.Metrics.COUNT])
        metrics=[pipeline_dp.Metrics.COUNT],
        vector_max_norm=5,
        vector_size=vector_size)


class CreateCompoundCombinersTest(parameterized.TestCase):
@@ -86,6 +88,9 @@ def _create_aggregate_params(self, metrics: typing.Optional[typing.List]):
                pipeline_dp.Metrics.MEAN, pipeline_dp.Metrics.VARIANCE
            ],
            expected_combiner_types=[dp_combiners.VarianceCombiner]),
        dict(testcase_name='vector_sum',
             metrics=[pipeline_dp.Metrics.VECTOR_SUM],
             expected_combiner_types=[dp_combiners.VectorSumCombiner]),
    )
    def test_create_compound_combiner(self, metrics, expected_combiner_types):
        # Arrange.
@@ -458,5 +463,61 @@ def test_compute_metrics_with_noise(self):
        self.assertTrue(np.var(noised_sum) > 1)  # check that noise is added


class VectorSumCombinerTest(parameterized.TestCase):

    def _create_combiner(self, no_noise, vector_size):
        mechanism_spec = _create_mechanism_spec(no_noise)
        aggregate_params = _create_aggregate_params(vector_size=vector_size)
        params = dp_combiners.CombinerParams(mechanism_spec, aggregate_params)
        return dp_combiners.VectorSumCombiner(params)

    @parameterized.named_parameters(
        dict(testcase_name='no_noise', no_noise=True),
        dict(testcase_name='noise', no_noise=False),
    )
    def test_create_accumulator(self, no_noise):
        combiner = self._create_combiner(no_noise, vector_size=1)
        self.assertEqual(np.array([0.]), combiner.create_accumulator([[0.]]))
        self.assertEqual(
            np.array([2.]),
            combiner.create_accumulator([np.array([1.]),
                                         np.array([1.])]))

    @parameterized.named_parameters(
        dict(testcase_name='no_noise', no_noise=True),
        dict(testcase_name='noise', no_noise=False),
    )
    def test_merge_accumulators(self, no_noise):
        combiner = self._create_combiner(no_noise, vector_size=1)
        self.assertEqual(
            np.array([0.]),
            combiner.merge_accumulators(np.array([0.]), np.array([0.])))
        combiner = self._create_combiner(no_noise, vector_size=2)
        merge_result = combiner.merge_accumulators(np.array([1., 1.]),
                                                   np.array([1., 4.]))
        self.assertTrue(np.array_equal(np.array([2., 5.]), merge_result))

    def test_compute_metrics_no_noise(self):
        combiner = self._create_combiner(no_noise=True, vector_size=1)
        self.assertAlmostEqual(5,
                               combiner.compute_metrics(np.array(
                                   [5]))['vector_sum'],
                               delta=1e-5)

    def test_compute_metrics_with_noise(self):
        combiner = self._create_combiner(no_noise=False, vector_size=2)
        accumulator = np.array([1, 3])
        noisy_values = [
            combiner.compute_metrics(accumulator)['vector_sum']
            for _ in range(1000)
        ]
        # Standard deviation of the noise is about 1.37, so a generous
        # tolerance is used here.
        mean_array = np.mean(noisy_values, axis=0)
        self.assertAlmostEqual(accumulator[0], mean_array[0], delta=0.5)
        self.assertAlmostEqual(accumulator[1], mean_array[1], delta=0.5)
        self.assertTrue(np.var(noisy_values) > 1)  # check that noise is added


if __name__ == '__main__':
    absltest.main()
