Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
VxDxK committed Sep 3, 2024
1 parent 8a3f4dd commit 22ecb01
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 51 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/cloc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: Count Lines of Code

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
cloc:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Count Lines of Code (cloc)
uses: djdefi/cloc-action@6
2 changes: 1 addition & 1 deletion torrent-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ sha1 = "0.10"
reqwest = { version = "0.12", features = ["blocking"] }
rand = "0.8.5"
hex = "0.4.3"
bytes = "1.6.1"
bytes = "1"
40 changes: 12 additions & 28 deletions torrent-client/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod worker;

use crate::file::TorrentFile;
use crate::peer::connection::PeerConnection;
use crate::peer::PeerId;
Expand All @@ -8,6 +10,7 @@ use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use thiserror::Error;
use crate::client::worker::PeerWorker;

#[derive(Error, Debug)]
pub enum ClientError {
Expand Down Expand Up @@ -51,36 +54,17 @@ impl Client {
.set_port(6881)
.set_num_want(Some(100))
.set_request_mode(RequestMode::Verbose);
let mut distribution = self.tracker_client.announce(meta.announce, params)?;
let mut rng = rand::thread_rng();
distribution.peers.shuffle(&mut rng);
let peers = Arc::new(Mutex::new(VecDeque::from(distribution.peers)));
let mut torrent_info = self.tracker_client.announce(meta.announce.clone(), params)?;
torrent_info.peers.shuffle(&mut rand::thread_rng());
let peers = Arc::new(Mutex::new(torrent_info.peers));
let mut handles = vec![];
for worker_id in 0..self.config.connection_numbers {
let peers_a = peers.clone();
let client_id = self.client_id.clone();
let handle = std::thread::spawn(move || {
let mut q = peers_a.lock().unwrap();
if q.len() == 0 {
println!("thread {worker_id} closes due no peers");
}
let peer = q.pop_back().unwrap();
drop(q);
println!("{:#?}", peer);

println!();
let connection = TcpStream::connect_timeout(&peer.addr, Duration::from_secs(5));
if connection.is_err() {
println!("timeout ");
return;
}
let connection = connection.unwrap();
let bt_conn =
PeerConnection::handshake(connection, &meta.info.info_hash.clone(), &client_id);
match bt_conn {
Ok(_) => println!("conn ok"),
Err(e) => println!("err {}", e.to_string()),
}
let meta = Arc::new(meta);
let id = Arc::new(self.client_id.clone());
for _ in 0..self.config.connection_numbers {
let mut worker = PeerWorker::new(peers.clone(), meta.clone(), id.clone());
let handle = std::thread::spawn(move || {
worker.go();
});
handles.push(handle);
}
Expand Down
49 changes: 49 additions & 0 deletions torrent-client/src/client/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::file::TorrentFile;
use crate::peer::connection::PeerConnection;
use crate::peer::{Peer, PeerId};

pub struct PeerWorker {
peers: Arc<Mutex<Vec<Peer>>>,
meta: Arc<TorrentFile>,
client_id: Arc<PeerId>
}

impl PeerWorker {
pub fn go(&mut self) {
loop {
let mut q = self.peers.lock().unwrap();
if q.len() == 0 {
return;
}
let peer = q.pop().unwrap();
drop(q);
let connection = TcpStream::connect_timeout(&peer.addr, Duration::from_secs(5));
if connection.is_err() {
continue;
}
let connection =
PeerConnection::handshake(connection.unwrap(), &self.meta.info.info_hash, &self.client_id);
if connection.is_err() {
continue;
}
let mut connection = connection.unwrap();
loop {
match connection.recv() {
Ok(message) => {
println!("message {message}")
}
Err(err) => {
println!("error {err:?}")
}
}
}
}
}

pub fn new(peers: Arc<Mutex<Vec<Peer>>>, meta: Arc<TorrentFile>, client_id: Arc<PeerId>) -> Self {
Self { peers, meta, client_id }
}
}
163 changes: 148 additions & 15 deletions torrent-client/src/peer/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::peer::connection::ConnectionError::HandshakeFailed;
use std::fmt::{format, Display, Formatter};
use crate::peer::connection::ConnectionError::{HandshakeFailed, MessageId, PayloadLength, Todo, UnexpectedEOF};
use crate::peer::connection::HandshakeMessageError::{ProtocolString, ProtocolStringLen};
use crate::peer::PeerId;
use crate::util::Sha1;
use crate::util::{BitField, Sha1};
use bytes::{Buf, BufMut};
use std::io;
use std::io::{Read, Write};
Expand Down Expand Up @@ -66,6 +67,12 @@ impl HandshakeMessage {
}
}

impl From<HandshakeMessage> for Box<[u8; 68]> {
fn from(value: HandshakeMessage) -> Self {
value.to_bytes()
}
}

#[derive(Error, Debug)]
pub enum ConnectionError {
#[error("BitTorrent handshake failed {0}")]
Expand All @@ -74,6 +81,14 @@ pub enum ConnectionError {
HandshakeResponse(#[from] HandshakeMessageError),
#[error(transparent)]
IoKind(#[from] io::Error),
#[error("Unexpected end of file")]
UnexpectedEOF,
#[error("Undefined message id {0}")]
MessageId(u8),
#[error("Unexpected payload length {0}")]
PayloadLength(usize),
#[error("todo")]
Todo,
}

pub struct PeerConnection {
Expand All @@ -90,33 +105,151 @@ impl PeerConnection {
let mut bytes =
HandshakeMessage::new([0; 8], info_hash.clone(), peer_id.clone()).to_bytes();
let _ = tcp_connection.write_all(bytes.as_ref())?;
let read_bytes = tcp_connection.read(bytes.as_mut())?;
if read_bytes != 68 {
return Err(HandshakeFailed(format!(
"Invalid bytes count received {read_bytes}"
)));
}

let _ = tcp_connection.read_exact(bytes.as_mut())?;
let response = HandshakeMessage::from_bytes(bytes)?;

Ok(Self {
tcp_connection,
peer_id: response.peer_id,
})
}

pub fn recv(&mut self) -> Result<Message> {
let mut length_prefix = [0u8; 4];
let _ = self.tcp_connection.read_exact(&mut length_prefix)?;
let length_prefix = u32::from_be_bytes(length_prefix);
if length_prefix == 0 {
return Ok(Message::KeepAlive);
}
let mut data = Vec::with_capacity(length_prefix as usize);
data.resize(length_prefix as usize, 0);
let _ = self.tcp_connection.read_exact(data.as_mut_slice())?;
let message = Message::try_from(data.as_slice())?;
Ok(message)
}
}

#[derive(Debug, Clone)]
pub struct BlockRequest {
index: u32,
begin: u32,
length: u32,
}

impl BlockRequest {
pub fn new(index: u32, begin: u32, length: u32) -> Self {
Self { index, begin, length }
}
}

impl TryFrom<&[u8]> for BlockRequest {
type Error = ConnectionError;

fn try_from(mut value: &[u8]) -> std::result::Result<Self, Self::Error> {
if value.len() != 12 {
return Err(PayloadLength(value.len()))
}
Ok(BlockRequest::new(value.get_u32_ne(), value.get_u32_ne(), value.get_u32_ne()))
}
}

#[derive(Debug, Clone)]
pub struct Piece {
index: u32,
begin: u32,
data: Vec<u8>,
}

impl Piece {
pub fn new(index: u32, begin: u32, data: Vec<u8>) -> Self {
Self { index, begin, data }
}
}

impl TryFrom<&[u8]> for Piece {
type Error = ConnectionError;

fn try_from(mut value: &[u8]) -> std::result::Result<Self, Self::Error> {
if value.len() < 8 {
return Err(PayloadLength(value.len()))
}
Ok(Piece::new(value.get_u32_ne(), value.get_u32_ne(), value.to_vec()))
}
}

pub enum Messages {
#[derive(Debug, Clone)]
pub enum Message {
KeepAlive,
Choke,
UnChoke,
Interested,
NotInterested,
Have,
Request,
Piece,
Cancel,
Port
Have(u32),
Bitfield(Vec<BitField>),
Request(BlockRequest),
Piece(Piece),
Cancel(BlockRequest),
Port(u16),
}

impl Display for Message {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Message::KeepAlive => write!(f, "KeepAlive"),
Message::Choke => write!(f, "Choke"),
Message::UnChoke => write!(f, "UnChoke"),
Message::Interested => write!(f, "Interested"),
Message::NotInterested => write!(f, "NotInterested"),
Message::Have(have) => write!(f, "Have({})", have),
Message::Bitfield(_) => write!(f, "Bitfield"),
Message::Request(_) => write!(f, "Request"),
Message::Piece(_) => write!(f, "Piece"),
Message::Cancel(_) => write!(f, "Cancel"),
Message::Port(port) => write!(f, "Port({})", port),
}
}
}

impl TryFrom<&[u8]> for Message {
type Error = ConnectionError;

fn try_from(mut value: &[u8]) -> std::result::Result<Self, Self::Error> {
let id = value.get(0).ok_or(UnexpectedEOF)?.to_owned();
value = &value[1..];

let message: Message = match id {
0 => Message::Choke,
1 => Message::UnChoke,
2 => Message::Interested,
3 => Message::NotInterested,
4 => Message::Have(u32::from_be_bytes(
value
.get(0..4)
.ok_or(UnexpectedEOF)?
.try_into()
.map_err(|_| UnexpectedEOF)?,
)),
5 => Message::Bitfield(
value
.into_iter()
.map(|x| BitField::new(x.to_owned()))
.collect::<Vec<_>>(),
),
6 => Message::Request(BlockRequest::try_from(value)?),
7 => Message::Piece(Piece::try_from(value)?),
8 => Message::Cancel(BlockRequest::try_from(value)?),
9 => Message::Port(u16::from_be_bytes(
value
.get(0..2)
.ok_or(UnexpectedEOF)?
.try_into()
.map_err(|_| UnexpectedEOF)?,
)),
_ => return Err(MessageId(id)),
};

Ok(message)
}
}

#[cfg(test)]
Expand Down
9 changes: 3 additions & 6 deletions torrent-client/src/tracker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ impl AnnounceResponse {
"No 'ip' field found in dictionary form".to_string(),
))?
.try_into()?;
let ip = ip.parse::<IpAddr>().unwrap();
let ip = ip.parse::<IpAddr>().map_err(|_| {
ResponseFormat(format!("{ip} is not valid ip address"))
})?;
let port: u16 = dict
.remove(b"port".as_slice())
.ok_or(ResponseFormat(
Expand Down Expand Up @@ -314,11 +316,6 @@ impl TrackerClient for HttpTracker {
};
return Err(TrackerResponse(error));
}

for (k, v) in &bencode {
println!("{} {}", String::from_utf8(k.clone()).unwrap(), v.name())
}

AnnounceResponse::from_bencode(bencode)
}

Expand Down
Loading

0 comments on commit 22ecb01

Please sign in to comment.