Skip to content

Commit

Permalink
Add check-period to globalId strategy. (#1025)
Browse files Browse the repository at this point in the history
* Add check-period to globalId strategy.

* Add check period test and docs.
  • Loading branch information
alesj authored Nov 25, 2020
1 parent ec76f49 commit ab9c219
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 16 deletions.
50 changes: 42 additions & 8 deletions app/src/test/java/io/apicurio/registry/RegistrySerdeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import io.apicurio.registry.support.Tester;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.utils.ConcurrentUtil;
import io.apicurio.registry.utils.IoUtil;
import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AbstractKafkaSerializer;
import io.apicurio.registry.utils.serde.AbstractKafkaStrategyAwareSerDe;
import io.apicurio.registry.utils.serde.AvroEncoding;
import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
import io.apicurio.registry.utils.serde.AvroKafkaSerializer;
Expand Down Expand Up @@ -94,7 +96,8 @@ public void testGetOrCreate(Supplier<RegistryService> supplier) throws Exception

Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
String artifactId = generateArtifactId();
CompletionStage<ArtifactMetaData> csa = client.createArtifact(ArtifactType.AVRO, artifactId, null, new ByteArrayInputStream(schema.toString().getBytes(StandardCharsets.UTF_8)));
byte[] schemaContent = IoUtil.toBytes(schema.toString());
CompletionStage<ArtifactMetaData> csa = client.createArtifact(ArtifactType.AVRO, artifactId, null, new ByteArrayInputStream(schemaContent));
ArtifactMetaData amd = ConcurrentUtil.result(csa);

this.waitForGlobalId(amd.getGlobalId());
Expand Down Expand Up @@ -127,6 +130,37 @@ public void testCachedSchema(Supplier<RegistryService> supplier) throws Exceptio
Assertions.assertEquals(id, idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema));
}

@RegistryServiceTest
public void testCheckPeriod(Supplier<RegistryService> supplier) throws Exception {
RegistryService service = supplier.get();

Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord5x\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
String artifactId = generateArtifactId();
byte[] schemaContent = IoUtil.toBytes(schema.toString());
CompletionStage<ArtifactMetaData> csa = service.createArtifact(ArtifactType.AVRO, artifactId, null, new ByteArrayInputStream(schemaContent));
ConcurrentUtil.result(csa);

long pc = 5000L; // 5seconds check period ...

Map<String, Object> config = new HashMap<>();
config.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_CHECK_PERIOD_MS_CONFIG_PARAM, String.valueOf(pc));
GlobalIdStrategy<Schema> idStrategy = new FindLatestIdStrategy<>();
idStrategy.configure(config, false);

long id1 = idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema);
service.reset();
long id2 = idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema);
service.reset();
Assertions.assertEquals(id1, id2); // should be less than 5seconds ...
retry(() -> service.getArtifactMetaDataByGlobalId(id2));

service.updateArtifact(artifactId, ArtifactType.AVRO, new ByteArrayInputStream(schemaContent));
Thread.sleep(pc + 1);
retry(() -> Assertions.assertNotEquals(id2, service.getArtifactMetaData(artifactId).getGlobalId()));

Assertions.assertNotEquals(id2, idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema));
}

