Coverage for src/nats_contrib/micro/internal.py: 99%

94 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-27 05:11 +0100

1"""NATS Micro implementation based on nats-py client library. 

2 

3NATS Micro is a protocol for building microservices with NATS. 

4 

5It is documented in [ADR-32: Service API](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-32.md). 

6 

7The reference implementation is in [nats.go](https://github.com/nats-io/nats.go) under [micro package](https://pkg.go.dev/github.com/nats-io/nats.go/micro). 

8A TypeScript implementation is available in [nats.deno](https://github.com/nats-io/nats.deno/blob/main/nats-base-client/service.ts).

9""" 

10 

11from __future__ import annotations 

12 

13import secrets 

14import time 

15from dataclasses import dataclass 

16from datetime import datetime, timezone 

17from enum import Enum 

18from json import dumps 

19 

20from .models import EndpointInfo, EndpointStats, PingInfo, ServiceInfo, ServiceStats 

21from .request import Handler 

22 

23 

class ServiceVerb(str, Enum):
    """Verbs used as a token when building internal service subjects.

    Members are also plain strings (the class mixes in ``str``), so a
    member compares equal to its literal value.
    """

    PING = "PING"
    INFO = "INFO"
    STATS = "STATS"

28 

29 

def get_internal_subject(
    verb: ServiceVerb,
    service: str | None,
    id: str | None,
    api_prefix: str,
) -> str:
    """Get the internal subject for a verb.

    The subject is built from dot-separated tokens:

    - ``{api_prefix}.{verb}`` when no service name is given,
    - ``{api_prefix}.{verb}.{service}`` when only a service name is given,
    - ``{api_prefix}.{verb}.{service}.{id}`` when both are given.

    Args:
        verb: The verb whose ``value`` is used as the second token.
        service: Optional service name. A falsy value (``None`` or ``""``)
            means "no service".
        id: Optional service instance ID. It is ignored when ``service``
            is falsy.
        api_prefix: The first token of the subject.

    Returns:
        The subject string.
    """
    base = f"{api_prefix}.{verb.value}"
    if not service:
        # Without a service name the ID is meaningless and is dropped.
        return base
    if not id:
        return f"{base}.{service}"
    return f"{base}.{service}.{id}"

43 

44 

def get_internal_subjects(
    verb: ServiceVerb,
    id: str,
    config: ServiceConfig,
    api_prefix: str,
) -> list[str]:
    """Get the internal subjects for a verb.

    Args:
        verb: The verb to build subjects for.
        id: The service instance ID.
        config: The service configuration; only ``config.name`` is read.
        api_prefix: The first token of every subject.

    Returns:
        Three subjects, from least to most specific: the verb alone, the
        verb scoped to the service name, and the verb scoped to the service
        name and instance ID.
    """
    # (service, id) pairs ordered from the broadest to the narrowest scope.
    scopes = (
        (None, None),
        (config.name, None),
        (config.name, id),
    )
    return [
        get_internal_subject(
            verb, service=service, id=instance_id, api_prefix=api_prefix
        )
        for service, instance_id in scopes
    ]

57 

58 

