a c @snddlmZddlmZddlZddlZedddZGdddejZeZ Gdd d e Z e d d d Z dS) ) ContextVar)OptionalNcurrent_async_library_cvar)defaultc@seZdZdZdS) _ThreadLocalN)__name__ __module__ __qualname__namer r Z/workspaces/shunt/resources/test-fastapi/venv/lib/python3.9/site-packages/sniffio/_impl.pyr src@s eZdZdS)AsyncLibraryNotFoundErrorN)rrr r r r r r sr )returncCstj}|dur|St}|dur&|Sdtjvrddl}z |j}Wnty\|j j}Yn0z|durpWdSWnt yYn0dtjvrddl m }|rdSt ddS)aeDetect which async library is currently running. The following libraries are currently supported: ================ =========== ============================ Library Requires Magic string ================ =========== ============================ **Trio** Trio v0.6+ ``"trio"`` **Curio** - ``"curio"`` **asyncio** ``"asyncio"`` **Trio-asyncio** v0.8.2+ ``"trio"`` or ``"asyncio"``, depending on current mode ================ =========== ============================ Returns: A string like ``"trio"``. Raises: AsyncLibraryNotFoundError: if called from synchronous context, or if the current async library was not recognized. Examples: .. code-block:: python3 from sniffio import current_async_library async def generic_sleep(seconds): library = current_async_library() if library == "trio": import trio await trio.sleep(seconds) elif library == "asyncio": import asyncio await asyncio.sleep(seconds) # ... and so on ... else: raise RuntimeError(f"Unsupported library {library!r}") NasynciorZcurio) curio_runningz.unknown async library, or not in async context) thread_localr rgetsysmodulesr current_taskAttributeErrorZTask RuntimeErrorZ curio.metarr )valuerrrr r r current_async_librarys0)        r)Z contextvarsrtypingrr threadingrlocalrrrr strrr r r r s