Coverage for src/nats_contrib/connect_opts/connect_opts.py: 88%

258 statements  

« prev     ^ index     » next       coverage.py v7.4.2, created at 2024-02-26 14:37 +0100

1"""Connect options for NATS python client.""" 

2 

3from __future__ import annotations 

4 

5import abc 

6import ssl 

7from dataclasses import asdict, dataclass 

8from pathlib import Path 

9from typing import Any, Awaitable, Callable 

10 

11try: 

12 import nkeys # type: ignore 

13 

14 __NKEYS_AVAILABLE__ = True 

15except ModuleNotFoundError: 

16 __NKEYS_AVAILABLE__ = False # type: ignore 

17 

18 

@dataclass
class ConnectOpts:
    """Connect options for NATS python client.

    Args:
        servers: A single server URL or a list of server URLs.
        name: The connection name.
        dont_randomize: Disable randomizing the server list.
        inbox_prefix: The inbox prefix to use.
        pedantic: Enable pedantic mode.
        verbose: Enable verbose logging.
        no_echo: Disable echo.
        connect_timeout: The connection timeout in seconds.
        drain_timeout: The drain timeout in seconds.
        allow_reconnect: Enable automatic reconnection.
        max_reconnect_attempts: The maximum number of reconnection attempts. `-1` for infinite.
        reconnect_time_wait: The delay between reconnection attempts in seconds.
        ping_interval: The ping interval in seconds.
        max_outstanding_pings: The maximum number of outstanding pings before closing the connection.
        pending_size: The maximum size of the pending queue in bytes.
        flusher_queue_size: The size of the flusher queue in number of messages.
        flush_timeout: The flusher timeout in seconds.
        tls: The TLS context to use.
        tls_hostname: The hostname to use for TLS verification.
        user: The username to use for authentication.
        password: The password to use for authentication.
        token: The token to use for authentication.
        user_credentials: The path to the credentials file to use for authentication.
        nkeys_seed: The nkeys seed to use for authentication.
        signature_cb: The callback function to sign the nonce during authentication.
        user_jwt_cb: The callback function to return the jwt during authentication.
        error_cb: The callback function to call each time an error occurs.
        disconnected_cb: The callback function to call each time connection is lost.
        closed_cb: The callback function to call once connection is closed.
        discovered_server_cb: The callback function to call each time a new server is discovered.
        reconnected_cb: The callback function to call each time connection is reestablished.
    """

    servers: str | list[str] = "nats://localhost:4222"
    name: str | None = None
    dont_randomize: bool = False
    inbox_prefix: str | bytes = b"_INBOX"  # Note: No trailing "." in inbox prefix
    pedantic: bool = False
    verbose: bool = False
    no_echo: bool = False
    # First connect
    connect_timeout: float = 2  # seconds
    # Drain
    drain_timeout: float = 30  # seconds
    # Reconnect
    allow_reconnect: bool = True
    max_reconnect_attempts: int = -1  # -1 for infinite
    reconnect_time_wait: float = 2  # seconds
    # PingPong
    ping_interval: float = 60  # seconds
    max_outstanding_pings: int = 2
    # Pending queue
    pending_size: int = 1024 * 1024 * 2  # bytes (2MiB)
    # Flusher
    flusher_queue_size: int = 1024
    flush_timeout: float | None = None
    # tls
    tls: ssl.SSLContext | None = None
    tls_hostname: str | None = None
    # Auth
    user: str | None = None
    password: str | None = None
    token: str | None = None
    user_credentials: str | tuple[str, str] | None = None
    nkeys_seed: str | None = None
    signature_cb: Callable[[str], bytes] | None = None
    user_jwt_cb: Callable[[], bytearray | bytes] | None = None
    # Connection state callbacks
    error_cb: Callable[[Exception], Awaitable[None]] | None = None
    disconnected_cb: Callable[[], Awaitable[None]] | None = None
    closed_cb: Callable[[], Awaitable[None]] | None = None
    discovered_server_cb: Callable[[], Awaitable[None]] | None = None
    reconnected_cb: Callable[[], Awaitable[None]] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the options as a plain dictionary.

        The TLS context is detached from the instance while
        `dataclasses.asdict` runs, so that `asdict` does not try to copy
        it, and is re-attached afterwards. The instance is therefore left
        unchanged once this method returns, even if `asdict` raises.

        Returns:
            A dictionary with one key per field. The `tls` entry is the
            very same object as `self.tls`; other entries are the values
            produced by `dataclasses.asdict`.
        """
        ctx = self.tls
        # Keep the TLS context out of asdict(), which copies field values.
        self.tls = None
        try:
            opts = asdict(self)
        finally:
            # Always put the context back: serializing the options must not
            # strip the TLS configuration from this instance.
            self.tls = ctx
        if ctx is not None:
            opts["tls"] = ctx
        return opts

    @classmethod
    def from_dict(cls, opts: dict[str, Any]) -> ConnectOpts:
        """Build an instance from a dictionary of field values.

        Args:
            opts: Mapping of field names to values, passed to the
                constructor as keyword arguments.

        Returns:
            A new `ConnectOpts` instance.
        """
        return cls(**opts)

109 

110 

class ConnectOption(metaclass=abc.ABCMeta):
    """Abstract base for connect options.

    Each concrete option is a callable that receives a
    [`ConnectOpts`][nats_contrib.connect_opts.ConnectOpts] object and
    updates it; nothing is returned.
    """

    @abc.abstractmethod
    def __call__(self, opts: ConnectOpts) -> None:
        """Apply this option to `opts`.

        Args:
            opts: The connect options to transform.
        """
        raise NotImplementedError

121 

122 

@dataclass
class WithServer(ConnectOption):
    """Connect option selecting a single server URL.

    Args:
        url: The server URL to connect to.
    """

    url: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Replace `opts.servers` with the configured URL."""
        opts.servers = self.url

