from typing import Any, Callable, Optional
from .transport import Transport
from . import settings
# Actions a message callback may return to tell ``replay_event`` what to do
# with the dead-lettered message it was given.
# RETRY: republish the message body to the retry exchange, then ack it.
RETRY = 'retry'
# LEAVE: reject the message with requeue so it stays in the dead-letter queue.
LEAVE = 'leave'
# DISCARD: ack the message without republishing it.
DISCARD = 'discard'
def retry_event(body: bytes, **kwargs: Any) -> str:
    """
    Default message callback: request a retry for every event.

    :param bytes body: Message body; not inspected.
    :param kwargs: Any further keyword arguments passed by the caller; ignored.
    :return: Always ``RETRY``.
    :rtype: str
    """
    return RETRY
def replay_event(queue_name: str,
                 message_callback: Callable = retry_event,
                 connection_settings: Optional[str] = '',
                 ) -> int:
    """
    Move one domain event from a dead-letter queue back into the processing queue.

    One message is fetched from the queue ``<queue_name>-dl`` and handed to
    *message_callback*. The value returned by the callback decides what
    happens to the message:

    * ``RETRY``: the body is published to the exchange ``<queue_name>-retry``
      with the message's original routing key, then the message is acked.
    * ``DISCARD``: the message is acked without being republished.
    * ``LEAVE``: the message is rejected with ``requeue=True``.

    :param str queue_name: Name of the queue where to move the event.
    :param function message_callback: A callable that receives the event as
        the keyword arguments ``frame``, ``header`` and ``body`` and
        returns either ``RETRY``, ``LEAVE`` or ``DISCARD``.
    :param str connection_settings: Settings passed to ``Transport``. The
        default empty string selects ``settings.BROKER``; ``None`` turns the
        call into a no-op that returns ``0``.
    :return: The number of messages left in the dead letter queue, as
        reported by ``frame.message_count`` of the fetched message. ``0`` if
        no message was available or *connection_settings* is ``None``.
    :rtype: int
    :raises ValueError: If *message_callback* returns anything other than
        ``RETRY``, ``LEAVE`` or ``DISCARD``. The message is rejected with
        ``requeue=True`` before the exception is raised.
    """
    if connection_settings is None:
        return 0
    if connection_settings == '':
        connection_settings = settings.BROKER

    retry_exchange = queue_name + '-retry'
    dead_letter_queue = queue_name + '-dl'

    # NOTE(review): the transport is never explicitly torn down on any of the
    # return paths below. Transport's API is not visible from this module, so
    # confirm whether a close/disconnect call is needed here.
    transport = Transport(connection_settings)
    transport.connect()
    # Narrowing for type checkers; presumably connect() populates the channel.
    assert transport.channel is not None
    channel = transport.channel

    frame, header, body = channel.basic_get(dead_letter_queue)
    if frame is None:
        # Nothing was fetched from the dead-letter queue.
        return 0

    action = message_callback(frame=frame, header=header, body=body)
    if action == RETRY:
        channel.basic_publish(exchange=retry_exchange,
                              routing_key=frame.routing_key,
                              body=body,
                              )
        channel.basic_ack(frame.delivery_tag)
    elif action == DISCARD:
        channel.basic_ack(frame.delivery_tag)
    else:
        # LEAVE and unrecognised actions both requeue the message, so a
        # faulty callback does not drop the event.
        channel.basic_reject(frame.delivery_tag, requeue=True)
        if action != LEAVE:
            # ValueError is a subclass of Exception, so callers that catch
            # Exception keep working.
            raise ValueError("Invalid action '{}'".format(action))
    return frame.message_count
def replay_all(queue_name: str,
               message_callback: Callable = retry_event,
               connection_settings: Optional[str] = '',
               ) -> int:
    """
    Replay all messages currently in the dead-letter queue.

    The first ``replay_event`` call reports how many messages remain; that
    count fixes how many further replays are attempted, so the number of
    iterations does not grow if more messages arrive during the run.

    :param str queue_name: Forwarded to ``replay_event``.
    :param function message_callback: Forwarded to ``replay_event``.
    :param str connection_settings: Forwarded to ``replay_event``.
    :return: The value returned by the last ``replay_event`` call, i.e. the
        number of messages dead-lettered since starting the replay.
    :rtype: int
    """
    def replay_next() -> int:
        # Every replay uses the same arguments; only the queue state differs.
        return replay_event(queue_name,
                            message_callback,
                            connection_settings=connection_settings)

    backlog = replay_next()
    still_queued = backlog
    for _ in range(backlog):
        still_queued = replay_next()
    return still_queued