Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update client to support connections pool #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions ssdb/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package ssdb

import "strconv"

func (c *Client) Set(key, val string) (bool, error) {
resp, err := c.Do("set", key, val)
if err != nil {
return false, err
}
if len(resp) == 2 && resp[0] == "ok" {
return true, nil
}
return false, ErrBadResponse
}

func (c *Client) Setx(key, val string, timeout int) (bool, error) {
resp, err := c.Do("setx", key, val, timeout)
if err != nil {
return false, err
}
if len(resp) == 2 && resp[0] == "ok" {
return true, nil
}
return false, ErrBadResponse
}

func (c *Client) Setnx(key, val string) (exists bool, err error) {
resp, err := c.Do("setnx", key, val)
if err != nil {
return false, nil
}
if len(resp) == 2 && resp[0] == "ok" {
if resp[1] == "1" {
return true, nil
}
return false, nil
}
return false, ErrBadResponse
}

func (c *Client) Get(key string) (string, error) {
resp, err := c.Do("get", key)
if err != nil {
return "", err
}
if len(resp) == 2 && resp[0] == "ok" {
return resp[1], nil
}
if resp[0] == "not_found" {
return "", nil
}
return "", ErrBadResponse
}

func (c *Client) Del(key string) (int, error) {
resp, err := c.Do("del", key)
if err != nil {
return 0, err
}
if len(resp) == 2 && resp[0] == "ok" {
return strconv.Atoi(resp[1])
}
return 0, ErrBadResponse
}

func (c *Client) MultiDel(keys []string) (int, error) {
resp, err := c.Do("multi_del", keys)
if err != nil {
return 0, err
}
if len(resp) == 2 && resp[0] == "ok" {
return strconv.Atoi(resp[1])
}
return 0, ErrBadResponse
}

func (c *Client) Zset(name, key string, score int) (bool, error) {
resp, err := c.Do("zset", name, key, score)
if err != nil {
return false, err
}
if len(resp) == 2 && resp[0] == "ok" {
return true, nil
}
return false, ErrBadResponse
}

func (c *Client) Zget(name, key string) (string, error) {
resp, err := c.Do("zget", name, key)
if err != nil {
return "", err
}
if len(resp) == 2 && resp[0] == "ok" {
return resp[1], nil
}
if resp[0] == "not_found" {
return "", ErrNotFound
}
return "", ErrBadResponse
}

// return del score count
func (c *Client) Zdel(name, key string) (int, error) {
resp, err := c.Do("zdel", name, key)
if err != nil {
return 0, err
}
if len(resp) == 2 && resp[0] == "ok" {
return strconv.Atoi(resp[1])
}
return 0, ErrBadResponse
}

func (c *Client) Zkeys(name, keyStart string, scoreStart, scoreEnd, limit int) ([]string, error) {
resp, err := c.Do("zkeys", name, keyStart, scoreStart, scoreEnd, limit)
if err != nil {
return nil, err
}
if resp[0] == "ok" {
return resp[1:], nil
}
return nil, ErrBadResponse
}

// return cleared keys count
func (c *Client) Zclear(name string) (int, error) {
resp, err := c.Do("zclear", name)
if err != nil {
return 0, err
}
if len(resp) == 2 && resp[0] == "ok" {
return strconv.Atoi(resp[1])
}
return 0, ErrBadResponse
}
136 changes: 64 additions & 72 deletions ssdb/ssdb.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,49 @@
package ssdb

import (
"bufio"
"bytes"
"errors"
"fmt"
"net"
"strconv"
)

type Client struct {
sock *net.TCPConn
recv_buf bytes.Buffer
}
const (
defaulPoolSize = 10
)

func Connect(ip string, port int) (*Client, error) {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
return nil, err
}
sock, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return nil, err
}
var c Client
c.sock = sock
return &c, nil
}
var ErrBadResponse = errors.New("bad response")
var ErrNotFound = errors.New("not_found")

