Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
4.26% covered (danger)
4.26%
4 / 94
7.69% covered (danger)
7.69%
1 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
MessagesFromRabbit
4.26% covered (danger)
4.26%
4 / 94
7.69% covered (danger)
7.69%
1 / 13
408.06
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 handle
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
12
 init
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 setCloseOnDestruct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 startChannel
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 startQueueDeclare
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 handleInitException
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 consumeQueue
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
12
 handleChannelNotInitialized
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 saveMessage
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 handleValidationFailure
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 saveMessageToDatabase
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
2
 closeConnection
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3namespace App\Console\Commands\Messages;
4
5use App\Mail\MessageEmail;
6use App\Mail\RabbitEmail;
7use App\Models\Messages;
8use App\Services\RabbitMQService;
9use Illuminate\Console\Command;
10use Illuminate\Contracts\Validation\Validator;
11use Illuminate\Support\Facades\Log;
12use Illuminate\Support\Facades\Mail;
13
14class MessagesFromRabbit extends Command
15{
16    protected $signature   = 'queue:messages {--is-scheduled=}';
17    protected $description = 'Messages from RabbitMQ queue and stored at DB';
18
19    private mixed $queue;
20    private mixed $consumers;
21    private mixed $channel;
22    private RabbitMQService $rabbitMQService;
23
24    /**
25     * @throws \Exception
26     */
27    public function __construct(RabbitMQService $rabbitMQService)
28    {
29        parent::__construct();
30
31        $this->rabbitMQService = $rabbitMQService;
32
33        // Get missing settings according the env
34        $this->queue     = env('RABBIT_MESSAGE_QUEUE');
35        $this->consumers = env('RABBIT_CONSUMERS_LIMIT');
36    }
37
38    /**
39     * Job starts here.
40     * @return bool
41     * @throws \Exception
42     */
43    public function handle(): bool
44    {
45        // Check the number of consumers up. If it reaches the limit, don't need to create more. Abort here.
46        if ($this->rabbitMQService->getConsumers() >= $this->consumers) {
47            $this->info("All total $this->consumers consumers are running. No more consumers needed.");
48            return false;
49        }
50
51        $this->rabbitMQService->createConnection((bool)$this->option('is-scheduled'));
52
53        // Init the new listener
54        $this->init();
55
56        // Init new consumer
57        $this->consumeQueue(function ($msg) {
58            $result = $this->saveMessage($msg->body);
59            if($result)
60                $msg->ack();
61        });
62
63        // Close connections and consumers
64        $this->closeConnection();
65
66        return true;
67    }
68
69    /**
70     * Start a new listener.
71     * @return void
72     */
73    private function init(): void
74    {
75        try {
76            $this->setCloseOnDestruct();
77            $this->startChannel();
78            $this->startQueueDeclare();
79            $this->info("Init listener done.");
80        } catch (\Exception $ex) {
81            $this->handleInitException($ex);
82        }
83    }
84
85    /**
86     * Set close_on_destruct to false.
87     *
88     * @return void
89     */
90    private function setCloseOnDestruct(): void
91    {
92        $this->info("Set close_on_destruct..");
93        $this->rabbitMQService->getConnection()->set_close_on_destruct(false);
94    }
95
96    /**
97     * Start the channel.
98     *
99     * @return void
100     */
101    private function startChannel(): void
102    {
103        $this->info("Start channel..");
104        $this->channel = $this->rabbitMQService->getChannel();
105    }
106
107    /**
108     * Start queue declaration.
109     *
110     * @return void
111     */
112    private function startQueueDeclare(): void
113    {
114        $this->info("Start queue_declare..");
115        $this->channel->queue_declare(
116            $this->queue,
117            false,
118            true,
119            false,
120            false
121        );
122    }
123
124    /**
125     * Handle the exception during initialization.
126     *
127     * @param \Exception $ex
128     * @return void
129     */
130    private function handleInitException(\Exception $ex): void
131    {
132        $this->error($ex->getMessage());
133
134        // Log the error
135        Log::channel('messages')
136            ->error('Error on Console processing a message from rabbit: ' . $ex->getMessage());
137    }
138
139
140    /**
141     * Start a new consumer.
142     * @param $callback
143     * @return void
144     */
145    private function  consumeQueue($callback): void
146    {
147        // Ensure that the channel is initialized
148        if (!$this->channel) {
149            $this->handleChannelNotInitialized();
150            return;
151        }
152
153        // Set up basic consumption with the provided callback
154        $this->channel->basic_consume(
155            $this->queue,
156            '',
157            false,
158            false,
159            false,
160            false,
161            $callback
162        );
163
164        // Continue consuming messages until the channel stops
165        while ($this->channel->is_consuming()) {
166            $this->channel->wait();
167        }
168    }
169
170    /**
171     * Handle the case when the channel is not initialized.
172     *
173     * @return void
174     */
175    private function handleChannelNotInitialized(): void
176    {
177        $composedError = 'Channel not initialized.';
178        $this->error($composedError);
179
180        // Log the error
181        Log::channel('messages')
182            ->error('Error on Console processing a message from rabbit: ' . $composedError);
183    }
184
185    /**
186     * Read message from the queue and store it in DB.
187     *
188     * @param $originalData - Message in this format:
189     *                  {
190     *                     "name"    : "name",
191     *                     "email"   : "to@to.to",
192     *                     "subject" : "subject",
193     *                     "content" : "message"
194     *                   }
195     * @return bool - true if the message is well delivered.
196     *                false if there's some problem with the message.
197     * @throws \Throwable
198     */
199    private function saveMessage($originalData): bool
200    {
201        // Decode the JSON data
202        $data = json_decode($originalData, true);
203
204        try {
205
206            // Validate the data using the Messages model
207            $validator = Messages::validateData($data);
208
209            // Check if validation fails
210            if ($validator->fails()) {
211
212                // Handle validation failure
213                $this->handleValidationFailure($validator, $originalData);
214
215            } else {
216
217                // Save the validated data to the database
218                $this->saveMessageToDatabase($data, $originalData);
219            }
220
221        } catch (\Throwable $e) {
222
223            // Send mail notification with the the exception
224            Log::channel('messages')
225                ->error($e->getMessage());
226
227            // Send email with the fail msg
228            Mail::to(env('MAIL_USERNAME'))
229                ->send(new RabbitEmail($originalData, $e->getMessage()));
230        }
231
232        return true;
233    }
234
235
236    /**
237     * Handle validation failure by 8logging the error.
238     *
239     * @param Validator $validator
240     * @param string $originalData
241     * @return void
242     */
243    private function handleValidationFailure(Validator $validator, string $originalData): void
244    {
245        $errors        = $validator->errors()->toArray();
246        $composedError = "\nValidation failed: " . json_encode($errors) .
247            "\nOriginal message: " . $originalData;
248
249        // I/O
250        $this->error($composedError);
251
252        // Log the error
253        Log::channel('messages')
254            ->error('Error processing a message from rabbit: ' . $composedError);
255
256        // Send email with the fail msg
257        Mail::to(env('MAIL_USERNAME'))
258            ->send(new RabbitEmail(json_encode($originalData), $composedError));
259    }
260
261    /**
262     * Save the validated data to the database and log the success.
263     *
264     * @param array $data
265     * @param string $originalData
266     * @return void
267     */
268    private function saveMessageToDatabase(array $data, string $originalData): void
269    {
270        // Create a new message in the database
271        $message = Messages::create([
272            'name'       => $data['name'],
273            'email'      => $data['email'],
274            'subject'    => $data['subject'] ?? null,
275            'content'    => $data['content'],
276            'created_at' => now()
277        ]);
278
279        // Log the success message in I/O
280        $this->info(
281            "\nMessage {$originalData} \n- Sent from queue:messages."
282            . "\n- Saved in the database with ID: {$message->id}."
283        );
284
285        // Log the success message in file
286        Log::channel('messages')
287            ->info("Message {$originalData} sent and saved in the database with ID: {$message->id}");
288
289        // Send email
290        Mail::to(env('MAIL_USERNAME'))
291            ->send(new MessageEmail($data));
292
293        // Log email sent in file
294        Log::channel('emails')
295            ->info("Email sent with {$originalData} to " . env('MAIL_USERNAME') . " | DB ID: {$message->id}");
296    }
297
298    /**
299     * End connection and consumer.
300     * @return void
301     * @throws \Exception
302     */
303    private function closeConnection(): void
304    {
305        try {
306            $this->rabbitMQService->closeConnection();
307        } catch (\Exception $ex) {
308            $this->error($ex->getMessage());
309        }
310    }
311}