From 14f6936890db023672bf89a8fb0b8ef7eedd7a04 Mon Sep 17 00:00:00 2001 From: Karim Wadie Date: Wed, 19 Apr 2023 19:44:52 +0200 Subject: [PATCH] added firestore cache for project-folder lookups --- README.md | 15 ++++ .../f02_configurator/Configurator.java | 29 ++++--- .../f03_snapshoter/BigQuerySnapshoter.java | 4 +- .../bq_snapshot_manager/helpers/Utils.java | 11 ++- .../BackupPolicyServiceFireStoreImpl.java | 2 +- .../services/scan/ResourceScanner.java | 2 +- .../services/scan/ResourceScannerImpl.java | 82 +++++++++++++++---- .../f02_configurator/ConfiguratorTest.java | 15 ++-- .../helpers/UtilsTest.java | 15 +++- .../services/ResourceScannerTestImpl.java | 2 +- terraform/main.tf | 6 ++ .../modules/async-gcs-snapshoter/main.tf | 2 +- terraform/modules/firestore/main.tf | 17 ++++ terraform/modules/firestore/variables.tf | 2 + terraform/modules/iam/main.tf | 4 +- 15 files changed, 160 insertions(+), 48 deletions(-) create mode 100644 terraform/modules/firestore/main.tf create mode 100644 terraform/modules/firestore/variables.tf diff --git a/README.md b/README.md index f5156ee..511df96 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ * [BigQuery Snapshot Policy Fields](#bigquery-snapshot-policy-fields) * [GCS Snapshot Policy Fields](#gcs-snapshot-policy-fields) * [Terraform Deployment](#terraform-deployment) + * [Manual Deployment](#manual-deployment) * [Setup Access to Sources and Destinations](#setup-access-to-sources-and-destinations) * [Set Environment Variables](#set-environment-variables) * [Prepare Source Folders](#prepare-source-folders) @@ -483,6 +484,20 @@ terraform apply -var-file=$VARS -auto-approve ``` +#### Manual Deployment + +Terraform doesn't provide modules to add TTL policies for Firestore (yet). 
Until Terraform supports it, run the command below to enable the TTL policy: + +```bash +gcloud firestore fields ttls update expires_at \ +--collection-group=project_folder_cache \ +--enable-ttl \ +--async \ +--project=$PROJECT_ID +``` +The solution uses Firestore in Datastore mode as a cache in some situations (e.g. project-folder lookups). The TTL policy allows +Firestore to automatically delete expired entries, which saves cost and improves lookup performance. + #### Setup Access to Sources and Destinations ##### Set Environment Variables diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/Configurator.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/Configurator.java index bca3973..1a8f43a 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/Configurator.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/Configurator.java @@ -217,7 +217,7 @@ public ConfiguratorResponse execute(ConfiguratorRequest request, String pubSubMe gcsSnapshotRequest, bqSnapshotPublishResults, gcsSnapshotPublishResults - ); + ); } public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOException { @@ -237,7 +237,7 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOExcept ); return attachedBackupPolicy; - }else{ + } else { logger.logInfoWithTracker(request.isDryRun(), request.getTrackingId(), @@ -248,13 +248,15 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOExcept // find the most granular fallback policy table > dataset > project Tuple fallbackBackupPolicyTuple = findFallbackBackupPolicy( fallbackBackupPolicy, - request.getTargetTable()); + request.getTargetTable(), + request.getRunId() + ); BackupPolicy fallbackPolicy = fallbackBackupPolicyTuple.y(); logger.logInfoWithTracker(request.isDryRun(), request.getTrackingId(), request.getTargetTable(), - 
String.format("Will use a %s-level fallback policy", fallbackBackupPolicyTuple.x() ) + String.format("Will use a %s-level fallback policy", fallbackBackupPolicyTuple.x()) ); // if there is a system attached policy, then only use the last_xyz fields from it and use the latest fallback policy @@ -267,7 +269,7 @@ public BackupPolicy getBackupPolicy(ConfiguratorRequest request) throws IOExcept .setLastGcsSnapshotStorageUri(attachedBackupPolicy.getLastGcsSnapshotStorageUri()) .setLastBqSnapshotStorageUri(attachedBackupPolicy.getLastBqSnapshotStorageUri()) .build(); - }else{ + } else { // if there is no attached policy, use fallback one return fallbackPolicy; } @@ -333,7 +335,7 @@ public static boolean isBackupCronTime( public static boolean isBackupTime(boolean isForceRun, boolean isBackupCronTime, - boolean isTableCreatedBeforeTimeTravel){ + boolean isTableCreatedBeforeTimeTravel) { // table must have enough history to use the time travel feature. // In addition to that, the run has to be a force run or the backup is due based on the backup cron @@ -404,9 +406,10 @@ public Tuple prepareSnapshotRequests(Backu return Tuple.of(bqSnapshotRequest, gcsSnapshotRequest); } - public Tuple findFallbackBackupPolicy(FallbackBackupPolicy - fallbackBackupPolicy, - TableSpec tableSpec) throws IOException { + public Tuple findFallbackBackupPolicy(FallbackBackupPolicy fallbackBackupPolicy, + TableSpec tableSpec, + String runId + ) throws IOException { BackupPolicy tableLevel = fallbackBackupPolicy.getTableOverrides().get(tableSpec.toSqlString()); if (tableLevel != null) { @@ -427,11 +430,11 @@ public Tuple findFallbackBackupPolicy(FallbackBackupPolicy return Tuple.of("project", projectLevel); } - // API CALL - String folderId = resourceScanner.getParentFolderId(tableSpec.getProject()); - if(folderId != null){ + // API CALL (or cache) + String folderId = resourceScanner.getParentFolderId(tableSpec.getProject(), runId); + if (folderId != null) { BackupPolicy folderLevel = 
fallbackBackupPolicy.getFolderOverrides().get(folderId); - if(folderLevel != null){ + if (folderLevel != null) { return Tuple.of("folder", folderLevel); } } diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java index 009a920..177bb5f 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java @@ -111,8 +111,8 @@ public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp o // expiry date is calculated relative to the operation time Timestamp expiryTs = Timestamp.ofTimeSecondsAndNanos( - operationTs.getSeconds() + (request.getBackupPolicy().getBigQuerySnapshotExpirationDays().longValue() * 86400L), - 0); + operationTs.getSeconds() + (request.getBackupPolicy().getBigQuerySnapshotExpirationDays().longValue() * Utils.SECONDS_IN_DAY), + operationTs.getNanos()); // construct the snapshot table from the request params and calculated timetravel TableSpec snapshotTable = getSnapshotTableSpec( diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/Utils.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/Utils.java index b4f1154..a7cc33a 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/Utils.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/helpers/Utils.java @@ -32,6 +32,9 @@ public class Utils { + public static final Long SECONDS_IN_DAY = 86400L; + public static final Long MILLI_SECONDS_IN_DAY = 86400000L; + public static String getOrFail(Map map, String key) { String field = map.get(key); if (field == null) { 
@@ -102,7 +105,7 @@ public static void runServiceStartRoutines(LoggingHelper logger, String flagFileName = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId); if (persistentSet.contains(flagFileName)) { // log error and ACK and return - String msg = String.format("PubSub message ID '%s' has been processed before by the service. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.", + String msg = String.format("PubSub message ID '%s' has been processed before by the service. This could be a PubSub duplicate message and safe to ignore or the previous messages were not ACK to PubSub to stop retries. Please investigate further if needed.", pubSubMessageId ); throw new NonRetryableApplicationException(msg); @@ -145,7 +148,7 @@ public static Tuple getTableSpecWithTimeTravel(TableSpec table, // use a buffer (milliseconds) to count for the operation time. 1 MIN = 60000 MILLISECONDS Long bufferMs = timeTravelOffsetDays.equals(TimeTravelOffsetDays.DAYS_7) ? 
60 * 60000L : 0L; // milli seconds per day * number of days - Long timeTravelOffsetMs = (86400000L * Long.parseLong(timeTravelOffsetDays.getText())); + Long timeTravelOffsetMs = (Utils.MILLI_SECONDS_IN_DAY * Long.parseLong(timeTravelOffsetDays.getText())); timeTravelMs = (refPointMs - timeTravelOffsetMs) + bufferMs; } @@ -167,4 +170,8 @@ public static String trimSlashes(String str){ "/" ); } + + public static Timestamp addSeconds(Timestamp timestamp, Long secondsDelta){ + return Timestamp.ofTimeSecondsAndNanos(timestamp.getSeconds() + secondsDelta , timestamp.getNanos()); + } } diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/backup_policy/BackupPolicyServiceFireStoreImpl.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/backup_policy/BackupPolicyServiceFireStoreImpl.java index 28ca117..7729b1e 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/backup_policy/BackupPolicyServiceFireStoreImpl.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/backup_policy/BackupPolicyServiceFireStoreImpl.java @@ -13,7 +13,7 @@ public class BackupPolicyServiceFireStoreImpl implements BackupPolicyService { - private static final String KIND = "BigQueryBackupPolicy"; + private static final String KIND = "bigquery_backup_policy"; private Datastore datastore; public BackupPolicyServiceFireStoreImpl() { diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScanner.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScanner.java index 3cd96b5..074b4de 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScanner.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScanner.java @@ -34,5 +34,5 @@ public interface 
ResourceScanner { // list tables under a project/dataset in the format "project.dataset.table" List listTables(String project, String dataset) throws InterruptedException, NonRetryableApplicationException; - String getParentFolderId(String project) throws IOException; + String getParentFolderId(String project, String runId) throws IOException; } \ No newline at end of file diff --git a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScannerImpl.java b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScannerImpl.java index 75e0720..72c8d10 100644 --- a/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScannerImpl.java +++ b/services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/services/scan/ResourceScannerImpl.java @@ -21,6 +21,7 @@ import com.google.api.services.cloudresourcemanager.v3.model.Project; import com.google.auth.http.HttpCredentialsAdapter; import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.Timestamp; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.DatasetId; @@ -28,24 +29,39 @@ import java.io.IOException; import java.security.GeneralSecurityException; +import java.sql.Time; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import com.google.api.services.cloudresourcemanager.v3.CloudResourceManager; import com.google.api.services.iam.v1.IamScopes; +import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.Key; +import com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupPolicy; +import 
com.google.cloud.pso.bq_snapshot_manager.entities.backup_policy.BackupPolicyFields; +import com.google.cloud.pso.bq_snapshot_manager.helpers.Utils; +import jdk.jshell.execution.Util; public class ResourceScannerImpl implements ResourceScanner { - private BigQuery bqService; - private CloudResourceManager cloudResourceManager; + private final BigQuery bqService; + private final CloudResourceManager cloudResourceManager; + + private static final String DATASTORE_KIND = "project_folder_cache"; + + private Datastore datastore; public ResourceScannerImpl() throws IOException, GeneralSecurityException { bqService = BigQueryOptions.getDefaultInstance().getService(); cloudResourceManager = createCloudResourceManagerService(); + datastore = DatastoreOptions.getDefaultInstance().getService(); } @Override @@ -85,25 +101,59 @@ public List listProjects(Long folderId) throws IOException { /** * Returns the folder id of the direct parent of a project. * If the project doesn't have a folder, it returns null. - * If the project doesn't exsist it will throw an exception + * If the project doesn't exist it will throw an exception */ @Override - public String getParentFolderId(String project) throws IOException { - String parentFolder = cloudResourceManager - .projects() - .get(String.format("projects/%s", project)) - .execute() - .getParent(); + public String getParentFolderId(String project, String runId) throws IOException { + + /** + * Resource Manager API has a rate limit of 10 GET operations per second. This means that + * looking up the folder for each table (for thousands of tables) is not scalable. + * For that we use a cache layer to store the project-folder pairs in the scope of each run (to address cache invalidation) + */ + + // 1. 
Lookup the project in the cache + + // construct a cache key from the project and the run id (entries are scoped to a single run) + String keyStr = generateProjectFolderCacheKey(project, runId); + + Key projectFolderKey = datastore.newKeyFactory().setKind(DATASTORE_KIND).newKey(keyStr); + Entity projectFolderEntity = datastore.get(projectFolderKey); + if(projectFolderEntity == null){ + // 2.a project-folder entity doesn't exist in the cache + + // 2.a.1. Query the Resource Manager API + String parentFolderFromApi = cloudResourceManager + .projects() + .get(String.format("projects/%s", project)) + .execute() + .getParent(); - if (parentFolder == null) { - return null; - } - if (parentFolder.startsWith("folders/")){ // API returns "folders/folder_name" and we just return folder_name - return parentFolder.substring(8); + String parentFolderFinal = (parentFolderFromApi != null && parentFolderFromApi.startsWith("folders/")) ? parentFolderFromApi.substring(8) : null; // API returns "folders/folder_name"; a missing parent or organizations/xyz yields null + + Timestamp now = Timestamp.now(); + // 2.a.2. Add it to the cache + projectFolderEntity = Entity.newBuilder(projectFolderKey) + .set("project", project) + .set("parent_folder", parentFolderFinal == null ? com.google.cloud.datastore.NullValue.of() : com.google.cloud.datastore.StringValue.of(parentFolderFinal)) // a null String is rejected by the builder, so store an explicit null value + .set("run_id", runId) + .set("created_at", now) + .set("expires_at", Utils.addSeconds(now, Utils.SECONDS_IN_DAY)) // TTL 1 day + .build(); + datastore.put(projectFolderEntity); + + // 2.a.3 return it to the caller + return parentFolderFinal; + }else{ + // project-folder entity exist in the cache + // 2.b.1 Return from cache + return projectFolderEntity.isNull("parent_folder") ? null : projectFolderEntity.getString("parent_folder"); // getValue(..).toString() would return the Value wrapper's debug text, not the folder id } - // in all other cases, like the parent being organizations/xyz, return null - return null; + } + + public static String generateProjectFolderCacheKey(String project, String runId){ + return String.format("%s_%s", project, runId); } public static CloudResourceManager createCloudResourceManagerService() diff --git a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/ConfiguratorTest.java 
b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/ConfiguratorTest.java index e760cbb..2868749 100644 --- a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/ConfiguratorTest.java +++ b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/ConfiguratorTest.java @@ -127,7 +127,8 @@ public void shutdown() { // test table level Tuple tableLevel = configurator.findFallbackBackupPolicy( fallbackBackupPolicy, - TableSpec.fromSqlString("p1.d1.t1") + TableSpec.fromSqlString("p1.d1.t1"), + "runId" ); assertEquals("table", tableLevel.x()); @@ -136,7 +137,8 @@ public void shutdown() { // test dataset level Tuple datasetLevel = configurator.findFallbackBackupPolicy( fallbackBackupPolicy, - TableSpec.fromSqlString("p1.d2.t1") + TableSpec.fromSqlString("p1.d2.t1"), + "runId" ); assertEquals("dataset", datasetLevel.x()); @@ -145,7 +147,8 @@ public void shutdown() { // test project level Tuple projectLevel = configurator.findFallbackBackupPolicy( fallbackBackupPolicy, - TableSpec.fromSqlString("p2.d1.t1") + TableSpec.fromSqlString("p2.d1.t1"), + "runId" ); assertEquals("project", projectLevel.x()); @@ -154,7 +157,8 @@ public void shutdown() { // test folder level Tuple folderLevel = configurator.findFallbackBackupPolicy( fallbackBackupPolicy, - TableSpec.fromSqlString("p3.d1.t1") + TableSpec.fromSqlString("p3.d1.t1"), + "runId" ); assertEquals("folder", folderLevel.x()); @@ -163,7 +167,8 @@ public void shutdown() { // test default level Tuple defaultLevel = configurator.findFallbackBackupPolicy( fallbackBackupPolicy, - TableSpec.fromSqlString("p9.d1.t1") + TableSpec.fromSqlString("p9.d1.t1"), + "runId" ); assertEquals("default", defaultLevel.x()); diff --git a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/UtilsTest.java 
b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/UtilsTest.java index b3d794f..e859577 100644 --- a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/UtilsTest.java +++ b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/helpers/UtilsTest.java @@ -83,9 +83,16 @@ public void testTrimSlashes(){ } @Test - public void test(){ - Timestamp ts = Timestamp.now(); - System.out.println(ts.toSqlTimestamp().getTime()); - System.out.println(Utils.timestampToUnixTimeMillis(ts)); + public void testAddSeconds(){ + + Timestamp today = Timestamp.parseTimestamp("2022-10-13T12:58:41Z"); + Timestamp tomorrow = Timestamp.parseTimestamp("2022-10-14T12:58:41Z"); + + assertEquals( + Utils.addSeconds(today, 86400L), + tomorrow + ); } + + } \ No newline at end of file diff --git a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/ResourceScannerTestImpl.java b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/ResourceScannerTestImpl.java index 2da1692..74b5c74 100644 --- a/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/ResourceScannerTestImpl.java +++ b/services/library/src/test/java/com/google/cloud/pso/bq_snapshot_manager/services/ResourceScannerTestImpl.java @@ -52,7 +52,7 @@ public List listTables(String project, String dataset) throws Interrupte } @Override - public String getParentFolderId(String project) throws IOException { + public String getParentFolderId(String project, String runId) throws IOException { switch(project){ case "p1": return "500"; case "p2": return "600"; diff --git a/terraform/main.tf b/terraform/main.tf index 51a5443..16cc968 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -407,4 +407,10 @@ module "async-gcs-snapshoter" { host_project = var.project log_sink_name = "bq_backup_manager_gcs_export_pubsub_sink" pubsub_topic_name = module.pubsub-tagger.topic-name +} + +module 
"firestore" { + source = "./modules/firestore" + project = var.project + region = var.compute_region # store cache data next to the services } \ No newline at end of file diff --git a/terraform/modules/async-gcs-snapshoter/main.tf b/terraform/modules/async-gcs-snapshoter/main.tf index cca109b..17a1822 100644 --- a/terraform/modules/async-gcs-snapshoter/main.tf +++ b/terraform/modules/async-gcs-snapshoter/main.tf @@ -2,7 +2,7 @@ // create a pubsub log sink in the backup project where the bq extract jobs run resource "google_logging_project_sink" "backup_project_pubsub_sink" { project = var.log_project - name = var.log_sink_name + name = "${var.log_sink_name}_${var.host_project}" destination = "pubsub.googleapis.com/projects/${var.host_project}/topics/${var.pubsub_topic_name}" filter = "resource.type=bigquery_resource protoPayload.serviceData.jobCompletedEvent.eventName=extract_job_completed protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.app=${var.application_name}" # Use a unique writer (creates a unique service account used for writing) diff --git a/terraform/modules/firestore/main.tf b/terraform/modules/firestore/main.tf new file mode 100644 index 0000000..b36be09 --- /dev/null +++ b/terraform/modules/firestore/main.tf @@ -0,0 +1,17 @@ +#https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/firestore_database + +resource "google_project_service" "firestore" { + project = var.project + service = "firestore.googleapis.com" +} + +resource "google_firestore_database" "datastore_mode_database" { + project = var.project + + name = "(default)" + + location_id = var.region + type = "DATASTORE_MODE" + + depends_on = [google_project_service.firestore] +} \ No newline at end of file diff --git a/terraform/modules/firestore/variables.tf b/terraform/modules/firestore/variables.tf new file mode 100644 index 0000000..4211a60 --- /dev/null +++ b/terraform/modules/firestore/variables.tf @@ -0,0 +1,2 @@ +variable "project" {type = 
string} +variable "region" {type = string} diff --git a/terraform/modules/iam/main.tf b/terraform/modules/iam/main.tf index fef0131..9187d20 100644 --- a/terraform/modules/iam/main.tf +++ b/terraform/modules/iam/main.tf @@ -83,10 +83,10 @@ resource "google_service_account_iam_member" "sa_dispatcher_account_user_sa_disp #### Configurator SA Permissions ### -// read backup policies when using datastore as backend +// write cache entries and/or read backup policies (when using datastore as policies backend) resource "google_project_iam_member" "sa_configurator_datastore_viewer" { project = var.project -role = "roles/datastore.viewer" +role = "roles/datastore.user" member = "serviceAccount:${google_service_account.sa_configurator.email}" }