-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathjsonpath.py
41 lines (28 loc) · 1.03 KB
/
jsonpath.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""JSONPath helpers."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Generator
import memoization
from jsonpath_ng.ext import parse
if TYPE_CHECKING:
import jsonpath_ng
def extract_jsonpath(expression: str, input: dict | list) -> Generator[Any, None, None]:
"""Extract records from an input based on a JSONPath expression.
Args:
expression: JSONPath expression to match against the input.
input: JSON object or array to extract records from.
Yields:
Records matched with JSONPath expression.
"""
compiled_jsonpath = _compile_jsonpath(expression)
match: jsonpath_ng.DatumInContext
for match in compiled_jsonpath.find(input):
yield match.value
@memoization.cached
def _compile_jsonpath(expression: str) -> jsonpath_ng.JSONPath:
"""Parse a JSONPath expression and cache the result.
Args:
expression: A string representing a JSONPath expression.
Returns:
A compiled JSONPath object.
"""
return parse(expression)