Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
4.26% |
4 / 94 |
|
7.69% |
1 / 13 |
CRAP | |
0.00% |
0 / 1 |
MessagesFromRabbit | |
4.26% |
4 / 94 |
|
7.69% |
1 / 13 |
408.06 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
handle | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
12 | |||
init | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
setCloseOnDestruct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
startChannel | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
startQueueDeclare | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
handleInitException | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
consumeQueue | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
12 | |||
handleChannelNotInitialized | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
saveMessage | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
handleValidationFailure | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
saveMessageToDatabase | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
2 | |||
closeConnection | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | |
3 | namespace App\Console\Commands\Messages; |
4 | |
5 | use App\Mail\MessageEmail; |
6 | use App\Mail\RabbitEmail; |
7 | use App\Models\Messages; |
8 | use App\Services\RabbitMQService; |
9 | use Illuminate\Console\Command; |
10 | use Illuminate\Contracts\Validation\Validator; |
11 | use Illuminate\Support\Facades\Log; |
12 | use Illuminate\Support\Facades\Mail; |
13 | |
/**
 * Artisan command that consumes messages from a RabbitMQ queue, validates
 * them through the Messages model, stores them in the database and sends
 * notification e-mails.
 */
class MessagesFromRabbit extends Command
{
    protected $signature = 'queue:messages {--is-scheduled=}';
    protected $description = 'Messages from RabbitMQ queue and stored at DB';

    /** Name of the queue to consume, read from RABBIT_MESSAGE_QUEUE. */
    private mixed $queue;

    /** Maximum number of simultaneous consumers, read from RABBIT_CONSUMERS_LIMIT. */
    private mixed $consumers;

    /**
     * Channel obtained from the RabbitMQ service.
     *
     * Defaults to null so that the "channel not initialized" guard in
     * consumeQueue() can run when init() fails before the channel is assigned;
     * a typed property without a default cannot be read before it is written.
     */
    private mixed $channel = null;

    private RabbitMQService $rabbitMQService;

    /**
     * @param RabbitMQService $rabbitMQService Service wrapping the RabbitMQ connection.
     * @throws \Exception
     */
    public function __construct(RabbitMQService $rabbitMQService)
    {
        parent::__construct();

        $this->rabbitMQService = $rabbitMQService;

        // Read the remaining settings from the environment.
        // NOTE(review): env() called outside config files may not see .env values
        // once the configuration is cached - consider moving these to a config file.
        $this->queue = env('RABBIT_MESSAGE_QUEUE');
        $this->consumers = env('RABBIT_CONSUMERS_LIMIT');
    }

    /**
     * Job starts here.
     *
     * Aborts when the consumer limit is already reached; otherwise opens a
     * connection, consumes the queue and closes the connection afterwards.
     *
     * NOTE(review): Artisan casts the value returned by handle() to an integer
     * exit code - verify that true/false map to the intended exit status.
     *
     * @return bool false when no new consumer was needed, true once consuming has finished.
     * @throws \Exception
     */
    public function handle(): bool
    {
        // If the number of running consumers reached the limit, there is nothing to do.
        if ($this->rabbitMQService->getConsumers() >= $this->consumers) {
            $this->info("All total $this->consumers consumers are running. No more consumers needed.");
            return false;
        }

        $this->rabbitMQService->createConnection((bool)$this->option('is-scheduled'));

        try {
            // Init the new listener
            $this->init();

            // Init the new consumer; a message is acknowledged only when saveMessage() reports true.
            $this->consumeQueue(function ($msg) {
                if ($this->saveMessage($msg->body)) {
                    $msg->ack();
                }
            });
        } finally {
            // Close the connection even when consuming throws, so it is not left open.
            $this->closeConnection();
        }

        return true;
    }

    /**
     * Start a new listener: configure the connection, open the channel and
     * declare the queue. Any \Exception raised on the way is reported and
     * logged instead of being rethrown.
     *
     * @return void
     */
    private function init(): void
    {
        try {
            $this->setCloseOnDestruct();
            $this->startChannel();
            $this->startQueueDeclare();
            $this->info("Init listener done.");
        } catch (\Exception $ex) {
            $this->handleInitException($ex);
        }
    }

    /**
     * Set close_on_destruct to false on the current connection.
     *
     * @return void
     */
    private function setCloseOnDestruct(): void
    {
        $this->info("Set close_on_destruct..");
        $this->rabbitMQService->getConnection()->set_close_on_destruct(false);
    }

    /**
     * Start the channel by fetching it from the RabbitMQ service.
     *
     * @return void
     */
    private function startChannel(): void
    {
        $this->info("Start channel..");
        $this->channel = $this->rabbitMQService->getChannel();
    }

    /**
     * Declare the configured queue on the channel.
     *
     * NOTE(review): the positional flags presumably follow php-amqplib's
     * queue_declare(queue, passive, durable, exclusive, auto_delete) - confirm.
     *
     * @return void
     */
    private function startQueueDeclare(): void
    {
        $this->info("Start queue_declare..");
        $this->channel->queue_declare(
            $this->queue,
            false,
            true,
            false,
            false
        );
    }

