From 9cc0662dc1f5172f8081df248809cbedc504bb6f Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Sun, 2 Apr 2023 12:04:56 +0200 Subject: [PATCH] V223 (#154) --- ChangeLog.md | 9 +- README.md | 9 +- etf/decode.go | 306 ++++++++++++++++++++++++++++++++++++++++++++---- etf/etf.go | 2 +- etf/etf_test.go | 198 +++++++++++++++++++++++++++++-- gen/tcp.go | 10 +- node/process.go | 42 ++++--- version.go | 2 +- 8 files changed, 525 insertions(+), 53 deletions(-) diff --git a/ChangeLog.md b/ChangeLog.md index 8020192a..a3407882 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -4,10 +4,17 @@ All notable changes to this project will be documented in this file. This format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +#### [v2.2.3](https://github.com/ergo-services/ergo/releases/tag/v1.999.223) 2023-04-02 [tag version v1.999.223] #### + +This release includes fixes: +- Improved `gen.TCP`. Issue #152 +- Fixed incorrect decoding registered map type using etf.RegisterType +- Fixed race condition on process termination. Issue #153 + #### [v2.2.2](https://github.com/ergo-services/ergo/releases/tag/v1.999.222) 2023-03-01 [tag version v1.999.222] #### * Introduced `gen.Pool`. This behavior implements a basic design pattern with a pool of workers. All messages/requests received by the pool process are forwarded to the workers using the "Round Robin" algorithm. The worker process is automatically restarting on termination. See example here [examples/genpool](https://github.com/ergo-services/examples/tree/master/genpool) -* Removed Erlang RPC support. A while ago Erlang has changed the way of handling this kind of request making this feature more similar to the regular `gen.Server`. So, there is no reason to keep supporting it. Use a regular way of messaging instead - `gen.Server`. +* Removed Erlang RPC support. A while ago Erlang has changed the way of handling this kind of request making this feature more similar to the regular `gen.Server`. So, there is no reason to keep supporting it. Use a regular way of messaging instead - `gen.Server`. * Fixed issue #130 (`StartType` option in `gen.ApplicationSpec` is ignored for the autostarting applications) * Fixed issue #143 (incorrect cleaning up the aliases belonging to the terminated process) diff --git a/README.md b/README.md index ce0b4915..08f95af2 100644 --- a/README.md +++ b/README.md @@ -61,10 +61,17 @@ Golang introduced [v2 rule](https://go.dev/blog/v2-go-modules) a while ago to so Here are the changes of latest release. For more details see the [ChangeLog](ChangeLog.md) +#### [v2.2.3](https://github.com/ergo-services/ergo/releases/tag/v1.999.223) 2023-04-02 [tag version v1.999.223] #### + +This release includes fixes: +- Improved `gen.TCP`. Issue #152 +- Fixed incorrect decoding registered map type using etf.RegisterType +- Fixed race condition on process termination. Issue #153 + #### [v2.2.2](https://github.com/ergo-services/ergo/releases/tag/v1.999.222) 2023-03-01 [tag version v1.999.222] #### * Introduced `gen.Pool`. This behavior implements a basic design pattern with a pool of workers. All messages/requests received by the pool process are forwarded to the workers using the "Round Robin" algorithm. The worker process is automatically restarting on termination. See example here [examples/genpool](https://github.com/ergo-services/examples/tree/master/genpool) -* Removed Erlang RPC support. A while ago Erlang has changed the way of handling this kind of request making this feature more similar to the regular `gen.Server`. So, there is no reason to keep supporting it. Use a regular way of messaging instead - `gen.Server`. +* Removed Erlang RPC support. A while ago Erlang has changed the way of handling this kind of request making this feature more similar to the regular `gen.Server`. So, there is no reason to keep supporting it. Use a regular way of messaging instead - `gen.Server`. * Fixed issue #130 (`StartType` option in `gen.ApplicationSpec` is ignored for the autostarting applications) * Fixed issue #143 (incorrect cleaning up the aliases belonging to the terminated process) diff --git a/etf/decode.go b/etf/decode.go index 4d8e3873..3289bba5 100644 --- a/etf/decode.go +++ b/etf/decode.go @@ -19,7 +19,11 @@ type stackElement struct { i int // current children int termType byte - strict bool // if encoding/decoding registered types must be strict + + regKeyKind reflect.Kind // we should align key and value types for maps (int*, float*) + regValueKind reflect.Kind // in case of decoding into value with registered type + + strict bool // if encoding/decoding registered types must be strict } var ( @@ -207,31 +211,109 @@ func Decode(packet []byte, cache []Atom, options DecodeOptions) (retTerm Term, r } packet = packet[1:] - case ettNewFloat: - if len(packet) < 8 { - return nil, nil, errMalformedNewFloat - } - bits := binary.BigEndian.Uint64(packet[:8]) - - term = math.Float64frombits(bits) - packet = packet[8:] - case ettSmallInteger: if len(packet) == 0 { return nil, nil, errMalformedSmallInteger } - term = int(packet[0]) + i := int(packet[0]) + term = int(i) packet = packet[1:] + if stack == nil { + break + } + + t := stack.regKeyKind + if stack.i&0x01 == 1 { + t = stack.regValueKind + } + switch t { + case reflect.Invalid: + // not registered type + break + case reflect.Int64: + term = int64(i) + case reflect.Int: + term = i + case reflect.Int8: + if i > math.MaxInt8 || i < math.MinInt8 { + panic("overflows int8") + } + term = int8(i) + case reflect.Int16: + term = int16(i) + case reflect.Int32: + term = int32(i) + case reflect.Uint: + term = uint(i) + case reflect.Uint8: + term = uint8(i) + case reflect.Uint16: + term = uint16(i) + case reflect.Uint32: + term = uint32(i) + default: + panic("destination value is not an int* or overflows") + } + case ettInteger: if len(packet) < 4 { return nil, nil, errMalformedInteger } - term = int64(int32(binary.BigEndian.Uint32(packet[:4]))) + // negatives are encoded as ettSmallBig so the value shouldn't be + // greater int32 + i := int32(binary.BigEndian.Uint32(packet[:4])) + term = int64(i) packet = packet[4:] + if stack == nil { + break + } + + t := stack.regKeyKind + if stack.i&0x01 == 1 { + t = stack.regValueKind + } + switch t { + case reflect.Invalid: + // not registered type + break + case reflect.Int64: + term = int64(i) + case reflect.Int: + term = int(i) + case reflect.Int8: + if i > math.MaxInt8 || i < math.MinInt8 { + panic("overflows int8") + } + term = int8(i) + case reflect.Int16: + if i > math.MaxInt16 || i < math.MinInt16 { + panic("overflows int16") + } + term = int16(i) + case reflect.Int32: + term = i + case reflect.Uint: + term = uint(i) + case reflect.Uint8: + if i > math.MaxUint8 { + panic("overflows uint") + } + term = uint8(i) + case reflect.Uint16: + if i > math.MaxUint16 { + panic("overflows uint") + } + term = uint16(i) + case reflect.Uint32: + term = uint32(i) + default: + panic("destination value is not an int* or overflows") + } + case ettSmallBig: if len(packet) == 0 { return nil, nil, errMalformedSmallBig @@ -246,8 +328,7 @@ func Decode(packet []byte, cache []Atom, options DecodeOptions) (retTerm Term, r copy(le8, packet[2:n+2]) smallBig := binary.LittleEndian.Uint64(le8) if negative { - smallBig = -smallBig - term = int64(smallBig) + term = int64(-smallBig) } else { if smallBig > math.MaxInt64 { term = uint64(smallBig) @@ -255,8 +336,116 @@ func Decode(packet []byte, cache []Atom, options DecodeOptions) (retTerm Term, r term = int64(smallBig) } } - packet = packet[n+2:] + + if stack == nil { + break + } + + t := stack.regKeyKind + if stack.i&0x01 == 1 { + t = stack.regValueKind + } + switch t { + case reflect.Invalid: + // not registered type + break + case reflect.Int64: + if negative { + if smallBig > -math.MinInt64 { + panic("overflows int64") + } + term = int64(-smallBig) + } else { + if smallBig > math.MaxInt64 { + panic("overflows int64") + } + term = int64(smallBig) + } + case reflect.Int: + if negative { + if smallBig > -math.MinInt { + panic("overflows int") + } + term = int(-smallBig) + } else { + if smallBig > math.MaxInt { + panic("overflows int") + } + term = int(smallBig) + } + case reflect.Int8: + if negative { + if smallBig > -math.MinInt8 { + panic("overflows int8") + } + term = int8(-smallBig) + } else { + if smallBig > math.MaxInt8 { + panic("overflows int8") + } + term = int8(smallBig) + } + case reflect.Int16: + if negative { + if smallBig > -math.MinInt16 { + panic("overflows int16") + } + term = int16(-smallBig) + } else { + if smallBig > math.MaxInt16 { + panic("overflows int16") + } + term = int16(smallBig) + } + case reflect.Int32: + if negative { + if smallBig > -math.MinInt32 { + panic("overflows int32") + } + term = int32(-smallBig) + } else { + if smallBig > math.MaxInt32 { + panic("overflows int32") + } + term = int32(smallBig) + } + case reflect.Uint: + if negative { + panic("signed integer value") + } + if smallBig > math.MaxUint { + panic("overflows uint") + } + term = uint(smallBig) + case reflect.Uint8: + if negative { + panic("signed integer value") + } + if smallBig > math.MaxUint8 { + panic("overflows uint8") + } + term = uint8(smallBig) + case reflect.Uint16: + if negative { + panic("signed integer value") + } + if smallBig > math.MaxUint16 { + panic("overflows uint16") + } + term = uint16(smallBig) + case reflect.Uint32: + if negative { + panic("signed integer value") + } + if smallBig > math.MaxUint32 { + panic("overflows uint32") + } + term = uint32(smallBig) + default: + panic("destination value is not an int* or overflows") + } + break } ///// @@ -498,6 +687,39 @@ func Decode(packet []byte, cache []Atom, options DecodeOptions) (retTerm Term, r term = b packet = packet[n+5:] + case ettNewFloat: + if len(packet) < 8 { + return nil, nil, errMalformedNewFloat + } + bits := binary.BigEndian.Uint64(packet[:8]) + + f := math.Float64frombits(bits) + term = f + packet = packet[8:] + + if stack == nil { + break + } + + t := stack.regKeyKind + if stack.i&0x01 == 1 { + t = stack.regValueKind + } + switch t { + case reflect.Invalid: + // not registered type + break + case reflect.Float64: + break + case reflect.Float32: + if f > math.MaxFloat32 { + panic("overflows float32") + } + term = float32(f) + default: + panic("destination value is not an float* or overflows") + } + case ettFloat: if len(packet) < 31 { return nil, nil, errMalformedFloat @@ -510,6 +732,29 @@ func Decode(packet []byte, cache []Atom, options DecodeOptions) (retTerm Term, r term = f packet = packet[31:] + if stack == nil { + break + } + + t := stack.regKeyKind + if stack.i&0x01 == 1 { + t = stack.regValueKind + } + switch t { + case reflect.Invalid: + // not registered type + break + case reflect.Float64: + break + case reflect.Float32: + if f > math.MaxFloat32 { + panic("overflows float32") + } + term = float32(f) + default: + panic("destination value is not an float* or overflows") + } + default: term = nil return nil, nil, errMalformedUnknownType @@ -541,11 +786,18 @@ func Decode(packet []byte, cache []Atom, options DecodeOptions) (retTerm Term, r r, found := registered.typesDec[typeName] registered.RUnlock() if found == true { - reg := reflect.Indirect(reflect.New(r.rtype)) - if r.rtype.Kind() == reflect.Slice { - reg = reflect.MakeSlice(r.rtype, stack.children-2, stack.children-1) + switch r.rtype.Kind() { + case reflect.Slice: + reg := reflect.MakeSlice(r.rtype, stack.children-2, stack.children-1) + stack.reg = ® + case reflect.Array: + reg := reflect.Indirect(reflect.New(r.rtype)) + stack.reg = ® + default: + if r.strict { + panic("destination value of registered type is not a slice/array") + } } - stack.reg = ® stack.strict = r.strict if r.strict == false { stack.term.(List)[stack.i] = term @@ -620,8 +872,20 @@ func Decode(packet []byte, cache []Atom, options DecodeOptions) (retTerm Term, r r, found := registered.typesDec[typeName] registered.RUnlock() if found == true { - reg := reflect.MakeMapWithSize(r.rtype, stack.children/2) - stack.reg = ® + if r.rtype.Kind() == reflect.Map { + reg := reflect.MakeMapWithSize(r.rtype, stack.children/2) + if r.rtype.Key().Kind() != reflect.Interface { + stack.regKeyKind = r.rtype.Key().Kind() + } + if r.rtype.Elem().Kind() != reflect.Interface { + stack.regValueKind = r.rtype.Elem().Kind() + } + stack.reg = ® + } else { + if r.strict { + panic("destination value of registered type is not a map") + } + } stack.strict = r.strict if r.strict == false { if stack.i&0x01 == 0x01 { // a value diff --git a/etf/etf.go b/etf/etf.go index 5a2d79c3..c3c67ef5 100644 --- a/etf/etf.go +++ b/etf/etf.go @@ -665,7 +665,7 @@ type RegisterTypeOptions struct { // RegisterType registers new type with the given options. It returns a Name // of the registered type, which can be used in the UnregisterType function -// for unregistering this type. +// for unregistering this type. Supported types: struct, slice, array, map. // Returns an error if this type can not be registered. func RegisterType(t interface{}, options RegisterTypeOptions) (Atom, error) { switch t.(type) { diff --git a/etf/etf_test.go b/etf/etf_test.go index e92836b3..d4bb5b91 100644 --- a/etf/etf_test.go +++ b/etf/etf_test.go @@ -585,6 +585,178 @@ func TestTermIntoStructUnmarshal(t *testing.T) { } } +func TestRegisterSlice(t *testing.T) { + type sliceString []string + type sliceInt []int + type sliceInt8 []int8 + type sliceInt16 []int16 + type sliceInt32 []int32 + type sliceInt64 []int64 + type sliceUint []uint + type sliceUint8 []uint8 + type sliceUint16 []uint16 + type sliceUint32 []uint32 + type sliceUint64 []uint64 + type sliceFloat32 []float32 + type sliceFloat64 []float64 + + type allInOneSlices struct { + A sliceString + B sliceInt + C sliceInt8 + D sliceInt16 + E sliceInt32 + F sliceInt64 + G sliceUint + H sliceUint8 + I sliceUint16 + K sliceUint32 + L sliceUint64 + M sliceFloat32 + O sliceFloat64 + } + types := []interface{}{ + sliceString{}, + sliceInt{}, + sliceInt8{}, + sliceInt16{}, + sliceInt32{}, + sliceInt64{}, + sliceUint{}, + sliceUint8{}, + sliceUint16{}, + sliceUint32{}, + sliceUint64{}, + sliceFloat32{}, + sliceFloat64{}, + allInOneSlices{}, + } + if err := registerTypes(types); err != nil { + t.Fatal(err) + } + + src := allInOneSlices{ + A: sliceString{"Hello", "World"}, + B: sliceInt{-1, 2, -3, 4, -5}, + C: sliceInt8{-1, 2, -3, 4, -5}, + D: sliceInt16{-1, 2, -3, 4, -5}, + E: sliceInt32{-1, 2, -3, 4, -5}, + F: sliceInt64{-1, 2, -3, 4, -5}, + G: sliceUint{1, 2, 3, 4, 5}, + H: sliceUint8{1, 2, 3, 4, 5}, + I: sliceUint16{1, 2, 3, 4, 5}, + K: sliceUint32{1, 2, 3, 4, 5}, + L: sliceUint64{1, 2, 3, 4, 5}, + M: sliceFloat32{1.1, -2.2, 3.3, -4.4, 5.5}, + O: sliceFloat64{1.1, -2.2, 3.3, -4.4, 5.5}, + } + + buf := lib.TakeBuffer() + defer lib.ReleaseBuffer(buf) + + encodeOptions := EncodeOptions{ + FlagBigPidRef: true, + FlagBigCreation: true, + } + if err := Encode(src, buf, encodeOptions); err != nil { + t.Fatal(err) + } + decodeOptions := DecodeOptions{ + FlagBigPidRef: true, + } + dst, _, err := Decode(buf.B, []Atom{}, decodeOptions) + if err != nil { + t.Fatal(err) + } + + if _, ok := dst.(allInOneSlices); !ok { + t.Fatalf("wrong term result: %#v\n", dst) + } + + if !reflect.DeepEqual(src, dst) { + t.Errorf("got:\n%#v\n\nwant:\n%#v\n", dst, src) + } +} + +func TestRegisterMap(t *testing.T) { + type mapIntString map[int]string + type mapStringInt map[string]int + type mapInt8Int map[int8]int + type mapFloat32Int32 map[float32]int32 + type mapFloat64Int32 map[float64]int32 + type mapInt32Float32 map[int32]float32 + type mapInt32Float64 map[int32]float64 + + type allInOne struct { + A mapIntString + B mapStringInt + C mapInt8Int + D mapFloat32Int32 + E mapFloat64Int32 + F mapInt32Float32 + G mapInt32Float64 + } + + types := []interface{}{ + mapIntString{}, + mapStringInt{}, + mapInt8Int{}, + mapFloat32Int32{}, + mapFloat64Int32{}, + mapInt32Float32{}, + mapInt32Float64{}, + allInOne{}, + } + if err := registerTypes(types); err != nil { + t.Fatal(err) + } + + src := allInOne{ + A: make(mapIntString), + B: make(mapStringInt), + C: make(mapInt8Int), + D: make(mapFloat32Int32), + E: make(mapFloat64Int32), + F: make(mapInt32Float32), + G: make(mapInt32Float64), + } + + src.A[1] = "Hello" + src.B["Hello"] = 1 + src.C[1] = 1 + src.D[3.14] = 1 + src.E[3.15] = 1 + src.F[1] = 3.15 + src.G[1] = 3.15 + + buf := lib.TakeBuffer() + defer lib.ReleaseBuffer(buf) + + encodeOptions := EncodeOptions{ + FlagBigPidRef: true, + FlagBigCreation: true, + } + if err := Encode(src, buf, encodeOptions); err != nil { + t.Fatal(err) + } + decodeOptions := DecodeOptions{ + FlagBigPidRef: true, + } + dst, _, err := Decode(buf.B, []Atom{}, decodeOptions) + if err != nil { + t.Fatal(err) + } + + if _, ok := dst.(allInOne); !ok { + t.Fatalf("wrong term result: %#v\n", dst) + } + + if !reflect.DeepEqual(src, dst) { + t.Errorf("got:\n%#v\n\nwant:\n%#v\n", dst, src) + } + +} + func TestRegisterType(t *testing.T) { type ccc []string type ddd [3]bool @@ -685,16 +857,13 @@ func TestRegisterType(t *testing.T) { t.Fatal("shouldn't be registered") } - if _, err := RegisterType(ddd{}, RegisterTypeOptions{Strict: true}); err != nil { - t.Fatal(err) + types := []interface{}{ + ddd{}, + aaa{}, + ccc{}, + bbb{}, } - if _, err := RegisterType(aaa{}, RegisterTypeOptions{Strict: true}); err != nil { - t.Fatal(err) - } - if _, err := RegisterType(ccc{}, RegisterTypeOptions{Strict: true}); err != nil { - t.Fatal(err) - } - if _, err := RegisterType(bbb{}, RegisterTypeOptions{Strict: true}); err != nil { + if err := registerTypes(types); err != nil { t.Fatal(err) } @@ -721,3 +890,14 @@ func TestRegisterType(t *testing.T) { t.Errorf("got:\n%#v\n\nwant:\n%#v\n", dst, src) } } + +func registerTypes(types []interface{}) error { + rtOpts := RegisterTypeOptions{Strict: true} + + for _, t := range types { + if _, err := RegisterType(t, rtOpts); err != nil && err != lib.ErrTaken { + return err + } + } + return nil +} diff --git a/gen/tcp.go b/gen/tcp.go index cb6b61df..6868d3f6 100644 --- a/gen/tcp.go +++ b/gen/tcp.go @@ -204,6 +204,8 @@ func (tcpp *TCPProcess) serve(ctx context.Context, c net.Conn) error { var handlerProcessID int var packet interface{} var disconnect bool + var deadline bool + var timeout bool var disconnectError error var expectingBytes int = 1 @@ -240,7 +242,7 @@ nextPacket: } if b.Len() < expectingBytes { - deadline := false + deadline = false if err := c.SetReadDeadline(time.Now().Add(deadlineTimeout)); err == nil { deadline = true } @@ -251,6 +253,7 @@ nextPacket: packet = messageTCPHandlerTimeout{ connection: tcpConnection, } + timeout = true break } packet = messageTCPHandlerDisconnect{ @@ -294,6 +297,10 @@ retry: if disconnect { return disconnectError } + if timeout { + timeout = false + goto nextPacket + } next, _ := nbytesInt.(messageTCPHandlerPacketResult) if next.left > 0 { if b.Len() > next.left { @@ -311,7 +318,6 @@ retry: expectingBytes++ } - //fmt.Println("TCP NEXT", next, expectingBytes) goto nextPacket case TCPHandlerStatusClose: diff --git a/node/process.go b/node/process.go index 7e4454c5..2729b792 100644 --- a/node/process.go +++ b/node/process.go @@ -594,31 +594,35 @@ func (p *process) Spawn(name string, opts gen.ProcessOptions, behavior gen.Proce // PutSyncRequest func (p *process) PutSyncRequest(ref etf.Ref) error { + var preply map[etf.Ref]chan syncReplyMessage p.RLock() - if p.reply == nil { - p.RUnlock() + preply = p.reply + p.RUnlock() + + if preply == nil { return lib.ErrProcessTerminated } - p.RUnlock() reply := syncReplyChannels.Get().(chan syncReplyMessage) p.replyMutex.Lock() - p.reply[ref] = reply + preply[ref] = reply p.replyMutex.Unlock() return nil } // PutSyncReply func (p *process) PutSyncReply(ref etf.Ref, reply etf.Term, err error) error { + var preply map[etf.Ref]chan syncReplyMessage p.RLock() - if p.reply == nil { - p.RUnlock() + preply = p.reply + p.RUnlock() + + if preply == nil { return lib.ErrProcessTerminated } - p.RUnlock() p.replyMutex.RLock() - rep, ok := p.reply[ref] + rep, ok := preply[ref] defer p.replyMutex.RUnlock() if !ok { @@ -633,29 +637,33 @@ func (p *process) PutSyncReply(ref etf.Ref, reply etf.Term, err error) error { // CancelSyncRequest func (p *process) CancelSyncRequest(ref etf.Ref) { + var preply map[etf.Ref]chan syncReplyMessage p.RLock() - if p.reply == nil { - p.RUnlock() + preply = p.reply + p.RUnlock() + + if preply == nil { return } - p.RUnlock() p.replyMutex.Lock() - delete(p.reply, ref) + delete(preply, ref) p.replyMutex.Unlock() } // WaitSyncReply func (p *process) WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error) { + var preply map[etf.Ref]chan syncReplyMessage p.RLock() - if p.reply == nil { - p.RUnlock() + preply = p.reply + p.RUnlock() + + if preply == nil { return nil, lib.ErrProcessTerminated } - p.RUnlock() p.replyMutex.RLock() - reply, wait_for_reply := p.reply[ref] + reply, wait_for_reply := preply[ref] p.replyMutex.RUnlock() if wait_for_reply == false { @@ -664,7 +672,7 @@ func (p *process) WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error) { defer func(ref etf.Ref) { p.replyMutex.Lock() - delete(p.reply, ref) + delete(preply, ref) p.replyMutex.Unlock() }(ref) diff --git a/version.go b/version.go index c2cc7875..a765d1ec 100644 --- a/version.go +++ b/version.go @@ -1,7 +1,7 @@ package ergo const ( - Version = "2.2.2" // Ergo Framework version + Version = "2.2.3" // Ergo Framework version VersionPrefix = "ergo" // Prefix using for the full version name VersionOTP int = 25 // Erlang version support )