Source code for shinto.rabbitmq

"""RabbitMQ handler module."""

from __future__ import annotations

import os
from typing import TYPE_CHECKING

from pika import BlockingConnection, ConnectionParameters, PlainCredentials
from pika.exceptions import AMQPError

if TYPE_CHECKING:  # pragma: no cover
    from collections.abc import Callable

    from pika import BasicProperties
    from pika.amqp_object import Method
    from pika.channel import Channel
    from pika.connection import Connection


class QueueError(Exception):
    """
    Exception while queueing the message.

    Raised by QueueHandler in place of the underlying pika ``AMQPError`` when
    connecting, consuming or publishing fails; the original error is chained
    as ``__cause__``.
    """
class QueueHandler:
    """
    RabbitMQ handler. Will wait for messages or publish messages.

    A handler owns one blocking connection and one channel, both created in
    ``__init__``. The connection is closed on a best-effort basis when the
    handler is garbage collected.
    """

    _connection: Connection
    _channel: Channel
    _exchange: str
    _queue_name: str

    def __init__(
        self,
        host: str | None = None,
        port: int | str | None = None,
        username: str | None = None,
        password: str | None = None,
        queue: str | None = None,
        exchange: str | None = None,
    ) -> None:
        """
        Initialize the RabbitMQ connection.

        Args:
            host (str): RabbitMQ host.
            port (int | str | None): RabbitMQ port. Defaults to 5672.
            username (str): RabbitMQ username.
            password (str): RabbitMQ password.
            queue (str): RabbitMQ queue name.
            exchange (str): RabbitMQ exchange name. Defaults to "".

        Raises:
            TypeError: If any of the required parameters (host, username,
                password, queue) is missing.
            QueueError: If the connection to RabbitMQ cannot be established.

        Arguments can also be provided as environment variables:
        -------------------------------------------------------
        - `RABBITMQ_HOST`
        - `RABBITMQ_PORT`
        - `RABBITMQ_USERNAME`
        - `RABBITMQ_PASSWORD`
        - `RABBITMQ_QUEUE`
        - `RABBITMQ_EXCHANGE`

        If arguments are provided they will take precedence over the
        environment variables.

        """
        host = host or os.getenv("RABBITMQ_HOST")
        port = port or os.getenv("RABBITMQ_PORT", "5672")
        username = username or os.getenv("RABBITMQ_USERNAME")
        password = password or os.getenv("RABBITMQ_PASSWORD")
        queue = queue or os.getenv("RABBITMQ_QUEUE")
        exchange = exchange or os.getenv("RABBITMQ_EXCHANGE", "")

        # Validate against an explicit mapping instead of scanning locals():
        # port and exchange always have a fallback, so only these four can
        # still be None here.
        required = {
            "host": host,
            "username": username,
            "password": password,
            "queue": queue,
        }
        missing_params = [name for name, value in required.items() if value is None]
        if missing_params:
            msg = f"Missing required parameters: {missing_params}"
            raise TypeError(msg)

        try:
            self._connection = BlockingConnection(
                ConnectionParameters(
                    host=host,
                    port=port,
                    virtual_host="/",
                    credentials=PlainCredentials(username, password, erase_on_connect=True),
                )
            )
        except AMQPError as amqp_error:
            raise QueueError from amqp_error

        self._queue_name = queue
        self._exchange = exchange
        self._channel = self._connection.channel()

    def __del__(self) -> None:
        """Close the connection when the object is deleted."""
        # __init__ may have raised before _connection was assigned.
        connection = getattr(self, "_connection", None)
        if connection is None or not connection.is_open:
            return
        try:
            connection.close()
        except AMQPError:
            # Best-effort cleanup: an error raised from a finalizer cannot be
            # handled by anyone and would only be reported as
            # "Exception ignored in ...".
            pass

    def check_for_queue(self) -> None:
        """Check if queue exists and if not: Declare it."""
        self._channel.queue_declare(
            queue=self._queue_name,
            durable=True,
            arguments={"x-max-priority": 10},
        )

    def consume(
        self,
        callback: Callable[[Channel, Method, BasicProperties, bytes], None],
    ) -> None:
        """
        Consume messages from the queue.

        Registers ``callback`` on the queue and then starts consuming on the
        channel; ``callback`` is used whenever a message is received.

        The callback function has the signature::

            def callback(channel, method, properties, body):

        Messages are consumed with ``auto_ack=False``, so the callback MUST
        ACK or NACK every message after processing it, e.g.
        ``channel.basic_ack(delivery_tag=method.delivery_tag)`` to ACK.

        Args:
            callback: Function invoked for every received message.

        Raises:
            QueueError: If registering the consumer or consuming fails with
                an AMQP error.

        """
        try:
            self._channel.basic_consume(
                queue=self._queue_name,
                on_message_callback=callback,
                auto_ack=False,
            )
            self._channel.start_consuming()
        except AMQPError as amqp_error:
            raise QueueError from amqp_error

    def publish(self, message: str) -> None:
        """
        Publish message.

        The message is sent to the configured exchange with the queue name as
        routing key.

        Args:
            message: Message body to publish.

        Raises:
            QueueError: If publishing fails with an AMQP error.

        """
        try:
            self._channel.basic_publish(
                exchange=self._exchange,
                routing_key=self._queue_name,
                body=message,
            )
        except AMQPError as amqp_error:
            raise QueueError from amqp_error