Coverage for src/nats_contrib/micro/api.py: 98%

141 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-03-06 11:09 +0100

1"""NATS Micro implementation based on nats-py client library. 

2 

3NATS Micro is a protocol for building microservices with NATS. 

4 

5It is documented in [ADR-32: Service API](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-32.md). 

6 

7The reference implementation is in [nats.go](https://github.com/nats-io/nats.go) under [micro package](https://pkg.go.dev/github.com/nats-io/nats.go/micro). 

8A typescript implementation is available in [nats.deno](https://github.com/nats-io/nats.deno/blob/main/nats-base-client/service.ts) 

9""" 

10 

11from __future__ import annotations 

12 

13import asyncio 

14from datetime import datetime 

15from typing import Awaitable, Callable 

16 

17from nats.aio.client import Client as NatsClient 

18from nats.aio.msg import Msg 

19from nats.aio.subscription import ( 

20 DEFAULT_SUB_PENDING_BYTES_LIMIT, 

21 DEFAULT_SUB_PENDING_MSGS_LIMIT, 

22 Subscription, 

23) 

24from nats.errors import BadSubscriptionError 

25 

26from . import internal 

27from .middleware import Middleware, apply_middlewares 

28from .models import ServiceInfo, ServiceStats 

29from .request import Handler, NatsRequest 

30 

31DEFAULT_QUEUE_GROUP = "q" 

32"""Queue Group name used across all services.""" 

33 

34API_PREFIX = "$SRV" 

35"""APIPrefix is the root of all control subjects.""" 

36 

37 

38def add_service( 

39 nc: NatsClient, 

40 name: str, 

41 version: str, 

42 description: str | None = None, 

43 metadata: dict[str, str] | None = None, 

44 queue_group: str | None = None, 

45 pending_bytes_limit_by_endpoint: int | None = None, 

46 pending_msgs_limit_by_endpoint: int | None = None, 

47 now: Callable[[], datetime] | None = None, 

48 id_generator: Callable[[], str] | None = None, 

49 api_prefix: str | None = None, 

50) -> Service: 

51 """Create a new service. 

52 

53 A service is a collection of endpoints that are grouped together 

54 under a common name. 

55 

56 Each endpoint is a request-reply handler for a subject. 

57 

58 It's possible to add endpoints to a service after it has been created AND 

59 started. 

60 

61 Args: 

62 nc: The NATS client. 

63 name: The name of the service. 

64 version: The version of the service. Must be a valid semver version. 

65 description: The description of the service. 

66 metadata: The metadata of the service. 

67 queue_group: The default queue group of the service. 

68 pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the service. 

69 pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the service. 

70 now: The function to get the current time. 

71 id_generator: The function to generate a unique service instance id. 

72 api_prefix: The prefix of the control subjects. 

73 """ 

74 if id_generator is None: 74 ↛ 75line 74 didn't jump to line 75, because the condition on line 74 was never true

75 id_generator = internal.default_id_generator 

76 instance_id = id_generator() 

77 service_config = internal.ServiceConfig( 

78 name=name, 

79 version=version, 

80 description=description or "", 

81 metadata=metadata or {}, 

82 queue_group=queue_group or DEFAULT_QUEUE_GROUP, 

83 pending_bytes_limit_by_endpoint=pending_bytes_limit_by_endpoint 

84 or DEFAULT_SUB_PENDING_BYTES_LIMIT, 

85 pending_msgs_limit_by_endpoint=pending_msgs_limit_by_endpoint 

86 or DEFAULT_SUB_PENDING_MSGS_LIMIT, 

87 ) 

88 return Service( 

89 nc=nc, 

90 id=instance_id, 

91 config=service_config, 

92 api_prefix=api_prefix or API_PREFIX, 

93 clock=now or internal.default_clock, 

94 ) 

95 

96 

97class Endpoint: 

98 """Endpoint manages a service endpoint.""" 

99 

100 def __init__(self, config: internal.EndpointConfig) -> None: 

101 self.config = config 

102 self.stats = internal.create_endpoint_stats(config) 

103 self.info = internal.create_endpoint_info(config) 

104 self._sub: Subscription | None = None 

105 

106 def reset(self) -> None: 

107 """Reset the endpoint statistics.""" 

108 self.stats = internal.create_endpoint_stats(self.config) 

109 self.info = internal.create_endpoint_info(self.config) 

110 

111 

112class Group: 

113 """Group allows for grouping endpoints on a service. 

114 

115 Endpoints created using `Group.add_endpoint` will be grouped 

116 under common prefix (group name). New groups can also be derived 

117 from a group using `Group.add_group`. 

118 """ 

119 

120 def __init__(self, config: internal.GroupConfig, service: Service) -> None: 

121 self._config = config 

122 self._service = service 

123 

124 def add_group( 

125 self, 

126 name: str, 

127 queue_group: str | None = None, 

128 pending_bytes_limit_by_endpoint: int | None = None, 

129 pending_msgs_limit_by_endpoint: int | None = None, 

130 ) -> Group: 

131 """Add a group to the group. 

132 

133 Args: 

134 name: The name of the group. Must be a valid NATS subject prefix. 

135 queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service. 

136 pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group. 

137 pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group. 

138 """ 

139 config = self._config.child( 

140 name=name, 

141 queue_group=queue_group, 

142 pending_bytes_limit=pending_bytes_limit_by_endpoint, 

143 pending_msgs_limit=pending_msgs_limit_by_endpoint, 

144 ) 

145 group = Group(config, self._service) 

146 return group 

147 

148 async def add_endpoint( 

149 self, 

150 name: str, 

151 handler: Handler, 

152 subject: str | None = None, 

153 queue_group: str | None = None, 

154 metadata: dict[str, str] | None = None, 

155 pending_bytes_limit: int | None = None, 

156 pending_msgs_limit: int | None = None, 

157 middlewares: list[Middleware] | None = None, 

158 ) -> Endpoint: 

159 """Add an endpoint to the group. 

160 

161 Args: 

162 name: The name of the endpoint. 

163 handler: The handler of the endpoint. 

164 subject: The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint. 

165 queue_group: The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service. 

166 metadata: The metadata of the endpoint. 

167 pending_bytes_limit: The pending bytes limit for this endpoint. 

168 pending_msgs_limit: The pending messages limit for this endpoint. 

169 """ 

170 return await self._service.add_endpoint( 

171 name=name, 

172 subject=f"{self._config.name}.{subject or name}", 

173 handler=handler, 

174 metadata=metadata, 

175 queue_group=queue_group or self._config.queue_group, 

176 pending_bytes_limit=pending_bytes_limit 

177 or self._config.pending_bytes_limit_by_endpoint, 

178 pending_msgs_limit=pending_msgs_limit 

179 or self._config.pending_msgs_limit_by_endpoint, 

180 middlewares=middlewares, 

181 ) 

182 

183 

184class Service: 

185 """Services simplify the development of NATS micro-services. 

186 

187 Endpoints can be added to a service after it has been created and started. 

188 Each endpoint is a request-reply handler for a subject. 

189 

190 Groups can be added to a service to group endpoints under a common prefix. 

191 """ 

192 

193 def __init__( 

194 self, 

195 nc: NatsClient, 

196 id: str, 

197 config: internal.ServiceConfig, 

198 api_prefix: str, 

199 clock: Callable[[], datetime], 

200 ) -> None: 

201 self._nc = nc 

202 self._config = config 

203 self._api_prefix = api_prefix 

204 self._clock = clock 

205 # Initialize state 

206 self._id = id 

207 self._endpoints: list[Endpoint] = [] 

208 self._stopped = False 

209 # Internal responses 

210 self._stats = internal.new_service_stats(self._id, self._clock(), config) 

211 self._info = internal.new_service_info(self._id, config) 

212 self._ping_response = internal.new_ping_info(self._id, config) 

213 # Cache the serialized ping response 

214 self._ping_response_message = internal.encode_ping_info(self._ping_response) 

215 # Internal subscriptions 

216 self._ping_subscriptions: list[Subscription] = [] 

217 self._info_subscriptions: list[Subscription] = [] 

218 self._stats_subscriptions: list[Subscription] = [] 

219 

220 async def start(self) -> None: 

221 """Start the service. 

222 

223 A service MUST be started before adding endpoints. 

224 

225 This will start the internal subscriptions and enable 

226 service discovery. 

227 """ 

228 # Start PING subscriptions: 

229 # - $SRV.PING 

230 # - $SRV.{name}.PING 

231 # - $SRV.{name}.{id}.PING 

232 for subject in internal.get_internal_subjects( 

233 internal.ServiceVerb.PING, 

234 self._id, 

235 self._config, 

236 api_prefix=self._api_prefix, 

237 ): 

238 sub = await self._nc.subscribe( # pyright: ignore[reportUnknownMemberType] 

239 subject, 

240 cb=self._handle_ping_request, 

241 ) 

