Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add managed streaming ingest #344

Merged
merged 60 commits into from
Dec 19, 2021
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
df08e75
Added OneApi error
AsafMah Aug 2, 2021
5c8a6a7
Merge branch 'master' into add-managed-streaming
AsafMah Aug 15, 2021
f9eeefc
Tidying up descriptors.py -
AsafMah Aug 16, 2021
b9a42df
Dev requirements - removed redundant checks, and made test libraries …
AsafMah Aug 16, 2021
4233543
Added base_ingest_client.py for common functionality, and added inges…
AsafMah Aug 16, 2021
bca1168
-Added ManagedStreamingIngestClient
AsafMah Oct 12, 2021
82bd440
Streamlined code and added tests
AsafMah Oct 18, 2021
49d0885
Merge branch 'master' into add-managed-streaming
AsafMah Oct 18, 2021
5cb1bd0
Add return reason
AsafMah Oct 26, 2021
d648d16
warning fixes
AsafMah Oct 26, 2021
f4261c1
warning
AsafMah Oct 26, 2021
ad670db
Added ManagedStreamingIngest cases for other clients,
AsafMah Oct 26, 2021
46f39b2
Moved properties check to where it belongs
AsafMah Oct 26, 2021
73927d6
Never compress binary files on any ingestions
AsafMah Oct 27, 2021
e74dad2
Black
AsafMah Oct 27, 2021
5d130ca
Fixed error message
AsafMah Oct 28, 2021
a348cb0
Renamed for clarity
AsafMah Oct 28, 2021
c0f6585
ManagedStreamingIngest - unit tests
AsafMah Oct 28, 2021
f34b079
Added e2e tests
AsafMah Oct 31, 2021
541bdfd
Merge branch 'master' into add-managed-streaming
AsafMah Oct 31, 2021
372ffeb
docs
AsafMah Oct 31, 2021
4c98a87
Removed unnecessary type information (it infers it from the ctor)
AsafMah Oct 31, 2021
05641e4
Fixed source id type
AsafMah Oct 31, 2021
e135b7d
Removed useless call
AsafMah Oct 31, 2021
1228158
Add back asgrief to test
AsafMah Nov 1, 2021
9015494
Removed evil init.py
AsafMah Nov 1, 2021
0e23cd8
trying to push a commit
AsafMah Nov 1, 2021
e3924ab
Undoing the commit
AsafMah Nov 1, 2021
6f4b12a
Fixed imports
AsafMah Nov 1, 2021
f06fea2
Changes to comply -
AsafMah Nov 1, 2021
2d6e697
black
AsafMah Nov 1, 2021
dad264a
Merge branch 'master' into add-managed-streaming
AsafMah Nov 8, 2021
08dcb87
Merged in StreamingResponse
AsafMah Nov 8, 2021
792f4ae
black
AsafMah Nov 8, 2021
4efb3ef
PR fixes
AsafMah Nov 9, 2021
fcaf175
PR fixes
AsafMah Nov 14, 2021
6d0104a
Replaced backoff with library
AsafMah Nov 18, 2021
43a8943
Revert "Replaced backoff with library"
AsafMah Nov 23, 2021
201ff26
Added retry class
AsafMah Nov 23, 2021
2e92d27
Baseline with c# -
AsafMah Nov 23, 2021
56b0a70
Merge branch 'master' into add-managed-streaming
AsafMah Nov 23, 2021
93220de
-Added special request id to ManagedStreaming
AsafMah Nov 24, 2021
d11b269
PR Fixes
AsafMah Nov 30, 2021
8a8fec2
Fixed tests
AsafMah Nov 30, 2021
19922dc
PR fixes
AsafMah Dec 2, 2021
2d25cfe
PR fixes
AsafMah Dec 6, 2021
59248ee
Added samples and docs
AsafMah Dec 6, 2021
6da808d
Merge branch 'master' into add-managed-streaming
AsafMah Dec 9, 2021
d6a5d86
black
AsafMah Dec 9, 2021
106eab9
Naming
AsafMah Dec 13, 2021
d3b5283
Specify unit
AsafMah Dec 13, 2021
c3e37c2
more naming
AsafMah Dec 13, 2021
8ad3d3e
Have 3 total attempts to match c#
AsafMah Dec 13, 2021
0cd4239
Rename to fit convention
AsafMah Dec 13, 2021
937c752
Rename client id to camel case to be in line with other sdks
AsafMah Dec 13, 2021
da78256
comment
AsafMah Dec 13, 2021
1684089
fixed test
AsafMah Dec 13, 2021
388e2f2
Fixed prefix
AsafMah Dec 13, 2021
0422620
Merge branch 'master' into add-managed-streaming
AsafMah Dec 19, 2021
d887995
PR fixes
AsafMah Dec 19, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions azure-kusto-data/azure/kusto/data/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Iterator, List, Any, Union, Optional, Dict

from . import _converters
from .exceptions import KustoServiceError, KustoStreamingQueryError
from .exceptions import KustoMultiApiError, KustoStreamingQueryError


