Skip to content

Commit

Permalink
feat(mappers): Stream name can now be accessed in __alias__ context…
Browse files Browse the repository at this point in the history
… of stream maps (#2701)

* Access stream name in alias

* Comply with mypy

* Clarify log

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>

* Document

* Missed this

* Formatting

* Apply suggestions from code review

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
  • Loading branch information
holly-evans and edgarrmondragon authored Oct 4, 2024
1 parent 9bf104e commit 8a38e79
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 0 deletions.
39 changes: 39 additions & 0 deletions docs/stream_maps.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,15 @@ The `Faker` class.
The `Faker` class was deprecated in favor of instance methods on the `fake` object.
:::

#### Built-in Alias Variable Names

The following variables are available in the context of the `__alias__` expression:
- `__stream_name__` - the existing stream name

:::{versionadded} TODO
The `__stream_name__` variable.
:::

#### Automatic Schema Detection

For performance reasons, type detection is performed at runtime using text analysis
Expand Down Expand Up @@ -640,6 +649,36 @@ stream_maps:
Support for stream glob expressions.
:::

### Aliasing two or more streams

The `__alias__` operation evaluates simple python expressions.

You can combine this with glob expressions to rename more than one stream:

````{tab} meltano.yml
```yaml
stream_maps:
"*":
__alias__: "__stream_name__ + '_v2'"
```
````

````{tab} JSON
```json
{
"stream_maps": {
"*": {
"__alias__": "__stream_name__ + '_v2'"
}
}
}
```
````

:::{versionadded} TODO
Support for `__alias__` expression evaluation.
:::

### Understanding Filters' Affects on Parent-Child Streams

Nested child streams iterations will be skipped if their parent stream has a record-level
Expand Down
38 changes: 38 additions & 0 deletions singer_sdk/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ def register_raw_stream_schema( # noqa: PLR0912, C901
elif MAPPER_ALIAS_OPTION in stream_def:
# <source>: __alias__: <alias>
stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION)
stream_alias = PluginMapper._eval_stream(stream_alias, stream_name)

if stream_name == source_stream:
# Exact match
Expand Down Expand Up @@ -831,3 +832,40 @@ def register_raw_stream_schema( # noqa: PLR0912, C901
else:
# Additional mappers for aliasing and multi-projection:
self.stream_maps[source_stream].append(mapper)

@staticmethod
def _eval_stream(expr: str, stream_name: str) -> str:
"""Solve an alias expression.
Args:
expr: String expression to evaluate.
stream_name: Name of stream to transform.
Returns:
Evaluated expression.
Raises:
MapExpressionError: If the mapping expression failed to evaluate.
"""
# Allow stream name access within alias transform
names = {"__stream_name__": stream_name}

result: str

try:
expr_evaluator = simpleeval.EvalWithCompoundTypes(names=names)
result = expr_evaluator.eval(expr)
except simpleeval.NameNotDefined:
logging.debug(
"Failed to evaluate simpleeval expression %(expr) - "
"falling back to original expression",
extra={"expr": expr},
)
result = expr
except (simpleeval.InvalidExpression, SyntaxError) as ex:
msg = f"Failed to evaluate simpleeval expressions {expr}."
raise MapExpressionError(msg) from ex

logging.debug("Stream eval result: %s = %s", expr, result)

return result
24 changes: 24 additions & 0 deletions tests/core/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,30 @@ def discover_streams(self):
"aliased_stream_batch.jsonl",
id="aliased_stream_batch",
),
pytest.param(
{"mystream": {"__alias__": "aliased.stream"}},
{"flattening_enabled": False, "flattening_max_depth": 0},
"aliased_stream_not_expr.jsonl",
id="aliased_stream_not_expr",
),
pytest.param(
{"mystream": {"__alias__": "'__stream_name__'"}},
{"flattening_enabled": False, "flattening_max_depth": 0},
"aliased_stream_quoted.jsonl",
id="aliased_stream_quoted",
),
pytest.param(
{"mystream": {"__alias__": "'aliased_' + __stream_name__"}},
{"flattening_enabled": False, "flattening_max_depth": 0},
"builtin_variable_stream_name_alias.jsonl",
id="builtin_variable_stream_name_alias",
),
pytest.param(
{"mystream": {"__alias__": "__stream_name__.upper()"}},
{"flattening_enabled": False, "flattening_max_depth": 0},
"builtin_variable_stream_name_alias_expr.jsonl",
id="builtin_variable_stream_name_alias_expr",
),
pytest.param(
{},
{"flattening_enabled": True, "flattening_max_depth": 0},
Expand Down
6 changes: 6 additions & 0 deletions tests/snapshots/mapped_stream/aliased_stream_not_expr.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"type":"STATE","value":{}}
{"type":"SCHEMA","stream":"aliased.stream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]}
{"type":"RECORD","stream":"aliased.stream","record":{"email":"[email protected]","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"aliased.stream","record":{"email":"[email protected]","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"aliased.stream","record":{"email":"[email protected]","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}
6 changes: 6 additions & 0 deletions tests/snapshots/mapped_stream/aliased_stream_quoted.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"type":"STATE","value":{}}
{"type":"SCHEMA","stream":"__stream_name__","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]}
{"type":"RECORD","stream":"__stream_name__","record":{"email":"[email protected]","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"__stream_name__","record":{"email":"[email protected]","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"__stream_name__","record":{"email":"[email protected]","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"type":"STATE","value":{}}
{"type":"SCHEMA","stream":"aliased_mystream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]}
{"type":"RECORD","stream":"aliased_mystream","record":{"email":"[email protected]","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"aliased_mystream","record":{"email":"[email protected]","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"aliased_mystream","record":{"email":"[email protected]","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"type":"STATE","value":{}}
{"type":"SCHEMA","stream":"MYSTREAM","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]}
{"type":"RECORD","stream":"MYSTREAM","record":{"email":"[email protected]","count":21,"user":{"id":1,"sub":{"num":1,"custom_obj":"obj-hello"},"some_numbers":[3.14,2.718]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"MYSTREAM","record":{"email":"[email protected]","count":13,"user":{"id":2,"sub":{"num":2,"custom_obj":"obj-world"},"some_numbers":[10.32,1.618]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"MYSTREAM","record":{"email":"[email protected]","count":19,"user":{"id":3,"sub":{"num":3,"custom_obj":"obj-hello"},"some_numbers":[1.414,1.732]}},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}

0 comments on commit 8a38e79

Please sign in to comment.