Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rnd 124 remove durablity from internal temp consumers #1555

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (

const (
// VERSION is the current version for the memphis server.
VERSION = "1.4.1" // ** changed by Memphis
VERSION = "1.4.2" // ** changed by Memphis

// ** added by Memphis
DEFAULT_SERVER_NAME = "memphis-0"
Expand Down
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3330,7 +3330,7 @@ func (o *consumer) decDeliveryCount(sseq uint64) {
// send a delivery exceeded advisory.
func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64, sm *StoreMsg) { // ** added by memphis (sm to notifyDeliveryExceeded) **
// *** added by memphis
if strings.HasPrefix(o.stream, "$memphis") { // skipping memphis streams max deliveries events
if strings.HasPrefix(o.stream, MEMPHIS_GLOBAL_ACCOUNT) { // skipping memphis streams max deliveries events
return
}
// added by memphis ***
Expand Down
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3740,7 +3740,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// ** added by memphis
// send the message to tiered 2 storage if needed
tieredStorageEnabled := fs.cfg.StreamConfig.TieredStorageEnabled
if !secure && !strings.HasPrefix(fs.cfg.StreamConfig.Name, "$memphis") && tieredStorageEnabled && serv != nil {
if !secure && !strings.HasPrefix(fs.cfg.StreamConfig.Name, MEMPHIS_GLOBAL_ACCOUNT) && tieredStorageEnabled && serv != nil {
err = serv.sendToTier2Storage(fs, copyBytes(sm.buf), sm.seq, "s3")
if err != nil {
return false, err
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4975,8 +4975,8 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
if isLeader {
// ** added by Memphis
logFunc := s.Noticef
if strings.Contains(streamName, "$memphis") ||
strings.Contains(consumerName, "$memphis") {
if strings.Contains(streamName, MEMPHIS_GLOBAL_ACCOUNT) ||
strings.Contains(consumerName, MEMPHIS_GLOBAL_ACCOUNT) {
logFunc = s.Debugf
}
// ** added by Memphis
Expand Down
4 changes: 2 additions & 2 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (ch ConsumersHandler) GetDelayedCgsByTenant(tenantName string, streams []*S
consumers := make(map[string]map[string]models.DelayedCg, 0)
consumerNames := []string{}
for _, stream := range streams {
if strings.HasPrefix(stream.Config.Name, "$memphis") {
if strings.HasPrefix(stream.Config.Name, MEMPHIS_GLOBAL_ACCOUNT) {
continue
}
offset := 0
Expand All @@ -510,7 +510,7 @@ func (ch ConsumersHandler) GetDelayedCgsByTenant(tenantName string, streams []*S
return []models.DelayedCgResp{}, err
}
for _, consumer := range resp.Consumers {
if strings.HasPrefix(consumer.Config.FilterSubject, "$memphis") || !strings.HasSuffix(consumer.Config.FilterSubject, ".final") { // skip consumers that are not user consumers
if strings.HasPrefix(consumer.Config.FilterSubject, MEMPHIS_GLOBAL_ACCOUNT) || !strings.HasSuffix(consumer.Config.FilterSubject, ".final") { // skip consumers that are not user consumers
continue
}
if strings.Contains(consumer.Stream, "$") {
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_dls_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (pmh PoisonMessagesHandler) GetDlsMessageDetailsById(messageId int, dlsType
}

for header := range dlsMsg.MessageDetails.Headers {
if strings.HasPrefix(header, "$memphis") {
if strings.HasPrefix(header, MEMPHIS_GLOBAL_ACCOUNT) {
delete(dlsMsg.MessageDetails.Headers, header)
}
}
Expand Down
9 changes: 2 additions & 7 deletions server/memphis_handlers_integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,8 @@ func (s *Server) getIntegrationAuditLogs(integrationType, tenantName string) ([]
durableName := INTEGRATIONS_AUDIT_LOGS_CONSUMER + "_" + uid
cc := ConsumerConfig{
DeliverPolicy: DeliverAll,
AckPolicy: AckExplicit,
Durable: durableName,
AckPolicy: AckNone,
Name: durableName,
Replicas: 1,
FilterSubject: filterSubject,
}
Expand All @@ -631,8 +631,6 @@ func (s *Server) getIntegrationAuditLogs(integrationType, tenantName string) ([]
req := []byte(strconv.FormatUint(amount, 10))
sub, err := s.subscribeOnAcc(s.MemphisGlobalAccount(), reply, reply+"_sid", func(_ *client, subject, reply string, msg []byte) {
go func(respCh chan StoredMsg, subject, reply string, msg []byte) {
// ack
s.sendInternalAccountMsg(s.MemphisGlobalAccount(), reply, []byte(_EMPTY_))
rawTs := tokenAt(reply, 8)
seq, _, _ := ackReplyInfo(reply)

Expand Down Expand Up @@ -669,9 +667,6 @@ func (s *Server) getIntegrationAuditLogs(integrationType, tenantName string) ([]
cleanup:
timer.Stop()
s.unsubscribeOnAcc(s.MemphisGlobalAccount(), sub)
time.AfterFunc(500*time.Millisecond, func() {
serv.memphisRemoveConsumer(s.MemphisGlobalAccountString(), integrationsAuditLogsStream, durableName)
})

resMsgs := []models.IntegrationsAuditLog{}
for _, msg := range msgs {
Expand Down
7 changes: 2 additions & 5 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,8 @@ func (s *Server) GetSystemLogs(amount uint64,
cc := ConsumerConfig{
OptStartSeq: startSeq,
DeliverPolicy: DeliverByStartSequence,
AckPolicy: AckExplicit,
Durable: durableName,
AckPolicy: AckNone,
Name: durableName,
Replicas: 1,
}

Expand All @@ -912,8 +912,6 @@ func (s *Server) GetSystemLogs(amount uint64,
req := []byte(strconv.FormatUint(amount, 10))
sub, err := s.subscribeOnAcc(s.MemphisGlobalAccount(), reply, reply+"_sid", func(_ *client, subject, reply string, msg []byte) {
go func(respCh chan StoredMsg, subject, reply string, msg []byte) {
// ack
s.sendInternalAccountMsg(s.MemphisGlobalAccount(), reply, []byte(_EMPTY_))
rawTs := tokenAt(reply, 8)
seq, _, _ := ackReplyInfo(reply)

Expand Down Expand Up @@ -950,7 +948,6 @@ func (s *Server) GetSystemLogs(amount uint64,
cleanup:
timer.Stop()
s.unsubscribeOnAcc(s.MemphisGlobalAccount(), sub)
time.AfterFunc(500*time.Millisecond, func() { serv.memphisRemoveConsumer(s.MemphisGlobalAccountString(), syslogsStreamName, durableName) })

var resMsgs []models.Log
if uint64(len(msgs)) < amount && streamInfo.State.Msgs > amount && streamInfo.State.FirstSeq < startSeq {
Expand Down
6 changes: 3 additions & 3 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (sh StationsHandler) GetStationsDetails(tenantName string) ([]models.Extend
}
for _, info := range allStreamInfo {
streamName := info.Config.Name
if !strings.Contains(streamName, "$memphis") {
if !strings.Contains(streamName, MEMPHIS_GLOBAL_ACCOUNT) {
if strings.Contains(streamName, "$") {
stationNameAndPartition := strings.Split(streamName, "$")
stationTotalMsgs[stationNameAndPartition[0]] += int(info.State.Subjects[streamName+".final"])
Expand Down Expand Up @@ -777,7 +777,7 @@ func (sh StationsHandler) GetAllStationsDetailsLight(shouldExtend bool, tenantNa
}
for _, info := range streamsInfo {
streamName := info.Config.Name
if !strings.Contains(streamName, "$memphis") {
if !strings.Contains(streamName, MEMPHIS_GLOBAL_ACCOUNT) {
totalMessages += info.State.Subjects[streamName+".final"]
if strings.Contains(streamName, "$") {
stationNameAndPartition := strings.Split(streamName, "$")
Expand Down Expand Up @@ -1846,7 +1846,7 @@ func (sh StationsHandler) GetMessageDetails(c *gin.Context) {
producedByHeader := strings.ToLower(headersJson["$memphis_producedBy"])

for header := range headersJson {
if strings.HasPrefix(header, "$memphis") {
if strings.HasPrefix(header, MEMPHIS_GLOBAL_ACCOUNT) {
delete(headersJson, header)
}
}
Expand Down
10 changes: 3 additions & 7 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ func (s *Server) GetMessagesFromPartition(station models.Station, streamName str
producedByHeader := strings.ToLower(headersJson["$memphis_producedBy"])

for header := range headersJson {
if strings.HasPrefix(header, "$memphis") {
if strings.HasPrefix(header, MEMPHIS_GLOBAL_ACCOUNT) {
delete(headersJson, header)
}
}
Expand Down Expand Up @@ -1210,8 +1210,8 @@ func (s *Server) memphisGetMsgs(tenantName, filterSubj, streamName string, start
FilterSubject: filterSubj,
OptStartSeq: startSeq,
DeliverPolicy: DeliverByStartSequence,
Durable: durableName,
AckPolicy: AckExplicit,
Name: durableName,
AckPolicy: AckNone,
Replicas: consumerReplicas,
}

Expand All @@ -1232,9 +1232,6 @@ func (s *Server) memphisGetMsgs(tenantName, filterSubj, streamName string, start

sub, err := s.subscribeOnAcc(account, reply, reply+"_sid", func(_ *client, subject, reply string, msg []byte) {
go func(respCh chan StoredMsg, reply string, msg []byte, findHeader bool) {
// ack
s.sendInternalAccountMsg(account, reply, []byte(_EMPTY_))

rawTs := tokenAt(reply, 8)
seq, _, _ := ackReplyInfo(reply)

Expand Down Expand Up @@ -1285,7 +1282,6 @@ func (s *Server) memphisGetMsgs(tenantName, filterSubj, streamName string, start
cleanup:
timer.Stop()
s.unsubscribeOnAcc(account, sub)
time.AfterFunc(500*time.Millisecond, func() { serv.memphisRemoveConsumer(tenantName, streamName, durableName) })

return msgs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
// ** added by memphis
// send the message to tiered 2 storage if needed
tieredStorageEnabled := ms.cfg.TieredStorageEnabled
if !secure && !strings.HasPrefix(ms.cfg.Name, "$memphis") && tieredStorageEnabled && serv != nil {
if !secure && !strings.HasPrefix(ms.cfg.Name, MEMPHIS_GLOBAL_ACCOUNT) && tieredStorageEnabled && serv != nil {
serv.sendToTier2Storage(ms, copyBytes(sm.buf), sm.seq, "s3")
}
// ** added by memphis
Expand Down
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.1
1.4.2