Skip to content

Commit

Permalink
Merge pull request #1385 from memphisdev/copy-from-cloud-functions
Browse files Browse the repository at this point in the history
copy from cloud functions
  • Loading branch information
shay23b authored Nov 2, 2023
2 parents ae24b13 + 43ddab4 commit 0138016
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 5 deletions.
155 changes: 152 additions & 3 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,20 @@ func createTables(MetadataDbClient MetadataStorage) error {
PRIMARY KEY (id)
);`

sharedLocksTable := `
CREATE TABLE IF NOT EXISTS shared_locks(
id SERIAL NOT NULL,
name VARCHAR NOT NULL,
tenant_name VARCHAR NOT NULL,
lock_held BOOL NOT NULL DEFAULT false,
locked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id),
UNIQUE(name, tenant_name),
CONSTRAINT fk_tenant_name_shared_locks
FOREIGN KEY(tenant_name)
REFERENCES tenants(name)
);`

alterAsyncTasks := `DO $$
BEGIN
IF EXISTS (
Expand All @@ -597,7 +611,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
db := MetadataDbClient.Client
ctx := MetadataDbClient.Ctx

tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable}
tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable, sharedLocksTable}

for _, table := range tables {
_, err := db.Exec(ctx, table)
Expand Down Expand Up @@ -2703,7 +2717,6 @@ func DeleteProducerByNameStationIDAndConnID(name string, stationId int, connId s
return false, err
}
defer conn.Release()
// query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 LIMIT 1`
query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3
AND EXISTS (
SELECT 1 FROM producers
Expand Down Expand Up @@ -4188,7 +4201,7 @@ func UpdateIntegration(tenantName string, name string, keys map[string]interface
VALUES($1, $2, $3, $4)
ON CONFLICT(name, tenant_name) DO UPDATE
SET keys = excluded.keys, properties = excluded.properties
RETURNING id, name, keys, properties, tenant_name
RETURNING id, name, keys, properties, tenant_name, is_valid
`
stmt, err := conn.Conn().Prepare(ctx, "update_integration", query)
if err != nil {
Expand Down Expand Up @@ -7116,3 +7129,139 @@ func CountProudcersForStation(stationId int) (int64, error) {

return count, nil
}

// Shared Locks Functions
func GetAndLockSharedLock(name string, tenantName string) (bool, bool, models.SharedLock, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return false, false, models.SharedLock{}, err
}
defer conn.Release()
tx, err := conn.Begin(ctx)
if err != nil {
return false, false, models.SharedLock{}, err
}
defer tx.Rollback(ctx)

if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}

selectQuery := `SELECT * FROM shared_locks WHERE name = $1 AND tenant_name = $2 FOR UPDATE LIMIT 1`
stmt, err := tx.Prepare(ctx, "get_and_lock_shared_lock", selectQuery)
if err != nil {
return false, false, models.SharedLock{}, err
}
rows, err := tx.Query(ctx, stmt.Name, name, tenantName)
if err != nil {
if err == pgx.ErrNoRows {
return false, false, models.SharedLock{}, nil
} else {
return false, false, models.SharedLock{}, err
}
}
defer rows.Close()
sharedLocks, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.SharedLock])
if err != nil && err != pgx.ErrNoRows {
return false, false, models.SharedLock{}, err
}
var sharedLock models.SharedLock
lockTime := time.Now()
newLock := false
if len(sharedLocks) == 0 || err == pgx.ErrNoRows {
insterQuery := `INSERT INTO shared_locks(name, tenant_name, lock_held, locked_at) VALUES($1, $2, $3, $4) RETURNING *`

stmt, err := conn.Conn().Prepare(ctx, "insert_new_shared_lock_and_lock", insterQuery)
if err != nil {
return false, false, models.SharedLock{}, err
}

newSharedLock := models.SharedLock{}
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := conn.Conn().Query(ctx, stmt.Name, name, tenantName, true, lockTime)
if err != nil {
return false, false, models.SharedLock{}, err
}
defer rows.Close()

for rows.Next() {
err := rows.Scan(&newSharedLock.ID, &newSharedLock.Name, &newSharedLock.TenantName, &newSharedLock.LockHeld, &newSharedLock.LockedAt)
if err != nil {
return false, false, models.SharedLock{}, err
}
}
sharedLock = newSharedLock
newLock = true
} else {
sharedLock = sharedLocks[0]
}

if !sharedLock.LockHeld {
updateQuery := "UPDATE shared_locks SET lock_held = TRUE, locked_at = $1 WHERE id = $2"
lockStmt, err := tx.Prepare(ctx, "lock_shared_lock", updateQuery)
if err != nil {
return false, false, models.SharedLock{}, err
}
_, err = tx.Exec(ctx, lockStmt.Name, lockTime, sharedLock.ID)
if err != nil {
return false, false, models.SharedLock{}, err
}
sharedLock.LockHeld = true
sharedLock.LockedAt = lockTime
} else if !newLock {
return true, false, sharedLock, nil
}

err = tx.Commit(ctx)
if err != nil {
return false, false, models.SharedLock{}, err
}
return true, true, sharedLock, nil
}

func SharedLockUnlock(name, tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

query := `UPDATE shared_locks SET lock_held = FALSE WHERE name = $1 AND tenant_name=$2`
stmt, err := conn.Conn().Prepare(ctx, "unlock_shared_lock", query)
if err != nil {
return err
}
tenantName = strings.ToLower(tenantName)
_, err = conn.Conn().Query(ctx, stmt.Name, name, tenantName)
if err != nil {
return err
}
return nil
}

func DeleteAllSharedLocks(tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
query := `DELETE FROM shared_locks WHERE tenant_name=$1`
stmt, err := conn.Conn().Prepare(ctx, "delete_all_shared_locks", query)
if err != nil {
return err
}
_, err = conn.Conn().Query(ctx, stmt.Name, tenantName)
if err != nil {
return err
}
return nil
}
4 changes: 4 additions & 0 deletions db/db_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ func DeleteAndGetAttachedFunctionsByStation(tenantName string, stationId int, pa
func DeleteAndGetAttachedFunctionsByTenant(tenantName string) ([]FunctionSchema, error) {
return []FunctionSchema{}, nil
}

func DeleteAllTestEvents(tenantName string) error {
return nil
}
23 changes: 23 additions & 0 deletions models/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,26 @@ type GetFunctionDetails struct {
Type string `form:"type" json:"type"`
Path string `form:"path" json:"path"`
}

type GetFunctionDetailsSchema struct {
Repository string `form:"repo" json:"repo"`
Branch string `form:"branch" json:"branch"`
Owner string `form:"owner" json:"owner"`
Scm string `form:"scm" json:"scm"`
Username string `json:"username"`
TenantName string `json:"tenant_name"`
FunctionName string `form:"function_name" json:"function_name"`
Vesrion int `form:"version" json:"version"`
}

type GetFunctionFileCodeSchema struct {
Repository string `form:"repo" json:"repo"`
Branch string `form:"branch" json:"branch"`
Owner string `form:"owner" json:"owner"`
Scm string `form:"scm" json:"scm"`
TenantName string `json:"tenant_name"`
FunctionName string `form:"function_name" json:"function_name"`
Version int `form:"version" json:"version"`
Username string `json:"username"`
Path string `form:"path" json:"path"`
}
22 changes: 22 additions & 0 deletions models/shared_locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022-2023 The Memphis.dev Authors
// Licensed under the Memphis Business Source License 1.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// Changed License: [Apache License, Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0), as published by the Apache Foundation.
//
// https://github.com/memphisdev/memphis/blob/master/LICENSE
//
// Additional Use Grant: You may make use of the Licensed Work (i) only as part of your own product or service, provided it is not a message broker or a message queue product or service; and (ii) provided that you do not use, provide, distribute, or make available the Licensed Work as a Service.
// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services.
package models

import "time"

type SharedLock struct {
ID int `json:"id"`
Name string `json:"name"`
TenantName string `json:"tenant_name"`
LockHeld bool `json:"lock_held"`
LockedAt time.Time `json:"locked_at"`
}
7 changes: 7 additions & 0 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,3 +2215,10 @@ func GetStationAttachedFunctionsByPartitions(stationID int, partitionsList []int
func getInternalUserPassword() string {
return configuration.ROOT_PASSWORD
}

func sendDeleteAllFunctionsReqToMS(user models.User, tenantName, scmType, repo, branch, computeEngine, owner string, uninstall bool) error {
return nil
}

func sendCloneFunctionReqToMS(connectedRepo interface{}, user models.User, scm string, bodyToUpdate models.CreateIntegrationSchema, index int) {
}
88 changes: 88 additions & 0 deletions server/memphis_handlers_integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,56 @@ func (it IntegrationsHandler) UpdateIntegration(c *gin.Context) {
}
integration = s3Integration
case "github":
_, locked, _, err := db.GetAndLockSharedLock("functions", user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at GetAndLockSharedLock: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
exist, integrationFromDb, err := db.GetIntegration("github", user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at GetIntegration: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
if !exist {
serv.Warnf("[tenant: %v]UpdateIntegration: Integration does not exist %v", user.TenantName, body.Name)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Unsupported integration type - " + body.Name})
return
}
if !locked {
serv.Warnf("[tenant: %v]UpdateIntegration: Integration %v: Processing a branch is currently in progress. Please wait for the current processes to complete.", user.TenantName, body.Name)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Processing a branch is currently in progress. Please wait for the current processes to complete."})
return
}
connectedRepos := integrationFromDb.Keys["connected_repos"].([]interface{})
connectedReposUpdated := body.Keys["connected_repos"].([]interface{})
for _, connectedRepo := range connectedRepos {
if !containsRepo(connectedReposUpdated, connectedRepo) {
connectedRepoName, repoNameOK := connectedRepo.(map[string]interface{})["repo_name"].(string)
connectedRepoBranch, branchNameOK := connectedRepo.(map[string]interface{})["branch"].(string)
repoOwner, repoOwnerOK := connectedRepo.(map[string]interface{})["repo_owner"].(string)
if !repoNameOK || !branchNameOK || !repoOwnerOK {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at handleUpdateGithubIntegration: %v", user.TenantName, user.Username, "Repo name, branch or owner name is missing")
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
err = sendDeleteAllFunctionsReqToMS(user, user.TenantName, integrationType, connectedRepoName, connectedRepoBranch, "aws_lambda", repoOwner, false)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at deleteAllFunctionMs: Repo %v: %v", user.TenantName, user.Username, connectedRepoName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
}
}
for i, connectedRepoUpdated := range connectedReposUpdated {
if !containsRepo(connectedRepos, connectedRepoUpdated) {
connectedRepoUpdated.(map[string]interface{})["in_progress"] = true
connectedReposUpdated[i] = connectedRepoUpdated
go sendCloneFunctionReqToMS(connectedRepoUpdated, user, integrationType, body, i)
}
}
body.Keys["connected_repos"] = connectedReposUpdated
githubIntegration, errorCode, err := it.handleUpdateGithubIntegration(user, body)
if err != nil {
if errorCode == 500 {
Expand Down Expand Up @@ -249,6 +299,18 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) {

integrationType := strings.ToLower(body.Name)
if integrationType == "github" {
_, locked, _, err := db.GetAndLockSharedLock("functions", user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]DisconnectIntegration at GetAndLockSharedLock: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
if !locked {
errMsg := "Processing a branch or a function is currently in progress. Please wait for the current process to complete."
serv.Warnf("[tenant: %v]DisconnectIntegration: %v", user.TenantName, errMsg)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": errMsg})
return
}
err = deleteInstallationForAuthenticatedGithubApp(user.TenantName)
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
Expand All @@ -262,6 +324,12 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) {
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
err = sendDeleteAllFunctionsReqToMS(user, user.TenantName, integrationType, "", "", "aws_lambda", "", false)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]DisconnectIntegration at deleteAllFunctionMs: Repo %v: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
}

err = db.DeleteIntegration(integrationType, user.TenantName)
Expand Down Expand Up @@ -616,3 +684,23 @@ func (s *Server) PurgeIntegrationsAuditLogs(tenantName string) {
serv.Errorf("[tenant: %v]PurgeIntegrationsAuditLogs at respErr: %v", tenantName, respErr.Error())
}
}

func SharedLockUnlock(name, tenantName string) {
err := db.SharedLockUnlock(name, tenantName)
if err != nil {
serv.Errorf("[tenant: %v] at SharedLockUnlock: %v", tenantName, err.Error())
}
}

func containsRepo(repos []interface{}, target interface{}) bool {
for _, repo := range repos {
mapBranch := repo.(map[string]interface{})["branch"].(string)
mapRepo := repo.(map[string]interface{})["repo_name"].(string)
targetBranch := target.(map[string]interface{})["branch"].(string)
targetRepo := target.(map[string]interface{})["repo_name"].(string)
if mapBranch == targetBranch && mapRepo == targetRepo {
return true
}
}
return false
}
2 changes: 1 addition & 1 deletion server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2527,7 +2527,7 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName

func (s *Server) handleResendAllFailure(user models.User, stationId int, tenantName, stationName string) {
systemMessage := SystemMessage{
MessageType: "Error",
MessageType: "error",
MessagePayload: fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username),
}
err := serv.sendSystemMessageOnWS(user, systemMessage)
Expand Down
Loading

0 comments on commit 0138016

Please sign in to comment.