Source code for domain_event_broker.transport

from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import logging
import json
from pika import (
    BasicProperties,
    BlockingConnection,
    URLParameters,
    )
from pika import channel, frame, spec
from .events import DomainEvent
from . import settings

log = logging.getLogger(__name__)


def publish_domain_event(routing_key: str,
                         data: Dict[str, Any],
                         domain_object_id: Optional[str] = None,
                         uuid_string: Optional[str] = None,
                         timestamp: Optional[float] = None,
                         connection_settings: Optional[str] = '',
                         ) -> DomainEvent:
    """
    Send a domain event to the message broker. The broker will take care of
    dispatching the event to registered subscribers.

    :param str routing_key: The routing key is of the form
        ``<DOMAIN>.<EVENT_TYPE>``. The routing key should be a descriptive
        name of the domain event such as ``user.registered``.
    :param dict data: The actual event data. *Must* be JSON serializable.
    :param str domain_object_id: Domain identifier of the event. This field
        is optional. If used, it might make searching an event store easier.
    :param str uuid_string: The UUID identifier of the event. If left
        ``None``, a new one will be created.
    :param float timestamp: Unix timestamp. If timestamp is ``None``, a new
        (UTC) timestamp will be created.
    :param str connection_settings: Specify the broker with an AMQP URL. If
        not given, the default broker will be used. If set to ``None``, the
        domain event is not published to a broker.

    :return: The domain event that was published.
    :rtype: :py:class:`domain_event_broker.DomainEvent`
    """
    event = DomainEvent(
        routing_key=routing_key,
        data=data,
        domain_object_id=domain_object_id,
        uuid_string=uuid_string,
        timestamp=timestamp)
    json_data = json.dumps(event.event_data)
    if connection_settings == '':
        connection_settings = settings.BROKER
    publisher = Publisher(connection_settings)
    publisher.publish(json_data, event.routing_key)
    publisher.disconnect()
    return event
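
# A usage sketch for publish_domain_event (the routing key and payload are
# made-up examples; assumes a broker is reachable via the default
# settings.BROKER URL). Kept as a comment so importing this module stays
# side-effect free:
#
#     event = publish_domain_event(
#         'user.registered',
#         {'login': 'alice'},
#         domain_object_id='user-42')
#
# Passing connection_settings=None skips the broker entirely, which is
# useful in tests and local development:
#
#     publish_domain_event(
#         'user.registered', {'login': 'alice'}, connection_settings=None)
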
class Retry(Exception):
    """
    Raise this exception in an event handler to schedule a delayed retry.
    The delay is specified in seconds.

    .. note::

        Internally, a delay exchange with a per-message TTL is used, and all
        delayed events for a handler that share the same delay are placed in
        one queue. The RabbitMQ TTL has millisecond resolution.
    """
    def __init__(self, delay: float = 10.0):
        super(Retry, self).__init__()
        self.delay = delay
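
# A sketch of a handler that requests a delayed retry (the handler and its
# flaky_service dependency are hypothetical; accessing the payload via
# event.data is an assumption about the DomainEvent API):
#
#     def charge_customer(event):
#         try:
#             flaky_service.charge(event.data['customer_id'])
#         except ConnectionError:
#             # Ask the broker to re-deliver this event in 30 seconds. Once
#             # max_retries is exceeded, the event is dead-lettered or
#             # discarded instead.
#             raise Retry(30.0)
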
def _retry_message(name: str,
                   retry_exchange: str,
                   channel: channel.Channel,
                   method: frame.Method,
                   properties: spec.BasicProperties,
                   body: str,
                   delay: float,
                   ) -> None:
    delay = int(delay * 1000)
    # Create a queue that is automatically deleted shortly after the last
    # message expires. The queue is re-declared for each retry, which resets
    # the queue expiry.
    delay_name = '{}-delay-{}'.format(name, delay)
    result = channel.queue_declare(
        queue=delay_name,
        durable=True,
        arguments={
            'x-dead-letter-exchange': retry_exchange,
            'x-message-ttl': delay,
            'x-expires': delay + 10000,
        },
    )
    queue_name = result.method.queue
    # Bind the wait queue to the delay exchange before publishing
    channel.exchange_declare(
        exchange=delay_name,
        durable=True,
        auto_delete=True,  # Delete exchange when queue is deleted
        exchange_type='topic')
    channel.queue_bind(
        exchange=delay_name,
        routing_key='#',
        queue=queue_name)
    channel.basic_publish(
        exchange=delay_name,
        routing_key=method.routing_key,
        body=body,
        properties=properties)


def _call_event_handler(handler: Callable,
                        event: DomainEvent,
                        connection: BlockingConnection,
                        acknowledge: Callable,
                        retry: Callable,
                        reject: Callable,
                        max_retries: int,
                        ) -> None:
    # The handler is executed in a separate worker thread. Handle any errors
    # and trigger retries, dead-lettering or acknowledgement via threadsafe
    # callbacks on the connection.
    try:
        handler(event)
    except Retry as error:
        if event.retries < max_retries:
            # Publish manually to the delay exchange with a per-message TTL
            msg = "Retry ({retries}) consuming event {event} in {delay:.1f}s"
            log.info(msg.format(
                event=event, retries=event.retries, delay=error.delay))
            delayed_retry = partial(retry, delay=error.delay)
            connection.add_callback_threadsafe(acknowledge)
            connection.add_callback_threadsafe(delayed_retry)
        else:
            # Reject puts the message into the dead-letter queue if there is
            # one, otherwise the message is discarded.
            msg = "Exceeded max retries ({}) for {} event".format(
                max_retries, event.routing_key)
            log.error(msg, exc_info=True, extra=event.event_data)
            connection.add_callback_threadsafe(reject)
    except:  # noqa: E722
        # Note: If we want immediate requeueing, add a `RequeueError` that a
        # consumer can raise to trigger requeueing. Dead-letter queues are a
        # better choice in most cases.
        connection.add_callback_threadsafe(reject)
        log.exception("Event has been dead-lettered or discarded")
    else:
        connection.add_callback_threadsafe(acknowledge)


def receive_callback(transport: 'Subscriber',
                     handler: Callable,
                     name: str,
                     retry_exchange: str,
                     max_retries: int,
                     channel: channel.Channel,
                     method: frame.Method,
                     properties: spec.BasicProperties,
                     body: str,
                     ) -> None:
    try:
        event = DomainEvent.from_json(body)
    except Exception:
        # We cannot parse the message; requeueing would not help.
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        log.exception("Failed to load message: %s", body)
    else:
        if properties.headers and 'x-death' in properties.headers:
            # Older RabbitMQ versions (< 3.5) keep adding x-death entries;
            # newer versions only keep the most recent entry and increment
            # 'count', see:
            # https://github.com/rabbitmq/rabbitmq-server/issues/78
            expiry_info = properties.headers['x-death'][0]
            if 'count' in expiry_info:
                event.retries = expiry_info['count']
            else:
                event.retries = len(properties.headers['x-death'])
        log.debug("Received {}:{}".format(method.routing_key, event))
        # The channel and connection objects are not threadsafe. Only call
        # any of those functions from the main thread via a threadsafe
        # callback.
        acknowledge = partial(
            channel.basic_ack, delivery_tag=method.delivery_tag)
        reject = partial(
            channel.basic_reject,
            delivery_tag=method.delivery_tag,
            requeue=False)
        retry = partial(
            _retry_message,
            name=name,
            retry_exchange=retry_exchange,
            channel=channel,
            method=method,
            properties=properties,
            body=body)
        event_handler = partial(
            _call_event_handler,
            handler=handler,
            event=event,
            connection=channel.connection,
            acknowledge=acknowledge,
            retry=retry,
            reject=reject,
            max_retries=max_retries)
        transport.workers.submit(event_handler)


def requires_broker(method: Callable) -> Callable:
    """
    If connection_settings is set to ``None`` on the transport object, don't
    perform the action and log the call instead.

    This is used for environments where no broker is available, e.g.
    development and testing.
    """
    def wrapper(transport: 'Transport', *args: Any, **kwargs: Any) -> Any:
        if transport.connection_settings is None:
            log.debug("No broker configured: {}.{}() is deactivated.".format(
                transport.__class__.__name__, method.__name__))
        else:
            return method(transport, *args, **kwargs)
    return wrapper


class Transport(object):

    def __init__(self,
                 connection_settings: Optional[str] = '',
                 exchange: str = "domain-events",
                 exchange_type: str = "topic",
                 ):
        if connection_settings == '':
            connection_settings = settings.BROKER
        self.exchange = exchange
        self.exchange_type = exchange_type
        self.context_depth = 0
        self.connection = None
        self.connection_settings = connection_settings
        self.channel = None
        self.connect()

    @requires_broker
    def connect(self) -> None:
        """
        For now we use a synchronous connection - the caller is blocked
        until a message is added to the queue. We might switch to
        asynchronous connections should this incur noticeable latencies.
        """
        params = URLParameters(self.connection_settings)
        connection = BlockingConnection(params)
        channel = connection.channel()
        # Set up the exchange (if it does not exist)
        channel.exchange_declare(
            exchange=self.exchange,
            exchange_type=self.exchange_type,
            durable=True,
            auto_delete=False)
        self.connection = connection
        self.channel = channel

    @requires_broker
    def disconnect(self) -> None:
        """
        Disconnect from the queue. The API is a little weird: first we close
        the connection, then disconnect from the socket. The last step
        removes the connection from the RabbitMQ connection pool.

        Make sure you don't close the channel, otherwise the connection
        cannot be closed anymore.
        """
        if self.connection is not None:
            self.connection.close()
        self.channel = None
        self.connection = None
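
# requires_broker in practice, a minimal sketch: constructing a transport
# with connection_settings=None disables every broker interaction, so the
# same code paths run without RabbitMQ (e.g. in unit tests):
#
#     publisher = Publisher(None)       # connect() is skipped by the decorator
#     publisher.publish(b'{}', 'noop')  # logged and skipped as well
#     publisher.disconnect()            # also a no-op
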
class Publisher(Transport):

    @requires_broker
    def publish(self, message: bytes, routing_key: Optional[str] = None) -> None:
        """
        Send as a persistent message.
        """
        if self.channel is None:
            raise Exception('Not connected to broker.')
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            body=message,
            properties=BasicProperties(delivery_mode=2),
        )
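
# Publisher usage sketch (the AMQP URL is an example value for a local
# RabbitMQ instance):
#
#     publisher = Publisher('amqp://guest:guest@localhost:5672/%2F')
#     publisher.publish(b'{"hello": "world"}', routing_key='demo.greeting')
#     publisher.disconnect()
#
# delivery_mode=2 marks the message as persistent; it survives a broker
# restart only if the queue it is routed to is durable as well.
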
class Subscriber(Transport):
    """
    A subscriber manages the registration of one or more event handlers.

    Once instantiated, call ``register`` to add subscribers and
    ``start_consuming`` to wait for incoming events.

    .. note::

        The subscriber only uses one thread for processing events. Even if
        multiple handlers are registered, only one event is processed at a
        time.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
        self.workers = ThreadPoolExecutor(max_workers=1)

    @requires_broker
    def bind_routing_keys(self,
                          exchange: str,
                          queue_name: str,
                          binding_keys: Union[List[str], Tuple[str, ...]],
                          ) -> None:
        if self.channel is None:
            raise Exception('Not connected to broker.')
        for binding_key in binding_keys:
            self.channel.queue_bind(
                exchange=exchange,
                routing_key=binding_key,
                queue=queue_name)

    @requires_broker
    def register(self,
                 handler: Callable,
                 name: str,
                 binding_keys: Union[List[str], Tuple[str, ...]],
                 dead_letter: bool = False,
                 durable: bool = True,
                 exclusive: bool = False,
                 auto_delete: bool = False,
                 max_retries: int = 0,
                 ) -> None:
        """
        Register a handler for one or more types of domain events.

        :param function handler: This function will be called when an event
            happens. It receives a ``DomainEvent`` as the only parameter.
        :param str name: Name of the handler. Used as the queue name.
        :param tuple|list binding_keys: One or more routing keys, e.g.
            ``["user.registered", "user.imported"]``. The binding keys may
            include wildcards. Use ``user.*`` to subscribe to all events in
            the user domain. Use ``#`` to call ``handler`` for all domain
            events across all domains.
        :param bool dead_letter: Whether to store events in a dead-letter
            queue if the handler raises an exception while processing the
            event.
        :param int max_retries: The handler may raise
            ``domain_event_broker.Retry`` to indicate that event processing
            should be retried later. This parameter controls how often an
            event is rescheduled before it is dead-lettered or discarded.
        """
        if self.channel is None:
            raise Exception('Not connected to broker.')
        retry_exchange = name + '-retry'
        dead_letter_exchange = name + '-dlx'
        arguments = {}
        if dead_letter:
            self.channel.exchange_declare(
                exchange=dead_letter_exchange,
                exchange_type=self.exchange_type)
            result = self.channel.queue_declare(
                queue=name + '-dl', durable=True)
            queue_name = result.method.queue
            self.bind_routing_keys(
                dead_letter_exchange, queue_name, binding_keys)
            arguments["x-dead-letter-exchange"] = dead_letter_exchange

        # Create the subscriber queue and bind it to the default exchange
        self.channel.queue_declare(
            queue=name,
            durable=durable,
            exclusive=exclusive,
            auto_delete=auto_delete,
            arguments=arguments)
        self.bind_routing_keys(self.exchange, name, binding_keys)

        # Re-route failed messages to a retry dead-letter queue. This is
        # only used if max_retries > 0, but we set up the exchanges anyway
        # so that messages can be replayed manually in the consumer context.

        # Declare the exchange where messages expired in the wait queue are
        # routed.
        self.channel.exchange_declare(
            exchange=retry_exchange,
            exchange_type=self.exchange_type)
        # Bind the consumer queue to the retry exchange
        self.bind_routing_keys(retry_exchange, name, binding_keys)

        callback = partial(
            receive_callback, self, handler, name, retry_exchange,
            max_retries)
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=name, on_message_callback=callback)

    @requires_broker
    def stop_consuming(self) -> None:
        if self.channel is not None:
            self.channel.stop_consuming()
        self.disconnect()

    @requires_broker
    def start_consuming(self, timeout: Optional[float] = None) -> None:
        """
        Enter the IO consumer loop after calling ``register``.

        If a timeout is given, the consumer will be stopped after the
        specified number of seconds.
        """
        if self.connection is None or self.channel is None:
            raise Exception('Not connected to broker.')
        if timeout:
            self.connection.call_later(timeout, self.stop_consuming)
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop_consuming()
        except:  # noqa: E722
            self.stop_consuming()
            raise
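
# End-to-end consumer sketch (handler, queue name and binding keys are
# illustrative; assumes the default broker from settings.BROKER):
#
#     def log_user_event(event):
#         log.info("user event: %s", event)
#
#     subscriber = Subscriber()
#     subscriber.register(
#         log_user_event,
#         'user-logger',
#         binding_keys=['user.*'],
#         dead_letter=True,  # failed events land in the 'user-logger-dl' queue
#         max_retries=3)     # Retry() re-deliveries before dead-lettering
#     subscriber.start_consuming()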