Skip to content

Reference

Client

Bases: Client

Source code in src/nats_contrib/request_many/client.py
class Client(NATSClient):
    """NATS client extended with "request many" helpers.

    Args:
        max_wait: Default total time, in seconds, to wait for responses when a
            request specifies neither ``max_wait`` nor ``max_interval``.
    """

    def __init__(
        self,
        max_wait: float = 0.5,
    ) -> None:
        super().__init__()
        self.max_wait = max_wait

    def request_many_iter(
        self,
        subject: str,
        payload: bytes | None = None,
        headers: dict[str, str] | None = None,
        reply_inbox: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
        stop_on_sentinel: bool = False,
    ) -> AsyncContextManager[AsyncIterator[Msg]]:
        """Request many responses from the same subject.

        The iterator exits without raising an error when no responses are received.

        Responses are received until one of the following conditions is met:

        - max_wait seconds have passed.
        - max_count responses have been received.
        - max_interval seconds have passed between responses.
        - A sentinel message is received and stop_on_sentinel is True.

        When any of the conditions is met, the async iterator yielded by the
        context manager raises StopAsyncIteration on the next iteration.

        The subscription is started when entering the async context manager and
        stopped when exiting.

        Args:
            subject: The subject to send the request to.
            payload: The payload to send with the request.
            headers: The headers to send with the request.
            reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
            max_wait: The maximum amount of time to wait for responses. When both
                max_wait and max_interval are None, the client's ``max_wait``
                (0.5 seconds unless configured) is used.
            max_count: The maximum number of responses to accept. No limit by default.
            max_interval: The maximum amount of time between responses. No limit by default.
            stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

        Returns:
            An async context manager yielding an async iterator of messages.
        """
        # Apply the client-level default here: the iterator only knows its own
        # built-in fallback and would otherwise ignore self.max_wait.
        if max_wait is None and max_interval is None:
            # A falsy client default is forwarded as None so the iterator
            # applies its fallback, mirroring the executor's `max_wait or 0.5`.
            max_wait = self.max_wait or None
        inbox = reply_inbox or self.new_inbox()
        return RequestManyIterator(
            self,
            subject,
            payload=payload,
            headers=headers,
            inbox=inbox,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
            stop_on_sentinel=stop_on_sentinel,
        )

    async def request_many(
        self,
        subject: str,
        payload: bytes | None = None,
        headers: dict[str, str] | None = None,
        reply_inbox: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
        stop_on_sentinel: bool = False,
    ) -> list[Msg]:
        """Request many responses from the same subject.

        This function does not raise an error when no responses are received.

        Responses are received until one of the following conditions is met:

        - max_wait seconds have passed.
        - max_count responses have been received.
        - max_interval seconds have passed between responses.
        - A sentinel message is received and stop_on_sentinel is True.

        Subscription is always stopped when the function returns.

        Args:
            subject: The subject to send the request to.
            payload: The payload to send with the request.
            headers: The headers to send with the request.
            reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
            max_wait: The maximum amount of time to wait for responses. When both
                max_wait and max_interval are None, the client's ``max_wait``
                (0.5 seconds unless configured) is used.
            max_count: The maximum number of responses to accept. No limit by default.
            max_interval: The maximum amount of time between responses. No limit by default.
            stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

        Returns:
            The list of responses received, possibly empty.
        """
        # The executor's constructor argument is the *default* wait; the
        # per-call value is passed separately below. Seeding it with the
        # per-call value made the client-level setting dead code.
        executor = RequestManyExecutor(self, self.max_wait)
        return await executor(
            subject,
            reply_inbox=reply_inbox,
            payload=payload,
            headers=headers,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
            stop_on_sentinel=stop_on_sentinel,
        )

request_many(subject, payload=None, headers=None, reply_inbox=None, max_wait=None, max_count=None, max_interval=None, stop_on_sentinel=False) async

Request many responses from the same subject.

This function does not raise an error when no responses are received.

Responses are received until one of the following conditions is met:

  • max_wait seconds have passed.
  • max_count responses have been received.
  • max_interval seconds have passed between responses.
  • A sentinel message is received and stop_on_sentinel is True.

Subscription is always stopped when the function returns.

Parameters:

Name Type Description Default
subject str

The subject to send the request to.

required
payload bytes | None

The payload to send with the request.

None
headers dict[str, str] | None

The headers to send with the request.

None
reply_inbox str | None

