404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@18.223.213.102: ~ $
from __future__ import absolute_import

import sys
import time

from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP
from sentry_sdk._compat import reraise
from sentry_sdk._functools import wraps
from sentry_sdk.crons import capture_checkin, MonitorStatus
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TRANSACTION_SOURCE_TASK
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.utils import (
    capture_internal_exceptions,
    event_from_exception,
    logger,
    match_regex_list,
)

if TYPE_CHECKING:
    from typing import Any
    from typing import Callable
    from typing import Dict
    from typing import List
    from typing import Optional
    from typing import Tuple
    from typing import TypeVar
    from typing import Union

    from sentry_sdk._types import EventProcessor, Event, Hint, ExcInfo

    F = TypeVar("F", bound=Callable[..., Any])


try:
    from celery import VERSION as CELERY_VERSION  # type: ignore
    from celery import Task, Celery
    from celery.app.trace import task_has_custom
    from celery.beat import Scheduler  # type: ignore
    from celery.exceptions import (  # type: ignore
        Ignore,
        Reject,
        Retry,
        SoftTimeLimitExceeded,
    )
    from celery.schedules import crontab, schedule  # type: ignore
    from celery.signals import (  # type: ignore
        task_failure,
        task_success,
        task_retry,
    )
except ImportError:
    raise DidNotEnable("Celery not installed")


CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)


class CeleryIntegration(Integration):
    identifier = "celery"

    def __init__(
        self,
        propagate_traces=True,
        monitor_beat_tasks=False,
        exclude_beat_tasks=None,
    ):
        # type: (bool, bool, Optional[List[str]]) -> None
        self.propagate_traces = propagate_traces
        self.monitor_beat_tasks = monitor_beat_tasks
        self.exclude_beat_tasks = exclude_beat_tasks

        if monitor_beat_tasks:
            _patch_beat_apply_entry()
            _setup_celery_beat_signals()

    @staticmethod
    def setup_once():
        # type: () -> None
        if CELERY_VERSION < (3,):
            raise DidNotEnable("Celery 3 or newer required.")

        import celery.app.trace as trace  # type: ignore

        old_build_tracer = trace.build_tracer

        def sentry_build_tracer(name, task, *args, **kwargs):
            # type: (Any, Any, *Any, **Any) -> Any
            if not getattr(task, "_sentry_is_patched", False):
                # determine whether Celery will use __call__ or run and patch
                # accordingly
                if task_has_custom(task, "__call__"):
                    type(task).__call__ = _wrap_task_call(task, type(task).__call__)
                else:
                    task.run = _wrap_task_call(task, task.run)

                # `build_tracer` is apparently called for every task
                # invocation. Can't wrap every celery task for every invocation
                # or we will get infinitely nested wrapper functions.
                task._sentry_is_patched = True

            return _wrap_tracer(task, old_build_tracer(name, task, *args, **kwargs))

        trace.build_tracer = sentry_build_tracer

        from celery.app.task import Task  # type: ignore

        Task.apply_async = _wrap_apply_async(Task.apply_async)

        _patch_worker_exit()

        # This logger logs every status of every task that ran on the worker.
        # Meaning that every task's breadcrumbs are full of stuff like "Task
        # <foo> raised unexpected <bar>".
        ignore_logger("celery.worker.job")
        ignore_logger("celery.app.trace")

        # This is stdout/err redirected to a logger, can't deal with this
        # (need event_level=logging.WARN to reproduce)
        ignore_logger("celery.redirected")


def _now_seconds_since_epoch():
    # type: () -> float
    # We cannot use `time.perf_counter()` when dealing with the duration
    # of a Celery task, because the start of a Celery task and
    # the end are recorded in different processes.
    # Start happens in the Celery Beat process,
    # the end in a Celery Worker process.
    return time.time()


