API Documentation

Publish

domain_event_broker.publish_domain_event(routing_key: str, data: Dict[str, Any], domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, connection_settings: Optional[str] = '') → domain_event_broker.events.DomainEvent

Send a domain event to the message broker. The broker will take care of dispatching the event to registered subscribers.

Parameters:
  • routing_key (str) – The routing key is of the form <DOMAIN>.<EVENT_TYPE>. The routing key should be a descriptive name of the domain event such as user.registered.
  • data (dict) – The actual event data. Must be json serializable.
  • domain_object_id (str) – Domain identifier of the event. This field is optional. If used, it can make searching an event store easier.
  • uuid_string (str) – The UUID identifier of the event. If left None, a new one will be created.
  • timestamp (float) – Unix timestamp. If timestamp is None, a new (UTC) timestamp will be created.
  • connection_settings (str) – Specify the broker with an AMQP URL. If not given, the default broker will be used. If set to None, the domain event is not published to a broker.
Returns:

The domain event that was published.

Return type:

domain_event_broker.DomainEvent
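
As a rough illustration of the parameters above, the sketch below wraps the call in a small helper. The helper name announce_registration, the payload fields, and the idea of passing the publish function in as an argument are assumptions made so the snippet runs without a broker; in real code the first argument would be domain_event_broker.publish_domain_event.

```python
from typing import Any, Callable, Dict


def announce_registration(publish: Callable[..., Any], user_id: str, email: str) -> Any:
    """Publish a ``user.registered`` event through the given publish function.

    ``publish`` is expected to accept the keyword arguments documented for
    ``publish_domain_event``. This helper itself is hypothetical.
    """
    payload: Dict[str, Any] = {"email": email}  # must be JSON serializable
    return publish(
        routing_key="user.registered",  # <DOMAIN>.<EVENT_TYPE>
        data=payload,
        domain_object_id=user_id,  # optional, eases lookups in an event store
    )
```

With the library installed and a broker reachable, announce_registration(publish_domain_event, "42", "jane@example.com") would send the event. uuid_string and timestamp are omitted here, so they fall back to the documented defaults.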

class domain_event_broker.Publisher(connection_settings: Optional[str] = '', exchange: str = 'domain-events', exchange_type: str = 'topic')

Subscribe

class domain_event_broker.Subscriber(*args, **kwargs)

A subscriber manages the registration of one or more event handlers. Once instantiated, call register to add handlers and start_consuming to wait for incoming events.

Note

The subscriber only uses one thread for processing events. Even if multiple handlers are registered, only one event is processed at a time.
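
The register and start_consuming calls mentioned above might be combined as in the sketch below. The argument order of register (handler, handler name, list of binding keys) is not documented in this section and is an assumption, as are the names log_event and run_worker and the idea that a handler receives the event as its only argument. Check the library's source before relying on any of it.

```python
from typing import Any, List


def log_event(event: Any) -> None:
    """Example handler. ASSUMPTION: handlers receive the event as sole argument."""
    print("received", event)


def run_worker(subscriber: Any, binding_keys: List[str]) -> None:
    """Register one handler, then wait for incoming events.

    ``subscriber`` would be a ``domain_event_broker.Subscriber`` instance.
    ASSUMPTION: the call shape is ``register(handler, name, binding_keys)``.
    """
    subscriber.register(log_event, "event-logger", binding_keys)
    subscriber.start_consuming()  # processes one event at a time
```

Because only one event is processed at a time, a slow handler delays every other handler registered on the same subscriber.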

__init__(*args, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

class domain_event_broker.Retry(delay: float = 10.0)

Raise this exception in an event handler to schedule a delayed retry. The delay is specified in seconds.

Note

Internally, a delay exchange with a per-message TTL is used, and all delayed events for a handler that share the same delay are placed in one queue. The RabbitMQ TTL has millisecond resolution.
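
To make the retry mechanism more concrete, here is a minimal sketch of a handler that asks for a delayed retry with a growing delay. The Retry class below is a local stand-in so the snippet runs on its own; real code would import it from domain_event_broker. The names backoff_delay, ttl_milliseconds and handle_event, the send argument, and the event.retries attribute are assumptions for illustration.

```python
class Retry(Exception):
    """Local stand-in for ``domain_event_broker.Retry`` (delay in seconds)."""

    def __init__(self, delay: float = 10.0) -> None:
        super().__init__(delay)
        self.delay = delay


def backoff_delay(retries: int, base: float = 10.0, cap: float = 600.0) -> float:
    """Double the delay for every previous delivery, up to ``cap`` seconds."""
    return min(cap, base * (2 ** retries))


def ttl_milliseconds(delay: float) -> int:
    """Convert a delay in seconds to whole milliseconds, the TTL resolution."""
    return int(round(delay * 1000))


def handle_event(event, send) -> None:
    """Hypothetical handler: schedule a retry if ``send`` cannot connect."""
    try:
        send(event)
    except ConnectionError:
        # ASSUMPTION: the event exposes its delivery count as ``retries``.
        raise Retry(delay=backoff_delay(event.retries))
```

Since events sharing a delay share a queue, a backoff scheme with a small, fixed set of delay values (as here, where the cap bounds the series) keeps the number of delay queues limited.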

Replay

domain_event_broker.replay_event(queue_name: str, message_callback: Callable = <function retry_event>, connection_settings: Optional[str] = '') → int

Move one domain event from a dead-letter queue back into the processing queue.

Parameters:
  • queue_name (str) – Name of the queue to move the event back into.
  • message_callback (function) – A callable that receives the event and returns either RETRY, LEAVE or DISCARD.
  • connection_settings (str) – Specify the broker with an AMQP URL. If not given, the default broker will be used.
Returns:

The number of messages left in the dead-letter queue.

Return type:

int
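
A message_callback as described above could look like the following sketch. The constants are local stand-ins for the RETRY, LEAVE and DISCARD values, because this section does not show where the library exposes them. The function name triage, the one-week cut-off, the meaning attached to each return value, and the event attributes timestamp and routing_key are likewise assumptions.

```python
import time

# Stand-ins for the RETRY, LEAVE and DISCARD values named above.
RETRY, LEAVE, DISCARD = "RETRY", "LEAVE", "DISCARD"

MAX_AGE_SECONDS = 7 * 24 * 3600  # hypothetical cut-off: one week


def triage(event, now=None):
    """Decide what to do with one dead-lettered event."""
    now = time.time() if now is None else now
    if now - event.timestamp > MAX_AGE_SECONDS:
        return DISCARD  # too old to be worth processing again
    if event.routing_key.startswith("user."):
        return RETRY  # presumably moves it back into the processing queue
    return LEAVE  # presumably keeps it in the dead-letter queue
```

With the real constants substituted, such a function would be passed as replay_event(queue_name, message_callback=triage).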

domain_event_broker.replay_all(queue_name: str, message_callback: Callable = <function retry_event>, connection_settings: Optional[str] = '') → int

Replay all messages currently in the dead-letter queue. Returns the number of messages dead-lettered since the replay started.

Domain event

class domain_event_broker.DomainEvent(routing_key: str = '', data: Dict[KT, VT] = {}, domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, retries: int = 0)
__init__(routing_key: str = '', data: Dict[KT, VT] = {}, domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, retries: int = 0) → None

Define a Domain Event.

Parameters:
  • routing_key (str) – The routing key is of the form <DOMAIN>.<EVENT_TYPE>. The routing key should be a descriptive name of the domain event such as user.registered.
  • data (dict) – The actual event data. Must be json serializable.
  • domain_object_id (str) – Domain identifier of the event. This field is optional. If used, it can make searching an event store easier.
  • uuid_string (str) – The UUID identifier of the event. If left None, a new one will be created.
  • timestamp (float) – Unix timestamp. If timestamp is None, a new (UTC) timestamp will be created.
  • retries (int) – Number of times this event was delivered to a subscriber already.
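
The two constraints in the list above (JSON-serializable data, and defaults filled in when uuid_string or timestamp is None) can be sketched with the standard library alone. This is not the library's implementation: the function names are hypothetical, and the use of uuid4 and time.time() is an assumption about how the defaults might be produced.

```python
import json
import time
import uuid
from typing import Any, Dict, Optional, Tuple


def is_json_serializable(data: Dict[str, Any]) -> bool:
    """Return True if ``data`` can be encoded with the standard json module."""
    try:
        json.dumps(data)
    except (TypeError, ValueError):
        return False
    return True


def fill_defaults(
    uuid_string: Optional[str] = None, timestamp: Optional[float] = None
) -> Tuple[str, float]:
    """Mimic the documented defaults for missing identifiers and timestamps."""
    if uuid_string is None:
        uuid_string = str(uuid.uuid4())  # ASSUMPTION: a random (version 4) UUID
    if timestamp is None:
        timestamp = time.time()  # Unix timestamp, seconds since the epoch (UTC)
    return uuid_string, timestamp
```

Checking the payload before constructing an event surfaces values such as datetime objects or sets early, before they reach the broker.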
classmethod from_json(json_data: Union[bytes, str]) → domain_event_broker.events.DomainEvent

Create a DomainEvent from json_data. Note that you probably want to dispatch first based on domain and event type.

Parameters: json_data (str) – Serialized domain event data.
Return type: DomainEvent
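
The advice to dispatch on domain and event type first could be followed as in this sketch, which inspects the raw JSON before any event object is built. The field name routing_key in the serialized form is an assumption (the wire format is not shown in this section), and split_routing_key and dispatch are hypothetical helpers.

```python
import json
from typing import Any, Callable, Dict, Tuple, Union

Handler = Callable[[Dict[str, Any]], Any]


def split_routing_key(routing_key: str) -> Tuple[str, str]:
    """Split ``<DOMAIN>.<EVENT_TYPE>`` at the first dot."""
    domain, _, event_type = routing_key.partition(".")
    return domain, event_type


def dispatch(json_data: Union[bytes, str], handlers: Dict[Tuple[str, str], Handler]) -> Any:
    """Route serialized event data to the handler for its domain and type."""
    payload = json.loads(json_data)
    # ASSUMPTION: the serialized event carries its key under "routing_key".
    key = split_routing_key(payload["routing_key"])
    handler = handlers.get(key)
    if handler is None:
        return None  # no handler registered for this kind of event
    return handler(payload)
```

A handler selected this way could then call DomainEvent.from_json(json_data) itself, knowing which kind of event it is about to receive.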