File size: 6,584 Bytes
8a58cf3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from __future__ import annotations

from abc import abstractmethod
from typing import Any, Callable, Generic, TypeVar, Union

from .._core._exceptions import EndOfStream
from .._core._typedattr import TypedAttributeProvider
from ._resources import AsyncResource
from ._tasks import TaskGroup

T_Item = TypeVar("T_Item")
T_co = TypeVar("T_co", covariant=True)
T_contra = TypeVar("T_contra", contravariant=True)


class UnreliableObjectReceiveStream(
    Generic[T_co], AsyncResource, TypedAttributeProvider
):
    """
    An interface for receiving objects.

    This interface makes no guarantees that the received messages arrive in the order in which they
    were sent, or that no messages are missed.

    Asynchronously iterating over objects of this type will yield objects matching the given type
    parameter.
    """

    def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
        return self

    async def __anext__(self) -> T_co:
        try:
            return await self.receive()
        except EndOfStream:
            raise StopAsyncIteration

    @abstractmethod
    async def receive(self) -> T_co:
        """
        Receive the next item.

        :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
            closed
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """


class UnreliableObjectSendStream(
    Generic[T_contra], AsyncResource, TypedAttributeProvider
):
    """
    An interface for sending objects.

    This interface makes no guarantees that the messages sent will reach the recipient(s) in the
    same order in which they were sent, or at all.
    """

    @abstractmethod
    async def send(self, item: T_contra) -> None:
        """
        Send an item to the peer(s).

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
            closed
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """


class UnreliableObjectStream(
    UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
):
    """
    A bidirectional message stream which does not guarantee the order or reliability of message
    delivery.
    """


class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
    """
    A receive message stream which guarantees that messages are received in the same order in
    which they were sent, and that no messages are missed.
    """


class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
    """
    A send message stream which guarantees that messages are delivered in the same order in which
    they were sent, without missing any messages in the middle.
    """


class ObjectStream(
    ObjectReceiveStream[T_Item],
    ObjectSendStream[T_Item],
    UnreliableObjectStream[T_Item],
):
    """
    A bidirectional message stream which guarantees the order and reliability of message delivery.
    """

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this method.
        This method is idempotent (does nothing on successive calls).
        """


class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
    """
    An interface for receiving bytes from a single peer.

    Iterating this byte stream will yield a byte string of arbitrary length, but no more than
    65536 bytes.
    """

    def __aiter__(self) -> ByteReceiveStream:
        return self

    async def __anext__(self) -> bytes:
        try:
            return await self.receive()
        except EndOfStream:
            raise StopAsyncIteration

    @abstractmethod
    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive at most ``max_bytes`` bytes from the peer.

        .. note:: Implementors of this interface should not return an empty :class:`bytes` object,
            and users should ignore them.

        :param max_bytes: maximum number of bytes to receive
        :return: the received bytes
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        """


class ByteSendStream(AsyncResource, TypedAttributeProvider):
    """An interface for sending bytes to a single peer."""

    @abstractmethod
    async def send(self, item: bytes) -> None:
        """
        Send the given bytes to the peer.

        :param item: the bytes to send
        """


class ByteStream(ByteReceiveStream, ByteSendStream):
    """A bidirectional byte stream."""

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this method.
        This method is idempotent (does nothing on successive calls).
        """


#: Type alias for all unreliable bytes-oriented receive streams.
AnyUnreliableByteReceiveStream = Union[
    UnreliableObjectReceiveStream[bytes], ByteReceiveStream
]
#: Type alias for all unreliable bytes-oriented send streams.
AnyUnreliableByteSendStream = Union[UnreliableObjectSendStream[bytes], ByteSendStream]
#: Type alias for all unreliable bytes-oriented streams.
AnyUnreliableByteStream = Union[UnreliableObjectStream[bytes], ByteStream]
#: Type alias for all bytes-oriented receive streams.
AnyByteReceiveStream = Union[ObjectReceiveStream[bytes], ByteReceiveStream]
#: Type alias for all bytes-oriented send streams.
AnyByteSendStream = Union[ObjectSendStream[bytes], ByteSendStream]
#: Type alias for all bytes-oriented streams.
AnyByteStream = Union[ObjectStream[bytes], ByteStream]


class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider):
    """An interface for objects that let you accept incoming connections."""

    @abstractmethod
    async def serve(
        self,
        handler: Callable[[T_co], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept incoming connections as they come in and start tasks to handle them.

        :param handler: a callable that will be used to handle each accepted connection
        :param task_group: the task group that will be used to start tasks for handling each
            accepted connection (if omitted, an ad-hoc task group will be created)
        """