Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API SchemaRegistryClient updates #24529

Merged
merged 15 commits into from
Oct 4, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public <T> Mono<T> deserializeAsync(InputStream inputStream, TypeReference<T> ty

return this.schemaRegistryClient.getSchema(schemaId)
.handle((registryObject, sink) -> {
byte[] payloadSchema = registryObject.getSchema();
byte[] payloadSchema = registryObject.getSchemaDefinition().getBytes(StandardCharsets.UTF_8);
int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - SCHEMA_ID_SIZE;
byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);
Expand Down Expand Up @@ -201,8 +201,8 @@ private Mono<String> maybeRegisterSchema(String schemaGroup, String schemaName,
.registerSchema(schemaGroup, schemaName, schemaString, SchemaFormat.AVRO)
.map(SchemaProperties::getSchemaId);
} else {
return this.schemaRegistryClient.getSchemaId(
schemaGroup, schemaName, schemaString, SchemaFormat.AVRO);
return this.schemaRegistryClient.getSchemaProperties(
schemaGroup, schemaName, schemaString, SchemaFormat.AVRO).map(properties -> properties.getSchemaId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.data.schemaregistry.avro.generatedtestsources.PlayingCardSuit;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.DecoderFactory;
Expand Down Expand Up @@ -88,11 +89,10 @@ void testRegistryGuidPrefixedToPayload() throws IOException {
final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS);
final Schema playingClassSchema = PlayingCard.getClassSchema();
final byte[] schemaBytes = playingClassSchema.toString().getBytes(StandardCharsets.UTF_8);
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO,
playingClassSchema.getFullName(), schemaBytes);
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO);

when(client.getSchemaId(MOCK_SCHEMA_GROUP, registered.getSchemaName(), playingClassSchema.toString(),
SchemaFormat.AVRO)).thenReturn(Mono.just(MOCK_GUID));
when(client.getSchemaProperties(MOCK_SCHEMA_GROUP, playingClassSchema.getFullName(),
playingClassSchema.toString(), SchemaFormat.AVRO)).thenReturn(Mono.just(registered));

final SchemaRegistryAvroSerializer serializer = new SchemaRegistryAvroSerializer(client, avroSerializer,
MOCK_SCHEMA_GROUP, false);
Expand Down Expand Up @@ -144,18 +144,23 @@ void testGetSchemaAndDeserialize() throws IOException {
final AvroSerializer decoder = new AvroSerializer(false, parser, ENCODER_FACTORY,
DECODER_FACTORY);
final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS);
final Schema playingClassSchema = PlayingCard.getClassSchema();
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO,
playingClassSchema.getFullName(), playingClassSchema.toString().getBytes(StandardCharsets.UTF_8));
final String playingClassSchema = PlayingCard.getClassSchema().toString();
final SchemaProperties registered = new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO);
final SchemaRegistrySchema registrySchema = new SchemaRegistrySchema(registered, playingClassSchema);
final SchemaRegistryAvroSerializer serializer = new SchemaRegistryAvroSerializer(client, decoder,
MOCK_SCHEMA_GROUP, true);

assertNotNull(registered.getSchema());
assertNotNull(registrySchema.getProperties());

when(client.getSchema(MOCK_GUID)).thenReturn(Mono.just(registered));
when(client.getSchema(MOCK_GUID)).thenReturn(Mono.just(registrySchema));

StepVerifier.create(client.getSchema(MOCK_GUID))
.assertNext(properties -> assertEquals(MOCK_GUID, properties.getSchemaId()))
.assertNext(schema -> {
assertNotNull(schema.getProperties());

assertEquals(playingClassSchema, schema.getSchemaDefinition());
assertEquals(MOCK_GUID, schema.getProperties().getSchemaId());
})
.verifyComplete();

final byte[] serializedPayload = getPayload(playingCard);
Expand Down
46 changes: 26 additions & 20 deletions sdk/schemaregistry/azure-data-schemaregistry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,23 @@ You will also need to [register a new AAD application][register_aad_app] and [gr
Set the values of the client ID, tenant ID, and client secret of the AAD application as environment variables: AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET.

##### Async client
<!-- embedme ./src/samples/java/com/azure/data/schemaregistry/ReadmeSamples.java#L24-L29 -->
<!-- embedme ./src/samples/java/com/azure/data/schemaregistry/ReadmeSamples.java#L25-L30 -->
```java
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildAsyncClient();
```

##### Sync client
<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L36-L41 -->
<!-- embedme ./src/samples/java/com/azure/data/schemaregistry/ReadmeSamples.java#L37-L42 -->
```java
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();
```
Expand All @@ -91,17 +91,18 @@ SchemaRegistry operations. Those exposed properties are `Content` and `Id`.
## Examples

* [Register a schema](#register-a-schema)
* [Retrieve a schema ID](#retrieve-a-schema-id)
* [Retrieve a schema's properties](#retrieve-a-schemas-properties)
* [Retrieve a schema](#retrieve-a-schema)

### Register a schema
Register a schema to be stored in the Azure Schema Registry.
<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L48-L70 -->

<!-- embedme ./src/samples/java/com/azure/data/schemaregistry/ReadmeSamples.java#L49-L72 -->
```java
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();

Expand All @@ -119,34 +120,38 @@ String schemaContent = "{\n"
+ " ]\n"
+ "}";
SchemaProperties schemaProperties = schemaRegistryClient.registerSchema("{schema-group}", "{schema-name}",
schemaContent, SerializationFormat.AVRO);
schemaContent, SchemaFormat.AVRO);

System.out.println("Registered schema: " + schemaProperties.getSchemaId());
```

### Retrieve a schema ID
Retrieve a previously registered schema ID from the Azure Schema Registry.
### Retrieve a schema's properties
Retrieve a previously registered schema's properties from the Azure Schema Registry.

<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L77-L85 -->
<!-- embedme ./src/samples/java/com/azure/data/schemaregistry/ReadmeSamples.java#L79-L89 -->
```java
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();

SchemaProperties schemaProperties = schemaRegistryClient.getSchema("{schema-id}");
System.out.println("Retrieved schema: " + schemaProperties.getSchemaName());
SchemaRegistrySchema schema = schemaRegistryClient.getSchema("{schema-id}");

System.out.printf("Retrieved schema: '%s'. Contents: %s%n", schema.getProperties().getSchemaId(),
schema.getSchemaDefinition());
```

### Retrieve a schema
Retrieve a previously registered schema's content from the Azure Schema Registry.
Retrieve a previously registered schema's content and properties from the Azure Schema Registry.

<!-- embedme ./src/samples/java/com/azure//data/schemaregistry/ReadmeSamples.java#L92-L114 -->
<!-- embedme ./src/samples/java/com/azure/data/schemaregistry/ReadmeSamples.java#L96-L119 -->
```java
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.fullyQualifiedNamespace("{schema-registry-endpoint")
.credential(tokenCredential)
.buildClient();

Expand All @@ -163,9 +168,10 @@ String schemaContent = "{\n"
+ " }\n"
+ " ]\n"
+ "}";
String schemaId = schemaRegistryClient.getSchemaId("{schema-group}", "{schema-name}",
schemaContent, SerializationFormat.AVRO);
System.out.println("Retreived schema id: " + schemaId);
SchemaProperties properties = schemaRegistryClient.getSchemaProperties("{schema-group}", "{schema-name}",
schemaContent, SchemaFormat.AVRO);

System.out.println("Retrieved schema id: " + properties.getSchemaId());
```

## Troubleshooting
Expand Down
Loading