From 9ff458024c7e982042585704d5ac37e0305ed377 Mon Sep 17 00:00:00 2001
From: Sergiy Zaschipas
Date: Fri, 4 Jun 2021 02:17:16 +0300
Subject: [PATCH] Merge with master of aiven/karapace to our repository (#8)

* tests/test_schema.py: splitting test_schema()
Split test_schema() into multiple single-purpose tests. No essential functional changes in the tests.

* Added information on how to run integration tests against the Confluent stack
Instructions in README.rst. Docker Compose file to start the Confluent stack.

* Kafka REST fixed version to 6.1.1 to match Schema Registry

* README.rst: clarified compatibility
Changed the claim that Karapace is compatible into the claim that it aims to be compatible with 6.1.1, and added a list of known incompatibilities.

* Configuration Keys as table

* fixed content table

* Fixed small spelling bugs

* test_schema.py removed assert_schema_versions from test_schema_repost, unrelated

* test_schema.py added -> None to all test method signatures.

* test_schema.py: added annotations to all functions

* test_schema.py duplicate code removal

* test_schema.py moved a comment to an assert message

* test_schema.py removed unneeded f-string wrappings

* utils.py AVRO name compatible (http://avro.apache.org/docs/current/spec.html#names). Must not have '-'.

* test_schema.py test_schema_version_numbering uses 'name' in the Avro schema to make the schema unique

* test_schema.py: str format() -> f-strings

* test_schema.py no more JSONs as strings, instead dicts that are dumped as JSON strings

* utils.py add create_schema_name_factory, create safer names
For example, in Avro field names '-' is not allowed. Using underscore instead.

* test_schema.py: split test_schema_versions into two tests
New ones: test_schema_versions_multiple_subjects_same_schema and test_schema_versions_deleting. The tests use unique schema names.

* test_schema.py: test_schema_remains_constant fixes
Wasn't using a unique schema id. Added doc.

* test_schema.py removed test_enum_schema_compatibility
Essentially a duplicate of test_enum_schema.

* test_schema.py: fix test_schema_repost
Compares JSONs now, not strings.

* test_schema.py test_compatibility_endpoint fix
Now uses a dynamic unique schema name. Was clashing before. Added documentation on what the test does.

* test_schema.py test_record_schema_compatibility_backward split into two
The new ones: test_record_schema_compatibility_backward and test_record_schema_compatibility_forward.

* test_schema_version_number_existing_schema takes version ids from response
Now compatible with SR.

* test_schema.py: test_schema_subject_version_schema fix
Changed to use a proper Avro schema.

* test_schema.py: test_schema_same_subject fix
No longer expects the exact same string schema to be returned. The str parsed as JSON needs to match.

* Handle gracefully if no node is master eligible
Karapace configuration allows configuring a node to not be eligible for master. Handle it gracefully, i.e. in read-only mode, if all nodes are configured non-eligible for master.

* schema_registry: breaking change in an error message
The error message in POST to /subject/ when schema is not specified in the request changed.
Fixes test_schema_subject_post_invalid to run in Karapace and against Schema Registry.

* schema_registry: breaking change in subjects/{}/versions/{}
Fixed the error message in subjects/{}/versions/{} to match Schema Registry. Now test_schema_subject_invalid_id works against SR.

* test_schema.py test_version_number_validation fix
Error message check matches the error from SR (was breaking the test). Dynamically fetches the version number. Added a description for the test.

* Add some typing, rename eligible master flag for clarification

* schema_registry: breaking change in POST subjects/{subject}/versions
In the case the endpoint is submitted without a body, changed the HTTP status code, error_code and message to match the ones in Schema Registry. Made the necessary changes so that Karapace also returns correct values. test_schema.py: test_schema_missing_body fixed accordingly.

* schema_registry: breaking changes in some HTTP error messages
Now HTTP error messages match the ones coming from Schema Registry. Adjusted test_http_headers in test_schema.py to correctly check the messages.

* schema_registry: breaking change in /schemas/ids/<>/versions
/schemas/ids//versions now returns an empty list in case nothing is found. This is the behaviour of SR. Karapace used to fail in this case before this change. The tests test_schema_lifecycle and test_schema_versions_deleting now work against Schema Registry (in addition to Karapace).

* test_schema.py: test_schema_versions_deleting: No unique field
A unique field name is not needed, the schema name is enough. Using a fixed one.

* readme: clarified and separated readme
Moved documentation about development to the CONTRIBUTING.md file, and tried to make the README.rst a bit more concise.

* Remove explicit master eligibility flag and utilize optional master_url

* CONTRIBUTING.md small fixes
Only minor changes, no essential content change: Changed some rst formatting to md. Some typos fixed, such as karapace -> Karapace. A few small tweaks.

* doc: fixed grammar

* KarapaceAll: startup fix
When started from KarapaceAll, the __init__ of KarapaceSchemaRegistry is not called. schema_lock is initialized in __init__, so it is never initialized when using KarapaceAll. The fix is to move the schema_lock init to _init(), which gets called also when using KarapaceAll.

* docs: locahost -> localhost

Co-authored-by: Juha Mynttinen
Co-authored-by: Francesco
Co-authored-by: Tommi Vainikainen
Co-authored-by: Augusto Hack
---
 CONTRIBUTING.md | 101 +-
 README.rst | 450 +++++---
 karapace/master_coordinator.py | 36 +-
 karapace/rapu.py | 8 +-
 karapace/schema_reader.py | 9 +-
 karapace/schema_registry_apis.py | 46 +-
 .../integration/confluent-docker-compose.yml | 51 +
 tests/integration/test_master_coordinator.py | 27 +-
 tests/integration/test_schema.py | 1035 +++++++++++------
 tests/utils.py | 8 +-
 10 files changed, 1029 insertions(+), 742 deletions(-)
 create mode 100644 tests/integration/confluent-docker-compose.yml

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 9f1be72b6..176e7f107 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,17 +1,102 @@
 # Welcome!

-Contributions are very welcome on Karapace. Please follow the guidelines:
+Contributions are very welcome on Karapace. When contributing, please keep this in mind:

-- It's recommended to open an issue to discuss a feature before putting in a lot of effort.
+- Open an issue to discuss new, bigger features.
+- Write code consistent with the project style and make sure the tests are passing.
+- Stay in touch with us if we have follow-up questions or requests for further changes.

-- We use [GitHub Flow](https://guides.github.com/introduction/flow/), check that your main branch is up to date, and create a new branch for changes.

+# Development

-- Commit messages should describe the changes, not the filenames. Win our admiration by following the [excellent advice from Chris Beams](https://chris.beams.io/posts/git-commit/) when composing commit messages.

+## Local Environment

-- Choose a meaningful title for your pull request.

+There is very little you need to get started coding for Karapace:

-- The pull request description should focus on what changed and why.

+- Use [one of the supported python versions](https://github.com/aiven/karapace/blob/master/setup.py)
+  documented in the `setup.py` classifiers.
+- Create [a virtual environment](https://docs.python.org/3/tutorial/venv.html) and install the dev dependencies in it:

-- Check that the tests pass (and add test coverage for your changes if appropriate).

+```sh
+python -m venv <path_to_venv>
+source <path_to_venv>/bin/activate
+pip install -r ./requirements-dev.txt
+pip install -e .
+```

-- Stay in touch with us if we have follow up questions or requests for further changes.

+## Tests
+
+Tests are written with the [pytest](https://docs.pytest.org/) framework, and all PRs are tested for
+each supported Python version using [GitHub Actions](https://github.com/features/actions).
+
+There are two flavors of tests, unit tests and integration tests:
+
+- Unit: These are faster and very useful for quick iterations. They usually test pure functions.
+- Integration: These are slower but more complete. They run Karapace, ZooKeeper, and Kafka servers;
+  pytest fixtures are used to start and stop these for you.
+
+Both flavors run in parallel using [pytest-xdist](https://github.com/pytest-dev/pytest-xdist). New
+tests should be engineered with this in mind:
+
+- Don't reuse schema/subject/topic names.
+- Expect other clients to be interacting with the servers at the same time.
+
+To run the tests use `make`. It will download Kafka to be used in the tests for you:
+
+```sh
+make unittest
+make integrationtest
+```
+
+### PyCharm
+
+If you want to run the tests from within the IDE, first download Kafka using `make fetch-kafka`, and
+use the project root as the working directory.
+
+### Compatibility tests
+
+The integration tests can be configured to use an external REST proxy (`--rest-url`), Registry
+(`--registry-url`) and Kafka (`--kafka-bootstrap-servers`). These can be used to make sure the
+tests conform to the Kafka REST or Schema Registry APIs, and then that Karapace conforms to the
+tests:
+
+```sh
+docker-compose -f ./tests/integration/confluent-docker-compose.yml up -d
+pytest --kafka-bootstrap-servers localhost:9092 --registry-url http://localhost:8081 --rest-url http://localhost:8082/ tests/integration
+```
+
+## Static checking and Linting
+
+The code is statically checked and formatted using [a few
+tools](https://github.com/aiven/karapace/blob/master/requirements-dev.txt). To run these
+automatically on each commit, please enable the [pre-commit](https://pre-commit.com) hooks.
+Alternatively you can run them manually with `make pre-commit`.
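+
+For example, a first-time setup could look like this. This is a minimal sketch; it assumes the dev
+dependencies from `requirements-dev.txt` (which include the `pre-commit` tool) are already installed
+in your virtual environment:
+
+```sh
+# Install the git hooks so the checks run automatically on every commit
+pre-commit install
+
+# Optionally run all the checks once against the whole code base
+pre-commit run --all-files
+```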
+
+## Manual testing
+
+To use your development code, you just need to set up a Kafka server and run Karapace from your
+virtual environment:
+
+```sh
+docker-compose -f ./container/docker-compose.yml up -d kafka
+karapace karapace.config.json
+```
+
+### Configuration
+
+To see descriptions of the configuration keys, see our
+[README](https://github.com/aiven/karapace#configuration-keys).
+
+Each configuration key can be overridden with an environment variable prefixed with `KARAPACE_`,
+the exception being configuration keys that actually start with the `karapace` string. For example, to
+override the `bootstrap_uri` config value, one would use the environment variable
+`KARAPACE_BOOTSTRAP_URI`.
+
+# Opening a PR
+
+- Commit messages should describe the changes, not the filenames. Win our admiration by following
+  the [excellent advice from Chris Beams](https://chris.beams.io/posts/git-commit/) when composing
+  commit messages.
+- Choose a meaningful title for your pull request.
+- The pull request description should focus on what changed and why.
+- Check that the tests pass (and add test coverage for your changes if appropriate).
diff --git a/README.rst b/README.rst
index b083700d4..d541a07bb 100644
--- a/README.rst
+++ b/README.rst
@@ -3,195 +3,64 @@ Karapace

 ``karapace`` Your Kafka essentials in one tool

+.. image:: https://github.com/aiven/karapace/actions/workflows/tests.yml/badge.svg
+
+Overview
+========
+
+Karapace supports the storing of schemas in a central repository, which clients can access to
+serialize and deserialize messages. The schemas also maintain their own version histories and can be
+checked for compatibility between their different respective versions.
+
+Karapace REST provides a RESTful interface to your Kafka cluster, allowing you to perform tasks such
+as producing and consuming messages and perform administrative cluster work, all the while using the
+language of the web.
+
+Karapace is compatible with Schema Registry 6.1.1 at the API level.

 Features
 ========

-* Schema Registry and Rest Proxy that are 1:1 Compatible with the pre-existing proprietary
-  Confluent Schema Registry and Kafka Rest Proxy
 * Drop in replacement both on pre-existing Schema Registry / Kafka Rest Proxy client and
   server-sides
 * Moderate memory consumption
 * Asynchronous architecture based on aiohttp
+* Supports Avro and JSON Schema. Protobuf development is tracked with `Issue 67`_.
-
-Overview
-========
-
-Karapace supports the storing of schemas in a central repository, which
-clients can access to serialize and deserialize messages. The schemas also
-maintain their own version histories and can be checked for compatibility
-between their different respective versions.
-
-Karapace rest provides a RESTful interface to your Kafka cluster, allowing you to perform
-tasks such as producing and consuming messages and perform administrative cluster work,
-all the while using the language of the WEB.
+.. _Issue 67: https://github.com/aiven/karapace/issues/67

 Setup
 =====
- -Requirements ------------- - -Karapace requires Python 3.6 or later and some additional components in -order to operate: - -* aiohttp_ for serving schemas over HTTP in an asynchronous fashion -* avro-python3_ for Avro serialization -* kafka-python_ to read, write and coordinate Karapace's persistence in Kafka -* raven-python_ (optional) to report exceptions to sentry -* aiokafka_ for some components of the rest proxy - -.. _`aiohttp`: https://github.com/aio-libs/aiohttp -.. _`aiokafka`: https://github.com/aio-libs/aiokafka -.. _`avro-python3`: https://github.com/apache/avro -.. _`kafka-python`: https://github.com/dpkp/kafka-python -.. _`raven-python`: https://github.com/getsentry/raven-python - -Developing and testing Karapace also requires the following utilities: -requests_, flake8_, pylint_ and pytest_. - -.. _`flake8`: https://flake8.readthedocs.io/ -.. _`requests`: http://www.python-requests.org/en/latest/ -.. _`pylint`: https://www.pylint.org/ -.. _`pytest`: http://pytest.org/ - -Karapace has been developed and tested on modern Linux x86-64 systems, but -should work on other platforms that provide the required modules. - - -Building --------- - -To build an installation package for your distribution, go to the root -directory of a Karapace Git checkout and run:: - - python3 setup.py bdist_egg - -This will produce an egg file into a dist directory within the same folder. - -Installation +Using Docker ------------ -Python/Other:: - - easy_install dist/karapace-0.1.0-py3.6.egg - -On Linux systems it is recommended to simply run ``karapace`` under -``systemd``:: +To get you up and running with the latest release of Karapace, a docker setup is available:: - systemctl enable karapace.service + docker-compose -f ./container/docker-compose.yml up -d -and eventually after the setup section, you can just run:: +Then you should be able to reach two sets of endpoints: - systemctl start karapace.service +* Karapace schema registry on http://localhost:8081 +* Karapace REST on http://localhost:8082 Configuration -------------- - -After this you need to create a suitable JSON configuration file for your -installation. Keys to take special care are the ones needed to configure -Kafka and advertised_hostname. - -Each configuration key can be overridden with an environment variable prefixed with -``KARAPACE_``, exception being configuration keys that actually start with the ``karapace`` string. -For example, to override the ``bootstrap_uri`` config value, one would use the environment variable -``KARAPACE_BOOTSTRAP_URI`` - - -To see descriptions of configuration keys see section ``config``. Here's an -example configuration file to give you an idea what you need to change:: - - { - "advertised_hostname": "localhost", - "bootstrap_uri": "127.0.0.1:9092", - "client_id": "sr-1", - "compatibility": "FULL", - "group_id": "schema-registry", - "host": "127.0.0.1", - "log_level": "DEBUG", - "port": 8081, - "master_eligibility": true, - "replication_factor": 1, - "security_protocol": "PLAINTEXT", - "ssl_cafile": null, - "ssl_certfile": null, - "ssl_keyfile": null, - "topic_name": "_schemas" - } - -Local Development ------------------ - -Currently Karapace runs on the Python major versions 3.7, 3.8 and 3.9. You can use any of these for development. -Naturally, independently of Python version you use, the code needs to run on all the supported versions. -The CI pipeline in GitHub actions will run the tests on all these Python versions to ensure this. 
-
-To run Karapace locally, or develop it, first install the dependencies.
-If you only need the runtime, i.e. you're not running tests or committing to Git,
-it's enough to install the runtime dependencies::
-
-    # Runtime dependencies
-    python3 -m pip install -r requirements.txt
-
-If you are developing and e.g. running tests, install the development dependencies.
-This will install also the runtime dependencies::
-
-    # Development dependencies, contains runtime dependencies
-    python3 -m pip install -r requirements-dev.txt
-
-To run the local/current version of the code, set up the configuration file in ``karapace.config.json`` to include connection details for Kafka and any other config you want to change, then run::
-
-    python3 -m karapace.karapace_all karapace.config.json
-
-There are two flavors of tests, unit tests and integration tests. The unit tests are standalone,
-i.e. can be run without anything outside of the test running. The integration tests in turn need
-a running ZooKeeper and Kafka, but take internally care of starting and stopping them.
+^^^^^^^^^^^^^
-The tests can be run from the command line using :code:`make`::
+Each configuration key can be overridden with an environment variable prefixed with ``KARAPACE_``,
+the exception being configuration keys that actually start with the ``karapace`` string. For example, to
+override the ``bootstrap_uri`` config value, one would use the environment variable
+``KARAPACE_BOOTSTRAP_URI``. Here_ you can find an example configuration file to give you an idea
+of what you need to change.
-    # Running unit tests
-    make unittest
+.. _`Here`: https://github.com/aiven/karapace/blob/master/karapace.config.json
-    # Running integration tests
-    make integrationtest
+Source install
+--------------
-To run the tests in an IDE, you need once download and untar Kafka
-by :code:`make fetch-kafka`. Additionally ensure that the working directory
-when running tests, is set to Git root, e.g. in PyCharm you can
-create a configuration template with the correct working directory.
+Alternatively, you can do a source install using::
-The integration tests are run in parallel e.g. in the CI-pipeline.
-The tests need to be engineered taking this in mind.
-
-There are several coding style checks in `GitHub Actions `_.
-Your code changes need to pass these tests. To run the checks locally,
-you can run them manually::
-
-    # Runs all coding style checks
-    make pre-commit
-
-Alternatively,you can use `pre-commit `_ to automatically run the checks on commit time::
-
-    pre-commit install
-
-Docker setup for development
-----------------------------
-
-To get you up and running with a development copy of Karapace, a docker setup
-is available. You can find everything you need for this in the ``container/``
-folder.
- -Get the containers running:: - - docker-compose up - -Then you should be able to reach two sets of endpoints: - -* Karapace schema registry on http://localhost:8081 -* Karapace REST on http://localhost:8082 + python setup.py install Quickstart ========== @@ -303,6 +172,7 @@ Delete consumer:: $ curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \ http://localhost:8081/consumers/avro_consumers/instances/my_consumer + Backing up your Karapace ======================== @@ -318,7 +188,6 @@ consumer:: ./kafka-console-consumer.sh --bootstrap-server brokerhostname:9092 --topic _schemas --from-beginning --property print.key=true --timeout-ms 1000 1> schemas.log - Restoring Karapace from backup ============================== @@ -333,7 +202,6 @@ You can restore the data from the previous step by running:: ./kafka-console-producer.sh --broker-list brokerhostname:9092 --topic _schemas --property parse.key=true < schemas.log - Performance comparison to Confluent stack ========================================== Latency @@ -392,7 +260,6 @@ Ram consumption, different consumer count, over 300s 20 83 530 =========== =================== ================ - Commands ======== @@ -400,144 +267,121 @@ Once installed, the ``karapace`` program should be in your path. It is the main daemon process that should be run under a service manager such as ``systemd`` to serve clients. - Configuration keys ================== -``advertised_hostname`` (default ``socket.gethostname()``) - -The hostname being advertised to other instances of Karapace that are -attached to the same Kafka group. All nodes within the cluster need to have -their advertised_hostname's set so that they can all reach each other. - -``bootstrap_uri`` (default ``localhost:9092``) - -The URI to the Kafka service where to store the schemas and to run -coordination among the Karapace instances. - -``client_id`` (default ``sr-1``) - -The client_id name by which the Karapace will use when coordinating with -other Karapaces who is master. The one with the name that sorts as the -first alphabetically is chosen as master from among the services with -master_eligibility set to true. - -``consumer_enable_autocommit`` (default ``True``) - -Enable auto commit on rest proxy consumers - -``consumer_request_timeout_ms`` (default ``11000``) - -Rest proxy consumers timeout for reads that do not limit the max bytes or provide their own timeout - -``consumer_request_max_bytes`` (default ``67108864``) - -Rest proxy consumers maximum bytes to be fetched per request - -``fetch_min_bytes`` (default ``-1``) - -Rest proxy consumers minimum bytes to be fetched per request. -1 means no limit - -``group_id`` (default ``schema-registry``) - -The Kafka group name used for selecting a master service to coordinate the -storing of Schemas. - -``master_eligibility`` (``true``) - -Should the service instance be considered for promotion to be the master -service. Reason to turn this off would be to have an instances of Karapace -running somewhere else for HA purposes but which you wouldn't want to -automatically promote to master if the primary instances were to become -unavailable. 
-
-``producer_compression_type`` (default ``None``)
-
-Type of compression to be used by rest proxy producers
-
-``producer_acks`` (default ``1``)
-
-Level of consistency desired by each producer message sent on the rest proxy
-More on https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
-
-``producer_linger_ms`` (default ``0``)
-
-Time to wait for grouping together requests
-More on https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
-
-``security_protocol`` (default ``PLAINTEXT``)
-
-Default Kafka security protocol needed to communicate with the Kafka
-cluster. Other options is to use SSL for SSL client certificate
-authentication.
-
-``sentry`` (default ``None``)
-
-Used to configure parameters for sentry integration (dsn, tags, ...). Setting the
-environment variable ``SENTRY_DSN`` will also enable sentry integration.
-
-``ssl_cafile`` (default ``Path to CA certificate``)
-
-Used when security_protocol is set to SSL, the path to the SSL CA certificate.
-
-``ssl_certfile`` (default ``/path/to/certfile``)
-
-Used when security_protocol is set to SSL, the path to the SSL certfile.
-
-``ssl_keyfile`` (default ``/path/to/keyfile``)
-
-Used when security_protocol is set to SSL, the path to the SSL keyfile.
-
-``topic_name`` (default ``_schemas``)
-
-The name of the Kafka topic where to store the schemas.
-
-``replication_factor`` (default ``1``)
-
-The replication factor to be used with the schema topic.
-
-``host`` (default ``"127.0.0.1"``)
-
-Address to bind the Karapace HTTP server to. Set to an empty string to
-listen to all available addresses.
-
-``registry_host`` (default ``"127.0.0.1"``)
-
-Kafka Registry host, used by Kafka Rest for avro related requests.
-If running both in the same process, it should be left to its default value
-
-``port`` (default ``8081``)
-
-HTTP webserver port to bind the Karapace to.
-
-``registry_port`` (default ``8081``)
-
-Kafka Registry port, used by Kafka Rest for avro related requests.
-If running both in the same process, it should be left to its default value
-
-``metadata_max_age_ms`` (default ``60000``)
-
-Preiod of time in milliseconds after Kafka metadata is force refreshed.
-
-``karapace_rest`` (default ``true``)
-
-If the rest part of the app should be included in the starting process
-At least one of this and karapace_registry options need to be enabled in order
-for the service to start
-
-``karapace_registry`` (default ``true``)
-
-If the registry part of the app should be included in the starting process
-At least one of this and karapace_registry options need to be enabled in order
-for the service to start
-
-``name_strategy`` (default ``subject_name``)
-
-Name strategy to use when storing schemas from the kafka rest proxy service
-
-``master_election_strategy`` (default ``lowest``)
-
-Decides on what basis the karapace cluster master is chosen (only relevant in a multi node setup)
+Keys to take special care of are the ones needed to configure Kafka and ``advertised_hostname``.
+
+.. list-table::
+   :header-rows: 1
+
+   * - Parameter
+     - Default Value
+     - Description
+   * - ``advertised_hostname``
+     - ``socket.gethostname()``
+     - The hostname being advertised to other instances of Karapace that are attached to the same Kafka group. All nodes within the cluster need to have their ``advertised_hostname`` set so that they can all reach each other.
+   * - ``bootstrap_uri``
+     - ``localhost:9092``
+     - The URI to the Kafka service where to store the schemas and to run
+       coordination among the Karapace instances.
+   * - ``client_id``
+     - ``sr-1``
+     - The ``client_id`` name that Karapace uses when coordinating with
+       other Karapace instances on who is the master. The one with the name that sorts as the
+       first alphabetically is chosen as master from among the services with
+       ``master_eligibility`` set to true.
+   * - ``consumer_enable_autocommit``
+     - ``True``
+     - Enable auto commit on rest proxy consumers.
+   * - ``consumer_request_timeout_ms``
+     - ``11000``
+     - Rest proxy consumers timeout for reads that do not limit the max bytes or provide their own timeout.
+   * - ``consumer_request_max_bytes``
+     - ``67108864``
+     - Rest proxy consumers maximum bytes to be fetched per request.
+   * - ``fetch_min_bytes``
+     - ``-1``
+     - Rest proxy consumers minimum bytes to be fetched per request. ``-1`` means no limit.
+   * - ``group_id``
+     - ``schema-registry``
+     - The Kafka group name used for selecting a master service to coordinate the storing of schemas.
+   * - ``master_eligibility``
+     - ``true``
+     - Should the service instance be considered for promotion to be the master
+       service. A reason to turn this off would be to have an instance of Karapace
+       running somewhere else for HA purposes but which you wouldn't want to
+       automatically promote to master if the primary instances were to become
+       unavailable.
+   * - ``producer_compression_type``
+     - ``None``
+     - Type of compression to be used by rest proxy producers.
+   * - ``producer_acks``
+     - ``1``
+     - Level of consistency desired by each producer message sent on the rest proxy.
+       More on `Kafka Producer <https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>`_.
+   * - ``producer_linger_ms``
+     - ``0``
+     - Time to wait for grouping together requests.
+       More on `Kafka Producer <https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>`_.
+   * - ``security_protocol``
+     - ``PLAINTEXT``
+     - Default Kafka security protocol needed to communicate with the Kafka
+       cluster. Another option is to use SSL for SSL client certificate
+       authentication.
+   * - ``sentry``
+     - ``None``
+     - Used to configure parameters for sentry integration (dsn, tags, ...). Setting the
+       environment variable ``SENTRY_DSN`` will also enable sentry integration.
+   * - ``ssl_cafile``
+     - ``/path/to/cafile``
+     - Used when ``security_protocol`` is set to SSL, the path to the SSL CA certificate.
+   * - ``ssl_certfile``
+     - ``/path/to/certfile``
+     - Used when ``security_protocol`` is set to SSL, the path to the SSL certfile.
+   * - ``ssl_keyfile``
+     - ``/path/to/keyfile``
+     - Used when ``security_protocol`` is set to SSL, the path to the SSL keyfile.
+   * - ``topic_name``
+     - ``_schemas``
+     - The name of the Kafka topic where to store the schemas.
+   * - ``replication_factor``
+     - ``1``
+     - The replication factor to be used with the schema topic.
+   * - ``host``
+     - ``127.0.0.1``
+     - Address to bind the Karapace HTTP server to. Set to an empty string to
+       listen to all available addresses.
+   * - ``registry_host``
+     - ``127.0.0.1``
+     - Kafka Registry host, used by Kafka REST for Avro related requests.
+       If running both in the same process, it should be left to its default value.
+   * - ``port``
+     - ``8081``
+     - HTTP webserver port to bind the Karapace to.
+   * - ``registry_port``
+     - ``8081``
+     - Kafka Registry port, used by Kafka REST for Avro related requests.
+       If running both in the same process, it should be left to its default value.
+   * - ``metadata_max_age_ms``
+     - ``60000``
+     - Period of time in milliseconds after which Kafka metadata is force refreshed.
+   * - ``karapace_rest``
+     - ``true``
+     - Whether the REST part of the app should be included in the starting process.
+       At least one of this and ``karapace_registry`` needs to be enabled in order
+       for the service to start.
+   * - ``karapace_registry``
+     - ``true``
+     - Whether the registry part of the app should be included in the starting process.
+       At least one of this and ``karapace_rest`` needs to be enabled in order
+       for the service to start.
+   * - ``name_strategy``
+     - ``subject_name``
+     - Name strategy to use when storing schemas from the Kafka REST proxy service.
+   * - ``master_election_strategy``
+     - ``lowest``
+     - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi-node setup).

 License
 =======
@@ -548,7 +392,6 @@
 available in the ``LICENSE`` file.

 Please note that the project explicitly does not require a CLA (Contributor
 License Agreement) from its contributors.

-
 Contact
 =======
@@ -557,7 +400,6 @@
 and pull requests at https://github.com/aiven/karapace .

 Any possible vulnerabilities or other serious issues should be reported
 directly to the maintainers .

-
 Credits
 =======
diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py
index a2f993155..8d1b8545d 100644
--- a/karapace/master_coordinator.py
+++ b/karapace/master_coordinator.py
@@ -11,6 +11,7 @@
 from karapace import constants
 from karapace.utils import KarapaceKafkaClient
 from threading import Lock, Thread
+from typing import Optional, Tuple

 import json
 import logging
@@ -30,7 +31,7 @@ class SchemaCoordinator(BaseCoordinator):
     hostname = None
     port = None
     scheme = None
-    master = None
+    are_we_master = None
     master_url = None
     master_eligibility = True
     log = logging.getLogger("SchemaCoordinator")
@@ -49,16 +50,25 @@ def group_protocols(self):

     def _perform_assignment(self, leader_id, protocol, members):
         self.log.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members)
-        self.master = None
+        self.are_we_master = None
         error = NO_ERROR
         urls = {}
+        fallback_urls = {}
         for member_id, member_data in members:
             member_identity = json.loads(member_data.decode("utf8"))
             if member_identity["master_eligibility"] is True:
                 urls[get_identity_url(member_identity["scheme"], member_identity["host"],
                                       member_identity["port"])] = (member_id, member_data)
-        self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
-        schema_master_id, member_data = urls[self.master_url]
+            else:
+                fallback_urls[get_identity_url(member_identity["scheme"], member_identity["host"],
+                                               member_identity["port"])] = (member_id, member_data)
+        if len(urls) > 0:
+            chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
+            schema_master_id, member_data = urls[chosen_url]
+        else:
+            # Protocol guarantees there is at least one member, thus if urls is empty, fallback_urls cannot be empty
+            chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0]
+            schema_master_id, member_data = fallback_urls[chosen_url]
         member_identity = json.loads(member_data.decode("utf8"))
         identity = self.get_identity(
             host=member_identity["host"],
@@ -66,7 +76,7 @@
             port=member_identity["port"],
             scheme=member_identity["scheme"],
             json_encode=False,
         )
-        self.log.info("Chose: %r with url: %r as the master", schema_master_id, self.master_url)
+        self.log.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url)

         assignments = {}
         for member_id, member_data in members:
@@ -90,12 +100,16 @@ def
_on_join_complete(self, generation, member_id, protocol, member_assignment_b host=member_identity["host"], port=member_identity["port"], ) - if member_assignment["master"] == member_id: + # On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real + if member_assignment["master"] == member_id and member_identity["master_eligibility"]: self.master_url = master_url - self.master = True + self.are_we_master = True + elif not member_identity["master_eligibility"]: + self.master_url = None + self.are_we_master = False else: self.master_url = master_url - self.master = False + self.are_we_master = False return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes) def _on_join_follower(self): @@ -157,10 +171,10 @@ def init_schema_coordinator(self): self.sc.master_eligibility = self.config["master_eligibility"] self.lock.release() # self.sc now exists, we get to release the lock - def get_master_info(self): + def get_master_info(self) -> Tuple[bool, Optional[str]]: """Return whether we're the master, and the actual master url that can be used if we're not""" with self.lock: - return self.sc.master, self.sc.master_url + return self.sc.are_we_master, self.sc.master_url def close(self): self.log.info("Closing master_coordinator") @@ -179,7 +193,7 @@ def run(self): self.sc.ensure_active_group() self.sc.poll_heartbeat() - self.log.debug("We're master: %r: master_uri: %r", self.sc.master, self.sc.master_url) + self.log.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url) time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat())) except: # pylint: disable=bare-except self.log.exception("Exception in master_coordinator") diff --git a/karapace/rapu.py b/karapace/rapu.py index e34b0a465..348dabe75 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -202,7 +202,7 @@ def check_rest_headers(self, request: HTTPRequest) -> dict: # pylint:disable=in if method in {"POST", "PUT"}: if not content_matcher: http_error( - message=HTTPStatus.UNSUPPORTED_MEDIA_TYPE.description, + message="HTTP 415 Unsupported Media Type", content_type=result["content_type"], code=HTTPStatus.UNSUPPORTED_MEDIA_TYPE, ) @@ -214,7 +214,7 @@ def check_rest_headers(self, request: HTTPRequest) -> dict: # pylint:disable=in return result self.log.error("Not acceptable: %r", request.get_header("accept")) http_error( - message=HTTPStatus.NOT_ACCEPTABLE.description, + message="HTTP 406 Not Acceptable", content_type=result["content_type"], code=HTTPStatus.NOT_ACCEPTABLE, ) @@ -226,7 +226,7 @@ def check_schema_headers(self, request: HTTPRequest): if method in {"POST", "PUT"} and cgi.parse_header(content_type)[0] not in SCHEMA_CONTENT_TYPES: http_error( - message=HTTPStatus.UNSUPPORTED_MEDIA_TYPE.description, + message="HTTP 415 Unsupported Media Type", content_type=response_default_content_type, code=HTTPStatus.UNSUPPORTED_MEDIA_TYPE, ) @@ -238,7 +238,7 @@ def check_schema_headers(self, request: HTTPRequest): if not content_type_match: self.log.debug("Unexpected Accept value: %r", accept_val) http_error( - message=HTTPStatus.NOT_ACCEPTABLE.description, + message="HTTP 406 Not Acceptable", content_type=response_default_content_type, code=HTTPStatus.NOT_ACCEPTABLE, ) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index f49a4be88..c7645ea31 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -268,11 +268,12 @@ def handle_messages(self): self.ready = True add_offsets = 
False
         if self.master_coordinator is not None:
-            master, _ = self.master_coordinator.get_master_info()
-            # keep old behavior for True. When master is False, then we are a follower, so we should not accept direct
-            # writes anyway. When master is None, then this particular node is waiting for a stable value, so any
+            are_we_master, _ = self.master_coordinator.get_master_info()
+            # keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct
+            # writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any
             # messages off the topic are writes performed by another node
-            if master is True:
+            # Also, if master_eligibility is disabled by configuration, disable writes too
+            if are_we_master is True:
                 add_offsets = True

         for _, msgs in raw_msgs.items():
diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py
index c621615c7..6456119b4 100644
--- a/karapace/schema_registry_apis.py
+++ b/karapace/schema_registry_apis.py
@@ -10,7 +10,7 @@
 from karapace.rapu import HTTPRequest
 from karapace.schema_reader import InvalidSchema, KafkaSchemaReader, SchemaType, TypedSchema
 from karapace.utils import json_encode
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Tuple

 import argparse
 import asyncio
@@ -21,6 +21,7 @@
 @unique
 class SchemaErrorCodes(Enum):
+    EMPTY_SCHEMA = 42201
     HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value
     HTTP_CONFLICT = HTTPStatus.CONFLICT.value
     HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value
@@ -48,7 +49,6 @@ def __init__(self, config_file_path: str, config: dict) -> None:
         super().__init__(config_file_path=config_file_path, config=config)
         self._add_routes()
         self._init(config=config)
-        self.schema_lock = asyncio.Lock()

     def _init(self, config: dict) -> None:  # pylint: disable=unused-argument
         self.ksr = None
@@ -56,6 +56,7 @@ def _init(self, config: dict) -> None:  # pylint: disable=unused-argument
         self.producer = self._create_producer()
         self._create_master_coordinator()
         self._create_schema_reader()
+        self.schema_lock = asyncio.Lock()

     def _add_routes(self):
         self.route(
@@ -172,7 +173,7 @@ def _validate_version(self, content_type, version):  # pylint: disable=inconsist
                 body={
                     "error_code": SchemaErrorCodes.INVALID_VERSION_ID.value,
                     "message": (
-                        "The specified version is not a valid version id. "
+                        f"The specified version '{version}' is not a valid version id. 
" "Allowed values are between [1, 2^31-1] and the string \"latest\"" ), }, @@ -361,17 +362,6 @@ async def schemas_get_versions(self, content_type, *, schema_id): for version, schema in schemas.items(): if int(schema["id"]) == schema_id_int and not schema["deleted"]: subject_versions.append({"subject": subject, "version": int(version)}) - - if not subject_versions: - self.r( - body={ - "error_code": SchemaErrorCodes.HTTP_NOT_FOUND.value, - "message": "HTTP 404 Not Found", - }, - content_type=content_type, - status=HTTPStatus.NOT_FOUND, - ) - subject_versions = sorted(subject_versions, key=lambda s: (s["subject"], s["version"])) self.r(subject_versions, content_type) @@ -401,7 +391,7 @@ async def config_set(self, content_type, *, request): are_we_master, master_url = await self.get_master() if are_we_master: self.send_config_message(compatibility_level=compatibility_level, subject=None) - elif are_we_master is None: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/config" @@ -452,7 +442,7 @@ async def config_subject_set(self, content_type, *, request, subject): are_we_master, master_url = await self.get_master() if are_we_master: self.send_config_message(compatibility_level=compatibility_level, subject=subject) - elif are_we_master is None: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/config/{subject}" @@ -499,7 +489,7 @@ async def subject_delete(self, content_type, *, subject, request: HTTPRequest): if are_we_master: async with self.schema_lock: await self._subject_delete_local(content_type, subject, permanent) - elif are_we_master is None: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}?permanent={permanent}" @@ -603,7 +593,7 @@ async def subject_version_delete(self, content_type, *, subject, version, reques if are_we_master: async with self.schema_lock: await self._subject_version_delete_local(content_type, subject, version, permanent) - elif are_we_master is None: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" @@ -633,16 +623,16 @@ async def subject_versions_list(self, content_type, *, subject): subject_data = self._subject_get(subject, content_type) self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK) - async def get_master(self): + async def get_master(self) -> Tuple[bool, Optional[str]]: async with self.master_lock: while True: - master, master_url = self.mc.get_master_info() - if master is None: - self.log.info("No master set: %r, url: %r", master, master_url) + are_we_master, master_url = self.mc.get_master_info() + if are_we_master is None: + self.log.info("No master set: %r, url: %r", are_we_master, master_url) elif self.ksr.ready is False: self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready) else: - return master, master_url + return are_we_master, master_url await asyncio.sleep(1.0) def _validate_schema_request_body(self, content_type, body) -> None: @@ -682,11 +672,11 @@ def _validate_schema_key(self, content_type, body) -> None: if "schema" not in body: self.r( body={ - "error_code": SchemaErrorCodes.HTTP_INTERNAL_SERVER_ERROR.value, - "message": "Internal Server Error", + "error_code": SchemaErrorCodes.EMPTY_SCHEMA.value, + "message": "Empty schema", }, content_type=content_type, - status=HTTPStatus.INTERNAL_SERVER_ERROR, + status=HTTPStatus.UNPROCESSABLE_ENTITY, ) async def subjects_schema_post(self, content_type, 
*, subject, request):
@@ -698,7 +688,7 @@ async def subjects_schema_post(self, content_type, *, subject, request):
             self.r(
                 body={
                     "error_code": SchemaErrorCodes.HTTP_INTERNAL_SERVER_ERROR.value,
-                    "message": "Internal Server Error",
+                    "message": f"Error while looking up schema under subject {subject}",
                 },
                 content_type=content_type,
                 status=HTTPStatus.INTERNAL_SERVER_ERROR,
@@ -750,7 +740,7 @@ async def subject_post(self, content_type, *, subject, request):
         if are_we_master:
             async with self.schema_lock:
                 await self.write_new_schema_local(subject, body, content_type)
-        elif are_we_master is None:
+        elif not master_url:
             self.no_master_error(content_type)
         else:
             url = f"{master_url}/subjects/{subject}/versions"
diff --git a/tests/integration/confluent-docker-compose.yml b/tests/integration/confluent-docker-compose.yml
new file mode 100644
index 000000000..c755bb03f
--- /dev/null
+++ b/tests/integration/confluent-docker-compose.yml
@@ -0,0 +1,51 @@
+---
+version: '2'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    hostname: zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    hostname: kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+      - "9101:9101"
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
+
+  schema-registry:
+    image: confluentinc/cp-schema-registry:6.1.1
+    hostname: schema-registry
+    depends_on:
+      - kafka
+    ports:
+      - "8081:8081"
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
+      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+
+  rest:
+    image: confluentinc/cp-kafka-rest:6.1.1
+    depends_on:
+      - kafka
+    ports:
+      - "8082:8082"
+    environment:
+      KAFKA_REST_HOST_NAME: confluent-rest
+      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:29092'
+      KAFKA_REST_LISTENERS: "http://rest:8082"
+      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py
index a70dcaa7e..6d95a972d 100644
--- a/tests/integration/test_master_coordinator.py
+++ b/tests/integration/test_master_coordinator.py
@@ -32,12 +32,12 @@ def is_master(mc: MasterCoordinator) -> bool:
     This takes care of a race condition where the flag `master` is set but
     `master_url` is not yet set.
     """
-    return bool(mc.sc and mc.sc.master and mc.sc.master_url)
+    return bool(mc.sc and mc.sc.are_we_master and mc.sc.master_url)

 def has_master(mc: MasterCoordinator) -> bool:
     """
     True if `mc` has a master.
""" - return bool(mc.sc and not mc.sc.master and mc.sc.master_url) + return bool(mc.sc and not mc.sc.are_we_master and mc.sc.master_url) @pytest.mark.timeout(60) # Github workflows need a bit of extra time @@ -91,6 +91,29 @@ def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None: assert slave.sc.master_url == master_url +def test_no_eligible_master(kafka_servers: KafkaServers) -> None: + client_id = new_random_name("master_selection_") + group_id = new_random_name("group_id") + + config_aa = set_config_defaults({ + "advertised_hostname": "127.0.0.1", + "bootstrap_uri": kafka_servers.bootstrap_servers, + "client_id": client_id, + "group_id": group_id, + "port": get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[]), + "master_eligibility": False, + }) + + with closing(init_admin(config_aa)) as mc: + # Wait for the election to happen, ie. flag is not None + while not mc.sc or mc.sc.are_we_master is None: + time.sleep(0.3) + + # Make sure the end configuration is as expected + assert mc.sc.are_we_master is False + assert mc.sc.master_url is None + + async def test_schema_request_forwarding(registry_async_pair): master_url, slave_url = registry_async_pair max_tries, counter = 5, 0 diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index e5639b757..1406fe94a 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -7,9 +7,10 @@ from http import HTTPStatus from kafka import KafkaProducer from karapace.rapu import is_success +from karapace.schema_registry_apis import KarapaceSchemaRegistry from karapace.utils import Client from tests.utils import ( - create_field_name_factory, create_subject_name_factory, new_random_name, repeat_until_successful_request + create_field_name_factory, create_schema_name_factory, create_subject_name_factory, repeat_until_successful_request ) from typing import List, Tuple @@ -22,86 +23,7 @@ @pytest.mark.parametrize("trail", ["", "/"]) -@pytest.mark.parametrize("compatibility", ["FORWARD", "BACKWARD", "FULL"]) -async def test_enum_schema_compatibility(registry_async_client, compatibility, trail): - subject = create_subject_name_factory(f"test_enum_schema_compatibility-{trail}")() - - res = await registry_async_client.put(f"config{trail}", json={"compatibility": compatibility}) - assert res.status == 200 - schema = { - "type": "record", - "name": "myenumtest", - "fields": [{ - "type": { - "type": "enum", - "name": "enumtest", - "symbols": ["first", "second"], - }, - "name": "faa", - }] - } - res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - assert "id" in res.json() - schema_id = res.json()["id"] - schema = { - "type": "record", - "name": "myenumtest", - "fields": [{ - "type": { - "type": "enum", - "name": "enumtest", - "symbols": ["first", "second", "third"], - }, - "name": "faa", - }] - } - res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - assert "id" in res.json() - schema_id2 = res.json()["id"] - assert schema_id != schema_id2 - - schema = { - "type": "record", - "name": "myenumtest", - "fields": [{ - "type": { - "type": "enum", - "name": "enumtest", - "symbols": ["second"], - }, - "name": "faa", - }] - } - res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - assert 
"id" in res.json() - schema_id3 = res.json()["id"] - assert schema_id3 != schema_id2 - - res = await registry_async_client.get(f"schemas/ids/{schema_id3}{trail}") - assert res.status_code == 200 - res = jsonlib.loads(res.json()["schema"]) - assert res["type"] == "record" - assert res["name"] == "myenumtest" - assert res["fields"][0]["name"] == "faa" - assert res["fields"][0]["type"]["type"] == "enum" - assert res["fields"][0]["type"]["name"] == "enumtest" - assert res["fields"][0]["type"]["symbols"] == ["second"] - - -@pytest.mark.parametrize("trail", ["", "/"]) -async def test_union_to_union(registry_async_client, trail): +async def test_union_to_union(registry_async_client: Client, trail: str) -> None: subject_name_factory = create_subject_name_factory(f"test_union_to_union-{trail}") subject_1 = subject_name_factory() @@ -155,7 +77,7 @@ async def test_union_to_union(registry_async_client, trail): @pytest.mark.parametrize("trail", ["", "/"]) -async def test_missing_subject_compatibility(registry_async_client, trail): +async def test_missing_subject_compatibility(registry_async_client: Client, trail: str) -> None: subject = create_subject_name_factory(f"test_missing_subject_compatibility-{trail}")() res = await registry_async_client.post( @@ -173,7 +95,7 @@ async def test_missing_subject_compatibility(registry_async_client, trail): @pytest.mark.parametrize("trail", ["", "/"]) -async def test_record_union_schema_compatibility(registry_async_client, trail): +async def test_record_union_schema_compatibility(registry_async_client: Client, trail: str) -> None: subject = create_subject_name_factory(f"test_record_union_schema_compatibility-{trail}")() res = await registry_async_client.put(f"config/{subject}{trail}", json={"compatibility": "BACKWARD"}) @@ -253,7 +175,7 @@ async def test_record_union_schema_compatibility(registry_async_client, trail): @pytest.mark.parametrize("trail", ["", "/"]) -async def test_record_nested_schema_compatibility(registry_async_client, trail): +async def test_record_nested_schema_compatibility(registry_async_client: Client, trail: str) -> None: subject = create_subject_name_factory(f"test_record_nested_schema_compatibility-{trail}")() res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) @@ -291,22 +213,26 @@ async def test_record_nested_schema_compatibility(registry_async_client, trail): # change string to integer in the nested record, should fail schema["fields"][1]["type"]["fields"][0]["type"] = "int" res = await registry_async_client.post( - "subjects/{}/versions".format(subject), + f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}, ) assert res.status == 409 @pytest.mark.parametrize("trail", ["", "/"]) -async def test_compatibility_endpoint(registry_async_client, trail): +async def test_compatibility_endpoint(registry_async_client: Client, trail: str) -> None: + """ + Creates a subject with a schema. + Calls compatibility/subjects/{subject}/versions/latest endpoint + and checks it return is_compatible true for a compatible new schema + and false for incompatible schema. 
+ """ subject = create_subject_name_factory(f"test_compatibility_endpoint-{trail}")() - - res = await registry_async_client.put(f"config{trail}", json={"compatibility": "BACKWARD"}) - assert res.status == 200 + schema_name = create_schema_name_factory(f"test_compatibility_endpoint_{trail}")() schema = { "type": "record", - "name": "Objct", + "name": schema_name, "fields": [ { "name": "age", @@ -321,9 +247,8 @@ async def test_compatibility_endpoint(registry_async_client, trail): ) assert res.status == 200 - res = await registry_async_client.get("schemas/ids/{}{}".format(res.json()["id"], trail)) - schema_gotten_back = jsonlib.loads(res.json()["schema"]) - assert schema_gotten_back == schema + res = await registry_async_client.put(f"config/{subject}{trail}", json={"compatibility": "BACKWARD"}) + assert res.status == 200 # replace int with long schema["fields"] = [{"type": "long", "name": "age"}] @@ -334,6 +259,7 @@ async def test_compatibility_endpoint(registry_async_client, trail): assert res.status == 200 assert res.json() == {"is_compatible": True} + # replace int with string schema["fields"] = [{"type": "string", "name": "age"}] res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest{trail}", @@ -344,7 +270,7 @@ async def test_compatibility_endpoint(registry_async_client, trail): @pytest.mark.parametrize("trail", ["", "/"]) -async def test_type_compatibility(registry_async_client, trail): +async def test_type_compatibility(registry_async_client: Client, trail: str) -> None: def _test_cases(): # Generate FORWARD, BACKWARD and FULL tests for primitive types _CONVERSIONS = { @@ -425,15 +351,14 @@ def _test_cases(): @pytest.mark.parametrize("trail", ["", "/"]) -async def test_record_schema_compatibility(registry_async_client, trail): - subject_name_factory = create_subject_name_factory(f"test_record_schema_compatibility-{trail}") - subject_1 = subject_name_factory() +async def test_record_schema_compatibility_forward(registry_async_client: Client, trail: str) -> None: + subject_name_factory = create_subject_name_factory(f"test_record_schema_compatibility_forward_{trail}") + subject = subject_name_factory() + schema_name = create_schema_name_factory(f"test_record_schema_compatibility_forward_{trail}")() - res = await registry_async_client.put("config", json={"compatibility": "FORWARD"}) - assert res.status == 200 - schema = { + schema_1 = { "type": "record", - "name": "Objct", + "name": schema_name, "fields": [ { "name": "first_name", @@ -441,18 +366,20 @@ async def test_record_schema_compatibility(registry_async_client, trail): }, ] } - res = await registry_async_client.post( - f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema)}, + f"subjects/{subject}/versions{trail}", + json={"schema": jsonlib.dumps(schema_1)}, ) assert res.status == 200 assert "id" in res.json() schema_id = res.json()["id"] - schema2 = { + res = await registry_async_client.put(f"/config/{subject}{trail}", json={"compatibility": "FORWARD"}) + assert res.status == 200 + + schema_2 = { "type": "record", - "name": "Objct", + "name": schema_name, "fields": [ { "name": "first_name", @@ -469,17 +396,17 @@ async def test_record_schema_compatibility(registry_async_client, trail): ] } res = await registry_async_client.post( - f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema2)}, + f"subjects/{subject}/versions{trail}", + json={"schema": jsonlib.dumps(schema_2)}, ) assert res.status == 200 assert "id" in res.json() schema_id2 = 
res.json()["id"] assert schema_id != schema_id2 - schema3a = { + schema_3a = { "type": "record", - "name": "Objct", + "name": schema_name, "fields": [ { "name": "last_name", @@ -497,17 +424,17 @@ async def test_record_schema_compatibility(registry_async_client, trail): ] } res = await registry_async_client.post( - f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema3a)}, + f"subjects/{subject}/versions{trail}", + json={"schema": jsonlib.dumps(schema_3a)}, ) # Fails because field removed assert res.status == 409 res_json = res.json() assert res_json["error_code"] == 409 - schema3b = { + schema_3b = { "type": "record", - "name": "Objct", + "name": schema_name, "fields": [ { "name": "first_name", @@ -524,17 +451,53 @@ async def test_record_schema_compatibility(registry_async_client, trail): ] } res = await registry_async_client.post( - f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema3b)}, + f"subjects/{subject}/versions{trail}", + json={"schema": jsonlib.dumps(schema_3b)}, ) # Fails because incompatible type change assert res.status == 409 res_json = res.json() assert res_json["error_code"] == 409 - schema4 = { + schema_4 = { "type": "record", - "name": "Objct", + "name": schema_name, + "fields": [ + { + "name": "first_name", + "type": "string" + }, + { + "name": "last_name", + "type": "string" + }, + { + "name": "third_name", + "type": "string", + "default": "foodefaultvalue" + }, + { + "name": "age", + "type": "int" + }, + ] + } + res = await registry_async_client.post( + f"subjects/{subject}/versions{trail}", + json={"schema": jsonlib.dumps(schema_4)}, + ) + assert res.status == 200 + + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_record_schema_compatibility_backward(registry_async_client: Client, trail: str) -> None: + subject_name_factory = create_subject_name_factory(f"test_record_schema_compatibility_backward_{trail}") + subject_1 = subject_name_factory() + schema_name = create_schema_name_factory(f"test_record_schema_compatibility_backward_{trail}")() + + schema_1 = { + "type": "record", + "name": schema_name, "fields": [ { "name": "first_name", @@ -557,14 +520,17 @@ async def test_record_schema_compatibility(registry_async_client, trail): } res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema4)}, + json={"schema": jsonlib.dumps(schema_1)}, ) assert res.status == 200 - res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) - schema5 = { + res = await registry_async_client.put(f"config/{subject_1}{trail}", json={"compatibility": "BACKWARD"}) + assert res.status == 200 + + # adds fourth_name w/o default, invalid + schema_2 = { "type": "record", - "name": "Objct", + "name": schema_name, "fields": [ { "name": "first_name", @@ -591,39 +557,40 @@ async def test_record_schema_compatibility(registry_async_client, trail): } res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema5)}, + json={"schema": jsonlib.dumps(schema_2)}, ) assert res.status == 409 # Add a default value for the field - schema5["fields"][3] = {"name": "fourth_name", "type": "string", "default": "foof"} + schema_2["fields"][3] = {"name": "fourth_name", "type": "string", "default": "foof"} res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema5)}, + json={"schema": jsonlib.dumps(schema_2)}, ) assert res.status == 200 assert "id" in 
res.json() # Try to submit schema with a different definition - schema5["fields"][3] = {"name": "fourth_name", "type": "int", "default": 2} + schema_2["fields"][3] = {"name": "fourth_name", "type": "int", "default": 2} res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": jsonlib.dumps(schema5)}, + json={"schema": jsonlib.dumps(schema_2)}, ) assert res.status == 409 subject_2 = subject_name_factory() res = await registry_async_client.put(f"config/{subject_2}{trail}", json={"compatibility": "BACKWARD"}) - schema = {"type": "record", "name": "Object", "fields": [{"name": "first_name", "type": "string"}]} - res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": jsonlib.dumps(schema)}) assert res.status == 200 - schema["fields"].append({"name": "last_name", "type": "string"}) - res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": jsonlib.dumps(schema)}) + schema_1 = {"type": "record", "name": schema_name, "fields": [{"name": "first_name", "type": "string"}]} + res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": jsonlib.dumps(schema_1)}) + assert res.status == 200 + schema_1["fields"].append({"name": "last_name", "type": "string"}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": jsonlib.dumps(schema_1)}) assert res.status == 409 @pytest.mark.parametrize("trail", ["", "/"]) -async def test_enum_schema_field_add_compatibility(registry_async_client, trail): +async def test_enum_schema_field_add_compatibility(registry_async_client: Client, trail: str) -> None: subject_name_factory = create_subject_name_factory(f"test_enum_schema_field_add_compatibility-{trail}") expected_results = [("BACKWARD", 200), ("FORWARD", 200), ("FULL", 200)] for compatibility, status_code in expected_results: @@ -641,7 +608,7 @@ async def test_enum_schema_field_add_compatibility(registry_async_client, trail) @pytest.mark.parametrize("trail", ["", "/"]) -async def test_array_schema_field_add_compatibility(registry_async_client, trail): +async def test_array_schema_field_add_compatibility(registry_async_client: Client, trail: str) -> None: subject_name_factory = create_subject_name_factory(f"test_array_schema_field_add_compatibility-{trail}") expected_results = [("BACKWARD", 200), ("FORWARD", 409), ("FULL", 409)] for compatibility, status_code in expected_results: @@ -659,7 +626,7 @@ async def test_array_schema_field_add_compatibility(registry_async_client, trail @pytest.mark.parametrize("trail", ["", "/"]) -async def test_array_nested_record_compatibility(registry_async_client, trail): +async def test_array_nested_record_compatibility(registry_async_client: Client, trail: str) -> None: subject_name_factory = create_subject_name_factory(f"test_array_nested_record_compatibility-{trail}") expected_results = [("BACKWARD", 409), ("FORWARD", 200), ("FULL", 409)] for compatibility, status_code in expected_results: @@ -687,7 +654,7 @@ async def test_array_nested_record_compatibility(registry_async_client, trail): @pytest.mark.parametrize("trail", ["", "/"]) -async def test_record_nested_array_compatibility(registry_async_client, trail): +async def test_record_nested_array_compatibility(registry_async_client: Client, trail: str) -> None: subject_name_factory = create_subject_name_factory(f"test_record_nested_array_compatibility-{trail}") expected_results = [("BACKWARD", 200), ("FORWARD", 409), ("FULL", 409)] for 
compatibility, status_code in expected_results: @@ -715,8 +682,8 @@ async def test_record_nested_array_compatibility(registry_async_client, trail): async def test_map_schema_field_add_compatibility( - registry_async_client -): # TODO: Rename to pålain check map schema and add additional steps + registry_async_client: Client +) -> None: # TODO: Rename to plain check map schema and add additional steps subject_name_factory = create_subject_name_factory("test_map_schema_field_add_compatibility") expected_results = [("BACKWARD", 200), ("FORWARD", 409), ("FULL", 409)] for compatibility, status_code in expected_results: @@ -733,7 +700,7 @@ async def test_map_schema_field_add_compatibility( assert res.status == status_code -async def test_enum_schema(registry_async_client): +async def test_enum_schema(registry_async_client: Client) -> None: subject_name_factory = create_subject_name_factory("test_enum_schema") for compatibility in {"BACKWARD", "FORWARD", "FULL"}: subject = subject_name_factory() @@ -790,7 +757,7 @@ async def test_enum_schema(registry_async_client): @pytest.mark.parametrize("compatibility", ["BACKWARD", "FORWARD", "FULL"]) -async def test_fixed_schema(registry_async_client, compatibility): +async def test_fixed_schema(registry_async_client: Client, compatibility: str) -> None: subject_name_factory = create_subject_name_factory(f"test_fixed_schema-{compatibility}") status_code_allowed = 200 status_code_denied = 409 @@ -850,7 +817,7 @@ async def test_fixed_schema(registry_async_client, compatibility): assert res.status == status_code_denied -async def test_primitive_schema(registry_async_client): +async def test_primitive_schema(registry_async_client: Client) -> None: subject_name_factory = create_subject_name_factory("test_primitive_schema") expected_results = [("BACKWARD", 200), ("FORWARD", 200), ("FULL", 200)] for compatibility, status_code in expected_results: @@ -880,7 +847,7 @@ async def test_primitive_schema(registry_async_client): res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}) -async def test_union_comparing_to_other_types(registry_async_client): +async def test_union_comparing_to_other_types(registry_async_client: Client) -> None: subject_name_factory = create_subject_name_factory("test_primitive_schema") expected_results = [("BACKWARD", 409), ("FORWARD", 200), ("FULL", 409)] for compatibility, status_code in expected_results: @@ -927,7 +894,7 @@ async def test_union_comparing_to_other_types(registry_async_client): assert res.status == status_code -async def test_transitive_compatibility(registry_async_client): +async def test_transitive_compatibility(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_transitive_compatibility")() res = await registry_async_client.put(f"config/{subject}", json={"compatibility": "BACKWARD_TRANSITIVE"}) assert res.status == 200 @@ -1019,71 +986,144 @@ async def assert_schema_versions_failed(client: Client, trail: str, schema_id: i assert res.status_code == response_code +async def register_schema(registry_async_client: Client, trail: str, subject: str, schema_str: str) -> Tuple[int, int]: + # Register to get the id + res = await registry_async_client.post( + f"subjects/{subject}/versions{trail}", + json={"schema": schema_str}, + ) + assert res.status == 200 + schema_id = res.json()["id"] + + # Get version + res = await registry_async_client.post( + f"subjects/{subject}{trail}", + json={"schema": schema_str}, + ) + assert res.status == 200 + assert 
res.json()["id"] == schema_id + return schema_id, res.json()["version"] + + @pytest.mark.parametrize("trail", ["", "/"]) -async def test_schema_versions(registry_async_client, trail): +async def test_schema_versions_multiple_subjects_same_schema(registry_async_client: Client, trail: str) -> None: """ Tests case where there are multiple subjects with the same schema. The schema/versions endpoint returns all these subjects. """ - subject_name_factory = create_subject_name_factory(f"test_schema-{trail}") - unique_field_factory = create_field_name_factory(trail) - - schema_str1 = '{"type": "string", "unique" : "%s"}' % unique_field_factory() - schema_str2 = '{"type": "string", "unique" : "%s"}' % unique_field_factory() + subject_name_factory = create_subject_name_factory(f"test_schema_versions_multiple_subjects_same_schema-{trail}") + schema_name_factory = create_schema_name_factory(f"test_schema_versions_multiple_subjects_same_schema_{trail}") - async def register_schema(subject: str, schema_str: str) -> Tuple[int, int]: - # Register to get the id - res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", - json={"schema": schema_str}, - ) - assert res.status == 200 - schema_id = res.json()["id"] - - # Get version - res = await registry_async_client.post( - f"subjects/{subject}{trail}", - json={"schema": schema_str}, - ) - assert res.status == 200 - assert res.json()["id"] == schema_id - return schema_id, res.json()["version"] + schema_1 = { + "type": "record", + "name": schema_name_factory(), + "fields": [{ + "name": "f1", + "type": "string", + }, { + "name": "f2", + "type": "string", + }] + } + schema_str_1 = jsonlib.dumps(schema_1) + schema_2 = { + "type": "record", + "name": schema_name_factory(), + "fields": [{ + "name": "f1", + "type": "string", + }] + } + schema_str_2 = jsonlib.dumps(schema_2) subject_1 = subject_name_factory() - schema_id_1, version_1 = await register_schema(subject_1, schema_str1) + schema_id_1, version_1 = await register_schema(registry_async_client, trail, subject_1, schema_str_1) schema_1_versions = [(subject_1, version_1)] await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions) subject_2 = subject_name_factory() - schema_id_2, version_2 = await register_schema(subject_2, schema_str1) + schema_id_2, version_2 = await register_schema(registry_async_client, trail, subject_2, schema_str_1) schema_1_versions = [(subject_1, version_1), (subject_2, version_2)] assert schema_id_1 == schema_id_2 await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions) subject_3 = subject_name_factory() - schema_id_3, version_3 = await register_schema(subject_3, schema_str1) + schema_id_3, version_3 = await register_schema(registry_async_client, trail, subject_3, schema_str_1) schema_1_versions = [(subject_1, version_1), (subject_2, version_2), (subject_3, version_3)] assert schema_id_1 == schema_id_3 await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions) - # subject_4 with different schema + # subject_4 with different schema to check there are no side effects subject_4 = subject_name_factory() - schema_id_4, version_4 = await register_schema(subject_4, schema_str2) + schema_id_4, version_4 = await register_schema(registry_async_client, trail, subject_4, schema_str_2) schema_2_versions = [(subject_4, version_4)] assert schema_id_1 != schema_id_4 await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions) await 
assert_schema_versions(registry_async_client, trail, schema_id_4, schema_2_versions) - # subject_4 now with the same schema, will have different version - schema_id_5, version_5 = await register_schema(subject_4, schema_str1) - assert schema_id_1 == schema_id_5 - schema_1_versions = [(subject_1, version_1), (subject_2, version_2), (subject_3, version_3), (subject_4, version_5)] + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_versions_deleting(registry_async_client: Client, trail: str) -> None: + """ + Tests getting schema versions when removing a schema version and eventually the subject. + """ + subject = create_subject_name_factory(f"test_schema_versions_deleting_{trail}")() + schema_name = create_schema_name_factory(f"test_schema_versions_deleting_{trail}")() + + schema_1 = { + "type": "record", + "name": schema_name, + "fields": [{ + "name": "field_1", + "type": "string" + }, { + "name": "field_2", + "type": "string" + }] + } + schema_str_1 = jsonlib.dumps(schema_1) + schema_2 = { + "type": "record", + "name": schema_name, + "fields": [ + { + "name": "field_1", + "type": "string" + }, + ] + } + schema_str_2 = jsonlib.dumps(schema_2) + + schema_id_1, version_1 = await register_schema(registry_async_client, trail, subject, schema_str_1) + schema_1_versions = [(subject, version_1)] await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions) - await assert_schema_versions(registry_async_client, trail, schema_id_4, schema_2_versions) + + res = await registry_async_client.put(f"config/{subject}{trail}", json={"compatibility": "BACKWARD"}) + assert res.status == 200 + + schema_id_2, version_2 = await register_schema(registry_async_client, trail, subject, schema_str_2) + schema_2_versions = [(subject, version_2)] + await assert_schema_versions(registry_async_client, trail, schema_id_2, schema_2_versions) + + # Deleting one version, the other still found + res = await registry_async_client.delete(f"subjects/{subject}/versions/{version_1}") + assert res.status_code == 200 + assert res.json() == version_1 + + await assert_schema_versions(registry_async_client, trail, schema_id_1, []) + await assert_schema_versions(registry_async_client, trail, schema_id_2, schema_2_versions) + + # Deleting the subject, the schema version 2 cannot be found anymore + res = await registry_async_client.delete(f"subjects/{subject}") + assert res.status_code == 200 + assert res.json() == [version_2] + + await assert_schema_versions(registry_async_client, trail, schema_id_1, []) + await assert_schema_versions(registry_async_client, trail, schema_id_2, []) @pytest.mark.parametrize("trail", ["", "/"]) -async def test_schema_types(registry_async_client, trail): +async def test_schema_types(registry_async_client: Client, trail: str) -> None: """ Tests for /schemas/types endpoint. 
""" @@ -1096,9 +1136,15 @@ async def test_schema_types(registry_async_client, trail): @pytest.mark.parametrize("trail", ["", "/"]) -async def test_schema(registry_async_client, trail): - subject = new_random_name("subject") - schema_str = '{"type": "string"}' +async def test_schema_repost(registry_async_client: Client, trail: str) -> None: + """" + Repost same schema again to see that a new id is not generated but an old one is given back + """ + subject = create_subject_name_factory(f"test_schema_repost-{trail}")() + unique_field_factory = create_field_name_factory(trail) + + unique = unique_field_factory() + schema_str = jsonlib.dumps({"type": "string", "unique": unique}) res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", json={"schema": schema_str}, @@ -1106,84 +1152,218 @@ async def test_schema(registry_async_client, trail): assert res.status == 200 assert "id" in res.json() schema_id = res.json()["id"] + res = await registry_async_client.get(f"schemas/ids/{schema_id}{trail}") assert res.status_code == 200 - assert res.json()["schema"] == schema_str + assert jsonlib.loads(res.json()["schema"]) == jsonlib.loads(schema_str) - # repost same schema again to see that a new id is not generated but an old one is given back res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": '{"type": "string"}'}, + json={"schema": schema_str}, ) assert res.status == 200 assert "id" in res.json() assert schema_id == res.json()["id"] - # Schema missing in the json body + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_missing_body(registry_async_client: Client, trail: str) -> None: + subject = create_subject_name_factory(f"test_schema_missing_body-{trail}")() + res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", json={}, ) - assert res.status == 500 - assert res.json()["error_code"] == 500 - assert res.json()["message"] == "Internal Server Error" + assert res.status == 422 + assert res.json()["error_code"] == 42201 + assert res.json()["message"] == "Empty schema" - # nonexistent schema id + +async def test_schema_non_existing_id(registry_async_client: Client) -> None: + """ + Tests getting a non-existing schema id + """ result = await registry_async_client.get(os.path.join("schemas/ids/123456789")) assert result.json()["error_code"] == 40403 - # invalid schema_id + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_non_invalid_id(registry_async_client: Client, trail: str) -> None: + """ + Tests getting an invalid schema id + """ result = await registry_async_client.get(f"schemas/ids/invalid{trail}") assert result.status == 404 assert result.json()["error_code"] == 404 assert result.json()["message"] == "HTTP 404 Not Found" + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_subject_invalid_id(registry_async_client: Client, trail: str) -> None: + """ + Creates a subject with a schema and trying to find the invalid versions for the subject. 
+ """ + subject = create_subject_name_factory(f"test_schema_subject_invalid_id-{trail}")() + unique_field_factory = create_field_name_factory(trail) + res = await registry_async_client.post( - "subjects/{}/versions".format(subject), json={"schema": "{\"type\": \"string\", \"foo\": \"string\"}"} + f"subjects/{subject}/versions", + json={"schema": "{\"type\": \"string\", \"foo\": \"string\", \"%s\": \"string\"}" % unique_field_factory()} ) assert res.status_code == 200 - assert "id" in res.json() - assert schema_id != res.json()["id"] - # Fetch the schema back to see how it was mangled - result = await registry_async_client.get(os.path.join("schemas/ids/{}".format(res.json()["id"]))) - schema = jsonlib.loads(result.json()["schema"]) - assert schema["type"] == "string" - assert schema["foo"] == "string" + # Find an invalid version 0 + res = await registry_async_client.get(f"subjects/{subject}/versions/0") + assert res.status_code == 422 + assert res.json()["error_code"] == 42202 + assert res.json()["message"] == \ + 'The specified version \'0\' is not a valid version id. '\ + + 'Allowed values are between [1, 2^31-1] and the string "latest"' + + # Find an invalid version (too large) + res = await registry_async_client.get(f"subjects/{subject}/versions/15") + assert res.status_code == 404 + assert res.json()["error_code"] == 40402 + assert res.json()["message"] == "Version 15 not found." + + +async def test_schema_subject_post_invalid(registry_async_client: Client) -> None: + """ + Tests posting to /subjects/{subject} with different invalid values. + """ + subject_name_factory = create_subject_name_factory("test_schema_subject_post_invalid") + + schema_str = jsonlib.dumps({"type": "string"}) + + # Create the subject + subject_1 = subject_name_factory() + res = await registry_async_client.post( + f"subjects/{subject_1}/versions", + json={"schema": schema_str}, + ) + assert res.status == 200 + + res = await registry_async_client.post( + f"subjects/{subject_1}", + json={"schema": jsonlib.dumps({"type": "invalid_type"})}, + ) + assert res.status == 500, "Invalid schema for existing subject should return 500" + assert res.json()["message"] == f"Error while looking up schema under subject {subject_1}" + + # Subject is not found + subject_2 = subject_name_factory() + res = await registry_async_client.post( + f"subjects/{subject_2}", + json={"schema": schema_str}, + ) + assert res.status == 404 + assert res.json()["error_code"] == 40401 + assert res.json()["message"] == f"Subject '{subject_2}' not found." + + # Schema not found for subject + res = await registry_async_client.post( + f"subjects/{subject_1}", + json={"schema": '{"type": "int"}'}, + ) + assert res.status == 404 + assert res.json()["error_code"] == 40403 + assert res.json()["message"] == "Schema not found" + + # Schema not included in the request body + res = await registry_async_client.post(f"subjects/{subject_1}", json={}) + assert res.status == 500 + assert res.json()["error_code"] == 500 + assert res.json()["message"] == f"Error while looking up schema under subject {subject_1}" + + # Schema not included in the request body for subject that does not exist + subject_3 = subject_name_factory() + res = await registry_async_client.post( + f"subjects/{subject_3}", + json={}, + ) + assert res.status == 404 + assert res.json()["error_code"] == 40401 + assert res.json()["message"] == f"Subject '{subject_3}' not found." 
+ + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> None: + subject = create_subject_name_factory(f"test_schema_lifecycle-{trail}")() + unique_field_factory = create_field_name_factory(trail) + + unique_1 = unique_field_factory() + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": jsonlib.dumps({ + "type": "string", + "foo": "string", + unique_1: "string" + })} + ) + assert res.status_code == 200 + schema_id_1 = res.json()["id"] + + unique_2 = unique_field_factory() + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": jsonlib.dumps({ + "type": "string", + "foo": "string", + unique_2: "string" + })} + ) + schema_id_2 = res.json()["id"] + assert res.status_code == 200 + assert schema_id_1 != schema_id_2 + + await assert_schema_versions(registry_async_client, trail, schema_id_1, [(subject, 1)]) + await assert_schema_versions(registry_async_client, trail, schema_id_2, [(subject, 2)]) + + result = await registry_async_client.get(os.path.join(f"schemas/ids/{schema_id_1}")) + schema_json_1 = jsonlib.loads(result.json()["schema"]) + assert schema_json_1["type"] == "string" + assert schema_json_1["foo"] == "string" + assert schema_json_1[unique_1] == "string" + + result = await registry_async_client.get(os.path.join(f"schemas/ids/{schema_id_2}")) + schema_json_2 = jsonlib.loads(result.json()["schema"]) + assert schema_json_2["type"] == "string" + assert schema_json_2["foo"] == "string" + assert schema_json_2[unique_2] == "string" res = await registry_async_client.get("subjects") assert res.status_code == 200 assert subject in res.json() - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.status_code == 200 assert res.json() == [1, 2] - res = await registry_async_client.get("subjects/{}/versions/1".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions/1") assert res.status_code == 200 assert res.json()["subject"] == subject - assert res.json()["schema"] == schema_str - - # Find an invalid version 0 - res = await registry_async_client.get("subjects/{}/versions/0".format(subject)) - assert res.status_code == 422 - assert res.json()["error_code"] == 42202 - assert res.json()["message"] == \ - 'The specified version is not a valid version id. Allowed values are between [1, 2^31-1] and the string "latest"' - - # Find an invalid version (too large) - res = await registry_async_client.get("subjects/{}/versions/15".format(subject)) - assert res.status_code == 404 - assert res.json()["error_code"] == 40402 - assert res.json()["message"] == "Version 15 not found." 
+ assert jsonlib.loads(res.json()["schema"]) == schema_json_1 # Delete an actual version - res = await registry_async_client.delete("subjects/{}/versions/1".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}/versions/1") assert res.status_code == 200 assert res.json() == 1 + # Get the schema by id, still there, wasn't hard-deleted + res = await registry_async_client.get(f"schemas/ids/{schema_id_1}{trail}") + assert res.status_code == 200 + assert jsonlib.loads(res.json()["schema"]) == schema_json_1 + + # Get the schema by id + res = await registry_async_client.get(f"schemas/ids/{schema_id_2}{trail}") + assert res.status_code == 200 + + # Get the versions, old version not found anymore (even if schema itself is) + await assert_schema_versions(registry_async_client, trail, schema_id_1, []) + await assert_schema_versions(registry_async_client, trail, schema_id_2, [(subject, 2)]) + # Delete a whole subject - res = await registry_async_client.delete("subjects/{}".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}") assert res.status_code == 200 assert res.json() == [2] @@ -1194,57 +1374,71 @@ async def test_schema(registry_async_client, trail): # After deleting the last version of a subject, it shouldn't be in the list res = await registry_async_client.post( - "subjects/{}/versions".format(subject), - json={"schema": '{"type": "string"}'}, + f"subjects/{subject}/versions", + json={"schema": '{"type": "string", "unique": "%s"}' % unique_field_factory()}, ) assert res.status == 200 res = await registry_async_client.get("subjects") assert subject in res.json() - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.json() == [3] - res = await registry_async_client.delete("subjects/{}/versions/3".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}/versions/3") assert res.status_code == 200 res = await registry_async_client.get("subjects") assert subject not in res.json() - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.status_code == 404 assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject}' not found." - res = await registry_async_client.get("subjects/{}/versions/latest".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions/latest") assert res.status_code == 404 assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject}' not found." 
# Creating a new schema works after deleting the only available version + unique_3 = unique_field_factory() res = await registry_async_client.post( - "subjects/{}/versions".format(subject), - json={"schema": '{"type": "string"}'}, + f"subjects/{subject}/versions", + json={"schema": jsonlib.dumps({ + "type": "string", + "foo": "string", + unique_3: "string" + })} ) assert res.status == 200 - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.json() == [4] - # Check version number generation when deleting an entire subjcect - subject = new_random_name("subject") - res = await registry_async_client.put("config/{}".format(subject), json={"compatibility": "NONE"}) - assert res.status == 200 + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_version_numbering(registry_async_client: Client, trail: str) -> None: + """ + Tests that updating the schema of a subject increases its version number. + Deletes the subject and asserts that, when it is recreated, version numbering continues where it left off. + """ + subject = create_subject_name_factory(f"test_schema_version_numbering-{trail}")() + unique_field_factory = create_field_name_factory(trail) + + unique = unique_field_factory() schema = { "type": "record", - "name": "Object", - "fields": [ - { - "name": "first_name", - "type": "string", - }, - ] + "name": unique, + "fields": [{ + "name": "first_name", + "type": "string", + }], } - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}) assert res.status == 200 assert "id" in res.json() + + res = await registry_async_client.put(f"config/{subject}", json={"compatibility": "FORWARD"}) + assert res.status == 200 + schema2 = { "type": "record", - "name": "Object", + "name": unique, "fields": [ { "name": "first_name", @@ -1256,42 +1450,68 @@ async def test_schema(registry_async_client, trail): }, ] } - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema2)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema2)}) assert res.status == 200 assert "id" in res.json() - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.status == 200 assert res.json() == [1, 2] - res = await registry_async_client.delete("subjects/{}".format(subject)) - assert res.status == 200 + # Recreate subject - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema)}) - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}") + assert res.status == 200 + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}) + assert res.status == 200 + res = await registry_async_client.get(f"subjects/{subject}/versions") + assert res.status == 200 assert res.json() == [3] # Version number generation should now begin at 3 - # Check the return format on a more complex schema for version get - subject = new_random_name("subject") + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_version_numbering_complex(registry_async_client: Client, 
trail: str) -> None: + """ + Tests that when fetching a more complex schema, it matches the created one. + """ + subject = create_subject_name_factory(f"test_schema_version_numbering_complex-{trail}")() + unique_field_factory = create_field_name_factory(trail) + schema = { "type": "record", - "name": "Objct", + "name": "Object", "fields": [ { "name": "first_name", "type": "string", }, - ] + ], + "unique": unique_field_factory() } res = await registry_async_client.post( - "subjects/{}/versions".format(subject), + f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}, ) - res = await registry_async_client.get("subjects/{}/versions/1".format(subject)) + schema_id = res.json()["id"] + + res = await registry_async_client.get(f"subjects/{subject}/versions/1") assert res.status == 200 assert res.json()["subject"] == subject assert sorted(jsonlib.loads(res.json()["schema"])) == sorted(schema) + await assert_schema_versions(registry_async_client, trail, schema_id, [(subject, 1)]) + + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_three_subjects_sharing_schema(registry_async_client: Client, trail: str) -> None: + """ + Submits two subjects with the same schema. + Submits a third subject, initially with a different schema, then updates it to share the schema. + Asserts all three subjects have the same schema. + """ + subject_name_factory = create_subject_name_factory(f"test_schema_three_subjects_sharing_schema-{trail}") + unique_field_factory = create_field_name_factory(trail) + # Submitting the exact same schema for a different subject should return the same schema ID. - subject = new_random_name("subject") + subject_1 = subject_name_factory() schema = { "type": "record", "name": "Object", @@ -1300,121 +1520,138 @@ async def test_schema(registry_async_client, trail): "name": "just_a_value", "type": "string", }, - ] + { + "name": unique_field_factory(), + "type": "string", + }, + ], } - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": jsonlib.dumps(schema)}) assert res.status == 200 assert "id" in res.json() - original_schema_id = res.json()["id"] + schema_id_1 = res.json()["id"] + # New subject with the same schema - subject = new_random_name("subject") - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema)}) + subject_2 = subject_name_factory() + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": jsonlib.dumps(schema)}) assert res.status == 200 assert "id" in res.json() - new_schema_id = res.json()["id"] - assert original_schema_id == new_schema_id # It also works for multiple versions in a single subject - subject = new_random_name("subject") + schema_id_2 = res.json()["id"] + assert schema_id_1 == schema_id_2 # It also works for multiple versions in a single subject + subject_3 = subject_name_factory() res = await registry_async_client.put( - "config/{}".format(subject), json={"compatibility": "NONE"} + f"config/{subject_3}", json={"compatibility": "NONE"} ) # We don't care about the compatibility in this test res = await registry_async_client.post( - "subjects/{}/versions".format(subject), + f"subjects/{subject_3}/versions", json={"schema": '{"type": "string"}'}, ) assert res.status == 200 res = await registry_async_client.post( - "subjects/{}/versions".format(subject), + f"subjects/{subject_3}/versions", json={"schema": jsonlib.dumps(schema)}, ) assert res.status == 200 - assert 
res.json()["id"] == new_schema_id # Same ID as in the previous test step + assert res.json()["id"] == schema_id_1 # Same ID as in the previous test step + + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_subject_version_schema(registry_async_client: Client, trail: str) -> None: + """ + Tests for the /subjects/(string: subject)/versions/(versionId: version)/schema endpoint. + """ + subject_name_factory = create_subject_name_factory(f"test_schema_subject_version_schema_{trail}") + schema_name = create_schema_name_factory(f"test_schema_subject_version_schema_{trail}")() # The subject version schema endpoint returns the correct results - subject = new_random_name("subject") - schema_str = '{"type": "string"}' + subject_1 = subject_name_factory() + + schema = { + "type": "record", + "name": schema_name, + "fields": [{ + "name": "just_a_value", + "type": "string", + }], + } + schema_str = jsonlib.dumps(schema) + res = await registry_async_client.post( - "subjects/{}/versions".format(subject), + f"subjects/{subject_1}/versions", json={"schema": schema_str}, ) assert res.status == 200 - res = await registry_async_client.get(f"subjects/{subject}/versions/1/schema") + res = await registry_async_client.get(f"subjects/{subject_1}/versions/1/schema") assert res.status == 200 assert res.json() == jsonlib.loads(schema_str) - subject2 = new_random_name("subject") - res = await registry_async_client.get(f"subjects/{subject2}/versions/1/schema") # Invalid subject + + subject_2 = subject_name_factory() + res = await registry_async_client.get(f"subjects/{subject_2}/versions/1/schema") # Invalid subject assert res.status == 404 assert res.json()["error_code"] == 40401 - assert res.json()["message"] == f"Subject '{subject2}' not found." - res = await registry_async_client.get(f"subjects/{subject}/versions/2/schema") + assert res.json()["message"] == f"Subject '{subject_2}' not found." + + res = await registry_async_client.get(f"subjects/{subject_1}/versions/2/schema") assert res.status == 404 assert res.json()["error_code"] == 40402 assert res.json()["message"] == "Version 2 not found." 
- res = await registry_async_client.get(f"subjects/{subject}/versions/latest/schema") + + res = await registry_async_client.get(f"subjects/{subject_1}/versions/latest/schema") assert res.status == 200 assert res.json() == jsonlib.loads(schema_str) - # The schema check for subject endpoint returns correct results - subject = new_random_name("subject") + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_same_subject(registry_async_client: Client, trail: str) -> None: + """ + The same schema JSON should be returned when checking the same schema str against the same subject + """ + subject_name_factory = create_subject_name_factory(f"test_schema_same_subject_{trail}") + schema_name = create_schema_name_factory(f"test_schema_same_subject_{trail}")() + + schema_str = jsonlib.dumps({ + "type": "record", + "name": schema_name, + "fields": [{ + "name": "f", + "type": "string", + }] + }) + subject = subject_name_factory() res = await registry_async_client.post( - "subjects/{}/versions".format(subject), + f"subjects/{subject}/versions", json={"schema": schema_str}, ) assert res.status == 200 schema_id = res.json()["id"] - # The same ID should be returned when checking the same schema against the same subject res = await registry_async_client.post( f"subjects/{subject}", json={"schema": schema_str}, ) assert res.status == 200 - assert res.json() == {"id": schema_id, "subject": subject, "schema": schema_str, "version": 1} - # Invalid schema should return 500 - res = await registry_async_client.post( - f"subjects/{subject}", - json={"schema": '{"type": "invalid_type"}'}, - ) - assert res.status == 500 - assert res.json()["message"] == f"Error while looking up schema under subject {subject}" - # Subject is not found - subject3 = new_random_name("subject") - res = await registry_async_client.post( - f"subjects/{subject3}", - json={"schema": schema_str}, - ) - assert res.status == 404 - assert res.json()["error_code"] == 40401 - assert res.json()["message"] == f"Subject '{subject3}' not found." - # Schema not found for subject - res = await registry_async_client.post( - f"subjects/{subject}", - json={"schema": '{"type": "int"}'}, - ) - assert res.status == 404 - assert res.json()["error_code"] == 40403 - assert res.json()["message"] == "Schema not found" - # Schema not included in the request body - res = await registry_async_client.post(f"subjects/{subject}", json={}) - assert res.status == 500 - assert res.json()["error_code"] == 500 - assert res.json()["message"] == "Internal Server Error" - # Schema not included in the request body for subject that does not exist - subject4 = new_random_name("subject") - res = await registry_async_client.post( - f"subjects/{subject4}", - json={}, - ) - assert res.status == 404 - assert res.json()["error_code"] == 40401 - assert res.json()["message"] == f"Subject '{subject4}' not found." - # Test that global ID values stay consistent after using pre-existing schema ids - subject = new_random_name("subject") + # Switch the str schema to a dict for comparison + json = res.json() + json["schema"] = jsonlib.loads(json["schema"]) + assert json == {"id": schema_id, "subject": subject, "schema": jsonlib.loads(schema_str), "version": 1} + + +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_schema_version_number_existing_schema(registry_async_client: Client, trail: str) -> None: + """ + Tests creating the same schemas for two subjects. Asserts the schema ids are the same for both subjects. 
+ """ + subject_name_factory = create_subject_name_factory(f"test_schema_version_number_existing_schema-{trail}") + unique_field_factory = create_field_name_factory(trail) + + subject_1 = subject_name_factory() res = await registry_async_client.put( - "config/{}".format(subject), json={"compatibility": "NONE"} + f"config/{subject_1}", json={"compatibility": "NONE"} ) # We don't care about compatibility - schema = { + unique = unique_field_factory() + schema_1 = { "type": "record", "name": "Object", "fields": [ @@ -1422,9 +1659,13 @@ async def test_schema(registry_async_client, trail): "name": "just_a_value", "type": "string", }, - ] + { + "name": f"{unique}", + "type": "string", + }, + ], } - schema2 = { + schema_2 = { "type": "record", "name": "Object", "fields": [ @@ -1432,9 +1673,13 @@ async def test_schema(registry_async_client, trail): "name": "just_a_value2", "type": "string", }, - ] + { + "name": f"{unique}", + "type": "string", + }, + ], } - schema3 = { + schema_3 = { "type": "record", "name": "Object", "fields": [ @@ -1442,30 +1687,40 @@ async def test_schema(registry_async_client, trail): "name": "just_a_value3", "type": "int", }, - ] + { + "name": f"{unique}", + "type": "string", + }, + ], } - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": jsonlib.dumps(schema_1)}) assert res.status == 200 - first_schema_id = res.json()["id"] - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema2)}) + schema_id_1 = res.json()["id"] + + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": jsonlib.dumps(schema_2)}) assert res.status == 200 - assert res.json()["id"] == first_schema_id + 1 + schema_id_2 = res.json()["id"] + assert schema_id_2 > schema_id_1 + # Reuse the first schema in another subject - subject = new_random_name("subject") + subject_2 = subject_name_factory() res = await registry_async_client.put( - "config/{}".format(subject), json={"compatibility": "NONE"} + f"config/{subject_2}", json={"compatibility": "NONE"} ) # We don't care about compatibility - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": jsonlib.dumps(schema_1)}) assert res.status == 200 - assert res.json()["id"] == first_schema_id + assert res.json()["id"] == schema_id_1 + # Create a new schema - res = await registry_async_client.post("subjects/{}/versions".format(subject), json={"schema": jsonlib.dumps(schema3)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": jsonlib.dumps(schema_3)}) assert res.status == 200 - assert res.json()["id"] == first_schema_id + 2 + schema_id_3 = res.json()["id"] + assert res.json()["id"] == schema_id_3 + assert schema_id_3 > schema_id_2 @pytest.mark.parametrize("trail", ["", "/"]) -async def test_config(registry_async_client, trail): +async def test_config(registry_async_client: Client, trail: str) -> None: subject_name_factory = create_subject_name_factory(f"test_config-{trail}") # Tests /config endpoint @@ -1521,17 +1776,17 @@ async def test_config(registry_async_client, trail): assert res.headers["Content-Type"] == "application/vnd.schemaregistry.v1+json" # The subject doesn't exist from the schema point of view - res = await 
registry_async_client.get("subjects/{}/versions".format(subject_2)) + res = await registry_async_client.get(f"subjects/{subject_2}/versions") assert res.status_code == 404 res = await registry_async_client.post( - "subjects/{}/versions".format(subject_2), + f"subjects/{subject_2}/versions", json={"schema": '{"type": "string"}'}, ) assert res.status_code == 200 assert "id" in res.json() - res = await registry_async_client.get("config/{}".format(subject_2)) + res = await registry_async_client.get(f"config/{subject_2}") assert res.status_code == 200 assert res.json()["compatibilityLevel"] == "FULL" @@ -1545,7 +1800,7 @@ async def test_config(registry_async_client, trail): assert res.json()["compatibilityLevel"] == "NONE" -async def test_http_headers(registry_async_client): +async def test_http_headers(registry_async_client: Client) -> None: res = await registry_async_client.get("subjects", headers={"Accept": "application/json"}) assert res.headers["Content-Type"] == "application/json" @@ -1556,12 +1811,12 @@ async def test_http_headers(registry_async_client): # Giving an invalid Accept value res = await registry_async_client.get("subjects", headers={"Accept": "application/vnd.schemaregistry.v2+json"}) assert res.status == 406 - assert res.json()["message"] == HTTPStatus.NOT_ACCEPTABLE.description + assert res.json()["message"] == "HTTP 406 Not Acceptable" # PUT with an invalid Content type res = await registry_async_client.put("config", json={"compatibility": "NONE"}, headers={"Content-Type": "text/html"}) assert res.status == 415 - assert res.json()["message"] == HTTPStatus.UNSUPPORTED_MEDIA_TYPE.description + assert res.json()["message"] == "HTTP 415 Unsupported Media Type" assert res.headers["Content-Type"] == "application/vnd.schemaregistry.v1+json" # Multiple Accept values @@ -1585,7 +1840,7 @@ async def test_http_headers(registry_async_client): assert res.headers["Content-Type"] == "application/vnd.schemaregistry.v1+json" res = await registry_async_client.get("subjects", headers={"Accept": "text/*"}) assert res.status == 406 - assert res.json()["message"] == HTTPStatus.NOT_ACCEPTABLE.description + assert res.json()["message"] == "HTTP 406 Not Acceptable" # Accept without any type works res = await registry_async_client.get("subjects", headers={"Accept": "*/does_not_matter"}) @@ -1608,7 +1863,6 @@ async def test_http_headers(registry_async_client): assert res.headers["Content-Type"] == "application/vnd.schemaregistry.v1+json" res = await registry_async_client.get("subjects", headers={"Accept": "application/octet-stream"}) assert res.status == 406 - assert res.json()["message"] == HTTPStatus.NOT_ACCEPTABLE.description # Parse Content-Type correctly res = await registry_async_client.put( @@ -1650,7 +1904,7 @@ async def test_http_headers(registry_async_client): assert res.status_code == 404, res.content -async def test_schema_body_validation(registry_async_client): +async def test_schema_body_validation(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_schema_body_validation")() post_endpoints = {f"subjects/{subject}", f"subjects/{subject}/versions"} for endpoint in post_endpoints: @@ -1676,28 +1930,39 @@ async def test_schema_body_validation(registry_async_client): assert res.json()["message"] == "Internal Server Error" -async def test_version_number_validation(registry_async_client): - # Create a schema +async def test_version_number_validation(registry_async_client: Client) -> None: + """ + Creates a subject and schema. 
Tests that the endpoints + subjects/{subject}/versions/{version} and + subjects/{subject}/versions/{version}/schema + return correct values both with valid and invalid parameters. + """ subject = create_subject_name_factory("test_version_number_validation")() res = await registry_async_client.post( - "subjects/{}/versions".format(subject), + f"subjects/{subject}/versions", json={"schema": '{"type": "string"}'}, ) assert res.status_code == 200 assert "id" in res.json() + res = await registry_async_client.get(f"subjects/{subject}/versions") + assert res.status == 200 + schema_version = res.json()[0] + invalid_schema_version = schema_version - 1 + version_endpoints = {f"subjects/{subject}/versions/$VERSION", f"subjects/{subject}/versions/$VERSION/schema"} for endpoint in version_endpoints: # Valid schema id - res = await registry_async_client.get(endpoint.replace("$VERSION", "1")) + res = await registry_async_client.get(endpoint.replace("$VERSION", str(schema_version))) assert res.status == 200 + # Invalid number - res = await registry_async_client.get(endpoint.replace("$VERSION", "0")) + res = await registry_async_client.get(endpoint.replace("$VERSION", str(invalid_schema_version))) assert res.status == 422 assert res.json()["error_code"] == 42202 assert res.json()[ "message" - ] == "The specified version is not a valid version id. " \ + ] == f"The specified version '{invalid_schema_version}' is not a valid version id. " \ "Allowed values are between [1, 2^31-1] and the string \"latest\"" # Valid latest string res = await registry_async_client.get(endpoint.replace("$VERSION", "latest")) @@ -1708,35 +1973,47 @@ async def test_version_number_validation(registry_async_client): assert res.json()["error_code"] == 42202 assert res.json()[ "message" - ] == "The specified version is not a valid version id. " \ + ] == "The specified version 'invalid' is not a valid version id. " \ "Allowed values are between [1, 2^31-1] and the string \"latest\"" -async def test_common_endpoints(registry_async_client): +async def test_common_endpoints(registry_async_client: Client) -> None: res = await registry_async_client.get("") assert res.status == 200 assert res.json() == {} -async def test_invalid_namespace(registry_async_client): +async def test_invalid_namespace(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_invalid_namespace")() schema = {"type": "record", "name": "foo", "namespace": "foo-bar-baz", "fields": []} res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}) assert res.ok, res.json() -async def test_schema_remains_constant(registry_async_client): +async def test_schema_remains_constant(registry_async_client: Client) -> None: + """ + Creates a subject with schema. 
Asserts the schema is the same when fetching it using schemas/ids/{schema_id} + """ subject = create_subject_name_factory("test_schema_remains_constant")() - schema = {"type": "record", "name": "foo", "namespace": "foo-bar-baz", "fields": [{"type": "string", "name": "bla"}]} + schema_name = create_schema_name_factory("test_schema_remains_constant")() + schema = { + "type": "record", + "name": schema_name, + "namespace": "foo-bar-baz", + "fields": [{ + "type": "string", + "name": "bla" + }] + } schema_str = jsonlib.dumps(schema) res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": schema_str}) assert res.ok, res.json() - scid = res.json()["id"] - res = await registry_async_client.get(f"schemas/ids/{scid}") - assert res.json()["schema"] == schema_str + schema_id = res.json()["id"] + res = await registry_async_client.get(f"schemas/ids/{schema_id}") + assert jsonlib.loads(res.json()["schema"]) == jsonlib.loads(schema_str) -async def test_malformed_kafka_message(registry_async, registry_async_client): +async def test_malformed_kafka_message(registry_async: KarapaceSchemaRegistry, registry_async_client: Client) -> None: topic = registry_async.config["topic_name"] prod = KafkaProducer(bootstrap_servers=registry_async.config["bootstrap_uri"]) @@ -1762,7 +2039,7 @@ async def test_malformed_kafka_message(registry_async, registry_async_client): assert res_data == payload, res_data -async def test_inner_type_compat_failure(registry_async_client): +async def test_inner_type_compat_failure(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_inner_type_compat_failure")() sc = { @@ -1808,7 +2085,7 @@ async def test_inner_type_compat_failure(registry_async_client): assert sc_id != res.json()["id"] -async def test_anon_type_union_failure(registry_async_client): +async def test_anon_type_union_failure(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_anon_type_union_failure")() schema = { "type": "record", @@ -1861,7 +2138,7 @@ async def test_anon_type_union_failure(registry_async_client): @pytest.mark.parametrize("compatibility", ["FULL", "FULL_TRANSITIVE"]) -async def test_full_transitive_failure(registry_async_client, compatibility): +async def test_full_transitive_failure(registry_async_client: Client, compatibility: str) -> None: subject = create_subject_name_factory(f"test_full_transitive_failure-{compatibility}")() init = { @@ -1915,7 +2192,7 @@ async def test_full_transitive_failure(registry_async_client, compatibility): assert res.status == 409 -async def test_invalid_schemas(registry_async_client): +async def test_invalid_schemas(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_invalid_schemas")() repated_field = { @@ -1939,7 +2216,7 @@ async def test_invalid_schemas(registry_async_client): assert not is_success(HTTPStatus(res.status)), "an invalid schema must not be a success" -async def test_schema_hard_delete_version(registry_async_client): +async def test_schema_hard_delete_version(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_schema_hard_delete_version")() res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) assert res.status == 200 @@ -1985,40 +2262,40 @@ async def test_schema_hard_delete_version(registry_async_client): assert schemav1_id != schemav2_id # Cannot directly hard delete schema v1 - res = await registry_async_client.delete("subjects/{}/versions/1?permanent=true".format(subject)) + 
res = await registry_async_client.delete(f"subjects/{subject}/versions/1?permanent=true") assert res.status_code == 404 assert res.json()["error_code"] == 40407 assert res.json()["message"] == f"Subject '{subject}' Version 1 was not deleted first before being permanently deleted" # Soft delete schema v1 - res = await registry_async_client.delete("subjects/{}/versions/1".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}/versions/1") assert res.status_code == 200 assert res.json() == 1 # Cannot soft delete twice - res = await registry_async_client.delete("subjects/{}/versions/1".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}/versions/1") assert res.status_code == 404 assert res.json()["error_code"] == 40406 assert res.json( )["message"] == f"Subject '{subject}' Version 1 was soft deleted.Set permanent=true to delete permanently" - res = await registry_async_client.get("subjects/{}/versions/1".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions/1") assert res.status_code == 404 assert res.json()["error_code"] == 40402 assert res.json()["message"] == "Version 1 not found." # Hard delete schema v1 - res = await registry_async_client.delete("subjects/{}/versions/1?permanent=true".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}/versions/1?permanent=true") assert res.status_code == 200 # Cannot hard delete twice - res = await registry_async_client.delete("subjects/{}/versions/1?permanent=true".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}/versions/1?permanent=true") assert res.status_code == 404 assert res.json()["error_code"] == 40402 assert res.json()["message"] == "Version 1 not found." -async def test_schema_hard_delete_whole_schema(registry_async_client): +async def test_schema_hard_delete_whole_schema(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_schema_hard_delete_whole_schema")() res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) assert res.status == 200 @@ -2064,33 +2341,33 @@ async def test_schema_hard_delete_whole_schema(registry_async_client): assert schemav1_id != schemav2_id # Hard delete whole schema cannot be done before soft delete - res = await registry_async_client.delete("subjects/{}?permanent=true".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}?permanent=true") assert res.status_code == 404 assert res.json()["error_code"] == 40405 assert res.json()["message"] == f"Subject '{subject}' was not deleted first before being permanently deleted" # Soft delete whole schema - res = await registry_async_client.delete("subjects/{}".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}") assert res.status_code == 200 assert res.json() == [1, 2] - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.status_code == 404 assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject}' not found." 
# Hard delete whole schema - res = await registry_async_client.delete("subjects/{}?permanent=true".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}?permanent=true") assert res.status_code == 200 assert res.json() == [1, 2] - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.status_code == 404 assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject}' not found." -async def test_schema_hard_delete_and_recreate(registry_async_client): +async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_schema_hard_delete_and_recreate")() res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) assert res.status == 200 @@ -2115,7 +2392,7 @@ async def test_schema_hard_delete_and_recreate(registry_async_client): schema_id = res.json()["id"] # Soft delete whole schema - res = await registry_async_client.delete("subjects/{}".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}") assert res.status_code == 200 # Recreate with same subject after soft delete @@ -2128,13 +2405,13 @@ async def test_schema_hard_delete_and_recreate(registry_async_client): assert schema_id == res.json()["id"], "the same schema registered, the same identifier" # Soft delete whole schema - res = await registry_async_client.delete("subjects/{}".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}") assert res.status_code == 200 # Hard delete whole schema - res = await registry_async_client.delete("subjects/{}?permanent=true".format(subject)) + res = await registry_async_client.delete(f"subjects/{subject}?permanent=true") assert res.status_code == 200 - res = await registry_async_client.get("subjects/{}/versions".format(subject)) + res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.status_code == 404 assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject}' not found." diff --git a/tests/utils.py b/tests/utils.py index 28a164001..41fc65671 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -217,11 +217,15 @@ def new_random_name(prefix: str) -> str: def create_subject_name_factory(prefix: str) -> Callable[[], str]: - return create_id_factory(f"subject-{prefix}") + return create_id_factory(f"subject_{prefix}") def create_field_name_factory(prefix: str) -> Callable[[], str]: - return create_id_factory(f"field-{prefix}") + return create_id_factory(f"field_{prefix}") + + +def create_schema_name_factory(prefix: str) -> Callable[[], str]: + return create_id_factory(f"schema_{prefix}") def create_id_factory(prefix: str) -> Callable[[], str]: