Coverage for src/nats_contrib/micro/models.py: 94%

135 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-27 05:11 +0100

1from __future__ import annotations 

2 

3import datetime 

4from dataclasses import dataclass, fields, replace 

5from typing import Any, TypeVar 

6 

7T = TypeVar("T", bound="Base") 

8 

9 

@dataclass
class Base:
    """Common behaviour shared by the API model dataclasses.

    Provides conversion from decoded server responses to dataclass
    instances and back to API-friendly dictionaries, plus helpers for the
    RFC 3339 timestamps exchanged with the server.
    """

    @classmethod
    def from_response(cls: type[T], resp: dict[str, Any]) -> T:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption").

        Args:
            resp: The decoded response, keyed by field name.

        Returns:
            An instance of ``cls`` built from the keys of ``resp`` that
            match one of the dataclass fields.

        Raises:
            TypeError: If a field without a default value is missing
                from ``resp``.
        """
        params = {
            field.name: resp[field.name]
            for field in fields(cls)
            if field.name in resp
        }
        return cls(**params)

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict.

        Fields whose value is ``None`` are left out. Values are copied
        as-is; nested objects are not converted here.
        """
        result: dict[str, Any] = {}
        for field in fields(self):
            val = getattr(self, field.name)
            if val is None:
                continue
            result[field.name] = val
        return result

    @staticmethod
    def _convert_rfc3339(resp: dict[str, Any], field: str) -> None:
        """Convert a RFC 3339 formatted string into a datetime, in place.

        ``resp[field]`` is replaced by a timezone-aware datetime expressed
        in UTC. A missing or ``None`` value is left untouched. Fractional
        seconds are truncated to microsecond precision.

        Args:
            resp: The response dictionary to update.
            field: The key holding the RFC 3339 string.

        Raises:
            ValueError: If the value is not a valid timestamp.
        """
        val = resp.get(field)
        if val is None:
            return None
        utc = datetime.timezone.utc
        if isinstance(val, datetime.datetime):
            # Already converted (e.g. the same dict is parsed twice):
            # only normalise the timezone.
            parsed = val
        else:
            raw = val
            if raw[-1:] in ("Z", "z"):
                raw = raw[:-1] + "+00:00"
            # Split off the UTC offset. The sign must be searched after the
            # date part, because "YYYY-MM-DD" itself contains dashes.
            offset = ""
            sign_at = max(raw.rfind("+"), raw.rfind("-"))
            if sign_at > 10:
                raw, offset = raw[:sign_at], raw[sign_at:]
            # fromisoformat() only accepts 3 or 6 fractional digits before
            # Python 3.11, so normalise the fraction to exactly 6 digits.
            if "." in raw:
                head, fraction = raw.split(".", 1)
                raw = head + "." + fraction[:6].ljust(6, "0")
            parsed = datetime.datetime.fromisoformat(raw + offset)
        if parsed.tzinfo is None:
            # No offset given: assume the timestamp is already in UTC.
            parsed = parsed.replace(tzinfo=utc)
        else:
            # Convert, rather than overwrite, so the instant is preserved.
            parsed = parsed.astimezone(utc)
        resp[field] = parsed
        return None

    @staticmethod
    def _to_rfc3339(date: datetime.datetime) -> str:
        """Convert a datetime into RFC 3339 formatted string.

        If datetime does not have timezone information, datetime
        is assumed to be in UTC timezone. Otherwise it is converted to UTC.
        The result always carries the ``Z`` suffix.
        """
        if date.tzinfo is None:
            date = date.replace(tzinfo=datetime.timezone.utc)
        elif date.tzinfo != datetime.timezone.utc:
            date = date.astimezone(datetime.timezone.utc)
        # isoformat() already omits the fraction when microsecond == 0.
        return date.isoformat().replace("+00:00", "Z")

60 

61 

@dataclass
class EndpointStats(Base):
    """Statistics about a specific service endpoint."""

    # The endpoint name.
    name: str
    # The subject the endpoint listens on.
    subject: str
    # The number of requests this endpoint received.
    num_requests: int
    # The number of errors this endpoint encountered.
    num_errors: int
    # The last error the service encountered.
    last_error: str
    # How long, in total, was spent processing requests in the handler.
    processing_time: int
    # The average time spent processing requests.
    average_processing_time: int
    # The queue group this endpoint listens on for requests.
    queue_group: str | None = None
    # Additional statistics the endpoint makes available.
    data: dict[str, object] | None = None

    def copy(self) -> EndpointStats:
        """Return a copy whose ``data`` mapping is a separate (shallow) copy."""
        extra = self.data
        if extra is not None:
            extra = extra.copy()
        return replace(self, data=extra)

