404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@3.147.82.108: ~ $
"""SQL composition utility module
"""

# psycopg/sql.py - SQL composition utility module
#
# Copyright (C) 2016-2019 Daniele Varrazzo  <daniele.varrazzo@gmail.com>
# Copyright (C) 2020-2021 The Psycopg Team
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
# License for more details.

import string

from psycopg2 import extensions as ext


# Shared formatter instance: SQL.format() uses its parse() method to split a
# template into literal text and replacement fields.
_formatter = string.Formatter()


class Composable:
    """
    Abstract base class for objects that can be used to compose an SQL string.

    `!Composable` objects can be passed directly to `~cursor.execute()`,
    `~cursor.executemany()`, `~cursor.copy_expert()` in place of the query
    string.

    Two operators are available on every `!Composable`. The ``+`` operator
    joins two `!Composable` objects and returns a `Composed` instance
    containing both. The ``*`` operator takes an integer argument and returns
    a `!Composed` instance containing the left argument repeated as many
    times as requested.
    """
    def __init__(self, wrapped):
        # The wrapped value is stored as-is; each subclass decides how to
        # render it in as_string().
        self._wrapped = wrapped

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f"{cls_name}({self._wrapped!r})"

    def as_string(self, context):
        """
        Return the string value of the object.

        :param context: the context to evaluate the string into.
        :type context: `connection` or `cursor`

        The method is automatically invoked by `~cursor.execute()`,
        `~cursor.executemany()`, `~cursor.copy_expert()` if a `!Composable` is
        passed instead of the query string.

        The base implementation always raises `NotImplementedError`.
        """
        raise NotImplementedError

    def __add__(self, other):
        # Anything that is not a Composable is left to the other operand.
        if not isinstance(other, Composable):
            return NotImplemented

        head = Composed([self])
        # A Composed is concatenated directly; a single object is boxed first.
        tail = other if isinstance(other, Composed) else Composed([other])
        return head + tail

    def __mul__(self, n):
        repeated = [self] * n
        return Composed(repeated)

    def __eq__(self, other):
        # Equality requires the exact same class, not just a compatible one.
        if type(self) is not type(other):
            return False
        return self._wrapped == other._wrapped

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal


class Composed(Composable):
    """
    A `Composable` object made of a sequence of `!Composable`.

    Instances are normally obtained through `!Composable` operators and
    methods, but a `!Composed` can also be built directly by passing a
    sequence of `!Composable` objects.

    Example::

        >>> comp = sql.Composed(
        ...     [sql.SQL("insert into "), sql.Identifier("table")])
        >>> print(comp.as_string(conn))
        insert into "table"

    `!Composed` objects are iterable (so they can be used in `SQL.join` for
    instance).
    """
    def __init__(self, seq):
        items = []
        for item in seq:
            # Reject the first element that is not a Composable.
            if not isinstance(item, Composable):
                raise TypeError(
                    f"Composed elements must be Composable, got {item!r} instead")
            items.append(item)

        super().__init__(items)

    @property
    def seq(self):
        """A new list holding the items of the `!Composed`."""
        return [*self._wrapped]

    def as_string(self, context):
        # Render every item in order and concatenate the results.
        rendered = [item.as_string(context) for item in self._wrapped]
        return ''.join(rendered)

    def __iter__(self):
        return iter(self._wrapped)

    def __add__(self, other):
        if isinstance(other, Composed):
            tail = other._wrapped
        elif isinstance(other, Composable):
            tail = [other]
        else:
            return NotImplemented

        return Composed(self._wrapped + tail)

    def join(self, joiner):
        """
        Return a new `!Composed` interposing the *joiner* with the `!Composed` items.

        The *joiner* must be a `SQL` or a string which will be interpreted as
        an `SQL`. Any other type raises `TypeError`.

        Example::

            >>> fields = sql.Identifier('foo') + sql.Identifier('bar')  # a Composed
            >>> print(fields.join(', ').as_string(conn))
            "foo", "bar"

        """
        if isinstance(joiner, str):
            # Plain strings are promoted to SQL snippets.
            separator = SQL(joiner)
        elif isinstance(joiner, SQL):
            separator = joiner
        else:
            raise TypeError(
                "Composed.join() argument must be a string or an SQL")

        return separator.join(self)


