GBICSGO-2361: optimize count partition query #470

Merged: 3 commits, Nov 13, 2024
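
For context before the per-file diff: the optimization replaces a COUNT(*) aggregation that scanned the partitioned table with a metadata-only lookup against INFORMATION_SCHEMA.PARTITIONS. The sketch below contrasts the two query shapes built in bigquery_client.go; the query strings are taken from the diff, while the project, dataset, and table names are placeholders invented for illustration.

package main

import "fmt"

func main() {
	project, dataset, table := "my-project", "my_dataset", "my_table"

	// Old shape: scans the table itself and groups rows by the partitioning column
	// (here the ingestion-time pseudo column, as in the removed default case).
	before := fmt.Sprintf(
		"SELECT COUNT(*) AS total, %s AS %s FROM `%s.%s.%s` WHERE %s IS NOT NULL GROUP BY %s",
		"_PARTITIONTIME", "T_TIMESTAMP", project, dataset, table, "_PARTITIONTIME", "T_TIMESTAMP",
	)

	// New shape: reads precomputed partition metadata, so no table scan is needed.
	after := fmt.Sprintf(
		"SELECT total_rows AS total, partition_id FROM `%s.%s.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '%s'",
		project, dataset, table,
	)

	fmt.Println(before)
	fmt.Println(after)
}

INFORMATION_SCHEMA.PARTITIONS already exposes per-partition row counts (total_rows), which is what makes the second query cheap.
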
2 changes: 1 addition & 1 deletion frontend/src/components/PricePrediction.vue
@@ -50,7 +50,7 @@ const pricePredictionOptions = ref<ApexOptions>({
});

const backupStatusIsDeleted = computed(() => {
return props.backup?.status === BackupStatus.FINISHED || props.backup?.status === BackupStatus.BACKUP_DELETED;
return props.backup?.status === BackupStatus.FINISHED || props.backup?.status === BackupStatus.BACKUP_DELETED || props.backup?.status === BackupStatus.BACKUP_SOURCE_DELETED;
});

const updateData = () => {
8 changes: 8 additions & 0 deletions pkg/http/mock/fixtures.go
@@ -63,6 +63,8 @@ var (

GetBackUpSourceNotFoundForBigQueryMock = NewMockedHTTPRequest("GET", "bigquery/v2/projects/.*/datasets/notExistingDataset", getBackUpSourceNotFoundResponseForBigQuery)
GetBackUpSourceNotFoundForCloudStorageMock = NewMockedHTTPRequest("GET", "/storage/v1/b/notExistingBucket", getBackUpSourceNotFoundResponseForCloudStorage)

TableMetadataPartitionResultHTTPMock = NewMockedHTTPRequest("GET", "/bigquery/v2/projects/.*/queries", getMetadataTablePartitionsQueryResponse)
)

const (
@@ -178,6 +180,12 @@ Content-Type: application/json; charset=UTF-8

{"kind":"bigquery#getQueryResultsResponse","etag":"P0Ea2Yx1PihRPFrsH/Q3fA==","schema":{"fields":[{"name":"total","type":"INTEGER","mode":"NULLABLE"},{"name":"p","type":"TIMESTAMP","mode":"NULLABLE"}]},"jobReference":{"projectId":"local-ability-backup","jobID":"PYRwoNSDhUNuqUtVbxtkjbSvmx7","location":"EU"},"totalRows":"76","totalBytesProcessed":"0","jobComplete":true,"cacheHit":false}`

getMetadataTablePartitionsQueryResponse = `HTTP/2.0 200 OK
Content-Length: 1462
Content-Type: application/json; charset=UTF-8

{"kind": "bigquery#getQueryResultsResponse", "etag": "P0Ea2Yx1PihRPFrsH/Q3fA==", "schema": {"fields": [{"name": "total", "fullname": "total", "type": "INTEGER", "mode": "NULLABLE", "description": "", "fields": [], "hasPermission": true, "dataPolicies": [], "maxLength": "0", "precision": "0", "scale": "0", "roundingMode": "ROUNDING_MODE_UNSPECIFIED", "collation": "", "defaultValueExpression": "", "isMeasure": false, "rangeElementType": {"type": ""}, "foreignTypeDefinition": "", "dataGovernanceTags": [], "identityColumnInfo": {"generatedMode": "GENERATED_MODE_UNSPECIFIED", "start": "", "increment": ""}}, {"name": "partition_id", "fullname": "partition_id", "type": "STRING", "mode": "NULLABLE", "description": "", "fields": [], "hasPermission": true, "dataPolicies": [], "maxLength": "0", "precision": "0", "scale": "0", "roundingMode": "ROUNDING_MODE_UNSPECIFIED", "collation": "", "defaultValueExpression": "", "isMeasure": false, "rangeElementType": {"type": ""}, "foreignTypeDefinition": "", "dataGovernanceTags": [], "identityColumnInfo": {"generatedMode": "GENERATED_MODE_UNSPECIFIED", "start": "", "increment": ""}}]}, "rows": [{"f": [{"v": "22498"}, {"v": "20190110"}]}, {"f": [{"v": "65"}, {"v": "20200229"}]}], "jobReference": {"projectId": "local-ability-backup", "jobID": "PYRwoNSDhUNuqUtVbxtkjbSvmx7", "location": "EU"}, "totalRows": "2", "totalBytesProcessed": "0", "jobComplete": true, "cacheHit": false}`

getExtractJobResultOkResponse = `HTTP/2.0 200 OK
Content-Length: 1191
Content-Type: application/json; charset=UTF-8
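
As a quick illustration of what the new fixture above feeds into: the two rows in getMetadataTablePartitionsQueryResponse decode into the tablePartition struct introduced in the client diff below. The struct tags and values are copied from this PR; the standalone main wrapper is only for illustration.

package main

import "fmt"

// tablePartition mirrors the struct added in pkg/service/bigquery/bigquery_client.go.
type tablePartition struct {
	Total       int64  `bigquery:"total"`
	PartitionID string `bigquery:"partition_id"`
}

func main() {
	// Values taken from the two rows of the mocked query response above.
	expected := []tablePartition{
		{Total: 22498, PartitionID: "20190110"},
		{Total: 65, PartitionID: "20200229"},
	}
	for _, p := range expected {
		fmt.Printf("partition %s holds %d rows\n", p.PartitionID, p.Total)
	}
}
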
78 changes: 13 additions & 65 deletions pkg/service/bigquery/bigquery_client.go
@@ -2,16 +2,14 @@ package bigquery

import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"time"

bq "cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"github.com/ottogroup/penelope/pkg/config"
"github.com/ottogroup/penelope/pkg/http/impersonate"
"github.com/pkg/errors"
"go.opencensus.io/trace"
gimpersonate "google.golang.org/api/impersonate"
"google.golang.org/api/iterator"
@@ -215,35 +213,8 @@ func (d *defaultBigQueryClient) HasTablePartitions(ctxIn context.Context, projec
}

type tablePartition struct {
Total int64 `bigquery:"total"`
TIMESTAMP time.Time `bigquery:"T_TIMESTAMP"`
DATE civil.Date `bigquery:"T_DATE"`
TIME civil.Time `bigquery:"T_TIME"`
DATETIME civil.DateTime `bigquery:"T_DATETIME"`
}

func (t *tablePartition) getPartitionFor(targetField string) (string, error) {
var partitionName string
switch targetField {
case "T_TIMESTAMP":
partitionName = fmt.Sprintf("%s%s%s",
zerofill(t.TIMESTAMP.Year()),
zerofill(int(t.TIMESTAMP.Month())),
zerofill(t.TIMESTAMP.Day()))
case "T_DATE":
partitionName = fmt.Sprintf("%s%s%s",
zerofill(t.DATE.Year),
zerofill(int(t.DATE.Month)),
zerofill(t.DATE.Day))
case "T_DATETIME":
partitionName = fmt.Sprintf("%s%s%s",
zerofill(t.DATETIME.Date.Year),
zerofill(int(t.DATETIME.Date.Month)),
zerofill(t.DATETIME.Date.Day))
default:
return "", fmt.Errorf("partition for target field %q is not supported", targetField)
}
return partitionName, nil
Total int64 `bigquery:"total"`
PartitionID string `bigquery:"partition_id"`
}

// GetTablePartitions list all partitions in table
@@ -260,24 +231,9 @@ func (d *defaultBigQueryClient) GetTablePartitions(ctxIn context.Context, projec
return nil, fmt.Errorf("GetTablePartitions failed for `%s.%s.%s`, because partition other then DAY is not supported", project, dataset, table)
}

timePartitioningField := "_PARTITIONTIME"
targetFieldInTablePartition := "T_TIMESTAMP"
if metadata.TimePartitioning.Field != "" {
timePartitioningField = metadata.TimePartitioning.Field
for _, schema := range metadata.Schema.Relax() {
if schema.Name == metadata.TimePartitioning.Field {
targetFieldInTablePartition = fmt.Sprintf("T_%s", schema.Type)
break
}
}
}

var partitions []*Table
q := fmt.Sprintf("SELECT COUNT(*) AS total, %s AS %s FROM `%s.%s.%s` WHERE %s IS NOT NULL GROUP BY %s",
timePartitioningField, targetFieldInTablePartition,
q := fmt.Sprintf("SELECT total_rows AS total, partition_id FROM `%s.%s.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '%s'",
project, dataset, table,
timePartitioningField,
targetFieldInTablePartition,
)

run, err := d.client.Query(q).Run(ctx)
@@ -292,20 +248,20 @@ func (d *defaultBigQueryClient) GetTablePartitions(ctxIn context.Context, projec
for {
var s tablePartition
err := rowIt.Next(&s)
if err == iterator.Done {
if errors.Is(err, iterator.Done) {
break
}
if err != iterator.Done && err != nil {
} else if err != nil {
return nil, err
}
if s.Total == 0 {
continue
}

partition, err := s.getPartitionFor(targetFieldInTablePartition)
if err != nil {
return nil, fmt.Errorf("GetTablePartitions failed for `%s.%s.%s`, because partition with %s is not supported", project, dataset, table, targetFieldInTablePartition)
partition := s.PartitionID
if partition == "" {
return nil, fmt.Errorf("GetTablePartitions failed for `%s.%s.%s`, because partition_id is empty", project, dataset, table)
}

if _, exists := partitionMetadataCollected[partition]; exists {
// tables that were updated multiple times in the same day are skipped
continue
@@ -330,11 +286,10 @@ func (d *defaultBigQueryClient) GetDatasets(ctxIn context.Context, project strin
it.ProjectID = project
for {
dataset, err := it.Next()
if err == iterator.Done {
if errors.Is(err, iterator.Done) {
break
}
if err != nil && err != iterator.Done {
return []string{}, errors.Wrap(err, fmt.Sprintf("Datasets.Next() failed for project %s", project))
} else if err != nil {
return []string{}, errors.Join(err, fmt.Errorf("Datasets.Next() failed for project %s", project))
}
if dataset == nil {
return datasets, fmt.Errorf("datasets are nil for project %s", project)
@@ -344,13 +299,6 @@ func (d *defaultBigQueryClient) GetDatasets(ctxIn context.Context, project strin
return datasets, err
}

func zerofill(intToFill int) string {
if intToFill < 10 {
return "0" + strconv.Itoa(intToFill)
}
return strconv.Itoa(intToFill)
}

// GetDatasetDetails get the details of a bigquery dataset
func (d *defaultBigQueryClient) GetDatasetDetails(ctxIn context.Context, datasetId string) (*bq.DatasetMetadata, error) {
ctx, span := trace.StartSpan(ctxIn, "(*defaultBigQueryClient).GetDatasetDetails")
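
One property the removed code relied on still holds with the new query: for daily partitioning, the partition_id column in INFORMATION_SCHEMA.PARTITIONS is the same zero-padded YYYYMMDD string that getPartitionFor/zerofill used to assemble from the partitioning column. A small standalone check, not part of the PR; the date value matches the "20190110" row in the mock fixture.

package main

import (
	"fmt"
	"time"
)

func main() {
	// The removed zerofill-based formatting produced YYYYMMDD, e.g. 2019-01-10 -> "20190110".
	t := time.Date(2019, time.January, 10, 0, 0, 0, 0, time.UTC)
	legacyName := t.Format("20060102")

	// The new query returns the same identifier directly in partition_id.
	partitionID := "20190110"

	fmt.Println(legacyName == partitionID) // true
}
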
43 changes: 43 additions & 0 deletions pkg/service/bigquery/bigquery_client_test.go
@@ -0,0 +1,43 @@
package bigquery

import (
"context"
"github.com/ottogroup/penelope/pkg/http/mock"
"github.com/stretchr/testify/assert"
"os"
"testing"
)

func init() {
os.Setenv("PENELOPE_USE_DEFAULT_HTTP_CLIENT", "true")
}

func TestNewBigQueryClient(t *testing.T) {
httpMockHandler := mock.NewHTTPMockHandler()
httpMocks := []mock.MockedHTTPRequest{ // /bigquery/v2/projects/.*/datasets/unknown-dataset
mock.ImpersonationHTTPMock, mock.RetrieveAccessTokenHTTPMock,
mock.DatasetInfoHTTPMock, mock.TableInfoHTTPMock,
mock.TablePartitionJobHTTPMock, mock.TableMetadataPartitionResultHTTPMock,
mock.ExtractJobResultOkHTTPMock,
}
httpMockHandler.Register(httpMocks...)
httpMockHandler.Start()
defer httpMockHandler.Stop()

ctx := context.Background()
provider := &mockImpersonatedTokenConfigProvider{}
client, err := NewBigQueryClient(ctx, provider, "test-project", "test-project-backup")
assert.NoError(t, err)

partitions, err := client.GetTablePartitions(ctx, "test-project", "example-dataset", "example-table")
assert.NoError(t, err)
assert.NotEmpty(t, partitions)
assert.Len(t, partitions, 2)
}

type mockImpersonatedTokenConfigProvider struct {
}

func (mi *mockImpersonatedTokenConfigProvider) GetTargetPrincipalForProject(ctxIn context.Context, projectID string) (string, []string, error) {
return "[email protected]", nil, nil
}