From 07be112bcfd50ee055b04586b8809c0b0756358d Mon Sep 17 00:00:00 2001 From: Tobias Guyer Date: Wed, 6 Nov 2024 14:52:46 +0100 Subject: [PATCH] feat: Improve streaming stability, error handling, and refactor track handling in cspot - **Streaming Stability**: Addressed issues with continuous streaming, improving the handling of connection drops and reducing delays in reconnection. Improve playback continuity during network interruptions. - **Error Handling**: Improved error handling, particularly around HTTPClient and other critical components. Introduced exception management to deal with connection errors and retry mechanisms for smoother recovery. - **Track Handling**: Refactored track management logic to streamline the loading and processing of tracks, improving performance and reducing latency in track transitions. - **Concurrency Improvements**: Added thread protection to shared resources, including wrapping log messages with a semaphore to avoid race conditions and data corruption. - **Other Fixes**: - Updated the `getMacAddress` function to work on both WIN32 and ESP32 platforms. (Used for unique deviceID) - Fixed issues with FreeRTOS-to-pthread migration for improved compatibility. --- **Additional Notes**: - Some exceptions still occur, particularly with mbedtls, but they are logged and managed until a more permanent fix is available. - Reconnection times in certain cases may still be suboptimal but are currently being handled with exponential backoff. - The changes in track processing should improve the overall flow, reducing glitches during streaming. --- .gitmodules | 4 +- cspot/include/CDNAudioFile.h | 19 +- cspot/include/CSpotContext.h | 20 +- cspot/include/DeviceStateHandler.h | 3 + cspot/include/EventManager.h | 2 +- cspot/include/MercurySession.h | 37 ++- cspot/include/PlayerContext.h | 7 +- cspot/include/Session.h | 2 +- cspot/include/TrackPlayer.h | 22 +- cspot/include/TrackQueue.h | 2 + cspot/include/TrackReference.h | 19 +- cspot/src/CDNAudioFile.cpp | 79 ++++- cspot/src/DeviceStateHandler.cpp | 154 +++++---- cspot/src/EventManager.cpp | 6 +- cspot/src/LoginBlob.cpp | 5 +- cspot/src/MercurySession.cpp | 266 +++++++++------ cspot/src/PlayerContext.cpp | 304 ++++++++++-------- cspot/src/TrackPlayer.cpp | 100 +++--- cspot/src/TrackQueue.cpp | 41 ++- cspot/src/TrackReference.cpp | 11 - targets/cli/CliPlayer.cpp | 16 +- targets/esp32/CMakeLists.txt | 22 +- .../esp32/components/VS1053/CMakeLists.txt | 12 +- .../esp32/components/VS1053/include/VS1053.h | 5 +- .../esp32/components/VS1053/src/VS1053.cpp | 86 ++--- targets/esp32/main/CMakeLists.txt | 35 +- targets/esp32/main/EspPlayer.cpp | 17 +- targets/esp32/main/VSPlayer.cpp | 32 +- targets/esp32/main/VSPlayer.h | 1 + targets/esp32/main/VSinit.h | 15 +- targets/esp32/main/main.cpp | 150 +++++++-- 31 files changed, 920 insertions(+), 574 deletions(-) diff --git a/.gitmodules b/.gitmodules index f4a9c7e1..da9339d7 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "cspot/bell"] path = cspot/bell - url = https://github.com/feelfreelinux/bell - branch = develop + url = https://github.com/tobiasguyer/bell + branch = master diff --git a/cspot/include/CDNAudioFile.h b/cspot/include/CDNAudioFile.h index 90cf8286..19f3001b 100644 --- a/cspot/include/CDNAudioFile.h +++ b/cspot/include/CDNAudioFile.h @@ -21,11 +21,21 @@ class CDNAudioFile { public: CDNAudioFile(const std::string& cdnUrl, const std::vector& audioKey); +#ifndef CONFIG_BELL_NOCODEC /** * @brief Opens connection to the provided cdn url, and fetches track metadata. */ void openStream(); - +#else + /** + * @brief Opens connection to the provided cdn url, and fetches track header. + * + * @param header_size + * + * @returns char* where to read data from + */ + uint8_t* openStream(size_t&); +#endif /** * @brief Read and decrypt part of the cdn stream * @@ -34,7 +44,7 @@ class CDNAudioFile { * * @returns amount of bytes read */ - size_t readBytes(uint8_t* dst, size_t bytes); + long readBytes(uint8_t* dst, size_t bytes); /** * @brief Returns current position in CDN stream @@ -52,6 +62,8 @@ class CDNAudioFile { */ void seek(size_t position); + long getHeader(); + private: const int OPUS_HEADER_SIZE = 8 * 1024; const int OPUS_FOOTER_PREFFERED = 1024 * 12; // 12K should be safe @@ -60,10 +72,11 @@ class CDNAudioFile { const int HTTP_BUFFER_SIZE = 1024 * 14; const int SPOTIFY_OPUS_HEADER = 167; +#ifndef CONFIG_BELL_NOCODEC // Used to store opus metadata, speeds up read std::vector header = std::vector(OPUS_HEADER_SIZE); std::vector footer; - +#endif // General purpose buffer to read data std::vector httpBuffer = std::vector(HTTP_BUFFER_SIZE); diff --git a/cspot/include/CSpotContext.h b/cspot/include/CSpotContext.h index d39f7dc3..aab11ad2 100644 --- a/cspot/include/CSpotContext.h +++ b/cspot/include/CSpotContext.h @@ -19,6 +19,14 @@ #include "nlohmann/json_fwd.hpp" // for json #endif +#ifdef ESP_PLATFORM +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#define WIFI_CONNECTED_BIT BIT0 +#define WIFI_FAIL_BIT BIT1 +#define CSPOT_CONNECTED_BIT BIT2 +#endif + namespace cspot { struct Context { struct ConfigState { @@ -29,6 +37,10 @@ struct Context { std::vector authData; int volume; +#ifdef ESP_PLATFORM + EventGroupHandle_t s_cspot_event_group; +#endif + std::string username; std::string countryCode; }; @@ -67,12 +79,18 @@ struct Context { #endif } + void lost_connection(void*) { + //if(!connection) + } + static std::shared_ptr createFromBlob( std::shared_ptr blob) { auto ctx = std::make_shared(); ctx->timeProvider = std::make_shared(); ctx->rng = std::default_random_engine{ctx->rd()}; - +#ifdef ESP_PLATFORM + //s_cspot_event_group = xEventGroupCreate(); +#endif ctx->session = std::make_shared(ctx->timeProvider); ctx->playbackMetrics = std::make_shared(ctx); ctx->config.deviceId = blob->getDeviceId(); diff --git a/cspot/include/DeviceStateHandler.h b/cspot/include/DeviceStateHandler.h index 28b23dc6..e5cc124f 100644 --- a/cspot/include/DeviceStateHandler.h +++ b/cspot/include/DeviceStateHandler.h @@ -63,6 +63,8 @@ class DeviceStateHandler { void putDeviceState(PutStateReason member_type = PutStateReason::PutStateReason_PLAYER_STATE_CHANGED); + + void setDeviceState(PutStateReason put_state_reason); void putPlayerState(PutStateReason member_type = PutStateReason::PutStateReason_PLAYER_STATE_CHANGED); void handleConnectState(); @@ -106,5 +108,6 @@ class DeviceStateHandler { } static void reloadTrackList(void*); + std::atomic resolvingContext = false; }; } // namespace cspot \ No newline at end of file diff --git a/cspot/include/EventManager.h b/cspot/include/EventManager.h index 975c849c..9578045d 100644 --- a/cspot/include/EventManager.h +++ b/cspot/include/EventManager.h @@ -58,7 +58,7 @@ class TrackMetrics { std::shared_ptr ctx; std::vector> addInterval( std::vector>& intervals, - std::pair newInterval) { + std::pair newInterval) { // Add the new interval to the list of intervals intervals.push_back(newInterval); diff --git a/cspot/include/MercurySession.h b/cspot/include/MercurySession.h index 4359e0a0..d2644ccd 100644 --- a/cspot/include/MercurySession.h +++ b/cspot/include/MercurySession.h @@ -1,20 +1,26 @@ #pragma once -#include // for atomic -#include // for uint8_t, uint64_t, uint32_t +#include // for atomic +#include +#include // for uint8_t, uint64_t, uint32_t +#include #include // for function #include // for shared_ptr #include // for mutex #include // for string #include // for unordered_map #include // for vector +#include "pb_decode.h" #include "BellTask.h" // for Task #include "Packet.h" // for Packet -#include "Queue.h" // for Queue #include "Session.h" // for Session #include "protobuf/mercury.pb.h" // for Header +namespace bell { +class WrappedSemaphore; +}; + namespace cspot { class TimeProvider; @@ -25,12 +31,12 @@ class MercurySession : public bell::Task, public cspot::Session { typedef std::vector> DataParts; struct Response { - Header mercuryHeader; + Header mercuryHeader = Header_init_default; DataParts parts; - int64_t sequenceId; + uint64_t sequenceId; bool fail = true; }; - typedef std::function ResponseCallback; + typedef std::function ResponseCallback; typedef std::function&)> AudioKeyCallback; typedef std::function ConnectionEstabilishedCallback; @@ -105,6 +111,8 @@ class MercurySession : public bell::Task, public cspot::Session { void setConnectedHandler(ConnectionEstabilishedCallback callback); bool triggerTimeout() override; + bool isReconnecting = false; + void reconnect(); private: const int PING_TIMEOUT_MS = 2 * 60 * 1000 + 5000; @@ -113,15 +121,14 @@ class MercurySession : public bell::Task, public cspot::Session { Header tempMercuryHeader = {}; ConnectionEstabilishedCallback connectionReadyCallback = nullptr; - bell::Queue packetQueue; + std::deque packetQueue; void runTask() override; - void reconnect(); - - std::unordered_map callbacks; + std::unordered_map callbacks; std::deque partials; std::unordered_map subscriptions; std::unordered_map audioKeyCallbacks; + std::shared_ptr responseSemaphore; uint64_t sequenceId = 1; uint32_t audioKeySequence = 1; @@ -130,14 +137,20 @@ class MercurySession : public bell::Task, public cspot::Session { unsigned long long lastPingTimestamp = -1; std::string countryCode = ""; + std::mutex queueMutex; std::mutex isRunningMutex; + std::condition_variable queueCV; // For synchronization with waits std::atomic isRunning = false; - std::atomic isReconnecting = false; std::atomic executeEstabilishedCallback = false; std::atomic connection_lost = false; void failAllPending(); + void handleReconnection(); + bool processPackets(); MercurySession::Response decodeResponse(const std::vector& data); + std::vector prepareSequenceIdPayload( + uint64_t sequenceId, const std::vector& headerBytes, + const DataParts& payload); }; -} // namespace cspot +} // namespace cspot \ No newline at end of file diff --git a/cspot/include/PlayerContext.h b/cspot/include/PlayerContext.h index 1e330db9..733f34c7 100644 --- a/cspot/include/PlayerContext.h +++ b/cspot/include/PlayerContext.h @@ -16,7 +16,7 @@ #include "nlohmann/json_fwd.hpp" // for json #include "protobuf/metadata.pb.h" // for Track, _Track, AudioFile, Episode -#define MAX_TRACKS 80 +#define MAX_TRACKS 100 namespace cspot { struct PlayerContext { PlayerContext(std::shared_ptr ctx, PlayerState* playerState, @@ -26,9 +26,12 @@ struct PlayerContext { this->tracks = tracks; this->index = index; } - void resolveRadio( + void autoplayQuery( std::vector> metadata_map, void (*responseFunction)(void*), bool secondTry = false); + void resolveRadio( + std::vector> metadata_map, + void (*responseFunction)(void*), char*); void resolveTracklist( std::vector> metadata_map, diff --git a/cspot/include/Session.h b/cspot/include/Session.h index df7f03c8..4962e5b3 100644 --- a/cspot/include/Session.h +++ b/cspot/include/Session.h @@ -23,7 +23,7 @@ class Session { std::shared_ptr conn; std::shared_ptr authBlob; - std::string deviceId = "142137fd329622137a14901634264e6f332e2411"; + //std::string deviceId = "142137fd329622137a14901634264e6f332e2411"; public: Session(); diff --git a/cspot/include/TrackPlayer.h b/cspot/include/TrackPlayer.h index aa738ce9..6223f417 100644 --- a/cspot/include/TrackPlayer.h +++ b/cspot/include/TrackPlayer.h @@ -35,22 +35,16 @@ class TrackPlayer : bell::Task { public: // Callback types typedef std::function, bool)> - TrackLoadedCallback; - typedef std::function - DataCallback; - typedef std::function EOFCallback; + TrackChangedCallback; + typedef std::function DataCallback; + typedef std::function TrackEndedCallback; typedef std::function SeekableCallback; - EOFCallback eofCallback; + TrackEndedCallback onTrackEnd; TrackPlayer(std::shared_ptr ctx, std::shared_ptr trackQueue, - EOFCallback eofCallback, TrackLoadedCallback loadedCallback); + TrackEndedCallback onTrackEnd, + TrackChangedCallback onTrackChanged); ~TrackPlayer(); void loadTrackFromRef(TrackReference& ref, size_t playbackMs, @@ -81,7 +75,7 @@ class TrackPlayer : bell::Task { std::unique_ptr playbackSemaphore; - TrackLoadedCallback trackLoaded; + TrackChangedCallback onTrackChanged; DataCallback dataCallback = nullptr; #ifdef CONFIG_BELL_NOCODEC SeekableCallback spaces_available = nullptr; @@ -104,7 +98,7 @@ class TrackPlayer : bell::Task { #ifndef CONFIG_BELL_NOCODEC std::vector pcmBuffer = std::vector(1024); #else - std::vector pcmBuffer = std::vector(128); + std::vector pcmBuffer = std::vector(1024); #endif bool autoStart = false; diff --git a/cspot/include/TrackQueue.h b/cspot/include/TrackQueue.h index 513026ba..5ee64579 100644 --- a/cspot/include/TrackQueue.h +++ b/cspot/include/TrackQueue.h @@ -36,6 +36,7 @@ struct TrackInfo { class QueuedTrack { public: QueuedTrack(ProvidedTrack& ref, std::shared_ptr ctx, + std::shared_ptr playableSemaphore, int64_t requestedPosition = 0); ~QueuedTrack(); @@ -87,6 +88,7 @@ class QueuedTrack { private: std::shared_ptr ctx; + std::shared_ptr playableSemaphore; uint64_t pendingMercuryRequest = 0; uint32_t pendingAudioKeyRequest = 0; diff --git a/cspot/include/TrackReference.h b/cspot/include/TrackReference.h index 70b04e0a..17e6a1cb 100644 --- a/cspot/include/TrackReference.h +++ b/cspot/include/TrackReference.h @@ -9,7 +9,7 @@ #include "pb_decode.h" #include "protobuf/connect.pb.h" -#define TRACK_SEND_LIMIT 25 +#define TRACK_SEND_LIMIT 15 namespace cspot { struct TrackReference { @@ -38,8 +38,6 @@ struct TrackReference { Type type; - void decodeURI(); - bool operator==(const TrackReference& other) const; // Encodes list of track references into a pb structure, used by nanopb @@ -55,6 +53,21 @@ struct TrackReference { tracklist->clear(); } + static void deleteTracksInRange(std::vector* tracks, + size_t start, size_t end) { + // Sanity check for the range bounds + if (start >= tracks->size() || end >= tracks->size() || start > end) { + return; // Invalid range + } + + // Release resources for each track in the specified range + for (size_t i = start; i <= end; ++i) + pbReleaseProvidedTrack(&tracks->at(i)); + + // Erase the range of tracks from the tracklist + tracks->erase(tracks->begin() + start, tracks->begin() + end + 1); + } + static void pbReleaseProvidedTrack(ProvidedTrack* track) { if (track->metadata_count < track->full_metadata_count) track->metadata_count = track->full_metadata_count; diff --git a/cspot/src/CDNAudioFile.cpp b/cspot/src/CDNAudioFile.cpp index f58ea07d..ed88f26a 100644 --- a/cspot/src/CDNAudioFile.cpp +++ b/cspot/src/CDNAudioFile.cpp @@ -39,13 +39,21 @@ void CDNAudioFile::seek(size_t newPos) { this->position = newPos; } +#ifndef CONFIG_BELL_NOCODEC +/** + * @brief Opens connection to the provided cdn url, and fetches track metadata. + */ void CDNAudioFile::openStream() { CSPOT_LOG(info, "Opening HTTP stream to %s", this->cdnUrl.c_str()); // Open connection, read first 128 bytes this->httpConnection = bell::HTTPClient::get( this->cdnUrl, - {bell::HTTPClient::RangeHeader::range(0, OPUS_HEADER_SIZE - 1)}); + {bell::HTTPClient::RangeHeader::range(0, OPUS_HEADER_SIZE - 1)}, 20); + if (!httpConnection->stream().isOpen()) { + this->openStream(); + return; + } this->httpConnection->stream().read((char*)header.data(), OPUS_HEADER_SIZE); this->totalFileSize = @@ -72,8 +80,65 @@ void CDNAudioFile::openStream() { this->lastRequestPosition = 0; this->lastRequestCapacity = 0; } +#else +/** + * @brief Opens a connection to the CDN URL and fills the first buffer with track header data. + * + * @param header_size Reference to a size_t variable where the size of the header is stored. + * @return Pointer to the beginning of the HTTP buffer where the track header data is stored. + */ +uint8_t* CDNAudioFile::openStream(size_t& header_size) { + + // Open connection, fill first buffer + this->httpConnection = bell::HTTPClient::get( + this->cdnUrl, + {bell::HTTPClient::RangeHeader::range(0, HTTP_BUFFER_SIZE - 1)}, 20); + if (!httpConnection->stream().isOpen()) { + return this->openStream(header_size); + } + this->lastRequestPosition = 0; + this->lastRequestCapacity = this->httpConnection->contentLength(); + this->totalFileSize = + this->httpConnection->totalLength() - SPOTIFY_OPUS_HEADER; + + this->httpConnection->stream().read((char*)this->httpBuffer.data(), + lastRequestCapacity); + this->decrypt(this->httpBuffer.data(), lastRequestCapacity, + this->lastRequestPosition); + this->position = getHeader(); + header_size = this->position; + return &httpBuffer[0]; +} +#endif -size_t CDNAudioFile::readBytes(uint8_t* dst, size_t bytes) { +/** + * @brief Finds the position of the first audio frame in the HTTP response. + * + * The OGG Vorbis file starts with three headers. They contain valuable information + * for decoding the audio. + * + * @return The position of the first audio frame in the HTTP response. + */ +long CDNAudioFile::getHeader() { + uint32_t offset = SPOTIFY_OPUS_HEADER; + + for (int i = 0; i < 3; ++i) { + offset += 26; + uint32_t segmentCount = httpBuffer[offset]; + uint32_t segmentEnd = segmentCount + offset + 1; + ++offset; + + for (uint32_t j = offset; j < segmentEnd; ++j) { + offset += httpBuffer[j]; + } + + offset += segmentCount; + } + + return offset; +} + +long CDNAudioFile::readBytes(uint8_t* dst, size_t bytes) { size_t offsetPosition = position + SPOTIFY_OPUS_HEADER; size_t actualFileSize = this->totalFileSize + SPOTIFY_OPUS_HEADER; @@ -81,6 +146,7 @@ size_t CDNAudioFile::readBytes(uint8_t* dst, size_t bytes) { return 0; } +#ifndef CONFIG_BELL_NOCODEC // // Opus tries to read header, use prefetched data if (offsetPosition < OPUS_HEADER_SIZE && bytes + offsetPosition <= OPUS_HEADER_SIZE) { @@ -107,6 +173,7 @@ size_t CDNAudioFile::readBytes(uint8_t* dst, size_t bytes) { } // Data not in the headers. Make sense of whats going on. +#endif // Position in bounds :) if (offsetPosition >= this->lastRequestPosition && offsetPosition < this->lastRequestPosition + this->lastRequestCapacity) { @@ -130,9 +197,11 @@ size_t CDNAudioFile::readBytes(uint8_t* dst, size_t bytes) { this->enableRequestMargin = false; } - this->httpConnection->get( - cdnUrl, {bell::HTTPClient::RangeHeader::range( - requestPosition, requestPosition + HTTP_BUFFER_SIZE - 1)}); + if (!this->httpConnection->get( + cdnUrl, + {bell::HTTPClient::RangeHeader::range( + requestPosition, requestPosition + HTTP_BUFFER_SIZE - 1)})) + return -1; this->lastRequestPosition = requestPosition; this->lastRequestCapacity = this->httpConnection->contentLength(); diff --git a/cspot/src/DeviceStateHandler.cpp b/cspot/src/DeviceStateHandler.cpp index cc344f1b..4d8631ed 100644 --- a/cspot/src/DeviceStateHandler.cpp +++ b/cspot/src/DeviceStateHandler.cpp @@ -16,6 +16,7 @@ #include "NanoPBHelper.h" // for pbEncode, pbPutString #include "Packet.h" // for cspot #include "TrackReference.h" // for cspot +#include "WrappedSemaphore.h" // for WrappedSemaphore #include "nlohmann/json.hpp" // for basic_json<>::object_t, basic_json #include "nlohmann/json_fwd.hpp" // for json #include "pb.h" // for pb_bytes_array_t, PB_BYTES_ARRAY_T_A... @@ -27,12 +28,21 @@ static DeviceStateHandler* handler; void DeviceStateHandler::reloadTrackList(void*) { + if (handler->reloadPreloadedTracks) { + handler->needsToBeSkipped = true; + while (!handler->trackQueue->playableSemaphore->twait(1)) {}; + handler->trackPlayer->start(); + handler->trackPlayer->resetState(); + handler->reloadPreloadedTracks = false; + handler->sendCommand(CommandType::PLAYBACK_START); + } if (!handler->offset) { if (handler->trackQueue->preloadedTracks.size()) handler->trackQueue->preloadedTracks.clear(); handler->trackQueue->preloadedTracks.push_back( std::make_shared( handler->currentTracks[handler->offset], handler->ctx, + handler->trackQueue->playableSemaphore, handler->offsetFromStartInMillis)); handler->device.player_state.track = handler->currentTracks[handler->offset]; @@ -62,6 +72,7 @@ void DeviceStateHandler::reloadTrackList(void*) { handler->trackQueue->preloadedTracks.push_back( std::make_shared( handler->currentTracks[handler->offset - 1], handler->ctx, + handler->trackQueue->playableSemaphore, handler->offsetFromStartInMillis)); handler->offsetFromStartInMillis = 0; } @@ -76,21 +87,16 @@ void DeviceStateHandler::reloadTrackList(void*) { ->currentTracks[handler->offset + handler->trackQueue->preloadedTracks.size() - 1], - handler->ctx, 0)); + handler->ctx, handler->trackQueue->playableSemaphore)); } } - if (handler->reloadPreloadedTracks) { - handler->needsToBeSkipped = true; - handler->trackPlayer->start(); - handler->trackPlayer->resetState(); - handler->reloadPreloadedTracks = false; - handler->sendCommand(CommandType::PLAYBACK_START); - } if (handler->playerStateChanged) { handler->putPlayerState( PutStateReason::PutStateReason_PLAYER_STATE_CHANGED); handler->playerStateChanged = false; } + handler->resolvingContext.store(false); + //CSPOT_LOG(info,"heap_memory_check-safe = %i",heap_caps_check_integrity_all(true)); } DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { handler = this; @@ -99,7 +105,8 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { this->playerContext = std::make_shared( ctx, &this->device.player_state, ¤tTracks, &offset); - auto EOFCallback = [this](bool loaded) { + auto onTrackEnd = [this](bool loaded) { + if (!loaded) {} CSPOT_LOG(debug, "Ended track, needs_to_be_skipped = %s", needsToBeSkipped ? "true" : "false"); if (needsToBeSkipped) { @@ -113,31 +120,42 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { needsToBeSkipped = true; if (!this->trackQueue->preloadedTracks.size()) sendCommand(CommandType::DEPLETED); + if ((uint32_t)currentTracks.size() / 2 <= offset && + !this->resolvingContext) { + //CSPOT_LOG(info,"heap_memory_check-safe = %i",heap_caps_check_integrity_all(true)); + this->resolvingContext.store(true); + playerContext->resolveTracklist(metadata_map, reloadTrackList); + //CSPOT_LOG(info,"heap_memory_check-safe = %i",heap_caps_check_integrity_all(true)); + } }; - auto playerStateChangedCallback = [this](std::shared_ptr track, - bool new_track = false) { + auto onTrackChanged = [this](std::shared_ptr track, + bool new_track = false) { CSPOT_LOG(debug, "Track loaded, new_track = %s", new_track ? "true" : "false"); if (new_track) { this->device.player_state.timestamp = - this->ctx->timeProvider->getSyncedTimestamp(); - //putPlayerState(); + this->trackQueue->preloadedTracks[0] + ->trackMetrics->currentInterval->start; + + this->device.player_state.duration = track->trackInfo.duration; + // this->ctx->timeProvider->getSyncedTimestamp(); + // putPlayerState(PutStateReason::PutStateReason_PICKER_OPENED); sendCommand(CommandType::PLAYBACK, trackQueue->preloadedTracks[0]); - if ((uint32_t)currentTracks.size() / 2 == offset) - playerContext->resolveTracklist(metadata_map, reloadTrackList); } else putPlayerState(); }; - this->trackPlayer = std::make_shared( - ctx, trackQueue, EOFCallback, playerStateChangedCallback); + this->trackPlayer = std::make_shared(ctx, trackQueue, onTrackEnd, + onTrackChanged); CSPOT_LOG(info, "Started player"); - auto connectStateSubscription = [this](MercurySession::Response& res) { + auto connectStateSubscription = [this](MercurySession::Response res) { if (res.fail || !res.parts.size()) return; - if (strstr(res.mercuryHeader.uri, "player/command")) { + if (strstr(res.mercuryHeader.uri, "v1/devices/")) { + putDeviceState(PutStateReason::PutStateReason_SPIRC_NOTIFY); + } else if (strstr(res.mercuryHeader.uri, "player/command")) { if (res.parts[0].size()) parseCommand(res.parts[0]); } else if (strstr(res.mercuryHeader.uri, "volume")) { @@ -160,7 +178,7 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { CSPOT_LOG(info, "Added connect-state subscription"); // the device connection status gets reported trough "hm://social-connect",if active - auto socialConnectSubscription = [this](MercurySession::Response& res) { + auto socialConnectSubscription = [this](MercurySession::Response res) { if (res.fail || !res.parts.size()) return; if (res.parts[0].size()) { @@ -200,7 +218,8 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { ctx->session->setConnectedHandler([this]() { CSPOT_LOG(info, "Registered new device"); - this->putDeviceState(PutStateReason::PutStateReason_NEW_DEVICE); + this->putDeviceState( + PutStateReason_SPIRC_HELLO); // : PutStateReason::PutStateReason_NEW_DEVICE); // Assign country code this->ctx->config.countryCode = this->ctx->session->getCountryCode(); }); @@ -223,8 +242,8 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { device.device_info.capabilities = Capabilities{ true, 1, //can_be_player - false, - 0, //restrict_to_local + true, + 1, //restrict_to_local true, 1, //gaia_eq_connect_id true, @@ -260,9 +279,9 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { false, 0, //is_voice_enabled true, - 1, //needs_full_player_state + 0, //needs_full_player_state //overuses MercuryManager, but keeps connection for outside wlan alive false, - 1, //supports_gzip_pushes + 0, //supports_gzip_pushes false, 0, //supports_lossless_audio true, @@ -298,7 +317,7 @@ DeviceStateHandler::~DeviceStateHandler() { } void DeviceStateHandler::putDeviceState(PutStateReason put_state_reason) { - std::scoped_lock lock(playerStateMutex); + //std::scoped_lock lock(playerStateMutex); std::string uri = "hm://connect-state/v1/devices/" + this->ctx->config.deviceId + "/"; @@ -343,7 +362,7 @@ void DeviceStateHandler::putDeviceState(PutStateReason put_state_reason) { tempPutReq.device = Device_init_zero; pb_release(PutStateRequest_fields, &tempPutReq); auto parts = MercurySession::DataParts({putStateRequest}); - auto responseLambda = [this](MercurySession::Response& res) { + auto responseLambda = [this](MercurySession::Response res) { if (res.fail || !res.parts.size()) return; }; @@ -352,7 +371,7 @@ void DeviceStateHandler::putDeviceState(PutStateReason put_state_reason) { } void DeviceStateHandler::putPlayerState(PutStateReason put_state_reason) { - std::scoped_lock lock(playerStateMutex); + //std::scoped_lock lock(playerStateMutex); std::string uri = "hm://connect-state/v1/devices/" + this->ctx->config.deviceId + "/"; PutStateRequest tempPutReq = {}; @@ -367,16 +386,20 @@ void DeviceStateHandler::putPlayerState(PutStateReason put_state_reason) { tempPutReq.put_state_reason = put_state_reason; tempPutReq.last_command_message_id = last_message_id; tempPutReq.has_started_playing_at = true; - tempPutReq.started_playing_at = this->started_playing_at; + tempPutReq.started_playing_at = + this->trackQueue->preloadedTracks[0]->trackMetrics->trackHeaderTime; tempPutReq.has_has_been_playing_for_ms = true; tempPutReq.has_been_playing_for_ms = - this->ctx->timeProvider->getSyncedTimestamp() - this->started_playing_at; + this->ctx->timeProvider->getSyncedTimestamp() - + this->trackQueue->preloadedTracks[0]->trackMetrics->trackHeaderTime; tempPutReq.has_client_side_timestamp = true; tempPutReq.client_side_timestamp = this->ctx->timeProvider->getSyncedTimestamp(); tempPutReq.has_only_write_player_state = true; tempPutReq.only_write_player_state = true; device.player_state.has_position_as_of_timestamp = true; + device.player_state.timestamp = + trackQueue->preloadedTracks[0]->trackMetrics->currentInterval->start; device.player_state.position_as_of_timestamp = (int64_t)trackQueue->preloadedTracks[0]->trackMetrics->getPosition(); device.has_player_state = true; @@ -481,7 +504,7 @@ void DeviceStateHandler::putPlayerState(PutStateReason put_state_reason) { pb_release(PutStateRequest_fields, &tempPutReq); auto parts = MercurySession::DataParts({putStateRequest}); - auto responseLambda = [this](MercurySession::Response& res) { + auto responseLambda = [this](MercurySession::Response res) { if (res.fail || !res.parts.size()) return; }; @@ -513,7 +536,7 @@ void DeviceStateHandler::skip(CommandType dir, bool notify) { trackQueue->preloadedTracks.push_back( std::make_shared( currentTracks[offset + trackQueue->preloadedTracks.size()], - this->ctx, 0)); + this->ctx, this->trackQueue->playableSemaphore)); } } offset++; @@ -525,7 +548,8 @@ void DeviceStateHandler::skip(CommandType dir, bool notify) { trackQueue->preloadedTracks.pop_back(); offset--; trackQueue->preloadedTracks.push_front(std::make_shared( - currentTracks[offset - 1], this->ctx, 0)); + currentTracks[offset - 1], this->ctx, + this->trackQueue->playableSemaphore)); } else { if (trackQueue->preloadedTracks.size()) trackQueue->preloadedTracks[0]->requestedPosition = 0; @@ -544,9 +568,7 @@ void DeviceStateHandler::parseCommand(std::vector& data) { if (data.size() <= 2) return; auto jsonResult = nlohmann::json::parse(data); - - if (jsonResult.find("message_id") != jsonResult.end()) - last_message_id = jsonResult["message_id"].get(); + last_message_id = jsonResult.value("message_id", last_message_id); auto command = jsonResult.find("command"); if (command != jsonResult.end()) { @@ -557,6 +579,7 @@ void DeviceStateHandler::parseCommand(std::vector& data) { auto options = command->find("options"); + // std::cout<at("endpoint") == "transfer") { if (is_active) return; @@ -595,7 +618,7 @@ void DeviceStateHandler::parseCommand(std::vector& data) { logging_params->at("interaction_ids")[0].get())); } } - auto responseHandler = [this](MercurySession::Response& res) { + auto responseHandler = [this](MercurySession::Response res) { if (res.fail || !res.parts.size()) return; std::scoped_lock lock(trackQueue->tracksMutex); @@ -663,8 +686,10 @@ void DeviceStateHandler::parseCommand(std::vector& data) { responseHandler); } else if (this->is_active) { if (command->at("endpoint") == "play") { +#ifndef CONFIG_BELL_NOCODEC handler->trackPlayer->stop(); sendCommand(CommandType::DEPLETED); +#endif playerContext->radio_offset = 0; std::scoped_lock lock(trackQueue->tracksMutex); trackQueue->preloadedTracks.clear(); @@ -677,20 +702,11 @@ void DeviceStateHandler::parseCommand(std::vector& data) { for (int i = 0; i < currentTracks.size(); i++) { if (i > this->offset || strcmp(currentTracks[i].provider, "queue") != 0) { - if (currentTracks[i].full_metadata_count > - currentTracks[i].metadata_count) - currentTracks[i].metadata_count = - currentTracks[i].full_metadata_count; - pb_release(ProvidedTrack_fields, ¤tTracks[i]); - } else - queued++; + cspot::TrackReference::pbReleaseProvidedTrack(¤tTracks[i]); + currentTracks.erase(currentTracks.begin() + i); + i--; + } } - if (queued) { - currentTracks.erase(currentTracks.begin()); - currentTracks.erase(currentTracks.begin() + queued, - currentTracks.end()); - } else - currentTracks.clear(); auto logging_params = command->find("logging_params"); if (logging_params != command->end()) { @@ -881,11 +897,12 @@ void DeviceStateHandler::parseCommand(std::vector& data) { sendCommand(CommandType::PLAY); } else if (command->at("endpoint") == "skip_next") { ctx->playbackMetrics->end_reason = PlaybackMetrics::FORWARD_BTN; - needsToBeSkipped = false; +#ifndef CONFIG_BELL_NOCODEC + this->needsToBeSkipped = false; +#endif if (command->find("track") == command->end()) skip(CommandType::SKIP_NEXT, false); else { - std::scoped_lock lock(playerContext->trackListMutex); offset = 0; for (auto track : currentTracks) { if (strcmp(command->find("track") @@ -901,12 +918,12 @@ void DeviceStateHandler::parseCommand(std::vector& data) { this->device.player_state.track = currentTracks[offset]; for (auto i = offset; i < (currentTracks.size() < 3 + offset ? currentTracks.size() - : 3 + offset) + - offset; + : 3 + offset); i++) { trackQueue->preloadedTracks.push_back( - std::make_shared(currentTracks[i], - this->ctx, 0)); + std::make_shared( + currentTracks[i], this->ctx, + this->trackQueue->playableSemaphore)); } offset++; trackPlayer->resetState(); @@ -965,7 +982,8 @@ void DeviceStateHandler::parseCommand(std::vector& data) { trackQueue->preloadedTracks.insert( trackQueue->preloadedTracks.begin() + 1 + queuedOffset, std::make_shared( - currentTracks[offset + queuedOffset], this->ctx, 0)); + currentTracks[offset + queuedOffset], this->ctx, + this->trackQueue->playableSemaphore)); } #ifndef CONFIG_BELL_NOCODEC this->trackPlayer->seekMs( @@ -989,7 +1007,7 @@ void DeviceStateHandler::parseCommand(std::vector& data) { for (uint8_t i = offset; i < currentTracks.size(); i++) { if (strcmp(currentTracks[i].provider, "queue") != 0) break; - pb_release(ProvidedTrack_fields, ¤tTracks[i]); + cspot::TrackReference::pbReleaseProvidedTrack(¤tTracks[i]); currentTracks.erase(currentTracks.begin() + i); i--; } @@ -1014,7 +1032,8 @@ void DeviceStateHandler::parseCommand(std::vector& data) { continue; } removeTrack:; - pb_release(ProvidedTrack_fields, ¤tTracks[offset + i]); + cspot::TrackReference::pbReleaseProvidedTrack( + ¤tTracks[offset + i]); currentTracks.erase(currentTracks.begin() + offset + i); if (strcmp(currentTracks[offset + i].provider, "queue") != 0 || strcmp(command->at("next_tracks")[i]["uri"] @@ -1034,7 +1053,7 @@ void DeviceStateHandler::parseCommand(std::vector& data) { std::make_shared( currentTracks[offset + trackQueue->preloadedTracks.size() - 1], - this->ctx, 0)); + this->ctx, this->trackQueue->playableSemaphore)); } #ifndef CONFIG_BELL_NOCODEC this->trackPlayer->seekMs( @@ -1123,19 +1142,14 @@ void DeviceStateHandler::parseCommand(std::vector& data) { this->trackQueue->preloadedTracks.erase( this->trackQueue->preloadedTracks.begin(), this->trackQueue->preloadedTracks.end()); - uint8_t queued = 0; for (int i = offset; i < currentTracks.size(); i++) { if (strcmp(currentTracks[i].provider, "queue") != 0) { - if (currentTracks[i].full_metadata_count > - currentTracks[i].metadata_count) - currentTracks[i].metadata_count = - currentTracks[i].full_metadata_count; - pb_release(ProvidedTrack_fields, ¤tTracks[i]); - } else - queued++; + cspot::TrackReference::pbReleaseProvidedTrack( + ¤tTracks[offset + i]); + currentTracks.erase(currentTracks.begin() + offset + i); + i--; + } } - currentTracks.erase(currentTracks.begin() + offset + queued, - currentTracks.end()); playerContext->resolveTracklist(metadata_map, reloadTrackList, true); sendCommand(CommandType::SET_SHUFFLE, (int32_t)(this->device.player_state.options diff --git a/cspot/src/EventManager.cpp b/cspot/src/EventManager.cpp index 3b292fe7..042b9c76 100644 --- a/cspot/src/EventManager.cpp +++ b/cspot/src/EventManager.cpp @@ -272,13 +272,13 @@ appended_decision_id:; start_source = end_source; #ifndef CONFIG_HIDDEN - auto responseLambda = [=](MercurySession::Response& res) { + auto responseLambda = [=](MercurySession::Response res) { }; auto parts = MercurySession::DataParts({msg}); // Execute the request - ctx->session->execute(MercurySession::RequestType::POST, requestUrl, - responseLambda, parts); + //ctx->session->execute(MercurySession::RequestType::POST, requestUrl, + // responseLambda, parts); #endif return msg; } \ No newline at end of file diff --git a/cspot/src/LoginBlob.cpp b/cspot/src/LoginBlob.cpp index 232a8a90..31c9d8a6 100644 --- a/cspot/src/LoginBlob.cpp +++ b/cspot/src/LoginBlob.cpp @@ -41,11 +41,8 @@ std::string sha1_digest(const std::vector& message) { } LoginBlob::LoginBlob(std::string name) { - char hash[32]; - sprintf(hash, "%016zu", std::hash{}(name)); - // base is 142137fd329622137a14901634264e6f332e2411 std::string mac_string = bell::getMacAddress(); - CSPOT_LOG(info, "new mac : %s", mac_string.c_str()); + CSPOT_LOG(info, "Mac-address : %s", mac_string.c_str()); std::vector mac(mac_string.begin(), mac_string.end()); this->deviceId = sha1_digest(mac); this->crypto = std::make_unique(); diff --git a/cspot/src/MercurySession.cpp b/cspot/src/MercurySession.cpp index d04e9dbe..1915d717 100644 --- a/cspot/src/MercurySession.cpp +++ b/cspot/src/MercurySession.cpp @@ -18,77 +18,125 @@ #include "ShannonConnection.h" // for ShannonConnection #include "TimeProvider.h" // for TimeProvider #include "Utils.h" // for extract, pack, hton64 +#include "WrappedSemaphore.h" using namespace cspot; +template +T extractData(const std::vector& data, size_t& pos) { + static_assert(std::is_integral::value, + "extractData only supports integral types"); + + // Check that we have enough bytes to extract + if (pos + sizeof(T) > data.size()) { + throw std::out_of_range("Not enough data to extract"); + } + + T value; + memcpy(&value, &data[pos], sizeof(T)); + pos += sizeof(T); + + // Convert to host byte order based on the size of T + if constexpr (sizeof(T) == 2) { + return ntohs(value); + } else if constexpr (sizeof(T) == 4) { + return ntohl(value); + } else if constexpr (sizeof(T) == 8) { + return hton64( + value); // Assuming you have defined `hton64` similarly to `htonl` for 64-bit values + } else { + return 0; // static_assert(false, "Unsupported type size for extractData"); + } +} + MercurySession::MercurySession(std::shared_ptr timeProvider) - : bell::Task("mercury_dispatcher", 32 * 1024, 3, 1) { + : bell::Task("mercury_dispatcher", 8 * 1024, 3, + 1) { //double the size for reconnecting + responseSemaphore = std::make_shared(); this->timeProvider = timeProvider; } MercurySession::~MercurySession() { + this->responseSemaphore->give(); std::scoped_lock lock(this->isRunningMutex); } void MercurySession::runTask() { - isRunning = true; + isRunning.store(true); std::scoped_lock lock(this->isRunningMutex); - this->executeEstabilishedCallback = true; - while (isRunning) { - cspot::Packet packet = {}; - try { - packet = shanConn->recvPacket(); - CSPOT_LOG(info, "Received packet, command: %d", packet.command); - - if (static_cast(packet.command) == RequestType::PING) { - timeProvider->syncWithPingPacket(packet.data); - - this->lastPingTimestamp = timeProvider->getSyncedTimestamp(); - this->shanConn->sendPacket(0x49, packet.data); - } else { - this->packetQueue.push(packet); - } - } catch (const std::runtime_error& e) { - CSPOT_LOG(error, "Error while receiving packet: %s", e.what()); - connection_lost = true; - failAllPending(); - if (!isRunning) - return; - - reconnect(); - connection_lost = false; - continue; + while (isRunning) { + if (!processPackets()) { + handleReconnection(); } } } -void MercurySession::reconnect() { - isReconnecting = true; - +bool MercurySession::processPackets() { try { - this->conn = nullptr; - this->shanConn = nullptr; - this->partials.clear(); - - this->connectWithRandomAp(); - this->authenticate(this->authBlob); - - CSPOT_LOG(info, "Reconnection successful"); - - BELL_SLEEP_MS(100); + cspot::Packet packet = shanConn->recvPacket(); + CSPOT_LOG(info, "Received packet, command: %d", packet.command); + if (static_cast(packet.command) == RequestType::PING) { + timeProvider->syncWithPingPacket(packet.data); + + this->lastPingTimestamp = timeProvider->getSyncedTimestamp(); + this->shanConn->sendPacket(0x49, packet.data); + } else if (packet.data.size()) { + std::unique_lock lock(queueMutex); + this->packetQueue.push_back(packet); + lock.unlock(); // Optional, the destructor will unlock it + this->responseSemaphore->give(); + } + return true; + } catch (const std::runtime_error& e) { + CSPOT_LOG(error, "Error while receiving packet: %s", e.what()); + failAllPending(); // Fail all pending requests + return false; + } catch (const std::exception& e) { + CSPOT_LOG(error, "Unexpected exception: %s", e.what()); + failAllPending(); // Fail all pending requests + return false; + } catch (...) { + CSPOT_LOG(error, "Unknown error occurred while receiving packet."); + failAllPending(); // Fail all pending requests + return false; + } +} - lastPingTimestamp = timeProvider->getSyncedTimestamp(); - isReconnecting = false; +void MercurySession::handleReconnection() { + if (isReconnecting) + return; - this->executeEstabilishedCallback = true; - } catch (...) { - CSPOT_LOG(error, "Cannot reconnect, will retry in 5s"); - BELL_SLEEP_MS(5000); + isReconnecting = true; + reconnect(); + isReconnecting = false; +} - if (isRunning) { - return reconnect(); +void MercurySession::reconnect() { + while (isRunning) { + try { + this->conn = nullptr; + this->shanConn = nullptr; + this->partials.clear(); + // Reset connections + this->connectWithRandomAp(); + this->authenticate(this->authBlob); + + CSPOT_LOG(info, "Reconnection successful"); + + BELL_SLEEP_MS(100); + + lastPingTimestamp = timeProvider->getSyncedTimestamp(); + isReconnecting = false; + this->executeEstabilishedCallback = true; + return; // Successful connection, exit loop + } catch (...) { + CSPOT_LOG(error, "Cannot reconnect, will retry in 5s"); + BELL_SLEEP_MS(1000); + if (!isRunning) { // Stop retrying if session is not running + return; + } } } } @@ -129,7 +177,7 @@ void MercurySession::unregisterAudioKey(uint32_t sequenceId) { void MercurySession::disconnect() { CSPOT_LOG(info, "Disconnecting mercury session"); - this->isRunning = false; + isRunning.store(false); conn->close(); std::scoped_lock lock(this->isRunningMutex); } @@ -139,10 +187,13 @@ std::string MercurySession::getCountryCode() { } void MercurySession::handlePacket() { - Packet packet = {}; - if (connection_lost) + this->responseSemaphore->wait(); + std::unique_lock lock(queueMutex); + if (!packetQueue.size()) return; - this->packetQueue.wtpop(packet, 200); + Packet packet = std::move(*packetQueue.begin()); + packetQueue.pop_front(); + lock.unlock(); // Optional, the destructor will unlock it if (executeEstabilishedCallback && this->connectionReadyCallback != nullptr) { executeEstabilishedCallback = false; @@ -169,7 +220,6 @@ void MercurySession::handlePacket() { RequestType::AUDIO_KEY_SUCCESS_RESPONSE; this->audioKeyCallbacks[seqId](success, packet.data); } - break; } case RequestType::SEND: @@ -178,11 +228,10 @@ void MercurySession::handlePacket() { CSPOT_LOG(debug, "Received mercury packet"); auto response = this->decodeResponse(packet.data); if (!response.fail) { - if (response.sequenceId >= 0) { - if (this->callbacks.count(response.sequenceId)) { - this->callbacks[response.sequenceId](response); - this->callbacks.erase(this->callbacks.find(response.sequenceId)); - } + if (this->callbacks.count(response.sequenceId)) { + uint64_t tempSequenceId = response.sequenceId; + this->callbacks[response.sequenceId](response); + this->callbacks.erase(this->callbacks.find(tempSequenceId)); } pb_release(Header_fields, &response.mercuryHeader); } @@ -192,10 +241,10 @@ void MercurySession::handlePacket() { auto response = decodeResponse(packet.data); if (!response.fail) { std::string uri(response.mercuryHeader.uri); - for (auto& it : this->subscriptions) { - if (uri.find(it.first) != std::string::npos) { - it.second(response); - break; // Exit loop once subscription is found + for (const auto& [subUri, callback] : subscriptions) { + if (uri.find(subUri) != std::string::npos) { + callback(response); + break; } } pb_release(Header_fields, &response.mercuryHeader); @@ -222,28 +271,30 @@ void MercurySession::failAllPending() { MercurySession::Response MercurySession::decodeResponse( const std::vector& data) { - auto sequenceLength = ntohs(extract(data, 0)); - int64_t sequenceId; + size_t pos = 0; + auto sequenceLength = extractData(data, pos); + uint64_t sequenceId; uint8_t flag; Response resp; + resp.mercuryHeader = Header_init_default; if (sequenceLength == 2) - sequenceId = ntohs(extract(data, 2)); + sequenceId = extractData(data, pos); else if (sequenceLength == 4) - sequenceId = ntohl(extract(data, 2)); + sequenceId = extractData(data, pos); else if (sequenceLength == 8) - sequenceId = hton64(extract(data, 2)); + sequenceId = extractData(data, pos); else return resp; - size_t pos = 2 + sequenceLength; flag = (uint8_t)data[pos]; pos++; - uint16_t parts = ntohs(extract(data, pos)); - pos += 2; - auto partial = partials.begin(); - while (partial != partials.end() && partial->sequenceId != sequenceId) - partial++; // if(partial.first == sequenceId) + uint16_t parts = extractData(data, pos); + auto partial = std::find_if( + partials.begin(), partials.end(), + [sequenceId](const Response& p) { return p.sequenceId == sequenceId; }); if (partial == partials.end()) { + if (flag == 2) + return resp; CSPOT_LOG(debug, "Creating new Mercury Response, seq: %lli, flags: %i, parts: %i", sequenceId, flag, parts); @@ -259,12 +310,20 @@ MercurySession::Response MercurySession::decodeResponse( while (parts) { if (data.size() <= pos) break; - auto partSize = ntohs(extract(data, pos)); - pos += 2; + auto partSize = extractData(data, pos); if (partial->mercuryHeader.uri == NULL) { auto headerBytes = std::vector(data.begin() + pos, data.begin() + pos + partSize); pbDecode(partial->mercuryHeader, Header_fields, headerBytes); + pb_istream_t stream = + pb_istream_from_buffer(&headerBytes[0], headerBytes.size()); + + // Decode the message + if (pb_decode(&stream, Header_fields, &partial->mercuryHeader) == false) { + pb_release(Header_fields, &partial->mercuryHeader); + partials.erase(partial); + return resp; + } } else { if (index >= partial->parts.size()) partial->parts.push_back(std::vector{}); @@ -276,8 +335,9 @@ MercurySession::Response MercurySession::decodeResponse( pos += partSize; parts--; } - if (flag == static_cast(ResponseFlag::FINAL)) { - resp = *partial; + if (flag == static_cast(ResponseFlag::FINAL) && + partial->mercuryHeader.uri != NULL) { + resp = std::move(*partial); partials.erase(partial); resp.fail = false; } @@ -294,6 +354,8 @@ uint64_t MercurySession::executeSubscription(RequestType method, ResponseCallback callback, ResponseCallback subscription, DataParts& payload) { + while (isReconnecting) + BELL_SLEEP_MS(100); CSPOT_LOG(debug, "Executing Mercury Request, type %s", RequestTypeMap[method].c_str()); @@ -302,8 +364,7 @@ uint64_t MercurySession::executeSubscription(RequestType method, tempMercuryHeader.uri = strdup(uri.c_str()); tempMercuryHeader.method = strdup(RequestTypeMap[method].c_str()); - // GET and SEND are actually the same. Therefore the override - // The difference between them is only in header's method + // Map logical request type to the appropriate wire request type (SEND for POST, GET, PUT) if (method == RequestType::GET || method == RequestType::POST || method == RequestType::PUT) { method = RequestType::SEND; @@ -319,13 +380,33 @@ uint64_t MercurySession::executeSubscription(RequestType method, if (callback != nullptr) this->callbacks.insert({sequenceId, callback}); - // Structure: [Sequence size] [SequenceId] [0x1] [Payloads number] - // [Header size] [Header] [Payloads (size + data)] + // Prepare the data packet structure: + // [Sequence size] [SequenceId] [0x1] [Payloads number] [Header size] [Header] [Payloads (size + data)] + auto sequenceIdBytes = + prepareSequenceIdPayload(sequenceId, headerBytes, payload); + + // Bump sequence ID for the next request + this->sequenceId += 1; + + try { + this->shanConn->sendPacket( + static_cast::type>(method), + sequenceIdBytes); + } catch (...) { + // @TODO: handle disconnect + } + + return this->sequenceId - 1; +} +std::vector MercurySession::prepareSequenceIdPayload( + uint64_t sequenceId, const std::vector& headerBytes, + const DataParts& payload) { // Pack sequenceId - auto sequenceIdBytes = pack(hton64(this->sequenceId)); + auto sequenceIdBytes = pack(hton64(sequenceId)); auto sequenceSizeBytes = pack(htons(sequenceIdBytes.size())); + // Initial parts of the packet sequenceIdBytes.insert(sequenceIdBytes.begin(), sequenceSizeBytes.begin(), sequenceSizeBytes.end()); sequenceIdBytes.push_back(0x01); @@ -334,6 +415,7 @@ uint64_t MercurySession::executeSubscription(RequestType method, sequenceIdBytes.insert(sequenceIdBytes.end(), payloadNum.begin(), payloadNum.end()); + // Encode the header size and the header data auto headerSizePayload = pack(htons(headerBytes.size())); sequenceIdBytes.insert(sequenceIdBytes.end(), headerSizePayload.begin(), headerSizePayload.end()); @@ -341,26 +423,14 @@ uint64_t MercurySession::executeSubscription(RequestType method, headerBytes.end()); // Encode all the payload parts - for (int x = 0; x < payload.size(); x++) { - headerSizePayload = pack(htons(payload[x].size())); + for (const auto& part : payload) { + headerSizePayload = pack(htons(part.size())); sequenceIdBytes.insert(sequenceIdBytes.end(), headerSizePayload.begin(), headerSizePayload.end()); - sequenceIdBytes.insert(sequenceIdBytes.end(), payload[x].begin(), - payload[x].end()); - } - - // Bump sequence id - this->sequenceId += 1; - - try { - this->shanConn->sendPacket( - static_cast::type>(method), - sequenceIdBytes); - } catch (...) { - // @TODO: handle disconnect + sequenceIdBytes.insert(sequenceIdBytes.end(), part.begin(), part.end()); } - return this->sequenceId - 1; + return sequenceIdBytes; } uint32_t MercurySession::requestAudioKey(const std::vector& trackId, diff --git a/cspot/src/PlayerContext.cpp b/cspot/src/PlayerContext.cpp index 328708b4..8c4b5784 100644 --- a/cspot/src/PlayerContext.cpp +++ b/cspot/src/PlayerContext.cpp @@ -14,111 +14,163 @@ #include "nlohmann/json_fwd.hpp" // for json #endif +#define METADATA_STRING "metadata" +#define SMART_SHUFFLE_STRING "shuffle.distribution" + using namespace cspot; +/** + * @brief Create a C string from a JSON object string value. + * + * @param[in] jsonObject The JSON object. + * @param[in] key The key to look up. + * @return The C string or NULL if the key isn't found or the value is empty. + */ char* PlayerContext::createStringReferenceIfFound( nlohmann::json::value_type& jsonObject, const char* key) { - if (jsonObject.find(key) != jsonObject.end()) { - std::string value = jsonObject.at(key).get(); + auto object = jsonObject.find(key); + if (object != jsonObject.end()) { + std::string value = object.value(); if (value.size()) return strdup(value.c_str()); } return NULL; } -void PlayerContext::resolveRadio( + +/** + * @brief Retrieve metadata from a JSON object if a key is found. + * + * This function searches for a specified key in a JSON object. If the key is found, + * it creates a metadata entry with the key and its corresponding value. + * + * @param[in] jsonObject The JSON object to search. + * @param[in] key The key to look for in the JSON object. + * @param[out] metadata The metadata entry to populate if the key is found. + * @return True if the key is found and metadata is populated, false otherwise. + */ +bool createMetadataIfFound(nlohmann::json::value_type& jsonObject, + const char* key, + ProvidedTrack_MetadataEntry& metadata) { + // Find the key in the JSON object + auto object = jsonObject.find(key); + + // Check if the key exists in the JSON object + if (object != jsonObject.end()) { + std::string value = object.value(); + + // Populate metadata with the key and its value + metadata = {strdup(key), value.size() ? strdup(value.c_str()) : NULL}; + return true; + } + + // Return false if the key is not found + return false; +} + +template +T getFromJsonObject(nlohmann::json::value_type& jsonObject, const char* key) { + T value; + auto object = jsonObject.find(key); + if (object != jsonObject.end()) + value = object.value(); + return value; +} + +/** + * @brief Query the autoplay service for a context. + * + * This function queries the autoplay service if a context is autoplay-enabled. + * If the context is autoplay-enabled, it resolves the context into a tracklist. + * + * @param[in] metadata_map The metadata to pass to the autoplay service. + * @param[in] responseFunction The function to call with the resolved tracklist. + * @param[in] secondTry If true, use the first track in the tracklist as the context URI + * instead of the context URI from the player state. + */ +void PlayerContext::autoplayQuery( std::vector> metadata_map, void (*responseFunction)(void*), bool secondTry) { - CSPOT_LOG(debug, "Resolve autoplay context: %s", - secondTry ? tracks->at(0).uri : playerState->context_uri); + if (next_page_url != NULL) + resolveRadio(metadata_map, responseFunction, next_page_url); std::string requestUrl = string_format("hm://autoplay-enabled/query?uri=%s", secondTry ? tracks->at(0).uri : playerState->context_uri); + CSPOT_LOG(debug, "Querying autoplay: %s", &requestUrl[0]); auto responseHandler = [this, metadata_map, responseFunction, - secondTry](MercurySession::Response& res) { - if (res.fail || !res.parts.size()) { - if (secondTry) - resolveRadio(metadata_map, responseFunction, true); + secondTry](MercurySession::Response res) { + if (res.fail || !res.parts.size() || !res.parts[0].size()) { + if (!secondTry) + return autoplayQuery(metadata_map, responseFunction, true); else - return responseFunction(NULL); + return; // responseFunction(NULL); } std::string resolve_autoplay = std::string(res.parts[0].begin(), res.parts[0].end()); std::string requestUrl; - if (radio_offset) - requestUrl = (std::string)next_page_url; - else { - auto trackRef = tracks->end() - 1; - while ( - trackRef != tracks->begin() && - (!trackRef->provider || strcmp(trackRef->provider, "context") != 0)) { - trackRef--; - } - if (strcmp(trackRef->provider, "context") == 0) + { + if (strcmp(tracks->back().provider, "context") == 0) requestUrl = string_format( "hm://radio-apollo/v3/stations/%s?autoplay=true&offset=%i", - &resolve_autoplay[0], trackRef->original_index); + &resolve_autoplay[0], tracks->back().original_index); else { requestUrl = "hm://radio-apollo/v3/tracks/" + (std::string)playerState->context_uri + "?autoplay=true&count=50&isVideo=false&prev_tracks="; - uint8_t copiedTracks = 0; + bool copiedTracks = false; + auto trackRef = + tracks->size() > 50 ? tracks->end() - 50 : tracks->begin(); while (trackRef != tracks->end()) { - if (strcmp(trackRef->provider, "autoplay") == 0 && + if (trackRef->removed == NULL && //is no demlimiter (trackRef->uri && strrchr(trackRef->uri, ':'))) { if (copiedTracks) requestUrl += ","; requestUrl += (std::string)(strrchr(trackRef->uri, ':') + 1); - copiedTracks++; + copiedTracks = true; } trackRef++; } } + resolveRadio(metadata_map, responseFunction, &requestUrl[0]); } - auto responseHandler = [this, metadata_map, - responseFunction](MercurySession::Response& res) { - if (res.fail || !res.parts.size()) - return responseFunction(NULL); - if (!res.parts[0].size()) - return responseFunction(NULL); - std::scoped_lock lock(trackListMutex); - // remove old_tracks, keep 5 tracks in memory - int remove_tracks = ((int)*index) - 5; - if (remove_tracks > 0) { - for (int i = 0; i < remove_tracks; i++) { - if (tracks->at(i).full_metadata_count) - tracks->at(i).metadata_count = tracks->at(i).full_metadata_count; - pb_release(ProvidedTrack_fields, &tracks->at(i)); - } - tracks->erase(tracks->begin(), tracks->begin() + remove_tracks); - } - *index = (uint8_t)(remove_tracks < 0 ? 5 + remove_tracks : 5); - auto jsonResult = nlohmann::json::parse(res.parts[0]); - if (jsonResult.find("uri") != jsonResult.end()) - context_uri = jsonResult.at("uri").get(); - if (next_page_url != NULL) - free(next_page_url); - next_page_url = createStringReferenceIfFound(jsonResult, "next_page_url"); - std::vector> metadata = metadata_map; - metadata.push_back(std::make_pair("context_uri", context_uri)); - metadata.push_back(std::make_pair("entity_uri", context_uri)); - metadata.push_back(std::make_pair("iteration", "0")); - metadata.insert(metadata.begin(), - std::make_pair("autoplay.is_autoplay", "true")); - metadata.push_back(std::make_pair("track_player", "audio")); - metadata.push_back( - std::make_pair("actions.skipping_next_past_track", "resume")); - metadata.push_back( - std::make_pair("actions.skipping_prev_past_track", "resume")); - jsonToTracklist(tracks, metadata, jsonResult["tracks"], "autoplay", 0); - radio_offset++; - responseFunction(NULL); - }; - ctx->session->execute(MercurySession::RequestType::GET, requestUrl, - responseHandler); }; ctx->session->execute(MercurySession::RequestType::GET, requestUrl, responseHandler); } +void PlayerContext::resolveRadio( + std::vector> metadata_map, + void (*responseFunction)(void*), char* url) { + CSPOT_LOG(debug, "Resolve radio : %s", &url[0]); + auto responseHandler = [this, metadata_map, + responseFunction](MercurySession::Response res) { + if (res.fail || !res.parts.size()) + return responseFunction(NULL); + if (!res.parts[0].size()) + return responseFunction(NULL); + //std::scoped_lock lock(trackListMutex); + // remove old_tracks, keep 5 tracks in memory + int remove_tracks = ((int)*index) - 5; + if (remove_tracks > 0) { + cspot::TrackReference::deleteTracksInRange(tracks, 0, remove_tracks - 1); + *index = (uint8_t)(remove_tracks < 0 ? 5 + remove_tracks : 5); + } + auto jsonResult = nlohmann::json::parse(res.parts[0]); + context_uri = jsonResult.value("uri", context_uri); + if (next_page_url != NULL) + free(next_page_url); + next_page_url = createStringReferenceIfFound(jsonResult, "next_page_url"); + std::vector> metadata = metadata_map; + metadata.push_back(std::make_pair("context_uri", context_uri)); + metadata.push_back(std::make_pair("entity_uri", context_uri)); + metadata.insert(metadata.begin(), + std::make_pair("autoplay.is_autoplay", "true")); + metadata.push_back(std::make_pair("track_player", "audio")); + jsonToTracklist(tracks, metadata, jsonResult["tracks"], "autoplay", 0); + radio_offset++; + responseFunction(NULL); + }; + ctx->session->execute(MercurySession::RequestType::GET, url, responseHandler); +} + static unsigned long distributionToIndex(std::string d) { return strtoul(&d[d.find("(") + 1], nullptr, 10); } @@ -130,20 +182,17 @@ void PlayerContext::createIndexBasedOnTracklist( alternative_index.clear(); std::vector shuffle_index; bool smart_shuffle = - (json_tracks.at(0).find("metadata") == json_tracks.at(0).end() || - json_tracks.at(0).find("metadata")->find("shuffle.distribution") == - json_tracks.at(0).find("metadata")->end()) + (json_tracks.at(0).find(METADATA_STRING) == json_tracks.at(0).end() || + json_tracks.at(0).find(METADATA_STRING)->find(SMART_SHUFFLE_STRING) == + json_tracks.at(0).find(METADATA_STRING)->end()) ? false : true; for (int i = 0; i < tracks->size(); i++) { if (strstr(tracks->at(i).uri, "spotify:delimiter")) { uint8_t release_offset = 1; - while (i + release_offset < tracks->size()) { - cspot::TrackReference::pbReleaseProvidedTrack( - std::addressof(tracks->at(i + release_offset))); - release_offset++; - } - tracks->erase(tracks->begin() + i, tracks->end()); + CSPOT_LOG(info, "deleting %i tracks", tracks->size() - (i + 1)); + cspot::TrackReference::deleteTracksInRange(tracks, i + 1, + tracks->size() - 1); break; } } @@ -152,15 +201,14 @@ void PlayerContext::createIndexBasedOnTracklist( for (int i = 0; i < json_tracks.size(); i++) { if (smart_shuffle) { alternative_index[distributionToIndex(json_tracks.at(i) - .find("metadata") - ->find("shuffle.distribution") + .find(METADATA_STRING) + ->find(SMART_SHUFFLE_STRING) ->get()) - 1] = i; } else if (!shuffle) alternative_index.push_back(i); for (auto& track : *tracks) { - if (strcmp(track.uri, - json_tracks.at(i).at("uri").get().c_str()) == 0) { + if (track.uri == json_tracks.at(i)["uri"].get_ref()) { track.original_index = i; track.page = page; if (shuffle && !smart_shuffle) @@ -182,6 +230,8 @@ void PlayerContext::createIndexBasedOnTracklist( shuffle_index.begin(), shuffle_index.end()); } } +void jsonToDevice() {} + uint8_t PlayerContext::jsonToTracklist( std::vector* tracks, std::vector> metadata_map, @@ -205,12 +255,12 @@ uint8_t PlayerContext::jsonToTracklist( while (tracks->size() < MAX_TRACKS && offset < json_tracks.size()) { ProvidedTrack new_track = ProvidedTrack_init_zero; - int64_t index = radio ? offset : alternative_index[offset]; - if (index >= json_tracks.size() || index < 0) { + int64_t index_ = radio ? offset : alternative_index[offset]; + if (index_ >= json_tracks.size() || index_ < 0) { offset++; continue; } - auto track = json_tracks.at(index); + auto track = json_tracks.at(index_); new_track.uri = createStringReferenceIfFound(track, "uri"); new_track.uid = createStringReferenceIfFound(track, "uid"); new_track.provider = strdup(provider); @@ -221,14 +271,11 @@ uint8_t PlayerContext::jsonToTracklist( strdup(metadata.second.c_str()); metadata_offset++; } - if (track.find("metadata") != track.end()) { - if (track.at("metadata").find("decision_id") != - track.at("metadata").end()) { - new_track.metadata[metadata_offset].key = strdup("decision_id"); - new_track.metadata[metadata_offset].value = - strdup(std::string(track.at("metadata").at("decision_id")).c_str()); - metadata_offset++; - } + auto json_metadata = track.find(METADATA_STRING); + if (json_metadata != track.end()) { + metadata_offset += createMetadataIfFound( + *json_metadata, "decision_id", new_track.metadata[metadata_offset]); + //metadata_offset += createMetadataIfFound(*json_metadata, "provider", new_track.metadata[metadata_offset]); new_track.metadata_count = metadata_offset; for (auto metadata : track.at("metadata").items()) { if (metadata.key() != "decision_id" && @@ -245,7 +292,7 @@ uint8_t PlayerContext::jsonToTracklist( new_track.full_metadata_count = metadata_offset; if (!radio) new_track.metadata_count = metadata_offset; - new_track.original_index = index; + new_track.original_index = index_; new_track.page = page; tracks->push_back(new_track); copiedTracks++; @@ -257,19 +304,13 @@ uint8_t PlayerContext::jsonToTracklist( new_track.uid = strdup("delimiter0"); new_track.provider = strdup("context"); new_track.removed = strdup("context/delimiter"); - metadata_map.insert(metadata_map.begin(), std::make_pair("hidden", "true")); - metadata_map.push_back( - std::make_pair("actions.skipping_next_past_track", "resume")); - metadata_map.push_back( - std::make_pair("actions.advancing_past_track", "resume")); - metadata_map.push_back(std::make_pair("iteration", "0")); - for (auto metadata : metadata_map) { - new_track.metadata[new_track.metadata_count].key = - strdup(metadata.first.c_str()); - new_track.metadata[new_track.metadata_count].value = - strdup(metadata.second.c_str()); - new_track.metadata_count++; - } + new_track.metadata[0] = {strdup("hidden"), strdup("true")}; + new_track.metadata[1] = {strdup("actions.skipping_next_past_track"), + strdup("resume")}; + new_track.metadata[2] = {strdup("actions.advancing_past_track"), + strdup("resume")}; + new_track.metadata_count = 3; + new_track.full_metadata_count = 3; tracks->push_back(new_track); } return copiedTracks; @@ -279,16 +320,17 @@ void PlayerContext::resolveTracklist( std::vector> metadata_map, void (*responseFunction)(void*), bool changed_state, bool trackIsPartOfContext) { + // MAX_TRACKS if (changed_state) { + //free next_page_url + if (next_page_url != NULL) + free(next_page_url); + next_page_url = NULL; + //new Playlist was loaded, check if there is a delimiter in tracklist and if, delete all after for (int i = 0; i < tracks->size(); i++) { if (tracks->at(i).uri && strstr(tracks->at(i).uri, "spotify:delimiter")) { - uint8_t release_offset = 0; - while (i + release_offset < tracks->size()) { - cspot::TrackReference::pbReleaseProvidedTrack( - std::addressof(tracks->at(i + release_offset))); - release_offset++; - } - tracks->erase(tracks->begin() + i, tracks->end()); + cspot::TrackReference::deleteTracksInRange(tracks, i, + tracks->size() - 1); break; } } @@ -307,26 +349,19 @@ void PlayerContext::resolveTracklist( auto responseHandler = [this, metadata_map, responseFunction, changed_state, trackIsPartOfContext]( - MercurySession::Response& res) { + MercurySession::Response res) { if (res.fail || !res.parts.size()) return; if (!res.parts[0].size()) return; auto jsonResult = nlohmann::json::parse(res.parts[0]); - std::scoped_lock lock(trackListMutex); uint8_t copy_tracks = 0; if (tracks->size()) { // remove old_tracks, keep 5 tracks in memory int remove_tracks = ((int)*index) - 5; - if (remove_tracks > 0) { - for (int i = 0; i < remove_tracks; i++) { - if (tracks->at(i).full_metadata_count > - tracks->at(i).metadata_count) - tracks->at(i).metadata_count = tracks->at(i).full_metadata_count; - pb_release(ProvidedTrack_fields, &tracks->at(i)); - } - tracks->erase(tracks->begin(), tracks->begin() + remove_tracks); - } + if (remove_tracks > 0) + cspot::TrackReference::deleteTracksInRange(tracks, 0, + remove_tracks - 1); *index = (uint8_t)(remove_tracks < 0 ? 5 + remove_tracks : 5); auto trackref = tracks->end() - 1; @@ -338,8 +373,9 @@ void PlayerContext::resolveTracklist( } //if no normal track was found, resolve radio if (strcmp(trackref->provider, "queue") == 0) - return resolveRadio(metadata_map, responseFunction); + return autoplayQuery(metadata_map, responseFunction); looking_for_playlisttrack:; + // taskYIELD(); //if last track was a smart_shuffled track if (trackref != tracks->begin()) { if (trackref->removed != NULL || @@ -396,21 +432,22 @@ void PlayerContext::resolveTracklist( createIndexBasedOnTracklist( tracks, jsonResult["pages"][i]["tracks"], playerState->options.shuffling_context, i); - if (jsonResult["pages"][i]["tracks"].at(0).find("metadata") != + if (jsonResult["pages"][i]["tracks"].at(0).find( + METADATA_STRING) != jsonResult["pages"][i]["tracks"].at(0).end() && jsonResult["pages"][i]["tracks"] .at(0) - .find("metadata") - ->find("shuffle.distribution") != + .find(METADATA_STRING) + ->find(SMART_SHUFFLE_STRING) != jsonResult["pages"][i]["tracks"] .at(0) - .find("metadata") + .find(METADATA_STRING) ->end()) { if (playerState->options.shuffling_context) { if (alternative_index[0] != offset) { - for (auto& index : alternative_index) - if (index == offset) { - index = alternative_index[0]; + for (auto& index_ : alternative_index) + if (index_ == offset) { + index_ = alternative_index[0]; alternative_index[0] = offset; break; } @@ -430,8 +467,11 @@ void PlayerContext::resolveTracklist( if (!copy_tracks) { if (this->playerState->options.repeating_context || !tracks->size()) { if (*index >= tracks->size()) { - for (int i = 0; i < tracks->size(); i++) + for (int i = 0; i < tracks->size(); i++) { cspot::TrackReference::pbReleaseProvidedTrack(&tracks->at(i)); + + // taskYIELD(); + } tracks->clear(); *index = 0; } else @@ -484,7 +524,7 @@ void PlayerContext::resolveTracklist( playerState->options.shuffling_context, false); } else - return resolveRadio(metadata_map, responseFunction); + return autoplayQuery(metadata_map, responseFunction); } responseFunction(NULL); }; @@ -492,5 +532,5 @@ void PlayerContext::resolveTracklist( responseHandler); } else - resolveRadio(metadata_map, responseFunction); + autoplayQuery(metadata_map, responseFunction); } \ No newline at end of file diff --git a/cspot/src/TrackPlayer.cpp b/cspot/src/TrackPlayer.cpp index e5d6622e..c4a73e45 100644 --- a/cspot/src/TrackPlayer.cpp +++ b/cspot/src/TrackPlayer.cpp @@ -57,11 +57,12 @@ static long vorbisTellCb(TrackPlayer* self) { TrackPlayer::TrackPlayer(std::shared_ptr ctx, std::shared_ptr trackQueue, - EOFCallback eof, TrackLoadedCallback trackLoaded) - : bell::Task("cspot_player", 56 * 1024, 5, 1) { + TrackEndedCallback onTrackEnd, + TrackChangedCallback onTrackChanged) + : bell::Task("cspot_player", 48 * 1024, 5, 1) { this->ctx = ctx; - this->eofCallback = eof; - this->trackLoaded = trackLoaded; + this->onTrackEnd = onTrackEnd; + this->onTrackChanged = onTrackChanged; this->trackQueue = trackQueue; this->playbackSemaphore = std::make_unique(5); @@ -133,14 +134,10 @@ void TrackPlayer::runTask() { bool endOfQueueReached = false; while (isRunning) { + CSPOT_LOG(error, "new Track stream"); bool properStream = true; - // Ensure we even have any tracks to play - if (!this->trackQueue->preloadedTracks.size() || - (!pendingReset && endOfQueueReached && - this->trackQueue->preloadedTracks.size() == 1)) { - this->trackQueue->playableSemaphore->twait(300); - continue; - } + this->trackQueue->playableSemaphore->wait(); + CSPOT_LOG(error, "all good with stream"); // Last track was interrupted, reset to default if (pendingReset) { @@ -166,6 +163,7 @@ void TrackPlayer::runTask() { // Reset required track = nullptr; } + CSPOT_LOG(error, "NULLPTR"); BELL_SLEEP_MS(100); continue; @@ -176,65 +174,59 @@ void TrackPlayer::runTask() { inFuture = trackOffset > 0; - if (track->state != QueuedTrack::State::READY) { - track->loadedSemaphore->twait(5000); - - if (track->state != QueuedTrack::State::READY) { - CSPOT_LOG(error, "Track failed to load, skipping it"); - this->eofCallback(false); - continue; - } + while (track->state != QueuedTrack::State::READY && + track->state != QueuedTrack::State::FAILED) { + BELL_SLEEP_MS(100); + CSPOT_LOG(error, "track in state %i", (int)track->state); + } + if (track->state == QueuedTrack::State::FAILED) { + CSPOT_LOG(error, "Track failed to load, skipping it"); + this->onTrackEnd(true); + continue; } currentSongPlaying = true; + track->trackMetrics->startTrack(); { std::scoped_lock lock(playbackMutex); bool skipped = 0; - track->trackMetrics->startTrack(); currentTrackStream = track->getAudioFile(); // Open the stream +#ifndef CONFIG_BELL_NOCODEC currentTrackStream->openStream(); - +#else + size_t start_offset = 0; + uint8_t* headerBuf = currentTrackStream->openStream(start_offset); +#endif + CSPOT_LOG(info, "opend stream"); if (pendingReset || !currentSongPlaying) { continue; } track->trackMetrics->startTrackDecoding(); track->trackMetrics->track_size = currentTrackStream->getSize(); - this->trackLoaded(track, true); + this->onTrackChanged(track, true); startPaused = false; #ifndef CONFIG_BELL_NOCODEC int32_t r = ov_open_callbacks(this, &vorbisFile, NULL, 0, vorbisCallbacks); #else - size_t start_offset = 0; - size_t write_offset = 0; - while (!start_offset) { - size_t ret = this->currentTrackStream->readBytes(&pcmBuffer[0], - pcmBuffer.size()); - size_t written = 0; - size_t toWrite = ret; - if (!ret) - continue; - while (toWrite) { - written = dataCallback(pcmBuffer.data() + (ret - toWrite), toWrite, - tracksPlayed, 0); - if (written == 0) { - BELL_SLEEP_MS(1000); - } - toWrite -= written; - } - track->written_bytes += ret; - start_offset = seekable_callback(tracksPlayed); - if (this->spaces_available(tracksPlayed) < pcmBuffer.size()) { - BELL_SLEEP_MS(50); - continue; + size_t toWrite = start_offset; + while (toWrite) { + size_t written = dataCallback(headerBuf + (start_offset - toWrite), + toWrite, tracksPlayed, 0); + if (written == 0) { + BELL_SLEEP_MS(10); } + toWrite -= written; } + + track->written_bytes += start_offset; + CSPOT_LOG(info, "start offset at %i", start_offset); float duration_lambda = 1.0 * (currentTrackStream->getSize() - start_offset) / track->trackInfo.duration; @@ -272,15 +264,15 @@ void TrackPlayer::runTask() { #ifndef CONFIG_BELL_NOCODEC VORBIS_SEEK(&vorbisFile, track->requestedPosition); #else - uint32_t seekPosition = - track->requestedPosition * duration_lambda + start_offset; + uint32_t seekPosition = track->requestedPosition * duration_lambda + + seekable_callback(tracksPlayed); currentTrackStream->seek(seekPosition); skipped = true; #endif track->trackMetrics->newPosition(pendingSeekPositionMs); // Reset the pending seek position pendingSeekPositionMs = 0; - this->trackLoaded(track, false); + this->onTrackChanged(track, false); } long ret = @@ -316,19 +308,11 @@ void TrackPlayer::runTask() { #ifdef CONFIG_BELL_NOCODEC if (skipped) { // Reset the pending seek position - skipped = 0; + skipped = false; } #endif written = dataCallback(pcmBuffer.data() + (ret - toWrite), - toWrite, tracksPlayed -#ifdef CONFIG_BELL_NOCODEC - , - skipped -#endif - ); - } - if (written == 0) { - BELL_SLEEP_MS(50); + toWrite, tracksPlayed, skipped); } toWrite -= written; } @@ -353,7 +337,7 @@ void TrackPlayer::runTask() { endOfQueueReached = true; } #ifdef CONFIG_BELL_NOCODEC - this->eofCallback(properStream); + this->onTrackEnd(true); #endif } } diff --git a/cspot/src/TrackQueue.cpp b/cspot/src/TrackQueue.cpp index b81c2b59..bd6b62b2 100644 --- a/cspot/src/TrackQueue.cpp +++ b/cspot/src/TrackQueue.cpp @@ -123,18 +123,21 @@ void TrackInfo::loadPbEpisode(Episode* pbEpisode, duration = pbEpisode->duration; } -QueuedTrack::QueuedTrack(ProvidedTrack& ref, - std::shared_ptr ctx, - int64_t requestedPosition) +QueuedTrack::QueuedTrack( + ProvidedTrack& ref, std::shared_ptr ctx, + std::shared_ptr playableSemaphore, + int64_t requestedPosition) : requestedPosition((uint32_t)requestedPosition), ctx(ctx) { trackMetrics = std::make_shared(ctx, requestedPosition); loadedSemaphore = std::make_shared(); + this->playableSemaphore = playableSemaphore; this->ref = ref; - if (ref.uid == NULL || !strstr(ref.uri, "spotify:delimiter")) { + if (!strstr(ref.uri, "spotify:delimiter")) { this->gid = base62Decode(ref.uri); state = State::QUEUED; } else { state = State::FAILED; + playableSemaphore->give(); loadedSemaphore->give(); } } @@ -142,6 +145,7 @@ QueuedTrack::QueuedTrack(ProvidedTrack& ref, QueuedTrack::~QueuedTrack() { state = State::FAILED; loadedSemaphore->give(); + //playableSemaphore->give(); if (pendingMercuryRequest != 0) { ctx->session->unregister(pendingMercuryRequest); @@ -167,7 +171,7 @@ void QueuedTrack::stepParseMetadata(Track* pbTrack, Episode* pbEpisode) { bool canPlay = false; AudioFile* selectedFiles = nullptr; - const char* countryCode = ctx->config.countryCode.c_str(); + const char* countryCode = ctx->session->getCountryCode().c_str(); if (gid.first == SpotifyFileType::TRACK) { CSPOT_LOG(info, "Track name: %s", pbTrack->name); @@ -244,10 +248,10 @@ void QueuedTrack::stepParseMetadata(Track* pbTrack, Episode* pbEpisode) { // no alternatives for song state = State::FAILED; loadedSemaphore->give(); + playableSemaphore->give(); return; } identifier = bytesToHexString(fileId); - state = State::KEY_REQUIRED; } @@ -270,7 +274,7 @@ void QueuedTrack::stepLoadAudioFile( } else { CSPOT_LOG(error, "Failed to get audio key"); state = State::FAILED; - loadedSemaphore->give(); + playableSemaphore->give(); } updateSemaphore->give(); }); @@ -318,12 +322,14 @@ void QueuedTrack::stepLoadCDNUrl(const std::string& accessKey) { state = State::FAILED; loadedSemaphore->give(); } + playableSemaphore->give(); } void QueuedTrack::expire() { - if (state != State::QUEUED) { + if (state != State::QUEUED && state != State::FAILED) { state = State::FAILED; loadedSemaphore->give(); + playableSemaphore->give(); } } @@ -337,14 +343,16 @@ void QueuedTrack::stepLoadMetadata( bytesToHexString(gid.second).c_str()); auto responseHandler = [this, pbTrack, pbEpisode, &trackListMutex, - updateSemaphore](MercurySession::Response& res) { + updateSemaphore](MercurySession::Response res) { std::scoped_lock lock(trackListMutex); if (res.parts.size() == 0) { + CSPOT_LOG(info, "Invalid Metadata"); // Invalid metadata, cannot proceed state = State::FAILED; updateSemaphore->give(); loadedSemaphore->give(); + playableSemaphore->give(); return; } @@ -395,16 +403,14 @@ void TrackQueue::runTask() { std::deque> trackQueue; while (isRunning) { - processSemaphore->twait(100); + if (processSemaphore->twait(200)) { + if (!preloadedTracks.size()) + continue; + } // Make sure we have the newest access key accessKey = accessKeyFetcher->getAccessKey(); - - // No tracks loaded yet - if (!preloadedTracks.size()) { - BELL_SLEEP_MS(50); - continue; - } else { + { std::scoped_lock lock(tracksMutex); trackQueue = preloadedTracks; @@ -414,6 +420,9 @@ void TrackQueue::runTask() { std::scoped_lock lock(tracksMutex); if (track) { this->processTrack(track); + if (track->state != QueuedTrack::State::FAILED && + track->state != QueuedTrack::State::READY) + break; } } } diff --git a/cspot/src/TrackReference.cpp b/cspot/src/TrackReference.cpp index e83e55b3..31edfccd 100644 --- a/cspot/src/TrackReference.cpp +++ b/cspot/src/TrackReference.cpp @@ -9,17 +9,6 @@ static std::string empty_string = ""; TrackReference::TrackReference() : type(Type::TRACK) {} -void TrackReference::decodeURI() { - if (gid.size() == 0) { - // Episode GID is being fetched via base62 encoded URI - gid = base62Decode(uri).second; - - if (uri.find("episode:") != std::string::npos) { - type = Type::EPISODE; - } - } -} - bool TrackReference::operator==(const TrackReference& other) const { return other.gid == gid && other.uri == uri; } diff --git a/targets/cli/CliPlayer.cpp b/targets/cli/CliPlayer.cpp index 38bd39e4..75761eec 100644 --- a/targets/cli/CliPlayer.cpp +++ b/targets/cli/CliPlayer.cpp @@ -32,14 +32,11 @@ CliPlayer::CliPlayer(std::unique_ptr sink, this->dsp = std::make_shared(this->centralAudioBuffer); #endif - this->handler->trackPlayer->setDataCallback([this](uint8_t* data, - size_t bytes, -#ifdef CONFIG_BELL_NOCODEC - bool STORAGE_VOLATILE, -#endif - size_t trackId) { - return this->centralAudioBuffer->writePCM(data, bytes, trackId); - }); + this->handler->trackPlayer->setDataCallback( + [this](uint8_t* data, size_t bytes, size_t trackId, + bool STORAGE_VOLATILE) { + return this->centralAudioBuffer->writePCM(data, bytes, trackId); + }); this->isPaused = false; @@ -132,6 +129,7 @@ void CliPlayer::runTask() { tracks.at(0)->trackMetrics->endTrack(); this->handler->ctx->playbackMetrics->sendEvent(tracks[0]); tracks.clear(); + this->handler->putPlayerState(); } lastHash = 0; } @@ -143,7 +141,7 @@ void CliPlayer::runTask() { tracks.at(0)->trackMetrics->endTrack(); this->handler->ctx->playbackMetrics->sendEvent(tracks[0]); tracks.pop_front(); - this->handler->trackPlayer->eofCallback(true); + this->handler->trackPlayer->onTrackEnd(true); } lastHash = chunk->trackHash; tracks.at(0)->trackMetrics->startTrackPlaying( diff --git a/targets/esp32/CMakeLists.txt b/targets/esp32/CMakeLists.txt index fe3cb34f..c20ea7d9 100644 --- a/targets/esp32/CMakeLists.txt +++ b/targets/esp32/CMakeLists.txt @@ -1,13 +1,13 @@ -# The following lines of boilerplate have to be in your project's -# CMakeLists in this exact order for cmake to work correctly +#The following lines of boilerplate have to be in your project's +#CMakeLists in this exact order for cmake to work correctly cmake_minimum_required(VERSION 3.5) -# Don't override EXTRA_COMPONENT_DIRS as platformio uses it. Instead we append -# see https://github.com/platformio/platform-espressif32/issues/341 -list(APPEND EXTRA_COMPONENT_DIRS $ENV{IDF_PATH}/examples/common_components/protocol_examples_common) -list(APPEND EXTRA_COMPONENT_DIRS $ENV{IDF_PATH}/examples/common_components/led_strip) - -if(NOT IDF_NO_INCLUDE) - include($ENV{IDF_PATH}/tools/cmake/project.cmake) -endif() -project(cspot-esp32) \ No newline at end of file +#Don't override EXTRA_COMPONENT_DIRS as platformio uses it. Instead we append +#see https: //github.com/platformio/platform-espressif32/issues/341 + list(APPEND EXTRA_COMPONENT_DIRS $ENV{IDF_PATH} / examples / + common_components / protocol_examples_common) +#list(APPEND EXTRA_COMPONENT_DIRS $ENV{IDF_PATH} / examples / \ + common_components / led_strip) + if (NOT IDF_NO_INCLUDE) + include($ENV{IDF_PATH} / tools / cmake / project.cmake) endif() + project(cspot - esp32) \ No newline at end of file diff --git a/targets/esp32/components/VS1053/CMakeLists.txt b/targets/esp32/components/VS1053/CMakeLists.txt index 2d42d503..57c35f1b 100644 --- a/targets/esp32/components/VS1053/CMakeLists.txt +++ b/targets/esp32/components/VS1053/CMakeLists.txt @@ -1,4 +1,8 @@ -set(CMAKE_CXX_STANDARD 17) - idf_component_register(SRCS "src/VS1053.cpp" INCLUDE_DIRS - "include" - "src" PRIV_REQUIRES fatfs) +set(CMAKE_CXX_STANDARD 17) idf_component_register( + SRCS + "src/VS1053.cpp" + "../../../../cspot/bell/main/utilities/BellLogger.cpp" + "../../../../cspot/bell/main/platform/esp/WrappedSemaphore.cpp" INCLUDE_DIRS + "include" + "../../../../cspot/bell/main/utilities/include" + "../../../../cspot/bell/main/platform/" PRIV_REQUIRES fatfs) diff --git a/targets/esp32/components/VS1053/include/VS1053.h b/targets/esp32/components/VS1053/include/VS1053.h index 78acd468..41f59075 100644 --- a/targets/esp32/components/VS1053/include/VS1053.h +++ b/targets/esp32/components/VS1053/include/VS1053.h @@ -1,6 +1,7 @@ #ifndef VS1053_H #define VS1053_H +#include #include //for memset #include //for dequeue #include //for function @@ -38,7 +39,7 @@ #define VERSION 1 #define VS1053_CHUNK_SIZE 16 // chunck size -#define VS1053_PACKET_SIZE 8 +#define VS1053_PACKET_SIZE 32 #define BUF_SIZE_CMD 1028 #define BUF_SIZE_FEED 4096 * 4 @@ -79,7 +80,9 @@ class VS1053_TRACK { } state = tsPlaybackStart; size_t header_size = 0; size_t track_id; + StaticStreamBuffer_t xStaticStreamBuffer; StreamBufferHandle_t dataBuffer; + uint8_t* ucBufferStorage; private: }; diff --git a/targets/esp32/components/VS1053/src/VS1053.cpp b/targets/esp32/components/VS1053/src/VS1053.cpp index 1d86ebf8..cf663fa2 100644 --- a/targets/esp32/components/VS1053/src/VS1053.cpp +++ b/targets/esp32/components/VS1053/src/VS1053.cpp @@ -1,7 +1,7 @@ #include "VS1053.h" #include -#include "esp_log.h" +#include "BellLogger.h" static const char* TAG = "VS_SINK"; void vs_feed(void* sink) { @@ -39,7 +39,7 @@ const char* afName[] = { VS1053_SINK::VS1053_SINK() { // PIN CONFIG // DREQ - ESP_LOGI(TAG, "VS1053_DREQ=%d", CONFIG_GPIO_VS_DREQ); + BELL_LOG(info, TAG, "VS1053_DREQ=%d", CONFIG_GPIO_VS_DREQ); isRunning = true; gpio_config_t gpio_conf; gpio_conf.mode = GPIO_MODE_INPUT; @@ -49,17 +49,17 @@ VS1053_SINK::VS1053_SINK() { gpio_conf.pin_bit_mask = ((uint64_t)(((uint64_t)1) << CONFIG_GPIO_VS_DREQ)); ESP_ERROR_CHECK(gpio_config(&gpio_conf)); // CS - ESP_LOGI(TAG, "VS1053_CS=%d", CONFIG_GPIO_VS_CS); + BELL_LOG(info, TAG, "VS1053_CS=%d", CONFIG_GPIO_VS_CS); gpio_reset_pin((gpio_num_t)CONFIG_GPIO_VS_CS); gpio_set_direction((gpio_num_t)CONFIG_GPIO_VS_CS, GPIO_MODE_OUTPUT); gpio_set_level((gpio_num_t)CONFIG_GPIO_VS_CS, 1); // DCS - ESP_LOGI(TAG, "VS1053_DCS=%d", CONFIG_GPIO_VS_DCS); + BELL_LOG(info, TAG, "VS1053_DCS=%d", CONFIG_GPIO_VS_DCS); gpio_reset_pin((gpio_num_t)CONFIG_GPIO_VS_DCS); gpio_set_direction((gpio_num_t)CONFIG_GPIO_VS_DCS, GPIO_MODE_OUTPUT); gpio_set_level((gpio_num_t)CONFIG_GPIO_VS_DCS, 1); // RESET - ESP_LOGI(TAG, "VS1053_RESET=%d", CONFIG_GPIO_VS_RESET); + BELL_LOG(info, TAG, "VS1053_RESET=%d", CONFIG_GPIO_VS_RESET); if (CONFIG_GPIO_VS_RESET >= 0) { gpio_reset_pin((gpio_num_t)CONFIG_GPIO_VS_RESET); gpio_set_direction((gpio_num_t)CONFIG_GPIO_VS_RESET, GPIO_MODE_OUTPUT); @@ -77,7 +77,7 @@ esp_err_t VS1053_SINK::init(spi_host_device_t SPI, uint32_t freq = spi_get_actual_clock(APB_CLK_FREQ, 1400000, 128); spi_device_interface_config_t devcfg; memset(&devcfg, 0, sizeof(spi_device_interface_config_t)); - ESP_LOGI(TAG, "VS1053 LOWFreq: %d", freq); + BELL_LOG(info, TAG, "VS1053 LOWFreq: %lu", freq); devcfg.clock_speed_hz = freq, devcfg.command_bits = 8, devcfg.address_bits = 8, devcfg.dummy_bits = 0, devcfg.duty_cycle_pos = 0, @@ -87,9 +87,10 @@ esp_err_t VS1053_SINK::init(spi_host_device_t SPI, devcfg.pre_cb = NULL, // Specify pre-transfer callback to handle D/C line devcfg.post_cb = NULL; - ESP_LOGI(TAG, "spi device interface config done, VERSION : %i", VERSION); + BELL_LOG(info, TAG, "spi device interface config done, VERSION : %i", + VERSION); ret = spi_bus_add_device(SPI, &devcfg, &this->SPIHandleLow); - ESP_LOGI(TAG, "spi_bus_add_device=%d", ret); + BELL_LOG(info, TAG, "spi_bus_add_device=%d", ret); assert(ret == ESP_OK); // SPI TEST SLOW-/HIGH-SPEED vTaskDelay(20 / portTICK_PERIOD_MS); @@ -103,11 +104,11 @@ esp_err_t VS1053_SINK::init(spi_host_device_t SPI, SCI_CLOCKF, HZ_TO_SC_FREQ(12288000) | SC_MULT_53_45X | SC_ADD_53_00X); // Normal clock settings multiplyer 3.0 = 12.2 MHz - ESP_LOGI(TAG, "VS1053 HighFreq: %d", freq); + BELL_LOG(info, TAG, "VS1053 HighFreq: %lu", freq); devcfg.clock_speed_hz = freq, devcfg.command_bits = 0, devcfg.address_bits = 0, devcfg.spics_io_num = CONFIG_GPIO_VS_DCS; ret = spi_bus_add_device(SPI, &devcfg, &this->SPIHandleFast); - ESP_LOGI(TAG, "spi_bus_add_device=%d", ret); + BELL_LOG(info, TAG, "spi_bus_add_device=%d", ret); assert(ret == ESP_OK); ret = test_comm("Slow SPI,Testing VS1053 read/write registers...\n"); if (ret != ESP_OK) @@ -121,7 +122,7 @@ esp_err_t VS1053_SINK::init(spi_host_device_t SPI, load_user_code(PLUGIN, PLUGIN_SIZE); #endif vTaskDelay(100 / portTICK_PERIOD_MS); - xTaskCreate(vs_feed, "track_feed", 1028 * 20, (void*)this, 1, &task_handle); + xTaskCreate(vs_feed, "track_feed", 4098 * 4, (void*)this, 1, &task_handle); //xTaskCreatePinnedToCore(vs_feed, "track_feed", 1028 * 20, (void*)this, 1, &task_handle, 1); return ESP_OK; } @@ -136,13 +137,20 @@ VS1053_SINK::~VS1053_SINK() { // LOOP VS1053_TRACK::VS1053_TRACK(size_t track_id, size_t buffer_size) { this->track_id = track_id; - this->dataBuffer = xStreamBufferCreate(buffer_size, 1); + ucBufferStorage = (uint8_t*)malloc(buffer_size); + this->dataBuffer = xStreamBufferCreateStatic(buffer_size, 1, ucBufferStorage, + &xStaticStreamBuffer); // this->run_track(); + + if (dataBuffer == NULL) { + BELL_LOG(error, TAG, "not enough heap memory\n"); + /* There was not enough heap memory space available to create the + stream buffer. */ + } } VS1053_TRACK::~VS1053_TRACK() { - if (dataBuffer != NULL) - vStreamBufferDelete(dataBuffer); - dataBuffer = NULL; + vStreamBufferDelete(dataBuffer); + free(ucBufferStorage); } void VS1053_SINK::new_track(std::shared_ptr track) { @@ -208,7 +216,7 @@ size_t VS1053_SINK::get_track_info(size_t pos, uint8_t& endFillByte, if (audioFormat == afFlac) byteRate *= 4; - ESP_LOGI(TAG, + BELL_LOG(info, TAG, "Track %i, " "%dKiB " "%1ds %1.1f" @@ -225,14 +233,11 @@ size_t VS1053_SINK::get_track_info(size_t pos, uint8_t& endFillByte, size_t VS1053_TRACK::feed_data(uint8_t* data, size_t len, bool STORAGE_VOLATILE) { - if (this->dataBuffer == NULL || !len || - xStreamBufferSpacesAvailable(this->dataBuffer) < len) - return 0; if (STORAGE_VOLATILE) if (this->header_size) xStreamBufferReset(this->dataBuffer); size_t res = - xStreamBufferSend(this->dataBuffer, (void*)data, len, pdMS_TO_TICKS(100)); + xStreamBufferSend(this->dataBuffer, (void*)data, len, pdMS_TO_TICKS(30)); return res; } size_t VS1053_SINK::spaces_available(size_t track_id) { @@ -249,15 +254,15 @@ void VS1053_TRACK::empty_feed() { void VS1053_SINK::new_state(VS1053_TRACK::VS_TRACK_STATE& state, VS1053_TRACK::VS_TRACK_STATE new_state) { state = new_state; - ESP_LOGI(TAG, "New state %i", new_state); + BELL_LOG(info, TAG, "New state %i", new_state); if (state_callback != NULL) { state_callback((uint8_t)new_state); } } void VS1053_SINK::run_feed(size_t FILL_BUFFER_BEFORE_PLAYBACK) { + uint8_t* item = (uint8_t*)malloc(VS1053_PACKET_SIZE); while (isRunning) { - uint8_t* item = (uint8_t*)malloc(VS1053_PACKET_SIZE); if (tracks.size()) { size_t itemSize = 0; uint32_t pos = 0; @@ -289,8 +294,9 @@ void VS1053_SINK::run_feed(size_t FILL_BUFFER_BEFORE_PLAYBACK) { case VS1053_TRACK::VS_TRACK_STATE::tsPlaybackSeekable: tsPlaybackSeekable: - itemSize = xStreamBufferReceive(track->dataBuffer, (void*)item, - VS1053_PACKET_SIZE, 10); + itemSize = + xStreamBufferReceive(track->dataBuffer, (void*)item, + VS1053_PACKET_SIZE, pdMS_TO_TICKS(30)); if (itemSize) { this->sdi_send_buffer(item, itemSize); pos++; @@ -303,7 +309,6 @@ void VS1053_SINK::run_feed(size_t FILL_BUFFER_BEFORE_PLAYBACK) { VS1053_TRACK::VS_TRACK_STATE::tsCancel); [[fallthrough]]; case VS1053_TRACK::VS_TRACK_STATE::tsCancel: - free(item); track->empty_feed(); this->cancel_track(&track->state); [[fallthrough]]; @@ -318,9 +323,6 @@ void VS1053_SINK::run_feed(size_t FILL_BUFFER_BEFORE_PLAYBACK) { nextReportPos += this->get_track_info(pos, endFillByte, endFillBytes); } } - if (track->dataBuffer != NULL) - vStreamBufferDelete(track->dataBuffer); - track->dataBuffer = NULL; tracks.pop_front(); } vTaskDelay(50 / portTICK_PERIOD_MS); @@ -447,20 +449,20 @@ esp_err_t VS1053_SINK::test_comm(const char* header) { write_register(SCI_AICTRL2, 0x7E57); if (read_register(SCI_AICTRL1) != 0xABAD || read_register(SCI_AICTRL2) != 0x7E57) { - ESP_LOGI(TAG, "There is something wrong with VS10xx SCI registers\n"); + BELL_LOG(info, TAG, "There is something wrong with VS10xx SCI registers\n"); return ESP_ERR_INVALID_RESPONSE; } write_register(SCI_AICTRL1, 0); write_register(SCI_AICTRL2, 0); uint16_t ssVer = ((read_register(SCI_STATUS) >> 4) & 15); if (chipNumber[ssVer]) { - ESP_LOGI(TAG, "Chip is VS%d\n", chipNumber[ssVer]); + BELL_LOG(info, TAG, "Chip is VS%d\n", chipNumber[ssVer]); if (chipNumber[ssVer] != 1053) { - ESP_LOGI(TAG, "Incorrect chip\n"); + BELL_LOG(info, TAG, "Incorrect chip\n"); return ESP_ERR_NOT_SUPPORTED; } } else { - ESP_LOGI(TAG, "Unknown VS10xx SCI_MODE field SS_VER = %d\n", ssVer); + BELL_LOG(info, TAG, "Unknown VS10xx SCI_MODE field SS_VER = %d\n", ssVer); return ESP_ERR_NOT_FOUND; } @@ -471,7 +473,7 @@ esp_err_t VS1053_SINK::test_comm(const char* header) { void VS1053_SINK::load_user_code(const unsigned short* plugin, uint16_t sizeofpatch) { - ESP_LOGI(TAG, "Loading patch"); + BELL_LOG(info, TAG, "Loading patch"); await_data_request(); int i = 0; while (i < sizeofpatch) { @@ -497,7 +499,7 @@ void VS1053_SINK::load_user_code(const unsigned short* plugin, void VS1053_SINK::await_data_request() { while (!gpio_get_level((gpio_num_t)CONFIG_GPIO_VS_DREQ)) - vTaskDelay(1); + taskYIELD(); } // WRITE/READ FUNCTIONS @@ -511,8 +513,8 @@ uint16_t VS1053_SINK::read_register(uint8_t _reg) { SPITransaction.cmd = VS_READ_COMMAND; SPITransaction.addr = _reg; if (SPI_semaphore != NULL) - while (xSemaphoreTake(*SPI_semaphore, 1) != pdTRUE) - vTaskDelay(1); + if (xSemaphoreTake(*SPI_semaphore, portMAX_DELAY) != pdTRUE) + return 0; ret = spi_device_transmit(this->SPIHandleLow, &SPITransaction); assert(ret == ESP_OK); uint16_t result = (((SPITransaction.rx_data[0] & 0xFF) << 8) | @@ -536,8 +538,8 @@ bool VS1053_SINK::write_register(uint8_t _reg, uint16_t _value) { SPITransaction.tx_data[1] = (_value & 0xFF); SPITransaction.length = 16; if (SPI_semaphore != NULL) - while (xSemaphoreTake(*SPI_semaphore, 1) != pdTRUE) - vTaskDelay(1); + if (xSemaphoreTake(*SPI_semaphore, portMAX_DELAY) != pdTRUE) + return false; ret = spi_device_transmit(this->SPIHandleLow, &SPITransaction); assert(ret == ESP_OK); await_data_request(); // Wait for DREQ to be HIGH again @@ -597,8 +599,8 @@ bool VS1053_SINK::sdi_send_buffer(uint8_t* data, size_t len) { spi_transaction_t SPITransaction; esp_err_t ret; if (SPI_semaphore != NULL) - while (xSemaphoreTake(*SPI_semaphore, 1) != pdTRUE) - vTaskDelay(1); + if (xSemaphoreTake(*SPI_semaphore, portMAX_DELAY) != pdTRUE) + return false; while (len) // More to do? { await_data_request(); // Wait for space available @@ -630,8 +632,8 @@ bool VS1053_SINK::sdi_send_fillers(uint8_t endFillByte, size_t len) { for (int i = 0; i < VS1053_CHUNK_SIZE; i++) data[i] = endFillByte; if (SPI_semaphore != NULL) - while (xSemaphoreTake(*SPI_semaphore, 1) != pdTRUE) - vTaskDelay(1); + if (xSemaphoreTake(*SPI_semaphore, portMAX_DELAY) != pdTRUE) + return false; while (len) // More to do? { await_data_request(); // Wait for space available diff --git a/targets/esp32/main/CMakeLists.txt b/targets/esp32/main/CMakeLists.txt index 2f93f16c..c708cbea 100644 --- a/targets/esp32/main/CMakeLists.txt +++ b/targets/esp32/main/CMakeLists.txt @@ -1,22 +1,25 @@ cmake_minimum_required(VERSION 3.5) -# Main target sources -file(GLOB SOURCES "*.cpp" "*.c") +#Main target sources + file(GLOB SOURCES + "*.cpp" + "*.c") -# Configure the target -idf_component_register( - SRCS ${SOURCES} - INCLUDE_DIRS "." -) -idf_build_set_property(COMPILE_OPTIONS "-fdiagnostics-color=always" APPEND) +#Configure the target + idf_component_register(SRCS ${SOURCES} INCLUDE_DIRS + "." REQUIRES mdns spiffs esp_wifi nvs_flash + protocol_examples_common VS1053 fatfs) + idf_build_set_property(COMPILE_OPTIONS + "-fdiagnostics-color=always" APPEND) -# Build static library, do not build test executables -option(BUILD_SHARED_LIBS OFF) -option(BUILD_TESTING OFF) +#Build static library, do not build test executables + option(BUILD_SHARED_LIBS OFF) option(BUILD_TESTING OFF) -# Import cspot library -add_subdirectory("../../../cspot" ${CMAKE_CURRENT_BINARY_DIR}/cspot) +#Import cspot library + add_subdirectory( + "../../../cspot" ${CMAKE_CURRENT_BINARY_DIR} / cspot) -# Configure the target -target_link_libraries(${COMPONENT_LIB} PUBLIC cspot) -target_compile_options(${COMPONENT_LIB} PRIVATE -std=gnu++17) +#Configure the target + target_link_libraries(${COMPONENT_LIB} PUBLIC cspot) + target_compile_options(${COMPONENT_LIB} PRIVATE - + std = gnu++ 17) diff --git a/targets/esp32/main/EspPlayer.cpp b/targets/esp32/main/EspPlayer.cpp index 7f957e88..365743f1 100644 --- a/targets/esp32/main/EspPlayer.cpp +++ b/targets/esp32/main/EspPlayer.cpp @@ -26,15 +26,12 @@ EspPlayer::EspPlayer(std::unique_ptr sink, this->circularBuffer = std::make_shared(1024 * 128); - this->handler->trackPlayer->setDataCallback([this](uint8_t* data, - size_t bytes, -#ifdef CONFIG_BELL_NOCODEC - bool STORAGE_VOLATILE, -#endif - size_t trackId) { - this->feedData(data, bytes, trackId); - return bytes; - }); + this->handler->trackPlayer->setDataCallback( + [this](uint8_t* data, size_t bytes, size_t trackId, + bool STORAGE_VOLATILE) { + this->feedData(data, bytes, trackId); + return bytes; + }); this->isPaused = false; @@ -150,7 +147,7 @@ void EspPlayer::runTask() { tracks.at(0)->trackMetrics->endTrack(); this->handler->ctx->playbackMetrics->sendEvent(tracks[0]); tracks.pop_front(); - this->handler->trackPlayer->eofCallback(true); + this->handler->trackPlayer->onTrackEnd(true); } lastHash = current_hash; tracks.at(0)->trackMetrics->startTrackPlaying( diff --git a/targets/esp32/main/VSPlayer.cpp b/targets/esp32/main/VSPlayer.cpp index 90db043e..b0a8f57b 100644 --- a/targets/esp32/main/VSPlayer.cpp +++ b/targets/esp32/main/VSPlayer.cpp @@ -1,10 +1,11 @@ #include "VSPlayer.h" -#include // for uint8_t -#include // for operator<<, basic_ostream, endl, cout -#include // for shared_ptr, make_shared, make_unique -#include // for scoped_lock -#include // for get +#include // for BELL_LOG +#include // for uint8_t +#include // for operator<<, basic_ostream, endl, cout +#include // for shared_ptr, make_shared, make_unique +#include // for scoped_lock +#include // for get #include "TrackPlayer.h" // for TrackPlayer @@ -22,11 +23,20 @@ VSPlayer::VSPlayer(std::shared_ptr handler, if (!this->track) { this->track = std::make_shared(trackId, 4098 * 16); this->vsSink->new_track(this->track); + BELL_LOG(error, "VSPlayer", "New track_id (%d)", trackId); } if (trackId != this->track->track_id) { this->vsSink->soft_stop_feed(); this->track = std::make_shared(trackId, 4098 * 16); this->vsSink->new_track(this->track); + BELL_LOG(error, "VSPlayer", "New track_id (%d)", trackId); + } + if (this->vsSink->tracks[0]->track_id != trackId && + (this->vsSink->tracks[0]->state < VS1053_TRACK::tsSoftCancel)) { + this->vsSink->soft_stop_feed(); + BELL_LOG(error, "VSPlayer", + "VSSink track_id (%d) is different from VSPlayer(%d)", + this->vsSink->tracks[0]->track_id, trackId); } return this->track->feed_data(data, bytes, STORAGE_VOLATILE); }, @@ -42,12 +52,12 @@ VSPlayer::VSPlayer(std::shared_ptr handler, switch (event.commandType) { case cspot::DeviceStateHandler::CommandType::PAUSE: if (this->track) - this->vsSink->new_state(this->vsSink->tracks[0]->state, + this->vsSink->new_state(this->track->state, VS1053_TRACK::tsPlaybackPaused); break; case cspot::DeviceStateHandler::CommandType::PLAY: if (this->track) - this->vsSink->new_state(this->vsSink->tracks[0]->state, + this->vsSink->new_state(this->track->state, VS1053_TRACK::tsPlaybackSeekable); break; case cspot::DeviceStateHandler::CommandType::DISC: @@ -63,6 +73,13 @@ VSPlayer::VSPlayer(std::shared_ptr handler, //case cspot::DeviceStateHandler::CommandType::SEEK: //break; case cspot::DeviceStateHandler::CommandType::PLAYBACK_START: + if (this->track != nullptr) { + this->track = nullptr; + if (this->currentTrack != nullptr) { + this->vsSink->delete_all_tracks(); + this->futureTrack = nullptr; + } + } break; case cspot::DeviceStateHandler::CommandType::PLAYBACK: this->isPaused = true; @@ -84,6 +101,7 @@ VSPlayer::VSPlayer(std::shared_ptr handler, this->vsSink->feed_command([this](uint8_t) { this->vsSink->set_volume_logarithmic(this->volume); }); + this->handler->putDeviceState(PutStateReason_PLAYER_STATE_CHANGED); break; } default: diff --git a/targets/esp32/main/VSPlayer.h b/targets/esp32/main/VSPlayer.h index 8dd27bcc..f1cad8bb 100644 --- a/targets/esp32/main/VSPlayer.h +++ b/targets/esp32/main/VSPlayer.h @@ -21,6 +21,7 @@ class VSPlayer { size_t volume = 0; private: + size_t trackId = 0; std::string currentTrackId; std::shared_ptr vsSink; std::shared_ptr handler; diff --git a/targets/esp32/main/VSinit.h b/targets/esp32/main/VSinit.h index 1b08bd01..0ed99e10 100644 --- a/targets/esp32/main/VSinit.h +++ b/targets/esp32/main/VSinit.h @@ -1,17 +1,18 @@ -#include #include -#include #include #include #include "esp_log.h" -#include "esp_vfs_fat.h" -#include "sdmmc_cmd.h" #include "VS1053.h" +#if defined(SD_IN_USE) +#include +#include +#include "esp_vfs_fat.h" +#include "sdmmc_cmd.h" #define MOUNT_POINT "/sdcard" - +#endif SemaphoreHandle_t SPI_semaphore; #define TAG "INIT" void initAudioSink(std::shared_ptr VS1053) { @@ -31,6 +32,7 @@ void initAudioSink(std::shared_ptr VS1053) { // VS1053 SETUP VS1053->init(HSPI_HOST, &SPI_semaphore); +#if defined(SD_IN_USE) if (CONFIG_GPIO_SD_CS >= 0) { sdspi_device_config_t sd_device = SDSPI_DEVICE_CONFIG_DEFAULT(); sd_device.gpio_cs = (gpio_num_t)CONFIG_GPIO_SD_CS; @@ -99,7 +101,7 @@ void initAudioSink(std::shared_ptr VS1053) { printf("%s\n", n_name); printf("name:%s, ino: %i\n", entry->d_name, entry->d_ino); printf("stat_return_value:%i\n", stat(n_name, &file_stat)); - printf("stat_mode:%i\n", file_stat.st_mode); + printf("stat_mode:%lu\n", file_stat.st_mode); } dir = opendir(MOUNT_POINT "/TRACKS"); if (!dir) @@ -110,5 +112,6 @@ void initAudioSink(std::shared_ptr VS1053) { closedir(dir); } } +#endif VS1053->write_register(SCI_VOL, 10 | 10 << 8); } \ No newline at end of file diff --git a/targets/esp32/main/main.cpp b/targets/esp32/main/main.cpp index 8f37e3aa..af2c305e 100644 --- a/targets/esp32/main/main.cpp +++ b/targets/esp32/main/main.cpp @@ -9,12 +9,14 @@ #include "BellHTTPServer.h" #include "BellLogger.h" // for setDefaultLogger, AbstractLogger #include "BellTask.h" +#include "WrappedSemaphore.h" #include "civetweb.h" #include "esp_event.h" #include "esp_spiffs.h" #include "esp_system.h" #include "esp_wifi.h" #include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" #include "freertos/task.h" #include "mdns.h" #include "nvs_flash.h" @@ -25,19 +27,34 @@ #include #include -#include "BellTask.h" -#include "CircularBuffer.h" #include "BellUtils.h" #include "Logger.h" #include "esp_log.h" +#include "lwip/err.h" +#include "lwip/sys.h" + +/* FreeRTOS event group to signal when we are connected*/ +static EventGroupHandle_t s_wifi_event_group; + +/* The event group allows multiple bits for each event, but we only care about two events: + * - we are connected to the AP with an IP + * - we failed to connect after the maximum amount of retries */ +#define WIFI_CONNECTED_BIT BIT0 +#define WIFI_FAIL_BIT BIT1 + +#define EXAMPLE_ESP_WIFI_SSID CONFIG_EXAMPLE_WIFI_SSID +#define EXAMPLE_ESP_WIFI_PASS CONFIG_EXAMPLE_WIFI_PASSWORD +#define WIFI_AP_MAXIMUM_RETRY 5 + #define DEVICE_NAME CONFIG_CSPOT_DEVICE_NAME #ifdef CONFIG_BELL_NOCODEC #include "VSPlayer.h" #include "VSinit.h" #else +#define TAG "INIT" #include "EspPlayer.h" #ifdef CONFIG_CSPOT_SINK_INTERNAL #include @@ -63,6 +80,8 @@ extern "C" { void app_main(void); } +static int s_retry_num = 0; + class ZeroconfAuthenticator { public: ZeroconfAuthenticator(){}; @@ -174,7 +193,7 @@ class CSpotTask : public bell::Task { audioSink->volumeChanged(160); #endif - auto loggedInSemaphore = std::make_shared(1); + auto loggedInSemaphore = std::make_shared(); auto zeroconfServer = std::make_unique(); std::atomic isRunning = true; @@ -198,38 +217,62 @@ class CSpotTask : public bell::Task { #endif loggedInSemaphore->wait(); auto ctx = cspot::Context::createFromBlob(loginBlob); - ctx->session->connectWithRandomAp(); - ctx->config.authData = ctx->session->authenticate(loginBlob); - if (ctx->config.authData.size() > 0) { - // when credentials file is set, then store reusable credentials + CSPOTConnecting:; + try { + ctx->session->connectWithRandomAp(); + ctx->config.authData = ctx->session->authenticate(loginBlob); + if (ctx->config.authData.size() > 0) { + // when credentials file is set, then store reusable credentials - // Start device handler task - auto handler = std::make_shared(ctx); + // Start device handler task + auto handler = std::make_shared(ctx); - // Start handling mercury messages - ctx->session->startTask(); + // Start handling mercury messages + ctx->session->startTask(); - // Create a player, pass the handler + // Create a player, pass the handler #ifndef CONFIG_BELL_NOCODEC - auto player = - std::make_shared(std::move(audioSink), std::move(handler)); + auto player = std::make_shared(std::move(audioSink), + std::move(handler)); #else - auto player = - std::make_shared(std::move(handler), std::move(audioSink)); + auto player = std::make_shared(std::move(handler), + std::move(audioSink)); #endif - // If we wanted to handle multiple devices, we would halt this loop - // when a new zeroconf login is requested, and reinitialize the session - while (isRunning) { - ctx->session->handlePacket(); - } + // If we wanted to handle multiple devices, we would halt this loop + // when a new zeroconf login is requested, and reinitialize the session + uint8_t taskCount = 0; + while (isRunning) { + ctx->session->handlePacket(); + } - // Never happens, but required for above case - handler->disconnect(); - player->disconnect(); + // Never happens, but required for above case + handler->disconnect(); + player->disconnect(); + } + } catch (std::exception& e) { + std::cout << "Error while connecting " << e.what() << std::endl; + goto CSPOTConnecting; } } }; +static void event_handler(void* arg, esp_event_base_t event_base, + int32_t event_id, void* event_data) { + if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) { + esp_wifi_connect(); + } else if (event_base == WIFI_EVENT && + event_id == WIFI_EVENT_STA_DISCONNECTED) { + esp_wifi_connect(); + s_retry_num++; + ESP_LOGI(TAG, "retry to connect to the AP"); + } else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) { + ip_event_got_ip_t* event = (ip_event_got_ip_t*)event_data; + ESP_LOGI(TAG, "got ip:" IPSTR, IP2STR(&event->ip_info.ip)); + s_retry_num = 0; + xEventGroupSetBits(s_wifi_event_group, WIFI_CONNECTED_BIT); + } +} + void init_spiffs() { esp_vfs_spiffs_conf_t conf = {.base_path = "/spiffs", .partition_label = NULL, @@ -260,6 +303,55 @@ void init_spiffs() { } } +void wifi_init_sta(void) { + s_wifi_event_group = xEventGroupCreate(); + + ESP_ERROR_CHECK(esp_netif_init()); + + ESP_ERROR_CHECK(esp_event_loop_create_default()); + esp_netif_create_default_wifi_sta(); + + wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT(); + ESP_ERROR_CHECK(esp_wifi_init(&cfg)); + + esp_event_handler_instance_t instance_any_id; + esp_event_handler_instance_t instance_got_ip; + ESP_ERROR_CHECK(esp_event_handler_instance_register( + WIFI_EVENT, ESP_EVENT_ANY_ID, &event_handler, NULL, &instance_any_id)); + ESP_ERROR_CHECK(esp_event_handler_instance_register( + IP_EVENT, IP_EVENT_STA_GOT_IP, &event_handler, NULL, &instance_got_ip)); + wifi_config_t wifi_config = { + .sta = + { + .ssid = EXAMPLE_ESP_WIFI_SSID, + .password = EXAMPLE_ESP_WIFI_PASS, + }, + }; + ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA)); + ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config)); + ESP_ERROR_CHECK(esp_wifi_start()); + + ESP_LOGI(TAG, "wifi_init_sta finished."); + + /* Waiting until either the connection is established (WIFI_CONNECTED_BIT) or connection failed for the maximum + * number of re-tries (WIFI_FAIL_BIT). The bits are set by event_handler() (see above) */ + EventBits_t bits = xEventGroupWaitBits(s_wifi_event_group, + WIFI_CONNECTED_BIT | WIFI_FAIL_BIT, + pdFALSE, pdFALSE, portMAX_DELAY); + + /* xEventGroupWaitBits() returns the bits before the call returned, hence we can test which event actually + * happened. */ + if (bits & WIFI_CONNECTED_BIT) { + ESP_LOGI(TAG, "connected to ap SSID:%s password:%s", EXAMPLE_ESP_WIFI_SSID, + EXAMPLE_ESP_WIFI_PASS); + } else if (bits & WIFI_FAIL_BIT) { + ESP_LOGI(TAG, "Failed to connect to SSID:%s, password:%s", + EXAMPLE_ESP_WIFI_SSID, EXAMPLE_ESP_WIFI_PASS); + } else { + ESP_LOGE(TAG, "UNEXPECTED EVENT"); + } +} + void app_main(void) { // statusLed = std::make_shared(); // statusLed->setStatus(StatusLed::IDLE); @@ -272,14 +364,8 @@ void app_main(void) { } ESP_ERROR_CHECK(ret); - init_spiffs(); - - // statusLed->setStatus(StatusLed::WIFI_CONNECTING); - - esp_wifi_set_ps(WIFI_PS_NONE); - ESP_ERROR_CHECK(esp_netif_init()); - ESP_ERROR_CHECK(esp_event_loop_create_default()); - ESP_ERROR_CHECK(example_connect()); + //init_spiffs(); + wifi_init_sta(); // statusLed->setStatus(StatusLed::WIFI_CONNECTED);