def _wrap_apply_async(f):
    # type: (F) -> F
    @wraps(f)
    def apply_async(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        hub = Hub.current
        integration = hub.get_integration(CeleryIntegration)
        if integration is not None and integration.propagate_traces:
            with hub.start_span(
                op=OP.QUEUE_SUBMIT_CELERY, description=args[0].name
            ) as span:
                with capture_internal_exceptions():
                    headers = dict(hub.iter_trace_propagation_headers(span))
                    if integration.monitor_beat_tasks:
                        headers.update(
                            {
                                "sentry-monitor-start-timestamp-s": "%.9f"
                                % _now_seconds_since_epoch(),
                            }
                        )

                    if headers:
                        # Note: kwargs can contain headers=None, so no setdefault!
                        # Unsure which backend though.
                        kwarg_headers = kwargs.get("headers") or {}

                        existing_baggage = kwarg_headers.get(BAGGAGE_HEADER_NAME)
                        sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)

                        combined_baggage = sentry_baggage or existing_baggage
                        if sentry_baggage and existing_baggage:
                            combined_baggage = "{},{}".format(
                                existing_baggage,
                                sentry_baggage,
                            )

                        kwarg_headers.update(headers)
                        if combined_baggage:
                            kwarg_headers[BAGGAGE_HEADER_NAME] = combined_baggage

                        # https://github.com/celery/celery/issues/4875
                        #
                        # Need to setdefault the inner headers too since other
                        # tracing tools (dd-trace-py) also employ this exact
                        # workaround and we don't want to break them.
                        kwarg_headers.setdefault("headers", {}).update(headers)
                        if combined_baggage:
                            kwarg_headers["headers"][
                                BAGGAGE_HEADER_NAME
                            ] = combined_baggage

                        # Add the Sentry options potentially added in `sentry_apply_entry`
                        # to the headers (done when auto-instrumenting Celery Beat tasks)
                        for key, value in kwarg_headers.items():
                            if key.startswith("sentry-"):
                                kwarg_headers["headers"][key] = value

                        kwargs["headers"] = kwarg_headers

                return f(*args, **kwargs)
        else:
            return f(*args, **kwargs)

    return apply_async  # type: ignore