@dataclass
class ServiceConfig:
    """The configuration of a service."""

    name: str
    """The kind of the service. Shared by all services that have the same name.
    This name can only have A-Z, a-z, 0-9, dash, underscore."""

    version: str
    """The version of the service.
    This version must be a valid semantic version."""

    description: str
    """The description of the service."""

    metadata: dict[str, str]
    """The metadata of the service."""

    queue_group: str
    """The default queue group of the service."""

    pending_msgs_limit_by_endpoint: int
    """The default pending messages limit of the service.

    This limit is applied BY subject.
    """

    pending_bytes_limit_by_endpoint: int
    """The default pending bytes limit of the service.

    This limit is applied BY subject.
    """

    def endpoint_config(
        self,
        name: str,
        handler: Handler,
        subject: str | None = None,
        queue_group: str | None = None,
        metadata: dict[str, str] | None = None,
        pending_bytes_limit: int | None = None,
        pending_msgs_limit: int | None = None,
    ) -> EndpointConfig:
        """Build the configuration of an endpoint attached to this service.

        Args:
            name: The name of the endpoint.
            handler: The handler of the endpoint.
            subject: The subject of the endpoint. Falls back to ``name``
                when not set (or empty).
            queue_group: The queue group of the endpoint. Falls back to the
                queue group of the service when not set (or empty).
            metadata: The metadata of the endpoint. Falls back to an empty
                dict when not set.
            pending_bytes_limit: The pending bytes limit of the endpoint.
                Falls back to the service default only when ``None``.
            pending_msgs_limit: The pending messages limit of the endpoint.
                Falls back to the service default only when ``None``.

        Returns:
            The endpoint configuration.
        """
        # Compare against None rather than relying on truthiness, so that an
        # explicit 0 is forwarded instead of being replaced by the default.
        # NOTE(review): 0 presumably disables the limit in the underlying
        # client -- confirm against the nats-py subscription options.
        if pending_bytes_limit is None:
            pending_bytes_limit = self.pending_bytes_limit_by_endpoint
        if pending_msgs_limit is None:
            pending_msgs_limit = self.pending_msgs_limit_by_endpoint
        return EndpointConfig(
            name=name,
            subject=subject or name,
            handler=handler,
            metadata=metadata or {},
            queue_group=queue_group or self.queue_group,
            pending_bytes_limit=pending_bytes_limit,
            pending_msgs_limit=pending_msgs_limit,
        )

113 

114 

@dataclass
class EndpointConfig:
    """The configuration of an endpoint."""

    name: str
    """An alphanumeric human-readable string used to describe the endpoint.

    Multiple endpoints can have the same names.
    """

    subject: str
    """The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint."""

    handler: Handler
    """The handler of the endpoint."""

    queue_group: str
    """The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service."""

    metadata: dict[str, str]
    """The metadata of the endpoint."""

    pending_msgs_limit: int
    """The pending message limit for this endpoint."""

    pending_bytes_limit: int
    """The pending bytes limit for this endpoint."""

140 

141 

@dataclass
class GroupConfig:
    """The configuration of a group."""

    name: str
    """The name of the group.
    Group names cannot contain '>' wildcard (as group name serves as subject prefix)."""

    queue_group: str
    """The default queue group of the group."""

    pending_msgs_limit_by_endpoint: int
    """The default pending message limit for each endpoint within the group."""

    pending_bytes_limit_by_endpoint: int
    """The default pending bytes limit for each endpoint within the group."""

    def child(
        self,
        name: str,
        queue_group: str | None = None,
        pending_bytes_limit: int | None = None,
        pending_msgs_limit: int | None = None,
    ) -> GroupConfig:
        """Build the configuration of a group nested under this one.

        Args:
            name: The name of the child group; it is appended to this
                group's name after a dot.
            queue_group: The default queue group of the child. Falls back
                to this group's queue group when not set (or empty).
            pending_bytes_limit: The default pending bytes limit of the
                child. Falls back to this group's value only when ``None``.
            pending_msgs_limit: The default pending messages limit of the
                child. Falls back to this group's value only when ``None``.

        Returns:
            The child group configuration.
        """
        # Compare against None rather than relying on truthiness, so that an
        # explicit 0 is kept instead of being replaced by the inherited value.
        if pending_bytes_limit is None:
            pending_bytes_limit = self.pending_bytes_limit_by_endpoint
        if pending_msgs_limit is None:
            pending_msgs_limit = self.pending_msgs_limit_by_endpoint
        return GroupConfig(
            name=f"{self.name}.{name}",
            queue_group=queue_group or self.queue_group,
            pending_bytes_limit_by_endpoint=pending_bytes_limit,
            pending_msgs_limit_by_endpoint=pending_msgs_limit,
        )

174 

175 

