Updating Schema Registry to cache responses. Adding tests #23565

Merged
merged 38 commits into from
Aug 16, 2021
5541d53
Make maxCacheSize package-private.
conniey Aug 12, 2021
7f8cd6c
Removes references to cache removal.
conniey Aug 12, 2021
a0a4ab1
Move max/min sizes into builder.
conniey Aug 12, 2021
b8900d6
Make SchemaRegistryClient's get with Response APIs package-private.
conniey Aug 12, 2021
dbc8cb5
Make SchemaRegistryAsyncClient's get with Response APIs package-private.
conniey Aug 12, 2021
1b287b3
Adding ErrorResponse.
conniey Aug 12, 2021
ff6560d
Remove null checks (they're not needed according to spec).
conniey Aug 12, 2021
4b55453
(WIP)
conniey Aug 12, 2021
66ee1a7
Returning HttpResponses when 404 or 400 is returned.
conniey Aug 13, 2021
405f230
Change content type to text/plain because it does not like applicatio…
conniey Aug 13, 2021
a00b577
Adding to idCache when registering/getting schema.
conniey Aug 13, 2021
87cee52
Adding integration tests.
conniey Aug 13, 2021
9468817
Fixing expectations.
conniey Aug 13, 2021
cd53c3c
Update CHANGELOG.md
conniey Aug 13, 2021
555aa2f
Fix headers because they are no longer prefaced with x-
conniey Aug 13, 2021
d857f38
Fix response body to return byte[] so we stop deserializing it.
conniey Aug 13, 2021
a071764
Make SerializationType case-insensitive.
conniey Aug 13, 2021
bb6e6e2
Fix response to decode byte[]
conniey Aug 13, 2021
dbd5d0d
Update assertions.
conniey Aug 13, 2021
ea303d2
Update based on comments.
conniey Aug 13, 2021
234a8fb
Remove unused class. Add test for 404.
conniey Aug 13, 2021
af4f3f5
Fix assertion logic.
conniey Aug 13, 2021
29a2ec1
Updating tests.
conniey Aug 13, 2021
9fbafc7
Adding one more test for querying.
conniey Aug 13, 2021
a7d1d8d
Make correct ones package-private.
conniey Aug 13, 2021
19e2d84
Remove mockito tests.
conniey Aug 13, 2021
7df7847
Rename session-records.
conniey Aug 13, 2021
0eed94f
Fixing checkstyle.
conniey Aug 13, 2021
61c7d3e
Remove uncalled method.
conniey Aug 13, 2021
3f7dbed
Remove initializeBuilder()
conniey Aug 15, 2021
69dc5b1
Replace with asyncClient calls.
conniey Aug 15, 2021
60ff36e
Updating headers.
conniey Aug 15, 2021
98c73e4
Adding SchemaRegistryAsyncClient tests.
conniey Aug 15, 2021
cdcf7a9
Adding session-records.json
conniey Aug 15, 2021
b7adf08
Fixing extraction of schemaName.
conniey Aug 16, 2021
1eb1874
Fixing mockito issues.
conniey Aug 16, 2021
4db474f
Fixing checkstyle issues.
conniey Aug 16, 2021
c66c13a
Fixing checkstyle issues.
conniey Aug 16, 2021
7 changes: 7 additions & 0 deletions sdk/schemaregistry/azure-data-schemaregistry/CHANGELOG.md
Expand Up @@ -2,6 +2,13 @@

## 1.0.0-beta.5 (Unreleased)

### Features added
- Caching schemas based on schemaId.

### Breaking changes
- 4xx responses surface as their respective `HttpResponseException` rather than `IllegalStateException`.
- Removed `Response<T>` overloads for getSchema and getSchemaId because response could be cached.
- `SchemaRegistryClientBuilder.maxCacheSize` is package-private.

## 1.0.0-beta.4 (2020-09-21)
- Minor code cleanup and refactoring
Expand Down
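The "Caching schemas based on schemaId" entry can be illustrated with a minimal sketch. This is not the SDK's implementation: `SchemaIdCacheSketch`, its `fetcher` function, and the call counter are hypothetical stand-ins, and the schema is reduced to a plain `String`. It only shows the idea of consulting an id-keyed map before going to the service and populating it with `putIfAbsent`, as the diff does with `idCache`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical stand-in for a caching client; not the SDK's code. */
final class SchemaIdCacheSketch {
    // Cache keyed by schema id, playing the role of idCache in the diff.
    private final Map<String, String> idCache = new ConcurrentHashMap<>();
    // Counts simulated service calls so that cache hits are observable.
    private final AtomicInteger serviceCalls = new AtomicInteger();
    // Hypothetical fetcher standing in for the HTTP request.
    private final Function<String, String> fetcher;

    SchemaIdCacheSketch(Function<String, String> fetcher) {
        this.fetcher = fetcher;
    }

    /** Returns the cached schema if present, otherwise fetches and caches it. */
    String getSchema(String schemaId) {
        String cached = idCache.get(schemaId);
        if (cached != null) {
            return cached;
        }
        serviceCalls.incrementAndGet();
        String fetched = fetcher.apply(schemaId);
        // putIfAbsent keeps the first stored value if another caller raced us.
        String previous = idCache.putIfAbsent(schemaId, fetched);
        return previous != null ? previous : fetched;
    }

    int serviceCallCount() {
        return serviceCalls.get();
    }
}
```

Because a repeated lookup may be served from the map without an HTTP exchange, there is no fresh HTTP response to hand back, which is consistent with the changelog's stated reason for removing the `Response<T>` overloads.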
6 changes: 6 additions & 0 deletions sdk/schemaregistry/azure-data-schemaregistry/pom.xml
Expand Up @@ -87,6 +87,12 @@
<version>1.10.1</version> <!-- {x-version-update;org.apache.avro:avro;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-test</artifactId>
<version>1.7.0</version> <!-- {x-version-update;com.azure:azure-core-test;dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,24 @@
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistry;
import com.azure.data.schemaregistry.implementation.models.SchemaId;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SerializationType;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;

import com.azure.data.schemaregistry.implementation.AzureSchemaRegistry;
import com.azure.data.schemaregistry.implementation.models.SchemaId;
import reactor.core.publisher.Mono;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* HTTP-based client that interacts with Azure Schema Registry service to store and retrieve schemas on demand.
Expand All @@ -32,12 +35,10 @@
@ServiceClient(builder = SchemaRegistryClientBuilder.class, isAsync = true)
public final class SchemaRegistryAsyncClient {

private final ClientLogger logger = new ClientLogger(SchemaRegistryAsyncClient.class);

static final Charset SCHEMA_REGISTRY_SERVICE_ENCODING = StandardCharsets.UTF_8;
static final int MAX_SCHEMA_MAP_SIZE_DEFAULT = 1000;
static final int MAX_SCHEMA_MAP_SIZE_MINIMUM = 10;

private static final Pattern SCHEMA_PATTERN = Pattern.compile("/\\$schemagroups/(?<schemaGroup>.+)/schemas/(?<schemaName>.+?)/");
private final ClientLogger logger = new ClientLogger(SchemaRegistryAsyncClient.class);
private final AzureSchemaRegistry restService;
private final Integer maxSchemaMapSize;
private final ConcurrentSkipListMap<String, Function<String, Object>> typeParserMap;
Expand Down Expand Up @@ -69,15 +70,6 @@ public final class SchemaRegistryAsyncClient {
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaProperties> registerSchema(
String schemaGroup, String schemaName, String schemaString, SerializationType serializationType) {

if (schemaStringCache.containsKey(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString))) {
logger.verbose(
"Cache hit schema string. Group: '{}', name: '{}', schema type: '{}', payload: '{}'",
schemaGroup, schemaName, serializationType, schemaString);
return Mono.fromCallable(
() -> schemaStringCache.get(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString)));
}

return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, serializationType)
.map(Response::getValue);
}
Expand All @@ -95,42 +87,29 @@ public Mono<SchemaProperties> registerSchema(
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaProperties>> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, SerializationType serializationType) {
return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, serializationType, Context.NONE);
String schemaString, SerializationType serializationType) {
return FluxUtil.withContext(context -> registerSchemaWithResponse(schemaGroup, schemaName, schemaString,
serializationType, context));
}

Mono<Response<SchemaProperties>> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, SerializationType serializationType, Context context) {
logger.verbose(
"Registering schema. Group: '{}', name: '{}', serialization type: '{}', payload: '{}'",
logger.verbose("Registering schema. Group: '{}', name: '{}', serialization type: '{}', payload: '{}'",
schemaGroup, schemaName, serializationType, schemaString);

return this.restService
.getSchemas().registerWithResponseAsync(schemaGroup, schemaName,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, schemaString)
return this.restService.getSchemas().registerWithResponseAsync(schemaGroup, schemaName,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, schemaString)
.handle((response, sink) -> {
if (response == null) {
sink.error(logger.logExceptionAsError(
new NullPointerException("Client returned null response")));
return;
}

if (response.getStatusCode() == 400) {
sink.error(logger.logExceptionAsError(
new IllegalStateException("Invalid schema registration attempted")));
return;
}

SchemaId schemaId = response.getValue();

SchemaProperties registered = new SchemaProperties(schemaId.getId(),
serializationType,
schemaName,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));

resetIfNeeded();
schemaStringCache
.putIfAbsent(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString), registered);
schemaStringCache.putIfAbsent(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString),
registered);
idCache.putIfAbsent(schemaId.getId(), registered);

logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
SimpleResponse<SchemaProperties> schemaRegistryObjectSimpleResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
Expand Down Expand Up @@ -161,47 +140,51 @@ public Mono<SchemaProperties> getSchema(String schemaId) {
* @return The {@link SchemaProperties} associated with the given {@code schemaId} along with the HTTP
* response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId) {
return getSchemaWithResponse(schemaId, Context.NONE);
Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId) {
return FluxUtil.withContext(context -> getSchemaWithResponse(schemaId, context));
}

Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId, Context context) {
Objects.requireNonNull(schemaId, "'schemaId' should not be null");
return this.restService.getSchemas().getByIdWithResponseAsync(schemaId)
.handle((response, sink) -> {
if (response == null) {
sink.error(logger.logExceptionAsError(
new NullPointerException("Client returned null response")));
return;
}
final SerializationType serializationType =
SerializationType.fromString(response.getDeserializedHeaders().getSchemaType());
final URI location = URI.create(response.getDeserializedHeaders().getLocation());
final Matcher matcher = SCHEMA_PATTERN.matcher(location.getPath());

if (!matcher.lookingAt()) {
sink.error(new IllegalArgumentException("Response location does not contain schema group or"
+ " schema name. Location: " + location.getPath()));

if (response.getStatusCode() == 404) {
sink.error(logger.logExceptionAsError(
new IllegalStateException(String.format("Schema does not exist, id %s", schemaId))));
return;
}

SerializationType serializationType =
SerializationType.fromString(response.getDeserializedHeaders().getXSchemaType());

SchemaProperties schemaObject = new SchemaProperties(schemaId,
final String schemaGroup = matcher.group("schemaGroup");
final String schemaName = matcher.group("schemaName");
final SchemaProperties schemaObject = new SchemaProperties(schemaId,
serializationType,
null,
response.getValue().getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
schemaName,
response.getValue());
final String schemaCacheKey = getSchemaStringCacheKey(schemaGroup, schemaName,
new String(response.getValue(), SCHEMA_REGISTRY_SERVICE_ENCODING));

resetIfNeeded();
schemaStringCache.putIfAbsent(schemaCacheKey, schemaObject);
idCache.putIfAbsent(schemaId, schemaObject);

logger.verbose("Cached schema object. Path: '{}'", schemaId);
SimpleResponse<SchemaProperties> schemaRegistryObjectSimpleResponse = new SimpleResponse<>(

SimpleResponse<SchemaProperties> schemaResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), schemaObject);
sink.next(schemaRegistryObjectSimpleResponse);

sink.next(schemaResponse);
});
}

/**
* Gets the schema identifier associated with the given schema.
* Gets the schema identifier associated with the given schema. Gets a cached value if it exists, otherwise makes a
* call to the service.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
Expand All @@ -212,20 +195,23 @@ Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId, Context
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<String> getSchemaId(String schemaGroup, String schemaName, String schemaString,
SerializationType serializationType) {
SerializationType serializationType) {

String schemaStringCacheKey = getSchemaStringCacheKey(schemaGroup, schemaName, schemaString);

if (schemaStringCache.containsKey(schemaStringCacheKey)) {
logger.verbose("Cache hit schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
return Mono.fromCallable(() -> schemaStringCache.get(schemaStringCacheKey).getSchemaId());
return Mono.fromCallable(() -> {
logger.verbose("Cache hit schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
return schemaStringCache.get(schemaStringCacheKey).getSchemaId();
});
}

return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType)
.map(response -> response.getValue());
}

/**
* Gets the schema identifier associated with the given schema.
* Gets the schema identifier associated with the given schema. Always makes a call to the service.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
Expand All @@ -234,41 +220,38 @@ public Mono<String> getSchemaId(String schemaGroup, String schemaName, String sc
*
* @return The unique identifier for this schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
SerializationType serializationType) {
return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType, Context.NONE);
Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
SerializationType serializationType) {

return FluxUtil.withContext(context ->
getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType, context));
}

/**
* Gets the schema id associated with the schema name and a string representation of the schema.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
* @param schemaString The string representation of the schema.
* @param serializationType The serialization type of this schema.
* @param context Context to pass along with this request.
* @return A mono that completes with the schema id.
*/
Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
SerializationType serializationType, Context context) {
SerializationType serializationType, Context context) {

return this.restService.getSchemas()
.queryIdByContentWithResponseAsync(schemaGroup, schemaName,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, schemaString)
.handle((response, sink) -> {
if (response == null) {
sink.error(logger.logExceptionAsError(
new NullPointerException("Client returned null response")));
return;
}

if (response.getStatusCode() == 404) {
sink.error(
logger.logExceptionAsError(new IllegalStateException("Existing matching schema not found.")));
return;
}

SchemaId schemaId = response.getValue();
SchemaProperties properties = new SchemaProperties(schemaId.getId(), serializationType, schemaName,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));

resetIfNeeded();
schemaStringCache.putIfAbsent(
getSchemaStringCacheKey(schemaGroup, schemaName, schemaString),
new SchemaProperties(
schemaId.getId(),
serializationType,
schemaName,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING)));
getSchemaStringCacheKey(schemaGroup, schemaName, schemaString), properties);
idCache.putIfAbsent(schemaId.getId(), properties);

logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);

SimpleResponse<String> schemaIdResponse = new SimpleResponse<>(
Expand All @@ -287,24 +270,7 @@ void clearCache() {
typeParserMap.clear();
}

// TODO: max age for schema maps? or will schemas always be immutable?

/**
* Checks if caches should be reinitialized to satisfy initial configuration
*/
private void resetIfNeeded() {
// todo add verbose log
if (idCache.size() > this.maxSchemaMapSize) {
idCache.clear();
logger.verbose("Cleared schema ID cache.");
}
if (schemaStringCache.size() > this.maxSchemaMapSize) {
schemaStringCache.clear();
logger.verbose("Cleared schema string cache.");
}
}

private String getSchemaStringCacheKey(String schemaGroup, String schemaName, String schemaString) {
private static String getSchemaStringCacheKey(String schemaGroup, String schemaName, String schemaString) {
return schemaGroup + schemaName + schemaString;
}
}
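The new `getSchemaWithResponse` path recovers the schema group and name from the response's `Location` header using `SCHEMA_PATTERN` and `Matcher.lookingAt()`. A minimal, standalone sketch of that extraction follows. The pattern is copied from the diff; the `SchemaLocationSketch` class, its `parse` helper, and the sample paths are hypothetical and chosen only for illustration.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the Location-path parsing in the diff. */
final class SchemaLocationSketch {
    // Same expression as SCHEMA_PATTERN in the diff.
    private static final Pattern SCHEMA_PATTERN =
        Pattern.compile("/\\$schemagroups/(?<schemaGroup>.+)/schemas/(?<schemaName>.+?)/");

    /**
     * Returns {schemaGroup, schemaName} when the path starts with the expected
     * layout, or an empty Optional when it does not.
     */
    static Optional<String[]> parse(String path) {
        Matcher matcher = SCHEMA_PATTERN.matcher(path);
        // lookingAt() anchors the match at the start of the path but, unlike
        // matches(), does not require the whole input to be consumed.
        if (!matcher.lookingAt()) {
            return Optional.empty();
        }
        return Optional.of(new String[] {
            matcher.group("schemaGroup"),
            matcher.group("schemaName")
        });
    }
}
```

Note that the pattern requires a `/` after the schema name, so a path that ends directly at the name does not match and would take the `IllegalArgumentException` branch shown in the diff.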
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public final class SchemaRegistryClient {
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaProperties registerSchema(String schemaGroup, String schemaName, String schemaString,
SerializationType serializationType) {
return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, serializationType, Context.NONE)
.getValue();
return this.asyncClient.registerSchema(schemaGroup, schemaName, schemaString, serializationType).block();
}

/**
Expand Down Expand Up @@ -67,7 +66,7 @@ public Response<SchemaProperties> registerSchemaWithResponse(String schemaGroup,
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaProperties getSchema(String schemaId) {
return getSchemaWithResponse(schemaId, Context.NONE).getValue();
return this.asyncClient.getSchema(schemaId).block();
}

/**
Expand All @@ -77,8 +76,7 @@ public SchemaProperties getSchema(String schemaId) {
* @return The {@link SchemaProperties} associated with the given {@code schemaId} along with the HTTP
* response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaProperties> getSchemaWithResponse(String schemaId, Context context) {
Response<SchemaProperties> getSchemaWithResponse(String schemaId, Context context) {
return this.asyncClient.getSchemaWithResponse(schemaId, context).block();
}

Expand All @@ -95,8 +93,7 @@ public Response<SchemaProperties> getSchemaWithResponse(String schemaId, Context
@ServiceMethod(returns = ReturnType.SINGLE)
public String getSchemaId(String schemaGroup, String schemaName, String schemaString,
SerializationType serializationType) {
return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType, Context.NONE)
.getValue();
return this.asyncClient.getSchemaId(schemaGroup, schemaName, schemaString, serializationType).block();
}

/**
Expand All @@ -109,8 +106,7 @@ public String getSchemaId(String schemaGroup, String schemaName, String schemaSt
* @param context The context to pass to the Http pipeline.
* @return The unique identifier for this schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<String> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
Response<String> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
SerializationType serializationType, Context context) {
return this.asyncClient
.getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType, context).block();
Expand All @@ -119,5 +115,4 @@ public Response<String> getSchemaIdWithResponse(String schemaGroup, String schem
void clearCache() {
this.asyncClient.clearCache();
}

}
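The synchronous client now delegates to the async client and blocks on the result, so both clients share one cache. The sketch below shows that delegation shape using only the JDK: `CompletableFuture.join()` is swapped in for Reactor's `Mono.block()`, and `AsyncLookupSketch` and `SyncLookupSketch` are hypothetical names, not the SDK's classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical async client that owns the cache. */
final class AsyncLookupSketch {
    private final Map<String, String> idCache = new ConcurrentHashMap<>();
    private final AtomicInteger serviceCalls = new AtomicInteger();

    CompletableFuture<String> getSchema(String schemaId) {
        String cached = idCache.get(schemaId);
        if (cached != null) {
            // Cache hit: complete immediately without a simulated service call.
            return CompletableFuture.completedFuture(cached);
        }
        return CompletableFuture.supplyAsync(() -> {
            serviceCalls.incrementAndGet();
            String fetched = "schema-for-" + schemaId; // simulated service response
            idCache.putIfAbsent(schemaId, fetched);
            return fetched;
        });
    }

    int serviceCallCount() {
        return serviceCalls.get();
    }
}

/** Hypothetical sync client: a thin blocking wrapper over the async one. */
final class SyncLookupSketch {
    private final AsyncLookupSketch asyncClient;

    SyncLookupSketch(AsyncLookupSketch asyncClient) {
        this.asyncClient = asyncClient;
    }

    String getSchema(String schemaId) {
        // join() plays the role that block() plays in the diff.
        return asyncClient.getSchema(schemaId).join();
    }
}
```

Keeping the cache in the async client and having the sync client wrap it avoids maintaining two caches that could drift apart.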