Coverage for src/nats_contrib/connect_opts/connect_opts.py: 88%
258 statements
« prev ^ index » next coverage.py v7.4.2, created at 2024-02-26 14:37 +0100
« prev ^ index » next coverage.py v7.4.2, created at 2024-02-26 14:37 +0100
1"""Connect options for NATS python client."""
3from __future__ import annotations
5import abc
6import ssl
7from dataclasses import asdict, dataclass
8from pathlib import Path
9from typing import Any, Awaitable, Callable
11try:
12 import nkeys # type: ignore
14 __NKEYS_AVAILABLE__ = True
15except ModuleNotFoundError:
16 __NKEYS_AVAILABLE__ = False # type: ignore
19@dataclass
20class ConnectOpts:
21 """Connect options for NATS python client.
23 Args:
24 servers: A single server URL or a list of server URLs.
25 name: The connection name.
26 dont_randomize: Disable randomizing the server list.
27 inbox_prefix: The inbox prefix to use.
28 pedantic: Enable pedantic mode.
29 verbose: Enable verbose logging.
30 no_echo: Disable echo.
31 connect_timeout: The connection timeout in seconds.
32 drain_timeout: The drain timeout in seconds.
33 allow_reconnect: Enable automatic reconnection.
34 max_reconnect_attempts: The maximum number of reconnection attempts. `-1` for infinite.
35 reconnect_time_wait: The delay between reconnection attempts in seconds.
36 ping_interval: The ping interval in seconds.
37 max_outstanding_pings: The maximum number of outstanding pings before closing the connection.
38 pending_size: The maximum size of the pending queue in bytes.
39 flusher_queue_size: The size of the flusher queue in number of messages.
40 flush_timeout: The flusher timeout in seconds.
41 tls: The TLS context to use.
42 tls_hostname: The hostname to use for TLS verification.
43 user: The username to use for authentication.
44 password: The password to use for authentication.
45 token: The token to use for authentication.
46 user_credentials: The path to the credentials file to use for authentication.
47 nkeys_seed: The nkeys seed to use for authentication.
48 signature_cb: The callback function to sign the nonce during authentication.
49 user_jwt_cb: The callback function to return the jwt during authentication.
50 error_cb: The callback function to call each time an error occurs.
51 disconnected_cb: The callback function to call each time connection is lost.
52 closed_cb: The callback function to call once connection is closed.
53 discovered_server_cb: The callback function to call each time a new server is discovered.
54 reconnected_cb: The callback function to call each time connection is reestablished.
55 """
57 servers: str | list[str] = "nats://localhost:4222"
58 name: str | None = None
59 dont_randomize: bool = False
60 inbox_prefix: str | bytes = b"_INBOX" # Note: No trailing "." in inbox prefix
61 pedantic: bool = False
62 verbose: bool = False
63 no_echo: bool = False
64 # First connect
65 connect_timeout: float = 2 # seconds
66 # Drain
67 drain_timeout: float = 30 # seconds
68 # Reconnect
69 allow_reconnect: bool = True
70 max_reconnect_attempts: int = -1 # -1 for infinite
71 reconnect_time_wait: float = 2 # seconds
72 # PingPong
73 ping_interval: float = 60 # seconds
74 max_outstanding_pings: int = 2
75 # Pending queue
76 pending_size: int = 1024 * 1024 * 2 # bytes (2MiB)
77 # Flusher
78 flusher_queue_size: int = 1024
79 flush_timeout: float | None = None
80 # tls
81 tls: ssl.SSLContext | None = None
82 tls_hostname: str | None = None
83 # Auth
84 user: str | None = None
85 password: str | None = None
86 token: str | None = None
87 user_credentials: str | tuple[str, str] | None = None
88 nkeys_seed: str | None = None
89 signature_cb: Callable[[str], bytes] | None = None
90 user_jwt_cb: Callable[[], bytearray | bytes] | None = None
91 # Connection state callbacks
92 error_cb: Callable[[Exception], Awaitable[None]] | None = None
93 disconnected_cb: Callable[[], Awaitable[None]] | None = None
94 closed_cb: Callable[[], Awaitable[None]] | None = None
95 discovered_server_cb: Callable[[], Awaitable[None]] | None = None
96 reconnected_cb: Callable[[], Awaitable[None]] | None = None
98 def to_dict(self) -> dict[str, Any]:
99 ctx = self.tls
100 self.tls = None
101 opts = asdict(self)
102 if ctx:
103 opts["tls"] = ctx
104 return opts
106 @classmethod
107 def from_dict(cls, opts: dict[str, Any]) -> ConnectOpts:
108 return cls(**opts)
111class ConnectOption(metaclass=abc.ABCMeta):
112 """Base class for connect options.
114 A connect option is a callable which can transform a
115 [`ConnectOpts`][nats_contrib.connect_opts.ConnectOpts] object.
116 """
118 @abc.abstractmethod
119 def __call__(self, opts: ConnectOpts) -> None:
120 raise NotImplementedError
123@dataclass
124class WithServer(ConnectOption):
125 """Connect option to specify the server URL.
127 Args:
128 url: The server URL to connect to.
129 """
131 url: str
133 def __call__(self, opts: ConnectOpts) -> None:
134 opts.servers = self.url
137@dataclass
138class WithServers(ConnectOption):
139 """Connect option to specify the server URLs.
141 Args:
142 urls: The server URLs to connect to.
143 """
145 urls: list[str]
147 def __call__(self, opts: ConnectOpts) -> None:
148 opts.servers = self.urls
151@dataclass
152class WithConnectionName(ConnectOption):
153 """Connect option to specify the connection name.
155 Args:
156 name: The connection name to use.
157 """
159 name: str
161 def __call__(self, opts: ConnectOpts) -> None:
162 opts.name = self.name
165class WithDeterministicServers(ConnectOption):
166 """Connect option to disable randomizing the server list."""
168 def __call__(self, opts: ConnectOpts) -> None:
169 opts.dont_randomize = True
172@dataclass
173class WithInboxPrefix(ConnectOption):
174 """Connect option to specify the inbox prefix.
176 Args:
177 prefix: The inbox prefix to use.
178 """
180 prefix: str | bytes
182 def __call__(self, opts: ConnectOpts) -> None:
183 if isinstance(self.prefix, str): 183 ↛ 186line 183 didn't jump to line 186, because the condition on line 183 was never false
184 opts.inbox_prefix = self.prefix.encode("utf-8")
185 else:
186 opts.inbox_prefix = self.prefix
189class WithPedanticMode(ConnectOption):
190 """Connect option to enable pedantic mode."""
192 def __call__(self, opts: ConnectOpts) -> None:
193 opts.pedantic = True
196class WithVerboseLogging(ConnectOption):
197 """Connect option to enable verbose logging."""
199 def __call__(self, opts: ConnectOpts) -> None:
200 opts.verbose = True
203class WithNoEcho(ConnectOption):
204 """Connect option to disable echo."""
206 def __call__(self, opts: ConnectOpts) -> None:
207 opts.no_echo = True
210@dataclass
211class WithConnectTimeout(ConnectOption):
212 """Connect option to specify the connection timeout.
214 Args:
215 timeout: The connection timeout in seconds.
216 """
218 timeout: float
220 def __call__(self, opts: ConnectOpts) -> None:
221 opts.connect_timeout = self.timeout
224@dataclass
225class WithDrainTimeout(ConnectOption):
226 """Connect option to specify the drain timeout.
228 Args:
229 timeout: The drain timeout in seconds.
230 """
232 timeout: float
234 def __call__(self, opts: ConnectOpts) -> None:
235 opts.drain_timeout = self.timeout
238@dataclass
239class WithDisallowReconnect(ConnectOption):
240 """Connect option to disable automatic reconnection."""
242 def __call__(self, opts: ConnectOpts) -> None:
243 opts.allow_reconnect = False
246@dataclass
247class WithAllowReconnect(ConnectOption):
248 """Connect option to enable automatic reconnection.
250 The default is to allow reconnection, so this option is only needed
251 to override a previous [`WithDisallowReconnect`][nats_contrib.connect_opts.WithDisallowReconnect]
252 connect option or to configure the reconnection options.
254 Args:
255 max_attempts: The maximum number of reconnection attempts. `-1` for infinite.
256 delay_seconds: The delay between reconnection attempts in seconds.
257 """
259 max_attempts: int = -1
260 delay_seconds: float = 2
262 def __call__(self, opts: ConnectOpts) -> None:
263 opts.allow_reconnect = True
264 opts.max_reconnect_attempts = self.max_attempts
265 opts.reconnect_time_wait = self.delay_seconds
268@dataclass
269class WithPingPong(ConnectOption):
270 """Connect option to configure ping/pong.
272 Args:
273 interval: The ping interval in seconds.
274 max_outstanding: The maximum number of outstanding pings before closing the connection.
275 """
277 interval: float = 60
278 max_outstanding: int = 2
280 def __call__(self, opts: ConnectOpts) -> None:
281 opts.ping_interval = self.interval
282 opts.max_outstanding_pings = self.max_outstanding
285@dataclass
286class WithPendingQueue(ConnectOption):
287 """Connect option to configure the pending queue.
289 Args:
290 max_bytes: The maximum size of the pending queue in bytes.
291 """
293 max_bytes: int = 1024 * 1024 * 2 # bytes (2MiB)
295 def __call__(self, opts: ConnectOpts) -> None:
296 opts.pending_size = self.max_bytes
299@dataclass
300class WithFlusher(ConnectOption):
301 """Connect option to configure the flusher.
303 Args:
304 queue_size: The size of the flusher queue in number of messages.
305 timeout_seconds: The flusher timeout in seconds.
306 """
308 queue_size: int = 1024
309 timeout_seconds: float = 10
311 def __call__(self, opts: ConnectOpts) -> None:
312 opts.flusher_queue_size = self.queue_size
313 opts.flush_timeout = self.timeout_seconds
316@dataclass
317class WithTLSCertificate(ConnectOption):
318 """Connect option to configure client TLS certficiate.
320 Args:
321 cert_file: The path to the client certificate file.
322 key_file: The path to the client key file.
323 ca_file: The path to the CA certificate file.
324 key_file_password: The password for the client key file.
325 hostname: The hostname to use for TLS verification.
326 """
328 cert_file: str
329 key_file: str
330 ca_file: str | None = None
331 key_file_password: str | None = None
332 hostname: str | None = None
334 def __call__(self, opts: ConnectOpts) -> None:
335 if self.ca_file: 335 ↛ 341line 335 didn't jump to line 341, because the condition on line 335 was never false
336 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
337 context.load_verify_locations(
338 self.ca_file,
339 )
340 else:
341 context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
342 context.load_cert_chain(
343 self.cert_file,
344 self.key_file,
345 self.key_file_password,
346 )
347 opts.tls = context
348 if self.hostname: 348 ↛ 349line 348 didn't jump to line 349, because the condition on line 348 was never true
349 opts.tls_hostname = self.hostname
352@dataclass
353class WithUserPassword(ConnectOption):
354 """Connect option to configure user/password authentication.
356 Args:
357 user: The username.
358 password: The password.
359 """
361 user: str
362 password: str
364 def __call__(self, opts: ConnectOpts) -> None:
365 opts.user = self.user
366 opts.password = self.password
369@dataclass
370class WithUsername(ConnectOption):
371 """Connect option to configure username authentication.
373 Args:
374 user: The username.
375 """
377 user: str
379 def __call__(self, opts: ConnectOpts) -> None:
380 opts.user = self.user
383@dataclass
384class WithPassword(ConnectOption):
385 """Connect option to configure password authentication.
387 Args:
388 password: The password.
389 """
391 password: str
393 def __call__(self, opts: ConnectOpts) -> None:
394 opts.password = self.password
397@dataclass
398class WithToken(ConnectOption):
399 """Connect option to configure token authentication.
401 Args:
402 token: The token.
403 """
405 token: str
407 def __call__(self, opts: ConnectOpts) -> None:
408 opts.token = self.token
411@dataclass
412class WithCredentialsFile(ConnectOption):
413 """Connect option to configure user credentials (concatenated user jwt + nkeys seed).
415 Args:
416 filepath: The path to the credentials file.
417 """
419 filepath: str
421 def __call__(self, opts: ConnectOpts) -> None:
422 path = Path(self.filepath).expanduser().resolve()
423 if not path.is_file():
424 raise FileNotFoundError(f"Credentials file not found: {path}")
425 opts.user_credentials = path.as_posix()
428@dataclass
429class WithNKeySeed(ConnectOption):
430 """Connect option to configure nkeys authentication.
432 Args:
433 seed: The nkeys seed.
434 """
436 seed: str
438 def __call__(self, opts: ConnectOpts) -> None:
439 opts.nkeys_seed = self.seed
442@dataclass
443class WithNKeyFile(ConnectOption):
444 """Connect option to configure nkeys authentication.
446 Args:
447 filepath: The path to the nkeys seed file.
448 """
450 filepath: str
452 def __call__(self, opts: ConnectOpts) -> None:
453 path = Path(self.filepath).expanduser().resolve()
454 if not path.is_file():
455 raise FileNotFoundError(f"NKey file not found: {path}")
456 opts.nkeys_seed = path.read_text()
459@dataclass
460class WithSignatureCallback(ConnectOption):
461 """Connect option to configure nkeys authentication.
463 Args:
464 callback: The callback function to sign the nonce.
465 """
467 callback: Callable[[str], bytes]
469 def __call__(self, opts: ConnectOpts) -> None:
470 opts.signature_cb = self.callback
473@dataclass
474class WithUserJwtCallback(ConnectOption):
475 """Connect option to configure jwt authentication.
477 Args:
478 callback: The callback function to return the jwt.
479 """
481 callback: Callable[[], bytearray | bytes]
483 def __call__(self, opts: ConnectOpts) -> None:
484 opts.user_jwt_cb = self.callback
487@dataclass
488class WithNKeySeedAndJwt(ConnectOption):
489 """Connect option to configure user credentials.
491 Args:
492 seed: The nkeys seed.
493 jwt: The user jwt.
494 """
496 seed: str
497 jwt: str
499 def __call__(self, opts: ConnectOpts) -> None:
500 if not __NKEYS_AVAILABLE__:
501 raise ModuleNotFoundError("nkeys module not installed")
502 nkey = nkeys.from_seed(self.seed.encode()) # type: ignore
503 opts.signature_cb = lambda nonce: nkey.sign(nonce.encode()) # type: ignore
504 opts.user_jwt_cb = lambda: self.jwt.encode()
507@dataclass
508class WithNkeyFileAndJwtFile(ConnectOption):
509 """Connect option to configure user credentials.
511 Args:
512 nkey_file: The path to the nkeys seed file.
513 jwt_file: The path to the user jwt file.
514 """
516 nkey_file: str
517 jwt_file: str
519 def __call__(self, opts: ConnectOpts) -> None:
520 return WithNKeySeedAndJwt(
521 Path(self.nkey_file).read_text(),
522 Path(self.jwt_file).read_text(),
523 ).__call__(opts)
526@dataclass
527class WithErrorCallback(ConnectOption):
528 """Connect option to configure the error callback.
530 Args:
531 callback: The callback function to call each time an error occurs.
532 """
534 callback: Callable[[Exception], Awaitable[None]]
536 def __call__(self, opts: ConnectOpts) -> None:
537 opts.error_cb = self.callback
540@dataclass
541class WithDisconnectedCallback(ConnectOption):
542 """Connect option to configure the disconnection callback.
544 Args:
545 callback: The callback function to call each time connection is lost.
546 """
548 callback: Callable[[], Awaitable[None]]
550 def __call__(self, opts: ConnectOpts) -> None:
551 opts.disconnected_cb = self.callback
554@dataclass
555class WithReconnectedCallback(ConnectOption):
556 """Connect option to configure the reconnection callback.
558 Args:
559 callback: The callback function to call each time connection is reestablished.
560 """
562 callback: Callable[[], Awaitable[None]]
564 def __call__(self, opts: ConnectOpts) -> None:
565 opts.reconnected_cb = self.callback
568@dataclass
569class WithConnectionClosedCallback(ConnectOption):
570 """Connect option to configure the connection closed callback.
572 Args:
573 callback: The callback function to call once connection is closed.
574 """
576 callback: Callable[[], Awaitable[None]]
578 def __call__(self, opts: ConnectOpts) -> None:
579 opts.closed_cb = self.callback
582@dataclass
583class WithServerDiscoveredCallback(ConnectOption):
584 """Connect option to configure the server discovered callback.
586 Args:
587 callback: The callback function to call each time a new server is discovered.
588 """
590 callback: Callable[[], Awaitable[None]]
592 def __call__(self, opts: ConnectOpts) -> None:
593 opts.discovered_server_cb = self.callback
596@dataclass
597class WithCallbacks(ConnectOption):
598 """Connect option to configure all connection state callbacks.
600 Args:
601 on_error: The callback function to call each time an error occurs.
602 on_disconnection: The callback function to call each time connection is lost.
603 on_connection_closed: The callback function to call once connection is closed.
604 on_server_discovered: The callback function to call each time a new server is discovered.
605 on_reconnection: The callback function to call each time connection is reestablished.
606 """
608 on_error: Callable[[Exception], Awaitable[None]] | None = None
609 on_disconnection: Callable[[], Awaitable[None]] | None = None
610 on_connection_closed: Callable[[], Awaitable[None]] | None = None
611 on_server_discovered: Callable[[], Awaitable[None]] | None = None
612 on_reconnection: Callable[[], Awaitable[None]] | None = None
614 def __call__(self, opts: ConnectOpts) -> None:
615 if self.on_error: 615 ↛ 617line 615 didn't jump to line 617, because the condition on line 615 was never false
616 opts.error_cb = self.on_error
617 if self.on_disconnection: 617 ↛ 619line 617 didn't jump to line 619, because the condition on line 617 was never false
618 opts.disconnected_cb = self.on_disconnection
619 if self.on_connection_closed: 619 ↛ 621line 619 didn't jump to line 621, because the condition on line 619 was never false
620 opts.closed_cb = self.on_connection_closed
621 if self.on_server_discovered: 621 ↛ 622line 621 didn't jump to line 622, because the condition on line 621 was never true
622 opts.discovered_server_cb = self.on_server_discovered
623 if self.on_reconnection: 623 ↛ exitline 623 didn't return from function '__call__', because the condition on line 623 was never false
624 opts.reconnected_cb = self.on_reconnection