Skip to content

Commit

Permalink
Merge pull request #189 from keboola/fix/CFT-3131
Browse files Browse the repository at this point in the history
Fix/cft 3131
  • Loading branch information
kudj authored Oct 18, 2024
2 parents 3187f50 + 8139592 commit 15a9ce2
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ RUN pip install -r /tmp/requirements.txt

# Install Node.js and set up symlinks
RUN set -eux; \
NODE_VERSION="$(curl -fsSL https://nodejs.org/dist/latest/SHASUMS256.txt | head -n1 | awk '{ print $2 }' | awk -F - '{ print $2 }')" \
NODE_VERSION="v22.10.0" \
ARCH= && dpkgArch="$(dpkg --print-architecture)"; \
case "${dpkgArch##*-}" in \
amd64) ARCH='x64';; \
Expand Down
64 changes: 43 additions & 21 deletions python-sync-actions/src/actions/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def __infer_mapping_from_structure_recursive(self, node_hierarchy: dict[str, Any
Infer first level Generic Extractor mapping from data sample.
Args:
node_hierarchy: sample data
primary_keys: optional list of columns to be used as primary keys=
primary_keys: optional list of columns to be used as primary keys
Returns:
Expand All @@ -110,27 +110,42 @@ def __infer_mapping_from_structure_recursive(self, node_hierarchy: dict[str, Any
if not current_mapping:
current_mapping = {}
for key, value in node_hierarchy.items():
current_node: Node = value['node']
path_key = path_separator.join(current_node.path)
normalized_header_name = self.header_normalizer._normalize_column_name(current_node.header_name) # noqa
match current_node.data_type:
case NodeType.SCALAR:
if path_key in primary_keys:
current_mapping[path_key] = MappingElements.primary_key_column(normalized_header_name)
else:
current_mapping[path_key] = normalized_header_name

case NodeType.DICT:
if current_level <= max_level:
self.__infer_mapping_from_structure_recursive(value['children'], primary_keys,
path_separator,
max_level, current_mapping,
current_level)
else:
if isinstance(value, dict):
current_node: Node = value['node']
path_key = path_separator.join(current_node.path)
normalized_header_name = self.header_normalizer._normalize_column_name(current_node.header_name) # noqa
match current_node.data_type:
case NodeType.SCALAR:
if path_key in primary_keys:
current_mapping[path_key] = MappingElements.primary_key_column(normalized_header_name)
else:
current_mapping[path_key] = normalized_header_name

case NodeType.DICT:
if current_level <= max_level:
self.__infer_mapping_from_structure_recursive(value['children'], primary_keys,
path_separator,
max_level, current_mapping,
current_level)
else:
current_mapping[path_key] = MappingElements.force_type_column(normalized_header_name)
case _:
# all other types including unknown map with forceType option
current_mapping[path_key] = MappingElements.force_type_column(normalized_header_name)
case _:
# all other types including unknown map with forceType option
current_mapping[path_key] = MappingElements.force_type_column(normalized_header_name)
elif isinstance(value, list):
# Handle list of dictionaries
if all(isinstance(item, dict) for item in value):
for idx, item in enumerate(value):
list_key = f"{key}[{idx}]"
self.__infer_mapping_from_structure_recursive({list_key: item}, primary_keys,
path_separator, max_level,
current_mapping, current_level)
else:
# Handle list of non-dictionary items
current_mapping[key] = MappingElements.force_type_column(key)
else:
# Handle scalar values directly
current_mapping[key] = MappingElements.force_type_column(key)
return current_mapping


Expand Down Expand Up @@ -194,6 +209,13 @@ def infer_mapping(data: list[dict],
"""
analyzer = StuctureAnalyzer()

if not isinstance(data, list):
for _, item in data.items():
if isinstance(item, list):
data = item
break

for row in data:
analyzer.parse_row(row)

Expand Down
2 changes: 1 addition & 1 deletion python-sync-actions/src/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import copy
import logging
from io import StringIO
from json import JSONDecodeError
from typing import List

import requests
from requests.exceptions import JSONDecodeError
from keboola.component.base import ComponentBase, sync_action
from keboola.component.exceptions import UserException

Expand Down
10 changes: 10 additions & 0 deletions python-sync-actions/tests/test_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ def test_dedupe_keys(self):
res = StuctureAnalyzer.dedupe_values(data)
self.assertEqual(res, expected)

def test_list(self):
    """Check mapping inference when the sample is a dict wrapping a list.

    The sample is a dict with three scalar fields and a single
    list-valued field, ``values``, holding three records that each have an
    ``id`` and a nested ``value`` object. The inferred mapping is expected
    to describe only the columns of those records (``id``, ``value.age``,
    ``value.name``) and none of the top-level scalar fields.
    """
    records = [
        {"id": "12", "value": {"name": "Max", "age": 25}},
        {"id": "13", "value": {"name": "Tom", "age": 30}},
        {"id": "14", "value": {"name": "John", "age": 35}},
    ]
    payload = {
        "maxResults": 100,
        "startAt": 0,
        "total": 375,
        "values": records,
    }
    expected_mapping = {
        'id': 'id',
        'value.age': 'value_age',
        'value.name': 'value_name',
    }

    inferred = infer_mapping(payload, max_level_nest_level=1)

    self.assertEqual(inferred, expected_mapping)

@freeze_time("2021-01-01")
def test_infer_mapping_userdata(self):
component = self._get_test_component('test_007_infer_mapping_userdata')
Expand Down

0 comments on commit 15a9ce2

Please sign in to comment.