From 4449072ab64b22cbbee4e0ff5362dd8b8fc60cb0 Mon Sep 17 00:00:00 2001 From: eladyn <59307989+eladyn@users.noreply.github.com> Date: Thu, 1 Jun 2023 21:39:35 +0200 Subject: [PATCH] add session timeout handling (#1129) --- CHANGELOG.md | 1 + core/src/mercury/mod.rs | 3 +-- core/src/session.rs | 59 ++++++++++++++++++++++++++++++++++------- core/src/version.rs | 2 +- src/main.rs | 21 ++++++++------- 5 files changed, 64 insertions(+), 22 deletions(-) mode change 100644 => 100755 core/src/session.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ceca30a8..0de0ad1fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ https://github.com/librespot-org/librespot - [core] Support downloading of lyrics - [core] Support parsing `SpotifyId` for local files - [core] Support parsing `SpotifyId` for named playlists +- [core] Add checks and handling for stale server connections. - [main] Add all player events to `player_event_handler.rs` - [main] Add an event worker thread that runs async to the main thread(s) but sync to itself to prevent potential data races for event consumers diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 03bada3c3..7fde9b7fd 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -1,7 +1,6 @@ use std::{ collections::HashMap, future::Future, - mem, pin::Pin, task::{Context, Poll}, }; @@ -200,7 +199,7 @@ impl MercuryManager { for i in 0..count { let mut part = Self::parse_part(&mut data); - if let Some(mut partial) = mem::replace(&mut pending.partial, None) { + if let Some(mut partial) = pending.partial.take() { partial.extend_from_slice(&part); part = partial; } diff --git a/core/src/session.rs b/core/src/session.rs old mode 100644 new mode 100755 index de467a554..5f08b1341 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -6,7 +6,7 @@ use std::{ process::exit, sync::{Arc, Weak}, task::{Context, Poll}, - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use byteorder::{BigEndian, ByteOrder}; @@ -18,7 +18,7 @@ use once_cell::sync::OnceCell; use parking_lot::RwLock; use quick_xml::events::Event; use thiserror::Error; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time::Instant}; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::{ @@ -80,6 +80,7 @@ struct SessionData { time_delta: i64, invalid: bool, user_data: UserData, + last_ping: Option, } struct SessionInternal { @@ -100,6 +101,15 @@ struct SessionInternal { handle: tokio::runtime::Handle, } +/// A shared reference to a Spotify session. +/// +/// After instantiating, you need to login via [Session::connect]. +/// You can either implement the whole playback logic yourself by using +/// this structs interface directly or hand it to a +/// `Player`. +/// +/// *Note*: [Session] instances cannot yet be reused once invalidated. After +/// an unexpectedly closed connection, you'll need to create a new [Session]. #[derive(Clone)] pub struct Session(Arc); @@ -181,9 +191,10 @@ impl Session { .map(Ok) .forward(sink); let receiver_task = DispatchTask(stream, self.weak()); + let timeout_task = Session::session_timeout(self.weak()); tokio::spawn(async move { - let result = future::try_join(sender_task, receiver_task).await; + let result = future::try_join3(sender_task, receiver_task, timeout_task).await; if let Err(e) = result { error!("{}", e); @@ -231,6 +242,33 @@ impl Session { .get_or_init(|| TokenProvider::new(self.weak())) } + /// Returns an error, when we haven't received a ping for too long (2 minutes), + /// which means that we silently lost connection to Spotify servers. + async fn session_timeout(session: SessionWeak) -> io::Result<()> { + // pings are sent every 2 minutes and a 5 second margin should be fine + const SESSION_TIMEOUT: Duration = Duration::from_secs(125); + + while let Some(session) = session.try_upgrade() { + if session.is_invalid() { + break; + } + let last_ping = session.0.data.read().last_ping.unwrap_or_else(Instant::now); + if last_ping.elapsed() >= SESSION_TIMEOUT { + session.shutdown(); + // TODO: Optionally reconnect (with cached/last credentials?) + return Err(io::Error::new( + io::ErrorKind::TimedOut, + "session lost connection to server", + )); + } + // drop the strong reference before sleeping + drop(session); + // a potential timeout cannot occur at least until SESSION_TIMEOUT after the last_ping + tokio::time::sleep_until(last_ping + SESSION_TIMEOUT).await; + } + Ok(()) + } + pub fn time_delta(&self) -> i64 { self.0.data.read().time_delta } @@ -278,13 +316,16 @@ impl Session { match packet_type { Some(Ping) => { let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64; - let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) { - Ok(dur) => dur, - Err(err) => err.duration(), + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_secs() as i64; + + { + let mut data = self.0.data.write(); + data.time_delta = server_timestamp.saturating_sub(timestamp); + data.last_ping = Some(Instant::now()); } - .as_secs() as i64; - - self.0.data.write().time_delta = server_timestamp - timestamp; self.debug_info(); self.send_packet(Pong, vec![0, 0, 0, 0]) diff --git a/core/src/version.rs b/core/src/version.rs index 28ff1c2dd..e5b4b0b02 100644 --- a/core/src/version.rs +++ b/core/src/version.rs @@ -1,4 +1,4 @@ -/// Version string of the form "librespot-" +/// Version string of the form "librespot-\" pub const VERSION_STRING: &str = concat!("librespot-", env!("VERGEN_GIT_SHA")); /// Generate a timestamp string representing the build date (UTC). diff --git a/src/main.rs b/src/main.rs index e4727ba8e..48edf1c95 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1662,7 +1662,7 @@ async fn main() { let mut connecting = false; let mut _event_handler: Option = None; - let session = Session::new(setup.session_config.clone(), setup.cache.clone()); + let mut session = Session::new(setup.session_config.clone(), setup.cache.clone()); if setup.enable_discovery { let device_id = setup.session_config.device_id.clone(); @@ -1721,6 +1721,10 @@ async fn main() { } }, _ = async {}, if connecting && last_credentials.is_some() => { + if session.is_invalid() { + session = Session::new(setup.session_config.clone(), setup.cache.clone()); + } + let mixer_config = setup.mixer_config.clone(); let mixer = (setup.mixer)(mixer_config); let player_config = setup.player_config.clone(); @@ -1770,15 +1774,12 @@ async fn main() { auto_connect_times.len() > RECONNECT_RATE_LIMIT }; - match last_credentials.clone() { - Some(_) if !reconnect_exceeds_rate_limit() => { - auto_connect_times.push(Instant::now()); - connecting = true; - }, - _ => { - error!("Spirc shut down too often. Not reconnecting automatically."); - exit(1); - }, + if last_credentials.is_some() && !reconnect_exceeds_rate_limit() { + auto_connect_times.push(Instant::now()); + connecting = true; + } else { + error!("Spirc shut down too often. Not reconnecting automatically."); + exit(1); } }, _ = tokio::signal::ctrl_c() => {