Message Service with RabbitMQ


Diagram overview


Technical Implementation


class MessagesController extends Controller
    private Messages $messagesModel;

    public function __construct(Messages $messagesModel)
        $this->messagesModel = $messagesModel;

    public function send(Request $request): JsonResponse
        try {

            // Validate received data
            $validationResult = $this->validateData($request->all());
            if ($validationResult !== true)
               return response()->json(
                    ['error' => $validationResult],

            // Prepare message
            $message = $this->prepareMessage($request);

            // Send message to the RabbitMQ queue
            $queue = env('RABBIT_MESSAGE_QUEUE');
            dispatch(new MessagesJob(json_encode($message), $queue));

        } catch (\Exception $e) {
            return response()->json(
                ['error' => $e->getMessage()],

        // If flow reaches here, everything worked fine!
        // Confirm if it is an API
        $isApiRequest = $request->is('api/*');
        if ($isApiRequest) {
            return response()->json(['success-api']);

        return response()->json(['success-site']);

     * Validate the received data using the Messages model.
     * @param array $data
     * @return string|bool
    public function validateData(array $data) : string | bool
        $validator = $this->messagesModel->validateData($data);

        if ($validator->fails()) {

            // Log validation errors
            $errors = $validator->errors()->toArray();
            $this->logError('Validation failed: ' . json_encode($errors));

            // Return errors
            return json_encode($errors);

        return true;

     * Prepare the message data from the request.
     * @param Request $request
     * @return array
    private function prepareMessage(Request $request): array
        return [
            'name'    => $request->input('name'),
            'email'   => $request->input('email'),
            'subject' => $request->input('subject'),
            'content' => $request->input('content'),

     * Log an error message to the 'messages' channel.
     * @param string $message
     * @return void
    private function logError(string $message): void
            ->error('Error on Controller receiving message from client: ' . $message);


class Messages extends Model
    protected $table    = 'messages';
    public $timestamps  = false;
    protected $fillable = [

     * @param array $data
     * @return Validator
    public static function validateData(array $data): Validator
        // Builds a validator for an incoming message payload. The caller inspects
        // the result (->fails(), ->errors()) and decides how to react.

        // Define validation rules: subject is optional, the other fields are mandatory.
        $rules = [
            'name'    => 'required|string|max:50',
            'email'   => 'required|email|max:50',
            'subject' => 'nullable|string|max:100',
            'content' => 'required|string|max:3000',

        // Validator::make() builds the validator from data + rules.
        // (The previous call targeted `Validatior2`, a misspelled name used nowhere else.)
        // NOTE(review): the facade and the return type share the short name `Validator` —
        // confirm the file's imports alias one of them.
        return Validator::make($data, $rules);


class RabbitMQService
    protected $connection;
    protected $channel;
    private mixed $user;
    private mixed $pass;
    private mixed $host;
    private mixed $port;
    private string $queueListUrl;
    private mixed $apiHost;
    private mixed $queue;
    private mixed $consumers;

     * RabbitMQService constructor.
     * @throws \Exception
    public function __construct()
        // Get configs
        $this->user      = env('RABBIT_USER');
        $this->pass      = env('RABBIT_PASS');
        $this->host      = env('RABBIT_HOST');
        $this->port      = env('RABBIT_PORT');
        $this->apiHost   = env('RABBIT_API_HOST');
        $this->queue     = env('RABBIT_MESSAGE_QUEUE');
        $this->consumers = env('RABBIT_CONSUMERS_LIMIT');

        // API url
        $this->queueListUrl = "{$this->apiHost}/queues/%2F/{$this->queue}";


     * @throws \Exception
    public function createConnection($isScheduled): void
        try {

            if ($isScheduled) {

                // Create connection
                $this->connection = new AMQPStreamConnection(
                    $this->host, $this->port, $this->user, $this->pass,

                // Create channel
                $this->channel = $this->connection->channel();

            } else {
                $this->connection = null;

        } catch (\Exception $e) {

            // Log the exception message
                ->error('Error in RabbitMQService constructor: ' . $e->getMessage());

            // Rethrow the exception
            throw $e;

     * Get the AMQPStreamConnection instance.
     * @return AMQPStreamConnection
    public function getConnection(): AMQPStreamConnection
        // Returns the connection stored by createConnection().
        // NOTE(review): createConnection() assigns null when $isScheduled is falsy, and the
        // constructor shown here never opens a connection; returning null would violate the
        // non-nullable return type — confirm callers always connect first.
        return $this->connection;

     * Get the AMQP channel instance.
     * @return AbstractChannel
    public function getChannel(): AbstractChannel
        return $this->channel;

     * Publish a message to the specified RabbitMQ queue.
     * @param string $queue
     * @param mixed $message
     * @return void
    public function publishMessage(string $queue, mixed $message): void
        // Wraps the payload in an AMQPMessage and publishes it with an empty exchange
        // name (''), passing the queue name as the routing key.
        // $this->channel is only assigned inside createConnection(), so a connection
        // must have been created before this is called.
        $this->channel->basic_publish(new AMQPMessage($message), '', $queue);

     * Close the RabbitMQ channel and connection.
     * @throws \Exception
    public function closeConnection(): void

     * Get the number of current consumers of this queue via API.
     * @return int
     * @throws \Exception
    public function getConsumers(): int
        $ch = curl_init($this->queueListUrl);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "$this->user:$this->pass");

        $response = curl_exec($ch);

        if ($response === false) {
            $error = curl_error($ch);

            // Log the error, then throw an exception carrying the cURL error message
                ->error('Error checking information about the queues: ' . curl_error($ch));

            throw new \Exception('Error checking information about the queues: ' . $error . ' No consumers up now.');

        $queueInfo = json_decode($response, true);

        // Check if 'consumers' key exists in the response
        if (isset($queueInfo['consumers'])) {
            return $queueInfo['consumers'];
        } else {
            // Throw an exception if 'consumers' key is not present
                ->error('Error: Unable to retrieve the number of consumers.');

            throw new \Exception('Error: Unable to retrieve the number of consumers. No consumers up now.');


class MessagesJob implements ShouldQueue
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected mixed $data;
    public $queue;

     * Create a new job instance.
     * @param mixed $data The message data to be published to RabbitMQ.
     * @param string $queue The RabbitMQ queue name.
    public function __construct(mixed $data, string $queue)
        $this->data  = $data;
        $this->queue = $queue;

     * Execute the job.
     * @return void
     * @throws \Exception
    public function handle(): void
        // Create a new instance of RabbitMQService
        // (the constructor shown in this file only reads the env settings).
        $rabbitMQService = new RabbitMQService();

        // NOTE(review): no createConnection() call is visible in this excerpt, yet
        // publishMessage() dereferences the channel that only createConnection() assigns —
        // confirm the connection is opened before publishing.
        // Publish the message to the specified RabbitMQ queue
        $rabbitMQService->publishMessage($this->queue, $this->data);

        // Close the RabbitMQ connection


class MessagesFromRabbit extends Command
    protected $signature   = 'queue:messages {--is-scheduled=}';
    protected $description = 'Messages from RabbitMQ queue and stored at DB';

    private mixed $queue;
    private mixed $consumers;
    private mixed $channel;
    private RabbitMQService $rabbitMQService;

     * @throws \Exception
    public function __construct(RabbitMQService $rabbitMQService)

        $this->rabbitMQService = $rabbitMQService;

        // Read the remaining settings from the environment
        $this->queue     = env('RABBIT_MESSAGE_QUEUE');
        $this->consumers = env('RABBIT_CONSUMERS_LIMIT');

     * Job starts here.
     * @return bool
     * @throws \Exception
    public function handle(): bool
        // Check the number of consumers up. If it reaches the limit, don't need to create more. Abort here.
        if ($this->rabbitMQService->getConsumers() >= $this->consumers) {
            $this->info("All total $this->consumers consumers are running. No more consumers needed.");
            return false;


        // Init the new listener

        // Init new consumer
        $this->consumeQueue(function ($msg) {
            $result = $this->saveMessage($msg->body);

        // Close connections and consumers

        return true;

     * Start a new listener.
     * @return void
    private function init(): void
        try {
            $this->info("Init listener done.");
        } catch (\Exception $ex) {

     * Set close_on_destruct to false.
     * @return void
    private function setCloseOnDestruct(): void
        $this->info("Set close_on_destruct..");

     * Start the channel.
     * @return void
    private function startChannel(): void
        $this->info("Start channel..");
        $this->channel = $this->rabbitMQService->getChannel();

     * Start queue declaration.
     * @return void
    private function startQueueDeclare(): void
        $this->info("Start queue_declare..");

     * Handle the exception during initialization.
     * @param \Exception $ex
     * @return void
    private function handleInitException(\Exception $ex): void

        // Log the error
            ->error('Error on Console processing a message from rabbit: ' . $ex->getMessage());

     * Start a new consumer.
     * @param $callback
     * @return void
    private function  consumeQueue($callback): void
        // Ensure that the channel is initialized
        if (!$this->channel) {

        // Set up basic consumption with the provided callback

        // Continue consuming messages until the channel stops
        while ($this->channel->is_consuming()) {

     * Handle the case when the channel is not initialized.
     * @return void
    private function handleChannelNotInitialized(): void
        $composedError = 'Channel not initialized.';

        // Log the error
            ->error('Error on Console processing a message from rabbit: ' . $composedError);

     * Read message from the queue and store it in DB.
     * @param $originalData - Message in this format:
     *                  {
     *                     "name"    : "name",
     *                     "email"   : "",
     *                     "subject" : "subject",
     *                     "content" : "message"
     *                   }
     * @return bool - true if the message is well delivered.
     *                false if there's some problem with the message.
     * @throws \Throwable
    private function saveMessage($originalData): bool
        // Decode the JSON data
        $data = json_decode($originalData, true);

        try {

            // Validate the data using the Messages model
            $validator = Messages::validateData($data);

            // Check if validation fails
            if ($validator->fails()) {

                // Handle validation failure
                $this->handleValidationFailure($validator, $originalData);

            } else {

                // Save the validated data to the database
                $this->saveMessageToDatabase($data, $originalData);

        } catch (\Throwable $e) {

            // Send a mail notification with the exception

            // Send email with the fail msg
                ->send(new RabbitEmail($originalData, $e->getMessage()));

        return true;

     * Handle validation failure by logging the error and sending a failure email.
     * @param Validator $validator
     * @param string $originalData
     * @return void
    private function handleValidationFailure(Validator $validator, string $originalData): void
        $errors        = $validator->errors()->toArray();
        $composedError = "\nValidation failed: " . json_encode($errors) .
            "\nOriginal message: " . $originalData;

        // I/O

        // Log the error
            ->error('Error processing a message from rabbit: ' . $composedError);

        // Send email with the fail msg
            ->send(new RabbitEmail(json_encode($originalData), $composedError));

     * Save the validated data to the database and log the success.
     * @param array $data
     * @param string $originalData
     * @return void
    private function saveMessageToDatabase(array $data, string $originalData): void
        // Create a new message in the database
        $message = Messages::create([
            'name'       => $data['name'],
            'email'      => $data['email'],
            'subject'    => $data['subject'] ?? null,
            'content'    => $data['content'],
            'created_at' => now()

        // Log the success message in I/O
            "\nMessage {$originalData} \n- Sent from queue:messages."
            . "\n- Saved in the database with ID: {$message->id}."

        // Log the success message in file
            ->info("Message {$originalData} sent and saved in the database with ID: {$message->id}");

        // Send email
            ->send(new MessageEmail($data));

        // Log email sent in file
            ->info("Email sent with {$originalData} to " . env('MAIL_USERNAME') . " | DB ID: {$message->id}");

     * End connection and consumer.
     * @return void
     * @throws \Exception
    private function closeConnection(): void
        try {
        } catch (\Exception $ex) {

Demonstration (click on the image to see the video)

Demonstration video