135 

136 

@dataclass
class WithServers(ConnectOption):
    """Connect option selecting several server URLs.

    Args:
        urls: The server URLs to connect to.
    """

    urls: list[str]

    def __call__(self, opts: ConnectOpts) -> None:
        """Replace `opts.servers` with the configured list of URLs."""
        opts.servers = self.urls

149 

150 

@dataclass
class WithConnectionName(ConnectOption):
    """Connect option setting the connection name.

    Args:
        name: The connection name to use.
    """

    name: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the configured name in `opts.name`."""
        opts.name = self.name

163 

164 

class WithDeterministicServers(ConnectOption):
    """Connect option turning off randomization of the server list."""

    def __call__(self, opts: ConnectOpts) -> None:
        """Set `opts.dont_randomize` to `True`."""
        opts.dont_randomize = True

170 

171 

@dataclass
class WithInboxPrefix(ConnectOption):
    """Connect option setting the inbox prefix.

    Args:
        prefix: The inbox prefix to use, as text or as bytes.
    """

    prefix: str | bytes

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the prefix in `opts.inbox_prefix`.

        A `str` prefix is encoded as UTF-8 first; any other value is
        stored unchanged.
        """
        prefix = self.prefix
        opts.inbox_prefix = (
            prefix.encode("utf-8") if isinstance(prefix, str) else prefix
        )

187 

188 

class WithPedanticMode(ConnectOption):
    """Connect option turning on pedantic mode."""

    def __call__(self, opts: ConnectOpts) -> None:
        """Set `opts.pedantic` to `True`."""
        opts.pedantic = True

194 

195 

class WithVerboseLogging(ConnectOption):
    """Connect option turning on verbose logging."""

    def __call__(self, opts: ConnectOpts) -> None:
        """Set `opts.verbose` to `True`."""
        opts.verbose = True

201 

202 

class WithNoEcho(ConnectOption):
    """Connect option turning off echo."""

    def __call__(self, opts: ConnectOpts) -> None:
        """Set `opts.no_echo` to `True`."""
        opts.no_echo = True

208 

209 

@dataclass
class WithConnectTimeout(ConnectOption):
    """Connect option setting the connection timeout.

    Args:
        timeout: The connection timeout in seconds.
    """

    timeout: float

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the timeout in `opts.connect_timeout`."""
        opts.connect_timeout = self.timeout

222 

223 

@dataclass
class WithDrainTimeout(ConnectOption):
    """Connect option setting the drain timeout.

    Args:
        timeout: The drain timeout in seconds.
    """

    timeout: float

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the timeout in `opts.drain_timeout`."""
        opts.drain_timeout = self.timeout

