Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

copy from cloud functions #1385

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 152 additions & 3 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,20 @@ func createTables(MetadataDbClient MetadataStorage) error {
PRIMARY KEY (id)
);`

sharedLocksTable := `
CREATE TABLE IF NOT EXISTS shared_locks(
id SERIAL NOT NULL,
name VARCHAR NOT NULL,
tenant_name VARCHAR NOT NULL,
lock_held BOOL NOT NULL DEFAULT false,
locked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id),
UNIQUE(name, tenant_name),
CONSTRAINT fk_tenant_name_shared_locks
FOREIGN KEY(tenant_name)
REFERENCES tenants(name)
);`

alterAsyncTasks := `DO $$
BEGIN
IF EXISTS (
Expand All @@ -597,7 +611,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
db := MetadataDbClient.Client
ctx := MetadataDbClient.Ctx

tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable}
tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, functionsTable, attachedFunctionsTable, sharedLocksTable}

for _, table := range tables {
_, err := db.Exec(ctx, table)
Expand Down Expand Up @@ -2703,7 +2717,6 @@ func DeleteProducerByNameStationIDAndConnID(name string, stationId int, connId s
return false, err
}
defer conn.Release()
// query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3 LIMIT 1`
query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND connection_id = $3
AND EXISTS (
SELECT 1 FROM producers
Expand Down Expand Up @@ -4188,7 +4201,7 @@ func UpdateIntegration(tenantName string, name string, keys map[string]interface
VALUES($1, $2, $3, $4)
ON CONFLICT(name, tenant_name) DO UPDATE
SET keys = excluded.keys, properties = excluded.properties
RETURNING id, name, keys, properties, tenant_name
RETURNING id, name, keys, properties, tenant_name, is_valid
`
stmt, err := conn.Conn().Prepare(ctx, "update_integration", query)
if err != nil {
Expand Down Expand Up @@ -7116,3 +7129,139 @@ func CountProudcersForStation(stationId int) (int64, error) {

return count, nil
}

// Shared Locks Functions
func GetAndLockSharedLock(name string, tenantName string) (bool, bool, models.SharedLock, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return false, false, models.SharedLock{}, err
}
defer conn.Release()
tx, err := conn.Begin(ctx)
if err != nil {
return false, false, models.SharedLock{}, err
}
defer tx.Rollback(ctx)

if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}

selectQuery := `SELECT * FROM shared_locks WHERE name = $1 AND tenant_name = $2 FOR UPDATE LIMIT 1`
stmt, err := tx.Prepare(ctx, "get_and_lock_shared_lock", selectQuery)
if err != nil {
return false, false, models.SharedLock{}, err
}
rows, err := tx.Query(ctx, stmt.Name, name, tenantName)
if err != nil {
if err == pgx.ErrNoRows {
return false, false, models.SharedLock{}, nil
} else {
return false, false, models.SharedLock{}, err
}
}
defer rows.Close()
sharedLocks, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.SharedLock])
if err != nil && err != pgx.ErrNoRows {
return false, false, models.SharedLock{}, err
}
var sharedLock models.SharedLock
lockTime := time.Now()
newLock := false
if len(sharedLocks) == 0 || err == pgx.ErrNoRows {
insterQuery := `INSERT INTO shared_locks(name, tenant_name, lock_held, locked_at) VALUES($1, $2, $3, $4) RETURNING *`

stmt, err := conn.Conn().Prepare(ctx, "insert_new_shared_lock_and_lock", insterQuery)
if err != nil {
return false, false, models.SharedLock{}, err
}

newSharedLock := models.SharedLock{}
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := conn.Conn().Query(ctx, stmt.Name, name, tenantName, true, lockTime)
if err != nil {
return false, false, models.SharedLock{}, err
}
defer rows.Close()

for rows.Next() {
err := rows.Scan(&newSharedLock.ID, &newSharedLock.Name, &newSharedLock.TenantName, &newSharedLock.LockHeld, &newSharedLock.LockedAt)
if err != nil {
return false, false, models.SharedLock{}, err
}
}
sharedLock = newSharedLock
newLock = true
} else {
sharedLock = sharedLocks[0]
}

if !sharedLock.LockHeld {
updateQuery := "UPDATE shared_locks SET lock_held = TRUE, locked_at = $1 WHERE id = $2"
lockStmt, err := tx.Prepare(ctx, "lock_shared_lock", updateQuery)
if err != nil {
return false, false, models.SharedLock{}, err
}
_, err = tx.Exec(ctx, lockStmt.Name, lockTime, sharedLock.ID)
if err != nil {
return false, false, models.SharedLock{}, err
}
sharedLock.LockHeld = true
sharedLock.LockedAt = lockTime
} else if !newLock {
return true, false, sharedLock, nil
}

err = tx.Commit(ctx)
if err != nil {
return false, false, models.SharedLock{}, err
}
return true, true, sharedLock, nil
}

func SharedLockUnlock(name, tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

query := `UPDATE shared_locks SET lock_held = FALSE WHERE name = $1 AND tenant_name=$2`
stmt, err := conn.Conn().Prepare(ctx, "unlock_shared_lock", query)
if err != nil {
return err
}
tenantName = strings.ToLower(tenantName)
_, err = conn.Conn().Query(ctx, stmt.Name, name, tenantName)
if err != nil {
return err
}
return nil
}

func DeleteAllSharedLocks(tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
query := `DELETE FROM shared_locks WHERE tenant_name=$1`
stmt, err := conn.Conn().Prepare(ctx, "delete_all_shared_locks", query)
if err != nil {
return err
}
_, err = conn.Conn().Query(ctx, stmt.Name, tenantName)
if err != nil {
return err
}
return nil
}
4 changes: 4 additions & 0 deletions db/db_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ func DeleteAndGetAttachedFunctionsByStation(tenantName string, stationId int, pa
func DeleteAndGetAttachedFunctionsByTenant(tenantName string) ([]FunctionSchema, error) {
return []FunctionSchema{}, nil
}

func DeleteAllTestEvents(tenantName string) error {
return nil
}
23 changes: 23 additions & 0 deletions models/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,26 @@ type GetFunctionDetails struct {
Type string `form:"type" json:"type"`
Path string `form:"path" json:"path"`
}

type GetFunctionDetailsSchema struct {
Repository string `form:"repo" json:"repo"`
Branch string `form:"branch" json:"branch"`
Owner string `form:"owner" json:"owner"`
Scm string `form:"scm" json:"scm"`
Username string `json:"username"`
TenantName string `json:"tenant_name"`
FunctionName string `form:"function_name" json:"function_name"`
Vesrion int `form:"version" json:"version"`
}

type GetFunctionFileCodeSchema struct {
Repository string `form:"repo" json:"repo"`
Branch string `form:"branch" json:"branch"`
Owner string `form:"owner" json:"owner"`
Scm string `form:"scm" json:"scm"`
TenantName string `json:"tenant_name"`
FunctionName string `form:"function_name" json:"function_name"`
Version int `form:"version" json:"version"`
Username string `json:"username"`
Path string `form:"path" json:"path"`
}
22 changes: 22 additions & 0 deletions models/shared_locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022-2023 The Memphis.dev Authors
// Licensed under the Memphis Business Source License 1.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// Changed License: [Apache License, Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0), as published by the Apache Foundation.
//
// https://github.com/memphisdev/memphis/blob/master/LICENSE
//
// Additional Use Grant: You may make use of the Licensed Work (i) only as part of your own product or service, provided it is not a message broker or a message queue product or service; and (ii) provided that you do not use, provide, distribute, or make available the Licensed Work as a Service.
// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services.
package models

import "time"

type SharedLock struct {
ID int `json:"id"`
Name string `json:"name"`
TenantName string `json:"tenant_name"`
LockHeld bool `json:"lock_held"`
LockedAt time.Time `json:"locked_at"`
}
7 changes: 7 additions & 0 deletions server/memphis_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,3 +2215,10 @@ func GetStationAttachedFunctionsByPartitions(stationID int, partitionsList []int
func getInternalUserPassword() string {
return configuration.ROOT_PASSWORD
}

func sendDeleteAllFunctionsReqToMS(user models.User, tenantName, scmType, repo, branch, computeEngine, owner string, uninstall bool) error {
return nil
}

func sendCloneFunctionReqToMS(connectedRepo interface{}, user models.User, scm string, bodyToUpdate models.CreateIntegrationSchema, index int) {
}
88 changes: 88 additions & 0 deletions server/memphis_handlers_integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,56 @@ func (it IntegrationsHandler) UpdateIntegration(c *gin.Context) {
}
integration = s3Integration
case "github":
_, locked, _, err := db.GetAndLockSharedLock("functions", user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at GetAndLockSharedLock: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
exist, integrationFromDb, err := db.GetIntegration("github", user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at GetIntegration: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
if !exist {
serv.Warnf("[tenant: %v]UpdateIntegration: Integration does not exist %v", user.TenantName, body.Name)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Unsupported integration type - " + body.Name})
return
}
if !locked {
serv.Warnf("[tenant: %v]UpdateIntegration: Integration %v: Processing a branch is currently in progress. Please wait for the current processes to complete.", user.TenantName, body.Name)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Processing a branch is currently in progress. Please wait for the current processes to complete."})
return
}
connectedRepos := integrationFromDb.Keys["connected_repos"].([]interface{})
connectedReposUpdated := body.Keys["connected_repos"].([]interface{})
for _, connectedRepo := range connectedRepos {
if !containsRepo(connectedReposUpdated, connectedRepo) {
connectedRepoName, repoNameOK := connectedRepo.(map[string]interface{})["repo_name"].(string)
connectedRepoBranch, branchNameOK := connectedRepo.(map[string]interface{})["branch"].(string)
repoOwner, repoOwnerOK := connectedRepo.(map[string]interface{})["repo_owner"].(string)
if !repoNameOK || !branchNameOK || !repoOwnerOK {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at handleUpdateGithubIntegration: %v", user.TenantName, user.Username, "Repo name, branch or owner name is missing")
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
err = sendDeleteAllFunctionsReqToMS(user, user.TenantName, integrationType, connectedRepoName, connectedRepoBranch, "aws_lambda", repoOwner, false)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]UpdateIntegration at deleteAllFunctionMs: Repo %v: %v", user.TenantName, user.Username, connectedRepoName, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
}
}
for i, connectedRepoUpdated := range connectedReposUpdated {
if !containsRepo(connectedRepos, connectedRepoUpdated) {
connectedRepoUpdated.(map[string]interface{})["in_progress"] = true
connectedReposUpdated[i] = connectedRepoUpdated
go sendCloneFunctionReqToMS(connectedRepoUpdated, user, integrationType, body, i)
}
}
body.Keys["connected_repos"] = connectedReposUpdated
githubIntegration, errorCode, err := it.handleUpdateGithubIntegration(user, body)
if err != nil {
if errorCode == 500 {
Expand Down Expand Up @@ -249,6 +299,18 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) {

integrationType := strings.ToLower(body.Name)
if integrationType == "github" {
_, locked, _, err := db.GetAndLockSharedLock("functions", user.TenantName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]DisconnectIntegration at GetAndLockSharedLock: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
if !locked {
errMsg := "Processing a branch or a function is currently in progress. Please wait for the current process to complete."
serv.Warnf("[tenant: %v]DisconnectIntegration: %v", user.TenantName, errMsg)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": errMsg})
return
}
err = deleteInstallationForAuthenticatedGithubApp(user.TenantName)
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
Expand All @@ -262,6 +324,12 @@ func (it IntegrationsHandler) DisconnectIntegration(c *gin.Context) {
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
err = sendDeleteAllFunctionsReqToMS(user, user.TenantName, integrationType, "", "", "aws_lambda", "", false)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]DisconnectIntegration at deleteAllFunctionMs: Repo %v: %v", user.TenantName, user.Username, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
}

err = db.DeleteIntegration(integrationType, user.TenantName)
Expand Down Expand Up @@ -616,3 +684,23 @@ func (s *Server) PurgeIntegrationsAuditLogs(tenantName string) {
serv.Errorf("[tenant: %v]PurgeIntegrationsAuditLogs at respErr: %v", tenantName, respErr.Error())
}
}

func SharedLockUnlock(name, tenantName string) {
err := db.SharedLockUnlock(name, tenantName)
if err != nil {
serv.Errorf("[tenant: %v] at SharedLockUnlock: %v", tenantName, err.Error())
}
}

func containsRepo(repos []interface{}, target interface{}) bool {
for _, repo := range repos {
mapBranch := repo.(map[string]interface{})["branch"].(string)
mapRepo := repo.(map[string]interface{})["repo_name"].(string)
targetBranch := target.(map[string]interface{})["branch"].(string)
targetRepo := target.(map[string]interface{})["repo_name"].(string)
if mapBranch == targetBranch && mapRepo == targetRepo {
return true
}
}
return false
}
2 changes: 1 addition & 1 deletion server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2527,7 +2527,7 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName

func (s *Server) handleResendAllFailure(user models.User, stationId int, tenantName, stationName string) {
systemMessage := SystemMessage{
MessageType: "Error",
MessageType: "error",
MessagePayload: fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username),
}
err := serv.sendSystemMessageOnWS(user, systemMessage)
Expand Down
Loading