[ Avaa Bypassed ]



elspacio@ ~ $
# engine/strategies.py
# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
# <see AUTHORS file>
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""Strategies for creating new instances of Engine types.

These are semi-private implementation classes which provide the
underlying behavior for the "strategy" keyword argument available on
:func:`~sqlalchemy.engine.create_engine`.  Current available options are
``plain``, ``threadlocal``, and ``mock``.

New strategies can be added via new ``EngineStrategy`` classes.
"""

from operator import attrgetter

from . import base
from . import threadlocal
from . import url
from .. import event
from .. import pool as poollib
from .. import util
from ..sql import schema

# Registry of strategy instances keyed by their ``name`` attribute;
# entries are added by ``EngineStrategy.__init__``.
strategies = {}

class EngineStrategy(object):
    """An adaptor that processes input arguments and produces an Engine.

    Provides a ``create`` method that receives input arguments and
    produces an instance of base.Engine or a subclass.

    Subclasses are expected to define a ``name`` class attribute; it is
    used as the key under which an instance registers itself.

    """

    def __init__(self):
        # Self-register in the module-level ``strategies`` mapping so the
        # instance can be looked up by its ``name``.
        strategies[self.name] = self

    def create(self, *args, **kwargs):
        """Given arguments, returns a new Engine instance.

        The base implementation always raises ``NotImplementedError``;
        subclasses override it.

        """

        raise NotImplementedError()

class DefaultEngineStrategy(EngineStrategy):
    """Base class for built-in strategies.

    Subclasses supply ``name`` (the registry key) and ``engine_cls``
    (the class instantiated at the end of :meth:`create`).

    """

    def create(self, name_or_url, **kwargs):
        """Build an engine from a URL and keyword arguments.

        Keyword arguments are consumed in stages -- dialect, DBAPI, pool,
        then engine -- each stage popping the names its target accepts.
        Anything still present afterwards is rejected.

        :param name_or_url: value handed to ``url.make_url()``.
        :param kwargs: configuration options; recognised private flags are
         ``_coerce_config`` (convert values through the dialect's
         ``engine_config_types``) and ``_initialize`` (install the
         connect-time listeners, default ``True``).
        :return: an instance of ``self.engine_cls``.
        :raises TypeError: if any keyword argument is left unconsumed.

        NOTE(review): several statements in this method were missing from
        the copy under review and have been restored to match upstream
        SQLAlchemy 1.3; they are marked ``NOTE(review)`` below and should
        be confirmed against a pristine copy.

        """
        # create url.URL object
        u = url.make_url(name_or_url)

        plugins = u._instantiate_plugins(kwargs)

        # plugin selection has been handled above; drop the markers so they
        # are not reported as unknown arguments later
        u.query.pop("plugin", None)
        kwargs.pop("plugins", None)

        entrypoint = u._get_entrypoint()
        dialect_cls = entrypoint.get_dialect_cls(u)

        if kwargs.pop("_coerce_config", False):

            def pop_kwarg(key, default=None):
                # pop the value and, when the dialect declares a converter
                # for this key, run the value through it
                value = kwargs.pop(key, default)
                if key in dialect_cls.engine_config_types:
                    value = dialect_cls.engine_config_types[key](value)
                return value

        else:
            # no coercion requested: plain dict.pop
            pop_kwarg = kwargs.pop

        dialect_args = {}
        # consume dialect arguments from kwargs
        for k in util.get_cls_kwargs(dialect_cls):
            if k in kwargs:
                dialect_args[k] = pop_kwarg(k)

        # an explicitly passed "module" is used as the DBAPI; otherwise ask
        # the dialect class for it, forwarding any kwargs it accepts
        dbapi = kwargs.pop("module", None)
        if dbapi is None:
            dbapi_args = {}
            for k in util.get_func_kwargs(dialect_cls.dbapi):
                if k in kwargs:
                    dbapi_args[k] = pop_kwarg(k)
            dbapi = dialect_cls.dbapi(**dbapi_args)

        dialect_args["dbapi"] = dbapi

        for plugin in plugins:
            plugin.handle_dialect_kwargs(dialect_cls, dialect_args)

        # create dialect
        dialect = dialect_cls(**dialect_args)

        # assemble connection arguments
        (cargs, cparams) = dialect.create_connect_args(u)
        cparams.update(pop_kwarg("connect_args", {}))
        cargs = list(cargs)  # allow mutability

        # look for existing pool or create
        pool = pop_kwarg("pool", None)
        if pool is None:

            def connect(connection_record=None):
                # give do_connect listeners the first chance to produce a
                # connection; fall back to the dialect's own connect()
                if dialect._has_events:
                    for fn in dialect.dispatch.do_connect:
                        connection = fn(
                            dialect, connection_record, cargs, cparams
                        )
                        if connection is not None:
                            return connection
                return dialect.connect(*cargs, **cparams)

            creator = pop_kwarg("creator", connect)

            poolclass = pop_kwarg("poolclass", None)
            if poolclass is None:
                poolclass = dialect_cls.get_pool_class(u)
            pool_args = {"dialect": dialect}

            # consume pool arguments from kwargs, translating a few of
            # the arguments
            translate = {
                "logging_name": "pool_logging_name",
                "echo": "echo_pool",
                "timeout": "pool_timeout",
                "recycle": "pool_recycle",
                "events": "pool_events",
                "use_threadlocal": "pool_threadlocal",
                "reset_on_return": "pool_reset_on_return",
                "pre_ping": "pool_pre_ping",
                "use_lifo": "pool_use_lifo",
            }
            for k in util.get_cls_kwargs(poolclass):
                tk = translate.get(k, k)
                if tk in kwargs:
                    pool_args[k] = pop_kwarg(tk)

            for plugin in plugins:
                plugin.handle_pool_kwargs(poolclass, pool_args)

            pool = poolclass(creator, **pool_args)
        else:
            # a pool (or DBAPI proxy) was supplied by the caller
            if isinstance(pool, poollib.dbapi_proxy._DBProxy):
                pool = pool.get_pool(*cargs, **cparams)

            pool._dialect = dialect

        # create engine.
        engineclass = self.engine_cls
        engine_args = {}
        for k in util.get_cls_kwargs(engineclass):
            if k in kwargs:
                engine_args[k] = pop_kwarg(k)

        _initialize = kwargs.pop("_initialize", True)

        # all kwargs should be consumed
        if kwargs:
            # NOTE(review): the three configuration names below were
            # missing and are restored to satisfy the %s/%s/%s
            # placeholders -- confirm against upstream.
            raise TypeError(
                "Invalid argument(s) %s sent to create_engine(), "
                "using configuration %s/%s/%s.  Please check that the "
                "keyword arguments are appropriate for this combination "
                "of components."
                % (
                    ",".join("'%s'" % k for k in kwargs),
                    dialect.__class__.__name__,
                    pool.__class__.__name__,
                    engineclass.__name__,
                )
            )

        engine = engineclass(pool, dialect, u, **engine_args)

        if _initialize:
            do_on_connect = dialect.on_connect()
            if do_on_connect:

                def on_connect(dbapi_connection, connection_record):
                    # prefer the object exposed as "_sqla_unwrap" when the
                    # connection provides one
                    conn = getattr(
                        dbapi_connection, "_sqla_unwrap", dbapi_connection
                    )
                    # NOTE(review): restored -- skip when there is nothing
                    # to configure, otherwise run the dialect's hook.
                    if conn is None:
                        return
                    do_on_connect(conn)

                event.listen(pool, "first_connect", on_connect)
                event.listen(pool, "connect", on_connect)

            def first_connect(dbapi_connection, connection_record):
                c = base.Connection(
                    engine, connection=dbapi_connection, _has_events=False
                )
                c._execution_options = util.immutabledict()
                # NOTE(review): restored -- initialize the dialect on the
                # first connection, then roll back.
                dialect.initialize(c)
                dialect.do_rollback(c.connection)

            # NOTE(review): restored -- registration of first_connect,
            # including the private ``_once_unless_exception`` flag, is
            # taken from upstream 1.3; confirm the flag exists in the
            # target version.
            event.listen(
                pool,
                "first_connect",
                first_connect,
                _once_unless_exception=True,
            )

        # NOTE(review): restored -- engine_created notifications for the
        # dialect class, the entrypoint (when distinct) and each plugin.
        dialect_cls.engine_created(engine)
        if entrypoint is not dialect_cls:
            entrypoint.engine_created(engine)

        for plugin in plugins:
            plugin.engine_created(engine)

        return engine

class PlainEngineStrategy(DefaultEngineStrategy):
    """Strategy for configuring a regular Engine."""

    # key under which the instance is stored in ``strategies``
    name = "plain"
    # class instantiated by DefaultEngineStrategy.create()
    engine_cls = base.Engine


# Instantiating registers the strategy: EngineStrategy.__init__ stores the
# instance in ``strategies`` under ``name``.
PlainEngineStrategy()


class ThreadLocalEngineStrategy(DefaultEngineStrategy):
    """Strategy for configuring an Engine with threadlocal behavior."""

    # key under which the instance is stored in ``strategies``
    name = "threadlocal"
    # class instantiated by DefaultEngineStrategy.create()
    engine_cls = threadlocal.TLEngine


# Instantiating registers the strategy: EngineStrategy.__init__ stores the
# instance in ``strategies`` under ``name``.
ThreadLocalEngineStrategy()


class MockEngineStrategy(EngineStrategy):
    """Strategy for configuring an Engine-like object with mocked execution.

    Produces a single mock Connectable object which dispatches
    statement execution to a passed-in function.

    """

    name = "mock"

    def create(self, name_or_url, executor, **kwargs):
        """Return a ``MockConnection`` for the dialect named by the URL.

        :param name_or_url: value handed to ``url.make_url()``.
        :param executor: callable installed as the mock's ``execute``.
        :param kwargs: options; those accepted by the dialect class are
         passed to its constructor, the rest are ignored.

        """
        # create url.URL object
        u = url.make_url(name_or_url)

        dialect_cls = u.get_dialect()

        dialect_args = {}
        # consume dialect arguments from kwargs
        for k in util.get_cls_kwargs(dialect_cls):
            if k in kwargs:
                dialect_args[k] = kwargs.pop(k)

        # create dialect
        dialect = dialect_cls(**dialect_args)

        return MockEngineStrategy.MockConnection(dialect, executor)

    class MockConnection(base.Connectable):
        """Connectable stand-in that hands execution to a callable."""

        def __init__(self, dialect, execute):
            self._dialect = dialect
            # the instance attribute shadows the execute() method defined
            # at the bottom of this class
            self.execute = execute

        # the mock acts as its own engine
        engine = property(lambda s: s)
        dialect = property(attrgetter("_dialect"))
        name = property(lambda s: s._dialect.name)

        schema_for_object = schema._schema_getter(None)

        def contextual_connect(self, **kwargs):
            return self

        def connect(self, **kwargs):
            return self

        def execution_options(self, **kw):
            return self

        def compiler(self, statement, parameters, **kwargs):
            return self._dialect.compiler(
                statement, parameters, engine=self, **kwargs
            )

        def create(self, entity, **kwargs):
            # forced off regardless of what the caller passed
            kwargs["checkfirst"] = False
            from sqlalchemy.engine import ddl

            ddl.SchemaGenerator(self.dialect, self, **kwargs).traverse_single(
                entity
            )

        def drop(self, entity, **kwargs):
            # forced off regardless of what the caller passed
            kwargs["checkfirst"] = False
            from sqlalchemy.engine import ddl

            ddl.SchemaDropper(self.dialect, self, **kwargs).traverse_single(
                entity
            )

        def _run_visitor(
            self, visitorcallable, element, connection=None, **kwargs
        ):
            # ``connection`` is accepted but not used here
            kwargs["checkfirst"] = False
            visitorcallable(self.dialect, self, **kwargs).traverse_single(
                element
            )

        def execute(self, object_, *multiparams, **params):
            # placeholder; __init__ replaces it per instance
            raise NotImplementedError()


# Instantiating registers the strategy: EngineStrategy.__init__ stores the
# instance in ``strategies`` under ``name``.
MockEngineStrategy()



Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 24.74 KB 0644
base.py File 85.11 KB 0644
default.py File 54.82 KB 0644
interfaces.py File 47.35 KB 0644
reflection.py File 33.9 KB 0644
result.py File 53.43 KB 0644
strategies.py File 9.62 KB 0644
threadlocal.py File 4.65 KB 0644
url.py File 9.22 KB 0644
util.py File 2.36 KB 0644