The inbox to receive the responses in. A new inbox is created if None.

None
max_wait float | None

The maximum amount of time to wait for responses. 0.5 seconds by default (the default is applied only when max_interval is also None).

None
max_count int | None

The maximum number of responses to accept. No limit by default.

None
max_interval float | None

The maximum amount of time between responses. No limit by default.

None
stop_on_sentinel bool

Whether to stop when a sentinel message is received. False by default.

False
Source code in src/nats_contrib/request_many/client.py
async def request_many(
    self,
    subject: str,
    payload: bytes | None = None,
    headers: dict[str, str] | None = None,
    reply_inbox: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
    stop_on_sentinel: bool = False,
) -> list[Msg]:
    """Request many responses from the same subject.

    This function does not raise an error when no responses are received.

    Responses are received until one of the following conditions is met:

    - max_wait seconds have passed.
    - max_count responses have been received.
    - max_interval seconds have passed between responses.
    - A sentinel message is received and stop_on_sentinel is True.

    Subscription is always stopped when the function returns.

    Args:
        subject: The subject to send the request to.
        payload: The payload to send with the request.
        headers: The headers to send with the request.
        reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
        max_wait: The maximum amount of time to wait for responses. When both
            max_wait and max_interval are None, the client's ``max_wait``
            (0.5 seconds unless configured) is used.
        max_count: The maximum number of responses to accept. No limit by default.
        max_interval: The maximum amount of time between responses. No limit by default.
        stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

    Returns:
        The list of responses received, possibly empty.
    """
    # The executor's constructor argument is the *default* wait; the per-call
    # value is passed separately below. Seeding it with the per-call value
    # made the client-level setting dead code.
    executor = RequestManyExecutor(self, self.max_wait)
    return await executor(
        subject,
        reply_inbox=reply_inbox,
        payload=payload,
        headers=headers,
        max_wait=max_wait,
        max_count=max_count,
        max_interval=max_interval,
        stop_on_sentinel=stop_on_sentinel,
    )

request_many_iter(subject, payload=None, headers=None, reply_inbox=None, max_wait=None, max_count=None, max_interval=None, stop_on_sentinel=False)

Request many responses from the same subject.

The iterator exits without raising an error when no responses are received.

Responses are received until one of the following conditions is met:

  • max_wait seconds have passed.
  • max_count responses have been received.
  • max_interval seconds have passed between responses.
  • A sentinel message is received and stop_on_sentinel is True.

When any of these conditions is met, the async iterator yielded by the context manager raises StopAsyncIteration on the next iteration.

The subscription is started when entering the async context manager and stopped when exiting.

Parameters:

Name Type Description Default
subject str

The subject to send the request to.

required
payload bytes | None

The payload to send with the request.

None
headers dict[str, str] | None

The headers to send with the request.

None
reply_inbox str | None

The inbox to receive the responses in. A new inbox is created if None.

None
max_wait float | None

The maximum amount of time to wait for responses. 0.5 seconds by default (the default is applied only when max_interval is also None).

None
max_count int | None

The maximum number of responses to accept. No limit by default.

None
max_interval float | None

The maximum amount of time between responses. No limit by default.

None
stop_on_sentinel bool

Whether to stop when a sentinel message is received. False by default.

False
Source code in src/nats_contrib/request_many/client.py
def request_many_iter(
    self,
    subject: str,
    payload: bytes | None = None,
    headers: dict[str, str] | None = None,
    reply_inbox: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
    stop_on_sentinel: bool = False,
) -> AsyncContextManager[AsyncIterator[Msg]]:
    """Request many responses from the same subject.

    The iterator exits without raising an error when no responses are received.

    Responses are received until one of the following conditions is met:

    - max_wait seconds have passed.
    - max_count responses have been received.
    - max_interval seconds have passed between responses.
    - A sentinel message is received and stop_on_sentinel is True.

    When any of the conditions is met, the async iterator yielded by the
    context manager raises StopAsyncIteration on the next iteration.

    The subscription is started when entering the async context manager and
    stopped when exiting.

    Args:
        subject: The subject to send the request to.
        payload: The payload to send with the request.
        headers: The headers to send with the request.
        reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
        max_wait: The maximum amount of time to wait for responses. When both
            max_wait and max_interval are None, the client's ``max_wait``
            (0.5 seconds unless configured) is used.
        max_count: The maximum number of responses to accept. No limit by default.
        max_interval: The maximum amount of time between responses. No limit by default.
        stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

    Returns:
        An async context manager yielding an async iterator of messages.
    """
    # Apply the client-level default here: the iterator only knows its own
    # built-in fallback and would otherwise ignore self.max_wait.
    if max_wait is None and max_interval is None:
        # A falsy client default is forwarded as None so the iterator applies
        # its own fallback instead of running without any time limit.
        max_wait = self.max_wait or None
    inbox = reply_inbox or self.new_inbox()
    return RequestManyIterator(
        self,
        subject,
        payload=payload,
        headers=headers,
        inbox=inbox,
        max_wait=max_wait,
        max_count=max_count,
        max_interval=max_interval,
        stop_on_sentinel=stop_on_sentinel,
    )

