Source code for domain_event_broker.events

from datetime import datetime
import json
from typing import Dict, Optional, Union
from uuid import uuid4

FACTOR = 10**6


def to_timestamp(dt: datetime, epoch: datetime = datetime(1970, 1, 1)) -> float:
    """http://stackoverflow.com/questions/8777753/converting-datetime-date-to-utc-timestamp-in-python"""
    td = dt - epoch
    return (td.microseconds + (td.seconds + td.days * 86400) * FACTOR) / FACTOR


[docs]class DomainEvent(object):
[docs] def __init__(self, routing_key: str = "", data: Dict = {}, domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, # only set if recreated from json repr timestamp: Optional[float] = None, # only set if recreated from json repr retries: int = 0, ) -> None: """ Define a Domain Event. :param str routing_key: The routing key is of the form ``<DOMAIN>.<EVENT_TYPE>``. The routing key should be a descriptive name of the domain event such as ``user.registered``. :param dict data: The actual event data. *Must* be json serializable. :param str domain_object_id: Domain identifier of the event. This field is optional. If used, it might make search in an event store easier. :param str uuid_string: This UUID identifier of the event. If left ``None``, a new one will be created. :param float timestamp: Unix timestamp. If timestamp is None, a new (UTC) timestamp will be created. :param int retries: Number of times this event was delivered to a subscriber already. """ self.routing_key = routing_key self.data = data self.domain_object_id = domain_object_id self.uuid_string = uuid_string if uuid_string is None: self.uuid_string = str(uuid4()) self.timestamp = timestamp if timestamp is None: timestamp = to_timestamp(datetime.utcnow()) self.timestamp = timestamp event_data = self.__dict__.copy() self.event_data = event_data self.retries = retries
[docs] @classmethod def from_json(cls, json_data: Union[bytes, str]) -> 'DomainEvent': """ Create a DomainEvent from ``json_data``. Note that you probably want to dispatch first based on domain and event type. :param str json_data: Serialized domain event data. :rtype: DomainEvent """ return cls(**json.loads(json_data))
def __repr__(self) -> str: return "DomainEvent('{0.routing_key}', '{0.data}', (domain_obj_id: {0.domain_object_id}, uuid: {0.uuid_string}))".format(self) # noqa __str__ = __repr__ def __eq__(self, other: object) -> bool: if not isinstance(other, DomainEvent): return NotImplemented return self.event_data == other.event_data