diff --git a/db/db.go b/db/db.go index c4c6d2777..756768696 100644 --- a/db/db.go +++ b/db/db.go @@ -571,6 +571,20 @@ func createTables(MetadataDbClient MetadataStorage) error { PRIMARY KEY (id) );` + sharedLocksTable := ` + CREATE TABLE IF NOT EXISTS shared_locks( + id SERIAL NOT NULL, + name VARCHAR NOT NULL, + tenant_name VARCHAR NOT NULL, + lock_held BOOL NOT NULL DEFAULT false, + locked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (id), + UNIQUE(name, tenant_name), + CONSTRAINT fk_tenant_name_shared_locks + FOREIGN KEY(tenant_name) + REFERENCES tenants(name) + );` + alterAsyncTasks := `DO $$ BEGIN IF EXISTS ( @@ -597,7 +611,7 @@ func createTables(MetadataDbClient MetadataStorage) error { db := MetadataDbClient.Client ctx := MetadataDbClient.Ctx - tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable} + tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable, sharedLocksTable} for _, table := range tables { _, err := db.Exec(ctx, table) @@ -2703,7 +2717,6 @@ func DeleteProducerByNameStationIDAndConnID(name string, stationId int, connId s return false, err } defer conn.Release() - // query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 LIMIT 1` query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 AND EXISTS ( SELECT 1 FROM producers @@ -4188,7 +4201,7 @@ func UpdateIntegration(tenantName string, name string, keys map[string]interface VALUES($1, $2, $3, $4) ON CONFLICT(name, tenant_name) DO UPDATE SET keys = excluded.keys, properties = excluded.properties - RETURNING id, name, keys, properties, tenant_name + RETURNING id, name, keys, properties, tenant_name, is_valid ` stmt, err := conn.Conn().Prepare(ctx, "update_integration", query) if err != nil { @@ -7116,3 +7129,139 @@ func CountProudcersForStation(stationId int) (int64, error) { return count, nil } + +// Shared Locks Functions +func GetAndLockSharedLock(name string, tenantName string) (bool, bool, models.SharedLock, error) { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return false, false, models.SharedLock{}, err + } + defer conn.Release() + tx, err := conn.Begin(ctx) + if err != nil { + return false, false, models.SharedLock{}, err + } + defer tx.Rollback(ctx) + + if tenantName != conf.GlobalAccount { + tenantName = strings.ToLower(tenantName) + } + + selectQuery := `SELECT * FROM shared_locks WHERE name = $1 AND tenant_name = $2 FOR UPDATE LIMIT 1` + stmt, err := tx.Prepare(ctx, "get_and_lock_shared_lock", selectQuery) + if err != nil { + return false, false, models.SharedLock{}, err + } + rows, err := tx.Query(ctx, stmt.Name, name, tenantName) + if err != nil { + if err == pgx.ErrNoRows { + return false, false, models.SharedLock{}, nil + } else { + return false, false, models.SharedLock{}, err + } + } + defer rows.Close() + sharedLocks, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.SharedLock]) + if err != nil && err != pgx.ErrNoRows { + return false, false, models.SharedLock{}, err + } + var sharedLock models.SharedLock + lockTime := time.Now() + newLock := false + if len(sharedLocks) == 0 || err == pgx.ErrNoRows { + insterQuery := `INSERT INTO shared_locks(name, tenant_name, lock_held, locked_at) VALUES($1, $2, $3, $4) RETURNING *` + + stmt, err := conn.Conn().Prepare(ctx, "insert_new_shared_lock_and_lock", insterQuery) + if err != nil { + return false, false, models.SharedLock{}, err + } + + newSharedLock := models.SharedLock{} + if tenantName != conf.GlobalAccount { + tenantName = strings.ToLower(tenantName) + } + rows, err := conn.Conn().Query(ctx, stmt.Name, name, tenantName, true, lockTime) + if err != nil { + return false, false, models.SharedLock{}, err + } + defer rows.Close() + + for rows.Next() { + err := rows.Scan(&newSharedLock.ID, &newSharedLock.Name, &newSharedLock.TenantName, &newSharedLock.LockHeld, &newSharedLock.LockedAt) + if err != nil { + return false, false, models.SharedLock{}, err + } + } + sharedLock = newSharedLock + newLock = true + } else { + sharedLock = sharedLocks[0] + } + + if !sharedLock.LockHeld { + updateQuery := "UPDATE shared_locks SET lock_held = TRUE, locked_at = $1 WHERE id = $2" + lockStmt, err := tx.Prepare(ctx, "lock_shared_lock", updateQuery) + if err != nil { + return false, false, models.SharedLock{}, err + } + _, err = tx.Exec(ctx, lockStmt.Name, lockTime, sharedLock.ID) + if err != nil { + return false, false, models.SharedLock{}, err + } + sharedLock.LockHeld = true + sharedLock.LockedAt = lockTime + } else if !newLock { + return true, false, sharedLock, nil + } + + err = tx.Commit(ctx) + if err != nil { + return false, false, models.SharedLock{}, err + } + return true, true, sharedLock, nil +} + +func SharedLockUnlock(name, tenantName string) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + query := `UPDATE shared_locks SET lock_held = FALSE WHERE name = $1 AND tenant_name=$2` + stmt, err := conn.Conn().Prepare(ctx, "unlock_shared_lock", query) + if err != nil { + return err + } + tenantName = strings.ToLower(tenantName) + _, err = conn.Conn().Query(ctx, stmt.Name, name, tenantName) + if err != nil { + return err + } + return nil +} + +func DeleteAllSharedLocks(tenantName string) error { + ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second) + defer cancelfunc() + + conn, err := MetadataDbClient.Client.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + query := `DELETE FROM shared_locks WHERE tenant_name=$1` + stmt, err := conn.Conn().Prepare(ctx, "delete_all_shared_locks", query) + if err != nil { + return err + } + _, err = conn.Conn().Query(ctx, stmt.Name, tenantName) + if err != nil { + return err + } + return nil +} diff --git a/db/db_cloud.go b/db/db_cloud.go index d222c9fb9..17c83e975 100644 --- a/db/db_cloud.go +++ b/db/db_cloud.go @@ -44,3 +44,7 @@ func DeleteAndGetAttachedFunctionsByStation(tenantName string, stationId int, pa func DeleteAndGetAttachedFunctionsByTenant(tenantName string) ([]FunctionSchema, error) { return []FunctionSchema{}, nil } + +func DeleteAllTestEvents(tenantName string) error { + return nil +} diff --git a/models/functions.go b/models/functions.go index ba3d2971d..4edcffa1c 100644 --- a/models/functions.go +++ b/models/functions.go @@ -84,3 +84,26 @@ type GetFunctionDetails struct { Type string `form:"type" json:"type"` Path string `form:"path" json:"path"` } + +type GetFunctionDetailsSchema struct { + Repository string `form:"repo" json:"repo"` + Branch string `form:"branch" json:"branch"` + Owner string `form:"owner" json:"owner"` + Scm string `form:"scm" json:"scm"` + Username string `json:"username"` + TenantName string `json:"tenant_name"` + FunctionName string `form:"function_name" json:"function_name"` + Vesrion int `form:"version" json:"version"` +} + +type GetFunctionFileCodeSchema struct { + Repository string `form:"repo" json:"repo"` + Branch string `form:"branch" json:"branch"` + Owner string `form:"owner" json:"owner"` + Scm string `form:"scm" json:"scm"` + TenantName string `json:"tenant_name"` + FunctionName string `form:"function_name" json:"function_name"` + Version int `form:"version" json:"version"` + Username string `json:"username"` + Path string `form:"path" json:"path"` +} diff --git a/models/shared_locks.go b/models/shared_locks.go new file mode 100644 index 000000000..98fb83c89 --- /dev/null +++ b/models/shared_locks.go @@ -0,0 +1,22 @@ +// Copyright 2022-2023 The Memphis.dev Authors +// Licensed under the Memphis Business Source License 1.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// Changed License: [Apache License, Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0), as published by the Apache Foundation. +// +// https://github.com/memphisdev/memphis/blob/master/LICENSE +// +// Additional Use Grant: You may make use of the Licensed Work (i) only as part of your own product or service, provided it is not a message broker or a message queue product or service; and (ii) provided that you do not use, provide, distribute, or make available the Licensed Work as a Service. +// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services. +package models + +import "time" + +type SharedLock struct { + ID int `json:"id"` + Name string `json:"name"` + TenantName string `json:"tenant_name"` + LockHeld bool `json:"lock_held"` + LockedAt time.Time `json:"locked_at"` +} diff --git a/server/memphis_cloud.go b/server/memphis_cloud.go index 488468e56..4ddc03896 100644 --- a/server/memphis_cloud.go +++ b/server/memphis_cloud.go @@ -2215,3 +2215,10 @@ func GetStationAttachedFunctionsByPartitions(stationID int, partitionsList []int func getInternalUserPassword() string { return configuration.ROOT_PASSWORD } + +func sendDeleteAllFunctionsReqToMS(user models.User, tenantName, scmType, repo, branch, computeEngine, owner string, uninstall bool) error { + return nil +} + +func sendCloneFunctionReqToMS(connectedRepo interface{}, user models.User, scm string, bodyToUpdate models.CreateIntegrationSchema, index int) { +} diff --git a/server/memphis_handlers_integrations.go b/server/memphis_handlers_integrations.go index 2a35300fd..100729182 100644 --- a/server/memphis_handlers_integrations.go +++ b/server/memphis_handlers_integrations.go @@ -196,6 +196,56 @@ func (it IntegrationsHandler) UpdateIntegration(c *gin.Context) { } integration = s3Integration case "github": + _, locked, _, err := db.GetAndLockSharedLock("functions", user.TenantName) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at GetAndLockSharedLock: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + exist, integrationFromDb, err := db.GetIntegration("github", user.TenantName) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at GetIntegration: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + if !exist { + serv.Warnf("[tenant: %v]UpdateIntegration: Integration does not exist %v", user.TenantName, body.Name) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Unsupported integration type - " + body.Name}) + return + } + if !locked { + serv.Warnf("[tenant: %v]UpdateIntegration: Integration %v: Processing a branch is currently in progress. Please wait for the current processes to complete.", user.TenantName, body.Name) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Processing a branch is currently in progress. Please wait for the current processes to complete."}) + return + } + connectedRepos := integrationFromDb.Keys["connected_repos"].([]interface{}) + connectedReposUpdated := body.Keys["connected_repos"].([]interface{}) + for _, connectedRepo := range connectedRepos { + if !containsRepo(connectedReposUpdated, connectedRepo) { + connectedRepoName, repoNameOK := connectedRepo.(map[string]interface{})["repo_name"].(string) + connectedRepoBranch, branchNameOK := connectedRepo.(map[string]interface{})["branch"].(string) + repoOwner, repoOwnerOK := connectedRepo.(map[string]interface{})["repo_owner"].(string) + if !repoNameOK || !branchNameOK || !repoOwnerOK { + serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at handleUpdateGithubIntegration: %v", user.TenantName, user.Username, "Repo name, branch or owner name is missing") + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + err = sendDeleteAllFunctionsReqToMS(user, user.TenantName, integrationType, connectedRepoName, connectedRepoBranch, "aws_lambda", repoOwner, false) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at deleteAllFunctionMs: Repo %v: %v", user.TenantName, user.Username, connectedRepoName, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + } + } + for i, connectedRepoUpdated := range connectedReposUpdated { + if !containsRepo(connectedRepos, connectedRepoUpdated) { + connectedRepoUpdated.(map[string]interface{})["in_progress"] = true + connectedReposUpdated[i] = connectedRepoUpdated + go sendCloneFunctionReqToMS(connectedRepoUpdated, user, integrationType, body, i) + } + } + body.Keys["connected_repos"] = connectedReposUpdated githubIntegration, errorCode, err := it.handleUpdateGithubIntegration(user, body) if err != nil { if errorCode == 500 { @@ -249,6 +299,18 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) { integrationType := strings.ToLower(body.Name) if integrationType == "github" { + _, locked, _, err := db.GetAndLockSharedLock("functions", user.TenantName) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]DisconnectIntegration at GetAndLockSharedLock: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } + if !locked { + errMsg := "Processing a branch or a function is currently in progress. Please wait for the current process to complete." + serv.Warnf("[tenant: %v]DisconnectIntegration: %v", user.TenantName, errMsg) + c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": errMsg}) + return + } err = deleteInstallationForAuthenticatedGithubApp(user.TenantName) if err != nil { if strings.Contains(err.Error(), "does not exist") { @@ -262,6 +324,12 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) { c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) return } + err = sendDeleteAllFunctionsReqToMS(user, user.TenantName, integrationType, "", "", "aws_lambda", "", false) + if err != nil { + serv.Errorf("[tenant: %v][user: %v]DisconnectIntegration at deleteAllFunctionMs: Repo %v: %v", user.TenantName, user.Username, err.Error()) + c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) + return + } } err = db.DeleteIntegration(integrationType, user.TenantName) @@ -616,3 +684,23 @@ func (s *Server) PurgeIntegrationsAuditLogs(tenantName string) { serv.Errorf("[tenant: %v]PurgeIntegrationsAuditLogs at respErr: %v", tenantName, respErr.Error()) } } + +func SharedLockUnlock(name, tenantName string) { + err := db.SharedLockUnlock(name, tenantName) + if err != nil { + serv.Errorf("[tenant: %v] at SharedLockUnlock: %v", tenantName, err.Error()) + } +} + +func containsRepo(repos []interface{}, target interface{}) bool { + for _, repo := range repos { + mapBranch := repo.(map[string]interface{})["branch"].(string) + mapRepo := repo.(map[string]interface{})["repo_name"].(string) + targetBranch := target.(map[string]interface{})["branch"].(string) + targetRepo := target.(map[string]interface{})["repo_name"].(string) + if mapBranch == targetBranch && mapRepo == targetRepo { + return true + } + } + return false +} diff --git a/server/memphis_handlers_stations.go b/server/memphis_handlers_stations.go index d03993176..25df7fe95 100644 --- a/server/memphis_handlers_stations.go +++ b/server/memphis_handlers_stations.go @@ -2527,7 +2527,7 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName func (s *Server) handleResendAllFailure(user models.User, stationId int, tenantName, stationName string) { systemMessage := SystemMessage{ - MessageType: "Error", + MessageType: "error", MessagePayload: fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username), } err := serv.sendSystemMessageOnWS(user, systemMessage) diff --git a/server/memphis_handlers_user_mgmt.go b/server/memphis_handlers_user_mgmt.go index ba3755575..a59615809 100644 --- a/server/memphis_handlers_user_mgmt.go +++ b/server/memphis_handlers_user_mgmt.go @@ -151,6 +151,11 @@ func removeTenantResources(tenantName string, user models.User) error { return err } + err = db.DeleteAllTestEvents(tenantName) + if err != nil { + return err + } + _, err = db.DeleteAndGetAttachedFunctionsByTenant(tenantName) if err != nil { return err @@ -162,6 +167,20 @@ func removeTenantResources(tenantName string, user models.User) error { return err } + err = sendDeleteAllFunctionsReqToMS(user, tenantName, "github", "", "", "aws_lambda", "", true) + if err != nil { + return err + } + err = deleteInstallationForAuthenticatedGithubApp(user.TenantName) + if err != nil { + return err + } + + err = db.DeleteIntegrationsByTenantName(tenantName) + if err != nil { + return err + } + users_list, err := db.DeleteUsersByTenant(tenantName) if err != nil { return err @@ -174,7 +193,7 @@ func removeTenantResources(tenantName string, user models.User) error { return err } - err = db.DeleteIntegrationsByTenantName(tenantName) + err = db.DeleteAllSharedLocks(tenantName) if err != nil { return err }