RequestManyExecutor

Source code in src/nats_contrib/request_many/executor.py
class RequestManyExecutor:
    """Callable that sends one request and collects every reply into a list.

    Args:
        nc: Client used to create inboxes, subscribe and publish.
        max_wait: Default total wait, in seconds, for calls that give neither
            ``max_wait`` nor ``max_interval``. A falsy value (``None`` or
            ``0``) selects 0.5 seconds.
    """

    def __init__(
        self,
        nc: Client,
        max_wait: float | None = None,
    ) -> None:
        self.nc = nc
        self.max_wait = max_wait or 0.5

    async def __call__(
        self,
        subject: str,
        reply_inbox: str | None = None,
        payload: bytes | None = None,
        headers: dict[str, str] | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
        stop_on_sentinel: bool = False,
    ) -> list[Msg]:
        """Request many responses from the same subject.

        This function does not raise an error when no responses are received.

        Responses are received until one of the following conditions is met:

        - max_wait seconds have passed.
        - max_count responses have been received.
        - max_interval seconds have passed between responses.
        - A sentinel message is received and stop_on_sentinel is True.
        - A message carrying a ``Status: 503`` header is received.

        Messages delivered after a stop condition has fired are discarded.

        Args:
            subject: The subject to send the request to.
            payload: The payload to send with the request.
            headers: The headers to send with the request.
            reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
            max_wait: The maximum amount of time to wait for responses. Default max wait can be configured at the instance level.
            max_count: The maximum number of responses to accept. No limit by default.
            max_interval: The maximum amount of time between responses. No limit by default.
            stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

        Returns:
            The responses collected before the first stop condition. Sentinel
            and 503 messages are never included.
        """
        # The instance default only applies when no time-based limit was given.
        if max_wait is None and max_interval is None:
            max_wait = self.max_wait
        # Create an inbox for the responses if one wasn't provided.
        if reply_inbox is None:
            reply_inbox = self.nc.new_inbox()
        # Create an empty list to store the responses.
        responses: list[Msg] = []
        # We are inside a coroutine, so a loop is guaranteed to be running.
        loop = asyncio.get_running_loop()
        # Create an event to signal when the request is complete.
        event = asyncio.Event()
        # Time of the most recent message, used by the max_interval check.
        last_received = loop.time()

        # Define a callback to handle the responses.
        async def callback(msg: Msg) -> None:
            nonlocal last_received
            # Several queued messages may be handed to this callback before
            # the waiting coroutine below gets a chance to unsubscribe. Once a
            # stop condition has fired, later messages must not be collected.
            if event.is_set():
                return
            # Update the last received time.
            last_received = loop.time()
            # If the message is a 503 error, set the event and return.
            if msg.headers and msg.headers.get("Status") == "503":
                event.set()
                return
            # If we're stopping on a sentinel message, check for it
            # and don't append the message to the list of responses.
            if stop_on_sentinel and msg.data == b"":
                event.set()
                return
            # In all other cases, append the message to the list of responses.
            responses.append(msg)
            # And check if we've received all the responses.
            if len(responses) == max_count:
                event.set()

        # Subscribe to the inbox before publishing so no reply can be missed.
        sub = await self.nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
            reply_inbox,
            cb=callback,
            max_msgs=max_count or 0,
        )
        # Initialize a list of tasks to wait for.
        tasks: list[asyncio.Task[object]] = []
        # Enter try/finally clause to ensure that the subscription is
        # unsubscribed from even if an error occurs.
        try:
            # Create task to wait for the stop event.
            tasks.append(asyncio.create_task(event.wait()))

            # Add a task to wait for the max_wait time if needed
            if max_wait:
                tasks.append(asyncio.create_task(asyncio.sleep(max_wait)))

            # Add a task to check the interval if needed
            if max_interval:

                async def check_interval() -> None:
                    while True:
                        await asyncio.sleep(max_interval)
                        if loop.time() - last_received > max_interval:
                            event.set()
                            return

                tasks.append(asyncio.create_task(check_interval()))

            # At this point the subscription is ready and all tasks are submitted
            # Publish the request.
            await self.nc.publish(
                subject, payload or b"", reply=reply_inbox, headers=headers
            )
            # Wait for the first task to complete.
            await asyncio.wait(
                tasks,
                return_when=asyncio.FIRST_COMPLETED,
            )
        # Always cancel tasks and unsubscribe from the inbox.
        finally:
            # Cancel the remaining tasks as soon as first one completes.
            for task in tasks:
                if not task.done():
                    task.cancel()
            # Unsubscribe from the inbox.
            try:
                await sub.unsubscribe()
            except BadSubscriptionError:
                # It's possible that auto-unsubscribe has already been called.
                pass

        # Return the list of responses.
        return responses

