Coverage for src/nats_contrib/micro/client/client.py: 99%
82 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
1from __future__ import annotations
3import json
4from typing import AsyncContextManager, AsyncIterator
6from nats.aio.client import Client as NATS
7from nats.aio.msg import Msg
8from nats_contrib.request_many import (
9 RequestManyExecutor,
10 RequestManyIterator,
11 transform,
12)
14from .. import internal
15from ..api import API_PREFIX
16from ..models import PingInfo, ServiceInfo, ServiceStats
17from .errors import ServiceError
20class Client:
22 def __init__(
23 self,
24 nc: NATS,
25 default_max_wait: float = 0.5,
26 api_prefix: str = API_PREFIX,
27 ) -> None:
28 self.nc = nc
29 self.api_prefix = api_prefix
30 self.request_executor = RequestManyExecutor(nc, default_max_wait)
32 async def request(
33 self,
34 subject: str,
35 data: bytes | None = None,
36 headers: dict[str, str] | None = None,
37 timeout: float = 1,
38 ) -> Msg:
39 """Send a request and get the response.
41 This method should be prefered over using the NATS client directly
42 because it will handle the service errors properly.
44 Args:
45 subject: The subject to send the request to.
46 data: The request data.
47 headers: Additional request headers.
48 timeout: The maximum time to wait for a response.
50 Returns:
52 """
53 response = await self.nc.request(
54 subject, data or b"", headers=headers, timeout=timeout
55 )
56 if response.headers:
57 error_code = response.headers.get("Nats-Service-Error-Code")
58 if error_code: 58 ↛ 63line 58 didn't jump to line 63, because the condition on line 58 was never false
59 raise ServiceError(
60 int(error_code), response.headers.get("Nats-Service-Error", "")
61 )
63 return response
65 async def ping(
66 self,
67 service: str | None = None,
68 max_wait: float | None = None,
69 max_count: int | None = None,
70 max_interval: float | None = None,
71 ) -> list[PingInfo]:
72 """Ping all the services."""
73 subject = internal.get_internal_subject(
74 internal.ServiceVerb.PING, service, None, self.api_prefix
75 )
76 responses = await self.request_executor(
77 subject,
78 max_count=max_count,
79 max_wait=max_wait,
80 max_interval=max_interval,
81 )
82 return [PingInfo.from_response(json.loads(res.data)) for res in responses]
84 async def info(
85 self,
86 service: str | None = None,
87 max_wait: float | None = None,
88 max_count: int | None = None,
89 max_interval: float | None = None,
90 ) -> list[ServiceInfo]:
91 """Get all service informations."""
92 subject = internal.get_internal_subject(
93 internal.ServiceVerb.INFO, service, None, self.api_prefix
94 )
95 responses = await self.request_executor(
96 subject,
97 max_count=max_count,
98 max_wait=max_wait,
99 max_interval=max_interval,
100 )
101 return [ServiceInfo.from_response(json.loads(res.data)) for res in responses]
103 async def stats(
104 self,
105 service: str | None = None,
106 max_wait: float | None = None,
107 max_count: int | None = None,
108 max_interval: float | None = None,
109 ) -> list[ServiceStats]:
110 """Get all services stats."""
111 subject = internal.get_internal_subject(
112 internal.ServiceVerb.STATS, service, None, self.api_prefix
113 )
114 responses = await self.request_executor(
115 subject,
116 max_count=max_count,
117 max_wait=max_wait,
118 max_interval=max_interval,
119 )
120 return [ServiceStats.from_response(json.loads(res.data)) for res in responses]
122 def ping_iter(
123 self,
124 service: str | None = None,
125 max_wait: float | None = None,
126 max_count: int | None = None,
127 max_interval: float | None = None,
128 ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
129 """Ping all the services."""
130 subject = internal.get_internal_subject(
131 internal.ServiceVerb.PING, service, None, self.api_prefix
132 )
133 return transform(
134 RequestManyIterator(
135 self.nc,
136 subject,
137 inbox=self.nc.new_inbox(),
138 max_count=max_count,
139 max_wait=max_wait,
140 max_interval=max_interval,
141 ),
142 lambda res: PingInfo.from_response(json.loads(res.data)),
143 )
145 def info_iter(
146 self,
147 service: str | None = None,
148 max_wait: float | None = None,
149 max_count: int | None = None,
150 max_interval: float | None = None,
151 ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
152 """Get all service informations."""
153 subject = internal.get_internal_subject(
154 internal.ServiceVerb.INFO, service, None, self.api_prefix
155 )
156 return transform(
157 RequestManyIterator(
158 self.nc,
159 subject,
160 inbox=self.nc.new_inbox(),
161 max_count=max_count,
162 max_wait=max_wait,
163 max_interval=max_interval,
164 ),
165 lambda res: ServiceInfo.from_response(json.loads(res.data)),
166 )
168 def stats_iter(
169 self,
170 service: str | None = None,
171 max_wait: float | None = None,
172 max_count: int | None = None,
173 max_interval: float | None = None,
174 ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
175 """Get all services stats."""
176 subject = internal.get_internal_subject(
177 internal.ServiceVerb.STATS, service, None, self.api_prefix
178 )
179 return transform(
180 RequestManyIterator(
181 self.nc,
182 subject,
183 inbox=self.nc.new_inbox(),
184 max_count=max_count,
185 max_wait=max_wait,
186 max_interval=max_interval,
187 ),
188 lambda res: ServiceStats.from_response(json.loads(res.data)),
189 )
191 def service(self, service: str) -> Service:
192 """Get a client for a single service."""
193 return Service(self, service)
195 def instance(self, service: str, id: str) -> Instance:
196 """Get a client for a single service instance."""
197 return Instance(self, service, id)
class Service:
    """View of a ``Client`` bound to one service name.

    Every method delegates to the method of the same name on the wrapped
    client, supplying the bound service name as its ``service`` argument.
    """

    def __init__(self, client: Client, service: str) -> None:
        self.client = client
        self.service = service

    async def ping(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[PingInfo]:
        """Ping every instance of the bound service."""
        return await self.client.ping(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    async def info(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceInfo]:
        """Collect the information of every instance of the bound service."""
        return await self.client.info(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    async def stats(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceStats]:
        """Collect the stats of every instance of the bound service."""
        return await self.client.stats(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def ping_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
        """Iterator variant of ``ping`` for the bound service."""
        return self.client.ping_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def info_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
        """Iterator variant of ``info`` for the bound service."""
        return self.client.info_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def stats_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
        """Iterator variant of ``stats`` for the bound service."""
        return self.client.stats_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def instance(self, id: str) -> Instance:
        """Get a client for one instance of the bound service."""
        return Instance(self.client, self.service, id)
class Instance:
    """View of a ``Client`` bound to one instance of one service.

    Requests go through ``Client.request`` rather than the raw NATS client,
    so a reply carrying a ``Nats-Service-Error-Code`` header raises
    ``ServiceError`` instead of being parsed as a regular payload.
    """

    def __init__(self, client: Client, service: str, id: str) -> None:
        self.client = client
        self.service = service
        self.id = id

    async def _request(self, verb: internal.ServiceVerb, timeout: float) -> Msg:
        """Send ``verb`` to the bound instance and return the raw response.

        The subject is built from the verb, the service name, the instance id
        and the client's API prefix. The request body is empty.

        Raises:
            ServiceError: If the response carries a non-empty
                ``Nats-Service-Error-Code`` header.
        """
        subject = internal.get_internal_subject(
            verb, self.service, self.id, self.client.api_prefix
        )
        # Client.request sends b"" when no data is given, like the direct
        # nc.request(subject, b"", ...) call it replaces.
        return await self.client.request(subject, timeout=timeout)

    async def ping(
        self,
        timeout: float = 0.5,
    ) -> PingInfo:
        """Ping a service instance.

        Args:
            timeout: The maximum time to wait for the response.

        Returns:
            The ``PingInfo`` parsed from the JSON response payload.
        """
        response = await self._request(internal.ServiceVerb.PING, timeout)
        return PingInfo.from_response(json.loads(response.data))

    async def info(
        self,
        timeout: float = 0.5,
    ) -> ServiceInfo:
        """Get the service instance information.

        Args:
            timeout: The maximum time to wait for the response.

        Returns:
            The ``ServiceInfo`` parsed from the JSON response payload.
        """
        response = await self._request(internal.ServiceVerb.INFO, timeout)
        return ServiceInfo.from_response(json.loads(response.data))

    async def stats(
        self,
        timeout: float = 0.5,
    ) -> ServiceStats:
        """Get the service instance stats.

        Args:
            timeout: The maximum time to wait for the response.

        Returns:
            The ``ServiceStats`` parsed from the JSON response payload.
        """
        response = await self._request(internal.ServiceVerb.STATS, timeout)
        return ServiceStats.from_response(json.loads(response.data))