Coverage for src/nats_contrib/micro/internal.py: 99%
94 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
1"""NATS Micro implementation based on nats-py client library.
3NATS Micro is a protocol for building microservices with NATS.
5It is documented in [ADR-32: Service API](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-32.md).
7The reference implementation is in [nats.go](https://github.com/nats-io/nats.go) under [micro package](https://pkg.go.dev/github.com/nats-io/nats.go/micro).
8A typescript implementation is available in [nats.deno](https://github.com/nats-io/nats.deno/blob/main/nats-base-client/service.ts)
9"""
11from __future__ import annotations
13import secrets
14import time
15from dataclasses import dataclass
16from datetime import datetime, timezone
17from enum import Enum
18from json import dumps
20from .models import EndpointInfo, EndpointStats, PingInfo, ServiceInfo, ServiceStats
21from .request import Handler
24class ServiceVerb(str, Enum):
25 PING = "PING"
26 INFO = "INFO"
27 STATS = "STATS"
30def get_internal_subject(
31 verb: ServiceVerb,
32 service: str | None,
33 id: str | None,
34 api_prefix: str,
35) -> str:
36 """Get the internal subject for a verb."""
37 verb_literal = verb.value
38 if service:
39 if id:
40 return f"{api_prefix}.{verb_literal}.{service}.{id}"
41 return f"{api_prefix}.{verb_literal}.{service}"
42 return f"{api_prefix}.{verb_literal}"
45def get_internal_subjects(
46 verb: ServiceVerb,
47 id: str,
48 config: ServiceConfig,
49 api_prefix: str,
50) -> list[str]:
51 """Get the internal subjects for a verb."""
52 return [
53 get_internal_subject(verb, service=None, id=None, api_prefix=api_prefix),
54 get_internal_subject(verb, service=config.name, id=None, api_prefix=api_prefix),
55 get_internal_subject(verb, service=config.name, id=id, api_prefix=api_prefix),
56 ]
59@dataclass
60class ServiceConfig:
61 """The configuration of a service."""
63 name: str
64 """The kind of the service. Shared by all services that have the same name.
65 This name can only have A-Z, a-z, 0-9, dash, underscore."""
67 version: str
68 """The version of the service.
69 This verson must be a valid semantic version."""
71 description: str
72 """The description of the service."""
74 metadata: dict[str, str]
75 """The metadata of the service."""
77 queue_group: str
78 """The default queue group of the service."""
80 pending_msgs_limit_by_endpoint: int
81 """The default pending messages limit of the service.
83 This limit is applied BY subject.
84 """
86 pending_bytes_limit_by_endpoint: int
87 """The default pending bytes limit of the service.
89 This limit is applied BY subject.
90 """
92 def endpoint_config(
93 self,
94 name: str,
95 handler: Handler,
96 subject: str | None = None,
97 queue_group: str | None = None,
98 metadata: dict[str, str] | None = None,
99 pending_bytes_limit: int | None = None,
100 pending_msgs_limit: int | None = None,
101 ) -> EndpointConfig:
102 return EndpointConfig(
103 name=name,
104 subject=subject or name,
105 handler=handler,
106 metadata=metadata or {},
107 queue_group=queue_group or self.queue_group,
108 pending_bytes_limit=pending_bytes_limit
109 or self.pending_bytes_limit_by_endpoint,
110 pending_msgs_limit=pending_msgs_limit
111 or self.pending_msgs_limit_by_endpoint,
112 )
115@dataclass
116class EndpointConfig:
117 name: str
118 """An alphanumeric human-readable string used to describe the endpoint.
120 Multiple endpoints can have the same names.
121 """
123 subject: str
124 """The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint."""
126 handler: Handler
127 """The handler of the endpoint."""
129 queue_group: str
130 """The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service."""
132 metadata: dict[str, str]
133 """The metadata of the endpoint."""
135 pending_msgs_limit: int
136 """The pending message limit for this endpoint."""
138 pending_bytes_limit: int
139 """The pending bytes limit for this endpoint."""
142@dataclass
143class GroupConfig:
144 """The configuration of a group."""
146 name: str
147 """The name of the group.
148 Group names cannot contain '>' wildcard (as group name serves as subject prefix)."""
150 queue_group: str
151 """The default queue group of the group."""
153 pending_msgs_limit_by_endpoint: int
154 """The default pending message limit for each endpoint within the group."""
156 pending_bytes_limit_by_endpoint: int
157 """The default pending bytes limit for each endpoint within the group."""
159 def child(
160 self,
161 name: str,
162 queue_group: str | None = None,
163 pending_bytes_limit: int | None = None,
164 pending_msgs_limit: int | None = None,
165 ) -> GroupConfig:
166 return GroupConfig(
167 name=f"{self.name}.{name}",
168 queue_group=queue_group or self.queue_group,
169 pending_bytes_limit_by_endpoint=pending_bytes_limit
170 or self.pending_bytes_limit_by_endpoint,
171 pending_msgs_limit_by_endpoint=pending_msgs_limit
172 or self.pending_msgs_limit_by_endpoint,
173 )
176def create_endpoint_stats(config: EndpointConfig) -> EndpointStats:
177 return EndpointStats(
178 name=config.name,
179 subject=config.subject,
180 num_requests=0,
181 num_errors=0,
182 last_error="",
183 processing_time=0,
184 average_processing_time=0,
185 queue_group=config.queue_group,
186 data={},
187 )
190def new_service_stats(
191 id: str, started: datetime, config: ServiceConfig
192) -> ServiceStats:
193 return ServiceStats(
194 name=config.name,
195 id=id,
196 version=config.version,
197 started=started,
198 endpoints=[],
199 metadata=config.metadata,
200 )
203def create_endpoint_info(config: EndpointConfig) -> EndpointInfo:
204 return EndpointInfo(
205 name=config.name,
206 subject=config.subject,
207 metadata=config.metadata,
208 queue_group=config.queue_group,
209 )
212def new_service_info(id: str, config: ServiceConfig) -> ServiceInfo:
213 return ServiceInfo(
214 name=config.name,
215 id=id,
216 version=config.version,
217 description=config.description,
218 metadata=config.metadata,
219 endpoints=[],
220 )
223def new_ping_info(id: str, config: ServiceConfig) -> PingInfo:
224 return PingInfo(
225 name=config.name,
226 id=id,
227 version=config.version,
228 metadata=config.metadata,
229 )
232def encode_ping_info(info: PingInfo) -> bytes:
233 return dumps(info.as_dict(), separators=(",", ":")).encode()
236def encode_stats(stats: ServiceStats) -> bytes:
237 return dumps(stats.as_dict(), separators=(",", ":")).encode()
240def encode_info(info: ServiceInfo) -> bytes:
241 return dumps(info.as_dict(), separators=(",", ":")).encode()
244def default_clock() -> datetime:
245 """A default clock implementation."""
246 return datetime.now(timezone.utc)
249def default_id_generator() -> str:
250 """A default ID generator implementation."""
251 return secrets.token_hex(16)
254class Timer:
255 __slots__ = "_start"
257 def __init__(self) -> None:
258 self._start = time.perf_counter_ns()
260 def elapsed_nanoseconds(self) -> int:
261 return time.perf_counter_ns() - self._start