__call__(subject, reply_inbox=None, payload=None, headers=None, max_wait=None, max_count=None, max_interval=None, stop_on_sentinel=False) async

Request many responses from the same subject.

This function does not raise an error when no responses are received.

Responses are received until one of the following conditions is met:

  • max_wait seconds have passed.
  • max_count responses have been received.
  • max_interval seconds have passed between responses.
  • A sentinel message is received and stop_on_sentinel is True.

Parameters:

Name Type Description Default
subject str

The subject to send the request to.

required
payload bytes | None

The payload to send with the request.

None
headers dict[str, str] | None

The headers to send with the request.

None
reply_inbox str | None

The inbox to receive the responses in. A new inbox is created if None.

None
max_wait float | None

The maximum amount of time to wait for responses. Default max wait can be configured at the instance level.

None
max_count int | None

The maximum number of responses to accept. No limit by default.

None
max_interval float | None

The maximum amount of time between responses. No limit by default.

None
stop_on_sentinel bool

Whether to stop when a sentinel message is received. False by default.

False
Source code in src/nats_contrib/request_many/executor.py
async def __call__(
    self,
    subject: str,
    reply_inbox: str | None = None,
    payload: bytes | None = None,
    headers: dict[str, str] | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
    stop_on_sentinel: bool = False,
) -> list[Msg]:
    """Request many responses from the same subject.

    This function does not raise an error when no responses are received.

    Responses are received until one of the following conditions is met:

    - max_wait seconds have passed.
    - max_count responses have been received.
    - max_interval seconds have passed between responses.
    - A sentinel message is received and stop_on_sentinel is True.
    - A message carrying a ``Status: 503`` header is received.

    Messages delivered after a stop condition has fired are discarded.

    Args:
        subject: The subject to send the request to.
        payload: The payload to send with the request.
        headers: The headers to send with the request.
        reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
        max_wait: The maximum amount of time to wait for responses. Default max wait can be configured at the instance level.
        max_count: The maximum number of responses to accept. No limit by default.
        max_interval: The maximum amount of time between responses. No limit by default.
        stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

    Returns:
        The responses collected before the first stop condition. Sentinel and
        503 messages are never included.
    """
    # The instance default only applies when no time-based limit was given.
    if max_wait is None and max_interval is None:
        max_wait = self.max_wait
    # Create an inbox for the responses if one wasn't provided.
    if reply_inbox is None:
        reply_inbox = self.nc.new_inbox()
    # Create an empty list to store the responses.
    responses: list[Msg] = []
    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    # Create an event to signal when the request is complete.
    event = asyncio.Event()
    # Time of the most recent message, used by the max_interval check.
    last_received = loop.time()

    # Define a callback to handle the responses.
    async def callback(msg: Msg) -> None:
        nonlocal last_received
        # Several queued messages may be handed to this callback before the
        # waiting coroutine below gets a chance to unsubscribe. Once a stop
        # condition has fired, later messages must not be collected.
        if event.is_set():
            return
        # Update the last received time.
        last_received = loop.time()
        # If the message is a 503 error, set the event and return.
        if msg.headers and msg.headers.get("Status") == "503":
            event.set()
            return
        # If we're stopping on a sentinel message, check for it
        # and don't append the message to the list of responses.
        if stop_on_sentinel and msg.data == b"":
            event.set()
            return
        # In all other cases, append the message to the list of responses.
        responses.append(msg)
        # And check if we've received all the responses.
        if len(responses) == max_count:
            event.set()

    # Subscribe to the inbox before publishing so no reply can be missed.
    sub = await self.nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
        reply_inbox,
        cb=callback,
        max_msgs=max_count or 0,
    )
    # Initialize a list of tasks to wait for.
    tasks: list[asyncio.Task[object]] = []
    # Enter try/finally clause to ensure that the subscription is
    # unsubscribed from even if an error occurs.
    try:
        # Create task to wait for the stop event.
        tasks.append(asyncio.create_task(event.wait()))

        # Add a task to wait for the max_wait time if needed
        if max_wait:
            tasks.append(asyncio.create_task(asyncio.sleep(max_wait)))

        # Add a task to check the interval if needed
        if max_interval:

            async def check_interval() -> None:
                while True:
                    await asyncio.sleep(max_interval)
                    if loop.time() - last_received > max_interval:
                        event.set()
                        return

            tasks.append(asyncio.create_task(check_interval()))

        # At this point the subscription is ready and all tasks are submitted
        # Publish the request.
        await self.nc.publish(
            subject, payload or b"", reply=reply_inbox, headers=headers
        )
        # Wait for the first task to complete.
        await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED,
        )
    # Always cancel tasks and unsubscribe from the inbox.
    finally:
        # Cancel the remaining tasks as soon as first one completes.
        for task in tasks:
            if not task.done():
                task.cancel()
        # Unsubscribe from the inbox.
        try:
            await sub.unsubscribe()
        except BadSubscriptionError:
            # It's possible that auto-unsubscribe has already been called.
            pass

    # Return the list of responses.
    return responses