class SQL(Composable):
    """
    A `Composable` representing a snippet of SQL statement.

    `!SQL` exposes `join()` and `format()` methods useful to create a template
    where to merge variable parts of a query (for instance field or table
    names).

    The *string* doesn't undergo any form of escaping, so it is not suitable to
    represent variable identifiers or values: only use it for constant strings
    representing templates or snippets of SQL statements, and use other
    objects such as `Identifier` or `Literal` to represent variable parts.

    Example::

        >>> query = sql.SQL("select {0} from {1}").format(
        ...    sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]),
        ...    sql.Identifier('table'))
        >>> print(query.as_string(conn))
        select "foo", "bar" from "table"
    """
    def __init__(self, string):
        if isinstance(string, str):
            super().__init__(string)
        else:
            raise TypeError("SQL values must be strings")

    @property
    def string(self):
        """The string wrapped by the `!SQL` object."""
        return self._wrapped

    def as_string(self, context):
        # The snippet is returned verbatim: the context is not consulted.
        return self._wrapped

    def format(self, *args, **kwargs):
        """
        Merge `Composable` objects into a template.

        :param `Composable` args: replacements for the numbered
            (``{0}``, ``{1}``) or auto-numbered (``{}``) placeholders
        :param `Composable` kwargs: replacements for the named (``{name}``)
            placeholders
        :return: the union of the `!SQL` string with placeholders replaced
        :rtype: `Composed`
        :raises ValueError: if a placeholder carries a format specification
            or a conversion, or if automatic and manual numbering are mixed

        The method is similar to the Python `str.format()` method: the string
        template supports auto-numbered (``{}``), numbered (``{0}``,
        ``{1}``...), and named placeholders (``{name}``), with positional
        arguments replacing the numbered placeholders and keywords replacing
        the named ones. However placeholder modifiers (``{0!r}``, ``{0:<10}``)
        are not supported. Only `!Composable` objects can be passed to the
        template.

        Example::

            >>> print(sql.SQL("select * from {} where {} = %s")
            ...     .format(sql.Identifier('people'), sql.Identifier('id'))
            ...     .as_string(conn))
            select * from "people" where "id" = %s

            >>> print(sql.SQL("select * from {tbl} where {pkey} = %s")
            ...     .format(tbl=sql.Identifier('people'), pkey=sql.Identifier('id'))
            ...     .as_string(conn))
            select * from "people" where "id" = %s

        """
        pieces = []
        # Index of the next auto-numbered field; becomes None as soon as a
        # manually numbered field has been seen.
        next_auto = 0
        parsed = _formatter.parse(self._wrapped)
        for literal, field, fmt_spec, conversion in parsed:
            if fmt_spec:
                raise ValueError("no format specification supported by SQL")
            if conversion:
                raise ValueError("no format conversion supported by SQL")
            if literal:
                pieces.append(SQL(literal))

            # A None field means this chunk is literal text only.
            if field is None:
                continue

            if field.isdigit():
                # Manual numbering: not allowed once "{}" has been consumed.
                if next_auto:
                    raise ValueError(
                        "cannot switch from automatic field numbering to manual")
                pieces.append(args[int(field)])
                next_auto = None
                continue

            if field:
                # Named placeholder.
                pieces.append(kwargs[field])
                continue

            # Empty field name: automatic numbering.
            if next_auto is None:
                raise ValueError(
                    "cannot switch from manual field numbering to automatic")
            pieces.append(args[next_auto])
            next_auto += 1

        return Composed(pieces)

    def join(self, seq):
        """
        Join a sequence of `Composable`.

        :param seq: the elements to join.
        :type seq: iterable of `!Composable`

        The `!SQL` object itself is placed between each pair of consecutive
        elements of *seq*. Note that `Composed` objects are iterable too, so
        they can be used as argument for this method.

        Example::

            >>> snip = sql.SQL(', ').join(
            ...     sql.Identifier(n) for n in ['foo', 'bar', 'baz'])
            >>> print(snip.as_string(conn))
            "foo", "bar", "baz"
        """
        pieces = []
        for position, element in enumerate(seq):
            # Every element but the first is preceded by the separator.
            if position:
                pieces.append(self)
            pieces.append(element)

        return Composed(pieces)


class Identifier(Composable):
    """
    A `Composable` representing an SQL identifier or a dot-separated sequence.

    Identifiers usually represent names of database objects, such as tables or
    fields. PostgreSQL identifiers follow `different rules`__ than SQL string
    literals for escaping (e.g. they use double quotes instead of single).

    .. __: https://www.postgresql.org/docs/current/static/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS

    Example::

        >>> t1 = sql.Identifier("foo")
        >>> t2 = sql.Identifier("ba'r")
        >>> t3 = sql.Identifier('ba"z')
        >>> print(sql.SQL(', ').join([t1, t2, t3]).as_string(conn))
        "foo", "ba'r", "ba""z"

    Multiple strings can be passed to the object to represent a qualified name,
    i.e. a dot-separated sequence of identifiers.

    Example::

        >>> query = sql.SQL("select {} from {}").format(
        ...     sql.Identifier("table", "field"),
        ...     sql.Identifier("schema", "table"))
        >>> print(query.as_string(conn))
        select "table"."field" from "schema"."table"

    """
    def __init__(self, *strings):
        """
        Wrap one or more name parts.

        :param strings: the parts of the (possibly qualified) identifier.
        :raises TypeError: if no part is given or if a part is not a string.
        """
        if not strings:
            raise TypeError("Identifier cannot be empty")

        for s in strings:
            if not isinstance(s, str):
                raise TypeError("SQL identifier parts must be strings")

        # *strings is a tuple, so the wrapped value is a tuple of parts.
        super().__init__(strings)

    @property
    def strings(self):
        """A tuple with the strings wrapped by the `Identifier`."""
        return self._wrapped

    @property
    def string(self):
        """The string wrapped by the `Identifier`.

        Only available when the `!Identifier` wraps exactly one string;
        otherwise `AttributeError` is raised.
        """
        if len(self._wrapped) == 1:
            return self._wrapped[0]

        raise AttributeError(
            "the Identifier wraps more than one string")

    def __repr__(self):
        # Show the parts as separate arguments, mirroring the constructor call.
        return f"{self.__class__.__name__}({', '.join(map(repr, self._wrapped))})"

    def as_string(self, context):
        # Quote each part on its own, then join the parts with dots.
        return '.'.join(ext.quote_ident(s, context) for s in self._wrapped)