def create_endpoint_stats(config: EndpointConfig) -> EndpointStats:
    """Create the initial statistics of an endpoint.

    The name, subject and queue group are copied from ``config``; every
    counter starts at zero, the last error is an empty string and the
    custom data is an empty dict.
    """
    return EndpointStats(
        name=config.name,
        subject=config.subject,
        num_requests=0,
        num_errors=0,
        last_error="",
        processing_time=0,
        average_processing_time=0,
        queue_group=config.queue_group,
        data={},
    )

188 

189 

def new_service_stats(
    id: str, started: datetime, config: ServiceConfig
) -> ServiceStats:
    """Create the initial statistics of a service instance.

    Args:
        id: The ID of the service instance.
        started: The start time to report.
        config: The service configuration; its name, version and metadata
            are copied into the result.

    Returns:
        Service statistics with an empty list of endpoints.
    """
    return ServiceStats(
        name=config.name,
        id=id,
        version=config.version,
        started=started,
        endpoints=[],
        # The metadata dict is passed as-is (not copied).
        metadata=config.metadata,
    )

201 

202 

def create_endpoint_info(config: EndpointConfig) -> EndpointInfo:
    """Create the info of an endpoint from its configuration.

    The name, subject, metadata and queue group are taken from ``config``.
    """
    return EndpointInfo(
        name=config.name,
        subject=config.subject,
        metadata=config.metadata,
        queue_group=config.queue_group,
    )

210 

211 

def new_service_info(id: str, config: ServiceConfig) -> ServiceInfo:
    """Create the info of a service instance.

    Args:
        id: The ID of the service instance.
        config: The service configuration; its name, version, description
            and metadata are copied into the result.

    Returns:
        Service info with an empty list of endpoints.
    """
    return ServiceInfo(
        name=config.name,
        id=id,
        version=config.version,
        description=config.description,
        metadata=config.metadata,
        endpoints=[],
    )

221 

222 

def new_ping_info(id: str, config: ServiceConfig) -> PingInfo:
    """Create the ping info of a service instance.

    Args:
        id: The ID of the service instance.
        config: The service configuration; its name, version and metadata
            are copied into the result.

    Returns:
        The ping info.
    """
    return PingInfo(
        name=config.name,
        id=id,
        version=config.version,
        metadata=config.metadata,
    )

230 

231 

def encode_ping_info(info: PingInfo) -> bytes:
    """Encode a ping info as compact JSON bytes.

    The dict returned by ``info.as_dict()`` is serialized without any
    whitespace between items, then encoded with the default string encoding.
    """
    payload = dumps(info.as_dict(), separators=(",", ":"))
    return payload.encode()

234 

235 

def encode_stats(stats: ServiceStats) -> bytes:
    """Encode service statistics as compact JSON bytes.

    The dict returned by ``stats.as_dict()`` is serialized without any
    whitespace between items, then encoded with the default string encoding.
    """
    payload = dumps(stats.as_dict(), separators=(",", ":"))
    return payload.encode()

238 

239 

def encode_info(info: ServiceInfo) -> bytes:
    """Encode a service info as compact JSON bytes.

    The dict returned by ``info.as_dict()`` is serialized without any
    whitespace between items, then encoded with the default string encoding.
    """
    payload = dumps(info.as_dict(), separators=(",", ":"))
    return payload.encode()

242 

243 

def default_clock() -> datetime:
    """A default clock implementation.

    Returns:
        The current time as a timezone-aware datetime in UTC.
    """
    return datetime.now(tz=timezone.utc)

247 

248 

def default_id_generator() -> str:
    """A default ID generator implementation.

    Returns:
        A random hexadecimal string built from 16 random bytes
        (32 hexadecimal characters).
    """
    return secrets.token_hex(16)

252 

253 

class Timer:
    """A timer measuring the time elapsed since it was created.

    The reference point is read from ``time.perf_counter_ns()`` when the
    instance is constructed.
    """

    # A one-element tuple rather than a bare string: same single slot,
    # but it cannot be misread as an iterable of characters.
    __slots__ = ("_start",)

    def __init__(self) -> None:
        self._start = time.perf_counter_ns()

    def elapsed_nanoseconds(self) -> int:
        """Return the nanoseconds elapsed since the timer was created."""
        return time.perf_counter_ns() - self._start