Coverage for src/nats_contrib/micro/models.py: 94%
135 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
1from __future__ import annotations
3import datetime
4from dataclasses import dataclass, fields, replace
5from typing import Any, TypeVar
7T = TypeVar("T", bound="Base")
@dataclass
class Base:
    """Common base for the API models defined in this module.

    Provides construction from a decoded response dict, conversion back
    to a dict, and helpers translating between RFC 3339 strings and
    timezone-aware ``datetime`` objects.
    """

    @classmethod
    def from_response(cls: type[T], resp: dict[str, Any]) -> T:
        """Read the class instance from a server response.

        Only keys of ``resp`` matching a dataclass field of ``cls`` are
        used. Unknown fields are ignored ("open-world assumption").

        Raises:
            TypeError: if a field without default is missing from ``resp``.
        """
        params = {f.name: resp[f.name] for f in fields(cls) if f.name in resp}
        return cls(**params)

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict.

        Fields whose value is ``None`` are left out of the result.
        """
        pairs = ((f.name, getattr(self, f.name)) for f in fields(self))
        return {name: value for name, value in pairs if value is not None}

    @staticmethod
    def _convert_rfc3339(resp: dict[str, Any], field: str) -> None:
        """Replace ``resp[field]`` (an RFC 3339 string) by a UTC datetime.

        The conversion happens in place. Nothing is done when the key is
        missing or its value is None. A value that already is a datetime
        is only normalized to UTC, so calling this twice is harmless.

        Raises:
            ValueError: if the string cannot be parsed.
        """
        val = resp.get(field)
        if val is None:
            return None
        if isinstance(val, datetime.datetime):
            parsed = val
        else:
            text = val
            offset = ""
            if text.endswith(("Z", "z")):
                # "Z" means UTC; a timestamp without offset is treated as
                # UTC below, so dropping the suffix is enough.
                text = text[:-1]
            else:
                # A numeric offset can only follow the time part, so look
                # for its sign after the 10 characters of "YYYY-MM-DD".
                sign_at = max(text.rfind("+", 10), text.rfind("-", 10))
                if sign_at != -1:
                    text, offset = text[:sign_at], text[sign_at:]
            head, dot, fraction = text.partition(".")
            if dot:
                # datetime only stores microseconds: drop extra digits and
                # pad short fractions, which fromisoformat() rejects on
                # Python versions older than 3.11.
                text = head + "." + fraction[:6].ljust(6, "0")
            parsed = datetime.datetime.fromisoformat(text + offset)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        else:
            # Convert (rather than overwrite) so that the instant in time
            # is preserved for timestamps carrying a non-UTC offset.
            parsed = parsed.astimezone(datetime.timezone.utc)
        resp[field] = parsed
        return None

    @staticmethod
    def _to_rfc3339(date: datetime.datetime) -> str:
        """Convert a datetime into RFC 3339 formatted string.

        If datetime does not have timezone information, datetime
        is assumed to be in UTC timezone. Aware datetimes are converted
        to UTC, and the UTC offset is written as ``Z``.
        """
        if date.tzinfo is None:
            date = date.replace(tzinfo=datetime.timezone.utc)
        else:
            date = date.astimezone(datetime.timezone.utc)
        # isoformat() already omits the fraction when microsecond is 0.
        return date.isoformat().replace("+00:00", "Z")
@dataclass
class EndpointStats(Base):
    """Statistics about a specific service endpoint."""

    name: str
    """The endpoint name."""

    subject: str
    """The subject the endpoint listens on."""

    num_requests: int
    """The number of requests this endpoint received."""

    num_errors: int
    """The number of errors this endpoint encountered."""

    last_error: str
    """The last error the service encountered."""

    processing_time: int
    """How long, in total, was spent processing requests in the handler."""

    average_processing_time: int
    """The average time spent processing requests."""

    queue_group: str | None = None
    """The queue group this endpoint listens on for requests."""

    data: dict[str, object] | None = None
    """Additional statistics the endpoint makes available."""

    def copy(self) -> EndpointStats:
        """Return a new instance whose ``data`` dict is a shallow copy."""
        if self.data is None:
            data_copy = None
        else:
            data_copy = self.data.copy()
        return replace(self, data=data_copy)
@dataclass
class ServiceStats(Base):
    """The statistics of a service."""

    name: str
    """
    The kind of the service. Shared by all the services that have the same name
    """
    id: str
    """
    A unique ID for this instance of a service
    """
    version: str
    """
    The version of the service
    """
    started: datetime.datetime
    """
    The time the service was started (written in RFC3339 format by `as_dict`)
    """
    endpoints: list[EndpointStats]
    """
    Statistics for each known endpoint
    """
    metadata: dict[str, str] | None = None
    """Service metadata."""

    type: str = "io.nats.micro.v1.stats_response"

    def copy(self) -> ServiceStats:
        """Return a copy with copied endpoints and a copied metadata dict."""
        return replace(
            self,
            endpoints=[ep.copy() for ep in self.endpoints],
            metadata=None if self.metadata is None else self.metadata.copy(),
        )

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict.

        Endpoints are converted with their own ``as_dict`` and ``started``
        is written as an RFC 3339 string.
        """
        result = super().as_dict()
        result["endpoints"] = [ep.as_dict() for ep in self.endpoints]
        result["started"] = self._to_rfc3339(self.started)
        return result

    @classmethod
    def from_response(cls, resp: dict[str, Any]) -> ServiceStats:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption").
        The ``resp`` dict passed by the caller is not modified.
        """
        # The timestamp conversion rewrites the dict in place. Work on a
        # shallow copy so the caller's payload keeps its original string
        # and can safely be parsed again.
        payload = dict(resp)
        cls._convert_rfc3339(payload, "started")
        stats = super().from_response(payload)
        stats.endpoints = [
            EndpointStats.from_response(ep) for ep in payload["endpoints"]
        ]
        return stats
@dataclass
class EndpointInfo(Base):
    """The information of an endpoint."""

    name: str
    """The endpoint name."""

    subject: str
    """The subject the endpoint listens on."""

    metadata: dict[str, str] | None = None
    """The endpoint metadata."""

    queue_group: str | None = None
    """The queue group this endpoint listens on for requests."""

    def copy(self) -> EndpointInfo:
        """Return a new instance whose ``metadata`` dict is a shallow copy."""
        if self.metadata is None:
            metadata_copy = None
        else:
            metadata_copy = self.metadata.copy()
        return replace(self, metadata=metadata_copy)
@dataclass
class ServiceInfo(Base):
    """The information of a service."""

    name: str
    """
    The kind of the service. Shared by all the services that have the same name
    """
    id: str
    """
    A unique ID for this instance of a service
    """
    version: str
    """
    The version of the service
    """
    description: str
    """
    The description of the service supplied as configuration while creating the service
    """
    metadata: dict[str, str]
    """
    The service metadata
    """
    endpoints: list[EndpointInfo]
    """
    Information for all service endpoints
    """
    type: str = "io.nats.micro.v1.info_response"

    def copy(self) -> ServiceInfo:
        """Return a copy with copied endpoints and a copied metadata dict."""
        return replace(
            self,
            endpoints=[ep.copy() for ep in self.endpoints],
            metadata=self.metadata.copy(),
        )

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict.

        Endpoints are converted with their own ``as_dict``.
        """
        result = super().as_dict()
        result["endpoints"] = [ep.as_dict() for ep in self.endpoints]
        return result

    @classmethod
    def from_response(cls, resp: dict[str, Any]) -> ServiceInfo:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption"), both at the
        top level and inside each endpoint entry.
        """
        info = super().from_response(resp)
        # Go through from_response (not the constructor) so that unknown
        # keys inside an endpoint are skipped instead of raising TypeError.
        info.endpoints = [EndpointInfo.from_response(ep) for ep in resp["endpoints"]]
        return info
@dataclass
class PingInfo(Base):
    """The response to a ping message."""

    name: str
    id: str
    version: str
    metadata: dict[str, str]
    type: str = "io.nats.micro.v1.ping_response"

    def copy(self) -> PingInfo:
        """Return a new instance whose ``metadata`` dict is a shallow copy."""
        metadata_copy = self.metadata.copy()
        return replace(self, metadata=metadata_copy)