diff --git a/app/src/test/java/io/apicurio/registry/RegistrySerdeTest.java b/app/src/test/java/io/apicurio/registry/RegistrySerdeTest.java index f0d299127b..572b045991 100644 --- a/app/src/test/java/io/apicurio/registry/RegistrySerdeTest.java +++ b/app/src/test/java/io/apicurio/registry/RegistrySerdeTest.java @@ -24,8 +24,10 @@ import io.apicurio.registry.support.Tester; import io.apicurio.registry.types.ArtifactType; import io.apicurio.registry.utils.ConcurrentUtil; +import io.apicurio.registry.utils.IoUtil; import io.apicurio.registry.utils.serde.AbstractKafkaSerDe; import io.apicurio.registry.utils.serde.AbstractKafkaSerializer; +import io.apicurio.registry.utils.serde.AbstractKafkaStrategyAwareSerDe; import io.apicurio.registry.utils.serde.AvroEncoding; import io.apicurio.registry.utils.serde.AvroKafkaDeserializer; import io.apicurio.registry.utils.serde.AvroKafkaSerializer; @@ -94,7 +96,8 @@ public void testGetOrCreate(Supplier supplier) throws Exception Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}"); String artifactId = generateArtifactId(); - CompletionStage csa = client.createArtifact(ArtifactType.AVRO, artifactId, null, new ByteArrayInputStream(schema.toString().getBytes(StandardCharsets.UTF_8))); + byte[] schemaContent = IoUtil.toBytes(schema.toString()); + CompletionStage csa = client.createArtifact(ArtifactType.AVRO, artifactId, null, new ByteArrayInputStream(schemaContent)); ArtifactMetaData amd = ConcurrentUtil.result(csa); this.waitForGlobalId(amd.getGlobalId()); @@ -127,6 +130,37 @@ public void testCachedSchema(Supplier supplier) throws Exceptio Assertions.assertEquals(id, idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema)); } + @RegistryServiceTest + public void testCheckPeriod(Supplier supplier) throws Exception { + RegistryService service = supplier.get(); + + Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord5x\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}"); + String artifactId = generateArtifactId(); + byte[] schemaContent = IoUtil.toBytes(schema.toString()); + CompletionStage csa = service.createArtifact(ArtifactType.AVRO, artifactId, null, new ByteArrayInputStream(schemaContent)); + ConcurrentUtil.result(csa); + + long pc = 5000L; // 5seconds check period ... + + Map config = new HashMap<>(); + config.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_CHECK_PERIOD_MS_CONFIG_PARAM, String.valueOf(pc)); + GlobalIdStrategy idStrategy = new FindLatestIdStrategy<>(); + idStrategy.configure(config, false); + + long id1 = idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema); + service.reset(); + long id2 = idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema); + service.reset(); + Assertions.assertEquals(id1, id2); // should be less than 5seconds ... + retry(() -> service.getArtifactMetaDataByGlobalId(id2)); + + service.updateArtifact(artifactId, ArtifactType.AVRO, new ByteArrayInputStream(schemaContent)); + Thread.sleep(pc + 1); + retry(() -> Assertions.assertNotEquals(id2, service.getArtifactMetaData(artifactId).getGlobalId())); + + Assertions.assertNotEquals(id2, idStrategy.findId(service, artifactId, ArtifactType.AVRO, schema)); + } + @SuppressWarnings("unchecked") @RegistryServiceTest public void testConfiguration(Supplier supplier) throws Exception { @@ -194,7 +228,7 @@ record = deserializer.deserialize(artifactId, bytes); @RegistryServiceTest public void testAvro(Supplier supplier) throws Exception { Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}"); - try (AvroKafkaSerializer serializer = new AvroKafkaSerializer(supplier.get()); + try (AvroKafkaSerializer serializer = new AvroKafkaSerializer<>(supplier.get()); Deserializer deserializer = new AvroKafkaDeserializer<>(supplier.get())) { serializer.setGlobalIdStrategy(new AutoRegisterIdStrategy<>()); @@ -218,9 +252,9 @@ public void testAvro(Supplier supplier) throws Exception { @RegistryServiceTest public void testAvroJSON(Supplier supplier) throws Exception { Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}"); - try (AvroKafkaSerializer serializer = new AvroKafkaSerializer(supplier.get()); + try (AvroKafkaSerializer serializer = new AvroKafkaSerializer<>(supplier.get()); Deserializer deserializer = new AvroKafkaDeserializer<>(supplier.get())) { - HashMap config = new HashMap(); + HashMap config = new HashMap<>(); config.put(AvroEncoding.AVRO_ENCODING, AvroEncoding.AVRO_JSON); serializer.configure(config,false); deserializer.configure(config, false); @@ -250,11 +284,11 @@ public void testAvroJSON(Supplier supplier) throws Exception { @RegistryServiceTest public void testAvroUsingHeaders(Supplier supplier) throws Exception { Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"myrecord3\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}"); - try (AvroKafkaSerializer serializer = new AvroKafkaSerializer(supplier.get()); + try (AvroKafkaSerializer serializer = new AvroKafkaSerializer<>(supplier.get()); Deserializer deserializer = new AvroKafkaDeserializer<>(supplier.get())) { serializer.setGlobalIdStrategy(new AutoRegisterIdStrategy<>()); - HashMap config = new HashMap(); + HashMap config = new HashMap<>(); config.put(AbstractKafkaSerDe.USE_HEADERS, "true"); serializer.configure(config,false); deserializer.configure(config, false); @@ -284,8 +318,8 @@ public void testAvroUsingHeaders(Supplier supplier) throws Exce @RegistryServiceTest public void testAvroReflect(Supplier supplier) throws Exception { - try (AvroKafkaSerializer serializer = new AvroKafkaSerializer(supplier.get()); - AvroKafkaDeserializer deserializer = new AvroKafkaDeserializer(supplier.get())) { + try (AvroKafkaSerializer serializer = new AvroKafkaSerializer<>(supplier.get()); + AvroKafkaDeserializer deserializer = new AvroKafkaDeserializer<>(supplier.get())) { serializer.setGlobalIdStrategy(new AutoRegisterIdStrategy<>()); serializer.setAvroDatumProvider(new ReflectAvroDatumProvider<>()); diff --git a/docs/modules/ROOT/partials/getting-started/con-registry-serdes-strategy.adoc b/docs/modules/ROOT/partials/getting-started/con-registry-serdes-strategy.adoc index 32352d5d77..ecb796226a 100644 --- a/docs/modules/ROOT/partials/getting-started/con-registry-serdes-strategy.adoc +++ b/docs/modules/ROOT/partials/getting-started/con-registry-serdes-strategy.adoc @@ -64,5 +64,16 @@ Strategies to return a global ID based on an implementation of `GlobalIdStrategy `FindLatestIdStrategy`:: Strategy that returns the global ID of the latest schema version, based on an artifact ID. `FindBySchemaIdStrategy`:: Strategy that matches schema content, based on an artifact ID, to return a global ID. +`CachedSchemaIdStrategy`:: Strategy that caches the schema, and uses the global ID of the cached schema. `GetOrCreateIdStrategy`:: Strategy that tries to get the latest schema, based on an artifact ID, and if it does not exist, it creates a new schema. `AutoRegisterIdStrategy`:: Strategy that updates the schema, and uses the global ID of the updated schema. + +[discrete] +[id='configuring-globalid-strategy-{context}'] +== Configuring global ID strategy +You can configure the following application property: + +* apicurio.registry.check-period-ms -- set remote lookup period in milliseconds + +You can configure application properties as Java system properties or include them in the Quarkus +application.properties file. For more details, see the https://quarkus.io/guides/config#overriding-properties-at-runtime[Quarkus documentation]. \ No newline at end of file diff --git a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/AbstractKafkaStrategyAwareSerDe.java b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/AbstractKafkaStrategyAwareSerDe.java index 3042ed851a..2d55e9357d 100644 --- a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/AbstractKafkaStrategyAwareSerDe.java +++ b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/AbstractKafkaStrategyAwareSerDe.java @@ -31,6 +31,7 @@ public abstract class AbstractKafkaStrategyAwareSerDe> extends AbstractKafkaSerDe { public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id"; public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id"; + public static final String REGISTRY_CHECK_PERIOD_MS_CONFIG_PARAM = "apicurio.registry.check-period-ms"; private ArtifactIdStrategy artifactIdStrategy; private GlobalIdStrategy globalIdStrategy; @@ -79,5 +80,6 @@ public void configure(Map configs, boolean isKey) { Object gis = configs.get(REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM); instantiate(GlobalIdStrategy.class, gis, this::setGlobalIdStrategy); + getGlobalIdStrategy().configure(configs, isKey); } } diff --git a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/AbstractCrudIdStrategy.java b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/AbstractCrudIdStrategy.java index 1acb5b860f..54edda26e5 100644 --- a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/AbstractCrudIdStrategy.java +++ b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/AbstractCrudIdStrategy.java @@ -22,15 +22,15 @@ import io.apicurio.registry.types.ArtifactType; import io.apicurio.registry.utils.ConcurrentUtil; -import java.net.HttpURLConnection; -import java.util.concurrent.CompletionStage; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; +import java.net.HttpURLConnection; +import java.util.concurrent.CompletionStage; /** * @author Ales Justin */ -public abstract class AbstractCrudIdStrategy implements GlobalIdStrategy { +public abstract class AbstractCrudIdStrategy extends CheckPeriodIdStrategy { protected R unwrap(CompletionStage cs) { return ConcurrentUtil.result(cs); @@ -46,7 +46,7 @@ protected void afterCreateArtifact(T schema, ArtifactMetaData amd) { } @Override - public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) { + long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema) { try { return initialLookup(service, artifactId, artifactType, schema); } catch (WebApplicationException e) { diff --git a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/CheckPeriodIdStrategy.java b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/CheckPeriodIdStrategy.java new file mode 100644 index 0000000000..7ba96fd3ef --- /dev/null +++ b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/CheckPeriodIdStrategy.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Red Hat + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.apicurio.registry.utils.serde.strategy; + +import io.apicurio.registry.client.RegistryService; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.utils.serde.AbstractKafkaStrategyAwareSerDe; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author Ales Justin + */ +public abstract class CheckPeriodIdStrategy implements GlobalIdStrategy { + + static class CheckValue { + public CheckValue(long ts, long id) { + this.ts = ts; + this.id = id; + } + + long ts; + long id; + } + + private long checkPeriod; + private Map checkMap = new ConcurrentHashMap<>(); + + @Override + public void configure(Map configs, boolean isKey) { + Object cp = configs.get(AbstractKafkaStrategyAwareSerDe.REGISTRY_CHECK_PERIOD_MS_CONFIG_PARAM); + if (cp != null) { + long checkPeriodParam; + if (cp instanceof Number) { + checkPeriodParam = ((Number) cp).longValue(); + } else if (cp instanceof String) { + checkPeriodParam = Long.parseLong((String) cp); + } else if (cp instanceof Duration) { + checkPeriodParam = ((Duration) cp).toMillis(); + } else { + throw new IllegalArgumentException("Check period config param type unsupported: " + cp); + } + if (checkPeriodParam < 0) { + throw new IllegalArgumentException("Check period must be non-negative: " + checkPeriodParam); + } + this.checkPeriod = checkPeriodParam; + } + } + + abstract long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema); + + public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) { + CheckValue cv = checkMap.compute(artifactId, (aID, v) -> { + long now = System.currentTimeMillis(); + if (v == null) { + long id = findIdInternal(service, artifactId, artifactType, schema); + return new CheckValue(now, id); + } else { + if (v.ts + checkPeriod < now) { + long id = findIdInternal(service, artifactId, artifactType, schema); + v.ts = now; + v.id = id; + } + return v; + } + }); + return cv.id; + } +} diff --git a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindBySchemaIdStrategy.java b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindBySchemaIdStrategy.java index f0db4b2c40..a43c7dace7 100644 --- a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindBySchemaIdStrategy.java +++ b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindBySchemaIdStrategy.java @@ -23,9 +23,9 @@ /** * @author Ales Justin */ -public class FindBySchemaIdStrategy implements GlobalIdStrategy { +public class FindBySchemaIdStrategy extends CheckPeriodIdStrategy { @Override - public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) { + long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema) { ArtifactMetaData amd = service.getArtifactMetaDataByContent(artifactId, toStream(schema)); return amd.getGlobalId(); } diff --git a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindLatestIdStrategy.java b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindLatestIdStrategy.java index 43be8a3fab..d0b923ac84 100644 --- a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindLatestIdStrategy.java +++ b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/FindLatestIdStrategy.java @@ -23,9 +23,9 @@ /** * @author Ales Justin */ -public class FindLatestIdStrategy implements GlobalIdStrategy { +public class FindLatestIdStrategy extends CheckPeriodIdStrategy { @Override - public long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema) { + long findIdInternal(RegistryService service, String artifactId, ArtifactType artifactType, T schema) { ArtifactMetaData amd = service.getArtifactMetaData(artifactId); return amd.getGlobalId(); } diff --git a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/GlobalIdStrategy.java b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/GlobalIdStrategy.java index 927234f05c..cbd4da63ae 100644 --- a/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/GlobalIdStrategy.java +++ b/utils/serde/src/main/java/io/apicurio/registry/utils/serde/strategy/GlobalIdStrategy.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.util.Map; /** * A {@link GlobalIdStrategy} is used by the Kafka serializer/deserializer @@ -43,6 +44,15 @@ public interface GlobalIdStrategy { */ long findId(RegistryService service, String artifactId, ArtifactType artifactType, T schema); + /** + * Configure, if supported. + * + * @param configs the configs + * @param isKey are we handling key or value + */ + default void configure(Map configs, boolean isKey) { + } + /** * Create InputStream from schema. * By default we just take string bytes.