242 self._ping_subscriptions.append(sub) 

243 # Start INFO subscriptions: 

244 # - $SRV.INFO 

245 # - $SRV.{name}.INFO 

246 # - $SRV.{name}.{id}.INFO 

247 for subject in internal.get_internal_subjects( 

248 internal.ServiceVerb.INFO, 

249 self._id, 

250 self._config, 

251 api_prefix=self._api_prefix, 

252 ): 

253 sub = await self._nc.subscribe( # pyright: ignore[reportUnknownMemberType] 

254 subject, 

255 cb=self._handle_info_request, 

256 ) 

257 self._info_subscriptions.append(sub) 

258 # Start STATS subscriptions: 

259 # - $SRV.STATS 

260 # - $SRV.{name}.STATS 

261 # - $SRV.{name}.{id}.STATS 

262 for subject in internal.get_internal_subjects( 

263 internal.ServiceVerb.STATS, 

264 self._id, 

265 self._config, 

266 api_prefix=self._api_prefix, 

267 ): 

268 sub = await self._nc.subscribe( # pyright: ignore[reportUnknownMemberType] 

269 subject, 

270 cb=self._handle_stats_request, 

271 ) 

272 self._stats_subscriptions.append(sub) 

273 

274 async def stop(self) -> None: 

275 """Stop the service. 

276 

277 This will stop all endpoints and internal subscriptions. 

278 """ 

279 self._stopped = True 

280 # Stop all endpoints 

281 await asyncio.gather(*(_stop_endpoint(ep) for ep in self._endpoints)) 

282 # Stop all internal subscriptions 

283 await asyncio.gather( 

284 *( 

285 _unsubscribe(sub) 

286 for subscriptions in ( 

287 self._stats_subscriptions, 

288 self._info_subscriptions, 

289 self._ping_subscriptions, 

290 ) 

291 for sub in subscriptions 

292 ) 

293 ) 

294 

295 def stopped(self) -> bool: 

296 """Stopped informs whether [Stop] was executed on the service.""" 

297 return self._stopped 

298 

299 def info(self) -> ServiceInfo: 

300 """Returns the service info.""" 

301 return self._info.copy() 

302 

303 def stats(self) -> ServiceStats: 

304 """Returns statistics for the service endpoint and all monitoring endpoints.""" 

305 return self._stats.copy() 

306 

307 def reset(self) -> None: 

308 """Resets all statistics (for all endpoints) on a service instance.""" 

309 

310 # Internal responses 

311 self._stats = internal.new_service_stats(self._id, self._clock(), self._config) 

312 self._info = internal.new_service_info(self._id, self._config) 

313 self._ping_response = internal.new_ping_info(self._id, self._config) 

314 self._ping_response_message = internal.encode_ping_info(self._ping_response) 

315 # Reset all endpoints 

316 endpoints = list(self._endpoints) 

317 self._endpoints.clear() 

318 for ep in endpoints: 

319 ep.reset() 

320 self._endpoints.append(ep) 

321 self._stats.endpoints.append(ep.stats) 

322 self._info.endpoints.append(ep.info) 

323 

324 def add_group( 

325 self, 

326 name: str, 

327 queue_group: str | None = None, 

328 pending_bytes_limit_by_endpoint: int | None = None, 

329 pending_msgs_limit_by_endpoint: int | None = None, 

330 ) -> Group: 

331 """Add a group to the service. 

332 

333 A group is a collection of endpoints that share the same prefix, 

334 and the same default queue group and pending limits. 

335 

336 At runtime, a group does not exist as a separate entity, only 

337 endpoints exist. However, groups are useful to organize endpoints 

338 and to set default values for queue group and pending limits. 

339 

340 Args: 

341 name: The name of the group. 

342 queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service. 

343 pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group. 

344 pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group. 

345 """ 

346 config = internal.GroupConfig( 

347 name=name, 

348 queue_group=queue_group or self._config.queue_group, 

349 pending_bytes_limit_by_endpoint=pending_bytes_limit_by_endpoint 

350 or self._config.pending_bytes_limit_by_endpoint, 

351 pending_msgs_limit_by_endpoint=pending_msgs_limit_by_endpoint 

352 or self._config.pending_msgs_limit_by_endpoint, 

353 ) 

354 return Group(config, self) 

355 

356 async def add_endpoint( 

357 self, 

358 name: str, 

359 handler: Handler, 

360 subject: str | None = None, 

361 queue_group: str | None = None, 

362 metadata: dict[str, str] | None = None, 

363 pending_bytes_limit: int | None = None, 

364 pending_msgs_limit: int | None = None, 

365 middlewares: list[Middleware] | None = None, 

366 ) -> Endpoint: 

