Coverage for src/nats_contrib/micro/api.py: 98%
141 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-06 11:09 +0100
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-06 11:09 +0100
1"""NATS Micro implementation based on nats-py client library.
NATS Micro is a protocol for building microservices with NATS.

It is documented in [ADR-32: Service API](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-32.md).

The reference implementation is in [nats.go](https://github.com/nats-io/nats.go) under the [micro package](https://pkg.go.dev/github.com/nats-io/nats.go/micro).
A TypeScript implementation is available in [nats.deno](https://github.com/nats-io/nats.deno/blob/main/nats-base-client/service.ts).
9"""
11from __future__ import annotations
13import asyncio
14from datetime import datetime
15from typing import Awaitable, Callable
17from nats.aio.client import Client as NatsClient
18from nats.aio.msg import Msg
19from nats.aio.subscription import (
20 DEFAULT_SUB_PENDING_BYTES_LIMIT,
21 DEFAULT_SUB_PENDING_MSGS_LIMIT,
22 Subscription,
23)
24from nats.errors import BadSubscriptionError
26from . import internal
27from .middleware import Middleware, apply_middlewares
28from .models import ServiceInfo, ServiceStats
29from .request import Handler, NatsRequest
DEFAULT_QUEUE_GROUP = "q"
"""Queue group name used when a service does not specify one."""

API_PREFIX = "$SRV"
"""API_PREFIX is the root of all control subjects."""
def add_service(
    nc: NatsClient,
    name: str,
    version: str,
    description: str | None = None,
    metadata: dict[str, str] | None = None,
    queue_group: str | None = None,
    pending_bytes_limit_by_endpoint: int | None = None,
    pending_msgs_limit_by_endpoint: int | None = None,
    now: Callable[[], datetime] | None = None,
    id_generator: Callable[[], str] | None = None,
    api_prefix: str | None = None,
) -> Service:
    """Create a new service.

    A service is a collection of endpoints that are grouped together
    under a common name.

    Each endpoint is a request-reply handler for a subject.

    It's possible to add endpoints to a service after it has been created AND
    started.

    Every optional argument falls back to its default when it is falsy, so
    an empty string, an empty dict or a limit of ``0`` is treated exactly
    like ``None``.

    Args:
        nc: The NATS client.
        name: The name of the service.
        version: The version of the service. Must be a valid semver version.
        description: The description of the service. Defaults to ``""``.
        metadata: The metadata of the service. Defaults to an empty dict.
        queue_group: The default queue group of the service. Defaults to
            `DEFAULT_QUEUE_GROUP`.
        pending_bytes_limit_by_endpoint: The default pending bytes limit for
            each endpoint within the service. Defaults to
            `DEFAULT_SUB_PENDING_BYTES_LIMIT`.
        pending_msgs_limit_by_endpoint: The default pending messages limit
            for each endpoint within the service. Defaults to
            `DEFAULT_SUB_PENDING_MSGS_LIMIT`.
        now: The function to get the current time. Defaults to
            `internal.default_clock`.
        id_generator: The function to generate a unique service instance id.
            Defaults to `internal.default_id_generator`.
        api_prefix: The prefix of the control subjects. Defaults to
            `API_PREFIX`.

    Returns:
        A new `Service`. It is not started by this function.
    """
    if id_generator is None:
        id_generator = internal.default_id_generator
    # The instance id is generated once, here, and handed to the service.
    instance_id = id_generator()
    service_config = internal.ServiceConfig(
        name=name,
        version=version,
        description=description or "",
        metadata=metadata or {},
        queue_group=queue_group or DEFAULT_QUEUE_GROUP,
        pending_bytes_limit_by_endpoint=pending_bytes_limit_by_endpoint
        or DEFAULT_SUB_PENDING_BYTES_LIMIT,
        pending_msgs_limit_by_endpoint=pending_msgs_limit_by_endpoint
        or DEFAULT_SUB_PENDING_MSGS_LIMIT,
    )
    return Service(
        nc=nc,
        id=instance_id,
        config=service_config,
        api_prefix=api_prefix or API_PREFIX,
        clock=now or internal.default_clock,
    )
