import asyncio
from .version import __version__
class ClosedError(Exception):
    """Raised by :meth:`Socket.recvfrom` when no more packets can be
    received because the connection has been lost or closed.
    """
class _SocketProtocol:
    """Datagram protocol that buffers incoming packets in a queue.

    Every received datagram is queued as a ``(data, addr)`` tuple. ``None``
    is queued as a sentinel when the connection is lost or the transport
    reports an error, so that a task blocked in :meth:`recvfrom` wakes up
    instead of waiting forever.
    """

    def __init__(self):
        # FIFO of (data, addr) tuples and None sentinels; created without a
        # size limit, so put_nowait() below cannot raise QueueFull.
        self._packets = asyncio.Queue()

    def connection_made(self, transport):
        """Called by asyncio once the endpoint is ready. Nothing to set up."""

    def connection_lost(self, exc):
        """Queue the ``None`` sentinel when the transport goes away.

        ``exc`` is whatever asyncio passes as the reason for the loss; it is
        not inspected here.
        """
        self._packets.put_nowait(None)

    def datagram_received(self, data, addr):
        """Queue a received datagram together with its sender address."""
        self._packets.put_nowait((data, addr))

    def error_received(self, exc):
        """Queue the ``None`` sentinel when a send or receive fails.

        asyncio datagram transports call this callback with the ``OSError``
        they hit. This class does not inherit the default implementation
        from ``asyncio.DatagramProtocol``, so without this method the
        transport's call would fail with ``AttributeError`` and a pending
        :meth:`recvfrom` would never be woken.
        """
        self._packets.put_nowait(None)

    async def recvfrom(self):
        """Wait for and return the next queued item.

        Returns a ``(data, addr)`` tuple, or ``None`` if the connection was
        lost or an error was reported.
        """
        return await self._packets.get()
class Socket:
    """A UDP socket.

    Instances are not meant to be constructed directly; call
    :func:`~asyncudp.create_socket()` to get one. The socket can be used
    as an asynchronous context manager, in which case it is closed when
    the ``async with`` block exits.
    """

    def __init__(self, transport, protocol):
        self._transport = transport
        self._protocol = protocol

    def close(self):
        """Close the socket by closing the underlying transport."""
        self._transport.close()

    def sendto(self, data, addr=None):
        """Send the packet ``data`` to the address ``addr``.

        If ``addr`` is ``None`` the packet is sent to the ``remote_addr``
        the socket was created with.

        >>> sock.sendto(b'Hi!')
        """
        self._transport.sendto(data, addr)

    async def recvfrom(self):
        """Wait for a UDP packet and return it as ``(data, addr)``.

        Raises ClosedError on connection error, often caused by calling
        the close() method from another task.

        >>> data, addr = await sock.recvfrom()
        """
        received = await self._protocol.recvfrom()

        # The protocol hands out None instead of a packet once the
        # connection is gone.
        if received is not None:
            return received

        raise ClosedError()

    def getsockname(self):
        """Get the address information the socket is bound to.

        >>> local_address, local_port = sock.getsockname()
        """
        return self._transport.get_extra_info('sockname')

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Always close on exit; exceptions from the block are not suppressed.
        self.close()
async def create_socket(local_addr=None, remote_addr=None):
    """Create a UDP socket with given local and remote addresses.

    Both addresses are passed unchanged to the running event loop's
    ``create_datagram_endpoint()``. Returns a :class:`Socket` wrapping the
    resulting transport and protocol.

    >>> sock = await asyncudp.create_socket(local_addr=('127.0.0.1', 9999))
    """
    endpoint = asyncio.get_running_loop().create_datagram_endpoint(
        _SocketProtocol,
        local_addr=local_addr,
        remote_addr=remote_addr)
    transport, protocol = await endpoint

    return Socket(transport, protocol)