Merge pull request #82 from kwadie/fix/fallback_for_folders_op_project
Lookup project parent folder ID to determine folder-level fallback policy
Remove cloud_scheduler_sa variable in Terraform and compute it dynamically
Set default additional_backup_operation_project to empty list
kwadie authored Apr 19, 2023
2 parents c8dedcd + b1a9633 commit d87f9a6
Showing 19 changed files with 321 additions and 200 deletions.
26 changes: 17 additions & 9 deletions README.md
@@ -40,6 +40,7 @@
* [BigQuery Snapshot Policy Fields](#bigquery-snapshot-policy-fields)
* [GCS Snapshot Policy Fields](#gcs-snapshot-policy-fields)
* [Terraform Deployment](#terraform-deployment)
* [Manual Deployment](#manual-deployment)
* [Setup Access to Sources and Destinations](#setup-access-to-sources-and-destinations)
* [Set Environment Variables](#set-environment-variables)
* [Prepare Source Folders](#prepare-source-folders)
@@ -276,15 +277,8 @@ data_region = "<GCP region to deploy data resources (buckets, datasets, etc> (eq

##### Configure Cloud Scheduler Service Account

We will need to grant the Cloud Scheduler account permissions to use parts of the solution

```yaml
cloud_scheduler_account = "service-<project number>@gcp-sa-cloudscheduler.iam.gserviceaccount.com"
```

If this host project never used Cloud Scheduler before, create and run a sample job to force GCP to create the service account.

PS: project number is different from project id/name. You can find both info on the home page of any project.
Terraform will need to grant the Cloud Scheduler service account permissions to use parts of the solution. If this host project
has never used Cloud Scheduler before, create and run a sample job to force GCP to create the service account.

##### Configure Terraform Service Account

@@ -490,6 +484,20 @@ terraform apply -var-file=$VARS -auto-approve

```

#### Manual Deployment

Terraform doesn't provide a resource to add TTL policies for Firestore (yet). To add one, run the following command:

```bash
gcloud firestore fields ttls update expires_at \
--collection-group=project_folder_cache \
--enable-ttl \
--async \
--project=$PROJECT_ID
```
The solution uses Firestore in Datastore mode as a cache in some situations. The TTL policy allows
Firestore to automatically delete expired entries, saving cost and improving lookup performance.
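
For context, the cache entities this TTL policy targets are written by `ResourceScannerImpl` (further down in this commit) with an `expires_at` property. A minimal sketch of such a write, assuming the `project_folder_cache` kind and one-day TTL used in this commit:

```java
import com.google.cloud.Timestamp;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;

public class TtlCacheEntryExample {
    public static void main(String[] args) {
        Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
        // key format "<project>_<run id>" mirrors generateProjectFolderCacheKey below
        Key key = datastore.newKeyFactory()
                .setKind("project_folder_cache")
                .newKey("my-project_run-1681900000");
        Timestamp now = Timestamp.now();
        Entity entity = Entity.newBuilder(key)
                .set("parent_folder", "1234567890")
                .set("created_at", now)
                // the TTL policy on 'expires_at' purges this entity after it expires
                .set("expires_at", Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 86400L, now.getNanos()))
                .build();
        datastore.put(entity);
    }
}
```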

#### Setup Access to Sources and Destinations

##### Set Environment Variables
@@ -31,6 +31,7 @@
import com.google.cloud.pso.bq_snapshot_manager.services.backup_policy.BackupPolicyService;
import com.google.cloud.pso.bq_snapshot_manager.services.backup_policy.BackupPolicyServiceFireStoreImpl;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubServiceImpl;
import com.google.cloud.pso.bq_snapshot_manager.services.scan.ResourceScannerImpl;
import com.google.cloud.pso.bq_snapshot_manager.services.set.GCSPersistentSetImpl;
import com.google.gson.Gson;
import org.springframework.boot.SpringApplication;
@@ -126,6 +127,7 @@ public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {
new BigQueryServiceImpl(configuratorRequest.getTargetTable().getProject()),
backupPolicyService,
new PubSubServiceImpl(),
new ResourceScannerImpl(),
new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
fallbackBackupPolicy,
"configurator-flags",
Configurator.java
@@ -16,6 +16,7 @@
import com.google.cloud.pso.bq_snapshot_manager.services.backup_policy.BackupPolicyService;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubPublishResults;
import com.google.cloud.pso.bq_snapshot_manager.services.pubsub.PubSubService;
import com.google.cloud.pso.bq_snapshot_manager.services.scan.ResourceScanner;
import com.google.cloud.pso.bq_snapshot_manager.services.set.PersistentSet;
import org.springframework.scheduling.support.CronExpression;

@@ -35,6 +36,8 @@ public class Configurator {

private final BackupPolicyService backupPolicyService;
private final PubSubService pubSubService;

private final ResourceScanner resourceScanner;
private final PersistentSet persistentSet;
private final FallbackBackupPolicy fallbackBackupPolicy;
private final String persistentSetObjectPrefix;
@@ -44,6 +47,7 @@ public Configurator(ConfiguratorConfig config,
BigQueryService bqService,
BackupPolicyService backupPolicyService,
PubSubService pubSubService,
ResourceScanner resourceScanner,
PersistentSet persistentSet,
FallbackBackupPolicy fallbackBackupPolicy,
String persistentSetObjectPrefix,
@@ -52,6 +56,7 @@ public Configurator(ConfiguratorConfig config,
this.bqService = bqService;
this.backupPolicyService = backupPolicyService;
this.pubSubService = pubSubService;
this.resourceScanner = resourceScanner;
this.persistentSet = persistentSet;
this.fallbackBackupPolicy = fallbackBackupPolicy;
this.persistentSetObjectPrefix = persistentSetObjectPrefix;
@@ -212,7 +217,7 @@ public ConfiguratorResponse execute(ConfiguratorRequest request, String pubSubMessageId
gcsSnapshotRequest,
bqSnapshotPublishResults,
gcsSnapshotPublishResults
);
);
}

public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOException {
@@ -232,7 +237,7 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOException
);

return attachedBackupPolicy;
}else{
} else {

logger.logInfoWithTracker(request.isDryRun(),
request.getTrackingId(),
@@ -241,8 +246,18 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOException
);

// find the most granular fallback policy: table > dataset > project > folder
BackupPolicy fallbackPolicy = findFallbackBackupPolicy(fallbackBackupPolicy,
request.getTargetTable()).y();
Tuple<String, BackupPolicy> fallbackBackupPolicyTuple = findFallbackBackupPolicy(
fallbackBackupPolicy,
request.getTargetTable(),
request.getRunId()
);
BackupPolicy fallbackPolicy = fallbackBackupPolicyTuple.y();

logger.logInfoWithTracker(request.isDryRun(),
request.getTrackingId(),
request.getTargetTable(),
String.format("Will use a %s-level fallback policy", fallbackBackupPolicyTuple.x())
);

// if there is a system attached policy, then only use the last_xyz fields from it and use the latest fallback policy
// the last_backup_at needs to be checked to determine if we should take a backup in this run
@@ -254,7 +269,7 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOException
.setLastGcsSnapshotStorageUri(attachedBackupPolicy.getLastGcsSnapshotStorageUri())
.setLastBqSnapshotStorageUri(attachedBackupPolicy.getLastBqSnapshotStorageUri())
.build();
}else{
} else {
// if there is no attached policy, use fallback one
return fallbackPolicy;
}
@@ -320,7 +335,7 @@ public static boolean isBackupCronTime(

public static boolean isBackupTime(boolean isForceRun,
boolean isBackupCronTime,
boolean isTableCreatedBeforeTimeTravel){
boolean isTableCreatedBeforeTimeTravel) {
// table must have enough history to use the time travel feature.
// In addition to that, the run has to be a force run or the backup is due based on the backup cron
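// sketch (not part of the diff): per the two comments above, the body elided by this hunk amounts to
//   return isTableCreatedBeforeTimeTravel && (isForceRun || isBackupCronTime);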

@@ -391,9 +406,10 @@ public Tuple<SnapshoterRequest, SnapshoterRequest> prepareSnapshotRequests(Backu
return Tuple.of(bqSnapshotRequest, gcsSnapshotRequest);
}

public static Tuple<String, BackupPolicy> findFallbackBackupPolicy(FallbackBackupPolicy
fallbackBackupPolicy,
TableSpec tableSpec) {
public Tuple<String, BackupPolicy> findFallbackBackupPolicy(FallbackBackupPolicy fallbackBackupPolicy,
TableSpec tableSpec,
String runId
) throws IOException {

BackupPolicy tableLevel = fallbackBackupPolicy.getTableOverrides().get(tableSpec.toSqlString());
if (tableLevel != null) {
@@ -414,9 +430,16 @@ public static Tuple<String, BackupPolicy> findFallbackBackupPolicy(FallbackBackupPolicy
return Tuple.of("project", projectLevel);
}

//TODO: check for folder level by getting the folder id of a project from the API
// API CALL (or cache)
String folderId = resourceScanner.getParentFolderId(tableSpec.getProject(), runId);
if (folderId != null) {
BackupPolicy folderLevel = fallbackBackupPolicy.getFolderOverrides().get(folderId);
if (folderLevel != null) {
return Tuple.of("folder", folderLevel);
}
}

// else return the global default policy
return Tuple.of("global", fallbackBackupPolicy.getDefaultPolicy());
return Tuple.of("default", fallbackBackupPolicy.getDefaultPolicy());
}
}
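
Pieced together from the hunks above, the lookup order is now table > dataset > project > folder > default. The sketch below shows just that precedence with plain maps; the dataset- and project-level accessors and key formats are elided in this diff, so the ones mirrored here are assumptions:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the precedence only; the real code returns Tuple<String, BackupPolicy>
// and obtains the folder id via ResourceScanner.getParentFolderId (cached per run).
public class FallbackResolutionSketch {

    static Map.Entry<String, String> resolve(Map<String, String> tableOverrides,   // "project.dataset.table"
                                             Map<String, String> datasetOverrides, // "project.dataset" (assumed)
                                             Map<String, String> projectOverrides, // "project" (assumed)
                                             Map<String, String> folderOverrides,  // folder id
                                             String defaultPolicy,
                                             String table, String dataset, String project,
                                             String folderIdOrNull) {
        if (tableOverrides.containsKey(table)) return Map.entry("table", tableOverrides.get(table));
        if (datasetOverrides.containsKey(dataset)) return Map.entry("dataset", datasetOverrides.get(dataset));
        if (projectOverrides.containsKey(project)) return Map.entry("project", projectOverrides.get(project));
        if (folderIdOrNull != null && folderOverrides.containsKey(folderIdOrNull)) {
            return Map.entry("folder", folderOverrides.get(folderIdOrNull));
        }
        return Map.entry("default", defaultPolicy);
    }

    public static void main(String[] args) {
        Map<String, String> empty = new LinkedHashMap<>();
        Map<String, String> folderOverrides = new LinkedHashMap<>();
        folderOverrides.put("1234567890", "folder-policy");
        // no table/dataset/project override, so the folder-level policy wins over the default
        System.out.println(resolve(empty, empty, empty, folderOverrides, "default-policy",
                "p.d.t", "p.d", "p", "1234567890")); // prints "folder=folder-policy"
    }
}
```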
@@ -111,8 +111,8 @@ public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp operationTs

// expiry date is calculated relative to the operation time
Timestamp expiryTs = Timestamp.ofTimeSecondsAndNanos(
operationTs.getSeconds() + (request.getBackupPolicy().getBigQuerySnapshotExpirationDays().longValue() * 86400L),
0);
operationTs.getSeconds() + (request.getBackupPolicy().getBigQuerySnapshotExpirationDays().longValue() * Utils.SECONDS_IN_DAY),
operationTs.getNanos());

// construct the snapshot table from the request params and calculated timetravel
TableSpec snapshotTable = getSnapshotTableSpec(
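The change above both adopts the shared `Utils.SECONDS_IN_DAY` constant and stops dropping the operation timestamp's nanos. A quick worked example (values illustrative):

```java
import com.google.cloud.Timestamp;

public class ExpiryExample {
    public static void main(String[] args) {
        // operation time 2023-04-19T00:00:10.500Z, policy expiration of 30 days
        Timestamp operationTs = Timestamp.ofTimeSecondsAndNanos(1681862410L, 500_000_000);
        long expirationDays = 30L;
        Timestamp expiryTs = Timestamp.ofTimeSecondsAndNanos(
                operationTs.getSeconds() + expirationDays * 86400L, // Utils.SECONDS_IN_DAY
                operationTs.getNanos());
        System.out.println(expiryTs); // 2023-05-19T00:00:10.500000000Z
    }
}
```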
Utils.java
@@ -32,6 +32,9 @@

public class Utils {

public static final Long SECONDS_IN_DAY = 86400L;
public static final Long MILLI_SECONDS_IN_DAY = 86400000L;

public static String getOrFail(Map<String, String> map, String key) {
String field = map.get(key);
if (field == null) {
@@ -102,7 +105,7 @@ public static void runServiceStartRoutines(LoggingHelper logger,
String flagFileName = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);
if (persistentSet.contains(flagFileName)) {
// log error and ACK and return
String msg = String.format("PubSub message ID '%s' has been processed before by the service. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.",
String msg = String.format("PubSub message ID '%s' has been processed before by the service. This could be a PubSub duplicate message and safe to ignore or the previous messages were not ACK to PubSub to stop retries. Please investigate further if needed.",
pubSubMessageId
);
throw new NonRetryableApplicationException(msg);
@@ -145,7 +148,7 @@ public static Tuple<TableSpec, Long> getTableSpecWithTimeTravel(TableSpec table,
// use a buffer (milliseconds) to count for the operation time. 1 MIN = 60000 MILLISECONDS
Long bufferMs = timeTravelOffsetDays.equals(TimeTravelOffsetDays.DAYS_7) ? 60 * 60000L : 0L;
// milli seconds per day * number of days
Long timeTravelOffsetMs = (86400000L * Long.parseLong(timeTravelOffsetDays.getText()));
Long timeTravelOffsetMs = (Utils.MILLI_SECONDS_IN_DAY * Long.parseLong(timeTravelOffsetDays.getText()));
timeTravelMs = (refPointMs - timeTravelOffsetMs) + bufferMs;
}

@@ -167,4 +170,8 @@ public static String trimSlashes(String str){
"/"
);
}

public static Timestamp addSeconds(Timestamp timestamp, Long secondsDelta) {
return Timestamp.ofTimeSecondsAndNanos(timestamp.getSeconds() + secondsDelta, timestamp.getNanos());
}
}
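
A brief usage sketch for the new constants and helper (assuming the same package, so `Utils` is in scope):

```java
import com.google.cloud.Timestamp;

public class UtilsUsageExample {
    public static void main(String[] args) {
        Timestamp now = Timestamp.now();
        // one-day TTL for project-folder cache entries
        Timestamp expiresAt = Utils.addSeconds(now, Utils.SECONDS_IN_DAY);
        // time travel offset of 5 days, in milliseconds
        long offsetMs = Utils.MILLI_SECONDS_IN_DAY * 5L;
        System.out.println(expiresAt + " / " + offsetMs);
    }
}
```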
BackupPolicyServiceFireStoreImpl.java
@@ -13,7 +13,7 @@
public class BackupPolicyServiceFireStoreImpl implements BackupPolicyService {


private static final String KIND = "BigQueryBackupPolicy";
private static final String KIND = "bigquery_backup_policy";
private Datastore datastore;

public BackupPolicyServiceFireStoreImpl() {
ResourceScanner.java
@@ -33,4 +33,6 @@ public interface ResourceScanner {

// list tables under a project/dataset in the format "project.dataset.table"
List<String> listTables(String project, String dataset) throws InterruptedException, NonRetryableApplicationException;

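// returns the folder id of the project's direct parent, or null when the project is not under a folder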
String getParentFolderId(String project, String runId) throws IOException;
}
ResourceScannerImpl.java
@@ -21,59 +21,73 @@
import com.google.api.services.cloudresourcemanager.v3.model.Project;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.TableDefinition;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager;
import com.google.api.services.iam.v1.IamScopes;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupPolicy;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupPolicyFields;
import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils;

public class ResourceScannerImpl implements ResourceScanner {

private BigQuery bqService;
private CloudResourceManager cloudResourceManager;
private final BigQuery bqService;
private final CloudResourceManager cloudResourceManager;

private static final String DATASTORE_KIND = "project_folder_cache";

private Datastore datastore;

public ResourceScannerImpl() throws IOException, GeneralSecurityException {

bqService = BigQueryOptions.getDefaultInstance().getService();
cloudResourceManager = createCloudResourceManagerService();
datastore = DatastoreOptions.getDefaultInstance().getService();
}

@Override
public List<String> listTables(String projectId, String datasetId) {
return StreamSupport.stream(bqService.listTables(DatasetId.of(projectId, datasetId)).iterateAll().spliterator(),
false)
false)
.filter(t -> t.getDefinition().getType().equals(TableDefinition.Type.TABLE))
.map(t -> String.format("%s.%s.%s", projectId, datasetId, t.getTableId().getTable()))
.collect(Collectors.toCollection(ArrayList::new));
}



@Override
public List<String> listDatasets(String projectId) {
return StreamSupport.stream(bqService.listDatasets(projectId)
.iterateAll()
.spliterator(),
false)
.iterateAll()
.spliterator(),
false)
.map(d -> String.format("%s.%s", projectId, d.getDatasetId().getDataset()))
.collect(Collectors.toCollection(ArrayList::new));
}


@Override
public List<String> listProjects(Long folderId) throws IOException {

List<Project> projects = cloudResourceManager.projects().list()
.setParent("folders/"+folderId)
.setParent("folders/" + folderId)
.execute()
.getProjects();

@@ -84,6 +98,64 @@ public List<String> listProjects(Long folderId) throws IOException {

}

/**
* Returns the folder id of the direct parent of a project.
* If the project doesn't have a folder, it returns null.
* If the project doesn't exist, it throws an exception.
*/
@Override
public String getParentFolderId(String project, String runId) throws IOException {

/*
* The Resource Manager API has a rate limit of 10 GET operations per second. This means that
* looking up the folder for each table (for thousands of tables) is not scalable.
* For that, we use a cache layer to store the project-folder pairs in the scope of each run
* (scoping entries to the run also addresses cache invalidation).
*/

// 1. Lookup the project in the cache

// construct a cache key combining the project and the run id
String keyStr = generateProjectFolderCacheKey(project, runId);

Key projectFolderKey = datastore.newKeyFactory().setKind(DATASTORE_KIND).newKey(keyStr);
Entity projectFolderEntity = datastore.get(projectFolderKey);
if (projectFolderEntity == null) {
// 2.a project-folder entity doesn't exist in the cache

// 2.a.1. Query the Resource Manager API
String parentFolderFromApi = cloudResourceManager
.projects()
.get(String.format("projects/%s", project))
.execute()
.getParent();

// API returns "folders/folder_id" (or "organizations/org_id" / null when the project
// is not under a folder); we return the folder_id only, otherwise null
String parentFolderFinal = parentFolderFromApi != null && parentFolderFromApi.startsWith("folders/") ?
parentFolderFromApi.substring(8) : null;

Timestamp now = Timestamp.now();
// 2.a.2. Add it to the cache
Entity.Builder projectFolderEntityBuilder = Entity.newBuilder(projectFolderKey)
.set("project", project)
.set("run_id", runId)
.set("created_at", now)
.set("expires_at", Utils.addSeconds(now, Utils.SECONDS_IN_DAY)); // TTL 1 day
if (parentFolderFinal == null) {
// Datastore rejects null String properties; store an explicit NULL instead
projectFolderEntityBuilder.setNull("parent_folder");
} else {
projectFolderEntityBuilder.set("parent_folder", parentFolderFinal);
}
projectFolderEntity = projectFolderEntityBuilder.build();
datastore.put(projectFolderEntity);

// 2.a.3 return it to the caller
return parentFolderFinal;
} else {
// project-folder entity exists in the cache
// 2.b.1 Return it from the cache
return projectFolderEntity.getString("parent_folder");
}
}

public static String generateProjectFolderCacheKey(String project, String runId){
return String.format("%s_%s", project, runId);
}

public static CloudResourceManager createCloudResourceManagerService()
throws IOException, GeneralSecurityException {
// Use the Application Default Credentials strategy for authentication. For more info, see:
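
Taken together, a caller would exercise the new lookup roughly like this (a sketch; the project and run id values are illustrative):

```java
public class ParentFolderLookupExample {
    public static void main(String[] args) throws Exception {
        ResourceScanner scanner = new ResourceScannerImpl();
        // first call per project+run hits the Resource Manager API and caches the result;
        // repeated calls within the same run are answered from Datastore
        String folderId = scanner.getParentFolderId("my-backup-project", "run-1681900000");
        if (folderId == null) {
            System.out.println("Project is not under a folder; no folder-level fallback applies");
        } else {
            System.out.println("Folder-level fallback key: " + folderId);
        }
    }
}
```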