class Endpoint:
    """Endpoint manages a service endpoint.

    Attributes are plain and public:

    - ``config``: the endpoint configuration the endpoint was created from.
    - ``stats``: the statistics object built from ``config``.
    - ``info``: the info object built from ``config``.
    """

    def __init__(self, config: internal.EndpointConfig) -> None:
        """Build the stats and info objects for ``config``.

        Args:
            config: The endpoint configuration.
        """
        self.config = config
        self.stats = internal.create_endpoint_stats(config)
        self.info = internal.create_endpoint_info(config)
        # Set by the owning service once the NATS subscription exists.
        self._sub: Subscription | None = None

    def reset(self) -> None:
        """Reset the endpoint statistics and info.

        Both objects are replaced by new ones built from the stored
        configuration; references to the previous objects held elsewhere
        are not updated.
        """
        self.stats = internal.create_endpoint_stats(self.config)
        self.info = internal.create_endpoint_info(self.config)
class Group:
    """Group allows for grouping endpoints on a service.

    Endpoints created using `Group.add_endpoint` will be grouped
    under common prefix (group name). New groups can also be derived
    from a group using `Group.add_group`.
    """

    def __init__(self, config: internal.GroupConfig, service: Service) -> None:
        """Bind a group configuration to the service that owns the endpoints.

        Args:
            config: The group configuration (name and default settings).
            service: The service on which endpoints are registered.
        """
        self._config = config
        self._service = service

    def add_group(
        self,
        name: str,
        queue_group: str | None = None,
        pending_bytes_limit_by_endpoint: int | None = None,
        pending_msgs_limit_by_endpoint: int | None = None,
    ) -> Group:
        """Add a group to the group.

        The child configuration is derived by the parent configuration's
        `child()` method; the arguments are forwarded to it unchanged.

        Args:
            name: The name of the group. Must be a valid NATS subject prefix.
            queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service.
            pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group.
            pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group.

        Returns:
            A new `Group` attached to the same service.
        """
        config = self._config.child(
            name=name,
            queue_group=queue_group,
            pending_bytes_limit=pending_bytes_limit_by_endpoint,
            pending_msgs_limit=pending_msgs_limit_by_endpoint,
        )
        return Group(config, self._service)

    async def add_endpoint(
        self,
        name: str,
        handler: Handler,
        subject: str | None = None,
        queue_group: str | None = None,
        metadata: dict[str, str] | None = None,
        pending_bytes_limit: int | None = None,
        pending_msgs_limit: int | None = None,
        middlewares: list[Middleware] | None = None,
    ) -> Endpoint:
        """Add an endpoint to the group.

        The endpoint is registered on the underlying service with its
        subject prefixed by the group name (``"{group}.{subject}"``).

        Args:
            name: The name of the endpoint.
            handler: The handler of the endpoint.
            subject: The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint.
            queue_group: The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service.
            metadata: The metadata of the endpoint.
            pending_bytes_limit: The pending bytes limit for this endpoint. A falsy value falls back to the group default.
            pending_msgs_limit: The pending messages limit for this endpoint. A falsy value falls back to the group default.
            middlewares: Middlewares forwarded unchanged to the service.

        Returns:
            The endpoint returned by the service's `add_endpoint`.
        """
        return await self._service.add_endpoint(
            name=name,
            subject=f"{self._config.name}.{subject or name}",
            handler=handler,
            metadata=metadata,
            queue_group=queue_group or self._config.queue_group,
            pending_bytes_limit=pending_bytes_limit
            or self._config.pending_bytes_limit_by_endpoint,
            pending_msgs_limit=pending_msgs_limit
            or self._config.pending_msgs_limit_by_endpoint,
            middlewares=middlewares,
        )
