Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
22.92% |
11 / 48 |
|
44.44% |
4 / 9 |
CRAP | |
0.00% |
0 / 1 |
RabbitMQService | |
22.92% |
11 / 48 |
|
44.44% |
4 / 9 |
90.40 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
1 | |||
createConnection | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
12 | |||
getConnection | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getChannel | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
publishMessage | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
closeConnection | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
getConsumers | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
12 | |||
setConnection | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setChannel | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 |
1 | <?php |
2 | |
3 | namespace App\Services; |
4 | |
5 | use Illuminate\Support\Facades\Log; |
6 | use PhpAmqpLib\Connection\AMQPStreamConnection; |
7 | use PhpAmqpLib\Message\AMQPMessage; |
8 | use PhpAmqpLib\Channel\AbstractChannel; |
9 | |
/**
 * Wrapper around a php-amqplib connection/channel pair plus a small client
 * for the RabbitMQ HTTP API endpoint that describes the configured queue.
 *
 * The connection is NOT opened by the constructor; call createConnection(true)
 * (or inject doubles through setConnection()/setChannel()) before publishing.
 */
class RabbitMQService
{
    /**
     * Value passed as the 10th AMQPStreamConnection constructor argument
     * (the connection timeout, in seconds, in php-amqplib's signature).
     */
    private const CONNECTION_TIMEOUT = 160;

    /** @var AMQPStreamConnection|null Null until createConnection(true) or setConnection() is called. */
    protected $connection;

    /** @var AbstractChannel|null Null until createConnection(true) or setChannel() is called. */
    protected $channel;

    private mixed $user;
    private mixed $pass;
    private mixed $host;
    private mixed $port;
    private string $queueListUrl;
    private mixed $apiHost;
    private mixed $queue;
    private mixed $consumers;

    /**
     * Reads the broker settings from the environment and builds the HTTP API
     * URL used by getConsumers(). No network activity happens here.
     *
     * NOTE(review): Laravel's env() is documented to return null outside of
     * config files once the configuration is cached — confirm whether these
     * values should come from config() instead.
     */
    public function __construct()
    {
        // Broker credentials and endpoints.
        $this->user = env('RABBIT_USER');
        $this->pass = env('RABBIT_PASS');
        $this->host = env('RABBIT_HOST');
        $this->port = env('RABBIT_PORT');
        $this->apiHost = env('RABBIT_API_HOST');
        $this->queue = env('RABBIT_MESSAGE_QUEUE');
        $this->consumers = env('RABBIT_CONSUMERS_LIMIT');

        // Queue details endpoint; "%2F" is the URL-encoded default vhost "/".
        $this->queueListUrl = "{$this->apiHost}/queues/%2F/{$this->queue}";
    }

    /**
     * Opens the AMQP connection and a channel when $isScheduled is truthy;
     * otherwise clears the stored connection.
     *
     * Note that the falsy branch only resets the connection: a channel that
     * was set earlier is left in place, exactly as before.
     *
     * @param mixed $isScheduled Truthy to connect, falsy to drop the connection reference.
     *
     * @throws \Exception Rethrown after being logged to the "messages" channel.
     */
    public function createConnection($isScheduled): void
    {
        if (!$isScheduled) {
            $this->connection = null;

            return;
        }

        try {
            $this->connection = new AMQPStreamConnection(
                $this->host,
                $this->port,
                $this->user,
                $this->pass,
                '/',
                false,
                'AMQPLAIN',
                null,
                'en_US',
                self::CONNECTION_TIMEOUT
            );

            $this->channel = $this->connection->channel();
        } catch (\Exception $e) {
            // The failure happens here, not in the constructor, so say so in the log.
            Log::channel('messages')
                ->error('Error in RabbitMQService createConnection: ' . $e->getMessage());

            throw $e;
        }
    }

    /**
     * Get the AMQPStreamConnection instance.
     *
     * The return type is non-nullable, so calling this while no connection is
     * set raises a TypeError.
     *
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * Get the AMQP channel instance.
     *
     * The return type is non-nullable, so calling this while no channel is
     * set raises a TypeError.
     *
     * @return AbstractChannel
     */
    public function getChannel(): AbstractChannel
    {
        return $this->channel;
    }

    /**
     * Publish a message to the specified RabbitMQ queue.
     *
     * The message is published with an empty exchange name and the queue name
     * as routing key. A channel must have been set beforehand.
     *
     * @param string $queue   Queue name, used as the routing key.
     * @param mixed  $message Body handed to AMQPMessage.
     * @return void
     */
    public function publishMessage(string $queue, mixed $message): void
    {
        $this->channel->basic_publish(new AMQPMessage($message), '', $queue);
    }

    /**
     * Close the RabbitMQ channel and connection.
     *
     * Safe to call when nothing was opened (both references null). The
     * connection is closed even if closing the channel throws.
     *
     * @throws \Exception
     */
    public function closeConnection(): void
    {
        try {
            $this->channel?->close();
        } finally {
            $this->connection?->close();
        }
    }

    /**
     * Get the number of current consumers of this queue via API.
     *
     * Performs an authenticated GET against the queue details URL and reads
     * the "consumers" field of the JSON response.
     *
     * @return int
     * @throws \Exception When the request fails or the response carries no usable "consumers" value.
     */
    public function getConsumers(): int
    {
        $ch = curl_init($this->queueListUrl);

        if ($ch === false) {
            $error = 'unable to initialise cURL.';

            Log::channel('messages')
                ->error('Error checking information about the queues: ' . $error);

            throw new \Exception('Error checking information about the queues: ' . $error . ' No consumers up now.');
        }

        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->user}:{$this->pass}");

        $response = curl_exec($ch);

        // Read the error text while the handle is still open, then release it once.
        $error = $response === false ? curl_error($ch) : '';
        curl_close($ch);

        if ($response === false) {
            Log::channel('messages')
                ->error('Error checking information about the queues: ' . $error);

            throw new \Exception('Error checking information about the queues: ' . $error . ' No consumers up now.');
        }

        $queueInfo = json_decode($response, true);
        $consumers = is_array($queueInfo) ? ($queueInfo['consumers'] ?? null) : null;

        // A missing or non-numeric value cannot satisfy the int return type.
        if (!is_numeric($consumers)) {
            Log::channel('messages')
                ->error('Error: Unable to retrieve the number of consumers.');

            throw new \Exception('Error: Unable to retrieve the number of consumers. No consumers up now.');
        }

        return (int) $consumers;
    }

    /**
     * Replace the stored connection (used to inject an existing connection or a test double).
     *
     * @param mixed $connection
     */
    public function setConnection($connection): void
    {
        $this->connection = $connection;
    }

    /**
     * Replace the stored channel (used to inject an existing channel or a test double).
     *
     * @param mixed $channel
     */
    public function setChannel($channel): void
    {
        $this->channel = $channel;
    }
}