Reference
Handler: TypeAlias = Callable[['Request'], Awaitable[None]]
module-attribute
Handler is a function that processes a micro request.
Client
Source code in src/nats_contrib/micro/client.py
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
|
info(service=None, max_wait=None, max_count=None, max_interval=None)
async
Get all service informations.
Source code in src/nats_contrib/micro/client.py
info_iter(service=None, max_wait=None, max_count=None, max_interval=None)
Get all service informations.
Source code in src/nats_contrib/micro/client.py
instance(service, id)
ping(service=None, max_wait=None, max_count=None, max_interval=None)
async
Ping all the services.
Source code in src/nats_contrib/micro/client.py
ping_iter(service=None, max_wait=None, max_count=None, max_interval=None)
Ping all the services.
Source code in src/nats_contrib/micro/client.py
request(subject, data=None, headers=None, timeout=1)
async
Send a request and get the response.
This method should be prefered over using the NATS client directly because it will handle the service errors properly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subject |
str
|
The subject to send the request to. |
required |
data |
bytes | None
|
The request data. |
None
|
headers |
dict[str, str] | None
|
Additional request headers. |
None
|
timeout |
float
|
The maximum time to wait for a response. |
1
|
Returns:
Source code in src/nats_contrib/micro/client.py
service(service)
stats(service=None, max_wait=None, max_count=None, max_interval=None)
async
Get all services stats.
Source code in src/nats_contrib/micro/client.py
stats_iter(service=None, max_wait=None, max_count=None, max_interval=None)
Get all services stats.
Source code in src/nats_contrib/micro/client.py
Context
A class to run micro services easily.
This class is useful in a main function to manage ensure that all async resources are cleaned up properly when the program is cancelled.
It also allows to listen to signals and cancel the program when a signal is received easily.
Source code in src/nats_contrib/micro/context.py
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
|
add_closed_callback(callback)
Add a closed callback to the NATS client.
Source code in src/nats_contrib/micro/context.py
add_disconnected_callback(callback)
Add a disconnected callback to the NATS client.
Source code in src/nats_contrib/micro/context.py
add_error_callback(callback)
Add an error callback to the NATS client.
Source code in src/nats_contrib/micro/context.py
add_reconnected_callback(callback)
Add a reconnected callback to the NATS client.
Source code in src/nats_contrib/micro/context.py
add_service(name, version, description=None, metadata=None, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None, now=None, id_generator=None, api_prefix=None)
async
Add a service to the context.
This will start the service using the client used to connect to the NATS server.
Source code in src/nats_contrib/micro/context.py
cancel()
cancelled()
connect(*options)
async
Connect to the NATS server. Does not raise an error when cancelled
Source code in src/nats_contrib/micro/context.py
enter(async_context)
async
push(callback)
Add a callback to the exit stack.
Source code in src/nats_contrib/micro/context.py
reset()
run_forever(setup, /, *options, trap_signals=False)
async
Useful in a main function of a program.
This method will first connect to the NATS server using the provided options. It will then run the setup function and finally enter any additional services provided.
If trap_signals is True, it will trap SIGINT and SIGTERM signals and cancel the context when one of these signals is received.
Other signals can be trapped by providing a tuple of signals to trap.
This method will not raise an exception if the context is cancelled.
You can use .cancelled() on the context to check if the coroutine was cancelled.
Warning
The context must not have been used as an async context manager before calling this method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
setup |
Callable[[Context], Coroutine[Any, Any, None]]
|
A coroutine to setup the program. |
required |
options |
ConnectOption
|
The options to pass to the connect method. |
()
|
trap_signals |
bool | tuple[Signals, ...]
|
If True, trap SIGINT and SIGTERM signals. |
False
|
Source code in src/nats_contrib/micro/context.py
trap_signal(*signals)
Notify the context that a signal has been received.
Source code in src/nats_contrib/micro/context.py
wait()
async
wait_for(coro)
async
Run a coroutine in the context and cancel it context is cancelled.
This method does not raise an exception if the coroutine is cancelled. You can use .cancelled() on the context to check if the coroutine was cancelled.
Source code in src/nats_contrib/micro/context.py
Endpoint
Endpoint manages a service endpoint.
Source code in src/nats_contrib/micro/api.py
EndpointInfo
dataclass
Bases: Base
The information of an endpoint.
Source code in src/nats_contrib/micro/models.py
metadata: dict[str, str] | None = None
class-attribute
instance-attribute
The endpoint metadata.
name: str
instance-attribute
The endopoint name
queue_group: str | None = None
class-attribute
instance-attribute
The queue group this endpoint listens on for requests
subject: str
instance-attribute
The subject the endpoint listens on
EndpointStats
dataclass
Bases: Base
Statistics about a specific service endpoint
Source code in src/nats_contrib/micro/models.py
average_processing_time: int
instance-attribute
The average time spent processing requests
data: dict[str, object] | None = None
class-attribute
instance-attribute
Additional statistics the endpoint makes available
last_error: str
instance-attribute
The last error the service encountered
name: str
instance-attribute
The endpoint name
num_errors: int
instance-attribute
The number of errors this endpoint encountered
num_requests: int
instance-attribute
The number of requests this endpoint received
processing_time: int
instance-attribute
How long, in total, was spent processing requests in the handler
queue_group: str | None = None
class-attribute
instance-attribute
The queue group this endpoint listens on for requests
subject: str
instance-attribute
The subject the endpoint listens on
Group
Group allows for grouping endpoints on a service.
Endpoints created using Group.add_endpoint
will be grouped
under common prefix (group name). New groups can also be derived
from a group using Group.add_group
.
Source code in src/nats_contrib/micro/api.py
add_endpoint(name, handler, subject=None, queue_group=None, metadata=None, pending_bytes_limit=None, pending_msgs_limit=None, middlewares=None)
async
Add an endpoint to the group.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the endpoint. |
required |
handler |
Handler
|
The handler of the endpoint. |
required |
subject |
str | None
|
The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint. |
None
|
queue_group |
str | None
|
The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service. |
None
|
metadata |
dict[str, str] | None
|
The metadata of the endpoint. |
None
|
pending_bytes_limit |
int | None
|
The pending bytes limit for this endpoint. |
None
|
pending_msgs_limit |
int | None
|
The pending messages limit for this endpoint. |
None
|
Source code in src/nats_contrib/micro/api.py
add_group(name, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None)
Add a group to the group.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the group. Must be a valid NATS subject prefix. |
required |
queue_group |
str | None
|
The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service. |
None
|
pending_bytes_limit_by_endpoint |
int | None
|
The default pending bytes limit for each endpoint within the group. |
None
|
pending_msgs_limit_by_endpoint |
int | None
|
The default pending messages limit for each endpoint within the group. |
None
|
Source code in src/nats_contrib/micro/api.py
PingInfo
dataclass
Bases: Base
The response to a ping message.
Source code in src/nats_contrib/micro/models.py
Request
Request is the interface for a request received by a service.
An interface is used instead of a class to allow for different implementations. It makes it easy to test a service by using a stub implementation of Request.
Four methods must be implemented:
def subject() -> str
: the subject on which the request was received.def headers() -> dict[str, str]
: the headers of the request.def data() -> bytes
: the data of the request.async def respond(...) -> None
: send a response to the request.
Source code in src/nats_contrib/micro/request.py
data()
abstractmethod
headers()
abstractmethod
respond(data, headers=None)
abstractmethod
async
Send a success response to the request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
bytes
|
The response data. |
required |
headers |
dict[str, str] | None
|
Additional response headers. |
None
|
Source code in src/nats_contrib/micro/request.py
respond_error(code, description, data=None, headers=None)
async
Send an error response to the request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code |
int
|
The error code describing the error. |
required |
description |
str
|
A string describing the error which can be displayed to the client. |
required |
data |
bytes | None
|
The error data. |
None
|
headers |
dict[str, str] | None
|
Additional response headers. |
None
|
Source code in src/nats_contrib/micro/request.py
respond_success(code, data=None, headers=None)
async
Send a success response to the request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code |
int
|
The status code describing the success. |
required |
data |
bytes | None
|
The response data. |
None
|
headers |
dict[str, str] | None
|
Additional response headers. |
None
|
Source code in src/nats_contrib/micro/request.py
Service
Services simplify the development of NATS micro-services.
Endpoints can be added to a service after it has been created and started. Each endpoint is a request-reply handler for a subject.
Groups can be added to a service to group endpoints under a common prefix.
Source code in src/nats_contrib/micro/api.py
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
|
__aenter__()
async
__aexit__(*args, **kwargs)
async
add_endpoint(name, handler, subject=None, queue_group=None, metadata=None, pending_bytes_limit=None, pending_msgs_limit=None, middlewares=None)
async
Add an endpoint to the service.
An endpoint is a request-reply handler for a subject.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the endpoint. |
required |
handler |
Handler
|
The handler of the endpoint. |
required |
subject |
str | None
|
The subject of the endpoint. When subject is not set, it defaults to the name of the endpoint. |
None
|
queue_group |
str | None
|
The queue group of the endpoint. When queue group is not set, it defaults to the queue group of the parent group or service. |
None
|
metadata |
dict[str, str] | None
|
The metadata of the endpoint. |
None
|
pending_bytes_limit |
int | None
|
The pending bytes limit for this endpoint. |
None
|
pending_msgs_limit |
int | None
|
The pending messages limit for this endpoint. |
None
|
Source code in src/nats_contrib/micro/api.py
add_group(name, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None)
Add a group to the service.
A group is a collection of endpoints that share the same prefix, and the same default queue group and pending limits.
At runtime, a group does not exist as a separate entity, only endpoints exist. However, groups are useful to organize endpoints and to set default values for queue group and pending limits.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the group. |
required |
queue_group |
str | None
|
The default queue group of the group. When queue group is not set, it defaults to the queue group of the parent group or service. |
None
|
pending_bytes_limit_by_endpoint |
int | None
|
The default pending bytes limit for each endpoint within the group. |
None
|
pending_msgs_limit_by_endpoint |
int | None
|
The default pending messages limit for each endpoint within the group. |
None
|
Source code in src/nats_contrib/micro/api.py
info()
reset()
Resets all statistics (for all endpoints) on a service instance.
Source code in src/nats_contrib/micro/api.py
start()
async
Start the service.
A service MUST be started before adding endpoints.
This will start the internal subscriptions and enable service discovery.
Source code in src/nats_contrib/micro/api.py
stats()
stop()
async
Stop the service.
This will stop all endpoints and internal subscriptions.
Source code in src/nats_contrib/micro/api.py
ServiceError
Bases: Exception
Raised when a service error is received.
Source code in src/nats_contrib/micro/client.py
ServiceInfo
dataclass
Bases: Base
The information of a service.
Source code in src/nats_contrib/micro/models.py
description: str
instance-attribute
The description of the service supplied as configuration while creating the service
endpoints: list[EndpointInfo]
instance-attribute
Information for all service endpoints
id: str
instance-attribute
A unique ID for this instance of a service
metadata: dict[str, str]
instance-attribute
The service metadata
name: str
instance-attribute
The kind of the service. Shared by all the services that have the same name
version: str
instance-attribute
The version of the service
as_dict()
Return the object converted into an API-friendly dict.
from_response(resp)
classmethod
Read the class instance from a server response.
Unknown fields are ignored ("open-world assumption").
Source code in src/nats_contrib/micro/models.py
ServiceStats
dataclass
Bases: Base
The statistics of a service.
Source code in src/nats_contrib/micro/models.py
endpoints: list[EndpointStats]
instance-attribute
Statistics for each known endpoint
id: str
instance-attribute
A unique ID for this instance of a service
metadata: dict[str, str] | None = None
class-attribute
instance-attribute
Service metadata.
name: str
instance-attribute
The kind of the service. Shared by all the services that have the same name
started: datetime.datetime
instance-attribute
The time the service was stated in RFC3339 format
version: str
instance-attribute
The version of the service
as_dict()
Return the object converted into an API-friendly dict.
Source code in src/nats_contrib/micro/models.py
from_response(resp)
classmethod
Read the class instance from a server response.
Unknown fields are ignored ("open-world assumption").
Source code in src/nats_contrib/micro/models.py
add_service(nc, name, version, description=None, metadata=None, queue_group=None, pending_bytes_limit_by_endpoint=None, pending_msgs_limit_by_endpoint=None, now=None, id_generator=None, api_prefix=None)
Create a new service.
A service is a collection of endpoints that are grouped together under a common name.
Each endpoint is a request-reply handler for a subject.
It's possible to add endpoints to a service after it has been created AND started.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nc |
Client
|
The NATS client. |
required |
name |
str
|
The name of the service. |
required |
version |
str
|
The version of the service. Must be a valid semver version. |
required |
description |
str | None
|
The description of the service. |
None
|
metadata |
dict[str, str] | None
|
The metadata of the service. |
None
|
queue_group |
str | None
|
The default queue group of the service. |
None
|
pending_bytes_limit_by_endpoint |
int | None
|
The default pending bytes limit for each endpoint within the service. |
None
|
pending_msgs_limit_by_endpoint |
int | None
|
The default pending messages limit for each endpoint within the service. |
None
|
now |
Callable[[], datetime] | None
|
The function to get the current time. |
None
|
id_generator |
Callable[[], str] | None
|
The function to generate a unique service instance id. |
None
|
api_prefix |
str | None
|
The prefix of the control subjects. |
None
|
Source code in src/nats_contrib/micro/api.py
run(setup, /, *options, trap_signals=False, client=None)
Helper function to run an async program.