Skip to content

Commit

Permalink
Merge branch 'GoogleCloudPlatform:master' into revrepl
Browse files Browse the repository at this point in the history
  • Loading branch information
aksharauke authored Jan 9, 2024
2 parents b28b801 + 3fd736f commit 4e95648
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 114 deletions.
22 changes: 11 additions & 11 deletions common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ const (
//Migration types
MINIMAL_DOWNTIME_MIGRATION = "minimal_downtime"
//Job Resource Types
DATAFLOW_RESOURCE string = "dataflow"
PUBSUB_RESOURCE string = "pubsub"
PUBSUB_TOPIC_RESOURCE string = "pubsub_topic"
PUBSUB_SUB_RESOURCE string = "pubsub_sub"
MONITORING_RESOURCE string = "monitoring"
AGG_MONITORING_RESOURCE string = "aggregated_monitoring"
DATASTREAM_RESOURCE string = "datastream"
GCS_RESOURCE string = "gcs"
// Metadats table names
SMT_JOB_TABLE string = "SMT_JOBS"
SMT_RESOURCES_TABLE string = "SMT_RESOURCES"
DATAFLOW_RESOURCE string = "dataflow"
PUBSUB_RESOURCE string = "pubsub"
PUBSUB_TOPIC_RESOURCE string = "pubsub_topic"
PUBSUB_SUB_RESOURCE string = "pubsub_sub"
MONITORING_RESOURCE string = "monitoring"
AGG_MONITORING_RESOURCE string = "aggregated_monitoring"
DATASTREAM_RESOURCE string = "datastream"
GCS_RESOURCE string = "gcs"
// Metadata table names
SMT_JOB_TABLE string = "SMT_JOB"
SMT_RESOURCE_TABLE string = "SMT_RESOURCE"
)
86 changes: 34 additions & 52 deletions common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,90 +588,72 @@ func ReadSpannerSchema(ctx context.Context, conv *internal.Conv, client *sp.Clie
}

