Implementing VectorSumCombiner #276

Merged · 12 commits · May 20, 2022
21 changes: 19 additions & 2 deletions examples/movie_view_ratings/run_all_frameworks.py
@@ -46,6 +46,8 @@
flags.DEFINE_boolean(
    'private_partitions', False,
    'Output private partitions (do not calculate any DP metrics)')
flags.DEFINE_boolean('vector_metrics', False,
                     'Compute DP vector metrics for rating values')


def calculate_private_result(movie_views, pipeline_backend):
@@ -66,7 +68,6 @@ def calc_dp_rating_metrics(movie_views, backend, public_partitions):
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)

    # Specify which DP aggregated metrics to compute.
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
@@ -79,12 +80,22 @@ def calc_dp_rating_metrics(movie_views, backend, public_partitions):
        min_value=1,
        max_value=5)

    value_extractor = lambda mv: mv.rating

    if FLAGS.vector_metrics:
        # Specify which DP aggregated metrics to compute for vector values.
        params.metrics = [pipeline_dp.Metrics.VECTOR_SUM]
        params.vector_size = 5  # Size of the ratings vector.
        params.vector_max_norm = 1
        value_extractor = lambda mv: one_hot_encoder(mv.rating - 1,
                                                     params.vector_size)

    # Specify how to extract privacy_id, partition_key and value from an
    # element of the movie view collection.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda mv: mv.movie_id,
        privacy_id_extractor=lambda mv: mv.user_id,
        value_extractor=lambda mv: mv.rating)
        value_extractor=value_extractor)

    # Run aggregation.
    dp_result = dp_engine.aggregate(movie_views, params, data_extractors,
@@ -163,6 +174,12 @@ def compute_on_local_backend():
    write_to_file(dp_result, FLAGS.output_file)


def one_hot_encoder(value, vector_size):
    """Returns a length-vector_size list with 1 at index value, 0 elsewhere."""
    vec = [0] * vector_size
    vec[value] = 1
    return vec


def main(unused_argv):
    if FLAGS.framework == 'beam':
        compute_on_beam()
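Side note (not part of the diff): in vector mode each rating in 1..5 is one-hot encoded, so the per-movie DP VECTOR_SUM is a noisy histogram of that movie's ratings. A minimal sketch of the encoding, using the one_hot_encoder helper above:

# Illustration only: a rating of 4 maps to index 3 of a 5-element vector.
assert one_hot_encoder(4 - 1, 5) == [0, 0, 0, 1, 0]
# Summing these one-hot vectors over all views of a movie yields its
# rating histogram; VECTOR_SUM then adds DP noise to every coordinate.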
1 change: 1 addition & 0 deletions pipeline_dp/__init__.py
@@ -27,5 +27,6 @@
from pipeline_dp.pipeline_backend import BeamBackend
from pipeline_dp.pipeline_backend import LocalBackend
from pipeline_dp.pipeline_backend import SparkRDDBackend
from pipeline_dp.aggregate_params import NormKind

__version__ = '0.1.1'
15 changes: 14 additions & 1 deletion pipeline_dp/aggregate_params.py
@@ -26,6 +26,7 @@ class Metrics(Enum):
    SUM = 'sum'
    MEAN = 'mean'
    VARIANCE = 'variance'
    VECTOR_SUM = 'vector_sum'


class NoiseKind(Enum):
@@ -69,8 +70,10 @@ class AggregateParams:
        max_value: Upper bound on each value.
        custom_combiners: Warning: experimental. Combiners for computing custom
            metrics.
        vector_norm_kind: The kind of norm to use for the DP calculations.
        vector_max_norm: Bound on the norm (of kind vector_norm_kind) of each
            vector.
        vector_size: Number of coordinates in a vector.
    """

    metrics: Iterable[Metrics]
    max_partitions_contributed: int
    max_contributions_per_partition: int
@@ -82,6 +85,9 @@
    public_partitions: Any = None  # deprecated
    noise_kind: NoiseKind = NoiseKind.LAPLACE
    custom_combiners: Iterable['CustomCombiner'] = None
    vector_norm_kind: NormKind = NormKind.Linf
    vector_max_norm: float = None
    vector_size: int = None

    def __post_init__(self):
        if self.low is not None:
@@ -117,6 +123,13 @@ def __post_init__(self):
            raise ValueError(
                "params.max_value must be equal to or greater than params.min_value"
            )
        if Metrics.VECTOR_SUM in self.metrics and \
           (Metrics.SUM in self.metrics or \
            Metrics.MEAN in self.metrics or \
            Metrics.VARIANCE in self.metrics):
            raise ValueError(
                "Vector sum cannot be computed together with scalar metrics "
                "such as sum, mean or variance")
        if self.custom_combiners:
            logging.warning("Warning: custom combiners are used. This is an "
                            "experimental feature. It might not work properly "
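For reference, a minimal sketch (illustrative parameter values, not code from this PR) of how the new fields and the VECTOR_SUM validation behave:

import pipeline_dp

# VECTOR_SUM together with the new vector parameters is accepted.
params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.VECTOR_SUM],
    max_partitions_contributed=1,
    max_contributions_per_partition=1,
    vector_size=5,
    vector_max_norm=1)

# Mixing VECTOR_SUM with a scalar metric triggers the new ValueError.
try:
    pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[pipeline_dp.Metrics.VECTOR_SUM, pipeline_dp.Metrics.SUM],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        max_value=1)
except ValueError as e:
    print(e)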
63 changes: 62 additions & 1 deletion pipeline_dp/combiners.py
@@ -13,14 +13,16 @@
# limitations under the License.
import abc
import copy
from typing import Iterable, Sized, Tuple, List
from typing import Iterable, Sized, Tuple, List, Union

import pipeline_dp
from pipeline_dp import dp_computations
from pipeline_dp import budget_accounting
import numpy as np
import collections

ArrayLike = Union[np.ndarray, List[float]]


class Combiner(abc.ABC):
    """Base class for all combiners.
@@ -148,6 +150,18 @@ def mean_var_params(self):
            self.aggregate_params.max_contributions_per_partition,
            self.aggregate_params.noise_kind)

    @property
    def additive_vector_noise_params(self):
        return dp_computations.AdditiveVectorNoiseParams(
            eps_per_coordinate=self.eps / self.aggregate_params.vector_size,
            delta_per_coordinate=self.delta /
            self.aggregate_params.vector_size,
            max_norm=self.aggregate_params.vector_max_norm,
            l0_sensitivity=self.aggregate_params.max_partitions_contributed,
            linf_sensitivity=self.aggregate_params.
            max_contributions_per_partition,
            norm_kind=self.aggregate_params.vector_norm_kind,
            noise_kind=self.aggregate_params.noise_kind)


class CountCombiner(Combiner):
    """Combiner for computing DP Count.
@@ -459,6 +473,47 @@ def metrics_names(self) -> List[str]:
        return self._metrics_to_compute


class VectorSumCombiner(Combiner):
    """Combiner for computing a DP vector sum.

    The accumulator type is ArrayLike: the element-wise sum of the input
    vectors, which must all have the same shape.
    """
    AccumulatorType = ArrayLike

    def __init__(self, params: CombinerParams):
        self._params = params

    def create_accumulator(
            self, values: Iterable[AccumulatorType]) -> AccumulatorType:
        array_sum = None
        for val in values:
            if not isinstance(val, np.ndarray):
                val = np.array(val)
            if array_sum is None:
                array_sum = val
            else:
                if array_sum.shape != val.shape:
                    raise TypeError(
                        f"Shape mismatch: {array_sum.shape} != {val.shape}")
                array_sum += val
        return array_sum

    def merge_accumulators(self, array_sum1: AccumulatorType,
                           array_sum2: AccumulatorType) -> AccumulatorType:
        return array_sum1 + array_sum2

    def compute_metrics(self, array_sum: AccumulatorType) -> dict:
        return {
            'vector_sum':
                dp_computations.add_noise_vector(
                    array_sum, self._params.additive_vector_noise_params)
        }

    def metrics_names(self) -> List[str]:
        return ['vector_sum']

def create_compound_combiner(
        aggregate_params: pipeline_dp.AggregateParams,
        budget_accountant: budget_accounting.BudgetAccountant
@@ -507,6 +562,12 @@ def create_compound_combiner(
        combiners.append(
            PrivacyIdCountCombiner(
                CombinerParams(budget_privacy_id_count, aggregate_params)))
    if pipeline_dp.Metrics.VECTOR_SUM in aggregate_params.metrics:
        budget_vector_sum = budget_accountant.request_budget(
            mechanism_type, weight=aggregate_params.budget_weight)
        combiners.append(
            VectorSumCombiner(
                CombinerParams(budget_vector_sum, aggregate_params)))

    return CompoundCombiner(combiners, return_named_tuple=True)

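A design note on additive_vector_noise_params (an observation, not text from the PR): the combiner's (eps, delta) budget is divided evenly across the vector's coordinates, so for a fixed budget, a longer vector means more noise per coordinate. With illustrative numbers:

# Illustrative numbers only.
eps, delta, vector_size = 1.0, 1e-6, 5
eps_per_coordinate = eps / vector_size      # 0.2
delta_per_coordinate = delta / vector_size  # 2e-7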
69 changes: 67 additions & 2 deletions tests/combiners_test.py
@@ -33,14 +33,16 @@ def _create_mechanism_spec(no_noise):
    return ba.MechanismSpec(ba.MechanismType.GAUSSIAN, None, eps, delta)


def _create_aggregate_params(max_value: float = 1):
def _create_aggregate_params(max_value: float = 1, vector_size: int = 1):
    return pipeline_dp.AggregateParams(
        min_value=0,
        max_value=max_value,
        max_partitions_contributed=1,
        max_contributions_per_partition=3,
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        metrics=[pipeline_dp.Metrics.COUNT])
        metrics=[pipeline_dp.Metrics.COUNT],
        vector_max_norm=5,
        vector_size=vector_size)


class CreateCompoundCombinersTest(parameterized.TestCase):
@@ -86,6 +88,9 @@ def _create_aggregate_params(self, metrics: typing.Optional[typing.List]):
                 pipeline_dp.Metrics.MEAN, pipeline_dp.Metrics.VARIANCE
             ],
             expected_combiner_types=[dp_combiners.VarianceCombiner]),
        dict(testcase_name='vector_sum',
             metrics=[pipeline_dp.Metrics.VECTOR_SUM],
             expected_combiner_types=[dp_combiners.VectorSumCombiner]),
    )
    def test_create_compound_combiner(self, metrics, expected_combiner_types):
        # Arrange.
@@ -458,5 +463,65 @@ def test_compute_metrics_with_noise(self):
        self.assertTrue(np.var(noised_sum) > 1)  # check that noise is added


class VectorSumCombinerTest(parameterized.TestCase):

    def _create_combiner(self, no_noise, vector_size):
        mechanism_spec = _create_mechanism_spec(no_noise)
        aggregate_params = _create_aggregate_params(vector_size=vector_size)
        params = dp_combiners.CombinerParams(mechanism_spec, aggregate_params)
        return dp_combiners.VectorSumCombiner(params)

    @parameterized.named_parameters(
        dict(testcase_name='no_noise', no_noise=True),
        dict(testcase_name='noise', no_noise=False),
    )
    def test_create_accumulator(self, no_noise):
        combiner = self._create_combiner(no_noise, vector_size=1)
        self.assertEqual(np.array([0.]), combiner.create_accumulator([[0.]]))
        self.assertEqual(
            np.array([2.]),
            combiner.create_accumulator([np.array([1.]),
                                         np.array([1.])]))
        # Bounding on values.
        # self.assertEqual(2, combiner.create_accumulator([1, 3]))
        # self.assertEqual(1, combiner.create_accumulator([0, 3]))

    @parameterized.named_parameters(
        dict(testcase_name='no_noise', no_noise=True),
        dict(testcase_name='noise', no_noise=False),
    )
    def test_merge_accumulators(self, no_noise):
        combiner = self._create_combiner(no_noise, vector_size=1)
        self.assertEqual(
            np.array([0.]),
            combiner.merge_accumulators(np.array([0.]), np.array([0.])))
        combiner = self._create_combiner(no_noise, vector_size=2)
        merge_result = combiner.merge_accumulators(np.array([1., 1.]),
                                                   np.array([1., 4.]))
        self.assertEqual(2., merge_result[0])
        self.assertEqual(5., merge_result[1])

    def test_compute_metrics_no_noise(self):
        combiner = self._create_combiner(no_noise=True, vector_size=1)
        self.assertAlmostEqual(5,
                               combiner.compute_metrics(np.array(
                                   [5]))['vector_sum'],
                               delta=1e-5)

    def test_compute_metrics_with_noise(self):
        combiner = self._create_combiner(no_noise=False, vector_size=2)
        accumulator = np.array([1, 3])
        noisy_values = [
            combiner.compute_metrics(accumulator)['vector_sum']
            for _ in range(1000)
        ]
        # Standard deviation of the noise is about 1.37, so we use a
        # generous delta here.
        mean_array = np.mean(noisy_values, axis=0)
        self.assertAlmostEqual(accumulator[0], mean_array[0], delta=0.5)
        self.assertAlmostEqual(accumulator[1], mean_array[1], delta=0.5)
        self.assertTrue(np.var(noisy_values) > 1)  # check that noise is added
if __name__ == '__main__':
    absltest.main()
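For completeness, a hedged end-to-end sketch of the new metric on the local backend. The input rows and budget values are made up, and the NaiveBudgetAccountant/LocalBackend wiring is an assumption based on the surrounding pipeline_dp API rather than code from this PR:

import pipeline_dp

def one_hot(index, size):
    # Hypothetical helper mirroring one_hot_encoder from the example file.
    vec = [0] * size
    vec[index] = 1
    return vec

# Hypothetical (user_id, movie_id, rating) triples.
rows = [(1, 'movie_a', 5), (1, 'movie_b', 3), (2, 'movie_a', 4)]

budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
                                                      total_delta=1e-6)
dp_engine = pipeline_dp.DPEngine(budget_accountant, pipeline_dp.LocalBackend())

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.VECTOR_SUM],
    max_partitions_contributed=2,
    max_contributions_per_partition=1,
    vector_size=5,
    vector_max_norm=1)

data_extractors = pipeline_dp.DataExtractors(
    partition_extractor=lambda row: row[1],
    privacy_id_extractor=lambda row: row[0],
    value_extractor=lambda row: one_hot(row[2] - 1, params.vector_size))

dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()  # finalize budgets before materializing
# With this little data, private partition selection will likely drop all
# partitions; meaningful output needs realistically sized input.
print(list(dp_result))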