Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
22.92% covered (danger)
22.92%
11 / 48
44.44% covered (danger)
44.44%
4 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
RabbitMQService
22.92% covered (danger)
22.92%
11 / 48
44.44% covered (danger)
44.44%
4 / 9
90.40
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
1
 createConnection
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
12
 getConnection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getChannel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 publishMessage
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 closeConnection
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getConsumers
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
12
 setConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setChannel
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2
3namespace App\Services;
4
5use Illuminate\Support\Facades\Log;
6use PhpAmqpLib\Connection\AMQPStreamConnection;
7use PhpAmqpLib\Message\AMQPMessage;
8use PhpAmqpLib\Channel\AbstractChannel;
9
/**
 * Thin wrapper around a php-amqplib connection/channel pair, plus a helper
 * that asks the RabbitMQ HTTP API how many consumers a queue currently has.
 *
 * The AMQP connection is NOT opened by the constructor; call
 * createConnection(true) or inject one through setConnection()/setChannel().
 */
class RabbitMQService
{
    /** @var AMQPStreamConnection|null Null until createConnection(true) or setConnection() supplies one. */
    protected $connection;

    /** @var AbstractChannel|null Null until createConnection(true) or setChannel() supplies one. */
    protected $channel;

    private mixed $user;
    private mixed $pass;
    private mixed $host;
    private mixed $port;
    private string $queueListUrl;
    private mixed $apiHost;
    private mixed $queue;

    /** Value of RABBIT_CONSUMERS_LIMIT; stored here but not read by any method of this class. */
    private mixed $consumers;

    /**
     * Read the RabbitMQ settings from the environment and build the HTTP API
     * URL used by getConsumers(). No network connection is made here.
     */
    public function __construct()
    {
        // NOTE(review): Laravel's env() is documented to return null outside of
        // config files once the configuration is cached — confirm how this app
        // is deployed, or move these values into a config file.
        $this->user      = env('RABBIT_USER');
        $this->pass      = env('RABBIT_PASS');
        $this->host      = env('RABBIT_HOST');
        $this->port      = env('RABBIT_PORT');
        $this->apiHost   = env('RABBIT_API_HOST');
        $this->queue     = env('RABBIT_MESSAGE_QUEUE');
        $this->consumers = env('RABBIT_CONSUMERS_LIMIT');

        // "%2F" is the URL-encoded "/" vhost, matching the vhost passed to
        // AMQPStreamConnection in createConnection().
        $this->queueListUrl = "{$this->apiHost}/queues/%2F/{$this->queue}";
    }

    /**
     * Open the AMQP connection and a channel on it, or explicitly mark the
     * service as not connected.
     *
     * @param mixed $isScheduled Truthy: connect and open a channel.
     *                           Falsy: set the connection to null (the channel
     *                           property is left as it was).
     * @return void
     * @throws \Exception Any exception raised while connecting is logged to the
     *                    "messages" log channel and rethrown unchanged.
     */
    public function createConnection($isScheduled): void
    {
        if (!$isScheduled) {
            $this->connection = null;

            return;
        }

        try {
            $this->connection = new AMQPStreamConnection(
                $this->host,
                $this->port,
                $this->user,
                $this->pass,
                '/',        // vhost
                false,      // insist
                'AMQPLAIN', // login method
                null,       // login response
                'en_US',    // locale
                160         // connection timeout
            );

            $this->channel = $this->connection->channel();
        } catch (\Exception $e) {
            // Name the method that actually failed (the connection is no
            // longer created in the constructor).
            Log::channel('messages')
                ->error('Error in RabbitMQService createConnection: ' . $e->getMessage());

            throw $e;
        }
    }

    /**
     * Get the AMQPStreamConnection instance.
     *
     * Because the return type is not nullable, PHP raises a \TypeError if no
     * connection has been created or injected yet.
     *
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * Get the AMQP channel instance.
     *
     * Because the return type is not nullable, PHP raises a \TypeError if no
     * channel has been created or injected yet.
     *
     * @return AbstractChannel
     */
    public function getChannel(): AbstractChannel
    {
        return $this->channel;
    }

    /**
     * Publish a message to the specified RabbitMQ queue.
     *
     * The message is published with an empty exchange name and the queue name
     * as routing key. A channel must already be available (createConnection(true)
     * or setChannel()).
     *
     * @param string $queue   Routing key / queue name.
     * @param mixed  $message Body handed to AMQPMessage.
     * @return void
     */
    public function publishMessage(string $queue, mixed $message): void
    {
        $this->channel->basic_publish(new AMQPMessage($message), '', $queue);
    }

    /**
     * Close the RabbitMQ channel and connection.
     *
     * Safe to call when nothing was opened (e.g. after createConnection(false)):
     * missing channel/connection are simply skipped. The connection is closed
     * even if closing the channel throws; that exception still propagates.
     *
     * @return void
     * @throws \Exception
     */
    public function closeConnection(): void
    {
        try {
            $this->channel?->close();
        } finally {
            $this->connection?->close();
        }
    }

    /**
     * Get the number of current consumers of this queue via the HTTP API.
     *
     * Performs a GET on the queue URL built in the constructor, using the
     * configured user/password as basic-auth credentials, and reads the
     * "consumers" field of the JSON response.
     *
     * @return int
     * @throws \Exception If the request fails or the response has no usable
     *                    "consumers" value. Failures are also logged to the
     *                    "messages" log channel.
     */
    public function getConsumers(): int
    {
        $ch = curl_init($this->queueListUrl);

        if ($ch === false) {
            $error = 'Unable to initialise cURL.';

            Log::channel('messages')
                ->error('Error checking information about the queues: ' . $error);

            throw new \Exception('Error checking information about the queues: ' . $error . ' No consumers up now.');
        }

        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->user}:{$this->pass}");

        $response = curl_exec($ch);

        // Read the error text BEFORE releasing the handle, and use that single
        // captured value for both the log entry and the exception.
        $error = $response === false ? curl_error($ch) : '';
        curl_close($ch);

        if ($response === false) {
            Log::channel('messages')
                ->error('Error checking information about the queues: ' . $error);

            throw new \Exception('Error checking information about the queues: ' . $error . ' No consumers up now.');
        }

        $queueInfo = json_decode($response, true);

        // Invalid JSON decodes to null, and an error payload has no "consumers"
        // key; a non-numeric value would otherwise surface as a TypeError
        // because of the int return type.
        if (
            !is_array($queueInfo)
            || !isset($queueInfo['consumers'])
            || !is_numeric($queueInfo['consumers'])
        ) {
            Log::channel('messages')
                ->error('Error: Unable to retrieve the number of consumers.');

            throw new \Exception('Error: Unable to retrieve the number of consumers. No consumers up now.');
        }

        return (int) $queueInfo['consumers'];
    }

    /**
     * Replace the connection used by this service (for example with a
     * pre-built connection or a test double).
     *
     * @param mixed $connection
     * @return void
     */
    public function setConnection($connection): void
    {
        $this->connection = $connection;
    }

    /**
     * Replace the channel used by this service (for example with a
     * pre-built channel or a test double).
     *
     * @param mixed $channel
     * @return void
     */
    public function setChannel($channel): void
    {
        $this->channel = $channel;
    }
}