// CompareSchema compares the spanner schema of two conv objects and returns specific error if they don't match
func CompareSchema(conv1, conv2 *internal.Conv) error {
if conv1.SpDialect != conv2.SpDialect {
return fmt.Errorf("spanner dialect don't match")
func CompareSchema(sessionFileConv, actualSpannerConv *internal.Conv) error {
if sessionFileConv.SpDialect != actualSpannerConv.SpDialect {
return fmt.Errorf("spanner dialect don't match: session dialect %v, spanner dialect %v", sessionFileConv.SpDialect, actualSpannerConv.SpDialect)
}
for _, sessionTable := range conv1.SpSchema {
spannerTableId, _ := internal.GetTableIdFromSpName(conv2.SpSchema, sessionTable.Name)
spannerTable := conv2.SpSchema[spannerTableId]
for _, sessionTable := range sessionFileConv.SpSchema {
spannerTableId, err := internal.GetTableIdFromSpName(actualSpannerConv.SpSchema, sessionTable.Name)
if err != nil {
return fmt.Errorf("table %v not found in the spanner database schema but found in the session file. If this table does not need to be migrated, please exclude it during the schema conversion and migration process", sessionTable.Name)
}
spannerTable := actualSpannerConv.SpSchema[spannerTableId]
sessionTableParentName := sessionFileConv.SpSchema[sessionTable.ParentId].Name
spannerTableParentName := actualSpannerConv.SpSchema[spannerTable.ParentId].Name

//table names should match
if sessionTable.Name != spannerTable.Name {
return fmt.Errorf("table name don't match: session table %v, spanner table %v", sessionTable.Name, spannerTable.Name)
}

//parent table names should match
if sessionTableParentName != spannerTableParentName {
return fmt.Errorf("parent table name don't match: session table %v, parent session table name: %v, spanner table %v, parent spanner table name: %v", sessionTable.Name, sessionTableParentName, spannerTable.Name, spannerTableParentName)
}

sessionTableParentName := conv1.SpSchema[sessionTable.ParentId].Name
spannerTableParentName := conv2.SpSchema[spannerTable.ParentId].Name
//number of columns should match
if len(sessionTable.ColDefs) != len(spannerTable.ColDefs) {
return fmt.Errorf("number of columns don't match: session table %v, spanner table %v", sessionTable.Name, spannerTable.Name)
}

if sessionTable.Name != spannerTable.Name || sessionTableParentName != spannerTableParentName ||
len(sessionTable.PrimaryKeys) != len(spannerTable.PrimaryKeys) || len(sessionTable.ColDefs) != len(spannerTable.ColDefs) ||
len(sessionTable.Indexes) != len(spannerTable.Indexes) {
return fmt.Errorf("table detail for table %v don't match", sessionTable.Name)
//primary keys should be of the same length
if len(sessionTable.PrimaryKeys) != len(spannerTable.PrimaryKeys) {
return fmt.Errorf("primary keys don't match: session table primary key length %v: %v, spanner table primary key length %v: %v", sessionTable.Name, len(sessionTable.PrimaryKeys), spannerTable.Name, len(spannerTable.PrimaryKeys))
}

// Sorts both primary key slices based on primary key order
sortKeysByOrder(sessionTable.PrimaryKeys)
sortKeysByOrder(spannerTable.PrimaryKeys)

//primary keys should be of the same order
for idx, sessionPk := range sessionTable.PrimaryKeys {
sessionTablePkCol := sessionTable.ColDefs[sessionPk.ColId]
correspondingSpColId, _ := internal.GetColIdFromSpName(spannerTable.ColDefs, sessionTablePkCol.Name)
spannerTablePkCol := spannerTable.ColDefs[correspondingSpColId]

if sessionTablePkCol.Name != spannerTablePkCol.Name || sessionTable.PrimaryKeys[idx].Desc != spannerTable.PrimaryKeys[idx].Desc {
return fmt.Errorf("primary keys for table %v don't match", sessionTable.Name)
return fmt.Errorf("primary keys for table %v are not identical: session table primary key %v, spanner table primary key %v", sessionTable.Name, sessionTable.PrimaryKeys, spannerTable.PrimaryKeys)
}
}

//columns should be identical in terms of data type, name, length, nullability
for _, sessionColDef := range sessionTable.ColDefs {
correspondingSpColId, _ := internal.GetColIdFromSpName(spannerTable.ColDefs, sessionColDef.Name)
spannerColDef := spannerTable.ColDefs[correspondingSpColId]

// In case of PostgreSQL dialect, Spanner by default adds is_nullable = false to all the columns that are a part of primary key.
// Therefore, we cannot compare NotNull attributes for these columns.
if conv1.SpDialect == constants.DIALECT_POSTGRESQL && FindInPrimaryKey(sessionColDef.Id, sessionTable.PrimaryKeys) {
if sessionFileConv.SpDialect == constants.DIALECT_POSTGRESQL && FindInPrimaryKey(sessionColDef.Id, sessionTable.PrimaryKeys) {
if sessionColDef.Name != spannerColDef.Name ||
sessionColDef.T.IsArray != spannerColDef.T.IsArray || sessionColDef.T.Len != spannerColDef.T.Len || sessionColDef.T.Name != spannerColDef.T.Name {
return fmt.Errorf("column detail for table %v don't match", sessionTable.Name)
return fmt.Errorf("column detail for table %v don't match: session column name: %v, spanner column: %v", sessionTable.Name, sessionColDef, spannerColDef)
}

} else {
if sessionColDef.Name != spannerColDef.Name ||
sessionColDef.T.IsArray != spannerColDef.T.IsArray || sessionColDef.T.Len != spannerColDef.T.Len || sessionColDef.T.Name != spannerColDef.T.Name || sessionColDef.NotNull != spannerColDef.NotNull {
return fmt.Errorf("column detail for table %v don't match", sessionTable.Name)
}
}
}
for _, sessionTableIndex := range sessionTable.Indexes {
found := 0
for _, spannerTableIndex := range spannerTable.Indexes {
if sessionTableIndex.Name == spannerTableIndex.Name {
found = 1

sessionTableName := conv1.SpSchema[sessionTableIndex.TableId].Name
spannerTableName := conv2.SpSchema[spannerTableIndex.TableId].Name

// Sorts both primary key slices based on index key order
sortKeysByOrder(sessionTableIndex.Keys)
sortKeysByOrder(spannerTableIndex.Keys)

if sessionTableName != spannerTableName || sessionTableIndex.Unique != spannerTableIndex.Unique ||
len(sessionTableIndex.Keys) != len(spannerTableIndex.Keys) {
return fmt.Errorf("index %v - details don't match", sessionTableIndex.Name)
}

for idx, indexKey := range sessionTableIndex.Keys {
sessionIndexColumn := sessionTable.ColDefs[indexKey.ColId]
spannerIndexColumnId, _ := internal.GetColIdFromSpName(spannerTable.ColDefs, sessionIndexColumn.Name)
spannerIndexColumn := spannerTable.ColDefs[spannerIndexColumnId]

if sessionIndexColumn.Name != spannerIndexColumn.Name ||
sessionTableIndex.Keys[idx].Desc != spannerTableIndex.Keys[idx].Desc {
return fmt.Errorf("index %v - keys don't match", sessionTableIndex.Name)
}
}
break
return fmt.Errorf("column detail for table %v don't match: session column: %v, spanner column: %v", sessionTable.Name, sessionColDef, spannerColDef)
}
}
if found == 0 {
return fmt.Errorf("index %v not found in spanner schema", sessionTableIndex.Name)
}
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
cloud.google.com/go/datastream v1.10.3
cloud.google.com/go/monitoring v1.16.3
cloud.google.com/go/pubsub v1.33.0
cloud.google.com/go/resourcemanager v1.9.4
cloud.google.com/go/spanner v1.53.0
cloud.google.com/go/storage v1.30.1
github.com/DATA-DOG/go-sqlmock v1.5.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIA
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g=
cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
cloud.google.com/go/resourcemanager v1.9.4 h1:JwZ7Ggle54XQ/FVYSBrMLOQIKoIT/uer8mmNvNLK51k=
cloud.google.com/go/resourcemanager v1.9.4/go.mod h1:N1dhP9RFvo3lUfwtfLWVxfUWq8+KUQ+XLlHLH3BoFJ0=
cloud.google.com/go/spanner v1.53.0 h1:/NzWQJ1MEhdRcffiutRKbW/AIGVKhcTeivWTDjEyCCo=
cloud.google.com/go/spanner v1.53.0/go.mod h1:liG4iCeLqm5L3fFLU5whFITqP0e0orsAW1uUSrd4rws=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
Expand Down
10 changes: 5 additions & 5 deletions streaming/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds
}
}

func FetchResources(ctx context.Context, migrationJobId string, resourceType string, dataShardIds []string, project string, instance string) ([]SmtResources, error) {
func FetchResources(ctx context.Context, migrationJobId string, resourceType string, dataShardIds []string, project string, instance string) ([]SmtResource, error) {
dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, constants.METADATA_DB)
client, err := utils.GetClient(ctx, dbURI)
if err != nil {
Expand All @@ -151,7 +151,7 @@ func FetchResources(ctx context.Context, migrationJobId string, resourceType str
ResourceName,
ResourceType,
TO_JSON_STRING(ResourceData) AS ResourceData
FROM SMT_RESOURCES
FROM SMT_RESOURCE
WHERE JobId = @migrationJobId and ResourceType = @resourceType and JSON_VALUE(ResourceData, '$.DataShardId') IN UNNEST (@dataShardIds)`,
Params: map[string]interface{}{
"migrationJobId": migrationJobId,
Expand All @@ -169,7 +169,7 @@ func FetchResources(ctx context.Context, migrationJobId string, resourceType str
ResourceName,
ResourceType,
TO_JSON_STRING(ResourceData) AS ResourceData
FROM SMT_RESOURCES
FROM SMT_RESOURCE
WHERE JobId = @migrationJobId and ResourceType = @resourceType`,
Params: map[string]interface{}{
"migrationJobId": migrationJobId,
Expand All @@ -178,7 +178,7 @@ func FetchResources(ctx context.Context, migrationJobId string, resourceType str
}
}
iter := txn.Query(ctx, resourceQuery)
jobResourcesList := []SmtResources{}
jobResourcesList := []SmtResource{}
for {
row, e := iter.Next()
if e == iterator.Done {
Expand All @@ -188,7 +188,7 @@ func FetchResources(ctx context.Context, migrationJobId string, resourceType str
err = e
break
}
var jobResource SmtResources
var jobResource SmtResource
row.ToStruct(&jobResource)
jobResourcesList = append(jobResourcesList, jobResource)
}
Expand Down
28 changes: 16 additions & 12 deletions streaming/storage_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,39 @@ import (
// Table structs - These structs map to the Spanner metadata tables

// Stores the migration job level data post orchestration of a migration job
type SmtJobs struct {
// SmtJob maps to a row of the Spanner metadata table named by
// constants.SMT_JOB_TABLE ("SMT_JOB"). It stores migration-job-level
// data recorded once a migration job has been orchestrated.
// NOTE(review): field set must stay in sync with the SMT_JOB table
// schema, since rows are written via spanner.InsertStruct.
type SmtJob struct {
JobId string // unique identifier of the migration job
JobName string // job display name; set equal to JobId at creation time
JobType string // migration type, e.g. constants.MINIMAL_DOWNTIME_MIGRATION
JobStateData string // JSON-encoded job state, e.g. {"state": "RUNNING"}
JobData string // JSON-encoded job payload (serialized job data) — presumably MinimaldowntimeJobData; verify against writeJobDetails
Dialect string // Spanner dialect of the target database (conv.SpDialect)
SpannerDatabaseName string // name of the target Spanner database
CreatedAt time.Time // timestamp when the row was created
UpdatedAt time.Time // timestamp of the last update; equals CreatedAt at insert time
}

// Stores the resource level execution data post orchestration of a migration job
type SmtResources struct {
ResourceId string
JobId string
ExternalId string
ResourceName string
ResourceType string
ResourceData string
CreatedAt time.Time
// SmtResource maps to a row of the Spanner metadata table named by
// constants.SMT_RESOURCE_TABLE ("SMT_RESOURCE"). It stores
// resource-level execution data recorded for each resource created
// during orchestration of a migration job.
type SmtResource struct {
ResourceId string // unique identifier of this resource row
JobId string // identifier of the owning migration job (SmtJob.JobId)
ExternalId string // identifier of the resource in the external service
ResourceName string // display name of the resource
ResourceType string // resource kind, e.g. "dataflow", "pubsub", "datastream", "gcs" (see constants)
ResourceStateData string // JSON-encoded resource state, e.g. {"state": "CREATED"}
ResourceData string // JSON-encoded payload (MinimalDowntimeResourceData); queried via TO_JSON_STRING / JSON_VALUE($.DataShardId)
CreatedAt time.Time // timestamp when the row was created
UpdatedAt time.Time // timestamp of the last update
}

// Storage structs - these structs map to JSON data stored inside the metadata

type MinimaldowntimeJobData struct {
IsSharded bool
Session *internal.Conv
Session *internal.Conv
}

type MinimalDowntimeResourceData struct {
DataShardId string
DataShardId string
ResourcePayload string
}
}
26 changes: 15 additions & 11 deletions streaming/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,16 @@ func writeJobDetails(ctx context.Context, migrationJobId string, isShardedMigrat
logger.Log.Error(fmt.Sprintf("internal error occurred while persisting metadata for migration job %s: %v\n", migrationJobId, err))
return err
}
jobDetails := SmtJobs{
jobDetails := SmtJob{
JobId: migrationJobId,
JobName: migrationJobId,
JobType: constants.MINIMAL_DOWNTIME_MIGRATION,
Dialect: conv.SpDialect,
JobStateData: "{\"state\": \"RUNNING\"}",
JobData: string(jobDataBytes),
SpannerDatabaseName: spannerDatabaseName,
CreatedAt: createTimestamp,
UpdatedAt: createTimestamp,
}
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
mutation, err := spanner.InsertStruct(constants.SMT_JOB_TABLE, jobDetails)
Expand Down Expand Up @@ -222,16 +224,18 @@ func createResourceMutation(jobId string, externalResourceId string, resourceTyp
if err != nil {
return nil, err
}
jobResource := SmtResources{
ResourceId: resourceId,
JobId: jobId,
ExternalId: externalResourceId,
ResourceType: resourceType,
ResourceName: resourceName,
ResourceData: string(minimalDowntimeResourceDataBytes),
CreatedAt: time.Now(),
}
mutation, err := spanner.InsertStruct(constants.SMT_RESOURCES_TABLE, jobResource)
jobResource := SmtResource{
ResourceId: resourceId,
JobId: jobId,
ExternalId: externalResourceId,
ResourceType: resourceType,
ResourceName: resourceName,
ResourceStateData: "{\"state\": \"CREATED\"}",
ResourceData: string(minimalDowntimeResourceDataBytes),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
mutation, err := spanner.InsertStruct(constants.SMT_RESOURCE_TABLE, jobResource)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4e95648

Please sign in to comment.