class Literal(Composable):
    """
    A `Composable` representing an SQL value to include in a query.

    Usually you will want to include placeholders in the query and pass values
    as `~cursor.execute()` arguments. If however you really really need to
    include a literal value in the query you can use this object.

    The string returned by `!as_string()` follows the normal :ref:`adaptation
    rules <python-types-adaptation>` for Python objects.

    Example::

        >>> s1 = sql.Literal("foo")
        >>> s2 = sql.Literal("ba'r")
        >>> s3 = sql.Literal(42)
        >>> print(sql.SQL(', ').join([s1, s2, s3]).as_string(conn))
        'foo', 'ba''r', 42

    """
    @property
    def wrapped(self):
        """The object wrapped by the `!Literal`."""
        return self._wrapped

    def as_string(self, context):
        # Resolve the connection: the context may be a connection itself or
        # a cursor exposing one.
        if isinstance(context, ext.connection):
            connection = context
        elif isinstance(context, ext.cursor):
            connection = context.connection
        else:
            raise TypeError("context must be a connection or a cursor")

        adapted = ext.adapt(self._wrapped)
        # Only some adapters expose prepare(); call it when it is there.
        if hasattr(adapted, 'prepare'):
            adapted.prepare(connection)

        quoted = adapted.getquoted()
        if not isinstance(quoted, bytes):
            return quoted

        # Bytes are decoded using the codec mapped to the connection encoding.
        return quoted.decode(ext.encodings[connection.encoding])


class Placeholder(Composable):
    """A `Composable` representing a placeholder for query parameters.

    With a name, the object renders as a named placeholder (e.g.
    ``%(name)s``); without one it renders as a positional placeholder
    (e.g. ``%s``).

    The object is useful to generate SQL queries with a variable number of
    arguments.

    Examples::

        >>> names = ['foo', 'bar', 'baz']

        >>> q1 = sql.SQL("insert into table ({}) values ({})").format(
        ...     sql.SQL(', ').join(map(sql.Identifier, names)),
        ...     sql.SQL(', ').join(sql.Placeholder() * len(names)))
        >>> print(q1.as_string(conn))
        insert into table ("foo", "bar", "baz") values (%s, %s, %s)

        >>> q2 = sql.SQL("insert into table ({}) values ({})").format(
        ...     sql.SQL(', ').join(map(sql.Identifier, names)),
        ...     sql.SQL(', ').join(map(sql.Placeholder, names)))
        >>> print(q2.as_string(conn))
        insert into table ("foo", "bar", "baz") values (%(foo)s, %(bar)s, %(baz)s)

    """

    def __init__(self, name=None):
        if name is not None:
            if not isinstance(name, str):
                raise TypeError(f"expected string or None as name, got {name!r}")
            # A closing parenthesis would end the "%(name)s" form early.
            if ')' in name:
                raise ValueError(f"invalid name: {name!r}")

        super().__init__(name)

    @property
    def name(self):
        """The name of the `!Placeholder`."""
        return self._wrapped

    def __repr__(self):
        cls_name = self.__class__.__name__
        if self._wrapped is not None:
            return f"{cls_name}({self._wrapped!r})"
        return f"{cls_name}()"

    def as_string(self, context):
        # Positional form when unnamed, named form otherwise.
        if self._wrapped is None:
            return "%s"
        return f"%({self._wrapped})s"


# Ready-made SQL snippets for the NULL and DEFAULT keywords
NULL = SQL("NULL")
DEFAULT = SQL("DEFAULT")

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 4.66 KB 0644
_ipaddress.py File 2.85 KB 0644
_json.py File 6.99 KB 0644
_psycopg.cpython-311-x86_64-linux-gnu.so File 327.2 KB 0644
_range.py File 18.06 KB 0644
errorcodes.py File 14.03 KB 0644
errors.py File 1.39 KB 0644
extensions.py File 6.64 KB 0644
extras.py File 43.18 KB 0644
pool.py File 6.17 KB 0644
sql.py File 14.43 KB 0644
tz.py File 4.76 KB 0644