Added async gcs snapshoter #66

Merged · 2 commits · Feb 14, 2023

14 changes: 14 additions & 0 deletions README.md
@@ -410,6 +410,20 @@ BigQuery Types to Avro Logical Types mapping:
| `TIME` | `timestamp-micro` (annotates Avro `LONG`) |
| `DATETIME` | `STRING` (custom named logical type `datetime`) |

##### Configure Additional Backup Projects

Terraform needs to deploy resources to the backup projects where the backup operations will run, such as the log
sinks that notify the Tagger once a backup operation has completed.

By default, all projects listed in the `backup_project` field of the fallback policy are included automatically.
However, additional backup projects, such as those defined in external configuration (i.e. table-level backup policy tags),
must be added to the list below:

```
additional_backup_projects = ["project1", "project2", ..]
```

If you're only using the fallback backup policy, without table-level external policies, you can set this variable to an empty list `[]`.
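
For illustration, the two common settings might look as follows (the project names below are placeholders, not values from this repository):

```
# Table-level backup policies reference additional projects (placeholder names):
additional_backup_projects = ["backup-project-a", "backup-project-b"]

# Fallback policy only, no table-level external policies:
additional_backup_projects = []
```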

#### Terraform Deployment

14 changes: 10 additions & 4 deletions scripts/prepare_backup_projects.sh
@@ -43,9 +43,15 @@ do
--member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \
--role="roles/bigquery.jobUser"

# GCS Snapshoter needs to write to GCS
gcloud projects add-iam-policy-binding "${project}" \
--member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \
--role="roles/storage.objectAdmin"
# GCS Snapshoter needs to write to GCS
gcloud projects add-iam-policy-binding "${project}" \
--member="serviceAccount:${SA_SNAPSHOTER_GCS_EMAIL}" \
--role="roles/storage.objectAdmin"

# Terraform needs to create log sinks to capture GCS export operation completion
gcloud projects add-iam-policy-binding "${project}" \
--member="serviceAccount:${TF_SA}@${PROJECT_ID}.iam.gserviceaccount.com" \
--role="roles/logging.configWriter"


done
BigQuerySnapshoter.java
@@ -19,11 +19,13 @@

import com.google.cloud.Timestamp;
import com.google.cloud.Tuple;
import com.google.cloud.pso.bq_snapshot_manager.entities.Globals;
import com.google.cloud.pso.bq_snapshot_manager.entities.NonRetryableApplicationException;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod;
import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest;
import com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils;
import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryService;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.FailedPubSubMessage;
@@ -140,7 +142,10 @@ public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp o

if(!request.isDryRun()){
// API Call
String jobId = TrackingHelper.generateBQSnapshotJobId(request.getTrackingId());

bqService.createSnapshot(
jobId,
sourceTableWithTimeTravelTuple.x(),
snapshotTable,
expiryTs,
GCSSnapshoter.java
@@ -2,24 +2,22 @@

import com.google.cloud.Timestamp;
import com.google.cloud.Tuple;
import com.google.cloud.pso.bq_snapshot_manager.entities.Globals;
import com.google.cloud.pso.bq_snapshot_manager.entities.NonRetryableApplicationException;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupMethod;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.GCSSnapshotFormat;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.TimeTravelOffsetDays;
import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest;
import com.google.cloud.pso.bq_snapshot_manager.helpers.LoggingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper;
import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils;
import com.google.cloud.pso.bq_snapshot_manager.services.bq.BigQueryService;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.FailedPubSubMessage;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubPublishResults;
import com.google.cloud.pso.bq_snapshot_manager.services.map.PersistentMap;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubService;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.SuccessPubSubMessage;
import com.google.cloud.pso.bq_snapshot_manager.services.set.PersistentSet;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class GCSSnapshoter {

@@ -31,19 +29,26 @@ public class GCSSnapshoter
private final PersistentSet persistentSet;
private final String persistentSetObjectPrefix;

private final PersistentMap persistentMap;

private final String persistentMapObjectPrefix;

public GCSSnapshoter(SnapshoterConfig config,
BigQueryService bqService,
PubSubService pubSubService,
PersistentSet persistentSet,
String persistentSetObjectPrefix,
PersistentMap persistentMap,
String persistentMapObjectPrefix,
Integer functionNumber
) {
this.config = config;
this.bqService = bqService;
this.pubSubService = pubSubService;
this.persistentSet = persistentSet;
this.persistentSetObjectPrefix = persistentSetObjectPrefix;
this.persistentMap = persistentMap;
this.persistentMapObjectPrefix = persistentMapObjectPrefix;

logger = new LoggingHelper(
GCSSnapshoter.class.getSimpleName(),
@@ -123,59 +128,56 @@ public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operat
);

if(!request.isDryRun()){
// create an async bq export job

String jobId = TrackingHelper.generateBQExportJobId(request.getTrackingId());

// Create the tagging request and add it to persistent storage.
// The Tagger service will be notified of export-job completion via log sinks and will pick up the tagger request from persistent storage.
// Make sure the file is stored before running the export job, so that a non-fatal failure and retry of the file creation does not re-run the export job.
TaggerRequest taggerRequest = new TaggerRequest(
request.getTargetTable(),
request.getRunId(),
request.getTrackingId(),
request.isDryRun(),
request.getBackupPolicy(),
BackupMethod.GCS_SNAPSHOT,
null,
gcsDestinationUri,
operationTs
);

String taggerRequestFile = String.format("%s/%s", persistentMapObjectPrefix, jobId);
persistentMap.put(taggerRequestFile, taggerRequest.toJsonString());

Map<String, String> jobLabels = new HashMap<>();
// Labels must be at most 63 chars and contain only lowercase letters, numeric characters, underscores, and dashes. All characters must use UTF-8 encoding; international characters are allowed.
jobLabels.put("app", Globals.APPLICATION_NAME);

// API Call
bqService.exportToGCS(
jobId,
sourceTableWithTimeTravelTuple.x(),
gcsDestinationUri,
request.getBackupPolicy().getGcsExportFormat(),
null,
null,
null,
request.getTrackingId()
request.getBackupPolicy().getGcsCsvDelimiter(),
request.getBackupPolicy().getGcsCsvExportHeader(),
request.getBackupPolicy().getGcsUseAvroLogicalTypes(),
request.getTrackingId(),
jobLabels
);
}


logger.logInfoWithTracker(
request.isDryRun(),
request.getTrackingId(),
request.getTargetTable(),
String.format("BigQuery GCS export completed for table %s to %s",
String.format("BigQuery GCS export submitted for table %s to %s",
request.getTargetTable().toSqlString(),
gcsDestinationUri
)
);

// Create a Tagger request and send it to the Tagger PubSub topic
TaggerRequest taggerRequest = new TaggerRequest(
request.getTargetTable(),
request.getRunId(),
request.getTrackingId(),
request.isDryRun(),
request.getBackupPolicy(),
BackupMethod.GCS_SNAPSHOT,
null,
gcsDestinationUri,
operationTs
);

// Publish the list of tagging requests to PubSub
PubSubPublishResults publishResults = pubSubService.publishTableOperationRequests(
config.getProjectId(),
config.getOutputTopic(),
Arrays.asList(taggerRequest)
);

for (FailedPubSubMessage msg : publishResults.getFailedMessages()) {
String logMsg = String.format("Failed to publish this message %s", msg.toString());
logger.logWarnWithTracker(request.isDryRun(),request.getTrackingId(), request.getTargetTable(), logMsg);
}

for (SuccessPubSubMessage msg : publishResults.getSuccessMessages()) {
String logMsg = String.format("Published this message %s", msg.toString());
logger.logInfoWithTracker(request.isDryRun(),request.getTrackingId(), request.getTargetTable(), logMsg);
}

// run common service end logging and adding pubsub message to processed list
Utils.runServiceEndRoutines(
logger,
@@ -191,9 +193,7 @@ public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operat
request.getTrackingId(),
request.isDryRun(),
operationTs,
sourceTableWithTimeTravelTuple.x(),
taggerRequest,
publishResults
sourceTableWithTimeTravelTuple.x()
);
}

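The comments in this diff describe the new asynchronous contract: GCSSnapshoter persists the TaggerRequest under a `<prefix>/<jobId>` key before submitting the export job, and the Tagger is later notified of job completion through a log sink. The consuming side is not part of this diff; the sketch below only illustrates how that lookup could work, assuming a hypothetical handler class and a `get` counterpart to the `PersistentMap.put` call shown above.

```java
import com.google.cloud.pso.bq_snapshot_manager.helpers.TrackingHelper;
import com.google.cloud.pso.bq_snapshot_manager.services.map.PersistentMap;

// Illustrative sketch only: resolving the persisted TaggerRequest after a log sink
// reports that a BigQuery export job has completed. The class, the method name, and
// the assumed PersistentMap.get(key) are hypothetical and not part of this PR.
public class ExportCompletionHandlerSketch {

    private final PersistentMap persistentMap;
    private final String persistentMapObjectPrefix;

    public ExportCompletionHandlerSketch(PersistentMap persistentMap, String persistentMapObjectPrefix) {
        this.persistentMap = persistentMap;
        this.persistentMapObjectPrefix = persistentMapObjectPrefix;
    }

    public String handleExportCompletion(String completedBqExportJobId) throws Exception {
        // Recover the tracking ID that was embedded in the export job ID at submission time.
        String trackingId = TrackingHelper.parseTrackingIdFromBQExportJobId(completedBqExportJobId);
        System.out.printf("Export job %s completed for tracking ID %s%n", completedBqExportJobId, trackingId);

        // The request was written under "<prefix>/<jobId>" before the export job was started,
        // so a retried export submission does not create a second tagger request.
        String key = String.format("%s/%s", persistentMapObjectPrefix, completedBqExportJobId);

        // Assumption: PersistentMap exposes a get(key) counterpart to the put(key, value) used in GCSSnapshoter.
        return persistentMap.get(key);
    }
}
```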
GCSSnapshoterResponse.java
@@ -3,22 +3,16 @@
import com.google.cloud.Timestamp;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableOperationRequestResponse;
import com.google.cloud.pso.bq_snapshot_manager.entities.TableSpec;
import com.google.cloud.pso.bq_snapshot_manager.functions.f04_tagger.TaggerRequest;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubPublishResults;

public class GCSSnapshoterResponse extends TableOperationRequestResponse {

private final Timestamp operationTs;
private final TableSpec computedSourceTable;
private final TaggerRequest outputTaggerRequest;
private final PubSubPublishResults pubSubPublishResults;

public GCSSnapshoterResponse(TableSpec targetTable, String runId, String trackingId, boolean isDryRun, Timestamp operationTs, TableSpec computedSourceTable, TaggerRequest outputTaggerRequest, PubSubPublishResults pubSubPublishResults) {
public GCSSnapshoterResponse(TableSpec targetTable, String runId, String trackingId, boolean isDryRun, Timestamp operationTs, TableSpec computedSourceTable) {
super(targetTable, runId, trackingId, isDryRun);
this.operationTs = operationTs;
this.computedSourceTable = computedSourceTable;
this.outputTaggerRequest = outputTaggerRequest;
this.pubSubPublishResults = pubSubPublishResults;
}

public Timestamp getOperationTs() {
@@ -29,11 +23,4 @@ public TableSpec getComputedSourceTable() {
return computedSourceTable;
}

public TaggerRequest getOutputTaggerRequest() {
return outputTaggerRequest;
}

public PubSubPublishResults getPubSubPublishResults() {
return pubSubPublishResults;
}
}
TrackingHelper.java
@@ -17,6 +17,7 @@
package com.google.cloud.pso.bq_snapshot_manager.helpers;

import com.google.cloud.Timestamp;
import com.google.cloud.pso.bq_snapshot_manager.entities.Globals;

import java.util.UUID;

@@ -68,4 +69,17 @@ public static Timestamp parseRunIdAsTimestamp(String runId){
public static String generateTrackingId (String runId){
return String.format("%s-%s", runId, UUID.randomUUID().toString());
}

public static String generateBQExportJobId(String trackingId){
return String.format("%s_%s_%s", trackingId, "export", Globals.APPLICATION_NAME);
}

public static String parseTrackingIdFromBQExportJobId(String bqExportJobId){
return Utils.tokenize(bqExportJobId,"_", true).get(0);
}

public static String generateBQSnapshotJobId(String trackingId){
return String.format("%s_%s_%s", trackingId, "snapshot", Globals.APPLICATION_NAME);
}

}
BigQueryService.java
@@ -29,18 +29,25 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;

public interface BigQueryService {
void createSnapshot(TableSpec sourceTable,
TableSpec destinationId,
Timestamp snapshotExpirationTs,
String trackingId) throws InterruptedException;
void createSnapshot(
String jobId,
TableSpec sourceTable,
TableSpec destinationId,
Timestamp snapshotExpirationTs,
String trackingId) throws InterruptedException;

void exportToGCS(TableSpec sourceTable,
String gcsDestinationUri,
GCSSnapshotFormat exportFormat,
@Nullable String csvFieldDelimiter,
@Nullable Boolean csvPrintHeader,
@Nullable Boolean useAvroLogicalTypes,
String trackingId) throws InterruptedException;
void exportToGCS(
String jobId,
TableSpec sourceTable,
String gcsDestinationUri,
GCSSnapshotFormat exportFormat,
@Nullable String csvFieldDelimiter,
@Nullable Boolean csvPrintHeader,
@Nullable Boolean useAvroLogicalTypes,
String trackingId,
Map<String, String> jobLabels
) throws InterruptedException;
}