Skip to content

Commit

Permalink
added firestore cache for project-folder lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
kwadie committed Apr 19, 2023
1 parent e1fe45c commit 14f6936
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 48 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* [BigQuery Snapshot Policy Fields](#bigquery-snapshot-policy-fields)
* [GCS Snapshot Policy Fields](#gcs-snapshot-policy-fields)
* [Terraform Deployment](#terraform-deployment)
* [Manual Deployment](#manual-deployment)
* [Setup Access to Sources and Destinations](#setup-access-to-sources-and-destinations)
* [Set Environment Variables](#set-environment-variables)
* [Prepare Source Folders](#prepare-source-folders)
Expand Down Expand Up @@ -483,6 +484,20 @@ terraform apply -var-file=$VARS -auto-approve

```

#### Manual Deployment

Terraform doesn't yet provide resources to manage TTL policies for Firestore. To apply the TTL policy manually, run the following command:

```bash
gcloud firestore fields ttls update expires_at \
--collection-group=project_folder_cache \
--enable-ttl \
--async \
--project=$PROJECT_ID
```
The solution uses Firestore in Datastore mode as a cache in some situations. The TTL policy allows
Firestore to automatically delete expired entries, saving cost and improving lookup performance.

#### Setup Access to Sources and Destinations

##### Set Environment Variables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public ConfiguratorResponse execute(ConfiguratorRequest request, String pubSubMe
gcsSnapshotRequest,
bqSnapshotPublishResults,
gcsSnapshotPublishResults
);
);
}

public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOException {
Expand All @@ -237,7 +237,7 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOExcept
);

return attachedBackupPolicy;
}else{
} else {

logger.logInfoWithTracker(request.isDryRun(),
request.getTrackingId(),
Expand All @@ -248,13 +248,15 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOExcept
// find the most granular fallback policy table > dataset > project
Tuple<String, BackupPolicy> fallbackBackupPolicyTuple = findFallbackBackupPolicy(
fallbackBackupPolicy,
request.getTargetTable());
request.getTargetTable(),
request.getRunId()
);
BackupPolicy fallbackPolicy = fallbackBackupPolicyTuple.y();

logger.logInfoWithTracker(request.isDryRun(),
request.getTrackingId(),
request.getTargetTable(),
String.format("Will use a %s-level fallback policy", fallbackBackupPolicyTuple.x() )
String.format("Will use a %s-level fallback policy", fallbackBackupPolicyTuple.x())
);

// if there is a system attached policy, then only use the last_xyz fields from it and use the latest fallback policy
Expand All @@ -267,7 +269,7 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOExcept
.setLastGcsSnapshotStorageUri(attachedBackupPolicy.getLastGcsSnapshotStorageUri())
.setLastBqSnapshotStorageUri(attachedBackupPolicy.getLastBqSnapshotStorageUri())
.build();
}else{
} else {
// if there is no attached policy, use fallback one
return fallbackPolicy;
}
Expand Down Expand Up @@ -333,7 +335,7 @@ public static boolean isBackupCronTime(

public static boolean isBackupTime(boolean isForceRun,
boolean isBackupCronTime,
boolean isTableCreatedBeforeTimeTravel){
boolean isTableCreatedBeforeTimeTravel) {
// table must have enough history to use the time travel feature.
// In addition to that, the run has to be a force run or the backup is due based on the backup cron

Expand Down Expand Up @@ -404,9 +406,10 @@ public Tuple<SnapshoterRequest, SnapshoterRequest> prepareSnapshotRequests(Backu
return Tuple.of(bqSnapshotRequest, gcsSnapshotRequest);
}

public Tuple<String, BackupPolicy> findFallbackBackupPolicy(FallbackBackupPolicy
fallbackBackupPolicy,
TableSpec tableSpec) throws IOException {
public Tuple<String, BackupPolicy> findFallbackBackupPolicy(FallbackBackupPolicy fallbackBackupPolicy,
TableSpec tableSpec,
String runId
) throws IOException {

BackupPolicy tableLevel = fallbackBackupPolicy.getTableOverrides().get(tableSpec.toSqlString());
if (tableLevel != null) {
Expand All @@ -427,11 +430,11 @@ public Tuple<String, BackupPolicy> findFallbackBackupPolicy(FallbackBackupPolicy
return Tuple.of("project", projectLevel);
}

// API CALL
String folderId = resourceScanner.getParentFolderId(tableSpec.getProject());
if(folderId != null){
// API CALL (or cache)
String folderId = resourceScanner.getParentFolderId(tableSpec.getProject(), runId);
if (folderId != null) {
BackupPolicy folderLevel = fallbackBackupPolicy.getFolderOverrides().get(folderId);
if(folderLevel != null){
if (folderLevel != null) {
return Tuple.of("folder", folderLevel);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp o

// expiry date is calculated relative to the operation time
Timestamp expiryTs = Timestamp.ofTimeSecondsAndNanos(
operationTs.getSeconds() + (request.getBackupPolicy().getBigQuerySnapshotExpirationDays().longValue() * 86400L),
0);
operationTs.getSeconds() + (request.getBackupPolicy().getBigQuerySnapshotExpirationDays().longValue() * Utils.SECONDS_IN_DAY),
operationTs.getNanos());

// construct the snapshot table from the request params and calculated timetravel
TableSpec snapshotTable = getSnapshotTableSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

public class Utils {

// Number of seconds in one day (24 * 60 * 60); used e.g. for cache-entry TTL expiry.
public static final Long SECONDS_IN_DAY = 86400L;
// Number of milliseconds in one day; used e.g. for time-travel offset calculations.
public static final Long MILLI_SECONDS_IN_DAY = 86400000L;

public static String getOrFail(Map<String, String> map, String key) {
String field = map.get(key);
if (field == null) {
Expand Down Expand Up @@ -102,7 +105,7 @@ public static void runServiceStartRoutines(LoggingHelper logger,
String flagFileName = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);
if (persistentSet.contains(flagFileName)) {
// log error and ACK and return
String msg = String.format("PubSub message ID '%s' has been processed before by the service. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.",
String msg = String.format("PubSub message ID '%s' has been processed before by the service. This could be a PubSub duplicate message and safe to ignore or the previous messages were not ACK to PubSub to stop retries. Please investigate further if needed.",
pubSubMessageId
);
throw new NonRetryableApplicationException(msg);
Expand Down Expand Up @@ -145,7 +148,7 @@ public static Tuple<TableSpec, Long> getTableSpecWithTimeTravel(TableSpec table,
// use a buffer (milliseconds) to count for the operation time. 1 MIN = 60000 MILLISECONDS
Long bufferMs = timeTravelOffsetDays.equals(TimeTravelOffsetDays.DAYS_7) ? 60 * 60000L : 0L;
// milli seconds per day * number of days
Long timeTravelOffsetMs = (86400000L * Long.parseLong(timeTravelOffsetDays.getText()));
Long timeTravelOffsetMs = (Utils.MILLI_SECONDS_IN_DAY * Long.parseLong(timeTravelOffsetDays.getText()));
timeTravelMs = (refPointMs - timeTravelOffsetMs) + bufferMs;
}

Expand All @@ -167,4 +170,8 @@ public static String trimSlashes(String str){
"/"
);
}

/**
 * Returns a new Timestamp shifted forward by the given number of seconds,
 * preserving the nanosecond component of the input.
 */
public static Timestamp addSeconds(Timestamp timestamp, Long secondsDelta){
    long shiftedSeconds = timestamp.getSeconds() + secondsDelta;
    return Timestamp.ofTimeSecondsAndNanos(shiftedSeconds, timestamp.getNanos());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class BackupPolicyServiceFireStoreImpl implements BackupPolicyService {


private static final String KIND = "BigQueryBackupPolicy";
private static final String KIND = "bigquery_backup_policy";
private Datastore datastore;

public BackupPolicyServiceFireStoreImpl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ public interface ResourceScanner {
// list tables under a project/dataset in the format "project.dataset.table"
List<String> listTables(String project, String dataset) throws InterruptedException, NonRetryableApplicationException;

String getParentFolderId(String project) throws IOException;
String getParentFolderId(String project, String runId) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,47 @@
import com.google.api.services.cloudresourcemanager.v3.model.Project;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.TableDefinition;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager;
import com.google.api.services.iam.v1.IamScopes;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupPolicy;
import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupPolicyFields;
import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils;
import jdk.jshell.execution.Util;

public class ResourceScannerImpl implements ResourceScanner {

private BigQuery bqService;
private CloudResourceManager cloudResourceManager;
private final BigQuery bqService;
private final CloudResourceManager cloudResourceManager;

private static final String DATASTORE_KIND = "project_folder_cache";

private Datastore datastore;

public ResourceScannerImpl() throws IOException, GeneralSecurityException {

bqService = BigQueryOptions.getDefaultInstance().getService();
cloudResourceManager = createCloudResourceManagerService();
datastore = DatastoreOptions.getDefaultInstance().getService();
}

@Override
Expand Down Expand Up @@ -85,25 +101,59 @@ public List<String> listProjects(Long folderId) throws IOException {
/**
* Returns the folder id of the direct parent of a project.
* If the project doesn't have a folder, it returns null.
* If the project doesn't exsist it will throw an exception
* If the project doesn't exist it will throw an exception
*/
@Override
public String getParentFolderId(String project) throws IOException {
String parentFolder = cloudResourceManager
.projects()
.get(String.format("projects/%s", project))
.execute()
.getParent();
public String getParentFolderId(String project, String runId) throws IOException {

/**
* Resource Manager API has a rate limit of 10 GET operations per second. This means that
* looking up the folder for each table (for thousands of tables) is not scalable.
* For that we use a cache layer to store the project-folder pairs in the scope of each run (to address cache invalidation)
*/

// 1. Lookup the project in the cache

// construct a key including the pro
String keyStr = generateProjectFolderCacheKey(project, runId);

Key projectFolderKey = datastore.newKeyFactory().setKind(DATASTORE_KIND).newKey(keyStr);
Entity projectFolderEntity = datastore.get(projectFolderKey);
if(projectFolderEntity == null){
// 2.a project-folder entity doesn't exist in the cache

// 2.a.1. Query the Resource Manager API
String parentFolderFromApi = cloudResourceManager
.projects()
.get(String.format("projects/%s", project))
.execute()
.getParent();

if (parentFolder == null) {
return null;
}
if (parentFolder.startsWith("folders/")){
// API returns "folders/folder_name" and we just return folder_name
return parentFolder.substring(8);
String parentFolderFinal = parentFolderFromApi.startsWith("folders/")? parentFolderFromApi.substring(8): null;

Timestamp now = Timestamp.now();
// 2.a.2. Add it to the cache
projectFolderEntity = Entity.newBuilder(projectFolderKey)
.set("project", project)
.set("parent_folder", parentFolderFinal)
.set("run_id", runId)
.set("created_at", now)
.set("expires_at", Utils.addSeconds(now, Utils.SECONDS_IN_DAY)) // TTL 1 day
.build();
datastore.put(projectFolderEntity);

// 2.a.3 return it to the caller
return parentFolderFinal;
}else{
// project-folder entity exist in the cache
// 2.b.1 Return from cache
return projectFolderEntity.getValue("parent_folder").toString();
}
// in all other cases, like the parent being organizations/xyz, return null
return null;
}

/**
 * Builds the Datastore key string for a cached project-folder lookup.
 * The key embeds the run id so that cache entries are naturally scoped
 * (and thus invalidated) per run.
 */
public static String generateProjectFolderCacheKey(String project, String runId){
    return project + "_" + runId;
}

public static CloudResourceManager createCloudResourceManagerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void shutdown() {
// test table level
Tuple<String, BackupPolicy> tableLevel = configurator.findFallbackBackupPolicy(
fallbackBackupPolicy,
TableSpec.fromSqlString("p1.d1.t1")
TableSpec.fromSqlString("p1.d1.t1"),
"runId"
);

assertEquals("table", tableLevel.x());
Expand All @@ -136,7 +137,8 @@ public void shutdown() {
// test dataset level
Tuple<String, BackupPolicy> datasetLevel = configurator.findFallbackBackupPolicy(
fallbackBackupPolicy,
TableSpec.fromSqlString("p1.d2.t1")
TableSpec.fromSqlString("p1.d2.t1"),
"runId"
);

assertEquals("dataset", datasetLevel.x());
Expand All @@ -145,7 +147,8 @@ public void shutdown() {
// test project level
Tuple<String, BackupPolicy> projectLevel = configurator.findFallbackBackupPolicy(
fallbackBackupPolicy,
TableSpec.fromSqlString("p2.d1.t1")
TableSpec.fromSqlString("p2.d1.t1"),
"runId"
);

assertEquals("project", projectLevel.x());
Expand All @@ -154,7 +157,8 @@ public void shutdown() {
// test folder level
Tuple<String, BackupPolicy> folderLevel = configurator.findFallbackBackupPolicy(
fallbackBackupPolicy,
TableSpec.fromSqlString("p3.d1.t1")
TableSpec.fromSqlString("p3.d1.t1"),
"runId"
);

assertEquals("folder", folderLevel.x());
Expand All @@ -163,7 +167,8 @@ public void shutdown() {
// test default level
Tuple<String, BackupPolicy> defaultLevel = configurator.findFallbackBackupPolicy(
fallbackBackupPolicy,
TableSpec.fromSqlString("p9.d1.t1")
TableSpec.fromSqlString("p9.d1.t1"),
"runId"
);

assertEquals("default", defaultLevel.x());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,16 @@ public void testTrimSlashes(){
}

@Test
public void test(){
Timestamp ts = Timestamp.now();
System.out.println(ts.toSqlTimestamp().getTime());
System.out.println(Utils.timestampToUnixTimeMillis(ts));
public void testAddSeconds(){

Timestamp today = Timestamp.parseTimestamp("2022-10-13T12:58:41Z");
Timestamp tomorrow = Timestamp.parseTimestamp("2022-10-14T12:58:41Z");

assertEquals(
Utils.addSeconds(today, 86400L),
tomorrow
);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public List<String> listTables(String project, String dataset) throws Interrupte
}

@Override
public String getParentFolderId(String project) throws IOException {
public String getParentFolderId(String project, String runId) throws IOException {
switch(project){
case "p1": return "500";
case "p2": return "600";
Expand Down
6 changes: 6 additions & 0 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,10 @@ module "async-gcs-snapshoter" {
host_project = var.project
log_sink_name = "bq_backup_manager_gcs_export_pubsub_sink"
pubsub_topic_name = module.pubsub-tagger.topic-name
}

# Provisions the Firestore (Datastore mode) database used as a cache by the
# services (e.g. project-folder lookups).
# NOTE(review): the TTL policy on the `expires_at` field is NOT managed here —
# it must be applied manually via gcloud; confirm against the README's
# "Manual Deployment" section.
module "firestore" {
source = "./modules/firestore"
project = var.project
region = var.compute_region # store cache data next to the services
}
Loading

0 comments on commit 14f6936

Please sign in to comment.