class Service:
    """Services simplify the development of NATS micro-services.

    Endpoints can be added to a service after it has been created and started.
    Each endpoint is a request-reply handler for a subject.

    Groups can be added to a service to group endpoints under a common prefix.

    A service can be used as an asynchronous context manager: entering it
    calls `start()` and leaving it calls `stop()`.
    """

    def __init__(
        self,
        nc: NatsClient,
        id: str,
        config: internal.ServiceConfig,
        api_prefix: str,
        clock: Callable[[], datetime],
    ) -> None:
        """Initialize the service state without touching the network.

        Args:
            nc: The NATS client used for all subscriptions.
            id: The unique id of this service instance.
            config: The service configuration.
            api_prefix: The prefix of the control subjects.
            clock: The function returning the current time; it is called
                once here and once per `reset()`.
        """
        self._nc = nc
        self._config = config
        self._api_prefix = api_prefix
        self._clock = clock
        # Initialize state
        self._id = id
        self._endpoints: list[Endpoint] = []
        self._stopped = False
        # Internal responses
        self._stats = internal.new_service_stats(self._id, self._clock(), config)
        self._info = internal.new_service_info(self._id, config)
        self._ping_response = internal.new_ping_info(self._id, config)
        # Cache the serialized ping response
        self._ping_response_message = internal.encode_ping_info(self._ping_response)
        # Internal subscriptions
        self._ping_subscriptions: list[Subscription] = []
        self._info_subscriptions: list[Subscription] = []
        self._stats_subscriptions: list[Subscription] = []

    async def start(self) -> None:
        """Start the service.

        A service MUST be started before adding endpoints.

        This will start the internal subscriptions and enable
        service discovery. Subscriptions are created sequentially, verb by
        verb, in the order PING, INFO, STATS.
        """
        # Start PING subscriptions:
        #   - $SRV.PING
        #   - $SRV.{name}.PING
        #   - $SRV.{name}.{id}.PING
        await self._subscribe_internal(
            internal.ServiceVerb.PING,
            self._handle_ping_request,
            self._ping_subscriptions,
        )
        # Start INFO subscriptions:
        #   - $SRV.INFO
        #   - $SRV.{name}.INFO
        #   - $SRV.{name}.{id}.INFO
        await self._subscribe_internal(
            internal.ServiceVerb.INFO,
            self._handle_info_request,
            self._info_subscriptions,
        )
        # Start STATS subscriptions:
        #   - $SRV.STATS
        #   - $SRV.{name}.STATS
        #   - $SRV.{name}.{id}.STATS
        await self._subscribe_internal(
            internal.ServiceVerb.STATS,
            self._handle_stats_request,
            self._stats_subscriptions,
        )

    async def _subscribe_internal(
        self,
        verb: internal.ServiceVerb,
        callback: Callable[[Msg], Awaitable[None]],
        subscriptions: list[Subscription],
    ) -> None:
        """Subscribe ``callback`` to every control subject of ``verb``.

        Each subscription is appended to ``subscriptions`` as soon as it is
        created, so the ones already made stay tracked (and can be stopped)
        even if a later ``subscribe`` call raises.
        """
        for subject in internal.get_internal_subjects(
            verb,
            self._id,
            self._config,
            api_prefix=self._api_prefix,
        ):
            sub = await self._nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
                subject,
                cb=callback,
            )
            subscriptions.append(sub)

    async def stop(self) -> None:
        """Stop the service.

        This will stop all endpoints and internal subscriptions. The
        service is flagged as stopped before anything is unsubscribed, so
        `add_endpoint` is rejected from that point on.
        """
        self._stopped = True
        # Stop all endpoints
        await asyncio.gather(*(_stop_endpoint(ep) for ep in self._endpoints))
        # Stop all internal subscriptions
        await asyncio.gather(
            *(
                _unsubscribe(sub)
                for subscriptions in (
                    self._stats_subscriptions,
                    self._info_subscriptions,
                    self._ping_subscriptions,
                )
                for sub in subscriptions
            )
        )

    def stopped(self) -> bool:
        """Tell whether `stop()` was executed on the service."""
        return self._stopped

    def info(self) -> ServiceInfo:
        """Return a copy of the service info, as produced by its `copy()`."""
        return self._info.copy()

    def stats(self) -> ServiceStats:
        """Return a copy of the statistics for the service and its endpoints."""
        return self._stats.copy()

    def reset(self) -> None:
        """Reset all statistics (for all endpoints) on a service instance.

        The service-level stats, info and ping response are rebuilt from the
        configuration, then every registered endpoint is reset and its new
        stats/info objects are attached to the new service-level objects.
        """
        # Internal responses
        self._stats = internal.new_service_stats(self._id, self._clock(), self._config)
        self._info = internal.new_service_info(self._id, self._config)
        self._ping_response = internal.new_ping_info(self._id, self._config)
        self._ping_response_message = internal.encode_ping_info(self._ping_response)
        # Reset all endpoints. The endpoint list itself is left untouched so
        # that a failure while resetting one endpoint cannot drop the others.
        for ep in self._endpoints:
            ep.reset()
            self._stats.endpoints.append(ep.stats)
            self._info.endpoints.append(ep.info)

    def add_group(
        self,
        name: str,
        queue_group: str | None = None,
        pending_bytes_limit_by_endpoint: int | None = None,
        pending_msgs_limit_by_endpoint: int | None = None,
    ) -> Group:
        """Add a group to the service.

        A group is a collection of endpoints that share the same prefix,
        and the same default queue group and pending limits.

        At runtime, a group does not exist as a separate entity, only
        endpoints exist. However, groups are useful to organize endpoints
        and to set default values for queue group and pending limits.

        Args:
            name: The name of the group.
            queue_group: The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service.
            pending_bytes_limit_by_endpoint: The default pending bytes limit for each endpoint within the group. A falsy value falls back to the service default.
            pending_msgs_limit_by_endpoint: The default pending messages limit for each endpoint within the group. A falsy value falls back to the service default.

        Returns:
            A new `Group` bound to this service.
        """
        config = internal.GroupConfig(
            name=name,
            queue_group=queue_group or self._config.queue_group,
            pending_bytes_limit_by_endpoint=pending_bytes_limit_by_endpoint
            or self._config.pending_bytes_limit_by_endpoint,
            pending_msgs_limit_by_endpoint=pending_msgs_limit_by_endpoint
            or self._config.pending_msgs_limit_by_endpoint,
        )
        return Group(config, self)

    async def add_endpoint(
        self,
        name: str,
        handler: Handler,
        subject: str | None = None,
        queue_group: str | None = None,
        metadata: dict[str, str] | None = None,
        pending_bytes_limit: int | None = None,
        pending_msgs_limit: int | None = None,
        middlewares: list[Middleware] | None = None,
    ) -> Endpoint:
        """Add an endpoint to the service.

        An endpoint is a request-reply handler for a subject.

        Args:
            name: The name of the endpoint.
            handler: The handler of the endpoint.
            subject: The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint.
            queue_group: The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service.
            metadata: The metadata of the endpoint.
            pending_bytes_limit: The pending bytes limit for this endpoint.
            pending_msgs_limit: The pending messages limit for this endpoint.
            middlewares: Optional middlewares wrapped around the handler.

        Returns:
            The newly created endpoint, already subscribed and registered in
            the service stats and info.

        Raises:
            RuntimeError: If the service has been stopped.
        """
        if self._stopped:
            raise RuntimeError("Cannot add endpoint to a stopped service")
        config = self._config.endpoint_config(
            name=name,
            handler=handler,
            subject=subject,
            queue_group=queue_group,
            metadata=metadata,
            pending_bytes_limit=pending_bytes_limit,
            pending_msgs_limit=pending_msgs_limit,
        )
        # Create the endpoint
        ep = Endpoint(config)
        # Create the endpoint handler
        subscription_handler = _create_handler(ep, middlewares)
        # Start the endpoint subscription
        # NOTE(review): the pending limits stored in the endpoint config are
        # not forwarded to `subscribe` here, so the client defaults apply -
        # confirm whether that is intended.
        subscription = (
            await self._nc.subscribe(  # pyright: ignore[reportUnknownMemberType]
                config.subject,
                queue=config.queue_group,
                cb=subscription_handler,
            )
        )
        # Attach the subscription to the endpoint
        ep._sub = subscription  # pyright: ignore[reportPrivateUsage]
        # Append the endpoint to the service
        self._endpoints.append(ep)
        # Append the endpoint to the service stats and info
        self._stats.endpoints.append(ep.stats)
        self._info.endpoints.append(ep.info)
        return ep

    async def _handle_ping_request(self, msg: Msg) -> None:
        """Handle the ping message by replying with the cached ping payload."""
        await msg.respond(data=self._ping_response_message)

    async def _handle_info_request(self, msg: Msg) -> None:
        """Handle the info message by replying with the encoded service info."""
        await msg.respond(data=internal.encode_info(self._info))

    async def _handle_stats_request(self, msg: Msg) -> None:
        """Handle the stats message by replying with the encoded service stats."""
        await msg.respond(data=internal.encode_stats(self._stats))

    async def __aenter__(self) -> Service:
        """Start the service and return it (async context manager entry)."""
        await self.start()
        return self

    async def __aexit__(self, *args: object, **kwargs: object) -> None:
        """Stop the service (async context manager exit)."""
        await self.stop()
