Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API SchemaRegistryClient updates #24529

Merged
merged 15 commits into from
Oct 4, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public <T> Mono<T> deserializeAsync(InputStream inputStream, TypeReference<T> ty

return this.schemaRegistryClient.getSchema(schemaId)
.handle((registryObject, sink) -> {
byte[] payloadSchema = registryObject.getSchema();
byte[] payloadSchema = registryObject.getContent().getBytes(StandardCharsets.UTF_8);
int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - SCHEMA_ID_SIZE;
byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);
Expand Down Expand Up @@ -201,8 +201,8 @@ private Mono<String> maybeRegisterSchema(String schemaGroup, String schemaName,
.registerSchema(schemaGroup, schemaName, schemaString, SchemaFormat.AVRO)
.map(SchemaProperties::getSchemaId);
} else {
return this.schemaRegistryClient.getSchemaId(
schemaGroup, schemaName, schemaString, SchemaFormat.AVRO);
return this.schemaRegistryClient.getSchemaProperties(
schemaGroup, schemaName, schemaString, SchemaFormat.AVRO).map(properties -> properties.getSchemaId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.data.schemaregistry.avro.generatedtestsources.PlayingCardSuit;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.DecoderFactory;
Expand Down Expand Up @@ -88,11 +89,10 @@ void testRegistryGuidPrefixedToPayload() throws IOException {
final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS);
final Schema playingClassSchema = PlayingCard.getClassSchema();
final byte[] schemaBytes = playingClassSchema.toString().getBytes(StandardCharsets.UTF_8);
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO,
playingClassSchema.getFullName(), schemaBytes);
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO);

when(client.getSchemaId(MOCK_SCHEMA_GROUP, registered.getSchemaName(), playingClassSchema.toString(),
SchemaFormat.AVRO)).thenReturn(Mono.just(MOCK_GUID));
when(client.getSchemaProperties(MOCK_SCHEMA_GROUP, playingClassSchema.getFullName(),
playingClassSchema.toString(), SchemaFormat.AVRO)).thenReturn(Mono.just(registered));

final SchemaRegistryAvroSerializer serializer = new SchemaRegistryAvroSerializer(client, avroSerializer,
MOCK_SCHEMA_GROUP, false);
Expand Down Expand Up @@ -144,18 +144,23 @@ void testGetSchemaAndDeserialize() throws IOException {
final AvroSerializer decoder = new AvroSerializer(false, parser, ENCODER_FACTORY,
DECODER_FACTORY);
final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS);
final Schema playingClassSchema = PlayingCard.getClassSchema();
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO,
playingClassSchema.getFullName(), playingClassSchema.toString().getBytes(StandardCharsets.UTF_8));
final String playingClassSchema = PlayingCard.getClassSchema().toString();
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO);
final SchemaRegistrySchema registrySchema = new SchemaRegistrySchema(registered, playingClassSchema);
final SchemaRegistryAvroSerializer serializer = new SchemaRegistryAvroSerializer(client, decoder,
MOCK_SCHEMA_GROUP, true);

assertNotNull(registered.getSchema());
assertNotNull(registrySchema.getProperties());

when(client.getSchema(MOCK_GUID)).thenReturn(Mono.just(registered));
when(client.getSchema(MOCK_GUID)).thenReturn(Mono.just(registrySchema));

StepVerifier.create(client.getSchema(MOCK_GUID))
.assertNext(properties -> assertEquals(MOCK_GUID, properties.getSchemaId()))
.assertNext(schema -> {
assertNotNull(schema.getProperties());

assertEquals(playingClassSchema, schema.getContent());
assertEquals(MOCK_GUID, schema.getProperties().getSchemaId());
})
.verifyComplete();

final byte[] serializedPayload = getPayload(playingCard);
Expand Down
38 changes: 19 additions & 19 deletions sdk/schemaregistry/azure-data-schemaregistry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,23 @@ Set the values of the client ID, tenant ID, and client secret of the AAD applica
##### Async client
<!-- embedme ./src/samples/java/com/azure/data/schemaregistry/ReadmeSamples.java#L24-L29 -->
```java
ic void createAsyncClient() {
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildAsyncClient();
```

##### Sync client
<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L36-L41 -->
```java
ic void createSyncClient() {
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();
```

