Skip to content

Commit

Permalink
Refactor message filtering.
Browse files Browse the repository at this point in the history
  • Loading branch information
bemasher committed Jul 11, 2015
1 parent 767fea7 commit 548dac1
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 19 deletions.
34 changes: 34 additions & 0 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package main

import (
"bytes"
"encoding/gob"
"encoding/json"
"encoding/xml"
Expand All @@ -28,6 +29,7 @@ import (
"strings"

"github.com/bemasher/rtlamr/csv"
"github.com/bemasher/rtlamr/parse"
)

var logFilename = flag.String("logfile", "/dev/stdout", "log statement dump file")
Expand All @@ -45,6 +47,7 @@ var decimation = flag.Int("decimation", 1, "integer decimation factor, keep ever
var timeLimit = flag.Duration("duration", 0, "time to run for, 0 for infinite, ex. 1h5m10s")
var meterID UintMap
var meterType UintMap

var unique = flag.Bool("unique", false, "suppress duplicate messages from each meter")

var encoder Encoder
Expand Down Expand Up @@ -167,3 +170,34 @@ func (m UintMap) Set(value string) error {

return nil
}

type MeterIDFilter UintMap

func (m MeterIDFilter) Filter(msg parse.Message) bool {
return m[uint(msg.MeterID())]
}

type MeterTypeFilter UintMap

func (m MeterTypeFilter) Filter(msg parse.Message) bool {
return m[uint(msg.MeterType())]
}

type UniqueFilter map[uint][]byte

func NewUniqueFilter() UniqueFilter {
return make(UniqueFilter)
}

func (uf UniqueFilter) Filter(msg parse.Message) bool {
checksum := msg.Checksum()
mid := uint(msg.MeterID())

if val, ok := uf[mid]; ok && bytes.Compare(val, checksum) == 0 {
return false
}

uf[mid] = make([]byte, len(checksum))
copy(uf[mid], checksum)
return true
}
24 changes: 24 additions & 0 deletions parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,27 @@ func (msg LogMessage) Record() (r []string) {
r = append(r, msg.Message.Record()...)
return r
}

type FilterChain []Filter

func (fc *FilterChain) Add(filter Filter) {
*fc = append(*fc, filter)
}

func (fc FilterChain) Match(msg Message) bool {
if len(fc) == 0 {
return true
}

for _, filter := range fc {
if !filter.Filter(msg) {
return false
}
}

return true
}

type Filter interface {
Filter(Message) bool
}
31 changes: 12 additions & 19 deletions recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package main

import (
"bytes"
"encoding/xml"
"flag"
"fmt"
Expand All @@ -40,7 +39,8 @@ var rcvr Receiver

type Receiver struct {
rtltcp.SDR
p parse.Parser
p parse.Parser
fc parse.FilterChain
}

func (rcvr *Receiver) NewReceiver() {
Expand Down Expand Up @@ -82,8 +82,17 @@ func (rcvr *Receiver) NewReceiver() {
sampleRateFlagSet = true
case "gainbyindex", "tunergainmode", "tunergain", "agcmode":
gainFlagSet = true
case "unique":
rcvr.fc.Add(NewUniqueFilter())
case "filterid":
rcvr.fc.Add(MeterIDFilter(meterID))
case "filtertype":
rcvr.fc.Add(MeterTypeFilter(meterType))
default:
fmt.Println(f.Name)
}
})
fmt.Printf("%+v\n", rcvr.fc)

// Set some parameters for listening.
if centerfreqFlagSet {
Expand Down Expand Up @@ -127,7 +136,6 @@ func (rcvr *Receiver) Run() {
}()

block := make([]byte, rcvr.p.Cfg().BlockSize2)
checksumHistory := make(map[uint][]byte)

start := time.Now()
for {
Expand All @@ -149,24 +157,9 @@ func (rcvr *Receiver) Run() {
indices := rcvr.p.Dec().Decode(block)

for _, pkt := range rcvr.p.Parse(indices) {
if len(meterID) > 0 && !meterID[uint(pkt.MeterID())] {
continue
}

if len(meterType) > 0 && !meterType[uint(pkt.MeterType())] {
if !rcvr.fc.Match(pkt) {
continue
}
if *unique {
checksum := pkt.Checksum()
mid := uint(pkt.MeterID())

if val, ok := checksumHistory[mid]; ok && bytes.Compare(val, checksum) == 0 {
continue
}

checksumHistory[mid] = make([]byte, len(checksum))
copy(checksumHistory[mid], checksum)
}

var msg parse.LogMessage
msg.Time = time.Now()
Expand Down

0 comments on commit 548dac1

Please sign in to comment.