Skip to content

Commit

Permalink
Support multithreading in seq_join/parallel_join
Browse files Browse the repository at this point in the history
Support is currently behind a feature flag that is not enabled by default.
We use userspace concurrency to drive many futures in parallel by spawning
tasks inside the executor. This model is not ideal for performance because
memory loads will happen across thread boundaries and NUMA cores, but it
already gives 50% more throughput for the OPRF version and 200% more for the old IPA.
  • Loading branch information
akoshelev committed Nov 30, 2023
1 parent 110f586 commit 362dd56
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 185 deletions.
3 changes: 3 additions & 0 deletions ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ step-trace = ["descriptive-gate"]
# of unit tests use it. Compact uses memory-efficient gates and is suitable for production.
descriptive-gate = []
compact-gate = ["ipa-macros/compact-gate"]
# Enable using more than one thread for protocol execution. Most of the parallelism occurs in parallel_join/seq_join operations.
multi-threading = ["async-scoped"]

# Standalone aggregation protocol. We use IPA infra for communication
# but it has nothing to do with IPA.
Expand All @@ -73,6 +75,7 @@ ipa-macros = { version = "*", path = "../ipa-macros" }

aes = "0.8.3"
async-trait = "0.1.68"
async-scoped = { version = "0.7.1", features = ["use-tokio"], optional = true }
axum = { version = "0.5.17", optional = true, features = ["http2"] }
axum-server = { version = "0.5.1", optional = true, features = [
"rustls",
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/protocol/basics/reshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
/// `to_helper` = (`rand_left`, `rand_right`) = (r0, r1)
/// `to_helper.right` = (`rand_right`, part1 + part2) = (r0, part1 + part2)
#[async_trait]
pub trait Reshare<C: Context, B: RecordBinding>: Sized {
pub trait Reshare<C: Context, B: RecordBinding>: Sized + 'static {
async fn reshare<'fut>(
&self,
ctx: C,
Expand Down
32 changes: 17 additions & 15 deletions ipa-core/src/protocol/modulus_conversion/convert_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,17 @@ where
/// # Panics
/// If the total record count on the context is unspecified.
#[tracing::instrument(name = "modulus_conversion", skip_all, fields(bits = ?bit_range, gate = %ctx.gate().as_ref()))]
pub fn convert_bits<F, V, C, S, VS>(
pub fn convert_bits<'a, F, V, C, S, VS>(
ctx: C,
binary_shares: VS,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<BitDecomposed<S>, Error>>
) -> impl Stream<Item = Result<BitDecomposed<S>, Error>> + 'a
where
F: PrimeField,
V: ToBitConversionTriples<Residual = ()>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = ()> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
Expand All @@ -313,35 +313,37 @@ where
/// Note that unconverted fields are not upgraded, so they might need to be upgraded either before or
/// after invoking this function.
#[tracing::instrument(name = "modulus_conversion", skip_all, fields(bits = ?bit_range, gate = %ctx.gate().as_ref()))]
pub fn convert_selected_bits<F, V, C, S, VS, R>(
pub fn convert_selected_bits<'inp, F, V, C, S, VS, R>(
ctx: C,
binary_shares: VS,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>>
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>> + 'inp
where
R: Send + 'static,
F: PrimeField,
V: ToBitConversionTriples<Residual = R>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = R> + 'inp,
C: UpgradedContext<F, Share = S> + 'inp,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'inp,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
convert_some_bits(ctx, binary_shares, RecordId::FIRST, bit_range)
}

pub(crate) fn convert_some_bits<F, V, C, S, VS, R>(
pub(crate) fn convert_some_bits<'a, F, V, C, S, VS, R>(
ctx: C,
binary_shares: VS,
first_record: RecordId,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>>
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>> + 'a
where
R: Send + 'static,
F: PrimeField,
V: ToBitConversionTriples<Residual = R>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = R> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
Expand Down
4 changes: 3 additions & 1 deletion ipa-core/src/protocol/sort/generate_permutation_opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ mod tests {
}

/// Passing 32 records for Fp31 doesn't work.
#[tokio::test]
///
/// Requires one extra thread to cancel futures running in parallel with the one that panics.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[should_panic = "prime field ipa_core::ff::prime_field::fp31::Fp31 is too small to sort 32 records"]
async fn fp31_overflow() {
const COUNT: usize = 32;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<V: SharedValue + ExtendableField> LinearSecretSharing<V> for AdditiveShare<
/// when the protocol is done. This should not be used directly.
#[async_trait]
pub trait Downgrade: Send {
type Target: Send;
type Target: Send + 'static;
async fn downgrade(self) -> UnauthorizedDowngradeWrapper<Self::Target>;
}

Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/secret_sharing/scheme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{SharedValue, WeakSharedValue};
use crate::ff::{AddSub, AddSubAssign, GaloisField};

/// Secret sharing scheme, e.g. replicated secret sharing.
pub trait SecretSharing<V: WeakSharedValue>: Clone + Debug + Sized + Send + Sync {
pub trait SecretSharing<V: WeakSharedValue>: Clone + Debug + Sized + Send + Sync + 'static {
const ZERO: Self;
}

Expand All @@ -21,6 +21,7 @@ pub trait Linear<V: SharedValue>:
+ Mul<V, Output = Self>
+ for<'r> Mul<&'r V, Output = Self>
+ Neg<Output = Self>
+ 'static
{
}

Expand Down
Loading

0 comments on commit 362dd56

Please sign in to comment.