404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@18.226.248.119: ~ $
# Empty export list: `from <this module> import *` brings in no names.
__all__ = ()

import reprlib
from _thread import get_ident

from . import format_helpers

# States for Future.
# Each state is a plain string.  _future_repr_info() lower-cases the current
# state for display and compares it against _FINISHED to decide whether a
# result or exception should be shown.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'


def isfuture(obj):
    """Return True if obj is a Future or a duck-type compatible object.

    An object qualifies when its *class* defines the attribute
    _asyncio_future_blocking and the value seen through the instance is
    not None.  An attribute set only on the instance does not count.
    See comment in Future for more details.
    """
    if not hasattr(obj.__class__, '_asyncio_future_blocking'):
        return False
    return obj._asyncio_future_blocking is not None


def _format_callbacks(cb):
    """Build the 'cb=[...]' fragment used by Future.__repr__.

    cb is a sequence whose items are indexed with [0] to obtain the
    callback object.  Up to two callbacks are listed in full; for longer
    sequences only the first and last are shown, with a count of the ones
    skipped in between.
    """
    def describe(callback):
        return format_helpers._format_callback_source(callback, ())

    count = len(cb)
    if count == 0:
        text = ''
    elif count == 1:
        text = describe(cb[0][0])
    elif count == 2:
        text = f'{describe(cb[0][0])}, {describe(cb[1][0])}'
    else:
        first = describe(cb[0][0])
        skipped = count - 2
        last = describe(cb[-1][0])
        text = f'{first}, <{skipped} more>, {last}'
    return f'cb=[{text}]'


# bpo-42183: _repr_running is needed for repr protection
# when a Future or Task result contains itself directly or indirectly.
# The logic is borrowed from the @reprlib.recursive_repr decorator.
# Unfortunately, using the decorator directly is impossible because of the
# "AttributeError: '_asyncio.Task' object has no attribute '__module__'" error.
#
# Once that is fixed we can return to the decorator-based approach.
#
# Entries are (id(future), thread ident) tuples, added by _future_repr_info()
# while it formats a result and discarded again when it finishes.
_repr_running = set()


def _future_repr_info(future):
    """Collect the pieces of text that Future.__repr__ puts together.

    Returns a list of strings: the lower-cased state first, then, where
    applicable, the exception or result, the callbacks summary, and the
    location where the future was created.
    """
    parts = [future._state.lower()]

    if future._state == _FINISHED:
        if future._exception is None:
            # Guard against a result that (directly or indirectly) contains
            # this very future: a nested repr on the same thread gets '...'.
            guard = (id(future), get_ident())
            if guard not in _repr_running:
                _repr_running.add(guard)
                try:
                    # reprlib keeps the output short, which matters
                    # for very long strings.
                    shown = reprlib.repr(future._result)
                finally:
                    _repr_running.discard(guard)
            else:
                shown = '...'
            parts.append(f'result={shown}')
        else:
            parts.append(f'exception={future._exception!r}')

    if future._callbacks:
        parts.append(_format_callbacks(future._callbacks))

    if future._source_traceback:
        # Only the last traceback entry is reported.
        last_frame = future._source_traceback[-1]
        parts.append(f'created at {last_frame[0]}:{last_frame[1]}')

    return parts

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 1.25 KB 0644
__main__.py File 3.3 KB 0644
base_events.py File 72.45 KB 0644
base_futures.py File 2.51 KB 0644
base_subprocess.py File 8.64 KB 0644
base_tasks.py File 2.41 KB 0644
constants.py File 888 B 0644
coroutines.py File 8.59 KB 0644
events.py File 25.83 KB 0644
exceptions.py File 1.59 KB 0644
format_helpers.py File 2.35 KB 0644
futures.py File 13.71 KB 0644
locks.py File 14.83 KB 0644
log.py File 124 B 0644
proactor_events.py File 31.39 KB 0644
protocols.py File 6.79 KB 0644
queues.py File 8.11 KB 0644
runners.py File 2.08 KB 0644
selector_events.py File 38.57 KB 0644
sslproto.py File 26.82 KB 0644
staggered.py File 5.85 KB 0644
streams.py File 26.03 KB 0644
subprocess.py File 7.88 KB 0644
tasks.py File 33.61 KB 0644
threads.py File 790 B 0644
transports.py File 10.47 KB 0644
trsock.py File 5.74 KB 0644
unix_events.py File 50.54 KB 0644
windows_events.py File 32.28 KB 0644
windows_utils.py File 4.94 KB 0644