From dbdf3b2863c43b62e66971764568dd83111f317d Mon Sep 17 00:00:00 2001 From: ibmmqmet Date: Mon, 3 Dec 2018 07:53:20 +0000 Subject: [PATCH 1/7] Convert samples to use defer instead of just suggesting it --- samples/README.md | 15 +++++----- samples/amqsget.go | 62 +++++++++++++++++++++-------------------- samples/amqsinq.go | 39 ++++++++++++++++---------- samples/amqsprop.go | 66 +++++++++++++++++++++---------------------- samples/amqspub.go | 65 ++++++++++++++++++++++--------------------- samples/amqsput.go | 68 ++++++++++++++++++++++++--------------------- samples/amqsset.go | 33 ++++++++++++++-------- samples/amqssub.go | 67 ++++++++++++++++++++++---------------------- samples/pubsub.sh | 4 +-- samples/putget.sh | 5 ++-- 10 files changed, 224 insertions(+), 200 deletions(-) diff --git a/samples/README.md b/samples/README.md index b586258..9e90257 100644 --- a/samples/README.md +++ b/samples/README.md @@ -13,7 +13,7 @@ Where needed for the sample programs: * the default queue manager is "QM1" * the default queue is "DEV.QUEUE.1" -* the default topic is "GO.TEST.TOPIC" +* the default topic is "DEV.BASE.TOPIC" ## Description of sample programs Current samples in this directory include @@ -25,10 +25,11 @@ Current samples in this directory include * amqsconn.go: How to programmatically connect as an MQ client to a remote queue manager. Allow use of a userid/password for authentication. There are no default values for this sample. * amqsprop.go: Set and extract message properties -* amqsinq.go : Demonstrate the new InqMap API for inquiring object attributes +* amqsinq.go : Demonstrate the new InqMap API for inquiring about object attributes +* amqsset.go : Demonstrate how to set attributes of an MQ object using the MQSET verb Some trivial scripts run the sample programs in matching pairs: -* putget.sh : Run amqsput and then use the generated MsgId to get the same message +* putget.sh : Run amqsput and then use the generated MsgId to get the same message with amqsget * pubsub.sh : Start amqssub and then run the amqspub program immediately The `mqitest` sample program in its own subdirectory is a more general demonstration @@ -55,9 +56,9 @@ there is something waiting to receive the publications when they are made. The ## More information Comments in the programs explain what they are doing. For more detailed information about the -MQ API, the functions, structures and constants, see the -[MQ Knowledge Center](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.ref.dev.doc/q089590_.htm) +MQ API, the functions, structures, and constants, see the +[MQ Knowledge Center](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.ref.dev.doc/q089590_.htm). -You can also find general MQ application development advice [here](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q022830_.htm) +You can also find general MQ application development advice [here](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q022830_.htm). Information about development for procedural programming languages such as C in that -site is most relevant for the interface exported by this Go package. +documentation is most relevant for the interface exported by this Go package. diff --git a/samples/amqsget.go b/samples/amqsget.go index d4d427a..f494c9c 100644 --- a/samples/amqsget.go +++ b/samples/amqsget.go @@ -48,15 +48,17 @@ var qMgrObject ibmmq.MQObject var qObject ibmmq.MQObject func main() { + os.Exit(mainWithRc()) +} + +// The real main function is here to set a return code. +func mainWithRc() int { var msgId string // The default queue manager and queue to be used. These can be overridden on command line. qMgrName := "QM1" qName := "DEV.QUEUE.1" - qMgrConnected := false - qOpened := false - fmt.Println("Sample AMQSGET.GO start") // Get the queue and queue manager names from command line for overriding @@ -82,8 +84,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qMgrConnected = true fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) } // Open of the queue @@ -93,7 +95,7 @@ func main() { // We have to say how we are going to use this queue. In this case, to GET // messages. That is done in the openOptions parameter. - openOptions := ibmmq.MQOO_INPUT_EXCLUSIVE + ibmmq.MQOO_FAIL_IF_QUIESCING + openOptions := ibmmq.MQOO_INPUT_EXCLUSIVE // Opening a QUEUE (rather than a Topic or other object type) and give the name mqod.ObjectType = ibmmq.MQOT_Q @@ -103,8 +105,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qOpened = true fmt.Println("Opened queue", qObject.Name) + defer close(qObject) } } @@ -119,9 +121,8 @@ func main() { // The default options are OK, but it's always // a good idea to be explicit about transactional boundaries as - // not all platforms behave the same way. It's also good practice to - // set the FAIL_IF_QUIESCING flag on all verbs. - gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT | ibmmq.MQGMO_FAIL_IF_QUIESCING + // not all platforms behave the same way. + gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT // Set options to wait for a maximum of 3 seconds for any new message to arrive gmo.Options |= ibmmq.MQGMO_WAIT @@ -162,32 +163,33 @@ func main() { } } - // The usual tidy up at the end of a program is for queues to be closed, - // queue manager connections to be disconnected etc. - // In a larger Go program, we might move this to a defer() section to ensure - // it gets done regardless of other flows through the program. - - // Close the queue if it was opened - if qOpened { - err = qObject.Close(0) - if err != nil { - fmt.Println(err) - } else { - fmt.Println("Closed queue") - } + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) } + return mqret +} - // Disconnect from the queue manager - if qMgrConnected { - err = qMgrObject.Disc() - fmt.Printf("Disconnected from queue manager %s\n", qMgrName) +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) } + return err +} - // Exit with any return code extracted from the failing MQI call. +// Close the queue if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) if err == nil { - os.Exit(0) + fmt.Println("Closed queue") } else { - mqret := err.(*ibmmq.MQReturn) - os.Exit((int)(mqret.MQCC)) + fmt.Println(err) } + return err } diff --git a/samples/amqsinq.go b/samples/amqsinq.go index 1d28655..4be9035 100644 --- a/samples/amqsinq.go +++ b/samples/amqsinq.go @@ -39,7 +39,7 @@ var qMgrObject ibmmq.MQObject var object ibmmq.MQObject /* - * This is an example of how to call MQINQ with the new "map" format for + * This is an example of how to call MQINQ with a "map" format for * responses */ func inquire(obj ibmmq.MQObject, selectors []int32) { @@ -60,15 +60,19 @@ func inquire(obj ibmmq.MQObject, selectors []int32) { } } +// Main function that simply calls a subfunction to ensure defer routines are called before os.Exit happens func main() { + os.Exit(mainWithRc()) +} + +// The real main function is here to set a return code. +func mainWithRc() int { var selectors []int32 // The default queue manager, a queue and a namelist to be used. These can be overridden on command line. qMgrName := "QM1" qName := "DEV.QUEUE.1" nlName := "SYSTEM.DEFAULT.NAMELIST" - qMgrConnected := false - fmt.Println("Sample AMQSINQ.GO start") // Get the object names from command line for overriding @@ -91,8 +95,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qMgrConnected = true fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) } // Open an object @@ -101,7 +105,7 @@ func main() { mqod := ibmmq.NewMQOD() // We have to say how we are going to use this object. - openOptions := ibmmq.MQOO_FAIL_IF_QUIESCING + ibmmq.MQOO_INQUIRE + openOptions := ibmmq.MQOO_INQUIRE mqod.ObjectType = ibmmq.MQOT_Q_MGR // Do not need the qmgr name when opening it @@ -124,7 +128,7 @@ func main() { mqod := ibmmq.NewMQOD() // We have to say how we are going to use this object. - openOptions := ibmmq.MQOO_FAIL_IF_QUIESCING + ibmmq.MQOO_INQUIRE + openOptions := ibmmq.MQOO_INQUIRE mqod.ObjectType = ibmmq.MQOT_Q mqod.ObjectName = qName @@ -149,7 +153,7 @@ func main() { mqod := ibmmq.NewMQOD() // We have to say how we are going to use this object. - openOptions := ibmmq.MQOO_FAIL_IF_QUIESCING + ibmmq.MQOO_INQUIRE + openOptions := ibmmq.MQOO_INQUIRE mqod.ObjectType = ibmmq.MQOT_NAMELIST mqod.ObjectName = nlName @@ -166,17 +170,22 @@ func main() { } } - // Disconnect from the queue manager - if qMgrConnected { - err = qMgrObject.Disc() - fmt.Printf("Disconnected from queue manager %s\n", qMgrName) + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) } + return mqret +} - // Exit with any return code extracted from the failing MQI call. +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() if err == nil { - os.Exit(0) + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) } else { - mqret := err.(*ibmmq.MQReturn) - os.Exit((int)(mqret.MQCC)) + fmt.Println(err) } + return err } diff --git a/samples/amqsprop.go b/samples/amqsprop.go index 04df147..b1c69c7 100644 --- a/samples/amqsprop.go +++ b/samples/amqsprop.go @@ -176,15 +176,17 @@ func printProperties(getMsgHandle ibmmq.MQMessageHandle) { } func main() { + os.Exit(mainWithRc()) +} + +// The real main function is here to set a return code. +func mainWithRc() int { var putmqmd *ibmmq.MQMD // The default queue manager and queue to be used. These can be overridden on command line. qMgrName := "QM1" qName := "DEV.QUEUE.1" - qMgrConnected := false - qOpened := false - fmt.Println("Sample AMQSPROP.GO start") // Get the queue and queue manager names from command line for overriding @@ -205,8 +207,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qMgrConnected = true fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) } // Open of the queue @@ -216,7 +218,7 @@ func main() { // We have to say how we are going to use this queue. In this case, to PUT and GET // messages. That is done in the openOptions parameter. - openOptions := ibmmq.MQOO_OUTPUT + ibmmq.MQOO_FAIL_IF_QUIESCING + ibmmq.MQOO_INPUT_AS_Q_DEF + openOptions := ibmmq.MQOO_OUTPUT | ibmmq.MQOO_INPUT_AS_Q_DEF // Opening a QUEUE (rather than a Topic or other object type) and give the name mqod.ObjectType = ibmmq.MQOT_Q @@ -226,8 +228,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qOpened = true fmt.Println("Opened queue", qObject.Name) + defer close(qObject) } } @@ -259,7 +261,7 @@ func main() { putmqmd = ibmmq.NewMQMD() pmo := ibmmq.NewMQPMO() - pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT | ibmmq.MQPMO_FAIL_IF_QUIESCING + pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT // Set the handle that holds the properties pmo.OriginalMsgHandle = putMsgHandle @@ -281,7 +283,7 @@ func main() { getmqmd := ibmmq.NewMQMD() gmo := ibmmq.NewMQGMO() - gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT | ibmmq.MQGMO_FAIL_IF_QUIESCING + gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT // Set options to not wait - we know the message is there since we just put it gmo.Options |= ibmmq.MQGMO_NO_WAIT @@ -312,37 +314,33 @@ func main() { } } - // The usual tidy up at the end of a program is for queues to be closed, - // queue manager connections to be disconnected etc. - // In a larger Go program, we might move this to a defer() section to ensure - // it gets done regardless of other flows through the program. - - // Close the queue if it was opened - if qOpened { - err = qObject.Close(0) - if err != nil { - fmt.Println(err) - } else { - fmt.Println("Closed queue") - } - - // Delete any created message handles - dmho := ibmmq.NewMQDMHO() - getMsgHandle.DltMH(dmho) - putMsgHandle.DltMH(dmho) + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) } + return mqret +} - // Disconnect from the queue manager - if qMgrConnected { - err = qMgrObject.Disc() - fmt.Printf("Disconnected from queue manager %s\n", qMgrName) +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) } + return err +} - // Exit with any return code extracted from the failing MQI call. +// Close the queue if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) if err == nil { - os.Exit(0) + fmt.Println("Closed queue") } else { - mqret := err.(*ibmmq.MQReturn) - os.Exit((int)(mqret.MQCC)) + fmt.Println(err) } + return err } diff --git a/samples/amqspub.go b/samples/amqspub.go index 97cba4d..4d3f6ed 100644 --- a/samples/amqspub.go +++ b/samples/amqspub.go @@ -42,13 +42,15 @@ var qMgrObject ibmmq.MQObject var topicObject ibmmq.MQObject func main() { + os.Exit(mainWithRc()) +} + +// The real main function is here to set a return code. +func mainWithRc() int { // The default queue manager and topic to be used. These can be overridden on command line. qMgrName := "QM1" - topic := "GO.TEST.TOPIC" - - qMgrConnected := false - topicOpened := false + topic := "DEV.BASE.TOPIC" fmt.Println("Sample AMQSPUB.GO start") @@ -70,8 +72,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qMgrConnected = true fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) } // Open of the topic object @@ -81,7 +83,7 @@ func main() { // We have to say how we are going to use this object. In this case, to PUBLISH // messages. That is done in the openOptions parameter. - openOptions := ibmmq.MQOO_OUTPUT + ibmmq.MQOO_FAIL_IF_QUIESCING + openOptions := ibmmq.MQOO_OUTPUT // When opening a Topic, MQ has a choice of whether to refer to // the object through an ObjectName value or the ObjectString value or both. @@ -93,8 +95,8 @@ func main() { if err != nil { fmt.Println(err) } else { - topicOpened = true fmt.Println("Opened topic ", topic) + defer close(topicObject) } } @@ -107,10 +109,8 @@ func main() { // The default options are OK, but it's always // a good idea to be explicit about transactional boundaries as - // not all platforms behave the same way. It's also good practice to - // set the FAIL_IF_QUIESCING flag on all verbs, even for short-running - // operations like this PUT. - pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT | ibmmq.MQPMO_FAIL_IF_QUIESCING + // not all platforms behave the same way. + pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT // Tell MQ what the message body format is. In this case, a text string putmqmd.Format = ibmmq.MQFMT_STRING @@ -131,32 +131,33 @@ func main() { } } - // The usual tidy up at the end of a program is for queues to be closed, - // queue manager connections to be disconnected etc. - // In a larger Go program, we might move this to a defer() section to ensure - // it gets done regardless of other flows through the program. - - // Close the topic if it was successfully opened - if topicOpened { - err = topicObject.Close(0) - if err != nil { - fmt.Println(err) - } else { - fmt.Println("Closed topic") - } + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) } + return mqret +} - // Disconnect from the queue manager - if qMgrConnected { - err = qMgrObject.Disc() - fmt.Printf("Disconnected from queue manager %s\n", qMgrName) +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) } + return err +} - // Exit with any return code extracted from the failing MQI call. +// Close the topic if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) if err == nil { - os.Exit(0) + fmt.Println("Closed topic") } else { - mqret := err.(*ibmmq.MQReturn) - os.Exit((int)(mqret.MQCC)) + fmt.Println(err) } + return err } diff --git a/samples/amqsput.go b/samples/amqsput.go index 367dfdb..e4760b6 100644 --- a/samples/amqsput.go +++ b/samples/amqsput.go @@ -45,15 +45,18 @@ import ( var qMgrObject ibmmq.MQObject var qObject ibmmq.MQObject +// Main function that simply calls a subfunction to ensure defer routines are called before os.Exit happens func main() { + os.Exit(mainWithRc()) +} + +// The real main function is here to set a return code. +func mainWithRc() int { // The default queue manager and queue to be used. These can be overridden on command line. qMgrName := "QM1" qName := "DEV.QUEUE.1" - qMgrConnected := false - qOpened := false - fmt.Println("Sample AMQSPUT.GO start") // Get the queue and queue manager names from command line for overriding @@ -74,18 +77,19 @@ func main() { if err != nil { fmt.Println(err) } else { - qMgrConnected = true + // Make sure we disconnect from the queue manager later fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) } - // Open of the queue + // Open the queue if err == nil { // Create the Object Descriptor that allows us to give the queue name mqod := ibmmq.NewMQOD() // We have to say how we are going to use this queue. In this case, to PUT // messages. That is done in the openOptions parameter. - openOptions := ibmmq.MQOO_OUTPUT + ibmmq.MQOO_FAIL_IF_QUIESCING + openOptions := ibmmq.MQOO_OUTPUT // Opening a QUEUE (rather than a Topic or other object type) and give the name mqod.ObjectType = ibmmq.MQOT_Q @@ -95,8 +99,9 @@ func main() { if err != nil { fmt.Println(err) } else { - qOpened = true + // Make sure we close the queue once we're done with it fmt.Println("Opened queue", qObject.Name) + defer close(qObject) } } @@ -109,10 +114,8 @@ func main() { // The default options are OK, but it's always // a good idea to be explicit about transactional boundaries as - // not all platforms behave the same way. It's also good practice to - // set the FAIL_IF_QUIESCING flag on all verbs, even for short-running - // operations like this PUT. - pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT | ibmmq.MQPMO_FAIL_IF_QUIESCING + // not all platforms behave the same way. + pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT // Tell MQ what the message body format is. In this case, a text string putmqmd.Format = ibmmq.MQFMT_STRING @@ -135,32 +138,33 @@ func main() { } } - // The usual tidy up at the end of a program is for queues to be closed, - // queue manager connections to be disconnected etc. - // In a larger Go program, we might move this to a defer() section to ensure - // it gets done regardless of other flows through the program. - - // Close the queue if it was opened - if qOpened { - err = qObject.Close(0) - if err != nil { - fmt.Println(err) - } else { - fmt.Println("Closed queue") - } + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) } + return mqret +} - // Disconnect from the queue manager - if qMgrConnected { - err = qMgrObject.Disc() - fmt.Printf("Disconnected from queue manager %s\n", qMgrName) +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) } + return err +} - // Exit with any return code extracted from the failing MQI call. +// Close the queue if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) if err == nil { - os.Exit(0) + fmt.Println("Closed queue") } else { - mqret := err.(*ibmmq.MQReturn) - os.Exit((int)(mqret.MQCC)) + fmt.Println(err) } + return err } diff --git a/samples/amqsset.go b/samples/amqsset.go index 2d8e6a0..f5425f8 100644 --- a/samples/amqsset.go +++ b/samples/amqsset.go @@ -63,13 +63,17 @@ func setAttributes(obj ibmmq.MQObject) { } } +// Main function that simply calls a subfunction to ensure defer routines are called before os.Exit happens func main() { + os.Exit(mainWithRc()) +} + +// The real main function is here to set a return code. +func mainWithRc() int { // The default queue manager and a queue to be used. These can be overridden on command line. qMgrName := "QM1" qName := "DEV.QUEUE.1" - qMgrConnected := false - fmt.Println("Sample AMQSSET.GO start") // Get the object names from command line for overriding @@ -86,8 +90,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qMgrConnected = true fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) } // Open a queue with the option to say it will be modified @@ -97,7 +101,7 @@ func main() { // We have to say how we are going to use this object. The MQOO_SET flag // says that it will be used for an MQSET operation - openOptions := ibmmq.MQOO_FAIL_IF_QUIESCING + ibmmq.MQOO_SET + openOptions := ibmmq.MQOO_SET mqod.ObjectType = ibmmq.MQOT_Q mqod.ObjectName = qName @@ -110,17 +114,22 @@ func main() { } } - // Disconnect from the queue manager - if qMgrConnected { - err = qMgrObject.Disc() - fmt.Printf("Disconnected from queue manager %s\n", qMgrName) + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) } + return mqret +} - // Exit with any return code extracted from the failing MQI call. +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() if err == nil { - os.Exit(0) + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) } else { - mqret := err.(*ibmmq.MQReturn) - os.Exit((int)(mqret.MQCC)) + fmt.Println(err) } + return err } diff --git a/samples/amqssub.go b/samples/amqssub.go index b003900..6ebc6bd 100644 --- a/samples/amqssub.go +++ b/samples/amqssub.go @@ -5,7 +5,7 @@ * The topic and queue manager name can be given as parameters on the * command line. Defaults are coded in the program. * - * The program loops until no more publications arv available, waiting for + * The program loops until no more publications are available, waiting for * at most 3 seconds for new messages to arrive. * * Each MQI call prints its success or failure. @@ -44,14 +44,17 @@ var qMgrObject ibmmq.MQObject var qObject ibmmq.MQObject var subscriptionObject ibmmq.MQObject +// Main function that simply calls a subfunction to ensure defer routines are called before os.Exit happens func main() { + os.Exit(mainWithRc()) +} + +// The real main function is here to set a return code. +func mainWithRc() int { // The default queue manager and topic to be used. These can be overridden on command line. qMgrName := "QM1" - topic := "GO.TEST.TOPIC" - - qMgrConnected := false - subscriptionMade := false + topic := "DEV.BASE.TOPIC" fmt.Println("Sample AMQSSUB.GO start") @@ -73,8 +76,8 @@ func main() { if err != nil { fmt.Println(err) } else { - qMgrConnected = true fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) } // Subscribe to the topic @@ -89,7 +92,6 @@ func main() { // where publications are delivered mqsd.Options = ibmmq.MQSO_CREATE | ibmmq.MQSO_NON_DURABLE | - ibmmq.MQSO_FAIL_IF_QUIESCING | ibmmq.MQSO_MANAGED // When opening a Subscription, MQ has a choice of whether to refer to @@ -103,8 +105,8 @@ func main() { if err != nil { fmt.Println(err) } else { - subscriptionMade = true fmt.Println("Subscription made to topic ", topic) + defer close(subscriptionObject) } } @@ -119,9 +121,8 @@ func main() { // The default options are OK, but it's always // a good idea to be explicit about transactional boundaries as - // not all platforms behave the same way. It's also good practice to - // set the FAIL_IF_QUIESCING flag on all verbs. - gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT | ibmmq.MQGMO_FAIL_IF_QUIESCING + // not all platforms behave the same way. + gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT // Set options to wait for a maximum of 3 seconds for any new message to arrive gmo.Options |= ibmmq.MQGMO_WAIT @@ -151,33 +152,33 @@ func main() { } } - // The usual tidy up at the end of a program is for queues to be closed, - // queue manager connections to be disconnected etc. - // In a larger Go program, we might move this to a defer() section to ensure - // it gets done regardless of other flows through the program. - - // Close the subscription if it was opened. This will also close the - // managed publication queue. - if subscriptionMade { - err = subscriptionObject.Close(0) - if err != nil { - fmt.Println(err) - } else { - fmt.Println("Closed topic") - } + // Exit with any return code extracted from the failing MQI call. + // On return, the deferred close/disconnect operations will tidy up + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) } + return mqret +} - // Disconnect from the queue manager - if qMgrConnected { - err = qMgrObject.Disc() - fmt.Printf("Disconnected from queue manager %s\n", qMgrName) +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) } + return err +} - // Exit with any return code extracted from the failing MQI call. +// Close the topic if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) if err == nil { - os.Exit(0) + fmt.Println("Closed topic") } else { - mqret := err.(*ibmmq.MQReturn) - os.Exit((int)(mqret.MQCC)) + fmt.Println(err) } + return err } diff --git a/samples/pubsub.sh b/samples/pubsub.sh index 96a4f49..422cbef 100755 --- a/samples/pubsub.sh +++ b/samples/pubsub.sh @@ -2,7 +2,7 @@ # first and in the background. Give it a chance to start. Then run the # publisher -go run amqssub.go GO.TEST.TOPIC QM1 & +go run amqssub.go DEV.BASE.TOPIC QM1 & sleep 1 -go run amqspub.go GO.TEST.TOPIC QM1 +go run amqspub.go DEV.BASE.TOPIC QM1 wait diff --git a/samples/putget.sh b/samples/putget.sh index 974edf4..7e4bed0 100755 --- a/samples/putget.sh +++ b/samples/putget.sh @@ -1,9 +1,8 @@ # Run the amqsput and amqsget samples in sequence, extracting the MsgId # from the PUT operation and using it to retrieve the message in the GET sample -# We don't get to see the output from the amqsput program as it's filtered to -# extract the MsgId -id=`go run amqsput.go DEV.QUEUE.1 QM1 | grep MsgId | cut -d: -f2` +go run amqsput.go DEV.QUEUE.1 QM1 | tee /tmp/putget.out +id=`grep MsgId /tmp/putget.out | cut -d: -f2` if [ "$id" != "" ] then From 11e642fc3e8f2aa9511a1efbc7b0085c26531dba Mon Sep 17 00:00:00 2001 From: ibmmqmet Date: Mon, 3 Dec 2018 14:21:40 +0000 Subject: [PATCH 2/7] MQCB/MQCTL and MQBEGIN complete the MQI implementation --- CHANGELOG.md | 6 ++ README.md | 7 +- ibmmq/mqi.go | 37 +++++++- ibmmq/mqiBO.go | 54 ++++++++++++ ibmmq/mqiCBC.go | 78 +++++++++++++++++ ibmmq/mqiCBD.go | 78 +++++++++++++++++ ibmmq/mqiCTLO.go | 61 ++++++++++++++ ibmmq/mqiMQGMO.go | 2 +- ibmmq/mqiMQPMO.go | 2 +- ibmmq/mqiMQSD.go | 2 +- ibmmq/mqiSRO.go | 6 +- ibmmq/mqicb.go | 210 ++++++++++++++++++++++++++++++++++++++++++++++ ibmmq/mqicb_c.go | 37 ++++++++ samples/README.md | 1 + samples/amqscb.go | 189 +++++++++++++++++++++++++++++++++++++++++ 15 files changed, 758 insertions(+), 12 deletions(-) create mode 100644 ibmmq/mqiBO.go create mode 100644 ibmmq/mqiCBC.go create mode 100644 ibmmq/mqiCBD.go create mode 100644 ibmmq/mqiCTLO.go create mode 100644 ibmmq/mqicb.go create mode 100644 ibmmq/mqicb_c.go create mode 100644 samples/amqscb.go diff --git a/CHANGELOG.md b/CHANGELOG.md index da3b576..223da44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## December 2018 - v3.3.0 +* All relevant API calls now automatically set FAIL_IF_QUIESCING +* Samples updated to use "defer" instead of just suggesting it +* Add support for MQCB/MQCTL callback functions +* Add support for MQBEGIN transaction management + ## November 2018 - v3.2.0 * Added GetPlatform to mqmetric so it can be used as a label/tag in collectors * Added sample programs demonstrating specific operations such as put/get of message diff --git a/README.md b/README.md index cd0a60e..6e2b4b4 100755 --- a/README.md +++ b/README.md @@ -123,12 +123,9 @@ At this point, you should have a compiled copy of the program in `$GOPATH/bin`. ## Limitations -Almost all of the MQI verbs are now available through the `ibmmq` package. -Currently unavailable verbs include: +All regular MQI verbs are now available through the `ibmmq` package. -* MQCB/MQCTL - -There are also no structure handlers for message headers such as MQRFH2 or MQDLH. +There are no structure handlers for message headers such as MQRFH2 or MQDLH. ## History diff --git a/ibmmq/mqi.go b/ibmmq/mqi.go index 302593d..b3e3183 100644 --- a/ibmmq/mqi.go +++ b/ibmmq/mqi.go @@ -51,6 +51,7 @@ package ibmmq #include #include #include + */ import "C" @@ -219,6 +220,7 @@ func (x *MQQueueManager) Disc() error { var mqrc C.MQLONG var mqcc C.MQLONG + savedConn := x.hConn C.MQDISC(&x.hConn, &mqcc, &mqrc) mqreturn := MQReturn{MQCC: int32(mqcc), @@ -230,6 +232,8 @@ func (x *MQQueueManager) Disc() error { return &mqreturn } + cbRemoveConnection(savedConn) + return nil } @@ -248,7 +252,7 @@ func (x *MQQueueManager) Open(good *MQOD, goOpenOptions int32) (MQObject, error) } copyODtoC(&mqod, good) - mqOpenOptions = C.MQLONG(goOpenOptions) + mqOpenOptions = C.MQLONG(goOpenOptions) | C.MQOO_FAIL_IF_QUIESCING C.MQOPEN(x.hConn, (C.PMQVOID)(unsafe.Pointer(&mqod)), @@ -288,6 +292,9 @@ func (object *MQObject) Close(goCloseOptions int32) error { mqCloseOptions = C.MQLONG(goCloseOptions) + savedHConn := object.qMgr.hConn + savedHObj := object.hObj + C.MQCLOSE(object.qMgr.hConn, &object.hObj, mqCloseOptions, &mqcc, &mqrc) mqreturn := MQReturn{MQCC: int32(mqcc), @@ -299,6 +306,7 @@ func (object *MQObject) Close(goCloseOptions int32) error { return &mqreturn } + cbRemoveHandle(savedHConn, savedHObj) return nil } @@ -373,6 +381,33 @@ func (subObject *MQObject) Subrq(gosro *MQSRO, action int32) error { return nil } +/* +Begin is the function to start a two-phase XA transaction coordinated by MQ +*/ +func (x *MQQueueManager) Begin(gobo *MQBO) error { + var mqrc C.MQLONG + var mqcc C.MQLONG + var mqbo C.MQBO + + copyBOtoC(&mqbo, gobo) + + C.MQBEGIN(x.hConn, (C.PMQVOID)(unsafe.Pointer(&mqbo)), &mqcc, &mqrc) + + copyBOfromC(&mqbo, gobo) + + mqreturn := MQReturn{MQCC: int32(mqcc), + MQRC: int32(mqrc), + verb: "MQBEGIN", + } + + if mqcc != C.MQCC_OK { + return &mqreturn + } + + return nil + +} + /* Cmit is the function to commit an in-flight transaction */ diff --git a/ibmmq/mqiBO.go b/ibmmq/mqiBO.go new file mode 100644 index 0000000..99d26d1 --- /dev/null +++ b/ibmmq/mqiBO.go @@ -0,0 +1,54 @@ +package ibmmq + +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +/* + +#include +#include +#include + +*/ +import "C" + +/* +This module contains the Begin Options structure +*/ + +type MQBO struct { + Options int32 +} + +func NewMQBO() *MQBO { + bo := new(MQBO) + bo.Options = int32(C.MQBO_NONE) + return bo +} + +func copyBOtoC(mqbo *C.MQBO, gobo *MQBO) { + setMQIString((*C.char)(&mqbo.StrucId[0]), "BO ", 4) + mqbo.Version = 1 + mqbo.Options = C.MQLONG(gobo.Options) +} + +func copyBOfromC(mqbo *C.MQBO, gobo *MQBO) { + gobo.Options = int32(mqbo.Options) + return +} diff --git a/ibmmq/mqiCBC.go b/ibmmq/mqiCBC.go new file mode 100644 index 0000000..0d7ec2f --- /dev/null +++ b/ibmmq/mqiCBC.go @@ -0,0 +1,78 @@ +package ibmmq + +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +/* + +#include +#include +#include + +*/ +import "C" + +/* +MQCBC is a structure containing the MQ Callback Context +The CompCode and Reason in the C structure are not included here. They +are set in an independent MQReturn structure passed to the callback. Similarly +for the hObj +*/ +type MQCBC struct { + CallType int32 + CallbackArea []byte // These byte arrays are saved/restored in parent function + ConnectionArea []byte + State int32 + DataLength int32 + BufferLength int32 + Flags int32 + ReconnectDelay int32 +} + +/* +NewMQCBC creates a MQCBC structure. There are no default values +as the structure is created within MQ. +*/ +func NewMQCBC() *MQCBC { + cbc := new(MQCBC) + return cbc +} + +/* +Since we do not create the structure, there's no conversion for it into +a C format +*/ +func copyCBCtoC(mqcbc *C.MQCBC, gocbc *MQCBC) { + return +} + +/* +But we do need a conversion process from C +*/ +func copyCBCfromC(mqcbc *C.MQCBC, gocbc *MQCBC) { + gocbc.CallType = int32(mqcbc.CallType) + gocbc.State = int32(mqcbc.State) + gocbc.DataLength = int32(mqcbc.DataLength) + gocbc.BufferLength = int32(mqcbc.BufferLength) + gocbc.Flags = int32(mqcbc.Flags) + gocbc.ReconnectDelay = int32(mqcbc.ReconnectDelay) + // ConnectionArea and CallbackArea are restored outside this function + + return +} diff --git a/ibmmq/mqiCBD.go b/ibmmq/mqiCBD.go new file mode 100644 index 0000000..d1ef2e8 --- /dev/null +++ b/ibmmq/mqiCBD.go @@ -0,0 +1,78 @@ +package ibmmq + +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +/* + +#include +#include +#include + +*/ +import "C" + +/* +MQCBD is a structure containing the MQ Callback Descriptor +*/ +type MQCBD struct { + CallbackType int32 + Options int32 + CallbackArea []byte + CallbackFunction MQCB_FUNCTION + CallbackName string + MaxMsgLength int32 +} + +/* +NewMQCBD fills in default values for the MQCBD structure +*/ +func NewMQCBD() *MQCBD { + cbd := new(MQCBD) + cbd.CallbackType = C.MQCBT_MESSAGE_CONSUMER + cbd.Options = C.MQCBDO_NONE + cbd.CallbackArea = nil + cbd.CallbackFunction = nil + cbd.CallbackName = "" + cbd.MaxMsgLength = C.MQCBD_FULL_MSG_LENGTH + + return cbd +} + +func copyCBDtoC(mqcbd *C.MQCBD, gocbd *MQCBD) { + + setMQIString((*C.char)(&mqcbd.StrucId[0]), "CBD ", 4) + mqcbd.Version = C.MQCBD_VERSION_1 + + mqcbd.CallbackType = C.MQLONG(gocbd.CallbackType) + mqcbd.Options = C.MQLONG(gocbd.Options) | C.MQCBDO_FAIL_IF_QUIESCING + // CallbackArea is always set to NULL here. The user's values are saved/restored elsewhere + mqcbd.CallbackArea = (C.MQPTR)(C.NULL) + + setMQIString((*C.char)(&mqcbd.CallbackName[0]), gocbd.CallbackName, 128) // There's no MQI constant for the length + + mqcbd.MaxMsgLength = C.MQLONG(gocbd.MaxMsgLength) + + return +} + +func copyCBDfromC(mqcbd *C.MQCBD, gocbd *MQCBD) { + // There are no modified output parameters + return +} diff --git a/ibmmq/mqiCTLO.go b/ibmmq/mqiCTLO.go new file mode 100644 index 0000000..b9d01cd --- /dev/null +++ b/ibmmq/mqiCTLO.go @@ -0,0 +1,61 @@ +package ibmmq + +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +/* + +#include +#include +#include + +*/ +import "C" + +/* +MQCTLO is a structure containing the MQ Control Options +*/ +type MQCTLO struct { + ConnectionArea []byte + Options int32 +} + +/* +NewMQCTLO creates a MQCTLO structure. +*/ +func NewMQCTLO() *MQCTLO { + ctlo := new(MQCTLO) + ctlo.Options = C.MQCTLO_NONE + ctlo.ConnectionArea = nil + return ctlo +} + +func copyCTLOtoC(mqctlo *C.MQCTLO, goctlo *MQCTLO) { + setMQIString((*C.char)(&mqctlo.StrucId[0]), "CTLO", 4) + mqctlo.Version = C.MQCTLO_VERSION_1 + mqctlo.Options = (C.MQLONG)(goctlo.Options) | C.MQCTLO_FAIL_IF_QUIESCING + // Always pass NULL to the C function as the real array is saved/restored in the Go layer + mqctlo.ConnectionArea = (C.MQPTR)(C.NULL) + return +} + +func copyCTLOfromC(mqctlo *C.MQCTLO, goctlo *MQCTLO) { + // There are no output fields for this structure + return +} diff --git a/ibmmq/mqiMQGMO.go b/ibmmq/mqiMQGMO.go index 6168f5e..219cd85 100644 --- a/ibmmq/mqiMQGMO.go +++ b/ibmmq/mqiMQGMO.go @@ -80,7 +80,7 @@ func copyGMOtoC(mqgmo *C.MQGMO, gogmo *MQGMO) { setMQIString((*C.char)(&mqgmo.StrucId[0]), "GMO ", 4) mqgmo.Version = C.MQLONG(gogmo.Version) - mqgmo.Options = C.MQLONG(gogmo.Options) + mqgmo.Options = C.MQLONG(gogmo.Options) | C.MQGMO_FAIL_IF_QUIESCING mqgmo.WaitInterval = C.MQLONG(gogmo.WaitInterval) mqgmo.Signal1 = C.MQLONG(gogmo.Signal1) mqgmo.Signal2 = C.MQLONG(gogmo.Signal2) diff --git a/ibmmq/mqiMQPMO.go b/ibmmq/mqiMQPMO.go index b1b6ff5..9164131 100644 --- a/ibmmq/mqiMQPMO.go +++ b/ibmmq/mqiMQPMO.go @@ -90,7 +90,7 @@ func copyPMOtoC(mqpmo *C.MQPMO, gopmo *MQPMO) { setMQIString((*C.char)(&mqpmo.StrucId[0]), "PMO ", 4) mqpmo.Version = C.MQLONG(gopmo.Version) - mqpmo.Options = C.MQLONG(gopmo.Options) + mqpmo.Options = C.MQLONG(gopmo.Options) | C.MQPMO_FAIL_IF_QUIESCING mqpmo.Timeout = C.MQLONG(gopmo.Timeout) mqpmo.Context = gopmo.Context mqpmo.KnownDestCount = C.MQLONG(gopmo.KnownDestCount) diff --git a/ibmmq/mqiMQSD.go b/ibmmq/mqiMQSD.go index 84e0c46..bea6127 100644 --- a/ibmmq/mqiMQSD.go +++ b/ibmmq/mqiMQSD.go @@ -107,7 +107,7 @@ func copySDtoC(mqsd *C.MQSD, gosd *MQSD) { setMQIString((*C.char)(&mqsd.StrucId[0]), "SD ", 4) mqsd.Version = C.MQLONG(gosd.Version) - mqsd.Options = C.MQLONG(gosd.Options) + mqsd.Options = C.MQLONG(gosd.Options) | C.MQSO_FAIL_IF_QUIESCING setMQIString((*C.char)(&mqsd.ObjectName[0]), gosd.ObjectName, C.MQ_OBJECT_NAME_LENGTH) setMQIString((*C.char)(&mqsd.AlternateUserId[0]), gosd.AlternateUserId, C.MQ_USER_ID_LENGTH) diff --git a/ibmmq/mqiSRO.go b/ibmmq/mqiSRO.go index 0c0c90b..f040cfd 100644 --- a/ibmmq/mqiSRO.go +++ b/ibmmq/mqiSRO.go @@ -47,12 +47,12 @@ func NewMQSRO() *MQSRO { func copySROtoC(mqsro *C.MQSRO, gosro *MQSRO) { setMQIString((*C.char)(&mqsro.StrucId[0]), "SRO ", 4) mqsro.Version = 1 - mqsro.Options = C.MQLONG(gosro.Options) + mqsro.Options = C.MQLONG(gosro.Options) | C.MQSRO_FAIL_IF_QUIESCING mqsro.NumPubs = C.MQLONG(gosro.NumPubs) } func copySROfromC(mqsro *C.MQSRO, gosro *MQSRO) { - gosro.Options = gosro.Options - gosro.NumPubs = gosro.NumPubs + gosro.Options = int32(mqsro.Options) + gosro.NumPubs = int32(mqsro.NumPubs) return } diff --git a/ibmmq/mqicb.go b/ibmmq/mqicb.go new file mode 100644 index 0000000..68af08c --- /dev/null +++ b/ibmmq/mqicb.go @@ -0,0 +1,210 @@ +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +/* +This file deals with asynchronous delivery of MQ messages via the MQCTL/MQCB verbs. +*/ +package ibmmq + +/* +#include +#include +#include + +extern void MQCALLBACK_Go(MQHCONN, MQMD *, MQGMO *, PMQVOID, MQCBC *); +extern void MQCALLBACK_C(MQHCONN hc,MQMD *md,MQGMO *gmo,PMQVOID buf,MQCBC *cbc); +*/ +import "C" +import ( + "fmt" + "strings" + "unsafe" +) + +// The user's callback function must match this signature +type MQCB_FUNCTION func(*MQObject, *MQMD, *MQGMO, []byte, *MQCBC, *MQReturn) + +// Need to keep references to the user's callback function and some other +// structure elements which do not map to the C functions, or do not need to +// be passed onwards +type cbInfo struct { + hObj *MQObject + callbackFunction MQCB_FUNCTION + callbackArea []byte + connectionArea []byte +} + +// This map is indexed by a combination of the hConn and hObj values +var cbMap = make(map[string]*cbInfo) + +/* +MQCALLBACK_Go is a wrapper callback function that will invoke the user-supplied callback +after converting the C structures into the corresponding Go format. + +The "export" directive makes the function available through the CGo processing to be +accessible from a C function. See mqicb_c.go for the proxy/gateway C function that in turn calls this one +*/ +//export MQCALLBACK_Go +func MQCALLBACK_Go(hConn C.MQHCONN, mqmd *C.MQMD, mqgmo *C.MQGMO, mqBuffer C.PMQVOID, mqcbc *C.MQCBC) { + + // Find the real callback function and invoke it + // Invoked function should match signature of the MQCB_FUNCTION type + gogmo := NewMQGMO() + gomd := NewMQMD() + gocbc := NewMQCBC() + + copyGMOfromC(mqgmo, gogmo) + copyMDfromC(mqmd, gomd) + copyCBCfromC(mqcbc, gocbc) + + mqreturn := &MQReturn{MQCC: int32(mqcbc.CompCode), + MQRC: int32(mqcbc.Reason), + verb: "MQCALLBACK", + } + + key := makeKey(hConn, mqcbc.Hobj) + if info, ok := cbMap[key]; ok { + + gocbc.CallbackArea = info.callbackArea + gocbc.ConnectionArea = info.connectionArea + + // Get the data + b := C.GoBytes(unsafe.Pointer(mqBuffer), C.int(mqcbc.DataLength)) + + // And finally call the user function + info.callbackFunction(info.hObj, gomd, gogmo, b, gocbc, mqreturn) + } +} + +/* +CB is the function to register/unregister a callback function for a queue, based on +criteria in the message descriptor and get-message-options +*/ +func (object *MQObject) CB(goOperation int32, gocbd *MQCBD, gomd *MQMD, gogmo *MQGMO) error { + var mqrc C.MQLONG + var mqcc C.MQLONG + var mqOperation C.MQLONG + var mqcbd C.MQCBD + var mqmd C.MQMD + var mqgmo C.MQGMO + + mqOperation = C.MQLONG(goOperation) + copyCBDtoC(&mqcbd, gocbd) + copyMDtoC(&mqmd, gomd) + copyGMOtoC(&mqgmo, gogmo) + + key := makeKey(object.qMgr.hConn, object.hObj) + + // The callback function is a C function that is a proxy for the MQCALLBACK_Go function + // defined here. And that in turn will call the user's callback function + mqcbd.CallbackFunction = (C.MQPTR)(unsafe.Pointer(C.MQCALLBACK_C)) + + C.MQCB(object.qMgr.hConn, mqOperation, (C.PMQVOID)(unsafe.Pointer(&mqcbd)), + object.hObj, + (C.PMQVOID)(unsafe.Pointer(&mqmd)), (C.PMQVOID)(unsafe.Pointer(&mqgmo)), + &mqcc, &mqrc) + + mqreturn := MQReturn{MQCC: int32(mqcc), + MQRC: int32(mqrc), + verb: "MQCB", + } + + if mqcc != C.MQCC_OK { + return &mqreturn + } + + // Add or remove the control information in the map used by the callback routines + switch mqOperation { + case C.MQOP_DEREGISTER: + delete(cbMap, key) + case C.MQOP_REGISTER: + // Stash the hObj and real function to be called + info := &cbInfo{hObj: object, + callbackFunction: gocbd.CallbackFunction, + connectionArea: nil, + callbackArea: gocbd.CallbackArea} + cbMap[key] = info + default: // Other values leave the map alone + } + + return nil +} + +/* +Ctl is the function that starts/stops invocation of a registered callback. +*/ +func (x *MQQueueManager) Ctl(goOperation int32, goctlo *MQCTLO) error { + var mqrc C.MQLONG + var mqcc C.MQLONG + var mqOperation C.MQLONG + var mqctlo C.MQCTLO + + mqOperation = C.MQLONG(goOperation) + copyCTLOtoC(&mqctlo, goctlo) + + // Need to make sure control information is available before the callback + // is enabled. So this gets setup even if the MQCTL fails. + key := makePartialKey(x.hConn) + for k, info := range cbMap { + if strings.HasPrefix(k, key) { + info.connectionArea = goctlo.ConnectionArea + } + } + + C.MQCTL(x.hConn, mqOperation, (C.PMQVOID)(unsafe.Pointer(&mqctlo)), &mqcc, &mqrc) + + mqreturn := MQReturn{MQCC: int32(mqcc), + MQRC: int32(mqrc), + verb: "MQCTL", + } + + if mqcc != C.MQCC_OK { + return &mqreturn + } + + return nil +} + +// Functions below here manage the map of objects and control information so that +// the Go variables can be saved/restored from invocations to the C layer +func makeKey(hConn C.MQHCONN, hObj C.MQHOBJ) string { + key := fmt.Sprintf("%d/%d", hConn, hObj) + return key +} + +func makePartialKey(hConn C.MQHCONN) string { + key := fmt.Sprintf("%d/", hConn) + return key +} + +// Functions to delete any structures used to map C elements to Go +func cbRemoveConnection(hConn C.MQHCONN) { + // Remove all of the hObj values for this hconn + key := makePartialKey(hConn) + for k, _ := range cbMap { + if strings.HasPrefix(k, key) { + delete(cbMap, k) + } + } +} + +func cbRemoveHandle(hConn C.MQHCONN, hObj C.MQHOBJ) { + key := makeKey(hConn, hObj) + delete(cbMap, key) +} diff --git a/ibmmq/mqicb_c.go b/ibmmq/mqicb_c.go new file mode 100644 index 0000000..6173893 --- /dev/null +++ b/ibmmq/mqicb_c.go @@ -0,0 +1,37 @@ +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ +package ibmmq + +/* +#include +#include +#include + +extern void MQCALLBACK_Go(MQHCONN, MQMD *, MQGMO *, PMQVOID, MQCBC *); +void MQCALLBACK_C(MQHCONN hc,MQMD *md,MQGMO *gmo,PMQVOID buf,MQCBC *cbc) { + MQCALLBACK_Go(hc,md,gmo,buf,cbc); +} +*/ +import "C" + +// This file exists purely to provide the linkage between a C callback function +// and Go for the MQCB/MQCTL asynchronous message consumer. The MQCALLBACK_C function +// has to be in a separate file to avoid "multiple definition" errors from the +// CGo compilation process. It looks like it is just a comment above, but +// that section of the file is processed by CGo. diff --git a/samples/README.md b/samples/README.md index 9e90257..c38852c 100644 --- a/samples/README.md +++ b/samples/README.md @@ -27,6 +27,7 @@ Allow use of a userid/password for authentication. There are no default values f * amqsprop.go: Set and extract message properties * amqsinq.go : Demonstrate the new InqMap API for inquiring about object attributes * amqsset.go : Demonstrate how to set attributes of an MQ object using the MQSET verb +* amqscb.go : Demonstrate use of the CALLBACK capability for asynchronous consumption of messages Some trivial scripts run the sample programs in matching pairs: * putget.sh : Run amqsput and then use the generated MsgId to get the same message with amqsget diff --git a/samples/amqscb.go b/samples/amqscb.go new file mode 100644 index 0000000..5d05b81 --- /dev/null +++ b/samples/amqscb.go @@ -0,0 +1,189 @@ +/* + * This is an example of a Go program to get messages from an IBM MQ + * queue. It uses the asynchronous callback operation instead of using + * a synchronous MQGET. + * + * The queue and queue manager name can be given as parameters on the + * command line. Defaults are coded in the program. + * + * Each MQI call prints its success or failure. + * + */ +package main + +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the license. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +import ( + "fmt" + "os" + "strings" + "time" + + "github.com/ibm-messaging/mq-golang/ibmmq" +) + +var qMgrObject ibmmq.MQObject +var qObject ibmmq.MQObject + +var ok = true + +// The main function just expects to be given a return code for Exit() +func main() { + os.Exit(mainWithRc()) +} + +// This is the callback function invoked when a message arrives on the queue. +func cb(hObj *ibmmq.MQObject, md *ibmmq.MQMD, gmo *ibmmq.MQGMO, buffer []byte, cbc *ibmmq.MQCBC, err *ibmmq.MQReturn) { + buflen := len(buffer) + if err.MQCC != ibmmq.MQCC_OK { + fmt.Println(err) + ok = false + } else { + // Assume the message is a printable string, which it will be + // if it's been created by the amqsput program + fmt.Printf("In callback - Got message of length %d from queue %s: ", buflen, hObj.Name) + fmt.Println(strings.TrimSpace(string(buffer[:buflen]))) + } +} + +// The real main function is here to set a return code. +func mainWithRc() int { + + // The default queue manager and queue to be used. These can be overridden on command line. + qMgrName := "QM1" + qName := "DEV.QUEUE.1" + + fmt.Println("Sample AMQSCB.GO start") + + // Get the queue and queue manager names from command line for overriding + // the defaults. Parameters are not required. + if len(os.Args) >= 2 { + qName = os.Args[1] + } + + if len(os.Args) >= 3 { + qMgrName = os.Args[2] + } + + // Connect to the queue manager. + qMgrObject, err := ibmmq.Conn(qMgrName) + if err != nil { + fmt.Println(err) + } else { + fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) + } + + // Open the queue + if err == nil { + // Create the Object Descriptor that allows us to give the queue name + mqod := ibmmq.NewMQOD() + + // We have to say how we are going to use this queue. In this case, to GET + // messages. That is done in the openOptions parameter. + openOptions := ibmmq.MQOO_INPUT_EXCLUSIVE + + // Opening a QUEUE (rather than a Topic or other object type) and give the name + mqod.ObjectType = ibmmq.MQOT_Q + mqod.ObjectName = qName + + qObject, err = qMgrObject.Open(mqod, openOptions) + if err != nil { + fmt.Println(err) + } else { + fmt.Println("Opened queue", qObject.Name) + defer close(qObject) + } + } + + if err == nil { + // The GET/MQCB requires control structures, the Message Descriptor (MQMD) + // and Get Options (MQGMO). Create those with default values. + getmqmd := ibmmq.NewMQMD() + gmo := ibmmq.NewMQGMO() + + // The default options are OK, but it's always + // a good idea to be explicit about transactional boundaries as + // not all platforms behave the same way. + gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT + + // Set options to wait for a maximum of 3 seconds for any new message to arrive + gmo.Options |= ibmmq.MQGMO_WAIT + gmo.WaitInterval = 3 * 1000 // The WaitInterval is in milliseconds + + // The MQCBD structure is used to specify the function to be invoked + // when a message arrives on a queue + cbd := ibmmq.NewMQCBD() + cbd.CallbackFunction = cb // The function at the top of this file + + // Register the callback function along with any selection criteria from the + // MQMD and MQGMO parameters + err = qObject.CB(ibmmq.MQOP_REGISTER, cbd, getmqmd, gmo) + } + + if err == nil { + // Then we are ready to enable the callback function. Any messages + // on the queue will be sent to the callback + ctlo := ibmmq.NewMQCTLO() // Default parameters are OK + err = qMgrObject.Ctl(ibmmq.MQOP_START, ctlo) + if err == nil { + // Use defer to disable the message consumer when we are ready to exit. + // Otherwise the shutdown will give MQRC_HCONN_ASYNC_ACTIVE error + defer qMgrObject.Ctl(ibmmq.MQOP_STOP, ctlo) + } + } + + // Keep the program running until the callback has indicated there are no + // more messages. + for ok { + d, _ := time.ParseDuration("5s") + time.Sleep(d) + } + + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect/close will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) + } + return mqret +} + +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) + } + return err +} + +// Close the queue if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) + if err == nil { + fmt.Println("Closed queue") + } else { + fmt.Println(err) + } + return err +} From f832621500b60f3915fe747f83955735a1bd206f Mon Sep 17 00:00:00 2001 From: ibmmqmet Date: Tue, 4 Dec 2018 08:30:12 +0000 Subject: [PATCH 3/7] Formatting --- ibmmq/mqi.go | 2 +- ibmmq/mqicb.go | 8 ++++---- samples/amqscb.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ibmmq/mqi.go b/ibmmq/mqi.go index b3e3183..5a1cb7f 100644 --- a/ibmmq/mqi.go +++ b/ibmmq/mqi.go @@ -384,7 +384,7 @@ func (subObject *MQObject) Subrq(gosro *MQSRO, action int32) error { /* Begin is the function to start a two-phase XA transaction coordinated by MQ */ -func (x *MQQueueManager) Begin(gobo *MQBO) error { +func (x *MQQueueManager) Begin(gobo *MQBO) error { var mqrc C.MQLONG var mqcc C.MQLONG var mqbo C.MQBO diff --git a/ibmmq/mqicb.go b/ibmmq/mqicb.go index 68af08c..a111827 100644 --- a/ibmmq/mqicb.go +++ b/ibmmq/mqicb.go @@ -84,10 +84,10 @@ func MQCALLBACK_Go(hConn C.MQHCONN, mqmd *C.MQMD, mqgmo *C.MQGMO, mqBuffer C.PMQ gocbc.CallbackArea = info.callbackArea gocbc.ConnectionArea = info.connectionArea - // Get the data + // Get the data b := C.GoBytes(unsafe.Pointer(mqBuffer), C.int(mqcbc.DataLength)) - // And finally call the user function + // And finally call the user function info.callbackFunction(info.hObj, gomd, gogmo, b, gocbc, mqreturn) } } @@ -129,7 +129,7 @@ func (object *MQObject) CB(goOperation int32, gocbd *MQCBD, gomd *MQMD, gogmo *M return &mqreturn } - // Add or remove the control information in the map used by the callback routines + // Add or remove the control information in the map used by the callback routines switch mqOperation { case C.MQOP_DEREGISTER: delete(cbMap, key) @@ -158,7 +158,7 @@ func (x *MQQueueManager) Ctl(goOperation int32, goctlo *MQCTLO) error { mqOperation = C.MQLONG(goOperation) copyCTLOtoC(&mqctlo, goctlo) - // Need to make sure control information is available before the callback + // Need to make sure control information is available before the callback // is enabled. So this gets setup even if the MQCTL fails. key := makePartialKey(x.hConn) for k, info := range cbMap { diff --git a/samples/amqscb.go b/samples/amqscb.go index 5d05b81..c31c5d7 100644 --- a/samples/amqscb.go +++ b/samples/amqscb.go @@ -152,8 +152,8 @@ func mainWithRc() int { // Keep the program running until the callback has indicated there are no // more messages. + d, _ := time.ParseDuration("5s") for ok { - d, _ := time.ParseDuration("5s") time.Sleep(d) } From f9e53bf4df7324da0f002877877b35cd88b7337c Mon Sep 17 00:00:00 2001 From: ibmmqmet Date: Tue, 4 Dec 2018 09:29:23 +0000 Subject: [PATCH 4/7] Put _count back in to metric format --- mqmetric/discover.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mqmetric/discover.go b/mqmetric/discover.go index 6e2d4fc..f38d35d 100755 --- a/mqmetric/discover.go +++ b/mqmetric/discover.go @@ -862,10 +862,11 @@ func formatDescription(elem *MonElement) string { // There are some metrics that have both "count" and "byte count" in // the descriptions. They were getting mapped to the same string, so - // we have to ensure uniqueness. We do not put "_count" on the - // metric name. + // we have to ensure uniqueness. if strings.Contains(elem.Description, "byte count") { s = s + "_bytes" + } else if strings.HasSuffix(elem.Description," count") && !strings.Contains(s,"_count") { + s = s + "_count" } } From 888f08cf16087c8ff38f5e80cc1a8aa08b497645 Mon Sep 17 00:00:00 2001 From: ibmmqmet Date: Wed, 5 Dec 2018 10:28:52 +0000 Subject: [PATCH 5/7] Datatype error for Windows --- ibmmq/mqi.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ibmmq/mqi.go b/ibmmq/mqi.go index 5a1cb7f..ae5eb8c 100644 --- a/ibmmq/mqi.go +++ b/ibmmq/mqi.go @@ -1249,11 +1249,11 @@ func (handle *MQMessageHandle) InqMP(goimpo *MQIMPO, gopd *MQPD, name string) (s propertyValue = true } case C.MQTYPE_STRING: - propertyValue = C.GoStringN((*C.char)(propertyPtr), propertyLength) + propertyValue = C.GoStringN((*C.char)(propertyPtr), (C.int)(propertyLength)) case C.MQTYPE_BYTE_STRING: ba := make([]byte, propertyLength) p := (*C.MQBYTE)(propertyPtr) - copy(ba[:], C.GoBytes(unsafe.Pointer(p), propertyLength)) + copy(ba[:], C.GoBytes(unsafe.Pointer(p), (C.int)(propertyLength))) propertyValue = ba case C.MQTYPE_NULL: propertyValue = nil From 5adb00586b5768cb0c061300969b875575bcafcf Mon Sep 17 00:00:00 2001 From: ibmmqmet Date: Fri, 7 Dec 2018 07:16:13 +0000 Subject: [PATCH 6/7] permit any object type for callback correlators --- ibmmq/mqiCBC.go | 4 ++-- ibmmq/mqiCBD.go | 2 +- ibmmq/mqiCTLO.go | 2 +- ibmmq/mqicb.go | 4 ++-- samples/amqscb.go | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ibmmq/mqiCBC.go b/ibmmq/mqiCBC.go index 0d7ec2f..b3048b2 100644 --- a/ibmmq/mqiCBC.go +++ b/ibmmq/mqiCBC.go @@ -36,8 +36,8 @@ for the hObj */ type MQCBC struct { CallType int32 - CallbackArea []byte // These byte arrays are saved/restored in parent function - ConnectionArea []byte + CallbackArea interface{} // These fields are saved/restored in parent function + ConnectionArea interface{} State int32 DataLength int32 BufferLength int32 diff --git a/ibmmq/mqiCBD.go b/ibmmq/mqiCBD.go index d1ef2e8..8b739cf 100644 --- a/ibmmq/mqiCBD.go +++ b/ibmmq/mqiCBD.go @@ -34,7 +34,7 @@ MQCBD is a structure containing the MQ Callback Descriptor type MQCBD struct { CallbackType int32 Options int32 - CallbackArea []byte + CallbackArea interface{} CallbackFunction MQCB_FUNCTION CallbackName string MaxMsgLength int32 diff --git a/ibmmq/mqiCTLO.go b/ibmmq/mqiCTLO.go index b9d01cd..2874d60 100644 --- a/ibmmq/mqiCTLO.go +++ b/ibmmq/mqiCTLO.go @@ -32,7 +32,7 @@ import "C" MQCTLO is a structure containing the MQ Control Options */ type MQCTLO struct { - ConnectionArea []byte + ConnectionArea interface{} Options int32 } diff --git a/ibmmq/mqicb.go b/ibmmq/mqicb.go index a111827..b15e89e 100644 --- a/ibmmq/mqicb.go +++ b/ibmmq/mqicb.go @@ -46,8 +46,8 @@ type MQCB_FUNCTION func(*MQObject, *MQMD, *MQGMO, []byte, *MQCBC, *MQReturn) type cbInfo struct { hObj *MQObject callbackFunction MQCB_FUNCTION - callbackArea []byte - connectionArea []byte + callbackArea interface{} + connectionArea interface{} } // This map is indexed by a combination of the hConn and hObj values diff --git a/samples/amqscb.go b/samples/amqscb.go index c31c5d7..66ade77 100644 --- a/samples/amqscb.go +++ b/samples/amqscb.go @@ -153,7 +153,7 @@ func mainWithRc() int { // Keep the program running until the callback has indicated there are no // more messages. d, _ := time.ParseDuration("5s") - for ok { + for ok && err == nil { time.Sleep(d) } From a887947c12d1a5c3efffe8f577cf049b4bbf9268 Mon Sep 17 00:00:00 2001 From: ibmmqmet Date: Mon, 10 Dec 2018 08:20:39 +0000 Subject: [PATCH 7/7] Add Dead Letter Header parser --- CHANGELOG.md | 1 + README.md | 2 - ibmmq/mqi.go | 34 ++++++- ibmmq/mqiDLH.go | 148 ++++++++++++++++++++++++++++ ibmmq/mqiPCF.go | 2 - mqmetric/discover.go | 2 +- samples/amqsdlh.go | 228 +++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 410 insertions(+), 7 deletions(-) create mode 100644 ibmmq/mqiDLH.go create mode 100644 samples/amqsdlh.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 223da44..32d44a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Samples updated to use "defer" instead of just suggesting it * Add support for MQCB/MQCTL callback functions * Add support for MQBEGIN transaction management +* Add Dead Letter Header parser ## November 2018 - v3.2.0 * Added GetPlatform to mqmetric so it can be used as a label/tag in collectors diff --git a/README.md b/README.md index 6e2b4b4..7c22b3f 100755 --- a/README.md +++ b/README.md @@ -125,8 +125,6 @@ At this point, you should have a compiled copy of the program in `$GOPATH/bin`. All regular MQI verbs are now available through the `ibmmq` package. -There are no structure handlers for message headers such as MQRFH2 or MQDLH. - ## History See [CHANGELOG](CHANGELOG.md) in this directory. diff --git a/ibmmq/mqi.go b/ibmmq/mqi.go index ae5eb8c..08d176b 100644 --- a/ibmmq/mqi.go +++ b/ibmmq/mqi.go @@ -57,6 +57,7 @@ import "C" import ( "encoding/binary" + "io" "strings" "unsafe" ) @@ -120,6 +121,8 @@ func (e *MQReturn) Error() string { return mqstrerror(e.verb, C.MQLONG(e.MQCC), C.MQLONG(e.MQRC)) } +var endian binary.ByteOrder // Used by structure formatters such as MQCFH + /* * Copy a Go string in "strings" * to a fixed-size C char array such as MQCHAR12 @@ -139,7 +142,7 @@ func setMQIString(a *C.char, v string, l int) { /* * The C.GoStringN function can return strings that include * NUL characters (which is not really what is expected for a C string-related - * function). So we have a utility function to remove any trailing nulls + * function). So we have a utility function to remove any trailing nulls and spaces */ func trimStringN(c *C.char, l C.int) string { var rc string @@ -150,7 +153,7 @@ func trimStringN(c *C.char, l C.int) string { } else { rc = s[0:i] } - return rc + return strings.TrimSpace(rc) } /* @@ -1261,3 +1264,30 @@ func (handle *MQMessageHandle) InqMP(goimpo *MQIMPO, gopd *MQPD, name string) (s return goimpo.ReturnedName, propertyValue, nil } + +/* +GetHeader returns a structure containing a parsed-out version of an MQI +message header such as the MQDLH (which is currently the only structure +supported). Other structures like the RFH2 could follow. + +The caller of this function needs to cast the returned structure to the +specific type in order to reference the fields. +*/ +func GetHeader(md *MQMD, buf []byte) (interface{}, int, error) { + switch md.Format { + case MQFMT_DEAD_LETTER_HEADER: + return getHeaderDLH(md, buf) + } + + mqreturn := &MQReturn{MQCC: int32(MQCC_FAILED), + MQRC: int32(MQRC_FORMAT_NOT_SUPPORTED), + } + + return nil, 0, mqreturn +} + +func readStringFromFixedBuffer(r io.Reader, l int32) string { + tmpBuf := make([]byte, l) + binary.Read(r, endian, tmpBuf) + return strings.TrimSpace(string(tmpBuf)) +} diff --git a/ibmmq/mqiDLH.go b/ibmmq/mqiDLH.go new file mode 100644 index 0000000..6faf941 --- /dev/null +++ b/ibmmq/mqiDLH.go @@ -0,0 +1,148 @@ +package ibmmq + +/* + Copyright (c) IBM Corporation 2016,2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +/* +#include +#include +#include +*/ +import "C" + +import ( + "bytes" + "encoding/binary" +) + +type MQDLH struct { + Reason int32 + DestQName string + DestQMgrName string + Encoding int32 + CodedCharSetId int32 + Format string + PutApplType int32 + PutApplName string + PutDate string + PutTime string + strucLength int // Not exported +} + +func NewMQDLH(md *MQMD) *MQDLH { + dlh := new(MQDLH) + dlh.Reason = MQRC_NONE + dlh.CodedCharSetId = MQCCSI_UNDEFINED + dlh.PutApplType = 0 + dlh.PutApplName = "" + dlh.PutTime = "" + dlh.PutDate = "" + dlh.Format = "" + dlh.DestQName = "" + dlh.DestQMgrName = "" + + dlh.strucLength = int(MQDLH_CURRENT_LENGTH) + + if md != nil { + dlh.Encoding = md.Encoding + if md.CodedCharSetId == MQCCSI_DEFAULT { + dlh.CodedCharSetId = MQCCSI_INHERIT + } else { + dlh.CodedCharSetId = md.CodedCharSetId + } + dlh.Format = md.Format + + md.Format = MQFMT_DEAD_LETTER_HEADER + md.MsgType = MQMT_REPORT + md.CodedCharSetId = MQCCSI_Q_MGR + } + + if (C.MQENC_NATIVE % 2) == 0 { + endian = binary.LittleEndian + } else { + endian = binary.BigEndian + } + + return dlh +} + +func (dlh *MQDLH) Bytes() []byte { + buf := make([]byte, dlh.strucLength) + offset := 0 + + copy(buf[offset:], "DLH ") + offset += 4 + endian.PutUint32(buf[offset:], uint32(MQDLH_CURRENT_VERSION)) + offset += 4 + endian.PutUint32(buf[offset:], uint32(dlh.Reason)) + offset += 4 + copy(buf[offset:], dlh.DestQName) + offset += int(MQ_OBJECT_NAME_LENGTH) + copy(buf[offset:], dlh.DestQMgrName) + offset += int(MQ_Q_MGR_NAME_LENGTH) + endian.PutUint32(buf[offset:], uint32(dlh.Encoding)) + offset += 4 + endian.PutUint32(buf[offset:], uint32(dlh.CodedCharSetId)) + offset += 4 + copy(buf[offset:], dlh.Format) + offset += int(MQ_FORMAT_LENGTH) + endian.PutUint32(buf[offset:], uint32(dlh.PutApplType)) + offset += 4 + copy(buf[offset:], dlh.PutApplName) + offset += int(MQ_PUT_APPL_NAME_LENGTH) + copy(buf[offset:], dlh.PutDate) + offset += int(MQ_PUT_DATE_LENGTH) + copy(buf[offset:], dlh.PutTime) + offset += int(MQ_PUT_TIME_LENGTH) + + return buf +} + +/* +We have a byte array for the message contents. The start of that buffer +is the MQDLH structure. We read the bytes from that fixed header to match +the C structure definition for each field. The DLH does not have multiple +versions defined so we don't need to check that as we go through. +*/ +func getHeaderDLH(md *MQMD, buf []byte) (*MQDLH, int, error) { + + var version int32 + + dlh := NewMQDLH(nil) + + r := bytes.NewBuffer(buf) + _ = readStringFromFixedBuffer(r, 4) // StrucId + binary.Read(r, endian, &version) + binary.Read(r, endian, &dlh.Reason) + dlh.DestQName = readStringFromFixedBuffer(r, MQ_OBJECT_NAME_LENGTH) + dlh.DestQMgrName = readStringFromFixedBuffer(r, MQ_Q_MGR_NAME_LENGTH) + + binary.Read(r, endian, &dlh.Encoding) + binary.Read(r, endian, &dlh.CodedCharSetId) + + dlh.Format = readStringFromFixedBuffer(r, MQ_FORMAT_LENGTH) + + binary.Read(r, endian, &dlh.PutApplType) + + dlh.PutApplName = readStringFromFixedBuffer(r, MQ_PUT_APPL_NAME_LENGTH) + dlh.PutDate = readStringFromFixedBuffer(r, MQ_PUT_DATE_LENGTH) + dlh.PutTime = readStringFromFixedBuffer(r, MQ_PUT_TIME_LENGTH) + + return dlh, dlh.strucLength, nil +} diff --git a/ibmmq/mqiPCF.go b/ibmmq/mqiPCF.go index d260ca4..93c2f14 100644 --- a/ibmmq/mqiPCF.go +++ b/ibmmq/mqiPCF.go @@ -47,8 +47,6 @@ type MQCFH struct { ParameterCount int32 } -var endian binary.ByteOrder - /* PCFParameter is a structure containing the data associated with various types of PCF element. Use the Type field to decide which diff --git a/mqmetric/discover.go b/mqmetric/discover.go index f38d35d..b2faabc 100755 --- a/mqmetric/discover.go +++ b/mqmetric/discover.go @@ -865,7 +865,7 @@ func formatDescription(elem *MonElement) string { // we have to ensure uniqueness. if strings.Contains(elem.Description, "byte count") { s = s + "_bytes" - } else if strings.HasSuffix(elem.Description," count") && !strings.Contains(s,"_count") { + } else if strings.HasSuffix(elem.Description, " count") && !strings.Contains(s, "_count") { s = s + "_count" } } diff --git a/samples/amqsdlh.go b/samples/amqsdlh.go new file mode 100644 index 0000000..b038e20 --- /dev/null +++ b/samples/amqsdlh.go @@ -0,0 +1,228 @@ +/* + * This is an example of a Go program to put and get messages to an IBM MQ + * queue while manipulating a Dead Letter Header + * + * The queue and queue manager name can be given as parameters on the + * command line. Defaults are coded in the program. + * + */ +package main + +/* + Copyright (c) IBM Corporation 2018 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the license. + + Contributors: + Mark Taylor - Initial Contribution +*/ + +import ( + "fmt" + "os" + "strings" + "time" + + "github.com/ibm-messaging/mq-golang/ibmmq" +) + +var qMgrObject ibmmq.MQObject +var qObject ibmmq.MQObject + +func main() { + os.Exit(mainWithRc()) +} + +func addDLH(md *ibmmq.MQMD, buf []byte) []byte { + // Create a new Dead Letter Header. This function modifies + // the original message descriptor to indicate there is a DLH + dlh := ibmmq.NewMQDLH(md) + + // Fill in the reason this message needs to be put to a DLQ along with + // any other relevant information. + dlh.Reason = ibmmq.MQRC_NOT_AUTHORIZED + dlh.DestQName = "DEST.QUEUE" + dlh.DestQMgrName = "DEST.QMGR" + // Set the current date/time in the header. The way Go does date formatting + // is very odd.Force the hundredths as there doesn't seem to be a simple way + // to extract it without a '.' in the format. + dlh.PutTime = time.Now().Format("030405") + dlh.PutDate = time.Now().Format("20060102") + + // Then return a modified buffer with the original message data + // following the DLH + return append(dlh.Bytes(), buf...) +} + +// Extract the DLH from the body of the message, print it and then +// print the remaining body. +func printDLH(md *ibmmq.MQMD, buf []byte) { + bodyStart := 0 + buflen := len(buf) + + // Look to see if there is indeed a DLH + fmt.Printf("Format = '%s'\n", md.Format) + if md.Format == ibmmq.MQFMT_DEAD_LETTER_HEADER { + header, headerLen, err := ibmmq.GetHeader(md, buf) + if err == nil { + dlh, ok := header.(*ibmmq.MQDLH) + if ok { + bodyStart += headerLen + fmt.Printf("DLH Structure = %v\n", dlh) + fmt.Printf("Format of next element = '%s'\n", dlh.Format) + } + } + } + + // The original message data starts further on in the slice + fmt.Printf("Got message of total length %d: ", buflen) + fmt.Println(strings.TrimSpace(string(buf[bodyStart:buflen]))) +} + +// The real main function is here to set a return code. +func mainWithRc() int { + var putmqmd *ibmmq.MQMD + + // The default queue manager and queue to be used. These can be overridden on command line. + qMgrName := "QM1" + qName := "DEV.QUEUE.1" + + fmt.Println("Sample AMQSDLH.GO start") + + // Get the queue and queue manager names from command line for overriding + // the defaults. Parameters are not required. + if len(os.Args) >= 2 { + qName = os.Args[1] + } + + if len(os.Args) >= 3 { + qMgrName = os.Args[2] + } + + // This is where we connect to the queue manager. It is assumed + // that the queue manager is either local, or you have set the + // client connection information externally eg via a CCDT or the + // MQSERVER environment variable + qMgrObject, err := ibmmq.Conn(qMgrName) + if err != nil { + fmt.Println(err) + } else { + fmt.Printf("Connected to queue manager %s\n", qMgrName) + defer disc(qMgrObject) + } + + // Open of the queue + if err == nil { + // Create the Object Descriptor that allows us to give the queue name + mqod := ibmmq.NewMQOD() + + // We have to say how we are going to use this queue. In this case, to PUT and GET + // messages. That is done in the openOptions parameter. + openOptions := ibmmq.MQOO_OUTPUT | ibmmq.MQOO_INPUT_AS_Q_DEF + + // Opening a QUEUE (rather than a Topic or other object type) and give the name + mqod.ObjectType = ibmmq.MQOT_Q + mqod.ObjectName = qName + + qObject, err = qMgrObject.Open(mqod, openOptions) + if err != nil { + fmt.Println(err) + } else { + fmt.Println("Opened queue", qObject.Name) + defer close(qObject) + } + } + + // PUT the message to the queue + if err == nil { + putmqmd = ibmmq.NewMQMD() + pmo := ibmmq.NewMQPMO() + + pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT + + // Create the contents to include a timestamp just to prove when it was created + msgData := "Hello from Go at " + time.Now().Format(time.RFC3339) + buffer := []byte(msgData) + putmqmd.Format = ibmmq.MQFMT_STRING + + // Add a Dead Letter Header to the message. + newBuffer := addDLH(putmqmd, buffer) + + // Put the message to the queue) + err = qObject.Put(putmqmd, pmo, newBuffer) + if err != nil { + fmt.Println(err) + } + } + + // And now try to GET the message we just put + if err == nil { + getmqmd := ibmmq.NewMQMD() + gmo := ibmmq.NewMQGMO() + + gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT + + // Set options to not wait - we know the message is there since we just put it + gmo.Options |= ibmmq.MQGMO_NO_WAIT + + // Use the MsgId to retrieve the same message + gmo.MatchOptions = ibmmq.MQMO_MATCH_MSG_ID + getmqmd.MsgId = putmqmd.MsgId + + // Create a buffer for the message data. This one is large enough + // for the messages put by the amqsput sample. + buffer := make([]byte, 1024) + + // Now we can try to get the message. + datalen := 0 + datalen, err = qObject.Get(getmqmd, gmo, buffer) + + if err != nil { + fmt.Println(err) + } else { + // A message has been retrieved. Print the contents, and the DLH + // if one exists + printDLH(getmqmd, buffer[0:datalen]) + } + } + + // Exit with any return code extracted from the failing MQI call. + // Deferred disconnect will happen after the return + mqret := 0 + if err != nil { + mqret = int((err.(*ibmmq.MQReturn)).MQCC) + } + return mqret +} + +// Disconnect from the queue manager +func disc(qMgrObject ibmmq.MQQueueManager) error { + err := qMgrObject.Disc() + if err == nil { + fmt.Printf("Disconnected from queue manager %s\n", qMgrObject.Name) + } else { + fmt.Println(err) + } + return err +} + +// Close the queue if it was opened +func close(object ibmmq.MQObject) error { + err := object.Close(0) + if err == nil { + fmt.Println("Closed queue") + } else { + fmt.Println(err) + } + return err +}