Coverage for src/nats_contrib/micro/request.py: 100%
33 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-27 05:11 +0100
1from __future__ import annotations
3import abc
4from dataclasses import dataclass
5from typing import Awaitable, Callable
7from nats.aio.msg import Msg
8from typing_extensions import TypeAlias
# Type of a service endpoint handler: a callable that receives a single
# Request and returns an awaitable resolving to None. "Request" is quoted
# because the class is defined further down in this module and the alias
# value is evaluated at import time.
Handler: TypeAlias = Callable[["Request"], Awaitable[None]]
"""Handler is a function that processes a micro request."""
class Request(metaclass=abc.ABCMeta):
    """Request is the interface for a request received by a service.

    An interface is used instead of a class to allow for different implementations.
    It makes it easy to test a service by using a stub implementation of Request.

    Four methods must be implemented:

    - `def subject() -> str`: the subject on which the request was received.
    - `def headers() -> dict[str, str]`: the headers of the request.
    - `def data() -> bytes`: the data of the request.
    - `async def respond(...) -> None`: send a response to the request.

    `respond_success` and `respond_error` are concrete helpers built on top
    of `respond`; they only add status headers before delegating to it.
    """

    @abc.abstractmethod
    def subject(self) -> str:
        """The subject on which request was received."""
        raise NotImplementedError()

    @abc.abstractmethod
    def headers(self) -> dict[str, str]:
        """The headers of the request."""
        raise NotImplementedError()

    @abc.abstractmethod
    def data(self) -> bytes:
        """The data of the request."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def respond(self, data: bytes, headers: dict[str, str] | None = None) -> None:
        """Send a response to the request.

        This is the primitive used by both `respond_success` and
        `respond_error`; it does not add any status header itself.

        Args:
            data: The response data.
            headers: Additional response headers.
        """
        raise NotImplementedError()

    async def respond_success(
        self,
        code: int,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        """Send a success response to the request.

        The `Nats-Service-Success-Code` header is set to `str(code)`,
        overriding any value provided for that key in `headers`.

        Args:
            code: The status code describing the success.
            data: The response data. An empty payload is sent when omitted.
            headers: Additional response headers. The mapping is copied and
                never modified.
        """
        # Work on a copy: the caller may reuse its dict for other responses
        # and must not see the status header appear in it.
        response_headers = dict(headers) if headers else {}
        response_headers["Nats-Service-Success-Code"] = str(code)
        await self.respond(data or b"", headers=response_headers)

    async def respond_error(
        self,
        code: int,
        description: str,
        data: bytes | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        """Send an error response to the request.

        The `Nats-Service-Error` and `Nats-Service-Error-Code` headers are
        set from `description` and `str(code)`, overriding any value provided
        for those keys in `headers`.

        Args:
            code: The error code describing the error.
            description: A string describing the error which can be displayed to the client.
            data: The error data. An empty payload is sent when omitted.
            headers: Additional response headers. The mapping is copied and
                never modified.
        """
        # Work on a copy so the caller's dict is left untouched.
        response_headers = dict(headers) if headers else {}
        response_headers["Nats-Service-Error"] = description
        response_headers["Nats-Service-Error-Code"] = str(code)
        await self.respond(data or b"", headers=response_headers)
@dataclass
class NatsRequest(Request):
    """Request implementation wrapping a message received with nats-py."""

    msg: Msg

    def subject(self) -> str:
        """Return the subject of the wrapped message."""
        return self.msg.subject

    def headers(self) -> dict[str, str]:
        """Return the wrapped message's headers, or an empty dict when they are missing or empty."""
        received = self.msg.headers
        return received if received else {}

    def data(self) -> bytes:
        """Return the payload of the wrapped message."""
        return self.msg.data

    async def respond(self, data: bytes, headers: dict[str, str] | None = None) -> None:
        """Publish a response on the reply subject of the wrapped message.

        Nothing is published when the message carries no reply subject.

        Args:
            data: The response data.
            headers: Additional response headers.
        """
        reply_subject = self.msg.reply
        if reply_subject:
            # Publish through the message's client so that headers can be
            # passed along with the payload.
            await self.msg._client.publish(  # type: ignore[reportPrivateUsage]
                reply_subject,
                data,
                headers=headers,
            )