Skip to content

Reference

ConnectOption

Base class for connect options.

A connect option is a callable which can transform a ConnectOpts object.

Source code in src/nats_contrib/connect_opts/connect_opts.py
class ConnectOption(metaclass=abc.ABCMeta):
    """Abstract base for connect options.

    Instances are callables that receive a
    [`ConnectOpts`][nats_contrib.connect_opts.ConnectOpts] object and
    transform it. Subclasses must override `__call__`.
    """

    @abc.abstractmethod
    def __call__(self, opts: ConnectOpts) -> None:
        """Apply this option to `opts`; concrete subclasses provide the logic."""
        raise NotImplementedError()

ConnectOpts dataclass

Connect options for NATS python client.

Parameters:

Name Type Description Default
servers str | list[str]

A single server URL or a list of server URLs.

'nats://localhost:4222'
name str | None

The connection name.

None
dont_randomize bool

Disable randomizing the server list.

False
inbox_prefix str | bytes

The inbox prefix to use.

b'_INBOX'
pedantic bool

Enable pedantic mode.

False
verbose bool

Enable verbose logging.

False
no_echo bool

Disable echo.

False
connect_timeout float

The connection timeout in seconds.

2
drain_timeout float

The drain timeout in seconds.

30
allow_reconnect bool

Enable automatic reconnection.

True
max_reconnect_attempts int

The maximum number of reconnection attempts. -1 for infinite.

-1
reconnect_time_wait float

The delay between reconnection attempts in seconds.

2
ping_interval float

The ping interval in seconds.

60
max_outstanding_pings int

The maximum number of outstanding pings before closing the connection.

2
pending_size int

The maximum size of the pending queue in bytes.

1024 * 1024 * 2
flusher_queue_size int

The size of the flusher queue in number of messages.

1024
flush_timeout float | None

The flusher timeout in seconds.

None
tls SSLContext | None

The TLS context to use.

None
tls_hostname str | None

The hostname to use for TLS verification.

None
user str | None

The username to use for authentication.

None
password str | None

The password to use for authentication.

None
token str | None

The token to use for authentication.

None
user_credentials str | tuple[str, str] | None

The path to the credentials file to use for authentication.

None
nkeys_seed str | None

The nkeys seed to use for authentication.

None
signature_cb Callable[[str], bytes] | None

The callback function to sign the nonce during authentication.

None
user_jwt_cb Callable[[], bytearray | bytes] | None

The callback function to return the jwt during authentication.

None
error_cb Callable[[Exception], Awaitable[None]] | None

The callback function to call each time an error occurs.

None
disconnected_cb Callable[[], Awaitable[None]] | None

The callback function to call each time the connection is lost.

None
closed_cb Callable[[], Awaitable[None]] | None

The callback function to call once the connection is closed.

None
discovered_server_cb Callable[[], Awaitable[None]] | None

The callback function to call each time a new server is discovered.

None
reconnected_cb Callable[[], Awaitable[None]] | None

The callback function to call each time the connection is reestablished.

None
Source code in src/nats_contrib/connect_opts/connect_opts.py
@dataclass
class ConnectOpts:
    """Connect options for NATS python client.

    Args:
        servers: A single server URL or a list of server URLs.
        name: The connection name.
        dont_randomize: Disable randomizing the server list.
        inbox_prefix: The inbox prefix to use.
        pedantic: Enable pedantic mode.
        verbose: Enable verbose logging.
        no_echo: Disable echo.
        connect_timeout: The connection timeout in seconds.
        drain_timeout: The drain timeout in seconds.
        allow_reconnect: Enable automatic reconnection.
        max_reconnect_attempts: The maximum number of reconnection attempts. `-1` for infinite.
        reconnect_time_wait: The delay between reconnection attempts in seconds.
        ping_interval: The ping interval in seconds.
        max_outstanding_pings: The maximum number of outstanding pings before closing the connection.
        pending_size: The maximum size of the pending queue in bytes.
        flusher_queue_size: The size of the flusher queue in number of messages.
        flush_timeout: The flusher timeout in seconds.
        tls: The TLS context to use.
        tls_hostname: The hostname to use for TLS verification.
        user: The username to use for authentication.
        password: The password to use for authentication.
        token: The token to use for authentication.
        user_credentials: The path to the credentials file to use for authentication.
        nkeys_seed: The nkeys seed to use for authentication.
        signature_cb: The callback function to sign the nonce during authentication.
        user_jwt_cb: The callback function to return the jwt during authentication.
        error_cb: The callback function to call each time an error occurs.
        disconnected_cb: The callback function to call each time the connection is lost.
        closed_cb: The callback function to call once the connection is closed.
        discovered_server_cb: The callback function to call each time a new server is discovered.
        reconnected_cb: The callback function to call each time the connection is reestablished.
    """

    servers: str | list[str] = "nats://localhost:4222"
    name: str | None = None
    dont_randomize: bool = False
    inbox_prefix: str | bytes = b"_INBOX"  # Note: No trailing "." in inbox prefix
    pedantic: bool = False
    verbose: bool = False
    no_echo: bool = False
    # First connect
    connect_timeout: float = 2  # seconds
    # Drain
    drain_timeout: float = 30  # seconds
    # Reconnect
    allow_reconnect: bool = True
    max_reconnect_attempts: int = -1  # -1 for infinite
    reconnect_time_wait: float = 2  # seconds
    # PingPong
    ping_interval: float = 60  # seconds
    max_outstanding_pings: int = 2
    # Pending queue
    pending_size: int = 1024 * 1024 * 2  # bytes (2MiB)
    # Flusher
    flusher_queue_size: int = 1024
    flush_timeout: float | None = None
    # tls
    tls: ssl.SSLContext | None = None
    tls_hostname: str | None = None
    # Auth
    user: str | None = None
    password: str | None = None
    token: str | None = None
    user_credentials: str | tuple[str, str] | None = None
    nkeys_seed: str | None = None
    signature_cb: Callable[[str], bytes] | None = None
    user_jwt_cb: Callable[[], bytearray | bytes] | None = None
    # Connection state callbacks
    error_cb: Callable[[Exception], Awaitable[None]] | None = None
    disconnected_cb: Callable[[], Awaitable[None]] | None = None
    closed_cb: Callable[[], Awaitable[None]] | None = None
    discovered_server_cb: Callable[[], Awaitable[None]] | None = None
    reconnected_cb: Callable[[], Awaitable[None]] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the options as a dictionary keyed by field name.

        Field values are copied by `dataclasses.asdict`, except `tls`, which
        is placed in the result as the very same object held by this instance.

        Returns:
            A dictionary with one entry per dataclass field.
        """
        ctx = self.tls
        # `asdict` deep-copies field values and an `ssl.SSLContext` cannot be
        # deep-copied, so the context is detached for the duration of the call.
        self.tls = None
        try:
            opts = asdict(self)
        finally:
            # Always put the context back: serializing the options must not
            # strip TLS from the instance the caller keeps using.
            self.tls = ctx
        opts["tls"] = ctx
        return opts

    @classmethod
    def from_dict(cls, opts: dict[str, Any]) -> ConnectOpts:
        """Build an instance from a dictionary of field values.

        Args:
            opts: Mapping of field names to values, passed to the constructor
                as keyword arguments. Missing fields take their defaults.

        Returns:
            A new instance populated from `opts`.

        Raises:
            TypeError: If `opts` contains a key which is not a field name.
        """
        return cls(**opts)