Skip to content

Commit

Permalink
Change conn id type to string
Browse files Browse the repository at this point in the history
  • Loading branch information
isp-owner committed Nov 1, 2024
1 parent b72fc78 commit b2b525b
Show file tree
Hide file tree
Showing 16 changed files with 91 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.2.0
4.0.0
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
### v4.0.0
* change conn id type to string, generate it from `crypto/rand`
### v3.0.0
* fully reimplemented with API change (see Migrated to V3 in README.MD)
### v2.1.3
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# etp

[![GoDoc](https://godoc.org/github.com/txix-open/etp/v3?status.svg)](https://godoc.org/github.com/txix-open/etp/v3)
[![GoDoc](https://godoc.org/github.com/txix-open/etp/v4?status.svg)](https://godoc.org/github.com/txix-open/etp/v4)
![Build and test](https://github.com/txix-open/etp/actions/workflows/main.yml/badge.svg)
[![codecov](https://codecov.io/gh/txix-open/etp/branch/master/graph/badge.svg?token=JMTTJ5O6WB)](https://codecov.io/gh/txix-open/etp)
[![Go Report Card](https://goreportcard.com/badge/github.com/txix-open/etp/v3)](https://goreportcard.com/report/github.com/txix-open/etp/v3)
[![Go Report Card](https://goreportcard.com/badge/github.com/txix-open/etp/v4)](https://goreportcard.com/report/github.com/txix-open/etp/v4)

ETP - event transport protocol on WebSocket, simple and powerful.

Expand All @@ -14,7 +14,7 @@ The package based on [github.com/nhooyr/websocket](https://github.com/nhooyr/web
## Install

```bash
go get -u github.com/txix-open/etp/v3
go get -u github.com/txix-open/etp/v4
```

## Features:
Expand Down Expand Up @@ -42,8 +42,8 @@ import (
"net/http"
"time"

"github.com/txix-open/etp/v3"
"github.com/txix-open/etp/v3/msg"
"github.com/txix-open/etp/v4"
"github.com/txix-open/etp/v4/msg"
)

func main() {
Expand Down Expand Up @@ -148,7 +148,7 @@ func main() {
* Each event now is handled in separated goroutine (completely async)
* Significantly reduce code base, removed redundant interfaces
* Fixed some memory leaks and potential deadlocks
* Main package `github.com/txix-open/isp-etp-go/v2` -> `github.com/txix-open/etp/v3`
* Main package `github.com/txix-open/isp-etp-go/v2` -> `github.com/txix-open/etp/v4`
* `OnDefault` -> `OnUnknownEvent`
* `On*` API are the same either `etp.Client` and `etp.Server`
* WAS
Expand Down
4 changes: 2 additions & 2 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

"github.com/coder/websocket"
"github.com/stretchr/testify/require"
"github.com/txix-open/etp/v3"
"github.com/txix-open/etp/v3/msg"
"github.com/txix-open/etp/v4"
"github.com/txix-open/etp/v4/msg"
)

type CallHandler struct {
Expand Down
41 changes: 23 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/coder/websocket"
"github.com/txix-open/etp/v3/internal"
"github.com/txix-open/etp/v4/internal"
"sync"
"sync/atomic"
)

var (
Expand All @@ -18,7 +18,7 @@ type Client struct {
mux *mux
idGenerator *internal.IdGenerator
opts *clientOptions
conn *Conn
conn *atomic.Pointer[Conn]
lock sync.Locker
}

Expand All @@ -32,7 +32,7 @@ func NewClient(opts ...ClientOption) *Client {
idGenerator: internal.NewIdGenerator(),
opts: options,
lock: &sync.Mutex{},
conn: nil,
conn: &atomic.Pointer[Conn]{},
}
}

Expand Down Expand Up @@ -65,7 +65,7 @@ func (c *Client) Dial(ctx context.Context, url string) error {
c.lock.Lock()
defer c.lock.Unlock()

if c.conn != nil {
if c.conn.Load() != nil {
return errors.New("already connected")
}

Expand All @@ -78,7 +78,7 @@ func (c *Client) Dial(ctx context.Context, url string) error {

id := c.idGenerator.Next()
conn := newConn(id, resp.Request, ws)
c.conn = conn
c.conn.Store(conn)

keeper := newKeeper(conn, c.mux)
go func() {
Expand All @@ -90,43 +90,48 @@ func (c *Client) Dial(ctx context.Context, url string) error {

c.lock.Lock()
defer c.lock.Unlock()
if c.conn != nil && c.conn.Id() == id {
c.conn = nil
conn := c.conn.Load()
if conn != nil && conn.Id() == id {
c.conn.Store(nil)
}
}()
return nil
}

func (c *Client) Emit(ctx context.Context, event string, data []byte) error {
if c.conn == nil {
conn := c.conn.Load()
if conn == nil {
return ErrClientClosed
}
return c.conn.Emit(ctx, event, data)
return conn.Emit(ctx, event, data)
}

func (c *Client) EmitWithAck(ctx context.Context, event string, data []byte) ([]byte, error) {
if c.conn == nil {
conn := c.conn.Load()
if conn == nil {
return nil, ErrClientClosed
}
return c.conn.EmitWithAck(ctx, event, data)
return conn.EmitWithAck(ctx, event, data)
}

func (c *Client) Ping(ctx context.Context) error {
if c.conn == nil {
conn := c.conn.Load()
if conn == nil {
return ErrClientClosed
}
return c.conn.Ping(ctx)
return conn.Ping(ctx)
}

func (c *Client) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.conn == nil {
conn := c.conn.Load()
if conn == nil {
return ErrClientClosed
}

err := c.conn.Close()
c.conn = nil
err := conn.Close()
c.conn.Store(nil)
return err
}
8 changes: 4 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

"github.com/coder/websocket"
"github.com/stretchr/testify/require"
"github.com/txix-open/etp/v3"
"github.com/txix-open/etp/v3/msg"
"github.com/txix-open/etp/v3/store"
"github.com/txix-open/etp/v4"
"github.com/txix-open/etp/v4/msg"
"github.com/txix-open/etp/v4/store"
)

func TestClient_OnDisconnect(t *testing.T) {
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestClientHeavyConcurrency(t *testing.T) {

conn.Data().Set("key", conn.Id())
}).On("closeMe", etp.HandlerFunc(func(ctx context.Context, conn *etp.Conn, event msg.Event) []byte {
connId, err := store.Get[uint64](conn.Data(), "key")
connId, err := store.Get[string](conn.Data(), "key")
require.NoError(err)
require.Equal(conn.Id(), connId)

Expand Down
14 changes: 7 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import (
"net/http"

"github.com/coder/websocket"
"github.com/txix-open/etp/v3/bpool"
"github.com/txix-open/etp/v3/internal"
"github.com/txix-open/etp/v3/msg"
"github.com/txix-open/etp/v3/store"
"github.com/txix-open/etp/v4/bpool"
"github.com/txix-open/etp/v4/internal"
"github.com/txix-open/etp/v4/msg"
"github.com/txix-open/etp/v4/store"
)

type Conn struct {
id uint64
id string
request *http.Request
ws *websocket.Conn
data *store.Store
acks *internal.Acks
}

func newConn(
id uint64,
id string,
request *http.Request,
ws *websocket.Conn,
) *Conn {
Expand All @@ -34,7 +34,7 @@ func newConn(
}
}

func (c *Conn) Id() uint64 {
func (c *Conn) Id() string {
return c.id
}

Expand Down
8 changes: 4 additions & 4 deletions example/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"net/http"
"time"

"github.com/txix-open/etp/v3"
"github.com/txix-open/etp/v3/msg"
"github.com/txix-open/etp/v4"
"github.com/txix-open/etp/v4/msg"
)

func ExampleServer() {
Expand All @@ -18,7 +18,7 @@ func ExampleServer() {
//callback to handle new connection
srv.OnConnect(func(conn *etp.Conn) {
//you have access to original HTTP request
fmt.Printf("id: %d, url: %s, connected\n", conn.Id(), conn.HttpRequest().URL)
fmt.Printf("id: %s, url: %s, connected\n", conn.Id(), conn.HttpRequest().URL)
srv.Rooms().Join(conn, "goodClients") //leave automatically then disconnected

conn.Data().Set("key", "value") //put any data associative with connection
Expand All @@ -31,7 +31,7 @@ func ExampleServer() {

//callback to handle any error during serving
srv.OnError(func(conn *etp.Conn, err error) {
connId := uint64(0)
connId := ""
// be careful, conn can be nil on upgrading protocol error (before success WebSocket connection)
if conn != nil {
connId = conn.Id()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/txix-open/etp/v3
module github.com/txix-open/etp/v4

go 1.22
go 1.23

require github.com/coder/websocket v1.8.12

Expand Down
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"sync"

"github.com/txix-open/etp/v3/msg"
"github.com/txix-open/etp/v4/msg"
)

type Handler interface {
Expand Down
4 changes: 2 additions & 2 deletions internal/acks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
type Acks struct {
lock sync.Locker
acks map[uint64]*Ack
idGenerator *IdGenerator
idGenerator *SequenceGenerator
}

func NewAcks() *Acks {
return &Acks{
lock: &sync.Mutex{},
acks: make(map[uint64]*Ack),
idGenerator: NewIdGenerator(),
idGenerator: NewSequenceGenerator(),
}
}

Expand Down
26 changes: 22 additions & 4 deletions internal/generator.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
package internal

import (
"crypto/rand"
"encoding/hex"
"sync/atomic"
)

type IdGenerator struct {
type SequenceGenerator struct {
next *atomic.Uint64
}

func NewIdGenerator() *IdGenerator {
return &IdGenerator{
func NewSequenceGenerator() *SequenceGenerator {
return &SequenceGenerator{
next: &atomic.Uint64{},
}
}

func (g *IdGenerator) Next() uint64 {
func (g *SequenceGenerator) Next() uint64 {
return g.next.Add(1)
}

type IdGenerator struct {
}

func NewIdGenerator() *IdGenerator {
return &IdGenerator{}
}

func (g *IdGenerator) Next() string {
value := make([]byte, 16)
_, err := rand.Read(value)
if err != nil {
panic(err)
}
return hex.EncodeToString(value)
}
4 changes: 2 additions & 2 deletions keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package etp
import (
"context"

"github.com/txix-open/etp/v3/internal"
"github.com/txix-open/etp/v3/msg"
"github.com/txix-open/etp/v4/internal"
"github.com/txix-open/etp/v4/msg"
)

type keeper struct {
Expand Down
10 changes: 5 additions & 5 deletions rooms.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ const (

type Rooms struct {
mu sync.RWMutex
rooms map[string]map[uint64]*Conn
rooms map[string]map[string]*Conn
}

func newRooms() *Rooms {
return &Rooms{
rooms: make(map[string]map[uint64]*Conn),
rooms: make(map[string]map[string]*Conn),
}
}

func (s *Rooms) Get(connId uint64) (*Conn, bool) {
func (s *Rooms) Get(connId string) (*Conn, bool) {
s.mu.RLock()
var (
conn *Conn
Expand All @@ -37,15 +37,15 @@ func (s *Rooms) Join(conn *Conn, rooms ...string) {
if conns, ok := s.rooms[room]; ok {
conns[conn.Id()] = conn
} else {
s.rooms[room] = map[uint64]*Conn{
s.rooms[room] = map[string]*Conn{
conn.Id(): conn,
}
}
}
s.mu.Unlock()
}

func (s *Rooms) LeaveByConnId(id uint64, rooms ...string) {
func (s *Rooms) LeaveByConnId(id string, rooms ...string) {
s.mu.Lock()
for _, room := range rooms {
if conns, ok := s.rooms[room]; ok {
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"net/http"

"github.com/coder/websocket"
"github.com/txix-open/etp/v3/internal"
"github.com/txix-open/etp/v4/internal"
)

type Server struct {
Expand Down
Loading

0 comments on commit b2b525b

Please sign in to comment.