RequestManyIterator

Source code in src/nats_contrib/request_many/iterator.py
class RequestManyIterator:
    """Async context manager and async iterator over the replies to one request."""

    def __init__(
        self,
        nc: Client,
        subject: str,
        inbox: str,
        payload: bytes | None = None,
        headers: dict[str, str] | None = None,
        max_wait: float | None = None,
        max_interval: float | None = None,
        max_count: int | None = None,
        stop_on_sentinel: bool = False,
    ) -> None:
        """Request many responses from the same subject.

        Request is sent when entering the async context manager and unsubscribed when exiting.

        The async iterator yielded by the context manager does not raise an
        error when no responses are received.

        Responses are received until one of the following conditions is met:

        - max_wait seconds have passed.
        - max_count responses have been received.
        - max_interval seconds have passed between responses.
        - A sentinel message is received and stop_on_sentinel is True.

        When any of the conditions is met, the async iterator raises StopAsyncIteration on
        the next call to __anext__, and the subscription is unsubscribed on exit.

        Args:
            nc: The client used to subscribe and publish.
            subject: The subject to send the request to.
            payload: The payload to send with the request.
            headers: The headers to send with the request.
            inbox: The inbox to receive the responses in. Must be provided by the caller.
            max_wait: The maximum amount of time to wait for responses. 0.5 seconds
                when both max_wait and max_interval are None.
            max_count: The maximum number of responses to accept. No limit by default.
            max_interval: The maximum amount of time between responses. No limit by default.
            stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.
        """
        if max_wait is None and max_interval is None:
            max_wait = 0.5
        # Save all the arguments as instance variables.
        self.nc = nc
        self.subject = subject
        self.payload = payload
        self.headers = headers
        self.inbox = inbox
        self.max_wait = max_wait
        self.max_count = max_count
        self.max_interval = max_interval
        self.stop_on_sentinel = stop_on_sentinel
        # Initialize the state of the request many iterator
        self._sub: Subscription | None = None
        self._did_unsubscribe = False
        self._total_received = 0
        # Placeholder only: the real baseline is taken in __aenter__, so that
        # constructing the object never needs an event loop.
        self._last_received = 0.0
        self._tasks: list[asyncio.Task[object]] = []
        self._pending_task: asyncio.Task[Msg] | None = None

    def __aiter__(self) -> RequestManyIterator:
        """RequestManyIterator is an asynchronous iterator."""
        return self

    async def __anext__(self) -> Msg:
        """Return the next message or raise StopAsyncIteration.

        Raises:
            RuntimeError: If the context manager has not been entered.
            StopAsyncIteration: When a stop condition has been met.
        """
        if not self._sub:
            raise RuntimeError(
                "RequestManyIterator must be used as an async context manager"
            )
        # Exit early if we've already unsubscribed
        if self._did_unsubscribe:
            raise StopAsyncIteration
        # Exit early if we received all the messages
        if self.max_count and self._total_received == self.max_count:
            await self.cleanup()
            raise StopAsyncIteration
        # Create a task to wait for the next message
        task: asyncio.Task[Msg] = asyncio.create_task(_fetch(self._sub))
        self._pending_task = task
        # Wait for the next message or any of the other tasks to complete
        await asyncio.wait(
            [self._pending_task, *self._tasks],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # If the pending task is cancelled or not done, raise StopAsyncIteration
        if self._pending_task.cancelled() or not self._pending_task.done():
            await self.cleanup()
            raise StopAsyncIteration
        # This will raise an exception if an error occurred within the task
        msg = self._pending_task.result()
        # Record the arrival time; without this the max_interval watchdog
        # would measure from the start instead of from the last response.
        self._last_received = asyncio.get_running_loop().time()
        # If the message is a 503 error, raise StopAsyncIteration
        if msg.headers and msg.headers.get("Status") == "503":
            await self.cleanup()
            raise StopAsyncIteration
        # Always increment the total received count
        self._total_received += 1
        # Check if this is a sentinel message, and if so, raise StopAsyncIteration
        if self.stop_on_sentinel and msg.data == b"":
            await self.cleanup()
            raise StopAsyncIteration
        # Return the message
        return msg

    async def __aenter__(self) -> RequestManyIterator:
        """Start the subscription and publish the request."""
        # Subscribe before publishing so no reply can be missed.
        sub = await self.nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
            self.inbox,
            max_msgs=self.max_count or 0,
        )
        self._sub = sub
        loop = asyncio.get_running_loop()
        # The interval clock starts when the request is sent.
        self._last_received = loop.time()
        # Add a task to wait for the max_wait time if needed
        if self.max_wait:
            self._tasks.append(asyncio.create_task(asyncio.sleep(self.max_wait)))
        # Add a task to check the interval if needed
        if self.max_interval:
            interval = self.max_interval

            async def check_interval() -> None:
                while True:
                    await asyncio.sleep(interval)
                    if loop.time() - self._last_received > interval:
                        await self.cleanup()
                        return

            self._tasks.append(asyncio.create_task(check_interval()))

        # Publish the request
        try:
            await self.nc.publish(
                self.subject,
                self.payload or b"",
                reply=self.inbox,
                headers=self.headers,
            )
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so release the
            # subscription and timers here before propagating the error.
            await self.cleanup()
            raise
        # At this point the subscription is ready and all tasks are submitted
        return self

    async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
        """Unsubscribe from the inbox and cancel all the tasks."""
        await self.cleanup()

    async def cleanup(self) -> None:
        """Unsubscribe from the inbox and cancel all the tasks.

        Only the first call does any work; later calls return immediately.
        """
        if self._did_unsubscribe:
            return
        self._did_unsubscribe = True
        # The interval watchdog calls cleanup() from inside one of the tasks
        # in self._tasks. Cancelling the task we are running in could
        # interrupt the unsubscribe below, so skip it.
        current = asyncio.current_task()
        for task in self._tasks:
            if task is not current and not task.done():
                task.cancel()
        if self._pending_task and not self._pending_task.done():
            self._pending_task.cancel()
        if self._sub:
            await _unsubscribe(self._sub)

