Skip to content

Commit

Permalink
move session timeout handling to Spirc
Browse files Browse the repository at this point in the history
  • Loading branch information
eladyn committed Mar 22, 2023
1 parent ae369c8 commit 6ef29ae
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 47 deletions.
4 changes: 4 additions & 0 deletions connect/src/spirc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ impl SpircTask {
let commands = self.commands.as_mut();
let player_events = self.player_events.as_mut();
tokio::select! {
() = self.session.session_timeout() => {
error!("lost connection to server (session timeout)");
break;
}
remote_update = self.remote_update.next() => match remote_update {
Some(result) => match result {
Ok((username, frame)) => {
Expand Down
69 changes: 22 additions & 47 deletions core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use futures_core::TryStream;
use futures_util::{future, ready, StreamExt, TryStreamExt};
use num_traits::FromPrimitive;
use once_cell::sync::OnceCell;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use quick_xml::events::Event;
use thiserror::Error;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::{
Expand Down Expand Up @@ -80,6 +80,7 @@ struct SessionData {
time_delta: i64,
invalid: bool,
user_data: UserData,
last_ping: Option<Instant>,
}

struct SessionInternal {
Expand All @@ -95,7 +96,6 @@ struct SessionInternal {
mercury: OnceCell<MercuryManager>,
spclient: OnceCell<SpClient>,
token_provider: OnceCell<TokenProvider>,
session_timeout: OnceCell<SessionTimeout>,
cache: Option<Arc<Cache>>,

handle: tokio::runtime::Handle,
Expand Down Expand Up @@ -127,7 +127,6 @@ impl Session {
mercury: OnceCell::new(),
spclient: OnceCell::new(),
token_provider: OnceCell::new(),
session_timeout: OnceCell::new(),
handle: tokio::runtime::Handle::current(),
}))
}
Expand Down Expand Up @@ -233,10 +232,20 @@ impl Session {
.get_or_init(|| TokenProvider::new(self.weak()))
}

fn session_timeout(&self) -> &SessionTimeout {
self.0
.session_timeout
.get_or_init(|| SessionTimeout::new(self))
/// Returns, when we haven't received a ping for too long, which means
/// that we silently lost connection to Spotify servers.
pub async fn session_timeout(&self) {
// pings are sent every 2 minutes and a 5 second margin should be fine
const SESSION_TIMEOUT: Duration = Duration::from_secs(125);

loop {
let last_ping = self.0.data.read().last_ping.unwrap_or_else(Instant::now);
if last_ping.elapsed() >= SESSION_TIMEOUT {
break;
}
// a potential timeout cannot occur at least until SESSION_TIMEOUT after the last_ping
tokio::time::sleep_until(last_ping + SESSION_TIMEOUT).await;
}
}

pub fn time_delta(&self) -> i64 {
Expand Down Expand Up @@ -292,8 +301,11 @@ impl Session {
}
.as_secs() as i64;

self.0.data.write().time_delta = server_timestamp - timestamp;
self.session_timeout().reset(self);
{
let mut data = self.0.data.write();
data.time_delta = server_timestamp - timestamp;
data.last_ping = Some(Instant::now());
}

self.debug_info();
self.send_packet(Pong, vec![0, 0, 0, 0])
Expand Down Expand Up @@ -522,43 +534,6 @@ impl Drop for SessionInternal {
}
}

struct SessionTimeout(Mutex<JoinHandle<()>>);

// pings are sent every 2 minutes and a 5 second margin should be fine
const SESSION_TIMEOUT: Duration = Duration::from_secs(125);

impl SessionTimeout {
fn spawn_timeout(session: &Session) -> JoinHandle<()> {
session.0.handle.spawn({
let session = session.weak();
async move {
tokio::time::sleep(SESSION_TIMEOUT).await;
if let Some(session) = session.try_upgrade() {
error!("connection to server timed out");
session.shutdown();
}
}
})
}

pub fn new(session: &Session) -> Self {
Self(Mutex::new(Self::spawn_timeout(session)))
}

pub fn reset(&self, session: &Session) {
debug!("resetting session timeout");
let mut task = self.0.lock();
task.abort();
*task = Self::spawn_timeout(session);
}
}

impl Drop for SessionTimeout {
fn drop(&mut self) {
self.0.lock().abort()
}
}

struct DispatchTask<S>(S, SessionWeak)
where
S: TryStream<Ok = (u8, Bytes)> + Unpin;
Expand Down

0 comments on commit 6ef29ae

Please sign in to comment.