404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@3.138.137.150: ~ $
import enum

from types import TracebackType
from typing import final, Optional, Type

from . import events
from . import exceptions
from . import tasks


# Public names of this module: the context-manager class and the two
# factory functions that construct it.
__all__ = (
    "Timeout",
    "timeout",
    "timeout_at",
)


class _State(enum.Enum):
    """Lifecycle states of a `Timeout` context manager.

    The string values are user-visible: `Timeout.__repr__()` and the error
    message raised by `Timeout.reschedule()` embed `state.value`.
    """

    # Initial state assigned in Timeout.__init__(); not yet entered.
    CREATED = "created"
    # Set by Timeout.__aenter__(); the only state in which reschedule() works.
    ENTERED = "active"
    # Set by Timeout._on_timeout() after it requested cancellation of the task.
    EXPIRING = "expiring"
    # Set by Timeout.__aexit__() when the block is left while EXPIRING.
    EXPIRED = "expired"
    # Set by Timeout.__aexit__() when the block is left while still ENTERED.
    EXITED = "finished"


@final
class Timeout:
    """Asynchronous context manager for cancelling overdue coroutines.

    Use `timeout()` or `timeout_at()` rather than instantiating this class directly.
    """

    def __init__(self, when: Optional[float]) -> None:
        """Schedule a timeout that will trigger at a given loop time.

        - If `when` is `None`, the timeout will never trigger.
        - If `when < loop.time()`, the timeout will trigger on the next
          iteration of the event loop.
        """
        self._state = _State.CREATED

        self._timeout_handler: Optional[events.TimerHandle] = None
        self._task: Optional[tasks.Task] = None
        self._when = when

    def when(self) -> Optional[float]:
        """Return the current deadline."""
        return self._when

    def reschedule(self, when: Optional[float]) -> None:
        """Reschedule the timeout."""
        if self._state is not _State.ENTERED:
            if self._state is _State.CREATED:
                raise RuntimeError("Timeout has not been entered")
            raise RuntimeError(
                f"Cannot change state of {self._state.value} Timeout",
            )

        self._when = when

        if self._timeout_handler is not None:
            self._timeout_handler.cancel()

        if when is None:
            self._timeout_handler = None
        else:
            loop = events.get_running_loop()
            if when <= loop.time():
                self._timeout_handler = loop.call_soon(self._on_timeout)
            else:
                self._timeout_handler = loop.call_at(when, self._on_timeout)

    def expired(self) -> bool:
        """Is timeout expired during execution?"""
        return self._state in (_State.EXPIRING, _State.EXPIRED)

    def __repr__(self) -> str:
        info = ['']
        if self._state is _State.ENTERED:
            when = round(self._when, 3) if self._when is not None else None
            info.append(f"when={when}")
        info_str = ' '.join(info)
        return f"<Timeout [{self._state.value}]{info_str}>"

    async def __aenter__(self) -> "Timeout":
        if self._state is not _State.CREATED:
            raise RuntimeError("Timeout has already been entered")
        task = tasks.current_task()
        if task is None:
            raise RuntimeError("Timeout should be used inside a task")
        self._state = _State.ENTERED
        self._task = task
        self._cancelling = self._task.cancelling()
        self.reschedule(self._when)
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        assert self._state in (_State.ENTERED, _State.EXPIRING)

        if self._timeout_handler is not None:
            self._timeout_handler.cancel()
            self._timeout_handler = None

        if self._state is _State.EXPIRING:
            self._state = _State.EXPIRED

            if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
                # Since there are no new cancel requests, we're
                # handling this.
                raise TimeoutError from exc_val
        elif self._state is _State.ENTERED:
            self._state = _State.EXITED

        return None

    def _on_timeout(self) -> None:
        assert self._state is _State.ENTERED
        self._task.cancel()
        self._state = _State.EXPIRING
        # drop the reference early
        self._timeout_handler = None


def timeout(delay: Optional[float]) -> Timeout:
    """Timeout async context manager.

    Useful in cases when you want to apply timeout logic around block
    of code or in cases when asyncio.wait_for is not suitable. For example:

    >>> async with asyncio.timeout(10):  # 10 seconds timeout
    ...     await long_running_task()


    delay - value in seconds or None to disable timeout logic

    long_running_task() is interrupted by raising asyncio.CancelledError,
    the top-most affected timeout() context manager converts CancelledError
    into TimeoutError.
    """
    loop = events.get_running_loop()
    return Timeout(loop.time() + delay if delay is not None else None)


def timeout_at(when: Optional[float]) -> Timeout:
    """Schedule the timeout at absolute time.

    Like timeout() but argument gives absolute time in the same clock system
    as loop.time().

    Please note: it is not POSIX time but a time with
    undefined starting base, e.g. the time of the system power on.

    >>> async with asyncio.timeout_at(loop.time() + 10):
    ...     await long_running_task()


    when - a deadline when timeout occurs or None to disable timeout logic

    long_running_task() is interrupted by raising asyncio.CancelledError,
    the top-most affected timeout() context manager converts CancelledError
    into TimeoutError.
    """
    return Timeout(when)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 1.16 KB 0644
__main__.py File 3.3 KB 0644
base_events.py File 73.62 KB 0644
base_futures.py File 1.96 KB 0644
base_subprocess.py File 8.66 KB 0644
base_tasks.py File 2.58 KB 0644
constants.py File 1.29 KB 0644
coroutines.py File 3.32 KB 0644
events.py File 27.97 KB 0644
exceptions.py File 1.71 KB 0644
format_helpers.py File 2.35 KB 0644
futures.py File 13.88 KB 0644
locks.py File 18.57 KB 0644
log.py File 124 B 0644
mixins.py File 481 B 0644
proactor_events.py File 32.48 KB 0644
protocols.py File 6.79 KB 0644
queues.py File 7.79 KB 0644
runners.py File 6.68 KB 0644
selector_events.py File 44.34 KB 0644
sslproto.py File 31 KB 0644
staggered.py File 5.85 KB 0644
streams.py File 26.86 KB 0644
subprocess.py File 7.5 KB 0644
taskgroups.py File 8.27 KB 0644
tasks.py File 33.63 KB 0644
threads.py File 790 B 0644
timeouts.py File 5.2 KB 0644
transports.py File 10.47 KB 0644
trsock.py File 2.42 KB 0644
unix_events.py File 50.7 KB 0644
windows_events.py File 33.88 KB 0644
windows_utils.py File 4.94 KB 0644