This repository has been archived by the owner on Aug 24, 2022. It is now read-only.
forked from gjedeer/celery-php
-
Notifications
You must be signed in to change notification settings - Fork 0
/
amqp.php
130 lines (116 loc) · 3.34 KB
/
amqp.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<?php
/*
 * Load the connector implementations.
 *
 * Every path is anchored to the directory of this file: a bare relative
 * name would be searched for in include_path and the current working
 * directory as well, so an unrelated file with the same name could be
 * loaded instead. dirname(__FILE__) is used rather than __DIR__ so that the
 * file still parses and runs on PHP versions older than 5.3.
 */

/* Include namespaced code only if PhpAmqpLib available */
if(class_exists('PhpAmqpLib\Connection\AMQPConnection'))
{
    require_once(dirname(__FILE__) . '/amqplibconnector.php');
    require_once(dirname(__FILE__) . '/amqplibconnectorssl.php');
}
/* Include only if predis available */
if(class_exists('Predis\Autoloader'))
{
    require_once(dirname(__FILE__) . '/redisconnector.php');
}
/* Including the PECL connector never fails */
require_once(dirname(__FILE__) . '/amqppeclconnector.php');
/**
 * Abstraction for AMQP client libraries.
 *
 * Abstract base class: the static factory methods select and instantiate a
 * concrete connector, and the abstract methods declare the operations every
 * concrete connector has to implement.
 * @package celery-php
 */
abstract class AbstractAMQPConnector
{
    /**
     * Return a concrete AMQP abstraction object. Factory method.
     *
     * Autodetection calls GetBestInstalledExtensionName() without arguments,
     * so it only ever yields 'pecl', 'php-amqplib' or 'unknown'; the SSL and
     * redis connectors have to be requested by name.
     * @param string|false $name Name of desired concrete object: 'pecl', 'php-amqplib',
     *  'php-amqplib-ssl', 'redis' or false: autodetect
     * @return AbstractAMQPConnector concrete object implementing AbstractAMQPConnector interface
     * @throws Exception if the name is unknown (including a failed autodetection)
     *  or the connector class is not loaded
     */
    public static function GetConcrete($name = false)
    {
        if($name === false)
        {
            $name = self::GetBestInstalledExtensionName();
        }
        return self::GetConcreteByName($name);
    }

    /**
     * Return a concrete AMQP abstraction object given by the name
     * @param string $name Name of desired concrete object: 'pecl', 'php-amqplib',
     *  'php-amqplib-ssl' or 'redis'
     * @return AbstractAMQPConnector concrete object implementing AbstractAMQPConnector interface
     * @throws Exception if the name is unknown or the connector class is not loaded
     */
    public static function GetConcreteByName($name)
    {
        /*
         * Names are compared strictly: with == a non-string argument such as
         * true (or 0 on PHP < 8) compares equal to 'pecl' and would silently
         * select the PECL connector.
         */
        if($name === 'pecl')
        {
            $class = 'PECLAMQPConnector';
        }
        elseif($name === 'php-amqplib')
        {
            $class = 'AMQPLibConnector';
        }
        elseif($name === 'php-amqplib-ssl')
        {
            $class = 'AMQPLibConnectorSsl';
        }
        elseif($name === 'redis')
        {
            $class = 'RedisConnector';
        }
        else
        {
            throw new Exception('Unknown extension name ' . $name);
        }

        /*
         * The connector class may be missing when its backing library is not
         * installed. Report that as a regular exception instead of letting
         * "new" abort with a fatal "Class not found" error.
         */
        if(!class_exists($class))
        {
            throw new Exception('Connector class ' . $class . ' for extension ' . $name . ' is not available');
        }
        return new $class();
    }

    /**
     * Return name of best available AMQP connector library
     *
     * When $ssl is true the SSL connector name is returned without checking
     * whether php-amqplib is actually installed.
     * @param bool $ssl true to request an SSL-capable connector
     * @return string 'php-amqplib-ssl', 'pecl', 'php-amqplib', or 'unknown' if nothing was detected
     */
    public static function GetBestInstalledExtensionName($ssl = false)
    {
        if($ssl === true) //pecl doesn't support ssl
        {
            return 'php-amqplib-ssl';
        }
        elseif(class_exists('AMQPConnection') && extension_loaded('amqp'))
        {
            return 'pecl';
        }
        elseif(class_exists('PhpAmqpLib\Connection\AMQPConnection'))
        {
            return 'php-amqplib';
        }
        else
        {
            return 'unknown';
        }
    }

    /**
     * Return backend-specific connection object passed to all other calls
     * @param array $details Array of connection details
     * @return object
     */
    abstract public function GetConnectionObject($details); // details = array

    /**
     * Initialize connection on a given connection object
     * @param object $connection Backend-specific connection object returned by GetConnectionObject()
     * @return NULL
     */
    abstract public function Connect($connection);

    /**
     * Post a task to exchange specified in $details
     * @param object $connection Backend-specific connection object returned by GetConnectionObject()
     * @param array $details Array of connection details
     * @param string $task JSON-encoded task
     * @param array $params AMQP message parameters
     * @return bool true if posted successfully
     */
    abstract public function PostToExchange($connection, $details, $task, $params);

    /**
     * Return result of task execution for $task_id
     * @param object $connection Backend-specific connection object returned by GetConnectionObject()
     * @param string $task_id Celery task identifier
     * @param boolean $removeMessageFromQueue whether to remove message from queue
     * @return array array('body' => JSON-encoded message body, 'complete_result' => library-specific message object)
     *  or false if result not ready yet
     */
    abstract public function GetMessageBody($connection, $task_id, $removeMessageFromQueue);
}
?>