diff --git a/CHANGELOG.md b/CHANGELOG.md index 223da44..32d44a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Samples updated to use "defer" instead of just suggesting it * Add support for MQCB/MQCTL callback functions * Add support for MQBEGIN transaction management +* Add Dead Letter Header parser ## November 2018 - v3.2.0 * Added GetPlatform to mqmetric so it can be used as a label/tag in collectors diff --git a/README.md b/README.md index 6e2b4b4..7c22b3f 100755 --- a/README.md +++ b/README.md @@ -125,8 +125,6 @@ At this point, you should have a compiled copy of the program in `$GOPATH/bin`. All regular MQI verbs are now available through the `ibmmq` package. -There are no structure handlers for message headers such as MQRFH2 or MQDLH. - ## History See [CHANGELOG](CHANGELOG.md) in this directory. diff --git a/ibmmq/mqi.go b/ibmmq/mqi.go index ae5eb8c..08d176b 100644 --- a/ibmmq/mqi.go +++ b/ibmmq/mqi.go @@ -57,6 +57,7 @@ import "C" import ( "encoding/binary" + "io" "strings" "unsafe" ) @@ -120,6 +121,8 @@ func (e *MQReturn) Error() string { return mqstrerror(e.verb, C.MQLONG(e.MQCC), C.MQLONG(e.MQRC)) } +var endian binary.ByteOrder // Used by structure formatters such as MQCFH + /* * Copy a Go string in "strings" * to a fixed-size C char array such as MQCHAR12 @@ -139,7 +142,7 @@ func setMQIString(a *C.char, v string, l int) { /* * The C.GoStringN function can return strings that include * NUL characters (which is not really what is expected for a C string-related - * function). So we have a utility function to remove any trailing nulls + * function). So we have a utility function to remove any trailing nulls and spaces */ func trimStringN(c *C.char, l C.int) string { var rc string @@ -150,7 +153,7 @@ func trimStringN(c *C.char, l C.int) string { } else { rc = s[0:i] } - return rc + return strings.TrimSpace(rc) } /* @@ -1261,3 +1264,30 @@ func (handle *MQMessageHandle) InqMP(goimpo *MQIMPO, gopd *MQPD, name string) (s return goimpo.ReturnedName, propertyValue, nil } + +/* +GetHeader returns a structure containing a parsed-out version of an MQI +message header such as the MQDLH (which is currently the only structure +supported). Other structures like the RFH2 could follow. + +The caller of this function needs to cast the returned structure to the +specific type in order to reference the fields. +*/ +func GetHeader(md *MQMD, buf []byte) (interface{}, int, error) { + switch md.Format { + case MQFMT_DEAD_LETTER_HEADER: + return getHeaderDLH(md, buf) + } + + mqreturn := &MQReturn{MQCC: int32(MQCC_FAILED), + MQRC: int32(MQRC_FORMAT_NOT_SUPPORTED), + } + + return nil, 0, mqreturn +} + +func readStringFromFixedBuffer(r io.Reader, l int32) string { + tmpBuf := make([]byte, l) + binary.Read(r, endian, tmpBuf) + return strings.TrimSpace(string(tmpBuf)) +} diff --git a/ibmmq/mqiDLH.go b/ibmmq/mqiDLH.go new file mode 100644 index 0000000..6faf941 --- /dev/null +++ b/ibmmq/mqiDLH.go @@ -0,0 +1,148 @@ +package ibmmq + +/* + Copyright (c) IBM Corporation 2016,2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +/* +#include +#include +#include +*/ +import "C" + +import ( + "bytes" + "encoding/binary" +) + +type MQDLH struct { + Reason int32 + DestQName string + DestQMgrName string + Encoding int32 + CodedCharSetId int32 + Format string + PutApplType int32 + PutApplName string + PutDate string + PutTime string + strucLength int // Not exported +} + +func NewMQDLH(md *MQMD) *MQDLH { + dlh := new(MQDLH) + dlh.Reason = MQRC_NONE + dlh.CodedCharSetId = MQCCSI_UNDEFINED + dlh.PutApplType = 0 + dlh.PutApplName = "" + dlh.PutTime = "" + dlh.PutDate = "" + dlh.Format = "" + dlh.DestQName = "" + dlh.DestQMgrName = "" + + dlh.strucLength = int(MQDLH_CURRENT_LENGTH) + + if md != nil { + dlh.Encoding = md.Encoding + if md.CodedCharSetId == MQCCSI_DEFAULT { + dlh.CodedCharSetId = MQCCSI_INHERIT + } else { + dlh.CodedCharSetId = md.CodedCharSetId + } + dlh.Format = md.Format + + md.Format = MQFMT_DEAD_LETTER_HEADER + md.MsgType = MQMT_REPORT + md.CodedCharSetId = MQCCSI_Q_MGR + } + + if (C.MQENC_NATIVE % 2) == 0 { + endian = binary.LittleEndian + } else { + endian = binary.BigEndian + } + + return dlh +} + +func (dlh *MQDLH) Bytes() []byte { + buf := make([]byte, dlh.strucLength) + offset := 0 + + copy(buf[offset:], "DLH ") + offset += 4 + endian.PutUint32(buf[offset:], uint32(MQDLH_CURRENT_VERSION)) + offset += 4 + endian.PutUint32(buf[offset:], uint32(dlh.Reason)) + offset += 4 + copy(buf[offset:], dlh.DestQName) + offset += int(MQ_OBJECT_NAME_LENGTH) + copy(buf[offset:], dlh.DestQMgrName) + offset += int(MQ_Q_MGR_NAME_LENGTH) + endian.PutUint32(buf[offset:], uint32(dlh.Encoding)) + offset += 4 + endian.PutUint32(buf[offset:], uint32(dlh.CodedCharSetId)) + offset += 4 + copy(buf[offset:], dlh.Format) + offset += int(MQ_FORMAT_LENGTH) + endian.PutUint32(buf[offset:], uint32(dlh.PutApplType)) + offset += 4 + copy(buf[offset:], dlh.PutApplName) + offset += int(MQ_PUT_APPL_NAME_LENGTH) + copy(buf[offset:], dlh.PutDate) + offset += int(MQ_PUT_DATE_LENGTH) + copy(buf[offset:], dlh.PutTime) + offset += int(MQ_PUT_TIME_LENGTH) + + return buf +} + +/* +We have a byte array for the message contents. The start of that buffer +is the MQDLH structure. We read the bytes from that fixed header to match +the C structure definition for each field. The DLH does not have multiple +versions defined so we don't need to check that as we go through. +*/ +func getHeaderDLH(md *MQMD, buf []byte) (*MQDLH, int, error) { + + var version int32 + + dlh := NewMQDLH(nil) + + r := bytes.NewBuffer(buf) + _ = readStringFromFixedBuffer(r, 4) // StrucId + binary.Read(r, endian, &version) + binary.Read(r, endian, &dlh.Reason) + dlh.DestQName = readStringFromFixedBuffer(r, MQ_OBJECT_NAME_LENGTH) + dlh.DestQMgrName = readStringFromFixedBuffer(r, MQ_Q_MGR_NAME_LENGTH) + + binary.Read(r, endian, &dlh.Encoding) + binary.Read(r, endian, &dlh.CodedCharSetId) + + dlh.Format = readStringFromFixedBuffer(r, MQ_FORMAT_LENGTH) + + binary.Read(r, endian, &dlh.PutApplType) + + dlh.PutApplName = readStringFromFixedBuffer(r, MQ_PUT_APPL_NAME_LENGTH) + dlh.PutDate = readStringFromFixedBuffer(r, MQ_PUT_DATE_LENGTH) + dlh.PutTime = readStringFromFixedBuffer(r, MQ_PUT_TIME_LENGTH) + + return dlh, dlh.strucLength, nil +} diff --git a/ibmmq/mqiPCF.go b/ibmmq/mqiPCF.go index d260ca4..93c2f14 100644 --- a/ibmmq/mqiPCF.go +++ b/ibmmq/mqiPCF.go @@ -47,8 +47,6 @@ type MQCFH struct { ParameterCount int32 } -var endian binary.ByteOrder - /* PCFParameter is a structure containing the data associated with various types of PCF element. Use the Type field to decide which diff --git a/mqmetric/discover.go b/mqmetric/discover.go index f38d35d..b2faabc 100755 --- a/mqmetric/discover.go +++ b/mqmetric/discover.go @@ -865,7 +865,7 @@ func formatDescription(elem *MonElement) string { // we have to ensure uniqueness. if strings.Contains(elem.Description, "byte count") { s = s + "_bytes" - } else if strings.HasSuffix(elem.Description," count") && !strings.Contains(s,"_count") { + } else if strings.HasSuffix(elem.Description, " count") && !strings.Contains(s, "_count") { s = s + "_count" } } diff --git a/samples/amqsdlh.go b/samples/amqsdlh.go new file mode 100644 index 0000000..b038e20 --- /dev/null +++ b/samples/amqsdlh.go @@ -0,0 +1,228 @@ +/* + * This is an example of a Go program to put and get messages to an IBM MQ + * queue while manipulating a Dead Letter Header + * + * The queue and queue manager name can be given as parameters on the + * command line. Defaults are coded in the program. + * + */ +package main + +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the license. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +import ( + "fmt" + "os" + "strings" + "time" + + "github.com/ibm-messaging/mq-golang/ibmmq" +) + +var qMgrObject ibmmq.MQObject +var qObject ibmmq.MQObject + +func main() { + os.Exit(mainWithRc()) +} + +func addDLH(md *ibmmq.MQMD, buf []byte) []byte { + // Create a new Dead Letter Header. This function modifies + // the original message descriptor to indicate there is a DLH + dlh := ibmmq.NewMQDLH(md) + + // Fill in the reason this message needs to be put to a DLQ along with + // any other relevant information. + dlh.Reason = ibmmq.MQRC_NOT_AUTHORIZED + dlh.DestQName = "DEST.QUEUE" + dlh.DestQMgrName = "DEST.QMGR" + // Set the current date/time in the header. The way Go does date formatting + // is very odd.Force the hundredths as there doesn't seem to be a simple way + // to extract it without a '.' in the format. + dlh.PutTime = time.Now().Format("030405") + dlh.PutDate = time.Now().Format("20060102") + + // Then return a modified buffer with the original message data + // following the DLH + return append(dlh.Bytes(), buf...) +} + +// Extract the DLH from the body of the message, print it and then +// print the remaining body. +func printDLH(md *ibmmq.MQMD, buf []byte) { + bodyStart := 0 + buflen := len(buf) + + // Look to see if there is indeed a DLH + fmt.Printf("Format = '%s'\n", md.Format) + if md.Format == ibmmq.MQFMT_DEAD_LETTER_HEADER { + header, headerLen, err := ibmmq.GetHeader(md, buf) + if err == nil { + dlh, ok := header.(*ibmmq.MQDLH) + if ok { + bodyStart += headerLen + fmt.Printf("DLH Structure = %v\n", dlh) + fmt.Printf("Format of next element = '%s'\n", dlh.Format) + } + } + } + + // The original message data starts further on in the slice + fmt.Printf("Got message of total length %d: ", buflen) + fmt.Println(strings.TrimSpace(string(buf[bodyStart:buflen]))) +} + +// The real main function is here to set a return code. +func mainWithRc() int { + var putmqmd *ibmmq.MQMD + + // The default queue manager and queue to be used. These can be overridden on command line. + qMgrName := "QM1" + qName := "DEV.QUEUE.1" + + fmt.Println("Sample AMQSDLH.GO start") + + // Get the queue and queue manager names from command line for overriding + // the defaults. Parameters are not required. + if len(os.Args) >= 2 { + qName = os.Args[1] + } + + if len(os.Args) >= 3 { + qMgrName = os.Args[2] + } + + // This is where we connect to the queue manager. It is assumed + // that the queue manager is either local, or you have set the + // client connection information externally eg via a CCDT or the + // MQSERVER environment variable + qMgrObject, err := ibmmq.Conn(qMgrName) + if err != nil { + fmt.Println(err) + } else { + fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) + } + + // Open of the queue + if err == nil { + // Create the Object Descriptor that allows us to give the queue name + mqod := ibmmq.NewMQOD() + + // We have to say how we are going to use this queue. In this case, to PUT and GET + // messages. That is done in the openOptions parameter. + openOptions := ibmmq.MQOO_OUTPUT | ibmmq.MQOO_INPUT_AS_Q_DEF + + // Opening a QUEUE (rather than a Topic or other object type) and give the name + mqod.ObjectType = ibmmq.MQOT_Q + mqod.ObjectName = qName + + qObject, err = qMgrObject.Open(mqod, openOptions) + if err != nil { + fmt.Println(err) + } else { + fmt.Println("Opened queue", qObject.Name) + defer close(qObject) + } + } + + // PUT the message to the queue + if err == nil { + putmqmd = ibmmq.NewMQMD() + pmo := ibmmq.NewMQPMO() + + pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT + + // Create the contents to include a timestamp just to prove when it was created + msgData := "Hello from Go at " + time.Now().Format(time.RFC3339) + buffer := []byte(msgData) + putmqmd.Format = ibmmq.MQFMT_STRING + + // Add a Dead Letter Header to the message. + newBuffer := addDLH(putmqmd, buffer) + + // Put the message to the queue) + err = qObject.Put(putmqmd, pmo, newBuffer) + if err != nil { + fmt.Println(err) + } + } + + // And now try to GET the message we just put + if err == nil { + getmqmd := ibmmq.NewMQMD() + gmo := ibmmq.NewMQGMO() + + gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT + + // Set options to not wait - we know the message is there since we just put it + gmo.Options |= ibmmq.MQGMO_NO_WAIT + + // Use the MsgId to retrieve the same message + gmo.MatchOptions = ibmmq.MQMO_MATCH_MSG_ID + getmqmd.MsgId = putmqmd.MsgId + + // Create a buffer for the message data. This one is large enough + // for the messages put by the amqsput sample. + buffer := make([]byte, 1024) + + // Now we can try to get the message. + datalen := 0 + datalen, err = qObject.Get(getmqmd, gmo, buffer) + + if err != nil { + fmt.Println(err) + } else { + // A message has been retrieved. Print the contents, and the DLH + // if one exists + printDLH(getmqmd, buffer[0:datalen]) + } + } + + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) + } + return mqret +} + +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) + } + return err +} + +// Close the queue if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) + if err == nil { + fmt.Println("Closed queue") + } else { + fmt.Println(err) + } + return err +}