236 

237 

@dataclass
class WithDisallowReconnect(ConnectOption):
    """Connect option turning off automatic reconnection."""

    def __call__(self, opts: ConnectOpts) -> None:
        """Set `opts.allow_reconnect` to `False`."""
        opts.allow_reconnect = False

244 

245 

@dataclass
class WithAllowReconnect(ConnectOption):
    """Connect option turning on automatic reconnection.

    Reconnection is allowed by default, so this option is only useful to
    undo an earlier
    [`WithDisallowReconnect`][nats_contrib.connect_opts.WithDisallowReconnect]
    connect option or to tune the reconnection settings.

    Args:
        max_attempts: The maximum number of reconnection attempts. `-1` for infinite.
        delay_seconds: The delay between reconnection attempts in seconds.
    """

    max_attempts: int = -1
    delay_seconds: float = 2

    def __call__(self, opts: ConnectOpts) -> None:
        """Enable reconnection and copy both settings onto `opts`."""
        opts.allow_reconnect = True
        opts.max_reconnect_attempts = self.max_attempts
        opts.reconnect_time_wait = self.delay_seconds

266 

267 

@dataclass
class WithPingPong(ConnectOption):
    """Connect option tuning ping/pong.

    Args:
        interval: The ping interval in seconds.
        max_outstanding: The maximum number of outstanding pings before closing the connection.
    """

    interval: float = 60
    max_outstanding: int = 2

    def __call__(self, opts: ConnectOpts) -> None:
        """Copy the interval and the outstanding-ping limit onto `opts`."""
        opts.ping_interval = self.interval
        opts.max_outstanding_pings = self.max_outstanding

283 

284 

@dataclass
class WithPendingQueue(ConnectOption):
    """Connect option sizing the pending queue.

    Args:
        max_bytes: The maximum size of the pending queue in bytes.
    """

    max_bytes: int = 1024 * 1024 * 2  # bytes (2MiB)

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the size limit in `opts.pending_size`."""
        opts.pending_size = self.max_bytes

297 

298 

@dataclass
class WithFlusher(ConnectOption):
    """Connect option tuning the flusher.

    Args:
        queue_size: The size of the flusher queue in number of messages.
        timeout_seconds: The flusher timeout in seconds.
    """

    queue_size: int = 1024
    timeout_seconds: float = 10

    def __call__(self, opts: ConnectOpts) -> None:
        """Copy the queue size and the timeout onto `opts`."""
        opts.flusher_queue_size = self.queue_size
        opts.flush_timeout = self.timeout_seconds

314 

315 

@dataclass
class WithTLSCertificate(ConnectOption):
    """Connect option to configure client TLS certificate.

    Args:
        cert_file: The path to the client certificate file.
        key_file: The path to the client key file.
        ca_file: The path to the CA certificate file.
        key_file_password: The password for the client key file.
        hostname: The hostname to use for TLS verification.
    """

    cert_file: str
    key_file: str
    ca_file: str | None = None
    key_file_password: str | None = None
    hostname: str | None = None

    def __call__(self, opts: ConnectOpts) -> None:
        """Build a client-side TLS context and store it in `opts.tls`.

        When `ca_file` is given, only that file is loaded as a trust
        anchor. Otherwise a default context is created. In both cases the
        client certificate chain is then loaded into the context.
        `opts.tls_hostname` is only overwritten when `hostname` is set.

        Args:
            opts: The connect options to update.
        """
        if self.ca_file:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.load_verify_locations(self.ca_file)
        else:
            # SERVER_AUTH is the purpose for client-side sockets that
            # authenticate a server; CLIENT_AUTH would produce a context
            # meant for server-side sockets.
            context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        context.load_cert_chain(
            self.cert_file,
            self.key_file,
            self.key_file_password,
        )
        opts.tls = context
        if self.hostname:
            opts.tls_hostname = self.hostname

350 

351 

@dataclass
class WithUserPassword(ConnectOption):
    """Connect option setting both user and password for authentication.

    Args:
        user: The username.
        password: The password.
    """

    user: str
    password: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Copy the user and the password onto `opts`."""
        opts.user = self.user
        opts.password = self.password