107 

108 

@dataclass
class ServiceStats(Base):
    """The statistics of a service."""

    name: str
    """
    The kind of the service. Shared by all the services that have the same name
    """
    id: str
    """
    A unique ID for this instance of a service
    """
    version: str
    """
    The version of the service
    """
    started: datetime.datetime
    """
    The time the service was started (serialized in RFC3339 format)
    """
    endpoints: list[EndpointStats]
    """
    Statistics for each known endpoint
    """
    metadata: dict[str, str] | None = None
    """Service metadata."""

    type: str = "io.nats.micro.v1.stats_response"

    def copy(self) -> ServiceStats:
        """Return a copy with its own endpoint objects and metadata mapping."""
        return replace(
            self,
            endpoints=[ep.copy() for ep in self.endpoints],
            metadata=None if self.metadata is None else self.metadata.copy(),
        )

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict.

        Endpoints are converted to dicts and ``started`` is rendered as an
        RFC 3339 string.
        """
        result = super().as_dict()
        result["endpoints"] = [ep.as_dict() for ep in self.endpoints]
        result["started"] = self._to_rfc3339(self.started)
        return result

    @classmethod
    def from_response(cls, resp: dict[str, Any]) -> ServiceStats:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption").

        The caller's dictionary is not modified: the timestamp conversion
        is applied to a shallow copy, so the same response can be parsed
        more than once.
        """
        # _convert_rfc3339 rewrites the value in place; work on a copy so
        # the caller's "started" string is not replaced by a datetime.
        payload = dict(resp)
        cls._convert_rfc3339(payload, "started")
        stats = super().from_response(payload)
        stats.endpoints = [
            EndpointStats.from_response(ep) for ep in payload["endpoints"]
        ]
        return stats

162 

163 

@dataclass
class EndpointInfo(Base):
    """The information of an endpoint."""

    # The endpoint name.
    name: str
    # The subject the endpoint listens on.
    subject: str
    # The endpoint metadata.
    metadata: dict[str, str] | None = None
    # The queue group this endpoint listens on for requests.
    queue_group: str | None = None

    def copy(self) -> EndpointInfo:
        """Return a copy whose ``metadata`` mapping is a separate (shallow) copy."""
        meta = self.metadata
        if meta is not None:
            meta = meta.copy()
        return replace(self, metadata=meta)

190 

191 

@dataclass
class ServiceInfo(Base):
    """The information of a service."""

    name: str
    """
    The kind of the service. Shared by all the services that have the same name
    """
    id: str
    """
    A unique ID for this instance of a service
    """
    version: str
    """
    The version of the service
    """
    description: str
    """
    The description of the service supplied as configuration while creating the service
    """
    metadata: dict[str, str]
    """
    The service metadata
    """
    endpoints: list[EndpointInfo]
    """
    Information for all service endpoints
    """
    type: str = "io.nats.micro.v1.info_response"

    def copy(self) -> ServiceInfo:
        """Return a copy with its own endpoint objects and metadata mapping."""
        return replace(
            self,
            endpoints=[ep.copy() for ep in self.endpoints],
            metadata=self.metadata.copy(),
        )

    def as_dict(self) -> dict[str, Any]:
        """Return the object converted into an API-friendly dict.

        Endpoints are converted to dicts as well.
        """
        result = super().as_dict()
        result["endpoints"] = [ep.as_dict() for ep in self.endpoints]
        return result

    @classmethod
    def from_response(cls, resp: dict[str, Any]) -> ServiceInfo:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption"), both on the
        service itself and on each of its endpoints.
        """
        info = super().from_response(resp)
        # Go through EndpointInfo.from_response rather than EndpointInfo(**ep)
        # so unknown endpoint keys are dropped instead of raising TypeError.
        info.endpoints = [EndpointInfo.from_response(ep) for ep in resp["endpoints"]]
        return info

244 

245 

@dataclass
class PingInfo(Base):
    """The response to a ping message."""

    name: str
    id: str
    version: str
    metadata: dict[str, str]
    type: str = "io.nats.micro.v1.ping_response"

    def copy(self) -> PingInfo:
        """Return a copy whose ``metadata`` mapping is a separate (shallow) copy."""
        meta_copy = self.metadata.copy()
        return replace(self, metadata=meta_copy)