Coverage for src/nats_contrib/micro/client.py: 99%
89 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-06 11:09 +0100
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-06 11:09 +0100
1from __future__ import annotations
3import json
4from typing import AsyncContextManager, AsyncIterator
6from nats.aio.client import Client as NATS
7from nats.aio.msg import Msg
8from nats_contrib.request_many import (
9 RequestManyExecutor,
10 RequestManyIterator,
11 transform,
12)
14from . import internal
15from .api import API_PREFIX
16from .models import PingInfo, ServiceInfo, ServiceStats
19class ServiceError(Exception):
20 """Raised when a service error is received."""
22 def __init__(
23 self,
24 code: int,
25 description: str,
26 subject: str,
27 data: bytes,
28 headers: dict[str, str],
29 ) -> None:
30 super().__init__(f"Service error {code}: {description}")
31 self.code = code
32 self.description = description
33 self.subject = subject
34 self.data = data
35 self.headers = headers
38class Client:
40 def __init__(
41 self,
42 nc: NATS,
43 default_max_wait: float = 0.5,
44 api_prefix: str = API_PREFIX,
45 ) -> None:
46 self.nc = nc
47 self.api_prefix = api_prefix
48 self.request_executor = RequestManyExecutor(nc, default_max_wait)
50 async def request(
51 self,
52 subject: str,
53 data: bytes | None = None,
54 headers: dict[str, str] | None = None,
55 timeout: float = 1,
56 ) -> Msg:
57 """Send a request and get the response.
59 This method should be prefered over using the NATS client directly
60 because it will handle the service errors properly.
62 Args:
63 subject: The subject to send the request to.
64 data: The request data.
65 headers: Additional request headers.
66 timeout: The maximum time to wait for a response.
68 Returns:
70 """
71 response = await self.nc.request(
72 subject, data or b"", headers=headers, timeout=timeout
73 )
74 if response.headers:
75 error_code = response.headers.get("Nats-Service-Error-Code")
76 if error_code: 76 ↛ 85line 76 didn't jump to line 85, because the condition on line 76 was never false
77 raise ServiceError(
78 int(error_code),
79 response.headers.get("Nats-Service-Error", ""),
80 subject=subject,
81 data=response.data,
82 headers=response.headers or {},
83 )
85 return response
87 async def ping(
88 self,
89 service: str | None = None,
90 max_wait: float | None = None,
91 max_count: int | None = None,
92 max_interval: float | None = None,
93 ) -> list[PingInfo]:
94 """Ping all the services."""
95 subject = internal.get_internal_subject(
96 internal.ServiceVerb.PING, service, None, self.api_prefix
97 )
98 responses = await self.request_executor(
99 subject,
100 max_count=max_count,
101 max_wait=max_wait,
102 max_interval=max_interval,
103 )
104 return [PingInfo.from_response(json.loads(res.data)) for res in responses]
106 async def info(
107 self,
108 service: str | None = None,
109 max_wait: float | None = None,
110 max_count: int | None = None,
111 max_interval: float | None = None,
112 ) -> list[ServiceInfo]:
113 """Get all service informations."""
114 subject = internal.get_internal_subject(
115 internal.ServiceVerb.INFO, service, None, self.api_prefix
116 )
117 responses = await self.request_executor(
118 subject,
119 max_count=max_count,
120 max_wait=max_wait,
121 max_interval=max_interval,
122 )
123 return [ServiceInfo.from_response(json.loads(res.data)) for res in responses]
125 async def stats(
126 self,
127 service: str | None = None,
128 max_wait: float | None = None,
129 max_count: int | None = None,
130 max_interval: float | None = None,
131 ) -> list[ServiceStats]:
132 """Get all services stats."""
133 subject = internal.get_internal_subject(
134 internal.ServiceVerb.STATS, service, None, self.api_prefix
135 )
136 responses = await self.request_executor(
137 subject,
138 max_count=max_count,
139 max_wait=max_wait,
140 max_interval=max_interval,
141 )
142 return [ServiceStats.from_response(json.loads(res.data)) for res in responses]
144 def ping_iter(
145 self,
146 service: str | None = None,
147 max_wait: float | None = None,
148 max_count: int | None = None,
149 max_interval: float | None = None,
150 ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
151 """Ping all the services."""
152 subject = internal.get_internal_subject(
153 internal.ServiceVerb.PING, service, None, self.api_prefix
154 )
155 return transform(
156 RequestManyIterator(
157 self.nc,
158 subject,
159 inbox=self.nc.new_inbox(),
160 max_count=max_count,
161 max_wait=max_wait,
162 max_interval=max_interval,
163 ),
164 lambda res: PingInfo.from_response(json.loads(res.data)),
165 )
167 def info_iter(
168 self,
169 service: str | None = None,
170 max_wait: float | None = None,
171 max_count: int | None = None,
172 max_interval: float | None = None,
173 ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
174 """Get all service informations."""
175 subject = internal.get_internal_subject(
176 internal.ServiceVerb.INFO, service, None, self.api_prefix
177 )
178 return transform(
179 RequestManyIterator(
180 self.nc,
181 subject,
182 inbox=self.nc.new_inbox(),
183 max_count=max_count,
184 max_wait=max_wait,
185 max_interval=max_interval,
186 ),
187 lambda res: ServiceInfo.from_response(json.loads(res.data)),
188 )
190 def stats_iter(
191 self,
192 service: str | None = None,
193 max_wait: float | None = None,
194 max_count: int | None = None,
195 max_interval: float | None = None,
196 ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
197 """Get all services stats."""
198 subject = internal.get_internal_subject(
199 internal.ServiceVerb.STATS, service, None, self.api_prefix
200 )
201 return transform(
202 RequestManyIterator(
203 self.nc,
204 subject,
205 inbox=self.nc.new_inbox(),
206 max_count=max_count,
207 max_wait=max_wait,
208 max_interval=max_interval,
209 ),
210 lambda res: ServiceStats.from_response(json.loads(res.data)),
211 )
213 def service(self, service: str) -> Service:
214 """Get a client for a single service."""
215 return Service(self, service)
217 def instance(self, service: str, id: str) -> Instance:
218 """Get a client for a single service instance."""
219 return Instance(self, service, id)
222class Service:
223 def __init__(self, client: Client, service: str) -> None:
224 self.client = client
225 self.service = service
227 async def ping(
228 self,
229 max_wait: float | None = None,
230 max_count: int | None = None,
231 max_interval: float | None = None,
232 ) -> list[PingInfo]:
233 """Ping all the service instances."""
234 return await self.client.ping(self.service, max_wait, max_count, max_interval)
236 async def info(
237 self,
238 max_wait: float | None = None,
239 max_count: int | None = None,
240 max_interval: float | None = None,
241 ) -> list[ServiceInfo]:
242 """Get all service instance informations."""
243 return await self.client.info(self.service, max_wait, max_count, max_interval)
245 async def stats(
246 self,
247 max_wait: float | None = None,
248 max_count: int | None = None,
249 max_interval: float | None = None,
250 ) -> list[ServiceStats]:
251 """Get all service instance stats."""
252 return await self.client.stats(self.service, max_wait, max_count, max_interval)
254 def ping_iter(
255 self,
256 max_wait: float | None = None,
257 max_count: int | None = None,
258 max_interval: float | None = None,
259 ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
260 """Ping all the service instances."""
261 return self.client.ping_iter(self.service, max_wait, max_count, max_interval)
263 def info_iter(
264 self,
265 max_wait: float | None = None,
266 max_count: int | None = None,
267 max_interval: float | None = None,
268 ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
269 """Get all service instance informations."""
270 return self.client.info_iter(self.service, max_wait, max_count, max_interval)
272 def stats_iter(
273 self,
274 max_wait: float | None = None,
275 max_count: int | None = None,
276 max_interval: float | None = None,
277 ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
278 """Get all service instance stats."""
279 return self.client.stats_iter(self.service, max_wait, max_count, max_interval)
281 def instance(self, id: str) -> Instance:
282 """Get a client for a single service instance."""
283 return Instance(self.client, self.service, id)
286class Instance:
287 def __init__(self, client: Client, service: str, id: str) -> None:
288 self.client = client
289 self.service = service
290 self.id = id
292 async def ping(
293 self,
294 timeout: float = 0.5,
295 ) -> PingInfo:
296 """Ping a service instance."""
297 subject = internal.get_internal_subject(
298 internal.ServiceVerb.PING, self.service, self.id, self.client.api_prefix
299 )
300 response = await self.client.nc.request(subject, b"", timeout=timeout)
301 return PingInfo.from_response(json.loads(response.data))
303 async def info(
304 self,
305 timeout: float = 0.5,
306 ) -> ServiceInfo:
307 """Get the service instance information."""
308 subject = internal.get_internal_subject(
309 internal.ServiceVerb.INFO, self.service, self.id, self.client.api_prefix
310 )
311 response = await self.client.nc.request(subject, b"", timeout=timeout)
312 return ServiceInfo.from_response(json.loads(response.data))
314 async def stats(
315 self,
316 timeout: float = 0.5,
317 ) -> ServiceStats:
318 """Get the service instance stats."""
319 subject = internal.get_internal_subject(
320 internal.ServiceVerb.STATS,
321 self.service,
322 self.id,
323 self.client.api_prefix,
324 )
325 response = await self.client.nc.request(subject, b"", timeout=timeout)
326 return ServiceStats.from_response(json.loads(response.data))