Skip to content

Commit

Permalink
1.0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
Terry-Mao committed Apr 30, 2014
1 parent a1d416f commit 8ad270f
Show file tree
Hide file tree
Showing 24 changed files with 234 additions and 239 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ Bugfixes:
## Version 1.0.3 (2014-04-30)

Changes:
- Refactor message module, add more log

- Refactor message module, add more log.
- Refactor web module, add more log, http api add version in url.
- Refactor comet module, add more log.

New Features:


Bugfixes:

- Fixed redis clean expired message function connection leak bug.


Expand Down
10 changes: 5 additions & 5 deletions comet/comet-example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pidfile /tmp/gopush-cluster-comet.pid
# Note this directive is only support "websocket" protocol
# websocket.bind 192.168.1.100:6968,10.0.0.1:6968
# websocket.bind 127.0.0.1:6968
# websocket.bind :6968
# websocket.bind 0.0.0.0:6968
websocket.bind localhost:6968

# By default comet listens for connections from all the network interfaces
Expand All @@ -58,7 +58,7 @@ websocket.bind localhost:6968
# Note this directive is only support "tcp" protocol
# tcp.bind 192.168.1.100:6969,10.0.0.1:6969
# tcp.bind 127.0.0.1:6969
# tcp.bind :6969
# tcp.bind 0.0.0.0:6969
tcp.bind localhost:6969

# This is used by rpc listen for internal protocol.
Expand All @@ -69,7 +69,7 @@ tcp.bind localhost:6969
#
# rpc.bind 192.168.1.100:6970,10.0.0.1:6970
# rpc.bind 127.0.0.1:6970
# rpc.bind :6970
# rpc.bind 0.0.0.0:6970
rpc.bind localhost:6970

# This is used by comet service profiling (pprof).
Expand All @@ -80,7 +80,7 @@ rpc.bind localhost:6970
#
# pprof.bind 192.168.1.100:6971,10.0.0.1:6971
# pprof.bind 127.0.0.1:6971
# pprof.bind :6971
# pprof.bind 0.0.0.0:6971
pprof.bind localhost:6971

# This is used by comet service get stat info by http.
Expand All @@ -91,7 +91,7 @@ pprof.bind localhost:6971
#
# stat.bind 192.168.1.100:6971,10.0.0.1:6971
# stat.bind 127.0.0.1:6971
# stat.bind :6971
# stat.bind 0.0.0.0:6971
stat.bind localhost:6972

