Skip to content

Reference

Handler: TypeAlias = Callable[['Request'], Awaitable[None]] module-attribute

Handler is a function that processes a micro request.

Client

Source code in src/nats_contrib/micro/client.py
class Client:
    """Client used to call micro services and query their PING/INFO/STATS subjects."""

    def __init__(
        self,
        nc: NATS,
        default_max_wait: float = 0.5,
        api_prefix: str = API_PREFIX,
    ) -> None:
        """Store the NATS client and build the request-many executor.

        Args:
            nc: The NATS client used for every request.
            default_max_wait: Passed to ``RequestManyExecutor`` as its second argument.
            api_prefix: Prefix used when building internal subjects.
        """
        self.nc = nc
        self.api_prefix = api_prefix
        self.request_executor = RequestManyExecutor(nc, default_max_wait)

    async def request(
        self,
        subject: str,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
        timeout: float = 1,
    ) -> Msg:
        """Send a request and get the response.

        This method should be preferred over using the NATS client directly
        because it turns service error headers into a ``ServiceError``.

        Args:
            subject: The subject to send the request to.
            data: The request data. An empty payload is sent when omitted.
            headers: Additional request headers.
            timeout: The maximum time to wait for a response.

        Returns:
            The response message, when it carries no service error code.

        Raises:
            ServiceError: When the response has a non-empty
                ``Nats-Service-Error-Code`` header.
        """
        reply = await self.nc.request(
            subject, data or b"", headers=headers, timeout=timeout
        )
        reply_headers = reply.headers
        # No headers at all means the service cannot have reported an error.
        if not reply_headers:
            return reply
        code = reply_headers.get("Nats-Service-Error-Code")
        if not code:
            return reply
        raise ServiceError(
            int(code),
            reply_headers.get("Nats-Service-Error", ""),
            subject=subject,
            data=reply.data,
            headers=reply_headers,
        )

    async def ping(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[PingInfo]:
        """Send a PING request and decode every reply into a ``PingInfo``."""
        ping_subject = internal.get_internal_subject(
            internal.ServiceVerb.PING, service, None, self.api_prefix
        )
        replies = await self.request_executor(
            ping_subject,
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )
        payloads = (json.loads(reply.data) for reply in replies)
        return [PingInfo.from_response(payload) for payload in payloads]

    async def info(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceInfo]:
        """Send an INFO request and decode every reply into a ``ServiceInfo``."""
        info_subject = internal.get_internal_subject(
            internal.ServiceVerb.INFO, service, None, self.api_prefix
        )
        replies = await self.request_executor(
            info_subject,
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )
        payloads = (json.loads(reply.data) for reply in replies)
        return [ServiceInfo.from_response(payload) for payload in payloads]

    async def stats(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceStats]:
        """Send a STATS request and decode every reply into a ``ServiceStats``."""
        stats_subject = internal.get_internal_subject(
            internal.ServiceVerb.STATS, service, None, self.api_prefix
        )
        replies = await self.request_executor(
            stats_subject,
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )
        payloads = (json.loads(reply.data) for reply in replies)
        return [ServiceStats.from_response(payload) for payload in payloads]

    def ping_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
        """Iterator flavour of ``ping``: replies are decoded as they are consumed."""
        ping_subject = internal.get_internal_subject(
            internal.ServiceVerb.PING, service, None, self.api_prefix
        )
        # Each call gets its own inbox from the NATS client.
        source = RequestManyIterator(
            self.nc,
            ping_subject,
            inbox=self.nc.new_inbox(),
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )
        return transform(
            source, lambda msg: PingInfo.from_response(json.loads(msg.data))
        )

    def info_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
        """Iterator flavour of ``info``: replies are decoded as they are consumed."""
        info_subject = internal.get_internal_subject(
            internal.ServiceVerb.INFO, service, None, self.api_prefix
        )
        # Each call gets its own inbox from the NATS client.
        source = RequestManyIterator(
            self.nc,
            info_subject,
            inbox=self.nc.new_inbox(),
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )
        return transform(
            source, lambda msg: ServiceInfo.from_response(json.loads(msg.data))
        )

    def stats_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
        """Iterator flavour of ``stats``: replies are decoded as they are consumed."""
        stats_subject = internal.get_internal_subject(
            internal.ServiceVerb.STATS, service, None, self.api_prefix
        )
        # Each call gets its own inbox from the NATS client.
        source = RequestManyIterator(
            self.nc,
            stats_subject,
            inbox=self.nc.new_inbox(),
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )
        return transform(
            source, lambda msg: ServiceStats.from_response(json.loads(msg.data))
        )

    def service(self, service: str) -> Service:
        """Return a ``Service`` client bound to this client and ``service``."""
        scoped = Service(self, service)
        return scoped

    def instance(self, service: str, id: str) -> Instance:
        """Return an ``Instance`` client bound to this client, ``service`` and ``id``."""
        scoped = Instance(self, service, id)
        return scoped

info(service=None, max_wait=None, max_count=None, max_interval=None) async

Get information about all services.

Source code in src/nats_contrib/micro/client.py
async def info(
    self,
    service: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
) -> list[ServiceInfo]:
    """Send an INFO request and decode every reply.

    Args:
        service: Optional service name used to build the request subject.
        max_wait: Forwarded unchanged to the request executor.
        max_count: Forwarded unchanged to the request executor.
        max_interval: Forwarded unchanged to the request executor.

    Returns:
        One ``ServiceInfo`` per reply, built from the reply's JSON payload.
    """
    info_subject = internal.get_internal_subject(
        internal.ServiceVerb.INFO, service, None, self.api_prefix
    )
    replies = await self.request_executor(
        info_subject,
        max_count=max_count,
        max_wait=max_wait,
        max_interval=max_interval,
    )
    payloads = (json.loads(reply.data) for reply in replies)
    return [ServiceInfo.from_response(payload) for payload in payloads]

info_iter(service=None, max_wait=None, max_count=None, max_interval=None)

Get information about all services.

Source code in src/nats_contrib/micro/client.py
def info_iter(
    self,
    service: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
    """Iterator flavour of ``info``.

    Builds a ``RequestManyIterator`` on the INFO subject with a fresh inbox
    and wraps it with ``transform`` so each message is decoded from JSON
    into a ``ServiceInfo``.
    """
    info_subject = internal.get_internal_subject(
        internal.ServiceVerb.INFO, service, None, self.api_prefix
    )
    source = RequestManyIterator(
        self.nc,
        info_subject,
        inbox=self.nc.new_inbox(),
        max_count=max_count,
        max_wait=max_wait,
        max_interval=max_interval,
    )
    return transform(
        source, lambda msg: ServiceInfo.from_response(json.loads(msg.data))
    )

instance(service, id)

Get a client for a single service instance.

Source code in src/nats_contrib/micro/client.py
def instance(self, service: str, id: str) -> Instance:
    """Return an ``Instance`` client bound to this client, ``service`` and ``id``."""
    scoped = Instance(self, service, id)
    return scoped

ping(service=None, max_wait=None, max_count=None, max_interval=None) async

Ping all the services.

Source code in src/nats_contrib/micro/client.py
async def ping(
    self,
    service: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
) -> list[PingInfo]:
    """Send a PING request and decode every reply.

    Args:
        service: Optional service name used to build the request subject.
        max_wait: Forwarded unchanged to the request executor.
        max_count: Forwarded unchanged to the request executor.
        max_interval: Forwarded unchanged to the request executor.

    Returns:
        One ``PingInfo`` per reply, built from the reply's JSON payload.
    """
    ping_subject = internal.get_internal_subject(
        internal.ServiceVerb.PING, service, None, self.api_prefix
    )
    replies = await self.request_executor(
        ping_subject,
        max_count=max_count,
        max_wait=max_wait,
        max_interval=max_interval,
    )
    payloads = (json.loads(reply.data) for reply in replies)
    return [PingInfo.from_response(payload) for payload in payloads]

ping_iter(service=None, max_wait=None, max_count=None, max_interval=None)

Ping all the services.

Source code in src/nats_contrib/micro/client.py
def ping_iter(
    self,
    service: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
) -> AsyncContextManager[AsyncIterator[PingInfo]]:
    """Iterator flavour of ``ping``.

    Builds a ``RequestManyIterator`` on the PING subject with a fresh inbox
    and wraps it with ``transform`` so each message is decoded from JSON
    into a ``PingInfo``.
    """
    ping_subject = internal.get_internal_subject(
        internal.ServiceVerb.PING, service, None, self.api_prefix
    )
    source = RequestManyIterator(
        self.nc,
        ping_subject,
        inbox=self.nc.new_inbox(),
        max_count=max_count,
        max_wait=max_wait,
        max_interval=max_interval,
    )
    return transform(
        source, lambda msg: PingInfo.from_response(json.loads(msg.data))
    )

request(subject, data=None, headers=None, timeout=1) async

Send a request and get the response.

This method should be preferred over using the NATS client directly because it will handle the service errors properly.

Parameters:

Name Type Description Default
subject str

The subject to send the request to.

required
data bytes | None

The request data.

None
headers dict[str, str] | None

Additional request headers.

None
timeout float

The maximum time to wait for a response.

1

Returns:

Source code in src/nats_contrib/micro/client.py
async def request(
    self,
    subject: str,
    data: bytes | None = None,
    headers: dict[str, str] | None = None,
    timeout: float = 1,
) -> Msg:
    """Send a request and get the response.

    This method should be preferred over using the NATS client directly
    because it turns service error headers into a ``ServiceError``.

    Args:
        subject: The subject to send the request to.
        data: The request data. An empty payload is sent when omitted.
        headers: Additional request headers.
        timeout: The maximum time to wait for a response.

    Returns:
        The response message, when it carries no service error code.

    Raises:
        ServiceError: When the response has a non-empty
            ``Nats-Service-Error-Code`` header.
    """
    reply = await self.nc.request(
        subject, data or b"", headers=headers, timeout=timeout
    )
    reply_headers = reply.headers
    # No headers at all means the service cannot have reported an error.
    if not reply_headers:
        return reply
    code = reply_headers.get("Nats-Service-Error-Code")
    if not code:
        return reply
    raise ServiceError(
        int(code),
        reply_headers.get("Nats-Service-Error", ""),
        subject=subject,
        data=reply.data,
        headers=reply_headers,
    )

service(service)

Get a client for a single service.

Source code in src/nats_contrib/micro/client.py
def service(self, service: str) -> Service:
    """Return a ``Service`` client bound to this client and ``service``."""
    scoped = Service(self, service)
    return scoped

stats(service=None, max_wait=None, max_count=None, max_interval=None) async

Get all services stats.

Source code in src/nats_contrib/micro/client.py
async def stats(
    self,
    service: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
) -> list[ServiceStats]:
    """Send a STATS request and decode every reply.

    Args:
        service: Optional service name used to build the request subject.
        max_wait: Forwarded unchanged to the request executor.
        max_count: Forwarded unchanged to the request executor.
        max_interval: Forwarded unchanged to the request executor.

    Returns:
        One ``ServiceStats`` per reply, built from the reply's JSON payload.
    """
    stats_subject = internal.get_internal_subject(
        internal.ServiceVerb.STATS, service, None, self.api_prefix
    )
    replies = await self.request_executor(
        stats_subject,
        max_count=max_count,
        max_wait=max_wait,
        max_interval=max_interval,
    )
    payloads = (json.loads(reply.data) for reply in replies)
    return [ServiceStats.from_response(payload) for payload in payloads]

stats_iter(service=None, max_wait=None, max_count=None, max_interval=None)

Get all services stats.

Source code in src/nats_contrib/micro/client.py
def stats_iter(
    self,
    service: str | None = None,
    max_wait: float | None = None,
    max_count: int | None = None,
    max_interval: float | None = None,
) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
    """Iterator flavour of ``stats``.

    Builds a ``RequestManyIterator`` on the STATS subject with a fresh inbox
    and wraps it with ``transform`` so each message is decoded from JSON
    into a ``ServiceStats``.
    """
    stats_subject = internal.get_internal_subject(
        internal.ServiceVerb.STATS, service, None, self.api_prefix
    )
    source = RequestManyIterator(
        self.nc,
        stats_subject,
        inbox=self.nc.new_inbox(),
        max_count=max_count,
        max_wait=max_wait,
        max_interval=max_interval,
    )
    return transform(
        source, lambda msg: ServiceStats.from_response(json.loads(msg.data))
    )

Context

A class to run micro services easily.

This class is useful in a main function to ensure that all async resources are cleaned up properly when the program is cancelled.

It also makes it easy to listen for signals and cancel the program when one is received.

Source code in src/nats_contrib/micro/context.py
class Context:
    """A class to run micro services easily.

    This class is useful in a main function to ensure that all async
    resources are cleaned up properly when the program is cancelled.

    It also makes it easy to listen for signals and cancel the program
    when one is received.
    """

    def __init__(self, client: NATS | None = None):
        """Create a context around ``client`` (a new ``NATS()`` when omitted)."""
        # Everything entered or pushed on this stack is released in __aexit__.
        self.exit_stack = contextlib.AsyncExitStack()
        self.cancel_event = asyncio.Event()
        self.client = client or NATS()
        self.services: list[Service] = []

    async def connect(self, *options: ConnectOption) -> None:
        """Connect to the NATS server. Does not raise an error when cancelled.

        When the context was not cancelled while connecting, the client is
        also entered on the exit stack.
        """
        await self.wait_for(connect(client=self.client, *options))
        if not self.cancelled():
            await self.enter(self.client)

    async def add_service(
        self,
        name: str,
        version: str,
        description: str | None = None,
        metadata: dict[str, str] | None = None,
        queue_group: str | None = None,
        pending_bytes_limit_by_endpoint: int | None = None,
        pending_msgs_limit_by_endpoint: int | None = None,
        now: Callable[[], datetime.datetime] | None = None,
        id_generator: Callable[[], str] | None = None,
        api_prefix: str | None = None,
    ) -> Service:
        """Add a service to the context.

        This will start the service using the client used
        to connect to the NATS server. The service is entered on the exit
        stack and remembered in ``self.services``.

        Returns:
            The created service.
        """
        service = add_service(
            self.client,
            name,
            version,
            description,
            metadata,
            queue_group,
            pending_bytes_limit_by_endpoint,
            pending_msgs_limit_by_endpoint,
            now,
            id_generator,
            api_prefix,
        )
        await self.enter(service)
        # Only remember the service once it was entered successfully.
        self.services.append(service)
        return service

    def reset(self) -> None:
        """Reset all the services."""
        for service in self.services:
            service.reset()

    def cancel(self) -> None:
        """Set the cancel event."""
        self.cancel_event.set()

    def cancelled(self) -> bool:
        """Check if the context was cancelled."""
        return self.cancel_event.is_set()

    def add_disconnected_callback(
        self, callback: Callable[[], Awaitable[None]]
    ) -> None:
        """Add a disconnected callback to the NATS client.

        The client's current callback and ``callback`` are combined with
        ``_chain0``; the result replaces the client's callback.
        """
        existing = self.client._disconnected_cb  # pyright: ignore[reportPrivateUsage]
        self.client._disconnected_cb = _chain0(  # pyright: ignore[reportPrivateUsage]
            existing, callback
        )

    def add_closed_callback(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Add a closed callback to the NATS client.

        The client's current callback and ``callback`` are combined with
        ``_chain0``; the result replaces the client's callback.
        """
        existing = self.client._closed_cb  # pyright: ignore[reportPrivateUsage]
        self.client._closed_cb = _chain0(  # pyright: ignore[reportPrivateUsage]
            existing, callback
        )

    def add_reconnected_callback(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Add a reconnected callback to the NATS client.

        The client's current callback and ``callback`` are combined with
        ``_chain0``; the result replaces the client's callback.
        """
        existing = self.client._reconnected_cb  # pyright: ignore[reportPrivateUsage]
        self.client._reconnected_cb = _chain0(  # pyright: ignore[reportPrivateUsage]
            existing, callback
        )

    def add_error_callback(
        self, callback: Callable[[Exception], Awaitable[None]]
    ) -> None:
        """Add an error callback to the NATS client.

        The client's current callback and ``callback`` are combined with
        ``_chain1``; the result replaces the client's callback.
        """
        existing = self.client._error_cb  # pyright: ignore[reportPrivateUsage]
        self.client._error_cb = _chain1(  # pyright: ignore[reportPrivateUsage]
            existing, callback
        )

    def trap_signal(self, *signals: signal.Signals) -> None:
        """Register signal handlers that cancel the context.

        Args:
            signals: The signals to trap. Defaults to SIGINT and SIGTERM
                when none are given.
        """
        if not signals:
            signals = (signal.Signals.SIGINT, signal.Signals.SIGTERM)
        loop = asyncio.get_event_loop()
        for sig in signals:
            loop.add_signal_handler(sig, self.cancel)

    def push(self, callback: Callable[[], Awaitable[None] | None]) -> None:
        """Add a callback to the exit stack.

        Args:
            callback: A zero-argument callable. It may be a coroutine
                function, a plain function, or a plain callable that
                returns an awaitable (for example a lambda wrapping a
                coroutine call); in the last case the awaitable is awaited
                when the stack unwinds.
        """
        if inspect.iscoroutinefunction(callback):
            self.exit_stack.push_async_callback(callback)
            return

        async def _invoke() -> None:
            # A plain callable can still hand back an awaitable; registering
            # it as a sync callback would leave that awaitable un-awaited.
            outcome = callback()
            if inspect.isawaitable(outcome):
                await outcome

        self.exit_stack.push_async_callback(_invoke)

    async def enter(self, async_context: AsyncContextManager[T]) -> T:
        """Enter an async context on the exit stack and return its value."""
        return await self.exit_stack.enter_async_context(async_context)

    async def wait(self) -> None:
        """Wait for the cancel event to be set."""
        await self.cancel_event.wait()

    async def wait_for(self, coro: Coroutine[Any, Any, Any]) -> None:
        """Run a coroutine in the context and cancel it if the context is cancelled.

        This method does not raise an exception if the coroutine is cancelled.
        You can use .cancelled() on the context to check if the coroutine was
        cancelled.
        """
        await _run_until_first_complete(coro, self.wait())

    async def __aenter__(self) -> "Context":
        await self.exit_stack.__aenter__()
        return self

    async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
        try:
            # NOTE(review): the stack is always unwound as if no exception
            # occurred (exception details are not forwarded) - confirm this
            # is intended.
            await self.exit_stack.__aexit__(None, None, None)
        finally:
            self.services.clear()

    async def run_forever(
        self,
        setup: Callable[[Context], Coroutine[Any, Any, None]],
        /,
        *options: ConnectOption,
        trap_signals: bool | tuple[signal.Signals, ...] = False,
    ) -> None:
        """Useful in a main function of a program.

        This method will first connect to the NATS server using the provided
        options. It will then run the setup function and finally wait until
        the context is cancelled.

        If trap_signals is True, it will trap SIGINT and SIGTERM signals
        and cancel the context when one of these signals is received.

        Other signals can be trapped by providing a tuple of signals to
        trap.

        This method will not raise an exception if the context is cancelled.

        You can use .cancelled() on the context to check if the coroutine was
        cancelled.

        Warning:
            The context must not have been used as an async context manager
            before calling this method.

        Args:
            setup: A coroutine function called with this context to setup
                the program.
            options: The options to pass to the connect function.
            trap_signals: If True, trap SIGINT and SIGTERM signals. A tuple
                of signals traps exactly those signals.
        """
        async with self as ctx:
            if trap_signals:
                if trap_signals is True:
                    trap_signals = (signal.Signals.SIGINT, signal.Signals.SIGTERM)
                ctx.trap_signal(*trap_signals)
            # NOTE(review): unlike Context.connect(), the client is not
            # entered on the exit stack here - confirm the connection is
            # closed elsewhere.
            await ctx.wait_for(connect(client=ctx.client, *options))
            if ctx.cancelled():
                return
            await ctx.wait_for(setup(ctx))
            if ctx.cancelled():
                return
            await ctx.wait()

add_closed_callback(callback)

Add a closed callback to the NATS client.

Source code in src/nats_contrib/micro/context.py
def add_closed_callback(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Chain ``callback`` onto the NATS client's closed callback.

    The client's current ``_closed_cb`` and ``callback`` are combined with
    ``_chain0``; the result replaces ``_closed_cb``.
    """
    nats_client = self.client
    nats_client._closed_cb = _chain0(  # pyright: ignore[reportPrivateUsage]
        nats_client._closed_cb,  # pyright: ignore[reportPrivateUsage]
        callback,
    )

add_disconnected_callback(callback)

Add a disconnected callback to the NATS client.

Source code in src/nats_contrib/micro/context.py
def add_disconnected_callback(
    self, callback: Callable[[], Awaitable[None]]
) -> None:
    """Chain ``callback`` onto the NATS client's disconnected callback.

    The client's current ``_disconnected_cb`` and ``callback`` are combined
    with ``_chain0``; the result replaces ``_disconnected_cb``.
    """
    nats_client = self.client
    nats_client._disconnected_cb = _chain0(  # pyright: ignore[reportPrivateUsage]
        nats_client._disconnected_cb,  # pyright: ignore[reportPrivateUsage]
        callback,
    )

add_error_callback(callback)

Add an error callback to the NATS client.

Source code in src/nats_contrib/micro/context.py
def add_error_callback(
    self, callback: Callable[[Exception], Awaitable[None]]
) -> None:
    """Chain ``callback`` onto the NATS client's error callback.

    The client's current ``_error_cb`` and ``callback`` are combined with
    ``_chain1``; the result replaces ``_error_cb``.
    """
    nats_client = self.client
    nats_client._error_cb = _chain1(  # pyright: ignore[reportPrivateUsage]
        nats_client._error_cb,  # pyright: ignore[reportPrivateUsage]
        callback,
    )

add_reconnected_callback(callback)

Add a reconnected callback to the NATS client.

Source code in src/nats_contrib/micro/context.py
def add_reconnected_callback(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Chain ``callback`` onto the NATS client's reconnected callback.

    The client's current ``_reconnected_cb`` and ``callback`` are combined
    with ``_chain0``; the result replaces ``_reconnected_cb``.
    """
    nats_client = self.client
    nats_client._reconnected_cb = _chain0(  # pyright: ignore[reportPrivateUsage]
        nats_client._reconnected_cb,  # pyright: ignore[reportPrivateUsage]
        callback,
    )

add_service(name, version, description=None, metadata=None, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None, now=None, id_generator=None, api_prefix=None) async

Add a service to the context.

This will start the service using the client used to connect to the NATS server.

Source code in src/nats_contrib/micro/context.py
async def add_service(
    self,
    name: str,
    version: str,
    description: str | None = None,
    metadata: dict[str, str] | None = None,
    queue_group: str | None = None,
    pending_bytes_limit_by_endpoint: int | None = None,
    pending_msgs_limit_by_endpoint: int | None = None,
    now: Callable[[], datetime.datetime] | None = None,
    id_generator: Callable[[], str] | None = None,
    api_prefix: str | None = None,
) -> Service:
    """Create a service on the context's client and register it.

    The service is built with the context's NATS client, entered on the
    exit stack, and then recorded in ``self.services``.

    Returns:
        The created service.
    """
    created = add_service(
        self.client,
        name,
        version,
        description,
        metadata,
        queue_group,
        pending_bytes_limit_by_endpoint,
        pending_msgs_limit_by_endpoint,
        now,
        id_generator,
        api_prefix,
    )
    # Enter first: a service that fails to start is never recorded.
    await self.enter(created)
    self.services.append(created)
    return created

cancel()

Set the cancel event.

Source code in src/nats_contrib/micro/context.py
def cancel(self) -> None:
    """Mark the context as cancelled by setting its cancel event."""
    event = self.cancel_event
    event.set()

cancelled()

Check if the context was cancelled.

Source code in src/nats_contrib/micro/context.py
def cancelled(self) -> bool:
    """Return whether the cancel event has been set."""
    is_cancelled = self.cancel_event.is_set()
    return is_cancelled

connect(*options) async

Connect to the NATS server. Does not raise an error when cancelled.

Source code in src/nats_contrib/micro/context.py
async def connect(self, *options: ConnectOption) -> None:
    """Connect to the NATS server. Does not raise an error when cancelled.

    When the context was not cancelled while connecting, the client is
    also entered on the exit stack.
    """
    await self.wait_for(connect(client=self.client, *options))
    if self.cancelled():
        return
    await self.enter(self.client)

enter(async_context) async

Enter an async context.

Source code in src/nats_contrib/micro/context.py
async def enter(self, async_context: AsyncContextManager[T]) -> T:
    """Enter ``async_context`` on the exit stack and return the value it yields."""
    stack = self.exit_stack
    entered = await stack.enter_async_context(async_context)
    return entered

push(callback)

Add a callback to the exit stack.

Source code in src/nats_contrib/micro/context.py
def push(self, callback: Callable[[], Awaitable[None] | None]) -> None:
    """Add a callback to the exit stack.

    Args:
        callback: A zero-argument callable. It may be a coroutine function,
            a plain function, or a plain callable that returns an awaitable
            (for example a lambda wrapping a coroutine call); in the last
            case the awaitable is awaited when the stack unwinds.
    """
    if inspect.iscoroutinefunction(callback):
        self.exit_stack.push_async_callback(callback)
        return

    async def _invoke() -> None:
        # A plain callable can still hand back an awaitable; registering it
        # as a sync callback would leave that awaitable un-awaited.
        outcome = callback()
        if inspect.isawaitable(outcome):
            await outcome

    self.exit_stack.push_async_callback(_invoke)

reset()

Reset all the services.

Source code in src/nats_contrib/micro/context.py
def reset(self) -> None:
    """Call ``reset()`` on every service registered in ``self.services``, in order."""
    registered = self.services
    for svc in registered:
        svc.reset()

run_forever(setup, /, *options, trap_signals=False) async

Useful in a main function of a program.

This method will first connect to the NATS server using the provided options. It will then run the setup function and finally enter any additional services provided.

If trap_signals is True, it will trap SIGINT and SIGTERM signals and cancel the context when one of these signals is received.

Other signals can be trapped by providing a tuple of signals to trap.

This method will not raise an exception if the context is cancelled.

You can use .cancelled() on the context to check if the coroutine was cancelled.

Warning

The context must not have been used as an async context manager before calling this method.

Parameters:

Name Type Description Default
setup Callable[[Context], Coroutine[Any, Any, None]]

A coroutine to setup the program.

required
options ConnectOption

The options to pass to the connect method.

()
trap_signals bool | tuple[Signals, ...]

If True, trap SIGINT and SIGTERM signals.

False
Source code in src/nats_contrib/micro/context.py
async def run_forever(
    self,
    setup: Callable[[Context], Coroutine[Any, Any, None]],
    /,
    *options: ConnectOption,
    trap_signals: bool | tuple[signal.Signals, ...] = False,
) -> None:
    """Run a program until the context is cancelled.

    Inside ``async with self``, this method optionally traps signals,
    connects to the NATS server with ``options``, runs ``setup(ctx)`` and
    then waits for the context to be cancelled. Each step is skipped once
    the context has been cancelled.

    This method will not raise an exception if the context is cancelled;
    use ``.cancelled()`` on the context to check for it.

    Warning:
        The context must not have been used as an async context manager
        before calling this method.

    Args:
        setup: A coroutine function called with this context to setup
            the program.
        options: The options to pass to the connect function.
        trap_signals: If True, trap SIGINT and SIGTERM signals. A non-empty
            tuple of signals traps exactly those signals.
    """
    async with self as ctx:
        if trap_signals is True:
            ctx.trap_signal(signal.Signals.SIGINT, signal.Signals.SIGTERM)
        elif trap_signals:
            ctx.trap_signal(*trap_signals)
        await ctx.wait_for(connect(client=ctx.client, *options))
        if not ctx.cancelled():
            await ctx.wait_for(setup(ctx))
            if not ctx.cancelled():
                await ctx.wait()

trap_signal(*signals)

Register signal handlers that cancel the context when one of the given signals (SIGINT and SIGTERM by default) is received.

Source code in src/nats_contrib/micro/context.py
def trap_signal(self, *signals: signal.Signals) -> None:
    """Register signal handlers that cancel the context.

    Args:
        signals: The signals to trap. Defaults to SIGINT and SIGTERM when
            none are given.
    """
    targets = signals or (signal.Signals.SIGINT, signal.Signals.SIGTERM)
    register = asyncio.get_event_loop().add_signal_handler
    for signum in targets:
        register(signum, self.cancel)

wait() async

Wait for the cancel event to be set.

Source code in src/nats_contrib/micro/context.py
async def wait(self) -> None:
    """Block until the context's cancel event is set."""
    event = self.cancel_event
    await event.wait()

wait_for(coro) async

Run a coroutine in the context and cancel it if the context is cancelled.

This method does not raise an exception if the coroutine is cancelled. You can use .cancelled() on the context to check if the coroutine was cancelled.

Source code in src/nats_contrib/micro/context.py
async def wait_for(self, coro: Coroutine[Any, Any, Any]) -> None:
    """Run a coroutine in the context and cancel it if the context is cancelled.

    ``coro`` is raced against ``self.wait()`` through
    ``_run_until_first_complete``.

    This method does not raise an exception if the coroutine is cancelled.
    You can use .cancelled() on the context to check if the coroutine was
    cancelled.
    """
    cancel_waiter = self.wait()
    await _run_until_first_complete(coro, cancel_waiter)

Endpoint

Endpoint manages a service endpoint.

Source code in src/nats_contrib/micro/api.py
class Endpoint:
    """Hold the configuration, stats and info of one service endpoint."""

    def __init__(self, config: internal.EndpointConfig) -> None:
        """Build fresh stats and info from ``config``; no subscription is attached yet."""
        self.config = config
        self.stats = internal.create_endpoint_stats(config)
        self.info = internal.create_endpoint_info(config)
        self._sub: Subscription | None = None

    def reset(self) -> None:
        """Rebuild ``stats`` and ``info`` from the stored configuration."""
        cfg = self.config
        self.stats = internal.create_endpoint_stats(cfg)
        self.info = internal.create_endpoint_info(cfg)

reset()

Reset the endpoint statistics.

Source code in src/nats_contrib/micro/api.py
def reset(self) -> None:
    """Rebuild ``stats`` and ``info`` from the stored configuration."""
    cfg = self.config
    self.stats = internal.create_endpoint_stats(cfg)
    self.info = internal.create_endpoint_info(cfg)

EndpointInfo dataclass

Bases: Base

The information of an endpoint.

Source code in src/nats_contrib/micro/models.py
@dataclass
class EndpointInfo(Base):
    """The information of an endpoint."""

    name: str
    """
    The endpoint name
    """
    subject: str
    """
    The subject the endpoint listens on
    """
    metadata: dict[str, str] | None = None
    """
    The endpoint metadata.
    """
    queue_group: str | None = None
    """
    The queue group this endpoint listens on for requests
    """

    def copy(self) -> EndpointInfo:
        """Return a copy whose ``metadata`` dict, when present, is a shallow copy."""
        return replace(
            self,
            metadata=None if self.metadata is None else self.metadata.copy(),
        )

metadata: dict[str, str] | None = None class-attribute instance-attribute

The endpoint metadata.

name: str instance-attribute

The endpoint name

queue_group: str | None = None class-attribute instance-attribute

The queue group this endpoint listens on for requests

subject: str instance-attribute

The subject the endpoint listens on

EndpointStats dataclass

Bases: Base

Statistics about a specific service endpoint

Source code in src/nats_contrib/micro/models.py
@dataclass
class EndpointStats(Base):
    """
    Statistics about a specific service endpoint
    """

    name: str
    """
    The endpoint name
    """
    subject: str
    """
    The subject the endpoint listens on
    """
    num_requests: int
    """
    The number of requests this endpoint received
    """
    num_errors: int
    """
    The number of errors this endpoint encountered
    """
    last_error: str
    """
    The last error the service encountered
    """
    processing_time: int
    """
    How long, in total, was spent processing requests in the handler
    """
    average_processing_time: int
    """
    The average time spent processing requests
    """
    queue_group: str | None = None
    """
    The queue group this endpoint listens on for requests
    """
    data: dict[str, object] | None = None
    """
    Additional statistics the endpoint makes available
    """

    def copy(self) -> EndpointStats:
        """Return a copy whose ``data`` dict, when present, is a shallow copy."""
        extra = self.data
        return replace(self, data=extra.copy() if extra is not None else None)

average_processing_time: int instance-attribute

The average time spent processing requests

data: dict[str, object] | None = None class-attribute instance-attribute

Additional statistics the endpoint makes available

last_error: str instance-attribute

The last error the service encountered

name: str instance-attribute

The endpoint name

num_errors: int instance-attribute

The number of errors this endpoint encountered

num_requests: int instance-attribute

The number of requests this endpoint received

processing_time: int instance-attribute

How long, in total, was spent processing requests in the handler

queue_group: str | None = None class-attribute instance-attribute

The queue group this endpoint listens on for requests

subject: str instance-attribute

The subject the endpoint listens on

Group

Group allows for grouping endpoints on a service.

Endpoints created using Group.add_endpoint will be grouped under a common prefix (the group name). New groups can also be derived from a group using Group.add_group.

Source code in src/nats_contrib/micro/api.py
class Group:
    """A named prefix under which endpoints of a service are registered.

    Endpoints created using `Group.add_endpoint` get the group name
    prepended to their subject and fall back to the group's queue group
    and pending limits when they do not provide their own. New groups can
    be derived from a group using `Group.add_group`.
    """

    def __init__(self, config: internal.GroupConfig, service: Service) -> None:
        self._service = service
        self._config = config

    def add_group(
        self,
        name: str,
        queue_group: str | None = None,
        pending_bytes_limit_by_endpoint: int | None = None,
        pending_msgs_limit_by_endpoint: int | None = None,
    ) -> Group:
        """Derive a child group from this group.

        Args:
            name: The name of the group. Must be a valid NATS subject prefix.
            queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service.
            pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group.
            pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group.

        Returns:
            A new group attached to the same service, whose configuration
            is produced by the ``child`` method of this group's config.
        """
        child_config = self._config.child(
            name=name,
            queue_group=queue_group,
            pending_bytes_limit=pending_bytes_limit_by_endpoint,
            pending_msgs_limit=pending_msgs_limit_by_endpoint,
        )
        return Group(child_config, self._service)

    async def add_endpoint(
        self,
        name: str,
        handler: Handler,
        subject: str | None = None,
        queue_group: str | None = None,
        metadata: dict[str, str] | None = None,
        pending_bytes_limit: int | None = None,
        pending_msgs_limit: int | None = None,
        middlewares: list[Middleware] | None = None,
    ) -> Endpoint:
        """Register an endpoint on the service under this group's prefix.

        The endpoint subject is ``"{group name}.{subject or name}"``. For
        ``queue_group`` and the two pending limits, any falsy value
        (``None``, ``0`` or an empty string) falls back to the group default.

        Args:
            name: The name of the endpoint.
            handler: The handler of the endpoint.
            subject: The subject of the endpoint, relative to the group name. When subject is not set, the name of the endpoint is used.
            queue_group: The queue group of the endpoint. When not set, the queue group of the group is used.
            metadata: The metadata of the endpoint.
            pending_bytes_limit: The pending bytes limit for this endpoint.
            pending_msgs_limit: The pending messages limit for this endpoint.
            middlewares: Optional middlewares, forwarded unchanged to the service's ``add_endpoint``.

        Returns:
            The endpoint returned by the service's ``add_endpoint``.
        """
        defaults = self._config
        prefixed_subject = f"{defaults.name}.{subject or name}"
        endpoint = await self._service.add_endpoint(
            name=name,
            subject=prefixed_subject,
            handler=handler,
            metadata=metadata,
            queue_group=queue_group or defaults.queue_group,
            pending_bytes_limit=(
                pending_bytes_limit or defaults.pending_bytes_limit_by_endpoint
            ),
            pending_msgs_limit=(
                pending_msgs_limit or defaults.pending_msgs_limit_by_endpoint
            ),
            middlewares=middlewares,
        )
        return endpoint

add_endpoint(name, handler, subject=None, queue_group=None, metadata=None, pending_bytes_limit=None, pending_msgs_limit=None, middlewares=None) async

Add an endpoint to the group.

Parameters:

Name Type Description Default
name str

The name of the endpoint.

required
handler Handler

The handler of the endpoint.

required
subject str | None

The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint.

None
queue_group str | None

The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service.

None
metadata dict[str, str] | None

The metadata of the endpoint.

None
pending_bytes_limit int | None

The pending bytes limit for this endpoint.

None
pending_msgs_limit int | None

The pending messages limit for this endpoint.

None
Source code in src/nats_contrib/micro/api.py
async def add_endpoint(
    self,
    name: str,
    handler: Handler,
    subject: str | None = None,
    queue_group: str | None = None,
    metadata: dict[str, str] | None = None,
    pending_bytes_limit: int | None = None,
    pending_msgs_limit: int | None = None,
    middlewares: list[Middleware] | None = None,
) -> Endpoint:
    """Register an endpoint on the service under this group's prefix.

    The endpoint subject is ``"{group name}.{subject or name}"``. For
    ``queue_group`` and the two pending limits, any falsy value
    (``None``, ``0`` or an empty string) falls back to the group default.

    Args:
        name: The name of the endpoint.
        handler: The handler of the endpoint.
        subject: The subject of the endpoint, relative to the group name. When subject is not set, the name of the endpoint is used.
        queue_group: The queue group of the endpoint. When not set, the queue group of the group is used.
        metadata: The metadata of the endpoint.
        pending_bytes_limit: The pending bytes limit for this endpoint.
        pending_msgs_limit: The pending messages limit for this endpoint.
        middlewares: Optional middlewares, forwarded unchanged to the service's ``add_endpoint``.

    Returns:
        The endpoint returned by the service's ``add_endpoint``.
    """
    defaults = self._config
    prefixed_subject = f"{defaults.name}.{subject or name}"
    endpoint = await self._service.add_endpoint(
        name=name,
        subject=prefixed_subject,
        handler=handler,
        metadata=metadata,
        queue_group=queue_group or defaults.queue_group,
        pending_bytes_limit=(
            pending_bytes_limit or defaults.pending_bytes_limit_by_endpoint
        ),
        pending_msgs_limit=(
            pending_msgs_limit or defaults.pending_msgs_limit_by_endpoint
        ),
        middlewares=middlewares,
    )
    return endpoint

add_group(name, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None)

Add a group to the group.

Parameters:

Name Type Description Default
name str

The name of the group. Must be a valid NATS subject prefix.

required
queue_group str | None

The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service.

None
pending_bytes_limit_by_endpoint int | None

The default pending bytes limit for each endpoint within the group.

None
pending_msgs_limit_by_endpoint int | None

The default pending messages limit for each endpoint within the group.

None
Source code in src/nats_contrib/micro/api.py
def add_group(
    self,
    name: str,
    queue_group: str | None = None,
    pending_bytes_limit_by_endpoint: int | None = None,
    pending_msgs_limit_by_endpoint: int | None = None,
) -> Group:
    """Derive a child group from this group.

    Args:
        name: The name of the group. Must be a valid NATS subject prefix.
        queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service.
        pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group.
        pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group.

    Returns:
        A new group attached to the same service, whose configuration
        is produced by the ``child`` method of this group's config.
    """
    child_config = self._config.child(
        name=name,
        queue_group=queue_group,
        pending_bytes_limit=pending_bytes_limit_by_endpoint,
        pending_msgs_limit=pending_msgs_limit_by_endpoint,
    )
    return Group(child_config, self._service)

PingInfo dataclass

Bases: Base

The response to a ping message.

Source code in src/nats_contrib/micro/models.py
@dataclass
class PingInfo(Base):
    """The response to a ping message."""

    name: str
    id: str
    version: str
    metadata: dict[str, str]
    type: str = "io.nats.micro.v1.ping_response"

    def copy(self) -> PingInfo:
        """Return a copy of this ping info with its own ``metadata`` dict."""
        metadata_copy = self.metadata.copy()
        return replace(self, metadata=metadata_copy)

Request

Request is the interface for a request received by a service.

An interface is used instead of a class to allow for different implementations. It makes it easy to test a service by using a stub implementation of Request.

Four methods must be implemented:

  • def subject() -> str: the subject on which the request was received.
  • def headers() -> dict[str, str]: the headers of the request.
  • def data() -> bytes: the data of the request.
  • async def respond(...) -> None: send a response to the request.
Source code in src/nats_contrib/micro/request.py
class Request(metaclass=abc.ABCMeta):
    """Request is the interface for a request received by a service.

    An interface is used instead of a class to allow for different implementations.
    It makes it easy to test a service by using a stub implementation of Request.

    Four methods must be implemented:

    - `def subject() -> str`: the subject on which the request was received.
    - `def headers() -> dict[str, str]`: the headers of the request.
    - `def data() -> bytes`: the data of the request.
    - `async def respond(...) -> None`: send a response to the request.

    `respond_success` and `respond_error` are provided on top of `respond`.
    """

    @abc.abstractmethod
    def subject(self) -> str:
        """The subject on which the request was received."""
        raise NotImplementedError()

    @abc.abstractmethod
    def headers(self) -> dict[str, str]:
        """The headers of the request."""
        raise NotImplementedError()

    @abc.abstractmethod
    def data(self) -> bytes:
        """The data of the request."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def respond(self, data: bytes, headers: dict[str, str] | None = None) -> None:
        """Send a response to the request.

        `respond_success` and `respond_error` both delegate to this method
        after adding their status headers.

        Args:
            data: The response data.
            headers: Additional response headers.
        """
        raise NotImplementedError()

    async def respond_success(
        self,
        code: int,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        """Send a success response to the request.

        The ``Nats-Service-Success-Code`` header is set to ``str(code)``,
        overriding any value already present in ``headers``. The dict
        passed as ``headers`` is not modified.

        Args:
            code: The status code describing the success.
            data: The response data. Defaults to an empty payload.
            headers: Additional response headers.
        """
        # Work on a copy: writing into the caller's dict would leak the
        # status header into whatever else the caller uses that dict for.
        response_headers = dict(headers) if headers else {}
        response_headers["Nats-Service-Success-Code"] = str(code)
        await self.respond(data or b"", headers=response_headers)

    async def respond_error(
        self,
        code: int,
        description: str,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        """Send an error response to the request.

        The ``Nats-Service-Error`` and ``Nats-Service-Error-Code`` headers
        are set from ``description`` and ``str(code)``, overriding any
        values already present in ``headers``. The dict passed as
        ``headers`` is not modified.

        Args:
            code: The error code describing the error.
            description: A string describing the error which can be displayed to the client.
            data: The error data. Defaults to an empty payload.
            headers: Additional response headers.
        """
        # Work on a copy: writing into the caller's dict would leak the
        # error headers into whatever else the caller uses that dict for.
        response_headers = dict(headers) if headers else {}
        response_headers["Nats-Service-Error"] = description
        response_headers["Nats-Service-Error-Code"] = str(code)
        await self.respond(data or b"", headers=response_headers)

data() abstractmethod

The data of the request.

Source code in src/nats_contrib/micro/request.py
@abc.abstractmethod
def data(self) -> bytes:
    """The data of the request.

    Abstract: implementations must override this method; the base
    implementation only raises ``NotImplementedError``.
    """
    raise NotImplementedError()

headers() abstractmethod

The headers of the request.

Source code in src/nats_contrib/micro/request.py
@abc.abstractmethod
def headers(self) -> dict[str, str]:
    """The headers of the request.

    Abstract: implementations must override this method; the base
    implementation only raises ``NotImplementedError``.
    """
    raise NotImplementedError()

respond(data, headers=None) abstractmethod async

Send a response to the request. respond_success and respond_error both delegate to this method after adding their status headers.

Parameters:

Name Type Description Default
data bytes

The response data.

required
headers dict[str, str] | None

Additional response headers.

None
Source code in src/nats_contrib/micro/request.py
@abc.abstractmethod
async def respond(self, data: bytes, headers: dict[str, str] | None = None) -> None:
    """Send a response to the request.

    `respond_success` and `respond_error` both delegate to this method
    after adding their status headers. Abstract: implementations must
    override it; the base implementation only raises
    ``NotImplementedError``.

    Args:
        data: The response data.
        headers: Additional response headers.
    """
    raise NotImplementedError()

respond_error(code, description, data=None, headers=None) async

Send an error response to the request.

Parameters:

Name Type Description Default
code int

The error code describing the error.

required
description str

A string describing the error which can be displayed to the client.

required
data bytes | None

The error data.

None
headers dict[str, str] | None

Additional response headers.

None
Source code in src/nats_contrib/micro/request.py
async def respond_error(
    self,
    code: int,
    description: str,
    data: bytes | None = None,
    headers: dict[str, str] | None = None,
) -> None:
    """Send an error response to the request.

    The ``Nats-Service-Error`` and ``Nats-Service-Error-Code`` headers
    are set from ``description`` and ``str(code)``, overriding any
    values already present in ``headers``. The dict passed as
    ``headers`` is not modified.

    Args:
        code: The error code describing the error.
        description: A string describing the error which can be displayed to the client.
        data: The error data. Defaults to an empty payload.
        headers: Additional response headers.
    """
    # Work on a copy: writing into the caller's dict would leak the
    # error headers into whatever else the caller uses that dict for.
    response_headers = dict(headers) if headers else {}
    response_headers["Nats-Service-Error"] = description
    response_headers["Nats-Service-Error-Code"] = str(code)
    await self.respond(data or b"", headers=response_headers)

respond_success(code, data=None, headers=None) async

Send a success response to the request.

Parameters:

Name Type Description Default
code int

The status code describing the success.

required
data bytes | None

The response data.

None
headers dict[str, str] | None

Additional response headers.

None
Source code in src/nats_contrib/micro/request.py
async def respond_success(
    self,
    code: int,
    data: bytes | None = None,
    headers: dict[str, str] | None = None,
) -> None:
    """Send a success response to the request.

    The ``Nats-Service-Success-Code`` header is set to ``str(code)``,
    overriding any value already present in ``headers``. The dict
    passed as ``headers`` is not modified.

    Args:
        code: The status code describing the success.
        data: The response data. Defaults to an empty payload.
        headers: Additional response headers.
    """
    # Work on a copy: writing into the caller's dict would leak the
    # status header into whatever else the caller uses that dict for.
    response_headers = dict(headers) if headers else {}
    response_headers["Nats-Service-Success-Code"] = str(code)
    await self.respond(data or b"", headers=response_headers)

subject() abstractmethod

The subject on which the request was received.

Source code in src/nats_contrib/micro/request.py
@abc.abstractmethod
def subject(self) -> str:
    """The subject on which the request was received.

    Abstract: implementations must override this method; the base
    implementation only raises ``NotImplementedError``.
    """
    raise NotImplementedError()

Service

Services simplify the development of NATS micro-services.

Endpoints can be added to a service after it has been created and started. Each endpoint is a request-reply handler for a subject.

Groups can be added to a service to group endpoints under a common prefix.

Source code in src/nats_contrib/micro/api.py
class Service:
    """Services simplify the development of NATS micro-services.

    Endpoints can be added to a service after it has been created and started.
    Each endpoint is a request-reply handler for a subject.

    Groups can be added to a service to group endpoints under a common prefix.
    """

    def __init__(
        self,
        nc: NatsClient,
        id: str,
        config: internal.ServiceConfig,
        api_prefix: str,
        clock: Callable[[], datetime],
    ) -> None:
        # Collaborators and configuration
        self._nc = nc
        self._config = config
        self._api_prefix = api_prefix
        self._clock = clock
        # Mutable state
        self._id = id
        self._endpoints: list[Endpoint] = []
        self._stopped = False
        # Objects served by the STATS, INFO and PING handlers
        self._stats = internal.new_service_stats(id, clock(), config)
        self._info = internal.new_service_info(id, config)
        self._ping_response = internal.new_ping_info(id, config)
        # The ping payload is encoded once and reused by the ping handler
        self._ping_response_message = internal.encode_ping_info(self._ping_response)
        # Subscriptions created by start(), one list per verb
        self._ping_subscriptions: list[Subscription] = []
        self._info_subscriptions: list[Subscription] = []
        self._stats_subscriptions: list[Subscription] = []

    async def start(self) -> None:
        """Start the service.

        A service MUST be started before adding endpoints.

        This will start the internal subscriptions and enable
        service discovery.
        """
        # One row per verb: (verb, message callback, list collecting the
        # subscriptions). Rows are processed in order PING, INFO, STATS.
        discovery = (
            (
                internal.ServiceVerb.PING,
                self._handle_ping_request,
                self._ping_subscriptions,
            ),
            (
                internal.ServiceVerb.INFO,
                self._handle_info_request,
                self._info_subscriptions,
            ),
            (
                internal.ServiceVerb.STATS,
                self._handle_stats_request,
                self._stats_subscriptions,
            ),
        )
        for verb, callback, registry in discovery:
            # Subjects per verb:
            # - $SRV.{VERB}
            # - $SRV.{name}.{VERB}
            # - $SRV.{name}.{id}.{VERB}
            subjects = internal.get_internal_subjects(
                verb,
                self._id,
                self._config,
                api_prefix=self._api_prefix,
            )
            for subject in subjects:
                sub = await self._nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
                    subject,
                    cb=callback,
                )
                registry.append(sub)

    async def stop(self) -> None:
        """Stop the service.

        This will stop all endpoints and internal subscriptions.
        """
        self._stopped = True
        # Endpoints are stopped first, all of them concurrently
        await asyncio.gather(*[_stop_endpoint(ep) for ep in self._endpoints])
        # Then every internal subscription: STATS, INFO and finally PING
        internal_subs = [
            *self._stats_subscriptions,
            *self._info_subscriptions,
            *self._ping_subscriptions,
        ]
        await asyncio.gather(*[_unsubscribe(sub) for sub in internal_subs])

    def stopped(self) -> bool:
        """Report whether `stop` has been called on the service."""
        return self._stopped

    def info(self) -> ServiceInfo:
        """Return a copy of the service info."""
        snapshot = self._info.copy()
        return snapshot

    def stats(self) -> ServiceStats:
        """Return a copy of the service statistics."""
        snapshot = self._stats.copy()
        return snapshot

    def reset(self) -> None:
        """Reset the statistics of the service and of all its endpoints.

        The stats, info and ping objects are rebuilt from the service
        config, every endpoint is reset, and the endpoints' new stats and
        info are attached to the rebuilt service stats and info.
        """
        service_id = self._id
        cfg = self._config
        # Internal responses
        self._stats = internal.new_service_stats(service_id, self._clock(), cfg)
        self._info = internal.new_service_info(service_id, cfg)
        self._ping_response = internal.new_ping_info(service_id, cfg)
        self._ping_response_message = internal.encode_ping_info(self._ping_response)
        # Re-register every endpoint, in its original order
        previous = self._endpoints[:]
        self._endpoints.clear()
        for endpoint in previous:
            endpoint.reset()
            self._endpoints.append(endpoint)
            self._stats.endpoints.append(endpoint.stats)
            self._info.endpoints.append(endpoint.info)

    def add_group(
        self,
        name: str,
        queue_group: str | None = None,
        pending_bytes_limit_by_endpoint: int | None = None,
        pending_msgs_limit_by_endpoint: int | None = None,
    ) -> Group:
        """Add a group to the service.

        A group is a collection of endpoints that share the same prefix,
        and the same default queue group and pending limits.

        At runtime, a group does not exist as a separate entity, only
        endpoints exist. However, groups are useful to organize endpoints
        and to set default values for queue group and pending limits.

        For each optional argument, any falsy value (``None``, ``0`` or an
        empty string) falls back to the corresponding service default.

        Args:
            name: The name of the group.
            queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the service.
            pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group.
            pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group.

        Returns:
            A new group bound to this service.
        """
        service_defaults = self._config
        group_config = internal.GroupConfig(
            name=name,
            queue_group=queue_group or service_defaults.queue_group,
            pending_bytes_limit_by_endpoint=(
                pending_bytes_limit_by_endpoint
                or service_defaults.pending_bytes_limit_by_endpoint
            ),
            pending_msgs_limit_by_endpoint=(
                pending_msgs_limit_by_endpoint
                or service_defaults.pending_msgs_limit_by_endpoint
            ),
        )
        return Group(group_config, self)

    async def add_endpoint(
        self,
        name: str,
        handler: Handler,
        subject: str | None = None,
        queue_group: str | None = None,
        metadata: dict[str, str] | None = None,
        pending_bytes_limit: int | None = None,
        pending_msgs_limit: int | None = None,
        middlewares: list[Middleware] | None = None,
    ) -> Endpoint:
        """Add an endpoint to the service.

        An endpoint is a request-reply handler for a subject.

        Args:
            name: The name of the endpoint.
            handler: The handler of the endpoint.
            subject: The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint.
            queue_group: The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service.
            metadata: The metadata of the endpoint.
            pending_bytes_limit: The pending bytes limit for this endpoint.
            pending_msgs_limit: The pending messages limit for this endpoint.
            middlewares: Optional middlewares, passed together with the endpoint to the factory that builds the subscription callback.

        Returns:
            The newly created endpoint, already subscribed and registered
            in the service stats and info.

        Raises:
            RuntimeError: If the service has been stopped.
        """
        if self._stopped:
            raise RuntimeError("Cannot add endpoint to a stopped service")
        endpoint_config = self._config.endpoint_config(
            name=name,
            handler=handler,
            subject=subject,
            queue_group=queue_group,
            metadata=metadata,
            pending_bytes_limit=pending_bytes_limit,
            pending_msgs_limit=pending_msgs_limit,
        )
        endpoint = Endpoint(endpoint_config)
        # Build the subscription callback for this endpoint
        callback = _create_handler(endpoint, middlewares)
        # Subscribe first; the endpoint is registered only once this succeeded
        subscription = await self._nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
            endpoint_config.subject,
            queue=endpoint_config.queue_group,
            cb=callback,
        )
        endpoint._sub = subscription  # pyright: ignore[reportPrivateUsage]
        # Register the endpoint and expose its stats and info
        self._endpoints.append(endpoint)
        self._stats.endpoints.append(endpoint.stats)
        self._info.endpoints.append(endpoint.info)
        return endpoint

    async def _handle_ping_request(self, msg: Msg) -> None:
        """Reply to a ping request with the cached, pre-encoded ping payload."""
        payload = self._ping_response_message
        await msg.respond(data=payload)

    async def _handle_info_request(self, msg: Msg) -> None:
        """Reply to an info request with the encoded current service info."""
        payload = internal.encode_info(self._info)
        await msg.respond(data=payload)

    async def _handle_stats_request(self, msg: Msg) -> None:
        """Reply to a stats request with the encoded current service stats."""
        payload = internal.encode_stats(self._stats)
        await msg.respond(data=payload)

    async def __aenter__(self) -> Service:
        """Start the service when entering an ``async with`` block."""
        await self.start()
        return self

    async def __aexit__(self, *args: object, **kwargs: object) -> None:
        """Stop the service when leaving an ``async with`` block."""
        await self.stop()

__aenter__() async

Implement the asynchronous context manager interface.

Source code in src/nats_contrib/micro/api.py
async def __aenter__(self) -> Service:
    """Enter the ``async with`` block: start the service and return it."""
    await self.start()
    return self

__aexit__(*args, **kwargs) async

Implement the asynchronous context manager interface.

Source code in src/nats_contrib/micro/api.py
async def __aexit__(self, *args: object, **kwargs: object) -> None:
    """Leave the ``async with`` block: stop the service.

    The arguments are ignored and ``None`` is returned, so an exception
    raised inside the block is not suppressed.
    """
    await self.stop()

add_endpoint(name, handler, subject=None, queue_group=None, metadata=None, pending_bytes_limit=None, pending_msgs_limit=None, middlewares=None) async

Add an endpoint to the service.

An endpoint is a request-reply handler for a subject.

Parameters:

Name Type Description Default
name str

The name of the endpoint.

required
handler Handler

The handler of the endpoint.

required
subject str | None

The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint.

None
queue_group str | None

The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service.

None
metadata dict[str, str] | None

The metadata of the endpoint.

None
pending_bytes_limit int | None

The pending bytes limit for this endpoint.

None
pending_msgs_limit int | None

The pending messages limit for this endpoint.

None
Source code in src/nats_contrib/micro/api.py
async def add_endpoint(
    self,
    name: str,
    handler: Handler,
    subject: str | None = None,
    queue_group: str | None = None,
    metadata: dict[str, str] | None = None,
    pending_bytes_limit: int | None = None,
    pending_msgs_limit: int | None = None,
    middlewares: list[Middleware] | None = None,
) -> Endpoint:
    """Add an endpoint to the service.

    An endpoint is a request-reply handler for a subject.

    Args:
        name: The name of the endpoint.
        handler: The handler of the endpoint.
        subject: The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint.
        queue_group: The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service.
        metadata: The metadata of the endpoint.
        pending_bytes_limit: The pending bytes limit for this endpoint.
        pending_msgs_limit: The pending messages limit for this endpoint.
        middlewares: Optional middlewares, passed together with the endpoint to the factory that builds the subscription callback.

    Returns:
        The newly created endpoint, already subscribed and registered
        in the service stats and info.

    Raises:
        RuntimeError: If the service has been stopped.
    """
    if self._stopped:
        raise RuntimeError("Cannot add endpoint to a stopped service")
    endpoint_config = self._config.endpoint_config(
        name=name,
        handler=handler,
        subject=subject,
        queue_group=queue_group,
        metadata=metadata,
        pending_bytes_limit=pending_bytes_limit,
        pending_msgs_limit=pending_msgs_limit,
    )
    endpoint = Endpoint(endpoint_config)
    # Build the subscription callback for this endpoint
    callback = _create_handler(endpoint, middlewares)
    # Subscribe first; the endpoint is registered only once this succeeded
    subscription = await self._nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
        endpoint_config.subject,
        queue=endpoint_config.queue_group,
        cb=callback,
    )
    endpoint._sub = subscription  # pyright: ignore[reportPrivateUsage]
    # Register the endpoint and expose its stats and info
    self._endpoints.append(endpoint)
    self._stats.endpoints.append(endpoint.stats)
    self._info.endpoints.append(endpoint.info)
    return endpoint

add_group(name, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None)

Add a group to the service.

A group is a collection of endpoints that share the same prefix, and the same default queue group and pending limits.

At runtime, a group does not exist as a separate entity, only endpoints exist. However, groups are useful to organize endpoints and to set default values for queue group and pending limits.

Parameters:

Name Type Description Default
name str

The name of the group.

required
queue_group str | None

The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service.

None
pending_bytes_limit_by_endpoint int | None

The default pending bytes limit for each endpoint within the group.

None
pending_msgs_limit_by_endpoint int | None

The default pending messages limit for each endpoint within the group.

None
Source code in src/nats_contrib/micro/api.py
def add_group(
    self,
    name: str,
    queue_group: str | None = None,
    pending_bytes_limit_by_endpoint: int | None = None,
    pending_msgs_limit_by_endpoint: int | None = None,
) -> Group:
    """Add a group to the service.

    A group is a collection of endpoints that share the same prefix,
    and the same default queue group and pending limits.

    At runtime, a group does not exist as a separate entity, only
    endpoints exist. However, groups are useful to organize endpoints
    and to set default values for queue group and pending limits.

    For each optional argument, any falsy value (``None``, ``0`` or an
    empty string) falls back to the corresponding service default.

    Args:
        name: The name of the group.
        queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the service.
        pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group.
        pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group.

    Returns:
        A new group bound to this service.
    """
    service_defaults = self._config
    group_config = internal.GroupConfig(
        name=name,
        queue_group=queue_group or service_defaults.queue_group,
        pending_bytes_limit_by_endpoint=(
            pending_bytes_limit_by_endpoint
            or service_defaults.pending_bytes_limit_by_endpoint
        ),
        pending_msgs_limit_by_endpoint=(
            pending_msgs_limit_by_endpoint
            or service_defaults.pending_msgs_limit_by_endpoint
        ),
    )
    return Group(group_config, self)

info()

Returns the service info.

Source code in src/nats_contrib/micro/api.py
def info(self) -> ServiceInfo:
    """Return a copy of the service info (the result of ``self._info.copy()``)."""
    return self._info.copy()

reset()

Resets all statistics (for all endpoints) on a service instance.

Source code in src/nats_contrib/micro/api.py
def reset(self) -> None:
    """Resets all statistics (for all endpoints) on a service instance."""

    # Rebuild the cached stats / info / ping payloads from the service
    # id and configuration.
    self._stats = internal.new_service_stats(self._id, self._clock(), self._config)
    self._info = internal.new_service_info(self._id, self._config)
    ping_info = internal.new_ping_info(self._id, self._config)
    self._ping_response = ping_info
    self._ping_response_message = internal.encode_ping_info(ping_info)
    # Empty the endpoint registry, then reset and re-register every
    # endpoint so the fresh stats and info objects list them again.
    known_endpoints = list(self._endpoints)
    self._endpoints.clear()
    for endpoint in known_endpoints:
        endpoint.reset()
        self._endpoints.append(endpoint)
        self._stats.endpoints.append(endpoint.stats)
        self._info.endpoints.append(endpoint.info)

start() async

Start the service.

A service MUST be started before adding endpoints.

This will start the internal subscriptions and enable service discovery.

Source code in src/nats_contrib/micro/api.py
async def start(self) -> None:
    """Start the service.

    A service MUST be started before adding endpoints.

    Subscribes to the PING, INFO and STATS control subjects, which
    enables service discovery.
    """
    # Every verb is served on three subjects:
    # - $SRV.{VERB}
    # - $SRV.{name}.{VERB}
    # - $SRV.{name}.{id}.{VERB}
    # Each row is (verb, message callback, list that keeps the
    # resulting subscriptions); rows are processed in PING, INFO,
    # STATS order.
    wiring = (
        (
            internal.ServiceVerb.PING,
            self._handle_ping_request,
            self._ping_subscriptions,
        ),
        (
            internal.ServiceVerb.INFO,
            self._handle_info_request,
            self._info_subscriptions,
        ),
        (
            internal.ServiceVerb.STATS,
            self._handle_stats_request,
            self._stats_subscriptions,
        ),
    )
    for verb, callback, registry in wiring:
        subjects = internal.get_internal_subjects(
            verb,
            self._id,
            self._config,
            api_prefix=self._api_prefix,
        )
        for subject in subjects:
            subscription = await self._nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
                subject,
                cb=callback,
            )
            registry.append(subscription)

stats()

Returns statistics for the service endpoint and all monitoring endpoints.

Source code in src/nats_contrib/micro/api.py
def stats(self) -> ServiceStats:
    """Returns statistics for the service endpoint and all monitoring endpoints.

    The caller receives the result of ``copy()`` on the internally held
    stats object rather than the internal object itself.
    """
    snapshot = self._stats.copy()
    return snapshot

stop() async

Stop the service.

This will stop all endpoints and internal subscriptions.

Source code in src/nats_contrib/micro/api.py
async def stop(self) -> None:
    """Stop the service.

    Marks the service as stopped, then stops every endpoint and finally
    every internal (STATS, INFO, PING) subscription.
    """
    self._stopped = True
    # Endpoints are stopped concurrently, before the control subscriptions.
    endpoint_shutdowns = [_stop_endpoint(endpoint) for endpoint in self._endpoints]
    await asyncio.gather(*endpoint_shutdowns)
    # Flatten the three internal subscription lists, then drop them all
    # concurrently.
    internal_subscriptions = [
        *self._stats_subscriptions,
        *self._info_subscriptions,
        *self._ping_subscriptions,
    ]
    await asyncio.gather(
        *(_unsubscribe(subscription) for subscription in internal_subscriptions)
    )

stopped()

Reports whether `stop()` has been executed on the service.

Source code in src/nats_contrib/micro/api.py
def stopped(self) -> bool:
    """Report whether ``stop()`` has been executed on the service."""
    is_stopped = self._stopped
    return is_stopped

ServiceError

Bases: Exception

Raised when a service error is received.

Source code in src/nats_contrib/micro/client.py
class ServiceError(Exception):
    """Raised when a service error is received.

    The exception message has the form ``Service error <code>: <description>``.
    All constructor arguments are kept as attributes of the same name.
    """

    def __init__(
        self,
        code: int,
        description: str,
        subject: str,
        data: bytes,
        headers: dict[str, str],
    ) -> None:
        """Build the error.

        Args:
            code: The service error code.
            description: The service error description.
            subject: The subject associated with the error.
            data: The payload associated with the error.
            headers: The headers associated with the error.
        """
        message = f"Service error {code}: {description}"
        super().__init__(message)
        self.code, self.description = code, description
        self.subject = subject
        self.data, self.headers = data, headers

ServiceInfo dataclass

Bases: Base

The information of a service.

Source code in src/nats_contrib/micro/models.py
@dataclass
class ServiceInfo(Base):
    """The information of a service.

    Serialized with the ``io.nats.micro.v1.info_response`` type tag.
    """

    name: str
    """
    The kind of the service. Shared by all the services that have the same name
    """
    id: str
    """
    A unique ID for this instance of a service
    """
    version: str
    """
    The version of the service
    """
    description: str
    """
    The description of the service supplied as configuration while creating the service
    """
    metadata: dict[str, str]
    """
    The service metadata
    """
    endpoints: list[EndpointInfo]
    """
    Information for all service endpoints
    """
    type: str = "io.nats.micro.v1.info_response"

    def copy(self) -> ServiceInfo:
        """Return a copy with a new ``endpoints`` list (each entry copied) and a new ``metadata`` dict."""
        return replace(
            self,
            endpoints=[ep.copy() for ep in self.endpoints],
            metadata=self.metadata.copy(),
        )

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict."""
        result = super().as_dict()
        # Endpoints are converted explicitly through their own as_dict().
        result["endpoints"] = [ep.as_dict() for ep in self.endpoints]
        return result

    @classmethod
    def from_response(cls, resp: dict[str, Any]) -> ServiceInfo:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption"), both on the
        service itself and on each of its endpoints.

        Args:
            resp: The decoded response. It must contain an ``"endpoints"`` key.

        Returns:
            The parsed service info.

        Raises:
            KeyError: If ``resp`` has no ``"endpoints"`` entry.
        """
        info = super().from_response(resp)
        # Parse endpoints through from_response() instead of
        # EndpointInfo(**ep): the latter raises TypeError as soon as an
        # endpoint carries a field this version does not know about.
        # This mirrors what ServiceStats does with EndpointStats.
        # NOTE(review): assumes EndpointInfo derives from Base and thus
        # inherits from_response() - confirm.
        info.endpoints = [EndpointInfo.from_response(ep) for ep in resp["endpoints"]]
        return info

description: str instance-attribute

The description of the service supplied as configuration while creating the service

endpoints: list[EndpointInfo] instance-attribute

Information for all service endpoints

id: str instance-attribute

A unique ID for this instance of a service

metadata: dict[str, str] instance-attribute

The service metadata

name: str instance-attribute

The kind of the service. Shared by all the services that have the same name

version: str instance-attribute

The version of the service

as_dict()

Return the object converted into an API-friendly dict.

Source code in src/nats_contrib/micro/models.py
def as_dict(self) -> dict[str, Any]:
    """Return the object converted into an API-friendly dict.

    The ``"endpoints"`` entry is replaced by the ``as_dict()`` form of
    every endpoint.
    """
    payload = super().as_dict()
    payload["endpoints"] = [endpoint.as_dict() for endpoint in self.endpoints]
    return payload

from_response(resp) classmethod

Read the class instance from a server response.

Unknown fields are ignored ("open-world assumption").

Source code in src/nats_contrib/micro/models.py
@classmethod
def from_response(cls, resp: dict[str, Any]) -> ServiceInfo:
    """Read the class instance from a server response.

    Unknown fields are ignored ("open-world assumption"), both on the
    service itself and on each of its endpoints.

    Args:
        resp: The decoded response. It must contain an ``"endpoints"`` key.

    Returns:
        The parsed service info.

    Raises:
        KeyError: If ``resp`` has no ``"endpoints"`` entry.
    """
    info = super().from_response(resp)
    # Parse endpoints through from_response() instead of
    # EndpointInfo(**ep): the latter raises TypeError as soon as an
    # endpoint carries a field this version does not know about.
    # NOTE(review): assumes EndpointInfo derives from Base and thus
    # inherits from_response() - confirm.
    info.endpoints = [EndpointInfo.from_response(ep) for ep in resp["endpoints"]]
    return info

ServiceStats dataclass

Bases: Base

The statistics of a service.

Source code in src/nats_contrib/micro/models.py
@dataclass
class ServiceStats(Base):
    """The statistics of a service.

    Serialized with the ``io.nats.micro.v1.stats_response`` type tag.
    """

    name: str
    """
    The kind of the service. Shared by all the services that have the same name
    """
    id: str
    """
    A unique ID for this instance of a service
    """
    version: str
    """
    The version of the service
    """
    started: datetime.datetime
    """
    The time the service was started in RFC3339 format
    """
    endpoints: list[EndpointStats]
    """
    Statistics for each known endpoint
    """
    metadata: dict[str, str] | None = None
    """Service metadata."""

    type: str = "io.nats.micro.v1.stats_response"

    def copy(self) -> ServiceStats:
        """Return a copy with a new ``endpoints`` list (each entry copied) and a new ``metadata`` dict when one is set."""
        return replace(
            self,
            endpoints=[ep.copy() for ep in self.endpoints],
            metadata=None if self.metadata is None else self.metadata.copy(),
        )

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict."""
        result = super().as_dict()
        # Endpoints and the start time need explicit conversion.
        result["endpoints"] = [ep.as_dict() for ep in self.endpoints]
        result["started"] = self._to_rfc3339(self.started)
        return result

    @classmethod
    def from_response(cls, resp: dict[str, Any]) -> ServiceStats:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption").

        The ``resp`` mapping itself is not modified.

        Args:
            resp: The decoded response. It must contain an ``"endpoints"`` key.

        Returns:
            The parsed service statistics.

        Raises:
            KeyError: If ``resp`` has no ``"endpoints"`` entry.
        """
        # Convert on a shallow copy so the caller's dict keeps its original
        # "started" value. _convert_rfc3339 presumably rewrites the field in
        # place (its return value is unused), which would otherwise leak out
        # of this call.
        payload = dict(resp)
        cls._convert_rfc3339(payload, "started")
        stats = super().from_response(payload)
        stats.endpoints = [
            EndpointStats.from_response(ep) for ep in payload["endpoints"]
        ]
        return stats

endpoints: list[EndpointStats] instance-attribute

Statistics for each known endpoint

id: str instance-attribute

A unique ID for this instance of a service

metadata: dict[str, str] | None = None class-attribute instance-attribute

Service metadata.

name: str instance-attribute

The kind of the service. Shared by all the services that have the same name

started: datetime.datetime instance-attribute

The time the service was started, in RFC3339 format

version: str instance-attribute

The version of the service

as_dict()

Return the object converted into an API-friendly dict.

Source code in src/nats_contrib/micro/models.py
def as_dict(self) -> dict[str, Any]:
    """Return the object converted into an API-friendly dict.

    The ``"endpoints"`` entry is replaced by the ``as_dict()`` form of
    every endpoint, and ``"started"`` by its RFC3339 rendering.
    """
    payload = super().as_dict()
    payload["endpoints"] = [endpoint.as_dict() for endpoint in self.endpoints]
    payload["started"] = self._to_rfc3339(self.started)
    return payload

from_response(resp) classmethod

Read the class instance from a server response.

Unknown fields are ignored ("open-world assumption").

Source code in src/nats_contrib/micro/models.py
@classmethod
def from_response(cls, resp: dict[str, Any]) -> ServiceStats:
    """Read the class instance from a server response.

    Unknown fields are ignored ("open-world assumption").

    The ``resp`` mapping itself is not modified.

    Args:
        resp: The decoded response. It must contain an ``"endpoints"`` key.

    Returns:
        The parsed service statistics.

    Raises:
        KeyError: If ``resp`` has no ``"endpoints"`` entry.
    """
    # Convert on a shallow copy so the caller's dict keeps its original
    # "started" value. _convert_rfc3339 presumably rewrites the field in
    # place (its return value is unused), which would otherwise leak out
    # of this call.
    payload = dict(resp)
    cls._convert_rfc3339(payload, "started")
    stats = super().from_response(payload)
    stats.endpoints = [EndpointStats.from_response(ep) for ep in payload["endpoints"]]
    return stats

add_service(nc, name, version, description=None, metadata=None, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None, now=None, id_generator=None, api_prefix=None)

Create a new service.

A service is a collection of endpoints that are grouped together under a common name.

Each endpoint is a request-reply handler for a subject.

It's possible to add endpoints to a service after it has been created AND started.

Parameters:

Name Type Description Default
nc Client

The NATS client.

required
name str

The name of the service.

required
version str

The version of the service. Must be a valid semver version.

required
description str | None

The description of the service.

None
metadata dict[str, str] | None

The metadata of the service.

None
queue_group str | None

The default queue group of the service.

None
pending_bytes_limit_by_endpoint int | None

The default pending bytes limit for each endpoint within the service.

None
pending_msgs_limit_by_endpoint int | None

The default pending messages limit for each endpoint within the service.

None
now Callable[[], datetime] | None

The function to get the current time.

None
id_generator Callable[[], str] | None

The function to generate a unique service instance id.

None
api_prefix str | None

The prefix of the control subjects.

None
Source code in src/nats_contrib/micro/api.py
def add_service(
    nc: NatsClient,
    name: str,
    version: str,
    description: str | None = None,
    metadata: dict[str, str] | None = None,
    queue_group: str | None = None,
    pending_bytes_limit_by_endpoint: int | None = None,
    pending_msgs_limit_by_endpoint: int | None = None,
    now: Callable[[], datetime] | None = None,
    id_generator: Callable[[], str] | None = None,
    api_prefix: str | None = None,
) -> Service:
    """Create a new service.

    A service groups a set of endpoints under a common name; each
    endpoint is a request-reply handler for a subject.

    Endpoints may still be added once the service has been created AND
    started.

    Args:
        nc: The NATS client.
        name: The name of the service.
        version: The version of the service. Must be a valid semver version.
        description: The description of the service.
        metadata: The metadata of the service.
        queue_group: The default queue group of the service.
        pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the service.
        pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the service.
        now: The function to get the current time.
        id_generator: The function to generate a unique service instance id.
        api_prefix: The prefix of the control subjects.

    Returns:
        The newly created service.
    """
    # The instance id is drawn before the configuration is assembled.
    make_id = internal.default_id_generator if id_generator is None else id_generator
    instance_id = make_id()
    # Every optional setting falls back to its default when the argument
    # is falsy (None, but also "", {} or 0).
    service_config = internal.ServiceConfig(
        name=name,
        version=version,
        description=description if description else "",
        metadata=metadata if metadata else {},
        queue_group=queue_group if queue_group else DEFAULT_QUEUE_GROUP,
        pending_bytes_limit_by_endpoint=(
            pending_bytes_limit_by_endpoint
            if pending_bytes_limit_by_endpoint
            else DEFAULT_SUB_PENDING_BYTES_LIMIT
        ),
        pending_msgs_limit_by_endpoint=(
            pending_msgs_limit_by_endpoint
            if pending_msgs_limit_by_endpoint
            else DEFAULT_SUB_PENDING_MSGS_LIMIT
        ),
    )
    return Service(
        nc=nc,
        id=instance_id,
        config=service_config,
        api_prefix=api_prefix if api_prefix else API_PREFIX,
        clock=now if now else internal.default_clock,
    )

run(setup, /, *options, trap_signals=False, client=None)

Helper function to run an async program.

Source code in src/nats_contrib/micro/context.py
def run(
    setup: Callable[[Context], Coroutine[Any, Any, None]],
    /,
    *options: ConnectOption,
    trap_signals: bool | tuple[signal.Signals, ...] = False,
    client: NATS | None = None,
) -> None:
    """Helper function to run an async program.

    Builds a ``Context`` around ``client`` and hands its
    ``run_forever(setup, *options, trap_signals=...)`` coroutine to
    ``asyncio.run``.

    Args:
        setup: Coroutine function that receives the context.
        *options: Connect options forwarded to ``run_forever``.
        trap_signals: Forwarded unchanged to ``run_forever``.
        client: Optional NATS client handed to the ``Context``.
    """
    context = Context(client=client)
    main = context.run_forever(
        setup,
        *options,
        trap_signals=trap_signals,
    )
    asyncio.run(main)