class WellKnownDataSet(Enum):
Expand Down Expand Up @@ -158,7 +158,7 @@ def __init__(self, json_table: Dict[str, Any]):
super().__init__(json_table)
errors = [row for row in json_table["Rows"] if isinstance(row, dict)]
if errors:
raise KustoServiceError(errors[0]["OneApiErrors"][0]["error"]["@message"], kusto_response=json_table)
raise KustoMultiApiError(errors)

@property
def rows(self) -> List[KustoResultRow]:
Expand Down
6 changes: 2 additions & 4 deletions azure-kusto-data/azure/kusto/data/aio/streaming_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ijson import IncompleteJSONError

from azure.kusto.data._models import WellKnownDataSet
from azure.kusto.data.exceptions import KustoTokenParsingError, KustoServiceError, KustoUnsupportedApiError
from azure.kusto.data.exceptions import KustoTokenParsingError, KustoUnsupportedApiError, KustoApiError, KustoMultiApiError
from azure.kusto.data.streaming_response import JsonTokenType, FrameType, JsonToken


Expand Down Expand Up @@ -163,9 +163,7 @@ async def row_iterator(self) -> Iterator[list]:
while True:
token = await self.reader.read_token_of_type(JsonTokenType.START_ARRAY, JsonTokenType.END_ARRAY, JsonTokenType.START_MAP)
if token.token_type == JsonTokenType.START_MAP:
raise KustoServiceError(
"Received error in data: " + str(self.parse_object(skip_start=True))
) # Todo - replace this with a better error once managed streaming ingest is merged
raise KustoMultiApiError([await self.parse_object(skip_start=True)])
if token.token_type == JsonTokenType.END_ARRAY:
return
yield await self.parse_array(skip_start=True)
Expand Down
34 changes: 24 additions & 10 deletions azure-kusto-data/azure/kusto/data/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from copy import copy
from datetime import timedelta
from enum import Enum, unique
from typing import TYPE_CHECKING, Union, Callable, Optional, Any, Coroutine, List, Tuple
from typing import TYPE_CHECKING, Union, Callable, Optional, Any, Coroutine, List, Tuple, AnyStr, IO

import requests
from requests import Response
Expand All @@ -17,7 +17,7 @@

from ._version import VERSION
from .data_format import DataFormat
from .exceptions import KustoServiceError
from .exceptions import KustoServiceError, KustoApiError
from .response import KustoResponseDataSetV1, KustoResponseDataSetV2, KustoStreamingResponseDataSet, KustoResponseDataSet
from .security import _AadHelper
from .streaming_response import StreamingDataSetEnumerator, JsonTokenReader
Expand Down Expand Up @@ -92,6 +92,14 @@ def parse(cls, key: str) -> "KustoConnectionStringBuilder.ValidKeywords":
return cls.msi_auth
if key in ["msi_type"]:
return cls.msi_params
if key in ["az cli"]:
return cls.az_cli
if key in ["interactive login"]:
return cls.interactive_login
if key in ["login hint"]:
return cls.login_hint
if key in ["domain hint"]:
return cls.domain_hint
raise KeyError(key)

