Skip to content

Commit

Permalink
g3proxy: use generics for escaper side wrapper stats
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Aug 17, 2023
1 parent 7d72d61 commit 49d5600
Show file tree
Hide file tree
Showing 73 changed files with 770 additions and 1,590 deletions.
24 changes: 12 additions & 12 deletions g3proxy/src/escape/direct_fixed/ftp_connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@
* limitations under the License.
*/

use std::sync::Arc;

use g3_io_ext::{AggregatedIo, LimitedReader, LimitedWriter};

use super::{DirectFixedEscaper, DirectFixedEscaperStats};
use super::DirectFixedEscaper;
use crate::module::ftp_over_http::{
ArcFtpTaskRemoteControlStats, ArcFtpTaskRemoteTransferStats, BoxFtpRemoteConnection,
FtpControlRemoteWrapperStats, FtpTransferRemoteWrapperStats,
};
use crate::module::tcp_connect::{TcpConnectError, TcpConnectTaskNotes};
use crate::serve::ServerTaskNotes;

mod stats;
use stats::{FtpControlRemoteStats, FtpTransferRemoteStats};

impl DirectFixedEscaper {
pub(super) async fn new_ftp_control_connection<'a>(
&'a self,
Expand All @@ -37,22 +37,22 @@ impl DirectFixedEscaper {

let (r, w) = stream.into_split();

let mut wrapper_stats = FtpControlRemoteStats::new(&self.stats, task_stats);
let mut wrapper_stats = FtpControlRemoteWrapperStats::new(&self.stats, task_stats);
wrapper_stats.push_user_io_stats(self.fetch_user_upstream_io_stats(task_notes));
let (ups_r_stats, ups_w_stats) = wrapper_stats.into_pair();
let wrapper_stats = Arc::new(wrapper_stats);

let limit_config = &self.config.general.tcp_sock_speed_limit;
let r = LimitedReader::new(
r,
limit_config.shift_millis,
limit_config.max_south,
ups_r_stats,
wrapper_stats.clone() as _,
);
let w = LimitedWriter::new(
w,
limit_config.shift_millis,
limit_config.max_north,
ups_w_stats,
wrapper_stats as _,
);

Ok(Box::new(AggregatedIo {
Expand All @@ -74,22 +74,22 @@ impl DirectFixedEscaper {

let (r, w) = stream.into_split();

let mut wrapper_stats = FtpTransferRemoteStats::new(&self.stats, task_stats);
let mut wrapper_stats = FtpTransferRemoteWrapperStats::new(&self.stats, task_stats);
wrapper_stats.push_user_io_stats(self.fetch_user_upstream_io_stats(task_notes));
let (ups_r_stats, ups_w_stats) = wrapper_stats.into_pair();
let wrapper_stats = Arc::new(wrapper_stats);

let limit_config = &self.config.general.tcp_sock_speed_limit;
let r = LimitedReader::new(
r,
limit_config.shift_millis,
limit_config.max_south,
ups_r_stats,
wrapper_stats.clone() as _,
);
let w = LimitedWriter::new(
w,
limit_config.shift_millis,
limit_config.max_north,
ups_w_stats,
wrapper_stats as _,
);

Ok(Box::new(AggregatedIo {
Expand Down
131 changes: 0 additions & 131 deletions g3proxy/src/escape/direct_fixed/ftp_connect/stats.rs

This file was deleted.

22 changes: 10 additions & 12 deletions g3proxy/src/escape/direct_fixed/http_forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ use g3_types::net::OpensslTlsClientConfig;
use super::{DirectFixedEscaper, DirectFixedEscaperStats};
use crate::log::escape::tls_handshake::TlsApplication;
use crate::module::http_forward::{
ArcHttpForwardTaskRemoteStats, BoxHttpForwardConnection, HttpForwardRemoteStatsWrapper,
ArcHttpForwardTaskRemoteStats, BoxHttpForwardConnection, HttpForwardRemoteWrapperStats,
HttpForwardTaskRemoteWrapperStats,
};
use crate::module::tcp_connect::{TcpConnectError, TcpConnectTaskNotes};
use crate::serve::ServerTaskNotes;

mod stats;
use stats::DirectHttpMixedRemoteStats;

mod reader;
mod writer;

Expand All @@ -47,8 +45,8 @@ impl DirectFixedEscaper {

let (ups_r, ups_w) = stream.into_split();

let mut w_wrapper_stats = DirectHttpMixedRemoteStats::new(&self.stats, &task_stats);
let mut r_wrapper_stats = HttpForwardRemoteStatsWrapper::new(task_stats);
let mut w_wrapper_stats = HttpForwardRemoteWrapperStats::new(&self.stats, &task_stats);
let mut r_wrapper_stats = HttpForwardTaskRemoteWrapperStats::new(task_stats);
let user_stats = self.fetch_user_upstream_io_stats(task_notes);
w_wrapper_stats.push_user_io_stats_by_ref(&user_stats);
r_wrapper_stats.push_user_io_stats(user_stats);
Expand All @@ -59,13 +57,13 @@ impl DirectFixedEscaper {
limit_config.shift_millis,
limit_config.max_south,
self.stats.clone() as _,
r_wrapper_stats.into_reader(),
Arc::new(r_wrapper_stats) as _,
);
let ups_w = LimitedWriter::new(
ups_w,
limit_config.shift_millis,
limit_config.max_north,
w_wrapper_stats.into_writer(),
Arc::new(w_wrapper_stats) as _,
);

let writer = DirectFixedHttpForwardWriter::new(ups_w, Some(Arc::clone(&self.stats)));
Expand Down Expand Up @@ -94,16 +92,16 @@ impl DirectFixedEscaper {
let (ups_r, ups_w) = tokio::io::split(tls_stream);

// add task and user stats
let mut wrapper_stats = HttpForwardRemoteStatsWrapper::new(task_stats);
let mut wrapper_stats = HttpForwardTaskRemoteWrapperStats::new(task_stats);
wrapper_stats.push_user_io_stats(self.fetch_user_upstream_io_stats(task_notes));
let (ups_r_stats, ups_w_stats) = wrapper_stats.into_pair();
let wrapper_stats = Arc::new(wrapper_stats);

let ups_r = LimitedBufReader::new_unlimited(
ups_r,
Arc::new(NilLimitedReaderStats::default()),
ups_r_stats,
wrapper_stats.clone() as _,
);
let ups_w = LimitedWriter::new_unlimited(ups_w, ups_w_stats);
let ups_w = LimitedWriter::new_unlimited(ups_w, wrapper_stats as _);

let writer = DirectFixedHttpForwardWriter::new(ups_w, None);
let reader = DirectFixedHttpForwardReader::new(ups_r);
Expand Down
8 changes: 4 additions & 4 deletions g3proxy/src/escape/direct_fixed/http_forward/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use g3_io_ext::LimitedBufReader;

use crate::auth::UserUpstreamTrafficStats;
use crate::module::http_forward::{
ArcHttpForwardTaskRemoteStats, HttpForwardRead, HttpForwardRemoteStatsWrapper,
HttpForwardTaskNotes,
ArcHttpForwardTaskRemoteStats, HttpForwardRead, HttpForwardTaskNotes,
HttpForwardTaskRemoteWrapperStats,
};

#[pin_project]
Expand Down Expand Up @@ -102,9 +102,9 @@ where
task_stats: &ArcHttpForwardTaskRemoteStats,
user_stats: Vec<Arc<UserUpstreamTrafficStats>>,
) {
let mut wrapper_stats = HttpForwardRemoteStatsWrapper::new(Arc::clone(task_stats));
let mut wrapper_stats = HttpForwardTaskRemoteWrapperStats::new(Arc::clone(task_stats));
wrapper_stats.push_user_io_stats(user_stats);
self.inner.reset_buffer_stats(wrapper_stats.into_reader());
self.inner.reset_buffer_stats(Arc::new(wrapper_stats) as _);
}

async fn recv_response_header<'a>(
Expand Down
70 changes: 0 additions & 70 deletions g3proxy/src/escape/direct_fixed/http_forward/stats.rs

This file was deleted.

Loading

0 comments on commit 49d5600

Please sign in to comment.