Skip to content

Commit

Permalink
feat: Improve streaming stability, error handling, and refactor track…
Browse files Browse the repository at this point in the history
… handling in cspot

- **Streaming Stability**:
  Addressed issues with continuous streaming, improving the handling of connection drops and reducing delays in reconnection. Improve playback continuity during network interruptions.

- **Error Handling**:
  Improved error handling, particularly around HTTPClient and other critical components. Introduced exception management to deal with connection errors and retry mechanisms for smoother recovery.

- **Track Handling**:
  Refactored track management logic to streamline the loading and processing of tracks, improving performance and reducing latency in track transitions.

- **Concurrency Improvements**:
  Added thread protection to shared resources, including wrapping log messages with a semaphore to avoid race conditions and data corruption.

- **Other Fixes**:
  - Updated the `getMacAddress` function to work on both WIN32 and ESP32 platforms. (Used for unique deviceID)
  - Fixed issues with FreeRTOS-to-pthread migration for improved compatibility.

---

**Additional Notes**:
- Some exceptions still occur, particularly with mbedtls, but they are logged and managed until a more permanent fix is available.
- Reconnection times in certain cases may still be suboptimal but are currently being handled with exponential backoff.
- The changes in track processing should improve the overall flow, reducing glitches during streaming.
  • Loading branch information