__aenter__() async

Start the subscription and publish the request.

Source code in src/nats_contrib/request_many/iterator.py
async def __aenter__(self) -> RequestManyIterator:
    """Start the subscription and publish the request.

    Returns:
        The iterator itself, ready to be iterated.

    Raises:
        Whatever ``publish`` raises; the subscription and timers are released
        through ``cleanup()`` before the error propagates.
    """
    # Subscribe before publishing so no reply can be missed.
    sub = await self.nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
        self.inbox,
        max_msgs=self.max_count or 0,
    )
    self._sub = sub
    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    # The interval clock starts when the request is sent, not when the
    # object was constructed.
    self._last_received = loop.time()
    # Add a task to wait for the max_wait time if needed
    if self.max_wait:
        self._tasks.append(asyncio.create_task(asyncio.sleep(self.max_wait)))
    # Add a task to check the interval if needed
    if self.max_interval:
        interval = self.max_interval

        async def check_interval() -> None:
            while True:
                await asyncio.sleep(interval)
                if loop.time() - self._last_received > interval:
                    await self.cleanup()
                    return

        self._tasks.append(asyncio.create_task(check_interval()))

    # Publish the request
    try:
        await self.nc.publish(
            self.subject, self.payload or b"", reply=self.inbox, headers=self.headers
        )
    except BaseException:
        # __aexit__ is not invoked when __aenter__ raises, so release the
        # subscription and timers here before propagating the error.
        await self.cleanup()
        raise
    # At this point the subscription is ready and all tasks are submitted
    return self

