Coverage for src/nats_contrib/micro/client.py: 99%

89 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-03-06 11:09 +0100

1from __future__ import annotations 

2 

3import json 

4from typing import AsyncContextManager, AsyncIterator 

5 

6from nats.aio.client import Client as NATS 

7from nats.aio.msg import Msg 

8from nats_contrib.request_many import ( 

9 RequestManyExecutor, 

10 RequestManyIterator, 

11 transform, 

12) 

13 

14from . import internal 

15from .api import API_PREFIX 

16from .models import PingInfo, ServiceInfo, ServiceStats 

17 

18 

class ServiceError(Exception):
    """Exception carrying the details of an error reported by a service.

    The exception message has the form ``Service error <code>: <description>``
    and every constructor argument is kept as an attribute of the same name.
    """

    def __init__(
        self,
        code: int,
        description: str,
        subject: str,
        data: bytes,
        headers: dict[str, str],
    ) -> None:
        """Create the error.

        Args:
            code: The numeric error code.
            description: The textual description of the error.
            subject: The subject associated with the error.
            data: The payload associated with the error.
            headers: The headers associated with the error.
        """
        message = f"Service error {code}: {description}"
        super().__init__(message)
        # Expose the raw details so handlers do not have to parse the message.
        self.code, self.description = code, description
        self.subject, self.data = subject, data
        self.headers = headers

36 

37 

