Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated queues to the new generator #18795

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import com.azure.storage.common.implementation.SasImplUtils;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.queue.implementation.AzureQueueStorageImpl;
import com.azure.storage.queue.implementation.models.MessageIdUpdateHeaders;
import com.azure.storage.queue.implementation.models.MessageIdsUpdateHeaders;
import com.azure.storage.queue.implementation.models.MessageIdsUpdateResponse;
import com.azure.storage.queue.implementation.models.QueueGetPropertiesHeaders;
import com.azure.storage.queue.implementation.models.QueueMessage;
import com.azure.storage.queue.implementation.models.QueuesGetPropertiesHeaders;
import com.azure.storage.queue.implementation.models.QueuesGetPropertiesResponse;
import com.azure.storage.queue.implementation.util.QueueSasImplUtil;
import com.azure.storage.queue.models.PeekedMessageItem;
Expand Down Expand Up @@ -169,7 +169,7 @@ public Mono<Response<Void>> createWithResponse(Map<String, String> metadata) {

Mono<Response<Void>> createWithResponse(Map<String, String> metadata, Context context) {
context = context == null ? Context.NONE : context;
return client.queues().createWithRestResponseAsync(queueName, null, metadata, null,
return client.getQueues().createWithResponseAsync(queueName, null, metadata, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, null));
}
Expand Down Expand Up @@ -224,7 +224,7 @@ public Mono<Response<Void>> deleteWithResponse() {

Mono<Response<Void>> deleteWithResponse(Context context) {
context = context == null ? Context.NONE : context;
return client.queues().deleteWithRestResponseAsync(queueName,
return client.getQueues().deleteWithResponseAsync(queueName, null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, null));
}
Expand Down Expand Up @@ -281,7 +281,7 @@ public Mono<Response<QueueProperties>> getPropertiesWithResponse() {

Mono<Response<QueueProperties>> getPropertiesWithResponse(Context context) {
context = context == null ? Context.NONE : context;
return client.queues().getPropertiesWithRestResponseAsync(queueName,
return client.getQueues().getPropertiesWithResponseAsync(queueName, null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(this::getQueuePropertiesResponse);
}
Expand Down Expand Up @@ -350,8 +350,8 @@ public Mono<Response<Void>> setMetadataWithResponse(Map<String, String> metadata

Mono<Response<Void>> setMetadataWithResponse(Map<String, String> metadata, Context context) {
context = context == null ? Context.NONE : context;
return client.queues()
.setMetadataWithRestResponseAsync(queueName, null, metadata, null,
return client.getQueues()
.setMetadataWithResponseAsync(queueName, null, metadata, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, null));
}
Expand All @@ -375,8 +375,8 @@ Mono<Response<Void>> setMetadataWithResponse(Map<String, String> metadata, Conte
public PagedFlux<QueueSignedIdentifier> getAccessPolicy() {
try {
Function<String, Mono<PagedResponse<QueueSignedIdentifier>>> retriever =
marker -> this.client.queues()
.getAccessPolicyWithRestResponseAsync(queueName, Context.NONE)
marker -> this.client.getQueues()
.getAccessPolicyWithResponseAsync(queueName, null, null, Context.NONE)
.map(response -> new PagedResponseBase<>(response.getRequest(),
response.getStatusCode(),
response.getHeaders(),
Expand Down Expand Up @@ -466,8 +466,8 @@ OffsetDateTime.now will only give back milliseconds (more precise fields are zer
permissions != null ? permissions.spliterator() : Spliterators.emptySpliterator(), false)
.collect(Collectors.toList());

return client.queues()
.setAccessPolicyWithRestResponseAsync(queueName, permissionsList, null, null,
return client.getQueues()
.setAccessPolicyWithResponseAsync(queueName, null, null, permissionsList,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, null));
}
Expand Down Expand Up @@ -522,7 +522,7 @@ public Mono<Response<Void>> clearMessagesWithResponse() {

Mono<Response<Void>> clearMessagesWithResponse(Context context) {
context = context == null ? Context.NONE : context;
return client.messages().clearWithRestResponseAsync(queueName,
return client.getMessages().clearWithResponseAsync(queueName, null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, null));
}
Expand Down Expand Up @@ -599,8 +599,8 @@ Mono<Response<SendMessageResult>> sendMessageWithResponse(String messageText, Du
QueueMessage message = new QueueMessage().setMessageText(messageText);
context = context == null ? Context.NONE : context;

return client.messages()
.enqueueWithRestResponseAsync(queueName, message, visibilityTimeoutInSeconds, timeToLiveInSeconds,
return client.getMessages()
.enqueueWithResponseAsync(queueName, message, visibilityTimeoutInSeconds, timeToLiveInSeconds,
null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, response.getValue().get(0)));
Expand Down Expand Up @@ -701,8 +701,8 @@ PagedFlux<QueueMessageItem> receiveMessagesWithOptionalTimeout(Integer maxMessag
Duration timeout, Context context) {
Integer visibilityTimeoutInSeconds = (visibilityTimeout == null) ? null : (int) visibilityTimeout.getSeconds();
Function<String, Mono<PagedResponse<QueueMessageItem>>> retriever =
marker -> StorageImplUtils.applyOptionalTimeout(this.client.messages()
.dequeueWithRestResponseAsync(queueName, maxMessages, visibilityTimeoutInSeconds,
marker -> StorageImplUtils.applyOptionalTimeout(this.client.getMessages()
.dequeueWithResponseAsync(queueName, maxMessages, visibilityTimeoutInSeconds,
null, null, context), timeout)
.map(response -> new PagedResponseBase<>(response.getRequest(),
response.getStatusCode(),
Expand Down Expand Up @@ -774,8 +774,8 @@ public PagedFlux<PeekedMessageItem> peekMessages(Integer maxMessages) {
PagedFlux<PeekedMessageItem> peekMessagesWithOptionalTimeout(Integer maxMessages, Duration timeout,
Context context) {
Function<String, Mono<PagedResponse<PeekedMessageItem>>> retriever =
marker -> StorageImplUtils.applyOptionalTimeout(this.client.messages()
.peekWithRestResponseAsync(queueName, maxMessages, null, null, context), timeout)
marker -> StorageImplUtils.applyOptionalTimeout(this.client.getMessages()
.peekWithResponseAsync(queueName, maxMessages, null, null, context), timeout)
.map(response -> new PagedResponseBase<>(response.getRequest(),
response.getStatusCode(),
response.getHeaders(),
Expand Down Expand Up @@ -859,8 +859,8 @@ Mono<Response<UpdateMessageResult>> updateMessageWithResponse(String messageId,
QueueMessage message = messageText == null ? null : new QueueMessage().setMessageText(messageText);
context = context == null ? Context.NONE : context;
visibilityTimeout = visibilityTimeout == null ? Duration.ZERO : visibilityTimeout;
return client.messageIds().updateWithRestResponseAsync(queueName, messageId, popReceipt,
(int) visibilityTimeout.getSeconds(), message, null, null,
return client.getMessageIds().updateWithResponseAsync(queueName, messageId, popReceipt,
(int) visibilityTimeout.getSeconds(), null, null, message,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(this::getUpdatedMessageResponse);
}
Expand Down Expand Up @@ -922,7 +922,7 @@ public Mono<Response<Void>> deleteMessageWithResponse(String messageId, String p

Mono<Response<Void>> deleteMessageWithResponse(String messageId, String popReceipt, Context context) {
context = context == null ? Context.NONE : context;
return client.messageIds().deleteWithRestResponseAsync(queueName, messageId, popReceipt,
return client.getMessageIds().deleteWithResponseAsync(queueName, messageId, popReceipt, null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, null));
}
Expand Down Expand Up @@ -992,9 +992,9 @@ public String generateSas(QueueServiceSasSignatureValues queueServiceSasSignatur
* @return Mapped response
*/
private Response<QueueProperties> getQueuePropertiesResponse(QueuesGetPropertiesResponse response) {
QueueGetPropertiesHeaders propertiesHeaders = response.getDeserializedHeaders();
QueueProperties properties = new QueueProperties(propertiesHeaders.getMetadata(),
propertiesHeaders.getApproximateMessagesCount());
QueuesGetPropertiesHeaders propertiesHeaders = response.getDeserializedHeaders();
QueueProperties properties = new QueueProperties(propertiesHeaders.getXMsMeta(),
propertiesHeaders.getXMsApproximateMessagesCount());
return new SimpleResponse<>(response, properties);
}

Expand All @@ -1004,9 +1004,9 @@ private Response<QueueProperties> getQueuePropertiesResponse(QueuesGetProperties
* @return Mapped response
*/
private Response<UpdateMessageResult> getUpdatedMessageResponse(MessageIdsUpdateResponse response) {
MessageIdUpdateHeaders headers = response.getDeserializedHeaders();
UpdateMessageResult updateMessageResult = new UpdateMessageResult(headers.getPopReceipt(),
headers.getTimeNextVisible());
MessageIdsUpdateHeaders headers = response.getDeserializedHeaders();
UpdateMessageResult updateMessageResult = new UpdateMessageResult(headers.getXMsPopreceipt(),
headers.getXMsTimeNextVisible());
return new SimpleResponse<>(response, updateMessageResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.azure.storage.common.implementation.connectionstring.StorageConnectionString;
import com.azure.storage.common.implementation.connectionstring.StorageEndpoint;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.queue.implementation.AzureQueueStorageBuilder;
import com.azure.storage.queue.implementation.AzureQueueStorageImpl;
import com.azure.storage.queue.implementation.AzureQueueStorageImplBuilder;
import com.azure.storage.queue.implementation.util.BuilderHelper;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -147,11 +147,11 @@ public QueueAsyncClient buildAsyncClient() {
endpoint, retryOptions, logOptions,
clientOptions, httpClient, perCallPolicies, perRetryPolicies, configuration, logger);

AzureQueueStorageImpl azureQueueStorage = new AzureQueueStorageBuilder()
AzureQueueStorageImpl azureQueueStorage = new AzureQueueStorageImplBuilder()
.url(endpoint)
.pipeline(pipeline)
.version(serviceVersion.getVersion())
.build();
.buildClient();

return new QueueAsyncClient(azureQueueStorage, queueName, accountName, serviceVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.PagedResponseBase;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
Expand All @@ -20,7 +19,6 @@
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.common.sas.AccountSasSignatureValues;
import com.azure.storage.queue.implementation.AzureQueueStorageImpl;
import com.azure.storage.queue.implementation.models.ListQueuesIncludeType;
import com.azure.storage.queue.models.QueueCorsRule;
import com.azure.storage.queue.models.QueueItem;
import com.azure.storage.queue.models.QueueServiceProperties;
Expand Down Expand Up @@ -277,24 +275,18 @@ PagedFlux<QueueItem> listQueuesWithOptionalTimeout(String marker, QueuesSegmentO
Context context) {
final String prefix = (options != null) ? options.getPrefix() : null;
final Integer maxResultsPerPage = (options != null) ? options.getMaxResultsPerPage() : null;
final List<ListQueuesIncludeType> include = new ArrayList<>();
final List<String> include = new ArrayList<>();

if (options != null) {
if (options.isIncludeMetadata()) {
include.add(ListQueuesIncludeType.fromString(ListQueuesIncludeType.METADATA.toString()));
include.add("metadata");
}
}

Function<String, Mono<PagedResponse<QueueItem>>> retriever =
nextMarker -> StorageImplUtils.applyOptionalTimeout(this.client.services()
.listQueuesSegmentWithRestResponseAsync(prefix, nextMarker, maxResultsPerPage, include,
null, null, context), timeout)
.map(response -> new PagedResponseBase<>(response.getRequest(),
response.getStatusCode(),
response.getHeaders(),
response.getValue().getQueueItems(),
response.getValue().getNextMarker(),
response.getDeserializedHeaders()));
nextMarker -> StorageImplUtils.applyOptionalTimeout(this.client.getServices()
.listQueuesSegmentSinglePageAsync(prefix, nextMarker, maxResultsPerPage, include,
null, null, context), timeout);

return new PagedFlux<>(() -> retriever.apply(marker), retriever);
}
Expand Down Expand Up @@ -351,7 +343,7 @@ public Mono<Response<QueueServiceProperties>> getPropertiesWithResponse() {

Mono<Response<QueueServiceProperties>> getPropertiesWithResponse(Context context) {
context = context == null ? Context.NONE : context;
return client.services().getPropertiesWithRestResponseAsync(
return client.getServices().getPropertiesWithResponseAsync(null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, response.getValue()));
}
Expand Down Expand Up @@ -450,7 +442,7 @@ public Mono<Response<Void>> setPropertiesWithResponse(QueueServiceProperties pro

Mono<Response<Void>> setPropertiesWithResponse(QueueServiceProperties properties, Context context) {
context = context == null ? Context.NONE : context;
return client.services().setPropertiesWithRestResponseAsync(properties,
return client.getServices().setPropertiesWithResponseAsync(properties, null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, null));
}
Expand Down Expand Up @@ -503,7 +495,7 @@ public Mono<Response<QueueServiceStatistics>> getStatisticsWithResponse() {

Mono<Response<QueueServiceStatistics>> getStatisticsWithResponse(Context context) {
context = context == null ? Context.NONE : context;
return client.services().getStatisticsWithRestResponseAsync(
return client.getServices().getStatisticsWithResponseAsync(null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, response.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.implementation.connectionstring.StorageAuthenticationSettings;
import com.azure.storage.common.implementation.connectionstring.StorageConnectionString;
import com.azure.storage.common.implementation.connectionstring.StorageEndpoint;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.queue.implementation.AzureQueueStorageBuilder;
import com.azure.storage.queue.implementation.AzureQueueStorageImpl;

import com.azure.storage.queue.implementation.AzureQueueStorageImplBuilder;
import com.azure.storage.queue.implementation.util.BuilderHelper;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -122,11 +122,11 @@ public QueueServiceAsyncClient buildAsyncClient() {
endpoint, retryOptions, logOptions,
clientOptions, httpClient, perCallPolicies, perRetryPolicies, configuration, logger);

AzureQueueStorageImpl azureQueueStorage = new AzureQueueStorageBuilder()
AzureQueueStorageImpl azureQueueStorage = new AzureQueueStorageImplBuilder()
.url(endpoint)
.pipeline(pipeline)
.version(serviceVersion.getVersion())
.build();
.buildClient();

return new QueueServiceAsyncClient(azureQueueStorage, accountName, serviceVersion);
}
Expand Down
Loading