From 5be37aad9a13fd72e7dc7d1faad4ba123561b637 Mon Sep 17 00:00:00 2001 From: Karim Wadie Date: Tue, 14 Feb 2023 12:02:38 +0100 Subject: [PATCH 1/2] added async gcs snapshoter --- README.md | 14 +++ scripts/prepare_backup_projects.sh | 14 ++- .../f03_snapshoter/BigQuerySnapshoter.java | 4 + .../f03_snapshoter/GCSSnapshoter.java | 92 +++++++------- .../f03_snapshoter/GCSSnapshoterResponse.java | 15 +-- .../helpers/TrackingHelper.java | 9 ++ .../services/bq/BigQueryService.java | 29 +++-- .../services/bq/BigQueryServiceImpl.java | 46 +++---- .../services/map/GcsPersistentMapImpl.java | 33 +++++ .../services/map/PersistentMap.java | 7 ++ .../BigQuerySnapshoterTest.java | 15 +-- .../f03_snapshoter/GCSSnapshoterTest.java | 11 +- .../helpers/TrackingHelperTest.java | 11 ++ .../services/PersistentMapTestImpl.java | 25 ++++ .../GCSSnapshoterController.java | 3 + services/tagger-app/pom.xml | 7 ++ .../tagger/TaggerController.java | 117 +++++++++++++++++- .../src/main/test/TaggerControllerTest.java | 102 +++++++++++++++ terraform/main.tf | 57 +++++++++ .../modules/async-gcs-snapshoter/main.tf | 18 +++ .../modules/async-gcs-snapshoter/variables.tf | 5 + terraform/modules/bigquery/main.tf | 23 ++++ terraform/modules/bigquery/variables.tf | 4 + terraform/modules/cloud-logging/main.tf | 2 +- terraform/modules/cloud-logging/variables.tf | 3 +- terraform/modules/cloud-run/main.tf | 10 +- terraform/modules/cloud-run/variables.tf | 4 + terraform/modules/gcs/main.tf | 2 + terraform/modules/gcs/variables.tf | 4 + terraform/modules/pubsub/main.tf | 2 + terraform/modules/pubsub/variables.tf | 3 + terraform/outputs.tf | 7 ++ terraform/variables.tf | 11 ++ 33 files changed, 586 insertions(+), 123 deletions(-) create mode 100644 services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/GcsPersistentMapImpl.java create mode 100644 services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/PersistentMap.java create mode 100644 services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/PersistentMapTestImpl.java create mode 100644 services/tagger-app/src/main/test/TaggerControllerTest.java create mode 100644 terraform/modules/async-gcs-snapshoter/main.tf create mode 100644 terraform/modules/async-gcs-snapshoter/variables.tf diff --git a/README.md b/README.md index 147873c..6bc27d5 100644 --- a/README.md +++ b/README.md @@ -410,6 +410,20 @@ BigQuery Types to Avro Logical Types mapping: | `TIME` | `timestamp-micro` (annotates Avro `LONG`) | | `DATETIME` | `STRING` (custom named logical type `datetime`) | +##### Configure Additional Backup Projects + +Terraform needs to deploy resources to the backup projects where the backup operations will run. For example, log +sinks that send notifications to the Tagger once a backup operation has completed. + +By default, all projects listed in the `backup_project` field in the fallback policy will be automatically included. +However, for additional backup projects such as the ones defined in external configuration (i.e. table backup policy tags), +one must add them to the below list. + +``` +additional_backup_projects = ["project1", "project2", ..] 
+``` + +If you're only using the fallback backup policy and without table-level external policies, you can set this variable to an empty list `[]` #### Terraform Deployment diff --git a/scripts/prepare_backup_projects.sh b/scripts/prepare_backup_projects.sh index 57e64de..5ad0be9 100755 --- a/scripts/prepare_backup_projects.sh +++ b/scripts/prepare_backup_projects.sh @@ -43,9 +43,15 @@ do --member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \ --role="roles/bigquery.jobUser" - # GCS Snapshoter needs to write to GCS - gcloud projects add-iam-policy-binding "${project}" \ - --member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \ - --role="roles/storage.objectAdmin" + # GCS Snapshoter needs to write to GCS + gcloud projects add-iam-policy-binding "${project}" \ + --member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \ + --role="roles/storage.objectAdmin" + + # Terraform needs to create log sinks to capture GCS export operation completion + gcloud projects add-iam-policy-binding "${project}" \ + --member="serviceAccount:${TF_SA}@${PROJECT_ID}.iam.gserviceaccount.com" \ + --role="roles/logging.configWriter" + done diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java index fdee2dd..bc5aade 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java @@ -19,6 +19,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.Tuple; +import com.google.cloud.pso.bq_snapshot_manager.entities.Globals; import com.google.cloud.pso.bq_snapshot_manager.entities.NonRetryableApplicationException; import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec; import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod; @@ -140,7 +141,10 @@ public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp o if(!request.isDryRun()){ // API Call + String jobId = String.format("%s_%s_%s", Globals.APPLICATION_NAME, "snapshot", request.getTrackingId()); + bqService.createSnapshot( + jobId, sourceTableWithTimeTravelTuple.x(), snapshotTable, expiryTs, diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoter.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoter.java index 3eb856e..95b1c5b 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoter.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoter.java @@ -2,24 +2,22 @@ import com.google.cloud.Timestamp; import com.google.cloud.Tuple; +import com.google.cloud.pso.bq_snapshot_manager.entities.Globals; import com.google.cloud.pso.bq_snapshot_manager.entities.NonRetryableApplicationException; import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec; import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod; -import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.GCSSnapshotFormat; -import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.TimeTravelOffsetDays; import 
com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest; import com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper; +import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper; import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils; import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryService; -import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.FailedPubSubMessage; -import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubPublishResults; +import com.google.cloud.pso.bq_snapshot_manager.services.map.PersistentMap; import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubService; -import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.SuccessPubSubMessage; import com.google.cloud.pso.bq_snapshot_manager.services.set.PersistentSet; -import org.apache.commons.lang3.StringUtils; import java.io.IOException; -import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; public class GCSSnapshoter { @@ -31,12 +29,17 @@ public class GCSSnapshoter { private final PersistentSet persistentSet; private final String persistentSetObjectPrefix; + private final PersistentMap persistentMap; + + private final String persistentMapObjectPrefix; public GCSSnapshoter(SnapshoterConfig config, BigQueryService bqService, PubSubService pubSubService, PersistentSet persistentSet, String persistentSetObjectPrefix, + PersistentMap persistentMap, + String persistentMapObjectPrefix, Integer functionNumber ) { this.config = config; @@ -44,6 +47,8 @@ public GCSSnapshoter(SnapshoterConfig config, this.pubSubService = pubSubService; this.persistentSet = persistentSet; this.persistentSetObjectPrefix = persistentSetObjectPrefix; + this.persistentMap = persistentMap; + this.persistentMapObjectPrefix = persistentMapObjectPrefix; logger = new LoggingHelper( GCSSnapshoter.class.getSimpleName(), @@ -123,59 +128,56 @@ public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operat ); if(!request.isDryRun()){ + // create an async bq export job + + String jobId = TrackingHelper.generateBQExportJobId(request.getTrackingId()); + + // We create the tagging request and added it to a persistent storage + // The Tagger service will receive notifications of export job completion via log sinks and pick up the tagger request from the persistent storage + // Make sure the file is stored first before running the export job. In case of non-fatal error of file creation and retry, we don't re-run the export job + TaggerRequest taggerRequest = new TaggerRequest( + request.getTargetTable(), + request.getRunId(), + request.getTrackingId(), + request.isDryRun(), + request.getBackupPolicy(), + BackupMethod.GCS_SNAPSHOT, + null, + gcsDestinationUri, + operationTs + ); + + String taggerRequestFile = String.format("%s/%s", persistentMapObjectPrefix, jobId); + persistentMap.put(taggerRequestFile, taggerRequest.toJsonString()); + + Map jobLabels = new HashMap<>(); + // labels has to be max 63 chars, contain only lowercase letters, numeric characters, underscores, and dashes. All characters must use UTF-8 encoding, and international characters are allowed. 
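+ // The "app" label set below is what the log sink created by terraform/modules/async-gcs-snapshoter filters on (labels.app = application_name) in order to route BQ export job completion events to the Tagger PubSub topic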
+ jobLabels.put("app", Globals.APPLICATION_NAME); + // API Call bqService.exportToGCS( + jobId, sourceTableWithTimeTravelTuple.x(), gcsDestinationUri, request.getBackupPolicy().getGcsExportFormat(), - null, - null, - null, - request.getTrackingId() + request.getBackupPolicy().getGcsCsvDelimiter(), + request.getBackupPolicy().getGcsCsvExportHeader(), + request.getBackupPolicy().getGcsUseAvroLogicalTypes(), + request.getTrackingId(), + jobLabels ); } - logger.logInfoWithTracker( request.isDryRun(), request.getTrackingId(), request.getTargetTable(), - String.format("BigQuery GCS export completed for table %s to %s", + String.format("BigQuery GCS export submitted for table %s to %s", request.getTargetTable().toSqlString(), gcsDestinationUri ) ); - // Create a Tagger request and send it to the Tagger PubSub topic - TaggerRequest taggerRequest = new TaggerRequest( - request.getTargetTable(), - request.getRunId(), - request.getTrackingId(), - request.isDryRun(), - request.getBackupPolicy(), - BackupMethod.GCS_SNAPSHOT, - null, - gcsDestinationUri, - operationTs - ); - - // Publish the list of tagging requests to PubSub - PubSubPublishResults publishResults = pubSubService.publishTableOperationRequests( - config.getProjectId(), - config.getOutputTopic(), - Arrays.asList(taggerRequest) - ); - - for (FailedPubSubMessage msg : publishResults.getFailedMessages()) { - String logMsg = String.format("Failed to publish this message %s", msg.toString()); - logger.logWarnWithTracker(request.isDryRun(),request.getTrackingId(), request.getTargetTable(), logMsg); - } - - for (SuccessPubSubMessage msg : publishResults.getSuccessMessages()) { - String logMsg = String.format("Published this message %s", msg.toString()); - logger.logInfoWithTracker(request.isDryRun(),request.getTrackingId(), request.getTargetTable(), logMsg); - } - // run common service end logging and adding pubsub message to processed list Utils.runServiceEndRoutines( logger, @@ -191,9 +193,7 @@ public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operat request.getTrackingId(), request.isDryRun(), operationTs, - sourceTableWithTimeTravelTuple.x(), - taggerRequest, - publishResults + sourceTableWithTimeTravelTuple.x() ); } diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterResponse.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterResponse.java index b90025c..b87dbf9 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterResponse.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterResponse.java @@ -3,22 +3,16 @@ import com.google.cloud.Timestamp; import com.google.cloud.pso.bq_snapshot_manager.entities.TableOperationRequestResponse; import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec; -import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest; -import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubPublishResults; public class GCSSnapshoterResponse extends TableOperationRequestResponse { private final Timestamp operationTs; private final TableSpec computedSourceTable; - private final TaggerRequest outputTaggerRequest; - private final PubSubPublishResults pubSubPublishResults; - public GCSSnapshoterResponse(TableSpec targetTable, String runId, String trackingId, boolean isDryRun, Timestamp 
operationTs, TableSpec computedSourceTable, TaggerRequest outputTaggerRequest, PubSubPublishResults pubSubPublishResults) { + public GCSSnapshoterResponse(TableSpec targetTable, String runId, String trackingId, boolean isDryRun, Timestamp operationTs, TableSpec computedSourceTable) { super(targetTable, runId, trackingId, isDryRun); this.operationTs = operationTs; this.computedSourceTable = computedSourceTable; - this.outputTaggerRequest = outputTaggerRequest; - this.pubSubPublishResults = pubSubPublishResults; } public Timestamp getOperationTs() { @@ -29,11 +23,4 @@ public TableSpec getComputedSourceTable() { return computedSourceTable; } - public TaggerRequest getOutputTaggerRequest() { - return outputTaggerRequest; - } - - public PubSubPublishResults getPubSubPublishResults() { - return pubSubPublishResults; - } } diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java index bb7b474..df1f234 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java @@ -17,6 +17,7 @@ package com.google.cloud.pso.bq_snapshot_manager.helpers; import com.google.cloud.Timestamp; +import com.google.cloud.pso.bq_snapshot_manager.entities.Globals; import java.util.UUID; @@ -68,4 +69,12 @@ public static Timestamp parseRunIdAsTimestamp(String runId){ public static String generateTrackingId (String runId){ return String.format("%s-%s", runId, UUID.randomUUID().toString()); } + + public static String generateBQExportJobId(String trackingId){ + return String.format("%s_%s_%s",trackingId, "export", Globals.APPLICATION_NAME, trackingId); + } + + public static String parseTrackingIdFromBQExportJobId(String bqExportJobId){ + return Utils.tokenize(bqExportJobId,"_", true).get(0); + } } diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryService.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryService.java index 45bcf96..55ded44 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryService.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryService.java @@ -29,18 +29,25 @@ import java.io.IOException; import java.math.BigInteger; import java.util.List; +import java.util.Map; public interface BigQueryService { - void createSnapshot(TableSpec sourceTable, - TableSpec destinationId, - Timestamp snapshotExpirationTs, - String trackingId) throws InterruptedException; + void createSnapshot( + String jobId, + TableSpec sourceTable, + TableSpec destinationId, + Timestamp snapshotExpirationTs, + String trackingId) throws InterruptedException; - void exportToGCS(TableSpec sourceTable, - String gcsDestinationUri, - GCSSnapshotFormat exportFormat, - @Nullable String csvFieldDelimiter, - @Nullable Boolean csvPrintHeader, - @Nullable Boolean useAvroLogicalTypes, - String trackingId) throws InterruptedException; + void exportToGCS( + String jobId, + TableSpec sourceTable, + String gcsDestinationUri, + GCSSnapshotFormat exportFormat, + @Nullable String csvFieldDelimiter, + @Nullable Boolean csvPrintHeader, + @Nullable Boolean useAvroLogicalTypes, + String trackingId, + Map jobLabels + ) throws InterruptedException; } diff 
--git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryServiceImpl.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryServiceImpl.java index 1b3c1c8..b7851dd 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryServiceImpl.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/bq/BigQueryServiceImpl.java @@ -25,6 +25,8 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; public class BigQueryServiceImpl implements BigQueryService { @@ -39,7 +41,7 @@ public BigQueryServiceImpl(String projectId) throws IOException { } - public void createSnapshot(TableSpec sourceTable, TableSpec destinationTable, Timestamp snapshotExpirationTs, String trackingId) throws InterruptedException { + public void createSnapshot(String jobId, TableSpec sourceTable, TableSpec destinationTable, Timestamp snapshotExpirationTs, String trackingId) throws InterruptedException { CopyJobConfiguration copyJobConfiguration = CopyJobConfiguration .newBuilder(destinationTable.toTableId(), sourceTable.toTableId()) .setWriteDisposition(JobInfo.WriteDisposition.WRITE_EMPTY) @@ -49,7 +51,7 @@ public void createSnapshot(TableSpec sourceTable, TableSpec destinationTable, Ti Job job = bigQuery.create(JobInfo .newBuilder(copyJobConfiguration) - .setJobId(JobId.of(String.format("%s_%s_%s", Globals.APPLICATION_NAME, "snapshot", trackingId))) + .setJobId(JobId.of(jobId)) .build()); // wait for the job to complete @@ -61,47 +63,45 @@ public void createSnapshot(TableSpec sourceTable, TableSpec destinationTable, Ti } } - public void exportToGCS(TableSpec sourceTable, - String gcsDestinationUri, - GCSSnapshotFormat exportFormat, - @Nullable String csvFieldDelimiter, - @Nullable Boolean csvPrintHeader, - @Nullable Boolean useAvroLogicalTypes, - String trackingId) throws InterruptedException { + public void exportToGCS( + String jobId, + TableSpec sourceTable, + String gcsDestinationUri, + GCSSnapshotFormat exportFormat, + @Nullable String csvFieldDelimiter, + @Nullable Boolean csvPrintHeader, + @Nullable Boolean useAvroLogicalTypes, + String trackingId, + Map jobLabels + ) throws InterruptedException { Tuple formatAndCompression = GCSSnapshotFormat.getFormatAndCompression(exportFormat); ExtractJobConfiguration.Builder extractConfigurationBuilder = ExtractJobConfiguration .newBuilder(sourceTable.toTableId(), gcsDestinationUri) + .setLabels(jobLabels) .setFormat(formatAndCompression.x()); // check if compression is required - if(formatAndCompression.y() != null){ + if (formatAndCompression.y() != null) { extractConfigurationBuilder.setCompression(formatAndCompression.y()); } // set optional fields - if(csvFieldDelimiter != null){ + if (csvFieldDelimiter != null) { extractConfigurationBuilder.setFieldDelimiter(csvFieldDelimiter); } - if(csvPrintHeader != null){ + if (csvPrintHeader != null) { extractConfigurationBuilder.setPrintHeader(csvPrintHeader); } - if(useAvroLogicalTypes != null){ + if (useAvroLogicalTypes != null) { extractConfigurationBuilder.setUseAvroLogicalTypes(useAvroLogicalTypes); } - Job job = bigQuery.create(JobInfo + // async call to create an export job + bigQuery.create(JobInfo .newBuilder(extractConfigurationBuilder.build()) - .setJobId(JobId.of(String.format("%s_%s_%s", Globals.APPLICATION_NAME, "export", trackingId))) + .setJobId(JobId.of(jobId)) .build()); - - // Blocks until this 
job completes its execution, either failing or succeeding. - job = job.waitFor(); - - // if job finished with errors - if (job.getStatus().getError() != null) { - throw new RuntimeException(job.getStatus().getError().toString()); - } } } \ No newline at end of file diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/GcsPersistentMapImpl.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/GcsPersistentMapImpl.java new file mode 100644 index 0000000..059bb8d --- /dev/null +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/GcsPersistentMapImpl.java @@ -0,0 +1,33 @@ +package com.google.cloud.pso.bq_snapshot_manager.services.map; + +import com.google.cloud.storage.*; + +import java.nio.charset.StandardCharsets; + +public class GcsPersistentMapImpl implements PersistentMap{ + + private Storage storage; + private String bucketName; + + public GcsPersistentMapImpl(String bucketName) { + // Instantiates a client + this.storage = StorageOptions.getDefaultInstance().getService(); + this.bucketName = bucketName; + } + + @Override + public void put(String key, String value) { + BlobId blobId = BlobId.of(bucketName, key); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build(); + storage.create(blobInfo, value.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public String get(String key) { + BlobId blobId = BlobId.of(bucketName, key); + Blob blob = storage.get(blobId); + byte [] content = blob.getContent(); + return new String(content, StandardCharsets.UTF_8); + } + +} diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/PersistentMap.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/PersistentMap.java new file mode 100644 index 0000000..cc1d1df --- /dev/null +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/map/PersistentMap.java @@ -0,0 +1,7 @@ +package com.google.cloud.pso.bq_snapshot_manager.services.map; + +public interface PersistentMap { + + void put(String key, String value); + String get(String key); +} diff --git a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoterTest.java b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoterTest.java index 5809327..a623162 100644 --- a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoterTest.java +++ b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoterTest.java @@ -15,13 +15,14 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Map; import static org.junit.Assert.assertEquals; public class BigQuerySnapshoterTest { @Test - public void testGetSnapshotTableSpec(){ + public void testGetSnapshotTableSpec() { TableSpec actual = BigQuerySnapshoter.getSnapshotTableSpec( TableSpec.fromSqlString("p.d.t"), @@ -40,14 +41,14 @@ public void testGetSnapshotTableSpec(){ public void testExecute() throws NonRetryableApplicationException, IOException, InterruptedException { BigQuerySnapshoter snapshoter = new BigQuerySnapshoter( - new SnapshoterConfig("host-project", "data-region"), + new SnapshoterConfig("host-project", "data-region"), new BigQueryService() { @Override - public void createSnapshot(TableSpec 
sourceTable, TableSpec destinationId, Timestamp snapshotExpirationTs, String trackingId) throws InterruptedException { + public void createSnapshot(String jobId, TableSpec sourceTable, TableSpec destinationId, Timestamp snapshotExpirationTs, String trackingId) throws InterruptedException { } @Override - public void exportToGCS(TableSpec sourceTable, String gcsDestinationUri, GCSSnapshotFormat exportFormat, @Nullable String csvFieldDelimiter, @Nullable Boolean csvPrintHeader, @Nullable Boolean useAvroLogicalTypes, String trackingId) throws InterruptedException { + public void exportToGCS(String jobId, TableSpec sourceTable, String gcsDestinationUri, GCSSnapshotFormat exportFormat, @Nullable String csvFieldDelimiter, @Nullable Boolean csvPrintHeader, @Nullable Boolean useAvroLogicalTypes, String trackingId, Map jobLabels) throws InterruptedException { } }, new PubSubServiceTestImpl(), @@ -68,9 +69,9 @@ public void exportToGCS(TableSpec sourceTable, String gcsDestinationUri, GCSSnap TableSpec sourceTable = TableSpec.fromSqlString("project.dataset.table"); Timestamp operationTime = Timestamp.ofTimeSecondsAndNanos(1667478075L, 0); - Long timeTravelMilis = (operationTime.getSeconds() - (3* 86400))*1000; - TableSpec expectedSourceTable = TableSpec.fromSqlString("project.dataset.table@"+timeTravelMilis); - TableSpec expectedSnapshotTable = TableSpec.fromSqlString("backup-p.backup-d.project_dataset_table_runId_"+timeTravelMilis); + Long timeTravelMilis = (operationTime.getSeconds() - (3 * 86400)) * 1000; + TableSpec expectedSourceTable = TableSpec.fromSqlString("project.dataset.table@" + timeTravelMilis); + TableSpec expectedSnapshotTable = TableSpec.fromSqlString("backup-p.backup-d.project_dataset_table_runId_" + timeTravelMilis); BigQuerySnapshoterResponse actualResponse = snapshoter.execute( new SnapshoterRequest( diff --git a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterTest.java b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterTest.java index e0237d3..f8fc794 100644 --- a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterTest.java +++ b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoterTest.java @@ -5,6 +5,7 @@ import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec; import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.*; import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest; +import com.google.cloud.pso.bq_snapshot_manager.services.PersistentMapTestImpl; import com.google.cloud.pso.bq_snapshot_manager.services.PersistentSetTestImpl; import com.google.cloud.pso.bq_snapshot_manager.services.PubSubServiceTestImpl; import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryService; @@ -16,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -42,16 +44,18 @@ public void testExecute() throws NonRetryableApplicationException, IOException, new SnapshoterConfig("host-project", "data-region"), new BigQueryService() { @Override - public void createSnapshot(TableSpec sourceTable, TableSpec destinationId, Timestamp snapshotExpirationTs, String trackingId) throws InterruptedException { + public void createSnapshot(String jobId, TableSpec sourceTable, TableSpec destinationId, Timestamp snapshotExpirationTs, String 
trackingId) throws InterruptedException { } @Override - public void exportToGCS(TableSpec sourceTable, String gcsDestinationUri, GCSSnapshotFormat exportFormat, @Nullable String csvFieldDelimiter, @Nullable Boolean csvPrintHeader, @Nullable Boolean useAvroLogicalTypes, String trackingId) throws InterruptedException { + public void exportToGCS(String jobId, TableSpec sourceTable, String gcsDestinationUri, GCSSnapshotFormat exportFormat, @Nullable String csvFieldDelimiter, @Nullable Boolean csvPrintHeader, @Nullable Boolean useAvroLogicalTypes, String trackingId, Map jobLabels) throws InterruptedException { } }, new PubSubServiceTestImpl(), new PersistentSetTestImpl(), - "test-prefix", + "test-set-prefix", + new PersistentMapTestImpl(), + "test-map-prefix", -3 ); @@ -94,7 +98,6 @@ public void exportToGCS(TableSpec sourceTable, String gcsDestinationUri, GCSSnap operationTime ); - assertEquals(expectedTaggerRequest, actualResponse.getOutputTaggerRequest()); assertEquals(expectedSourceTable, actualResponse.getComputedSourceTable()); assertEquals(operationTime, actualResponse.getOperationTs()); diff --git a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelperTest.java b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelperTest.java index 613e973..803848d 100644 --- a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelperTest.java +++ b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelperTest.java @@ -16,4 +16,15 @@ public void parseAsTimestamp(){ assertEquals(1641034800, fromRunId.getSeconds()); } + + @Test + public void parseTrackingIdFromJobId(){ + String trackingId = TrackingHelper.generateTrackingId(TrackingHelper.MIN_RUN_ID); + String parsedTrackingId = TrackingHelper.parseTrackingIdFromBQExportJobId( + TrackingHelper.generateBQExportJobId(trackingId) + ); + + assertEquals(trackingId, parsedTrackingId); + + } } diff --git a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/PersistentMapTestImpl.java b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/PersistentMapTestImpl.java new file mode 100644 index 0000000..6802d3a --- /dev/null +++ b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/PersistentMapTestImpl.java @@ -0,0 +1,25 @@ +package com.google.cloud.pso.bq_snapshot_manager.services; + +import com.google.cloud.pso.bq_snapshot_manager.services.map.PersistentMap; + +import java.util.HashMap; +import java.util.Map; + +public class PersistentMapTestImpl implements PersistentMap { + + private Map map; + + public PersistentMapTestImpl(){ + map = new HashMap<>(); + } + + @Override + public void put(String key, String value) { + map.put(key, value); + } + + @Override + public String get(String key) { + return map.get(key); + } +} diff --git a/services/snapshoter-gcs-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/snapshoter_gcs/GCSSnapshoterController.java b/services/snapshoter-gcs-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/snapshoter_gcs/GCSSnapshoterController.java index 2a677ce..43cfb6d 100644 --- a/services/snapshoter-gcs-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/snapshoter_gcs/GCSSnapshoterController.java +++ b/services/snapshoter-gcs-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/snapshoter_gcs/GCSSnapshoterController.java @@ -27,6 +27,7 @@ import 
com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper; import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper; import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryServiceImpl; +import com.google.cloud.pso.bq_snapshot_manager.services.map.GcsPersistentMapImpl; import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubServiceImpl; import com.google.cloud.pso.bq_snapshot_manager.services.set.GCSPersistentSetImpl; import com.google.gson.Gson; @@ -101,6 +102,8 @@ public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) { new PubSubServiceImpl(), new GCSPersistentSetImpl(environment.getGcsFlagsBucket()), "snapshoter-gcs-flags", + new GcsPersistentMapImpl(environment.getGcsFlagsBucket()), + "snapshoter-gcs-tagger-requests", functionNumber ); diff --git a/services/tagger-app/pom.xml b/services/tagger-app/pom.xml index 6cb6aa1..429af70 100644 --- a/services/tagger-app/pom.xml +++ b/services/tagger-app/pom.xml @@ -54,6 +54,13 @@ test + + junit + junit + 4.13.2 + test + + diff --git a/services/tagger-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/tagger/TaggerController.java b/services/tagger-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/tagger/TaggerController.java index 5e1c79a..214c44b 100644 --- a/services/tagger-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/tagger/TaggerController.java +++ b/services/tagger-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/tagger/TaggerController.java @@ -19,6 +19,8 @@ import com.google.cloud.pso.bq_snapshot_manager.entities.NonRetryableApplicationException; import com.google.cloud.pso.bq_snapshot_manager.entities.PubSubEvent; import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec; +import com.google.cloud.pso.bq_snapshot_manager.functions.f03_snapshoter.BigQuerySnapshoter; +import com.google.cloud.pso.bq_snapshot_manager.functions.f03_snapshoter.GCSSnapshoter; import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.Tagger; import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerResponse; import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest; @@ -26,8 +28,13 @@ import com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper; import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper; import com.google.cloud.pso.bq_snapshot_manager.services.catalog.DataCatalogServiceImpl; +import com.google.cloud.pso.bq_snapshot_manager.services.map.GcsPersistentMapImpl; +import com.google.cloud.pso.bq_snapshot_manager.services.map.PersistentMap; import com.google.cloud.pso.bq_snapshot_manager.services.set.GCSPersistentSetImpl; import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.http.HttpStatus; @@ -81,12 +88,41 @@ public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) { String requestJsonString = requestBody.getMessage().dataToUtf8String(); - // remove any escape characters (e.g. from Terraform - requestJsonString = requestJsonString.replace("\\", ""); - logger.logInfoWithTracker(trackingId, null, String.format("Received payload: %s", requestJsonString)); - taggerRequest = gson.fromJson(requestJsonString, TaggerRequest.class); + // The received pubsub message could have been sent by two different sources + // 1. 
BigQuery Snapshoter: as a TaggerRequest JSON payload + // 2. From a log sink listening for BQ export job completion events. These jobs are originally submitted by the GCS Snapshoter + + boolean isGCSExportJobMessage = isGCSExportJobMessage(requestJsonString); + if(isGCSExportJobMessage){ + // parse the pubsub request as a BQ Export job completion notification + + String jobId = getGcsExportJobId(requestJsonString); + String jobProjectId = getGcsExportJobProjectId(requestJsonString); + trackingId = TrackingHelper.parseTrackingIdFromBQExportJobId(jobId); + boolean isSuccessfulJob = isSuccessfulJob(requestJsonString); + String jobError = getGcsExportJobError(requestJsonString); + + PersistentMap persistentMap = new GcsPersistentMapImpl(environment.getGcsFlagsBucket()); + String taggerRequestFile = String.format("%s/%s", "snapshoter-gcs-tagger-requests", jobId); + String taggerRequestJson = persistentMap.get(taggerRequestFile); + taggerRequest = gson.fromJson(taggerRequestJson, TaggerRequest.class); + + // After parsing the taggerRequest for tracking, throw a non retryable exception if the backup job failed + if (!isSuccessfulJob){ + String msg = String.format("GCS export job '%s' on project '%s' has failed with error `%s`. Please check the BigQuery logs in the backup project where the job ran.", + jobId, + jobProjectId, + jobError + ); + throw new NonRetryableApplicationException(msg); + } + + }else{ + // parse the pubsub request as a taggerRequest (from BQ Snapshoter) + taggerRequest = gson.fromJson(requestJsonString, TaggerRequest.class); + } trackingId = taggerRequest.getTrackingId(); @@ -142,6 +178,79 @@ public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) { return responseEntity; } + public boolean isGCSExportJobMessage(String jsonStr){ + try{ + getGcsExportJobId(jsonStr); + return true; + }catch (Exception ex){ + return false; + } + } + + public static String getGcsExportJobError(String jsonStr){ + + JsonObject errorObject = JsonParser.parseString(jsonStr) + .getAsJsonObject().get("protoPayload") + .getAsJsonObject().get("serviceData") + .getAsJsonObject().get("jobCompletedEvent") + .getAsJsonObject().get("job") + .getAsJsonObject().get("jobStatus") + .getAsJsonObject().get("error").getAsJsonObject(); + + if(errorObject.has("message")){ + return errorObject.get("message").getAsString(); + }else{ + return ""; + } + } + + public static boolean isSuccessfulJob(String jsonStr){ + + // if job has error message then it's not successful + return !JsonParser.parseString(jsonStr) + .getAsJsonObject().get("protoPayload") + .getAsJsonObject().get("serviceData") + .getAsJsonObject().get("jobCompletedEvent") + .getAsJsonObject().get("job") + .getAsJsonObject().get("jobStatus") + .getAsJsonObject().get("error") + .getAsJsonObject().has("message"); + } + + public static String getGcsExportJobId(String jsonStr){ + + return JsonParser.parseString(jsonStr) + .getAsJsonObject().get("protoPayload") + .getAsJsonObject().get("serviceData") + .getAsJsonObject().get("jobCompletedEvent") + .getAsJsonObject().get("job") + .getAsJsonObject().get("jobName") + .getAsJsonObject().get("jobId").getAsString(); + } + + public static String getGcsExportJobProjectId(String jsonStr){ + + return JsonParser.parseString(jsonStr) + .getAsJsonObject().get("protoPayload") + .getAsJsonObject().get("serviceData") + .getAsJsonObject().get("jobCompletedEvent") + .getAsJsonObject().get("job") + .getAsJsonObject().get("jobName") + .getAsJsonObject().get("projectId").getAsString(); + } + + public static String 
getGcsExportJobLabel(String jsonStr, String label){ + + return JsonParser.parseString(jsonStr) + .getAsJsonObject().get("protoPayload") + .getAsJsonObject().get("serviceData") + .getAsJsonObject().get("jobCompletedEvent") + .getAsJsonObject().get("job") + .getAsJsonObject().get("jobConfiguration") + .getAsJsonObject().get("labels").getAsJsonObject() + .get(label).getAsString(); + } + public static void main(String[] args) { SpringApplication.run(TaggerController.class, args); } diff --git a/services/tagger-app/src/main/test/TaggerControllerTest.java b/services/tagger-app/src/main/test/TaggerControllerTest.java new file mode 100644 index 0000000..c41afd3 --- /dev/null +++ b/services/tagger-app/src/main/test/TaggerControllerTest.java @@ -0,0 +1,102 @@ +import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod; +import com.google.cloud.pso.bq_snapshot_manager.tagger.TaggerController; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +public class TaggerControllerTest { + + private final String testEvent = "{\n" + + " \"insertId\": \"-ksjkpxe3omqc\",\n" + + " \"logName\": \"projects/backup-project/logs/cloudaudit.googleapis.com%2Fdata_access\",\n" + + " \"protoPayload\": {\n" + + " \"@type\": \"type.googleapis.com/google.cloud.audit.AuditLog\",\n" + + " \"authenticationInfo\": {\n" + + " \"principalEmail\": \"snapshoter-gcs@prject.iam.gserviceaccount.com\"\n" + + " },\n" + + " \"methodName\": \"jobservice.jobcompleted\",\n" + + " \"requestMetadata\": {\n" + + " \"callerIp\": \"35.203.254.111\",\n" + + " \"callerSuppliedUserAgent\": \"gcloud-java/2.16.1 Google-API-Java-Client/2.0.0 Google-HTTP-Java-Client/1.42.2 (gzip),gzip(gfe)\"\n" + + " },\n" + + " \"resourceName\": \"projects/project/jobs/test-job\",\n" + + " \"serviceData\": {\n" + + " \"@type\": \"type.googleapis.com/google.cloud.bigquery.logging.v1.AuditData\",\n" + + " \"jobCompletedEvent\": {\n" + + " \"eventName\": \"extract_job_completed\",\n" + + " \"job\": {\n" + + " \"jobConfiguration\": {\n" + + " \"extract\": {\n" + + " \"destinationUris\": [\n" + + " \"gs://bucket/project/dataset/table/tracking_id/timestamp/AVRO_SNAPPY/*\"\n" + + " ],\n" + + " \"sourceTable\": {\n" + + " \"datasetId\": \"test-dataset\",\n" + + " \"projectId\": \"test-project\",\n" + + " \"tableId\": \"test-table@1675701106000\"\n" + + " }\n" + + " },\n" + + " \"labels\": {\n" + + " \"app\": \"bq_backup_manager\",\n" + + " \"tracking_id\": \"test-tracking-id\",\n" + + " \"table_spec\": \"project.dataset.table\"\n" + + " }\n" + + " },\n" + + " \"jobName\": {\n" + + " \"jobId\": \"bq_backup_manager_export_1675960287852-F-4999ca89-d448-47d2-81de-fc5afba8e484\",\n" + + " \"location\": \"EU\",\n" + + " \"projectId\": \"project\"\n" + + " },\n" + + " \"jobStatistics\": {\n" + + " \"createTime\": \"2023-02-09T16:31:47.674Z\",\n" + + " \"endTime\": \"2023-02-09T16:31:48.399Z\",\n" + + " \"startTime\": \"2023-02-09T16:31:47.767Z\",\n" + + " \"totalSlotMs\": \"228\"\n" + + " },\n" + + " \"jobStatus\": {\n" + + " \"error\": \"{}\",\n" + + " \"state\": \"DONE\"\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " \"serviceName\": \"bigquery.googleapis.com\",\n" + + " \"status\": \"status\"\n" + + " },\n" + + " \"receiveTimestamp\": \"2023-02-09T16:31:49.213113343Z\",\n" + + " \"resource\": {\n" + + " \"labels\": {\n" + + " \"project_id\": \"test-project\"\n" + + " },\n" + + " \"type\": \"bigquery_resource\"\n" + + " },\n" + + " \"severity\": \"INFO\",\n" + + " \"timestamp\": \"2023-02-09T16:31:48.416574Z\"\n" + + "}"; + + 
@Test + public void testParsingJobCompletionEvent() { + + assertEquals( + TaggerController.getGcsExportJobLabel(testEvent, "tracking_id"), + "test-tracking-id" + ); + + assertEquals( + TaggerController.getGcsExportJobLabel(testEvent, "table_spec"), + "project.dataset.table" + ); + + assertEquals( + TaggerController.getGcsExportJobId(testEvent), + "bq_backup_manager_export_1675960287852-F-4999ca89-d448-47d2-81de-fc5afba8e484" + ); + + + assertEquals( + TaggerController.getGcsExportJobError(testEvent), + "NA" + ); + + + } +} diff --git a/terraform/main.tf b/terraform/main.tf index d6b936d..4f2cbca 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -48,6 +48,11 @@ provider "google-beta" { } locals { + common_labels = { + "app" = var.application_name + "provisioned_by" = "terraform" + } + common_cloud_run_variables = [ { name = "PROJECT_ID", @@ -64,8 +69,27 @@ locals { { name = "GCS_FLAGS_BUCKET", value = module.gcs.create_gcs_flags_bucket_name + }, + { + name = "APPLICATION_NAME", + value = var.application_name } ] + + fallback_policy_default_level_backup_project = lookup(lookup(var.fallback_policy, "default_policy") , "backup_project") + fallback_policy_folder_level_backup_projects = [for k, v in lookup(var.fallback_policy, "folder_overrides"): lookup(v,"backup_project")] + fallback_policy_project_level_backup_projects = [for k, v in lookup(var.fallback_policy, "project_overrides"): lookup(v,"backup_project")] + fallback_policy_dataset_level_backup_projects = [for k, v in lookup(var.fallback_policy, "dataset_overrides"): lookup(v,"backup_project")] + fallback_policy_table_level_backup_projects = [for k, v in lookup(var.fallback_policy, "table_overrides"): lookup(v,"backup_project")] + fallback_policy_backup_projects = distinct(concat( + [local.fallback_policy_default_level_backup_project], + local.fallback_policy_folder_level_backup_projects, + local.fallback_policy_project_level_backup_projects, + local.fallback_policy_dataset_level_backup_projects, + local.fallback_policy_table_level_backup_projects + )) + all_backup_projects = distinct(concat(var.additional_backup_projects, local.fallback_policy_backup_projects)) + } module "iam" { @@ -97,6 +121,7 @@ module "gcs" { "serviceAccount:${module.iam.sa_snapshoter_gcs_email}", "serviceAccount:${module.iam.sa_tagger_email}", ] + common_labels = local.common_labels } module "bigquery" { @@ -105,6 +130,7 @@ module "bigquery" { region = var.data_region dataset = var.bigquery_dataset_name logging_sink_sa = module.cloud_logging.service_account + common_labels = local.common_labels } module "cloud_logging" { @@ -112,6 +138,7 @@ module "cloud_logging" { dataset = module.bigquery.results_dataset project = var.project log_sink_name = var.log_sink_name + application_name = var.application_name } @@ -139,6 +166,7 @@ module "cloud-run-dispatcher" { }, ] ) + common_labels = local.common_labels } module "cloud-run-configurator" { @@ -172,6 +200,8 @@ module "cloud-run-configurator" { } ] ) + + common_labels = local.common_labels } module "cloud-run-snapshoter-bq" { @@ -193,6 +223,8 @@ module "cloud-run-snapshoter-bq" { } ] ) + + common_labels = local.common_labels } module "cloud-run-snapshoter-gcs" { @@ -214,6 +246,8 @@ module "cloud-run-snapshoter-gcs" { } ] ) + + common_labels = local.common_labels } module "cloud-run-tagger" { @@ -234,6 +268,8 @@ module "cloud-run-tagger" { }, ] ) + + common_labels = local.common_labels } @@ -253,6 +289,8 @@ module "pubsub-dispatcher" { # avoid resending dispatcher messages if things went wrong and the msg was NAK 
(e.g. timeout expired, app error, etc) # min value must be at equal to the ack_deadline_seconds subscription_message_retention_duration = var.dispatcher_subscription_message_retention_duration + + common_labels = local.common_labels } module "pubsub-configurator" { @@ -268,6 +306,8 @@ module "pubsub-configurator" { # How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. # In case of unexpected problems we want to avoid a buildup that re-trigger functions subscription_message_retention_duration = var.configurator_subscription_message_retention_duration + + common_labels = local.common_labels } module "pubsub-snapshoter-bq" { @@ -283,6 +323,8 @@ module "pubsub-snapshoter-bq" { # How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. # In case of unexpected problems we want to avoid a buildup that re-trigger functions subscription_message_retention_duration = var.snapshoter_bq_subscription_message_retention_duration + + common_labels = local.common_labels } module "pubsub-snapshoter-gcs" { @@ -298,6 +340,8 @@ module "pubsub-snapshoter-gcs" { # How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. # In case of unexpected problems we want to avoid a buildup that re-trigger functions subscription_message_retention_duration = var.snapshoter_gcs_subscription_message_retention_duration + + common_labels = local.common_labels } module "pubsub-tagger" { @@ -316,6 +360,8 @@ module "pubsub-tagger" { # How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. # In case of unexpected problems we want to avoid a buildup that re-trigger functions subscription_message_retention_duration = var.tagger_subscription_message_retention_duration + + common_labels = local.common_labels } module "cloud-scheduler" { @@ -336,3 +382,14 @@ module "data-catalog" { tagTemplateUsers = [module.iam.sa_tagger_email] } +module "async-gcs-snapshoter" { + + count = length(local.all_backup_projects) + source = "./modules/async-gcs-snapshoter" + + application_name = var.application_name + log_project = local.all_backup_projects[count.index] + host_project = var.project + log_sink_name = "bq_backup_manager_gcs_export_pubsub_sink" + pubsub_topic_name = module.pubsub-tagger.topic-name +} \ No newline at end of file diff --git a/terraform/modules/async-gcs-snapshoter/main.tf b/terraform/modules/async-gcs-snapshoter/main.tf new file mode 100644 index 0000000..cca109b --- /dev/null +++ b/terraform/modules/async-gcs-snapshoter/main.tf @@ -0,0 +1,18 @@ + +// create a pubsub log sink in the backup project where the bq extract jobs run +resource "google_logging_project_sink" "backup_project_pubsub_sink" { + project = var.log_project + name = var.log_sink_name + destination = "pubsub.googleapis.com/projects/${var.host_project}/topics/${var.pubsub_topic_name}" + filter = "resource.type=bigquery_resource protoPayload.serviceData.jobCompletedEvent.eventName=extract_job_completed protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.app=${var.application_name}" + # Use a unique writer (creates a unique service account used for writing) + unique_writer_identity = true +} + +// grant access to the sink service account to publish messages to pubsub +resource "google_pubsub_topic_iam_member" "sa_topic_publisher" { + project = var.host_project + topic = 
"projects/${var.host_project}/topics/${var.pubsub_topic_name}" + role = "roles/pubsub.publisher" + member = google_logging_project_sink.backup_project_pubsub_sink.writer_identity +} \ No newline at end of file diff --git a/terraform/modules/async-gcs-snapshoter/variables.tf b/terraform/modules/async-gcs-snapshoter/variables.tf new file mode 100644 index 0000000..24bff16 --- /dev/null +++ b/terraform/modules/async-gcs-snapshoter/variables.tf @@ -0,0 +1,5 @@ +variable "host_project" {type = string} +variable "log_project" {type = string} +variable "log_sink_name" {type = string} +variable "application_name" {type = string} +variable "pubsub_topic_name" {type = string} \ No newline at end of file diff --git a/terraform/modules/bigquery/main.tf b/terraform/modules/bigquery/main.tf index c09ca13..211fd07 100644 --- a/terraform/modules/bigquery/main.tf +++ b/terraform/modules/bigquery/main.tf @@ -9,6 +9,7 @@ resource "google_bigquery_dataset" "results_dataset" { location = var.region dataset_id = var.dataset description = "To store DLP results from BQ Security Classifier app" + labels = var.common_labels } # Logging BQ sink must be able to write data to logging table in the dataset @@ -35,6 +36,8 @@ resource "google_bigquery_table" "logging_table" { schema = file("modules/bigquery/schema/run_googleapis_com_stdout.json") deletion_protection = true + + labels = var.common_labels } @@ -56,6 +59,8 @@ resource "google_bigquery_table" "view_audit_log_by_table" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_audit_log_by_table_grouped" { @@ -74,6 +79,8 @@ resource "google_bigquery_table" "view_audit_log_by_table_grouped" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "logging_view_steps" { @@ -92,6 +99,8 @@ resource "google_bigquery_table" "logging_view_steps" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_service_calls" { @@ -110,6 +119,8 @@ resource "google_bigquery_table" "view_service_calls" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_run_summary" { @@ -128,6 +139,8 @@ resource "google_bigquery_table" "view_run_summary" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_run_summary_counts" { @@ -147,6 +160,8 @@ resource "google_bigquery_table" "view_run_summary_counts" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_errors_non_retryable" { @@ -165,6 +180,8 @@ resource "google_bigquery_table" "view_errors_non_retryable" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_errors_retryable" { @@ -183,6 +200,8 @@ resource "google_bigquery_table" "view_errors_retryable" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_tracking_id_map" { @@ -201,6 +220,8 @@ resource "google_bigquery_table" "view_tracking_id_map" { } ) } + + labels = var.common_labels } resource "google_bigquery_table" "view_run_duration" { @@ -219,6 +240,8 @@ resource "google_bigquery_table" "view_run_duration" { } ) } + + labels = var.common_labels } diff --git a/terraform/modules/bigquery/variables.tf b/terraform/modules/bigquery/variables.tf index e8a74d8..dd774cb 100644 --- a/terraform/modules/bigquery/variables.tf +++ b/terraform/modules/bigquery/variables.tf @@ -14,6 +14,10 @@ variable "logging_sink_sa" { type = string } +variable "common_labels" { + type = map(string) +} + diff --git a/terraform/modules/cloud-logging/main.tf b/terraform/modules/cloud-logging/main.tf index 
82139eb..6f6555b 100644 --- a/terraform/modules/cloud-logging/main.tf +++ b/terraform/modules/cloud-logging/main.tf @@ -3,7 +3,7 @@ resource "google_logging_project_sink" "bigquery-logging-sink" { name = var.log_sink_name destination = "bigquery.googleapis.com/projects/${var.project}/datasets/${var.dataset}" - filter = "resource.type=cloud_run_revision jsonPayload.global_app=bq_backup_manager" + filter = "resource.type=cloud_run_revision jsonPayload.global_app=${var.application_name}" # Use a unique writer (creates a unique service account used for writing) unique_writer_identity = true bigquery_options { diff --git a/terraform/modules/cloud-logging/variables.tf b/terraform/modules/cloud-logging/variables.tf index c2e0dad..59d091d 100644 --- a/terraform/modules/cloud-logging/variables.tf +++ b/terraform/modules/cloud-logging/variables.tf @@ -1,3 +1,4 @@ variable "project" {type = string} variable "dataset" {type = string} -variable "log_sink_name" {type = string} \ No newline at end of file +variable "log_sink_name" {type = string} +variable "application_name" {type = string} \ No newline at end of file diff --git a/terraform/modules/cloud-run/main.tf b/terraform/modules/cloud-run/main.tf index 00bad8c..99ba5af 100644 --- a/terraform/modules/cloud-run/main.tf +++ b/terraform/modules/cloud-run/main.tf @@ -11,7 +11,6 @@ resource "google_cloud_run_service" "service" { template { spec { - timeout_seconds = var.timeout_seconds service_account_name = var.service_account_email @@ -47,18 +46,13 @@ resource "google_cloud_run_service" "service" { metadata { annotations = { "autoscaling.knative.dev/maxScale" = var.max_containers + "run.googleapis.com/ingress" : "internal" } + labels = var.common_labels } } - metadata { - annotations = { - "run.googleapis.com/ingress" : "internal" - } - } - - traffic { percent = 100 latest_revision = true diff --git a/terraform/modules/cloud-run/variables.tf b/terraform/modules/cloud-run/variables.tf index 0c66269..4339ab0 100644 --- a/terraform/modules/cloud-run/variables.tf +++ b/terraform/modules/cloud-run/variables.tf @@ -31,3 +31,7 @@ variable "max_requests_per_container" { } variable "timeout_seconds" {type = number} + +variable "common_labels" { + type = map(string) +} \ No newline at end of file diff --git a/terraform/modules/gcs/main.tf b/terraform/modules/gcs/main.tf index 578585f..a0e558c 100644 --- a/terraform/modules/gcs/main.tf +++ b/terraform/modules/gcs/main.tf @@ -19,6 +19,8 @@ resource "google_storage_bucket" "gcs_flags_bucket" { } uniform_bucket_level_access = true + + labels = var.common_labels } resource "google_storage_bucket_iam_binding" "gcs_flags_bucket_iam_bindings" { diff --git a/terraform/modules/gcs/variables.tf b/terraform/modules/gcs/variables.tf index 2b2ea57..295b224 100644 --- a/terraform/modules/gcs/variables.tf +++ b/terraform/modules/gcs/variables.tf @@ -6,4 +6,8 @@ variable "gcs_flags_bucket_name" {type = string} variable "gcs_flags_bucket_admins" { type = list(string) +} + +variable "common_labels" { + type = map(string) } \ No newline at end of file diff --git a/terraform/modules/pubsub/main.tf b/terraform/modules/pubsub/main.tf index c0d5a28..a6a6fdc 100644 --- a/terraform/modules/pubsub/main.tf +++ b/terraform/modules/pubsub/main.tf @@ -4,6 +4,7 @@ resource "google_pubsub_topic" "topic" { project = var.project name = var.topic + labels = var.common_labels } # https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription @@ -48,6 +49,7 @@ resource "google_pubsub_subscription" "subscription" 
{ service_account_email = var.subscription_service_account } } + labels = var.common_labels } # Allow an SA to publish to this topic diff --git a/terraform/modules/pubsub/variables.tf b/terraform/modules/pubsub/variables.tf index e588073..2be1f88 100644 --- a/terraform/modules/pubsub/variables.tf +++ b/terraform/modules/pubsub/variables.tf @@ -9,3 +9,6 @@ variable "topic_publishers_sa_emails" { } variable "subscription_message_retention_duration" {type = string} variable "subscription_ack_deadline_seconds" {type = number} +variable "common_labels" { + type = map(string) +} diff --git a/terraform/outputs.tf b/terraform/outputs.tf index e69de29..f9fdd02 100644 --- a/terraform/outputs.tf +++ b/terraform/outputs.tf @@ -0,0 +1,7 @@ +output "local_fallback_policy_backup_projects" { + value = local.fallback_policy_backup_projects +} + +output "local_all_backup_projects" { + value = local.all_backup_projects +} \ No newline at end of file diff --git a/terraform/variables.tf b/terraform/variables.tf index b7373dd..af30f7c 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -12,6 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +variable "application_name" { + type = string + default = "bq_backup_manager" +} + variable "project" { type = string } @@ -359,6 +364,12 @@ variable "fallback_policy" { }) } +// Make sure to include all projects in this list when calling /scripts/prepare_backup_projects.sh so that the Terraform SA is granted permissions to deploy resources there +variable "additional_backup_projects" { + type = list(string) + description = "Projects where backup operations will run but that are not defined in the fallback policy (e.g. in Tag policies). Used to deploy required resources on these projects."
+} + From 607e1622299539fe62f56a4c2015b38197c6ec1b Mon Sep 17 00:00:00 2001 From: Karim Wadie Date: Tue, 14 Feb 2023 12:20:37 +0100 Subject: [PATCH 2/2] added async gcs snapshoter PR review comments --- .../functions/f03_snapshoter/BigQuerySnapshoter.java | 3 ++- .../pso/bq_snapshot_manager/helpers/TrackingHelper.java | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java index bc5aade..9ec44a2 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java @@ -25,6 +25,7 @@ import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod; import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest; import com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper; +import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper; import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils; import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryService; import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.FailedPubSubMessage; @@ -141,7 +142,7 @@ public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp o if(!request.isDryRun()){ // API Call - String jobId = String.format("%s_%s_%s", Globals.APPLICATION_NAME, "snapshot", request.getTrackingId()); + String jobId = TrackingHelper.generateBQSnapshotJobId(request.getTrackingId()); bqService.createSnapshot( jobId, diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java index df1f234..955dc25 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/TrackingHelper.java @@ -77,4 +77,9 @@ public static String generateBQExportJobId(String trackingId){ public static String parseTrackingIdFromBQExportJobId(String bqExportJobId){ return Utils.tokenize(bqExportJobId,"_", true).get(0); } + + public static String generateBQSnapshotJobId(String trackingId){ + return String.format("%s_%s_%s", trackingId, "snapshot", Globals.APPLICATION_NAME); + } + }
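For readers following the new flow end to end, below is a minimal, self-contained sketch (not part of the patch) of how the pieces added here fit together: the GCS Snapshoter persists the TaggerRequest under a key derived from the export job id, and the Tagger later rebuilds that key from the job id found in the jobCompletedEvent log entry. The in-memory PersistentMapTestImpl from the test sources stands in for the GCS-backed GcsPersistentMapImpl; the "snapshoter-gcs-tagger-requests" prefix and all class names come from the diffs above, while the runId literal and the request JSON are illustrative placeholders.

```java
import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper;
import com.google.cloud.pso.bq_snapshot_manager.services.PersistentMapTestImpl;
import com.google.cloud.pso.bq_snapshot_manager.services.map.PersistentMap;

public class AsyncGcsHandoffSketch {

    public static void main(String[] args) {

        // in-memory stand-in for GcsPersistentMapImpl(environment.getGcsFlagsBucket())
        PersistentMap persistentMap = new PersistentMapTestImpl();

        // Snapshoter side: derive the export job id from the tracking id and persist the
        // tagger request JSON before submitting the async export job.
        String trackingId = TrackingHelper.generateTrackingId("1675960287852-F"); // illustrative runId
        String jobId = TrackingHelper.generateBQExportJobId(trackingId);
        String key = String.format("%s/%s", "snapshoter-gcs-tagger-requests", jobId);
        persistentMap.put(key, "{ \"illustrative\": \"tagger request json\" }");

        // Tagger side: the log sink delivers the extract_job_completed event; the job id
        // recovers both the tracking id and the persisted tagger request.
        String recoveredTrackingId = TrackingHelper.parseTrackingIdFromBQExportJobId(jobId);
        String taggerRequestJson = persistentMap.get(key);
        System.out.println(recoveredTrackingId + " -> " + taggerRequestJson);
    }
}
```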