From 7bce752fa4789c3ccd4e87795085c4bd275faf88 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 9 Jul 2019 15:14:16 -0400 Subject: [PATCH] Pubsub ordering keys (#5624) * Merge from master. (#4468) * Bump next snapshot (#4300) * Cloud Bigtable: instanceAdmin sample (#4299) * instanceAdmin sample and tests * comments added * instanceAdminExample changes * renamed variables * changes * Regenerate securitycenter client (#4311) * 3540: Used transformer for shaded plugin (#4305) * Added Query#fromProto to convert protobuf ReadRowsRequest (#4291) * Adding Query.fromProto * Adding Query.fromProto * adding changes as per feedback * updated javadoc as per feedback * reformatted Query & QueryTest.java * Bigquery: corrected equality check on subclasses of StringEnumValue (#4283) * Bigquery: corrected equality check on subclasses of StringEnumValue * update format for FieldTest.java * Fix #4269 update metadata url to FQDN (#4278) * 4152: Added checking PAGE_TOKEN from options (#4303) * 3918: Added checking of attributes size for 0. Added unit test (#4304) * Bigtable: Merge bigtable-admin into the bigtable artifact (#4307) * Bigtable: merge bigtable-admin into the bigtable artifact * split exclusion change for future PR * fix admin paths * fix deprecation warning * fix synth script * fix generated file * temporarily re-add kokoro scripts (and point them to the merged artifact) until the jobs have been removed * Remove dep from examples * fix admin integration tests * revert stray fix (will be added to a separate PR) * fix int tests * don't double format the code * fix up docs * Regenerate PubSub: documentation updates (#4293) * Regenerate monitoring client (#4316) * Regenerate bigtable client (#4318) * 3822: Fixed setDisableGZipContent call (#4314) * 3822: Fixed setDisableGZipContent call * 3822: Changes after review * Regenerate speech client: enable multi-channel features (#4317) * Release v0.77.0 (#4324) * Created enum Region.java to retrieve Regions without doing a request.
(#4309) * Bump next snapshot (#4325) * Bigtable: fix handling of unset rpc timeouts (#4323) When the rpc timeout is zero, it should be treated as disabled, not as an actual 0 * Bigtable: Expose single row read settings (#4321) * exposing ReadRowSettings through BigtableDataSettings * fixed typo * Regenerate compute client (#4327) * Regenerate speech client (#4328) * Update README version to use bom version (#4322) * BigQuery: Fix NPE for null table schema fields (#4338) * Add failing test for empty table schema * Fix NPE if table schema returns null for fields * Bump gax, grpc & opencensus version (#4336) * Cloud Bigtable: HelloWorld sample updates (#4339) * comments added in HelloWorld and ITHelloWorld * removed typesafe name * separate properties for bigtable.project and bigtable.instance * separate properties for bigtable.project and bigtable.instance (#4346) * Pub/Sub: default values in batch settings comments (#4350) * Pub/Sub: default values in batch settings comments Ref: https://github.com/googleapis/gax-java/blob/master/gax/src/main/java/com/google/api/gax/batching/BatchingSettings.java * Verbose comment * Update maven-surefire-plugin to 3.0.0-M3 to fix Java 8 classloader (#4344) * Update maven-surefire-plugin to 3.0.0-M3 to fix Java 8 classloader * Update failsafe plugin to 3.0.0-M3 too * Specify maven-surefire-plugin version in bigtable-emulator * Bigtable: Fixing a typo for KeyOffSet#geyKey to getKey (#4342) * Regenerate spanner client (#4332) * Spanner: remove junit from compile dependencies (#4334) * Firestore: change timestampsInSnapshots default to true. (#4353) BREAKING CHANGE: The `areTimestampsInSnapshotsEnabled()` setting is now enabled by default so timestamp fields read from a `DocumentSnapshot` will be returned as `Timestamp` objects instead of `Date`. Any code expecting to receive a `Date` object must be updated. * Regenerate clients with updated copyright year (#4382) * Regenerate asset client * Regenerate automl client * Regenerate bigquerydatatransfer client * Regenerate bigquerystorage client * Regenerate bigtable client * Regenerate container client * Regenerate containeranalysis client * Regenerate dataproc client * Regenerate dialogflow client * Regenerate dlp client * Regenerate errorreporting client * Regenerate iamcredentials client * Regenerate iot client * Regenerate kms client * Regenerate language client * Regenerate logging client * Regenerate monitoring client * Regenerate os-login client * Regenerate pubsub client * Regenerate redis client * Regenerate securitycenter client * Regenerate speech client * Regenerate tasks client * Regenerate trace client * Regenerate video-intelligence client * Regenerate websecurityscanner client * Release google-cloud-java v0.78.0 (#4386) * Regenerate compute client (#4359) * Cloud Bigtable: tableAdmin sample (#4313) * tableAdmin sample and tests * comments added * files renamed and other edits * some changes in TableAdminExample and tests * fixed timestamp to base 16 * separate properties for bigtable.project and bigtable.instance * Regenerate scheduler client (#4294) * Regenerate spanner client (#4388) * Bump next snapshot (#4391) * Regenerate asset client (#4395) * Regenerate scheduler client (#4396) * Firestore: Include a trailing /documents on root resource paths (#4352) This is required for v1 and accepted in v1beta1.
Port of https://github.com/googleapis/nodejs-firestore/pull/516/commits/52c7381bc4f964c0ca0925f19c5192bf4a3e63c6 * Release v0.79.0 (#4402) * Removing some unused dependencies (#4385) * Removing some unused dependencies Also, reducing scope of auto-value to provided. * Restoring Firestore auto-value * Removing more instances of easymock. * Removing non-deprecated uses of joda time. (#4351) * Removing non-deprecated uses of joda time. This works towards #3482 * Update pom.xml * Ran `mvn com.coveo:fmt-maven-plugin:format` * Fixing a bad merge * Bump next snapshot (#4405) * fix shading (#4406) * Fixing some deprecation warnings (#4390) * fixing some deprecation warnings * updated comment * BigQuery: Fix Location configurable at BigQueryOptions (#4329) * Fix Location configurable from BigQueryOptions * modified code * modified code and added test case * removed unused location * Generate Firestore API v1 (#4410) * Bigtable: make row & cell ordering more consistent. (#4421) * Bigtable: make row & cell ordering more consistent. * RowCells should always be ordered lexicographically by family, then qualifier, and finally by reverse chronological order * Although rows will always be ordered lexicographically by row key, they should not implement Comparable to avoid confusion when compareTo() == 0 and equals() is false. Instead that ordering was moved to a separate comparator. * Add helpers to filter cells by family & qualifier * tweaks * code style * code style * Fix code formatting (#4437) * Regenerate compute client (#4399) * Regenerate compute client (#4444) * Firestore: Migrate client to use v1 gapic client / protos. (#4419) * Regenerate asset client (#4426) * Regenerate bigtable client (#4427) * Regenerate dataproc client (#4428) * Regenerate speech client (#4422) * Regenerate dialogflow client (#4429) * Regenerate redis client (#4431) * Regenerate securitycenter client (#4432) * Regenerate tasks client (#4411) * Fix usages of any(.class) matchers (#4453) In Mockito 2, if a method expects a primitive type, but an any(.class) matcher is used in its place, it will throw an error. To prepare for this upcoming breakage, change all existing any(.class) matchers to use the correct any() matcher. * Regenerate video-intelligence client (#4434) * Regenerate automl client (#4418) * Regenerate automl client (#4455) * Regenerate speech client (#4456) * add comments (#4441) * Update some default retry setting values (#4425) Ref: https://github.com/googleapis/google-cloud-java/blob/ed7bc857a05b34eed6be182d4798a62bf09cd394/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L497 * Implement BucketPolicyOnly (#4404) * implement BucketPolicyOnly * [Storage] Bucket Policy Only samples (#4408) * Humble beginnings for BPO samples with tests * Update samples per client changes * Fix format issue * Lint fix * Bigtable: make request context serializable (#4459) * Bigtable: expose helper to convert proto rows into model rows (#4458) This is needed for hbase adapter transition * Regenerate texttospeech client (#4333) * Regenerate compute client (#4462) * Skip integration tests if no changes (#4392) * Add environment variable for allowing skipping ITs * Skip integration tests if there are no changes in that directory * Also check google-cloud-core client and the grpc-api generated models * update TESTING.md (#4409) * Regenerate spanner client (#4433) * Regenerate texttospeech client (#4463) * 3534: Made HttpTransportOptions constructor public.
(#4439) * use gax 1.38.0 (#4460) * Regenerate firestore client (#4348) * Empty table results can have a schema (#4185) * Update EmptyTableResult.java * Update Job.java * add test case * 4273: Added a new create method to pass offset and length of sub array. (#4407) * 4273: Added a new create method to pass offset and length of sub array. * 4273: Fixed codeformat error. * 4273: Rephrased a comment. * 4273: Added a new integration test using the new createBlobWithSubArrayFromByteArray code snippet. * Add firestore grpc and proto versions to versions.txt (#4464) * Release v0.80.0 (#4465) * Bump next snapshot (#4467) * Adding Cloud Bigtable Mutation fromProto (#4461) * Adding Cloud Bigtable Mutation fromProto * Fixing formatting * Addressing comments `fromProto` becomes `fromProtoUnsafe` * Fixing lint issues. * adding a warning to `fromProtoUnsafe` * Regenerate websecurityscanner client (#4435) * Add SequentialExecutorService that assists Pub/Sub to publish messages sequentially (#4315) * Add SequentialExecutorService * Add headers, make methods package private, and other review changes. * Fix formatting of SequentialExecutorService classes. * Add ordered publishing support to Cloud Pub/Sub (#4474) * Add ordering key fields to proto. We are locally committing the proto with ordering keys for now. Once we are prepared to release this feature, we can use the default, released proto. * Add support for ordered publishing * Add comments about ordering key properties. * Change retry codes and add checks for a null ordering key. * Pubsub ordering keys subscriber (#4515) * Add ordering key fields to proto. We are locally committing the proto with ordering keys for now. Once we are prepared to release this feature, we can use the default, released proto. * Add support for ordered publishing * Add comments about ordering key properties. * Change retry codes and add checks for a null ordering key. * Add support for ordered delivery to subscribers * Change the order of publishing batches when a large message is requested; Add unit tests (#4578) * Merge from master; Resolve conflicts (#4943) * Scale the system executor provider with the number of pull channels opened. (#4592) Make the SubscriberStubSettings refer to the user-provided executor provider instead of a fixed instantiation of it. If the user provides an InstantiatingExecutorProvider instead of a FixedExecutorProvider, this will actually instantiate more than one as the user would expect. It will still only instantiate one for all connections to share, and will do so until the next PR which will make them have different stub instantiations. * Using the google-cloud-http-bom (#4594) * Using the latest version of truth (#4597) * Using a much newer version of truth * Upgrading annotations in google-api-grpc * Regenerate websecurityscanner client (#4614) * Regenerate vision client (#4613) * Regenerate video-intelligence client (#4612) * Regenerate tasks client (#4611) * Regenerate securitycenter client (#4610) * BigQuery: Fix useAvroLogicalTypes option in LoadJobConfiguration And WriteChannelConfiguration. (#4615) * Fix useAvroLogicalTypes in LoadJobConfiguration * Fix comment and added useAvroLogicalTypes in WriteChannelConfiguration * Fix comment * Fix code format * add propagateTimeout in SpannerExceptionFactory (#4598) - Add a helper method propagateTimeout in SpannerExceptionFactory to easily transform a TimeoutException to a SpannerException. - Propagate ApiExceptions a bit more nicely.
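Stepping back from the squashed history for a moment: the ordering-keys publisher work merged here (#4315, #4474, #4515, plus the refactorings below) makes the Publisher batch, send, and retry messages that share an ordering key strictly in order, while messages without a key keep the old parallel path. A minimal caller-side sketch of the feature, with placeholder project/topic names, placed in the com.google.cloud.pubsub.v1 package because setEnableMessageOrdering is still package-private in this patch:

package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFuture;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

final class OrderingKeysSketch {
  static void publishInOrder() throws Exception {
    Publisher publisher =
        Publisher.newBuilder(ProjectTopicName.of("my-project", "my-topic")) // placeholders
            // Package-private in this patch; the TODOs in the diff below note that it
            // becomes public when ordering keys launch.
            .setEnableMessageOrdering(true)
            .build();
    for (String data : new String[] {"m1", "m2", "m3"}) {
      PubsubMessage message =
          PubsubMessage.newBuilder()
              .setData(ByteString.copyFromUtf8(data))
              // Messages sharing an ordering key are sent, and retried, in publish order.
              .setOrderingKey("OrderA")
              .build();
      ApiFuture<String> messageId = publisher.publish(message);
    }
    publisher.shutdown();
  }
}

If a publish for a key fails permanently, later publishes for that key fail until the (also package-private) resumePublish(key) is called, as described in the Publisher diff further down.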
* Regenerate scheduler client (#4609) * Regenerate pubsub client (#4608) * Regenerate os-login client (#4607) * Regenerate dlp client (#4603) * Regenerate iamcredentials client (#4605) * Regenerate bigquerystorage client (#4601) * Regenerate dataproc client (#4602) * Regenerate bigquerydatatransfer client (#4600) * Regenerate language client (#4606) * Regenerate firestore client (#4604) * Set the isDirectory flag appropriately (#4616) Set the isDirectory flag appropriately when using the currentDirectory() option * Extract the single message processing functionality from processOutstandingBatches. (#4618) * Upgrading grpc in google-api-grpc (#4593) * Prepare for KMS GA release - upgrade versions to 1.0.0. (#4581) * Adding vision v1p4beta1 (#4584) * Adding vision v1p4beta1 * Updating more pom.xmls * Fixing formatting issues * Clean up MessageDispatcher by changing processOutstandingBatches to explicitly loop instead of while(true) with breaks. There is now only 1 explicit return and 1 runtime error. (#4619) * Release google-cloud-java v0.82.0 (#4621) * Release v0.82.0 * Change KMS versions to 1.0.0. * Remove global synchronization from MessageDispatcher. (#4620) * Remove global synchronization from MessageDispatcher. Now that this uses a LinkedBlockingDeque for batches, this is no longer necessary. * Run code format. * Bump next snapshot (#4623) * Regenerate vision client (#4625) * Regenerate dataproc client (#4634) * Regenerate dialogflow client (#4635) * Regenerate firestore client (#4638) * Regenerate iot client (#4639) * Regenerate monitoring client (#4642) * Regenerate bigtable client (#4631) * Regenerate errorreporting client (#4637) * Regenerate pubsub client (#4643) * Regenerate automl client (#4629) * Regenerate redis client (#4644) * Regenerate bigquerydatatransfer client (#4630) * Regenerate scheduler client (#4645) * Regenerate kms client (#4640) * Regenerate vision client (#4650) * Regenerate websecurityscanner client (#4651) * Regenerate containeranalysis client (#4633) * Regenerate dlp client (#4636) * Regenerate trace client (#4649) * Regenerate spanner client (#4647) * Regenerate logging client (#4641) * Regenerate tasks client (#4648) * Regenerate securitycenter client (#4646) * Bump gax to 1.42.0 (#4624) Also regenerate the Compute client to match the newest version of gax. * Batch dml mainline (#4653) * Cloud Spanner Batch DML implementation and integration tests. (#45) * Fix the file header of the newly added classes. (#46) * Fix RPC interface mismatch after GAPIC migration. * Address review comment. * Fix code format with mvn com.coveo:fmt-maven-plugin:format. * Update year in file headers. * BigQuery: add long term storage bytes to standard table definition. (#4387) * BigQuery: Add long term storage bytes for managed tables. * formatting * let maven format the things * plumb this upwards into Table/TableInfo * return * assertion mismatch * Update TableInfoTest.java * Regenerate compute client (#4662) * Add Cloud Security Center v1 API. (#4659) * Add Cloud Security Center v1 API. * Add securitycenter to google-cloud-bom pom.xml * Fixing code format * Adding FieldValue.increment() (#4018) * DO NOT MERGE: Adding FieldValue.increment() * Use "increment" Proto naming * Spanner: Throw exception when SSLHandshakeException occurs instead of infinite retry loop (#4663) * #3889 throw exception when an SSLHandshakeException occurs SSLHandshakeExceptions are not retryable, as they most probably indicate that the client does not accept the server certificate.
* #3889 added test for retryability of SSLHandshakeException * fixed formatting * Add Cloud Scheduler v1 API. (#4658) * Add Cloud Scheduler v1 API. * Fixes to google-cloud-bom pom.xml * Add proto to scheduler pom.xml * Fix Timestamp.parseTimestamp. (#4656) * Fix parseTimestamp docs * Fix timestamp without timezone offset * Fix test cases related to timestamp.parseTimestamp * added test case * Fix timestampParser and added ZoneOffset in timestampParser * Release google-cloud-java v0.83.0 (#4665) * Regenerate securitycenter client (#4667) * Bump next snapshot (#4666) * Change each StreamingSubscriberConnection to have its own executor by default. (#4622) * Change each StreamingSubscriberConnection to have its own executor by default. This increases throughput by reducing contention on the executor queue mutex and makes the Subscriber implementation more accurately reflect the user's intent when an InstantiatingExecutorProvider is passed. * Add a comment for executorProvider and alarmsExecutor. * Bigtable: Remove reference to deprecated typesafe name (#4671) * Add Cloud Video Intelligence v1p3beta1 API. (#4669) * Add Cloud Video Intelligence v1p3beta1 API. * Fix code formatting. * Regenerate language client (#4676) * Regenerate securitycenter client (#4677) * Regenerate firestore client (#4686) * #4685 Spanner now extends AutoCloseable (#4687) * Update speech readme to point to v1 javadoc. (#4693) * Fix pendingWrite race condition (#4696) pendingWrites.add() was not guaranteed to be called before pendingWrites.remove() * Optimize pendingWrites (#4697) Reduce contention on pendingWrites by using a ConcurrentHashMap instead. * Better explain how to use explicit credentials (#4694) * Better explain how to use explicit credentials This pull request updates the documentation and adds an example. * Run auto-formatter mvn com.coveo:fmt-maven-plugin:format * Updating Copyright year on UseExplicitCredentials * Add Cloud Talent Solution API. (#4699) * Add Talent API * Add Talent API * Add Talent API * Add talent API * reformat * Update pom.xml * Update pom.xml * Add MDC support in Logback appender (#4477) * Firestore: Update CustomClassMapper (#4675) * Firestore: Update CustomClassMapper * Adding Unit tests * Lint fix * Regenerate securitycenter client (#4704) * Regenerate talent client (#4705) * BigQuery: Added missing partitioned fields to listTables. (#4701) * Add missing partitioned fields to listTables * Fix table delete in finally block * Fix failing test * OpenCensus Support for Cloud Pub/Sub (#4240) * Adds OpenCensus context propagation to Publisher and Subscriber. * Updates OpenCensus attribute keys so that they will be propagated by CPS. * Addresses reviewer comments by fixing build files and using only defined annotations. * Updates build dependencies and copyright date. * Fixes typo. * Removes encoding of OpenCensus tags. Will re-enable once text encoding spec has been finalized (https://github.com/census-instrumentation/opencensus-specs/issues/65). * Updates encoding of SpanContext to use W3C specified encoding; Also preserves sampling decision from the publisher in the subscriber. * Adds unit test for OpenCensusUtil. * Adds unit test for OpenCensusUtil. * Updates OpenCensus integration to use a generic MessageTransform. * Removes now-unused private constant. * Update pom.xml * Marking setTransform as BetaApi * Fixes for formatting issues.
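One of the commits above (#4694) expands the explicit-credentials documentation; the pattern it describes is roughly the following sketch, assuming a service-account key file at a placeholder path and using the Storage client as the example (any google-cloud client builder accepts setCredentials the same way):

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.FileInputStream;
import java.io.IOException;

final class ExplicitCredentialsSketch {
  static Storage storageWithExplicitCredentials(String keyPath) throws IOException {
    // Load a service-account key explicitly instead of relying on
    // Application Default Credentials.
    GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream(keyPath));
    return StorageOptions.newBuilder().setCredentials(credentials).build().getService();
  }
}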
* Release v0.84.0 (#4713) * Bump next snapshot (#4715) * Fix comment in grpc-google-cloud-firestore-v1 POM (#4717) Fixing a copy&paste typo * Regenerate firestore client (#4719) * Upgrade common protos dependency to 1.15.0. (#4722) * V1 Admin API for Firestore (#4716) * Manually regenerate Redis (#4723) * Manually regenerate Redis * Fixing a formatting issue * 1.x is stable (#4724) @sduskis * Regenerate bigquerystorage client (#4730) * Regenerate pubsub client (#4732) * Added feature to accept Firestore Value. (#4709) * Fix firestore value * Added IT test * try to fix flaky test (#4733) (cherry picked from commit 8255a9b475c599814d42a6a28eef3d8e11f6b082) * Regenerate compute client (#4731) * Replacing GoogleCloudPlatform with googleapis in docs. (#4707) * Replacing GoogleCloudPlatform with googleapis in docs. * Fixing formatting issues. * Regenerate Redis client (#4738) * redis changes with googleapis changes for STATIC_TYPES * foo * revert noop * fix default value of maxSessions in the documentation of setMaxSessions (#4728) * Spanner: Added extra documentation to the DatabaseClient.singleUse methods (#4721) * added extra documentation to the singleUse methods #4212 * changed formatting manually and re-run mvn formatter * Regenerate Firestore clients (#4741) * add diff * linter * Regenerate iamcredentials client (#4746) * Regenerate monitoring client (#4747) * Refresh Monitoring client (#4744) * regen monitoring with STATIC_TYPES * regen again but with java_package * regen again with enable_string_formatting_functions * regen again * the OuterClass is generated because the proto file contains a message of the same name * no changes in protos or gapics, just refresh * refresh but with proto changes only * Add Snippets for working with Assets in Cloud Security Command Center. (#4690) * Add Snippets for working with Assets in Cloud Security Command Center. - I missed the instruction for how to add a new key, but I believe a new account is needed, so prestaged assets can be queried. * Address code review comments. * remove securitycenter-it.cfg * update comments * fix string remove firewall reference * Fix format * fix format * Address comments and fix bad docs/alignment with python example * Fix print description * Updates per rubrics * fix warnings * Add Apache Headers * remove unused comment * Manually regenerating the talent v4beta1 API (#4750) * Manually regenerating the talent v4beta1 API ResumeService was removed from the protos, requiring manual deletion of all related classes. * Fixing formatting. * Pass through header provider in BQ storage client settings. (#4740) * Pass through header provider in BQ storage client settings. This change modifies the enhanced stub for the BigQuery storage client to pass through the user-specified header provider to the storage settings base class at creation time. This addresses an issue where user-specified headers were being ignored when creating a client object.
* Apply formatting plugin * Set up the in-proc server only once * It's 2019 now * changes for iamcreds with STATIC_TYPES (#4745) * Regenerate asset client (#4758) * Regenerate automl client (#4759) * Regenerate dlp client (#4765) * Regenerate bigtable client (#4760) * Regenerate logging client (#4769) * Regenerate monitoring client (#4770) * Regenerate pubsub client (#4772) * Regenerate container client (#4761) * Regenerate os-login client (#4771) * Regenerate errorreporting client (#4766) * Regenerate containeranalysis client (#4762) * Regenerate talent client (#4777) * Regenerate securitycenter client (#4775) * Regenerate redis client (#4773) * Regenerate scheduler client (#4774) * Regenerate iamcredentials client (#4768) * Regenerate dialogflow client (#4764) * Regenerate firestore client (#4767) * Regenerate spanner client (#4776) * Regenerate trace client (#4778) * Regenerate dataproc client (#4763) * Regenerate vision client (#4779) * Regenerate websecurityscanner client (#4780) * regen (#4755) * Upgrade google.auth.version to 0.15.0. (#4743) * Upgrade google.auth.version to 0.14.0. * Upgrade google.auth.version to 0.14.0. * Upgrade google.auth.version to 0.15.0. * allow specifying a custom credentials file for integration tests (#4782) * Regenerate containeranalysis client (#4792) * Regenerate dataproc client (#4793) * Regenerate datastore client (#4794) * Regenerate asset client (#4786) * Regenerate iamcredentials client (#4799) * Regenerate bigquerydatatransfer client (#4788) * Regenerate language client (#4802) * Regenerate kms client (#4801) * Regenerate bigtable client (#4790) * Regenerate monitoring client (#4804) * Regenerate bigquerystorage client (#4789) * Regenerate speech client (#4811) * Regenerate securitycenter client (#4809) * Regenerate spanner client (#4810) * Regenerate redis client (#4807) * Regenerate container client (#4791) * Regenerate scheduler client (#4808) * Regenerate os-login client (#4805) * Regenerate errorreporting client (#4797) * Regenerate websecurityscanner client (#4818) * Regenerate logging client (#4803) * Regenerate texttospeech client (#4814) * Regenerate tasks client (#4813) * Regenerate dialogflow client (#4795) * Regenerate trace client (#4815) * Upgrade protobuf version to 3.7.0. (#4819) * Upgrade protobuf version to 3.7.0. * Upgrade protobuf version to 3.7.1. * Upgrade protobuf version to 3.7.0. * Release google-cloud-java v0.85.0 (#4820) * Bump snapshot (#4821) * Regenerate automl client (#4822) * Regenerate dialogflow client (#4823) * Regenerate dlp client (#4824) * Regenerate firestore client (#4825) * Regenerate iot client (#4826) * Regenerate pubsub client (#4827) * Regenerate talent client (#4828) * Regenerate video-intelligence client (#4829) * Regenerate vision client (#4830) * Add Cloud Asset v1 API. (#4834) * Upgrade gax to 1.43.0. (#4836) * Upgrade gax to 1.43.0. 
* code format * Regenerate iamcredentials client (#4849) * Regenerate iot client (#4850) * Regenerate kms client (#4851) * Regenerate texttospeech client (#4864) * Regenerate tasks client (#4863) * Regenerate talent client (#4862) * Regenerate bigquerystorage client (#4840) * Regenerate bigquerydatatransfer client (#4839) * Regenerate automl client (#4838) * Regenerate language client (#4852) * Regenerate monitoring client (#4854) * Regenerate pubsub client (#4856) * Regenerate redis client (#4857) * Regenerate scheduler client (#4858) * Regenerate securitycenter client (#4859) * Regenerate spanner client (#4860) * Regenerate trace client (#4865) * Regenerate video-intelligence client (#4866) * Regenerate websecurityscanner client (#4868) * Regenerate vision client (#4867) * Regenerate speech client (#4861) * Regenerate os-login client (#4855) * Regenerate logging client (#4853) * Regenerate firestore client (#4848) * Regenerate containeranalysis client (#4843) * Regenerate container client (#4842) * Regenerate bigtable client (#4841) * Regenerate dataproc client (#4844) * Regenerate dlp client (#4846) * Regenerate errorreporting client (#4847) * google-cloud-logging-logback: Allow user to specify custom LoggingOptions (#4729) google-cloud-logging-logback: #3215 allow user to specify custom LoggingOptions * Regenerate asset client (#4837) * Regenerate dialogflow client (#4845) * Add translate v3beta1 API. (#4870) * translate v3beta1 * readme * Adding datalabeling. (#4872) * Adding datalabeling. * Fixing formatting. * Adding datalabeling readme. * Fixing Readme. * Add tasks v2 client (#4879) * auto close input stream (#4878) * Add Cloud Web Risk client. (#4881) * Add Cloud Web Risk client. * code format fix * Add note on whitelist / signing up for Beta. * Delete stray preposition * Regenerate datalabeling client (#4883) * Regenerate automl client (#4882) * Regenerate talent client (#4884) * fix flaky TransactionManager tests (#4897) Transactions can always be aborted by Cloud Spanner, and all transaction code should therefore be surrounded with abort safeguarding. * Change .yaml file name for webrisk client. (#4898) * Adding the translate beta samples (#4880) * Adding the translate beta samples * Fixing lint issues * Fixing coding format/style * Fixing coding format/style * Release v0.86.0 (#4899) * Bump next snapshot (#4900) * Regenerate bigquerystorage client (#4902) * Regenerate language client (#4903) * Regenerate webrisk client (#4904) * Storage: Add V4 signing support (#4692) * Add support for V4 signing * (storage) WIP: Add V4 signing support * (storage) Add V4 signing support * Add V4 samples (#4753) * storage: fix v4 samples (#4754) * Add V4 samples * Match C++ samples * Add missing import * Add web risk to list of clients.
(#4901) * Release v0.87.0 (#4907) * Bump next snapshot (#4908) * Regenerate tasks client (#4912) * Regenerate scheduler client (#4911) * Regenerate redis client (#4910) * Regenerate compute client (#4909) * Spanner: Refactor SpannerImpl - Move AbstractResultSet to separate file (#4891) Refactor SpannerImpl: Move AbstractResultSet to separate file * Spanner: Refactor SpannerImpl - Move AbstractReadContext to separate file (#4890) Spanner: Refactor SpannerImpl - Move AbstractReadContext to separate file * Regenerate language client (#4921) * Spanner: Refactor SpannerImpl - Move DatabaseAdminClientImpl to separate file (#4892) * refactor SpannerImpl: move DatabaseAdminClientImpl to separate file * removed unnecessary imports * Bigquery: Fix close write channel (#4924) * close write channel * Update BigQuerySnippets.java * Update BigQuerySnippets.java * Update BigQuerySnippets.java * Spanner: Refactor SpannerImpl - Move InstanceAdminClient to separate file (#4893) refactor SpannerImpl: move InstanceAdminClient to separate file * refactor SpannerImpl: move PartitionedDmlTransaction to separate file (#4894) * BigQuery: Added listPartitions. (#4923) * added listPartitions * added unit test * modified code * modified unit test for listPartitions * added integration test * Fix table name * Fix integration test * Regenerate dialogflow client (#4928) * Regenerate securitycenter client (#4929) * Regenerate trace client (#4931) * google-cloud-core: Optimize Date.parseDate(String) (#4920) * optimized date parsing Date parsing using a regular expression is slower than a specific implementation for the only format that is supported. * removed IntParser and added test cases * check for invalid date/year/month/day * Spanner: Refactor SpannerImpl - Move TransactionRunnerImpl to separate file (#4896) refactor SpannerImpl: move TransactionRunnerImpl to separate file * Regenerate securitycenter client (#4937) * storage: Un-Ignore testRotateFromCustomerEncryptionToKmsKeyWithCustomerEncryption (#4936) * Adding support for custom ports on LocalDatastoreHelper (#4933) (#4935) * Storage: Fix managing resumable signedURL uploads. (#4874) * commit for managing resumable signedURL uploads #2462 * for managing resumable signedURL uploads #2462 * fix comment * fix ITStorageTest case written for upload using signURL * fix format * fix BlobWriteChannel constructor changes. * fix signURL validation. * fix format * signURL renamed to signedURL, firstNonNull check removed, signedURL validation with googleAccessId and expires fields also. * signedURL validation with googleAccessId and expires fields also. * fix for signedURL validation with V4 Signing support. * fix for providing example of writing content using signedURL through Writer. * fix for StorageRpc open method argument change. * fix for StorageRpc open method doc comment changes. * cloud-storage-contrib-nio: Add support for newFileChannel (#4718) add support for newFileChannel(...) * Change MessageDispatcher to be synchronous instead of asynchronous. (#4916) * Change MessageDispatcher to be synchronous instead of asynchronous. This removes the failure mode described in #2452 that can occur when MaxOutstandingElementCount is low and there is more than one connection. In this case, it is possible for an individual MessageDispatcher to have no outstanding in-flight messages, but also be blocked by flow control with a whole new batch outstanding.
In this case, it will never make progress on that batch since it will never receive another batch and the queue was made to not be shared in #4590, so the batch will never be pulled off by another MessageDispatcher. By changing this to use a blocking flow controller, this will never happen, as each batch will synchronously wait until it is allowed by flow control before being processed. * Run mvn com.coveo:fmt-maven-plugin:format * Remove unused code * Update Publisher.java * Fixing format * Refactoring of the Pub/Sub Ordering keys client (#4962) * [WIP] Refactoring SequentialExecutorService (#4969) * Refactoring SequentialExecutorService Step 1: - create a `CallbackExecutor` and `AutoExecutor` as subclasses of SequentialExecutor. * Adding CallbackExecutor.submit() for encapsulation * Moving resume into `CallbackExecutor` * create an abstract class for execute(key, deque) This removes the last bit of code that directly used the TaskCompleteAction enum. * Running the formatter. * Merged with the formatting changes. (#4978) * Fixing a bad merge. * Refactoring SequentialExecutorService.java (#4979) - Marking methods as private - renaming variables from `finalTasks` to `tasks` for clarity - reducing code duplication by calling `execute` * Cleaning up generics in SequentialExecutorService (#4982) * Adding comments to CallbackExecutor.submit (#4981) * Adding comments to CallbackExecutor.submit * Fixing the merge * Exposing AutoExecutor and CallbackExecutor directly (#4983) * More refactoring to SequentialExecutor (#4984) - Moving common async code into a new method: `callNextTaskAsync()` - Adding a `postTaskExecution()` method which is useful for `AutoExecutor` * SequentialExecutorService.callNextTaskAsync only uses key (#4992) 1. Removing the `Deque` parameter from `SequentialExecutorService.callNextTaskAsync()` and looking up the `Deque` in the method 2. Simplify `AutoExecutor` and `CallbackExecutor` after the refactoring in 1). * SequentialExecutorService now uses generics for Runnables. * Using a Queue instead of Deque. Queues are FIFO; Deques don't guarantee which end the value is removed from. * Renaming a variable in Publisher * More refactoring to SequentialExecutorService. * Running the formatter * Reformatting the publisher * Pub/Sub: publishAll defers sending batches. * Reverting last change to publishAllOutstanding * Cleaning up messageBatch entries * Alarms now wait to publish until previous bundle completes * Using Preconditions in Publisher. * The Publisher's callback should happen second The `SequentialExecutor`'s future callback needs to happen before the `Publisher`'s callback, in case the `SequentialExecutor` cancels. * Fixing a flaky test The `testLargeMessagesDoNotReorderBatches()` test was flaky, depending on how unrelated threads complete. Removing the check for the behavior of the unrelated thread. * Adding resume publish. (#5046) * Adding resume publish. * Addressing comments * Fixing formatting issues * Adding comments * Fixing formatting * Ordering keys setter is now package private This will allow us to merge this branch into master. * Cleanup before merge to main line - Removing EnumSet from Status codes - Adding `@BetaApi` to `resumePublish`, and making it package private. * Fixing lint * publish() doesn't use an executor. * Moving the publish RPC call into the lock Also, removing a comment that no longer applies. * Updating GAX in clients to 1.45.0 for docs. * Fix publisher throughput when there are no ordering keys.
(#5607) * Fix publisher throughput when there are no ordering keys. * Remove batch from iterator if we are going to process it. * Make appropriate things private, remove unnecessary call to publishAllWithoutInflight, and fix unit tests accordingly. * Fix publisher formatting * Experimental removal of test to see if it is the one causing timeouts. * Fix comment --- .../cloud/pubsub/v1/MessageDispatcher.java | 11 +- .../com/google/cloud/pubsub/v1/Publisher.java | 181 ++++++++++-- .../pubsub/v1/SequentialExecutorService.java | 264 +++++++++++++++++ .../pubsub/v1/FakePublisherServiceImpl.java | 22 +- .../cloud/pubsub/v1/PublisherImplTest.java | 272 ++++++++++++++++++ .../v1/SequentialExecutorServiceTest.java | 247 ++++++++++++++++ 6 files changed, 966 insertions(+), 31 deletions(-) create mode 100644 google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java create mode 100644 google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 4b6d02d90867..53b979d5e447 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -62,6 +62,7 @@ class MessageDispatcher { @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); private final Executor executor; + private final SequentialExecutorService.AutoExecutor sequentialExecutor; private final ScheduledExecutorService systemExecutor; private final ApiClock clock; @@ -206,6 +207,7 @@ void sendAckOperations( jobLock = new ReentrantLock(); messagesWaiter = new MessageWaiter(); this.clock = clock; + this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor); } void start() { @@ -358,7 +360,7 @@ public void nack() { } }; ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); - executor.execute( + Runnable deliverMessageTask = new Runnable() { @Override public void run() { @@ -379,7 +381,12 @@ public void run() { response.setException(e); } } - }); + }; + if (message.getOrderingKey().isEmpty()) { + executor.execute(deliverMessageTask); + } else { + sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask); + } } /** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. 
*/ diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 31a57fcc9daa..8a5802420854 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -49,9 +49,12 @@ import com.google.pubsub.v1.TopicNames; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -85,15 +88,20 @@ public class Publisher { private final String topicName; private final BatchingSettings batchingSettings; + private final boolean enableMessageOrdering; private final Lock messagesBatchLock; - private MessagesBatch messagesBatch; + + private final Map<String, MessagesBatch> messagesBatches; private final AtomicBoolean activeAlarm; private final PublisherStub publisherStub; private final ScheduledExecutorService executor; + + private final SequentialExecutorService.CallbackExecutor sequentialExecutor; + private final AtomicBoolean shutdown; private final BackgroundResource backgroundResources; private final MessageWaiter messagesWaiter; @@ -114,12 +122,14 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + this.enableMessageOrdering = builder.enableMessageOrdering; this.messageTransform = builder.messageTransform; - messagesBatch = new MessagesBatch(batchingSettings); + messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); + sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor); List<BackgroundResource> backgroundResourceList = new ArrayList<>(); if (builder.executorProvider.shouldAutoClose()) { backgroundResourceList.add(new ExecutorAsBackgroundResource(executor)); @@ -127,9 +137,18 @@ private Publisher(Builder builder) throws IOException { // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. // We post-process this here to keep backward-compatibility. - RetrySettings retrySettings = builder.retrySettings; - if (retrySettings.getMaxAttempts() == 0) { - retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); + // Also, if "message ordering" is enabled, the publisher should retry sending the failed + // message infinitely rather than sending the next one. + RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder(); + if (retrySettingsBuilder.getMaxAttempts() == 0) { + retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE); + } + if (enableMessageOrdering) { + // TODO: is there a way to have the default retry settings for requests without an ordering + // key?
+ retrySettingsBuilder + .setMaxAttempts(Integer.MAX_VALUE) + .setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)); } PublisherStubSettings.Builder stubSettings = @@ -147,7 +166,7 @@ private Publisher(Builder builder) throws IOException { StatusCode.Code.RESOURCE_EXHAUSTED, StatusCode.Code.UNKNOWN, StatusCode.Code.UNAVAILABLE) - .setRetrySettings(retrySettings) + .setRetrySettings(retrySettingsBuilder.build()) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); backgroundResourceList.add(publisherStub); @@ -194,38 +213,71 @@ public String getTopicNameString() { public ApiFuture<String> publish(PubsubMessage message) { Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); + final String orderingKey = message.getOrderingKey(); + Preconditions.checkState( + orderingKey.isEmpty() || enableMessageOrdering, + "Cannot publish a message with an ordering key when message ordering is not enabled."); + final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); List<OutstandingBatch> batchesToSend; messagesBatchLock.lock(); try { + MessagesBatch messagesBatch = messagesBatches.get(orderingKey); + if (messagesBatch == null) { + messagesBatch = new MessagesBatch(batchingSettings, orderingKey); + messagesBatches.put(orderingKey, messagesBatch); + } + batchesToSend = messagesBatch.add(outstandingPublish); + if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) { + messagesBatches.remove(orderingKey); + } // Set up the next duration-based delivery alarm if there are messages batched. setupAlarm(); + // For messages with an ordering key, we need to publish with messagesBatchLock held in order + // to ensure another publish doesn't slip in and send a batch before these batches we already + // want to send. + if (!batchesToSend.isEmpty() && !orderingKey.isEmpty()) { + for (final OutstandingBatch batch : batchesToSend) { + logger.log(Level.FINER, "Scheduling a batch for immediate sending."); + publishOutstandingBatch(batch); + } + } } finally { messagesBatchLock.unlock(); } messagesWaiter.incrementPendingMessages(1); - if (!batchesToSend.isEmpty()) { + // For messages without ordering keys, it is okay to send batches without holding + // messagesBatchLock. + if (!batchesToSend.isEmpty() && orderingKey.isEmpty()) { for (final OutstandingBatch batch : batchesToSend) { logger.log(Level.FINER, "Scheduling a batch for immediate sending."); - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch(batch); - } - }); + publishOutstandingBatch(batch); } } return outstandingPublish.publishResult; } + /** + * There may be non-recoverable problems with a request for an ordering key. In that case, all + * subsequent requests will fail until this method is called. If the key is not currently paused, + * calling this method will be a no-op. + * + * @param key The key for which to resume publishing.
+ */ + // TODO: make this public when Ordering keys is live + @BetaApi + void resumePublish(String key) { + Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); + sequentialExecutor.resumePublish(key); + } + private void setupAlarm() { - if (!messagesBatch.isEmpty()) { + if (!messagesBatches.isEmpty()) { if (!activeAlarm.getAndSet(true)) { long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis(); logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs); @@ -236,7 +288,7 @@ private void setupAlarm() { public void run() { logger.log(Level.FINER, "Sending messages based on schedule."); activeAlarm.getAndSet(false); - publishAllOutstanding(); + publishAllWithoutInflight(); } }, delayThresholdMs, @@ -256,17 +308,61 @@ public void run() { * futures returned from {@code publish}. */ public void publishAllOutstanding() { + OutstandingBatch unorderedOutstandingBatch = null; + messagesBatchLock.lock(); + try { + for (MessagesBatch batch : messagesBatches.values()) { + if (!batch.isEmpty()) { + if (!batch.orderingKey.isEmpty()) { + // For messages with an ordering key, we need to publish with messagesBatchLock held in + // order to ensure another publish doesn't slip in and send a batch before these batches + // we already want to send. + publishOutstandingBatch(batch.popOutstandingBatch()); + } else { + unorderedOutstandingBatch = batch.popOutstandingBatch(); + } + } + } + messagesBatches.clear(); + } finally { + messagesBatchLock.unlock(); + } + if (unorderedOutstandingBatch != null) { + publishOutstandingBatch(unorderedOutstandingBatch); + } + } + + /** + * Publish any outstanding batches if non-empty and there are no other batches in flight. This + * method sends buffered messages, but does not wait for the send operations to complete. To wait + * for messages to send, call {@code get} on the futures returned from {@code publish}. + */ + private void publishAllWithoutInflight() { + OutstandingBatch unorderedOutstandingBatch = null; messagesBatchLock.lock(); - OutstandingBatch batchToSend; try { - if (messagesBatch.isEmpty()) { - return; + Iterator<Map.Entry<String, MessagesBatch>> it = messagesBatches.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, MessagesBatch> entry = it.next(); + MessagesBatch batch = entry.getValue(); + String key = entry.getKey(); + if (batch.isEmpty()) { + it.remove(); + } else if (key.isEmpty()) { + // We will publish the batch with no ordering key outside messagesBatchLock.
+ unorderedOutstandingBatch = batch.popOutstandingBatch(); + it.remove(); + } else if (!sequentialExecutor.hasTasksInflight(key)) { + publishOutstandingBatch(batch.popOutstandingBatch()); + it.remove(); + } } - batchToSend = messagesBatch.popOutstandingBatch(); } finally { messagesBatchLock.unlock(); } - publishOutstandingBatch(batchToSend); + if (unorderedOutstandingBatch != null) { + publishOutstandingBatch(unorderedOutstandingBatch); + } } private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) { @@ -280,12 +376,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch } private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { - ApiFutureCallback<PublishResponse> futureCallback = + final ApiFutureCallback<PublishResponse> futureCallback = new ApiFutureCallback<PublishResponse>() { @Override public void onSuccess(PublishResponse result) { try { - if (result.getMessageIdsCount() != outstandingBatch.size()) { + if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) { outstandingBatch.onFailure( new IllegalStateException( String.format( @@ -311,7 +407,21 @@ public void onFailure(Throwable t) { } }; - ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor()); + ApiFuture<PublishResponse> future; + if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) { + future = publishCall(outstandingBatch); + } else { + // If ordering key is specified, publish the batch using the sequential executor. + future = + sequentialExecutor.submit( + outstandingBatch.orderingKey, + new Callable<ApiFuture<PublishResponse>>() { + public ApiFuture<PublishResponse> call() { + return publishCall(outstandingBatch); + } + }); + } + ApiFutures.addCallback(future, futureCallback, directExecutor()); } private static final class OutstandingBatch { @@ -319,12 +429,15 @@ private static final class OutstandingBatch { final long creationTime; int attempt; int batchSizeBytes; + final String orderingKey; - OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) { + OutstandingBatch( + List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) { this.outstandingPublishes = outstandingPublishes; attempt = 1; creationTime = System.currentTimeMillis(); this.batchSizeBytes = batchSizeBytes; + this.orderingKey = orderingKey; } int size() { @@ -468,7 +581,7 @@ public static final class Builder { .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); - + static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; private static final int THREADS_PER_CPU = 5; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() @@ -482,6 +595,8 @@ public static final class Builder { RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; + private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING; + private TransportChannelProvider channelProvider = TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); @@ -576,6 +691,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } + /** Sets the message ordering option. */ + // TODO: make this public when Ordering keys is live + @BetaApi + Builder setEnableMessageOrdering(boolean enableMessageOrdering) { + this.enableMessageOrdering = enableMessageOrdering; + return this; + } + /** Gives the ability to set a custom executor to be used by the library.
*/ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); @@ -601,15 +724,17 @@ public Publisher build() throws IOException { private static class MessagesBatch { private List<OutstandingPublish> messages; private int batchedBytes; + private String orderingKey; private final BatchingSettings batchingSettings; - public MessagesBatch(BatchingSettings batchingSettings) { + private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) { this.batchingSettings = batchingSettings; + this.orderingKey = orderingKey; reset(); } private OutstandingBatch popOutstandingBatch() { - OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes); + OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey); reset(); return batch; } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java new file mode 100644 index 000000000000..a172c1ecb36d --- /dev/null +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -0,0 +1,264 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; +import com.google.api.core.SettableApiFuture; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; + +interface CancellableRunnable extends Runnable { + void cancel(Throwable e); +} + +/** + * An executor service that runs tasks with the same key sequentially. A task with a given key runs + * only after its predecessor has completed, while tasks with different keys can run in parallel. + */ +final class SequentialExecutorService { + + // This class is not directly usable. + private SequentialExecutorService() {} + + /** + * This Executor takes a serial stream of string keys and {@code Runnable} tasks, and runs the + * tasks with the same key sequentially. A task with a given key runs only after its predecessor + * has completed, while tasks with different keys can run in parallel. + */ + private abstract static class SequentialExecutor<R extends Runnable> { + // Maps keys to tasks.
+ protected final Map<String, Queue<R>> tasksByKey; + protected final Executor executor; + + private SequentialExecutor(Executor executor) { + this.executor = executor; + this.tasksByKey = new HashMap<>(); + } + + boolean hasTasksInflight(String key) { + synchronized (tasksByKey) { + return tasksByKey.containsKey(key); + } + } + + protected void execute(final String key, R task) { + synchronized (tasksByKey) { + Queue<R> newTasks = tasksByKey.get(key); + // If this key is already being handled, add it to the queue and return. + if (newTasks != null) { + newTasks.add(task); + return; + } else { + newTasks = new LinkedList<>(); + newTasks.add(task); + tasksByKey.put(key, newTasks); + } + } + + callNextTaskAsync(key); + } + + protected void callNextTaskAsync(final String key) { + boolean executeTask = true; + synchronized (tasksByKey) { + Queue<R> tasks = tasksByKey.get(key); + if (tasks != null && tasks.isEmpty()) { + // Only remove the Queue after all tasks were completed + tasksByKey.remove(key); + executeTask = false; + } + } + if (executeTask) { + executor.execute( + new Runnable() { + @Override + public void run() { + R task = null; + synchronized (tasksByKey) { + Queue<R> tasks = tasksByKey.get(key); + if (tasks != null && !tasks.isEmpty()) { + task = tasks.poll(); + } + } + if (task != null) { + task.run(); + } + } + }); + } + } + } + + @BetaApi + static class AutoExecutor extends SequentialExecutor<Runnable> { + AutoExecutor(Executor executor) { + super(executor); + } + + /** Runs synchronous {@code Runnable} tasks sequentially. */ + void submit(final String key, final Runnable task) { + super.execute( + key, + new Runnable() { + @Override + public void run() { + try { + task.run(); + } finally { + callNextTaskAsync(key); + } + } + }); + } + } + + /** + * Runs asynchronous {@code Callable} tasks sequentially for the same key. If one of the tasks + * fails, other tasks with the same key that have not been executed will be cancelled. + */ + @BetaApi + static class CallbackExecutor extends SequentialExecutor<CancellableRunnable> { + static CancellationException CANCELLATION_EXCEPTION = + new CancellationException( + "Execution cancelled because executing previous runnable failed."); + + private final Set<String> keysWithErrors = Collections.synchronizedSet(new HashSet<String>()); + + CallbackExecutor(Executor executor) { + super(executor); + } + + /** + * Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks + * with the same key that have not been executed will be cancelled. + * + *
+     * <p>This method does the following in a chain:
+     *
+     * <ol>
+     *   <li>Creates an `ApiFuture` that can be used for tracking progress.
+     *   <li>Creates a `CancellableRunnable` out of the `Callable`
+     *   <li>Adds the `CancellableRunnable` to the task queue
+     *   <li>Once the task is ready to be run, it will execute the `Callable`
+     *   <li>When the `Callable` completes one of two things happens:
+     *       <ol>
+     *         <li>On success:
+     *             <ol>
+     *               <li>Complete the `ApiFuture` by setting the return value.
+     *               <li>Call the next task.
+     *             </ol>
+     *         <li>On Failure:
+     *             <ol>
+     *               <li>Fail the `ApiFuture` by setting the exception.
+     *               <li>Cancel all tasks in the queue.
+     *             </ol>
+     *       </ol>
+     * </ol>
+ * + * @param key The key for the task queue + * @param callable The thing to run + * @param <T> The return type for the `Callable`'s `ApiFuture`. + * @return an `ApiFuture` for tracking. + */ + <T> ApiFuture<T> submit(final String key, final Callable<ApiFuture<T>> callable) { + // Step 1: create a future for the user + final SettableApiFuture<T> future = SettableApiFuture.create(); + + if (keysWithErrors.contains(key)) { + future.setException(CANCELLATION_EXCEPTION); + return future; + } + + // Step 2: create the CancellableRunnable + // Step 3: add the task to queue via `execute` + CancellableRunnable task = + new CancellableRunnable() { + private boolean cancelled = false; + + @Override + public void run() { + // the task was cancelled + if (cancelled) { + return; + } + + try { + // Step 4: call the `Callable` + ApiFutureCallback<T> callback = + new ApiFutureCallback<T>() { + // Step 5.1: on success + @Override + public void onSuccess(T msg) { + future.set(msg); + callNextTaskAsync(key); + } + + // Step 5.2: on failure + @Override + public void onFailure(Throwable e) { + future.setException(e); + cancelQueuedTasks(key, CANCELLATION_EXCEPTION); + } + }; + ApiFutures.addCallback(callable.call(), callback, directExecutor()); + } catch (Exception e) { + cancel(e); + } + } + + @Override + public void cancel(Throwable e) { + this.cancelled = true; + future.setException(e); + } + }; + execute(key, task); + return future; + } + + void resumePublish(String key) { + keysWithErrors.remove(key); + } + + /** Cancels every task in the queue associated with {@code key}. */ + private void cancelQueuedTasks(final String key, Throwable e) { + keysWithErrors.add(key); + synchronized (tasksByKey) { + final Queue<CancellableRunnable> tasks = tasksByKey.get(key); + if (tasks != null) { + while (!tasks.isEmpty()) { + tasks.poll().cancel(e); + } + tasksByKey.remove(key); + } + } + } + } +} diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java index 396b5d05bd5f..620a09ac98bc 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud @@ -33,6 +34,8 @@ class FakePublisherServiceImpl extends PublisherImplBase { private final LinkedBlockingQueue<PublishRequest> requests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<Response> publishResponses = new LinkedBlockingQueue<>(); + private final AtomicInteger nextMessageId = new AtomicInteger(1); + private boolean autoPublishResponse; /** Class used to save the state of a possible response.
*/ private static class Response { @@ -75,7 +78,15 @@ public void publish(PublishRequest request, StreamObserver<PublishResponse> resp requests.add(request); Response response; try { - response = publishResponses.take(); + if (autoPublishResponse) { + PublishResponse.Builder builder = PublishResponse.newBuilder(); + for (int i = 0; i < request.getMessagesCount(); i++) { + builder.addMessageIds(Integer.toString(nextMessageId.getAndIncrement())); + } + response = new Response(builder.build()); + } else { + response = publishResponses.take(); + } } catch (InterruptedException e) { throw new IllegalArgumentException(e); } @@ -87,6 +98,15 @@ public void publish(PublishRequest request, StreamObserver<PublishResponse> resp } } + /** + * If enabled, a PublishResponse with unique message ids is generated automatically when + * publish() is called. + */ + public FakePublisherServiceImpl setAutoPublishResponse(boolean autoPublishResponse) { + this.autoPublishResponse = autoPublishResponse; + return this; + } + public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) { publishResponses.add(new Response(publishResponse)); return this; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index acbc82c95c41..f3c85220fc31 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -35,16 +35,19 @@ import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -240,6 +243,275 @@ private ApiFuture<String> sendTestMessage(Publisher publisher, String data) { PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); } + @Test + public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { + // Limit the maximum number of elements in a single batch to 3. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(3L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key "OrderA" and two messages with "OrderB". + ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture<String> publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture<String> publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them have been published yet, since the element count threshold is 3.
+ assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // One of the batches reaches the limit. + ApiFuture<String> publishFuture5 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA"); + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture3.get()) < Integer.parseInt(publishFuture5.get())); + + // The other batch reaches the limit. + ApiFuture<String> publishFuture6 = sendTestMessageWithOrderingKey(publisher, "m6", "OrderB"); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + assertTrue(Integer.parseInt(publishFuture4.get()) < Integer.parseInt(publishFuture6.get())); + + // Verify that every message within the same batch has the same ordering key. + List<PublishRequest> requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(orderingKey, message.getOrderingKey()); + } + } + } + publisher.shutdown(); + } + + @Test + public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { + // Limit the batching delay threshold to 100 seconds. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(10L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key "OrderA" and two messages with "OrderB". + ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture<String> publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture<String> publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published, since the element count threshold is 10 and the + // delay threshold has not expired. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // The delay threshold expires. + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + + // Verify that every message within the same batch has the same ordering key. + List<PublishRequest> requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(orderingKey, message.getOrderingKey()); + } + } + } + publisher.shutdown(); + } + + @Test + public void testLargeMessagesDoNotReorderBatches() throws Exception { + // Limit the maximum batch size to 20 bytes.
+ Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(10L) + .setRequestByteThreshold(20L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + + ApiFuture<String> publishFuture3 = + sendTestMessageWithOrderingKey(publisher, "VeryLargeMessage", "OrderB"); + // Verify that messages with "OrderB" were delivered in order. + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture3.get())); + + publisher.shutdown(); + } + + @Test + public void testOrderingKeyWhenDisabled_throwsException() throws Exception { + // Message ordering is disabled by default. + Publisher publisher = getTestPublisherBuilder().build(); + try { + sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + fail("Should have thrown an IllegalStateException"); + } catch (IllegalStateException expected) { + // expected + } + publisher.shutdown(); + } + + @Test + public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { + // Set maxAttempts to 1 and enableMessageOrdering to true at the same time. + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.ofSeconds(10)) + .setMaxAttempts(1) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Although maxAttempts is 1, the publisher retries until it succeeds, since + // enableMessageOrdering is true. + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + + ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + assertEquals("1", publishFuture1.get()); + + assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); + publisher.shutdown(); + } + + /** + * Make sure that resume publishing works as expected: + * + *
    + *
  1. publish with key orderA, which returns a failure. + *
  2. publish with key orderA again, which should fail immediately. + *
  3. publish with key orderB, which should succeed. + *
  4. resume publishing on key orderA. + *
  5. publish with key orderA, which should now succeed. + *
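        + *
    From the caller's perspective, the recovery flow exercised below looks + *
    like this (an illustrative sketch; the `message*` values are placeholders): + *
        + *
      publisher.publish(messageA1);        // fails; key "orderA" is paused + *
      publisher.publish(messageA2);        // fails fast with the cancellation cause + *
      publisher.publish(messageB1);        // key "orderB" is unaffected + *
      publisher.resumePublish("orderA");   // clears the error state for "orderA" + *
      publisher.publish(messageA3);        // succeeds again + *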
+ */ + @Test + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture<String> future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture<String> future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + + fakeExecutor.advanceTime(Duration.ZERO); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key. + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ZERO); + + try { + future1.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + // expected + } + + try { + future2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + // expected + } + + // Submit new requests with orderA; they should fail immediately. + ApiFuture<String> future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture<String> future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + + try { + future3.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + try { + future4.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // Submit new requests with orderB, which should succeed. + ApiFuture<String> future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture<String> future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); + + // Resume publishing of "orderA", which should now succeed. + publisher.resumePublish("orderA"); + + ApiFuture<String> future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture<String> future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); + + publisher.shutdown(); + } + + private ApiFuture<String> sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + @Test public void testErrorPropagation() throws Exception { Publisher publisher = diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java new file mode 100644 index 000000000000..3788bd3c04f8 --- /dev/null +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java @@ -0,0 +1,247 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public final class SequentialExecutorServiceTest { + private final ExecutorProvider executorProvider = + InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) + .build(); + + static class AsyncTaskCallable implements Callable<ApiFuture<String>> { + boolean isCalled = false; + SettableApiFuture<String> result = SettableApiFuture.create(); + + @Override + public ApiFuture<String> call() { + isCalled = true; + return result; + } + + public boolean isCalled() { + return isCalled; + } + + public void finishWithError(Throwable e) { + result.setException(e); + } + + public void finish() { + result.set("ok"); + } + } + + @Test + public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception { + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); + AsyncTaskCallable callable1 = new AsyncTaskCallable(); + AsyncTaskCallable callable2 = new AsyncTaskCallable(); + AsyncTaskCallable callable3 = new AsyncTaskCallable(); + + ApiFuture<String> result1 = sequentialExecutorService.submit("key", callable1); + ApiFuture<String> result2 = sequentialExecutorService.submit("key", callable2); + ApiFuture<String> result3 = sequentialExecutorService.submit("key", callable3); + + Thread.sleep(1000); + assertFalse(callable2.isCalled()); + assertFalse(callable3.isCalled()); + callable1.finish(); + assertEquals("ok", result1.get()); + + assertFalse(callable3.isCalled()); + callable2.finish(); + assertEquals("ok", result2.get()); + + callable3.finish(); + assertEquals("ok", result3.get()); + } + + @Test + public void testExecutorRunsDifferentKeySimultaneously() throws Exception { + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); + AsyncTaskCallable callable1 = new AsyncTaskCallable(); + AsyncTaskCallable callable2 = new AsyncTaskCallable(); + AsyncTaskCallable callable3 = new AsyncTaskCallable(); + + // Submit three tasks (two tasks with "key", and one task with "key2").
+ ApiFuture<String> result1 = sequentialExecutorService.submit("key", callable1); + ApiFuture<String> result2 = sequentialExecutorService.submit("key", callable2); + ApiFuture<String> result3 = sequentialExecutorService.submit("key2", callable3); + + // The task associated with "key2" can run in parallel with the tasks associated with "key". + callable3.finish(); + assertEquals("ok", result3.get()); + + // Sleep some time to give the test a chance to fail. Verify that the second task has not been + // executed while the main thread is sleeping. + Thread.sleep(100); + assertFalse(callable2.isCalled()); + // Complete the first task. + callable1.finish(); + assertEquals("ok", result1.get()); + // Now, the second task can be executed. + callable2.finish(); + assertEquals("ok", result2.get()); + } + + @Test + public void testExecutorCancelsAllTasksWhenOneFailed() throws Exception { + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); + AsyncTaskCallable callable1 = new AsyncTaskCallable(); + AsyncTaskCallable callable2 = new AsyncTaskCallable(); + AsyncTaskCallable callable3 = new AsyncTaskCallable(); + + ApiFuture<String> result1 = sequentialExecutorService.submit("key", callable1); + ApiFuture<String> result2 = sequentialExecutorService.submit("key", callable2); + ApiFuture<String> result3 = sequentialExecutorService.submit("key", callable3); + + Throwable failure = new Exception("failure"); + callable1.finishWithError(failure); + // The failed task throws an exception that contains the cause of the failure. + try { + result1.get(); + fail("Should have thrown an ExecutionException"); + } catch (ExecutionException e) { + assertEquals(failure, e.getCause()); + } + // Other tasks in the queue are expected to fail with a CancellationException. + for (ApiFuture<String> result : ImmutableList.of(result2, result3)) { + try { + result.get(); + fail("Should have thrown an ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(CancellationException.class); + } + } + } + + /** + * A task that sleeps for {@code taskDurationMillis} milliseconds. Appends its {@code taskId} to + * {@code startedTasksSequence} before sleeping and to {@code completedTasksSequence} when + * sleeping is done. + */ + static class SleepingSyncTask implements Runnable { + private final int taskId; + private final long taskDurationMillis; + private final LinkedHashSet<Integer> startedTasksSequence; + private final LinkedHashSet<Integer> completedTasksSequence; + private final CountDownLatch remainingTasksCount; + + public SleepingSyncTask( + int taskId, + long taskDurationMillis, + LinkedHashSet<Integer> startedTasksSequence, + LinkedHashSet<Integer> completedTasksSequence, + CountDownLatch remainingTasksCount) { + this.taskId = taskId; + this.taskDurationMillis = taskDurationMillis; + this.startedTasksSequence = startedTasksSequence; + this.completedTasksSequence = completedTasksSequence; + this.remainingTasksCount = remainingTasksCount; + } + + @Override + public void run() { + if (taskId > 0) { + // Verify that the previous task has been started and completed. + assertTrue(startedTasksSequence.contains(taskId - 1)); + assertTrue(completedTasksSequence.contains(taskId - 1)); + } + startedTasksSequence.add(taskId); + try { + Thread.sleep(taskDurationMillis); + } catch (InterruptedException e) { + return; + } + completedTasksSequence.add(taskId); + remainingTasksCount.countDown(); + + // Verify that the next task has not been started yet.
+ assertFalse(startedTasksSequence.contains(taskId + 1)); + assertFalse(completedTasksSequence.contains(taskId + 1)); + } + } + + @Test + public void testSequentialExecutorRunsTasksAutomatically() throws Exception { + int numKeys = 50; + int numTasks = 50; + SequentialExecutorService.AutoExecutor sequentialExecutor = + new SequentialExecutorService.AutoExecutor(executorProvider.getExecutor()); + CountDownLatch remainingTasksCount = new CountDownLatch(numKeys * numTasks); + // Maps keys to the ordered sets of started and completed task ids. + Map<String, LinkedHashSet<Integer>> startedTasks = new HashMap<>(); + Map<String, LinkedHashSet<Integer>> completedTasks = new HashMap<>(); + + for (int i = 0; i < numKeys; i++) { + String key = "key" + i; + LinkedHashSet<Integer> startedTasksSequence = new LinkedHashSet<>(); + LinkedHashSet<Integer> completedTasksSequence = new LinkedHashSet<>(); + startedTasks.put(key, startedTasksSequence); + completedTasks.put(key, completedTasksSequence); + for (int taskId = 0; taskId < numTasks; taskId++) { + SleepingSyncTask task = + new SleepingSyncTask( + taskId, 5, startedTasksSequence, completedTasksSequence, remainingTasksCount); + sequentialExecutor.submit(key, task); + } + } + + remainingTasksCount.await(); + + for (int i = 0; i < numKeys; i++) { + LinkedHashSet<Integer> startedTasksSequence = startedTasks.get("key" + i); + LinkedHashSet<Integer> completedTasksSequence = completedTasks.get("key" + i); + // Verify that the tasks have been started and completed in order. + int expectedTaskId = 0; + Iterator<Integer> it1 = startedTasksSequence.iterator(); + Iterator<Integer> it2 = completedTasksSequence.iterator(); + while (it1.hasNext() && it2.hasNext()) { + assertEquals(expectedTaskId, it1.next().intValue()); + assertEquals(expectedTaskId, it2.next().intValue()); + expectedTaskId++; + } + } + } +}
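Taken together, the changes above give client code a flow like the following (a minimal sketch, not part of the patch; the project, topic, message contents, and class name are placeholders):

    import com.google.api.core.ApiFuture;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.PubsubMessage;

    public class OrderedPublishSketch {
      public static void main(String[] args) throws Exception {
        // Ordering must be enabled explicitly; otherwise publishing a message
        // with an ordering key throws IllegalStateException.
        Publisher publisher =
            Publisher.newBuilder(ProjectTopicName.of("my-project", "my-topic"))
                .setEnableMessageOrdering(true)
                .build();
        try {
          PubsubMessage message =
              PubsubMessage.newBuilder()
                  .setOrderingKey("orderA")
                  .setData(ByteString.copyFromUtf8("m1"))
                  .build();
          // Messages sharing "orderA" are sent in publish order.
          ApiFuture<String> messageId = publisher.publish(message);
          System.out.println("published: " + messageId.get());
        } catch (Exception e) {
          // After a failed publish, the key is paused and subsequent publishes
          // on it fail fast; resume explicitly to continue.
          publisher.resumePublish("orderA");
        } finally {
          publisher.shutdown();
        }
      }
    }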