Skip to content

Commit

Permalink
samples: Fix flaky DLP inspect tests (#2359)
Browse files Browse the repository at this point in the history
* refactored and increased time limit for inspect samples
  • Loading branch information
shubha-rajan authored and chingor13 committed Aug 4, 2020
1 parent c871b63 commit 8a10d65
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

// [START dlp_inspect_bigquery]

import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.SettableFuture;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.BigQueryOptions;
import com.google.privacy.dlp.v2.BigQueryTable;
Expand Down Expand Up @@ -54,19 +54,18 @@ public static void inspectBigQueryTable()
String projectId = "your-project-id";
String bigQueryDatasetId = "your-bigquery-dataset-id";
String bigQueryTableId = "your-bigquery-table-id";
String pubSubTopicId = "your-pubsub-topic-id";
String pubSubSubscriptionId = "your-pubsub-subscription-id";
inspectBigQueryTable(
projectId, bigQueryDatasetId, bigQueryTableId, pubSubTopicId, pubSubSubscriptionId);
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectBigQueryTable(projectId, bigQueryDatasetId, bigQueryTableId, topicId, subscriptionId);
}

// Inspects a BigQuery Table
public static void inspectBigQueryTable(
String projectId,
String bigQueryDatasetId,
String bigQueryTableId,
String pubSubTopicId,
String pubSubSubscriptionName)
String topicId,
String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
Expand Down Expand Up @@ -98,7 +97,7 @@ public static void inspectBigQueryTable(
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
Expand All @@ -119,36 +118,32 @@ public static void inspectBigQueryTable(
.build();

// Use the client to send the request.
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + job.getName());
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());

// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();

// Set up a Pub/Sub subscriber to listen for the job completion status
SettableFuture<Void> jobDone = SettableFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
MessageReceiver handleMessage =
ProjectSubscriptionName.of(projectId, subscriptionId);

MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
jobDone.set(null);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
subscriber.startAsync(); // Let the subscriber listen to messages
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();

// Wait for the original job to complete
try {
jobDone.get(10, TimeUnit.MINUTES);
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 10 minutes.");
System.out.println("Job was not completed after 15 minutes.");
return;
}

// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);

// Parse the response and process results.
Expand All @@ -161,5 +156,20 @@ public static void inspectBigQueryTable(
}
}
}

// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
// [END dlp_inspect_bigquery]
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

// [START dlp_inspect_datastore]

import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.SettableFuture;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DatastoreOptions;
Expand Down Expand Up @@ -55,19 +55,18 @@ public static void insepctDatastoreEntity()
String projectId = "your-project-id";
String datastoreNamespace = "your-datastore-namespace";
String datastoreKind = "your-datastore-kind";
String pubSubTopicId = "your-pubsub-topic-id";
String pubSubSubscriptionId = "your-pubsub-subscription-id";
insepctDatastoreEntity(
projectId, datastoreNamespace, datastoreKind, pubSubTopicId, pubSubSubscriptionId);
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
insepctDatastoreEntity(projectId, datastoreNamespace, datastoreKind, topicId, subscriptionId);
}

// Inspects a Datastore Entity.
public static void insepctDatastoreEntity(
String projectId,
String datastoreNamespce,
String datastoreKind,
String pubSubTopicId,
String pubSubSubscriptionName)
String topicId,
String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
Expand Down Expand Up @@ -99,7 +98,7 @@ public static void insepctDatastoreEntity(
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
Expand All @@ -120,37 +119,32 @@ public static void insepctDatastoreEntity(
.build();

// Use the client to send the request.
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + job.getName());
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());

// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();

// Set up a Pub/Sub subscriber to listen for the job completion status
SettableFuture<Void> jobDone = SettableFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
MessageReceiver handleMessage =
ProjectSubscriptionName.of(projectId, subscriptionId);

MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
jobDone.set(null);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
;
}
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
subscriber.startAsync(); // Let the subscriber listen to messages
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();

// Wait for the original job to complete
try {
jobDone.get(10, TimeUnit.MINUTES);
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 10 minutes.");
System.out.println("Job was not completed after 15 minutes.");
return;
}

// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);

// Parse the response and process results.
Expand All @@ -163,5 +157,20 @@ public static void insepctDatastoreEntity(
}
}
}

// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
// [END dlp_inspect_datastore]
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package dlp.snippets;

// [START dlp_inspect_gcs]

import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
Expand Down Expand Up @@ -52,14 +52,14 @@ public static void inspectGcsFile() throws InterruptedException, ExecutionExcept
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt";
String pubSubTopicId = "your-pubsub-topic-id";
String pubSubSubscriptionId = "your-pubsub-subscription-id";
inspectGcsFile(projectId, gcsUri, pubSubTopicId, pubSubSubscriptionId);
String topicId = "your-pubsub-topic-id";
String subscriptionId = "your-pubsub-subscription-id";
inspectGcsFile(projectId, gcsUri, topicId, subscriptionId);
}

// Inspects a file in a Google Cloud Storage Bucket.
public static void inspectGcsFile(
String projectId, String gcsUri, String pubSubTopicId, String pubSubSubscriptionName)
String projectId, String gcsUri, String topicId, String subscriptionId)
throws ExecutionException, InterruptedException, IOException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
Expand All @@ -84,7 +84,7 @@ public static void inspectGcsFile(
InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build();

// Specify the action that is triggered when the job completes.
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId);
String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId);
Action.PublishToPubSub publishToPubSub =
Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build();
Action action = Action.newBuilder().setPubSub(publishToPubSub).build();
Expand All @@ -105,36 +105,32 @@ public static void inspectGcsFile(
.build();

// Use the client to send the request.
final DlpJob job = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + job.getName());
final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest);
System.out.println("Job created: " + dlpJob.getName());

// Set up a Pub/Sub subscriber to listen on the job completion status
final SettableApiFuture<Boolean> done = SettableApiFuture.create();

// Set up a Pub/Sub subscriber to listen for the job completion status
SettableFuture<Void> jobDone = SettableFuture.create();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, pubSubSubscriptionName);
MessageReceiver handleMessage =
ProjectSubscriptionName.of(projectId, subscriptionId);

MessageReceiver messageHandler =
(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
jobDone.set(null);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build();
subscriber.startAsync(); // Let the subscriber listen to messages
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
subscriber.startAsync();

// Wait for the original job to complete
try {
jobDone.get(10, TimeUnit.MINUTES);
done.get(15, TimeUnit.MINUTES);
} catch (TimeoutException e) {
System.out.println("Job was not completed after 10 minutes.");
System.out.println("Job was not completed after 15 minutes.");
return;
}

// Get the latest state of the job from the service
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build();
GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();
DlpJob completedJob = dlp.getDlpJob(request);

// Parse the response and process results.
Expand All @@ -147,5 +143,20 @@ public static void inspectGcsFile(
}
}
}

// handleMessage injects the job and settableFuture into the message reciever interface
private static void handleMessage(
DlpJob job,
SettableApiFuture<Boolean> done,
PubsubMessage pubsubMessage,
AckReplyConsumer ackReplyConsumer) {
String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
if (job.getName().equals(messageAttribute)) {
done.set(true);
ackReplyConsumer.ack();
} else {
ackReplyConsumer.nack();
}
}
}
// [END dlp_inspect_gcs]

0 comments on commit 8a10d65

Please sign in to comment.