diff --git a/CHANGELOG.md b/CHANGELOG.md index db43aa9..21dd3e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,11 +13,16 @@ Bugfixes: ## Version 1.0.3 (2014-04-30) Changes: - - Refactor message module, add more log + + - Refactor message module, add more log. + - Refactor web module, add more log, http api add version in url. + - Refactor comet module, add more log. New Features: + Bugfixes: + - Fixed redis clean expired message function connection leak bug. diff --git a/comet/comet-example.conf b/comet/comet-example.conf index 9181b81..b5a5f36 100644 --- a/comet/comet-example.conf +++ b/comet/comet-example.conf @@ -45,7 +45,7 @@ pidfile /tmp/gopush-cluster-comet.pid # Note this directive is only support "websocket" protocol # websocket.bind 192.168.1.100:6968,10.0.0.1:6968 # websocket.bind 127.0.0.1:6968 -# websocket.bind :6968 +# websocket.bind 0.0.0.0:6968 websocket.bind localhost:6968 # By default comet listens for connections from all the network interfaces @@ -58,7 +58,7 @@ websocket.bind localhost:6968 # Note this directive is only support "tcp" protocol # tcp.bind 192.168.1.100:6969,10.0.0.1:6969 # tcp.bind 127.0.0.1:6969 -# tcp.bind :6969 +# tcp.bind 0.0.0.0:6969 tcp.bind localhost:6969 # This is used by rpc listen for internal protocol. @@ -69,7 +69,7 @@ tcp.bind localhost:6969 # # rpc.bind 192.168.1.100:6970,10.0.0.1:6970 # rpc.bind 127.0.0.1:6970 -# rpc.bind :6970 +# rpc.bind 0.0.0.0:6970 rpc.bind localhost:6970 # This is used by comet service profiling (pprof). @@ -80,7 +80,7 @@ rpc.bind localhost:6970 # # pprof.bind 192.168.1.100:6971,10.0.0.1:6971 # pprof.bind 127.0.0.1:6971 -# pprof.bind :6971 +# pprof.bind 0.0.0.0:6971 pprof.bind localhost:6971 # This is used by comet service get stat info by http. @@ -91,7 +91,7 @@ pprof.bind localhost:6971 # # stat.bind 192.168.1.100:6971,10.0.0.1:6971 # stat.bind 127.0.0.1:6971 -# stat.bind :6971 +# stat.bind 0.0.0.0:6971 stat.bind localhost:6972 # The working directory. diff --git a/comet/config.go b/comet/config.go index 103b140..aa79dd5 100644 --- a/comet/config.go +++ b/comet/config.go @@ -26,11 +26,11 @@ import ( var ( Conf *Config - ConfFile string + confFile string ) func init() { - flag.StringVar(&ConfFile, "c", "./comet.conf", " set gopush-cluster comet config file path") + flag.StringVar(&confFile, "c", "./comet.conf", " set gopush-cluster comet config file path") } type Config struct { @@ -68,8 +68,8 @@ type Config struct { } // InitConfig get a new Config struct. -func InitConfig(file string) (*Config, error) { - cf := &Config{ +func InitConfig() error { + Conf = &Config{ // base User: "nobody nobody", PidFile: "/tmp/gopush-cluster-comet.pid", @@ -103,13 +103,13 @@ func InitConfig(file string) (*Config, error) { MsgBufNum: 30, } c := goconf.New() - if err := c.Parse(file); err != nil { - glog.Errorf("goconf.Parse(\"%s\") error(%v)", file, err) - return nil, err + if err := c.Parse(confFile); err != nil { + glog.Errorf("goconf.Parse(\"%s\") error(%v)", confFile, err) + return err } - if err := c.Unmarshal(cf); err != nil { + if err := c.Unmarshal(Conf); err != nil { glog.Errorf("goconf.Unmarshall() error(%v)", err) - return nil, err + return err } - return cf, nil + return nil } diff --git a/comet/main.go b/comet/main.go index c68e395..7af4eac 100644 --- a/comet/main.go +++ b/comet/main.go @@ -27,27 +27,25 @@ import ( ) func main() { - var err error // parse cmd-line arguments flag.Parse() + glog.Infof("comet ver: \"%s\" start", ver.Version) defer glog.Flush() - signalCH := InitSignal() // init config - Conf, err = InitConfig(ConfFile) - if err != nil { - glog.Errorf("NewConfig(\"%s\") error(%v)", ConfFile, err) + if err := InitConfig(); err != nil { + glog.Errorf("InitConfig() error(%v)", err) return } // set max routine runtime.GOMAXPROCS(Conf.MaxProc) + // start pprof + perf.Init(Conf.PprofBind) // create channel - UserChannel = NewChannelList() // if process exit, close channel + UserChannel = NewChannelList() defer UserChannel.Close() // start stats StartStats() - // start pprof - perf.Init(Conf.PprofBind) // start rpc StartRPC() // start comet @@ -64,11 +62,11 @@ func main() { // sleep one second, let the listen start time.Sleep(time.Second) if err = process.Init(Conf.User, Conf.Dir, Conf.PidFile); err != nil { - glog.Errorf("process.Init() error(%v)", err) + glog.Errorf("process.Init(\"%s\", \"%s\", \"%s\") error(%v)", Conf.User, Conf.Dir, Conf.PidFile, err) return } - glog.Infof("comet(%s) start", ver.Version) // init signals, block wait signals + signalCH := InitSignal() HandleSignal(signalCH) // exit glog.Info("comet stop") diff --git a/comet/rpc.go b/comet/rpc.go index 2ef2e77..d2ab09b 100644 --- a/comet/rpc.go +++ b/comet/rpc.go @@ -34,7 +34,7 @@ func StartRPC() { c := &CometRPC{} rpc.Register(c) for _, bind := range Conf.RPCBind { - glog.Infof("start listen rpc addr:\"%s\"", bind) + glog.Infof("start listen rpc addr: \"%s\"", bind) go rpcListen(bind) } } @@ -108,40 +108,14 @@ func (c *CometRPC) PushPrivate(args *myrpc.CometPushPrivateArgs, ret *int) error return err } // use the channel push message - msg := &myrpc.Message{Msg: args.Msg, GroupId: args.GroupId} - if err = ch.PushMsg(args.Key, msg, args.Expire); err != nil { - glog.Error("ch.PushMsg(\"%s\", \"%v\") error(%v)", args.Key, msg, err) + m := &myrpc.Message{Msg: args.Msg} + if err = ch.PushMsg(args.Key, m, args.Expire); err != nil { + glog.Errorf("ch.PushMsg(\"%s\", \"%v\") error(%v)", args.Key, m, err) return err } return nil } -// PushPublic expored a method for publishing a public message for the channel -func (c *CometRPC) PushPublic(args *myrpc.CometPushPublicArgs, ret *int) error { - /* - // get all the channel lock - m := &Message{Msg: args.Msg, MsgID: args.MsgID, GroupID: myrpc.PublicGroupID} - for _, c := range UserChannel.Channels { - c.Lock() - cm := make(map[string]Channel, len(c.Data)) - for k, v := range c.Data { - cm[k] = v - } - c.Unlock() - // multiple routine push message - go func() { - for k, v := range cm { - if err := v.PushMsg(k, m); err != nil { - continue - } - } - }() - } - *ret = myrpc.OK - */ - return nil -} - // Publish expored a method for publishing a message for the channel func (c *CometRPC) Migrate(args *myrpc.CometMigrateArgs, ret *int) error { if len(args.Nodes) == 0 { @@ -193,6 +167,6 @@ func (c *CometRPC) Migrate(args *myrpc.CometMigrateArgs, ret *int) error { } func (c *CometRPC) Ping(args int, ret *int) error { - glog.V(2).Info("CometRPC.Ping() ok") + glog.V(2).Info("ping ok") return nil } diff --git a/comet/seq_channel.go b/comet/seq_channel.go index 3bd0863..44d4d61 100644 --- a/comet/seq_channel.go +++ b/comet/seq_channel.go @@ -102,14 +102,14 @@ func (c *SeqChannel) PushMsg(key string, m *myrpc.Message, expire uint) error { c.mutex.Lock() // private message need persistence // if message expired no need persistence, only send online message - // rewrite message id - m.MsgId = c.timeID.ID() + // rewrite message id + m.MsgId = c.timeID.ID() if m.GroupId != myrpc.PublicGroupId && expire > 0 { - args := &myrpc.MessageSaveArgs{Key: key, Msg: m.Msg, MsgId: m.MsgId, GroupId: m.GroupId, Expire: expire} + args := &myrpc.MessageSavePrivateArgs{Key: key, Msg: m.Msg, MsgId: m.MsgId, Expire: expire} ret := 0 - if err = client.Call(myrpc.MessageServiceSave, args, &ret); err != nil { + if err = client.Call(myrpc.MessageServiceSavePrivate, args, &ret); err != nil { c.mutex.Unlock() - glog.Errorf("%s(\"%s\", \"%v\", &ret) error(%v)", myrpc.MessageServiceSave, key, args, err) + glog.Errorf("%s(\"%s\", \"%v\", &ret) error(%v)", myrpc.MessageServiceSavePrivate, key, args, err) return err } } diff --git a/comet/zk.go b/comet/zk.go index 22f782f..58c4019 100644 --- a/comet/zk.go +++ b/comet/zk.go @@ -52,7 +52,9 @@ func InitZK() (*zk.Conn, error) { for _, addr := range Conf.RPCBind { data += fmt.Sprintf("rpc://%s,", addr) } - if err = myzk.RegisterTemp(conn, fpath, strings.TrimRight(data, ",")); err != nil { + data = strings.TrimRight(data, ",") + glog.V(1).Infof("zk data: \"%s\"", data) + if err = myzk.RegisterTemp(conn, fpath, data); err != nil { glog.Errorf("zk.RegisterTemp() error(%v)", err) return conn, err } diff --git a/message/message-example.conf b/message/message-example.conf index 2f5b962..dc3b08e 100644 --- a/message/message-example.conf +++ b/message/message-example.conf @@ -23,8 +23,9 @@ # default tcp listen localhost:8070 # Examples: # -# addr 192.168.1.100:8070,10.0.0.1:8070 -# addr 127.0.0.1:8070 +# rpc.bind 192.168.1.100:8070,10.0.0.1:8070 +# rpc.bind 127.0.0.1:8070 +# rpc.bind 0.0.0.0:8070 rpc.bind localhost:8070 # If the master process is run as root, then message will setuid()/setgid() @@ -56,7 +57,7 @@ dir ./ # # pprof.bind 192.168.1.100:8170,10.0.0.1:8170 # pprof.bind 127.0.0.1:8170 -# pprof.bind :8170 +# pprof.bind 0.0.0.0:8170 pprof.bind localhost:8170 [storage] @@ -81,7 +82,7 @@ active 300 store 20 [redis.source] -# The format like "NodeName=IP:Port", NodeName was specified by Comet service. +# The format like "NodeName IP:Port", NodeName was specified by Comet service. # If there are multiple nodes, then configure following # node1 IP1:Port1 # node2 IP2:Port2 diff --git a/message/mysql.go b/message/mysql.go index 6e241bc..2e0bb99 100644 --- a/message/mysql.go +++ b/message/mysql.go @@ -21,17 +21,18 @@ import ( "encoding/json" "errors" "github.com/Terry-Mao/gopush-cluster/hash" - "github.com/Terry-Mao/gopush-cluster/rpc" + myrpc "github.com/Terry-Mao/gopush-cluster/rpc" _ "github.com/go-sql-driver/mysql" "github.com/golang/glog" "time" ) const ( - saveSQL = "INSERT INTO message(sub,gid,mid,expire,msg,ctime,mtime) VALUES(?,?,?,?,?,?,?)" - getSQL = "SELECT mid, gid, expire, msg FROM message WHERE sub=? AND mid>?" - delExpireSQL = "DELETE FROM message WHERE expire<=?" - delKeySQL = "DELETE FROM message WHERE sub=?" + savePrivateMsgSQL = "INSERT INTO private_msg(skey,mid,ttl,msg,ctime,mtime) VALUES(?,?,?,?,?,?)" + // TODO limit + getPrivateMsgSQL = "SELECT mid, ttl, msg FROM private_msg WHERE skey=? AND mid>? ORDER BY mid" + delExpiredPrivateMsgSQL = "DELETE FROM private_msg WHERE ttl<=?" + delPrivateMsgSQL = "DELETE FROM private_msg WHERE skey=?" ) var ( @@ -60,40 +61,39 @@ func NewMySQLStorage() *MySQLStorage { return s } -// Save implements the Storage Save method. -func (s *MySQLStorage) Save(key string, msg json.RawMessage, mid int64, gid uint, expire uint) error { +// SavePrivate implements the Storage SavePrivate method. +func (s *MySQLStorage) SavePrivate(key string, msg json.RawMessage, mid int64, expire uint) error { db := s.getConn(key) if db == nil { return ErrNoMySQLConn } now := time.Now() - _, err := db.Exec(saveSQL, key, gid, mid, now.Unix()+int64(expire), []byte(msg), now, now) + _, err := db.Exec(savePrivateMsgSQL, key, mid, now.Unix()+int64(expire), []byte(msg), now, now) if err != nil { - glog.Errorf("db.Exec(%s,%s,%d,%d,%d,%s,now,now) failed (%v)", saveSQL, key, 0, mid, expire, string(msg), err) + glog.Errorf("db.Exec(\"%s\",\"%s\",%d,%d,%d,\"%s\",now,now) failed (%v)", savePrivateMsgSQL, key, mid, expire, string(msg), err) return err } return nil } -// Get implements the Storage Get method. -func (s *MySQLStorage) Get(key string, mid int64) ([]*rpc.Message, error) { +// GetPrivate implements the Storage GetPrivate method. +func (s *MySQLStorage) GetPrivate(key string, mid int64) ([]*myrpc.Message, error) { db := s.getConn(key) if db == nil { return nil, ErrNoMySQLConn } now := time.Now().Unix() - rows, err := db.Query(getSQL, key, mid) + rows, err := db.Query(getPrivateMsgSQL, key, mid) if err != nil { - glog.Errorf("db.Query(%s,%s,%d,now) failed (%v)", getSQL, key, mid, err) + glog.Errorf("db.Query(\"%s\",\"%s\",%d,now) failed (%v)", getPrivateMsgSQL, key, mid, err) return nil, err } expire := int64(0) cmid := int64(0) - cgid := uint(0) msg := json.RawMessage([]byte{}) - msgs := []*rpc.Message{} + msgs := []*myrpc.Message{} for rows.Next() { - if err := rows.Scan(&cmid, &cgid, &expire, &msg); err != nil { + if err := rows.Scan(&cmid, &expire, &msg); err != nil { glog.Errorf("rows.Scan() failed (%v)", err) return nil, err } @@ -101,20 +101,20 @@ func (s *MySQLStorage) Get(key string, mid int64) ([]*rpc.Message, error) { glog.Warningf("user_key: \"%s\" mid: %d expired", key, cmid) continue } - msgs = append(msgs, &rpc.Message{MsgId: cmid, GroupId: cgid, Msg: msg}) + msgs = append(msgs, &myrpc.Message{MsgId: cmid, GroupId: myrpc.PrivateGroupId, Msg: msg}) } return msgs, nil } -// Del implements the Storage DelKey method. -func (s *MySQLStorage) Del(key string) error { +// DelPrivate implements the Storage DelPrivate method. +func (s *MySQLStorage) DelPrivate(key string) error { db := s.getConn(key) if db == nil { return ErrNoMySQLConn } - res, err := db.Exec(delKeySQL, key) + res, err := db.Exec(delPrivateMsgSQL, key) if err != nil { - glog.Errorf("db.Exec(\"%s\", \"%s\") error(%v)", delKeySQL, key, err) + glog.Errorf("db.Exec(\"%s\", \"%s\") error(%v)", delPrivateMsgSQL, key, err) return err } rows, err := res.RowsAffected() @@ -133,9 +133,9 @@ func (s *MySQLStorage) clean() { now := time.Now().Unix() affect := int64(0) for _, db := range s.pool { - res, err := db.Exec(delExpireSQL, now) + res, err := db.Exec(delExpiredPrivateMsgSQL, now) if err != nil { - glog.Errorf("db.Exec(\"%s\", now) failed (%v)", delExpireSQL, err) + glog.Errorf("db.Exec(\"%s\", %d) failed (%v)", delExpiredPrivateMsgSQL, now, err) continue } aff, err := res.RowsAffected() diff --git a/message/redis.go b/message/redis.go index 0aea8f2..2ad3e12 100644 --- a/message/redis.go +++ b/message/redis.go @@ -21,7 +21,7 @@ import ( "errors" "fmt" "github.com/Terry-Mao/gopush-cluster/hash" - "github.com/Terry-Mao/gopush-cluster/rpc" + myrpc "github.com/Terry-Mao/gopush-cluster/rpc" "github.com/garyburd/redigo/redis" "github.com/golang/glog" "time" @@ -32,10 +32,9 @@ var ( ) // RedisMessage struct encoding the composite info. -type RedisMessage struct { - Msg json.RawMessage `json:"msg"` // message content - GroupId uint `json:"gid"` // group id - Expire int64 `json:"expire"` // expire second +type RedisPrivateMessage struct { + Msg json.RawMessage `json:"msg"` // message content + Expire int64 `json:"expire"` // expire second } // Struct for delele message @@ -75,14 +74,14 @@ func NewRedisStorage() *RedisStorage { return s } -// Save implements the Storage Save method. -func (s *RedisStorage) Save(key string, msg json.RawMessage, mid int64, gid uint, expire uint) error { +// SavePrivate implements the Storage SavePrivate method. +func (s *RedisStorage) SavePrivate(key string, msg json.RawMessage, mid int64, expire uint) error { conn := s.getConn(key) if conn == nil { return RedisNoConnErr } defer conn.Close() - rm := &RedisMessage{Msg: msg, GroupId: gid, Expire: int64(expire) + time.Now().Unix()} + rm := &RedisPrivateMessage{Msg: msg, Expire: int64(expire) + time.Now().Unix()} m, err := json.Marshal(rm) if err != nil { glog.Errorf("json.Marshal(\"%v\") error(%v)", rm, err) @@ -113,8 +112,8 @@ func (s *RedisStorage) Save(key string, msg json.RawMessage, mid int64, gid uint return nil } -// Save implements the Storage Get method. -func (s *RedisStorage) Get(key string, mid int64) ([]*rpc.Message, error) { +// GetPrivate implements the Storage GetPrivate method. +func (s *RedisStorage) GetPrivate(key string, mid int64) ([]*myrpc.Message, error) { conn := s.getConn(key) if conn == nil { return nil, RedisNoConnErr @@ -125,7 +124,7 @@ func (s *RedisStorage) Get(key string, mid int64) ([]*rpc.Message, error) { glog.Errorf("conn.Do(\"ZRANGEBYSCORE\", \"%s\", \"%s\", \"+inf\", \"WITHSCORES\") error(%v)", err) return nil, err } - msgs := make([]*rpc.Message, 0, len(values)) + msgs := make([]*myrpc.Message, 0, len(values)) delMsgs := []int64{} cmid := int64(0) b := []byte{} @@ -136,7 +135,7 @@ func (s *RedisStorage) Get(key string, mid int64) ([]*rpc.Message, error) { glog.Errorf("redis.Scan() error(%v)", err) return nil, err } - rm := &RedisMessage{} + rm := &RedisPrivateMessage{} if err := json.Unmarshal(b, rm); err != nil { glog.Errorf("json.Unmarshal(\"%s\", rm) error(%v)", string(b), err) delMsgs = append(delMsgs, cmid) @@ -148,7 +147,7 @@ func (s *RedisStorage) Get(key string, mid int64) ([]*rpc.Message, error) { delMsgs = append(delMsgs, cmid) continue } - m := &rpc.Message{MsgId: cmid, Msg: rm.Msg, GroupId: rm.GroupId} + m := &myrpc.Message{MsgId: cmid, Msg: rm.Msg, GroupId: myrpc.PrivateGroupId} msgs = append(msgs, m) } // delete unmarshal failed and expired message @@ -162,8 +161,8 @@ func (s *RedisStorage) Get(key string, mid int64) ([]*rpc.Message, error) { return msgs, nil } -// Del implements the Storage DelKey method. -func (s *RedisStorage) Del(key string) error { +// DelPrivate implements the Storage DelPrivate method. +func (s *RedisStorage) DelPrivate(key string) error { conn := s.getConn(key) if conn == nil { return RedisNoConnErr diff --git a/message/rpc.go b/message/rpc.go index 0f44db8..c5cc6a1 100644 --- a/message/rpc.go +++ b/message/rpc.go @@ -51,53 +51,78 @@ func rpcListen(bind string) { rpc.Accept(l) } -// Store offline pravite message interface -func (r *MessageRPC) Save(m *myrpc.MessageSaveArgs, ret *int) error { - if m == nil || m.Msg == nil || m.MsgId < 0 || m.GroupId < 0 { +// SavePrivate rpc interface save user private message. +func (r *MessageRPC) SavePrivate(m *myrpc.MessageSavePrivateArgs, ret *int) error { + if m == nil || m.Msg == nil || m.MsgId < 0 { return myrpc.ErrParam } - if err := UseStorage.Save(m.Key, m.Msg, m.MsgId, m.GroupId, m.Expire); err != nil { - glog.Errorf("UseStorage.Save(\"%s\", \"%s\", %d, %d, %d) error(%v)", m.Key, string(m.Msg), m.MsgId, m.GroupId, m.Expire, err) + if err := UseStorage.SavePrivate(m.Key, m.Msg, m.MsgId, m.Expire); err != nil { + glog.Errorf("UseStorage.SavePrivate(\"%s\", \"%s\", %d, %d) error(%v)", m.Key, string(m.Msg), m.MsgId, m.Expire, err) return err } - glog.V(1).Infof("UseStorage.Save(\"%s\", \"%s\", %d, %d, %d) ok", m.Key, string(m.Msg), m.MsgId, m.GroupId, m.Expire) + glog.V(1).Infof("UseStorage.SavePrivate(\"%s\", \"%s\", %d, %d) ok", m.Key, string(m.Msg), m.MsgId, m.Expire) return nil } -// Store offline public message interface -func (r *MessageRPC) SavePub(m *myrpc.MessageSavePubArgs, ret *int) error { - return nil -} - -// Get offline message interface -func (r *MessageRPC) Get(m *myrpc.MessageGetArgs, rw *myrpc.MessageGetResp) error { +// GetPrivate rpc interface get user private message. +func (r *MessageRPC) GetPrivate(m *myrpc.MessageGetPrivateArgs, rw *myrpc.MessageGetResp) error { if m == nil || m.Key == "" || m.MsgId < 0 { return myrpc.ErrParam } - msgs, err := UseStorage.Get(m.Key, m.MsgId) + msgs, err := UseStorage.GetPrivate(m.Key, m.MsgId) if err != nil { - glog.Errorf("UseStorage.Get(\"%s\", %d) error(%v)", m.Key, m.MsgId, err) + glog.Errorf("UseStorage.GetPrivate(\"%s\", %d) error(%v)", m.Key, m.MsgId, err) return err } rw.Msgs = msgs - // TODO - rw.PubMsgs = nil - glog.V(1).Infof("UserStorage.Get(\"%s\", %d) ok", m.Key, m.MsgId) + glog.V(1).Infof("UserStorage.GetPrivate(\"%s\", %d) ok", m.Key, m.MsgId) return nil } -// Clean offline message interface -func (r *MessageRPC) Clean(key string, ret *int) error { +// DelPrivate rpc interface delete user private message. +func (r *MessageRPC) DelPrivate(key string, ret *int) error { if key == "" { return myrpc.ErrParam } - if err := UseStorage.Del(key); err != nil { - glog.Errorf("UserStorage.Del(\"%s\") error(%v)", key, err) + if err := UseStorage.DelPrivate(key); err != nil { + glog.Errorf("UserStorage.DelPrivate(\"%s\") error(%v)", key, err) return err } - glog.V(1).Infof("UserStorage.Del(\"%s\") ok", key) + glog.V(1).Infof("UserStorage.DelPrivate(\"%s\") ok", key) + return nil +} + +/* +// SavePublish rpc interface save publish message. +func (r *MessageRPC) SavePublish(m *myrpc.MessageSaveGroupArgs, ret *int) error { + return nil +} + +// GetPublish rpc interface get publish message. +func (r *MessageRPC) GetPublish(m *myrpc.MessageGetGroupArgs, rw *myrpc.MessageGetResp) error { + return nil +} + +// DelPublish rpc interface delete publish message. +func (r *MessageRPC) DelPublish(key string, ret *int) error { + return nil +} + +// SaveGroup rpc interface save publish message. +func (r *MessageRPC) SaveGroup(m *myrpc.MessageSaveGroupArgs, ret *int) error { + return nil +} + +// GetPublish rpc interface get publish message. +func (r *MessageRPC) GetGroup(m *myrpc.MessageGetGroupArgs, rw *myrpc.MessageGetResp) error { + return nil +} + +// DelPublish rpc interface delete publish message. +func (r *MessageRPC) DelGroup(key string, ret *int) error { return nil } +*/ // Server Ping interface func (r *MessageRPC) Ping(p int, ret *int) error { diff --git a/message/storage.go b/message/storage.go index 5e041d0..d56091b 100644 --- a/message/storage.go +++ b/message/storage.go @@ -35,12 +35,10 @@ var ( // Stored messages interface type Storage interface { - // Get messages - Get(key string, mid int64) ([]*rpc.Message, error) - // Save message - Save(key string, msg json.RawMessage, mid int64, gid uint, expire uint) error - // Delete key - Del(key string) error + // private message method + GetPrivate(key string, mid int64) ([]*rpc.Message, error) + SavePrivate(key string, msg json.RawMessage, mid int64, expire uint) error + DelPrivate(key string) error } // InitStorage init the storage type(mysql or redis). diff --git a/rpc/comet.go b/rpc/comet.go index 09629a7..6c0f4b2 100644 --- a/rpc/comet.go +++ b/rpc/comet.go @@ -32,9 +32,6 @@ import ( ) const ( - // group id - PrivateGroupId = 0 - PublicGroupId = 1 // protocol of Comet subcription cometProtocolUnknown = 0 cometProtocolWS = 1 @@ -76,7 +73,6 @@ type CometNodeEvent struct { type CometPushPrivateArgs struct { Key string // subscriber key Msg json.RawMessage // message content - GroupId uint // message group id Expire uint // message expire second } diff --git a/rpc/message.go b/rpc/message.go index f190f92..abc07c5 100644 --- a/rpc/message.go +++ b/rpc/message.go @@ -27,6 +27,17 @@ import ( "time" ) +const ( + // group id + PrivateGroupId = 0 + PublicGroupId = 1 + // message rpc service + MessageService = "MessageRPC" + MessageServiceGetPrivate = "MessageRPC.GetPrivate" + MessageServiceSavePrivate = "MessageRPC.SavePrivate" + MessageServiceDelPrivate = "MessageRPC.DelPrivate" +) + type MessageNodeEvent struct { // addr:port Key string @@ -43,9 +54,9 @@ type Message struct { // The Old Message struct (Compatible), TODO remove it. type OldMessage struct { - Msg string `json:"msg"` // Message - MsgId int64 `json:"mid"` // Message id - GroupId uint `json:"gid"` // Group id + Msg string `json:"msg"` // Message + MsgId int64 `json:"mid"` // Message id + GroupId uint `json:"gid"` // Group id } // Bytes get a message reply bytes. @@ -69,42 +80,32 @@ func (m *Message) OldBytes() ([]byte, error) { return byteJson, nil } -// Message Save Args -type MessageSaveArgs struct { - Key string // subscriber key - Msg json.RawMessage // message content - MsgId int64 // message id - GroupId uint // message group id - Expire uint // message expire second +// Message SavePrivate Args +type MessageSavePrivateArgs struct { + Key string // subscriber key + Msg json.RawMessage // message content + MsgId int64 // message id + Expire uint // message expire second } -// Public Message Save Args -type MessageSavePubArgs struct { +// Message SavePublish Args +type MessageSavePublishArgs struct { MsgID int64 // message id Msg string // message content Expire int64 // message expire second } // Message Get Args -type MessageGetArgs struct { - MsgId int64 // message id - PubMsgId int64 // public message id - Key string // subscriber key +type MessageGetPrivateArgs struct { + MsgId int64 // message id + Key string // subscriber key } // Message Get Response type MessageGetResp struct { - Msgs []*Message // messages - PubMsgs []*Message // public messages + Msgs []*Message // messages } -const ( - MessageService = "MessageRPC" - MessageServiceGet = "MessageRPC.Get" - MessageServiceSave = "MessageRPC.Save" - MessageServiceClean = "MessageRPC.Clean" -) - var ( MessageRPC *RandLB ) diff --git a/sql/gopush-cluster.sql b/sql/gopush-cluster.sql index d63ff4a..da9b0ca 100644 --- a/sql/gopush-cluster.sql +++ b/sql/gopush-cluster.sql @@ -3,17 +3,16 @@ CREATE DATABASE IF NOT EXISTS gopush; USE gopush; -# offline message -# DROP TABLE message; -CREATE TABLE IF NOT EXISTS message ( +# private message +# DROP TABLE private_msg; +CREATE TABLE IF NOT EXISTS private_msg ( id bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY, # auto increment id - sub varchar(64) NOT NULL, # subscriber key + skey varchar(64) NOT NULL, # subscriber key mid bigint unsigned NOT NULL, # message id - gid int unsigned NOT NULL, # message group id - expire bigint NOT NULL, # message expire second + ttl bigint NOT NULL, # message expire second msg blob NOT NULL, # message content ctime timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', # create time mtime timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', # modify time, - UNIQUE KEY ux_message_1 (sub, mid), - INDEX ix_message_1 (expire) + UNIQUE KEY ux_private_msg_1 (skey, mid), + INDEX ix_private_msg_1 (ttl) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; \ No newline at end of file diff --git a/ver/ver.go b/ver/ver.go index 166de0e..037f054 100644 --- a/ver/ver.go +++ b/ver/ver.go @@ -17,5 +17,5 @@ package ver const ( - Version = "1.0.2" + Version = "1.0.3" ) diff --git a/web/admin.go b/web/admin.go index d5746e2..c56718f 100644 --- a/web/admin.go +++ b/web/admin.go @@ -46,17 +46,11 @@ func PushPrivate(w http.ResponseWriter, r *http.Request) { body = string(bodyBytes) params := r.URL.Query() key := params.Get("key") - gidStr := params.Get("gid") expireStr := params.Get("expire") if key == "" { res["ret"] = ParamErr return } - gid, err := strconv.ParseUint(gidStr, 10, 32) - if err != nil { - res["ret"] = ParamErr - glog.Errorf("strconv.ParseInt(\"%s\", 10, 32) error(%v)", gidStr, err) - } expire, err := strconv.ParseUint(expireStr, 10, 32) if err != nil { res["ret"] = ParamErr @@ -80,8 +74,7 @@ func PushPrivate(w http.ResponseWriter, r *http.Request) { glog.Errorf("json.RawMessage(\"%s\").MarshalJSON() error(%v)", body, err) return } - // RPC call publish interface - args := &myrpc.CometPushPrivateArgs{GroupId: uint(gid), Msg: json.RawMessage(msg), Expire: uint(expire), Key: key} + args := &myrpc.CometPushPrivateArgs{Msg: json.RawMessage(msg), Expire: uint(expire), Key: key} ret := 0 if err := client.Call(myrpc.CometServicePushPrivate, args, &ret); err != nil { glog.Errorf("client.Call(\"%s\", \"%v\", &ret) error(%v)", myrpc.CometServicePushPrivate, args, err) @@ -91,8 +84,8 @@ func PushPrivate(w http.ResponseWriter, r *http.Request) { return } -// DelOfflineMessage handle for push private message. -func DelOfflineMessage(w http.ResponseWriter, r *http.Request) { +// DelPrivate handle for push private message. +func DelPrivate(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, "Method Not Allowed", 405) return @@ -125,8 +118,8 @@ func DelOfflineMessage(w http.ResponseWriter, r *http.Request) { return } ret := 0 - if err := client.Call(myrpc.MessageServiceClean, key, &ret); err != nil { - glog.Errorf("client.Call(\"%s\", \"%s\", &ret) error(%v)", myrpc.MessageServiceClean, key, err) + if err := client.Call(myrpc.MessageServiceDelPrivate, key, &ret); err != nil { + glog.Errorf("client.Call(\"%s\", \"%s\", &ret) error(%v)", myrpc.MessageServiceDelPrivate, key, err) res["ret"] = InternalErr return } diff --git a/web/config.go b/web/config.go index 4039d07..f19f527 100644 --- a/web/config.go +++ b/web/config.go @@ -26,17 +26,17 @@ import ( var ( Conf *Config - ConfFile string + confFile string ) // InitConfig initialize config file path func init() { - flag.StringVar(&ConfFile, "c", "./web.conf", " set web config file path") + flag.StringVar(&confFile, "c", "./web.conf", " set web config file path") } type Config struct { - Addr []string `goconf:"base:addr:,"` - AdminAddr []string `goconf:"base:admin.addr:,"` + HttpBind []string `goconf:"base:http.bind:,"` + AdminBind []string `goconf:"base:admin.bind:,"` MaxProc int `goconf:"base:maxproc"` PprofBind []string `goconf:"base:pprof.bind:,"` User string `goconf:"base:user"` @@ -52,17 +52,17 @@ type Config struct { RPCPing time.Duration `goconf:"rpc:ping:time"` } -// Initialize config -func NewConfig(file string) (*Config, error) { +// InitConfig init configuration file. +func InitConfig() error { gconf := goconf.New() - if err := gconf.Parse(file); err != nil { - glog.Errorf("goconf.Parse(\"%s\") error(%v)", file, err) - return nil, err + if err := gconf.Parse(confFile); err != nil { + glog.Errorf("goconf.Parse(\"%s\") error(%v)", confFile, err) + return err } // Default config - conf := &Config{ - Addr: []string{"localhost:80"}, - AdminAddr: []string{"localhost:81"}, + Conf = &Config{ + HttpBind: []string{"localhost:80"}, + AdminBind: []string{"localhost:81"}, MaxProc: runtime.NumCPU(), PprofBind: []string{"localhost:8190"}, User: "nobody nobody", @@ -77,9 +77,9 @@ func NewConfig(file string) (*Config, error) { RPCRetry: 3 * time.Second, RPCPing: 1 * time.Second, } - if err := gconf.Unmarshal(conf); err != nil { + if err := gconf.Unmarshal(Conf); err != nil { glog.Errorf("goconf.Unmarshall() error(%v)", err) - return nil, err + return err } - return conf, nil + return nil } diff --git a/web/handle.go b/web/handle.go index 6c9af73..c3978c2 100644 --- a/web/handle.go +++ b/web/handle.go @@ -80,11 +80,10 @@ func GetOfflineMsg0(w http.ResponseWriter, r *http.Request) { params := r.URL.Query() key := params.Get("key") midStr := params.Get("mid") - pmidStr := params.Get("pmid") callback := params.Get("callback") res := map[string]interface{}{"ret": OK} defer retWrite(w, r, res, callback, time.Now()) - if key == "" || midStr == "" || pmidStr == "" { + if key == "" || midStr == "" { res["ret"] = ParamErr return } @@ -94,22 +93,16 @@ func GetOfflineMsg0(w http.ResponseWriter, r *http.Request) { glog.Errorf("strconv.ParseInt(\"%s\", 10, 64) error(%v)", midStr, err) return } - pmid, err := strconv.ParseInt(pmidStr, 10, 64) - if err != nil { - res["ret"] = ParamErr - glog.Errorf("strconv.ParseInt(\"%s\", 10, 64) error(%v)", pmidStr, err) - return - } // RPC get offline messages reply := &myrpc.MessageGetResp{} - args := &myrpc.MessageGetArgs{MsgId: mid, PubMsgId: pmid, Key: key} + args := &myrpc.MessageGetPrivateArgs{MsgId: mid, Key: key} client := myrpc.MessageRPC.Get() if client == nil { res["ret"] = InternalErr return } - if err := client.Call(myrpc.MessageServiceGet, args, reply); err != nil { - glog.Errorf("myrpc.MessageRPC.Call(\"%s\", \"%v\", reply) error(%v)", myrpc.MessageServiceGet, args, err) + if err := client.Call(myrpc.MessageServiceGetPrivate, args, reply); err != nil { + glog.Errorf("myrpc.MessageRPC.Call(\"%s\", \"%v\", reply) error(%v)", myrpc.MessageServiceGetPrivate, args, err) res["ret"] = InternalErr return } @@ -118,9 +111,6 @@ func GetOfflineMsg0(w http.ResponseWriter, r *http.Request) { omsgs = append(omsgs, &myrpc.OldMessage{GroupId: msg.GroupId, MsgId: msg.MsgId, Msg: string(msg.Msg)}) } opmsgs := []*myrpc.OldMessage{} - for _, msg := range reply.PubMsgs { - opmsgs = append(opmsgs, &myrpc.OldMessage{GroupId: msg.GroupId, MsgId: msg.MsgId, Msg: string(msg.Msg)}) - } res["data"] = map[string]interface{}{"msgs": omsgs, "pmsgs": opmsgs} return } diff --git a/web/handle_1.0.go b/web/handle_1.0.go index e3a8225..0eda8c3 100644 --- a/web/handle_1.0.go +++ b/web/handle_1.0.go @@ -80,11 +80,10 @@ func GetOfflineMsg(w http.ResponseWriter, r *http.Request) { params := r.URL.Query() key := params.Get("k") midStr := params.Get("m") - pmidStr := params.Get("p") callback := params.Get("cb") res := map[string]interface{}{"ret": OK} defer retWrite(w, r, res, callback, time.Now()) - if key == "" || midStr == "" || pmidStr == "" { + if key == "" || midStr == "" { res["ret"] = ParamErr return } @@ -94,26 +93,20 @@ func GetOfflineMsg(w http.ResponseWriter, r *http.Request) { glog.Errorf("strconv.ParseInt(\"%s\", 10, 64) error(%v)", midStr, err) return } - pmid, err := strconv.ParseInt(pmidStr, 10, 64) - if err != nil { - res["ret"] = ParamErr - glog.Errorf("strconv.ParseInt(\"%s\", 10, 64) error(%v)", pmidStr, err) - return - } // RPC get offline messages reply := &myrpc.MessageGetResp{} - args := &myrpc.MessageGetArgs{MsgId: mid, PubMsgId: pmid, Key: key} + args := &myrpc.MessageGetPrivateArgs{MsgId: mid, Key: key} client := myrpc.MessageRPC.Get() if client == nil { res["ret"] = InternalErr return } - if err := client.Call(myrpc.MessageServiceGet, args, reply); err != nil { - glog.Errorf("myrpc.MessageRPC.Call(\"%s\", \"%v\", reply) error(%v)", myrpc.MessageServiceGet, args, err) + if err := client.Call(myrpc.MessageServiceGetPrivate, args, reply); err != nil { + glog.Errorf("myrpc.MessageRPC.Call(\"%s\", \"%v\", reply) error(%v)", myrpc.MessageServiceGetPrivate, args, err) res["ret"] = InternalErr return } - res["data"] = map[string]interface{}{"msgs": reply.Msgs, "pmsgs": reply.PubMsgs} + res["data"] = map[string]interface{}{"msgs": reply.Msgs} return } diff --git a/web/http.go b/web/http.go index 659488c..2d77797 100644 --- a/web/http.go +++ b/web/http.go @@ -43,13 +43,17 @@ func StartHTTP() { httpServeMux.HandleFunc("/time/get", GetTime0) // internal httpAdminServeMux := http.NewServeMux() + // 1.0 + httpAdminServeMux.HandleFunc("/1/admin/push/private", PushPrivate) + httpAdminServeMux.HandleFunc("/1/admin/msg/del", DelPrivate) + // old httpAdminServeMux.HandleFunc("/admin/push", PushPrivate) - httpAdminServeMux.HandleFunc("/admin/msg/clean", DelOfflineMessage) - for _, bind := range Conf.Addr { + httpAdminServeMux.HandleFunc("/admin/msg/clean", DelPrivate) + for _, bind := range Conf.HttpBind { glog.Infof("start http listen addr:\"%s\"", bind) go httpListen(httpServeMux, bind) } - for _, bind := range Conf.AdminAddr { + for _, bind := range Conf.AdminBind { glog.Infof("start admin http listen addr:\"%s\"", bind) go httpListen(httpAdminServeMux, bind) } diff --git a/web/main.go b/web/main.go index dc1133b..c918a55 100644 --- a/web/main.go +++ b/web/main.go @@ -30,12 +30,10 @@ func main() { var err error // Parse cmd-line arguments flag.Parse() + glog.Infof("web ver: \"%s\" start", ver.Version) defer glog.Flush() - signalCH := InitSignal() - // Load config - Conf, err = NewConfig(ConfFile) - if err != nil { - glog.Errorf("NewConfig(\"%s\") error(%v)", ConfFile, err) + if err = InitConfig(); err != nil { + glog.Errorf("InitConfig() error(%v)", err) return } // Set max routine @@ -67,7 +65,7 @@ func main() { return } // init signals, block wait signals - glog.Infof("web(%s) start", ver.Version) + signalCH := InitSignal() HandleSignal(signalCH) glog.Info("web stop") } diff --git a/web/web-example.conf b/web/web-example.conf index 2539440..fa118a0 100644 --- a/web/web-example.conf +++ b/web/web-example.conf @@ -20,11 +20,20 @@ [base] # Web service http listen and server on this address, default localhost:8090 -addr localhost:8090 +# +# Examples +# tcp.bind localhost:8090 +# tcp.bind 192.168.1.100:8090,192.168.1.101:8090 +# tcp.bind 0.0.0.0:8090 +tcp.bind localhost:8090 # Web service http listen and server on this address, default localhost:8091 # mainly servers for internal admin -admin.addr localhost:8091 +# Examples +# tcp.bind localhost:8091 +# tcp.bind 192.168.1.100:8091,192.168.1.101:8091 +# tcp.bind 0.0.0.0:8091 +admin.bind localhost:8091 # Sets the maximum number of CPUs that can be executing simultaneously. # This call will go away when the scheduler improves. By default the number of @@ -40,7 +49,7 @@ admin.addr localhost:8091 # # pprof.bind 192.168.1.100:8190,10.0.0.1:8190 # pprof.bind 127.0.0.1:8190 -# pprof.bind :8190 +# pprof.bind 0.0.0.0:8190 pprof.bind localhost:8190 # If the master process is run as root, then web will setuid()/setgid() @@ -80,7 +89,7 @@ addr localhost:2181 # The timeout parameter, given in nanoseconds, allows controlling # the amount of time the zookeeper connection can stay unresponsive before the # zookeeper server will be considered problematic. -# timeout 30s +timeout 30s # The root path of all nodes that Comet mounted in zookeeper,default /gopush-cluster-comet comet.path /gopush-cluster-comet @@ -94,3 +103,13 @@ message.path /gopush-cluster-message # Interval time of every reconnection # retry 3s + +################################## INCLUDES ################################### + +# Include one or more other config files here. This is useful if you +# have a standard template that goes to all comet server but also need +# to customize a few per-server settings. Include files can include +# other files, so use this wisely. +# +# include /path/to/local.conf +# include /path/to/other.conf diff --git a/zk/zk.go b/zk/zk.go index b1e8b77..6ed149d 100644 --- a/zk/zk.go +++ b/zk/zk.go @@ -47,7 +47,7 @@ func Connect(addr []string, timeout time.Duration) (*zk.Conn, error) { go func() { for { event := <-session - glog.Infof("zookeeper get a event: %s", event.State.String()) + glog.V(1).Infof("zookeeper get a event: %s", event.State.String()) } }() return conn, nil