Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

[SDK][Bot-Azure] Add AzureQueueStorage component #1033

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libraries/bot-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@
<artifactId>bot-dialogs</artifactId>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-queue</artifactId>
<version>12.8.0</version>
</dependency>

<dependency>
<groupId>com.microsoft.bot</groupId>
<artifactId>bot-builder</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.bot.azure.queues;

import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.models.QueueStorageException;
import com.azure.storage.queue.models.SendMessageResult;
import com.microsoft.bot.builder.QueueStorage;
import com.microsoft.bot.restclient.serializer.JacksonAdapter;
import com.microsoft.bot.schema.Activity;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;

/**
* Service used to add messages to an Azure.Storage.Queues.
*/
public class AzureQueueStorage extends QueueStorage {
private Boolean createQueueIfNotExists = true;
private final QueueClient queueClient;

/**
* Initializes a new instance of the {@link AzureQueueStorage} class.
* @param queuesStorageConnectionString Azure Storage connection string.
* @param queueName Name of the storage queue where entities will be queued.
*/
public AzureQueueStorage(String queuesStorageConnectionString, String queueName) {
if (StringUtils.isBlank(queuesStorageConnectionString)) {
throw new IllegalArgumentException("queuesStorageConnectionString is required.");
}

if (StringUtils.isBlank(queueName)) {
throw new IllegalArgumentException("queueName is required.");
}

queueClient = new QueueClientBuilder()
.connectionString(queuesStorageConnectionString)
.queueName(queueName)
.buildClient();
}

/**
* Queue an Activity to an Azure.Storage.Queues.QueueClient.
* The visibility timeout specifies how long the message should be invisible
* to Dequeue and Peek operations. The message content must be a UTF-8 encoded string that is up to 64KB in size.
* @param activity This is expected to be an {@link Activity} retrieved from a call to
* activity.GetConversationReference().GetContinuationActivity().
* This enables restarting the conversation using BotAdapter.ContinueConversationAsync.
* @param visibilityTimeout Default value of 0. Cannot be larger than 7 days.
* @param timeToLive Specifies the time-to-live interval for the message.
* @return {@link SendMessageResult} as a Json string, from the QueueClient SendMessageAsync operation.
*/
@Override
public CompletableFuture<String> queueActivity(Activity activity,
@Nullable Duration visibilityTimeout,
@Nullable Duration timeToLive) {
return CompletableFuture.supplyAsync(() -> {
if (createQueueIfNotExists) {
// This is an optimization flag to check if the container creation call has been made.
// It is okay if this is called more than once.
createQueueIfNotExists = false;
try {
queueClient.create();
} catch (QueueStorageException e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it acceptable for the code to continue at this point or should we return here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @LeeParrishMSFT, we will review this and apply the corresponding fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @LeeParrishMSFT, we improved this behaviour to throw a RuntimeException like the CosmosDbPartitionedStorage does.

}
}

try {
JacksonAdapter jacksonAdapter = new JacksonAdapter();
String serializedActivity = jacksonAdapter.serialize(activity);
byte[] encodedBytes = serializedActivity.getBytes(StandardCharsets.UTF_8);
String encodedString = Base64.getEncoder().encodeToString(encodedBytes);

SendMessageResult receipt = queueClient.sendMessage(encodedString);
return jacksonAdapter.serialize(receipt);
} catch (IOException e) {
e.printStackTrace();
}
return null;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for
// license information.

/**
* This package contains the classes for bot-integration-core.
*/
package com.microsoft.bot.azure.queues;
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.bot.azure;

import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.models.QueueMessageItem;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.microsoft.bot.azure.queues.AzureQueueStorage;
import com.microsoft.bot.builder.ConversationState;
import com.microsoft.bot.builder.MemoryStorage;
import com.microsoft.bot.builder.QueueStorage;
import com.microsoft.bot.builder.UserState;
import com.microsoft.bot.builder.adapters.TestAdapter;
import com.microsoft.bot.builder.adapters.TestFlow;
import com.microsoft.bot.dialogs.Dialog;
import com.microsoft.bot.dialogs.DialogContext;
import com.microsoft.bot.dialogs.DialogManager;
import com.microsoft.bot.dialogs.DialogTurnResult;
import com.microsoft.bot.schema.Activity;
import com.microsoft.bot.schema.ActivityEventNames;
import com.microsoft.bot.schema.ActivityTypes;
import com.microsoft.bot.schema.ConversationReference;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;
import java.util.Calendar;
import java.util.concurrent.CompletableFuture;

import com.microsoft.bot.restclient.serializer.JacksonAdapter;

public class AzureQueueTests {
private static final Integer DEFAULT_DELAY = 2000;
private static Boolean EMULATOR_IS_RUNNING = false;
private final String connectionString = "UseDevelopmentStorage=true";
private static final String NO_EMULATOR_MESSAGE = "This test requires Azure STORAGE Emulator! Go to https://docs.microsoft.com/azure/storage/common/storage-use-emulator to download and install.";

@BeforeClass
public static void allTestsInit() throws IOException, InterruptedException {
Process p = Runtime.getRuntime().exec
("cmd /C \"" + System.getenv("ProgramFiles") + " (x86)\\Microsoft SDKs\\Azure\\Storage Emulator\\AzureStorageEmulator.exe\" start");
int result = p.waitFor();
// status = 0: the service was started.
// status = -5: the service is already started. Only one instance of the application
// can be run at the same time.
EMULATOR_IS_RUNNING = result == 0 || result == -5;
}

// These tests require Azure Storage Emulator v5.7
public QueueClient containerInit(String name) {
QueueClient queue = new QueueClientBuilder()
.connectionString(connectionString)
.queueName(name)
.buildClient();
queue.create();
queue.clearMessages();
return queue;
}

@Test
public void continueConversationLaterTests() {
assertEmulator();
String queueName = "continueconversationlatertests";
QueueClient queue = containerInit(queueName);
ConversationReference cr = TestAdapter.createConversationReference("ContinueConversationLaterTests", "User1", "Bot");
TestAdapter adapter = new TestAdapter(cr)
.useStorage(new MemoryStorage())
.useBotState(new ConversationState(new MemoryStorage()), new UserState(new MemoryStorage()));

AzureQueueStorage queueStorage = new AzureQueueStorage(connectionString, queueName);

Calendar cal = Calendar.getInstance();
cal.add(Calendar.SECOND, 2);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

ContinueConversationLater ccl = new ContinueConversationLater() {
{
setDate(sdf.format(cal.getTime()));
setValue("foo");
}
};
DialogManager dm = new DialogManager(ccl, "DialogStateProperty");
dm.getInitialTurnState().replace("QueueStorage", queueStorage);

new TestFlow(adapter, turnContext -> CompletableFuture.runAsync(() -> dm.onTurn(turnContext)))
.send("hi")
.startTest().join();

try {
Thread.sleep(DEFAULT_DELAY);
} catch (InterruptedException e) {
e.printStackTrace();
Assert.fail();
}

QueueMessageItem messages = queue.receiveMessage();
JacksonAdapter jacksonAdapter = new JacksonAdapter();
String messageJson = new String(Base64.decodeBase64(messages.getMessageText()));
Activity activity = null;

try {
activity = jacksonAdapter.deserialize(messageJson, Activity.class);
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}

Assert.assertTrue(activity.isType(ActivityTypes.EVENT));
Assert.assertEquals(ActivityEventNames.CONTINUE_CONVERSATION, activity.getName());
Assert.assertEquals("foo", activity.getValue());
Assert.assertNotNull(activity.getRelatesTo());
ConversationReference cr2 = activity.getConversationReference();
cr.setActivityId(null);
cr2.setActivityId(null);

try {
Assert.assertEquals(jacksonAdapter.serialize(cr), jacksonAdapter.serialize(cr2));
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
}

private void assertEmulator() {
if (!EMULATOR_IS_RUNNING) {
Assert.fail(NO_EMULATOR_MESSAGE);
}
}

private class ContinueConversationLater extends Dialog {
@JsonProperty("disabled")
private Boolean disabled = false;

@JsonProperty("date")
private String date;

@JsonProperty("value")
private String value;

/**
* Initializes a new instance of the Dialog class.
*/
public ContinueConversationLater() {
super(ContinueConversationLater.class.getName());
}

@Override
public CompletableFuture<DialogTurnResult> beginDialog(DialogContext dc, Object options) {
if (this.disabled) {
return dc.endDialog();
}

String dateString = this.date;
LocalDateTime date = null;
try {
date = LocalDateTime.parse(dateString);
} catch (DateTimeParseException ex) {
throw new IllegalArgumentException("Date is invalid");
}

ZonedDateTime zonedDate = date.atZone(ZoneOffset.UTC);
ZonedDateTime now = LocalDateTime.now().atZone(ZoneOffset.UTC);
if (zonedDate.isBefore(now)) {
throw new IllegalArgumentException("Date must be in the future");
}

// create ContinuationActivity from the conversation reference.
Activity activity = dc.getContext().getActivity().getConversationReference().getContinuationActivity();
activity.setValue(this.value);

Duration visibility = Duration.between(zonedDate, now);
Duration ttl = visibility.plusMinutes(2);

QueueStorage queueStorage = dc.getContext().getTurnState().get("QueueStorage");
if (queueStorage == null) {
throw new NullPointerException("Unable to locate QueueStorage in HostContext");
}
return queueStorage.queueActivity(activity, visibility, ttl).thenCompose(receipt -> {
// return the receipt as the result
return dc.endDialog(receipt);
});
}

/**
* Gets an optional expression which if is true will disable this action.
* "user.age > 18".
* @return A boolean expression.
*/
public Boolean getDisabled() {
return disabled;
}

/**
* Sets an optional expression which if is true will disable this action.
* "user.age > 18".
* @param withDisabled A boolean expression.
*/
public void setDisabled(Boolean withDisabled) {
this.disabled = withDisabled;
}

/**
* Gets the expression which resolves to the date/time to continue the conversation.
* @return Date/time string in ISO 8601 format to continue conversation.
*/
public String getDate() {
return date;
}

/**
* Sets the expression which resolves to the date/time to continue the conversation.
* @param withDate Date/time string in ISO 8601 format to continue conversation.
*/
public void setDate(String withDate) {
this.date = withDate;
}

/**
* Gets an optional value to use for EventActivity.Value.
* @return The value to use for the EventActivity.Value payload.
*/
public String getValue() {
return value;
}

/**
* Sets an optional value to use for EventActivity.Value.
* @param withValue The value to use for the EventActivity.Value payload.
*/
public void setValue(String withValue) {
this.value = withValue;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.bot.builder;

import com.microsoft.bot.schema.Activity;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/**
* A base class for enqueueing an Activity for later processing.
*/
public abstract class QueueStorage {

/**
* Enqueues an Activity for later processing. The visibility timeout specifies how long the message
* should be invisible to Dequeue and Peek operations.
* @param activity The {@link Activity} to be queued for later processing.
* @param visibilityTimeout Visibility timeout. Optional with a default value of 0. Cannot be larger than 7 days.
* @param timeToLive Specifies the time-to-live interval for the message.
* @return A result string.
*/
public abstract CompletableFuture<String> queueActivity(Activity activity,
@Nullable Duration visibilityTimeout,
@Nullable Duration timeToLive);
}
Loading