404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@3.144.85.147: ~ $
#
# We use a background thread for sharing fds on Unix, and for sharing sockets on
# Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process will connect
# to the resource sharer, sends the identifier and its pid, and then receives
# the resource.
#

import os
import signal
import socket
import sys
import threading

from . import process
from .context import reduction
from . import util

__all__ = ['stop']


if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''
        def __init__(self, sock):
            new_sock = sock.dup()
            def send(conn, pid):
                share = new_sock.share(pid)
                conn.send_bytes(share)
            self._id = _resource_sharer.register(send, new_sock.close)

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.'''
        def __init__(self, fd):
            new_fd = os.dup(fd)
            def send(conn, pid):
                reduction.send_handle(conn, new_fd, pid)
            def close():
                os.close(new_fd)
            self._id = _resource_sharer.register(send, close)

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)


class _ResourceSharer(object):
    '''Manager for resources using background thread.'''
    def __init__(self):
        self._key = 0
        self._cache = {}
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.'''
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.'''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        c.send((key, os.getpid()))
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.'''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                c = Client(self._address,
                           authkey=process.current_process().authkey)
                c.send(None)
                c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    util.sub_warning('_ResourceSharer thread did '
                                     'not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                for key, (send, close) in self._cache.items():
                    close()
                self._cache.clear()

    def _afterfork(self):
        for key, (send, close) in self._cache.items():
            close()
        self._cache.clear()
        self._lock._at_fork_reinit()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        from .connection import Listener
        assert self._listener is None, "Already have Listener"
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        while 1:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())


_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
dummy Folder 0755
__init__.py File 916 B 0644
connection.py File 30.94 KB 0644
context.py File 10.99 KB 0644
forkserver.py File 11.86 KB 0644
heap.py File 11.35 KB 0644
managers.py File 46.19 KB 0644
pool.py File 31.79 KB 0644
popen_fork.py File 2.32 KB 0644
popen_forkserver.py File 2.18 KB 0644
popen_spawn_posix.py File 1.98 KB 0644
popen_spawn_win32.py File 3.92 KB 0644
process.py File 11.72 KB 0644
queues.py File 11.74 KB 0644
reduction.py File 9.29 KB 0644
resource_sharer.py File 5.01 KB 0644
resource_tracker.py File 8.41 KB 0644
shared_memory.py File 17.96 KB 0644
sharedctypes.py File 6.16 KB 0644
spawn.py File 9.08 KB 0644
synchronize.py File 11.34 KB 0644
util.py File 13.68 KB 0644