Coverage for src/nats_contrib/micro/client/client.py: 99%

82 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-27 05:11 +0100

1from __future__ import annotations 

2 

3import json 

4from typing import AsyncContextManager, AsyncIterator 

5 

6from nats.aio.client import Client as NATS 

7from nats.aio.msg import Msg 

8from nats_contrib.request_many import ( 

9 RequestManyExecutor, 

10 RequestManyIterator, 

11 transform, 

12) 

13 

14from .. import internal 

15from ..api import API_PREFIX 

16from ..models import PingInfo, ServiceInfo, ServiceStats 

17from .errors import ServiceError 

18 

19 

class Client:
    """Client used to discover and call NATS micro services.

    The discovery verbs (ping, info, stats) are exposed in two flavours:
    coroutines returning a list of decoded responses, and ``*_iter`` methods
    returning an async context manager over an async iterator of decoded
    responses.
    """

    def __init__(
        self,
        nc: NATS,
        default_max_wait: float = 0.5,
        api_prefix: str = API_PREFIX,
    ) -> None:
        """Create a client bound to a NATS connection.

        Args:
            nc: The NATS client used to send every request.
            default_max_wait: Value handed to the `RequestManyExecutor`
                used by `ping`, `info` and `stats`.
            api_prefix: Prefix used when building the internal subjects.
        """
        self.nc = nc
        self.api_prefix = api_prefix
        self.request_executor = RequestManyExecutor(nc, default_max_wait)

    async def request(
        self,
        subject: str,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
        timeout: float = 1,
    ) -> Msg:
        """Send a request and get the response.

        This method should be preferred over using the NATS client directly
        because it will handle the service errors properly.

        Args:
            subject: The subject to send the request to.
            data: The request data. An empty payload is sent when omitted.
            headers: Additional request headers.
            timeout: The maximum time to wait for a response.

        Returns:
            The response message, when it carries no service error code.

        Raises:
            ServiceError: If the response has a non-empty
                ``Nats-Service-Error-Code`` header. The error is built from
                that code (converted to an integer) and from the
                ``Nats-Service-Error`` header (empty string when missing).
        """
        response = await self.nc.request(
            subject, data or b"", headers=headers, timeout=timeout
        )
        if response.headers:
            error_code = response.headers.get("Nats-Service-Error-Code")
            if error_code:
                raise ServiceError(
                    int(error_code), response.headers.get("Nats-Service-Error", "")
                )

        return response

    def _internal_subject(
        self, verb: internal.ServiceVerb, service: str | None
    ) -> str:
        """Build the internal subject for a verb, optionally for one service."""
        return internal.get_internal_subject(verb, service, None, self.api_prefix)

    async def _request_many(
        self,
        verb: internal.ServiceVerb,
        service: str | None,
        max_wait: float | None,
        max_count: int | None,
        max_interval: float | None,
    ) -> list[Msg]:
        """Send a discovery request and collect the raw responses."""
        return await self.request_executor(
            self._internal_subject(verb, service),
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )

    def _request_many_iter(
        self,
        verb: internal.ServiceVerb,
        service: str | None,
        max_wait: float | None,
        max_count: int | None,
        max_interval: float | None,
    ) -> RequestManyIterator:
        """Create the raw response iterator for a discovery request."""
        return RequestManyIterator(
            self.nc,
            self._internal_subject(verb, service),
            # Each iterator gets its own freshly generated inbox.
            inbox=self.nc.new_inbox(),
            max_count=max_count,
            max_wait=max_wait,
            max_interval=max_interval,
        )

    async def ping(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[PingInfo]:
        """Ping all the services.

        Args:
            service: Restrict the request to this service name.
            max_wait: Forwarded to the request executor.
            max_count: Forwarded to the request executor.
            max_interval: Forwarded to the request executor.

        Returns:
            One `PingInfo` per response received.
        """
        responses = await self._request_many(
            internal.ServiceVerb.PING, service, max_wait, max_count, max_interval
        )
        return [PingInfo.from_response(json.loads(res.data)) for res in responses]

    async def info(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceInfo]:
        """Get all service informations.

        Args:
            service: Restrict the request to this service name.
            max_wait: Forwarded to the request executor.
            max_count: Forwarded to the request executor.
            max_interval: Forwarded to the request executor.

        Returns:
            One `ServiceInfo` per response received.
        """
        responses = await self._request_many(
            internal.ServiceVerb.INFO, service, max_wait, max_count, max_interval
        )
        return [ServiceInfo.from_response(json.loads(res.data)) for res in responses]

    async def stats(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceStats]:
        """Get all services stats.

        Args:
            service: Restrict the request to this service name.
            max_wait: Forwarded to the request executor.
            max_count: Forwarded to the request executor.
            max_interval: Forwarded to the request executor.

        Returns:
            One `ServiceStats` per response received.
        """
        responses = await self._request_many(
            internal.ServiceVerb.STATS, service, max_wait, max_count, max_interval
        )
        return [ServiceStats.from_response(json.loads(res.data)) for res in responses]

    def ping_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
        """Ping all the services, decoding each response into a `PingInfo`.

        Same arguments as `ping`; they are forwarded to the underlying
        `RequestManyIterator`.
        """
        return transform(
            self._request_many_iter(
                internal.ServiceVerb.PING, service, max_wait, max_count, max_interval
            ),
            lambda res: PingInfo.from_response(json.loads(res.data)),
        )

    def info_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
        """Get all service informations, decoding each into a `ServiceInfo`.

        Same arguments as `info`; they are forwarded to the underlying
        `RequestManyIterator`.
        """
        return transform(
            self._request_many_iter(
                internal.ServiceVerb.INFO, service, max_wait, max_count, max_interval
            ),
            lambda res: ServiceInfo.from_response(json.loads(res.data)),
        )

    def stats_iter(
        self,
        service: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
        """Get all services stats, decoding each response into a `ServiceStats`.

        Same arguments as `stats`; they are forwarded to the underlying
        `RequestManyIterator`.
        """
        return transform(
            self._request_many_iter(
                internal.ServiceVerb.STATS, service, max_wait, max_count, max_interval
            ),
            lambda res: ServiceStats.from_response(json.loads(res.data)),
        )

    def service(self, service: str) -> Service:
        """Get a client for a single service."""
        return Service(self, service)

    def instance(self, service: str, id: str) -> Instance:
        """Get a client for a single service instance."""
        return Instance(self, service, id)

198 

199 

class Service:
    """View of a `Client` restricted to one service name.

    Every method delegates to the method of the same name on the wrapped
    client, supplying the stored service name.
    """

    def __init__(self, client: Client, service: str) -> None:
        """Remember the client to delegate to and the service name to target."""
        self.client = client
        self.service = service

    async def ping(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[PingInfo]:
        """Ping every instance of the service and return the collected replies."""
        return await self.client.ping(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    async def info(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceInfo]:
        """Collect the information reported by every instance of the service."""
        return await self.client.info(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    async def stats(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> list[ServiceStats]:
        """Collect the stats reported by every instance of the service."""
        return await self.client.stats(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def ping_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[PingInfo]]:
        """Iterator flavour of `ping`, delegating to `Client.ping_iter`."""
        return self.client.ping_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def info_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceInfo]]:
        """Iterator flavour of `info`, delegating to `Client.info_iter`."""
        return self.client.info_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def stats_iter(
        self,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
    ) -> AsyncContextManager[AsyncIterator[ServiceStats]]:
        """Iterator flavour of `stats`, delegating to `Client.stats_iter`."""
        return self.client.stats_iter(
            service=self.service,
            max_wait=max_wait,
            max_count=max_count,
            max_interval=max_interval,
        )

    def instance(self, id: str) -> Instance:
        """Return a client targeting one instance of this service."""
        return Instance(self.client, self.service, id)

262 

263 

class Instance:
    """Client scoped to a single instance of a service."""

    def __init__(self, client: Client, service: str, id: str) -> None:
        """Remember the client to use, the service name and the instance id."""
        self.client = client
        self.service = service
        self.id = id

    async def _request(self, verb: internal.ServiceVerb, timeout: float) -> Msg:
        """Send an empty request for ``verb`` to this instance.

        The request goes through `Client.request` rather than the raw NATS
        client, so a reply carrying a service error code raises
        `ServiceError` instead of being handed to the JSON decoder.
        """
        subject = internal.get_internal_subject(
            verb, self.service, self.id, self.client.api_prefix
        )
        return await self.client.request(subject, timeout=timeout)

    async def ping(
        self,
        timeout: float = 0.5,
    ) -> PingInfo:
        """Ping a service instance.

        Args:
            timeout: The maximum time to wait for the response.

        Returns:
            The decoded ping response.

        Raises:
            ServiceError: If the reply carries a service error code.
        """
        response = await self._request(internal.ServiceVerb.PING, timeout)
        return PingInfo.from_response(json.loads(response.data))

    async def info(
        self,
        timeout: float = 0.5,
    ) -> ServiceInfo:
        """Get the service instance information.

        Args:
            timeout: The maximum time to wait for the response.

        Returns:
            The decoded info response.

        Raises:
            ServiceError: If the reply carries a service error code.
        """
        response = await self._request(internal.ServiceVerb.INFO, timeout)
        return ServiceInfo.from_response(json.loads(response.data))

    async def stats(
        self,
        timeout: float = 0.5,
    ) -> ServiceStats:
        """Get the service instance stats.

        Args:
            timeout: The maximum time to wait for the response.

        Returns:
            The decoded stats response.

        Raises:
            ServiceError: If the reply carries a service error code.
        """
        response = await self._request(internal.ServiceVerb.STATS, timeout)
        return ServiceStats.from_response(json.loads(response.data))