From 8fb4009cdfbdda526a9856576cf2a182c90a1ef9 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Mon, 26 Feb 2024 04:07:08 +0000 Subject: [PATCH 1/4] Add support for record chunking --- tap_spotify/client.py | 19 ++++++++++++++++++- tap_spotify/streams.py | 30 +++++++++++++++++------------- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/tap_spotify/client.py b/tap_spotify/client.py index ab47fd2..e76d2b9 100644 --- a/tap_spotify/client.py +++ b/tap_spotify/client.py @@ -1,6 +1,6 @@ """REST client handling, including SpotifyStream base class.""" -from typing import Optional +from typing import Iterable, Optional from urllib.parse import ParseResult, parse_qsl from memoization import cached @@ -15,6 +15,7 @@ class SpotifyStream(RESTStream): url_base = "https://api.spotify.com/v1" records_jsonpath = "$.items[*]" + chunk_size = None @property @cached @@ -27,3 +28,19 @@ def get_new_paginator(self): def get_url_params(self, context, next_page_token: Optional[ParseResult]): params = super().get_url_params(context, next_page_token) return dict(parse_qsl(next_page_token.query)) if next_page_token else params + + def chunk_records(self, records: Iterable[dict]): + if not self.chunk_size: + return [records] + + chunk = [] + + for i, record in enumerate(records): + if i and not i % self.chunk_size: + yield list(chunk) + chunk.clear() + + chunk.append(record) + + if chunk: + yield list(chunk) diff --git a/tap_spotify/streams.py b/tap_spotify/streams.py index 425b8b6..b6de4a4 100644 --- a/tap_spotify/streams.py +++ b/tap_spotify/streams.py @@ -42,20 +42,24 @@ def post_process(self, row, context): class _TracksStream(SpotifyStream): """Define a track stream.""" + chunk_size = 100 + def get_records(self, context): - # get all track records - track_records = list(super().request_records(context)) - - # get all audio features records - # instantiate audio features stream inline and request records - audio_features_stream = _AudioFeaturesStream(self, track_records) - audio_features_records = audio_features_stream.request_records(context) - - # merge track and audio features records - for track, audio_features in zip(track_records, audio_features_records): - # account for tracks with `null` audio features - row = {**(audio_features or {}), **track} - yield self.post_process(row, context) + # chunk all track records + track_records = super().request_records(context) + track_records_chunks = self.chunk_records(track_records) + + for track_records_chunk in track_records_chunks: + # get audio features records + # instantiate audio features stream inline and request records + audio_features_stream = _AudioFeaturesStream(self, track_records_chunk) + audio_features_records = audio_features_stream.request_records(context) + + # merge chunked track and audio features records + for track, audio_features in zip(track_records_chunk, audio_features_records): + # account for tracks with `null` audio features + row = {**(audio_features or {}), **track} + yield self.post_process(row, context) class _AudioFeaturesStream(SpotifyStream): From d5c27691d3192732e6e57d75b9ebdd79fb4afbd1 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Mon, 26 Feb 2024 04:10:45 +0000 Subject: [PATCH 2/4] Define max tracks for audio features stream --- tap_spotify/streams.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tap_spotify/streams.py b/tap_spotify/streams.py index b6de4a4..459e13b 100644 --- a/tap_spotify/streams.py +++ b/tap_spotify/streams.py @@ -69,9 +69,17 @@ class _AudioFeaturesStream(SpotifyStream): path = "/audio-features" records_jsonpath = "$.audio_features[*]" schema = AudioFeaturesObject.schema + max_tracks = 100 def __init__(self, tracks_stream: _TracksStream, track_records: Iterable[dict]): super().__init__(tracks_stream._tap) + + total_tracks = len(track_records) + + if total_tracks > self.max_tracks: + msg = f"Cannot get audio features for more than {self.max_tracks} tracks at a time: {total_tracks} requested" + raise ValueError(msg) + self._track_records = track_records def get_url_params(self, *args, **kwargs): From 5ae215d800bfa70311b56d9fb5336f084543a2cf Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Mon, 26 Feb 2024 04:12:13 +0000 Subject: [PATCH 3/4] Use max tracks for audio features stream to define track stream chunk size --- tap_spotify/streams.py | 51 +++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/tap_spotify/streams.py b/tap_spotify/streams.py index 459e13b..a0a991d 100644 --- a/tap_spotify/streams.py +++ b/tap_spotify/streams.py @@ -1,5 +1,7 @@ """Stream type classes for tap-spotify.""" +from __future__ import annotations + from datetime import datetime from typing import Iterable @@ -39,29 +41,6 @@ def post_process(self, row, context): return row -class _TracksStream(SpotifyStream): - """Define a track stream.""" - - chunk_size = 100 - - def get_records(self, context): - # chunk all track records - track_records = super().request_records(context) - track_records_chunks = self.chunk_records(track_records) - - for track_records_chunk in track_records_chunks: - # get audio features records - # instantiate audio features stream inline and request records - audio_features_stream = _AudioFeaturesStream(self, track_records_chunk) - audio_features_records = audio_features_stream.request_records(context) - - # merge chunked track and audio features records - for track, audio_features in zip(track_records_chunk, audio_features_records): - # account for tracks with `null` audio features - row = {**(audio_features or {}), **track} - yield self.post_process(row, context) - - class _AudioFeaturesStream(SpotifyStream): """Define an audio features stream.""" @@ -86,6 +65,32 @@ def get_url_params(self, *args, **kwargs): return {"ids": ",".join([track["id"] for track in self._track_records])} +class _TracksStream(SpotifyStream): + """Define a track stream.""" + + chunk_size = _AudioFeaturesStream.max_tracks + + def get_records(self, context): + # chunk all track records + track_records = super().request_records(context) + track_records_chunks = self.chunk_records(track_records) + + for track_records_chunk in track_records_chunks: + # get audio features records + # instantiate audio features stream inline and request records + audio_features_stream = _AudioFeaturesStream(self, track_records_chunk) + audio_features_records = audio_features_stream.request_records(context) + + # merge chunked track and audio features records + for track, audio_features in zip( + track_records_chunk, + audio_features_records, + ): + # account for tracks with `null` audio features + row = {**(audio_features or {}), **track} + yield self.post_process(row, context) + + class _UserTopItemsStream(_RankStream, _SyncedAtStream, SpotifyStream): """Define user top items stream.""" From afa21986c4edbbe9f3cd65ad6c367cef3e7e5b4a Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Mon, 26 Feb 2024 04:12:40 +0000 Subject: [PATCH 4/4] Remove unused import --- tap_spotify/streams.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tap_spotify/streams.py b/tap_spotify/streams.py index a0a991d..6bb99a6 100644 --- a/tap_spotify/streams.py +++ b/tap_spotify/streams.py @@ -5,7 +5,6 @@ from datetime import datetime from typing import Iterable -from requests.models import Response as Response from singer_sdk.streams.rest import RESTStream from tap_spotify.client import SpotifyStream