Skip to content

Commit

Permalink
Merge pull request #66 from kwadie/feature/async_gcs_snapshoter
Browse files Browse the repository at this point in the history
Added async gcs snapshoter
  • Loading branch information
kwadie authored Feb 14, 2023
2 parents 6dd0e97 + 607e162 commit 9d1080c
Show file tree
Hide file tree
Showing 33 changed files with 592 additions and 123 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,20 @@ BigQuery Types to Avro Logical Types mapping:
| `TIME` | `timestamp-micro` (annotates Avro `LONG`) |
| `DATETIME` | `STRING` (custom named logical type `datetime`) |

##### Configure Additional Backup Projects

Terraform needs to deploy resources to the backup projects where the backup operations will run. For example, log
sinks that send notifications to the Tagger once a backup operation has completed.

By default, all projects listed in the `backup_project` field in the fallback policy will be automatically included.
However, additional backup projects — such as those defined in external configuration (e.g. table-level backup policy tags) —
must be added to the list below.

```
additional_backup_projects = ["project1", "project2", ..]
```

If you're only using the fallback backup policy, without table-level external policies, you can set this variable to an empty list `[]`.

#### Terraform Deployment

Expand Down
14 changes: 10 additions & 4 deletions scripts/prepare_backup_projects.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,15 @@ do
--member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \
--role="roles/bigquery.jobUser"

# GCS Snapshoter needs to write to GCS
gcloud projects add-iam-policy-binding "${project}" \
--member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \
--role="roles/storage.objectAdmin"
# GCS Snapshoter needs to write to GCS
gcloud projects add-iam-policy-binding "${project}" \
--member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \
--role="roles/storage.objectAdmin"

# Terraform needs to create log sinks to capture GCS export operation completion
gcloud projects add-iam-policy-binding "${project}" \
--member="serviceAccount:${TF_SA}@${PROJECT_ID}.iam.gserviceaccount.com" \
--role="roles/logging.configWriter"


done
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import com.google.cloud.Timestamp;
import com.google.cloud.Tuple;
import com.google.cloud.pso.bq_snapshot_manager.entities.Globals;
import com.google.cloud.pso.bq_snapshot_manager.entities.NonRetryableApplicationException;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod;
import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest;
import com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils;
import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryService;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.FailedPubSubMessage;
Expand Down Expand Up @@ -140,7 +142,10 @@ public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp o