## Key concepts
Expand Down Expand Up @@ -98,10 +98,11 @@ SchemaRegistry operations. Those exposed properties are `Content` and `Id`.
Register a schema to be stored in the Azure Schema Registry.
<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L48-L70 -->
```java
ic void registerSchema() {
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();

Expand All @@ -119,34 +120,37 @@ String schemaContent = "{\n"
+ " ]\n"
+ "}";
SchemaProperties schemaProperties = schemaRegistryClient.registerSchema("{schema-group}", "{schema-name}",
schemaContent, SerializationFormat.AVRO);
System.out.println("Registered schema: " + schemaProperties.getSchemaId());
schemaContent, SchemaFormat.AVRO);
```

### Retrieve a schema ID
Retrieve a previously registered schema ID from the Azure Schema Registry.

<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L77-L85 -->
```java
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();

SchemaProperties schemaProperties = schemaRegistryClient.getSchema("{schema-id}");
System.out.println("Retrieved schema: " + schemaProperties.getSchemaName());







```
### Retrieve a schema
Retrieve a previously registered schema's content from the Azure Schema Registry.

<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L92-L114 -->
```java

ample for getting the schema id of a registered schema.

ic void getSchemaId() {
conniey marked this conversation as resolved.
Show resolved Hide resolved
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();

Expand All @@ -162,10 +166,6 @@ String schemaContent = "{\n"
+ " \"name\" : \"LastName\", \"type\" : \"string\" \n"
+ " }\n"
+ " ]\n"
+ "}";
String schemaId = schemaRegistryClient.getSchemaId("{schema-group}", "{schema-name}",
schemaContent, SerializationFormat.AVRO);
System.out.println("Retreived schema id: " + schemaId);
```

## Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistry;
import com.azure.data.schemaregistry.implementation.models.SchemaId;
import com.azure.data.schemaregistry.implementation.models.SerializationType;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -69,15 +69,15 @@ public String getFullyQualifiedNamespace() {
*
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param schemaDefinition The string representation of the schema.
* @param schemaFormat The serialization type of this schema.
*
* @return The {@link SchemaProperties} of a successfully registered schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaProperties> registerSchema(String groupName, String name, String content,
public Mono<SchemaProperties> registerSchema(String groupName, String name, String schemaDefinition,
SchemaFormat schemaFormat) {
return registerSchemaWithResponse(groupName, name, content, schemaFormat)
return registerSchemaWithResponse(groupName, name, schemaDefinition, schemaFormat)
.map(Response::getValue);
}

Expand All @@ -87,31 +87,28 @@ public Mono<SchemaProperties> registerSchema(String groupName, String name, Stri
*
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param schemaDefinition The string representation of the schema.
* @param schemaFormat The serialization type of this schema.
*
* @return The schema properties on successful registration of the schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, String name, String content,
SchemaFormat schemaFormat) {
return FluxUtil.withContext(context -> registerSchemaWithResponse(groupName, name, content,
public Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, String name,
String schemaDefinition, SchemaFormat schemaFormat) {
return FluxUtil.withContext(context -> registerSchemaWithResponse(groupName, name, schemaDefinition,
schemaFormat, context));
}

Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, String name, String content,
Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, String name, String schemaDefinition,
SchemaFormat schemaFormat, Context context) {
logger.verbose("Registering schema. Group: '{}', name: '{}', serialization type: '{}', payload: '{}'",
groupName, name, schemaFormat, content);
groupName, name, schemaFormat, schemaDefinition);

return this.restService.getSchemas().registerWithResponseAsync(groupName, name,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, content)
return restService.getSchemas().registerWithResponseAsync(groupName, name, getSerialization(schemaFormat),
schemaDefinition)
.handle((response, sink) -> {
SchemaId schemaId = response.getValue();
SchemaProperties registered = new SchemaProperties(schemaId.getId(),
schemaFormat,
name,
content.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
SchemaProperties registered = new SchemaProperties(schemaId.getId(), schemaFormat);

SimpleResponse<SchemaProperties> schemaRegistryObjectSimpleResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
Expand All @@ -128,7 +125,7 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, St
* @return The {@link SchemaProperties} associated with the given {@code id}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaProperties> getSchema(String id) {
public Mono<SchemaRegistrySchema> getSchema(String id) {
return getSchemaWithResponse(id).map(Response::getValue);
}

Expand All @@ -140,36 +137,23 @@ public Mono<SchemaProperties> getSchema(String id) {
* @return The {@link SchemaProperties} associated with the given {@code id} along with the HTTP response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaProperties>> getSchemaWithResponse(String id) {
public Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String id) {
return FluxUtil.withContext(context -> getSchemaWithResponse(id, context));
}

Mono<Response<SchemaProperties>> getSchemaWithResponse(String id, Context context) {
Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String id, Context context) {
Objects.requireNonNull(id, "'id' should not be null");
return this.restService.getSchemas().getByIdWithResponseAsync(id)
.handle((response, sink) -> {
final SchemaFormat schemaFormat =
SchemaFormat.fromString(response.getDeserializedHeaders().getSchemaType());
final URI location = URI.create(response.getDeserializedHeaders().getLocation());
final Matcher matcher = SCHEMA_PATTERN.matcher(location.getPath());

if (!matcher.lookingAt()) {
sink.error(new IllegalArgumentException("Response location does not contain schema group or"
+ " schema name. Location: " + location.getPath()));

return;
}

final String schemaName = matcher.group("schemaName");
final SchemaProperties schemaObject = new SchemaProperties(id,
schemaFormat,
schemaName,
response.getValue());
final SimpleResponse<SchemaProperties> schemaResponse = new SimpleResponse<>(
final SchemaProperties schemaObject = new SchemaProperties(id, schemaFormat);
final String schemaDefinition = new String(response.getValue(), SCHEMA_REGISTRY_SERVICE_ENCODING);
final SimpleResponse<SchemaRegistrySchema> schemaRegistryResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), schemaObject);
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schemaDefinition));

sink.next(schemaResponse);
sink.next(schemaRegistryResponse);
});
}

Expand All @@ -179,15 +163,16 @@ Mono<Response<SchemaProperties>> getSchemaWithResponse(String id, Context contex
*
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param schemaDefinition The string representation of the schema.
* @param schemaFormat The serialization type of this schema.
*
* @return The unique identifier for this schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<String> getSchemaId(String groupName, String name, String content,
public Mono<SchemaProperties> getSchemaProperties(String groupName, String name, String schemaDefinition,
SchemaFormat schemaFormat) {
return getSchemaIdWithResponse(groupName, name, content, schemaFormat)

return getSchemaPropertiesWithResponse(groupName, name, schemaDefinition, schemaFormat)
.map(response -> response.getValue());
}

Expand All @@ -196,40 +181,41 @@ public Mono<String> getSchemaId(String groupName, String name, String content,
*
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param schemaDefinition The string representation of the schema.
* @param schemaFormat The serialization type of this schema.
*
* @return The unique identifier for this schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, String content,
SchemaFormat schemaFormat) {
public Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String groupName, String name,
String schemaDefinition, SchemaFormat schemaFormat) {

return FluxUtil.withContext(context ->
getSchemaIdWithResponse(groupName, name, content, schemaFormat, context));
getSchemaPropertiesWithResponse(groupName, name, schemaDefinition, schemaFormat, context));
}

/**
* Gets the schema id associated with the schema name a string representation of the schema.
*
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param schemaDefinition The string representation of the schema.
* @param schemaFormat The serialization type of this schema.
* @param context Context to pass along with this request.
*
* @return A mono that completes with the schema id.
*/
Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, String content,
SchemaFormat schemaFormat, Context context) {
Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String groupName, String name,
String schemaDefinition, SchemaFormat schemaFormat, Context context) {

return this.restService.getSchemas()
.queryIdByContentWithResponseAsync(groupName, name, getSerialization(schemaFormat), content)
return restService.getSchemas()
.queryIdByContentWithResponseAsync(groupName, name, getSerialization(schemaFormat), schemaDefinition)
.handle((response, sink) -> {
SchemaId schemaId = response.getValue();
SimpleResponse<String> schemaIdResponse = new SimpleResponse<>(
SchemaProperties properties = new SchemaProperties(schemaId.getId(), schemaFormat);
SimpleResponse<SchemaProperties> schemaIdResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), schemaId.getId());
response.getHeaders(), properties);

sink.next(schemaIdResponse);
});
Expand All @@ -244,10 +230,9 @@ Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, St
*
* @throws UnsupportedOperationException if the serialization type is not supported.
*/
private static com.azure.data.schemaregistry.implementation.models.SerializationType getSerialization(
SchemaFormat schemaFormat) {
private static SerializationType getSerialization(SchemaFormat schemaFormat) {
if (schemaFormat == SchemaFormat.AVRO) {
return com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO;
return SerializationType.AVRO;
} else {
throw new UnsupportedOperationException("Serialization type is not supported: " + schemaFormat);
}
Expand Down
Loading