Skip to content

Commit

Permalink
feat: front-backend communication channel
Browse files Browse the repository at this point in the history
Signed-off-by: Martichou <[email protected]>
  • Loading branch information
Martichou committed Feb 22, 2024
1 parent 8560c82 commit 45d9d7c
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 77 deletions.
4 changes: 0 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,3 @@ repos:
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/doublify/pre-commit-rust
rev: v1.0
hooks:
- id: fmt
1 change: 1 addition & 0 deletions core_lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio = { version = "1.25", features = ["macros", "rt", "rt-multi-thread", "net"
tokio-util = { version = "0.7", features = ["rt"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = "1.7.0"
serde = { version = "1.0", features = ["derive"] }

[build-dependencies]
prost-build = "0.12"
30 changes: 30 additions & 0 deletions core_lib/src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};

use crate::hdl::info::TransferMetadata;
use crate::hdl::State;

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub enum ChannelDirection {
FrontToLib,
LibToFront,
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub enum ChannelAction {
AcceptTransfer,
RejectTransfer,
CancelTransfer,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ChannelMessage {
pub id: String,
pub direction: ChannelDirection,

// Only present when channelDirection is frontToLib
pub action: Option<ChannelAction>,

// Only present when channelDirection is libToFront
pub state: Option<State>,
pub meta: Option<TransferMetadata>,
}
92 changes: 82 additions & 10 deletions core_lib/src/hdl/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use rand::Rng;
use sha2::{Digest, Sha256, Sha512};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::sync::broadcast::{Receiver, Sender};

use super::{InnerState, State};
use crate::channel::{ChannelAction, ChannelDirection, ChannelMessage};
use crate::hdl::info::{InternalFileInfo, TransferMetadata};
use crate::location_nearby_connections::payload_transfer_frame::{
payload_header, PacketType, PayloadChunk, PayloadHeader,
Expand Down Expand Up @@ -43,10 +45,14 @@ const SANE_FRAME_LENGTH: i32 = 5 * 1024 * 1024;
pub struct InboundRequest {
socket: TcpStream,
state: InnerState,
sender: Sender<ChannelMessage>,
receiver: Receiver<ChannelMessage>,
}

impl InboundRequest {
pub fn new(socket: TcpStream, id: String) -> Self {
pub fn new(socket: TcpStream, id: String, sender: Sender<ChannelMessage>) -> Self {
let receiver = sender.subscribe();

Self {
socket,
state: InnerState {
Expand All @@ -58,14 +64,57 @@ impl InboundRequest {
text_payload_id: -1,
..Default::default()
},
sender,
receiver,
}
}

pub async fn handle(&mut self) -> Result<(), anyhow::Error> {
// Buffer for the 4-byte length
let mut length_buf = [0u8; 4];
stream_read_exact(&mut self.socket, &mut length_buf).await?;

tokio::select! {
i = self.receiver.recv() => {
match i {
Ok(channel_msg) => {
if channel_msg.direction == ChannelDirection::LibToFront {
return Ok(());
}

debug!("inbound: got: {:?}", channel_msg);
match channel_msg.action {
Some(ChannelAction::AcceptTransfer) => {
self.accept_transfer().await?;
},
Some(ChannelAction::RejectTransfer) => {
self.reject_transfer(Some(
sharing_nearby::connection_response_frame::Status::Reject
)).await?;
},
Some(ChannelAction::CancelTransfer) => {
todo!()
},
None => {
trace!("inbound: nothing to do")
},
}
}
Err(e) => {
error!("inbound: channel error: {}", e);
}
}
},
h = stream_read_exact(&mut self.socket, &mut length_buf) => {
h?;

self._handle(length_buf).await?
}
}

Ok(())
}

pub async fn _handle(&mut self, length_buf: [u8; 4]) -> Result<(), anyhow::Error> {
let msg_length = u32::from_be_bytes(length_buf) as usize;
// Ensure the message length is not unreasonably big to avoid allocation attacks
if msg_length > SANE_FRAME_LENGTH as usize {
Expand Down Expand Up @@ -512,6 +561,13 @@ impl InboundRequest {
self.state.transferred_files.remove(&payload_id);
if self.state.transferred_files.is_empty() {
info!("Transfer finished");
self.sender.send(ChannelMessage {
id: self.state.id.clone(),
direction: ChannelDirection::LibToFront,
action: None,
state: Some(State::Finished),
meta: None,
})?;
self.disconnection().await?;
return Err(anyhow!(crate::errors::AppError::NotAnError));
}
Expand Down Expand Up @@ -670,47 +726,63 @@ impl InboundRequest {
}

let metadata = TransferMetadata {
files: files_name,
id: self.state.id.clone(),
files: files_name,
pin_code: self.state.pin_code.clone(),
text_description: None,
};

info!("Asking for user consent: {:?}", metadata);
self.update_state(|e| {
e.transfer_metadata = Some(metadata);
e.transfer_metadata = Some(metadata.clone());
});

// TODO - Ask for user consent
// Currently always accept file transfer
self.accept_transfer().await?;
// self.accept_transfer().await?;
self.sender.send(ChannelMessage {
id: self.state.id.clone(),
direction: ChannelDirection::LibToFront,
action: None,
state: Some(self.state.state.clone()),
meta: Some(metadata),
})?;
} else if introduction.text_metadata.len() == 1 {
trace!("process_introduction: handling text_metadata");
let meta = introduction.text_metadata.first().unwrap();
if meta.r#type() == text_metadata::Type::Url {
let metadata = TransferMetadata {
files: vec![],
id: self.state.id.clone(),
files: vec![],
pin_code: self.state.pin_code.clone(),
text_description: meta.text_title.clone(),
};

info!("Asking for user consent: {:?}", metadata);
self.update_state(|e| {
e.text_payload_id = meta.payload_id();
e.transfer_metadata = Some(metadata);
e.transfer_metadata = Some(metadata.clone());
});

// TODO - Ask for user consent
// Currently always accept file transfer
self.accept_transfer().await?;
// self.accept_transfer().await?;
self.sender.send(ChannelMessage {
id: self.state.id.clone(),
direction: ChannelDirection::LibToFront,
action: None,
state: Some(self.state.state.clone()),
meta: Some(metadata),
})?;
} else {
// TODO - Reject transfer
// Reject transfer
self.reject_transfer(Some(
sharing_nearby::connection_response_frame::Status::UnsupportedAttachmentType,
))
.await?;
}
} else {
// TODO - Reject transfer
// Reject transfer
self.reject_transfer(Some(
sharing_nearby::connection_response_frame::Status::UnsupportedAttachmentType,
))
Expand Down
6 changes: 4 additions & 2 deletions core_lib/src/hdl/info.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fs::File;
use std::path::PathBuf;

use serde::{Deserialize, Serialize};

use crate::sharing_nearby::FileMetadata;

#[derive(Debug)]
Expand All @@ -12,10 +14,10 @@ pub struct InternalFileInfo {
pub file: Option<File>,
}

#[derive(Debug)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TransferMetadata {
pub files: Vec<String>,
pub id: String,
pub files: Vec<String>,
pub pin_code: Option<String>,
pub text_description: Option<String>,
}
6 changes: 4 additions & 2 deletions core_lib/src/hdl/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use p256::{PublicKey, SecretKey};
use serde::{Deserialize, Serialize};

use self::info::{InternalFileInfo, TransferMetadata};
use crate::securegcm::ukey2_client_init::CipherCommitment;
Expand All @@ -10,12 +11,12 @@ mod ble;
pub use ble::*;
mod inbound;
pub use inbound::*;
mod info;
pub(crate) mod info;
mod mdns;
pub use mdns::*;

#[allow(dead_code)]
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub enum State {
#[default]
Initial,
Expand All @@ -28,6 +29,7 @@ pub enum State {
WaitingForUserConsent,
ReceivingFiles,
Disconnected,
Finished,
}

#[derive(Debug, Default)]
Expand Down
17 changes: 14 additions & 3 deletions core_lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#[macro_use]
extern crate log;

use channel::ChannelMessage;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

use crate::hdl::{BleListener, MDnsServer};
use crate::manager::TcpServer;

pub mod channel;
mod errors;
mod hdl;
mod manager;
Expand Down Expand Up @@ -36,6 +38,7 @@ pub mod location_nearby_connections {
pub struct RQS {
tracker: TaskTracker,
ctoken: CancellationToken,
pub channel: (Sender<ChannelMessage>, Receiver<ChannelMessage>),
}

impl Default for RQS {
Expand All @@ -48,17 +51,25 @@ impl RQS {
fn new() -> Self {
let tracker = TaskTracker::new();
let ctoken = CancellationToken::new();
let channel = broadcast::channel(10);

Self { tracker, ctoken }
Self {
tracker,
ctoken,
channel,
}
}

pub async fn run(&self) -> Result<(), anyhow::Error> {
let tcp_listener = TcpListener::bind("0.0.0.0:0").await?;
let binded_addr = tcp_listener.local_addr()?;
info!("TcpListener on: {}", binded_addr);

// Sender for the TcpServer
let sender = self.channel.0.clone();

// Start TcpServer in own "task"
let server = TcpServer::new(tcp_listener)?;
let mut server = TcpServer::new(tcp_listener, sender)?;
let ctk = self.ctoken.clone();
self.tracker.spawn(async move { server.run(ctk).await });

Expand Down
24 changes: 17 additions & 7 deletions core_lib/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use tokio::net::TcpListener;
use tokio::sync::broadcast::Sender;
use tokio_util::sync::CancellationToken;

use crate::channel::ChannelMessage;
use crate::errors::AppError;
use crate::hdl::InboundRequest;

const INNER_NAME: &str = "TcpServer";

pub struct TcpServer {
tcp_listener: TcpListener,
sender: Sender<ChannelMessage>,
}

impl TcpServer {
pub fn new(tcp_listener: TcpListener) -> Result<Self, anyhow::Error> {
Ok(Self { tcp_listener })
pub fn new(
tcp_listener: TcpListener,
sender: Sender<ChannelMessage>,
) -> Result<Self, anyhow::Error> {
Ok(Self {
tcp_listener,
sender,
})
}

pub async fn run(&self, ctk: CancellationToken) -> Result<(), anyhow::Error> {
pub async fn run(&mut self, ctk: CancellationToken) -> Result<(), anyhow::Error> {
info!("{INNER_NAME}: service starting");

loop {
Expand All @@ -29,18 +38,19 @@ impl TcpServer {
r = self.tcp_listener.accept() => {
match r {
Ok((socket, remote_addr)) => {
trace!("New client: {remote_addr}");
trace!("{INNER_NAME}: new client: {remote_addr}");
let csender = self.sender.clone();

tokio::spawn(async move {
let mut ir = InboundRequest::new(socket, remote_addr.to_string());
let mut ir = InboundRequest::new(socket, remote_addr.to_string(), csender);

loop {
match ir.handle().await {
Ok(_) => {},
Err(e) => match e.downcast_ref() {
Some(AppError::NotAnError) => break,
None => {
error!("Error while handling client: {e}");
error!("{INNER_NAME}: error while handling client: {e}");
break;
}
},
Expand All @@ -49,7 +59,7 @@ impl TcpServer {
});
},
Err(err) => {
error!("TcpListener: error accepting: {}", err);
error!("{INNER_NAME}: error accepting: {}", err);
break;
}
}
Expand Down
Loading

0 comments on commit 45d9d7c

Please sign in to comment.