367 """Add an endpoint to the service. 

368 

369 An endpoint is a request-reply handler for a subject. 

370 

371 Args: 

372 name: The name of the endpoint. 

373 handler: The handler of the endpoint. 

374 subject: The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint. 

375 queue_group: The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service. 

376 metadata: The metadata of the endpoint. 

377 pending_bytes_limit: The pending bytes limit for this endpoint. 

378 pending_msgs_limit: The pending messages limit for this endpoint. 

379 """ 

380 if self._stopped: 

381 raise RuntimeError("Cannot add endpoint to a stopped service") 

382 config = self._config.endpoint_config( 

383 name=name, 

384 handler=handler, 

385 subject=subject, 

386 queue_group=queue_group, 

387 metadata=metadata, 

388 pending_bytes_limit=pending_bytes_limit, 

389 pending_msgs_limit=pending_msgs_limit, 

390 ) 

391 # Create the endpoint 

392 ep = Endpoint(config) 

393 # Create the endpoint handler 

394 subscription_handler = _create_handler(ep, middlewares) 

395 # Start the endpoint subscription 

396 subscription = ( 

397 await self._nc.subscribe( # pyright: ignore[reportUnknownMemberType] 

398 config.subject, 

399 queue=config.queue_group, 

400 cb=subscription_handler, 

401 ) 

402 ) 

403 # Attach the subscription to the endpoint 

404 ep._sub = subscription # pyright: ignore[reportPrivateUsage] 

405 # Append the endpoint to the service 

406 self._endpoints.append(ep) 

407 # Append the endpoint to the service stats and info 

408 self._stats.endpoints.append(ep.stats) 

409 self._info.endpoints.append(ep.info) 

410 return ep 

411 

412 async def _handle_ping_request(self, msg: Msg) -> None: 

413 """Handle the ping message.""" 

414 await msg.respond(data=self._ping_response_message) 

415 

416 async def _handle_info_request(self, msg: Msg) -> None: 

417 """Handle the info message.""" 

418 await msg.respond(data=internal.encode_info(self._info)) 

419 

420 async def _handle_stats_request(self, msg: Msg) -> None: 

421 """Handle the stats message.""" 

422 await msg.respond(data=internal.encode_stats(self._stats)) 

423 

424 async def __aenter__(self) -> Service: 

425 """Implement the asynchronous context manager interface.""" 

426 await self.start() 

427 return self 

428 

429 async def __aexit__(self, *args: object, **kwargs: object) -> None: 

430 """Implement the asynchronous context manager interface.""" 

431 await self.stop() 

432 

433 

434def _create_handler( 

435 endpoint: Endpoint, middlewares: list[Middleware] | None = None 

436) -> Callable[[Msg], Awaitable[None]]: 

437 """A helper function called internally to create endpoint message handlers.""" 

438 if middlewares: 438 ↛ 439line 438 didn't jump to line 439, because the condition on line 438 was never true

439 micro_handler = apply_middlewares(endpoint.config.handler, middlewares) 

440 else: 

441 micro_handler = endpoint.config.handler 

442 

443 async def handler(msg: Msg) -> None: 

444 timer = internal.Timer() 

445 endpoint.stats.num_requests += 1 

446 request = NatsRequest(msg) 

447 try: 

448 await micro_handler(request) 

449 except Exception as exc: 

450 endpoint.stats.num_errors += 1 

451 endpoint.stats.last_error = repr(exc) 

452 await request.respond_error( 

453 code=500, 

454 description="Internal Server Error", 

455 ) 

456 endpoint.stats.processing_time += timer.elapsed_nanoseconds() 

457 endpoint.stats.average_processing_time = int( 

458 endpoint.stats.processing_time / endpoint.stats.num_requests 

459 ) 

460 

461 return handler 

462 

463 

464async def _stop_endpoint(endpoint: Endpoint) -> None: 

465 """Stop the endpoint by draining its subscription.""" 

466 if endpoint._sub: # pyright: ignore[reportPrivateUsage] 

467 await _unsubscribe(endpoint._sub) # pyright: ignore[reportPrivateUsage] 

468 endpoint._sub = None # pyright: ignore[reportPrivateUsage] 

469 

470 

471async def _unsubscribe(sub: Subscription) -> None: 

472 try: 

473 await sub.unsubscribe() 

474 except BadSubscriptionError: 

475 pass