lib_mysqludf_amqp
is a MySQL user-defined function
library for sending AMQP messages.
- C compiler and standard build tools (make, sh, ...).
- MySQL
- rabbitmq-c
- libbsd (on GNU/Linux)
These are the instructions for building releases. If you are building from non-release source (e.g. git clone
) and there is no ./configure
script, run ./autogen
first and then follow the instructions below..
$ ./configure
$ make
# sudo make install
# service mysql-server restart
$ make installdb
Publishes a string 'Hello, World!'
to the udf
exchange on localhost:5672
with a routing key of test
as the user guest
with the password guest
. Upon success, the message ID is returned.
mysql> SELECT lib_mysqludf_amqp_sendstring('amqp://guest:guest@localhost:5672', 'udf', 'test', 'Hello, World!');
+---------------------------------------------------------------------------------------------------+
| lib_mysqludf_amqp_sendstring('amqp://guest:guest@localhost:5672', 'udf', 'test', 'Hello, World!') |
+---------------------------------------------------------------------------------------------------+
| 000e9ced-9050-4454-8eb4-afe78c336b13 |
+---------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
The following publishes JSON objects representing table rows whenever a row is inserted, updated, or deleted.
SET @AMQP_URL = 'amqp://guest:guest@localhost:5672';
SET @AMQP_EXCHANGE = 'udf';
DROP TABLE IF EXISTS `accounts`;
CREATE TABLE `accounts` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`username` varchar(64) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Customer Accounts';
DELIMITER ;;
DROP TRIGGER IF EXISTS `after_insert_on_accounts`;
CREATE DEFINER=`root`@`localhost` TRIGGER `after_insert_on_accounts` AFTER INSERT ON `accounts` FOR EACH ROW BEGIN
SET @message_id = (SELECT lib_mysqludf_amqp_sendjson(@AMQP_URL, @AMQP_EXCHANGE, 'accounts.insert', json_object('id', NEW.id, 'username', NEW.username)));
END ;;
DROP TRIGGER IF EXISTS `after_update_on_accounts`;
CREATE DEFINER=`root`@`localhost` TRIGGER `after_update_on_accounts` AFTER UPDATE ON `accounts` FOR EACH ROW BEGIN
SET @message_id = (SELECT lib_mysqludf_amqp_sendjson(@AMQP_URL, @AMQP_EXCHANGE, 'accounts.update', json_object('id', NEW.id, 'username', NEW.username)));
END ;;
DROP TRIGGER IF EXISTS `after_delete_on_accounts`;
CREATE DEFINER=`root`@`localhost` TRIGGER `after_delete_on_accounts` AFTER DELETE ON `accounts` FOR EACH ROW BEGIN
SET @message_id = (SELECT lib_mysqludf_amqp_sendjson(@AMQP_URL, @AMQP_EXCHANGE, 'accounts.delete', json_object('id', OLD.id, 'username', OLD.username)));
END ;;
DELIMITER ;
INSERT INTO accounts (username) values ('jdoe');
UPDATE accounts SET username = 'jsmith';
DELETE FROM accounts WHERE id = last_insert_id();
Returns an informational message denoting the package name and version.
None. Supplying any parameters will result in an error.
A string representation of the package name and version, separated by a single space. For example, lib_mysqludf_amqp 2.0.0
.
- invalid arguments Raised when the function is called with arguments.
SELECT lib_mysqludf_amqp_info();
Sends a plain text message
to the given exchange
on the provided hostname
and port
with the supplied routingKey
as username
identified by password
.
url
(string).amqp://[$USERNAME[:$PASSWORD]@]$HOST[:$PORT]/[$VHOST]
exchange
(string). The name of the AMQP exchange to publish the message to.routingKey
(string). The routing key for this message.message
(string). The body of the message.
Upon succes, this function returns a string containing the message ID, a Universally Unique IDentifier (UUID) (v4).
Upon failure, either NULL
is returned or an error is raised.
- invalid arguments Raised when the function is called with an invalid number of arguments or when any argument is not of the correct type.
- socket error Raised when a socket cannot be allocated.
- socket open error Raised when a socket cannot be opened.
- login error Raised when authentication against the AMQP server specified by
hostname
andport
fails using the supplied credentials,username
andpassword
. - channel error Raised when a communications channel cannot be opened between this module and the AMQP server.
- malloc error Raised when the memory allocation routine
malloc()
fails.
Login as user guest
with password guest
to the AMQP server running on localhost
port 5672
and publish the message Hello, World!
with routing key test
to exchange udf
:
SELECT lib_mysqludf_amqp_sendstring('amqp://guest:guest@localhost:5672', 'udf', 'test', 'Hello, World!');
Sends a JSON message
to the given exchange
on the provided hostname
and port
with the supplied routingKey
as username
identified by password
.
url
(string).amqp://[$USERNAME[:$PASSWORD]@]$HOST[:$PORT]/[$VHOST]
exchange
(string). The name of the AMQP exchange to publish the message to.routingKey
(string). The routing key for this message.message
(string). The body of the message, a JSON string.
Upon succes, this function returns a string containing the message ID, a Universally Unique IDentifier (UUID) (v4).
Upon failure, either NULL
is returned or an error is raised.
- invalid arguments Raised when the function is called with an invalid number of arguments or when any argument is not of the correct type.
- socket error Raised when a socket cannot be allocated.
- socket open error Raised when a socket cannot be opened.
- login error Raised when authentication against the AMQP server specified by
hostname
andport
fails using the supplied credentials,username
andpassword
. - channel error Raised when a communications channel cannot be opened between this module and the AMQP server.
- malloc error Raised when the memory allocation routine
malloc()
fails.
Login as user guest
with password guest
to the AMQP server running on localhost
port 5672
and publish the message { "info": "lib_mysqludf_amqp 0.0.0" }
with routing key test
to exchange udf
:
SELECT lib_mysqludf_amqp_sendjson('amqp://guest:guest@localhost:5672', 'udf', 'test', json_object('lib_mysqludf_amqp_info', cast(lib_mysqludf_amqp_info() as char)));
See COPYING.