diff --git a/memoryleaks_test.go b/memoryleaks_test.go new file mode 100644 index 0000000..f9bf960 --- /dev/null +++ b/memoryleaks_test.go @@ -0,0 +1,167 @@ +/* + * Copyright (c) IBM Corporation 2023 + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * SPDX-License-Identifier: EPL-2.0 + */ +package main + +import ( + "fmt" + "runtime" + "testing" + "time" + + "github.com/ibm-messaging/mq-golang-jms20/jms20subset" + "github.com/ibm-messaging/mq-golang-jms20/mqjms" + "github.com/stretchr/testify/assert" +) + +/* + * Test for memory leak when there is no message to be received. + * + * This test is not included in the normal bucket as it sends an enormous number of + * messages, and requires human observation of the total process size to establish whether + * it passes or not, so can only be run under human supervision + */ +func DONT_RUNTestLeakOnEmptyGet(t *testing.T) { + + // Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory + //cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles() + //assert.Nil(t, cfErr) + + // Initialise the attributes of the CF in whatever way you like + cf := mqjms.ConnectionFactoryImpl{ + QMName: "QM1", + Hostname: "localhost", + PortNumber: 1414, + ChannelName: "DEV.APP.SVRCONN", + UserName: "app", + Password: "passw0rd", + } + + // Creates a connection to the queue manager, using defer to close it automatically + // at the end of the function (if it was created successfully) + context, ctxErr := cf.CreateContext() + assert.Nil(t, ctxErr) + if context != nil { + defer context.Close() + } + + // Now send the message and get it back again, to check that it roundtripped. + queue := context.CreateQueue("DEV.QUEUE.1") + + consumer, errCons := context.CreateConsumer(queue) + if consumer != nil { + defer consumer.Close() + } + assert.Nil(t, errCons) + + for i := 1; i < 35000; i++ { + + rcvMsg, errRvc := consumer.ReceiveNoWait() + assert.Nil(t, errRvc) + assert.Nil(t, rcvMsg) + + if i%1000 == 0 { + fmt.Println("Messages:", i) + } + + } + + fmt.Println("Finished receive calls - waiting for cooldown.") + runtime.GC() + + time.Sleep(30 * time.Second) + +} + +/* + * Test for memory leak when sending and receiving messages + * + * This test is not included in the normal bucket as it sends an enormous number of + * messages, and requires human observation of the total process size to establish whether + * it passes or not, so can only be run under human supervision + */ +func DONTRUN_TestLeakOnPutGet(t *testing.T) { + + // Loads CF parameters from connection_info.json and applicationApiKey.json in the Downloads directory + //cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles() + //assert.Nil(t, cfErr) + + // Initialise the attributes of the CF in whatever way you like + cf := mqjms.ConnectionFactoryImpl{ + QMName: "QM1", + Hostname: "localhost", + PortNumber: 1414, + ChannelName: "DEV.APP.SVRCONN", + UserName: "app", + Password: "passw0rd", + } + + // Creates a connection to the queue manager, using defer to close it automatically + // at the end of the function (if it was created successfully) + context, ctxErr := cf.CreateContext() + assert.Nil(t, ctxErr) + if context != nil { + defer context.Close() + } + + // Now send the message and get it back again, to check that it roundtripped. + queue := context.CreateQueue("DEV.QUEUE.1") + + consumer, errCons := context.CreateConsumer(queue) + if consumer != nil { + defer consumer.Close() + } + assert.Nil(t, errCons) + + ttlMillis := 20000 + producer := context.CreateProducer().SetTimeToLive(ttlMillis) + + for i := 1; i < 25000; i++ { + + // Create a TextMessage and check that we can populate it + msgBody := "Message " + fmt.Sprint(i) + txtMsg := context.CreateTextMessage() + txtMsg.SetText(msgBody) + txtMsg.SetIntProperty("MessageNumber", i) + + errSend := producer.Send(queue, txtMsg) + assert.Nil(t, errSend) + + rcvMsg, errRvc := consumer.ReceiveNoWait() + assert.Nil(t, errRvc) + assert.NotNil(t, rcvMsg) + + // Check message body. + switch msg := rcvMsg.(type) { + case jms20subset.TextMessage: + assert.Equal(t, msgBody, *msg.GetText()) + default: + assert.Fail(t, "Got something other than a text message") + } + + // Check messageID + assert.Equal(t, txtMsg.GetJMSMessageID(), rcvMsg.GetJMSMessageID()) + + // Check int property + rcvMsgNum, propErr := rcvMsg.GetIntProperty("MessageNumber") + assert.Nil(t, propErr) + assert.Equal(t, i, rcvMsgNum) + + if i%1000 == 0 { + fmt.Println("Messages:", i) + } + + } + + fmt.Println("Finished receive calls - waiting for cooldown.") + runtime.GC() + + time.Sleep(30 * time.Second) + +} diff --git a/mqjms/ConnectionFactoryImpl.go b/mqjms/ConnectionFactoryImpl.go index 3c3a6dc..1081c43 100644 --- a/mqjms/ConnectionFactoryImpl.go +++ b/mqjms/ConnectionFactoryImpl.go @@ -11,6 +11,7 @@ package mqjms import ( "strconv" + "sync" "github.com/ibm-messaging/mq-golang-jms20/jms20subset" ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq" @@ -155,6 +156,7 @@ func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int, mq // a new ContextImpl and return it to the caller. ctx = ContextImpl{ qMgr: qMgr, + ctxLock: &sync.Mutex{}, sessionMode: sessionMode, receiveBufferSize: cf.ReceiveBufferSize, sendCheckCount: cf.SendCheckCount, diff --git a/mqjms/ConsumerImpl.go b/mqjms/ConsumerImpl.go index b83b0c8..a053d33 100644 --- a/mqjms/ConsumerImpl.go +++ b/mqjms/ConsumerImpl.go @@ -11,8 +11,11 @@ package mqjms import ( "errors" + "fmt" + "runtime" "strconv" "strings" + "sync" "github.com/ibm-messaging/mq-golang-jms20/jms20subset" ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq" @@ -57,6 +60,11 @@ func (consumer ConsumerImpl) Receive(waitMillis int32) (jms20subset.Message, jms // of receive. func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Message, jms20subset.JMSException) { + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use (below) to delete unused MessageHandles. + consumer.ctx.ctxLock.Lock() + defer consumer.ctx.ctxLock.Unlock() + // Prepare objects to be used in receiving the message. var msg jms20subset.Message var jmsErr jms20subset.JMSException @@ -99,6 +107,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess if err == nil { + // Set a finalizer on the message handle to allow it to be deleted + // when it is no longer referenced by an active object, to reduce/prevent + // memory leaks. + setMessageHandlerFinalizer(thisMsgHandle, consumer.ctx.ctxLock) + // Message received successfully (without error). // Determine on the basis of the format field what sort of message to create. @@ -116,6 +129,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess MessageImpl: MessageImpl{ mqmd: getmqmd, msgHandle: &thisMsgHandle, + ctxLock: consumer.ctx.ctxLock, }, } @@ -133,6 +147,7 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess MessageImpl: MessageImpl{ mqmd: getmqmd, msgHandle: &thisMsgHandle, + ctxLock: consumer.ctx.ctxLock, }, } } @@ -142,6 +157,11 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess // Error code was returned from MQ call. mqret := err.(*ibmmq.MQReturn) + // Delete the message handle object in-line here now that it is no longer required, + // to avoid memory leak + dmho := ibmmq.NewMQDMHO() + gmo.MsgHandle.DltMH(dmho) + if mqret.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE { // This isn't a real error - it's the way that MQ indicates that there @@ -164,6 +184,36 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess return msg, jmsErr } +/* + * Set a finalizer on the message handle to allow it to be deleted + * when it is no longer referenced by an active object, to reduce/prevent + * memory leaks. + */ +func setMessageHandlerFinalizer(thisMsgHandle ibmmq.MQMessageHandle, ctxLock *sync.Mutex) { + + runtime.SetFinalizer(&thisMsgHandle, func(msgHandle *ibmmq.MQMessageHandle) { + ctxLock.Lock() + defer ctxLock.Unlock() + + dmho := ibmmq.NewMQDMHO() + err := msgHandle.DltMH(dmho) + if err != nil { + + mqret := err.(*ibmmq.MQReturn) + + if mqret.MQRC == ibmmq.MQRC_HCONN_ERROR { + // Expected if the connection is closed before the finalizer executes + // (at which point it should get tidied up automatically by the connection) + } else { + fmt.Println("DltMH finalizer", err) + } + + } + + }) + +} + // ReceiveStringBodyNoWait implements the IBM MQ logic necessary to receive a // message from a Destination and return its body as a string. // @@ -356,6 +406,12 @@ func applySelector(selector string, getmqmd *ibmmq.MQMD, gmo *ibmmq.MQGMO) error func (consumer ConsumerImpl) Close() { if (ibmmq.MQObject{}) != consumer.qObject { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use (below) to delete unused MessageHandles. + consumer.ctx.ctxLock.Lock() + defer consumer.ctx.ctxLock.Unlock() + consumer.qObject.Close(0) } diff --git a/mqjms/ContextImpl.go b/mqjms/ContextImpl.go index 209dcd0..ceadbd6 100644 --- a/mqjms/ContextImpl.go +++ b/mqjms/ContextImpl.go @@ -12,6 +12,7 @@ package mqjms import ( "fmt" "strconv" + "sync" "github.com/ibm-messaging/mq-golang-jms20/jms20subset" ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq" @@ -21,6 +22,7 @@ import ( // connection to an IBM MQ queue manager. type ContextImpl struct { qMgr ibmmq.MQQueueManager + ctxLock *sync.Mutex // Mutex to synchronize MQRC calls to the queue manager sessionMode int receiveBufferSize int sendCheckCount int @@ -65,6 +67,11 @@ func (ctx ContextImpl) CreateConsumer(dest jms20subset.Destination) (jms20subset // receive messages that match the specified selector from the given Destination. func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination, selector string) (jms20subset.JMSConsumer, jms20subset.JMSException) { + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + ctx.ctxLock.Lock() + defer ctx.ctxLock.Unlock() + // First validate the selector string format (we don't make use of it at // runtime until the receive is called) if selector != "" { @@ -118,6 +125,11 @@ func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination, // an application can look at messages without removing them. func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset.QueueBrowser, jms20subset.JMSException) { + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + ctx.ctxLock.Lock() + defer ctx.ctxLock.Unlock() + // Set up the necessary objects to open the queue mqod := ibmmq.NewMQOD() var openOptions int32 @@ -166,23 +178,34 @@ func (ctx ContextImpl) CreateBrowser(dest jms20subset.Destination) (jms20subset. func (ctx ContextImpl) CreateTextMessage() jms20subset.TextMessage { var bodyStr *string - thisMsgHandle := createMsgHandle(ctx.qMgr) + thisMsgHandle := ctx.createMsgHandle(ctx.qMgr) return &TextMessageImpl{ bodyStr: bodyStr, MessageImpl: MessageImpl{ msgHandle: &thisMsgHandle, + ctxLock: ctx.ctxLock, }, } } // createMsgHandle creates a new message handle object that can be used to // store and retrieve message properties. -func createMsgHandle(qMgr ibmmq.MQQueueManager) ibmmq.MQMessageHandle { +func (ctx ContextImpl) createMsgHandle(qMgr ibmmq.MQQueueManager) ibmmq.MQMessageHandle { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + ctx.ctxLock.Lock() + defer ctx.ctxLock.Unlock() cmho := ibmmq.NewMQCMHO() thisMsgHandle, err := qMgr.CrtMH(cmho) + // Set a finalizer on the message handle to allow it to be deleted + // when it is no longer referenced by an active object, to reduce/prevent + // memory leaks. + setMessageHandlerFinalizer(thisMsgHandle, ctx.ctxLock) + if err != nil { // No easy way to pass this error back to the application without // changing the function signature, which could break existing @@ -198,12 +221,13 @@ func createMsgHandle(qMgr ibmmq.MQQueueManager) ibmmq.MQMessageHandle { // and initialise it with the chosen text string. func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextMessage { - thisMsgHandle := createMsgHandle(ctx.qMgr) + thisMsgHandle := ctx.createMsgHandle(ctx.qMgr) msg := &TextMessageImpl{ bodyStr: &txt, MessageImpl: MessageImpl{ msgHandle: &thisMsgHandle, + ctxLock: ctx.ctxLock, }, } @@ -214,12 +238,13 @@ func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextM func (ctx ContextImpl) CreateBytesMessage() jms20subset.BytesMessage { var thisBodyBytes *[]byte - thisMsgHandle := createMsgHandle(ctx.qMgr) + thisMsgHandle := ctx.createMsgHandle(ctx.qMgr) return &BytesMessageImpl{ bodyBytes: thisBodyBytes, MessageImpl: MessageImpl{ msgHandle: &thisMsgHandle, + ctxLock: ctx.ctxLock, }, } } @@ -227,12 +252,13 @@ func (ctx ContextImpl) CreateBytesMessage() jms20subset.BytesMessage { // CreateBytesMessageWithBytes is a JMS standard mechanism for creating a BytesMessage. func (ctx ContextImpl) CreateBytesMessageWithBytes(bytes []byte) jms20subset.BytesMessage { - thisMsgHandle := createMsgHandle(ctx.qMgr) + thisMsgHandle := ctx.createMsgHandle(ctx.qMgr) return &BytesMessageImpl{ bodyBytes: &bytes, MessageImpl: MessageImpl{ msgHandle: &thisMsgHandle, + ctxLock: ctx.ctxLock, }, } } @@ -243,6 +269,12 @@ func (ctx ContextImpl) Commit() jms20subset.JMSException { var retErr jms20subset.JMSException if (ibmmq.MQQueueManager{}) != ctx.qMgr { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + ctx.ctxLock.Lock() + defer ctx.ctxLock.Unlock() + err := ctx.qMgr.Cmit() if err != nil { @@ -297,6 +329,12 @@ func (ctx ContextImpl) Rollback() jms20subset.JMSException { var retErr jms20subset.JMSException if (ibmmq.MQQueueManager{}) != ctx.qMgr { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + ctx.ctxLock.Lock() + defer ctx.ctxLock.Unlock() + err := ctx.qMgr.Back() if err != nil { @@ -321,6 +359,12 @@ func (ctx ContextImpl) Close() { ctx.Rollback() if (ibmmq.MQQueueManager{}) != ctx.qMgr { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + ctx.ctxLock.Lock() + defer ctx.ctxLock.Unlock() + ctx.qMgr.Disc() } diff --git a/mqjms/MessageImpl.go b/mqjms/MessageImpl.go index 3ed6825..8964570 100644 --- a/mqjms/MessageImpl.go +++ b/mqjms/MessageImpl.go @@ -16,6 +16,7 @@ import ( "log" "strconv" "strings" + "sync" "time" "github.com/ibm-messaging/mq-golang-jms20/jms20subset" @@ -32,6 +33,7 @@ const MessageImpl_PROPERTY_CONVERT_NOTSUPPORTED_CODE string = "1056 " type MessageImpl struct { mqmd *ibmmq.MQMD msgHandle *ibmmq.MQMessageHandle + ctxLock *sync.Mutex } // GetJMSDeliveryMode extracts the persistence setting from this message @@ -311,6 +313,11 @@ func (msg *MessageImpl) SetStringProperty(name string, value *string) jms20subse return retErr } + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + if value != nil { // Looking to set a value var valueStr string @@ -569,6 +576,12 @@ func (msg *MessageImpl) GetStringProperty(name string) (*string, jms20subset.JMS isSpecialProp, value, err := msg.getSpecialPropertyValue(name) if !isSpecialProp { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + // If not then look for a user property _, value, err = msg.msgHandle.InqMP(impo, pd, name) } @@ -641,6 +654,11 @@ func (msg *MessageImpl) SetIntProperty(name string, value int) jms20subset.JMSEx smpo := ibmmq.NewMQSMPO() pd := ibmmq.NewMQPD() + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + linkedErr = msg.msgHandle.SetMP(smpo, name, pd, value) if linkedErr != nil { @@ -667,6 +685,12 @@ func (msg *MessageImpl) GetIntProperty(name string) (int, jms20subset.JMSExcepti isSpecialProp, value, err := msg.getSpecialPropertyValue(name) if !isSpecialProp { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + // If not then look for a user property _, value, err = msg.msgHandle.InqMP(impo, pd, name) } @@ -728,6 +752,11 @@ func (msg *MessageImpl) SetDoubleProperty(name string, value float64) jms20subse smpo := ibmmq.NewMQSMPO() pd := ibmmq.NewMQPD() + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + linkedErr = msg.msgHandle.SetMP(smpo, name, pd, value) if linkedErr != nil { @@ -754,6 +783,12 @@ func (msg *MessageImpl) GetDoubleProperty(name string) (float64, jms20subset.JMS isSpecialProp, value, err := msg.getSpecialPropertyValue(name) if !isSpecialProp { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + // If not then look for a user property _, value, err = msg.msgHandle.InqMP(impo, pd, name) } @@ -818,6 +853,11 @@ func (msg *MessageImpl) SetBooleanProperty(name string, value bool) jms20subset. return retErr } + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + linkedErr = msg.msgHandle.SetMP(smpo, name, pd, value) if linkedErr != nil { @@ -875,6 +915,12 @@ func (msg *MessageImpl) GetBooleanProperty(name string) (bool, jms20subset.JMSEx isSpecialProp, value, err := msg.getSpecialPropertyValue(name) if !isSpecialProp { + + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + // If not then look for a user property _, value, err = msg.msgHandle.InqMP(impo, pd, name) } @@ -951,6 +997,11 @@ func (msg *MessageImpl) getPropertiesInternal(name string) (bool, []string, jms2 pd := ibmmq.NewMQPD() propNames := []string{} + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + impo.Options = ibmmq.MQIMPO_CONVERT_VALUE | ibmmq.MQIMPO_INQ_FIRST for propsToRead := true; propsToRead; { @@ -998,6 +1049,11 @@ func (msg *MessageImpl) ClearProperties() jms20subset.JMSException { dmpo := ibmmq.NewMQDMPO() + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use to delete unused MessageHandles. + msg.ctxLock.Lock() + defer msg.ctxLock.Unlock() + for _, propName := range allPropNames { // Delete this property diff --git a/mqjms/ProducerImpl.go b/mqjms/ProducerImpl.go index 7bc4ac1..49451e5 100644 --- a/mqjms/ProducerImpl.go +++ b/mqjms/ProducerImpl.go @@ -58,6 +58,11 @@ func (producer ProducerImpl) SendBytes(dest jms20subset.Destination, body []byte // that are defined on this JMSProducer. func (producer ProducerImpl) Send(dest jms20subset.Destination, msg jms20subset.Message) jms20subset.JMSException { + // Lock the context while we are making calls to the queue manager so that it + // doesn't conflict with the finalizer we use (below) to delete unused MessageHandles. + producer.ctx.ctxLock.Lock() + defer producer.ctx.ctxLock.Unlock() + // Set up the basic objects we need to send the message. mqod := ibmmq.NewMQOD() putmqmd := ibmmq.NewMQMD()