From a81f3deb51f9eddd5a0937735b33c0d03666f019 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 24 May 2022 11:57:05 +0100 Subject: [PATCH 1/6] Add authentication to HTTP API --- distributed/distributed-schema.yaml | 136 +++++++++--------- distributed/distributed.yaml | 1 + distributed/http/scheduler/api.py | 31 ++++ .../scheduler/tests/test_scheduler_http.py | 75 +++++++++- docs/source/http_services.rst | 15 +- 5 files changed, 180 insertions(+), 78 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 942eab8ff9a..664e7e28ee0 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -2,14 +2,12 @@ properties: distributed: type: object properties: - version: type: integer scheduler: type: object properties: - allowed-failures: type: integer minimum: 0 @@ -22,8 +20,8 @@ properties: bandwidth: type: - - integer - - string + - integer + - string description: | The expected bandwidth between any pair of workers @@ -45,8 +43,8 @@ properties: contact-address: type: - - string - - "null" + - string + - "null" description: | The address that the scheduler advertises to workers for communication with it. @@ -56,8 +54,8 @@ properties: default-data-size: type: - - string - - integer + - string + - integer description: | The default size of a piece of data if we don't know anything about it. @@ -71,8 +69,8 @@ properties: idle-timeout: type: - - string - - "null" + - string + - "null" description: | Shut down the scheduler after this duration if no activity has occured @@ -119,8 +117,8 @@ properties: worker-ttl: type: - - string - - "null" + - string + - "null" description: | Time to live for workers. @@ -205,16 +203,16 @@ properties: properties: ca-file: type: - - string - - "null" + - string + - "null" key: type: - - string - - "null" + - string + - "null" cert: type: - - string - - "null" + - string + - "null" bokeh-application: type: object description: | @@ -239,6 +237,10 @@ properties: type: object description: Settings for Dask's embedded HTTP Server properties: + api-key: + type: string + description: | + API key required to access private HTTP API methods. routes: type: array description: | @@ -263,8 +265,7 @@ properties: description: set to true to auto-start the AMM on Scheduler init interval: type: string - description: - Time expression, e.g. "2s". Run the AMM cycle every . + description: Time expression, e.g. "2s". Run the AMM cycle every . policies: type: array items: @@ -273,7 +274,8 @@ properties: properties: class: type: string - description: fully qualified name of an ActiveMemoryManagerPolicy + description: + fully qualified name of an ActiveMemoryManagerPolicy subclass additionalProperties: description: keyword arguments to the policy constructor, if any @@ -376,8 +378,8 @@ properties: properties: duration: type: - - string - - "null" + - string + - "null" description: | The time after creation to close the worker, like "1 hour" stagger: @@ -397,7 +399,6 @@ properties: description: | Do we try to resurrect the worker after the lifetime deadline? - profile: type: object description: | @@ -484,8 +485,8 @@ properties: target: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we start spilling the dask keys holding the largest @@ -493,24 +494,24 @@ properties: spill: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we spill all data to disk. pause: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we no longer start new tasks on this worker. terminate: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory reaches this level the nanny process will kill the worker (if a nanny is present) @@ -518,7 +519,7 @@ properties: max-spill: oneOf: - type: string - - {type: number, minimum: 0} + - { type: number, minimum: 0 } - enum: [false] description: >- Limit of number of bytes to be spilled on disk. @@ -545,7 +546,6 @@ properties: description: | Configuration settings for Dask Nannies properties: - preload: type: array description: | @@ -574,8 +574,7 @@ properties: properties: heartbeat: type: string - description: - This value is the time between heartbeats + description: This value is the time between heartbeats The client sends a periodic heartbeat message to the scheduler. If it misses enough of these then the scheduler assumes that it has gone. @@ -585,7 +584,7 @@ properties: description: Interval between scheduler-info updates security-loader: - type: [string, 'null'] + type: [string, "null"] description: | A fully qualified name (e.g. ``module.submodule.function``) of a callback to use for loading security credentials for the @@ -609,7 +608,6 @@ properties: See https://docs.dask.org/en/latest/how-to/customize-initialization.html for more information - deploy: type: object description: Configuration settings for general Dask deployment @@ -670,13 +668,11 @@ properties: type: object description: Configuration settings for Dask communications properties: - retry: type: object description: | Some operations (such as gathering data) are subject to re-tries with the below parameters properties: - count: type: integer minimum: 0 @@ -702,8 +698,8 @@ properties: offload: type: - - boolean - - string + - boolean + - string description: | The size of message after which we choose to offload serialization to another thread @@ -755,8 +751,8 @@ properties: require-encryption: type: - - boolean - - "null" + - boolean + - "null" description: | Whether to require encryption on non-local comms @@ -774,8 +770,8 @@ properties: properties: ciphers: type: - - string - - "null" + - string + - "null" description: Allowed ciphers, specified as an OpenSSL cipher string. min-version: @@ -790,8 +786,8 @@ properties: ca-file: type: - - string - - "null" + - string + - "null" description: Path to a CA file, in pem format scheduler: @@ -800,13 +796,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -819,13 +815,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -838,13 +834,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -857,30 +853,30 @@ properties: UCX provides access to other transport methods including NVLink and InfiniBand. properties: cuda-copy: - type: [boolean, 'null'] + type: [boolean, "null"] description: | Set environment variables to enable CUDA support over UCX. This may be used even if InfiniBand and NVLink are not supported or disabled, then transferring data over TCP. tcp: - type: [boolean, 'null'] + type: [boolean, "null"] description: | Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled. nvlink: - type: [boolean, 'null'] + type: [boolean, "null"] description: | Set environment variables to enable UCX over NVLink, implies ``distributed.comm.ucx.tcp=True``. infiniband: - type: [boolean, 'null'] + type: [boolean, "null"] description: | Set environment variables to enable UCX over InfiniBand, implies ``distributed.comm.ucx.tcp=True``. rdmacm: - type: [boolean, 'null'] + type: [boolean, "null"] description: | Set environment variables to enable UCX RDMA connection manager support, requires ``distributed.comm.ucx.infiniband=True``. create-cuda-context: - type: [boolean, 'null'] + type: [boolean, "null"] description: | Creates a CUDA context before UCX is initialized. This is necessary to enable UCX to properly identify connectivity of GPUs with specialized networking hardware, such as @@ -902,7 +898,7 @@ properties: properties: shard: type: - - string + - string description: | The maximum size of a websocket frame to send through a comm. @@ -984,10 +980,10 @@ properties: interval: type: string description: The time between ticks, default 20ms - limit : + limit: type: string description: The time allowed before triggering a warning - cycle : + cycle: type: string description: The time in between verifying event loop speed @@ -1042,6 +1038,6 @@ properties: Configuration options for the RAPIDS Memory Manager. properties: pool-size: - type: [integer, 'null'] + type: [integer, "null"] description: | The size of the memory pool in bytes. diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 74d59addb35..dd8470ce190 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -49,6 +49,7 @@ distributed: lease-timeout: 30s # Maximum interval to wait for a Client refresh before a lease is invalidated and released. http: + api-key: null routes: - distributed.http.scheduler.prometheus - distributed.http.scheduler.info diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 0a710a58ffd..d408f9c95f7 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -1,10 +1,40 @@ from __future__ import annotations +import asyncio +import functools import json +import dask.config + from distributed.http.utils import RequestHandler +def require_auth(method_func): + @functools.wraps(method_func) + def wrapper(self): + auth = self.request.headers.get("Authorization", None) + key = dask.config.get("distributed.scheduler.http.api-key") + if key and ( + not auth + or not auth.startswith("Bearer ") + or not key + or key != auth.split(" ")[-1] + ): + self.set_status(403, "Unauthorized") + return + + if not asyncio.iscoroutinefunction(method_func): + return method_func(self) + else: + + async def tmp(): + return await method_func(self) + + return tmp() + + return wrapper + + class APIHandler(RequestHandler): def get(self): self.write("API V1") @@ -12,6 +42,7 @@ def get(self): class RetireWorkersHandler(RequestHandler): + @require_auth async def post(self): self.set_header("Content-Type", "application/json") scheduler = self.server diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index e86f004b47e..5e50167f128 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -280,15 +280,78 @@ async def test_api(c, s, a, b): clean_kwargs={"threads": False}, config={ "distributed.scheduler.http.routes": DEFAULT_ROUTES - + ["distributed.http.scheduler.api"] + + ["distributed.http.scheduler.api"], + "distributed.scheduler.http.api-key": "abc123", }, ) -async def test_retire_workers(c, s, a, b): +async def test_api_auth(c, s, a, b): async with aiohttp.ClientSession() as session: + url = f"http://localhost:{s.http_server.port}/api/v1/retire_workers" params = {"workers": [a.address, b.address]} + + async with session.post(url, json=params) as resp: + assert resp.status == 403 + + async with session.post( + url, json=params, headers={"Authorization": "Bearer foobarbaz"} + ) as resp: + assert resp.status == 403 + + async with session.post( + url, json=params, headers={"Authorization": "Bearer abc"} + ) as resp: + assert resp.status == 403 + + async with session.post( + url, json=params, headers={"Authorization": "Bearer "} + ) as resp: + assert resp.status == 403 + + async with session.post( + url, json=params, headers={"Authorization": "Bearer abc123456"} + ) as resp: + assert resp.status == 403 + + async with session.post( + url, json=params, headers={"Authorization": "Bearer abc123"} + ) as resp: + assert resp.status == 200 + + +@gen_cluster( + client=True, + clean_kwargs={"threads": False}, + config={ + "distributed.scheduler.http.routes": DEFAULT_ROUTES + + ["distributed.http.scheduler.api"], + "distributed.scheduler.http.api-key": False, + }, +) +async def test_api_auth_disabled(c, s, a, b): + async with aiohttp.ClientSession() as session: + url = f"http://localhost:{s.http_server.port}/api/v1/retire_workers" + params = {"workers": [a.address, b.address]} + + async with session.post(url, json=params) as resp: + assert resp.status == 200 + + +@gen_cluster( + client=True, + clean_kwargs={"threads": False}, + config={ + "distributed.scheduler.http.routes": DEFAULT_ROUTES + + ["distributed.http.scheduler.api"], + "distributed.scheduler.http.api-key": "abc123", + }, +) +async def test_api_retire_workers(c, s, a, b): + async with aiohttp.ClientSession() as session: + url = f"http://localhost:{s.http_server.port}/api/v1/retire_workers" + params = {"workers": [a.address, b.address]} + async with session.post( - "http://localhost:%d/api/v1/retire_workers" % s.http_server.port, - json=params, + url, json=params, headers={"Authorization": "Bearer abc123"} ) as resp: assert resp.status == 200 assert resp.headers["Content-Type"] == "application/json" @@ -304,7 +367,7 @@ async def test_retire_workers(c, s, a, b): + ["distributed.http.scheduler.api"] }, ) -async def test_get_workers(c, s, a, b): +async def test_api_get_workers(c, s, a, b): async with aiohttp.ClientSession() as session: async with session.get( "http://localhost:%d/api/v1/get_workers" % s.http_server.port @@ -324,7 +387,7 @@ async def test_get_workers(c, s, a, b): + ["distributed.http.scheduler.api"] }, ) -async def test_adaptive_target(c, s, a, b): +async def test_api_adaptive_target(c, s, a, b): async with aiohttp.ClientSession() as session: async with session.get( "http://localhost:%d/api/v1/adaptive_target" % s.http_server.port diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index 31bb62292a7..b78a569888d 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -54,7 +54,7 @@ Scheduler API Scheduler methods exposed by the API with an example of the request body they take -- ``/api/v1/retire_workers`` : retire certain workers on the scheduler +- ``/api/v1/retire_workers`` : retire certain workers on the scheduler (requires auth) .. code-block:: json @@ -63,7 +63,18 @@ Scheduler methods exposed by the API with an example of the request body they ta } - ``/api/v1/get_workers`` : get all workers on the scheduler -- ``/api/v1/adaptive_target`` : get the target number of workers based on the scheduler's load +- ``/api/v1/adaptive_target`` : get the target number of workers based on the scheduler's load + +.. note:: + API methods that modify the state of the scheduler require an API key to be set in the ``Authorization`` header. + This API key can be set via ``distributed.scheduler.http.api-key`` in the Dask config. + + .. code-block:: json + + $ curl -H "Authorization: Bearer {api-key}" http://localhost:8787/api/v1/retire_workers + +.. warning:: + API authentication can be disabled by setting ``distributed.scheduler.http.api-key`` to ``False`` but this is not recommended. Individual bokeh plots ---------------------- From f31dfcab12043c15bc6c741d18e1a95a4537b9c3 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 24 May 2022 11:57:47 +0100 Subject: [PATCH 2/6] Revert prettier formatting --- distributed/distributed-schema.yaml | 134 +++++++++++++++------------- 1 file changed, 71 insertions(+), 63 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 664e7e28ee0..117fdbe8e59 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -2,12 +2,14 @@ properties: distributed: type: object properties: + version: type: integer scheduler: type: object properties: + allowed-failures: type: integer minimum: 0 @@ -20,8 +22,8 @@ properties: bandwidth: type: - - integer - - string + - integer + - string description: | The expected bandwidth between any pair of workers @@ -43,8 +45,8 @@ properties: contact-address: type: - - string - - "null" + - string + - "null" description: | The address that the scheduler advertises to workers for communication with it. @@ -54,8 +56,8 @@ properties: default-data-size: type: - - string - - integer + - string + - integer description: | The default size of a piece of data if we don't know anything about it. @@ -69,8 +71,8 @@ properties: idle-timeout: type: - - string - - "null" + - string + - "null" description: | Shut down the scheduler after this duration if no activity has occured @@ -117,8 +119,8 @@ properties: worker-ttl: type: - - string - - "null" + - string + - "null" description: | Time to live for workers. @@ -203,16 +205,16 @@ properties: properties: ca-file: type: - - string - - "null" + - string + - "null" key: type: - - string - - "null" + - string + - "null" cert: type: - - string - - "null" + - string + - "null" bokeh-application: type: object description: | @@ -240,7 +242,7 @@ properties: api-key: type: string description: | - API key required to access private HTTP API methods. + API key required to access private HTTP API methods routes: type: array description: | @@ -265,7 +267,8 @@ properties: description: set to true to auto-start the AMM on Scheduler init interval: type: string - description: Time expression, e.g. "2s". Run the AMM cycle every . + description: + Time expression, e.g. "2s". Run the AMM cycle every . policies: type: array items: @@ -274,8 +277,7 @@ properties: properties: class: type: string - description: - fully qualified name of an ActiveMemoryManagerPolicy + description: fully qualified name of an ActiveMemoryManagerPolicy subclass additionalProperties: description: keyword arguments to the policy constructor, if any @@ -378,8 +380,8 @@ properties: properties: duration: type: - - string - - "null" + - string + - "null" description: | The time after creation to close the worker, like "1 hour" stagger: @@ -399,6 +401,7 @@ properties: description: | Do we try to resurrect the worker after the lifetime deadline? + profile: type: object description: | @@ -485,8 +488,8 @@ properties: target: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we start spilling the dask keys holding the largest @@ -494,24 +497,24 @@ properties: spill: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we spill all data to disk. pause: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we no longer start new tasks on this worker. terminate: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory reaches this level the nanny process will kill the worker (if a nanny is present) @@ -519,7 +522,7 @@ properties: max-spill: oneOf: - type: string - - { type: number, minimum: 0 } + - {type: number, minimum: 0} - enum: [false] description: >- Limit of number of bytes to be spilled on disk. @@ -546,6 +549,7 @@ properties: description: | Configuration settings for Dask Nannies properties: + preload: type: array description: | @@ -574,7 +578,8 @@ properties: properties: heartbeat: type: string - description: This value is the time between heartbeats + description: + This value is the time between heartbeats The client sends a periodic heartbeat message to the scheduler. If it misses enough of these then the scheduler assumes that it has gone. @@ -584,7 +589,7 @@ properties: description: Interval between scheduler-info updates security-loader: - type: [string, "null"] + type: [string, 'null'] description: | A fully qualified name (e.g. ``module.submodule.function``) of a callback to use for loading security credentials for the @@ -608,6 +613,7 @@ properties: See https://docs.dask.org/en/latest/how-to/customize-initialization.html for more information + deploy: type: object description: Configuration settings for general Dask deployment @@ -668,11 +674,13 @@ properties: type: object description: Configuration settings for Dask communications properties: + retry: type: object description: | Some operations (such as gathering data) are subject to re-tries with the below parameters properties: + count: type: integer minimum: 0 @@ -698,8 +706,8 @@ properties: offload: type: - - boolean - - string + - boolean + - string description: | The size of message after which we choose to offload serialization to another thread @@ -751,8 +759,8 @@ properties: require-encryption: type: - - boolean - - "null" + - boolean + - "null" description: | Whether to require encryption on non-local comms @@ -770,8 +778,8 @@ properties: properties: ciphers: type: - - string - - "null" + - string + - "null" description: Allowed ciphers, specified as an OpenSSL cipher string. min-version: @@ -786,8 +794,8 @@ properties: ca-file: type: - - string - - "null" + - string + - "null" description: Path to a CA file, in pem format scheduler: @@ -796,13 +804,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -815,13 +823,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -834,13 +842,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -853,30 +861,30 @@ properties: UCX provides access to other transport methods including NVLink and InfiniBand. properties: cuda-copy: - type: [boolean, "null"] + type: [boolean, 'null'] description: | Set environment variables to enable CUDA support over UCX. This may be used even if InfiniBand and NVLink are not supported or disabled, then transferring data over TCP. tcp: - type: [boolean, "null"] + type: [boolean, 'null'] description: | Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled. nvlink: - type: [boolean, "null"] + type: [boolean, 'null'] description: | Set environment variables to enable UCX over NVLink, implies ``distributed.comm.ucx.tcp=True``. infiniband: - type: [boolean, "null"] + type: [boolean, 'null'] description: | Set environment variables to enable UCX over InfiniBand, implies ``distributed.comm.ucx.tcp=True``. rdmacm: - type: [boolean, "null"] + type: [boolean, 'null'] description: | Set environment variables to enable UCX RDMA connection manager support, requires ``distributed.comm.ucx.infiniband=True``. create-cuda-context: - type: [boolean, "null"] + type: [boolean, 'null'] description: | Creates a CUDA context before UCX is initialized. This is necessary to enable UCX to properly identify connectivity of GPUs with specialized networking hardware, such as @@ -898,7 +906,7 @@ properties: properties: shard: type: - - string + - string description: | The maximum size of a websocket frame to send through a comm. @@ -980,10 +988,10 @@ properties: interval: type: string description: The time between ticks, default 20ms - limit: + limit : type: string description: The time allowed before triggering a warning - cycle: + cycle : type: string description: The time in between verifying event loop speed @@ -1038,6 +1046,6 @@ properties: Configuration options for the RAPIDS Memory Manager. properties: pool-size: - type: [integer, "null"] + type: [integer, 'null'] description: | The size of the memory pool in bytes. From 85f38ab0c572af9c222460c076df738bfae76dd4 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 24 May 2022 11:58:14 +0100 Subject: [PATCH 3/6] Fix code language --- docs/source/http_services.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index b78a569888d..047fcd46e60 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -69,7 +69,7 @@ Scheduler methods exposed by the API with an example of the request body they ta API methods that modify the state of the scheduler require an API key to be set in the ``Authorization`` header. This API key can be set via ``distributed.scheduler.http.api-key`` in the Dask config. - .. code-block:: json + .. code-block:: console $ curl -H "Authorization: Bearer {api-key}" http://localhost:8787/api/v1/retire_workers From a723ef2bd69c2292eabba330378d2afa47e547a3 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 24 May 2022 12:02:42 +0100 Subject: [PATCH 4/6] Reinstate API by default but disallow all auth by default --- distributed/distributed.yaml | 1 + distributed/http/scheduler/api.py | 10 ++-- .../scheduler/tests/test_scheduler_http.py | 49 +++++-------------- 3 files changed, 19 insertions(+), 41 deletions(-) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index dd8470ce190..f61cf7f826b 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -57,6 +57,7 @@ distributed: - distributed.http.health - distributed.http.proxy - distributed.http.statics + - distributed.http.scheduler.api allowed-imports: - dask diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index d408f9c95f7..6680fcf9ef3 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -14,11 +14,11 @@ def require_auth(method_func): def wrapper(self): auth = self.request.headers.get("Authorization", None) key = dask.config.get("distributed.scheduler.http.api-key") - if key and ( - not auth - or not auth.startswith("Bearer ") - or not key - or key != auth.split(" ")[-1] + if key is None or ( + key + and ( + not auth or not auth.startswith("Bearer ") or key != auth.split(" ")[-1] + ) ): self.set_status(403, "Unauthorized") return diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index 5e50167f128..a9858a57b4c 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -251,20 +251,7 @@ async def test_eventstream(c, s, a, b): ws_client.close() -def test_api_disabled_by_default(): - assert "distributed.http.scheduler.api" not in dask.config.get( - "distributed.scheduler.http.routes" - ) - - -@gen_cluster( - client=True, - clean_kwargs={"threads": False}, - config={ - "distributed.scheduler.http.routes": DEFAULT_ROUTES - + ["distributed.http.scheduler.api"] - }, -) +@gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_api(c, s, a, b): async with aiohttp.ClientSession() as session: async with session.get( @@ -275,12 +262,20 @@ async def test_api(c, s, a, b): assert (await resp.text()) == "API V1" +@gen_cluster(client=True, clean_kwargs={"threads": False}) +async def test_api_auth_defaults(c, s, a, b): + async with aiohttp.ClientSession() as session: + url = f"http://localhost:{s.http_server.port}/api/v1/retire_workers" + params = {"workers": [a.address, b.address]} + + async with session.post(url, json=params) as resp: + assert resp.status == 403 + + @gen_cluster( client=True, clean_kwargs={"threads": False}, config={ - "distributed.scheduler.http.routes": DEFAULT_ROUTES - + ["distributed.http.scheduler.api"], "distributed.scheduler.http.api-key": "abc123", }, ) @@ -322,8 +317,6 @@ async def test_api_auth(c, s, a, b): client=True, clean_kwargs={"threads": False}, config={ - "distributed.scheduler.http.routes": DEFAULT_ROUTES - + ["distributed.http.scheduler.api"], "distributed.scheduler.http.api-key": False, }, ) @@ -340,8 +333,6 @@ async def test_api_auth_disabled(c, s, a, b): client=True, clean_kwargs={"threads": False}, config={ - "distributed.scheduler.http.routes": DEFAULT_ROUTES - + ["distributed.http.scheduler.api"], "distributed.scheduler.http.api-key": "abc123", }, ) @@ -359,14 +350,7 @@ async def test_api_retire_workers(c, s, a, b): assert len(retired_workers_info) == 2 -@gen_cluster( - client=True, - clean_kwargs={"threads": False}, - config={ - "distributed.scheduler.http.routes": DEFAULT_ROUTES - + ["distributed.http.scheduler.api"] - }, -) +@gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_api_get_workers(c, s, a, b): async with aiohttp.ClientSession() as session: async with session.get( @@ -379,14 +363,7 @@ async def test_api_get_workers(c, s, a, b): assert set(workers_address) == {a.address, b.address} -@gen_cluster( - client=True, - clean_kwargs={"threads": False}, - config={ - "distributed.scheduler.http.routes": DEFAULT_ROUTES - + ["distributed.http.scheduler.api"] - }, -) +@gen_cluster(client=True, clean_kwargs={"threads": False}) async def test_api_adaptive_target(c, s, a, b): async with aiohttp.ClientSession() as session: async with session.get( From f8fef3b79188a743369dd254166eba1eb8793a1e Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 24 May 2022 14:36:46 +0100 Subject: [PATCH 5/6] Default API key to empty string which also always gives 403 --- distributed/distributed.yaml | 2 +- distributed/http/scheduler/api.py | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index f61cf7f826b..2587ea3012e 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -49,7 +49,7 @@ distributed: lease-timeout: 30s # Maximum interval to wait for a Client refresh before a lease is invalidated and released. http: - api-key: null + api-key: "" routes: - distributed.http.scheduler.prometheus - distributed.http.scheduler.info diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 6680fcf9ef3..a4cb72a3f5f 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -14,10 +14,16 @@ def require_auth(method_func): def wrapper(self): auth = self.request.headers.get("Authorization", None) key = dask.config.get("distributed.scheduler.http.api-key") - if key is None or ( - key - and ( - not auth or not auth.startswith("Bearer ") or key != auth.split(" ")[-1] + if ( + key is None + or key == "" + or ( + key + and ( + not auth + or not auth.startswith("Bearer ") + or key != auth.split(" ")[-1] + ) ) ): self.set_status(403, "Unauthorized") From 37eabda8e68c5b868ba8ce4067a44399bd982112 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 24 May 2022 14:50:02 +0100 Subject: [PATCH 6/6] Flip if statement to simplify things --- distributed/http/scheduler/api.py | 36 +++++++++++++++---------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index a4cb72a3f5f..42300e14e91 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -14,29 +14,27 @@ def require_auth(method_func): def wrapper(self): auth = self.request.headers.get("Authorization", None) key = dask.config.get("distributed.scheduler.http.api-key") - if ( - key is None - or key == "" - or ( - key - and ( - not auth - or not auth.startswith("Bearer ") - or key != auth.split(" ")[-1] - ) - ) + if key is False or ( + key and auth and auth.startswith("Bearer ") and key == auth.split(" ")[-1] ): - self.set_status(403, "Unauthorized") - return + if not asyncio.iscoroutinefunction(method_func): + return method_func(self) + else: - if not asyncio.iscoroutinefunction(method_func): - return method_func(self) - else: + async def tmp(): + return await method_func(self) - async def tmp(): - return await method_func(self) + return tmp() + else: + self.set_status(403, "Unauthorized") + if not asyncio.iscoroutinefunction(method_func): + return + else: + # When wrapping a coroutine we need to return a coroutine even if it just returns None + async def tmp(): + return - return tmp() + return tmp() return wrapper