def _wrap_tracer(task, f):
    # type: (Any, F) -> F

    # Need to wrap tracer for pushing the scope before prerun is sent, and
    # popping it after postrun is sent.
    #
    # This is the reason we don't use signals for hooking in the first place.
    # Also because in Celery 3, signal dispatch returns early if one handler
    # crashes.
    @wraps(f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        hub = Hub.current
        if hub.get_integration(CeleryIntegration) is None:
            return f(*args, **kwargs)

        with hub.push_scope() as scope:
            scope._name = "celery"
            scope.clear_breadcrumbs()
            scope.add_event_processor(_make_event_processor(task, *args, **kwargs))

            transaction = None

            # Celery task objects are not a thing to be trusted. Even
            # something such as attribute access can fail.
            with capture_internal_exceptions():
                transaction = continue_trace(
                    args[3].get("headers") or {},
                    op=OP.QUEUE_TASK_CELERY,
                    name="unknown celery task",
                    source=TRANSACTION_SOURCE_TASK,
                )
                transaction.name = task.name
                transaction.set_status("ok")

            if transaction is None:
                return f(*args, **kwargs)

            with hub.start_transaction(
                transaction,
                custom_sampling_context={
                    "celery_job": {
                        "task": task.name,
                        # for some reason, args[1] is a list if non-empty but a
                        # tuple if empty
                        "args": list(args[1]),
                        "kwargs": args[2],
                    }
                },
            ):
                return f(*args, **kwargs)

    return _inner  # type: ignore


def _wrap_task_call(task, f):
    # type: (Any, F) -> F

    # Need to wrap task call because the exception is caught before we get to
    # see it. Also celery's reported stacktrace is untrustworthy.

    # functools.wraps is important here because celery-once looks at this
    # method's name.
    # https://github.com/getsentry/sentry-python/issues/421
    @wraps(f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            return f(*args, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            with capture_internal_exceptions():
                _capture_exception(task, exc_info)
            reraise(*exc_info)

    return _inner  # type: ignore


def _make_event_processor(task, uuid, args, kwargs, request=None):
    # type: (Any, Any, Any, Any, Optional[Any]) -> EventProcessor
    def event_processor(event, hint):
        # type: (Event, Hint) -> Optional[Event]

        with capture_internal_exceptions():
            tags = event.setdefault("tags", {})
            tags["celery_task_id"] = uuid
            extra = event.setdefault("extra", {})
            extra["celery-job"] = {
                "task_name": task.name,
                "args": args,
                "kwargs": kwargs,
            }

        if "exc_info" in hint:
            with capture_internal_exceptions():
                if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded):
                    event["fingerprint"] = [
                        "celery",
                        "SoftTimeLimitExceeded",
                        getattr(task, "name", task),
                    ]

        return event

    return event_processor


def _capture_exception(task, exc_info):
    # type: (Any, ExcInfo) -> None
    hub = Hub.current

    if hub.get_integration(CeleryIntegration) is None:
        return
    if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS):
        # ??? Doesn't map to anything
        _set_status(hub, "aborted")
        return

    _set_status(hub, "internal_error")

    if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
        return

    # If an integration is there, a client has to be there.
    client = hub.client  # type: Any

    event, hint = event_from_exception(
        exc_info,
        client_options=client.options,
        mechanism={"type": "celery", "handled": False},
    )

    hub.capture_event(event, hint=hint)


def _set_status(hub, status):
    # type: (Hub, str) -> None
    with capture_internal_exceptions():
        with hub.configure_scope() as scope:
            if scope.span is not None:
                scope.span.set_status(status)


def _patch_worker_exit():
    # type: () -> None

    # Need to flush queue before worker shutdown because a crashing worker will
    # call os._exit
    from billiard.pool import Worker  # type: ignore

    old_workloop = Worker.workloop

    def sentry_workloop(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            return old_workloop(*args, **kwargs)
        finally:
            with capture_internal_exceptions():
                hub = Hub.current
                if hub.get_integration(CeleryIntegration) is not None:
                    hub.flush()

    Worker.workloop = sentry_workloop


def _get_headers(task):
    # type: (Task) -> Dict[str, Any]
    headers = task.request.get("headers") or {}

    # flatten nested headers
    if "headers" in headers:
        headers.update(headers["headers"])
        del headers["headers"]

    headers.update(task.request.get("properties") or {})

    return headers


def _get_humanized_interval(seconds):
    # type: (float) -> Tuple[int, str]
    TIME_UNITS = (  # noqa: N806
        ("day", 60 * 60 * 24.0),
        ("hour", 60 * 60.0),
        ("minute", 60.0),
    )

    seconds = float(seconds)
    for unit, divider in TIME_UNITS:
        if seconds >= divider:
            interval = int(seconds / divider)
            return (interval, unit)

    return (int(seconds), "second")


def _get_monitor_config(celery_schedule, app, monitor_name):
    # type: (Any, Celery, str) -> Dict[str, Any]
    monitor_config = {}  # type: Dict[str, Any]
    schedule_type = None  # type: Optional[str]
    schedule_value = None  # type: Optional[Union[str, int]]
    schedule_unit = None  # type: Optional[str]

    if isinstance(celery_schedule, crontab):
        schedule_type = "crontab"
        schedule_value = (
            "{0._orig_minute} "
            "{0._orig_hour} "
            "{0._orig_day_of_month} "
            "{0._orig_month_of_year} "
            "{0._orig_day_of_week}".format(celery_schedule)
        )
    elif isinstance(celery_schedule, schedule):
        schedule_type = "interval"
        (schedule_value, schedule_unit) = _get_humanized_interval(
            celery_schedule.seconds
        )

        if schedule_unit == "second":
            logger.warning(
                "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
                monitor_name,
                schedule_value,
            )
            return {}

    else:
        logger.warning(
            "Celery schedule type '%s' not supported by Sentry Crons.",
            type(celery_schedule),
        )
        return {}

    monitor_config["schedule"] = {}
    monitor_config["schedule"]["type"] = schedule_type
    monitor_config["schedule"]["value"] = schedule_value

    if schedule_unit is not None:
        monitor_config["schedule"]["unit"] = schedule_unit

    monitor_config["timezone"] = app.conf.timezone or "UTC"

    return monitor_config


def _patch_beat_apply_entry():
    # type: () -> None
    original_apply_entry = Scheduler.apply_entry

    def sentry_apply_entry(*args, **kwargs):
        # type: (*Any, **Any) -> None
        scheduler, schedule_entry = args
        app = scheduler.app

        celery_schedule = schedule_entry.schedule
        monitor_name = schedule_entry.name

        hub = Hub.current
        integration = hub.get_integration(CeleryIntegration)
        if integration is None:
            return original_apply_entry(*args, **kwargs)

        if match_regex_list(monitor_name, integration.exclude_beat_tasks):
            return original_apply_entry(*args, **kwargs)

        with hub.configure_scope() as scope:
            # When tasks are started from Celery Beat, make sure each task has its own trace.
            scope.set_new_propagation_context()

            monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

            is_supported_schedule = bool(monitor_config)
            if is_supported_schedule:
                headers = schedule_entry.options.pop("headers", {})
                headers.update(
                    {
                        "sentry-monitor-slug": monitor_name,
                        "sentry-monitor-config": monitor_config,
                    }
                )

                check_in_id = capture_checkin(
                    monitor_slug=monitor_name,
                    monitor_config=monitor_config,
                    status=MonitorStatus.IN_PROGRESS,
                )
                headers.update({"sentry-monitor-check-in-id": check_in_id})

                # Set the Sentry configuration in the options of the ScheduleEntry.
                # Those will be picked up in `apply_async` and added to the headers.
                schedule_entry.options["headers"] = headers

            return original_apply_entry(*args, **kwargs)

    Scheduler.apply_entry = sentry_apply_entry


def _setup_celery_beat_signals():
    # type: () -> None
    task_success.connect(crons_task_success)
    task_failure.connect(crons_task_failure)
    task_retry.connect(crons_task_retry)


def crons_task_success(sender, **kwargs):
    # type: (Task, Dict[Any, Any]) -> None
    logger.debug("celery_task_success %s", sender)
    headers = _get_headers(sender)

    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})

    start_timestamp_s = float(headers["sentry-monitor-start-timestamp-s"])

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=_now_seconds_since_epoch() - start_timestamp_s,
        status=MonitorStatus.OK,
    )


