From 6e80354dfae9d03102f7e1d1ab16904e7ed02c6d Mon Sep 17 00:00:00 2001 From: Simon Schmidt Date: Mon, 2 Oct 2017 07:49:27 +0200 Subject: [PATCH] Semirpc --- LICENSE | 2 +- semirpc/client.go | 198 ++++++++++++++++++++++++++++++++++++++++ semirpc/codec/codecs.go | 100 ++++++++++++++++++++ semirpc/common.go | 41 +++++++++ semirpc/server.go | 133 +++++++++++++++++++++++++++ 5 files changed, 473 insertions(+), 1 deletion(-) create mode 100644 semirpc/client.go create mode 100644 semirpc/codec/codecs.go create mode 100644 semirpc/common.go create mode 100644 semirpc/server.go diff --git a/LICENSE b/LICENSE index 4b72370..47d06d7 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2017 byte-mug +Copyright (c) 2017 Simon Schmidt Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/semirpc/client.go b/semirpc/client.go new file mode 100644 index 0000000..28e91e1 --- /dev/null +++ b/semirpc/client.go @@ -0,0 +1,198 @@ +/* +Copyright (c) 2017 Simon Schmidt + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + + +package semirpc + +import "sync" +import "sync/atomic" +import "time" +import "fmt" + +type ClientCodec interface { + Close() error + + // Threadsafe + Send(id uint64,r Request) error + + // Not threadsafe + RecvId() (uint64,error) + Recv(r Response) error +} + + +var pool_clientObject sync.Pool +type clientObject struct { + mutex sync.Mutex + resp Response + signal chan error + refc *int32 +} +func (c *clientObject) free() { + if atomic.AddInt32(c.refc,-1) > 0 { return } + select { + case <- c.signal: + default: + } + c.resp = nil + pool_clientObject.Put(c) +} +func (c *clientObject) cancel() { + c.mutex.Lock() + c.resp = nil + c.mutex.Unlock() +} + +type Client struct { + client ClientCodec + respmk func() Response + + mutex sync.Mutex + hmap map[uint64]*clientObject + nextid uint64 +} + +func NewClient(client ClientCodec,respmk func() Response) *Client { + c := &Client{ + client:client, + respmk:respmk, + hmap :make(map[uint64]*clientObject), + } + go c.input() + return c +} + +func (c *Client) input() error { + defer func (){ + c.mutex.Lock() + for id,co := range c.hmap { + co.cancel() + select { + case co.signal <- EClosed: + default: + } + delete(c.hmap,id) + } + c.mutex.Unlock() + }() + for{ + id,err := c.client.RecvId() + if err!=nil { return err } + + c.mutex.Lock() + co := c.hmap[id] + delete(c.hmap,id) + c.mutex.Unlock() + + var resp Response + + if co!=nil { + co.mutex.Lock() + resp = co.resp + } + + if resp!=nil { resp = c.respmk() } + + err = c.client.Recv(resp) + + if co!=nil { co.mutex.Unlock(); co.signal <- err; co.free() } + if err!=nil { return err } + } +} + +func (c *Client) doit(req Request,resp Response) (*clientObject,error) { + var co *clientObject + elem := pool_clientObject.Get() + if elem!=nil { + co = elem.(*clientObject) + co.resp = resp + } else { + co = &clientObject{ + resp: resp, + signal: make(chan error,1), + refc: new(int32), + } + } + atomic.AddInt32(co.refc,1) + + c.mutex.Lock() + _,ok := c.hmap[c.nextid] + c.nextid++ + for i:=0 ; ok && i<3 ; i++ { + _,ok = c.hmap[c.nextid] + c.nextid++ + } + if ok { + c.mutex.Unlock() + return co,EPipelineStall + } + id := c.nextid + c.hmap[id] = co + c.mutex.Unlock() + + atomic.AddInt32(co.refc,1) + + err := c.client.Send(id,req) + if err!=nil { + c.client.Close() + return nil,err // discard the *clientObject to the GC. + } + + return co,nil +} +func (c *Client) Do(req Request,resp Response) error { + co,e := c.doit(req,resp) + if co!=nil { defer co.free() } + if e!=nil { return e } + e = <- co.signal + co.signal <- e + return e +} +func (c *Client) DoTimeout(req Request,resp Response, timeout <- chan time.Time) error { + co,e := c.doit(req,resp) + if co!=nil { defer co.free() } + if e!=nil { return e } + select { + case e := <- co.signal: + co.signal <- e + return e + case tm := <- timeout: + co.cancel() + return fmt.Errorf("Timeout -> %v",tm) + } + panic("unreachable") +} +func (c *Client) DoAsyncSupport(req Request,resp Response, errch chan error) { + co,e := c.doit(req,resp) + if co!=nil { defer co.free() } + if e!=nil { return } + select { + case e := <- co.signal: + co.signal <- e + errch <- e + case e := <- errch: + co.cancel() + errch <- e + } +} + + diff --git a/semirpc/codec/codecs.go b/semirpc/codec/codecs.go new file mode 100644 index 0000000..d75f7ac --- /dev/null +++ b/semirpc/codec/codecs.go @@ -0,0 +1,100 @@ +/* +Copyright (c) 2017 Simon Schmidt + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + + +package codec + +import "github.com/byte-mug/gocom/semirpc" +import "io" +import "bufio" +import "sync" +import "encoding/binary" + +type ServerCodec struct{ + closer io.Closer + reader *bufio.Reader + writer *bufio.Writer + buffer []byte + mutex sync.Mutex +} +func (c *ServerCodec) Close() error { return c.closer.Close() } +func (c *ServerCodec) Send(id uint64,r semirpc.Response) error { + c.mutex.Lock(); defer c.mutex.Unlock() + l := binary.PutUvarint(c.buffer, id) + _,e := c.writer.Write(c.buffer[:l]) + if e!=nil { return e } + e = r.WriteResp(c.writer) + if e!=nil { return e } + e = c.writer.Flush() + return e +} +func (c *ServerCodec) RecvId() (uint64,error) { + return binary.ReadUvarint(c.reader) +} +func (c *ServerCodec) Recv(r semirpc.Request) error { + return r.ReadReq(c.reader) +} + +func NewServerCodec(c io.ReadWriteCloser) *ServerCodec { + return &ServerCodec{ + closer: c, + reader: bufio.NewReader(c), + writer: bufio.NewWriter(c), + buffer: make([]byte,16), + } +} + + +type ClientCodec struct{ + closer io.Closer + reader *bufio.Reader + writer *bufio.Writer + buffer []byte + mutex sync.Mutex +} +func (c *ClientCodec) Close() error { return c.closer.Close() } +func (c *ClientCodec) Send(id uint64,r semirpc.Request) error { + c.mutex.Lock(); defer c.mutex.Unlock() + l := binary.PutUvarint(c.buffer, id) + _,e := c.writer.Write(c.buffer[:l]) + if e!=nil { return e } + e = r.WriteReq(c.writer) + if e!=nil { return e } + e = c.writer.Flush() + return e +} +func (c *ClientCodec) RecvId() (uint64,error) { + return binary.ReadUvarint(c.reader) +} +func (c *ClientCodec) Recv(r semirpc.Response) error { + return r.ReadResp(c.reader) +} + +func NewClientCodec(c io.ReadWriteCloser) *ClientCodec { + return &ClientCodec{ + closer: c, + reader: bufio.NewReader(c), + writer: bufio.NewWriter(c), + buffer: make([]byte,16), + } +} + diff --git a/semirpc/common.go b/semirpc/common.go new file mode 100644 index 0000000..bd23707 --- /dev/null +++ b/semirpc/common.go @@ -0,0 +1,41 @@ +/* +Copyright (c) 2017 Simon Schmidt + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + + +package semirpc + +import "bufio" +import "errors" + +var EPipelineStall = errors.New("Pipeline-Stall") +var EClosed = errors.New("Connection Closed") + +type Request interface{ + ReadReq(r *bufio.Reader) error + WriteReq(w *bufio.Writer) error +} + +type Response interface{ + ReadResp(r *bufio.Reader) error + WriteResp(w *bufio.Writer) error +} + diff --git a/semirpc/server.go b/semirpc/server.go new file mode 100644 index 0000000..fd80676 --- /dev/null +++ b/semirpc/server.go @@ -0,0 +1,133 @@ +/* +Copyright (c) 2017 Simon Schmidt + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + + +package semirpc + +import "sync" + +type ServerError int +const ( + SE_None ServerError = iota + SE_QueueOverflow +) +func (s ServerError) Error() string { + switch s { + case SE_None: return "No Error!" + case SE_QueueOverflow: return "Queue Overflow" + } + return "???" +} + +type ServerHandler func(req Request,resp Response) +type ErrorHandler func(e ServerError,resp Response) + +type ServerCodec interface { + Close() error + + // Threadsafe + Send(id uint64,r Response) error + + // Not threadsafe + RecvId() (uint64,error) + Recv(r Request) error +} + +type Server struct { + Handle ServerHandler + Error ErrorHandler + Allocator func() (req Request,resp Response) + Concurrency int + QueueMulti int + + initialize sync.Once + queue chan *serverWorkItem + pool sync.Pool +} +func (s *Server) initi() { + conc := s.Concurrency + qm := s.QueueMulti + if conc<1 { conc=16 } + if qm<1 { qm =16 } + s.queue = make(chan *serverWorkItem,conc*qm) + for i:=0 ; i