Skip to content

Commit

Permalink
feat: Avoiding processing the same active connection's duplicate even…
Browse files Browse the repository at this point in the history
…ts (#50)

* feat:avoiding processing the same active connection's duplicate events
  • Loading branch information
AkbaraliShaikh authored Nov 16, 2022
1 parent 4cf098c commit 9c9bdd3
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ jobs:
run: make install-protoc; make setup
- name: Invoking go test
run: make test
- name: Invoking go bench test
run: make test-bench
build:
runs-on: ubuntu-latest

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ test: lint
@go list ./... | grep -v "vendor" | grep -v "integration" | xargs go test -count 1 -cover -short -race -timeout 1m -coverprofile ${COVER_FILE}
@go tool cover -func ${COVER_FILE} | tail -1 | xargs echo test coverage:

test-bench: # run benchmark tests
@go test $(shell go list ./... | grep -v "vendor") -v -bench ./... -run=^Benchmark

test_ci: install-protoc setup test

# Docker Run
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func Load() {

logConfigLoader()
publisherKafkaConfigLoader()
serverConfigLoader()
serverWsConfigLoader()
serverGRPCConfigLoader()
workerConfigLoader()
Expand Down
12 changes: 12 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ func TestLogLevel(t *testing.T) {
}

func TestServerConfig(t *testing.T) {
// default value test
serverConfigLoader()
assert.Equal(t, false, Server.DedupEnabled)

// override value test
os.Setenv("SERVER_BATCH_DEDUP_IN_CONNECTION_ENABLED", "true")
serverConfigLoader()
assert.Equal(t, true, Server.DedupEnabled)
}

func TestServerWsConfig(t *testing.T) {
os.Setenv("SERVER_WEBSOCKET_PORT", "8080")
os.Setenv("SERVER_WEBSOCKET_PING_INTERVAL_MS", "1")
os.Setenv("SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS", "1")
Expand All @@ -32,6 +43,7 @@ func TestServerConfig(t *testing.T) {
assert.Equal(t, "8080", ServerWs.AppPort)
assert.Equal(t, time.Duration(1)*time.Millisecond, ServerWs.PingInterval)
assert.Equal(t, time.Duration(1)*time.Millisecond, ServerWs.PongWaitInterval)

}

func TestGRPCServerConfig(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
"github.com/spf13/viper"
)

var Server server
var ServerWs serverWs
var ServerGRPC serverGRPC

type server struct {
DedupEnabled bool
}

type serverWs struct {
AppPort string
ServerMaxConn int
Expand All @@ -29,6 +34,13 @@ type serverGRPC struct {
Port string
}

func serverConfigLoader() {
viper.SetDefault("SERVER_BATCH_DEDUP_IN_CONNECTION_ENABLED", "false")
Server = server{
DedupEnabled: util.MustGetBool("SERVER_BATCH_DEDUP_IN_CONNECTION_ENABLED"),
}
}

func serverWsConfigLoader() {
viper.SetDefault("SERVER_WEBSOCKET_PORT", "8080")
viper.SetDefault("SERVER_WEBSOCKET_MAX_CONN", 30000)
Expand Down
10 changes: 8 additions & 2 deletions docs/docs/reference/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ Toggle CORS check function. Set `true` to check each request origin. Set `false`
* Type: `Optional`
* Default value: `true`

### `SERVER_BATCH_DEDUP_IN_CONNECTION_ENABLED`

The server decides whether or not to handle duplicate batches for the active connection. If a batch is sent with a duplicate ReqGUID, the server uses best attempts to discard the duplicate batches. Set `true` to enable the setting.

* Type `Optional`
* Default value: `false`

## Worker

### `WORKER_BUFFER_CHANNEL_SIZE`
Expand Down Expand Up @@ -230,5 +237,4 @@ Level available are `info` `panic` `fatal` `error` `warn` `info` `debug` `trace`
Based on this parameter the server decides when to send the acknowledgement to the client. Supported values are `0` and `1`.

* Type `Optional`
* Default value: `0`

* Default value: `0`
27 changes: 24 additions & 3 deletions services/rest/websocket/connection/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ var (

type Table struct {
m *sync.RWMutex
connMap map[identification.Identifier]struct{}
connMap map[identification.Identifier]map[string]struct{}
counter map[string]int
maxUser int
}

func NewTable(maxUser int) *Table {
return &Table{
m: &sync.RWMutex{},
connMap: make(map[identification.Identifier]struct{}),
connMap: make(map[identification.Identifier]map[string]struct{}),
maxUser: maxUser,
counter: make(map[string]int),
}
Expand All @@ -44,11 +44,32 @@ func (t *Table) Store(c identification.Identifier) error {
if _, ok := t.connMap[c]; ok {
return errConnDuplicated
}
t.connMap[c] = struct{}{}
t.connMap[c] = make(map[string]struct{})
t.counter[c.Group] = t.counter[c.Group] + 1
return nil
}

func (t *Table) StoreBatch(c identification.Identifier, id string) {
t.m.Lock()
defer t.m.Unlock()
if _, ok := t.connMap[c]; ok {
t.connMap[c][id] = struct{}{}
}
}

func (t *Table) HasBatch(c identification.Identifier, id string) bool {
t.m.RLock()
defer t.m.RUnlock()
_, ok := t.connMap[c][id]
return ok
}

func (t *Table) RemoveBatch(c identification.Identifier, id string) {
t.m.RLock()
defer t.m.RUnlock()
delete(t.connMap[c], id)
}

func (t *Table) Remove(c identification.Identifier) {
t.m.Lock()
defer t.m.Unlock()
Expand Down
112 changes: 112 additions & 0 deletions services/rest/websocket/connection/table_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connection

import (
"fmt"
"testing"

"github.com/odpf/raccoon/identification"
Expand Down Expand Up @@ -48,3 +49,114 @@ func TestStore(t *testing.T) {
assert.False(t, table.Exists(identification.Identifier{ID: "user1", Group: ""}))
})
}

func TestStoreBatch(t *testing.T) {
t.Run("Should store new event for a connection", func(t *testing.T) {
table := NewTable(10)
table.Store(identification.Identifier{ID: "user1", Group: ""})
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")

assert.True(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1"))
})

t.Run("Should not store new event if the connection is not active", func(t *testing.T) {
table := NewTable(10)
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")

assert.False(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1"))
})

t.Run("Should store multiple unique events for a connetion", func(t *testing.T) {
table := NewTable(10)
table.Store(identification.Identifier{ID: "user1", Group: ""})
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2")

assert.True(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1"))
assert.True(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2"))
})

t.Run("Should store multiple unique events for multiple connetion", func(t *testing.T) {
table := NewTable(10)
table.Store(identification.Identifier{ID: "user1", Group: ""})
table.Store(identification.Identifier{ID: "user2", Group: ""})

table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2")
table.StoreBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-1")
table.StoreBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2")

assert.True(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1"))
assert.True(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2"))
assert.True(t, table.HasBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-1"))
assert.True(t, table.HasBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2"))
})

t.Run("Should remove all the events if connetion is removed or not active", func(t *testing.T) {
table := NewTable(10)

table.Store(identification.Identifier{ID: "user1", Group: ""})
table.Store(identification.Identifier{ID: "user2", Group: ""})

table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2")
table.StoreBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-1")
table.StoreBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2")

table.Remove(identification.Identifier{ID: "user1", Group: ""})
table.Remove(identification.Identifier{ID: "user2", Group: ""})

assert.False(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1"))
assert.False(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2"))
assert.False(t, table.HasBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-1"))
assert.False(t, table.HasBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2"))

table.Store(identification.Identifier{ID: "user1", Group: ""})
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")
assert.True(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1"))
})

t.Run("Should be able to remove the batch ", func(t *testing.T) {
table := NewTable(10)

table.Store(identification.Identifier{ID: "user1", Group: ""})
table.Store(identification.Identifier{ID: "user2", Group: ""})

table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")
table.StoreBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2")
table.StoreBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-1")
table.StoreBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2")

table.RemoveBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1")
table.RemoveBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-1")

assert.False(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-1"))
assert.False(t, table.HasBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-1"))

assert.True(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2"))
assert.True(t, table.HasBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2"))

table.RemoveBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2")
table.RemoveBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2")

assert.False(t, table.HasBatch(identification.Identifier{ID: "user1", Group: ""}, "request-id-2"))
assert.False(t, table.HasBatch(identification.Identifier{ID: "user2", Group: ""}, "request-id-2"))

table.RemoveBatch(identification.Identifier{ID: "user1", Group: ""}, "")
})
}

func BenchmarkStoreBatch(b *testing.B) {
table := NewTable(b.N)
for i := 0; i < b.N; i++ {
go func(x int) {
userId := fmt.Sprintf("%s-%d", "user", x)
batchId := fmt.Sprintf("%s-%d", "equest-id-", x)
table.Store(identification.Identifier{ID: userId, Group: ""})
table.StoreBatch(identification.Identifier{ID: userId, Group: ""}, batchId)
assert.True(b, table.HasBatch(identification.Identifier{ID: userId, Group: ""}, batchId))
table.RemoveBatch(identification.Identifier{ID: userId, Group: ""}, batchId)
assert.False(b, table.HasBatch(identification.Identifier{ID: userId, Group: ""}, batchId))
}(i)
}
}
15 changes: 15 additions & 0 deletions services/rest/websocket/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
writeBadRequestResponse(conn, s, messageType, payload.ReqGuid, err)
continue
}
if config.Server.DedupEnabled {
// avoiding processing the same active connection's duplicate events.
if h.upgrader.Table.HasBatch(conn.Identifier, payload.ReqGuid) {
metrics.Increment("events_duplicate_total", fmt.Sprintf("request_guid=%s,reason=duplicate,conn_group=%s", payload.ReqGuid, conn.Identifier.Group))
writeSuccessResponse(conn, s, messageType, payload.ReqGuid)
continue
}
h.upgrader.Table.StoreBatch(conn.Identifier, payload.ReqGuid)
}

metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", conn.Identifier.Group))
h.sendEventCounters(payload.Events, conn.Identifier.Group)

Expand All @@ -121,6 +131,11 @@ func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serializa
return nil
case config.Synchronous:
return func(err error) {
if config.Server.DedupEnabled {
if err != nil {
h.upgrader.Table.RemoveBatch(conn.Identifier, reqGuid)
}
}
AckChan <- AckInfo{
MessageType: messageType,
RequestGuid: reqGuid,
Expand Down

0 comments on commit 9c9bdd3

Please sign in to comment.