class Client:
    """Client used to send requests to services and to discover them.

    Discovery is available both as coroutines returning a list (``ping``,
    ``info``, ``stats``) and as async context managers yielding results one
    at a time (``ping_iter``, ``info_iter``, ``stats_iter``).
    """

    def __init__(
        self,
        nc: NATS,
        default_max_wait: float = 0.5,
        api_prefix: str = API_PREFIX,
    ) -> None:
        """Create a client on top of a NATS connection.

        Args:
            nc: The NATS client used to send every request.
            default_max_wait: Value handed to the ``RequestManyExecutor``
                backing ``ping``, ``info`` and ``stats``.
            api_prefix: Prefix used when building the internal subjects.
        """
        self.nc = nc
        self.api_prefix = api_prefix
        # NOTE(review): only the list-returning methods go through this
        # executor; the ``*_iter`` methods build their own iterator and never
        # see ``default_max_wait`` -- confirm this is intended.
        self.request_executor = RequestManyExecutor(nc, default_max_wait)

    async def request(
        self,
        subject: str,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
        timeout: float = 1,
    ) -> Msg:
        """Send a request and get the response.

        This method should be preferred over using the NATS client directly
        because it turns a reply flagged as a service error into a
        ``ServiceError`` exception.

        Args:
            subject: The subject to send the request to.
            data: The request data. ``None`` is sent as an empty payload.
            headers: Additional request headers.
            timeout: The maximum time to wait for a response.

        Returns:
            The response message, when it carries no service error code.

        Raises:
            ServiceError: If the response has a non-empty
                ``Nats-Service-Error-Code`` header.
            ValueError: If that header cannot be converted to an integer.
        """
        response = await self.nc.request(
            subject, data or b"", headers=headers, timeout=timeout
        )
        reply_headers = response.headers
        if reply_headers:
            error_code = reply_headers.get("Nats-Service-Error-Code")
            if error_code:
                raise ServiceError(
                    int(error_code),
                    reply_headers.get("Nats-Service-Error", ""),
                    subject=subject,
                    data=response.data,
                    headers=reply_headers,
                )

        return response

    def _subject_for(self, verb: internal.ServiceVerb, service: str | None) -> str:
        """Build the internal subject for ``verb``, optionally scoped to ``service``."""
        return internal.get_internal_subject(verb, service, None, self.api_prefix)

    async def _request_many(
        self,
        verb: internal.ServiceVerb,
        service: str | None,
        max_wait: float | None,
        max_count: int | None,
        max_interval: float | None,
    ) -> list[Msg]:
        """Run the request executor on the subject for ``verb`` and return the replies."""
        return await self.request_executor(
            self._subject_for(verb, service),
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )

    def _iter_many(
        self,
        verb: internal.ServiceVerb,
        service: str | None,
        max_wait: float | None,
        max_count: int | None,
        max_interval: float | None,
    ) -> RequestManyIterator:
        """Create a ``RequestManyIterator`` on the subject for ``verb``, using a fresh inbox."""
        return RequestManyIterator(
            self.nc,
            self._subject_for(verb, service),
            inbox=self.nc.new_inbox(),
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )

    async def ping(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[PingInfo]:
        """Ping all the services.

        Args:
            service: Restrict the request to this service name, if given.
            max_wait: Forwarded to the request executor.
            max_count: Forwarded to the request executor.
            max_interval: Forwarded to the request executor.

        Returns:
            One ``PingInfo`` per response received.
        """
        responses = await self._request_many(
            internal.ServiceVerb.PING, service, max_wait, max_count, max_interval
        )
        return [PingInfo.from_response(json.loads(res.data)) for res in responses]

    async def info(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceInfo]:
        """Get information about all the services.

        Args:
            service: Restrict the request to this service name, if given.
            max_wait: Forwarded to the request executor.
            max_count: Forwarded to the request executor.
            max_interval: Forwarded to the request executor.

        Returns:
            One ``ServiceInfo`` per response received.
        """
        responses = await self._request_many(
            internal.ServiceVerb.INFO, service, max_wait, max_count, max_interval
        )
        return [ServiceInfo.from_response(json.loads(res.data)) for res in responses]

    async def stats(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceStats]:
        """Get the stats of all the services.

        Args:
            service: Restrict the request to this service name, if given.
            max_wait: Forwarded to the request executor.
            max_count: Forwarded to the request executor.
            max_interval: Forwarded to the request executor.

        Returns:
            One ``ServiceStats`` per response received.
        """
        responses = await self._request_many(
            internal.ServiceVerb.STATS, service, max_wait, max_count, max_interval
        )
        return [ServiceStats.from_response(json.loads(res.data)) for res in responses]

    def ping_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
        """Ping all the services, yielding each ``PingInfo`` from an iterator.

        Args:
            service: Restrict the request to this service name, if given.
            max_wait: Forwarded to the ``RequestManyIterator``.
            max_count: Forwarded to the ``RequestManyIterator``.
            max_interval: Forwarded to the ``RequestManyIterator``.
        """
        return transform(
            self._iter_many(
                internal.ServiceVerb.PING, service, max_wait, max_count, max_interval
            ),
            lambda res: PingInfo.from_response(json.loads(res.data)),
        )

    def info_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
        """Get information about all the services, yielding each ``ServiceInfo``.

        Args:
            service: Restrict the request to this service name, if given.
            max_wait: Forwarded to the ``RequestManyIterator``.
            max_count: Forwarded to the ``RequestManyIterator``.
            max_interval: Forwarded to the ``RequestManyIterator``.
        """
        return transform(
            self._iter_many(
                internal.ServiceVerb.INFO, service, max_wait, max_count, max_interval
            ),
            lambda res: ServiceInfo.from_response(json.loads(res.data)),
        )

    def stats_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
        """Get the stats of all the services, yielding each ``ServiceStats``.

        Args:
            service: Restrict the request to this service name, if given.
            max_wait: Forwarded to the ``RequestManyIterator``.
            max_count: Forwarded to the ``RequestManyIterator``.
            max_interval: Forwarded to the ``RequestManyIterator``.
        """
        return transform(
            self._iter_many(
                internal.ServiceVerb.STATS, service, max_wait, max_count, max_interval
            ),
            lambda res: ServiceStats.from_response(json.loads(res.data)),
        )

    def service(self, service: str) -> Service:
        """Get a client for a single service."""
        return Service(self, service)

    def instance(self, service: str, id: str) -> Instance:
        """Get a client for a single service instance."""
        return Instance(self, service, id)

220 

221 

class Service:
    """View of a ``Client`` bound to one service name.

    Every method forwards to the method of the same name on the wrapped
    client, supplying the bound service name.
    """

    def __init__(self, client: Client, service: str) -> None:
        """Bind ``client`` to the service called ``service``."""
        self.client = client
        self.service = service

    async def ping(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[PingInfo]:
        """Ping every instance of the service."""
        return await self.client.ping(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    async def info(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceInfo]:
        """Get the information of every instance of the service."""
        return await self.client.info(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    async def stats(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceStats]:
        """Get the stats of every instance of the service."""
        return await self.client.stats(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def ping_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
        """Ping every instance of the service, as an iterator of results."""
        return self.client.ping_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def info_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
        """Get the information of every instance, as an iterator of results."""
        return self.client.info_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def stats_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
        """Get the stats of every instance, as an iterator of results."""
        return self.client.stats_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def instance(self, id: str) -> Instance:
        """Get a client for the instance of this service identified by ``id``."""
        return Instance(self.client, self.service, id)

284 

285 

class Instance:
    """View of a ``Client`` bound to one instance of one service."""

    def __init__(self, client: Client, service: str, id: str) -> None:
        """Bind ``client`` to the instance ``id`` of the service ``service``."""
        self.client = client
        self.service = service
        self.id = id

    async def _request(self, verb: internal.ServiceVerb, timeout: float) -> Msg:
        """Send an empty request for ``verb`` to this instance and return the reply.

        Args:
            verb: The internal verb used to build the subject.
            timeout: The maximum time to wait for the reply.
        """
        subject = internal.get_internal_subject(
            verb, self.service, self.id, self.client.api_prefix
        )
        # NOTE(review): this uses the raw NATS client rather than
        # ``Client.request``, so a ``Nats-Service-Error-Code`` header on the
        # reply is not turned into a ``ServiceError`` -- confirm intended.
        return await self.client.nc.request(subject, b"", timeout=timeout)

    async def ping(
        self,
        timeout: float = 0.5,
    ) -> PingInfo:
        """Ping a service instance.

        Args:
            timeout: The maximum time to wait for the reply.

        Returns:
            The ``PingInfo`` parsed from the JSON reply.
        """
        response = await self._request(internal.ServiceVerb.PING, timeout)
        return PingInfo.from_response(json.loads(response.data))

    async def info(
        self,
        timeout: float = 0.5,
    ) -> ServiceInfo:
        """Get the service instance information.

        Args:
            timeout: The maximum time to wait for the reply.

        Returns:
            The ``ServiceInfo`` parsed from the JSON reply.
        """
        response = await self._request(internal.ServiceVerb.INFO, timeout)
        return ServiceInfo.from_response(json.loads(response.data))

    async def stats(
        self,
        timeout: float = 0.5,
    ) -> ServiceStats:
        """Get the service instance stats.

        Args:
            timeout: The maximum time to wait for the reply.

        Returns:
            The ``ServiceStats`` parsed from the JSON reply.
        """
        response = await self._request(internal.ServiceVerb.STATS, timeout)
        return ServiceStats.from_response(json.loads(response.data))