From ce10217e50ee693fdfc15b81915d8350ccb6fc18 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 13:40:56 -0700 Subject: [PATCH 01/11] [dlp] fix: remove gcp-devrel-py-tools fixes #3375 fixes #3416 fixes #3417 * remove wrong usage of `eventually_consistent.call` * only test if the operation has been started * shorter timeout for polling * correct use of `pytest.mark.flaky` --- dlp/inspect_content.py | 3 + dlp/inspect_content_test.py | 129 +++++++++++++++++++++--------------- dlp/jobs_test.py | 3 +- dlp/requirements-test.txt | 1 - dlp/risk_test.py | 24 +++---- 5 files changed, 89 insertions(+), 71 deletions(-) diff --git a/dlp/inspect_content.py b/dlp/inspect_content.py index b1e97ae31475..9e50ecbdf79d 100644 --- a/dlp/inspect_content.py +++ b/dlp/inspect_content.py @@ -474,6 +474,7 @@ def inspect_gcs_file( operation = dlp.create_dlp_job(parent, inspect_job=inspect_job) print("Inspection operation started: {}".format(operation.name)) + # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() @@ -636,6 +637,7 @@ def inspect_datastore( } operation = dlp.create_dlp_job(parent, inspect_job=inspect_job) + print("Inspection operation started: {}".format(operation.name)) # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. @@ -802,6 +804,7 @@ def inspect_bigquery( } operation = dlp.create_dlp_job(parent, inspect_job=inspect_job) + print("Inspection operation started: {}".format(operation.name)) # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. diff --git a/dlp/inspect_content_test.py b/dlp/inspect_content_test.py index ad493ecce710..be2b821044ee 100644 --- a/dlp/inspect_content_test.py +++ b/dlp/inspect_content_test.py @@ -15,8 +15,6 @@ import os import uuid -from gcp_devrel.testing import eventually_consistent -from gcp_devrel.testing.flaky import flaky import google.api_core.exceptions import google.cloud.bigquery import google.cloud.datastore @@ -24,10 +22,11 @@ import google.cloud.exceptions import google.cloud.pubsub import google.cloud.storage - import pytest + import inspect_content + UNIQUE_STRING = str(uuid.uuid4()).split("-")[0] GCLOUD_PROJECT = os.getenv("GCLOUD_PROJECT") @@ -95,7 +94,8 @@ def subscription_id(topic_id): # Subscribes to a topic. 
subscriber = google.cloud.pubsub.SubscriberClient() topic_path = subscriber.topic_path(GCLOUD_PROJECT, topic_id) - subscription_path = subscriber.subscription_path(GCLOUD_PROJECT, SUBSCRIPTION_ID) + subscription_path = subscriber.subscription_path( + GCLOUD_PROJECT, SUBSCRIPTION_ID) try: subscriber.create_subscription(subscription_path, topic_path) except google.api_core.exceptions.AlreadyExists: @@ -297,21 +297,21 @@ def test_inspect_gcs_file(bucket, topic_id, subscription_id, capsys): topic_id, subscription_id, ["EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=420, + timeout=1 ) out, _ = capsys.readouterr() assert "Inspection operation started" in out # Cancel the operation - operation_id = out.split("Inspection operation started: ")[1].split("\n")[0] + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] print(operation_id) client = google.cloud.dlp_v2.DlpServiceClient() client.cancel_dlp_job(operation_id) def test_inspect_gcs_file_with_custom_info_types( - bucket, topic_id, subscription_id, capsys -): + bucket, topic_id, subscription_id, capsys): dictionaries = ["gary@somedomain.com"] regexes = ["\\(\\d{3}\\) \\d{3}-\\d{4}"] @@ -324,20 +324,21 @@ def test_inspect_gcs_file_with_custom_info_types( [], custom_dictionaries=dictionaries, custom_regexes=regexes, - timeout=420, - ) + timeout=1) out, _ = capsys.readouterr() assert "Inspection operation started" in out # Cancel the operation - operation_id = out.split("Inspection operation started: ")[1].split("\n")[0] + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] print(operation_id) client = google.cloud.dlp_v2.DlpServiceClient() client.cancel_dlp_job(operation_id) -def test_inspect_gcs_file_no_results(bucket, topic_id, subscription_id, capsys): +def test_inspect_gcs_file_no_results( + bucket, topic_id, subscription_id, capsys): inspect_content.inspect_gcs_file( GCLOUD_PROJECT, bucket.name, @@ -345,20 +346,19 @@ def test_inspect_gcs_file_no_results(bucket, topic_id, subscription_id, capsys): topic_id, subscription_id, ["EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=420, - ) + timeout=1) out, _ = capsys.readouterr() assert "Inspection operation started" in out # Cancel the operation - operation_id = out.split("Inspection operation started: ")[1].split("\n")[0] + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] print(operation_id) client = google.cloud.dlp_v2.DlpServiceClient() client.cancel_dlp_job(operation_id) -@pytest.mark.skip(reason="nondeterministically failing") def test_inspect_gcs_image_file(bucket, topic_id, subscription_id, capsys): inspect_content.inspect_gcs_file( GCLOUD_PROJECT, @@ -367,10 +367,16 @@ def test_inspect_gcs_image_file(bucket, topic_id, subscription_id, capsys): topic_id, subscription_id, ["EMAIL_ADDRESS", "PHONE_NUMBER"], - ) + timeout=1) out, _ = capsys.readouterr() - assert "Info type: EMAIL_ADDRESS" in out + assert "Inspection operation started" in out + # Cancel the operation + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] + print(operation_id) + client = google.cloud.dlp_v2.DlpServiceClient() + client.cancel_dlp_job(operation_id) def test_inspect_gcs_multiple_files(bucket, topic_id, subscription_id, capsys): @@ -381,55 +387,62 @@ def test_inspect_gcs_multiple_files(bucket, topic_id, subscription_id, capsys): topic_id, subscription_id, ["EMAIL_ADDRESS", "PHONE_NUMBER"], - ) + timeout=1) out, _ = capsys.readouterr() assert "Inspection operation started" in out # Cancel the operation - 
operation_id = out.split("Inspection operation started: ")[1].split("\n")[0] + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] print(operation_id) client = google.cloud.dlp_v2.DlpServiceClient() client.cancel_dlp_job(operation_id) -@flaky -def test_inspect_datastore(datastore_project, topic_id, subscription_id, capsys): - @eventually_consistent.call - def _(): - inspect_content.inspect_datastore( - GCLOUD_PROJECT, - datastore_project, - DATASTORE_KIND, - topic_id, - subscription_id, - ["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"], - ) +def test_inspect_datastore( + datastore_project, topic_id, subscription_id, capsys): + inspect_content.inspect_datastore( + GCLOUD_PROJECT, + datastore_project, + DATASTORE_KIND, + topic_id, + subscription_id, + ["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"], + timeout=1) - out, _ = capsys.readouterr() - assert "Info type: EMAIL_ADDRESS" in out + out, _ = capsys.readouterr() + assert "Inspection operation started" in out + # Cancel the operation + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] + print(operation_id) + client = google.cloud.dlp_v2.DlpServiceClient() + client.cancel_dlp_job(operation_id) -@flaky +# @pytest.mark.skip(reason="too slow") def test_inspect_datastore_no_results( - datastore_project, topic_id, subscription_id, capsys -): - @eventually_consistent.call - def _(): - inspect_content.inspect_datastore( - GCLOUD_PROJECT, - datastore_project, - DATASTORE_KIND, - topic_id, - subscription_id, - ["PHONE_NUMBER"], - ) - - out, _ = capsys.readouterr() - assert "No findings" in out - - -@pytest.mark.skip(reason="unknown issue") + datastore_project, topic_id, subscription_id, capsys): + inspect_content.inspect_datastore( + GCLOUD_PROJECT, + datastore_project, + DATASTORE_KIND, + topic_id, + subscription_id, + ["PHONE_NUMBER"], + timeout=1) + + out, _ = capsys.readouterr() + assert "Inspection operation started" in out + # Cancel the operation + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] + print(operation_id) + client = google.cloud.dlp_v2.DlpServiceClient() + client.cancel_dlp_job(operation_id) + + def test_inspect_bigquery(bigquery_project, topic_id, subscription_id, capsys): inspect_content.inspect_bigquery( GCLOUD_PROJECT, @@ -439,7 +452,13 @@ def test_inspect_bigquery(bigquery_project, topic_id, subscription_id, capsys): topic_id, subscription_id, ["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"], - ) + timeout=1) out, _ = capsys.readouterr() - assert "Info type: FIRST_NAME" in out + assert "Inspection operation started" in out + # Cancel the operation + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] + print(operation_id) + client = google.cloud.dlp_v2.DlpServiceClient() + client.cancel_dlp_job(operation_id) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index 98acb7464e38..5e53d71b2542 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -13,7 +13,6 @@ # limitations under the License. 
import os -from flaky import flaky import pytest @@ -66,7 +65,7 @@ def test_list_dlp_jobs(test_job_name, capsys): assert test_job_name not in out -@flaky +@pytest.mark.flaky def test_list_dlp_jobs_with_filter(test_job_name, capsys): jobs.list_dlp_jobs( GCLOUD_PROJECT, diff --git a/dlp/requirements-test.txt b/dlp/requirements-test.txt index d1ad7a9fd107..470977bf2ca8 100644 --- a/dlp/requirements-test.txt +++ b/dlp/requirements-test.txt @@ -1,4 +1,3 @@ pytest==5.3.2 -gcp-devrel-py-tools==0.0.15 flaky==3.6.1 mock==3.0.5 diff --git a/dlp/risk_test.py b/dlp/risk_test.py index 41b514f4da74..5f172bcbc8d2 100644 --- a/dlp/risk_test.py +++ b/dlp/risk_test.py @@ -12,14 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from flaky import flaky +import os import uuid import google.cloud.pubsub import google.cloud.bigquery - import pytest -import os import risk @@ -160,7 +158,7 @@ def bigquery_project(): bigquery_client.delete_dataset(dataset_ref, delete_contents=True) -@flaky +@pytest.mark.flaky def test_numerical_risk_analysis( topic_id, subscription_id, bigquery_project, capsys ): @@ -178,7 +176,7 @@ def test_numerical_risk_analysis( assert "Value Range:" in out -@flaky +@pytest.mark.flaky def test_categorical_risk_analysis_on_string_field( topic_id, subscription_id, bigquery_project, capsys ): @@ -197,7 +195,7 @@ def test_categorical_risk_analysis_on_string_field( assert "Most common value occurs" in out -@flaky +@pytest.mark.flaky def test_categorical_risk_analysis_on_number_field( topic_id, subscription_id, bigquery_project, capsys ): @@ -215,7 +213,7 @@ def test_categorical_risk_analysis_on_number_field( assert "Most common value occurs" in out -@flaky +@pytest.mark.flaky def test_k_anonymity_analysis_single_field( topic_id, subscription_id, bigquery_project, capsys ): @@ -234,7 +232,7 @@ def test_k_anonymity_analysis_single_field( assert "Class size:" in out -@flaky +@pytest.mark.flaky def test_k_anonymity_analysis_multiple_fields( topic_id, subscription_id, bigquery_project, capsys ): @@ -253,7 +251,7 @@ def test_k_anonymity_analysis_multiple_fields( assert "Class size:" in out -@flaky +@pytest.mark.flaky def test_l_diversity_analysis_single_field( topic_id, subscription_id, bigquery_project, capsys ): @@ -274,7 +272,7 @@ def test_l_diversity_analysis_single_field( assert "Sensitive value" in out -@flaky +@pytest.mark.flaky def test_l_diversity_analysis_multiple_field( topic_id, subscription_id, bigquery_project, capsys ): @@ -295,7 +293,7 @@ def test_l_diversity_analysis_multiple_field( assert "Sensitive value" in out -@flaky +@pytest.mark.flaky def test_k_map_estimate_analysis_single_field( topic_id, subscription_id, bigquery_project, capsys ): @@ -316,7 +314,7 @@ def test_k_map_estimate_analysis_single_field( assert "Values" in out -@flaky +@pytest.mark.flaky def test_k_map_estimate_analysis_multiple_field( topic_id, subscription_id, bigquery_project, capsys ): @@ -337,7 +335,7 @@ def test_k_map_estimate_analysis_multiple_field( assert "Values" in out -@flaky +@pytest.mark.flaky def test_k_map_estimate_analysis_quasi_ids_info_types_equal( topic_id, subscription_id, bigquery_project ): From b4b384981979645f8421f1025729457d1c3a442d Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 13:53:24 -0700 Subject: [PATCH 02/11] Remove stale comment --- dlp/inspect_content_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dlp/inspect_content_test.py b/dlp/inspect_content_test.py index be2b821044ee..f7267c14a97f 
100644 --- a/dlp/inspect_content_test.py +++ b/dlp/inspect_content_test.py @@ -421,7 +421,6 @@ def test_inspect_datastore( client.cancel_dlp_job(operation_id) -# @pytest.mark.skip(reason="too slow") def test_inspect_datastore_no_results( datastore_project, topic_id, subscription_id, capsys): inspect_content.inspect_datastore( From f447169db362b641a56b014b08b1bc9c4f973cdd Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 14:58:40 -0700 Subject: [PATCH 03/11] use try-finally --- dlp/inspect_content_test.py | 259 +++++++++++++++++------------------- 1 file changed, 122 insertions(+), 137 deletions(-) diff --git a/dlp/inspect_content_test.py b/dlp/inspect_content_test.py index f7267c14a97f..ea100d16d84a 100644 --- a/dlp/inspect_content_test.py +++ b/dlp/inspect_content_test.py @@ -289,175 +289,160 @@ def test_inspect_image_file(capsys): assert "Info type: PHONE_NUMBER" in out -def test_inspect_gcs_file(bucket, topic_id, subscription_id, capsys): - inspect_content.inspect_gcs_file( - GCLOUD_PROJECT, - bucket.name, - "test.txt", - topic_id, - subscription_id, - ["EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=1 - ) +def cancel_operation(out): + if "Inspection operation started" in out: + # Cancel the operation + operation_id = out.split( + "Inspection operation started: ")[1].split("\n")[0] + client = google.cloud.dlp_v2.DlpServiceClient() + client.cancel_dlp_job(operation_id) - out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + +def test_inspect_gcs_file(bucket, topic_id, subscription_id, capsys): + try: + inspect_content.inspect_gcs_file( + GCLOUD_PROJECT, + bucket.name, + "test.txt", + topic_id, + subscription_id, + ["EMAIL_ADDRESS", "PHONE_NUMBER"], + timeout=1 + ) + + out, _ = capsys.readouterr() + assert "Inspection operation started" in out + finally: + cancel_operation(out) def test_inspect_gcs_file_with_custom_info_types( - bucket, topic_id, subscription_id, capsys): - dictionaries = ["gary@somedomain.com"] - regexes = ["\\(\\d{3}\\) \\d{3}-\\d{4}"] + bucket, topic_id, subscription_id, capsys): + try: + dictionaries = ["gary@somedomain.com"] + regexes = ["\\(\\d{3}\\) \\d{3}-\\d{4}"] - inspect_content.inspect_gcs_file( - GCLOUD_PROJECT, - bucket.name, - "test.txt", - topic_id, - subscription_id, - [], - custom_dictionaries=dictionaries, - custom_regexes=regexes, - timeout=1) + inspect_content.inspect_gcs_file( + GCLOUD_PROJECT, + bucket.name, + "test.txt", + topic_id, + subscription_id, + [], + custom_dictionaries=dictionaries, + custom_regexes=regexes, + timeout=1) - out, _ = capsys.readouterr() + out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + assert "Inspection operation started" in out + finally: + cancel_operation(out) def test_inspect_gcs_file_no_results( bucket, topic_id, subscription_id, capsys): - inspect_content.inspect_gcs_file( - GCLOUD_PROJECT, - bucket.name, - "harmless.txt", - topic_id, - subscription_id, - ["EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=1) + try: + inspect_content.inspect_gcs_file( + GCLOUD_PROJECT, + bucket.name, + "harmless.txt", + topic_id, + 
subscription_id, + ["EMAIL_ADDRESS", "PHONE_NUMBER"], + timeout=1) - out, _ = capsys.readouterr() + out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + assert "Inspection operation started" in out + finally: + cancel_operation(out) def test_inspect_gcs_image_file(bucket, topic_id, subscription_id, capsys): - inspect_content.inspect_gcs_file( - GCLOUD_PROJECT, - bucket.name, - "test.png", - topic_id, - subscription_id, - ["EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=1) + try: + inspect_content.inspect_gcs_file( + GCLOUD_PROJECT, + bucket.name, + "test.png", + topic_id, + subscription_id, + ["EMAIL_ADDRESS", "PHONE_NUMBER"], + timeout=1) - out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + out, _ = capsys.readouterr() + assert "Inspection operation started" in out + finally: + cancel_operation(out) def test_inspect_gcs_multiple_files(bucket, topic_id, subscription_id, capsys): - inspect_content.inspect_gcs_file( - GCLOUD_PROJECT, - bucket.name, - "*", - topic_id, - subscription_id, - ["EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=1) + try: + inspect_content.inspect_gcs_file( + GCLOUD_PROJECT, + bucket.name, + "*", + topic_id, + subscription_id, + ["EMAIL_ADDRESS", "PHONE_NUMBER"], + timeout=1) - out, _ = capsys.readouterr() + out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + assert "Inspection operation started" in out + finally: + cancel_operation(out) def test_inspect_datastore( datastore_project, topic_id, subscription_id, capsys): - inspect_content.inspect_datastore( - GCLOUD_PROJECT, - datastore_project, - DATASTORE_KIND, - topic_id, - subscription_id, - ["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=1) + try: + inspect_content.inspect_datastore( + GCLOUD_PROJECT, + datastore_project, + DATASTORE_KIND, + topic_id, + subscription_id, + ["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"], + timeout=1) - out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + out, _ = capsys.readouterr() + assert "Inspection operation started" in out + finally: + cancel_operation(out) def test_inspect_datastore_no_results( datastore_project, topic_id, subscription_id, capsys): - inspect_content.inspect_datastore( - GCLOUD_PROJECT, - datastore_project, - DATASTORE_KIND, - topic_id, - subscription_id, - ["PHONE_NUMBER"], - timeout=1) + try: + inspect_content.inspect_datastore( + GCLOUD_PROJECT, + datastore_project, + DATASTORE_KIND, + topic_id, + subscription_id, + ["PHONE_NUMBER"], + timeout=1) - out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation 
started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + out, _ = capsys.readouterr() + assert "Inspection operation started" in out + finally: + cancel_operation(out) def test_inspect_bigquery(bigquery_project, topic_id, subscription_id, capsys): - inspect_content.inspect_bigquery( - GCLOUD_PROJECT, - bigquery_project, - BIGQUERY_DATASET_ID, - BIGQUERY_TABLE_ID, - topic_id, - subscription_id, - ["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"], - timeout=1) - - out, _ = capsys.readouterr() - assert "Inspection operation started" in out - # Cancel the operation - operation_id = out.split( - "Inspection operation started: ")[1].split("\n")[0] - print(operation_id) - client = google.cloud.dlp_v2.DlpServiceClient() - client.cancel_dlp_job(operation_id) + try: + inspect_content.inspect_bigquery( + GCLOUD_PROJECT, + bigquery_project, + BIGQUERY_DATASET_ID, + BIGQUERY_TABLE_ID, + topic_id, + subscription_id, + ["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"], + timeout=1) + + out, _ = capsys.readouterr() + assert "Inspection operation started" in out + finally: + cancel_operation(out) From dc818d506f457b81d7198eec0f46b07cf24e93f8 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 14:59:41 -0700 Subject: [PATCH 04/11] increased max_runs --- dlp/jobs_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index 5e53d71b2542..d99ef2f51f60 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -58,6 +58,7 @@ def test_job_name(): print("Issue during teardown, missing job") +@pytest.mark.flaky(max_runs=5, min_passes=1) def test_list_dlp_jobs(test_job_name, capsys): jobs.list_dlp_jobs(GCLOUD_PROJECT) @@ -65,7 +66,7 @@ def test_list_dlp_jobs(test_job_name, capsys): assert test_job_name not in out -@pytest.mark.flaky +@pytest.mark.flaky(max_runs=5, min_passes=1) def test_list_dlp_jobs_with_filter(test_job_name, capsys): jobs.list_dlp_jobs( GCLOUD_PROJECT, From 61c745f21ecf0d27cf691dda1d38066b68c01150 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 15:52:55 -0700 Subject: [PATCH 05/11] add retry on some jobs tests --- dlp/jobs_test.py | 43 +++++++++++++++++++++++++++------------ dlp/requirements-test.txt | 1 + 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index d99ef2f51f60..4d70eb15f20f 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -15,6 +15,7 @@ import os import pytest +from retrying import retry import jobs @@ -58,24 +59,40 @@ def test_job_name(): print("Issue during teardown, missing job") -@pytest.mark.flaky(max_runs=5, min_passes=1) +def retry_on_assertion_error(exception): + return isinstance(exception, (AssertionError)) + + def test_list_dlp_jobs(test_job_name, capsys): - jobs.list_dlp_jobs(GCLOUD_PROJECT) + @retry( + wait_exponential_multiplier=1000, + wait_exponential_max=10000, + stop_max_attempt_number=5, + retry_on_exception=retry_on_assertion_error) + def test_real(): + jobs.list_dlp_jobs(GCLOUD_PROJECT) - out, _ = capsys.readouterr() - assert test_job_name not in out + out, _ = capsys.readouterr() + assert test_job_name not in out + test_real() -@pytest.mark.flaky(max_runs=5, min_passes=1) def test_list_dlp_jobs_with_filter(test_job_name, capsys): - jobs.list_dlp_jobs( - GCLOUD_PROJECT, - filter_string="state=RUNNING", - job_type="RISK_ANALYSIS_JOB", - ) - - out, _ = capsys.readouterr() - assert test_job_name in out + @retry( + 
wait_exponential_multiplier=1000, + wait_exponential_max=10000, + stop_max_attempt_number=5, + retry_on_exception=retry_on_assertion_error) + def test_real(): + jobs.list_dlp_jobs( + GCLOUD_PROJECT, + filter_string="state=RUNNING", + job_type="RISK_ANALYSIS_JOB", + ) + + out, _ = capsys.readouterr() + assert test_job_name in out + test_real() def test_list_dlp_jobs_with_job_type(test_job_name, capsys): diff --git a/dlp/requirements-test.txt b/dlp/requirements-test.txt index 470977bf2ca8..ee755d17b520 100644 --- a/dlp/requirements-test.txt +++ b/dlp/requirements-test.txt @@ -1,3 +1,4 @@ pytest==5.3.2 flaky==3.6.1 mock==3.0.5 +retrying==1.3.3 From 9483d9ace05c99a0dfdbe0305cdfffbd8d02caf1 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 16:41:45 -0700 Subject: [PATCH 06/11] use uuid for job_id --- dlp/jobs_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index 4d70eb15f20f..36c5e6d31862 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -13,6 +13,7 @@ # limitations under the License. import os +import uuid import pytest from retrying import retry @@ -24,6 +25,7 @@ TEST_TABLE_PROJECT_ID = "bigquery-public-data" TEST_DATASET_ID = "san_francisco" TEST_TABLE_ID = "bikeshare_trips" +test_job_id = "test-job-{}".format(uuid.uuid4()) @pytest.fixture(scope="module") @@ -46,7 +48,7 @@ def test_job_name(): }, } - response = dlp.create_dlp_job(parent, risk_job=risk_job) + response = dlp.create_dlp_job(parent, risk_job=risk_job, job_id=test_job_id) full_path = response.name # API expects only job name, not full project path job_name = full_path[full_path.rfind("/") + 1:] From c7571e85ceb59f60a71dd51399eef2dd03f67fb9 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 16:52:11 -0700 Subject: [PATCH 07/11] remove retrying --- dlp/jobs_test.py | 41 +++++++++++---------------------------- dlp/requirements-test.txt | 1 - 2 files changed, 11 insertions(+), 31 deletions(-) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index 36c5e6d31862..14e1e2658fe9 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -16,7 +16,6 @@ import uuid import pytest -from retrying import retry import jobs @@ -61,40 +60,22 @@ def test_job_name(): print("Issue during teardown, missing job") -def retry_on_assertion_error(exception): - return isinstance(exception, (AssertionError)) - - def test_list_dlp_jobs(test_job_name, capsys): - @retry( - wait_exponential_multiplier=1000, - wait_exponential_max=10000, - stop_max_attempt_number=5, - retry_on_exception=retry_on_assertion_error) - def test_real(): - jobs.list_dlp_jobs(GCLOUD_PROJECT) + jobs.list_dlp_jobs(GCLOUD_PROJECT) - out, _ = capsys.readouterr() - assert test_job_name not in out - test_real() + out, _ = capsys.readouterr() + assert test_job_name not in out def test_list_dlp_jobs_with_filter(test_job_name, capsys): - @retry( - wait_exponential_multiplier=1000, - wait_exponential_max=10000, - stop_max_attempt_number=5, - retry_on_exception=retry_on_assertion_error) - def test_real(): - jobs.list_dlp_jobs( - GCLOUD_PROJECT, - filter_string="state=RUNNING", - job_type="RISK_ANALYSIS_JOB", - ) - - out, _ = capsys.readouterr() - assert test_job_name in out - test_real() + jobs.list_dlp_jobs( + GCLOUD_PROJECT, + filter_string="state=RUNNING", + job_type="RISK_ANALYSIS_JOB", + ) + + out, _ = capsys.readouterr() + assert test_job_name in out def test_list_dlp_jobs_with_job_type(test_job_name, capsys): diff --git a/dlp/requirements-test.txt b/dlp/requirements-test.txt index 
ee755d17b520..470977bf2ca8 100644 --- a/dlp/requirements-test.txt +++ b/dlp/requirements-test.txt @@ -1,4 +1,3 @@ pytest==5.3.2 flaky==3.6.1 mock==3.0.5 -retrying==1.3.3 From bade67333db68291e4f1f899ea0b40a6a245b01f Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 17:03:09 -0700 Subject: [PATCH 08/11] marking some tests as flaky --- dlp/jobs_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index 14e1e2658fe9..915a2b9b327e 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -60,6 +60,7 @@ def test_job_name(): print("Issue during teardown, missing job") +@pytest.mark.flaky(max_runs=5, min_passes=1) def test_list_dlp_jobs(test_job_name, capsys): jobs.list_dlp_jobs(GCLOUD_PROJECT) @@ -67,6 +68,7 @@ def test_list_dlp_jobs(test_job_name, capsys): assert test_job_name not in out +@pytest.mark.flaky(max_runs=5, min_passes=1) def test_list_dlp_jobs_with_filter(test_job_name, capsys): jobs.list_dlp_jobs( GCLOUD_PROJECT, From a45932daed960891b0ca86d2db1af55848465126 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 17:38:03 -0700 Subject: [PATCH 09/11] use backoff --- dlp/jobs_test.py | 35 ++++++++++++++++++++++------------- dlp/requirements-test.txt | 1 + 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index 915a2b9b327e..c21fcfa37b5c 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -15,6 +15,7 @@ import os import uuid +import backoff import pytest import jobs @@ -60,24 +61,32 @@ def test_job_name(): print("Issue during teardown, missing job") -@pytest.mark.flaky(max_runs=5, min_passes=1) def test_list_dlp_jobs(test_job_name, capsys): - jobs.list_dlp_jobs(GCLOUD_PROJECT) + @backoff.on_exception(backoff.expo, + AssertionError, + max_time=60) + def test_real(): + jobs.list_dlp_jobs(GCLOUD_PROJECT) - out, _ = capsys.readouterr() - assert test_job_name not in out + out, _ = capsys.readouterr() + assert test_job_name not in out + test_real() -@pytest.mark.flaky(max_runs=5, min_passes=1) def test_list_dlp_jobs_with_filter(test_job_name, capsys): - jobs.list_dlp_jobs( - GCLOUD_PROJECT, - filter_string="state=RUNNING", - job_type="RISK_ANALYSIS_JOB", - ) - - out, _ = capsys.readouterr() - assert test_job_name in out + @backoff.on_exception(backoff.expo, + AssertionError, + max_time=60) + def test_real(): + jobs.list_dlp_jobs( + GCLOUD_PROJECT, + filter_string="state=RUNNING", + job_type="RISK_ANALYSIS_JOB", + ) + + out, _ = capsys.readouterr() + assert test_job_name in out + test_real() def test_list_dlp_jobs_with_job_type(test_job_name, capsys): diff --git a/dlp/requirements-test.txt b/dlp/requirements-test.txt index 470977bf2ca8..804438a024e0 100644 --- a/dlp/requirements-test.txt +++ b/dlp/requirements-test.txt @@ -1,3 +1,4 @@ pytest==5.3.2 flaky==3.6.1 mock==3.0.5 +backoff==1.10.0 From a3738eac3d5d9d2a2c8005a4951942f5bb52a7b3 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 16 Apr 2020 18:05:30 -0700 Subject: [PATCH 10/11] add a filter to allow state = DONE --- dlp/jobs_test.py | 33 +++++++++++---------------------- dlp/requirements-test.txt | 2 +- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py index c21fcfa37b5c..b3910dd5b330 100644 --- a/dlp/jobs_test.py +++ b/dlp/jobs_test.py @@ -15,7 +15,6 @@ import os import uuid -import backoff import pytest import jobs @@ -62,31 +61,21 @@ def test_job_name(): def test_list_dlp_jobs(test_job_name, capsys): - @backoff.on_exception(backoff.expo, - 
AssertionError, - max_time=60) - def test_real(): - jobs.list_dlp_jobs(GCLOUD_PROJECT) + jobs.list_dlp_jobs(GCLOUD_PROJECT) - out, _ = capsys.readouterr() - assert test_job_name not in out - test_real() + out, _ = capsys.readouterr() + assert test_job_name not in out def test_list_dlp_jobs_with_filter(test_job_name, capsys): - @backoff.on_exception(backoff.expo, - AssertionError, - max_time=60) - def test_real(): - jobs.list_dlp_jobs( - GCLOUD_PROJECT, - filter_string="state=RUNNING", - job_type="RISK_ANALYSIS_JOB", - ) - - out, _ = capsys.readouterr() - assert test_job_name in out - test_real() + jobs.list_dlp_jobs( + GCLOUD_PROJECT, + filter_string="state=RUNNING OR state=DONE", + job_type="RISK_ANALYSIS_JOB", + ) + + out, _ = capsys.readouterr() + assert test_job_name in out def test_list_dlp_jobs_with_job_type(test_job_name, capsys): diff --git a/dlp/requirements-test.txt b/dlp/requirements-test.txt index 804438a024e0..5776d6748130 100644 --- a/dlp/requirements-test.txt +++ b/dlp/requirements-test.txt @@ -1,4 +1,4 @@ pytest==5.3.2 flaky==3.6.1 mock==3.0.5 -backoff==1.10.0 + From 99448825a4b42c66f2b8e98bd2b41e4a19001ede Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Fri, 17 Apr 2020 10:36:26 -0700 Subject: [PATCH 11/11] move stuff in order --- dlp/risk.py | 332 ++++++++++++++++++++++++++-------------------------- 1 file changed, 166 insertions(+), 166 deletions(-) diff --git a/dlp/risk.py b/dlp/risk.py index 386f05c0d73d..a31dfb12c6ef 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -56,31 +56,6 @@ def numerical_risk_analysis( # potentially long-running operations. import google.cloud.pubsub - def callback(message): - if message.attributes["DlpJobName"] == operation.name: - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - results = job.risk_details.numerical_stats_result - print( - "Value Range: [{}, {}]".format( - results.min_value.integer_value, - results.max_value.integer_value, - ) - ) - prev_value = None - for percent, result in enumerate(results.quantile_values): - value = result.integer_value - if prev_value != value: - print("Value at {}% quantile: {}".format(percent, value)) - prev_value = value - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - # Instantiate a client. dlp = google.cloud.dlp_v2.DlpServiceClient() @@ -107,15 +82,40 @@ def callback(message): "actions": actions, } + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + + def callback(message): + if message.attributes["DlpJobName"] == operation.name: + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + results = job.risk_details.numerical_stats_result + print( + "Value Range: [{}, {}]".format( + results.min_value.integer_value, + results.max_value.integer_value, + ) + ) + prev_value = None + for percent, result in enumerate(results.quantile_values): + value = result.integer_value + if prev_value != value: + print("Value at {}% quantile: {}".format(percent, value)) + prev_value = value + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() + # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. 
subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -166,6 +166,35 @@ def categorical_risk_analysis( # potentially long-running operations. import google.cloud.pubsub + # Instantiate a client. + dlp = google.cloud.dlp_v2.DlpServiceClient() + + # Convert the project id into a full resource id. + parent = dlp.project_path(project) + + # Location info of the BigQuery table. + source_table = { + "project_id": table_project_id, + "dataset_id": dataset_id, + "table_id": table_id, + } + + # Tell the API where to send a notification when the job is complete. + actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] + + # Configure risk analysis job + # Give the name of the numeric column to compute risk metrics for + risk_job = { + "privacy_metric": { + "categorical_stats_config": {"field": {"name": column_name}} + }, + "source_table": source_table, + "actions": actions, + } + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + def callback(message): if message.attributes["DlpJobName"] == operation.name: # This is the message we're looking for, so acknowledge it. @@ -201,41 +230,12 @@ def callback(message): # This is not the message we're looking for. message.drop() - # Instantiate a client. - dlp = google.cloud.dlp_v2.DlpServiceClient() - - # Convert the project id into a full resource id. - parent = dlp.project_path(project) - - # Location info of the BigQuery table. - source_table = { - "project_id": table_project_id, - "dataset_id": dataset_id, - "table_id": table_id, - } - - # Tell the API where to send a notification when the job is complete. - actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] - - # Configure risk analysis job - # Give the name of the numeric column to compute risk metrics for - risk_job = { - "privacy_metric": { - "categorical_stats_config": {"field": {"name": column_name}} - }, - "source_table": source_table, - "actions": actions, - } - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -290,6 +290,39 @@ def k_anonymity_analysis( def get_values(obj): return int(obj.integer_value) + # Instantiate a client. + dlp = google.cloud.dlp_v2.DlpServiceClient() + + # Convert the project id into a full resource id. + parent = dlp.project_path(project) + + # Location info of the BigQuery table. + source_table = { + "project_id": table_project_id, + "dataset_id": dataset_id, + "table_id": table_id, + } + + # Convert quasi id list to Protobuf type + def map_fields(field): + return {"name": field} + + quasi_ids = map(map_fields, quasi_ids) + + # Tell the API where to send a notification when the job is complete. 
+ actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] + + # Configure risk analysis job + # Give the name of the numeric column to compute risk metrics for + risk_job = { + "privacy_metric": {"k_anonymity_config": {"quasi_ids": quasi_ids}}, + "source_table": source_table, + "actions": actions, + } + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + def callback(message): if message.attributes["DlpJobName"] == operation.name: # This is the message we're looking for, so acknowledge it. @@ -326,45 +359,12 @@ def callback(message): # This is not the message we're looking for. message.drop() - # Instantiate a client. - dlp = google.cloud.dlp_v2.DlpServiceClient() - - # Convert the project id into a full resource id. - parent = dlp.project_path(project) - - # Location info of the BigQuery table. - source_table = { - "project_id": table_project_id, - "dataset_id": dataset_id, - "table_id": table_id, - } - - # Convert quasi id list to Protobuf type - def map_fields(field): - return {"name": field} - - quasi_ids = map(map_fields, quasi_ids) - - # Tell the API where to send a notification when the job is complete. - actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] - - # Configure risk analysis job - # Give the name of the numeric column to compute risk metrics for - risk_job = { - "privacy_metric": {"k_anonymity_config": {"quasi_ids": quasi_ids}}, - "source_table": source_table, - "actions": actions, - } - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -421,6 +421,44 @@ def l_diversity_analysis( def get_values(obj): return int(obj.integer_value) + # Instantiate a client. + dlp = google.cloud.dlp_v2.DlpServiceClient() + + # Convert the project id into a full resource id. + parent = dlp.project_path(project) + + # Location info of the BigQuery table. + source_table = { + "project_id": table_project_id, + "dataset_id": dataset_id, + "table_id": table_id, + } + + # Convert quasi id list to Protobuf type + def map_fields(field): + return {"name": field} + + quasi_ids = map(map_fields, quasi_ids) + + # Tell the API where to send a notification when the job is complete. + actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] + + # Configure risk analysis job + # Give the name of the numeric column to compute risk metrics for + risk_job = { + "privacy_metric": { + "l_diversity_config": { + "quasi_ids": quasi_ids, + "sensitive_attribute": {"name": sensitive_attribute}, + } + }, + "source_table": source_table, + "actions": actions, + } + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + def callback(message): if message.attributes["DlpJobName"] == operation.name: # This is the message we're looking for, so acknowledge it. @@ -464,50 +502,12 @@ def callback(message): # This is not the message we're looking for. message.drop() - # Instantiate a client. - dlp = google.cloud.dlp_v2.DlpServiceClient() - - # Convert the project id into a full resource id. 
- parent = dlp.project_path(project) - - # Location info of the BigQuery table. - source_table = { - "project_id": table_project_id, - "dataset_id": dataset_id, - "table_id": table_id, - } - - # Convert quasi id list to Protobuf type - def map_fields(field): - return {"name": field} - - quasi_ids = map(map_fields, quasi_ids) - - # Tell the API where to send a notification when the job is complete. - actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] - - # Configure risk analysis job - # Give the name of the numeric column to compute risk metrics for - risk_job = { - "privacy_metric": { - "l_diversity_config": { - "quasi_ids": quasi_ids, - "sensitive_attribute": {"name": sensitive_attribute}, - } - }, - "source_table": source_table, - "actions": actions, - } - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -571,41 +571,6 @@ def k_map_estimate_analysis( def get_values(obj): return int(obj.integer_value) - def callback(message): - if message.attributes["DlpJobName"] == operation.name: - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = ( - job.risk_details.k_map_estimation_result.k_map_estimation_histogram - ) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print("Bucket {}:".format(i)) - print( - " Anonymity range: [{}, {}]".format( - bucket.min_anonymity, bucket.max_anonymity - ) - ) - print(" Size: {}".format(bucket.bucket_size)) - for value_bucket in bucket.bucket_values: - print( - " Values: {}".format( - map(get_values, value_bucket.quasi_ids_values) - ) - ) - print( - " Estimated k-map anonymity: {}".format( - value_bucket.estimated_anonymity - ) - ) - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - # Instantiate a client. dlp = google.cloud.dlp_v2.DlpServiceClient() @@ -648,15 +613,50 @@ def map_fields(quasi_id, info_type): "actions": actions, } + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + + def callback(message): + if message.attributes["DlpJobName"] == operation.name: + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = ( + job.risk_details.k_map_estimation_result.k_map_estimation_histogram + ) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print("Bucket {}:".format(i)) + print( + " Anonymity range: [{}, {}]".format( + bucket.min_anonymity, bucket.max_anonymity + ) + ) + print(" Size: {}".format(bucket.bucket_size)) + for value_bucket in bucket.bucket_values: + print( + " Values: {}".format( + map(get_values, value_bucket.quasi_ids_values) + ) + ) + print( + " Estimated k-map anonymity: {}".format( + value_bucket.estimated_anonymity + ) + ) + subscription.set_result(None) + else: + # This is not the message we're looking for. 
+ message.drop() + # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError:
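
--

Editor's note: for readers skimming the series, below is a minimal sketch (not a verbatim excerpt) of the test pattern these patches converge on for the long-running inspection samples: start the DLP job with a very short timeout, assert only that the job started, and cancel it in a finally block via the cancel_operation helper introduced in PATCH 03. Everything here (inspect_content.inspect_gcs_file, the bucket/topic_id/subscription_id fixtures, GCLOUD_PROJECT, cancel_dlp_job) comes from the diffs above, with one defensive addition that is NOT in the patches: `out` is bound before the try block, because as committed, a failure inside inspect_gcs_file before capsys is read would make the finally clause raise NameError on `out`.

import os

import google.cloud.dlp_v2

import inspect_content

GCLOUD_PROJECT = os.getenv("GCLOUD_PROJECT")


def cancel_operation(out):
    # Parse the job name out of the sample's stdout and cancel the job, so
    # the 1-second timeout does not leave orphaned DLP jobs in the project.
    if "Inspection operation started" in out:
        operation_id = out.split(
            "Inspection operation started: ")[1].split("\n")[0]
        client = google.cloud.dlp_v2.DlpServiceClient()
        client.cancel_dlp_job(operation_id)


def test_inspect_gcs_file(bucket, topic_id, subscription_id, capsys):
    # Bound before the try block so the finally clause can always read it,
    # even if the sample raises before capsys is consumed (defensive tweak,
    # not present in the patches above).
    out = ""
    try:
        inspect_content.inspect_gcs_file(
            GCLOUD_PROJECT,
            bucket.name,
            "test.txt",
            topic_id,
            subscription_id,
            ["EMAIL_ADDRESS", "PHONE_NUMBER"],
            timeout=1)  # only verify the job starts; don't poll for findings

        out, _ = capsys.readouterr()
        assert "Inspection operation started" in out
    finally:
        cancel_operation(out)

The design trade-off running through the series: instead of polling a job to completion behind gcp-devrel-py-tools' eventually_consistent wrapper (slow and nondeterministic), the tests verify only that the operation was created, then clean up. That is what lets PATCH 01 drop the dependency entirely. The jobs tests follow the same spirit by the end of the series: PATCH 06 gives the fixture job a uuid-based job_id so runs cannot collide, and PATCH 10 widens the list filter to "state=RUNNING OR state=DONE" so the assertion no longer races the job's state transition, which is why the retrying (PATCH 05) and backoff (PATCH 09) wrappers could be removed again.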