Coverage for src/nats_contrib/micro/request.py: 100%

33 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-27 05:11 +0100

1from __future__ import annotations 

2 

3import abc 

4from dataclasses import dataclass 

5from typing import Awaitable, Callable 

6 

7from nats.aio.msg import Msg 

8from typing_extensions import TypeAlias 

9 

10Handler: TypeAlias = Callable[["Request"], Awaitable[None]] 

11"""Handler is a function that processes a micro request.""" 

12 

13 

14class Request(metaclass=abc.ABCMeta): 

15 """Request is the interface for a request received by a service. 

16 

17 An interface is used instead of a class to allow for different implementations. 

18 It makes it easy to test a service by using a stub implementation of Request. 

19 

20 Four methods must be implemented: 

21 

22 - `def subject() -> str`: the subject on which the request was received. 

23 - `def headers() -> dict[str, str]`: the headers of the request. 

24 - `def data() -> bytes`: the data of the request. 

25 - `async def respond(...) -> None`: send a response to the request. 

26 """ 

27 

28 @abc.abstractmethod 

29 def subject(self) -> str: 

30 """The subject on which request was received.""" 

31 raise NotImplementedError() 

32 

33 @abc.abstractmethod 

34 def headers(self) -> dict[str, str]: 

35 """The headers of the request.""" 

36 raise NotImplementedError() 

37 

38 @abc.abstractmethod 

39 def data(self) -> bytes: 

40 """The data of the request.""" 

41 raise NotImplementedError() 

42 

43 @abc.abstractmethod 

44 async def respond(self, data: bytes, headers: dict[str, str] | None = None) -> None: 

45 """Send a success response to the request. 

46 

47 Args: 

48 data: The response data. 

49 headers: Additional response headers. 

50 """ 

51 raise NotImplementedError() 

52 

53 async def respond_success( 

54 self, 

55 code: int, 

56 data: bytes | None = None, 

57 headers: dict[str, str] | None = None, 

58 ) -> None: 

59 """Send a success response to the request. 

60 

61 Args: 

62 code: The status code describing the success. 

63 data: The response data. 

64 headers: Additional response headers. 

65 """ 

66 if not headers: 

67 headers = {} 

68 headers["Nats-Service-Success-Code"] = str(code) 

69 await self.respond(data or b"", headers=headers) 

70 

71 async def respond_error( 

72 self, 

73 code: int, 

74 description: str, 

75 data: bytes | None = None, 

76 headers: dict[str, str] | None = None, 

77 ) -> None: 

78 """Send an error response to the request. 

79 

80 Args: 

81 code: The error code describing the error. 

82 description: A string describing the error which can be displayed to the client. 

83 data: The error data. 

84 headers: Additional response headers. 

85 """ 

86 if not headers: 

87 headers = {} 

88 headers["Nats-Service-Error"] = description 

89 headers["Nats-Service-Error-Code"] = str(code) 

90 await self.respond(data or b"", headers=headers) 

91 

92 

93@dataclass 

94class NatsRequest(Request): 

95 """Implementation of Request using nats-py client library.""" 

96 

97 msg: Msg 

98 

99 def subject(self) -> str: 

100 """The subject on which request was received.""" 

101 return self.msg.subject 

102 

103 def headers(self) -> dict[str, str]: 

104 """The headers of the request.""" 

105 return self.msg.headers or {} 

106 

107 def data(self) -> bytes: 

108 """The data of the request.""" 

109 return self.msg.data 

110 

111 async def respond(self, data: bytes, headers: dict[str, str] | None = None) -> None: 

112 """Send a success response to the request. 

113 

114 Args: 

115 data: The response data. 

116 headers: Additional response headers. 

117 """ 

118 if not self.msg.reply: 

119 return 

120 await self.msg._client.publish( # type: ignore[reportPrivateUsage] 

121 self.msg.reply, 

122 data, 

123 headers=headers, 

124 )