def is_secret(self) -> bool:
Expand Down Expand Up @@ -367,6 +375,7 @@ def with_aad_managed_service_identity_authentication(
if object_id is not None:
# Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity
raise ValueError("User Managed Service Identity with object_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead.")
# noinspection PyUnreachableCode
params["object_id"] = object_id
exclusive_pcount += 1

Expand All @@ -375,6 +384,7 @@ def with_aad_managed_service_identity_authentication(
raise ValueError(
"User Managed Service Identity with msi_res_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead."
)
# noinspection PyUnreachableCode
params["msi_res_id"] = msi_res_id
exclusive_pcount += 1

Expand Down Expand Up @@ -580,6 +590,8 @@ class ClientRequestProperties:
For more information please look at: https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties
"""

OPTION_CLIENT_REQUEST_ID = "ClientRequestId"
AsafMah marked this conversation as resolved.
Show resolved Hide resolved

results_defer_partial_query_failures_option_name = "deferpartialqueryfailures"
request_timeout_option_name = "servertimeout"
no_request_timeout_option_name = "norequesttimeout"
Expand Down Expand Up @@ -717,15 +729,17 @@ def _handle_http_error(
if payload:
raise KustoServiceError("The ingestion endpoint does not exist. Please enable streaming ingestion on your cluster.", response) from exception

raise KustoServiceError("The requested endpoint '{}' does not exist.".format(endpoint), response) from exception
raise KustoServiceError(f"The requested endpoint '{endpoint}' does not exist.", response) from exception

if payload:
raise KustoServiceError(
"An error occurred while trying to ingest: Status: {0.status_code}, Reason: {0.reason}, Text: {1}.".format(response, response_text), response
) from exception
message = f"An error occurred while trying to ingest: Status: {status}, Reason: {response.reason}, Text: {response_text}."
if response_json:
raise KustoApiError(response_json, message, response) from exception

raise KustoServiceError(message, response) from exception

if response_json:
raise KustoServiceError([response_json], response) from exception
raise KustoApiError(response_json, http_response=response) from exception

if response_text:
raise KustoServiceError(response_text, response) from exception
Expand Down Expand Up @@ -864,7 +878,7 @@ def execute_streaming_ingest(
self,
database: str,
table: str,
stream: io.IOBase,
stream: IO[AnyStr],
stream_format: Union[DataFormat, str],
properties: Optional[ClientRequestProperties] = None,
mapping_name: str = None,
Expand Down Expand Up @@ -915,7 +929,7 @@ def _execute(
endpoint: str,
database: str,
query: Optional[str],
payload: Optional[io.IOBase],
payload: Optional[IO[AnyStr]],
timeout: timedelta,
properties: Optional[ClientRequestProperties] = None,
stream_response: bool = False,
Expand All @@ -934,7 +948,7 @@ def _execute(
response.raise_for_status()
return response
except Exception as e:
raise self._handle_http_error(e, self._query_endpoint, None, response, response.json(), response.text)
raise self._handle_http_error(e, self._query_endpoint, None, response, response.status_code, response.json(), response.text)

response_json = None
try:
Expand Down
73 changes: 71 additions & 2 deletions azure-kusto-data/azure/kusto/data/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
# Licensed under the MIT License
from typing import List, Union, TYPE_CHECKING, Optional, Dict, Any

import requests

if TYPE_CHECKING:
import requests

try:
from aiohttp import ClientResponse
except ImportError:
# No aio installed, ignore
ClientResponse = None
pass


Expand Down Expand Up @@ -57,10 +58,78 @@ def get_partial_results(self) -> Optional[Dict[str, Any]]:
return self.kusto_response


class OneApiError:
def __init__(self, code: str, message: str, type: str, description: str, context: dict, permanent: bool) -> None:
self.code = code
self.message = message
self.type = type
self.description = description
self.context = context
self.permanent = permanent

@staticmethod
def from_dict(obj: dict) -> "OneApiError":
code = obj["code"]
message = obj["message"]
type = obj["@type"]
description = obj["@message"]
context = obj["@context"]
permanent = obj["@permanent"]
return OneApiError(code, message, type, description, context, permanent)


class KustoMultiApiError(KustoServiceError):
"""
Represents a collection of standard API errors from kusto. Use `get_api_errors()` to retrieve more details.
"""

def __init__(self, errors: List[dict]):
self.errors = KustoMultiApiError.parse_errors(errors)
messages = [error.description for error in self.errors]
super().__init__(messages[0] if len(self.errors) == 1 else messages)

def get_api_errors(self) -> List[OneApiError]:
return self.errors

@staticmethod
def parse_errors(errors: List[dict]) -> List[OneApiError]:
parsed_errors = []
for error_block in errors:
one_api_errors = error_block.get("OneApiErrors", None)
if not one_api_errors:
continue
for inner_error in one_api_errors:
error_dict = inner_error.get("error", None)
if error_dict:
parsed_errors.append(OneApiError.from_dict(error_dict))
return parsed_errors


class KustoApiError(KustoServiceError):
AsafMah marked this conversation as resolved.
Show resolved Hide resolved
"""
Represents a standard API error from kusto. Use `get_api_error()` to retrieve more details.
"""

def __init__(self, error_dict: dict, message: str = None, http_response: "Union[requests.Response, ClientResponse, None]" = None, kusto_response=None):
self.error = OneApiError.from_dict(error_dict["error"])
super().__init__(message or self.error.description, http_response, kusto_response)

def get_api_error(self) -> OneApiError:
return self.error


class KustoClientError(KustoError):
"""Raised when a Kusto client is unable to send or complete a request."""


class KustoBlobError(KustoClientError):
def __init__(self, inner: Exception):
self.inner = inner

def message(self) -> str:
return f"Failed to upload blob: {self.inner}"


class KustoUnsupportedApiError(KustoError):
"""Raised when a Kusto client is unable to send or complete a request."""

Expand Down
6 changes: 2 additions & 4 deletions azure-kusto-data/azure/kusto/data/streaming_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ijson import IncompleteJSONError

from azure.kusto.data._models import WellKnownDataSet
from azure.kusto.data.exceptions import KustoServiceError, KustoTokenParsingError, KustoUnsupportedApiError
from azure.kusto.data.exceptions import KustoServiceError, KustoTokenParsingError, KustoUnsupportedApiError, KustoApiError, KustoMultiApiError


class JsonTokenType(Enum):
Expand Down Expand Up @@ -202,9 +202,7 @@ def row_iterator(self) -> Iterator[list]:
if token.token_type == JsonTokenType.START_MAP:
# Todo - this method of error handling may be problematic, since after raising an error the iteration stops.
# This means that if there are more data or even more errors, we can't read them
raise KustoServiceError(
"Received error in data: " + str(self.parse_object(skip_start=True))
) # Todo - replace this with a better error once managed streaming ingest is merged
raise KustoMultiApiError([self.parse_object(skip_start=True)])
if token.token_type == JsonTokenType.END_ARRAY:
return
yield self.parse_array(skip_start=True)
Expand Down
Loading