From 9e264f688091f8dfaf3688974c0f4c6b45f93152 Mon Sep 17 00:00:00 2001 From: basteln3rk Date: Sun, 5 Feb 2023 22:38:20 +0000 Subject: [PATCH 1/6] initial draft of incremental replication docs --- docs/incremental_replication.md | 50 +++++++++++++++++++++++++++++++++ docs/index.rst | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 docs/incremental_replication.md diff --git a/docs/incremental_replication.md b/docs/incremental_replication.md new file mode 100644 index 000000000..9fa180130 --- /dev/null +++ b/docs/incremental_replication.md @@ -0,0 +1,50 @@ +# Incremental Replication + +With incremental replication, a Singer tap emits only data that were created or updated since the previous import rather than the full table. + +To support incremental replication, the tap must first define how its replication state will be tracked, e.g. the id of the newest record or the maximal update timestamp in the previous import. Meltano stores the state and makes it available through the context object on subsequent runs. Using the state, the tap should then skip returning rows where the replication key value is less than previous maximal replication key value stored in the state. + +## Example Code: Timestamp-Based Incremental Replication + +```py +class CommentsStream(RESTStream): + + replication_key = "date_gmt" + is_sorted = True + + base_url = "https://example.com/wp-json/wp/v2/comments" + + def get_url_params( + self, context: dict | None, next_page_token: _TToken | None + ) -> dict[str, Any]: + params = [] + if starting_date := self.get_starting_timestamp(context): + params["after"] = starting_date.isoformat() + if next_page_token is not None: + params["page"] = next_page_token + return params +``` + +First we inform the SDK of the `replication_key`, which automatically triggers incremental import mode. 
Second, optionally, set `is_sorted` to true; with this setting, Singer will throw an error if a supposedly incremental import sends results older than the starting timestamp. + +Last, we have to adapt the query to the remote system, in this example by adding a query parameter with the ISO timestamp. + +Note: +- unlike a `primary_key`, a `replication_key` does not have to be unique +- in incremental replication, it is OK to resend rows where the replication key is equal to previous highest key. + +## Manually testing incremental import during development + +To test the tap stand-alone, manually create a state file and run the tap: + +```shell +$ echo '{"bookmarks": {"documents": {"replication_key": "date_gmt", "replication_key_value": "2023-01-15T12:00:00.120000"}}}' > state_test.json + +$ tap-my-example --config tap_config_test.json --state state_test.json +``` + +## Additional References + +- [Tap SDK State](./implementation/state.md) +- [Context Object](./context_object.md) +- [Example tap with get_starting_replication_key_value](https://github.com/flexponsive/tap-eu-ted/blob/main/tap_eu_ted/client.py) \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 17592f087..04543acd9 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -62,7 +62,7 @@ Advanced Topics .. 
toctree:: :maxdepth: 1 - + incremental_replication parent_streams partitioning context_object From 08f2d32851ca9909caf9ac9890e7938b45188686 Mon Sep 17 00:00:00 2001 From: basteln3rk Date: Mon, 6 Feb 2023 16:20:58 +0000 Subject: [PATCH 2/6] some fixes --- docs/incremental_replication.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/docs/incremental_replication.md b/docs/incremental_replication.md index 9fa180130..7748e194f 100644 --- a/docs/incremental_replication.md +++ b/docs/incremental_replication.md @@ -12,17 +12,23 @@ class CommentsStream(RESTStream): replication_key = "date_gmt" is_sorted = True - base_url = "https://example.com/wp-json/wp/v2/comments" + schema = th.PropertiesList( + th.Property("date_gmt", th.DateTimeType, description="date"), + ).to_dict() def get_url_params( - self, context: dict | None, next_page_token: _TToken | None + self, context: dict | None, next_page_token ) -> dict[str, Any]: - params = [] + params = {} if starting_date := self.get_starting_timestamp(context): params["after"] = starting_date.isoformat() if next_page_token is not None: params["page"] = next_page_token + self.logger.info(f"QUERY PARAMS: {params}") return params + + url_base = "https://example.com/wp-json/wp/v2/comments" + authenticator = None ``` First we inform the SDK of the `replication_key`, which automatically triggers incremental import mode. Second, optionally, set `is_sorted` to true; with this setting, Singer will throw an error if a supposedly incremental import sends results older than the starting timestamp. 
From a9ceb3d3914135cf0c70dff8adb01c2a7e366221 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 6 Feb 2023 16:26:35 +0000 Subject: [PATCH 3/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/incremental_replication.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/incremental_replication.md b/docs/incremental_replication.md index 7748e194f..645ee0644 100644 --- a/docs/incremental_replication.md +++ b/docs/incremental_replication.md @@ -1,14 +1,14 @@ # Incremental Replication -With incremental replication, a Singer tap emits only data that were created or updated since the previous import rather than the full table. +With incremental replication, a Singer tap emits only data that were created or updated since the previous import rather than the full table. -To support incremental replication, the tap must first define how its replication state will be tracked, e.g. the id of the newest record or the maximal update timestamp in the previous import. Meltano stores the state and makes it available through the context object on subsequent runs. Using the state, the tap should then skip returning rows where the replication key value is less than previous maximal replication key value stored in the state. +To support incremental replication, the tap must first define how its replication state will be tracked, e.g. the id of the newest record or the maximal update timestamp in the previous import. Meltano stores the state and makes it available through the context object on subsequent runs. Using the state, the tap should then skip returning rows where the replication key value is less than previous maximal replication key value stored in the state. 
## Example Code: Timestamp-Based Incremental Replication ```py class CommentsStream(RESTStream): - + replication_key = "date_gmt" is_sorted = True From cb38b04be924231c71cdb7f8dacec903672b777e Mon Sep 17 00:00:00 2001 From: basteln3rk Date: Mon, 6 Feb 2023 16:30:06 +0000 Subject: [PATCH 4/6] fix docs index.rst --- docs/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/index.rst b/docs/index.rst index 04543acd9..3b2ce2720 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -62,6 +62,7 @@ Advanced Topics .. toctree:: :maxdepth: 1 + incremental_replication parent_streams partitioning From 90651c29c2fa8ff5703396fbc2bfdd759c7d12f8 Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Wed, 8 Feb 2023 11:10:11 -0600 Subject: [PATCH 5/6] Apply suggestions from code review --- docs/incremental_replication.md | 35 +++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/docs/incremental_replication.md b/docs/incremental_replication.md index 645ee0644..84b605094 100644 --- a/docs/incremental_replication.md +++ b/docs/incremental_replication.md @@ -2,7 +2,9 @@ With incremental replication, a Singer tap emits only data that were created or updated since the previous import rather than the full table. -To support incremental replication, the tap must first define how its replication state will be tracked, e.g. the id of the newest record or the maximal update timestamp in the previous import. Meltano stores the state and makes it available through the context object on subsequent runs. Using the state, the tap should then skip returning rows where the replication key value is less than previous maximal replication key value stored in the state. +To support incremental replication, the tap must first define how its replication state will be tracked, e.g. the id of the newest record or the maximal update timestamp in the previous import. 
+ +You'll either have to manage your own [state file](https://hub.meltano.com/singer/spec#state-files-1), or use Meltano. The Singer SDK makes the tap state available through the [context object](./context_object.md) on subsequent runs. Using the state, the tap should then skip returning rows where the replication key comes _strictly before_ the previous maximal replication key value stored in the state. ## Example Code: Timestamp-Based Incremental Replication @@ -16,32 +18,35 @@ class CommentsStream(RESTStream): th.Property("date_gmt", th.DateTimeType, description="date"), ).to_dict() - def get_url_params( - self, context: dict | None, next_page_token - ) -> dict[str, Any]: + def get_url_params(self, context, next_page_token): params = {} - if starting_date := self.get_starting_timestamp(context): + + starting_date = self.get_starting_timestamp(context) + if starting_date: params["after"] = starting_date.isoformat() + if next_page_token is not None: params["page"] = next_page_token - self.logger.info(f"QUERY PARAMS: {params}") - return params - url_base = "https://example.com/wp-json/wp/v2/comments" - authenticator = None + self.logger.info("QUERY PARAMS: %s", params) + return params ``` -First we inform the SDK of the `replication_key`, which automatically triggers incremental import mode. Second, optionally, set `is_sorted` to true; with this setting, Singer will throw an error if a supposedly incremental import sends results older than the starting timestamp. +1. First we inform the SDK of the `replication_key`, which automatically triggers incremental import mode. + +2. Second, optionally, set `is_sorted` to true if the records are monotonically increasing (i.e. newer records always come later). With this setting, the sync will be resumable if it's interrupted at any point and the state file will reflect this. Otherwise, the tap has to run to completion so the state can safely reflect the largest replication value seen. + +3. 
Last, we have to adapt the query to the remote system, in this example by adding a query parameter with the ISO timestamp. -Last, we have to adapt the query to the remote system, in this example by adding a query parameter with the ISO timestamp. -Note: -- unlike a `primary_key`, a `replication_key` does not have to be unique -- in incremental replication, it is OK to resend rows where the replication key is equal to previous highest key. ```{note} +- The SDK will throw an error if records come out of order when `is_sorted` is true. +- Unlike a `primary_key`, a `replication_key` does not have to be unique. +- In incremental replication, it is OK and usually recommended to resend rows where the replication key is equal to the previous highest key. Targets are expected to update rows that are re-synced. ## Manually testing incremental import during development -To test the tap stand-alone, manually create a state file and run the tap: +To test the tap in standalone mode, manually create a state file and run the tap: ```shell $ echo '{"bookmarks": {"documents": {"replication_key": "date_gmt", "replication_key_value": "2023-01-15T12:00:00.120000"}}}' > state_test.json From cd2bcc1ab34ba38ce18e408a6f69c8a2bc14eb1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Wed, 8 Feb 2023 11:15:28 -0600 Subject: [PATCH 6/6] Fix note block --- docs/incremental_replication.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/incremental_replication.md b/docs/incremental_replication.md index 84b605094..757f15802 100644 --- a/docs/incremental_replication.md +++ b/docs/incremental_replication.md @@ -43,6 +43,7 @@ class CommentsStream(RESTStream): - The SDK will throw an error if records come out of order when `is_sorted` is true. 
- Unlike a `primary_key`, a `replication_key` does not have to be unique. - In incremental replication, it is OK and usually recommended to resend rows where the replication key is equal to the previous highest key. Targets are expected to update rows that are re-synced. +``` ## Manually testing incremental import during development @@ -58,4 +59,4 @@ $ tap-my-example --config tap_config_test.json --state state_test.json - [Tap SDK State](./implementation/state.md) - [Context Object](./context_object.md) -- [Example tap with get_starting_replication_key_value](https://github.com/flexponsive/tap-eu-ted/blob/main/tap_eu_ted/client.py) \ No newline at end of file +- [Example tap with get_starting_replication_key_value](https://github.com/flexponsive/tap-eu-ted/blob/main/tap_eu_ted/client.py)