Skip to content

Commit

Permalink
Release (#1666)
Browse files Browse the repository at this point in the history
* fix create consumer nats with consumer group that was destroyed before

* ability to Nack / terminate a message (#1625)

* partition-default-view (#1649)

* Build UI Static Files (#1650)

Co-authored-by: teammemphis <[email protected]>

* bugfix - consumer creation

* Txt fix (#1652)

* txt fix

* txt fix

* get-station-overview-for-old-station

* Build UI Static Files (#1654)

* copied changes

* createTenant bugfix

* Update README.md (#1662)

* removing cloud related code from backend

* Update README.md (#1664)

* version update

* Rnd 755 remove cloud mentions from the memphis oss version (#1665)

* first iteration

* second iteration

* .

* updated

* chane redirection to discord

* ui build

---------

Co-authored-by: svetaStrech <[email protected]>
Co-authored-by: idanasulinStrech <[email protected]>

---------

Co-authored-by: shohamroditimemphis <[email protected]>
Co-authored-by: shohamroditimemphis <[email protected]>
Co-authored-by: Idan Asulin <[email protected]>
Co-authored-by: Sveta Gimpelson <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: teammemphis <[email protected]>
Co-authored-by: Valera Bronshtein <[email protected]>
Co-authored-by: svetaStrech <[email protected]>
Co-authored-by: daniel-davidd <[email protected]>
Co-authored-by: daniel-davidd <[email protected]>
Co-authored-by: Avitaltrifsik <[email protected]>
  • Loading branch information
12 people authored May 27, 2024
1 parent df6d315 commit c5c7225
Show file tree
Hide file tree
Showing 193 changed files with 942 additions and 13,751 deletions.
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
<p align="center">
Please pay attention that Memphis.dev is no longer supported officially by the Superstream team (formerly Memphis.dev) and was released to the public.
<p align="center">

<a href="![Github (4)](https://github.com/memphisdev/memphis-terraform/assets/107035359/a5fe5d0f-22e1-4445-957d-5ce4464e61b1)">[![Github (4)](https://github.com/memphisdev/memphis-terraform/assets/107035359/a5fe5d0f-22e1-4445-957d-5ce4464e61b1)](https://memphis.dev)</a>
<p align="center">
<a href="https://memphis.dev/discord"><img src="https://img.shields.io/discord/963333392844328961?color=6557ff&label=discord" alt="Discord"></a>
Expand All @@ -8,19 +12,13 @@
<img src="https://img.shields.io/github/last-commit/memphisdev/memphis?color=61dfc6&label=last%20commit">
</p>

<b><p align="center">
<a href="https://memphis.dev/pricing/">Cloud</a> - <a href="github.com/memphisdev/memphis-dev-academy">Academy</a> - <a href="https://memphis.dev/docs/">Docs</a> - <a href="https://twitter.com/Memphis_Dev">X</a> - <a href="https://www.youtube.com/channel/UCVdMDLCSxXOqtgrBaRUHKKg">YouTube</a>
</p></b>

<div align="center">

<h4>

**[Memphis.dev](https://memphis.dev)** Is The First Data Streaming Platform Designed For Backend Developers<br>
To Build Event-driven And Real-time Features Faster Than Ever.<br>

<img width="177" alt="cloud_native 2 (5)" src="https://github.com/memphisdev/memphis/assets/107035359/a20ea11c-d509-42bb-a46c-e388c8424101">

</h4>

</div>
Expand Down Expand Up @@ -48,7 +46,7 @@ docker compose -f docker-compose.yml -p memphis up

</div>

## ✨ Key Features [v1.4.2](https://docs.memphis.dev/memphis/release-notes/releases/v1.4.2-latest)
## ✨ Key Features [v1.4.4](https://docs.memphis.dev/memphis/release-notes/releases/v1.4.4-latest)


![20](https://user-images.githubusercontent.com/70286779/220196529-abb958d2-5c58-4c33-b5e0-40f5446515ad.png) Production-ready message broker in under 3 minutes<br>
Expand Down
139 changes: 1 addition & 138 deletions analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,7 @@
// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services.
package analytics

import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"github.com/memphisdev/memphis/conf"
"github.com/memphisdev/memphis/db"

"github.com/gofrs/uuid"
"github.com/memphisdev/memphis.go"
)

const (
ACCOUNT_ID = 223671990
USERNAME = "traces_producer"
PASSWORD = "usersTracesMemphis@1"
HOST = "aws-eu-central-1.cloud.memphis.dev"
)
import ()

type EventParam struct {
Name string `json:"name"`
Expand All @@ -44,130 +25,12 @@ type EventBody struct {
TimeStamp string `json:"timestamp"`
}

var configuration = conf.GetConfig()
var deploymentId string
var memphisVersion string
var memphisConnection *memphis.Conn

func InitializeAnalytics(memphisV, customDeploymentId string) error {
acc := conf.MemphisGlobalAccountName
if !conf.GetConfig().USER_PASS_BASED_AUTH {
acc = conf.GlobalAccount
}

memphisVersion = memphisV
if customDeploymentId != "" {
deploymentId = customDeploymentId
} else {
exist, deployment, err := db.GetSystemKey("deployment_id", acc)
if err != nil {
return err
} else if !exist {
uid, err := uuid.NewV4()
if err != nil {
return err
}
deploymentId = uid.String()
err = db.InsertSystemKey("deployment_id", deploymentId, acc)
if err != nil {
return err
}
} else {
deploymentId = deployment.Value
}
}

exist, _, err := db.GetSystemKey("analytics", acc)
if err != nil {
return err
} else if !exist {
value := ""
if configuration.ANALYTICS == "true" {
value = "true"
} else {
value = "false"
}

err = db.InsertSystemKey("analytics", value, acc)
if err != nil {
return err
}
}

timeout := 0 * time.Minute
if configuration.PROVIDER == "aws" && configuration.REGION == "eu-central-1" {
timeout = 1 * time.Minute
}
time.AfterFunc(timeout, func() {
memphisConnection, err = memphis.Connect(HOST, USERNAME, memphis.Password(PASSWORD), memphis.AccountId(ACCOUNT_ID), memphis.MaxReconnect(500), memphis.ReconnectInterval(1*time.Second))
if err != nil {
fmt.Printf("InitializeAnalytics: initalize connection failed %s \n", err.Error())
} else {
memphisConnection.CreateStation("users-traces", memphis.Replicas(3), memphis.TieredStorageEnabled(true), memphis.RetentionTypeOpt(memphis.MaxMessageAgeSeconds), memphis.RetentionVal(14400))
}
})

return nil
}

func Close() {
acc := conf.MemphisGlobalAccountName
if !conf.GetConfig().USER_PASS_BASED_AUTH {
acc = conf.GlobalAccount
}
_, analytics, _ := db.GetSystemKey("analytics", acc)
if analytics.Value == "true" {
memphisConnection.Close()
}
}

func SendEvent(tenantName, username string, params map[string]interface{}, eventName string) {
distinctId := deploymentId
if configuration.DEV_ENV != "" {
distinctId = "dev"
}

if eventName != "error" {
tenantName = strings.ReplaceAll(tenantName, "-", "_") // for parsing purposes
if tenantName != "" && username != "" {
distinctId = distinctId + "-" + tenantName + "-" + username
}
}

var eventMsg []byte
var event *EventBody
var err error

creationTime := time.Now().Unix()
timestamp := strconv.FormatInt(creationTime, 10)
params["memphis_version"] = memphisVersion
if eventName == "error" {
event = &EventBody{
DistinctId: distinctId,
Event: "error",
Properties: params,
TimeStamp: timestamp,
}
} else {
event = &EventBody{
DistinctId: distinctId,
Event: eventName,
Properties: params,
TimeStamp: timestamp,
}
}

eventMsg, err = json.Marshal(event)
if err != nil {
return
}
if memphisConnection != nil {
err := memphisConnection.Produce("users-traces", "producer_users_traces", eventMsg, []memphis.ProducerOpt{}, []memphis.ProduceOpt{})
if err != nil { // retry
memphisConnection, err = memphis.Connect(HOST, USERNAME, memphis.Password(PASSWORD), memphis.AccountId(ACCOUNT_ID), memphis.MaxReconnect(500), memphis.ReconnectInterval(1*time.Second))
if err == nil {
memphisConnection.Produce("users-traces", "producer_users_traces", eventMsg, []memphis.ProducerOpt{}, []memphis.ProduceOpt{})
}
}
}
}
64 changes: 58 additions & 6 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3013,7 +3013,7 @@ func GetActiveConsumerByCG(consumersGroup string, stationId int) (bool, models.C
}
defer conn.Release()

query := `SELECT * FROM consumers WHERE consumers_group = $1 AND station_id = $2 AND type = 'application' LIMIT 1`
query := `SELECT * FROM consumers WHERE consumers_group = $1 AND station_id = $2 AND type = 'application' AND is_active = true LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "get_active_consumer_by_cg", query)
if err != nil {
return false, models.Consumer{}, err
Expand Down Expand Up @@ -5901,7 +5901,7 @@ func GetMsgByStationIdAndMsgSeq(stationId, messageSeq, partitionNumber int) (boo
return true, message[0], nil
}

func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int) (int, bool, error) {
func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int, validationError string) (int, bool, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
updated := false
Expand Down Expand Up @@ -5979,7 +5979,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", "", tenantName, partitionNumber)
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", validationError, tenantName, partitionNumber)
if err != nil {
return 0, updated, err
}
Expand Down Expand Up @@ -6008,7 +6008,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
}
}
} else { // then update
query = `UPDATE dls_messages SET poisoned_cgs = ARRAY_APPEND(poisoned_cgs, $1), updated_at = $4 WHERE station_id=$2 AND message_seq=$3 AND not($1 = ANY(poisoned_cgs)) AND tenant_name=$5 RETURNING id`
query = `UPDATE dls_messages SET poisoned_cgs = ARRAY_APPEND(poisoned_cgs, $1), updated_at = $4, validation_error = $6 WHERE station_id=$2 AND message_seq=$3 AND not($1 = ANY(poisoned_cgs)) AND tenant_name=$5 RETURNING id`
stmt, err := tx.Prepare(ctx, "update_poisoned_cgs", query)
if err != nil {
return 0, updated, err
Expand All @@ -6017,7 +6017,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err = tx.Query(ctx, stmt.Name, poisonedCgs[0], stationId, messageSeq, updatedAt, tenantName)
rows, err = tx.Query(ctx, stmt.Name, poisonedCgs[0], stationId, messageSeq, updatedAt, tenantName, validationError)
if err != nil {
return 0, updated, err
}
Expand Down Expand Up @@ -6756,6 +6756,12 @@ func CreateTenant(name, firebaseOrganizationId, encryptrdInternalWSPass, organiz
}
defer conn.Release()

var colors any
colors = brandData.Colors
if brandData.Colors == nil || len(brandData.Colors) == 0 {
colors = []string{}
}

query := `INSERT INTO tenants
(id, name, firebase_organization_id, internal_ws_pass, organization_name, dark_icon, light_icon, brand_colors)
VALUES(nextval('tenants_seq'),$1, $2, $3, $4, $5, $6, $7)`
Expand All @@ -6766,7 +6772,7 @@ func CreateTenant(name, firebaseOrganizationId, encryptrdInternalWSPass, organiz
}

var tenantId int
rows, err := conn.Conn().Query(ctx, stmt.Name, name, firebaseOrganizationId, encryptrdInternalWSPass, organizationName, brandData.DarkIcon, brandData.LightIcon, brandData.Colors)
rows, err := conn.Conn().Query(ctx, stmt.Name, name, firebaseOrganizationId, encryptrdInternalWSPass, organizationName, brandData.DarkIcon, brandData.LightIcon, colors)
if err != nil {
return models.Tenant{}, err
}
Expand Down Expand Up @@ -8407,3 +8413,49 @@ func RemoveRoleAndPermissions(roleID []int, tenantName string) error {

return nil
}

func RemovePermissionsByTenant(tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
tenantName = strings.ToLower(tenantName)
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

query := `DELETE FROM permissions WHERE tenant_name = $1`
stmt, err := conn.Conn().Prepare(ctx, "remove_permissions_by_tenant", query)
if err != nil {
return err
}
_, err = conn.Conn().Query(ctx, stmt.Name, tenantName)
if err != nil {
return err
}

return nil
}

func RemoveRolesByTenant(tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
tenantName = strings.ToLower(tenantName)
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

query := `DELETE FROM roles WHERE tenant_name = $1`
stmt, err := conn.Conn().Prepare(ctx, "remove_roles_by_tenant", query)
if err != nil {
return err
}
_, err = conn.Conn().Query(ctx, stmt.Name, tenantName)
if err != nil {
return err
}

return nil
}
8 changes: 8 additions & 0 deletions models/dead_letter_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ type SchemaVerseDlsMessageSdk struct {
PartitionNumber int `json:"partition_number"`
}

type NackedDlsMessageSdk struct {
StationName string `json:"station_name"`
Error string `json:"error"`
CgName string `json:"cg_name"`
Seq uint64 `json:"seq"`
Partition int `json:"partition"`
}

type FunctionsDlsMessage struct {
StationID int `json:"station_id"`
TenantName string `json:"tenant_name"`
Expand Down
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Loading

0 comments on commit c5c7225

Please sign in to comment.