if(!request.isDryRun()){
// API Call
String jobId = TrackingHelper.generateBQSnapshotJobId(request.getTrackingId());

bqService.createSnapshot(
jobId,
sourceTableWithTimeTravelTuple.x(),
snapshotTable,
expiryTs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@

import com.google.cloud.Timestamp;
import com.google.cloud.Tuple;
import com.google.cloud.pso.bq_snapshot_manager.entities.Globals;
import com.google.cloud.pso.bq_snapshot_manager.entities.NonRetryableApplicationException;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.GCSSnapshotFormat;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.TimeTravelOffsetDays;
import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest;
import com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils;
import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryService;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.FailedPubSubMessage;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubPublishResults;
import com.google.cloud.pso.bq_snapshot_manager.services.map.PersistentMap;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubService;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.SuccessPubSubMessage;
import com.google.cloud.pso.bq_snapshot_manager.services.set.PersistentSet;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class GCSSnapshoter {

Expand All @@ -31,19 +29,26 @@ public class GCSSnapshoter {
private final PersistentSet persistentSet;
private final String persistentSetObjectPrefix;

private final PersistentMap persistentMap;

private final String persistentMapObjectPrefix;

public GCSSnapshoter(SnapshoterConfig config,
BigQueryService bqService,
PubSubService pubSubService,
PersistentSet persistentSet,
String persistentSetObjectPrefix,
PersistentMap persistentMap,
String persistentMapObjectPrefix,
Integer functionNumber
) {
this.config = config;
this.bqService = bqService;
this.pubSubService = pubSubService;
this.persistentSet = persistentSet;
this.persistentSetObjectPrefix = persistentSetObjectPrefix;
this.persistentMap = persistentMap;
this.persistentMapObjectPrefix = persistentMapObjectPrefix;

logger = new LoggingHelper(
GCSSnapshoter.class.getSimpleName(),
Expand Down Expand Up @@ -123,59 +128,56 @@ public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operat
);

if(!request.isDryRun()){
// create an async bq export job

String jobId = TrackingHelper.generateBQExportJobId(request.getTrackingId());

// We create the tagging request and added it to a persistent storage
// The Tagger service will receive notifications of export job completion via log sinks and pick up the tagger request from the persistent storage
// Make sure the file is stored first before running the export job. In case of non-fatal error of file creation and retry, we don't re-run the export job
TaggerRequest taggerRequest = new TaggerRequest(
request.getTargetTable(),
request.getRunId(),
request.getTrackingId(),
request.isDryRun(),
request.getBackupPolicy(),
BackupMethod.GCS_SNAPSHOT,
null,
gcsDestinationUri,
operationTs
);

String taggerRequestFile = String.format("%s/%s", persistentMapObjectPrefix, jobId);
persistentMap.put(taggerRequestFile, taggerRequest.toJsonString());

Map<String, String> jobLabels = new HashMap<>();
// labels has to be max 63 chars, contain only lowercase letters, numeric characters, underscores, and dashes. All characters must use UTF-8 encoding, and international characters are allowed.
jobLabels.put("app", Globals.APPLICATION_NAME);

// API Call
bqService.exportToGCS(
jobId,
sourceTableWithTimeTravelTuple.x(),
gcsDestinationUri,
request.getBackupPolicy().getGcsExportFormat(),
null,
null,
null,
request.getTrackingId()
request.getBackupPolicy().getGcsCsvDelimiter(),
request.getBackupPolicy().getGcsCsvExportHeader(),
request.getBackupPolicy().getGcsUseAvroLogicalTypes(),
request.getTrackingId(),
jobLabels
);
}


logger.logInfoWithTracker(
request.isDryRun(),
request.getTrackingId(),
request.getTargetTable(),
String.format("BigQuery GCS export completed for table %s to %s",
String.format("BigQuery GCS export submitted for table %s to %s",
request.getTargetTable().toSqlString(),
gcsDestinationUri
)
);

// Create a Tagger request and send it to the Tagger PubSub topic
TaggerRequest taggerRequest = new TaggerRequest(
request.getTargetTable(),
request.getRunId(),
request.getTrackingId(),
request.isDryRun(),
request.getBackupPolicy(),
BackupMethod.GCS_SNAPSHOT,
null,
gcsDestinationUri,
operationTs
);

// Publish the list of tagging requests to PubSub
PubSubPublishResults publishResults = pubSubService.publishTableOperationRequests(
config.getProjectId(),
config.getOutputTopic(),
Arrays.asList(taggerRequest)
);

for (FailedPubSubMessage msg : publishResults.getFailedMessages()) {
String logMsg = String.format("Failed to publish this message %s", msg.toString());
logger.logWarnWithTracker(request.isDryRun(),request.getTrackingId(), request.getTargetTable(), logMsg);
}

for (SuccessPubSubMessage msg : publishResults.getSuccessMessages()) {
String logMsg = String.format("Published this message %s", msg.toString());
logger.logInfoWithTracker(request.isDryRun(),request.getTrackingId(), request.getTargetTable(), logMsg);
}

// run common service end logging and adding pubsub message to processed list
Utils.runServiceEndRoutines(
logger,
Expand All @@ -191,9 +193,7 @@ public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operat
request.getTrackingId(),
request.isDryRun(),
operationTs,
sourceTableWithTimeTravelTuple.x(),
taggerRequest,
publishResults
sourceTableWithTimeTravelTuple.x()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,16 @@
import com.google.cloud.Timestamp;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableOperationRequestResponse;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec;
import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubPublishResults;

public class GCSSnapshoterResponse extends TableOperationRequestResponse {

private final Timestamp operationTs;
private final TableSpec computedSourceTable;
private final TaggerRequest outputTaggerRequest;
private final PubSubPublishResults pubSubPublishResults;

public GCSSnapshoterResponse(TableSpec targetTable, String runId, String trackingId, boolean isDryRun, Timestamp operationTs, TableSpec computedSourceTable, TaggerRequest outputTaggerRequest, PubSubPublishResults pubSubPublishResults) {
public GCSSnapshoterResponse(TableSpec targetTable, String runId, String trackingId, boolean isDryRun, Timestamp operationTs, TableSpec computedSourceTable) {
super(targetTable, runId, trackingId, isDryRun);
this.operationTs = operationTs;
this.computedSourceTable = computedSourceTable;
this.outputTaggerRequest = outputTaggerRequest;
this.pubSubPublishResults = pubSubPublishResults;
}

public Timestamp getOperationTs() {
Expand All @@ -29,11 +23,4 @@ public TableSpec getComputedSourceTable() {
return computedSourceTable;
}

public TaggerRequest getOutputTaggerRequest() {
return outputTaggerRequest;
}

public PubSubPublishResults getPubSubPublishResults() {
return pubSubPublishResults;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.pso.bq_snapshot_manager.helpers;

import com.google.cloud.Timestamp;
import com.google.cloud.pso.bq_snapshot_manager.entities.Globals;

import java.util.UUID;

Expand Down Expand Up @@ -68,4 +69,17 @@ public static Timestamp parseRunIdAsTimestamp(String runId){
/**
 * Builds a unique tracking id for a single backup request by suffixing the
 * run id with a random UUID, i.e. {@code "<runId>-<uuid>"}.
 *
 * @param runId the run this request belongs to
 * @return the run id followed by a dash and a freshly generated random UUID
 */
public static String generateTrackingId (String runId){
    // UUID#toString is applied implicitly by string concatenation.
    return runId + "-" + UUID.randomUUID();
}

/**
 * Builds the BigQuery export job id for a GCS snapshot operation as
 * {@code "<trackingId>_export_<applicationName>"}.
 * The tracking id is the first "_"-separated token so that
 * {@code parseTrackingIdFromBQExportJobId} can recover it from log-sink notifications.
 *
 * @param trackingId the request tracking id (must not itself contain "_")
 * @return the job id to pass to the BigQuery export call
 */
public static String generateBQExportJobId(String trackingId){
    // Fix: the format string has exactly three %s specifiers; the original
    // passed trackingId a second time as a fourth argument, which Formatter
    // silently ignored. Output is unchanged.
    return String.format("%s_%s_%s", trackingId, "export", Globals.APPLICATION_NAME);
}

// Recovers the tracking id from a BigQuery export job id.
// Job ids are produced by generateBQExportJobId as "<trackingId>_export_<app>",
// so the first "_"-delimited token is the tracking id.
// NOTE(review): assumes Utils.tokenize(str, "_", true) splits on "_" and that the
// tracking id never contains "_" -- generateTrackingId builds it as "<runId>-<uuid>"
// (dashes only), but confirm runId itself cannot contain underscores.
public static String parseTrackingIdFromBQExportJobId(String bqExportJobId){
return Utils.tokenize(bqExportJobId,"_", true).get(0);
}

/**
 * Builds the BigQuery job id for a table-snapshot operation as
 * {@code "<trackingId>_snapshot_<applicationName>"}, mirroring
 * {@code generateBQExportJobId} but tagged "snapshot" instead of "export".
 *
 * @param trackingId the request tracking id (must not itself contain "_")
 * @return the job id to pass to the BigQuery snapshot call
 */
public static String generateBQSnapshotJobId(String trackingId){
    // Fix: the format string has exactly three %s specifiers; the original
    // passed trackingId a second time as a fourth argument, which Formatter
    // silently ignored. Output is unchanged.
    return String.format("%s_%s_%s", trackingId, "snapshot", Globals.APPLICATION_NAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,25 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;

// Service abstraction over the BigQuery operations used by the snapshoter
// functions (table snapshots and GCS exports).
// NOTE(review): this text appears to come from a rendered diff view; the
// overloads WITHOUT an explicit jobId look like the pre-change signatures and
// the jobId-taking overloads the post-change ones. They are valid Java
// overloads as written, but confirm against the actual source file.
public interface BigQueryService {
// Pre-change signature: snapshot with an implementation-chosen job id.
void createSnapshot(TableSpec sourceTable,
TableSpec destinationId,
Timestamp snapshotExpirationTs,
String trackingId) throws InterruptedException;
// Post-change signature: caller supplies the job id (see
// TrackingHelper.generateBQSnapshotJobId) so completion can be correlated
// back to the tracking id.
void createSnapshot(
String jobId,
TableSpec sourceTable,
TableSpec destinationId,
Timestamp snapshotExpirationTs,
String trackingId) throws InterruptedException;

// Pre-change signature: synchronous-style export without caller-supplied job id.
void exportToGCS(TableSpec sourceTable,
String gcsDestinationUri,
GCSSnapshotFormat exportFormat,
@Nullable String csvFieldDelimiter,
@Nullable Boolean csvPrintHeader,
@Nullable Boolean useAvroLogicalTypes,
String trackingId) throws InterruptedException;
// Post-change signature: async export; caller supplies the job id (see
// TrackingHelper.generateBQExportJobId) and job labels (e.g. "app" label).
void exportToGCS(
String jobId,
TableSpec sourceTable,
String gcsDestinationUri,
GCSSnapshotFormat exportFormat,
@Nullable String csvFieldDelimiter,
@Nullable Boolean csvPrintHeader,
@Nullable Boolean useAvroLogicalTypes,
String trackingId,
Map<String, String> jobLabels
) throws InterruptedException;
}
Loading

0 comments on commit 9d1080c

Please sign in to comment.