Skip to content

Commit

Permalink
Add javadoc and tests for functional ReceivedMessage class
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 1, 2016
1 parent 6df07ba commit e78f782
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* a key {@code iana.org/language_tag} and value {@code en} could be added to messages to mark them
* as readable by an English-speaking subscriber.
*
* <p>To be published a message must have a non-empty payload, or at least one attribute.
* <p>To be published, a message must have a non-empty payload, or at least one attribute.
*
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
*/
Expand All @@ -64,11 +64,11 @@ private static final class InternalByteArray extends ByteArray {

private static final long serialVersionUID = -3330181485911805428L;

protected InternalByteArray(ByteString byteString) {
InternalByteArray(ByteString byteString) {
super(byteString);
}

protected InternalByteArray(ByteArray byteArray) {
InternalByteArray(ByteArray byteArray) {
super(byteArray);
}

Expand Down Expand Up @@ -244,17 +244,24 @@ public ByteArray payload() {
return payload;
}

final boolean baseEquals(Message message) {
return Objects.equals(id, message.id)
&& Objects.equals(payload, message.payload)
&& Objects.equals(attributes, message.attributes)
&& Objects.equals(publishTime, message.publishTime);
}

@Override
public boolean equals(Object obj) {
return obj == this
|| obj != null
&& obj.getClass().equals(Message.class)
&& Objects.equals(toPb(), ((Message) obj).toPb());
&& baseEquals((Message) obj);
}

@Override
public int hashCode() {
return Objects.hash(serialVersionUID, id, payload, attributes, publishTime);
return Objects.hash(id, payload, attributes, publishTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ReceivedMessage extends Message {
/**
* A Google Cloud Pub/Sub received message. A received message has all the information in
* {@link Message} as well as the acknowledge id. The ack id can be used to acknowledge the received
* message.
*
* <p>{@code ReceivedMessage} also adds a layer of service-related functionality over
* {@link Message} that help manage received messages (see {@link #ack()}, {@link #nack()} and
* {@link #modifyAckDeadline(int, TimeUnit)}).
*/
public final class ReceivedMessage extends Message {

private static final long serialVersionUID = -4178477763916251733L;

Expand Down Expand Up @@ -124,48 +133,109 @@ public int hashCode() {

@Override
public boolean equals(Object obj) {
if (this == obj) {
if (obj == this) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
if (obj == null || !obj.getClass().equals(ReceivedMessage.class)) {
return false;
}
ReceivedMessage other = (ReceivedMessage) obj;
return Objects.equals(toPb(), other.toPb()) && Objects.equals(options, other.options);
return baseEquals(other) && Objects.equals(options, other.options);
}

public PubSub pubSub() {
/**
* Returns the received message's {@code PubSub} object used to issue requests.
*/
public PubSub pubsub() {
return pubsub;
}

/**
* Returns the name of the subscription this message was received from.
*/
public String subscription() {
return subscription;
}

/**
* Returns the acknowledge id of the message. The ack id can be used to acknowledge the received
* message.
*/
public String ackId() {
return ackId;
}

/**
* Acknowledges the current message.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public void ack() {
pubsub.ack(subscription, ackId);
}

/**
* Sends a request to acknowledge the current message. The method returns a {@code Future} object
* that can be used to wait for the acknowledge operation to be completed.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public Future<Void> ackAsync() {
return pubsub.ackAsync(subscription, ackId);
}

/**
* "Nacks" the current message. This method corresponds to calling
* {@link #modifyAckDeadline(int, TimeUnit)} with a deadline of 0.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public void nack() {
pubsub.nack(subscription, ackId);
}

/**
* Sends a request to "nack" the current message. This method corresponds to calling
* {@link #modifyAckDeadlineAsync(int, TimeUnit)} with a deadline of 0. The method returns a
* {@code Future} object that can be used to wait for the "nack" operation to be completed.
*
* @throws PubSubException upon failure, or if the subscription was not found
*/
public Future<Void> nackAsync() {
return pubsub.nackAsync(subscription, ackId);
}

/**
* Modifies the acknowledge deadline of the current message. {@code deadline} must be &gt;= 0 and
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
* (corresponds to calling {@link #nack()}.
*
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit {@code deadline} time unit
* @throws PubSubException upon failure, or if the subscription was not found
*/
public void modifyAckDeadline(int deadline, TimeUnit unit) {
pubsub.modifyAckDeadline(subscription, deadline, unit, ackId);
}

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync()}. The method returns a {@code Future}
* object that can be used to wait for the modify operation to be completed.
*
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit {@code deadline} time unit
* @throws PubSubException upon failure, or if the subscription was not found
*/
public Future<Void> modifyAckDeadlineAsync(int deadline, TimeUnit unit) {
return pubsub.modifyAckDeadlineAsync(subscription, deadline, unit, ackId);
}
Expand All @@ -175,10 +245,10 @@ private void readObject(ObjectInputStream input) throws IOException, ClassNotFou
this.pubsub = options.service();
}

static ReceivedMessage fromPb(PubSub storage, String subscription,
static ReceivedMessage fromPb(PubSub pubsub, String subscription,
com.google.pubsub.v1.ReceivedMessage msgPb) {
Message message = fromPb(msgPb.getMessage());
String ackId = msgPb.getAckId();
return new Builder(subscription, ackId, storage, new BuilderImpl(message)).build();
return new Builder(subscription, ackId, pubsub, new BuilderImpl(message)).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;

import com.google.api.client.util.Charsets;
import com.google.cloud.ByteArray;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ReceivedMessageTest {

private static final String SUBSCRIPTION = "subscription";
private static final String ACK_ID = "ackId";
private static final String MESSAGE_ID = "messageId";
private static final String PAYLOAD_STRING = "payload";
private static final ByteArray PAYLOAD =
ByteArray.copyFrom("payload".getBytes(StandardCharsets.UTF_8));
private static final Map<String, String> ATTRIBUTES =
ImmutableMap.of("key1", "value1", "key2", "value2");
private static final Long PUBLISH_TIME = 42L;
private static final Message MESSAGE = Message.builder(PAYLOAD)
.id(MESSAGE_ID)
.attributes(ATTRIBUTES)
.publishTime(PUBLISH_TIME)
.build();
private static final com.google.pubsub.v1.ReceivedMessage RECEIVED_MESSAGE_PB =
com.google.pubsub.v1.ReceivedMessage.newBuilder()
.setMessage(MESSAGE.toPb())
.setAckId(ACK_ID)
.build();

private final PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class);
private final PubSubOptions mockOptions = createMock(PubSubOptions.class);
private PubSub pubsub;
private ReceivedMessage expectedMessage;
private ReceivedMessage message;

private void initializeExpectedMessage(int optionsCalls) {
expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
replay(serviceMockReturnsOptions);
pubsub = createStrictMock(PubSub.class);
expectedMessage =
ReceivedMessage.fromPb(serviceMockReturnsOptions, SUBSCRIPTION, RECEIVED_MESSAGE_PB);
}

private void initializeMessage() {
message = ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, RECEIVED_MESSAGE_PB);
}

@After
public void tearDown() throws Exception {
verify(pubsub, serviceMockReturnsOptions);
}

@Test
public void testBuilder() {
initializeExpectedMessage(3);
replay(pubsub);
Map<String, String> attributes = ImmutableMap.of("newKey1", "newVal1");
ReceivedMessage builtMessage = expectedMessage.toBuilder()
.payload("newPayload")
.id("newMessageId")
.attributes(attributes)
.publishTime(PUBLISH_TIME + 1)
.build();
assertSame(serviceMockReturnsOptions, builtMessage.pubsub());
assertEquals(SUBSCRIPTION, builtMessage.subscription());
assertEquals(ACK_ID, builtMessage.ackId());
assertEquals("newMessageId", builtMessage.id());
assertArrayEquals("newPayload".getBytes(Charsets.UTF_8), builtMessage.payload().toByteArray());
assertEquals("newPayload", builtMessage.payloadAsString());
assertEquals(attributes, builtMessage.attributes());
assertEquals(PUBLISH_TIME + 1, (long) builtMessage.publishTime());
builtMessage = builtMessage.toBuilder()
.payload(PAYLOAD)
.id(MESSAGE_ID)
.clearAttributes()
.addAttribute("key1", "value1")
.addAttribute("key2", "value2")
.publishTime(PUBLISH_TIME)
.build();
assertSame(serviceMockReturnsOptions, builtMessage.pubsub());
assertEquals(MESSAGE_ID, builtMessage.id());
assertEquals(PAYLOAD, builtMessage.payload());
assertEquals(PAYLOAD_STRING, builtMessage.payloadAsString());
assertEquals(ATTRIBUTES, builtMessage.attributes());
assertEquals(PUBLISH_TIME, builtMessage.publishTime());
compareReceivedMessage(expectedMessage, builtMessage);
}

@Test
public void testToBuilder() {
initializeExpectedMessage(2);
replay(pubsub);
compareReceivedMessage(expectedMessage, expectedMessage.toBuilder().build());
}

@Test
public void testAck() {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
pubsub.ack(SUBSCRIPTION, ACK_ID);
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
message.ack();
}

@Test
public void testAckAsync() throws ExecutionException, InterruptedException {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID)).andReturn(Futures.<Void>immediateFuture(null));
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
assertNull(message.ackAsync().get());
}

@Test
public void testModifyAckDeadline() {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ACK_ID);
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
message.modifyAckDeadline(10, TimeUnit.SECONDS);
}

@Test
public void testModifyAckDeadlineAsync() throws ExecutionException, InterruptedException {
initializeExpectedMessage(1);
expect(pubsub.options()).andReturn(mockOptions);
expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ACK_ID))
.andReturn(Futures.<Void>immediateFuture(null));
EasyMock.expectLastCall();
replay(pubsub);
initializeMessage();
assertNull(message.modifyAckDeadlineAsync(10, TimeUnit.SECONDS).get());
}

private void compareReceivedMessage(ReceivedMessage expected, ReceivedMessage value) {
assertEquals(expected, value);
assertEquals(expected.id(), value.id());
assertEquals(expected.payload(), value.payload());
assertEquals(expected.payloadAsString(), value.payloadAsString());
assertEquals(expected.attributes(), value.attributes());
assertEquals(expected.publishTime(), value.publishTime());
assertEquals(expected.ackId(), value.ackId());
assertEquals(expected.subscription(), value.subscription());
assertEquals(expected.hashCode(), value.hashCode());
}
}

0 comments on commit e78f782

Please sign in to comment.