Skip to content

Commit

Permalink
Show clearer grpc errors, as well as a custom failure for sensor time…
Browse files Browse the repository at this point in the history
…outs in particular since those are so common (#11576)

Summary:
- Surface the gRPC error code in the error message instead of burying it in the nested cause error
- For timeouts, include the original timeout value (and state that the request timed out)
- For sensors, include a suggestion for how to fix the timeout

### Summary & Motivation

### How I Tested These Changes
  • Loading branch information
gibsondan authored Jan 10, 2023
1 parent 3b1ff76 commit 5f18862
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 4 deletions.
7 changes: 6 additions & 1 deletion python_modules/dagster/dagster/_api/snapshot_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster._core.errors import DagsterUserCodeProcessError
from dagster._core.host_representation.external_data import ExternalSensorExecutionErrorData
from dagster._core.host_representation.handle import RepositoryHandle
from dagster._grpc.client import DEFAULT_GRPC_TIMEOUT
from dagster._grpc.types import SensorExecutionArgs
from dagster._serdes import deserialize_as

Expand All @@ -20,6 +21,7 @@ def sync_get_external_sensor_execution_data_ephemeral_grpc(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT,
) -> SensorExecutionData:
from dagster._grpc.client import ephemeral_grpc_api_client

Expand All @@ -35,6 +37,7 @@ def sync_get_external_sensor_execution_data_ephemeral_grpc(
last_completion_time,
last_run_key,
cursor,
timeout=timeout,
)


Expand All @@ -46,6 +49,7 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT,
) -> SensorExecutionData:
check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
check.str_param(sensor_name, "sensor_name")
Expand All @@ -64,7 +68,8 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time=last_completion_time,
last_run_key=last_run_key,
cursor=cursor,
)
),
timeout=timeout,
),
(SensorExecutionData, ExternalSensorExecutionErrorData),
)
Expand Down
32 changes: 30 additions & 2 deletions python_modules/dagster/dagster/_grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,34 @@ def _get_response(
stub = DagsterApiStub(channel)
return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)

def _raise_grpc_exception(self, e: Exception, timeout, custom_timeout_message=None):
    """Re-raise a failed request as a DagsterUserCodeUnreachableError.

    Always raises; the original exception ``e`` is chained as the cause.

    Args:
        e: The exception caught while issuing the request.
        timeout: The timeout that was passed to the request; interpolated into
            the default timeout message.
        custom_timeout_message: Optional message used instead of the default
            one when the gRPC status is DEADLINE_EXCEEDED.

    Raises:
        DagsterUserCodeUnreachableError: In every case, with a message chosen
            from the kind of failure (non-gRPC error, deadline exceeded, or
            any other gRPC status code).
    """
    # Anything that is not a gRPC error gets the generic message.
    if not isinstance(e, grpc.RpcError):
        raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

    status = e.code()

    # Non-timeout gRPC failures: put the status code name in the message.
    if status != grpc.StatusCode.DEADLINE_EXCEEDED:
        raise DagsterUserCodeUnreachableError(
            f"Could not reach user code server. gRPC Error code: {status.name}"
        ) from e

    # Timeouts: prefer the caller-supplied message, else report the timeout used.
    message = (
        custom_timeout_message
        or f"User code server request timed out due to taking longer than {timeout} seconds to complete."
    )
    raise DagsterUserCodeUnreachableError(message) from e

def _query(
    self,
    method,
    request_type,
    timeout=DEFAULT_GRPC_TIMEOUT,
    custom_timeout_message=None,
    **kwargs,
):
    """Issue a unary request and return its response.

    Args:
        method: Name of the stub method, forwarded to ``_get_response``.
        request_type: Callable used to build the request from ``kwargs``.
        timeout: Timeout forwarded to ``_get_response`` and reported in the
            error message if the request fails.
        custom_timeout_message: Optional message forwarded to
            ``_raise_grpc_exception`` for use when the request times out.
        **kwargs: Keyword arguments passed to ``request_type``.

    Returns:
        Whatever ``_get_response`` returns for the request.

    Raises:
        Whatever ``_raise_grpc_exception`` raises for the caught exception.
    """
    try:
        return self._get_response(method, request=request_type(**kwargs), timeout=timeout)
    except Exception as e:
        # Delegate entirely to the helper. An unconditional generic raise here
        # would make the helper call unreachable and hide the gRPC status code
        # and the timeout-specific message.
        self._raise_grpc_exception(
            e, timeout=timeout, custom_timeout_message=custom_timeout_message
        )

def _get_streaming_response(
self,
Expand All @@ -155,14 +172,17 @@ def _streaming_query(
method,
request_type,
timeout=DEFAULT_GRPC_TIMEOUT,
custom_timeout_message=None,
**kwargs,
):
try:
yield from self._get_streaming_response(
method, request=request_type(**kwargs), timeout=timeout
)
except Exception as e:
raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
self._raise_grpc_exception(
e, timeout=timeout, custom_timeout_message=custom_timeout_message
)

def ping(self, echo):
check.str_param(echo, "echo")
Expand Down Expand Up @@ -365,6 +385,13 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_GRPC_
SensorExecutionArgs,
)

custom_timeout_message = (
f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the"
" sensor function. One way to avoid this error is to break up the sensor work into"
" chunks, using cursors to let subsequent sensor calls pick up where the previous call"
" left off."
)

chunks = list(
self._streaming_query(
"ExternalSensorExecution",
Expand All @@ -373,6 +400,7 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_GRPC_
serialized_external_sensor_execution_args=serialize_dagster_namedtuple(
sensor_execution_args
),
custom_timeout_message=custom_timeout_message,
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
from dagster._api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc
from dagster._core.definitions.sensor_definition import SensorExecutionData
from dagster._core.errors import DagsterUserCodeProcessError
from dagster._core.errors import DagsterUserCodeProcessError, DagsterUserCodeUnreachableError

from .utils import get_bar_repo_handle

Expand Down Expand Up @@ -32,3 +32,17 @@ def test_external_sensor_raises_dagster_error(instance):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_raises_dagster_error", None, None, None
)


def test_external_sensor_timeout(instance):
    """Running "sensor_foo" with timeout=0 raises the sensor-specific timeout error."""
    expected_message = (
        "The sensor tick timed out due to taking longer than 0 seconds to execute the"
        " sensor function."
    )
    with get_bar_repo_handle(instance) as repository_handle:
        with pytest.raises(DagsterUserCodeUnreachableError, match=expected_message):
            sync_get_external_sensor_execution_data_ephemeral_grpc(
                instance,
                repository_handle,
                "sensor_foo",
                None,
                None,
                None,
                timeout=0,
            )
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ def test_load_grpc_server(capfd):
assert f"Started Dagster code server for file {python_file} on port {port} in process" in out


def test_grpc_connection_error():
    """Pinging via a client on a port from find_free_port reports the gRPC status code.

    NOTE(review): presumably nothing is listening on that port, which is why
    UNAVAILABLE is expected -- confirm against find_free_port.
    """
    expected_message = "Could not reach user code server. gRPC Error code: UNAVAILABLE"
    unused_port = find_free_port()
    client = DagsterGrpcClient(port=unused_port, host="localhost")
    with pytest.raises(DagsterUserCodeUnreachableError, match=expected_message):
        client.ping("foobar")


def test_python_environment_args():
port = find_free_port()
python_file = file_relative_path(__file__, "grpc_repo.py")
Expand Down

0 comments on commit 5f18862

Please sign in to comment.