From b4129ba3e1dfced453fa76e503786d5b8e92eeab Mon Sep 17 00:00:00 2001 From: Tony Date: Mon, 10 Dec 2018 10:50:59 -0500 Subject: [PATCH] Initial commit --- LICENSE | 21 +++ README.md | 76 +++++++++ parse-nats-data.go | 413 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 510 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 parse-nats-data.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..93ed8d2 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Tony Meehan + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..af00834 --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ +# Parse NATS Streaming Server Data + +This is a tool to parse the contents of [NATS streaming server](https://github.com/nats-io/nats-streaming-server/) durable subscription. + +From the [nats.io documentation](https://nats.io/documentation/streaming/nats-streaming-intro/): + +> Durable subscriptions - Subscriptions may also specify a “durable name” which will survive client restarts. Durable subscriptions cause the server to track the last acknowledged message sequence number for a client and durable name. When the client restarts/resubscribes, and uses the same client ID and durable name, the server will resume delivery beginning with the earliest unacknowledged message for this durable subscription. + +## Installation + +``` +go get -u github.com/endgameinc/parse-nats-data +``` + +## Usage + +``` +# ./parse-nats-data + +Usage: ./parse-nats-data [options] + +Options: + -d, --msgsDat msgs.dat file + -i, --msgsIdx msgs.idx file + -s, --subsDat subs.dat file + -c, --clientsDat clients.dat file +``` + +To read messages, run `parse-nats-data` with the following arguments: + +``` +# ./parse-nats-data -d /path/to/msgs.1.dat -i /path/to/msgs.1.idx +[2018-07-10 15:00:31.716244931 +0000 UTC] seq: 1 | size: 936 | offset: 4 | crc32: 8e22ea12 +[2018-07-10 15:00:31.836288178 +0000 UTC] seq: 2 | size: 959 | offset: 948 | crc32: 7cccde1c +[2018-07-10 15:00:36.202716151 +0000 UTC] seq: 3 | size: 969 | offset: 1915 | crc32: 562b1ffc +[2018-07-10 15:00:50.33685147 +0000 UTC] seq: 4 | size: 958 | offset: 2892 | crc32: 256e9151 +[2018-07-10 15:00:53.790236525 +0000 UTC] seq: 5 | size: 994 | offset: 3858 | crc32: 3c5ad77 +[2018-07-10 15:00:54.425182393 +0000 UTC] seq: 6 | size: 983 | offset: 4860 | crc32: 2fe42320 +[2018-07-10 15:00:55.447482123 +0000 UTC] seq: 7 | size: 982 | offset: 5851 | crc32: fd8ff587 +[2018-07-10 15:00:56.251222498 +0000 UTC] seq: 8 | size: 984 | offset: 6841 | crc32: 2ec77197 +[2018-07-10 15:02:47.133969633 +0000 UTC] seq: 9 | size: 980 | offset: 7833 | crc32: 83c370d4 +[2018-07-10 15:02:47.716276067 +0000 UTC] seq: 10 | size: 995 | offset: 8821 | crc32: bd8ce709 +[2018-07-10 15:02:50.598110162 +0000 UTC] seq: 11 | size: 891 | offset: 9824 | crc32: f0bdc27b +[2018-07-10 15:02:51.031120836 +0000 UTC] seq: 12 | size: 899 | offset: 10723 | crc32: 175b6e1b +``` + +To read subscription events, run `parse-nats-data` with the following arguments: + +``` +# ./parse-nats-data -s /path/to/subs.dat +ID: 1 'Client_6715' Type: subRecNew LastSent: 0 qGroup: Inbox: _INBOX.4SR1QZIfwYLHPtQYrb1AzQ AckInbox: _STAN.subacks.54V6hblM4Dt4B7KyUzRYy5.sub:queue.neCdF3TS8SIjOY1fpRavR0 MaxInFlight: 1024 AckWaitInSecs: 30 Durable: IsDurable: false IsClosed: false +ID: 2 'Client_10792_bcff6c27' Type: subRecNew LastSent: 0 qGroup: sub:queue Inbox: _INBOX.Jby2tc5tiIBlZvYSnfBUOR AckInbox: _STAN.subacks.54V6hblM4Dt4B7KyUzRYy5.sub.neCdF3TS8SIjOY1fpRavss MaxInFlight: 8192 AckWaitInSecs: 30 Durable: IsDurable: true IsClosed: false +ID: 3 'Client_11933_4794405c' Type: subRecNew LastSent: 0 qGroup: sub:queue Inbox: _INBOX.KoEbjf9Czr4kk62w3eLslO AckInbox: _STAN.subacks.54V6hblM4Dt4B7KyUzRYy5.sub.neCdF3TS8SIjOY1fpRawgQ MaxInFlight: 8192 AckWaitInSecs: 30 Durable: IsDurable: true IsClosed: false +ID: 1 SeqNo: 1 subRecMsg +ID: 2 SeqNo: 1 subRecMsg +ID: 3 SeqNo: 1 subRecMsg +ID: 1 SeqNo: 1 subRecAck +ID: 3 SeqNo: 1 subRecAck +ID: 1 SeqNo: 2 subRecMsg +ID: 2 SeqNo: 2 subRecMsg +``` + +To read client events, run `parse-nats-data` as follows: + +``` +# ./parse-nats-data -c clients.dat +CID: Client_40887 Inbox: _INBOX.cggINaUbPQHIjTwxsJSIuz ConnId: [99 103 103 73 78 97 85 98 80 81 72 73 106 84 119 120 115 74 83 73 115 73] Protocol: 1 PingInterval: 5 PingMaxTimeout: 3 addClient +CID: Client_40887 Inbox: _INBOX.cggINaUbPQHIjTwxsJSIpb ConnId: [99 103 103 73 78 97 85 98 80 81 72 73 106 84 119 120 115 74 83 73 109 117] Protocol: 1 PingInterval: 5 PingMaxTimeout: 3 addClient +CID: Client_40887 Inbox: _INBOX.cggINaUbPQHIjTwxsJSJGX ConnId: [99 103 103 73 78 97 85 98 80 81 72 73 106 84 119 120 115 74 83 74 68 113] Protocol: 1 PingInterval: 5 PingMaxTimeout: 3 addClient +CID: Client_40887 delClient +CID: Client_40887 Inbox: _INBOX.cggINaUbPQHIjTwxsJSJpY ConnId: [99 103 103 73 78 97 85 98 80 81 72 73 106 84 119 120 115 74 83 74 109 114] Protocol: 1 PingInterval: 5 PingMaxTimeout: 3 addClient +CID: Client_40887 Inbox: _INBOX.cggINaUbPQHIjTwxsJSKDn ConnId: [99 103 103 73 78 97 85 98 80 81 72 73 106 84 119 120 115 74 83 75 66 54] Protocol: 1 PingInterval: 5 PingMaxTimeout: 3 addClient +CID: Client_40887 delClient +CID: Client_40887 Inbox: _INBOX.cggINaUbPQHIjTwxsJSKZL ConnId: [99 103 103 73 78 97 85 98 80 81 72 73 106 84 119 120 115 74 83 75 87 101] Protocol: 1 PingInterval: 5 PingMaxTimeout: 3 addClient +CID: Client_40887 delClient +``` diff --git a/parse-nats-data.go b/parse-nats-data.go new file mode 100644 index 0000000..3ef3c80 --- /dev/null +++ b/parse-nats-data.go @@ -0,0 +1,413 @@ +package main + +import ( + "encoding/binary" + "errors" + "flag" + "fmt" + "log" + "os" + "time" + + proto "github.com/gogo/protobuf/proto" + spb "github.com/nats-io/nats-streaming-server/spb" +) + +// Record types for subscription file +type recordType byte + +const ( + subRecNew = recordType(iota) + 1 + subRecUpdate + subRecDel + subRecAck + subRecMsg +) + +// Record types for client store +const ( + addClient = recordType(iota) + 1 + delClient +) + +func readNextBytes(file *os.File, number int) ([]byte, error) { + bytes := make([]byte, number) + + _, err := file.Read(bytes) + if err != nil { + return nil, err + } + + return bytes, nil +} + +func readNextBytesAsUint32(f *os.File) (uint32, error) { + b, err := readNextBytes(f, 4) + if err != nil { + return 0, err + } + return binary.LittleEndian.Uint32(b), nil +} + +func readNextBytesAsUint64(f *os.File) (uint64, error) { + b, err := readNextBytes(f, 8) + if err != nil { + return 0, err + } + return binary.LittleEndian.Uint64(b), nil +} + +func getMsgSize(d []byte) int { + if len(d) <= 0 { + return 0 + } + size := int(uint(d[0]) | uint(d[1])<<8 | uint(d[2])<<16) + return size +} + +func getNextMsgOffsetFromIndex(idxFile *os.File) (uint32, uint64, error) { + var err error + + seq, err := readNextBytesAsUint64(idxFile) + if err != nil { + return 0, 0, err + } + + offset, err := readNextBytesAsUint64(idxFile) + if err != nil { + return 0, 0, err + } + + timestamp, err := readNextBytesAsUint64(idxFile) + if err != nil { + return 0, 0, err + } + + idxMsgSize, err := readNextBytesAsUint32(idxFile) + if err != nil { + return 0, 0, err + } + + crc32, err := readNextBytesAsUint32(idxFile) + if err != nil { + return 0, 0, err + } + + t := time.Unix(0, int64(timestamp)) + + location, err := time.LoadLocation("UTC") + if err != nil { + return 0, 0, err + } + + fmt.Printf("[%s] seq: %d | size: %d | offset: %d | crc32: %x", t.In(location), seq, idxMsgSize, offset, crc32) + + return idxMsgSize, offset, err +} + +func printMsgAtOffset(datFile *os.File, size uint32, offset uint64) error { + var err error + + bytes := make([]byte, size) + + _, err = datFile.Seek(int64(offset), 0) + if err != nil { + log.Fatal(err) + } + + _, err = datFile.Read(bytes) + if err != nil { + log.Fatal(err) + } + + msgSize := getMsgSize(bytes) + if int(msgSize) != int(size) { + return errors.New(fmt.Sprintf("error: size mismatch (%u msg!= %u idx)", int(msgSize), int(size))) + } + return err +} + +func parseMessages(idxFile *os.File, datFile *os.File) error { + var err error + size := uint32(0) + offset := uint64(0) + + for { + size, offset, err = getNextMsgOffsetFromIndex(idxFile) + if err != nil { + break + } + + err = printMsgAtOffset(datFile, size, offset) + if err != nil { + break + } + fmt.Printf("\n") + } + + return err +} + +func checkVersion(f *os.File) error { + fileVersion, err := readNextBytesAsUint32(f) + if fileVersion != 1 || err != nil { + return errors.New("invalid file version") + } + + return nil +} + +func parseData(idxPath string, datPath string) error { + var err error + + idxFile, err := os.Open(idxPath) + if err != nil { + return err + } + + datFile, err := os.Open(datPath) + if err != nil { + log.Fatalf("Cannot open file '%s' err:%s", datPath, err) + } + defer idxFile.Close() + + if checkVersion(idxFile) != nil { + return errors.New("invalid index file") + } + + if checkVersion(datFile) != nil { + return errors.New("invalid dat file") + } + + err = parseMessages(idxFile, datFile) + return err +} + +func parseSubRecNew(msgType string, msg *[]byte) { + newSub := &spb.SubState{} + err := proto.Unmarshal(*msg, newSub) + if err != nil { + fmt.Printf("unmarshalling error: %s\n", err) + return + } + + fmt.Printf("ID: %d '%s' Type: %s LastSent: %d qGroup: %s Inbox: %s AckInbox: %s MaxInFlight: %d AckWaitInSecs: %d Durable: %s IsDurable: %t IsClosed: %t\n", + newSub.ID, + newSub.ClientID, + msgType, + newSub.LastSent, + newSub.QGroup, + newSub.Inbox, + newSub.AckInbox, + newSub.MaxInFlight, + newSub.AckWaitInSecs, + newSub.DurableName, + newSub.IsDurable, + newSub.IsClosed) +} + +func parseSubReqDel(msgType string, msg *[]byte) { + delSub := &spb.SubStateDelete{} + err := proto.Unmarshal(*msg, delSub) + if err != nil { + fmt.Printf("unmarshalling error: %s\n", err) + return + } + fmt.Printf("ID: %d Tyep: %s\n", delSub.ID, msgType) +} + +func parseSubReqAck(msgType string, msg *[]byte) { + updateSub := &spb.SubStateUpdate{} + err := proto.Unmarshal(*msg, updateSub) + if err != nil { + fmt.Printf("unmarshalling error: %s\n", err) + return + } + fmt.Printf("ID: %d SeqNo: %d %s\n", updateSub.ID, updateSub.Seqno, msgType) +} + +func parseAddClient(msgType string, msg *[]byte) { + addClient := &spb.ClientInfo{} + err := proto.Unmarshal(*msg, addClient) + if err != nil { + fmt.Printf("unmarshalling error: %s\n", err) + return + } + fmt.Printf("CID: %s Inbox: %s ConnId: %d Protocol: %d PingInterval: %d PingMaxTimeout: %d %s\n", addClient.ID, addClient.HbInbox, addClient.ConnID, addClient.Protocol, addClient.PingInterval, addClient.PingMaxOut, msgType) +} + +func parseDelClient(msgType string, msg *[]byte) { + delClient := &spb.ClientInfo{} + err := proto.Unmarshal(*msg, delClient) + if err != nil { + fmt.Printf("unmarshalling error: %s\n", err) + return + } + fmt.Printf("CID: %s %s\n", delClient.ID, msgType) + return +} + +func parseClients(clientsPath string) error { + var err error + + clientsFile, err := os.Open(clientsPath) + if err != nil { + return err + } + defer clientsFile.Close() + + if checkVersion(clientsFile) != nil { + return errors.New("invalid sub file") + } + + msgSizeAndType := []byte{} + msg := []byte{} + + for { + msgSizeAndType, err = readNextBytes(clientsFile, 4) + if err != nil { + return err + } + + msgSize := getMsgSize(msgSizeAndType) + _, err = readNextBytes(clientsFile, 4) + if err != nil { + return err + } + + msgType := recordType(msgSizeAndType[3]) + msg, err = readNextBytes(clientsFile, msgSize) + if err != nil { + return err + } + + switch msgType { + case addClient: + parseAddClient("addClient", &msg) + case delClient: + parseDelClient("delClient", &msg) + default: + return errors.New("unknown message type") + } + } + + return err +} + +func parseSubscriptions(subPath string) error { + var err error + + subFile, err := os.Open(subPath) + if err != nil { + return err + } + defer subFile.Close() + + if checkVersion(subFile) != nil { + return errors.New("invalid sub file") + } + + msgSizeAndType := []byte{} + msg := []byte{} + + for true { + + msgSizeAndType, err = readNextBytes(subFile, 4) + if err != nil { + return err + } + + msgSize := getMsgSize(msgSizeAndType) + _, err = readNextBytes(subFile, 4) + if err != nil { + return err + } + + msgType := recordType(msgSizeAndType[3]) + msg, err = readNextBytes(subFile, msgSize) + if err != nil { + return err + } + + switch msgType { + case subRecNew: + parseSubRecNew("subRecNew", &msg) + case subRecUpdate: + parseSubRecNew("subRecUpdate", &msg) + case subRecDel: + parseSubReqDel("subRecDel", &msg) + case subRecAck: + parseSubReqAck("subRecAck", &msg) + case subRecMsg: + parseSubReqAck("subRecMsg", &msg) + default: + return errors.New("unknown message type") + } + } + + return err +} + +var usageStr = ` +Usage: %s [options] + +Options: + -d, --msgsDat msgs.dat file + -i, --msgsIdx msgs.idx file + -s, --subsDat subs.dat file + -c, --clientsDat clients.dat file +` + +func usage() { + fmt.Printf("%s\n", fmt.Sprintf(usageStr, os.Args[0])) + os.Exit(0) +} + +func main() { + var err error + + msgDatFile := "" + flag.StringVar(&msgDatFile, "d", "", "msgs.dat file") + flag.StringVar(&msgDatFile, "msgsDat", "", "msgs.dat file") + + msgIdxFile := "" + flag.StringVar(&msgIdxFile, "i", "", "msgs.idx file") + flag.StringVar(&msgIdxFile, "msgsIdx", "", "msgs.idx file") + + subDatFile := "" + flag.StringVar(&subDatFile, "s", "", "subs.dat file") + flag.StringVar(&subDatFile, "subsDat", "", "subs.dat file") + + clientsDatFile := "" + flag.StringVar(&clientsDatFile, "c", "", "clients.dat file") + flag.StringVar(&clientsDatFile, "clientsDat", "", "clients.dat file") + + flag.Usage = usage + flag.Parse() + + if flag.NFlag() < 1 { + usage() + os.Exit(0) + } + + if subDatFile != "" { + err = parseSubscriptions(subDatFile) + } else if clientsDatFile != "" { + err = parseClients(clientsDatFile) + } else { + if msgDatFile == "" { + fmt.Println("error: missing msgs.dat") + os.Exit(1) + } + if msgIdxFile == "" { + fmt.Println("error: missing msgs.idx") + os.Exit(1) + } + err = parseData(msgIdxFile, msgDatFile) + } + + if err != nil { + fmt.Printf("error: %s\n", err) + os.Exit(1) + } +}