From 58e0d3e736beb6718084adf3c4a0ee8f5412ef42 Mon Sep 17 00:00:00 2001 From: zeayes Date: Sat, 6 Dec 2014 17:13:30 +0800 Subject: [PATCH] update client to support connections pool --- ssdb/command.go | 135 +++++++++++++++++++++++++++++++++++++++++++++ ssdb/ssdb.go | 136 ++++++++++++++++++++++------------------------ ssdb/ssdb_test.go | 84 ++++++++++++++++++++++++++++ 3 files changed, 283 insertions(+), 72 deletions(-) create mode 100644 ssdb/command.go create mode 100644 ssdb/ssdb_test.go diff --git a/ssdb/command.go b/ssdb/command.go new file mode 100644 index 0000000..9b8fb97 --- /dev/null +++ b/ssdb/command.go @@ -0,0 +1,135 @@ +package ssdb + +import "strconv" + +func (c *Client) Set(key, val string) (bool, error) { + resp, err := c.Do("set", key, val) + if err != nil { + return false, err + } + if len(resp) == 2 && resp[0] == "ok" { + return true, nil + } + return false, ErrBadResponse +} + +func (c *Client) Setx(key, val string, timeout int) (bool, error) { + resp, err := c.Do("setx", key, val, timeout) + if err != nil { + return false, err + } + if len(resp) == 2 && resp[0] == "ok" { + return true, nil + } + return false, ErrBadResponse +} + +func (c *Client) Setnx(key, val string) (exists bool, err error) { + resp, err := c.Do("setnx", key, val) + if err != nil { + return false, nil + } + if len(resp) == 2 && resp[0] == "ok" { + if resp[1] == "1" { + return true, nil + } + return false, nil + } + return false, ErrBadResponse +} + +func (c *Client) Get(key string) (string, error) { + resp, err := c.Do("get", key) + if err != nil { + return "", err + } + if len(resp) == 2 && resp[0] == "ok" { + return resp[1], nil + } + if resp[0] == "not_found" { + return "", nil + } + return "", ErrBadResponse +} + +func (c *Client) Del(key string) (int, error) { + resp, err := c.Do("del", key) + if err != nil { + return 0, err + } + if len(resp) == 2 && resp[0] == "ok" { + return strconv.Atoi(resp[1]) + } + return 0, ErrBadResponse +} + +func (c *Client) MultiDel(keys []string) (int, error) { + resp, err := c.Do("multi_del", keys) + if err != nil { + return 0, err + } + if len(resp) == 2 && resp[0] == "ok" { + return strconv.Atoi(resp[1]) + } + return 0, ErrBadResponse +} + +func (c *Client) Zset(name, key string, score int) (bool, error) { + resp, err := c.Do("zset", name, key, score) + if err != nil { + return false, err + } + if len(resp) == 2 && resp[0] == "ok" { + return true, nil + } + return false, ErrBadResponse +} + +func (c *Client) Zget(name, key string) (string, error) { + resp, err := c.Do("zget", name, key) + if err != nil { + return "", err + } + if len(resp) == 2 && resp[0] == "ok" { + return resp[1], nil + } + if resp[0] == "not_found" { + return "", ErrNotFound + } + return "", ErrBadResponse +} + +// return del score count +func (c *Client) Zdel(name, key string) (int, error) { + resp, err := c.Do("zdel", name, key) + if err != nil { + return 0, err + } + if len(resp) == 2 && resp[0] == "ok" { + return strconv.Atoi(resp[1]) + } + return 0, ErrBadResponse +} + +func (c *Client) Zkeys(name, keyStart string, scoreStart, scoreEnd, limit int) ([]string, error) { + resp, err := c.Do("zkeys", name, keyStart, scoreStart, scoreEnd, limit) + if err != nil { + return nil, err + } + if resp[0] == "ok" { + return resp[1:], nil + } + return nil, ErrBadResponse +} + +// return cleared keys count +func (c *Client) Zclear(name string) (int, error) { + resp, err := c.Do("zclear", name) + if err != nil { + return 0, err + } + if len(resp) == 2 && resp[0] == "ok" { + return strconv.Atoi(resp[1]) + } + return 0, ErrBadResponse +} diff --git a/ssdb/ssdb.go b/ssdb/ssdb.go index a84e4ba..0d31400 100644 --- a/ssdb/ssdb.go +++ b/ssdb/ssdb.go @@ -1,78 +1,49 @@ package ssdb import ( + "bufio" "bytes" + "errors" "fmt" "net" "strconv" ) -type Client struct { - sock *net.TCPConn - recv_buf bytes.Buffer -} +const ( + defaulPoolSize = 10 +) -func Connect(ip string, port int) (*Client, error) { - addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port)) - if err != nil { - return nil, err - } - sock, err := net.DialTCP("tcp", nil, addr) - if err != nil { - return nil, err - } - var c Client - c.sock = sock - return &c, nil -} +var ErrBadResponse = errors.New("bad response") +var ErrNotFound = errors.New("not_found") -func (c *Client) Do(args ...interface{}) ([]string, error) { - err := c.send(args) - if err != nil { - return nil, err - } - resp, err := c.recv() - return resp, err +type Client struct { + Addr string + Port int + MaxPoolSize int + pool chan net.Conn } -func (c *Client) Set(key string, val string) (interface{}, error) { - resp, err := c.Do("set", key, val) +func (self *Client) Do(args ...interface{}) (data []string, err error) { + buf, err := self.formatCommand(args) if err != nil { return nil, err } - if len(resp) == 2 && resp[0] == "ok" { - return true, nil - } - return nil, fmt.Errorf("bad response") -} - -// TODO: Will somebody write addition semantic methods? -func (c *Client) Get(key string) (interface{}, error) { - resp, err := c.Do("get", key) + conn, err := self.popConn() if err != nil { - return nil, err - } - if len(resp) == 2 && resp[0] == "ok" { - return resp[1], nil - } - if resp[0] == "not_found" { - return nil, nil + goto End } - return nil, fmt.Errorf("bad response") -} - -func (c *Client) Del(key string) (interface{}, error) { - resp, err := c.Do("del", key) + _, err = conn.Write(buf.Bytes()) if err != nil { return nil, err } - if len(resp) == 1 && resp[0] == "ok" { - return true, nil - } - return nil, fmt.Errorf("bad response") + data, err = recv(bufio.NewReader(conn)) + +End: + self.pushConn(conn) + return data, err } -func (c *Client) send(args []interface{}) error { +func (self *Client) formatCommand(args []interface{}) (bytes.Buffer, error) { var buf bytes.Buffer for _, arg := range args { var s string @@ -83,18 +54,18 @@ func (c *Client) send(args []interface{}) error { s = string(arg) case []string: for _, s := range arg { - buf.WriteString(fmt.Sprintf("%d", len(s))) + buf.WriteString(strconv.Itoa(len(s))) buf.WriteByte('\n') buf.WriteString(s) buf.WriteByte('\n') } continue case int: - s = fmt.Sprintf("%d", arg) + s = strconv.Itoa(arg) case int64: - s = fmt.Sprintf("%d", arg) + s = strconv.FormatInt(arg, 10) case float64: - s = fmt.Sprintf("%f", arg) + s = strconv.FormatFloat(arg, 'f', 6, 64) case bool: if arg { s = "1" @@ -104,36 +75,36 @@ func (c *Client) send(args []interface{}) error { case nil: s = "" default: - return fmt.Errorf("bad arguments") + return buf, ErrBadResponse } - buf.WriteString(fmt.Sprintf("%d", len(s))) + buf.WriteString(strconv.Itoa(len(s))) buf.WriteByte('\n') buf.WriteString(s) buf.WriteByte('\n') } buf.WriteByte('\n') - _, err := c.sock.Write(buf.Bytes()) - return err + return buf, nil } -func (c *Client) recv() ([]string, error) { +func recv(reader *bufio.Reader) ([]string, error) { var tmp [8192]byte + var recv_buf bytes.Buffer for { - n, err := c.sock.Read(tmp[0:]) + n, err := reader.Read(tmp[0:]) if err != nil { return nil, err } - c.recv_buf.Write(tmp[0:n]) - resp := c.parse() + recv_buf.Write(tmp[0:n]) + resp := parse(recv_buf) if resp == nil || len(resp) > 0 { return resp, nil } } } -func (c *Client) parse() []string { +func parse(recv_buf bytes.Buffer) []string { resp := []string{} - buf := c.recv_buf.Bytes() + buf := recv_buf.Bytes() var idx, offset int idx = 0 offset = 0 @@ -145,12 +116,11 @@ func (c *Client) parse() []string { } p := buf[offset : offset+idx] offset += idx + 1 - //fmt.Printf("> [%s]\n", p); if len(p) == 0 || (len(p) == 1 && p[0] == '\r') { if len(resp) == 0 { continue } else { - c.recv_buf.Next(offset) + recv_buf.Next(offset) return resp } } @@ -159,7 +129,7 @@ func (c *Client) parse() []string { if err != nil || size < 0 { return nil } - if offset+size >= c.recv_buf.Len() { + if offset+size >= recv_buf.Len() { break } @@ -171,7 +141,29 @@ func (c *Client) parse() []string { return []string{} } -// Close The Client Connection -func (c *Client) Close() error { - return c.sock.Close() +func (self *Client) popConn() (net.Conn, error) { + if self.pool == nil { + poolSize := self.MaxPoolSize + if poolSize == 0 { + poolSize = defaulPoolSize + } + self.pool = make(chan net.Conn, poolSize) + for i := 0; i < poolSize; i++ { + self.pool <- nil + } + } + + c := <-self.pool + if c == nil { + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", self.Addr, self.Port)) + if err != nil { + return nil, err + } + return conn, err + } + return c, nil +} + +func (self *Client) pushConn(c net.Conn) { + self.pool <- c } diff --git a/ssdb/ssdb_test.go b/ssdb/ssdb_test.go new file mode 100644 index 0000000..9d2e8ef --- /dev/null +++ b/ssdb/ssdb_test.go @@ -0,0 +1,84 @@ +package ssdb + +import "testing" + +var ( + addr = "127.0.0.1" + port = 8888 +) + +func TestCommand(t *testing.T) { + client := Client{Addr: addr, Port: port} + ok, err := client.Set("TestString", "abc") + if err != nil || ok == false { + t.Fatal("set command error") + } + value, err := client.Get("TestString") + if err != nil { + t.Fatal("get command error") + } + if value != "abc" { + t.Fatal("get result not match") + } + num, err := client.Del("TestString") + if err != nil { + t.Fatal("del command error") + } + if num != 1 { + t.Fatal("del result error") + } + + client.Set("TestString1", "abc") + client.Set("TestString2", "abc") + num, err = client.MultiDel([]string{"TestString1", "TestString2"}) + if err != nil { + t.Fatal("multi_del command error") + } + if num != 2 { + t.Fatal("multi_del result error") + } + + ok, err = client.Zset("TestZset", "abc", 10) + if err != nil || ok != true { + t.Fatal("zset command error") + } + value, err = client.Zget("TestZset", "abc") + if err != nil { + t.Fatal("zget command error") + } + if value != "10" { + t.Fatal("zget value not match") + } + // key not exist + value, err = client.Zget("TestZset0", "abc") + if err == nil || err != ErrNotFound { + t.Fatal("zget when key not exist error") + } + + _, err = client.Zdel("TestZset", "abc") + if err != nil { + t.Fatal("zdel command error") + } + // key had been deleted + num, err = client.Zdel("TestZset", "abc") + if err != nil || num != 0 { + t.Fatal("zdel when key has been deleted command error") + } + + client.Zset("TestZset", "abc", 10) + client.Zset("TestZset", "abcd", 20) + keys, err := client.Zkeys("TestZset", "0", 0, 100, -1) + if err != nil { + t.Fatal("zkeys command error") + } + if keys[0] != "abc" || keys[1] != "abcd" { + t.Fatal("zkeys value not match") + } + num, err = client.Zclear("TestZset") + if err != nil { + t.Fatal("zclear command error") + } + if num != 2 { + t.Fatal("zclear result error") + } +}