__aexit__(*args, **kwargs) async

Unsubscribe from the inbox and cancel all the tasks.

Source code in src/nats_contrib/request_many/iterator.py
async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
    """Leave the context by releasing the subscription and stopping the timers.

    The exception details supplied by ``async with`` are ignored and ``None``
    is returned, so an exception raised in the body still propagates.
    """
    # All teardown lives in cleanup(); this hook only delegates to it.
    await self.cleanup()

__aiter__()

RequestManyIterator is an asynchronous iterator.

Source code in src/nats_contrib/request_many/iterator.py
def __aiter__(self) -> RequestManyIterator:
    """Return this object, which acts as its own asynchronous iterator."""
    # No separate iterator object exists: __anext__ is defined on the same class.
    return self

__anext__() async

Return the next message or raise StopAsyncIteration.

Source code in src/nats_contrib/request_many/iterator.py
async def __anext__(self) -> Msg:
    """Return the next message or raise StopAsyncIteration.

    Returns:
        The next message fetched from the inbox subscription.

    Raises:
        RuntimeError: If the iterator has not been entered as an async
            context manager (no subscription exists yet).
        StopAsyncIteration: When the iterator already unsubscribed, when
            ``max_count`` messages have been received, when one of the
            timer tasks finishes (or the fetch is cancelled) before a
            message arrives, when the message carries a ``Status: 503``
            header, or when an empty-payload message is received and
            ``stop_on_sentinel`` is True.
    """
    if not self._sub:
        raise RuntimeError(
            "RequestManyIterator must be used as an async context manager"
        )
    # Exit early if we've already unsubscribed
    if self._did_unsubscribe:
        raise StopAsyncIteration
    # Exit early if we received all the messages
    if self.max_count and self._total_received == self.max_count:
        await self.cleanup()
        raise StopAsyncIteration
    # Create a task to wait for the next message
    task: asyncio.Task[Msg] = asyncio.create_task(_fetch(self._sub))
    self._pending_task = task
    # Wait for the next message or any of the other tasks to complete
    await asyncio.wait(
        [self._pending_task, *self._tasks],
        return_when=asyncio.FIRST_COMPLETED,
    )
    # If the pending task is cancelled or not done, raise StopAsyncIteration
    if self._pending_task.cancelled() or not self._pending_task.done():
        await self.cleanup()
        raise StopAsyncIteration
    # This will raise an exception if an error occurred within the task
    msg = self._pending_task.result()
    # Check message headers
    # If the message is a 503 error, raise StopAsyncIteration
    if msg.headers and msg.headers.get("Status") == "503":
        await self.cleanup()
        raise StopAsyncIteration
    # Record when this response arrived. The max_interval watchdog compares
    # the loop clock against _last_received; without this refresh it would
    # measure the time since construction instead of the gap between
    # responses.
    self._last_received = asyncio.get_event_loop().time()
    # Always increment the total received count
    self._total_received += 1
    # Check if this is a sentinel message, and if so, raise StopAsyncIteration
    if self.stop_on_sentinel and msg.data == b"":
        await self.cleanup()
        raise StopAsyncIteration
    # Return the message
    return msg

__init__(nc, subject, inbox, payload=None, headers=None, max_wait=None, max_interval=None, max_count=None, stop_on_sentinel=False)

Request many responses from the same subject.

Request is sent when entering the async context manager and unsubscribed when exiting.

The async iterator yielded by the context manager does not raise an error when no responses are received.

Responses are received until one of the following conditions is met:

  • max_wait seconds have passed.
  • max_count responses have been received.
  • max_interval seconds have passed between responses.
  • A sentinel message is received and stop_on_sentinel is True.

When any of these conditions is met, the async iterator raises StopAsyncIteration on the next call to anext, and the subscription is unsubscribed on exit.

Parameters:

Name Type Description Default
subject str

The subject to send the request to.

required
payload bytes | None

The payload to send with the request.

None
headers dict[str, str] | None

