API Reference

arrlio.settings

ENV_PREFIX module-attribute

ENV_PREFIX = get('ARRLIO_ENV_PREFIX', 'ARRLIO_')

Default environment variable prefix.

SERIALIZER module-attribute

SERIALIZER = 'arrlio.serializers.json'

Default serializer module.

BROKER module-attribute

BROKER = 'arrlio.backends.brokers.local'

Default broker module.

RESULT_BACKEND module-attribute

RESULT_BACKEND = 'arrlio.backends.result_backends.local'

Default result backend module.

EVENT_BACKEND module-attribute

EVENT_BACKEND = 'arrlio.backends.event_backends.local'

Default event backend module.

TASK_QUEUE module-attribute

TASK_QUEUE = 'arrlio.tasks'

Default task queue.

TASK_MIN_PRIORITY module-attribute

TASK_MIN_PRIORITY = 1

Task minimum priority value.

TASK_MAX_PRIORITY module-attribute

TASK_MAX_PRIORITY = 5

Task maximum priority value.

TASK_PRIORITY module-attribute

TASK_PRIORITY = 1

Task default priority. Lower number means higher priority.

TASK_TIMEOUT module-attribute

TASK_TIMEOUT = 300

Default task timeout in seconds.

TASK_TTL module-attribute

TASK_TTL = 300

Default task TTL in seconds.

TASK_ACK_LATE module-attribute

TASK_ACK_LATE = False

Task ack late option.

TASK_RESULT_TTL module-attribute

TASK_RESULT_TTL = 300

Default task result TTL in seconds.

TASK_RESULT_RETURN module-attribute

TASK_RESULT_RETURN = True

Whether to return the task result by default.

TASK_EVENTS module-attribute

TASK_EVENTS = False

Whether events will be generated for the task. Requires the arrlio.plugins.events plugin.

EVENT_TTL module-attribute

EVENT_TTL = 300

Default event TTL in seconds.

TASK_QUEUES module-attribute

TASK_QUEUES = [TASK_QUEUE]

Default queue names to consume tasks.

EXECUTOR module-attribute

EXECUTOR = 'arrlio.executor'

Default task executor module.

LOG_LEVEL module-attribute

LOG_LEVEL = 'ERROR'

Default logger level.

LOG_SANITIZE module-attribute

LOG_SANITIZE = True

Logger sanitize flag. If True, task args and kwargs will be replaced with the <hidden> value.
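
Because each config model in arrlio.configs declares an env_prefix built from ENV_PREFIX, these defaults can be overridden through environment variables. A minimal sketch, assuming the pydantic-settings models are instantiated after the variables are set (the variable names follow the prefixes shown below):

import os

# Illustrative overrides; names follow the env prefixes declared in arrlio.configs.
os.environ["ARRLIO_TASK_QUEUE"] = "orders"  # read by TaskConfig (prefix ARRLIO_TASK_)
os.environ["ARRLIO_EVENT_TTL"] = "600"      # read by EventConfig (prefix ARRLIO_EVENT_)

from arrlio.configs import EventConfig, TaskConfig

assert TaskConfig().queue == "orders"
assert EventConfig().ttl == 600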

arrlio.configs

SerializerConfig

Bases: BaseSettings, ModuleConfigValidatorMixIn

Config for serializer module.

Attributes:

  • module (SerializerModule) –

    Serializer module as module path or SerializerModule instance. Default.

  • config (ModuleConfig) –

    Module config as dict or module.Config instance.

Examples:

>>> SerializerConfig(
        module='myapp.serializers.custom_serializer',
        config={}
    )
Source code in arrlio/configs.py
class SerializerConfig(BaseSettings, ModuleConfigValidatorMixIn):
    """
    Config for serializer module.

    Attributes:
        module: Serializer module as module path or `SerializerModule` instance.
            [Default][arrlio.settings.SERIALIZER].
        config: Module config as `dict` or `module.Config` instance.

    Examples:
        ```python
        >>> SerializerConfig(
                module='myapp.serializers.custom_serializer',
                config={}
            )
        ```
    """

    module: SerializerModule = cast(SerializerModule, SERIALIZER)
    config: ModuleConfig = Field(default_factory=BaseSettings)

BrokerModuleConfig

Bases: BaseSettings, ModuleConfigValidatorMixIn

Config for broker module.

Default env prefix: ${ENV_PREFIX}BROKER_.

Attributes:

  • module (BrokerModule) –

    Broker module as module path or BrokerModule instance.

    Default.

  • config (ModuleConfig) –

    Module config as dict or module.Config instance.

Examples:

>>> BrokerModuleConfig(
        module='myapp.brokers.custom_broker',
        config={}
    )
Source code in arrlio/configs.py
class BrokerModuleConfig(BaseSettings, ModuleConfigValidatorMixIn):
    """
    Config for broker module.

    Default env prefix: `${ENV_PREFIX}BROKER_`.

    Attributes:
        module: Broker module as module path or `BrokerModule` instance.

            [Default][arrlio.settings.BROKER].

        config: Module config as `dict` or `module.Config` instance.

    Examples:
        ```python
        >>> BrokerModuleConfig(
                module='myapp.brokers.custom_broker',
                config={}
            )
        ```
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}BROKER_")

    module: BrokerModule = cast(BrokerModule, BROKER)
    config: ModuleConfig = Field(default_factory=BaseSettings)

ResultBackendModuleConfig

Bases: BaseSettings, ModuleConfigValidatorMixIn

Config for result backend module.

Default env prefix: ${ENV_PREFIX}RESULT_BACKEND_.

Attributes:

  • module (ResultBackendModule) –

    Result backend module as module path or ResultBackendModule instance.

    Default.

  • config (ModuleConfig) –

    Module config as dict or module.Config instance.

Examples:

>>> ResultBackendModuleConfig(
        module='myapp.result_backends.custom_result_backend',
        config={}
    )
Source code in arrlio/configs.py
class ResultBackendModuleConfig(BaseSettings, ModuleConfigValidatorMixIn):
    """
    Config for result backend module.

    Default env prefix: `${ENV_PREFIX}RESULT_BACKEND_`.

    Attributes:
        module: Result backend module as module path or `ResultBackendModule` instance.

            [Default][arrlio.settings.RESULT_BACKEND].

        config: Module config as `dict` or `module.Config` instance.

    Examples:
        ```python
        >>> ResultBackendModuleConfig(
                module='myapp.result_backends.custom_result_backend',
                config={}
            )
        ```
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}RESULT_BACKEND_")

    module: ResultBackendModule = cast(ResultBackendModule, RESULT_BACKEND)
    config: ModuleConfig = Field(default_factory=BaseSettings)

EventBackendModuleConfig

Bases: BaseSettings, ModuleConfigValidatorMixIn

Config for event backend module.

Default env prefix: ${ENV_PREFIX}EVENT_BACKEND_.

Attributes:

  • module (EventBackendModule) –

    Event backend module as module path or EventBackendModule instance.

    Default.

  • config (ModuleConfig) –

    Module config as dict or module.Config instance.

Examples:

>>> EventBackendModuleConfig(
        module='myapp.event_backends.custom_event_backend',
        config={}
    )
Source code in arrlio/configs.py
class EventBackendModuleConfig(BaseSettings, ModuleConfigValidatorMixIn):
    """
    Config for event backend module.

    Default env prefix: `${ENV_PREFIX}EVENT_BACKEND_`.

    Attributes:
        module: Event backend module as module path or `EventBackendModule` instance.

            [Default][arrlio.settings.EVENT_BACKEND].

        config: Module config as `dict` or `module.Config` instance.

    Examples:
        ```python
        >>> EventBackendModuleConfig(
                module='myapp.event_backends.custom_event_backend',
                config={}
            )
        ```
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}EVENT_BACKEND_")

    module: EventBackendModule = cast(EventBackendModule, EVENT_BACKEND)
    config: ModuleConfig = Field(default_factory=BaseSettings)

TaskConfig

Bases: BaseSettings

Config for task.

Attributes:

  • queue (str) –

    Default queue name for task. Default.

  • priority (TaskPriority) –

    Default task priority. Default.

  • timeout (Optional[Timeout]) –

    Default task execution timeout in seconds. Default.

  • ttl (Optional[TTL]) –

    Default task TTL in seconds. Default.

  • ack_late (bool) –

    Task ack late option. Default.

  • result_return (bool) –

    Whether to return the task result by default. Default.

  • result_ttl (Optional[TTL]) –

    Result TTL in seconds. Default.

  • events (set[str] | bool) –

    Whether events will be generated for the task. This option requires the arrlio.plugins.events plugin. Default.

Source code in arrlio/configs.py
class TaskConfig(BaseSettings):
    """
    Config for task.

    Attributes:
        queue: Default queue name for task.
            [Default][arrlio.settings.TASK_QUEUE].
        priority: Default task priority.
            [Default][arrlio.settings.TASK_PRIORITY].
        timeout: Default task execution timeout in seconds.
            [Default][arrlio.settings.TASK_TIMEOUT].
        ttl: Default task TTL in seconds.
            [Default][arrlio.settings.TASK_TTL].
        ack_late: Task ack late option.
            [Default][arrlio.settings.TASK_ACK_LATE].
        result_return: Whether to return the task result by default.
            [Default][arrlio.settings.TASK_RESULT_RETURN].
        result_ttl: Result TTL in seconds.
            [Default][arrlio.settings.TASK_RESULT_TTL].
        events: Whether events will be generated for the task. This option requires the `arrlio.plugins.events` plugin.
            [Default][arrlio.settings.TASK_EVENTS].
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}TASK_")

    queue: str = Field(default_factory=lambda: TASK_QUEUE)
    priority: TaskPriority = Field(default_factory=lambda: TASK_PRIORITY)
    timeout: Optional[Timeout] = Field(default_factory=lambda: TASK_TIMEOUT)
    ttl: Optional[TTL] = Field(default_factory=lambda: TASK_TTL)
    ack_late: bool = Field(default_factory=lambda: TASK_ACK_LATE)
    result_return: bool = Field(default_factory=lambda: TASK_RESULT_RETURN)
    result_ttl: Optional[TTL] = Field(default_factory=lambda: TASK_RESULT_TTL)
    events: set[str] | bool = Field(default_factory=lambda: TASK_EVENTS)

EventConfig

Bases: BaseSettings

Config for event.

Default env prefix: ${ENV_PREFIX}EVENT_.

Attributes:

  • ttl (Optional[TTL]) –

    Event TTL in seconds. Default.

Source code in arrlio/configs.py
class EventConfig(BaseSettings):
    """
    Config for event.

    Default env prefix: `${ENV_PREFIX}EVENT_`.

    Attributes:
        ttl: Event TTL in seconds.
            [Default][arrlio.settings.EVENT_TTL].
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}EVENT_")

    ttl: Optional[TTL] = Field(default_factory=lambda: EVENT_TTL)

PluginModuleConfig

Bases: ModuleConfigValidatorMixIn, BaseSettings

Config for plugin module.

Attributes:

  • module (PluginModule) –

    Plugin module as module path or PluginModule instance.

  • config (ModuleConfig) –

    Module config as dict or module.Config instance.

Source code in arrlio/configs.py
class PluginModuleConfig(ModuleConfigValidatorMixIn, BaseSettings):
    """
    Config for plugin module.

    Attributes:
        module: Plugin module as module path or `PluginModule` instance.
        config: Module config as `dict` or `module.Config` instance.
    """

    model_config = SettingsConfigDict()

    module: PluginModule
    config: ModuleConfig = Field(default_factory=BaseSettings)

ExecutorModuleConfig

Bases: ModuleConfigValidatorMixIn, BaseSettings

Config for executor module.

Default env prefix: ${ENV_PREFIX}EXECUTOR_.

Attributes:

  • module (ExecutorModule) –

    Executor module as module path or ExecutorModule instance.

    Default.

  • config (ModuleConfig) –

    Module config as dict or module.Config instance.

Source code in arrlio/configs.py
class ExecutorModuleConfig(ModuleConfigValidatorMixIn, BaseSettings):
    """
    Config for executor module.

    Default env prefix: `${ENV_PREFIX}EXECUTOR_`.

    Attributes:
        module: Executor module as module path or `ExecutorModule` instance.

            [Default][arrlio.settings.EXECUTOR].

        config: Module config as `dict` or `module.Config` instance.
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}EXECUTOR_")

    module: ExecutorModule = cast(ExecutorModule, EXECUTOR)
    config: ModuleConfig = Field(default_factory=BaseSettings)

Config

Bases: BaseSettings

Arrlio application main config.

Attributes:

  • app_id (str) –

    Arrlio application Id.

  • broker (BrokerModuleConfig) –

    Broker module config.

  • result_backend (ResultBackendModuleConfig) –

    Result backend module config.

  • event_backend (EventBackendModuleConfig) –

    Event backend module config.

  • task (TaskConfig) –

    Task config.

  • event (EventConfig) –

    Event config.

  • task_queues (UniqueList[str]) –

    Task queues to consume.

  • plugins (list[PluginModuleConfig]) –

    List of plugins.

  • executor (ExecutorModuleConfig) –

    Executor module config.

Source code in arrlio/configs.py
class Config(BaseSettings):
    """
    Arrlio application main config.

    Attributes:
        app_id: Arrlio application Id.
        broker: Broker module config.
        result_backend: Result backend module config.
        event_backend: Event backend module config.
        task: Task config.
        event: Event config.
        task_queues: Task queues to consume.
        plugins: List of plugins.
        executor: Executor module config.
    """

    model_config = SettingsConfigDict(env_prefix=ENV_PREFIX)

    app_id: Annotated[str, MinLen(4)] = Field(default_factory=lambda: f"{uuid4().hex[-6:]}")
    broker: BrokerModuleConfig = Field(default_factory=BrokerModuleConfig)
    result_backend: ResultBackendModuleConfig = Field(default_factory=ResultBackendModuleConfig)
    event_backend: EventBackendModuleConfig = Field(default_factory=EventBackendModuleConfig)
    task: TaskConfig = Field(default_factory=TaskConfig)
    event: EventConfig = Field(default_factory=EventConfig)
    task_queues: UniqueList[str] = Field(default_factory=lambda: TASK_QUEUES)
    plugins: list[PluginModuleConfig] = Field(default_factory=list)
    executor: ExecutorModuleConfig = Field(default_factory=ExecutorModuleConfig)
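
A minimal sketch of building the main config; with no arguments every sub-config falls back to the local modules listed in arrlio.settings:

from arrlio.configs import Config

config = Config(app_id="demo-app")  # app_id must be at least 4 characters
print(config.task.queue)            # 'arrlio.tasks'
print(config.task_queues)           # ['arrlio.tasks']
print(config.broker.module)         # the arrlio.backends.brokers.local module by default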

arrlio.models

FuncProxy

Proxy class for function object.

Source code in arrlio/models.py
class FuncProxy:
    """Proxy class for function object."""

    def __init__(self, func):
        self._original = func

    def __getattribute__(self, name: str):
        if name not in ("_original", "__deepcopy__"):
            return getattr(object.__getattribute__(self, "_original"), name)
        return object.__getattribute__(self, name)

    def __str__(self):
        return self._original.__str__()

    def __repr__(self):
        return self._original.__repr__()

    def __call__(self, *args, **kwds):
        return self._original(*args, **kwds)

    def __deepcopy__(self, memo):
        return self
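
The proxy forwards attribute access and calls to the wrapped function while opting out of deepcopy, which lets frozen dataclasses that hold callables (such as Task below) be deep-copied without duplicating the function. A small sketch:

from copy import deepcopy

from arrlio.models import FuncProxy

def add(a, b):
    return a + b

proxy = FuncProxy(add)
assert proxy(1, 2) == 3          # calls pass through to the original function
assert proxy.__name__ == "add"   # attribute access is forwarded as well
assert deepcopy(proxy) is proxy  # __deepcopy__ returns the proxy itself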

Shared

Bases: MutableMapping

Object to share settings between broker/result_backend/event_backend.

Source code in arrlio/models.py
class Shared(MutableMapping):
    """Object to share settings between broker/result_backend/event_backend."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, *args, **kwds):
        return self._data.__getitem__(*args, **kwds)

    def __setitem__(self, *args, **kwds):
        return self._data.__setitem__(*args, **kwds)

    def __delitem__(self, *args, **kwds):
        return self._data.__delitem__(*args, **kwds)

    def __contains__(self, *args, **kwds):
        return self._data.__contains__(*args, **kwds)

    def __len__(self, *args, **kwds):
        return self._data.__len__(*args, **kwds)

    def __iter__(self, *args, **kwds):
        return self._data.__iter__(*args, **kwds)

    def __deepcopy__(self, memo):
        return self

    def get(self, *args, **kwds):
        return self._data.get(*args, **kwds)

    def update(self, *args, **kwds):
        return self._data.update(*args, **kwds)

Task dataclass

Task.

Attributes:

  • func (Callable | AsyncCallable) –

    Task function.

  • name (str) –

    Task name.

  • queue (str) –

    Task queue. Default.

  • priority (TaskPriority) –

    Task priority (min, max).

  • timeout (Timeout) –

    Task timeout in seconds. Default.

  • ttl (Timeout) –

    Task TTL in seconds. Default.

  • ack_late (bool) –

    Ack late option. Default.

  • result_ttl (Timeout) –

    Task result TTL in seconds. Default.

  • result_return (bool) –

    Whether the worker should return the result of the task. Default.

  • thread (Optional[bool]) –

    Whether the Executor should execute the task in a separate thread.

  • events (bool | set[str]) –

    Enable or disable events for the task. Default.

  • event_ttl (Timeout) –

    Event TTL in seconds. Default.

  • headers (dict) –

    Task headers.

  • loads (Optional[Callable]) –

    Function to load task arguments.

  • dumps (Optional[Callable]) –

    Function to dump task result.

Source code in arrlio/models.py
@dataclass(slots=True, frozen=True)
class Task:
    """
    Task.

    Attributes:
        func: Task function.
        name: Task name.
        queue: Task queue.
            [Default][arrlio.settings.TASK_QUEUE].
        priority: Task priority
            ([min][arrlio.settings.TASK_MIN_PRIORITY], [max][arrlio.settings.TASK_MAX_PRIORITY]).
        timeout: Task timeout in seconds.
            [Default][arrlio.settings.TASK_TIMEOUT].
        ttl: Task TTL in seconds.
            [Default][arrlio.settings.TASK_TTL].
        ack_late: Ack late option.
            [Default][arrlio.settings.TASK_ACK_LATE].
        result_ttl: Task result TTL in seconds.
            [Default][arrlio.settings.TASK_RESULT_TTL].
        result_return: Whether the worker should return the result of the task.
            [Default][arrlio.settings.TASK_RESULT_RETURN].
        thread: Whether the `Executor` should execute the task in a separate thread.
        events: Enable or disable events for the task.
            [Default][arrlio.settings.TASK_EVENTS].
        event_ttl: Event TTL in seconds.
            [Default][arrlio.settings.EVENT_TTL].
        headers: Task headers.
        loads: Function to load task arguments.
        dumps: Function to dump task result.
    """

    func: Callable | AsyncCallable

    name: str
    queue: str = TASK_QUEUE
    priority: TaskPriority = TASK_PRIORITY
    timeout: Timeout = TASK_TIMEOUT
    ttl: Timeout = TASK_TTL
    ack_late: bool = TASK_ACK_LATE
    result_ttl: Timeout = TASK_RESULT_TTL
    result_return: bool = TASK_RESULT_RETURN
    thread: Optional[bool] = None
    events: bool | set[str] = TASK_EVENTS
    event_ttl: Timeout = EVENT_TTL
    headers: dict = field(default_factory=dict)

    # NOTE
    # args_kwds_loads
    # result_dumps
    # result_loads
    loads: Optional[Callable] = None
    dumps: Optional[Callable] = None

    def __post_init__(self):
        if self.func:
            object.__setattr__(self, "func", FuncProxy(self.func))
        if self.loads:
            object.__setattr__(self, "loads", FuncProxy(self.loads))
        if self.dumps:
            object.__setattr__(self, "dumps", FuncProxy(self.dumps))

    def __call__(self, *args, **kwds) -> Any:
        """Call task function with args and kwds."""

        return self.func(*args, **kwds)

    def asdict(
        self,
        exclude: list[str] | None = None,
        sanitize: bool | None = None,
    ) -> dict:
        """Convert to dict.

        Args:
            exclude: fields to exclude.
            sanitize: flag to sanitize sensitive data.

        Returns:
            `Task` as `dict`.
        """

        # exclude = (exclude or []) + ["loads", "dumps"]
        exclude = exclude or []
        return {k: v for k, v in asdict(self).items() if k not in exclude}

    def pretty_repr(self, exclude: list[str] | None = None, sanitize: bool | None = None):
        return pretty_repr(self.asdict(exclude=exclude, sanitize=sanitize))

    def instantiate(
        self,
        task_id: TaskId | None = None,
        args: Args | None = None,
        kwds: Kwds | None = None,
        meta: dict | None = None,
        headers: dict | None = None,
        **kwargs,
    ) -> "TaskInstance":
        """
        Instantiate new `TaskInstance` object with provided arguments.

        Args:
            task_id: Task Id.
            args: Task positional arguments.
            kwds: Task keyword arguments.
            meta: Task additional `meta` keyword argument.
            headers: Task headers.

        Returns:
            Task instance.
        """

        headers = {**self.headers, **(headers or {})}
        return TaskInstance(
            **{
                **self.asdict(),
                "task_id": task_id,
                "args": args or (),
                "kwds": kwds or {},
                "meta": meta or {},
                "headers": headers or {},
                **kwargs,
            }
        )
__call__
__call__(*args, **kwds) -> Any

Call task function with args and kwds.

Source code in arrlio/models.py
def __call__(self, *args, **kwds) -> Any:
    """Call task function with args and kwds."""

    return self.func(*args, **kwds)
asdict
asdict(
    exclude: list[str] | None = None,
    sanitize: bool | None = None,
) -> dict

Convert to dict.

Parameters:

  • exclude (list[str] | None, default: None ) –

    fields to exclude.

  • sanitize (bool | None, default: None ) –

    flag to sanitize sensitive data.

Returns:

  • dict

    Task as dict.

Source code in arrlio/models.py
def asdict(
    self,
    exclude: list[str] | None = None,
    sanitize: bool | None = None,
) -> dict:
    """Convert to dict.

    Args:
        exclude: fields to exclude.
        sanitize: flag to sanitize sensitive data.

    Returns:
        `Task` as `dict`.
    """

    # exclude = (exclude or []) + ["loads", "dumps"]
    exclude = exclude or []
    return {k: v for k, v in asdict(self).items() if k not in exclude}
instantiate
instantiate(
    task_id: TaskId | None = None,
    args: Args | None = None,
    kwds: Kwds | None = None,
    meta: dict | None = None,
    headers: dict | None = None,
    **kwargs,
) -> TaskInstance

Instantiate new TaskInstance object with provided arguments.

Parameters:

  • task_id (TaskId | None, default: None ) –

    Task Id.

  • args (Args | None, default: None ) –

    Task positional arguments.

  • kwds (Kwds | None, default: None ) –

    Task keyword arguments.

  • meta (dict | None, default: None ) –

    Task additional meta keyword argument.

  • headers (dict | None, default: None ) –

    Task headers.

Returns:

  • TaskInstance

    Task instance.

Source code in arrlio/models.py
def instantiate(
    self,
    task_id: TaskId | None = None,
    args: Args | None = None,
    kwds: Kwds | None = None,
    meta: dict | None = None,
    headers: dict | None = None,
    **kwargs,
) -> "TaskInstance":
    """
    Instantiate new `TaskInstance` object with provided arguments.

    Args:
        task_id: Task Id.
        args: Task positional arguments.
        kwds: Task keyword arguments.
        meta: Task additional `meta` keyword argument.
        headers: Task headers.

    Returns:
        Task instance.
    """

    headers = {**self.headers, **(headers or {})}
    return TaskInstance(
        **{
            **self.asdict(),
            "task_id": task_id,
            "args": args or (),
            "kwds": kwds or {},
            "meta": meta or {},
            "headers": headers or {},
            **kwargs,
        }
    )
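
A short sketch of building a Task by hand and instantiating it. In a real application tasks are normally registered with the library (see registered_tasks in arrlio.core); direct construction is shown here purely for illustration:

from arrlio.models import Task

def add(a: int, b: int) -> int:
    return a + b

task = Task(func=add, name="add", queue="math", timeout=60)
assert task(1, 2) == 3  # Task.__call__ delegates to func

instance = task.instantiate(args=(1, 2), headers={"trace_id": "abc"})
assert instance.args == (1, 2)
assert instance.headers["trace_id"] == "abc"  # merged with Task.headers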

TaskInstance dataclass

Bases: Task

Task instance dataclass.

Attributes:

  • task_id (UUID) –

    Task Id.

  • args (Args) –

    Task function positional arguments.

  • kwds (Kwds) –

    Task function keyword arguments.

  • meta (dict) –

    Task function additional meta keyword argument.

Source code in arrlio/models.py
@dataclass(slots=True, frozen=True)
class TaskInstance(Task):
    """
    Task instance `dataclass`.

    Attributes:
        task_id: Task Id.
        args: Task function positional arguments.
        kwds: Task function keyword arguments.
        meta: Task function additional `meta` keyword argument.
    """

    task_id: UUID = field(default_factory=uuid4)
    args: Args = field(default_factory=tuple)
    kwds: Kwds = field(default_factory=dict)
    meta: dict = field(default_factory=dict)

    shared: Shared = field(default_factory=Shared, init=False)

    sanitizer: ClassVar[Optional[Callable]] = None

    def __post_init__(self):
        if self.task_id is None:
            object.__setattr__(self, "task_id", uuid4())
        elif isinstance(self.task_id, str):
            object.__setattr__(self, "task_id", UUID(self.task_id))
        if not isinstance(self.args, tuple):
            object.__setattr__(self, "args", tuple(self.args))

    def asdict(self, exclude: list[str] | None = None, sanitize: bool | None = None) -> dict:
        """
        Convert to dict.

        Args:
            exclude: fields to exclude.
            sanitize: flag to sanitize sensitive data.

        Returns:
            `TaskInstance` as `dict`.
        """

        exclude = exclude or []
        data = super(TaskInstance, self).asdict(exclude=exclude, sanitize=sanitize)
        if sanitize:
            if self.sanitizer:
                data = self.sanitizer(data)
            else:
                if data["args"]:
                    data["args"] = "<hidden>"
                if data["kwds"]:
                    data["kwds"] = "<hidden>"
        return data

    def pretty_repr(self, exclude: list[str] | None = None, sanitize: bool | None = None):
        exclude = (exclude or []) + ["shared"]
        return pretty_repr(self.asdict(exclude=exclude, sanitize=sanitize))

    def __call__(self, meta: bool | None = None):
        """
        Call `TaskInstance`.

        Args:
            meta: Add additional keyword argument `meta` to the task function call.
        """

        args = self.args
        kwds = self.kwds
        if meta is True:
            kwds = {"meta": self.meta, **kwds}
        if isinstance(self.func, type):
            func = self.func()
        else:
            func = self.func
        return func(*args, **kwds)

    def instantiate(self, *args, **kwds):
        raise NotImplementedError
asdict
asdict(
    exclude: list[str] | None = None,
    sanitize: bool | None = None,
) -> dict

Convert to dict.

Parameters:

  • exclude (list[str] | None, default: None ) –

    fields to exclude.

  • sanitize (bool | None, default: None ) –

    flag to sanitize sensitive data.

Returns:

  • dict

    TaskInstance as dict.

Source code in arrlio/models.py
def asdict(self, exclude: list[str] | None = None, sanitize: bool | None = None) -> dict:
    """
    Convert to dict.

    Args:
        exclude: fields to exclude.
        sanitize: flag to sanitize sensitive data.

    Returns:
        `TaskInstance` as `dict`.
    """

    exclude = exclude or []
    data = super(TaskInstance, self).asdict(exclude=exclude, sanitize=sanitize)
    if sanitize:
        if self.sanitizer:
            data = self.sanitizer(data)
        else:
            if data["args"]:
                data["args"] = "<hidden>"
            if data["kwds"]:
                data["kwds"] = "<hidden>"
    return data
__call__
__call__(meta: bool | None = None)

Call TaskInstance.

Parameters:

  • meta (bool | None, default: None ) –

    Add additional keyword argument meta to the task function call.

Source code in arrlio/models.py
def __call__(self, meta: bool | None = None):
    """
    Call `TaskInstance`.

    Args:
        meta: Add additional keyword argument `meta` to the task function call.
    """

    args = self.args
    kwds = self.kwds
    if meta is True:
        kwds = {"meta": self.meta, **kwds}
    if isinstance(self.func, type):
        func = self.func()
    else:
        func = self.func
    return func(*args, **kwds)
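
When sanitize is true and no custom sanitizer is set, non-empty args and kwds are replaced with <hidden>; this is what the logging layer relies on when LOG_SANITIZE is enabled. A small sketch:

from arrlio.models import Task

task = Task(func=lambda user, password: None, name="login")
instance = task.instantiate(args=("alice",), kwds={"password": "s3cret"})

data = instance.asdict(sanitize=True)
assert data["args"] == "<hidden>"
assert data["kwds"] == "<hidden>"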

TaskResult dataclass

Task result dataclass.

Attributes:

  • res (Any) –

    Task result.

  • exc (Optional[Exception | tuple[str, str, str]]) –

    Task exception.

  • trb (Optional[TracebackType | str]) –

    Task exception traceback.

  • idx (Optional[tuple[str, int]]) –

    Result index.

  • routes (Optional[str | list[str]]) –

    Task result routes.

Source code in arrlio/models.py
@dataclass(slots=True, frozen=True)
class TaskResult:
    """
    Task result `dataclass`.

    Attributes:
        res: Task result.
        exc: Task exception.
        trb: Task exception traceback.
        idx: Result index.
        routes: Task result routes.
    """

    res: Any = None
    exc: Optional[Exception | tuple[str, str, str]] = None
    trb: Optional[TracebackType | str] = None
    idx: Optional[tuple[str, int]] = None
    routes: Optional[str | list[str]] = None

    def set_idx(self, idx: tuple[str, int]):
        object.__setattr__(self, "idx", idx)

    def asdict(self, sanitize: bool | None = None) -> dict:
        """
        Convert to dict.

        Args:
            sanitize: flag to sanitize sensitive data.

        Returns:
            `TaskResult` as `dict`.
        """

        return {
            "res": self.res if self.res is None or not sanitize else "<hidden>",
            "exc": self.exc,
            "trb": self.trb,
            "idx": self.idx,
            "routes": self.routes,
        }

    def pretty_repr(self, sanitize: bool | None = None):
        return pretty_repr(self.asdict(sanitize=sanitize))
asdict
asdict(sanitize: bool | None = None) -> dict

Convert to dict.

Parameters:

  • sanitize (bool | None, default: None ) –

    flag to sanitize sensitive data.

Returns:

  • dict

    TaskResult as dict.

Source code in arrlio/models.py
def asdict(self, sanitize: bool | None = None) -> dict:
    """
    Convert to dict.

    Args:
        sanitize: flag to sanitize sensitive data.

    Returns:
        `TaskResult` as `dict`.
    """

    return {
        "res": self.res if self.res is None or not sanitize else "<hidden>",
        "exc": self.exc,
        "trb": self.trb,
        "idx": self.idx,
        "routes": self.routes,
    }
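
Default sanitization only masks a non-None res; exc, trb and the other fields pass through unchanged. A small sketch:

from arrlio.models import TaskResult

result = TaskResult(res={"token": "s3cret"})
assert result.asdict(sanitize=True)["res"] == "<hidden>"
assert TaskResult().asdict(sanitize=True)["res"] is None  # None is never masked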

Event dataclass

Event dataclass.

Attributes:

  • type (str) –

    Event type.

  • event_id (UUID) –

    Event Id.

  • dt (Optional[datetime]) –

    Event datetime.

  • ttl (Timeout) –

    Event TTL in seconds.

Source code in arrlio/models.py
@dataclass(slots=True, frozen=True)
class Event:
    """
    Event `dataclass`.

    Attributes:
        type: Event type.
        event_id: Event Id.
        dt: Event datetime.
        ttl: Event TTL in seconds.
    """

    type: str
    data: dict
    event_id: UUID = field(default_factory=uuid4)
    dt: Optional[datetime.datetime] = None
    ttl: Timeout = EVENT_TTL

    def __post_init__(self):
        if not isinstance(self.event_id, UUID):
            object.__setattr__(self, "event_id", UUID(self.event_id))
        if self.dt is None:
            object.__setattr__(self, "dt", datetime.datetime.now(tz=datetime.timezone.utc))
        elif isinstance(self.dt, str):
            object.__setattr__(self, "dt", datetime.datetime.fromisoformat(self.dt))

    def asdict(self, sanitize: bool | None = None) -> dict:
        """
        Convert to dict.

        Args:
            sanitize: flag to sanitize sensitive data.

        Returns:
            `Event` as `dict`.
        """

        data = asdict(self)
        if hasattr(data["data"], "sanitize"):
            data["data"] = data["data"].sanitize()
        return data

    def pretty_repr(self, sanitize: bool | None = None):
        return pretty_repr(self.asdict(sanitize=sanitize))
asdict
asdict(sanitize: bool | None = None) -> dict

Convert to dict.

Parameters:

  • sanitize (bool | None, default: None ) –

    flag to sanitize sensitive data.

Returns:

  • dict

    Event as dict.

Source code in arrlio/models.py
def asdict(self, sanitize: bool | None = None) -> dict:
    """
    Convert to dict.

    Args:
        sanitize: flag to sanitize sensitive data.

    Returns:
        `Event` as `dict`.
    """

    data = asdict(self)
    if hasattr(data["data"], "sanitize"):
        data["data"] = data["data"].sanitize()
    return data
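
A short sketch of constructing an event; event_id and dt are filled in automatically when omitted (the type string here is illustrative):

from arrlio.models import Event

event = Event(type="task.send", data={"task_id": "..."})
print(event.event_id)  # generated UUID
print(event.dt)        # current UTC time, set in __post_init__
print(event.ttl)       # 300 (EVENT_TTL) by default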

Graph

Graph class.

Parameters:

  • name (str) –

    graph name.

  • nodes (dict[str, list] | None, default: None ) –

    graph nodes.

  • edges (dict[str, list] | None, default: None ) –

    graph edges.

  • roots (set | None, default: None ) –

    graph roots.

Source code in arrlio/models.py
class Graph:
    """
    Graph class.

    Args:
        name: graph name.
        nodes: graph nodes.
        edges: graph edges.
        roots: graph roots.
    """

    def __init__(
        self,
        name: str,
        nodes: dict[str, list] | None = None,
        edges: dict[str, list] | None = None,
        roots: set | None = None,
    ):
        self.name = name
        self.nodes: dict[str, list] = rodict({}, nested=True)
        self.edges: dict[str, list] = rodict({}, nested=True)
        self.roots: set[str] = roset(set())
        nodes = nodes or {}
        edges = edges or {}
        roots = roots or set()
        for node_id, (task, kwds) in nodes.items():
            self.add_node(node_id, task, root=node_id in roots, **kwds)
        for node_id_from, nodes_to in edges.items():
            for node_id_to, routes in nodes_to:
                self.add_edge(node_id_from, node_id_to, routes=routes)

    def __str__(self):
        return f"{self.__class__.__name__}(name={self.name} nodes={self.nodes} edges={self.edges} roots={self.roots}"

    def __repr__(self):
        return self.__str__()

    def add_node(self, node_id: str, task: Task | str, root: bool | None = None, **kwds):
        """
        Add node to the graph.

        Args:
            node_id: Node Id.
            task: `Task` or task name.
            root: Is node the root of the graph.
        """

        if node_id in self.nodes:
            raise GraphError(f"Node '{node_id}' already in graph")
        if isinstance(task, Task):
            task = task.name
        self.nodes.__original__[node_id] = (task, kwds)
        if root:
            self.roots.__original__.add(node_id)

    def add_edge(self, node_id_from: str, node_id_to: str, routes: str | list[str] | None = None):
        """
        Add edge to the graph.
        If routes are specified, then only results with a matching route will be passed to the incoming node.

        Args:
            node_id_from: Outgoing node.
            node_id_to: Incoming node.
            routes: Edge route.
        """

        if node_id_from not in self.nodes:
            raise GraphError(f"Node '{node_id_from}' not found in graph")
        if node_id_to not in self.nodes:
            raise GraphError(f"Node '{node_id_to}' not found in graph")
        if isinstance(routes, str):
            routes = [routes]
        self.edges.__original__.setdefault(node_id_from, []).append((node_id_to, routes))

    def asdict(self, sanitize: bool | None = None) -> dict:
        """
        Convert to dict.

        Args:
            sanitize: flag to sanitize sensitive data.

        Returns:
            `Graph` as `dict`.
        """

        return {
            "name": self.name,
            "nodes": self.nodes,
            "edges": self.edges,
            "roots": self.roots,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "Graph":
        """
        Create `Graph` from `dict`.

        Args:
            data: Data as dictionary object.

        Returns:
            `Graph` object.
        """

        return cls(
            name=data["name"],
            nodes=data["nodes"],
            edges=data["edges"],
            roots=data["roots"],
        )

    def pretty_repr(self, sanitize: bool | None = None):
        return pretty_repr(self.asdict(sanitize=sanitize))
add_node
add_node(
    node_id: str,
    task: Task | str,
    root: bool | None = None,
    **kwds,
)

Add node to the graph.

Parameters:

  • node_id (str) –

    Node Id.

  • task (Task | str) –

    Task or task name.

  • root (bool | None, default: None ) –

    Is node the root of the graph.

Source code in arrlio/models.py
def add_node(self, node_id: str, task: Task | str, root: bool | None = None, **kwds):
    """
    Add node to the graph.

    Args:
        node_id: Node Id.
        task: `Task` or task name.
        root: Is node the root of the graph.
    """

    if node_id in self.nodes:
        raise GraphError(f"Node '{node_id}' already in graph")
    if isinstance(task, Task):
        task = task.name
    self.nodes.__original__[node_id] = (task, kwds)
    if root:
        self.roots.__original__.add(node_id)
add_edge
add_edge(
    node_id_from: str,
    node_id_to: str,
    routes: str | list[str] | None = None,
)

Add edge to the graph. If routes are specified, then only results with a matching route will be passed to the incoming node.

Parameters:

  • node_id_from (str) –

    Outgoing node.

  • node_id_to (str) –

    Incoming node.

  • routes (str | list[str] | None, default: None ) –

    Edge route.

Source code in arrlio/models.py
def add_edge(self, node_id_from: str, node_id_to: str, routes: str | list[str] | None = None):
    """
    Add edge to the graph.
    If routes are specified, then only results with a matching route will be passed to the incoming node.

    Args:
        node_id_from: Outgoing node.
        node_id_to: Incoming node.
        routes: Edge route.
    """

    if node_id_from not in self.nodes:
        raise GraphError(f"Node '{node_id_from}' not found in graph")
    if node_id_to not in self.nodes:
        raise GraphError(f"Node '{node_id_to}' not found in graph")
    if isinstance(routes, str):
        routes = [routes]
    self.edges.__original__.setdefault(node_id_from, []).append((node_id_to, routes))
asdict
asdict(sanitize: bool | None = None) -> dict

Convert to dict.

Parameters:

  • sanitize (bool | None, default: None ) –

    flag to sanitize sensitive data.

Returns:

  • dict

    Graph as dict.

Source code in arrlio/models.py
def asdict(self, sanitize: bool | None = None) -> dict:
    """
    Convert to dict.

    Args:
        sanitize: flag to sanitize sensitive data.

    Returns:
        `Graph` as `dict`.
    """

    return {
        "name": self.name,
        "nodes": self.nodes,
        "edges": self.edges,
        "roots": self.roots,
    }
from_dict classmethod
from_dict(data: dict) -> Graph

Create Graph from dict.

Parameters:

  • data (dict) –

    Data as dictionary object.

Returns:

  • Graph

    Graph object.

Source code in arrlio/models.py
@classmethod
def from_dict(cls, data: dict) -> "Graph":
    """
    Create `Graph` from `dict`.

    Args:
        data: Data as dictionary object.

    Returns:
        `Graph` object.
    """

    return cls(
        name=data["name"],
        nodes=data["nodes"],
        edges=data["edges"],
        roots=data["roots"],
    )
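
A minimal sketch of building a graph and round-tripping it through asdict/from_dict (node and task names are illustrative):

from arrlio.models import Graph

graph = Graph("pipeline")
graph.add_node("A", "load", root=True)
graph.add_node("B", "transform")
graph.add_node("C", "report")
graph.add_edge("A", "B")
graph.add_edge("B", "C", routes="ok")  # only results routed as "ok" reach C

restored = Graph.from_dict(graph.asdict())
print(restored.roots)  # {'A'}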

arrlio.core

App

Arrlio application.

Parameters:

  • config (Config) –

    Arrlio application Config.

Source code in arrlio/core.py
class App:
    """
    Arrlio application.

    Args:
        config: Arrlio application `Config`.
    """

    def __init__(self, config: Config):
        self.config = config

        self._broker = config.broker.module.Broker(config.broker.config)
        self._result_backend = config.result_backend.module.ResultBackend(config.result_backend.config)
        self._event_backend = config.event_backend.module.EventBackend(config.event_backend.config)

        self._closed: asyncio.Future = asyncio.Future()
        self._running_tasks: dict[UUID, asyncio.Task] = {}
        self._executor = config.executor.module.Executor(config.executor.config)
        self._context = ContextVar("context", default={})

        self._hooks = {
            "on_init": [],
            "on_close": [],
            "on_task_send": [],
            "on_task_received": [],
            "on_task_result": [],
            "on_task_done": [],
            "task_context": [],
        }

        self._plugins = {}
        for plugin_config in config.plugins:
            plugin = plugin_config.module.Plugin(self, plugin_config.config)
            self._plugins[plugin.name] = plugin
            for k, hooks in self._hooks.items():
                if getattr(plugin, k).__func__ != getattr(Plugin, k):
                    hooks.append(getattr(plugin, k))

        self._task_settings = {
            k: v for k, v in config.task.model_dump(exclude_unset=True).items() if k in {f.name for f in dataclasses.fields(Task)}
        }

    def __str__(self):
        return f"{self.__class__.__name__}[{self.config.app_id}]"

    def __repr__(self):
        return self.__str__()

    async def __aenter__(self):
        await self.init()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    @property
    def hooks(self):
        """Application hooks."""

        return rodict(self._hooks, nested=True)

    @property
    def plugins(self) -> dict[str, Plugin]:
        """Application plugins."""

        return rodict(self._plugins, nested=True)

    @property
    def broker(self) -> AbstractBroker:
        """Application broker."""

        return self._broker

    @property
    def result_backend(self) -> AbstractResultBackend:
        """Application result storage."""

        return self._result_backend

    @property
    def event_backend(self) -> AbstractEventBackend | None:
        """Application event backend."""

        return self._event_backend

    @property
    def executor(self) -> Executor:
        """Application executor."""

        return self._executor

    @property
    def context(self) -> dict:
        """Application current context."""

        return self._context.get()

    @property
    def is_closed(self) -> bool:
        """Application close satatus."""

        return self._closed.done()

    @property
    def task_settings(self) -> dict:
        return self._task_settings

    async def init(self):
        """Init application and plugins."""

        if self.is_closed:
            return

        logger.info(_("%s initializing with config\n%s"), self, pretty_repr(self.config.model_dump()))

        await self._broker.init()
        await self._result_backend.init()
        await self._event_backend.init()

        await self._execute_hooks("on_init")

        logger.info(_("%s initialization done"), self)

    async def close(self):
        """Close application."""

        if self.is_closed:
            return

        try:
            await self._execute_hooks("on_close")
            for hooks in self._hooks.values():
                hooks.clear()

            await gather(
                self.stop_consume_tasks(),
                self.stop_consume_events(),
                return_exceptions=True,
            )

            await self._broker.close()
            await self._result_backend.close()
            await self._event_backend.close()

            for task_id, aio_task in tuple(self._running_tasks.items()):
                logger.warning(_("%s cancel processing task '%s'"), self, task_id)
                aio_task.cancel()
                try:
                    await aio_task
                except asyncio.CancelledError:
                    pass
            self._running_tasks = {}

        finally:
            self._closed.set_result(None)

    async def _execute_hook(self, hook_fn, *args, **kwds):
        try:
            if is_debug_level():
                logger.debug(_("%s execute hook %s"), self, hook_fn)
            await hook_fn(*args, **kwds)
        except Exception as e:
            logger.exception(_("%s hook %s error"), self, hook_fn)
            raise e

    async def _execute_hooks(self, hook: str, *args, **kwds):
        try:
            async with TaskGroup() as tg:
                for hook_fn in self._hooks[hook]:
                    tg.create_task(self._execute_hook(hook_fn, *args, **kwds))
        except ExceptionGroup as eg:
            raise HooksError(exceptions=eg.exceptions)

    async def send_task(
        self,
        task: Task | str,
        args: Args | None = None,
        kwds: Kwds | None = None,
        headers: dict | None = None,
        **kwargs: dict,
    ) -> "AsyncResult":
        """
        Send task.

        Args:
            task: `Task` or task name.
            args: Task positional arguments.
            kwds: Task keyword arguments.
            headers: `Task` headers argument.
            kwargs: Other `Task` arguments.

        Returns:
            Task `AsyncResult`.
        """

        name = task
        if isinstance(task, Task):
            name = task.name

        if headers is None:
            headers = {}

        headers["app_id"] = self.config.app_id

        if name in registered_tasks:
            task_instance = registered_tasks[name].instantiate(
                args=args,
                kwds=kwds,
                headers=headers,
                **{**self._task_settings, **kwargs},
            )
        else:
            task_instance = Task(None, name).instantiate(
                args=args,
                kwds=kwds,
                headers=headers,
                **{**self._task_settings, **kwargs},
            )

        task_instance.headers.update(self._result_backend.make_headers(task_instance))
        task_instance.shared.update(self._result_backend.make_shared(task_instance))

        if is_info_level():
            logger.info(
                _("%s send task\n%s"),
                self,
                task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
            )

        await self._execute_hooks("on_task_send", task_instance)

        await self._broker.send_task(task_instance)

        await self._result_backend.allocate_storage(task_instance)

        return AsyncResult(self, task_instance)

    async def send_event(self, event: Event):
        """
        Send event.

        Args:
            event: Event to send.
        """
        if is_info_level():
            logger.info(_("%s send event\n%s"), self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

        await self._event_backend.send_event(event)

    async def pop_result(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
        """
        Pop result for provided `TaskInstance`.

        Args:
            task_instance: Task instance.

        Yields:
            Task result.
        """
        async for task_result in self._result_backend.pop_task_result(task_instance):
            if is_info_level():
                logger.info(
                    _("%s got result[idx=%s, exc=%s] for task %s[%s]"),
                    self,
                    task_result.idx,
                    task_result.exc is not None,
                    task_instance.name,
                    task_instance.task_id,
                )
            if task_result.exc:
                if isinstance(task_result.exc, TaskError):
                    raise task_result.exc
                raise TaskError(task_instance.task_id, task_result.exc, task_result.trb)

            yield task_result.res

    def cancel_local_task(self, task_id: UUID | str):
        if isinstance(task_id, str):
            task_id = UUID(f"{task_id}")
        if task_id not in self._running_tasks:
            raise TaskNotFoundError(task_id)
        self._running_tasks[task_id].cancel()

    async def consume_tasks(self, queues: list[str] | None = None):
        """
        Consume tasks from the queues.

        Args:
            queues: List of queue names to consume.
        """

        queues = queues or self.config.task_queues
        if not queues:
            return

        async def cb(task_instance: TaskInstance):
            task_id: UUID = task_instance.task_id
            self._running_tasks[task_id] = current_task()

            idx_0 = uuid4().hex
            idx_1 = 0

            self._context.set({})
            context = self.context

            try:
                task_result: TaskResult = TaskResult()

                async with AsyncExitStack() as stack:
                    try:
                        context["task_instance"] = task_instance

                        for context_hook in self._hooks["task_context"]:
                            await stack.enter_async_context(context_hook(task_instance))

                        await self._execute_hooks("on_task_received", task_instance)

                        async for task_result in self.execute_task(task_instance):
                            task_result.set_idx((idx_0, idx_1 + 1))

                            if task_instance.result_return:
                                await self._result_backend.push_task_result(task_result, task_instance)

                            await self._execute_hooks("on_task_result", task_instance, task_result)
                            idx_1 += 1

                    except (asyncio.CancelledError, Exception) as e:
                        if isinstance(e, asyncio.CancelledError):
                            logger.error(_("%s task %s[%s] cancelled"), self, task_instance.name, task_id)
                            task_result = TaskResult(exc=TaskCancelledError(task_id))
                            raise e
                        if isinstance(e, HooksError):
                            if len(e.exceptions) == 1:
                                e = e.exceptions[0]
                            else:
                                e = TaskError(exceptions=e.exceptions)
                            logger.error(_("%s task %s[%s] %s: %s"), self, task_instance.name, task_id, e.__class__, e)
                            task_result = TaskResult(exc=e)
                        else:
                            logger.exception(e)
                            task_result = TaskResult(exc=InternalError())
                        task_result.set_idx((idx_0, idx_1 + 1))
                        if task_instance.result_return:
                            await self._result_backend.push_task_result(task_result, task_instance)
                        idx_1 += 1
                    finally:
                        try:
                            if task_instance.result_return and not task_instance.headers.get("graph:graph"):
                                func = task_instance.func
                                if isasyncgenfunction(func) or isgeneratorfunction(func):
                                    await self._result_backend.close_task(task_instance, idx=(idx_0, idx_1 + 1))
                                    idx_1 += 1
                        finally:
                            await self._execute_hooks("on_task_done", task_instance, task_result)

            except Exception as e:
                logger.exception(e)
            finally:
                self._running_tasks.pop(task_id, None)

        await self._broker.consume_tasks(queues, cb)
        logger.info(_("%s consuming task queues %s"), self, queues)

    async def stop_consume_tasks(self, queues: list[str] | None = None):
        """
        Stop consuming tasks.

        Args:
            queues: List of queue names to stop consume.
        """

        await self._broker.stop_consume_tasks(queues=queues)
        if queues is not None:
            logger.info(_("%s stop consuming task queues %s"), self, queues)
        else:
            logger.info(_("%s stop consuming task queues"), self)

    async def execute_task(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
        """
        Execute the `TaskInstance` locally by the executor.

        Args:
            task_instance: Task instance to execute.

        Yields:
            Task result.
        """

        token = _curr_app.set(self)
        try:
            async for task_result in self._executor(task_instance):
                yield task_result
        finally:
            _curr_app.reset(token)

    async def consume_events(
        self,
        callback_id: str,
        callback: Callable[[Event], Any],
        event_types: list[str] | None = None,
    ):
        """
        Consume events and invoke `callback` on each `Event` received.

        Args:
            callback_id: Callback Id. Needed later to stop consuming.
            callback: Callback to invoke when an event is received by the backend.
            event_types: List of event types to consume.
        """

        await self._event_backend.consume_events(callback_id, callback, event_types=event_types)

    async def stop_consume_events(self, callback_id: str | None = None):
        """
        Stop consuming events.

        Args:
            callback_id: Callback Id for which to stop consuming events.
        """

        await self._event_backend.stop_consume_events(callback_id=callback_id)

    def send_graph(self, *args, **kwds):
        """
        Send graph.
        """

        if "arrlio.graphs" not in self.plugins:
            raise GraphError(_("Plugin required: arrlio.graphs"))

        return self.plugins["arrlio.graphs"].send_graph(*args, **kwds)
hooks property
hooks

Application hooks.

plugins property
plugins: dict[str, Plugin]

Application plugins.

broker property
broker: AbstractBroker

Application broker.

result_backend property
result_backend: AbstractResultBackend

Application result storage.

event_backend property
event_backend: AbstractEventBackend | None

Application event backend.

executor property
executor: Executor

Application executor.

context property
context: dict

Application current context.

is_closed property
is_closed: bool

Application close status.

init async
init()

Init application and plugins.

Source code in arrlio/core.py
async def init(self):
    """Init application and plugins."""

    if self.is_closed:
        return

    logger.info(_("%s initializing with config\n%s"), self, pretty_repr(self.config.model_dump()))

    await self._broker.init()
    await self._result_backend.init()
    await self._event_backend.init()

    await self._execute_hooks("on_init")

    logger.info(_("%s initialization done"), self)
close async
close()

Close application.

Source code in arrlio/core.py
async def close(self):
    """Close application."""

    if self.is_closed:
        return

    try:
        await self._execute_hooks("on_close")
        for hooks in self._hooks.values():
            hooks.clear()

        await gather(
            self.stop_consume_tasks(),
            self.stop_consume_events(),
            return_exceptions=True,
        )

        await self._broker.close()
        await self._result_backend.close()
        await self._event_backend.close()

        for task_id, aio_task in tuple(self._running_tasks.items()):
            logger.warning(_("%s cancel processing task '%s'"), self, task_id)
            aio_task.cancel()
            try:
                await aio_task
            except asyncio.CancelledError:
                pass
        self._running_tasks = {}

    finally:
        self._closed.set_result(None)
send_task async
send_task(
    task: Task | str,
    args: Args | None = None,
    kwds: Kwds | None = None,
    headers: dict | None = None,
    **kwargs: dict,
) -> AsyncResult

Send task.

Parameters:

  • task (Task | str) –

    Task or task name.

  • args (Args | None, default: None ) –

    Task positional arguments.

  • kwds (Kwds | None, default: None ) –

    Task keyword arguments.

  • headers (dict | None, default: None ) –

    Task headers argument.

  • kwargs (dict, default: {} ) –

    Other Task arguments.

Returns:

  • AsyncResult

    Task AsyncResult.

Source code in arrlio/core.py
async def send_task(
    self,
    task: Task | str,
    args: Args | None = None,
    kwds: Kwds | None = None,
    headers: dict | None = None,
    **kwargs: dict,
) -> "AsyncResult":
    """
    Send task.

    Args:
        task: `Task` or task name.
        args: Task positional arguments.
        kwds: Task keyword arguments.
        headers: `Task` headers argument.
        kwargs: Other `Task` arguments.

    Returns:
        Task `AsyncResult`.
    """

    name = task
    if isinstance(task, Task):
        name = task.name

    if headers is None:
        headers = {}

    headers["app_id"] = self.config.app_id

    if name in registered_tasks:
        task_instance = registered_tasks[name].instantiate(
            args=args,
            kwds=kwds,
            headers=headers,
            **{**self._task_settings, **kwargs},
        )
    else:
        task_instance = Task(None, name).instantiate(
            args=args,
            kwds=kwds,
            headers=headers,
            **{**self._task_settings, **kwargs},
        )

    task_instance.headers.update(self._result_backend.make_headers(task_instance))
    task_instance.shared.update(self._result_backend.make_shared(task_instance))

    if is_info_level():
        logger.info(
            _("%s send task\n%s"),
            self,
            task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
        )

    await self._execute_hooks("on_task_send", task_instance)

    await self._broker.send_task(task_instance)

    await self._result_backend.allocate_storage(task_instance)

    return AsyncResult(self, task_instance)
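
A hedged usage sketch: 'myapp.tasks.add' is a hypothetical registered task name and the extra header is illustrative; only send_task itself is taken from the documentation above.

ar = await app.send_task(
    "myapp.tasks.add",            # hypothetical task name
    args=(1, 2),
    headers={"trace_id": "abc"},  # merged with the app_id header set by send_task
)
print(await ar.get())             # AsyncResult.get() waits for the result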
send_event async
send_event(event: Event)

Send event.

Parameters:

  • event (Event) –

    Event to send.

Source code in arrlio/core.py
async def send_event(self, event: Event):
    """
    Send event.

    Args:
        event: Event to send.
    """
    if is_info_level():
        logger.info(_("%s send event\n%s"), self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

    await self._event_backend.send_event(event)
pop_result async
pop_result(
    task_instance: TaskInstance,
) -> AsyncGenerator[TaskResult, None]

Pop result for provided TaskInstance.

Parameters:

  • task_instance (TaskInstance) –

    Task instance.

Yields:

  • AsyncGenerator[TaskResult, None]

    Task result.

Source code in arrlio/core.py
async def pop_result(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
    """
    Pop result for provided `TaskInstance`.

    Args:
        task_instance: Task instance.

    Yields:
        Task result.
    """
    async for task_result in self._result_backend.pop_task_result(task_instance):
        if is_info_level():
            logger.info(
                _("%s got result[idx=%s, exc=%s] for task %s[%s]"),
                self,
                task_result.idx,
                task_result.exc is not None,
                task_instance.name,
                task_instance.task_id,
            )
        if task_result.exc:
            if isinstance(task_result.exc, TaskError):
                raise task_result.exc
            raise TaskError(task_instance.task_id, task_result.exc, task_result.trb)

        yield task_result.res
consume_tasks async
consume_tasks(queues: list[str] | None = None)

Consume tasks from the queues.

Parameters:

  • queues (list[str] | None, default: None ) –

    List of queue names to consume.

Source code in arrlio/core.py
async def consume_tasks(self, queues: list[str] | None = None):
    """
    Consume tasks from the queues.

    Args:
        queues: List of queue names to consume.
    """

    queues = queues or self.config.task_queues
    if not queues:
        return

    async def cb(task_instance: TaskInstance):
        task_id: UUID = task_instance.task_id
        self._running_tasks[task_id] = current_task()

        idx_0 = uuid4().hex
        idx_1 = 0

        self._context.set({})
        context = self.context

        try:
            task_result: TaskResult = TaskResult()

            async with AsyncExitStack() as stack:
                try:
                    context["task_instance"] = task_instance

                    for context_hook in self._hooks["task_context"]:
                        await stack.enter_async_context(context_hook(task_instance))

                    await self._execute_hooks("on_task_received", task_instance)

                    async for task_result in self.execute_task(task_instance):
                        task_result.set_idx((idx_0, idx_1 + 1))

                        if task_instance.result_return:
                            await self._result_backend.push_task_result(task_result, task_instance)

                        await self._execute_hooks("on_task_result", task_instance, task_result)
                        idx_1 += 1

                except (asyncio.CancelledError, Exception) as e:
                    if isinstance(e, asyncio.CancelledError):
                        logger.error(_("%s task %s[%s] cancelled"), self, task_instance.name, task_id)
                        task_result = TaskResult(exc=TaskCancelledError(task_id))
                        raise e
                    if isinstance(e, HooksError):
                        if len(e.exceptions) == 1:
                            e = e.exceptions[0]
                        else:
                            e = TaskError(exceptions=e.exceptions)
                        logger.error(_("%s task %s[%s] %s: %s"), self, task_instance.name, task_id, e.__class__, e)
                        task_result = TaskResult(exc=e)
                    else:
                        logger.exception(e)
                        task_result = TaskResult(exc=InternalError())
                    task_result.set_idx((idx_0, idx_1 + 1))
                    if task_instance.result_return:
                        await self._result_backend.push_task_result(task_result, task_instance)
                    idx_1 += 1
                finally:
                    try:
                        if task_instance.result_return and not task_instance.headers.get("graph:graph"):
                            func = task_instance.func
                            if isasyncgenfunction(func) or isgeneratorfunction(func):
                                await self._result_backend.close_task(task_instance, idx=(idx_0, idx_1 + 1))
                                idx_1 += 1
                    finally:
                        await self._execute_hooks("on_task_done", task_instance, task_result)

        except Exception as e:
            logger.exception(e)
        finally:
            self._running_tasks.pop(task_id, None)

    await self._broker.consume_tasks(queues, cb)
    logger.info(_("%s consuming task queues %s"), self, queues)
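
A hedged worker sketch: called without arguments the method falls back to config.task_queues; the explicit queue name below is illustrative.

await app.consume_tasks()                  # consume config.task_queues
await app.consume_tasks(queues=["myapp"])  # or an explicit list of queues
...
await app.stop_consume_tasks()             # stop all task consumers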
stop_consume_tasks async
stop_consume_tasks(queues: list[str] | None = None)

Stop consuming tasks.

Parameters:

  • queues (list[str] | None, default: None ) –

    List of queue names to stop consuming.

Source code in arrlio/core.py
async def stop_consume_tasks(self, queues: list[str] | None = None):
    """
    Stop consuming tasks.

    Args:
        queues: List of queue names to stop consuming.
    """

    await self._broker.stop_consume_tasks(queues=queues)
    if queues is not None:
        logger.info(_("%s stop consuming task queues %s"), self, queues)
    else:
        logger.info(_("%s stop consuming task queues"), self)
execute_task async
execute_task(
    task_instance: TaskInstance,
) -> AsyncGenerator[TaskResult, None]

Execute the TaskInstance locally with the executor.

Parameters:

  • task_instance (TaskInstance) –

    Task instance to execute.

Yields:

  • AsyncGenerator[TaskResult, None]

    Task result.

Source code in arrlio/core.py
async def execute_task(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
    """
    Execute the `TaskInstance` locally with the executor.

    Args:
        task_instance: Task instance to execute.

    Yields:
        Task result.
    """

    token = _curr_app.set(self)
    try:
        async for task_result in self._executor(task_instance):
            yield task_result
    finally:
        _curr_app.reset(token)
consume_events async
consume_events(
    callback_id: str,
    callback: Callable[[Event], Any],
    event_types: list[str] | None = None,
)

Consume events and invoke callback for each received Event.

Parameters:

  • callback_id (str) –

    Callback Id, used later to stop consuming.

  • callback (Callable[[Event], Any]) –

    Callback to invoke when an event is received by the backend.

  • event_types (list[str] | None, default: None ) –

    List of event types to consume.

Source code in arrlio/core.py
async def consume_events(
    self,
    callback_id: str,
    callback: Callable[[Event], Any],
    event_types: list[str] | None = None,
):
    """
    Consume events and invoke `callback` for each received `Event`.

    Args:
        callback_id: Callback Id, used later to stop consuming.
        callback: Callback to invoke when an event is received by the backend.
        event_types: List of event types to consume.
    """

    await self._event_backend.consume_events(callback_id, callback, event_types=event_types)
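
A hedged sketch: requires an event backend; the callback id and event type strings below are illustrative.

def on_event(event):   # receives an Event instance
    print(event)

await app.consume_events("my-consumer", on_event, event_types=["task:done"])
...
await app.stop_consume_events("my-consumer")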
stop_consume_events async
stop_consume_events(callback_id: str | None = None)

Stop consuming events.

Parameters:

  • callback_id (str | None, default: None ) –

    Callback Id for which to stop consuming events.

Source code in arrlio/core.py
async def stop_consume_events(self, callback_id: str | None = None):
    """
    Stop consuming events.

    Args:
        callback_id: Callback Id for which to stop consuming events.
    """

    await self._event_backend.stop_consume_events(callback_id=callback_id)
send_graph
send_graph(*args, **kwds)

Send graph.

Source code in arrlio/core.py
def send_graph(self, *args, **kwds):
    """
    Send graph.
    """

    if "arrlio.graphs" not in self.plugins:
        raise GraphError(_("Plugin required: arrlio.graphs"))

    return self.plugins["arrlio.graphs"].send_graph(*args, **kwds)

AsyncResult

Source code in arrlio/core.py
class AsyncResult:
    __slots__ = ("_app", "_task_instance", "_gen", "_result", "_exception", "_ready")

    def __init__(self, app: App, task_instance: TaskInstance):
        self._app = app
        self._task_instance = task_instance
        self._gen = app.pop_result(task_instance)
        self._result = None
        self._exception = None
        self._ready = False

    @property
    def task_instance(self) -> TaskInstance:
        """Task instance."""

        return self._task_instance

    @property
    def task_id(self):
        """Task Id."""

        return self._task_instance.task_id

    @property
    def result(self):
        """Task last result."""

        return self._result

    @property
    def exception(self) -> Exception:
        """Task exception."""

        return self._exception

    @property
    def ready(self) -> bool:
        """Task ready status."""

        return self._ready

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._ready:
            try:
                self._result = await self._gen.__anext__()
                return self._result
            except TaskError as e:
                self._ready = True
                self._exception = e
            except StopAsyncIteration as e:
                self._ready = True
                raise e

        if exception := self._exception:
            if isinstance(exception.args[0], Exception):
                raise exception from exception.args[0]
            raise exception

        raise StopAsyncIteration

    async def get(self) -> Any:
        """
        Get task result. Blocks until the task result is available.

        Returns:
            Task result. For generator or async generator tasks, returns the last available result.
        """

        noresult = not self._ready
        async for _ in self:
            noresult = False
        if noresult:
            raise TaskClosedError(self.task_id)
        return self._result
task_instance property
task_instance: TaskInstance

Task instance.

task_id property
task_id

Task Id.

result property
result

Task last result.

exception property
exception: Exception

Task exception.

ready property
ready: bool

Task ready status.

get async
get() -> Any

Get task result. Blocks until the task result is available.

Returns:

  • Any

    Task result. For generator or async generator tasks, returns the last available result.

Source code in arrlio/core.py
async def get(self) -> Any:
    """
    Get task result. Blocks until the task result is available.

    Returns:
        Task result. For generator or async generator tasks, returns the last available result.
    """

    noresult = not self._ready
    async for _ in self:
        noresult = False
    if noresult:
        raise TaskClosedError(self.task_id)
    return self._result
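
A hedged iteration sketch: for tasks defined as generators or async generators, AsyncResult can be iterated to receive every result in order ('myapp.tasks.stream' is a hypothetical generator task name).

ar = await app.send_task("myapp.tasks.stream")
async for item in ar:        # yields each result as it arrives
    print(item)
print(ar.ready, ar.result)   # True and the last received value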

task

task(
    func: FunctionType | MethodType | Type | None = None,
    name: str | None = None,
    base: Type[Task] | None = None,
    **kwds,
)

Task decorator.

Parameters:

  • func (FunctionType | MethodType | Type | None, default: None ) –

    Task function.

  • name (str | None, default: None ) –

    Task name.

  • base (Type[Task] | None, default: None ) –

    Task base class.

  • kwds

    Task arguments.

Source code in arrlio/core.py
def task(
    func: FunctionType | MethodType | Type | None = None,
    name: str | None = None,
    base: Type[Task] | None = None,
    **kwds,
):
    """Task decorator.

    Args:
        func: Task function.
        name: Task name.
        base: Task base class.
        kwds: `Task` arguments.
    """

    if base is None:
        base = Task
    if func is not None:
        if not isinstance(func, (FunctionType, MethodType)):
            raise TypeError(_("Argument 'func' is not a function or method"))
        if name is None:
            name = f"{func.__module__}.{func.__name__}"
        if name in registered_tasks:
            raise ValueError(_("Task '{}' already registered").format(name))
        t = base(func=func, name=name, **kwds)
        registered_tasks.__original__[name] = t
        logger.debug(_("Register task '%s'"), t.name)
        return t

    def wrapper(func):
        return task(base=base, func=func, name=name, **kwds)

    return wrapper
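
A hedged registration sketch (it assumes the decorator is re-exported as arrlio.task; the task bodies and the custom name are illustrative):

import arrlio

@arrlio.task
async def add(x: int, y: int) -> int:
    return x + y  # registered as '<module>.add'

@arrlio.task(name="myapp.mul")
def mul(x: int, y: int) -> int:
    return x * y  # registered under the explicit name 'myapp.mul'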

arrlio.executor

Config

Bases: BaseSettings

Task executor config.

Source code in arrlio/executor.py
class Config(BaseSettings):
    """Task executor config."""

Executor

Task executor.

Parameters:

  • config (Config) –

    Executor config.

Source code in arrlio/executor.py
class Executor:
    """
    Task executor.

    Args:
        config: Executor config.
    """

    def __init__(self, config: Config):
        self.config = config

    def __str__(self):
        return self.__class__.__name__

    def __repr__(self):
        return self.__str__()

    async def __call__(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
        """
        Execute `TaskInstance`. Blocking until the task result is available.

        Args:
            task_instance: Task instance to execute.

        Yields:
            Task result.
        """

        if task_instance.thread:
            execute = self.execute_in_thread
        else:
            execute = self.execute
        async for task_result in execute(task_instance):
            yield task_result

    async def execute(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
        """
        Execute `TaskInstance` in the same thread. Blocking until the task result is available.

        Args:
            task_instance: Task instance to execute.

        Yields:
            Task result.
        """

        res, exc, trb = None, None, None
        t0 = monotonic()

        try:
            if (func := task_instance.func) is None:
                raise TaskNotFoundError(f"task with name '{task_instance.name}' not found")

            # task_instance.validate()

            meta: bool = (kwdefaults := func.__kwdefaults__) is not None and "meta" in kwdefaults

            if is_info_level():
                logger.info(
                    "%s[%s] execute task %s[%s]",
                    self,
                    current_thread().name,
                    task_instance.name,
                    task_instance.task_id,
                )

            try:
                if iscoroutinefunction(func):
                    res = await wait_for(task_instance(meta=meta), task_instance.timeout)
                    if isinstance(res, TaskResult):
                        yield res
                    else:
                        yield TaskResult(res=res, exc=exc, trb=trb)

                elif isgeneratorfunction(func):
                    for res in task_instance(meta=meta):
                        if isinstance(res, TaskResult):
                            yield res
                        else:
                            yield TaskResult(res=res, exc=exc, trb=trb)

                elif isasyncgenfunction(func):
                    __anext__ = task_instance(meta=meta).__anext__

                    timeout_time = (monotonic() + timeout) if (timeout := task_instance.timeout) is not None else None

                    while True:
                        timeout = (timeout_time - monotonic()) if timeout_time is not None else None

                        try:
                            res = await wait_for(__anext__(), timeout)
                            if isinstance(res, TaskResult):
                                yield res
                            else:
                                yield TaskResult(res=res, exc=exc, trb=trb)

                        except StopAsyncIteration:
                            break
                        except asyncio.TimeoutError:
                            raise TaskTimeoutError(task_instance.timeout)

                else:
                    res = task_instance(meta=meta)
                    if isinstance(res, TaskResult):
                        yield res
                    else:
                        yield TaskResult(res=res, exc=exc, trb=trb)

            except asyncio.TimeoutError:
                raise TaskTimeoutError(task_instance.timeout)

        except Exception as e:
            exc_info = sys.exc_info()
            exc, trb = exc_info[1], exc_info[2]
            if isinstance(e, TaskTimeoutError):
                logger.error(
                    "%s[%s] task %s[%s] timeout",
                    self,
                    current_thread().name,
                    task_instance.name,
                    task_instance.task_id,
                )
            else:
                logger.exception(
                    "%s[%s] task %s[%s]",
                    self,
                    current_thread().name,
                    task_instance.name,
                    task_instance.task_id,
                )
            yield TaskResult(res=res, exc=exc, trb=trb)

        if is_info_level():
            logger.info(
                "%s[%s] task %s[%s] done[%s] in %.2f second(s)",
                self,
                current_thread().name,
                task_instance.name,
                task_instance.task_id,
                "success" if exc is None else "error",
                monotonic() - t0,
            )

    async def execute_in_thread(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
        """
        Execute `TaskInstance` in a separate thread. Blocking until the task result is available.

        Args:
            task_instance: Task instance to execute.

        Yields:
            Task result.
        """

        root_loop = get_running_loop()
        done_ev = asyncio_Event()
        sync_ev = threading_Event()
        res_ev = asyncio_Event()
        task_result: TaskResult = None  # type: ignore

        def thread(root_loop, res_ev, sync_ev, done_ev):
            nonlocal task_result
            loop = new_event_loop()
            root_loop_call_soon_threadsafe = root_loop.call_soon_threadsafe
            run_until_complete = loop.run_until_complete
            try:
                set_event_loop(loop)
                __anext__ = self.execute(task_instance).__anext__
                while True:
                    try:
                        sync_ev.clear()
                        task_result = run_until_complete(__anext__())
                        root_loop_call_soon_threadsafe(res_ev.set)
                        sync_ev.wait()
                    except StopAsyncIteration:
                        break
                    except Exception as e:
                        logger.exception(e)
            finally:
                run_until_complete(loop.shutdown_asyncgens())
                loop.close()
                if not root_loop.is_closed():
                    root_loop_call_soon_threadsafe(done_ev.set)
                    root_loop_call_soon_threadsafe(res_ev.set)

        Thread(
            target=thread,
            args=(root_loop, res_ev, sync_ev, done_ev),
            # name=f"Task[{task_instance.task_id}]",
        ).start()

        while True:
            await res_ev.wait()
            if done_ev.is_set():
                break
            res_ev.clear()
            sync_ev.set()
            yield task_result
__call__ async
__call__(
    task_instance: TaskInstance,
) -> AsyncGenerator[TaskResult, None]

Execute TaskInstance. Blocking until the task result is available.

Parameters:

  • task_instance (TaskInstance) –

    Task instance to execute.

Yields:

  • AsyncGenerator[TaskResult, None]

    Task result.

Source code in arrlio/executor.py
async def __call__(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
    """
    Execute `TaskInstance`. Blocking until the task result is available.

    Args:
        task_instance: Task instance to execute.

    Yields:
        Task result.
    """

    if task_instance.thread:
        execute = self.execute_in_thread
    else:
        execute = self.execute
    async for task_result in execute(task_instance):
        yield task_result
execute async
execute(
    task_instance: TaskInstance,
) -> AsyncGenerator[TaskResult, None]

Execute TaskInstance in the same thread. Blocking until the task result is available.

Parameters:

  • task_instance (TaskInstance) –

    Task instance to execute.

Yields:

  • AsyncGenerator[TaskResult, None]

    Task result.

Source code in arrlio/executor.py
async def execute(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
    """
    Execute `TaskInstance` in the same thread. Blocking until the task result is available.

    Args:
        task_instance: Task instance to execute.

    Yields:
        Task result.
    """

    res, exc, trb = None, None, None
    t0 = monotonic()

    try:
        if (func := task_instance.func) is None:
            raise TaskNotFoundError(f"task with name '{task_instance.name}' not found")

        # task_instance.validate()

        meta: bool = (kwdefaults := func.__kwdefaults__) is not None and "meta" in kwdefaults

        if is_info_level():
            logger.info(
                "%s[%s] execute task %s[%s]",
                self,
                current_thread().name,
                task_instance.name,
                task_instance.task_id,
            )

        try:
            if iscoroutinefunction(func):
                res = await wait_for(task_instance(meta=meta), task_instance.timeout)
                if isinstance(res, TaskResult):
                    yield res
                else:
                    yield TaskResult(res=res, exc=exc, trb=trb)

            elif isgeneratorfunction(func):
                for res in task_instance(meta=meta):
                    if isinstance(res, TaskResult):
                        yield res
                    else:
                        yield TaskResult(res=res, exc=exc, trb=trb)

            elif isasyncgenfunction(func):
                __anext__ = task_instance(meta=meta).__anext__

                timeout_time = (monotonic() + timeout) if (timeout := task_instance.timeout) is not None else None

                while True:
                    timeout = (timeout_time - monotonic()) if timeout_time is not None else None

                    try:
                        res = await wait_for(__anext__(), timeout)
                        if isinstance(res, TaskResult):
                            yield res
                        else:
                            yield TaskResult(res=res, exc=exc, trb=trb)

                    except StopAsyncIteration:
                        break
                    except asyncio.TimeoutError:
                        raise TaskTimeoutError(task_instance.timeout)

            else:
                res = task_instance(meta=meta)
                if isinstance(res, TaskResult):
                    yield res
                else:
                    yield TaskResult(res=res, exc=exc, trb=trb)

        except asyncio.TimeoutError:
            raise TaskTimeoutError(task_instance.timeout)

    except Exception as e:
        exc_info = sys.exc_info()
        exc, trb = exc_info[1], exc_info[2]
        if isinstance(e, TaskTimeoutError):
            logger.error(
                "%s[%s] task %s[%s] timeout",
                self,
                current_thread().name,
                task_instance.name,
                task_instance.task_id,
            )
        else:
            logger.exception(
                "%s[%s] task %s[%s]",
                self,
                current_thread().name,
                task_instance.name,
                task_instance.task_id,
            )
        yield TaskResult(res=res, exc=exc, trb=trb)

    if is_info_level():
        logger.info(
            "%s[%s] task %s[%s] done[%s] in %.2f second(s)",
            self,
            current_thread().name,
            task_instance.name,
            task_instance.task_id,
            "success" if exc is None else "error",
            monotonic() - t0,
        )
execute_in_thread async
execute_in_thread(
    task_instance: TaskInstance,
) -> AsyncGenerator[TaskResult, None]

Execute TaskInstance in a separate thread. Blocking until the task result is available.

Parameters:

  • task_instance (TaskInstance) –

    Task instance to execute.

Yields:

  • AsyncGenerator[TaskResult, None]

    Task result.

Source code in arrlio/executor.py
async def execute_in_thread(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
    """
    Execute `TaskInstance` in a separate thread. Blocking until the task result is available.

    Args:
        task_instance: Task instance to execute.

    Yields:
        Task result.
    """

    root_loop = get_running_loop()
    done_ev = asyncio_Event()
    sync_ev = threading_Event()
    res_ev = asyncio_Event()
    task_result: TaskResult = None  # type: ignore

    def thread(root_loop, res_ev, sync_ev, done_ev):
        nonlocal task_result
        loop = new_event_loop()
        root_loop_call_soon_threadsafe = root_loop.call_soon_threadsafe
        run_until_complete = loop.run_until_complete
        try:
            set_event_loop(loop)
            __anext__ = self.execute(task_instance).__anext__
            while True:
                try:
                    sync_ev.clear()
                    task_result = run_until_complete(__anext__())
                    root_loop_call_soon_threadsafe(res_ev.set)
                    sync_ev.wait()
                except StopAsyncIteration:
                    break
                except Exception as e:
                    logger.exception(e)
        finally:
            run_until_complete(loop.shutdown_asyncgens())
            loop.close()
            if not root_loop.is_closed():
                root_loop_call_soon_threadsafe(done_ev.set)
                root_loop_call_soon_threadsafe(res_ev.set)

    Thread(
        target=thread,
        args=(root_loop, res_ev, sync_ev, done_ev),
        # name=f"Task[{task_instance.task_id}]",
    ).start()

    while True:
        await res_ev.wait()
        if done_ev.is_set():
            break
        res_ev.clear()
        sync_ev.set()
        yield task_result
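
A hedged sketch of driving the executor directly; task_instance is assumed to be an already-built TaskInstance (normally it is supplied by a broker callback):

from arrlio.executor import Config, Executor

executor = Executor(Config())
async for task_result in executor(task_instance):
    print(task_result.res, task_result.exc)  # one TaskResult per yield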

arrlio.backends.rabbitmq

URL module-attribute

URL = 'amqp://guest:guest@localhost'

RabbitMQ URL.

TIMEOUT module-attribute

TIMEOUT = 15

RabbitMQ network operation timeout in seconds.

PUSH_RETRY_TIMEOUTS module-attribute

PUSH_RETRY_TIMEOUTS = [5, 5, 5, 5]

Push operation retry timeouts (sequence) in seconds.

PULL_RETRY_TIMEOUT module-attribute

PULL_RETRY_TIMEOUT = 5

Pull operation retry timeout in seconds.

ReplyToMode

Bases: StrEnum

RabbitMQ reply_to mode.

Source code in arrlio/backends/rabbitmq.py
class ReplyToMode(StrEnum):
    """RabbitMQ reply_to mode."""

    DIRECT_REPLY_TO = "direct_reply_to"
    """
    See [spec](https://www.rabbitmq.com/docs/direct-reply-to).
    """

    COMMON_QUEUE = "common_queue"
    """Common(single) results queue per `ResultBackend` Id used for all task results."""
DIRECT_REPLY_TO class-attribute instance-attribute
DIRECT_REPLY_TO = 'direct_reply_to'

See spec.

COMMON_QUEUE class-attribute instance-attribute
COMMON_QUEUE = 'common_queue'

Common (single) results queue per ResultBackend Id, used for all task results.

connection_factory

connection_factory(
    url: SecretAmqpDsn | list[SecretAmqpDsn],
) -> Connection

Connection factory.

Parameters:

  • url (SecretAmqpDsn | list[SecretAmqpDsn]) –

    RabbitMQ URL or list of URLs.

Returns:

  • Connection

    Connection instance.

Source code in arrlio/backends/rabbitmq.py
def connection_factory(url: SecretAmqpDsn | list[SecretAmqpDsn]) -> Connection:
    """Connection factory.

    Args:
        url: RabbitMQ URL or list of URLs.

    Returns:
        `Connection` instance.
    """

    if not isinstance(url, list):
        url = [url]
    return Connection([f"{u.get_secret_value()}" for u in url])
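
A hedged sketch: connection_factory expects validated SecretAmqpDsn values, not plain strings; the import path of SecretAmqpDsn is an assumption made for illustration.

from pydantic import TypeAdapter

from arrlio.types import SecretAmqpDsn  # assumed location of the type

adapter = TypeAdapter(SecretAmqpDsn)
conn = connection_factory(adapter.validate_python("amqp://guest:guest@localhost"))
# A list of DSNs enables failover across several nodes:
conn = connection_factory([adapter.validate_python(u) for u in ("amqp://node1", "amqp://node2")])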

arrlio.backends.brokers.rabbitmq

EXCHANGE module-attribute

EXCHANGE = 'arrlio'

Broker exchange name.

EXCHANGE_DURABLE module-attribute

EXCHANGE_DURABLE = False

Broker exchange durable option.

QUEUE_TYPE module-attribute

QUEUE_TYPE = CLASSIC

Broker queue type.

QUEUE_DURABLE module-attribute

QUEUE_DURABLE = False

Broker queue durable option.

QUEUE_EXCLUSIVE module-attribute

QUEUE_EXCLUSIVE = False

Broker queue exclusive option.

QUEUE_AUTO_DELETE module-attribute

QUEUE_AUTO_DELETE = True

Broker queue auto-delete option.

PREFETCH_COUNT module-attribute

PREFETCH_COUNT = 1

Tasks prefetch count.

Config

Bases: BaseSettings

RabbitMQ Broker config.

Attributes:

  • id (str) –

    Broker Id. Default:

    f"{uuid4().hex[-4:]}"
    

  • url (SecretAmqpDsn | list[SecretAmqpDsn]) –

    RabbitMQ URL. See amqp spec. Default.

  • timeout (Optional[Timeout]) –

    Network operation timeout in seconds. Default.

  • push_retry_timeouts (Optional[list[Timeout]]) –

    Push operation retry timeouts (sequence of seconds). Default.

  • pull_retry_timeout (Optional[Timeout]) –

    Pull operation retry timeout in seconds. Default.

  • serializer (SerializerConfig) –

    Config for Serializer.

  • exchange (str) –

    Exchange name. Default.

  • exchange_durable (bool) –

    Exchange durable option. Default.

  • queue_type (QueueType) –

    Tasks queue type. Default.

  • queue_durable (bool) –

    Queue durable option. Default.

  • queue_exclusive (bool) –

    Queue exclusive option. Default.

  • queue_auto_delete (bool) –

    Queue auto delete option. Default.

  • prefetch_count (PositiveInt) –

    Tasks prefetch count. Default.

Source code in arrlio/backends/brokers/rabbitmq.py
class Config(BaseSettings):
    """
    RabbitMQ `Broker` config.

    Attributes:
        id: `Broker` Id.
            Default:
            ```
            f"{uuid4().hex[-4:]}"
            ```
        url: RabbitMQ URL. See amqp [spec](https://www.rabbitmq.com/uri-spec.html).
            [Default][arrlio.backends.rabbitmq.URL].
        timeout: Network operation timeout in seconds.
            [Default][arrlio.backends.rabbitmq.TIMEOUT].
        push_retry_timeouts: Push operation retry timeouts (sequence of seconds).
            [Default][arrlio.backends.rabbitmq.PUSH_RETRY_TIMEOUTS].
        pull_retry_timeout: Pull operation retry timeout in seconds.
            [Default][arrlio.backends.rabbitmq.PULL_RETRY_TIMEOUT].
        serializer: Config for Serializer.
        exchange: Exchange name.
            [Default][arrlio.backends.brokers.rabbitmq.EXCHANGE].
        exchange_durable: Exchange durable option.
            [Default][arrlio.backends.brokers.rabbitmq.EXCHANGE_DURABLE].
        queue_type: Tasks queue type.
            [Default][arrlio.backends.brokers.rabbitmq.QUEUE_TYPE].
        queue_durable: Queue durable option.
            [Default][arrlio.backends.brokers.rabbitmq.QUEUE_DURABLE].
        queue_exclusive: Queue exclusive option.
            [Default][arrlio.backends.brokers.rabbitmq.QUEUE_EXCLUSIVE].
        queue_auto_delete: Queue auto delete option.
            [Default][arrlio.backends.brokers.rabbitmq.QUEUE_AUTO_DELETE].
        prefetch_count: Tasks prefetch count.
            [Default][arrlio.backends.brokers.rabbitmq.PREFETCH_COUNT].
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}RABBITMQ_BROKER_")

    id: str = Field(default_factory=lambda: f"{uuid4().hex[-4:]}")
    url: SecretAmqpDsn | list[SecretAmqpDsn] = Field(default_factory=lambda: URL)
    timeout: Optional[Timeout] = Field(default_factory=lambda: TIMEOUT)
    push_retry_timeouts: Optional[list[Timeout]] = Field(default_factory=lambda: PUSH_RETRY_TIMEOUTS)
    pull_retry_timeout: Optional[Timeout] = Field(default_factory=lambda: PULL_RETRY_TIMEOUT)
    serializer: SerializerConfig = Field(default_factory=SerializerConfig)
    exchange: str = Field(default_factory=lambda: EXCHANGE)
    exchange_durable: bool = Field(default_factory=lambda: EXCHANGE_DURABLE)
    queue_type: QueueType = Field(default_factory=lambda: QUEUE_TYPE)
    queue_durable: bool = Field(default_factory=lambda: QUEUE_DURABLE)
    queue_exclusive: bool = Field(default_factory=lambda: QUEUE_EXCLUSIVE)
    queue_auto_delete: bool = Field(default_factory=lambda: QUEUE_AUTO_DELETE)
    prefetch_count: PositiveInt = Field(default_factory=lambda: PREFETCH_COUNT)
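
A hedged configuration sketch: fields may be set directly or via environment variables with the ARRLIO_RABBITMQ_BROKER_ prefix (assuming the default ENV_PREFIX); the values below are illustrative.

from arrlio.backends.brokers.rabbitmq import Config

config = Config(
    url="amqp://guest:guest@localhost",
    queue_durable=True,  # let task queues survive broker restarts
    prefetch_count=10,
)
# Equivalent via environment: ARRLIO_RABBITMQ_BROKER_QUEUE_DURABLE=true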

Broker

Bases: Closable, AbstractBroker

RabbitMQ Broker.

Parameters:

  • config (Config) –

    Broker config.

Source code in arrlio/backends/brokers/rabbitmq.py
class Broker(Closable, AbstractBroker):
    """
    RabbitMQ `Broker`.

    Args:
        config: `Broker` config.
    """

    def __init__(self, config: Config):
        super().__init__()

        self.config = config

        self._internal_tasks_runner = AioTasksRunner()

        self.serializer = config.serializer.module.Serializer(config.serializer.config)

        self._conn: Connection = connection_factory(config.url)
        self._conn.set_callback("on_open", "on_conn_open_once", self._on_conn_open_once)

        self._default_exchange = Exchange(conn=self._conn)

        self._exchange = Exchange(
            config.exchange,
            conn=self._conn,
            durable=config.exchange_durable,
            auto_delete=not config.exchange_durable,
            timeout=config.timeout,
        )

        self._queues: dict[str, Queue] = {}

        self._send_task = retry(
            msg=f"{self} action send_task",
            retry_timeouts=config.push_retry_timeouts,
            exc_filter=exc_filter,
        )(self._send_task)

    def __str__(self):
        return f"Broker[rabbitmq#{self.config.id}][{self._conn}]"

    def __repr__(self):
        return self.__str__()

    async def init(self):
        await retry(
            msg=f"{self} init error",
            retry_timeouts=repeat(5),
            exc_filter=exc_filter,
        )(self._conn.open)()

    async def close(self):
        await self._exchange.close()
        for queue in self._queues.values():
            await queue.close()
        await super().close()

    async def _on_conn_open_once(self):
        await self._exchange.declare(restore=True, force=True)
        self._conn.remove_callback("on_open", "on_conn_open_once")

    async def _ensure_queue(self, name: str) -> Queue:
        if name not in self._queues:
            queue = Queue(
                name,
                conn=self._conn,
                type=self.config.queue_type,
                durable=self.config.queue_durable,
                exclusive=self.config.queue_exclusive,
                auto_delete=self.config.queue_auto_delete,
                prefetch_count=self.config.prefetch_count,
                max_priority=TASK_MAX_PRIORITY,
                timeout=self.config.timeout,
            )
            await queue.declare(restore=True)
            await queue.bind(self._exchange, name, timeout=self.config.timeout, restore=True)
            self._queues[name] = queue

        return self._queues[name]

    async def _on_task_message(
        self,
        callback,
        channel: aiormq.abc.AbstractChannel,
        message: aiormq.abc.DeliveredMessage,
    ):
        try:
            if is_debug_level():
                logger.debug(_("%s got raw message %s"), self, message.body if not settings.LOG_SANITIZE else "<hidden>")

            task_instance = self.serializer.loads_task_instance(
                message.body,
                message.header.properties.headers,
            )

            reply_to = message.header.properties.reply_to
            if reply_to is not None:
                task_instance.headers["rabbitmq:reply_to"] = reply_to

            if is_debug_level():
                logger.debug(
                    _("%s got task\n%s"),
                    self,
                    task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
                )

            if not task_instance.ack_late:
                await channel.basic_ack(message.delivery.delivery_tag)

            await callback(task_instance)

            if task_instance.ack_late:
                await channel.basic_ack(message.delivery.delivery_tag)

        except Exception:
            logger.exception(message.header.properties)

    async def _send_task(self, task_instance: TaskInstance, **kwds):
        if is_debug_level():
            logger.debug(
                _("%s send task\n%s"),
                self,
                task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
            )

        data, headers = self.serializer.dumps_task_instance(task_instance)
        task_headers = task_instance.headers

        reply_to = task_headers.get("rabbitmq:reply_to")

        # await self._ensure_queue(task_instance.queue)

        properties = {
            "delivery_mode": 2,
            "message_type": "arrlio:task",
            "headers": headers,
            "message_id": f"{task_instance.task_id}",
            "correlation_id": f"{task_instance.task_id}",
            "reply_to": reply_to,
            "timestamp": datetime.now(tz=timezone.utc),
            "priority": min(task_instance.priority, TASK_MAX_PRIORITY),
        }

        if self.serializer.content_type is not None:
            properties["content_type"] = self.serializer.content_type

        if task_instance.ttl is not None:
            properties["expiration"] = f"{int(task_instance.ttl * 1000)}"

        if task_headers.get("app_id"):
            properties["app_id"] = task_headers["app_id"]

        if reply_to == "amq.rabbitmq.reply-to":
            if not task_instance.shared.get("rabbitmq:conn"):
                raise ArrlioError("missing 'rabbitmq:conn' shared")
            channel = await task_instance.shared["rabbitmq:conn"].channel()
            await channel.basic_publish(
                data,
                exchange=self._exchange.name,
                routing_key=task_instance.queue,
                properties=BasicProperties(**properties),
                timeout=self._exchange.timeout,
            )
        else:
            await self._exchange.publish(
                data,
                routing_key=task_instance.queue,
                properties=properties,
            )

    async def send_task(self, task_instance: TaskInstance, **kwds):
        await self._internal_tasks_runner.create_task("send_task", lambda: self._send_task(task_instance, **kwds))

    async def consume_tasks(self, queues: list[str], callback: Callable[[TaskInstance | Exception], Coroutine]):
        for queue_name in queues:
            queue = await self._ensure_queue(queue_name)
            if not queue.consumer:
                logger.info(_("%s start consuming tasks queue '%s'"), self, queue.name)
                await queue.consume(
                    lambda *args, **kwds: self._internal_tasks_runner.create_task(
                        "on_task_message",
                        lambda: self._on_task_message(callback, *args, **kwds),
                    )
                    and None,
                    retry_timeout=self.config.pull_retry_timeout,
                )

    async def stop_consume_tasks(self, queues: list[str] | None = None):
        queues = queues if queues is not None else list(self._queues.keys())
        for queue_name in queues:
            if not (queue := self._queues.get(queue_name)):
                continue
            if queue.consumer:
                logger.info(_("%s stop consuming tasks queue '%s'"), self, queue_name)
                await queue.stop_consume()
            self._queues.pop(queue_name)

arrlio.backends.result_backends.rabbitmq

REPLY_TO_MODE module-attribute

REPLY_TO_MODE = COMMON_QUEUE

ResultBackend reply to mode.

EXCHANGE module-attribute

EXCHANGE = 'arrlio'

ResultBackend exchange name.

EXCHANGE_DURABLE module-attribute

EXCHANGE_DURABLE = False

ResultBackend exchange durable option.

QUEUE_PREFIX module-attribute

QUEUE_PREFIX = 'arrlio.'

ResultBackend queue prefix.

QUEUE_TYPE module-attribute

QUEUE_TYPE = CLASSIC

ResultBackend queue type.

QUEUE_DURABLE module-attribute

QUEUE_DURABLE = False

ResultBackend queue durable option.

QUEUE_EXCLUSIVE module-attribute

QUEUE_EXCLUSIVE = True

ResultBackend queue exclusive option.

QUEUE_AUTO_DELETE module-attribute

QUEUE_AUTO_DELETE = False

ResultBackend queue auto-delete option.

PREFETCH_COUNT module-attribute

PREFETCH_COUNT = 10

Results prefetch count.

Config

Bases: BaseSettings

RabbitMQ ResultBackend config.

Attributes:

  • id (str) –

    ResultBackend Id.

  • url (SecretAmqpDsn | list[SecretAmqpDsn]) –

    RabbitMQ URL. See amqp spec. Default.

  • timeout (Optional[Timeout]) –

    Network operation timeout in seconds. Default.

  • push_retry_timeouts (Optional[list[Timeout]]) –

    Push operation retry timeouts (sequence of seconds). Default.

  • pull_retry_timeout (Optional[Timeout]) –

    Pull operation retry timeout in seconds. Default.

  • serializer (SerializerConfig) –

    Config for Serializer.

  • reply_to_mode (ReplyToMode) –

    Reply to mode. Default.

  • exchange (str) –

    Exchange name. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

  • exchange_durable (bool) –

    Exchange durable option. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

  • queue_prefix (str) –

    Results queue prefix. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

  • queue_type (QueueType) –

    Results queue type. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

  • queue_durable (bool) –

    Queue durable option. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

  • queue_exclusive (bool) –

    Queue exclusive option. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

  • queue_auto_delete (bool) –

    Queue auto delete option. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

  • prefetch_count (PositiveInt) –

    Results prefetch count. Default.

    Only valid for ReplyToMode.COMMON_QUEUE.

Source code in arrlio/backends/result_backends/rabbitmq.py
class Config(BaseSettings):
    """
    RabbitMQ `ResultBackend` config.

    Attributes:
        id: `ResultBackend` Id.
        url: RabbitMQ URL. See amqp [spec](https://www.rabbitmq.com/uri-spec.html).
            [Default][arrlio.backends.rabbitmq.URL].
        timeout: Network operation timeout in seconds.
            [Default][arrlio.backends.rabbitmq.TIMEOUT].
        push_retry_timeouts: Push operation retry timeouts (sequence of seconds).
            [Default][arrlio.backends.rabbitmq.PUSH_RETRY_TIMEOUTS].
        pull_retry_timeout: Pull operation retry timeout in seconds.
            [Default][arrlio.backends.rabbitmq.PULL_RETRY_TIMEOUT].
        serializer: Config for Serializer.
        reply_to_mode: Reply to mode.
            [Default][arrlio.backends.result_backends.rabbitmq.REPLY_TO_MODE].
        exchange: Exchange name.
            [Default][arrlio.backends.result_backends.rabbitmq.EXCHANGE].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
        exchange_durable: Exchange durable option.
            [Default][arrlio.backends.result_backends.rabbitmq.EXCHANGE_DURABLE].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
        queue_prefix: Results queue prefix.
            [Default][arrlio.backends.result_backends.rabbitmq.QUEUE_PREFIX].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
        queue_type: Results queue type.
            [Default][arrlio.backends.result_backends.rabbitmq.QUEUE_TYPE].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
        queue_durable: Queue durable option.
            [Default][arrlio.backends.result_backends.rabbitmq.QUEUE_DURABLE].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
        queue_exclusive: Queue exclusive option.
            [Default][arrlio.backends.result_backends.rabbitmq.QUEUE_EXCLUSIVE].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
        queue_auto_delete: Queue auto delete option.
            [Default][arrlio.backends.result_backends.rabbitmq.QUEUE_AUTO_DELETE].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
        prefetch_count: Results prefetch count.
            [Default][arrlio.backends.result_backends.rabbitmq.PREFETCH_COUNT].
            !!! note "Only valid for `ReplyToMode.COMMON_QUEUE`."
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}RABBITMQ_RESULT_BACKEND_")

    id: str = Field(default_factory=lambda: f"{uuid4().hex[-4:]}")
    url: SecretAmqpDsn | list[SecretAmqpDsn] = Field(default_factory=lambda: URL)
    timeout: Optional[Timeout] = Field(default_factory=lambda: TIMEOUT)
    push_retry_timeouts: Optional[list[Timeout]] = Field(default_factory=lambda: PUSH_RETRY_TIMEOUTS)
    pull_retry_timeout: Optional[Timeout] = Field(default_factory=lambda: PULL_RETRY_TIMEOUT)
    serializer: SerializerConfig = Field(default_factory=SerializerConfig)
    reply_to_mode: ReplyToMode = Field(default_factory=lambda: REPLY_TO_MODE)
    exchange: str = Field(default_factory=lambda: EXCHANGE)
    exchange_durable: bool = Field(default_factory=lambda: EXCHANGE_DURABLE)
    queue_prefix: str = Field(default_factory=lambda: QUEUE_PREFIX)
    queue_type: QueueType = Field(default_factory=lambda: QUEUE_TYPE)
    queue_durable: bool = Field(default_factory=lambda: QUEUE_DURABLE)
    queue_exclusive: bool = Field(default_factory=lambda: QUEUE_EXCLUSIVE)
    queue_auto_delete: bool = Field(default_factory=lambda: QUEUE_AUTO_DELETE)
    prefetch_count: PositiveInt = Field(default_factory=lambda: PREFETCH_COUNT)
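
A hedged sketch switching the reply-to mode; ReplyToMode is defined in arrlio.backends.rabbitmq (see above), and importing it from there is assumed to work here.

from arrlio.backends.rabbitmq import ReplyToMode
from arrlio.backends.result_backends.rabbitmq import Config

config = Config(
    reply_to_mode=ReplyToMode.DIRECT_REPLY_TO,  # bypass the common results queue
)
# Equivalent via environment: ARRLIO_RABBITMQ_RESULT_BACKEND_REPLY_TO_MODE=direct_reply_to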

ResultBackend

Bases: Closable, AbstractResultBackend

RabbitMQ ResultBackend.

Parameters:

  • config (Config) –

    ResultBackend config.

Source code in arrlio/backends/result_backends/rabbitmq.py
class ResultBackend(Closable, AbstractResultBackend):
    """
    RabbitMQ `ResultBackend`.

    Args:
        config: `ResultBackend` config.
    """

    def __init__(self, config: Config):

        super().__init__()

        self.config = config

        self._internal_tasks_runner = AioTasksRunner()

        self.serializer = config.serializer.module.Serializer(config.serializer.config)

        self._conn: Connection = connection_factory(config.url)
        self._conn.set_callback("on_open", "on_conn_open_once", self._on_conn_open_once)
        self._conn.set_callback("on_open", "on_conn_open", self._on_conn_open)

        self._direct_reply_to_consumer: tuple[
            aiormq.abc.AbstractChannel,
            aiormq.spec.Basic.ConsumeOk,
        ] = None  # type: ignore

        self._default_exchange = SimpleExchange(conn=self._conn)

        self._exchange = Exchange(
            config.exchange,
            conn=self._conn,
            durable=config.exchange_durable,
            auto_delete=not config.exchange_durable,
            timeout=config.timeout,
        )

        self._queue: Queue = Queue(
            f"{config.queue_prefix}results.{self.config.id}",
            conn=self._conn,
            type=config.queue_type,
            durable=config.queue_durable,
            exclusive=config.queue_exclusive,
            auto_delete=config.queue_auto_delete,
            prefetch_count=config.prefetch_count,
            timeout=config.timeout,
        )

        self._storage: dict[UUID, tuple[asyncio.Event, list[TaskResult]]] = {}

        self._push_task_result_ack_late = retry(
            msg=f"{self} action push_task_result",
            retry_timeouts=self.config.push_retry_timeouts,
            exc_filter=lambda e: isinstance(e, asyncio.TimeoutError),
        )(self._push_task_result)

        self._push_task_result = retry(
            msg=f"{self} action push_task_result",
            retry_timeouts=self.config.push_retry_timeouts,
            exc_filter=exc_filter,
        )(self._push_task_result)

    def __str__(self):
        return f"ResultBackend[rabbitmq#{self.config.id}][{self._conn}]"

    def __repr__(self):
        return self.__str__()

    async def init(self):
        await retry(
            msg=f"{self} init error",
            retry_timeouts=repeat(5),
            exc_filter=exc_filter,
        )(self._conn.open)()

    async def close(self):
        await super().close()
        await self._queue.close(delete=True)
        await self._conn.close()

    def _get_reply_to(self, task_instance: TaskInstance) -> str | None:
        reply_to = task_instance.headers.get("rabbitmq:reply_to")
        if reply_to is None:
            if self.config.reply_to_mode == ReplyToMode.COMMON_QUEUE:
                reply_to = self._queue.name
            elif self.config.reply_to_mode == ReplyToMode.DIRECT_REPLY_TO:
                reply_to = "amq.rabbitmq.reply-to"
        return reply_to

    def make_headers(self, task_instance: TaskInstance) -> dict:
        return {"rabbitmq:reply_to": self._get_reply_to(task_instance)}

    def make_shared(self, task_instance: TaskInstance) -> Shared:
        shared = Shared()
        if task_instance.headers.get("rabbitmq:reply_to") == "amq.rabbitmq.reply-to":
            shared["rabbitmq:conn"] = self._conn
        return shared

    async def _on_conn_open_once(self):
        await self._exchange.declare(restore=True, force=True)
        await self._queue.declare(restore=True, force=True)
        await self._queue.bind(self._exchange, self._queue.name, restore=True)

        logger.info(_("%s start consuming results queue %s"), self, self._queue)

        await self._queue.consume(
            lambda *args, **kwds: self._internal_tasks_runner.create_task(
                "on_result_message",
                lambda: self._on_result_message(*args, **kwds),
            )
            and None,
            retry_timeout=self.config.pull_retry_timeout,
        )

        self._conn.remove_callback("on_open", "on_conn_open_once")

    async def _on_conn_open(self):
        channel = await self._conn.channel()

        logger.info(
            _("%s channel[%s] start consuming results queue '%s'"),
            self,
            channel,
            "amq.rabbitmq.reply-to",
        )

        self._direct_reply_to_consumer = (
            channel,
            await channel.basic_consume(
                "amq.rabbitmq.reply-to",
                partial(self._on_result_message, channel, no_ack=True),
                no_ack=True,
                timeout=self.config.timeout,
            ),
        )

    def _allocate_storage(self, task_id: UUID) -> tuple:
        if task_id not in self._storage:
            self._storage[task_id] = (asyncio.Event(), [])
        return self._storage[task_id]

    async def allocate_storage(self, task_instance: TaskInstance):
        self._allocate_storage(task_instance.task_id)

    def _cleanup_storage(self, task_id: UUID):
        self._storage.pop(task_id, None)

    async def _on_result_message(
        self,
        channel: aiormq.abc.AbstractChannel,
        message: aiormq.abc.DeliveredMessage,
        no_ack: bool | None = None,
    ):
        try:
            properties: aiormq.spec.Basic.Properties = message.header.properties
            task_id: UUID = UUID(properties.message_id)

            task_result: TaskResult = self.serializer.loads_task_result(message.body, properties.headers)

            if not no_ack:
                await channel.basic_ack(message.delivery.delivery_tag)

            if is_debug_level():
                logger.debug(
                    _("%s channel[%s] got result for task %s\n%s"),
                    self,
                    channel,
                    task_id,
                    task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
                )

            ev, task_results = self._allocate_storage(task_id)

            task_results.append(task_result)
            ev.set()

            if expiration := properties.expiration:
                get_event_loop().call_later(
                    int(expiration) / 1000,
                    lambda *args: self._cleanup_storage(task_id),
                )

        except Exception as e:
            logger.exception(e)

    async def _get_result_routing(self, task_instance: TaskInstance) -> tuple[SimpleExchange | Exchange, str]:
        exchange_name = task_instance.headers.get("rabbitmq:reply_to.exchange", self._exchange.name)
        if exchange_name == self._exchange.name:
            exchange = self._exchange
        else:
            exchange = SimpleExchange(exchange_name, conn=self._conn)

        routing_key = task_instance.headers["rabbitmq:reply_to"]
        if routing_key.startswith("amq.rabbitmq.reply-to."):
            exchange = self._default_exchange

        return exchange, routing_key

    async def _push_task_result(
        self,
        task_result: TaskResult,
        task_instance: TaskInstance,
    ):
        exchange, routing_key = await self._get_result_routing(task_instance)

        if is_debug_level():
            logger.debug(
                _("%s push result for task %s[%s] into exchange '%s' with routing_key '%s'\n%s"),
                self,
                task_instance.name,
                task_instance.task_id,
                exchange.name,
                routing_key,
                task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
            )

        data, headers = self.serializer.dumps_task_result(task_result, task_instance=task_instance)

        properties = {
            "delivery_mode": 2,
            "message_type": "arrlio:result",
            "headers": headers,
            "message_id": f"{task_instance.task_id}",
            "correlation_id": f"{task_instance.task_id}",
            "timestamp": datetime.now(tz=timezone.utc),
        }

        if self.serializer.content_type is not None:
            properties["content_type"] = self.serializer.content_type

        if task_instance.result_ttl is not None:
            properties["expiration"] = f"{int(task_instance.result_ttl * 1000)}"

        # if task_instance.headers.get("app_id"):
        #     properties["app_id"] = task_instance.headers["app_id"]

        await exchange.publish(
            data,
            routing_key=routing_key,
            properties=properties,
            timeout=self.config.timeout,
        )

    async def push_task_result(self, task_result: TaskResult, task_instance: TaskInstance):
        if not task_instance.result_return:
            return

        if task_instance.ack_late:
            await self._internal_tasks_runner.create_task(
                "push_task_result",
                lambda: self._push_task_result_ack_late(task_result, task_instance),
            )
        else:
            await self._internal_tasks_runner.create_task(
                "push_task_result",
                lambda: self._push_task_result(task_result, task_instance),
            )

    async def _pop_task_results(self, task_instance: TaskInstance):
        task_id = task_instance.task_id

        async def fn():
            func = task_instance.func

            if task_instance.headers.get("arrlio:closable") or isasyncgenfunction(func) or isgeneratorfunction(func):
                while not self.is_closed:
                    if task_id not in self._storage:
                        raise TaskResultError(_("Result expired"))

                    ev, results = self._storage[task_id]
                    await ev.wait()
                    ev.clear()

                    while results:
                        task_result: TaskResult = results.pop(0)

                        if isinstance(task_result.exc, TaskClosedError):
                            yield task_result
                            return

                        if is_debug_level():
                            logger.debug(
                                _("%s pop result for task %s[%s]\n%s"),
                                self,
                                task_instance.name,
                                task_id,
                                task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
                            )

                        yield task_result

            else:
                ev, results = self._storage[task_id]
                await ev.wait()
                ev.clear()

                if is_debug_level():
                    logger.debug(
                        _("%s pop result for task %s[%s]\n%s"),
                        self,
                        task_instance.name,
                        task_id,
                        results[0].pretty_repr(sanitize=settings.LOG_SANITIZE),
                    )

                yield results.pop(0)

        __anext__ = fn().__anext__

        self._allocate_storage(task_id)

        idx_data: dict[str, int] = {}

        try:
            while not self.is_closed:
                task_result: TaskResult = await self._internal_tasks_runner.create_task("pop_task_result", __anext__)
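                # `idx` is an optional (stream id, sequence number) pair:
                # duplicate results (sequence <= last seen) are skipped and
                # out-of-order gaps raise TaskResultError.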
                idx = task_result.idx
                if idx:
                    idx_0, idx_1 = idx
                    if idx_0 not in idx_data:
                        idx_data[idx_0] = idx_1 - 1
                    if idx_1 <= idx_data[idx_0]:
                        continue
                    idx_data[idx_0] += 1
                    if idx_1 > idx_data[idx_0]:
                        raise TaskResultError(
                            _("Unexpected result index, expect {}, got {}").format(idx_data[idx_0], idx_1)
                        )
                if not isinstance(task_result.exc, TaskClosedError):
                    yield task_result

        except StopAsyncIteration:
            return

        finally:
            self._cleanup_storage(task_id)

    async def pop_task_result(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
        if not task_instance.result_return:
            raise TaskResultError(_("Try to pop result for task with result_return=False"))

        async for task_result in self._pop_task_results(task_instance):
            yield task_result

    async def close_task(self, task_instance: TaskInstance, idx: tuple[str, int] | None = None):
        if is_debug_level():
            logger.debug(_("%s close task %s[%s]"), self, task_instance.name, task_instance.task_id)

        if "rabbitmq:reply_to" not in task_instance.headers:
            task_instance.headers["rabbitmq:reply_to"] = self._get_reply_to(task_instance)

        await self.push_task_result(
            TaskResult(exc=TaskClosedError(task_instance.task_id), idx=idx),
            task_instance,
        )

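A minimal consumption sketch, assuming an initialized result backend and a task instance with result_return=True (names are illustrative). pop_task_result yields each TaskResult in publish order; for generator-style tasks the stream ends when a TaskClosedError result pushed by close_task arrives, while a plain task yields a single result:

async def collect_results(result_backend, task_instance):
    # Exhausts the async generator; TaskClosedError results are
    # filtered out by the backend itself.
    results = []
    async for task_result in result_backend.pop_task_result(task_instance):
        results.append(task_result)
    return results
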
arrlio.backends.event_backends.rabbitmq

EXCHANGE module-attribute

EXCHANGE = 'arrlio.events'

EventBackend exchange name.

EXCHANGE_DURABLE module-attribute

EXCHANGE_DURABLE = False

EventBackend exchange durable option.

QUEUE module-attribute

QUEUE = 'arrlio.events'

EventBackend queue name.

QUEUE_TYPE module-attribute

QUEUE_TYPE = CLASSIC

EventBackend queue type.

QUEUE_DURABLE module-attribute

QUEUE_DURABLE = False

EventBackend queue durable option.

QUEUE_EXCLUSIVE module-attribute

QUEUE_EXCLUSIVE = False

EventBackend queue exclusive option.

QUEUE_AUTO_DELETE module-attribute

QUEUE_AUTO_DELETE = True

EventBackend queue auto-delete option.

PREFETCH_COUNT module-attribute

PREFETCH_COUNT = 10

Events prefetch count.

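Each of these defaults can be overridden per deployment through environment variables carrying the backend's env prefix (see model_config in the Config source below). A minimal sketch, assuming the default ARRLIO_ env prefix; the value 50 is hypothetical:

import os

# Must be set before Config (a pydantic BaseSettings subclass) is
# instantiated, since fields are read from the environment at that point.
os.environ["ARRLIO_RABBITMQ_EVENT_BACKEND_PREFETCH_COUNT"] = "50"

from arrlio.backends.event_backends.rabbitmq import Config

assert Config().prefetch_count == 50
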
Config

Bases: BaseSettings

RabbitMQ EventBackend config.

Attributes:

  • id (str) –

    EventBackend Id.

  • url (SecretAmqpDsn | list[SecretAmqpDsn]) –

    RabbitMQ URL. See amqp spec. Default.

  • timeout (Optional[Timeout]) –

    Network operation timeout in seconds. Default.

  • push_retry_timeouts (Optional[list[Timeout]]) –

    Push operation retry timeouts (sequence of seconds). Default.

  • pull_retry_timeout (Optional[Timeout]) –

    Pull operation retry timeout in seconds. Default.

  • serializer (SerializerConfig) –

    Config for Serializer.

  • exchange (str) –

    Exchange name. Default.

  • exchange_durable (bool) –

    Exchange durable option. Default.

  • queue (str) –

    Events queue name. Default.

  • queue_type (QueueType) –

    Events queue type. Default.

  • queue_durable (bool) –

    Queue durable option. Default.

  • queue_exclusive (bool) –

    Queue exclusive option. Default.

  • queue_auto_delete (bool) –

    Queue auto delete option. Default.

  • prefetch_count (PositiveInt) –

    Events prefetch count. Default.

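Examples:

    An illustrative construction with hypothetical overrides (omitted fields keep the module defaults above):

>>> Config(
        queue='myapp.events',
        prefetch_count=50
    )
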
Source code in arrlio/backends/event_backends/rabbitmq.py
class Config(BaseSettings):
    """
    RabbitMQ `EventBackend` config.

    Attributes:
        id: `EventBackend` Id.
        url: RabbitMQ URL. See amqp [spec](https://www.rabbitmq.com/uri-spec.html).
            [Default][arrlio.backends.rabbitmq.URL].
        timeout: Network operation timeout in seconds.
            [Default][arrlio.backends.rabbitmq.TIMEOUT].
        push_retry_timeouts: Push operation retry timeouts (sequence of seconds).
            [Default][arrlio.backends.rabbitmq.PUSH_RETRY_TIMEOUTS].
        pull_retry_timeout: Pull operation retry timeout in seconds.
            [Default][arrlio.backends.rabbitmq.PULL_RETRY_TIMEOUT].
        serializer: Config for Serializer.
        exchange: Exchange name.
            [Default][arrlio.backends.event_backends.rabbitmq.EXCHANGE].
        exchange_durable: Exchange durable option.
            [Default][arrlio.backends.event_backends.rabbitmq.EXCHANGE_DURABLE].
        queue: Events queue name.
            [Default][arrlio.backends.event_backends.rabbitmq.QUEUE].
        queue_type: Events queue type.
            [Default][arrlio.backends.event_backends.rabbitmq.QUEUE_TYPE].
        queue_durable: Queue durable option.
            [Default][arrlio.backends.event_backends.rabbitmq.QUEUE_DURABLE].
        queue_exclusive: Queue exclusive option.
            [Default][arrlio.backends.event_backends.rabbitmq.QUEUE_EXCLUSIVE].
        queue_auto_delete: Queue auto delete option.
            [Default][arrlio.backends.event_backends.rabbitmq.QUEUE_AUTO_DELETE].
        prefetch_count: Events prefetch count.
            [Default][arrlio.backends.event_backends.rabbitmq.PREFETCH_COUNT].
    """

    model_config = SettingsConfigDict(env_prefix=f"{ENV_PREFIX}RABBITMQ_EVENT_BACKEND_")

    id: str = Field(default_factory=lambda: f"{uuid4().hex[-4:]}")
    url: SecretAmqpDsn | list[SecretAmqpDsn] = Field(default_factory=lambda: URL)
    timeout: Optional[Timeout] = Field(default_factory=lambda: TIMEOUT)
    push_retry_timeouts: Optional[list[Timeout]] = Field(default_factory=lambda: PUSH_RETRY_TIMEOUTS)
    pull_retry_timeout: Optional[Timeout] = Field(default_factory=lambda: PULL_RETRY_TIMEOUT)
    serializer: SerializerConfig = Field(default_factory=SerializerConfig)
    exchange: str = Field(default_factory=lambda: EXCHANGE)
    exchange_durable: bool = Field(default_factory=lambda: EXCHANGE_DURABLE)
    queue: str = Field(default_factory=lambda: QUEUE)
    queue_type: QueueType = Field(default_factory=lambda: QUEUE_TYPE)
    queue_durable: bool = Field(default_factory=lambda: QUEUE_DURABLE)
    queue_exclusive: bool = Field(default_factory=lambda: QUEUE_EXCLUSIVE)
    queue_auto_delete: bool = Field(default_factory=lambda: QUEUE_AUTO_DELETE)
    prefetch_count: PositiveInt = Field(default_factory=lambda: PREFETCH_COUNT)

EventBackend

Bases: Closable, AbstractEventBackend

RabbitMQ EventBackend.

Parameters:

  • config (Config) –

    RabbitMQ EventBackend config.

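A minimal lifecycle sketch (illustrative; init opens the underlying connection with retries, send_event schedules publishing on the backend's internal tasks runner):

from arrlio.backends.event_backends.rabbitmq import Config, EventBackend

async def emit(event):
    backend = EventBackend(Config())
    await backend.init()
    try:
        await backend.send_event(event)
    finally:
        await backend.close()
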
Source code in arrlio/backends/event_backends/rabbitmq.py
class EventBackend(Closable, AbstractEventBackend):
    """
    RabbitMQ `EventBackend`.

    Args:
        config: RabbitMQ `EventBackend` config.
    """

    def __init__(self, config: Config):
        super().__init__()

        self.config = config

        self._internal_tasks_runner = AioTasksRunner()

        self.serializer = config.serializer.module.Serializer(config.serializer.config)

        self._conn: Connection = connection_factory(config.url)

        self._exchange: Exchange = Exchange(
            config.exchange,
            conn=self._conn,
            type="topic",
            durable=config.exchange_durable,
            auto_delete=not config.exchange_durable,
            timeout=config.timeout,
        )

        self._queue: Queue = Queue(
            config.queue,
            conn=self._conn,
            type=config.queue_type,
            durable=config.queue_durable,
            exclusive=config.queue_exclusive,
            auto_delete=config.queue_auto_delete,
            prefetch_count=config.prefetch_count,
            timeout=config.timeout,
        )
        self._callbacks: dict[str, tuple[Callable[[Event], Any], list[str], list]] = {}

        self._send_event = retry(
            msg=f"{self} action send_event",
            retry_timeouts=config.push_retry_timeouts,
            exc_filter=exc_filter,
        )(self._send_event)

    def __str__(self):
        return f"EventBackend[rabbitmq#{self.config.id}][{self._conn}]"

    def __repr__(self):
        return self.__str__()

    async def init(self):
        await retry(
            msg=f"{self} init error",
            retry_timeouts=repeat(5),
            exc_filter=exc_filter,
        )(self._conn.open)()

    async def close(self):
        await super().close()

    async def _send_event(self, event: Event):
        data, headers = self.serializer.dumps_event(event)
        await self._exchange.publish(
            data,
            routing_key=event.type,
            properties={
                "delivery_mode": 2,
                "timestamp": datetime.now(tz=timezone.utc),
                "expiration": f"{int(event.ttl * 1000)}" if event.ttl is not None else None,
                "headers": headers,
            },
        )

    async def send_event(self, event: Event):
        if is_debug_level():
            logger.debug(_("%s send event\n%s"), self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

        await self._internal_tasks_runner.create_task("send_event", lambda: self._send_event(event))

    async def consume_events(
        self,
        callback_id: str,
        callback: Callable[[Event], Any],
        event_types: list[str] | None = None,
    ):
        if callback_id in self._callbacks:
            raise ArrlioError(
                _("callback_id '{}' already in use for consuming '{}' event_types").format(
                    callback_id,
                    self._callbacks[callback_id][1],
                )
            )

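        # "#" is the topic-exchange wildcard: with no explicit event_types,
        # subscribe to every event type.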
        event_types = event_types or ["#"]

        self._callbacks[callback_id] = (
            callback,
            event_types,
            [re.compile(event_type_to_regex(event_type)) for event_type in event_types],
        )

        async def on_message(channel: aiormq.Channel, message: aiormq.abc.DeliveredMessage):
            try:
                event: Event = self.serializer.loads_event(
                    message.body,
                    message.header.properties.headers,
                )

                if is_debug_level():
                    logger.debug(_("%s got event\n%s"), self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

                await channel.basic_ack(message.delivery.delivery_tag)

                for callback, event_types, patterns in self._callbacks.values():
                    if event_types is not None and not any(pattern.match(event.type) for pattern in patterns):
                        continue
                    if iscoroutinefunction(callback):
                        self._internal_tasks_runner.create_task("callback", partial(callback, event))
                    else:
                        try:
                            callback(event)
                        except Exception as e:
                            logger.exception(e)

            except Exception as e:
                logger.exception(e)

        if not self._queue.consumer:
            await self._exchange.declare(restore=True, force=True)
            await self._queue.declare(restore=True, force=True)

        for event_type in event_types:
            await self._queue.bind(self._exchange, event_type, restore=True)

        # TODO
        logger.info(
            _("%s start consuming events[callback_id=%s, event_types=%s]"),
            self,
            callback_id,
            event_types,
        )

        await self._queue.consume(
            lambda *args, **kwds: create_task(on_message(*args, **kwds)) and None,
            retry_timeout=self.config.pull_retry_timeout,
        )

    async def stop_consume_events(self, callback_id: str | None = None):
        logger.info(_("%s stop consuming events[callback_id=%s]"), self, callback_id)

        if callback_id:
            if callback_id not in self._callbacks:
                return
            callback, event_types, regex = self._callbacks.pop(callback_id)
            for event_type in set(event_types) - {x for v in self._callbacks.values() for x in v[1]}:
                await self._queue.unbind(self._exchange, event_type)
            if not self._callbacks:
                await self._queue.stop_consume()
        else:
            for callback, event_types, regex in self._callbacks.values():
                for event_type in event_types:
                    await self._queue.unbind(self._exchange, event_type)
            self._callbacks = {}
            await self._queue.stop_consume()
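
A consumption sketch, assuming an initialized event_backend (the callback id and topic pattern are illustrative; synchronous callbacks are invoked inline, coroutine callbacks are scheduled on the internal tasks runner):

def on_event(event):
    print(event.type)

async def watch(event_backend):
    await event_backend.consume_events(
        "my-watcher",            # callback_id, must be unique per backend
        on_event,
        event_types=["task.#"],  # hypothetical topic pattern
    )
    # ... later, stop this subscription only:
    await event_backend.stop_consume_events("my-watcher")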