Skip to content

Commit

Permalink
Merge pull request #255 from whisperfish/tracing
Browse files Browse the repository at this point in the history
Add tracing
  • Loading branch information
rubdos authored Jan 9, 2024
2 parents daad1fc + bac7984 commit 036de0d
Show file tree
Hide file tree
Showing 22 changed files with 381 additions and 205 deletions.
4 changes: 2 additions & 2 deletions libsignal-service-actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ actix-rt = "2.4"
mpart-async = "0.6"
serde_json = "1.0"
futures = "0.3"
tracing = "0.1"
tracing-futures = "0.2"
bytes = "1"
rustls = "0.21"
rustls-pemfile = "0.3"
url = "2.1"
serde = "1.0"
log = "0.4"
rand = "0.8"

thiserror = "1.0"
Expand All @@ -33,7 +34,6 @@ base64 = "0.13"
phonenumber = "0.3"

[dev-dependencies]
env_logger = "0.9"
image = { version = "0.23", default-features = false, features = ["png"] }
opener = "0.5"
qrcode = "0.12"
Expand Down
1 change: 0 additions & 1 deletion libsignal-service-actix/examples/registering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use structopt::StructOpt;

#[actix_rt::main]
async fn main() -> Result<(), Error> {
env_logger::init();
let client = "libsignal-service-hyper-example";
let use_voice = false;

Expand Down
58 changes: 39 additions & 19 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use libsignal_service::{
websocket::SignalWebSocket,
};
use serde::{Deserialize, Serialize};
use tracing_futures::Instrument;

use crate::websocket::AwcWebSocket;

Expand Down Expand Up @@ -47,7 +48,7 @@ impl AwcPushService {
credentials_override: HttpAuthOverride,
) -> Result<ClientRequest, ServiceError> {
let url = self.cfg.base_url(endpoint).join(path.as_ref())?;
log::debug!("HTTP request {} {}", method, url);
tracing::debug!(%url, %method, "HTTP request");
let mut builder = self.client.request(method, url.as_str());
for &header in additional_headers {
builder = builder.insert_header(header);
Expand Down Expand Up @@ -85,6 +86,7 @@ impl AwcPushService {
})
}

#[tracing::instrument(name = "extracting error", skip(response))]
async fn from_response<S>(
response: &mut ClientResponse<S>,
) -> Result<(), ServiceError>
Expand All @@ -108,7 +110,8 @@ impl AwcPushService {
StatusCode::CONFLICT => {
let mismatched_devices =
response.json().await.map_err(|e| {
log::error!(
tracing::error!(
?response,
"Failed to decode HTTP 409 response: {}",
e
);
Expand All @@ -122,7 +125,11 @@ impl AwcPushService {
},
StatusCode::GONE => {
let stale_devices = response.json().await.map_err(|e| {
log::error!("Failed to decode HTTP 410 response: {}", e);
tracing::error!(
?response,
"Failed to decode HTTP 410 response: {}",
e
);
ServiceError::UnhandledResponseCode {
http_code: StatusCode::GONE.as_u16(),
}
Expand All @@ -131,7 +138,11 @@ impl AwcPushService {
},
StatusCode::PRECONDITION_REQUIRED => {
let proof_required = response.json().await.map_err(|e| {
log::error!("Failed to decode HTTP 428 response: {}", e);
tracing::error!(
?response,
"Failed to decode HTTP 428 response: {}",
e
);
ServiceError::UnhandledResponseCode {
http_code: StatusCode::PRECONDITION_REQUIRED.as_u16(),
}
Expand All @@ -141,7 +152,8 @@ impl AwcPushService {
// XXX: fill in rest from PushServiceSocket
code => {
let contents = response.body().await;
log::trace!(
tracing::trace!(
?response,
"Unhandled response {} with body: {:?}",
code.as_u16(),
contents,
Expand Down Expand Up @@ -191,7 +203,7 @@ impl PushService for AwcPushService {
},
})?;

log::debug!("AwcPushService::get response: {:?}", response);
let _span = tracing::debug_span!("processing response", ?response);

Self::from_response(&mut response).await?;

Expand All @@ -203,7 +215,7 @@ impl PushService for AwcPushService {
// actix already imports that anyway.
let text = match response.body().await {
Ok(text) => {
log::debug!(
tracing::debug!(
"GET response: {:?}",
String::from_utf8_lossy(&text)
);
Expand Down Expand Up @@ -249,7 +261,8 @@ impl PushService for AwcPushService {
},
})?;

log::debug!("AwcPushService::delete response: {:?}", response);
let _span =
tracing::debug_span!("processing response", ?response).entered();

Self::from_response(&mut response).await?;

Expand All @@ -261,7 +274,7 @@ impl PushService for AwcPushService {
// actix already imports that anyway.
let text = match response.body().await {
Ok(text) => {
log::debug!(
tracing::debug!(
"GET response: {:?}",
String::from_utf8_lossy(&text)
);
Expand Down Expand Up @@ -303,7 +316,8 @@ impl PushService for AwcPushService {
reason: e.to_string(),
})?;

log::debug!("AwcPushService::put response: {:?}", response);
let _span =
tracing::debug_span!("processing response", ?response).entered();

Self::from_response(&mut response).await?;

Expand All @@ -315,7 +329,7 @@ impl PushService for AwcPushService {
// actix already imports that anyway.
let text = match response.body().await {
Ok(text) => {
log::debug!(
tracing::debug!(
"GET response: {:?}",
String::from_utf8_lossy(&text)
);
Expand Down Expand Up @@ -356,7 +370,8 @@ impl PushService for AwcPushService {
reason: e.to_string(),
})?;

log::debug!("AwcPushService::patch response: {:?}", response);
let _span =
tracing::debug_span!("processing response", ?response).entered();

Self::from_response(&mut response).await?;

Expand All @@ -368,7 +383,7 @@ impl PushService for AwcPushService {
// actix already imports that anyway.
let text = match response.body().await {
Ok(text) => {
log::debug!(
tracing::debug!(
"PATCH response: {:?}",
String::from_utf8_lossy(&text)
);
Expand Down Expand Up @@ -409,7 +424,8 @@ impl PushService for AwcPushService {
reason: e.to_string(),
})?;

log::debug!("AwcPushService::post response: {:?}", response);
let _span =
tracing::debug_span!("processing response", ?response).entered();

Self::from_response(&mut response).await?;

Expand All @@ -421,7 +437,7 @@ impl PushService for AwcPushService {
// actix already imports that anyway.
let text = match response.body().await {
Ok(text) => {
log::debug!(
tracing::debug!(
"GET response: {:?}",
String::from_utf8_lossy(&text)
);
Expand Down Expand Up @@ -527,7 +543,8 @@ impl PushService for AwcPushService {
reason: e.to_string(),
})?;

log::debug!("AwcPushService::get_stream response: {:?}", response);
let _span =
tracing::debug_span!("processing response", ?response).entered();

Self::from_response(&mut response).await?;

Expand Down Expand Up @@ -600,7 +617,7 @@ impl PushService for AwcPushService {
// Unwrap, because no error type was used above
body_contents.extend(b.unwrap());
}
log::trace!(
tracing::trace!(
"Sending PUT with Content-Type={} and length {}",
content_type,
body_contents.len()
Expand All @@ -615,7 +632,8 @@ impl PushService for AwcPushService {
reason: e.to_string(),
})?;

log::debug!("AwcPushService::put response: {:?}", response);
let _span =
tracing::debug_span!("processing response", ?response).entered();

Self::from_response(&mut response).await?;

Expand All @@ -629,20 +647,22 @@ impl PushService for AwcPushService {
additional_headers: &[(&str, &str)],
credentials: Option<ServiceCredentials>,
) -> Result<SignalWebSocket, ServiceError> {
let span = tracing::debug_span!("websocket");
let (ws, stream) = AwcWebSocket::with_client(
&mut self.client,
self.cfg.base_url(Endpoint::Service),
path,
additional_headers,
credentials.as_ref(),
)
.instrument(span.clone())
.await?;
let (ws, task) = SignalWebSocket::from_socket(
ws,
stream,
keep_alive_path.to_owned(),
);
actix_rt::spawn(task);
actix_rt::spawn(task.instrument(span));
Ok(ws)
}
}
Expand Down
28 changes: 17 additions & 11 deletions libsignal-service-actix/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ where
futures::pin_mut!(tick);
futures::select! {
_ = tick => {
log::trace!("Triggering keep-alive");
tracing::trace!("Triggering keep-alive");
if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await {
log::info!("Websocket sink has closed: {:?}.", e);
tracing::info!("Websocket sink has closed: {:?}.", e);
break;
};
},
frame = socket_stream.next() => {
let frame = if let Some(frame) = frame {
frame
} else {
log::info!("process: Socket stream ended");
tracing::info!("process: Socket stream ended");
break;
};

Expand All @@ -105,32 +105,32 @@ where

Frame::Continuation(_c) => todo!(),
Frame::Ping(msg) => {
log::warn!("Received Ping({:?})", msg);
tracing::warn!(?msg, "received Ping");

continue;
},
Frame::Pong(msg) => {
log::trace!("Received Pong({:?})", msg);
tracing::trace!(?msg, "received Pong");

continue;
},
Frame::Text(frame) => {
log::warn!("Frame::Text {:?}", frame);
tracing::warn!(?frame, "frame::Text",);

// this is a protocol violation, maybe break; is better?
continue;
},

Frame::Close(c) => {
log::warn!("Websocket closing: {:?}", c);
tracing::warn!(?c, "Websocket closing");

break;
},
};

// Match SendError
if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(frame)).await {
log::info!("Websocket sink has closed: {:?}.", e);
tracing::info!("Websocket sink has closed: {:?}.", e);
break;
}
},
Expand Down Expand Up @@ -160,14 +160,20 @@ impl AwcWebSocket {
);
}

log::trace!("Will start websocket at {:?}", url);
tracing::trace!(
url.scheme = url.scheme(),
url.host = ?url.host(),
url.path = url.path(),
url.has_query = ?url.query().is_some(),
"starting websocket",
);
let mut ws = client.ws(url.as_str());
for (key, value) in additional_headers {
ws = ws.header(*key, *value);
}
let (response, framed) = ws.connect().await?;

log::debug!("WebSocket connected: {:?}", response);
tracing::debug!(?response, "WebSocket connected");

let (incoming_sink, incoming_stream) = channel(5);

Expand All @@ -179,7 +185,7 @@ impl AwcWebSocket {
actix_rt::spawn(processing_task.map(|v| match v {
Ok(()) => (),
Err(e) => {
log::warn!("Processing task terminated with error: {:?}", e)
tracing::warn!("Processing task terminated with error: {:?}", e)
},
}));

Expand Down
3 changes: 2 additions & 1 deletion libsignal-service-hyper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ libsignal-service = { path = "../libsignal-service" }
async-trait = "0.1"
bytes = "1.0"
futures = "0.3"
log = "0.4"
tracing = "0.1"
tracing-futures = "0.2"
mpart-async = "0.6"
serde = "1.0"
serde_json = "1.0"
Expand Down
Loading

0 comments on commit 036de0d

Please sign in to comment.