Skip to content

Commit

Permalink
Main logic for justification sync (#902)
Browse files Browse the repository at this point in the history
Co-authored-by: timorl <[email protected]>
  • Loading branch information
timorl and timorl authored Feb 7, 2023
1 parent 83ad9e1 commit a6d3a62
Show file tree
Hide file tree
Showing 9 changed files with 628 additions and 39 deletions.
71 changes: 69 additions & 2 deletions finality-aleph/src/sync/data.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::mem::size_of;
use std::{collections::HashSet, marker::PhantomData, mem::size_of};

use aleph_primitives::MAX_BLOCK_SIZE;
use codec::{Decode, Encode, Error as CodecError, Input as CodecInput};
use log::warn;

use crate::{sync::Justification, Version};
use crate::{
network::GossipNetwork,
sync::{BlockIdFor, Justification, LOG_TARGET},
Version,
};

/// The representation of the database state to be sent to other nodes.
/// In the first version this only contains the top justification.
Expand All @@ -13,6 +17,16 @@ pub struct State<J: Justification> {
top_justification: J::Unverified,
}

impl<J: Justification> State<J> {
pub fn new(top_justification: J::Unverified) -> Self {
State { top_justification }
}

pub fn top_justification(&self) -> J::Unverified {
self.top_justification.clone()
}
}

/// Data to be sent over the network.
#[derive(Clone, Debug, Encode, Decode)]
pub enum NetworkData<J: Justification> {
Expand All @@ -22,6 +36,8 @@ pub enum NetworkData<J: Justification> {
StateBroadcast(State<J>),
/// A series of justifications, sent to a node that is clearly behind.
Justifications(Vec<J::Unverified>, State<J>),
/// An explicit request for data, potentially a lot of it.
Request(BlockIdFor<J>, State<J>),
}

/// Version wrapper around the network data.
Expand All @@ -44,6 +60,7 @@ fn encode_with_version(version: Version, payload: &[u8]) -> Vec<u8> {

if size > MAX_SYNC_MESSAGE_SIZE {
warn!(
target: LOG_TARGET,
"Versioned sync message v{:?} too big during Encode. Size is {:?}. Should be {:?} at max.",
version,
payload.len(),
Expand Down Expand Up @@ -100,3 +117,53 @@ impl<J: Justification> Decode for VersionedNetworkData<J> {
}
}
}

/// Wrap around a network to avoid thinking about versioning.
pub struct VersionWrapper<J: Justification, N: GossipNetwork<VersionedNetworkData<J>>> {
inner: N,
_phantom: PhantomData<J>,
}

impl<J: Justification, N: GossipNetwork<VersionedNetworkData<J>>> VersionWrapper<J, N> {
/// Wrap the inner network.
pub fn new(inner: N) -> Self {
VersionWrapper {
inner,
_phantom: PhantomData,
}
}
}

#[async_trait::async_trait]
impl<J: Justification, N: GossipNetwork<VersionedNetworkData<J>>> GossipNetwork<NetworkData<J>>
for VersionWrapper<J, N>
{
type Error = N::Error;
type PeerId = N::PeerId;

fn send_to(&mut self, data: NetworkData<J>, peer_id: Self::PeerId) -> Result<(), Self::Error> {
self.inner.send_to(VersionedNetworkData::V1(data), peer_id)
}

fn send_to_random(
&mut self,
data: NetworkData<J>,
peer_ids: HashSet<Self::PeerId>,
) -> Result<(), Self::Error> {
self.inner
.send_to_random(VersionedNetworkData::V1(data), peer_ids)
}

fn broadcast(&mut self, data: NetworkData<J>) -> Result<(), Self::Error> {
self.inner.broadcast(VersionedNetworkData::V1(data))
}

async fn next(&mut self) -> Result<(NetworkData<J>, Self::PeerId), Self::Error> {
loop {
match self.inner.next().await? {
(VersionedNetworkData::Other(version, _), _) => warn!(target: LOG_TARGET, "Received sync data of unsupported version {:?}, this node might be running outdated software.", version),
(VersionedNetworkData::V1(data), peer_id) => return Ok((data, peer_id)),
}
}
}
}
7 changes: 3 additions & 4 deletions finality-aleph/src/sync/forest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use std::collections::{
HashMap, HashSet,
};

use crate::sync::{BlockIdentifier, Header, Justification, PeerId};
use crate::sync::{BlockIdFor, BlockIdentifier, Header, Justification, PeerId};

mod vertex;

use vertex::{JustificationAddResult, Vertex};

type BlockIdFor<J> = <<J as Justification>::Header as Header>::Identifier;
pub use vertex::JustificationAddResult;
use vertex::Vertex;

pub struct JustificationWithParent<J: Justification> {
pub justification: J,
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/sync/forest/vertex.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;

use crate::sync::{forest::BlockIdFor, Justification, PeerId};
use crate::sync::{BlockIdFor, Justification, PeerId};

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
enum HeaderImportance {
Expand Down
Loading

0 comments on commit a6d3a62

Please sign in to comment.