from __future__ import absolute_import import sys from sentry_sdk._compat import reraise from sentry_sdk._types import TYPE_CHECKING from sentry_sdk import Hub from sentry_sdk.consts import OP from sentry_sdk.hub import _should_send_default_pii from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK from sentry_sdk.utils import ( capture_internal_exceptions, event_from_exception, SENSITIVE_DATA_SUBSTITUTE, parse_version, ) try: import arq.worker from arq.version import VERSION as ARQ_VERSION from arq.connections import ArqRedis from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker except ImportError: raise DidNotEnable("Arq is not installed") if TYPE_CHECKING: from typing import Any, Dict, Optional, Union from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint from arq.cron import CronJob from arq.jobs import Job from arq.typing import WorkerCoroutine from arq.worker import Function ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob) class ArqIntegration(Integration): identifier = "arq" @staticmethod def setup_once(): # type: () -> None try: if isinstance(ARQ_VERSION, str): version = parse_version(ARQ_VERSION) else: version = ARQ_VERSION.version[:2] except (TypeError, ValueError): version = None if version is None: raise DidNotEnable("Unparsable arq version: {}".format(ARQ_VERSION)) if version < (0, 23): raise DidNotEnable("arq 0.23 or newer required.") patch_enqueue_job() patch_run_job() patch_create_worker() ignore_logger("arq.worker") def patch_enqueue_job(): # type: () -> None old_enqueue_job = ArqRedis.enqueue_job async def _sentry_enqueue_job(self, function, *args, **kwargs): # type: (ArqRedis, str, *Any, **Any) -> Optional[Job] hub = Hub.current if hub.get_integration(ArqIntegration) is None: return await old_enqueue_job(self, function, *args, **kwargs) with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function): return await old_enqueue_job(self, function, *args, **kwargs) ArqRedis.enqueue_job = _sentry_enqueue_job def patch_run_job(): # type: () -> None old_run_job = Worker.run_job async def _sentry_run_job(self, job_id, score): # type: (Worker, str, int) -> None hub = Hub(Hub.current) if hub.get_integration(ArqIntegration) is None: return await old_run_job(self, job_id, score) with hub.push_scope() as scope: scope._name = "arq" scope.clear_breadcrumbs() transaction = Transaction( name="unknown arq task", status="ok", op=OP.QUEUE_TASK_ARQ, source=TRANSACTION_SOURCE_TASK, ) with hub.start_transaction(transaction): return await old_run_job(self, job_id, score) Worker.run_job = _sentry_run_job def _capture_exception(exc_info): # type: (ExcInfo) -> None hub = Hub.current if hub.scope.transaction is not None: if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: hub.scope.transaction.set_status("aborted") return hub.scope.transaction.set_status("internal_error") event, hint = event_from_exception( exc_info, client_options=hub.client.options if hub.client else None, mechanism={"type": ArqIntegration.identifier, "handled": False}, ) hub.capture_event(event, hint=hint) def _make_event_processor(ctx, *args, **kwargs): # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor def event_processor(event, hint): # type: (Event, Hint) -> Optional[Event] hub = Hub.current with capture_internal_exceptions(): if hub.scope.transaction is not None: hub.scope.transaction.name = ctx["job_name"] event["transaction"] = ctx["job_name"] tags = event.setdefault("tags", {}) tags["arq_task_id"] = ctx["job_id"] tags["arq_task_retry"] = ctx["job_try"] > 1 extra = event.setdefault("extra", {}) extra["arq-job"] = { "task": ctx["job_name"], "args": args if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE, "kwargs": kwargs if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE, "retry": ctx["job_try"], } return event return event_processor def _wrap_coroutine(name, coroutine): # type: (str, WorkerCoroutine) -> WorkerCoroutine async def _sentry_coroutine(ctx, *args, **kwargs): # type: (Dict[Any, Any], *Any, **Any) -> Any hub = Hub.current if hub.get_integration(ArqIntegration) is None: return await coroutine(*args, **kwargs) hub.scope.add_event_processor( _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) ) try: result = await coroutine(ctx, *args, **kwargs) except Exception: exc_info = sys.exc_info() _capture_exception(exc_info) reraise(*exc_info) return result return _sentry_coroutine def patch_create_worker(): # type: () -> None old_create_worker = arq.worker.create_worker def _sentry_create_worker(*args, **kwargs): # type: (*Any, **Any) -> Worker hub = Hub.current if hub.get_integration(ArqIntegration) is None: return old_create_worker(*args, **kwargs) settings_cls = args[0] functions = settings_cls.functions cron_jobs = settings_cls.cron_jobs settings_cls.functions = [_get_arq_function(func) for func in functions] settings_cls.cron_jobs = [_get_arq_cron_job(cron_job) for cron_job in cron_jobs] return old_create_worker(*args, **kwargs) arq.worker.create_worker = _sentry_create_worker def _get_arq_function(func): # type: (Union[str, Function, WorkerCoroutine]) -> Function arq_func = arq.worker.func(func) arq_func.coroutine = _wrap_coroutine(arq_func.name, arq_func.coroutine) return arq_func def _get_arq_cron_job(cron_job): # type: (CronJob) -> CronJob cron_job.coroutine = _wrap_coroutine(cron_job.name, cron_job.coroutine) return cron_job
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
__pycache__ | Folder | 0755 |
|
|
django | Folder | 0755 |
|
|
grpc | Folder | 0755 |
|
|
opentelemetry | Folder | 0755 |
|
|
redis | Folder | 0755 |
|
|
spark | Folder | 0755 |
|
|
__init__.py | File | 6.67 KB | 0644 |
|
_wsgi_common.py | File | 4.43 KB | 0644 |
|
aiohttp.py | File | 11.28 KB | 0644 |
|
argv.py | File | 963 B | 0644 |
|
arq.py | File | 6.58 KB | 0644 |
|
asgi.py | File | 11.54 KB | 0644 |
|
asyncio.py | File | 2.98 KB | 0644 |
|
atexit.py | File | 1.8 KB | 0644 |
|
aws_lambda.py | File | 15.44 KB | 0644 |
|
beam.py | File | 5.56 KB | 0644 |
|
boto3.py | File | 4.44 KB | 0644 |
|
bottle.py | File | 6.32 KB | 0644 |
|
celery.py | File | 18.65 KB | 0644 |
|
chalice.py | File | 4.66 KB | 0644 |
|
cloud_resource_context.py | File | 6.6 KB | 0644 |
|
dedupe.py | File | 1.16 KB | 0644 |
|
excepthook.py | File | 2.21 KB | 0644 |
|
executing.py | File | 1.99 KB | 0644 |
|
falcon.py | File | 7.8 KB | 0644 |
|
fastapi.py | File | 4.39 KB | 0644 |
|
flask.py | File | 7.72 KB | 0644 |
|
gcp.py | File | 8.02 KB | 0644 |
|
gnu_backtrace.py | File | 2.86 KB | 0644 |
|
httpx.py | File | 4.89 KB | 0644 |
|
huey.py | File | 4.59 KB | 0644 |
|
logging.py | File | 8.97 KB | 0644 |
|
loguru.py | File | 2.98 KB | 0644 |
|
modules.py | File | 2.06 KB | 0644 |
|
pure_eval.py | File | 4.45 KB | 0644 |
|
pymongo.py | File | 5.87 KB | 0644 |
|
pyramid.py | File | 7.27 KB | 0644 |
|
quart.py | File | 7.2 KB | 0644 |
|
rq.py | File | 5.28 KB | 0644 |
|
sanic.py | File | 11.06 KB | 0644 |
|
serverless.py | File | 1.93 KB | 0644 |
|
socket.py | File | 2.88 KB | 0644 |
|
sqlalchemy.py | File | 4.14 KB | 0644 |
|
starlette.py | File | 22.67 KB | 0644 |
|
starlite.py | File | 9.85 KB | 0644 |
|
stdlib.py | File | 8.06 KB | 0644 |
|
threading.py | File | 2.87 KB | 0644 |
|
tornado.py | File | 7.17 KB | 0644 |
|
trytond.py | File | 1.7 KB | 0644 |
|
wsgi.py | File | 9.36 KB | 0644 |
|