From 8a38e79cdcbd0e546fb42c1d3f1f731745389a92 Mon Sep 17 00:00:00 2001 From: Holly Evans <39742776+holly-evans@users.noreply.github.com> Date: Fri, 4 Oct 2024 15:02:00 -0500 Subject: [PATCH] feat(mappers): Stream name can now be accessed in `__alias__` context of stream maps (#2701) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Access stream name in alias * Comply with mypy * Clarify log Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> * Document * Missed this * Formatting * Apply suggestions from code review --------- Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> --- docs/stream_maps.md | 39 +++++++++++++++++++ singer_sdk/mapper.py | 38 ++++++++++++++++++ tests/core/test_mapper.py | 24 ++++++++++++ .../aliased_stream_not_expr.jsonl | 6 +++ .../mapped_stream/aliased_stream_quoted.jsonl | 6 +++ .../builtin_variable_stream_name_alias.jsonl | 6 +++ ...ltin_variable_stream_name_alias_expr.jsonl | 6 +++ 7 files changed, 125 insertions(+) create mode 100644 tests/snapshots/mapped_stream/aliased_stream_not_expr.jsonl create mode 100644 tests/snapshots/mapped_stream/aliased_stream_quoted.jsonl create mode 100644 tests/snapshots/mapped_stream/builtin_variable_stream_name_alias.jsonl create mode 100644 tests/snapshots/mapped_stream/builtin_variable_stream_name_alias_expr.jsonl diff --git a/docs/stream_maps.md b/docs/stream_maps.md index e31a9b634..693b8caf9 100644 --- a/docs/stream_maps.md +++ b/docs/stream_maps.md @@ -266,6 +266,15 @@ The `Faker` class. The `Faker` class was deprecated in favor of instance methods on the `fake` object. ::: +#### Built-in Alias Variable Names + +The following variables are available in the context of the `__alias__` expression: +- `__stream_name__` - the existing stream name + +:::{versionadded} TODO +The `__stream_name__` variable. 
+::: + #### Automatic Schema Detection For performance reasons, type detection is performed at runtime using text analysis @@ -640,6 +649,36 @@ stream_maps: Support for stream glob expressions. ::: +### Aliasing two or more streams + +The `__alias__` operation evaluates simple Python expressions. + +You can combine this with glob expressions to rename more than one stream: + +````{tab} meltano.yml +```yaml +stream_maps: + "*": + __alias__: "__stream_name__ + '_v2'" +``` +```` + +````{tab} JSON +```json +{ + "stream_maps": { + "*": { + "__alias__": "__stream_name__ + '_v2'" + } + } +} +``` +```` + +:::{versionadded} TODO +Support for `__alias__` expression evaluation. +::: + ### Understanding Filters' Affects on Parent-Child Streams Nested child streams iterations will be skipped if their parent stream has a record-level diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index b613b78ec..14f1eac3e 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -779,6 +779,7 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 elif MAPPER_ALIAS_OPTION in stream_def: # : __alias__: stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION) + stream_alias = PluginMapper._eval_stream(stream_alias, stream_name) if stream_name == source_stream: # Exact match @@ -831,3 +832,40 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 else: # Additional mappers for aliasing and multi-projection: self.stream_maps[source_stream].append(mapper) + + @staticmethod + def _eval_stream(expr: str, stream_name: str) -> str: + """Solve an alias expression. + + Args: + expr: String expression to evaluate. + stream_name: Name of stream to transform. + + Returns: + Evaluated expression. + + Raises: + MapExpressionError: If the mapping expression failed to evaluate. 
+ """ + # Allow stream name access within alias transform + names = {"__stream_name__": stream_name} + + result: str + + try: + expr_evaluator = simpleeval.EvalWithCompoundTypes(names=names) + result = expr_evaluator.eval(expr) + except simpleeval.NameNotDefined: + logging.debug( + "Failed to evaluate simpleeval expression %s - " + "falling back to original expression", + expr, + ) + result = expr + except (simpleeval.InvalidExpression, SyntaxError) as ex: + msg = f"Failed to evaluate simpleeval expressions {expr}." + raise MapExpressionError(msg) from ex + + logging.debug("Stream eval result: %s = %s", expr, result) + + return result diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index a776689a1..37e289e1b 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -768,6 +768,30 @@ def discover_streams(self): "aliased_stream_batch.jsonl", id="aliased_stream_batch", ), + pytest.param( + {"mystream": {"__alias__": "aliased.stream"}}, + {"flattening_enabled": False, "flattening_max_depth": 0}, + "aliased_stream_not_expr.jsonl", + id="aliased_stream_not_expr", + ), + pytest.param( + {"mystream": {"__alias__": "'__stream_name__'"}}, + {"flattening_enabled": False, "flattening_max_depth": 0}, + "aliased_stream_quoted.jsonl", + id="aliased_stream_quoted", + ), + pytest.param( + {"mystream": {"__alias__": "'aliased_' + __stream_name__"}}, + {"flattening_enabled": False, "flattening_max_depth": 0}, + "builtin_variable_stream_name_alias.jsonl", + id="builtin_variable_stream_name_alias", + ), + pytest.param( + {"mystream": {"__alias__": "__stream_name__.upper()"}}, + {"flattening_enabled": False, "flattening_max_depth": 0}, + "builtin_variable_stream_name_alias_expr.jsonl", + id="builtin_variable_stream_name_alias_expr", + ), pytest.param( {}, {"flattening_enabled": True, "flattening_max_depth": 0}, diff --git a/tests/snapshots/mapped_stream/aliased_stream_not_expr.jsonl 
b/tests/snapshots/mapped_stream/aliased_stream_not_expr.jsonl new file mode 100644 index 000000000..473737291 --- /dev/null +++ b/tests/snapshots/mapped_stream/aliased_stream_not_expr.jsonl @@ -0,0 +1,6 @@ +{"type":"STATE","value":{}} +{"type":"SCHEMA","stream":"aliased.stream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]} +{"type":"RECORD","stream":"aliased.stream","record":{"email":"alice@example.com","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"aliased.stream","record":{"email":"bob@example.com","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"aliased.stream","record":{"email":"charlie@example.com","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"STATE","value":{"bookmarks":{"mystream":{}}}} diff --git a/tests/snapshots/mapped_stream/aliased_stream_quoted.jsonl b/tests/snapshots/mapped_stream/aliased_stream_quoted.jsonl new file mode 100644 index 000000000..fd9fec13e --- /dev/null +++ b/tests/snapshots/mapped_stream/aliased_stream_quoted.jsonl @@ -0,0 +1,6 @@ +{"type":"STATE","value":{}} 
+{"type":"SCHEMA","stream":"__stream_name__","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]} +{"type":"RECORD","stream":"__stream_name__","record":{"email":"alice@example.com","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"__stream_name__","record":{"email":"bob@example.com","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"__stream_name__","record":{"email":"charlie@example.com","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"STATE","value":{"bookmarks":{"mystream":{}}}} diff --git a/tests/snapshots/mapped_stream/builtin_variable_stream_name_alias.jsonl b/tests/snapshots/mapped_stream/builtin_variable_stream_name_alias.jsonl new file mode 100644 index 000000000..bb0666313 --- /dev/null +++ b/tests/snapshots/mapped_stream/builtin_variable_stream_name_alias.jsonl @@ -0,0 +1,6 @@ +{"type":"STATE","value":{}} 
+{"type":"SCHEMA","stream":"aliased_mystream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]} +{"type":"RECORD","stream":"aliased_mystream","record":{"email":"alice@example.com","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"aliased_mystream","record":{"email":"bob@example.com","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"aliased_mystream","record":{"email":"charlie@example.com","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"STATE","value":{"bookmarks":{"mystream":{}}}} diff --git a/tests/snapshots/mapped_stream/builtin_variable_stream_name_alias_expr.jsonl b/tests/snapshots/mapped_stream/builtin_variable_stream_name_alias_expr.jsonl new file mode 100644 index 000000000..c698cf72b --- /dev/null +++ b/tests/snapshots/mapped_stream/builtin_variable_stream_name_alias_expr.jsonl @@ -0,0 +1,6 @@ +{"type":"STATE","value":{}} 
+{"type":"SCHEMA","stream":"MYSTREAM","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]} +{"type":"RECORD","stream":"MYSTREAM","record":{"email":"alice@example.com","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"MYSTREAM","record":{"email":"bob@example.com","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"MYSTREAM","record":{"email":"charlie@example.com","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}