Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of TimedJSONWebSignatureSerializer #24519

Merged
merged 1 commit into from
Jun 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions airflow/utils/jwt_signer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime, timedelta
from typing import Any, Dict

import jwt


class JWTSigner:
"""
Signs and verifies JWT Token. Used to authorise and verify requests.

:param secret_key: key used to sign the request
:param expiration_time_in_seconds: time after which the token becomes invalid (in seconds)
:param audience: audience that the request is expected to have
:param leeway_in_seconds: leeway that allows for a small clock skew between the two parties
:param algorithm: algorithm used for signing
"""

def __init__(
self,
secret_key: str,
expiration_time_in_seconds: int,
audience: str,
leeway_in_seconds: int = 5,
algorithm: str = "HS512",
):
self._secret_key = secret_key
self._expiration_time_in_seconds = expiration_time_in_seconds
self._audience = audience
self._leeway_in_seconds = leeway_in_seconds
self._algorithm = algorithm

def generate_signed_token(self, extra_payload: Dict[str, Any]) -> str:
"""
Generate JWT with extra payload added.
:param extra_payload: extra payload that is added to the signed token
:return: signed token
"""
jwt_dict = {
"aud": self._audience,
"iat": datetime.utcnow(),
"nbf": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
}
jwt_dict.update(extra_payload)
token = jwt.encode(
jwt_dict,
self._secret_key,
algorithm=self._algorithm,
)
return token

def verify_token(self, token: str) -> Dict[str, Any]:
payload = jwt.decode(
token,
self._secret_key,
leeway=timedelta(seconds=self._leeway_in_seconds),
algorithms=[self._algorithm],
options={
"verify_signature": True,
"require_exp": True,
"require_iat": True,
"require_nbf": True,
},
audience=self._audience,
)
return payload
18 changes: 9 additions & 9 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Tuple

from itsdangerous import TimedJSONWebSignatureSerializer

from airflow.configuration import AirflowConfigException, conf
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.jwt_signer import JWTSigner
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session

Expand Down Expand Up @@ -201,16 +200,17 @@ def _read(self, ti, try_number, metadata=None):
except (AirflowConfigException, ValueError):
pass

signer = TimedJSONWebSignatureSerializer(
signer = JWTSigner(
secret_key=conf.get('webserver', 'secret_key'),
algorithm_name='HS512',
expires_in=conf.getint('webserver', 'log_request_clock_grace', fallback=30),
# This isn't really a "salt", more of a signing context
salt='task-instance-logs',
expiration_time_in_seconds=conf.getint(
'webserver', 'log_request_clock_grace', fallback=30
),
audience="task-instance-logs",
)

response = httpx.get(
url, timeout=timeout, headers={'Authorization': signer.dumps(log_relative_path)}
url,
timeout=timeout,
headers={b'Authorization': signer.generate_signed_token({"filename": log_relative_path})},
)
response.encoding = "utf-8"

Expand Down
84 changes: 59 additions & 25 deletions airflow/utils/serve_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,89 @@
# under the License.

"""Serve logs process"""
import logging
import os
import time

import gunicorn.app.base
from flask import Flask, abort, request, send_from_directory
from itsdangerous import TimedJSONWebSignatureSerializer
from jwt.exceptions import (
ExpiredSignatureError,
ImmatureSignatureError,
InvalidAudienceError,
InvalidIssuedAtError,
InvalidSignatureError,
)
from setproctitle import setproctitle

from airflow.configuration import conf
from airflow.utils.docs import get_docs_url
from airflow.utils.jwt_signer import JWTSigner

logger = logging.getLogger(__name__)


def create_app():
flask_app = Flask(__name__, static_folder=None)
max_request_age = conf.getint('webserver', 'log_request_clock_grace', fallback=30)
expiration_time_in_seconds = conf.getint('webserver', 'log_request_clock_grace', fallback=30)
log_directory = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))

signer = TimedJSONWebSignatureSerializer(
signer = JWTSigner(
secret_key=conf.get('webserver', 'secret_key'),
algorithm_name='HS512',
expires_in=max_request_age,
# This isn't really a "salt", more of a signing context
salt='task-instance-logs',
expiration_time_in_seconds=expiration_time_in_seconds,
audience="task-instance-logs",
)

# Prevent direct access to the logs port
@flask_app.before_request
def validate_pre_signed_url():
try:
auth = request.headers['Authorization']

# We don't actually care about the payload, just that the signature
# was valid and the `exp` claim is correct
filename, headers = signer.loads(auth, return_header=True)

issued_at = int(headers['iat'])
expires_at = int(headers['exp'])
except Exception:
auth = request.headers.get('Authorization')
if auth is None:
logger.warning("The Authorization header is missing: %s.", request.headers)
abort(403)
payload = signer.verify_token(auth)
token_filename = payload.get("filename")
request_filename = request.view_args['filename']
if token_filename is None:
logger.warning("The payload does not contain 'filename' key: %s.", payload)
abort(403)
if token_filename != request_filename:
logger.warning(
potiuk marked this conversation as resolved.
Show resolved Hide resolved
"The payload log_relative_path key is different than the one in token:"
"Request path: %s. Token path: %s.",
request_filename,
token_filename,
)
abort(403)
except InvalidAudienceError:
logger.warning("Invalid audience for the request", exc_info=True)
abort(403)

if filename != request.view_args['filename']:
except InvalidSignatureError:
logger.warning("The signature of the request was wrong", exc_info=True)
abort(403)

# Validate the `iat` and `exp` are within `max_request_age` of now.
now = int(time.time())
if abs(now - issued_at) > max_request_age:
except ImmatureSignatureError:
logger.warning("The signature of the request was sent from the future", exc_info=True)
abort(403)
if abs(now - expires_at) > max_request_age:
except ExpiredSignatureError:
logger.warning(
"The signature of the request has expired. Make sure that all components "
"in your system have synchronized clocks. "
"See more at %s",
get_docs_url("configurations-ref.html#secret-key"),
exc_info=True,
)
abort(403)
if issued_at > expires_at or expires_at - issued_at > max_request_age:
except InvalidIssuedAtError:
logger.warning(
"The request was issues in the future. Make sure that all components "
"in your system have synchronized clocks. "
"See more at %s",
get_docs_url("configurations-ref.html#secret-key"),
exc_info=True,
)
abort(403)
except Exception:
logger.warning("Unknown error", exc_info=True)
abort(403)

@flask_app.route('/log/<path:filename>')
Expand Down
1 change: 1 addition & 0 deletions newsfragments/24519.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The JWT claims in the request to retrieve logs have been standardized: we use "nbf" and "aud" claims for maturity and audience of the requests. Also "filename" payload field is used to keep log name.
Loading