# The working directory.
Expand Down
20 changes: 10 additions & 10 deletions comet/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (

var (
Conf *Config
ConfFile string
confFile string
)

func init() {
flag.StringVar(&ConfFile, "c", "./comet.conf", " set gopush-cluster comet config file path")
flag.StringVar(&confFile, "c", "./comet.conf", " set gopush-cluster comet config file path")
}

type Config struct {
Expand Down Expand Up @@ -68,8 +68,8 @@ type Config struct {
}

// InitConfig get a new Config struct.
func InitConfig(file string) (*Config, error) {
cf := &Config{
func InitConfig() error {
Conf = &Config{
// base
User: "nobody nobody",
PidFile: "/tmp/gopush-cluster-comet.pid",
Expand Down Expand Up @@ -103,13 +103,13 @@ func InitConfig(file string) (*Config, error) {
MsgBufNum: 30,
}
c := goconf.New()
if err := c.Parse(file); err != nil {
glog.Errorf("goconf.Parse(\"%s\") error(%v)", file, err)
return nil, err
if err := c.Parse(confFile); err != nil {
glog.Errorf("goconf.Parse(\"%s\") error(%v)", confFile, err)
return err
}
if err := c.Unmarshal(cf); err != nil {
if err := c.Unmarshal(Conf); err != nil {
glog.Errorf("goconf.Unmarshall() error(%v)", err)
return nil, err
return err
}
return cf, nil
return nil
}
18 changes: 8 additions & 10 deletions comet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,25 @@ import (
)

func main() {
var err error
// parse cmd-line arguments
flag.Parse()
glog.Infof("comet ver: \"%s\" start", ver.Version)
defer glog.Flush()
signalCH := InitSignal()
// init config
Conf, err = InitConfig(ConfFile)
if err != nil {
glog.Errorf("NewConfig(\"%s\") error(%v)", ConfFile, err)
if err := InitConfig(); err != nil {
glog.Errorf("InitConfig() error(%v)", err)
return
}
// set max routine
runtime.GOMAXPROCS(Conf.MaxProc)
// start pprof
perf.Init(Conf.PprofBind)
// create channel
UserChannel = NewChannelList()
// if process exit, close channel
UserChannel = NewChannelList()
defer UserChannel.Close()
// start stats
StartStats()
// start pprof
perf.Init(Conf.PprofBind)
// start rpc
StartRPC()
// start comet
Expand All @@ -64,11 +62,11 @@ func main() {
// sleep one second, let the listen start
time.Sleep(time.Second)
if err = process.Init(Conf.User, Conf.Dir, Conf.PidFile); err != nil {
glog.Errorf("process.Init() error(%v)", err)
glog.Errorf("process.Init(\"%s\", \"%s\", \"%s\") error(%v)", Conf.User, Conf.Dir, Conf.PidFile, err)
return
}
glog.Infof("comet(%s) start", ver.Version)
// init signals, block wait signals
signalCH := InitSignal()
HandleSignal(signalCH)
// exit
glog.Info("comet stop")
Expand Down
36 changes: 5 additions & 31 deletions comet/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func StartRPC() {
c := &CometRPC{}
rpc.Register(c)
for _, bind := range Conf.RPCBind {
glog.Infof("start listen rpc addr:\"%s\"", bind)
glog.Infof("start listen rpc addr: \"%s\"", bind)
go rpcListen(bind)
}
}
Expand Down Expand Up @@ -108,40 +108,14 @@ func (c *CometRPC) PushPrivate(args *myrpc.CometPushPrivateArgs, ret *int) error
return err
}
// use the channel push message
msg := &myrpc.Message{Msg: args.Msg, GroupId: args.GroupId}
if err = ch.PushMsg(args.Key, msg, args.Expire); err != nil {
glog.Error("ch.PushMsg(\"%s\", \"%v\") error(%v)", args.Key, msg, err)
m := &myrpc.Message{Msg: args.Msg}
if err = ch.PushMsg(args.Key, m, args.Expire); err != nil {
glog.Errorf("ch.PushMsg(\"%s\", \"%v\") error(%v)", args.Key, m, err)
return err
}
return nil
}

// PushPublic expored a method for publishing a public message for the channel
func (c *CometRPC) PushPublic(args *myrpc.CometPushPublicArgs, ret *int) error {
/*
// get all the channel lock
m := &Message{Msg: args.Msg, MsgID: args.MsgID, GroupID: myrpc.PublicGroupID}
for _, c := range UserChannel.Channels {
c.Lock()
cm := make(map[string]Channel, len(c.Data))
for k, v := range c.Data {
cm[k] = v
}
c.Unlock()
// multiple routine push message
go func() {
for k, v := range cm {
if err := v.PushMsg(k, m); err != nil {
continue
}
}
}()
}
*ret = myrpc.OK
*/
return nil
}

// Publish expored a method for publishing a message for the channel
func (c *CometRPC) Migrate(args *myrpc.CometMigrateArgs, ret *int) error {
if len(args.Nodes) == 0 {
Expand Down Expand Up @@ -193,6 +167,6 @@ func (c *CometRPC) Migrate(args *myrpc.CometMigrateArgs, ret *int) error {
}

func (c *CometRPC) Ping(args int, ret *int) error {
glog.V(2).Info("CometRPC.Ping() ok")
glog.V(2).Info("ping ok")
return nil
}
10 changes: 5 additions & 5 deletions comet/seq_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ func (c *SeqChannel) PushMsg(key string, m *myrpc.Message, expire uint) error {
c.mutex.Lock()
// private message need persistence
// if message expired no need persistence, only send online message
// rewrite message id
m.MsgId = c.timeID.ID()
// rewrite message id
m.MsgId = c.timeID.ID()
if m.GroupId != myrpc.PublicGroupId && expire > 0 {
args := &myrpc.MessageSaveArgs{Key: key, Msg: m.Msg, MsgId: m.MsgId, GroupId: m.GroupId, Expire: expire}
args := &myrpc.MessageSavePrivateArgs{Key: key, Msg: m.Msg, MsgId: m.MsgId, Expire: expire}
ret := 0
if err = client.Call(myrpc.MessageServiceSave, args, &ret); err != nil {
if err = client.Call(myrpc.MessageServiceSavePrivate, args, &ret); err != nil {
c.mutex.Unlock()
glog.Errorf("%s(\"%s\", \"%v\", &ret) error(%v)", myrpc.MessageServiceSave, key, args, err)
glog.Errorf("%s(\"%s\", \"%v\", &ret) error(%v)", myrpc.MessageServiceSavePrivate, key, args, err)
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion comet/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func InitZK() (*zk.Conn, error) {
for _, addr := range Conf.RPCBind {
data += fmt.Sprintf("rpc://%s,", addr)
}
if err = myzk.RegisterTemp(conn, fpath, strings.TrimRight(data, ",")); err != nil {
data = strings.TrimRight(data, ",")
glog.V(1).Infof("zk data: \"%s\"", data)
if err = myzk.RegisterTemp(conn, fpath, data); err != nil {
glog.Errorf("zk.RegisterTemp() error(%v)", err)
return conn, err
}
Expand Down
9 changes: 5 additions & 4 deletions message/message-example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
# default tcp listen localhost:8070
# Examples:
#
# addr 192.168.1.100:8070,10.0.0.1:8070
# addr 127.0.0.1:8070
# rpc.bind 192.168.1.100:8070,10.0.0.1:8070
# rpc.bind 127.0.0.1:8070
# rpc.bind 0.0.0.0:8070
rpc.bind localhost:8070

# If the master process is run as root, then message will setuid()/setgid()
Expand Down Expand Up @@ -56,7 +57,7 @@ dir ./
#
# pprof.bind 192.168.1.100:8170,10.0.0.1:8170
# pprof.bind 127.0.0.1:8170
# pprof.bind :8170
# pprof.bind 0.0.0.0:8170
pprof.bind localhost:8170

[storage]
Expand All @@ -81,7 +82,7 @@ active 300
store 20

[redis.source]
# The format like "NodeName=IP:Port", NodeName was specified by Comet service.
# The format like "NodeName IP:Port", NodeName was specified by Comet service.
# If there are multiple nodes, then configure following
# node1 IP1:Port1
# node2 IP2:Port2
Expand Down
46 changes: 23 additions & 23 deletions message/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ import (
"encoding/json"
"errors"
"github.com/Terry-Mao/gopush-cluster/hash"
"github.com/Terry-Mao/gopush-cluster/rpc"
myrpc "github.com/Terry-Mao/gopush-cluster/rpc"
_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
"time"
)

const (
saveSQL = "INSERT INTO message(sub,gid,mid,expire,msg,ctime,mtime) VALUES(?,?,?,?,?,?,?)"
getSQL = "SELECT mid, gid, expire, msg FROM message WHERE sub=? AND mid>?"
delExpireSQL = "DELETE FROM message WHERE expire<=?"
delKeySQL = "DELETE FROM message WHERE sub=?"
savePrivateMsgSQL = "INSERT INTO private_msg(skey,mid,ttl,msg,ctime,mtime) VALUES(?,?,?,?,?,?)"
// TODO limit
getPrivateMsgSQL = "SELECT mid, ttl, msg FROM private_msg WHERE skey=? AND mid>? ORDER BY mid"
delExpiredPrivateMsgSQL = "DELETE FROM private_msg WHERE ttl<=?"
delPrivateMsgSQL = "DELETE FROM private_msg WHERE skey=?"
)

var (
Expand Down Expand Up @@ -60,61 +61,60 @@ func NewMySQLStorage() *MySQLStorage {
return s
}

// Save implements the Storage Save method.
func (s *MySQLStorage) Save(key string, msg json.RawMessage, mid int64, gid uint, expire uint) error {
// SavePrivate implements the Storage SavePrivate method.
func (s *MySQLStorage) SavePrivate(key string, msg json.RawMessage, mid int64, expire uint) error {
db := s.getConn(key)
if db == nil {
return ErrNoMySQLConn
}
now := time.Now()
_, err := db.Exec(saveSQL, key, gid, mid, now.Unix()+int64(expire), []byte(msg), now, now)
_, err := db.Exec(savePrivateMsgSQL, key, mid, now.Unix()+int64(expire), []byte(msg), now, now)
if err != nil {
glog.Errorf("db.Exec(%s,%s,%d,%d,%d,%s,now,now) failed (%v)", saveSQL, key, 0, mid, expire, string(msg), err)
glog.Errorf("db.Exec(\"%s\",\"%s\",%d,%d,%d,\"%s\",now,now) failed (%v)", savePrivateMsgSQL, key, mid, expire, string(msg), err)
return err
}
return nil
}

// Get implements the Storage Get method.
func (s *MySQLStorage) Get(key string, mid int64) ([]*rpc.Message, error) {
// GetPrivate implements the Storage GetPrivate method.
func (s *MySQLStorage) GetPrivate(key string, mid int64) ([]*myrpc.Message, error) {
db := s.getConn(key)
if db == nil {
return nil, ErrNoMySQLConn
}
now := time.Now().Unix()
rows, err := db.Query(getSQL, key, mid)
rows, err := db.Query(getPrivateMsgSQL, key, mid)
if err != nil {
glog.Errorf("db.Query(%s,%s,%d,now) failed (%v)", getSQL, key, mid, err)
glog.Errorf("db.Query(\"%s\",\"%s\",%d,now) failed (%v)", getPrivateMsgSQL, key, mid, err)
return nil, err
}
expire := int64(0)
cmid := int64(0)
cgid := uint(0)
msg := json.RawMessage([]byte{})
msgs := []*rpc.Message{}
msgs := []*myrpc.Message{}
for rows.Next() {
if err := rows.Scan(&cmid, &cgid, &expire, &msg); err != nil {
if err := rows.Scan(&cmid, &expire, &msg); err != nil {
glog.Errorf("rows.Scan() failed (%v)", err)
return nil, err
}
if now > expire {
glog.Warningf("user_key: \"%s\" mid: %d expired", key, cmid)
continue
}
msgs = append(msgs, &rpc.Message{MsgId: cmid, GroupId: cgid, Msg: msg})
msgs = append(msgs, &myrpc.Message{MsgId: cmid, GroupId: myrpc.PrivateGroupId, Msg: msg})
}
return msgs, nil
}

// Del implements the Storage DelKey method.
func (s *MySQLStorage) Del(key string) error {
// DelPrivate implements the Storage DelPrivate method.
func (s *MySQLStorage) DelPrivate(key string) error {
db := s.getConn(key)
if db == nil {
return ErrNoMySQLConn
}
res, err := db.Exec(delKeySQL, key)
res, err := db.Exec(delPrivateMsgSQL, key)
if err != nil {
glog.Errorf("db.Exec(\"%s\", \"%s\") error(%v)", delKeySQL, key, err)
glog.Errorf("db.Exec(\"%s\", \"%s\") error(%v)", delPrivateMsgSQL, key, err)
return err
}
rows, err := res.RowsAffected()
Expand All @@ -133,9 +133,9 @@ func (s *MySQLStorage) clean() {
now := time.Now().Unix()
affect := int64(0)
for _, db := range s.pool {
res, err := db.Exec(delExpireSQL, now)
res, err := db.Exec(delExpiredPrivateMsgSQL, now)
if err != nil {
glog.Errorf("db.Exec(\"%s\", now) failed (%v)", delExpireSQL, err)
glog.Errorf("db.Exec(\"%s\", %d) failed (%v)", delExpiredPrivateMsgSQL, now, err)
continue
}
aff, err := res.RowsAffected()
Expand Down
Loading

0 comments on commit 8ad270f

Please sign in to comment.