The headers to send with the request.

None
inbox str

The inbox to receive the responses in. A new inbox is created if None.

required
max_wait float | None

The maximum amount of time to wait for responses. Default max wait can be configured at the instance level.

None
max_count int | None

The maximum number of responses to accept. No limit by default.

None
max_interval float | None

The maximum amount of time between responses. No limit by default.

None
stop_on_sentinel bool

Whether to stop when a sentinel message is received. False by default.

False
Source code in src/nats_contrib/request_many/iterator.py
def __init__(
    self,
    nc: Client,
    subject: str,
    inbox: str,
    payload: bytes | None = None,
    headers: dict[str, str] | None = None,
    max_wait: float | None = None,
    max_interval: float | None = None,
    max_count: int | None = None,
    stop_on_sentinel: bool = False,
) -> None:
    """Describe a request expecting many responses on a single inbox.

    Nothing is sent here: the request is published when entering the async
    context manager and the inbox is unsubscribed when exiting it.

    The async iterator yielded by the context manager does not raise an
    error when no responses are received.

    Responses are received until one of the following conditions is met:

    - max_wait seconds have passed.
    - max_count responses have been received.
    - max_interval seconds have passed between responses.
    - A sentinel message is received and stop_on_sentinel is True.

    Once a condition is met, the async iterator raises StopAsyncIteration
    on the next call to __anext__.

    Args:
        nc: The client used to subscribe to the inbox and publish the request.
        subject: The subject to send the request to.
        inbox: The inbox to receive the responses in.
        payload: The payload to send with the request.
        headers: The headers to send with the request.
        max_wait: The maximum amount of time to wait for responses.
            Falls back to 0.5 seconds when both max_wait and max_interval are None.
        max_interval: The maximum amount of time between responses. No limit by default.
        max_count: The maximum number of responses to accept. No limit by default.
        stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.
    """
    # What to send, and where to listen for the answers.
    self.nc = nc
    self.subject = subject
    self.inbox = inbox
    self.payload = payload
    self.headers = headers
    # Stop conditions. Without any time-based limit the iterator could wait
    # forever, so a 0.5 second max_wait is applied in that case.
    no_time_limit = max_wait is None and max_interval is None
    self.max_wait = 0.5 if no_time_limit else max_wait
    self.max_interval = max_interval
    self.max_count = max_count
    self.stop_on_sentinel = stop_on_sentinel
    # Runtime state, filled in while the context manager is active.
    self._sub: Subscription | None = None
    self._pending_task: asyncio.Task[Msg] | None = None
    self._tasks: list[asyncio.Task[object]] = []
    self._total_received = 0
    self._did_unsubscribe = False
    # Reference point (event loop clock) for the max_interval check.
    self._last_received = asyncio.get_event_loop().time()

cleanup() async

Unsubscribe from the inbox and cancel all the tasks.

Source code in src/nats_contrib/request_many/iterator.py
async def cleanup(self) -> None:
    """Cancel every outstanding task and unsubscribe from the inbox.

    Only the first call does any work: later calls return immediately.
    """
    if self._did_unsubscribe:
        return
    # Flip the flag first so that any re-entrant call becomes a no-op.
    self._did_unsubscribe = True
    # Timer tasks first, then the in-flight fetch (if any).
    outstanding = list(self._tasks)
    if self._pending_task:
        outstanding.append(self._pending_task)
    for job in outstanding:
        if job.done():
            continue
        job.cancel()
    # No subscription exists when the context manager was never entered.
    if self._sub:
        await _unsubscribe(self._sub)

transform(source, map)

Create a new async context manager which will yield an async iterator that applies the map function to each value yielded by the source async iterator.

It is useful for example to transform the return value of the request_many_iter method.

Source code in src/nats_contrib/request_many/utils.py
def transform(
    source: AsyncContextManager[AsyncIterator[T]],
    map: Callable[[T], R],
) -> AsyncContextManager[AsyncIterator[R]]:
    """Wrap an async context manager so that its items go through ``map``.

    The returned async context manager yields an async iterator that
    applies ``map`` to each value yielded by the iterator of ``source``.

    A typical use is converting the messages produced by the
    `request_many_iter` method into another type.

    Args:
        source: The async context manager yielding the original iterator.
        map: The function applied to each value.

    Returns:
        A ``TransformAsyncIterator`` built from ``source`` and ``map``.
    """
    mapped = TransformAsyncIterator(source, map)
    return mapped