Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
vam-google committed Mar 21, 2017
2 parents d2c24a9 + 6c76a82 commit ca31a82
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void main(String... args) throws Exception {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("got message: " + message.getData().toStringUtf8());
consumer.accept(AckReply.ACK, null);
consumer.accept(AckReply.ACK);
}
};
Subscriber subscriber = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public MessageReceiver messageReceiver() {
MessageReceiver receiver = new MessageReceiver() {
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
if (blockingQueue.offer(message)) {
consumer.accept(AckReply.ACK, null);
consumer.accept(AckReply.ACK);
} else {
consumer.accept(AckReply.NACK, null);
consumer.accept(AckReply.NACK);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import static com.google.cloud.logging.spi.v2.PagedResponseWrappers.ListMonitoredResourceDescriptorsPagedResponse;

import com.google.api.MonitoredResourceDescriptor;
import com.google.api.gax.bundling.BundlingSettings;
import com.google.api.gax.bundling.RequestBuilder;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.RequestBuilder;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.BundledRequestIssuer;
import com.google.api.gax.grpc.BundlingCallSettings;
import com.google.api.gax.grpc.BundlingDescriptor;
import com.google.api.gax.grpc.BatchedRequestIssuer;
import com.google.api.gax.grpc.BatchingCallSettings;
import com.google.api.gax.grpc.BatchingDescriptor;
import com.google.api.gax.grpc.CallContext;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ClientSettings;
Expand Down Expand Up @@ -114,7 +114,7 @@ public class LoggingSettings extends ClientSettings {
private static final String DEFAULT_GAPIC_VERSION = "";

private final SimpleCallSettings<DeleteLogRequest, Empty> deleteLogSettings;
private final BundlingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
private final BatchingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
writeLogEntriesSettings;
private final PagedCallSettings<
ListLogEntriesRequest, ListLogEntriesResponse, ListLogEntriesPagedResponse>
Expand All @@ -132,7 +132,7 @@ public SimpleCallSettings<DeleteLogRequest, Empty> deleteLogSettings() {
}

/** Returns the object with the settings used for calls to writeLogEntries. */
public BundlingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
public BatchingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
writeLogEntriesSettings() {
return writeLogEntriesSettings;
}
Expand Down Expand Up @@ -385,11 +385,11 @@ public ListLogsPagedResponse createPagedListResponse(
}
};

private static final BundlingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>
WRITE_LOG_ENTRIES_BUNDLING_DESC =
new BundlingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>() {
private static final BatchingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>
WRITE_LOG_ENTRIES_BATCHING_DESC =
new BatchingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>() {
@Override
public String getBundlePartitionKey(WriteLogEntriesRequest request) {
public String getBatchPartitionKey(WriteLogEntriesRequest request) {
return request.getLogName()
+ "|"
+ request.getResource()
Expand Down Expand Up @@ -421,10 +421,10 @@ public WriteLogEntriesRequest build() {

@Override
public void splitResponse(
WriteLogEntriesResponse bundleResponse,
Collection<? extends BundledRequestIssuer<WriteLogEntriesResponse>> bundle) {
int bundleMessageIndex = 0;
for (BundledRequestIssuer<WriteLogEntriesResponse> responder : bundle) {
WriteLogEntriesResponse batchResponse,
Collection<? extends BatchedRequestIssuer<WriteLogEntriesResponse>> batch) {
int batchMessageIndex = 0;
for (BatchedRequestIssuer<WriteLogEntriesResponse> responder : batch) {
WriteLogEntriesResponse response = WriteLogEntriesResponse.newBuilder().build();
responder.setResponse(response);
}
Expand All @@ -433,8 +433,8 @@ public void splitResponse(
@Override
public void splitException(
Throwable throwable,
Collection<? extends BundledRequestIssuer<WriteLogEntriesResponse>> bundle) {
for (BundledRequestIssuer<WriteLogEntriesResponse> responder : bundle) {
Collection<? extends BatchedRequestIssuer<WriteLogEntriesResponse>> batch) {
for (BatchedRequestIssuer<WriteLogEntriesResponse> responder : batch) {
responder.setException(throwable);
}
}
Expand All @@ -455,7 +455,7 @@ public static class Builder extends ClientSettings.Builder {
private final ImmutableList<UnaryCallSettings.Builder> unaryMethodSettingsBuilders;

private final SimpleCallSettings.Builder<DeleteLogRequest, Empty> deleteLogSettings;
private final BundlingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
private final BatchingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
writeLogEntriesSettings;
private final PagedCallSettings.Builder<
ListLogEntriesRequest, ListLogEntriesResponse, ListLogEntriesPagedResponse>
Expand Down Expand Up @@ -517,9 +517,9 @@ private Builder() {
deleteLogSettings = SimpleCallSettings.newBuilder(LoggingServiceV2Grpc.METHOD_DELETE_LOG);

writeLogEntriesSettings =
BundlingCallSettings.newBuilder(
LoggingServiceV2Grpc.METHOD_WRITE_LOG_ENTRIES, WRITE_LOG_ENTRIES_BUNDLING_DESC)
.setBundlingSettingsBuilder(BundlingSettings.newBuilder());
BatchingCallSettings.newBuilder(
LoggingServiceV2Grpc.METHOD_WRITE_LOG_ENTRIES, WRITE_LOG_ENTRIES_BATCHING_DESC)
.setBatchingSettingsBuilder(BatchingSettings.newBuilder());

listLogEntriesSettings =
PagedCallSettings.newBuilder(
Expand Down Expand Up @@ -553,7 +553,7 @@ private static Builder createDefault() {

builder
.writeLogEntriesSettings()
.getBundlingSettingsBuilder()
.getBatchingSettingsBuilder()
.setElementCountThreshold(1000)
.setRequestByteThreshold(1048576)
.setDelayThreshold(Duration.millis(50))
Expand Down Expand Up @@ -635,7 +635,7 @@ public SimpleCallSettings.Builder<DeleteLogRequest, Empty> deleteLogSettings() {
}

/** Returns the builder for the settings used for calls to writeLogEntries. */
public BundlingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
public BatchingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
writeLogEntriesSettings() {
return writeLogEntriesSettings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
/**
* Accepts a reply, sending it to the service.
*
* <p>Both the interface and its method is named after the Java 8's {@code BiConsumer} interface
* <p>Both the interface and its method is named after the Java 8's {@code Consumer} interface
* to make migration to Java 8 and adopting its patterns easier.
*/
public interface AckReplyConsumer {
void accept(AckReply ackReply, Throwable t);
void accept(AckReply ackReply);
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,20 +285,20 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
final AckReplyConsumer consumer =
new AckReplyConsumer() {
@Override
public void accept(AckReply reply, Throwable t) {
if (reply != null) {
response.set(reply);
} else {
response.setException(t);
}
public void accept(AckReply reply) {
response.set(reply);
}
};
Futures.addCallback(response, ackHandler);
executor.submit(
new Runnable() {
@Override
public void run() {
receiver.receiveMessage(message, consumer);
try {
receiver.receiveMessage(message, consumer);
} catch (Exception e) {
response.setException(e);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private boolean isAlive() {
@Override
public void sendAckOperations(
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
// Send the modify ack deadlines in bundles as not to exceed the max request
// Send the modify ack deadlines in batches as not to exceed the max request
// size.
for (PendingModifyAckDeadline modifyAckDeadline : ackDeadlineExtensions) {
for (List<String> ackIdChunk :
Expand Down
Loading

0 comments on commit ca31a82

Please sign in to comment.