Merge pull request #4 from aiven/master
Sync fork
amrutha-shanbhag authored Apr 20, 2021
2 parents 3e5f83b + 863bf4a commit c7cf56f
Showing 28 changed files with 3,163 additions and 1,174 deletions.
6 changes: 4 additions & 2 deletions Makefile
@@ -1,7 +1,5 @@
SHORT_VER = $(shell git describe --tags --abbrev=0 | cut -f1-)
LONG_VER = $(shell git describe --long 2>/dev/null || echo $(SHORT_VER)-0-unknown-g`git describe --always`)
KAFKA_VERSION=2.4.1
SCALA_VERSION=2.12
KAFKA_PATH = kafka_$(SCALA_VERSION)-$(KAFKA_VERSION)
KAFKA_TAR = $(KAFKA_PATH).tgz
PYTHON_SOURCE_DIRS = karapace/
@@ -11,6 +9,10 @@ GENERATED = karapace/version.py
PYTHON = python3
DNF_INSTALL = sudo dnf install -y

# Keep these in sync with tests/integration/conftest.py
KAFKA_VERSION=2.7.0
SCALA_VERSION=2.13

KAFKA_IMAGE = karapace-test-kafka
ZK = 2181
KAFKA = 9092
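The two pinned versions above feed the Kafka artifact paths defined at the top of the Makefile (KAFKA_PATH and KAFKA_TAR). A minimal Python sketch of the same derivation, using only values shown in this diff (illustrative, not part of the repository):

# Mirrors KAFKA_PATH = kafka_$(SCALA_VERSION)-$(KAFKA_VERSION) and
# KAFKA_TAR = $(KAFKA_PATH).tgz from the Makefile above.
KAFKA_VERSION = "2.7.0"
SCALA_VERSION = "2.13"

KAFKA_PATH = f"kafka_{SCALA_VERSION}-{KAFKA_VERSION}"
KAFKA_TAR = f"{KAFKA_PATH}.tgz"

print(KAFKA_TAR)  # kafka_2.13-2.7.0.tgz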
48 changes: 34 additions & 14 deletions container/docker-compose.yml
@@ -2,43 +2,58 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.0
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-server:6.0.0
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9101:9101" # JMX
- "9092:9092" # Kafka
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
# Listeners:
# PLAINTEXT_HOST -> Expose kafka to the host network
# PLAINTEXT -> Used by kafka for inter broker communication / containers
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://karapace-registry:8081
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
# Metrics:
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
# Keep in sync with tests/integration/conftest.py::configure_and_start_kafka
KAFKA_BROKER_ID: 1
KAFKA_BROKER_RACK: "local"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_INTER_BROKER_PROTOCOL_VERSION: 2.4
KAFKA_LOG_CLEANER_ENABLE: "true"
KAFKA_LOG_MESSAGE_FORMAT_VERSION: 2.4
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
KAFKA_LOG_SEGMENT_BYTES: 209715200
KAFKA_NUM_IO_THREADS: 8
KAFKA_NUM_NETWORK_THREADS: 112
KAFKA_NUM_PARTITIONS: 1
KAFKA_NUM_REPLICA_FETCHERS: 4
KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS: 16
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 6000
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

karapace-registry:
image: karapace-registry
@@ -55,6 +70,7 @@ services:
KARAPACE_REGISTRY_GROUP_ID: karapace-registry
KARAPACE_REGISTRY_MASTER_ELIGIBITY: "true"
KARAPACE_REGISTRY_TOPIC_NAME: _schemas
KARAPACE_REGISTRY_LOG_LEVEL: WARNING

karapace-rest:
image: karapace-rest
@@ -70,3 +86,7 @@
KARAPACE_REST_BOOTSTRAP_URI: kafka:29092
KARAPACE_REST_REGISTRY_HOST: karapace-registry
KARAPACE_REST_REGISTRY_PORT: 8081
# Keep in sync with tests/integration/conftest.py::fixture_rest_async,
# new entries may need to be added to container/start.sh
KARAPACE_REST_ADMIN_METADATA_MAX_AGE: 0
KARAPACE_REST_LOG_LEVEL: WARNING
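The listener comments in the kafka service above capture the key networking detail: processes on the host reach the broker through the PLAINTEXT_HOST listener advertised as localhost:9092, while other compose services use kafka:29092. A minimal host-side smoke test, assuming the stack is running and the kafka-python client (which Karapace itself uses) is installed:

from kafka import KafkaAdminClient

# Host-side clients use the PLAINTEXT_HOST listener (localhost:9092);
# containers on the compose network would connect to kafka:29092 instead.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
print(admin.list_topics())  # includes "_schemas" once karapace-registry is up
admin.close()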
10 changes: 8 additions & 2 deletions container/start.sh
@@ -1,18 +1,23 @@
#!/bin/bash
set -e

# keep in sync with karapace/config.py
KARAPACE_REGISTRY_PORT_DEFAULT=8081
KARAPACE_REGISTRY_HOST_DEFAULT=0.0.0.0
KARAPACE_REGISTRY_CLIENT_ID_DEFAULT=sr-1
KARAPACE_REGISTRY_GROUP_ID_DEFAULT=schema-registry
KARAPACE_REGISTRY_MASTER_ELIGIBITY_DEFAULT=true
KARAPACE_REGISTRY_TOPIC_NAME_DEFAULT=_schemas
KARAPACE_REGISTRY_LOG_LEVEL_DEFAULT=INFO
# Variables without defaults:
# KARAPACE_REGISTRY_ADVERTISED_HOSTNAME
# KARAPACE_REGISTRY_BOOTSTRAP_URI

# keep in sync with karapace/config.py
KARAPACE_REST_PORT_DEFAULT=8082
KARAPACE_REST_ADMIN_METADATA_MAX_AGE_DEFAULT=5
KARAPACE_REST_HOST_DEFAULT=0.0.0.0
KARAPACE_REST_LOG_LEVEL_DEFAULT=INFO
# Variables without defaults:
# KARAPACE_REST_ADVERTISED_HOSTNAME
# KARAPACE_REST_BOOTSTRAP_URI
@@ -33,7 +38,7 @@ start_karapace_registry(){
"master_eligibility": ${KARAPACE_REGISTRY_MASTER_ELIGIBITY:-$KARAPACE_REGISTRY_MASTER_ELIGIBITY_DEFAULT},
"topic_name": "${KARAPACE_REGISTRY_TOPIC_NAME:-$KARAPACE_REGISTRY_TOPIC_NAME_DEFAULT}",
"compatibility": "FULL",
"log_level": "INFO",
"log_level": "${KARAPACE_REGISTRY_LOG_LEVEL:-$KARAPACE_REGISTRY_LOG_LEVEL_DEFAULT}",
"replication_factor": 1,
"security_protocol": "PLAINTEXT",
"ssl_cafile": null,
@@ -56,7 +61,8 @@ start_karapace_rest(){
"registry_port": ${KARAPACE_REST_REGISTRY_PORT},
"host": "${KARAPACE_REST_HOST:-$KARAPACE_REST_HOST_DEFAULT}",
"port": ${KARAPACE_REST_PORT:-$KARAPACE_REST_PORT_DEFAULT},
"log_level": "INFO",
"admin_metadata_max_age": ${KARAPACE_REST_ADMIN_METADATA_MAX_AGE:-$KARAPACE_REST_ADMIN_METADATA_MAX_AGE_DEFAULT},
"log_level": "${KARAPACE_REST_LOG_LEVEL:-$KARAPACE_REST_LOG_LEVEL_DEFAULT}",
"security_protocol": "PLAINTEXT",
"ssl_cafile": null,
"ssl_certfile": null,
69 changes: 54 additions & 15 deletions karapace/compatibility/__init__.py
@@ -20,6 +16,16 @@

@unique
class CompatibilityModes(Enum):
""" Supported compatibility modes.
- none: no compatibility checks done.
- backward compatibility: new schema can *read* data produced by the olders
schemas.
- forward compatibility: new schema can *produce* data compatible with old
schemas.
- transitive compatibility: new schema can read data produced by *all*
previous schemas, otherwise only the previous schema is checked.
"""
BACKWARD = "BACKWARD"
BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
FORWARD = "FORWARD"
@@ -53,45 +63,74 @@ def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator


def check_compatibility(
source: TypedSchema, target: TypedSchema, compatibility_mode: CompatibilityModes
old_schema: TypedSchema, new_schema: TypedSchema, compatibility_mode: CompatibilityModes
) -> SchemaCompatibilityResult:
if source.schema_type is not target.schema_type:
""" Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`. """
if old_schema.schema_type is not new_schema.schema_type:
return SchemaCompatibilityResult.incompatible(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Comparing different schema types: {source.schema_type} with {target.schema_type}",
message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}",
location=[],
)

if compatibility_mode is CompatibilityModes.NONE:
LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
return SchemaCompatibilityResult.compatible()

if source.schema_type is SchemaType.AVRO:
if old_schema.schema_type is SchemaType.AVRO:
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema)
result = check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema)
result = check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema)
result = result.merged_with(check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema))

elif source.schema_type is SchemaType.JSONSCHEMA:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)
result = result.merged_with(
check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
)
)

elif old_schema.schema_type is SchemaType.JSONSCHEMA:
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema)
result = check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_jsonschema_compatibility(reader=source.schema, writer=target.schema)
result = check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema)
result = result.merged_with(check_jsonschema_compatibility(reader=source.schema, writer=target.schema))
result = check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = result.merged_with(
check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)
)

else:
result = SchemaCompatibilityResult.incompatible(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Unknow schema_type {source.schema_type}",
message=f"Unknown schema_type {old_schema.schema_type}",
location=[],
)
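All of the branches above differ only in which schema is treated as the reader and which as the writer. A compact sketch of that role assignment (illustrative only, not part of the module; the transitive variants use the same roles, just checked against every previous schema):

# BACKWARD: the new schema must read data written by the old schema.
# FORWARD: the old schema must read data written by the new schema.
# FULL: both directions are checked and the results merged.
READER_WRITER_BY_MODE = {
    "BACKWARD": [("new", "old")],
    "FORWARD": [("old", "new")],
    "FULL": [("new", "old"), ("old", "new")],
}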

