Source code for Aeros.patches.hypercorn.worker

from hypercorn.asyncio.run import *
from hypercorn.asyncio.run import _run, _share_socket, _windows_signal_support


[docs]async def worker_serve( app: ASGIFramework, config: Config, *, sockets: Optional[Sockets] = None, shutdown_trigger: Optional[Callable[..., Awaitable[None]]] = None, ) -> None: config.set_statsd_logger_class(StatsdLogger) lifespan = Lifespan(app, config) lifespan_task = asyncio.ensure_future(lifespan.handle_lifespan()) await lifespan.wait_for_startup() if lifespan_task.done(): exception = lifespan_task.exception() if exception is not None: raise exception if sockets is None: sockets = config.create_sockets() loop = asyncio.get_event_loop() tasks = [] if platform.system() == "Windows": tasks.append(loop.create_task(_windows_signal_support())) if shutdown_trigger is None: shutdown_trigger = asyncio.Future tasks.append(loop.create_task(raise_shutdown(shutdown_trigger))) if config.use_reloader: tasks.append(loop.create_task(observe_changes(asyncio.sleep))) ssl_handshake_timeout = None if config.ssl_enabled: ssl_context = config.create_ssl_context() ssl_handshake_timeout = config.ssl_handshake_timeout async def _server_callback(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: try: await TCPServer(app, loop, config, reader, writer) except ConnectionAbortedError: pass servers = [] for sock in sockets.secure_sockets: if config.workers > 1 and platform.system() == "Windows": sock = _share_socket(sock) servers.append( await asyncio.start_server( _server_callback, backlog=config.backlog, loop=loop, ssl=ssl_context, sock=sock, ssl_handshake_timeout=ssl_handshake_timeout, ) ) bind = repr_socket_addr(sock.family, sock.getsockname()) await config.log.info(f"Running on {bind} over https (CTRL + C to quit)") for sock in sockets.insecure_sockets: if config.workers > 1 and platform.system() == "Windows": sock = _share_socket(sock) servers.append( await asyncio.start_server( _server_callback, backlog=config.backlog, loop=loop, sock=sock ) ) bind = repr_socket_addr(sock.family, sock.getsockname()) await config.log.info(f"Running on {bind} over http (CTRL + C to quit)") for sock in sockets.quic_sockets: if config.workers > 1 and platform.system() == "Windows": sock = _share_socket(sock) await loop.create_datagram_endpoint(lambda: UDPServer(app, loop, config), sock=sock) bind = repr_socket_addr(sock.family, sock.getsockname()) await config.log.info(f"Running on {bind} over quic (CTRL + C to quit)") reload_ = False try: gathered_tasks = asyncio.gather(*tasks) await gathered_tasks except MustReloadException: reload_ = True except (Shutdown, KeyboardInterrupt): pass finally: for server in servers: server.close() await server.wait_closed() # Retrieve the Gathered Tasks Cancelled Exception, to # prevent a warning that this hasn't been done. gathered_tasks.exception() await lifespan.wait_for_shutdown() lifespan_task.cancel() await lifespan_task if reload_: restart()
[docs]def asyncio_worker(app, config: Config, sockets: Optional[Sockets] = None, shutdown_event: Optional[EventType] = None) -> None: shutdown_trigger = None if shutdown_event is not None: shutdown_trigger = partial(check_multiprocess_shutdown_event, shutdown_event, asyncio.sleep) if config.workers > 1 and platform.system() == "Windows": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore _run( partial(worker_serve, app, config, sockets=sockets), debug=config.debug, shutdown_trigger=shutdown_trigger, )