func (c *Client) Do(args ...interface{}) ([]string, error) {
err := c.send(args)
if err != nil {
return nil, err
}
resp, err := c.recv()
return resp, err
type Client struct {
Addr string
Port int
MaxPoolSize int
pool chan net.Conn
}

func (c *Client) Set(key string, val string) (interface{}, error) {
resp, err := c.Do("set", key, val)
func (self *Client) Do(args ...interface{}) (data []string, err error) {
buf, err := self.formatCommand(args)
if err != nil {
return nil, err
}
if len(resp) == 2 && resp[0] == "ok" {
return true, nil
}
return nil, fmt.Errorf("bad response")
}

// TODO: Will somebody write addition semantic methods?
func (c *Client) Get(key string) (interface{}, error) {
resp, err := c.Do("get", key)
conn, err := self.popConn()
if err != nil {
return nil, err
}
if len(resp) == 2 && resp[0] == "ok" {
return resp[1], nil
}
if resp[0] == "not_found" {
return nil, nil
goto End
}
return nil, fmt.Errorf("bad response")
}

func (c *Client) Del(key string) (interface{}, error) {
resp, err := c.Do("del", key)
_, err = conn.Write(buf.Bytes())
if err != nil {
return nil, err
}
if len(resp) == 1 && resp[0] == "ok" {
return true, nil
}
return nil, fmt.Errorf("bad response")
data, err = recv(bufio.NewReader(conn))

End:
self.pushConn(conn)
return data, err
}

func (c *Client) send(args []interface{}) error {
func (self *Client) formatCommand(args []interface{}) (bytes.Buffer, error) {
var buf bytes.Buffer
for _, arg := range args {
var s string
Expand All @@ -83,18 +54,18 @@ func (c *Client) send(args []interface{}) error {
s = string(arg)
case []string:
for _, s := range arg {
buf.WriteString(fmt.Sprintf("%d", len(s)))
buf.WriteString(strconv.Itoa(len(s)))
buf.WriteByte('\n')
buf.WriteString(s)
buf.WriteByte('\n')
}
continue
case int:
s = fmt.Sprintf("%d", arg)
s = strconv.Itoa(arg)
case int64:
s = fmt.Sprintf("%d", arg)
s = strconv.FormatInt(arg, 10)
case float64:
s = fmt.Sprintf("%f", arg)
s = strconv.FormatFloat(arg, 'f', 6, 64)
case bool:
if arg {
s = "1"
Expand All @@ -104,36 +75,36 @@ func (c *Client) send(args []interface{}) error {
case nil:
s = ""
default:
return fmt.Errorf("bad arguments")
return buf, ErrBadResponse
}
buf.WriteString(fmt.Sprintf("%d", len(s)))
buf.WriteString(strconv.Itoa(len(s)))
buf.WriteByte('\n')
buf.WriteString(s)
buf.WriteByte('\n')
}
buf.WriteByte('\n')
_, err := c.sock.Write(buf.Bytes())
return err
return buf, nil
}

func (c *Client) recv() ([]string, error) {
func recv(reader *bufio.Reader) ([]string, error) {
var tmp [8192]byte
var recv_buf bytes.Buffer
for {
n, err := c.sock.Read(tmp[0:])
n, err := reader.Read(tmp[0:])
if err != nil {
return nil, err
}
c.recv_buf.Write(tmp[0:n])
resp := c.parse()
recv_buf.Write(tmp[0:n])
resp := parse(recv_buf)
if resp == nil || len(resp) > 0 {
return resp, nil
}
}
}

func (c *Client) parse() []string {
func parse(recv_buf bytes.Buffer) []string {
resp := []string{}
buf := c.recv_buf.Bytes()
buf := recv_buf.Bytes()
var idx, offset int
idx = 0
offset = 0
Expand All @@ -145,12 +116,11 @@ func (c *Client) parse() []string {
}
p := buf[offset : offset+idx]
offset += idx + 1
//fmt.Printf("> [%s]\n", p);
if len(p) == 0 || (len(p) == 1 && p[0] == '\r') {
if len(resp) == 0 {
continue
} else {
c.recv_buf.Next(offset)
recv_buf.Next(offset)
return resp
}
}
Expand All @@ -159,7 +129,7 @@ func (c *Client) parse() []string {
if err != nil || size < 0 {
return nil
}
if offset+size >= c.recv_buf.Len() {
if offset+size >= recv_buf.Len() {
break
}

Expand All @@ -171,7 +141,29 @@ func (c *Client) parse() []string {
return []string{}
}

// Close The Client Connection
func (c *Client) Close() error {
return c.sock.Close()
func (self *Client) popConn() (net.Conn, error) {
if self.pool == nil {
poolSize := self.MaxPoolSize
if poolSize == 0 {
poolSize = defaulPoolSize
}
self.pool = make(chan net.Conn, poolSize)
for i := 0; i < poolSize; i++ {
self.pool <- nil
}
}

c := <-self.pool
if c == nil {
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", self.Addr, self.Port))
if err != nil {
return nil, err
}
return conn, err
}
return c, nil
}

func (self *Client) pushConn(c net.Conn) {
self.pool <- c
}
Loading