Skip to content

Commit

Permalink
Add capability for executing individual queries per device (#112)
Browse files Browse the repository at this point in the history
Adds QueryOp method and exposes RS485 and SunSpec device types
  • Loading branch information
andig authored Apr 20, 2020
1 parent 612973d commit 6539a8f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 53 deletions.
77 changes: 43 additions & 34 deletions meters/rs485/rs485.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,50 +14,58 @@ const (
ReadInputReg = 4
)

type rs485 struct {
// RS485 implements meters.Device
type RS485 struct {
producer Producer
ops chan Operation
inflight Operation
}

// NewDevice creates a device who's type must exist in the producer registry
func NewDevice(typeid string) (meters.Device, error) {
func NewDevice(typeid string) (*RS485, error) {
if factory, ok := Producers[typeid]; ok {
device := &rs485{
device := &RS485{
producer: factory(),
ops: make(chan Operation),
}

// ringbuffer of device operations
go func(d *rs485) {
for {
for _, op := range d.producer.Produce() {
d.ops <- op
}
}
}(device)

return device, nil
}

return nil, fmt.Errorf("unknown meter type %s", typeid)
}

// Initialize prepares the device for usage. Any setup or initilization should be done here.
func (d *rs485) Initialize(client modbus.Client) error {
// Initialize prepares the device for usage. Any setup or initialization should be done here.
func (d *RS485) Initialize(client modbus.Client) error {
return nil
}

// Descriptor returns the device descriptor. Since this method doe not have bus access the descriptor should be preared
// during initilization.
func (d *rs485) Descriptor() meters.DeviceDescriptor {
// Producer returns the underlying producer. The producer can be used to understand which operations the device supports.
func (d *RS485) Producer() Producer {
return d.producer
}

// Descriptor returns the device descriptor. Since this method does not have bus access the descriptor should be
// prepared during initialization.
func (d *RS485) Descriptor() meters.DeviceDescriptor {
return meters.DeviceDescriptor{
Manufacturer: d.producer.Type(),
Model: d.producer.Description(),
}
}

func (d *rs485) query(client modbus.Client, op Operation) (res meters.MeasurementResult, err error) {
// Probe is called by the handler after preparing the bus by setting the device id
func (d *RS485) Probe(client modbus.Client) (res meters.MeasurementResult, err error) {
op := d.producer.Probe()

res, err = d.QueryOp(client, op)
if err != nil {
return res, err
}

return res, nil
}

// QueryOp executes a single query operation on the bus
func (d *RS485) QueryOp(client modbus.Client, op Operation) (res meters.MeasurementResult, err error) {
var bytes []byte

if op.ReadLen == 0 {
Expand Down Expand Up @@ -90,22 +98,23 @@ func (d *rs485) query(client modbus.Client, op Operation) (res meters.Measuremen
return res, nil
}

// Probe is called by the handler after preparing the bus by setting the device id
func (d *rs485) Probe(client modbus.Client) (res meters.MeasurementResult, err error) {
op := d.producer.Probe()

res, err = d.query(client, op)
if err != nil {
return res, err
}

return res, nil
}

// Query is called by the handler after preparing the bus by setting the device id and waiting for rate limit
func (d *rs485) Query(client modbus.Client) (res []meters.MeasurementResult, err error) {
func (d *RS485) Query(client modbus.Client) (res []meters.MeasurementResult, err error) {
res = make([]meters.MeasurementResult, 0)

if d.ops == nil {
d.ops = make(chan Operation)

// ringbuffer of device operations
go func(d *RS485) {
for {
for _, op := range d.producer.Produce() {
d.ops <- op
}
}
}(d)
}

// Query loop will try to read all operations in a single run. It will
// always start with the current inflight operation. If an error is encountered,
// the partial results are returned. The loop is terminated after as many
Expand All @@ -118,7 +127,7 @@ func (d *rs485) Query(client modbus.Client) (res []meters.MeasurementResult, err
d.inflight = <-d.ops
}

m, err := d.query(client, d.inflight)
m, err := d.QueryOp(client, d.inflight)
if err != nil {
return res, err
}
Expand Down
72 changes: 53 additions & 19 deletions meters/sunspec/sunspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/volkszaehler/mbmd/meters"
)

type sunSpec struct {
type SunSpec struct {
models []sunspec.Model
descriptor meters.DeviceDescriptor
}
Expand All @@ -37,15 +37,15 @@ func (e partialError) Cause() error {
func (e partialError) PartiallyInitialized() {}

// NewDevice creates a Sunspec device
func NewDevice(meterType string) meters.Device {
return &sunSpec{
func NewDevice(meterType string) *SunSpec {
return &SunSpec{
descriptor: meters.DeviceDescriptor{
Manufacturer: meterType,
},
}
}

func (d *sunSpec) Initialize(client modbus.Client) error {
func (d *SunSpec) Initialize(client modbus.Client) error {
in, err := sunspecbus.Open(client)
if err != nil && in == nil {
return err
Expand Down Expand Up @@ -79,7 +79,7 @@ func (d *sunSpec) Initialize(client modbus.Client) error {
return err
}

func (d *sunSpec) readCommonBlock(device sunspec.Device) error {
func (d *SunSpec) readCommonBlock(device sunspec.Device) error {
// TODO catch panic
commonModel := device.MustModel(sunspec.ModelId(1))
// TODO catch panic
Expand All @@ -100,7 +100,7 @@ func (d *sunSpec) readCommonBlock(device sunspec.Device) error {
}

// collect and sort supported models except for common
func (d *sunSpec) collectModels(device sunspec.Device) error {
func (d *SunSpec) collectModels(device sunspec.Device) error {
d.models = device.Collect(sunspec.OneOfSeveralModelIds(d.relevantModelIds()))
if len(d.models) == 0 {
return errors.New("sunspec: could not find supported model")
Expand All @@ -110,7 +110,7 @@ func (d *sunSpec) collectModels(device sunspec.Device) error {
return nil
}

func (d *sunSpec) relevantModelIds() []sunspec.ModelId {
func (d *SunSpec) relevantModelIds() []sunspec.ModelId {
modelIds := make([]sunspec.ModelId, 0, len(modelMap))
for k := range modelMap {
modelIds = append(modelIds, sunspec.ModelId(k))
Expand All @@ -120,7 +120,7 @@ func (d *sunSpec) relevantModelIds() []sunspec.ModelId {
}

// remove model 101 if model 103 found
func (d *sunSpec) sanitizeModels() {
func (d *SunSpec) sanitizeModels() {
m101 := -1
for i, m := range d.models {
if m.Id() == sunspec.ModelId(101) {
Expand All @@ -133,12 +133,12 @@ func (d *sunSpec) sanitizeModels() {
}
}

func (d *sunSpec) Descriptor() meters.DeviceDescriptor {
func (d *SunSpec) Descriptor() meters.DeviceDescriptor {
return d.descriptor
}

func (d *sunSpec) Probe(client modbus.Client) (res meters.MeasurementResult, err error) {
if d.notInitilized() {
func (d *SunSpec) Probe(client modbus.Client) (res meters.MeasurementResult, err error) {
if d.notInitialized() {
return res, errors.New("sunspec: not initialized")
}

Expand Down Expand Up @@ -172,12 +172,11 @@ func (d *sunSpec) Probe(client modbus.Client) (res meters.MeasurementResult, err
return res, fmt.Errorf("sunspec: could not find model for probe snip")
}

func (d *sunSpec) notInitilized() bool {
func (d *SunSpec) notInitialized() bool {
return len(d.models) == 0
}

func (d *sunSpec) convertPoint(b sunspec.Block, blockID int, pointID string, m meters.Measurement) (meters.MeasurementResult, error) {
p := b.MustPoint(pointID)
func (d *SunSpec) convertPoint(b sunspec.Block, p sunspec.Point, m meters.Measurement) (meters.MeasurementResult, error) {
v := p.ScaledValue()

if math.IsNaN(v) {
Expand All @@ -198,24 +197,59 @@ func (d *sunSpec) convertPoint(b sunspec.Block, blockID int, pointID string, m m
return mr, nil
}

func (d *sunSpec) Query(client modbus.Client) (res []meters.MeasurementResult, err error) {
if d.notInitilized() {
// QueryOp executes a single query operation on the bus
func (d *SunSpec) QueryOp(client modbus.Client, measurement meters.Measurement) (res meters.MeasurementResult, err error) {
if d.notInitialized() {
return res, errors.New("sunspec: not initialized")
}

for _, model := range d.models {
supportedID := model.Id()
for modelID, blockMap := range modelMap {
if modelID != supportedID {
continue
}

for blockID, pointMap := range blockMap {
for pointID, m := range pointMap {
if m == measurement {
block := model.MustBlock(blockID)
if err = block.Read(); err != nil {
return
}

point := block.MustPoint(pointID)
return d.convertPoint(block, point, m)
}
}
}
}
}

return meters.MeasurementResult{}, fmt.Errorf("sunspec: %s not found", measurement)
}

// Query is called by the handler after preparing the bus by setting the device id and waiting for rate limit
func (d *SunSpec) Query(client modbus.Client) (res []meters.MeasurementResult, err error) {
if d.notInitialized() {
return res, errors.New("sunspec: not initialized")
}

for _, model := range d.models {
blockID := 0

model.Do(func(b sunspec.Block) {
model.Do(func(block sunspec.Block) {
defer func() { blockID++ }()

if err = b.Read(); err != nil {
if err = block.Read(); err != nil {
return
}

if bps, ok := modelMap[model.Id()][blockID]; ok {
for pointID, m := range bps {
if mr, err := d.convertPoint(b, blockID, pointID, m); err == nil {
point := block.MustPoint(pointID)

if mr, err := d.convertPoint(block, point, m); err == nil {
res = append(res, mr)
}
}
Expand Down

0 comments on commit 6539a8f

Please sign in to comment.