tobiasguyer committed Nov 6, 2024
1 parent ea556ec commit 07be112
Show file tree
Hide file tree
Showing 31 changed files with 920 additions and 574 deletions.
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "cspot/bell"]
path = cspot/bell
url = https://github.com/feelfreelinux/bell
branch = develop
url = https://github.com/tobiasguyer/bell
branch = master
19 changes: 16 additions & 3 deletions cspot/include/CDNAudioFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ class CDNAudioFile {
public:
CDNAudioFile(const std::string& cdnUrl, const std::vector<uint8_t>& audioKey);

#ifndef CONFIG_BELL_NOCODEC
/**
* @brief Opens connection to the provided cdn url, and fetches track metadata.
*/
void openStream();

#else
/**
* @brief Opens connection to the provided cdn url, and fetches track header.
*
* @param header_size
*
* @returns char* where to read data from
*/
uint8_t* openStream(size_t&);
#endif
/**
* @brief Read and decrypt part of the cdn stream
*
Expand All @@ -34,7 +44,7 @@ class CDNAudioFile {
*
* @returns amount of bytes read
*/
size_t readBytes(uint8_t* dst, size_t bytes);
long readBytes(uint8_t* dst, size_t bytes);

/**
* @brief Returns current position in CDN stream
Expand All @@ -52,6 +62,8 @@ class CDNAudioFile {
*/
void seek(size_t position);

long getHeader();

private:
const int OPUS_HEADER_SIZE = 8 * 1024;
const int OPUS_FOOTER_PREFFERED = 1024 * 12; // 12K should be safe
Expand All @@ -60,10 +72,11 @@ class CDNAudioFile {
const int HTTP_BUFFER_SIZE = 1024 * 14;
const int SPOTIFY_OPUS_HEADER = 167;

#ifndef CONFIG_BELL_NOCODEC
// Used to store opus metadata, speeds up read
std::vector<uint8_t> header = std::vector<uint8_t>(OPUS_HEADER_SIZE);
std::vector<uint8_t> footer;

#endif
// General purpose buffer to read data
std::vector<uint8_t> httpBuffer = std::vector<uint8_t>(HTTP_BUFFER_SIZE);

Expand Down
20 changes: 19 additions & 1 deletion cspot/include/CSpotContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
#include "nlohmann/json_fwd.hpp" // for json
#endif

#ifdef ESP_PLATFORM
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#define WIFI_CONNECTED_BIT BIT0
#define WIFI_FAIL_BIT BIT1
#define CSPOT_CONNECTED_BIT BIT2
#endif

namespace cspot {
struct Context {
struct ConfigState {
Expand All @@ -29,6 +37,10 @@ struct Context {
std::vector<uint8_t> authData;
int volume;

#ifdef ESP_PLATFORM
EventGroupHandle_t s_cspot_event_group;
#endif

std::string username;
std::string countryCode;
};
Expand Down Expand Up @@ -67,12 +79,18 @@ struct Context {
#endif
}

void lost_connection(void*) {
//if(!connection)
}

static std::shared_ptr<Context> createFromBlob(
std::shared_ptr<LoginBlob> blob) {
auto ctx = std::make_shared<Context>();
ctx->timeProvider = std::make_shared<TimeProvider>();
ctx->rng = std::default_random_engine{ctx->rd()};

#ifdef ESP_PLATFORM
//s_cspot_event_group = xEventGroupCreate();
#endif
ctx->session = std::make_shared<MercurySession>(ctx->timeProvider);
ctx->playbackMetrics = std::make_shared<PlaybackMetrics>(ctx);
ctx->config.deviceId = blob->getDeviceId();
Expand Down
3 changes: 3 additions & 0 deletions cspot/include/DeviceStateHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class DeviceStateHandler {

void putDeviceState(PutStateReason member_type =
PutStateReason::PutStateReason_PLAYER_STATE_CHANGED);

void setDeviceState(PutStateReason put_state_reason);
void putPlayerState(PutStateReason member_type =
PutStateReason::PutStateReason_PLAYER_STATE_CHANGED);
void handleConnectState();
Expand Down Expand Up @@ -106,5 +108,6 @@ class DeviceStateHandler {
}

static void reloadTrackList(void*);
std::atomic<bool> resolvingContext = false;
};
} // namespace cspot
2 changes: 1 addition & 1 deletion cspot/include/EventManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TrackMetrics {
std::shared_ptr<cspot::Context> ctx;
std::vector<std::pair<uint64_t, uint64_t>> addInterval(
std::vector<std::pair<uint64_t, uint64_t>>& intervals,
std::pair<std::uint64_t, std::uint64_t> newInterval) {
std::pair<uint64_t, uint64_t> newInterval) {
// Add the new interval to the list of intervals
intervals.push_back(newInterval);

Expand Down
37 changes: 25 additions & 12 deletions cspot/include/MercurySession.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
#pragma once

#include <atomic> // for atomic
#include <cstdint> // for uint8_t, uint64_t, uint32_t
#include <atomic> // for atomic
#include <condition_variable>
#include <cstdint> // for uint8_t, uint64_t, uint32_t
#include <deque>
#include <functional> // for function
#include <memory> // for shared_ptr
#include <mutex> // for mutex
#include <string> // for string
#include <unordered_map> // for unordered_map
#include <vector> // for vector
#include "pb_decode.h"

#include "BellTask.h" // for Task
#include "Packet.h" // for Packet
#include "Queue.h" // for Queue
#include "Session.h" // for Session
#include "protobuf/mercury.pb.h" // for Header

namespace bell {
class WrappedSemaphore;
};

namespace cspot {
class TimeProvider;

Expand All @@ -25,12 +31,12 @@ class MercurySession : public bell::Task, public cspot::Session {
typedef std::vector<std::vector<uint8_t>> DataParts;

struct Response {
Header mercuryHeader;
Header mercuryHeader = Header_init_default;
DataParts parts;
int64_t sequenceId;
uint64_t sequenceId;
bool fail = true;
};
typedef std::function<void(Response&)> ResponseCallback;
typedef std::function<void(const Response)> ResponseCallback;
typedef std::function<void(bool, const std::vector<uint8_t>&)>
AudioKeyCallback;
typedef std::function<void()> ConnectionEstabilishedCallback;
Expand Down Expand Up @@ -105,6 +111,8 @@ class MercurySession : public bell::Task, public cspot::Session {
void setConnectedHandler(ConnectionEstabilishedCallback callback);

bool triggerTimeout() override;
bool isReconnecting = false;
void reconnect();

private:
const int PING_TIMEOUT_MS = 2 * 60 * 1000 + 5000;
Expand All @@ -113,15 +121,14 @@ class MercurySession : public bell::Task, public cspot::Session {
Header tempMercuryHeader = {};
ConnectionEstabilishedCallback connectionReadyCallback = nullptr;

bell::Queue<cspot::Packet> packetQueue;
std::deque<cspot::Packet> packetQueue;

void runTask() override;
void reconnect();

std::unordered_map<int64_t, ResponseCallback> callbacks;
std::unordered_map<uint64_t, ResponseCallback> callbacks;
std::deque<Response> partials;
std::unordered_map<std::string, ResponseCallback> subscriptions;
std::unordered_map<uint32_t, AudioKeyCallback> audioKeyCallbacks;
std::shared_ptr<bell::WrappedSemaphore> responseSemaphore;

uint64_t sequenceId = 1;
uint32_t audioKeySequence = 1;
Expand All @@ -130,14 +137,20 @@ class MercurySession : public bell::Task, public cspot::Session {
unsigned long long lastPingTimestamp = -1;
std::string countryCode = "";

std::mutex queueMutex;
std::mutex isRunningMutex;
std::condition_variable queueCV; // For synchronization with waits
std::atomic<bool> isRunning = false;
std::atomic<bool> isReconnecting = false;
std::atomic<bool> executeEstabilishedCallback = false;
std::atomic<bool> connection_lost = false;

void failAllPending();

void handleReconnection();
bool processPackets();
MercurySession::Response decodeResponse(const std::vector<uint8_t>& data);
std::vector<uint8_t> prepareSequenceIdPayload(
uint64_t sequenceId, const std::vector<uint8_t>& headerBytes,
const DataParts& payload);
};
} // namespace cspot
} // namespace cspot
7 changes: 5 additions & 2 deletions cspot/include/PlayerContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include "nlohmann/json_fwd.hpp" // for json

#include "protobuf/metadata.pb.h" // for Track, _Track, AudioFile, Episode
#define MAX_TRACKS 80
#define MAX_TRACKS 100
namespace cspot {
struct PlayerContext {
PlayerContext(std::shared_ptr<cspot::Context> ctx, PlayerState* playerState,
Expand All @@ -26,9 +26,12 @@ struct PlayerContext {
this->tracks = tracks;
this->index = index;
}
void resolveRadio(
void autoplayQuery(
std::vector<std::pair<std::string, std::string>> metadata_map,
void (*responseFunction)(void*), bool secondTry = false);
void resolveRadio(
std::vector<std::pair<std::string, std::string>> metadata_map,
void (*responseFunction)(void*), char*);

void resolveTracklist(
std::vector<std::pair<std::string, std::string>> metadata_map,
Expand Down
2 changes: 1 addition & 1 deletion cspot/include/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Session {
std::shared_ptr<cspot::PlainConnection> conn;
std::shared_ptr<LoginBlob> authBlob;

std::string deviceId = "142137fd329622137a14901634264e6f332e2411";
//std::string deviceId = "142137fd329622137a14901634264e6f332e2411";

public:
Session();
Expand Down
22 changes: 8 additions & 14 deletions cspot/include/TrackPlayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,16 @@ class TrackPlayer : bell::Task {
public:
// Callback types
typedef std::function<void(std::shared_ptr<QueuedTrack>, bool)>
TrackLoadedCallback;
typedef std::function<size_t(uint8_t*, size_t, size_t

#ifdef CONFIG_BELL_NOCODEC
,
bool
#endif
)>
DataCallback;
typedef std::function<void(bool)> EOFCallback;
TrackChangedCallback;
typedef std::function<size_t(uint8_t*, size_t, size_t, bool)> DataCallback;
typedef std::function<void(bool)> TrackEndedCallback;
typedef std::function<size_t(size_t)> SeekableCallback;
EOFCallback eofCallback;
TrackEndedCallback onTrackEnd;

TrackPlayer(std::shared_ptr<cspot::Context> ctx,
std::shared_ptr<cspot::TrackQueue> trackQueue,
EOFCallback eofCallback, TrackLoadedCallback loadedCallback);
TrackEndedCallback onTrackEnd,
TrackChangedCallback onTrackChanged);
~TrackPlayer();

void loadTrackFromRef(TrackReference& ref, size_t playbackMs,
Expand Down Expand Up @@ -81,7 +75,7 @@ class TrackPlayer : bell::Task {

std::unique_ptr<bell::WrappedSemaphore> playbackSemaphore;

TrackLoadedCallback trackLoaded;
TrackChangedCallback onTrackChanged;
DataCallback dataCallback = nullptr;
#ifdef CONFIG_BELL_NOCODEC
SeekableCallback spaces_available = nullptr;
Expand All @@ -104,7 +98,7 @@ class TrackPlayer : bell::Task {
#ifndef CONFIG_BELL_NOCODEC
std::vector<uint8_t> pcmBuffer = std::vector<uint8_t>(1024);
#else
std::vector<uint8_t> pcmBuffer = std::vector<uint8_t>(128);
std::vector<uint8_t> pcmBuffer = std::vector<uint8_t>(1024);
#endif

bool autoStart = false;
Expand Down
2 changes: 2 additions & 0 deletions cspot/include/TrackQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct TrackInfo {
class QueuedTrack {
public:
QueuedTrack(ProvidedTrack& ref, std::shared_ptr<cspot::Context> ctx,
std::shared_ptr<bell::WrappedSemaphore> playableSemaphore,
int64_t requestedPosition = 0);
~QueuedTrack();

Expand Down Expand Up @@ -87,6 +88,7 @@ class QueuedTrack {

private:
std::shared_ptr<cspot::Context> ctx;
std::shared_ptr<bell::WrappedSemaphore> playableSemaphore;

uint64_t pendingMercuryRequest = 0;
uint32_t pendingAudioKeyRequest = 0;
Expand Down
19 changes: 16 additions & 3 deletions cspot/include/TrackReference.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "pb_decode.h"
#include "protobuf/connect.pb.h"

#define TRACK_SEND_LIMIT 25
#define TRACK_SEND_LIMIT 15

namespace cspot {
struct TrackReference {
Expand Down Expand Up @@ -38,8 +38,6 @@ struct TrackReference {

Type type;

void decodeURI();

bool operator==(const TrackReference& other) const;

// Encodes list of track references into a pb structure, used by nanopb
Expand All @@ -55,6 +53,21 @@ struct TrackReference {
tracklist->clear();
}

static void deleteTracksInRange(std::vector<ProvidedTrack>* tracks,
size_t start, size_t end) {
// Sanity check for the range bounds
if (start >= tracks->size() || end >= tracks->size() || start > end) {
return; // Invalid range
}

// Release resources for each track in the specified range
for (size_t i = start; i <= end; ++i)
pbReleaseProvidedTrack(&tracks->at(i));

// Erase the range of tracks from the tracklist
tracks->erase(tracks->begin() + start, tracks->begin() + end + 1);
}

static void pbReleaseProvidedTrack(ProvidedTrack* track) {
if (track->metadata_count < track->full_metadata_count)
track->metadata_count = track->full_metadata_count;
Expand Down
Loading

0 comments on commit 07be112

Please sign in to comment.