diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 9c2a2d71ddb83..6b800ee1743b9 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -68,6 +68,7 @@ following works: - github.com/google/go-github [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-github/blob/master/LICENSE) - github.com/google/go-querystring [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-querystring/blob/master/LICENSE) - github.com/googleapis/gax-go [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/gax-go/blob/master/LICENSE) +- github.com/gopcua/opcua [MIT License](https://github.com/gopcua/opcua/blob/master/LICENSE) - github.com/gorilla/mux [BSD 3-Clause "New" or "Revised" License](https://github.com/gorilla/mux/blob/master/LICENSE) - github.com/hailocab/go-hostpool [MIT License](https://github.com/hailocab/go-hostpool/blob/master/LICENSE) - github.com/harlow/kinesis-consumer [MIT License](https://github.com/harlow/kinesis-consumer/blob/master/MIT-LICENSE) diff --git a/go.mod b/go.mod index 472406b546563..b91c39b98c650 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( github.com/google/go-cmp v0.4.0 github.com/google/go-github v17.0.0+incompatible github.com/google/go-querystring v1.0.0 // indirect + github.com/gopcua/opcua v0.1.12 github.com/gorilla/mux v1.6.2 github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect diff --git a/go.sum b/go.sum index bf7651ad2c33d..ed84b5f2556b4 100644 --- a/go.sum +++ b/go.sum @@ -292,6 +292,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= +github.com/gopcua/opcua v0.1.12 h1:TenluCr1CPB1NHjb9tX6yprc0eUmthznXxSc5mnJPBo= +github.com/gopcua/opcua v0.1.12/go.mod h1:a6QH4F9XeODklCmWuvaOdL8v9H0d73CEKUHWVZLQyE8= github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= @@ -597,6 +599,8 @@ golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vK golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -699,6 +703,7 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -714,6 +719,7 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 h1:sfkvUWPNGwSV+8/fNqctR5lS2AqCSqYwXdrjCxp/dXo= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 h1:DvY3Zkh7KabQE/kfzMvYvKirSiguP9Q/veMtkYyf0o8= golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 4b38b88f7a53e..ba058be1c8132 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -116,6 +116,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/nstat" _ "github.com/influxdata/telegraf/plugins/inputs/ntpq" _ "github.com/influxdata/telegraf/plugins/inputs/nvidia_smi" + _ "github.com/influxdata/telegraf/plugins/inputs/opcua" _ "github.com/influxdata/telegraf/plugins/inputs/openldap" _ "github.com/influxdata/telegraf/plugins/inputs/openntpd" _ "github.com/influxdata/telegraf/plugins/inputs/opensmtpd" diff --git a/plugins/inputs/opcua/README.md b/plugins/inputs/opcua/README.md new file mode 100644 index 0000000000000..0a8c3b4316af8 --- /dev/null +++ b/plugins/inputs/opcua/README.md @@ -0,0 +1,76 @@ +# Telegraf Input Plugin: opcua_client + +The opcua_client plugin retrieves data from OPCUA slave devices + +### Configuration: + +```toml + +# ## Connection Configuration +# ## +# ## The plugin supports connections to PLCs via OPCUA +# ## +# ## Device name +name = "opcua_rocks" +# +# # OPC UA Endpoint URL +endpoint = "opc.tcp://opcua.rocks:4840" +# +# ## Read Timeout +# ## add an arbitrary timeout (seconds) to demonstrate how to stop a subscription +# ## with a context. +timeout = 30 +# +# # Time Inteval, default = 10s +time_interval = "5s" +# +# # Security policy: None, Basic128Rsa15, Basic256, Basic256Sha256. Default: auto +security_policy = "None" +# +# # Security mode: None, Sign, SignAndEncrypt. Default: auto +security_mode = "None" +# +# # Path to cert.pem. Required for security mode/policy != None. If cert path is not supplied, self-signed cert and key will be generated. +# # certificate = "/etc/telegraf/cert.pem" +# +# # Path to private key.pem. Required for security mode/policy != None. If key path is not supplied, self-signed cert and key will be generated. +# # private_key = "/etc/telegraf/key.pem" +# +# # To authenticate using a specific ID, select chosen method from 'Certificate' or 'UserName'. Else use 'Anonymous.' Defaults to 'Anonymous' if not provided. +# # auth_method = "Anonymous" +# +# # Required for auth_method = "UserName" +# # username = "myusername" +# +# # Required for auth_method = "UserName" +# # password = "mypassword" +# +# ## Measurements +# ## node id to subscribe to +# ## name - the variable name +# ## namespace - integer value 0 thru 3 +# ## identifier_type - s=string, i=numeric, g=guid, b=opaque +# ## identifier - tag as shown in opcua browser +# ## data_type - boolean, byte, short, int, uint, uint16, int16, uint32, int32, float, double, string, datetime, number +# ## Template - {name="", namespace="", identifier_type="", identifier="", data_type="", description=""}, +nodes = [ + {name="ProductName", namespace="0", identifier_type="i", identifier="2261", data_type="string", description="open62541 OPC UA Server"}, + {name="ProductUri", namespace="0", identifier_type="i", identifier="2262", data_type="string", description="http://open62541.org"}, + {name="ManufacturerName", namespace="0", identifier_type="i", identifier="2263", data_type="string", description="open62541"}, +] + +## Guide: +## An OPC UA node ID may resemble: "n=3,s=Temperature" +## In this example, n=3 is indicating the namespace is '3'. +## s=Temperature is indicting that the identifier type is a 'string' and the indentifier value is 'Temperature' +## This temperature node may have a current value of 79.0, which would possibly make the value a 'float'. +## To gather data from this node you would need to enter the following line into 'nodes' property above: +## {name="SomeLabel", namespace="3", identifier_type="s", identifier="Temperature", data_type="float", description="Some description."}, + +``` +### Example Output: + +``` +opcua,host=3c70aee0901e,name=Random,type=double Random=0.018158170305814902 1597820490000000000 + +``` diff --git a/plugins/inputs/opcua/opcua_client.go b/plugins/inputs/opcua/opcua_client.go new file mode 100644 index 0000000000000..a166111d71b71 --- /dev/null +++ b/plugins/inputs/opcua/opcua_client.go @@ -0,0 +1,435 @@ +package opcua_client + +import ( + "context" + "fmt" + "log" + "net/url" + "strings" + "time" + + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// OpcUA type +type OpcUA struct { + Name string `toml:"name"` + Endpoint string `toml:"endpoint"` + SecurityPolicy string `toml:"security_policy"` + SecurityMode string `toml:"security_mode"` + Certificate string `toml:"certificate"` + PrivateKey string `toml:"private_key"` + Username string `toml:"username"` + Password string `toml:"password"` + AuthMethod string `toml:"auth_method"` + Interval string `toml:"time_interval"` + TimeOut int `toml:"timeout"` + NodeList []OPCTag `toml:"nodes"` + Nodes []string + NodeData []OPCData + NodeIDs []*ua.NodeID + NodeIDerror []error + state ConnectionState + + // status + ReadSuccess int + ReadError int + NumberOfTags int + + // internal values + client *opcua.Client + req *ua.ReadRequest + ctx context.Context + opts []opcua.Option +} + +// OPCTag type +type OPCTag struct { + Name string `toml:"name"` + Namespace string `toml:"namespace"` + IdentifierType string `toml:"identifier_type"` + Identifier string `toml:"identifier"` + DataType string `toml:"data_type"` + Description string `toml:"description"` +} + +// OPCData type +type OPCData struct { + TagName string + Value interface{} + Quality ua.StatusCode + TimeStamp string + Time string + DataType ua.TypeID +} + +// ConnectionState used for constants +type ConnectionState int + +const ( + //Disconnected constant state 0 + Disconnected ConnectionState = iota + //Connecting constant state 1 + Connecting + //Connected constant state 2 + Connected +) + +const description = `Retrieve data from OPCUA devices` +const sampleConfig = ` +# ## Connection Configuration +# ## +# ## The plugin supports connections to PLCs via OPCUA +# ## +# ## Device name +name = "opcua_rocks" +# +# # OPC UA Endpoint URL +endpoint = "opc.tcp://opcua.rocks:4840" +# +# ## Read Timeout +# ## add an arbitrary timeout (seconds) to demonstrate how to stop a subscription +# ## with a context. +timeout = 30 +# +# # Time Inteval, default = 10s +time_interval = "5s" +# +# # Security policy: None, Basic128Rsa15, Basic256, Basic256Sha256. Default: auto +security_policy = "None" +# +# # Security mode: None, Sign, SignAndEncrypt. Default: auto +security_mode = "None" +# +# # Path to cert.pem. Required for security mode/policy != None. If cert path is not supplied, self-signed cert and key will be generated. +# # certificate = "/etc/telegraf/cert.pem" +# +# # Path to private key.pem. Required for security mode/policy != None. If key path is not supplied, self-signed cert and key will be generated. +# # private_key = "/etc/telegraf/key.pem" +# +# # To authenticate using a specific ID, select chosen method from 'Certificate' or 'UserName'. Else use 'Anonymous.' Defaults to 'Anonymous' if not provided. +# # auth_method = "Anonymous" +# +# # Required for auth_method = "UserName" +# # username = "myusername" +# +# # Required for auth_method = "UserName" +# # password = "mypassword" +# +# ## Measurements +# ## node id to subscribe to +# ## name - the variable name +# ## namespace - integer value 0 thru 3 +# ## identifier_type - s=string, i=numeric, g=guid, b=opaque +# ## identifier - tag as shown in opcua browser +# ## data_type - boolean, byte, short, int, uint, uint16, int16, uint32, int32, float, double, string, datetime, number +# ## Template - {name="", namespace="", identifier_type="", identifier="", data_type="", description=""}, +nodes = [ + {name="ProductName", namespace="0", identifier_type="i", identifier="2261", data_type="string", description="open62541 OPC UA Server"}, + {name="ProductUri", namespace="0", identifier_type="i", identifier="2262", data_type="string", description="http://open62541.org"}, + {name="ManufacturerName", namespace="0", identifier_type="i", identifier="2263", data_type="string", description="open62541"}, +] + +## Guide: +## An OPC UA node ID may resemble: "n=3,s=Temperature" +## In this example, n=3 is indicating the namespace is '3'. +## s=Temperature is indicting that the identifier type is a 'string' and the indentifier value is 'Temperature' +## This temperature node may have a current value of 79.0, which would possibly make the value a 'float'. +## To gather data from this node you would need to enter the following line into 'nodes' property above: +## {name="SomeLabel", namespace="3", identifier_type="s", identifier="Temperature", data_type="float", description="Some description."}, + +` + +// Description will appear directly above the plugin definition in the config file +func (o *OpcUA) Description() string { + return description +} + +// SampleConfig will populate the sample configuration portion of the plugin's configuration +func (o *OpcUA) SampleConfig() string { + return sampleConfig +} + +// Init will initialize all tags +func (o *OpcUA) Init() error { + o.state = Disconnected + + o.ctx = context.Background() + + err := o.validateEndpoint() + if err != nil { + return err + } + + err = o.InitNodes() + if err != nil { + return err + } + o.NumberOfTags = len(o.NodeList) + + o.setupOptions() + + return nil + +} + +func (o *OpcUA) validateEndpoint() error { + //check device name + if o.Name == "" { + return fmt.Errorf("device name is empty") + } + //check device name + if o.Endpoint == "" { + return fmt.Errorf("device name is empty") + } + + _, err := url.Parse(o.Endpoint) + if err != nil { + return fmt.Errorf("endpoint url is invalid") + } + + if o.Interval == "" { + o.Interval = opcua.DefaultSubscriptionInterval.String() + } + + _, err = time.ParseDuration(o.Interval) + if err != nil { + return fmt.Errorf("fatal error with time interval") + } + + //search security policy type + switch o.SecurityPolicy { + case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto": + break + default: + return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityPolicy, o.Name) + } + //search security mode type + switch o.SecurityMode { + case "None", "Sign", "SignAndEncrypt", "auto": + break + default: + return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityMode, o.Name) + } + return nil +} + +//InitNodes Method on OpcUA +func (o *OpcUA) InitNodes() error { + if len(o.NodeList) == 0 { + return nil + } + + err := o.validateOPCTags() + if err != nil { + return err + } + + return nil +} + +func (o *OpcUA) validateOPCTags() error { + nameEncountered := map[string]bool{} + for i, item := range o.NodeList { + //check empty name + if item.Name == "" { + return fmt.Errorf("empty name in '%s'", item.Name) + } + //search name duplicate + if nameEncountered[item.Name] { + return fmt.Errorf("name '%s' is duplicated in '%s'", item.Name, item.Name) + } else { + nameEncountered[item.Name] = true + } + //search identifier type + switch item.IdentifierType { + case "s", "i", "g", "b": + break + default: + return fmt.Errorf("invalid identifier type '%s' in '%s'", item.IdentifierType, item.Name) + } + // search data type + switch item.DataType { + case "boolean", "byte", "short", "int", "uint", "uint16", "int16", "uint32", "int32", "float", "double", "string", "datetime", "number": + break + default: + return fmt.Errorf("invalid data type '%s' in '%s'", item.DataType, item.Name) + } + + // build nodeid + o.Nodes = append(o.Nodes, BuildNodeID(item)) + + //parse NodeIds and NodeIds errors + nid, niderr := ua.ParseNodeID(o.Nodes[i]) + // build NodeIds and Errors + o.NodeIDs = append(o.NodeIDs, nid) + o.NodeIDerror = append(o.NodeIDerror, niderr) + // Grow NodeData for later input + o.NodeData = append(o.NodeData, OPCData{}) + } + return nil +} + +// BuildNodeID build node ID from OPC tag +func BuildNodeID(tag OPCTag) string { + return "ns=" + tag.Namespace + ";" + tag.IdentifierType + "=" + tag.Identifier +} + +// Connect to a OPCUA device +func Connect(o *OpcUA) error { + u, err := url.Parse(o.Endpoint) + if err != nil { + return err + } + + switch u.Scheme { + case "opc.tcp": + o.state = Connecting + + if o.client != nil { + o.client.CloseSession() + } + + o.client = opcua.NewClient(o.Endpoint, o.opts...) + if err := o.client.Connect(o.ctx); err != nil { + return fmt.Errorf("Error in Client Connection: %s", err) + } + + regResp, err := o.client.RegisterNodes(&ua.RegisterNodesRequest{ + NodesToRegister: o.NodeIDs, + }) + if err != nil { + return fmt.Errorf("RegisterNodes failed: %v", err) + } + + o.req = &ua.ReadRequest{ + MaxAge: 2000, + NodesToRead: readvalues(regResp.RegisteredNodeIDs), + TimestampsToReturn: ua.TimestampsToReturnBoth, + } + + err = o.getData() + if err != nil { + return fmt.Errorf("Get Data Failed: %v", err) + } + + default: + return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme) + } + return nil +} + +func (o *OpcUA) setupOptions() error { + + // Get a list of the endpoints for our target server + endpoints, err := opcua.GetEndpoints(o.Endpoint) + if err != nil { + log.Fatal(err) + } + + if o.Certificate == "" && o.PrivateKey == "" { + if o.SecurityPolicy != "None" || o.SecurityMode != "None" { + o.Certificate, o.PrivateKey = generateCert("urn:telegraf:gopcua:client", 2048, o.Certificate, o.PrivateKey, (365 * 24 * time.Hour)) + } + } + + o.opts = generateClientOpts(endpoints, o.Certificate, o.PrivateKey, o.SecurityPolicy, o.SecurityMode, o.AuthMethod, o.Username, o.Password) + + return nil +} + +func (o *OpcUA) getData() error { + resp, err := o.client.Read(o.req) + if err != nil { + o.ReadError++ + return fmt.Errorf("RegisterNodes Read failed: %v", err) + } + o.ReadSuccess++ + for i, d := range resp.Results { + if d.Status != ua.StatusOK { + return fmt.Errorf("Status not OK: %v", d.Status) + } + o.NodeData[i].TagName = o.NodeList[i].Name + if d.Value != nil { + o.NodeData[i].Value = d.Value.Value() + o.NodeData[i].DataType = d.Value.Type() + } + o.NodeData[i].Quality = d.Status + o.NodeData[i].TimeStamp = d.ServerTimestamp.String() + o.NodeData[i].Time = d.SourceTimestamp.String() + } + return nil +} + +func readvalues(ids []*ua.NodeID) []*ua.ReadValueID { + rvids := make([]*ua.ReadValueID, len(ids)) + for i, v := range ids { + rvids[i] = &ua.ReadValueID{NodeID: v} + } + return rvids +} + +func disconnect(o *OpcUA) error { + u, err := url.Parse(o.Endpoint) + if err != nil { + return err + } + + o.ReadError = 0 + o.ReadSuccess = 0 + + switch u.Scheme { + case "opc.tcp": + o.state = Disconnected + o.client.Close() + return nil + default: + return fmt.Errorf("invalid controller") + } +} + +// Gather defines what data the plugin will gather. +func (o *OpcUA) Gather(acc telegraf.Accumulator) error { + if o.state == Disconnected { + o.state = Connecting + err := Connect(o) + if err != nil { + o.state = Disconnected + return err + } + } + + o.state = Connected + + err := o.getData() + if err != nil && o.state == Connected { + o.state = Disconnected + disconnect(o) + return err + } + + for i, n := range o.NodeList { + fields := make(map[string]interface{}) + tags := map[string]string{ + "name": n.Name, + "id": BuildNodeID(n), + } + + fields[o.NodeData[i].TagName] = o.NodeData[i].Value + fields["Quality"] = strings.TrimSpace(fmt.Sprint(o.NodeData[i].Quality)) + acc.AddFields(o.Name, fields, tags) + } + return nil +} + +// Add this plugin to telegraf +func init() { + inputs.Add("opcua_client", func() telegraf.Input { + return &OpcUA{ + AuthMethod: "Anonymous", + } + }) +} diff --git a/plugins/inputs/opcua/opcua_client_test.go b/plugins/inputs/opcua/opcua_client_test.go new file mode 100644 index 0000000000000..d3f48e858efd6 --- /dev/null +++ b/plugins/inputs/opcua/opcua_client_test.go @@ -0,0 +1,67 @@ +package opcua_client + +import ( + "fmt" + "reflect" + "testing" +) + +type OPCTags struct { + Name string + Namespace string + IdentifierType string + Identifier string + DataType string + Want string +} + +func TestClient1(t *testing.T) { + var testopctags = []OPCTags{ + {"ProductName", "0", "i", "2261", "string", "open62541 OPC UA Server"}, + {"ProductUri", "0", "i", "2262", "string", "http://open62541.org"}, + {"ManufacturerName", "0", "i", "2263", "string", "open62541"}, + } + + var o OpcUA + var err error + + o.Name = "testing" + o.Endpoint = "opc.tcp://opcua.rocks:4840" + o.Interval = "10ms" + o.TimeOut = 30 + o.SecurityPolicy = "None" + o.SecurityMode = "None" + for _, tags := range testopctags { + o.NodeList = append(o.NodeList, MapOPCTag(tags)) + } + err = o.Init() + if err != nil { + t.Errorf("Initialize Error: %s", err) + } + err = Connect(&o) + if err != nil { + t.Logf("Connect Error: %s", err) + } + + for i, v := range o.NodeData { + if v.Value != nil { + types := reflect.TypeOf(v.Value) + value := reflect.ValueOf(v.Value) + compare := fmt.Sprintf("%v", value.Interface()) + if compare != testopctags[i].Want { + t.Errorf("Tag %s: Values %v for type %s does not match record", o.NodeList[i].Name, value.Interface(), types) + } + } else { + t.Errorf("Tag: %s has value: %v", o.NodeList[i].Name, v.Value) + } + } +} + +func MapOPCTag(tags OPCTags) (out OPCTag) { + out.Name = tags.Name + out.Namespace = tags.Namespace + out.IdentifierType = tags.IdentifierType + out.Identifier = tags.Identifier + out.DataType = tags.DataType + return out +} diff --git a/plugins/inputs/opcua/opcua_util.go b/plugins/inputs/opcua/opcua_util.go new file mode 100644 index 0000000000000..5a88ed44e0dc5 --- /dev/null +++ b/plugins/inputs/opcua/opcua_util.go @@ -0,0 +1,330 @@ +package opcua_client + +import ( + "crypto/ecdsa" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "io/ioutil" + "log" + "math/big" + "net" + "net/url" + "os" + "strings" + "time" + + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/debug" + "github.com/gopcua/opcua/ua" + "github.com/pkg/errors" +) + +// SELF SIGNED CERT FUNCTIONS + +func newTempDir() (string, error) { + dir, err := ioutil.TempDir("", "ssc") + return dir, err +} + +func generateCert(host string, rsaBits int, certFile, keyFile string, dur time.Duration) (string, string) { + + dir, _ := newTempDir() + + if len(host) == 0 { + log.Fatalf("Missing required host parameter") + } + if rsaBits == 0 { + rsaBits = 2048 + } + if len(certFile) == 0 { + certFile = fmt.Sprintf("%s/cert.pem", dir) + } + if len(keyFile) == 0 { + keyFile = fmt.Sprintf("%s/key.pem", dir) + } + + priv, err := rsa.GenerateKey(rand.Reader, rsaBits) + if err != nil { + log.Fatalf("failed to generate private key: %s", err) + } + + notBefore := time.Now() + notAfter := notBefore.Add(dur) + + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + log.Fatalf("failed to generate serial number: %s", err) + } + + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"Telegraf OPC UA client"}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + + KeyUsage: x509.KeyUsageContentCommitment | x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageDataEncipherment | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, + } + + hosts := strings.Split(host, ",") + for _, h := range hosts { + if ip := net.ParseIP(h); ip != nil { + template.IPAddresses = append(template.IPAddresses, ip) + } else { + template.DNSNames = append(template.DNSNames, h) + } + if uri, err := url.Parse(h); err == nil { + template.URIs = append(template.URIs, uri) + } + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, publicKey(priv), priv) + if err != nil { + log.Fatalf("Failed to create certificate: %s", err) + } + + certOut, err := os.Create(certFile) + if err != nil { + log.Fatalf("failed to open %s for writing: %s", certFile, err) + } + if err := pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil { + log.Fatalf("failed to write data to %s: %s", certFile, err) + } + if err := certOut.Close(); err != nil { + log.Fatalf("error closing %s: %s", certFile, err) + } + + keyOut, err := os.OpenFile(keyFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + log.Printf("failed to open %s for writing: %s", keyFile, err) + return "", "" + } + if err := pem.Encode(keyOut, pemBlockForKey(priv)); err != nil { + log.Fatalf("failed to write data to %s: %s", keyFile, err) + } + if err := keyOut.Close(); err != nil { + log.Fatalf("error closing %s: %s", keyFile, err) + } + + return certFile, keyFile +} + +func publicKey(priv interface{}) interface{} { + switch k := priv.(type) { + case *rsa.PrivateKey: + return &k.PublicKey + case *ecdsa.PrivateKey: + return &k.PublicKey + default: + return nil + } +} + +func pemBlockForKey(priv interface{}) *pem.Block { + switch k := priv.(type) { + case *rsa.PrivateKey: + return &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(k)} + case *ecdsa.PrivateKey: + b, err := x509.MarshalECPrivateKey(k) + if err != nil { + fmt.Fprintf(os.Stderr, "Unable to marshal ECDSA private key: %v", err) + os.Exit(2) + } + return &pem.Block{Type: "EC PRIVATE KEY", Bytes: b} + default: + return nil + } +} + +// OPT FUNCTIONS + +func generateClientOpts(endpoints []*ua.EndpointDescription, certFile, keyFile, policy, mode, auth, username, password string) []opcua.Option { + opts := []opcua.Option{} + appuri := "urn:telegraf:gopcua:client" + appname := "Telegraf" + + // ApplicationURI is automatically read from the cert so is not required if a cert if provided + opts = append(opts, opcua.ApplicationURI(appuri)) + opts = append(opts, opcua.ApplicationName(appname)) + + if certFile == "" && keyFile == "" { + if policy != "None" || mode != "None" { + certFile, keyFile = generateCert(appuri, 2048, certFile, keyFile, (365 * 24 * time.Hour)) + } + } + + var cert []byte + if certFile != "" && keyFile != "" { + debug.Printf("Loading cert/key from %s/%s", certFile, keyFile) + c, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + log.Printf("Failed to load certificate: %s", err) + } else { + pk, ok := c.PrivateKey.(*rsa.PrivateKey) + if !ok { + log.Fatalf("Invalid private key") + } + cert = c.Certificate[0] + opts = append(opts, opcua.PrivateKey(pk), opcua.Certificate(cert)) + } + } + + var secPolicy string + switch { + case policy == "auto": + // set it later + case strings.HasPrefix(policy, ua.SecurityPolicyURIPrefix): + secPolicy = policy + policy = "" + case policy == "None" || policy == "Basic128Rsa15" || policy == "Basic256" || policy == "Basic256Sha256" || policy == "Aes128_Sha256_RsaOaep" || policy == "Aes256_Sha256_RsaPss": + secPolicy = ua.SecurityPolicyURIPrefix + policy + policy = "" + default: + log.Fatalf("Invalid security policy: %s", policy) + } + + // Select the most appropriate authentication mode from server capabilities and user input + authMode, authOption := generateAuth(auth, cert, username, password) + opts = append(opts, authOption) + + var secMode ua.MessageSecurityMode + switch strings.ToLower(mode) { + case "auto": + case "none": + secMode = ua.MessageSecurityModeNone + mode = "" + case "sign": + secMode = ua.MessageSecurityModeSign + mode = "" + case "signandencrypt": + secMode = ua.MessageSecurityModeSignAndEncrypt + mode = "" + default: + log.Fatalf("Invalid security mode: %s", mode) + } + + // Allow input of only one of sec-mode,sec-policy when choosing 'None' + if secMode == ua.MessageSecurityModeNone || secPolicy == ua.SecurityPolicyURINone { + secMode = ua.MessageSecurityModeNone + secPolicy = ua.SecurityPolicyURINone + } + + // Find the best endpoint based on our input and server recommendation (highest SecurityMode+SecurityLevel) + var serverEndpoint *ua.EndpointDescription + switch { + case mode == "auto" && policy == "auto": // No user selection, choose best + for _, e := range endpoints { + if serverEndpoint == nil || (e.SecurityMode >= serverEndpoint.SecurityMode && e.SecurityLevel >= serverEndpoint.SecurityLevel) { + serverEndpoint = e + } + } + + case mode != "auto" && policy == "auto": // User only cares about mode, select highest securitylevel with that mode + for _, e := range endpoints { + if e.SecurityMode == secMode && (serverEndpoint == nil || e.SecurityLevel >= serverEndpoint.SecurityLevel) { + serverEndpoint = e + } + } + + case mode == "auto" && policy != "auto": // User only cares about policy, select highest securitylevel with that policy + for _, e := range endpoints { + if e.SecurityPolicyURI == secPolicy && (serverEndpoint == nil || e.SecurityLevel >= serverEndpoint.SecurityLevel) { + serverEndpoint = e + } + } + + default: // User cares about both + for _, e := range endpoints { + if e.SecurityPolicyURI == secPolicy && e.SecurityMode == secMode && (serverEndpoint == nil || e.SecurityLevel >= serverEndpoint.SecurityLevel) { + serverEndpoint = e + } + } + } + + if serverEndpoint == nil { // Didn't find an endpoint with matching policy and mode. + log.Printf("unable to find suitable server endpoint with selected sec-policy and sec-mode") + log.Fatalf("quitting") + } + + secPolicy = serverEndpoint.SecurityPolicyURI + secMode = serverEndpoint.SecurityMode + + // Check that the selected endpoint is a valid combo + err := validateEndpointConfig(endpoints, secPolicy, secMode, authMode) + if err != nil { + log.Fatalf("error validating input: %s", err) + } + + opts = append(opts, opcua.SecurityFromEndpoint(serverEndpoint, authMode)) + return opts +} + +func generateAuth(a string, cert []byte, un, pw string) (ua.UserTokenType, opcua.Option) { + var err error + + var authMode ua.UserTokenType + var authOption opcua.Option + switch strings.ToLower(a) { + case "anonymous": + authMode = ua.UserTokenTypeAnonymous + authOption = opcua.AuthAnonymous() + + case "username": + authMode = ua.UserTokenTypeUserName + + if un == "" { + if err != nil { + log.Fatalf("error reading username input: %s", err) + } + } + + if pw == "" { + if err != nil { + log.Fatalf("error reading username input: %s", err) + } + } + + authOption = opcua.AuthUsername(un, pw) + + case "certificate": + authMode = ua.UserTokenTypeCertificate + authOption = opcua.AuthCertificate(cert) + + case "issuedtoken": + // todo: this is unsupported, fail here or fail in the opcua package? + authMode = ua.UserTokenTypeIssuedToken + authOption = opcua.AuthIssuedToken([]byte(nil)) + + default: + log.Printf("unknown auth-mode, defaulting to Anonymous") + authMode = ua.UserTokenTypeAnonymous + authOption = opcua.AuthAnonymous() + + } + + return authMode, authOption +} + +func validateEndpointConfig(endpoints []*ua.EndpointDescription, secPolicy string, secMode ua.MessageSecurityMode, authMode ua.UserTokenType) error { + for _, e := range endpoints { + if e.SecurityMode == secMode && e.SecurityPolicyURI == secPolicy { + for _, t := range e.UserIdentityTokens { + if t.TokenType == authMode { + return nil + } + } + } + } + + err := errors.Errorf("server does not support an endpoint with security : %s , %s", secPolicy, secMode) + return err +}