Skip to content

Commit

Permalink
add session timeout handling (librespot-org#1129)
Browse files Browse the repository at this point in the history
  • Loading branch information
eladyn authored and RoachxD committed Jun 22, 2023
1 parent 1320e67 commit 4449072
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ https://github.com/librespot-org/librespot
- [core] Support downloading of lyrics
- [core] Support parsing `SpotifyId` for local files
- [core] Support parsing `SpotifyId` for named playlists
- [core] Add checks and handling for stale server connections.
- [main] Add all player events to `player_event_handler.rs`
- [main] Add an event worker thread that runs async to the main thread(s) but
sync to itself to prevent potential data races for event consumers
Expand Down
3 changes: 1 addition & 2 deletions core/src/mercury/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::HashMap,
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -200,7 +199,7 @@ impl MercuryManager {

for i in 0..count {
let mut part = Self::parse_part(&mut data);
if let Some(mut partial) = mem::replace(&mut pending.partial, None) {
if let Some(mut partial) = pending.partial.take() {
partial.extend_from_slice(&part);
part = partial;
}
Expand Down
59 changes: 50 additions & 9 deletions core/src/session.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
process::exit,
sync::{Arc, Weak},
task::{Context, Poll},
time::{SystemTime, UNIX_EPOCH},
time::{Duration, SystemTime, UNIX_EPOCH},
};

use byteorder::{BigEndian, ByteOrder};
Expand All @@ -18,7 +18,7 @@ use once_cell::sync::OnceCell;
use parking_lot::RwLock;
use quick_xml::events::Event;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::{
Expand Down Expand Up @@ -80,6 +80,7 @@ struct SessionData {
time_delta: i64,
invalid: bool,
user_data: UserData,
last_ping: Option<Instant>,
}

struct SessionInternal {
Expand All @@ -100,6 +101,15 @@ struct SessionInternal {
handle: tokio::runtime::Handle,
}

/// A shared reference to a Spotify session.
///
/// After instantiating, you need to login via [Session::connect].
/// You can either implement the whole playback logic yourself by using
/// this structs interface directly or hand it to a
/// `Player`.
///
/// *Note*: [Session] instances cannot yet be reused once invalidated. After
/// an unexpectedly closed connection, you'll need to create a new [Session].
#[derive(Clone)]
pub struct Session(Arc<SessionInternal>);

Expand Down Expand Up @@ -181,9 +191,10 @@ impl Session {
.map(Ok)
.forward(sink);
let receiver_task = DispatchTask(stream, self.weak());
let timeout_task = Session::session_timeout(self.weak());

tokio::spawn(async move {
let result = future::try_join(sender_task, receiver_task).await;
let result = future::try_join3(sender_task, receiver_task, timeout_task).await;

if let Err(e) = result {
error!("{}", e);
Expand Down Expand Up @@ -231,6 +242,33 @@ impl Session {
.get_or_init(|| TokenProvider::new(self.weak()))
}

/// Returns an error, when we haven't received a ping for too long (2 minutes),
/// which means that we silently lost connection to Spotify servers.
async fn session_timeout(session: SessionWeak) -> io::Result<()> {
// pings are sent every 2 minutes and a 5 second margin should be fine
const SESSION_TIMEOUT: Duration = Duration::from_secs(125);

while let Some(session) = session.try_upgrade() {
if session.is_invalid() {
break;
}
let last_ping = session.0.data.read().last_ping.unwrap_or_else(Instant::now);
if last_ping.elapsed() >= SESSION_TIMEOUT {
session.shutdown();
// TODO: Optionally reconnect (with cached/last credentials?)
return Err(io::Error::new(
io::ErrorKind::TimedOut,
"session lost connection to server",
));
}
// drop the strong reference before sleeping
drop(session);
// a potential timeout cannot occur at least until SESSION_TIMEOUT after the last_ping
tokio::time::sleep_until(last_ping + SESSION_TIMEOUT).await;
}
Ok(())
}

pub fn time_delta(&self) -> i64 {
self.0.data.read().time_delta
}
Expand Down Expand Up @@ -278,13 +316,16 @@ impl Session {
match packet_type {
Some(Ping) => {
let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64;
let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(dur) => dur,
Err(err) => err.duration(),
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs() as i64;

{
let mut data = self.0.data.write();
data.time_delta = server_timestamp.saturating_sub(timestamp);
data.last_ping = Some(Instant::now());
}
.as_secs() as i64;

self.0.data.write().time_delta = server_timestamp - timestamp;

self.debug_info();
self.send_packet(Pong, vec![0, 0, 0, 0])
Expand Down
2 changes: 1 addition & 1 deletion core/src/version.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/// Version string of the form "librespot-<sha>"
/// Version string of the form "librespot-\<sha\>"
pub const VERSION_STRING: &str = concat!("librespot-", env!("VERGEN_GIT_SHA"));

/// Generate a timestamp string representing the build date (UTC).
Expand Down
21 changes: 11 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,7 @@ async fn main() {
let mut connecting = false;
let mut _event_handler: Option<EventHandler> = None;

let session = Session::new(setup.session_config.clone(), setup.cache.clone());
let mut session = Session::new(setup.session_config.clone(), setup.cache.clone());

if setup.enable_discovery {
let device_id = setup.session_config.device_id.clone();
Expand Down Expand Up @@ -1721,6 +1721,10 @@ async fn main() {
}
},
_ = async {}, if connecting && last_credentials.is_some() => {
if session.is_invalid() {
session = Session::new(setup.session_config.clone(), setup.cache.clone());
}

let mixer_config = setup.mixer_config.clone();
let mixer = (setup.mixer)(mixer_config);
let player_config = setup.player_config.clone();
Expand Down Expand Up @@ -1770,15 +1774,12 @@ async fn main() {
auto_connect_times.len() > RECONNECT_RATE_LIMIT
};

match last_credentials.clone() {
Some(_) if !reconnect_exceeds_rate_limit() => {
auto_connect_times.push(Instant::now());
connecting = true;
},
_ => {
error!("Spirc shut down too often. Not reconnecting automatically.");
exit(1);
},
if last_credentials.is_some() && !reconnect_exceeds_rate_limit() {
auto_connect_times.push(Instant::now());
connecting = true;
} else {
error!("Spirc shut down too often. Not reconnecting automatically.");
exit(1);
}
},
_ = tokio::signal::ctrl_c() => {
Expand Down

0 comments on commit 4449072

Please sign in to comment.