@SuppressWarnings("unchecked")
@RegistryServiceTest
public void testConfiguration(Supplier<RegistryService> supplier) throws Exception {
Expand Down Expand Up @@ -194,7 +228,7 @@ record = deserializer.deserialize(artifactId, bytes);
@RegistryServiceTest
public void testAvro(Supplier<RegistryService> supplier) throws Exception {
Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<GenericData.Record>(supplier.get());
try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<>(supplier.get());
Deserializer<GenericData.Record> deserializer = new AvroKafkaDeserializer<>(supplier.get())) {

serializer.setGlobalIdStrategy(new AutoRegisterIdStrategy<>());
Expand All @@ -218,9 +252,9 @@ public void testAvro(Supplier<RegistryService> supplier) throws Exception {
@RegistryServiceTest
public void testAvroJSON(Supplier<RegistryService> supplier) throws Exception {
Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<GenericData.Record>(supplier.get());
try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<>(supplier.get());
Deserializer<GenericData.Record> deserializer = new AvroKafkaDeserializer<>(supplier.get())) {
HashMap<String, String> config = new HashMap();
HashMap<String, String> config = new HashMap<>();
config.put(AvroEncoding.AVRO_ENCODING, AvroEncoding.AVRO_JSON);
serializer.configure(config,false);
deserializer.configure(config, false);
Expand Down Expand Up @@ -250,11 +284,11 @@ public void testAvroJSON(Supplier<RegistryService> supplier) throws Exception {
@RegistryServiceTest
public void testAvroUsingHeaders(Supplier<RegistryService> supplier) throws Exception {
Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<GenericData.Record>(supplier.get());
try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<>(supplier.get());
Deserializer<GenericData.Record> deserializer = new AvroKafkaDeserializer<>(supplier.get())) {

serializer.setGlobalIdStrategy(new AutoRegisterIdStrategy<>());
HashMap<String, String> config = new HashMap();
HashMap<String, String> config = new HashMap<>();
config.put(AbstractKafkaSerDe.USE_HEADERS, "true");
serializer.configure(config,false);
deserializer.configure(config, false);
Expand Down Expand Up @@ -284,8 +318,8 @@ public void testAvroUsingHeaders(Supplier<RegistryService> supplier) throws Exce

@RegistryServiceTest
public void testAvroReflect(Supplier<RegistryService> supplier) throws Exception {
try (AvroKafkaSerializer<Tester> serializer = new AvroKafkaSerializer<Tester>(supplier.get());
AvroKafkaDeserializer<Tester> deserializer = new AvroKafkaDeserializer<Tester>(supplier.get())) {
try (AvroKafkaSerializer<Tester> serializer = new AvroKafkaSerializer<>(supplier.get());
AvroKafkaDeserializer<Tester> deserializer = new AvroKafkaDeserializer<>(supplier.get())) {

serializer.setGlobalIdStrategy(new AutoRegisterIdStrategy<>());
serializer.setAvroDatumProvider(new ReflectAvroDatumProvider<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,16 @@ Strategies to return a global ID based on an implementation of `GlobalIdStrategy

`FindLatestIdStrategy`:: Strategy that returns the global ID of the latest schema version, based on an artifact ID.
`FindBySchemaIdStrategy`:: Strategy that matches schema content, based on an artifact ID, to return a global ID.
`CachedSchemaIdStrategy`:: Strategy that caches the schema, and uses the global ID of the cached schema.
`GetOrCreateIdStrategy`:: Strategy that tries to get the latest schema, based on an artifact ID, and if it does not exist, it creates a new schema.
`AutoRegisterIdStrategy`:: Strategy that updates the schema, and uses the global ID of the updated schema.

[discrete]
[id='configuring-globalid-strategy-{context}']
== Configuring global ID strategy
You can configure the following application property:

* apicurio.registry.check-period-ms -- set remote lookup period in milliseconds

You can configure application properties as Java system properties or include them in the Quarkus
application.properties file. For more details, see the https://quarkus.io/guides/config#overriding-properties-at-runtime[Quarkus documentation].
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> {
public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id";
public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id";
public static final String REGISTRY_CHECK_PERIOD_MS_CONFIG_PARAM = "apicurio.registry.check-period-ms";

private ArtifactIdStrategy<T> artifactIdStrategy;
private GlobalIdStrategy<T> globalIdStrategy;
Expand Down Expand Up @@ -79,5 +80,6 @@ public void configure(Map<String, ?> configs, boolean isKey) {

Object gis = configs.get(REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM);
instantiate(GlobalIdStrategy.class, gis, this::setGlobalIdStrategy);
getGlobalIdStrategy().configure(configs, isKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.utils.ConcurrentUtil;

import java.net.HttpURLConnection;
import java.util.concurrent.CompletionStage;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.net.HttpURLConnection;
import java.util.concurrent.CompletionStage;

/**
* @author Ales Justin
*/
public abstract class AbstractCrudIdStrategy<T> implements GlobalIdStrategy<T> {
public abstract class AbstractCrudIdStrategy<T> extends CheckPeriodIdStrategy <T> {

protected <R> R unwrap(CompletionStage<R> cs) {
return ConcurrentUtil.result(cs);
Expand All @@ -46,7 +46,7 @@ protected void afterCreateArtifact(T schema, ArtifactMetaData amd) {
}

@Override
public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) {
long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema) {
try {
return initialLookup(service, artifactId, artifactType, schema);
} catch (WebApplicationException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.apicurio.registry.utils.serde.strategy;

import io.apicurio.registry.client.RegistryService;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.utils.serde.AbstractKafkaStrategyAwareSerDe;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author Ales Justin
*/
public abstract class CheckPeriodIdStrategy<T> implements GlobalIdStrategy<T> {

static class CheckValue {
public CheckValue(long ts, long id) {
this.ts = ts;
this.id = id;
}

long ts;
long id;
}

private long checkPeriod;
private Map<String, CheckValue> checkMap = new ConcurrentHashMap<>();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Object cp = configs.get(AbstractKafkaStrategyAwareSerDe.REGISTRY_CHECK_PERIOD_MS_CONFIG_PARAM);
if (cp != null) {
long checkPeriodParam;
if (cp instanceof Number) {
checkPeriodParam = ((Number) cp).longValue();
} else if (cp instanceof String) {
checkPeriodParam = Long.parseLong((String) cp);
} else if (cp instanceof Duration) {

This comment has been minimized.

Copy link
@pilhuhn

pilhuhn Dec 3, 2020

I think this may not work, as the instanceof String in the check before already triggers.

This comment has been minimized.

Copy link
@EricWittmann

EricWittmann Dec 3, 2020

Member

What do you mean? String and Duration in java are separate classes - why would the string check trigger for a duration?

checkPeriodParam = ((Duration) cp).toMillis();
} else {
throw new IllegalArgumentException("Check period config param type unsupported: " + cp);
}
if (checkPeriodParam < 0) {
throw new IllegalArgumentException("Check period must be non-negative: " + checkPeriodParam);
}
this.checkPeriod = checkPeriodParam;
}
}

abstract long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema);

public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) {
CheckValue cv = checkMap.compute(artifactId, (aID, v) -> {
long now = System.currentTimeMillis();
if (v == null) {
long id = findIdInternal(service, artifactId, artifactType, schema);
return new CheckValue(now, id);
} else {
if (v.ts + checkPeriod < now) {
long id = findIdInternal(service, artifactId, artifactType, schema);
v.ts = now;
v.id = id;
}
return v;
}
});
return cv.id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
/**
* @author Ales Justin
*/
public class FindBySchemaIdStrategy<T> implements GlobalIdStrategy<T> {
public class FindBySchemaIdStrategy<T> extends CheckPeriodIdStrategy<T> {
@Override
public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) {
long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema) {
ArtifactMetaData amd = service.getArtifactMetaDataByContent(artifactId, toStream(schema));
return amd.getGlobalId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
/**
* @author Ales Justin
*/
public class FindLatestIdStrategy<T> implements GlobalIdStrategy<T> {
public class FindLatestIdStrategy<T> extends CheckPeriodIdStrategy<T> {
@Override
public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) {
long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema) {
ArtifactMetaData amd = service.getArtifactMetaData(artifactId);
return amd.getGlobalId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;

/**
* A {@link GlobalIdStrategy} is used by the Kafka serializer/deserializer
Expand All @@ -43,6 +44,15 @@ public interface GlobalIdStrategy<T> {
*/
long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema);

/**
* Configure, if supported.
*
* @param configs the configs
* @param isKey are we handling key or value
*/
default void configure(Map<String, ?> configs, boolean isKey) {
}

/**
* Create InputStream from schema.
* By default we just take string bytes.
Expand Down

0 comments on commit ab9c219

Please sign in to comment.