def _create_handler(
    endpoint: Endpoint, middlewares: list[Middleware] | None = None
) -> Callable[[Msg], Awaitable[None]]:
    """A helper function called internally to create endpoint message handlers.

    Args:
        endpoint: The endpoint whose configured handler is wrapped and whose
            statistics are updated on every message.
        middlewares: Optional middlewares applied around the endpoint
            handler. When empty or ``None`` the handler is used as is.

    Returns:
        A subscription callback that runs the handler, answers with a
        500 "Internal Server Error" when the handler raises, and records
        request count, error count, last error and processing time.
    """
    if middlewares:
        micro_handler = apply_middlewares(endpoint.config.handler, middlewares)
    else:
        micro_handler = endpoint.config.handler

    async def handler(msg: Msg) -> None:
        timer = internal.Timer()
        # Bind the stats object once per request. `Endpoint.reset()` swaps
        # `endpoint.stats` for a fresh object; re-reading the attribute after
        # the awaits below would update that fresh object, whose request
        # count (presumably zero) would then be used as the divisor.
        stats = endpoint.stats
        stats.num_requests += 1
        request = NatsRequest(msg)
        try:
            await micro_handler(request)
        except Exception as exc:
            # Boundary of user code: record the failure and answer the
            # requester instead of letting the error escape the callback.
            stats.num_errors += 1
            stats.last_error = repr(exc)
            await request.respond_error(
                code=500,
                description="Internal Server Error",
            )
        stats.processing_time += timer.elapsed_nanoseconds()
        stats.average_processing_time = int(
            stats.processing_time / stats.num_requests
        )

    return handler
async def _stop_endpoint(endpoint: Endpoint) -> None:
    """Stop the endpoint by unsubscribing its subscription, if it has one.

    The subscription reference is cleared afterwards, so calling this again
    on the same endpoint does nothing.
    """
    if endpoint._sub:  # pyright: ignore[reportPrivateUsage]
        await _unsubscribe(endpoint._sub)  # pyright: ignore[reportPrivateUsage]
        endpoint._sub = None  # pyright: ignore[reportPrivateUsage]
async def _unsubscribe(sub: Subscription) -> None:
    """Unsubscribe ``sub``, ignoring `BadSubscriptionError`.

    Best effort on purpose: only `BadSubscriptionError` is swallowed, any
    other exception raised by ``sub.unsubscribe()`` propagates to the caller.
    """
    try:
        await sub.unsubscribe()
    except BadSubscriptionError:
        pass