Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

function-installation #1355

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/gofrs/uuid"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -563,7 +564,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
station_id INT NOT NULL,
created_by VARCHAR NOT NULL,
UNIQUE(name, tenant_name, station_id)
PRIMARY KEY (id)
);`

alterAsyncTasks := `DO $$
Expand All @@ -573,12 +574,26 @@ func createTables(MetadataDbClient MetadataStorage) error {
) THEN
ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS created_by VARCHAR NOT NULL;
END IF;
IF EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE table_name = 'async_tasks' AND constraint_type = 'UNIQUE'
AND constraint_name = 'async_tasks_name_tenant_name_station_id_key'
) THEN
ALTER TABLE async_tasks DROP CONSTRAINT async_tasks_name_tenant_name_station_id_key;
END IF;

IF NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE table_name = 'async_tasks' AND constraint_type = 'PRIMARY KEY'
) THEN
ALTER TABLE async_tasks ADD PRIMARY KEY (id);
END IF;
END $$;`

db := MetadataDbClient.Client
ctx := MetadataDbClient.Ctx

tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable}
tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, installedFunctionsTable}

for _, table := range tables {
_, err := db.Exec(ctx, table)
Expand Down Expand Up @@ -1155,7 +1170,7 @@ func GetStationByName(name string, tenantName string) (bool, models.Station, err
return true, stations[0], nil
}

func GetStationById(messageId int, tenantName string) (bool, models.Station, error) {
func GetStationById(stationId int, tenantName string) (bool, models.Station, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
Expand All @@ -1171,7 +1186,7 @@ func GetStationById(messageId int, tenantName string) (bool, models.Station, err
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := conn.Conn().Query(ctx, stmt.Name, messageId, tenantName)
rows, err := conn.Conn().Query(ctx, stmt.Name, stationId, tenantName)
if err != nil {
return false, models.Station{}, err
}
Expand Down Expand Up @@ -6718,7 +6733,7 @@ func UpsertAsyncTask(task, brokerInCharge string, createdAt time.Time, tenantNam
}
defer conn.Release()

query := `INSERT INTO async_tasks (name, broker_in_charge, created_at, updated_at, tenant_name, station_id, created_by) VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (name, tenant_name, station_id) DO NOTHING RETURNING *`
query := `INSERT INTO async_tasks (name, broker_in_charge, created_at, updated_at, tenant_name, station_id, created_by) VALUES($1, $2, $3, $4, $5, $6, $7) RETURNING *`
stmt, err := conn.Conn().Prepare(ctx, "upsert_async_task", query)
if err != nil {
return models.AsyncTask{}, err
Expand Down Expand Up @@ -6759,7 +6774,7 @@ func UpsertAsyncTask(task, brokerInCharge string, createdAt time.Time, tenantNam
return models.AsyncTask{}, err
}

err = conn.Conn().QueryRow(ctx, existingStmt.Name, task, tenantName, stationId, username).Scan(
err = conn.Conn().QueryRow(ctx, existingStmt.Name, task, tenantName, stationId).Scan(
&asyncTask.ID,
&asyncTask.Name,
&asyncTask.BrokrInCharge,
Expand Down Expand Up @@ -6852,7 +6867,6 @@ func GetAllAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) {
return []models.AsyncTaskRes{}, err
}
defer conn.Release()

query := `SELECT a.id, a.name, a.created_at, a.created_by, s.name
FROM async_tasks AS a
LEFT JOIN stations AS s ON a.station_id = s.id
Expand All @@ -6869,8 +6883,32 @@ func GetAllAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) {
}
defer rows.Close()

asyncTasks, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.AsyncTaskRes])
if err != nil {
var asyncTasks []models.AsyncTaskRes
for rows.Next() {
var task models.AsyncTaskRes
var sName pgtype.Varchar

err := rows.Scan(
&task.ID,
&task.Name,
&task.CreatedAt,
&task.CreatedBy,
&sName,
)
if err != nil {
return []models.AsyncTaskRes{}, err
}

if sName.Status == pgtype.Present {
task.StationName = sName.String
} else {
task.StationName = "" // Handle NULL value
}

asyncTasks = append(asyncTasks, task)
}

if err := rows.Err(); err != nil {
return []models.AsyncTaskRes{}, err
}
if len(asyncTasks) == 0 {
Expand Down
1 change: 1 addition & 0 deletions db/db_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
package db

const testEventsTable = ``
const installedFunctionsTable = ``
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/graph-gophers/graphql-go v1.5.0
github.com/hamba/avro/v2 v2.13.0
github.com/jackc/pgtype v1.14.0
github.com/jackc/pgx/v5 v5.3.1
github.com/santhosh-tekuri/jsonschema/v5 v5.1.0
github.com/slack-go/slack v0.11.4
Expand All @@ -46,6 +47,7 @@ require (

require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
Expand Down
Loading