Coverage for src/nats_contrib/request_many/client.py: 100%

16 statements  

« prev     ^ index     » next       coverage.py v7.4.2, created at 2024-02-25 01:51 +0100

1from __future__ import annotations 

2 

3from typing import AsyncContextManager, AsyncIterator 

4 

5from nats.aio.client import Client as NATSClient 

6from nats.aio.msg import Msg 

7 

8from .executor import RequestManyExecutor 

9from .iterator import RequestManyIterator 

10 

11 

class Client(NATSClient):
    """NATS client with helpers that collect several replies to one request.

    The client-wide ``max_wait`` given at construction is used as the time
    bound for any request that specifies neither ``max_wait`` nor
    ``max_interval``.
    """

    def __init__(
        self,
        max_wait: float = 0.5,
    ) -> None:
        """Create the client.

        Args:
            max_wait: Default maximum time, in seconds, to wait for responses
                when a request sets neither max_wait nor max_interval.
        """
        super().__init__()
        self.max_wait = max_wait

    def _resolve_max_wait(
        self,
        max_wait: float | None,
        max_interval: float | None,
    ) -> float | None:
        """Return the max_wait value to forward for a single request.

        The client-wide default is applied only when the caller set no time
        bound at all. An explicit max_wait, or a request bounded only by
        max_interval, is passed through unchanged.
        """
        if max_wait is None and max_interval is None:
            return self.max_wait
        return max_wait

    def request_many_iter(
        self,
        subject: str,
        payload: bytes | None = None,
        headers: dict[str, str] | None = None,
        reply_inbox: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
        stop_on_sentinel: bool = False,
    ) -> AsyncContextManager[AsyncIterator[Msg]]:
        """Request many responses from the same subject.

        The iterator exits without raising an error when no responses are received.

        Responses are received until one of the following conditions is met:

        - max_wait seconds have passed.
        - max_count responses have been received.
        - max_interval seconds have passed between responses.
        - A sentinel message is received and stop_on_sentinel is True.

        When any of these conditions is met, the async iterator yielded by the
        context manager raises StopAsyncIteration on the next iteration.

        The subscription is started when entering the async context manager and
        stopped when exiting.

        Args:
            subject: The subject to send the request to.
            payload: The payload to send with the request.
            headers: The headers to send with the request.
            reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
            max_wait: The maximum amount of time to wait for responses. When both
                max_wait and max_interval are None, the client's max_wait
                (0.5 seconds unless set at construction) is used.
            max_count: The maximum number of responses to accept. No limit by default.
            max_interval: The maximum amount of time between responses. No limit by default.
            stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

        Returns:
            The RequestManyIterator built for this request.
        """
        # A falsy reply_inbox (None or "") means "allocate a fresh inbox".
        inbox = reply_inbox or self.new_inbox()
        return RequestManyIterator(
            self,
            subject,
            payload=payload,
            headers=headers,
            inbox=inbox,
            max_wait=self._resolve_max_wait(max_wait, max_interval),
            max_count=max_count,
            max_interval=max_interval,
            stop_on_sentinel=stop_on_sentinel,
        )

    async def request_many(
        self,
        subject: str,
        payload: bytes | None = None,
        headers: dict[str, str] | None = None,
        reply_inbox: str | None = None,
        max_wait: float | None = None,
        max_count: int | None = None,
        max_interval: float | None = None,
        stop_on_sentinel: bool = False,
    ) -> list[Msg]:
        """Request many responses from the same subject.

        This function does not raise an error when no responses are received.

        Responses are received until one of the following conditions is met:

        - max_wait seconds have passed.
        - max_count responses have been received.
        - max_interval seconds have passed between responses.
        - A sentinel message is received and stop_on_sentinel is True.

        Subscription is always stopped when the function returns.

        Args:
            subject: The subject to send the request to.
            payload: The payload to send with the request.
            headers: The headers to send with the request.
            reply_inbox: The inbox to receive the responses in. A new inbox is created if None.
            max_wait: The maximum amount of time to wait for responses. When both
                max_wait and max_interval are None, the client's max_wait
                (0.5 seconds unless set at construction) is used.
            max_count: The maximum number of responses to accept. No limit by default.
            max_interval: The maximum amount of time between responses. No limit by default.
            stop_on_sentinel: Whether to stop when a sentinel message is received. False by default.

        Returns:
            The list of messages returned by the executor.
        """
        # Resolve once so the executor and the call receive the same value,
        # exactly as before, but now honouring the client-wide default.
        effective_max_wait = self._resolve_max_wait(max_wait, max_interval)
        executor = RequestManyExecutor(self, effective_max_wait)
        return await executor(
            subject,
            reply_inbox=reply_inbox,
            payload=payload,
            headers=headers,
            max_wait=effective_max_wait,
            max_count=max_count,
            max_interval=max_interval,
            stop_on_sentinel=stop_on_sentinel,
        )