367 

368 

@dataclass
class WithUsername(ConnectOption):
    """Connect option setting the username for authentication.

    Args:
        user: The username.
    """

    user: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the username in `opts.user`."""
        opts.user = self.user

381 

382 

@dataclass
class WithPassword(ConnectOption):
    """Connect option setting the password for authentication.

    Args:
        password: The password.
    """

    password: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the password in `opts.password`."""
        opts.password = self.password

395 

396 

@dataclass
class WithToken(ConnectOption):
    """Connect option setting the token for authentication.

    Args:
        token: The token.
    """

    token: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the token in `opts.token`."""
        opts.token = self.token

409 

410 

@dataclass
class WithCredentialsFile(ConnectOption):
    """Connect option pointing at a user credentials file (concatenated user jwt + nkeys seed).

    Args:
        filepath: The path to the credentials file.
    """

    filepath: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the resolved credentials path in `opts.user_credentials`.

        The path has `~` expanded and is resolved before being checked;
        it is stored in POSIX form.

        Raises:
            FileNotFoundError: If the resolved path is not a file.
        """
        expanded = Path(self.filepath).expanduser()
        credentials = expanded.resolve()
        if not credentials.is_file():
            raise FileNotFoundError(f"Credentials file not found: {credentials}")
        opts.user_credentials = credentials.as_posix()

426 

427 

@dataclass
class WithNKeySeed(ConnectOption):
    """Connect option setting the nkeys seed for authentication.

    Args:
        seed: The nkeys seed.
    """

    seed: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the seed in `opts.nkeys_seed`."""
        opts.nkeys_seed = self.seed

440 

441 

@dataclass
class WithNKeyFile(ConnectOption):
    """Connect option loading the nkeys seed from a file.

    Args:
        filepath: The path to the nkeys seed file.
    """

    filepath: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Read the seed file and store its text in `opts.nkeys_seed`.

        The path has `~` expanded and is resolved before being checked.
        The file content is stored exactly as read.

        Raises:
            FileNotFoundError: If the resolved path is not a file.
        """
        expanded = Path(self.filepath).expanduser()
        seed_file = expanded.resolve()
        if not seed_file.is_file():
            raise FileNotFoundError(f"NKey file not found: {seed_file}")
        opts.nkeys_seed = seed_file.read_text()

457 

458 

@dataclass
class WithSignatureCallback(ConnectOption):
    """Connect option setting the nonce-signing callback for nkeys authentication.

    Args:
        callback: The callback function to sign the nonce.
    """

    callback: Callable[[str], bytes]

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the callback in `opts.signature_cb`."""
        opts.signature_cb = self.callback

471 

472 

@dataclass
class WithUserJwtCallback(ConnectOption):
    """Connect option setting the jwt-providing callback for jwt authentication.

    Args:
        callback: The callback function to return the jwt.
    """

    callback: Callable[[], bytearray | bytes]

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the callback in `opts.user_jwt_cb`."""
        opts.user_jwt_cb = self.callback

485 

486 