def crons_task_failure(sender, **kwargs):
    # type: (Task, Dict[Any, Any]) -> None
    logger.debug("celery_task_failure %s", sender)
    headers = _get_headers(sender)

    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})

    start_timestamp_s = float(headers["sentry-monitor-start-timestamp-s"])

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=_now_seconds_since_epoch() - start_timestamp_s,
        status=MonitorStatus.ERROR,
    )


def crons_task_retry(sender, **kwargs):
    # type: (Task, Dict[Any, Any]) -> None
    logger.debug("celery_task_retry %s", sender)
    headers = _get_headers(sender)

    if "sentry-monitor-slug" not in headers:
        return

    monitor_config = headers.get("sentry-monitor-config", {})

    start_timestamp_s = float(headers["sentry-monitor-start-timestamp-s"])

    capture_checkin(
        monitor_slug=headers["sentry-monitor-slug"],
        monitor_config=monitor_config,
        check_in_id=headers["sentry-monitor-check-in-id"],
        duration=_now_seconds_since_epoch() - start_timestamp_s,
        status=MonitorStatus.ERROR,
    )

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
django Folder 0755
grpc Folder 0755
opentelemetry Folder 0755
redis Folder 0755
spark Folder 0755
__init__.py File 6.67 KB 0644
_wsgi_common.py File 4.43 KB 0644
aiohttp.py File 11.28 KB 0644
argv.py File 963 B 0644
arq.py File 6.58 KB 0644
asgi.py File 11.54 KB 0644
asyncio.py File 2.98 KB 0644
atexit.py File 1.8 KB 0644
aws_lambda.py File 15.44 KB 0644
beam.py File 5.56 KB 0644
boto3.py File 4.44 KB 0644
bottle.py File 6.32 KB 0644
celery.py File 18.65 KB 0644
chalice.py File 4.66 KB 0644
cloud_resource_context.py File 6.6 KB 0644
dedupe.py File 1.16 KB 0644
excepthook.py File 2.21 KB 0644
executing.py File 1.99 KB 0644
falcon.py File 7.8 KB 0644
fastapi.py File 4.39 KB 0644
flask.py File 7.72 KB 0644
gcp.py File 8.02 KB 0644
gnu_backtrace.py File 2.86 KB 0644
httpx.py File 4.89 KB 0644
huey.py File 4.59 KB 0644
logging.py File 8.97 KB 0644
loguru.py File 2.98 KB 0644
modules.py File 2.06 KB 0644
pure_eval.py File 4.45 KB 0644
pymongo.py File 5.87 KB 0644
pyramid.py File 7.27 KB 0644
quart.py File 7.2 KB 0644
rq.py File 5.28 KB 0644
sanic.py File 11.06 KB 0644
serverless.py File 1.93 KB 0644
socket.py File 2.88 KB 0644
sqlalchemy.py File 4.14 KB 0644
starlette.py File 22.67 KB 0644
starlite.py File 9.85 KB 0644
stdlib.py File 8.06 KB 0644
threading.py File 2.87 KB 0644
tornado.py File 7.17 KB 0644
trytond.py File 1.7 KB 0644
wsgi.py File 9.36 KB 0644