    /**
     * Report an exception raised during initialization to the console and
     * to the "messages" log channel.
     *
     * @param \Exception $ex
     * @return void
     */
    private function handleInitException(\Exception $ex): void
    {
        $this->error($ex->getMessage());

        // Log the error
        Log::channel('messages')
            ->error('Error on Console processing a message from rabbit: ' . $ex->getMessage());
    }

    /**
     * Start a new consumer and block until the channel stops consuming.
     *
     * @param callable $callback Invoked with each delivered message.
     * @return void
     */
    private function consumeQueue($callback): void
    {
        // Without a channel (init() failed) there is nothing to consume.
        if (!$this->channel) {
            $this->handleChannelNotInitialized();
            return;
        }

        // Register the callback as consumer of the configured queue.
        // NOTE(review): flags presumably map to php-amqplib's
        // basic_consume(queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback) - confirm.
        $this->channel->basic_consume(
            $this->queue,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        // Keep waiting for messages for as long as the channel is consuming.
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Report to the console and to the "messages" log channel that the
     * channel was never initialized.
     *
     * @return void
     */
    private function handleChannelNotInitialized(): void
    {
        $composedError = 'Channel not initialized.';
        $this->error($composedError);

        // Log the error
        Log::channel('messages')
            ->error('Error on Console processing a message from rabbit: ' . $composedError);
    }

    /**
     * Read a message from the queue and store it in the DB.
     *
     * Validation failures and exceptions raised while processing are logged
     * and reported by e-mail rather than rethrown.
     *
     * @param $originalData - Message in this format:
     *  {
     *      "name" : "name",
     *      "email" : "to@to.to",
     *      "subject" : "subject",
     *      "content" : "message"
     *  }
     * @return bool - always true: the message is treated as handled (and is
     *                therefore acknowledged by the caller) whether it was
     *                stored, rejected by validation, or failed with an error.
     * @throws \Throwable if the failure notification itself cannot be sent.
     */
    private function saveMessage($originalData): bool
    {
        // Decode the JSON data
        $data = json_decode($originalData, true);

        try {
            // Validate the data using the Messages model
            $validator = Messages::validateData($data);

            if ($validator->fails()) {
                // Report the validation failure
                $this->handleValidationFailure($validator, $originalData);
            } else {
                // Save the validated data to the database
                $this->saveMessageToDatabase($data, $originalData);
            }
        } catch (\Throwable $e) {
            // Log the exception
            Log::channel('messages')
                ->error($e->getMessage());

            // Send an e-mail with the failure message
            Mail::to(env('MAIL_USERNAME'))
                ->send(new RabbitEmail($originalData, $e->getMessage()));
        }

        return true;
    }

    /**
     * Handle a validation failure by printing, logging and e-mailing the error.
     *
     * @param Validator $validator    Validator that rejected the message.
     * @param string    $originalData Raw message body as received from the queue.
     * @return void
     */
    private function handleValidationFailure(Validator $validator, string $originalData): void
    {
        $errors = $validator->errors()->toArray();
        $composedError = "\nValidation failed: " . json_encode($errors) .
            "\nOriginal message: " . $originalData;

        // I/O
        $this->error($composedError);

        // Log the error
        Log::channel('messages')
            ->error('Error processing a message from rabbit: ' . $composedError);

        // Send an e-mail with the failure message. The raw body is passed as-is:
        // it is already a JSON string, so encoding it again would double-encode it.
        Mail::to(env('MAIL_USERNAME'))
            ->send(new RabbitEmail($originalData, $composedError));
    }

    /**
     * Save the validated data to the database, log the success and send the
     * notification e-mail.
     *
     * @param array  $data         Decoded message fields.
     * @param string $originalData Raw message body, used in log output.
     * @return void
     */
    private function saveMessageToDatabase(array $data, string $originalData): void
    {
        // Create a new message in the database
        $message = Messages::create([
            'name' => $data['name'],
            'email' => $data['email'],
            'subject' => $data['subject'] ?? null,
            'content' => $data['content'],
            'created_at' => now()
        ]);

        // Log the success message in I/O
        $this->info(
            "\nMessage {$originalData} \n- Sent from queue:messages."
            . "\n- Saved in the database with ID: {$message->id}."
        );

        // Log the success message in file
        Log::channel('messages')
            ->info("Message {$originalData} sent and saved in the database with ID: {$message->id}");

        // Send email
        Mail::to(env('MAIL_USERNAME'))
            ->send(new MessageEmail($data));

        // Log email sent in file
        Log::channel('emails')
            ->info("Email sent with {$originalData} to " . env('MAIL_USERNAME') . " | DB ID: {$message->id}");
    }

    /**
     * End connection and consumer. An \Exception raised while closing is
     * printed to the console instead of being rethrown.
     *
     * @return void
     */
    private function closeConnection(): void
    {
        try {
            $this->rabbitMQService->closeConnection();
        } catch (\Exception $ex) {
            $this->error($ex->getMessage());
        }
    }
}