@dataclass
class WithNKeySeedAndJwt(ConnectOption):
    """Connect option deriving user credentials from an nkeys seed and a jwt.

    Args:
        seed: The nkeys seed.
        jwt: The user jwt.
    """

    seed: str
    jwt: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Install a signature callback and a jwt callback on `opts`.

        Raises:
            ModuleNotFoundError: If the optional `nkeys` module could not
                be imported.
        """
        if not __NKEYS_AVAILABLE__:
            raise ModuleNotFoundError("nkeys module not installed")
        keypair = nkeys.from_seed(self.seed.encode())  # type: ignore

        def sign_nonce(nonce: str) -> bytes:
            # NOTE(review): the signature is returned exactly as produced
            # by `sign()` - confirm the client does not expect it to be
            # base64-encoded.
            return keypair.sign(nonce.encode())  # type: ignore

        def current_jwt() -> bytes:
            # Reads `self.jwt` on every call rather than capturing it once.
            return self.jwt.encode()

        opts.signature_cb = sign_nonce
        opts.user_jwt_cb = current_jwt

505 

506 

@dataclass
class WithNkeyFileAndJwtFile(ConnectOption):
    """Connect option to configure user credentials.

    Args:
        nkey_file: The path to the nkeys seed file.
        jwt_file: The path to the user jwt file.
    """

    nkey_file: str
    jwt_file: str

    def __call__(self, opts: ConnectOpts) -> None:
        """Read both files and delegate to `WithNKeySeedAndJwt`.

        As in the other file-based connect options, `~` is expanded in
        both paths. Surrounding whitespace (typically a trailing newline)
        is stripped from the file contents before they are used as seed
        and jwt.

        Args:
            opts: The connect options to update.

        Raises:
            FileNotFoundError: If one of the files does not exist.
        """
        seed = Path(self.nkey_file).expanduser().read_text().strip()
        jwt = Path(self.jwt_file).expanduser().read_text().strip()
        WithNKeySeedAndJwt(seed, jwt)(opts)

524 

525 

@dataclass
class WithErrorCallback(ConnectOption):
    """Connect option setting the error callback.

    Args:
        callback: The callback function to call each time an error occurs.
    """

    callback: Callable[[Exception], Awaitable[None]]

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the callback in `opts.error_cb`."""
        opts.error_cb = self.callback

538 

539 

@dataclass
class WithDisconnectedCallback(ConnectOption):
    """Connect option setting the disconnection callback.

    Args:
        callback: The callback function to call each time connection is lost.
    """

    callback: Callable[[], Awaitable[None]]

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the callback in `opts.disconnected_cb`."""
        opts.disconnected_cb = self.callback

552 

553 

@dataclass
class WithReconnectedCallback(ConnectOption):
    """Connect option setting the reconnection callback.

    Args:
        callback: The callback function to call each time connection is reestablished.
    """

    callback: Callable[[], Awaitable[None]]

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the callback in `opts.reconnected_cb`."""
        opts.reconnected_cb = self.callback

566 

567 

@dataclass
class WithConnectionClosedCallback(ConnectOption):
    """Connect option setting the connection closed callback.

    Args:
        callback: The callback function to call once connection is closed.
    """

    callback: Callable[[], Awaitable[None]]

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the callback in `opts.closed_cb`."""
        opts.closed_cb = self.callback

580 

581 

@dataclass
class WithServerDiscoveredCallback(ConnectOption):
    """Connect option setting the server discovered callback.

    Args:
        callback: The callback function to call each time a new server is discovered.
    """

    callback: Callable[[], Awaitable[None]]

    def __call__(self, opts: ConnectOpts) -> None:
        """Store the callback in `opts.discovered_server_cb`."""
        opts.discovered_server_cb = self.callback

594 

595 

@dataclass
class WithCallbacks(ConnectOption):
    """Connect option setting any subset of the connection state callbacks.

    Args:
        on_error: The callback function to call each time an error occurs.
        on_disconnection: The callback function to call each time connection is lost.
        on_connection_closed: The callback function to call once connection is closed.
        on_server_discovered: The callback function to call each time a new server is discovered.
        on_reconnection: The callback function to call each time connection is reestablished.
    """

    on_error: Callable[[Exception], Awaitable[None]] | None = None
    on_disconnection: Callable[[], Awaitable[None]] | None = None
    on_connection_closed: Callable[[], Awaitable[None]] | None = None
    on_server_discovered: Callable[[], Awaitable[None]] | None = None
    on_reconnection: Callable[[], Awaitable[None]] | None = None

    def __call__(self, opts: ConnectOpts) -> None:
        """Copy every callback that is set onto `opts`.

        Callbacks left unset are skipped, so the matching attribute of
        `opts` keeps whatever value it already had.
        """
        # (attribute on opts, callback configured on this option)
        targets = (
            ("error_cb", self.on_error),
            ("disconnected_cb", self.on_disconnection),
            ("closed_cb", self.on_connection_closed),
            ("discovered_server_cb", self.on_server_discovered),
            ("reconnected_cb", self.on_reconnection),
        )
        for attribute, callback in targets:
            if callback:
                setattr(opts, attribute, callback)