# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
This module contains Autotracer class
"""
__package__ = 'ssa.modules'
import json
import logging
import os
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from fnmatch import fnmatchcase
from typing import Iterator, Optional, NamedTuple, List, Iterable
from urllib.parse import urlparse
import numpy as np
from ssa.db import setup_database
from ssa.internal.exceptions import SSAError
from ssa.modules.common import Common
from ssa.modules.storage import (
iter_domains_data,
iter_urls_data,
get_url_durations
)
from ssa.autotracing import status, disabled_users, misconfiguration_checks
from ssa.configuration import load_tunables
from ssa.configuration.schemes import autotracing_tunables_schema
from ssa.internal.constants import autotracing_stats_file
from ssa.internal.utils import (
is_xray_version_supported,
is_kernel_version_supported,
sentry_init
)
# X-Ray is an optional dependency: remember why the import failed (if it did)
# so that the reason can be reported later instead of crashing at import time.
try:
    import xray.shared_library as xray_lib
except ImportError as import_exc:
    xray_lib = None
    xray_import_error = import_exc
else:
    xray_import_error = None
@dataclass
class URL:
    """
    Container for a single URL selected as an autotracing candidate.

    Fields:
      uri              full URL including domain and URI
      avg_duration     average duration during the day
      max_duration     maximum duration during the day
      total_slow_reqs  total SLOW request count during the day
      total_reqs       total request count during the day
      belongs_to       domain
      belongs_to_user  domain owner
    """
    uri: str
    avg_duration: int
    max_duration: int
    total_slow_reqs: int
    total_reqs: int
    belongs_to: str
    belongs_to_user: str
@dataclass
class RulesRejects:
    """
    Per-rule counters of URLs rejected during one autotracing iteration.

    Every counter holds the number of URLs skipped for the given reason:
      non_wp        non-WP site
      throttled     throttling
      density       low density
      slowness      URL is considered fast
      max_slowness  URL is considered too slow
      disabled      autotracing is disabled
      nginx         nginx caching
      frequency     min_retracing_interval
      server_limit  per-server limit
      domain_limit  per-domain limit
      no_domain     unavailable domain info (e.g. no domain)
    """
    non_wp: int = 0
    throttled: int = 0
    density: int = 0
    slowness: int = 0
    max_slowness: int = 0
    disabled: int = 0
    nginx: int = 0
    frequency: int = 0
    server_limit: int = 0
    domain_limit: int = 0
    no_domain: int = 0
@dataclass
class Stats:
    """
    Statistics gathered by AutoTracer within a single iteration.

    Fields:
      rules_version   version of rules applied from autotracing.json
      urls_processed  number of URLs, processed within the latest iteration
      urls_selected   number of URLs, selected to launch autotracing tasks
      rejects         rejects counters
    """
    rules_version: str
    urls_processed: int = 0
    urls_selected: int = 0
    # a fresh counters object per instance, never shared between iterations
    rejects: RulesRejects = field(default_factory=RulesRejects)
class URLS(NamedTuple):
    """
    A URL broken down into the two parts used for task matching:
    the domain name and the URI path.
    """
    domain_name: str
    uri_path: str
def url_split(url: str) -> URLS:
    """
    Split URL into domain_name and uripath including query string.

    The domain name is normalised before being returned: the port
    (everything after the first ``:`` of the network location) is dropped
    and a leading ``www.`` label is stripped. Only a *leading* ``www.`` is
    removed, so hosts that merely contain that substring
    (e.g. ``awww.example.com``) are left intact.

    The URI path defaults to ``/`` when the URL has no path; the query
    string, when present, is always appended (also for an empty path).
    URL parameters (``;parameters``) and the fragment are not included.

    :param url: URL of format protocol://domain/path;parameters?query#fragment
    :return: namedtuple URLS(domain_name, uri_path)
    """
    fragments = urlparse(url)
    query = f'?{fragments.query}' if fragments.query else ''
    # an empty path is equivalent to the site root, the query must survive
    uri = f"{fragments.path or '/'}{query}"
    # drop the port first, then a single leading 'www.' label
    host = fragments.netloc.split(':')[0]
    if host.startswith('www.'):
        host = host[len('www.'):]
    return URLS(host, uri)
class AutoTracer(Common):
    """
    SSA autotracing module implementation.

    Calling an instance runs one autotracing iteration: URLs are read from
    the statistics storage, filtered by a fixed sequence of rules, and an
    X-Ray autotracing task is started for every URL that survives.
    Per-rule reject counters are collected in ``self.stats`` and saved to
    ``autotracing_stats_file``.

    NOTE(review): the tunables read from ``self`` (``rules_version``,
    ``request_number``, ``density``, ``per_server_limit`` and the like) are
    not defined in this class; presumably ``Common.__init__`` provides them
    from the loaded configuration -- confirm against ``Common``.
    """

    def __init__(self, engine=None):
        """
        :param engine: database engine used to read collected statistics;
                       when not given, one is created by setup_database()
        """
        super().__init__()
        self.logger = logging.getLogger('auto_tracer')
        self.logger.info('AutoTracer enabled: %s', __package__)
        self.disabled_users = []
        self.tasks_list = []
        self.stats = None  # initial value for an iteration statistics
        self.engine = engine if engine else setup_database()

    def __call__(self):
        """
        Run a single autotracing iteration.

        The iteration is skipped when misconfiguration checks fail or when
        the X-Ray shared library could not be imported.
        """
        try:
            misconfiguration_checks()
        except SSAError as exc:
            self.logger.info('AutoTracer skipped: %s', exc.reason)
            return

        if xray_lib is None:
            # a failed import is reported as an error only when the X-Ray
            # agent is present on the server: then the failure is unexpected;
            # otherwise X-Ray is simply not installed
            if os.path.exists('/usr/sbin/cloudlinux-xray-agent'):
                self.logger.error(
                    'AutoTracer skipped: X-ray module import error: %s',
                    str(xray_import_error))
            else:
                self.logger.info('AutoTracer skipped: X-Ray not installed')
            return

        start_tool = xray_lib.start_autotracing
        if start_tool is not None:
            self.logger.info('AutoTracer started')
            # reload config
            super().__init__()
            self.logger.debug('AutoTracer loaded config: %s', self.config)
            # start gather own statistics for a current iteration
            self.stats = Stats(self.rules_version)
            for url in self.urls_scheduled():
                self.logger.info("Starting auto task for %s", url.uri)
                self.stats.urls_selected += 1
                try:
                    start_tool(url=url.uri,
                               tracing_count=self.request_number)
                except Exception as e:
                    # most likely XRayError; one failed task must not
                    # prevent the remaining ones from being started
                    self.logger.error('Failed to start task: %s', str(e))
            self.save_iteration_stats()

    @staticmethod
    def load_conf() -> dict:
        """
        Load configuration from autotracing.json
        using the autotracing tunables schema
        """
        return load_tunables('autotracing.json',
                             autotracing_tunables_schema)

    @staticmethod
    def gets_domaininfo(domain_name: str) -> Optional[object]:
        """
        Gets domain info for the specified domain

        :param domain_name: name of the domain to look up
        :return: whatever xray_lib.domain_info returns,
                 or None if the X-Ray library is unavailable
        """
        if xray_lib is None:
            return None
        return xray_lib.domain_info(domain_name)

    def nginx_is_enabled(self, user_name: str) -> bool:
        """
        Says if nginx is enabled for a specific user.
        Always False when the skip_nginx tunable is off
        or the X-Ray library is unavailable.
        """
        if not self.skip_nginx or xray_lib is None:
            return False
        return xray_lib.NginxUserCache(user_name).is_enabled

    def per_server_limit_recalculated(self) -> int:
        """
        Recalculate limit per server taking into account running tasks.

        :return: per_server_limit reduced by the number of autotracing
                 tasks currently running (may be zero or negative)
        """
        running_count = sum(
            1 for task in self.tasks_list
            if task['status'] == 'running' and task['user'] == '*autotracing*')
        self.logger.debug('Number of running autotracing tasks %s',
                          running_count)
        return self.per_server_limit - running_count

    def excludes_old_tasks(self, full_list: Optional[list] = None) -> list:
        """
        Excludes tasks older than N days from the general list of tasks,
        where N is the min_retracing_interval tunable.

        :param full_list: list of task dicts; when omitted, the list is
                          requested from X-Ray (empty if X-Ray library is
                          unavailable)
        :return: tasks created within the last N days; tasks without
                 'createtime' are treated as old
        """
        if full_list is None:
            # same guard as in the other X-Ray helpers of this class
            full_list = xray_lib.tasks_list() if xray_lib is not None else []
        self.logger.debug('Task list loaded %s', full_list)
        n_days_ago = datetime.now() - timedelta(
            days=self.min_retracing_interval)
        # the threshold does not depend on a task, compute it once
        threshold = int(n_days_ago.timestamp())
        return [task for task in full_list
                if (task.get('createtime') or 0) > threshold]

    def exclude_thesame_urls(self, current_url: str) -> bool:
        """
        Tells if the URL must be excluded because it completely matches
        the URL of a task from self.tasks_list, or because the task URL,
        treated as a shell-style pattern (e.g. with "*" after the domain
        name), matches it.

        :param current_url: URL being considered for tracing
        :return: True if a matching task was found
        """
        # c - current parsed url
        c = url_split(current_url)
        for task_data in self.tasks_list:
            # t - task parsed url
            t = url_split(task_data['url'])
            # exact comparison is kept alongside the pattern one: a task URL
            # may contain characters that are special for fnmatch
            direct_match = (c.domain_name == t.domain_name
                            and c.uri_path == t.uri_path)
            wildcard_match = (fnmatchcase(c.domain_name, t.domain_name)
                              and fnmatchcase(c.uri_path, t.uri_path))
            if direct_match or wildcard_match:
                self.logger.debug(
                    'Skipped: URL %s was traced recently. Matched by %s',
                    current_url, task_data['url'])
                return True
        return False

    def pass_by_density(self, url_total_reqs: list,
                        domain_total_reqs: list) -> bool:
        """
        Check that URL density passes given threshold.

        Density is the minimal element of the correlation matrix built
        from URL requests and domain requests series. The check is passed
        unconditionally when the density tunable is off.
        """
        if not self.density:
            return True
        url_density = np.amin(
            np.corrcoef(url_total_reqs, domain_total_reqs))
        self.logger.debug('Calculated density %s', url_density)
        return url_density > self.density_threshold

    def pass_by_slowness_percentile(self,
                                    url_durations: Iterable[int]) -> bool:
        """
        The measure of "slowness" for URL is:
            at least N% of its requests take more than X seconds.
        N% -- self.slow_duration_percentage
        X -- self.slow_duration_threshold
        """
        reversed_percentile = 100 - self.slow_duration_percentage
        reversed_percentile_value = np.percentile(url_durations,
                                                  reversed_percentile)
        self.logger.debug('Calculated %sth percentile %s for min duration',
                          reversed_percentile,
                          reversed_percentile_value)
        return reversed_percentile_value >= self.slow_duration_threshold

    def pass_by_max_slowness_percentile(self,
                                        url_durations: Iterable[int]) -> bool:
        """
        The opposite to pass_by_slowness_percentile method.
        The measure of "much slowness" for URL is:
            at least N% of its requests must take less than X seconds.
        N% -- self.max_slow_duration_percentage
        X -- self.max_slow_duration_threshold
        """
        percentile_value = np.percentile(url_durations,
                                         self.max_slow_duration_percentage)
        self.logger.debug('Calculated %sth percentile %s for max duration',
                          self.max_slow_duration_percentage,
                          percentile_value)
        return percentile_value <= self.max_slow_duration_threshold

    def pass_by_allowed_throttling(self, url_throttled_reqs: Optional[list],
                                   url_total_reqs: list) -> bool:
        """
        Check that percent of throttled requests per URL passes given
        threshold (self.allowed_throttling_percentage).

        :param url_throttled_reqs: throttled requests counters or None
        :param url_total_reqs: total requests counters
        :return: False if throttling info is unavailable, if there are no
                 requests at all, or if the throttled share is too high
        """
        if url_throttled_reqs is None:
            # skip URL with unavailable throttling info
            return False
        total_reqs = sum(url_total_reqs)
        if not total_reqs:
            # the share is undefined without requests: treat the URL like
            # one with unavailable throttling info instead of dividing by 0
            return False
        throttled_percent = (sum(url_throttled_reqs) / total_reqs) * 100
        self.logger.debug('Calculated throttled percent %s', throttled_percent)
        return throttled_percent <= self.allowed_throttling_percentage

    def pass_by_engine(self, wp_status: Optional[bool]) -> bool:
        """
        Check that URLs of a particular domain should be analyzed.
        For now we skip non-wordpress sites (when only_wp tunable is on)
        """
        if self.only_wp:
            # turn unavailable wp_status, aka None, into False
            return bool(wp_status)
        return True

    def urls_computationally_filtered(self) -> Iterator[URL]:
        """
        Select all URLs suitable for auto tracing by very basic rules:
            - WP site
            - suitable throttling
            - suitable density
            - measure of "slow" URL
        ORDER OF RULES MUST NOT BE CHANGED: IT AFFECTS STATISTICS COUNTERS
        """
        for domain_data in iter_domains_data(self.engine):
            # owner is resolved lazily, once per domain, only when
            # the first URL of the domain passes all computational checks
            domain_owner = None
            if not self.pass_by_engine(domain_data.is_a_wordpress_domain):
                # rule ordering #1: skip non-wordpress sites
                # corresponds to non_wp RulesRejects counter
                self.logger.debug('Skipped by engine: non-wordpress')
                # all URLs of non-WP site are skipped then,
                # thus counters (reject and total) are to be increased
                # on number of these URLs ignoring non-URL data fields
                skipped_count = domain_data.urls_number
                self.stats.rejects.non_wp += skipped_count
                self.stats.urls_processed += skipped_count
                continue

            domain_url_durations = dict(get_url_durations(
                self.engine, domain_data.domain_name))
            for url, data in iter_urls_data(self.engine,
                                            domain_data.domain_name,
                                            list(domain_url_durations)):
                if url in self.non_url_fields:
                    # skip entry of domain requests counter
                    continue
                self.logger.debug('Processing URL %s', url)
                self.stats.urls_processed += 1

                if not self.pass_by_allowed_throttling(
                        data.get('url_throttled_reqs'),
                        data['url_total_reqs']):
                    # rule ordering #2: percent of throttled requests
                    # corresponds to throttled RulesRejects counter
                    self.logger.debug('Skipped by throttled percent')
                    self.stats.rejects.throttled += 1
                    continue

                if not self.pass_by_density(data['url_total_reqs'],
                                            domain_data.domain_total_reqs):
                    # rule ordering #3: density threshold
                    # corresponds to density RulesRejects counter
                    self.logger.debug('Skipped by density')
                    self.stats.rejects.density += 1
                    continue

                durations = domain_url_durations[url]
                if not self.pass_by_slowness_percentile(durations):
                    # rule ordering #4: slowness assessment
                    # corresponds to slowness RulesRejects counter
                    self.logger.debug('Skipped by slowness percentile')
                    self.stats.rejects.slowness += 1
                    continue

                if not self.pass_by_max_slowness_percentile(durations):
                    # rule ordering #5: maximum allowed slowness assessment
                    # corresponds to max_slowness RulesRejects counter
                    self.logger.debug('Skipped by max slowness percentile')
                    self.stats.rejects.max_slowness += 1
                    continue

                # the URL has passed all computational checks
                # from here we already need username
                if domain_owner is None:
                    domain_info = self.gets_domaininfo(
                        domain_data.domain_name)
                    if domain_info is None:
                        # this generally indicates "Domain does not exist"
                        # error, we should skip such URLs
                        self.logger.debug(
                            'Skipped by unavailable domain info')
                        self.stats.rejects.no_domain += 1
                        continue
                    domain_owner = domain_info.user

                yield URL(url,
                          avg_duration=int(np.mean(durations)),
                          total_slow_reqs=sum(data['url_slow_reqs']),
                          total_reqs=sum(data['url_total_reqs']),
                          max_duration=max(durations),
                          belongs_to=domain_data.domain_name,
                          belongs_to_user=domain_owner)

    def urls_selected(self,
                      stats_collected: Optional[dict] = None) -> Iterator[URL]:
        """
        From selected by computed thresholds URLs take those for which:
            - autotracing enabled
            - nginx disabled
        ORDER OF RULES MUST NOT BE CHANGED: IT AFFECTS STATISTICS COUNTERS

        :param stats_collected: not used by this method,
                                kept for interface compatibility
        """
        # fill disabled users list, initially empty
        self.fill_in_disabled_users()
        for url in self.urls_computationally_filtered():
            if url.belongs_to_user in self.disabled_users:
                # rule ordering #6: skip users for whom autotracing is disabled
                # corresponds to disabled RulesRejects counter
                self.logger.debug('Skipped: autotracing is disabled for %s',
                                  url.belongs_to_user)
                self.stats.rejects.disabled += 1
                continue

            if self.nginx_is_enabled(url.belongs_to_user):
                # rule ordering #7: skip url the owner of which uses nginx
                # corresponds to nginx RulesRejects counter
                self.logger.debug('Skipped: nginx is enabled for %s',
                                  url.belongs_to_user)
                self.stats.rejects.nginx += 1
                continue

            yield url

    def urls_scheduled(self) -> Iterator[URL]:
        """
        Schedule autotracing by sorted list taking into account the limits:
            - no same task for min_retracing_interval days
            - limit per server
            - limit per domain
        ORDER OF RULES MUST NOT BE CHANGED: IT AFFECTS STATISTICS COUNTERS
        Yields URLs scheduled for auto tracing
        """
        # initial limit counters
        general_tasks_counter = 0
        tasks_counter_per_domain = defaultdict(int)
        # sort selected URLs
        sorted_urls = self.urls_sorted()
        self.logger.debug('Sorted scheduled list %s', sorted_urls)
        # get task list and filter out all old tasks in one turn
        self.tasks_list = self.excludes_old_tasks()
        per_server_smart_limit = self.per_server_limit_recalculated()

        for url in sorted_urls:
            if self.exclude_thesame_urls(url.uri):
                # rule ordering #8: limit of days for url
                # (min_retracing_interval)
                # corresponds to frequency RulesRejects counter
                self.stats.rejects.frequency += 1
                continue

            if general_tasks_counter >= per_server_smart_limit:
                # rule ordering #10: server limit is hit
                # corresponds to server_limit RulesRejects counter
                self.logger.debug('Skipped URL %s by server limit (%s)',
                                  url.uri,
                                  per_server_smart_limit)
                self.stats.rejects.server_limit += 1
                continue

            if tasks_counter_per_domain[url.belongs_to] >= self.per_domain_limit:
                # rule ordering #9: domain limit is hit
                # corresponds to domain_limit RulesRejects counter
                self.logger.debug('Skipped URL %s by domain limit (%s)',
                                  url.uri,
                                  self.per_domain_limit)
                self.stats.rejects.domain_limit += 1
                continue

            general_tasks_counter += 1
            tasks_counter_per_domain[url.belongs_to] += 1
            yield url

    def urls_sorted(self) -> List[URL]:
        """
        Sort URLs by total number of requests first
        and by average duration second, both in descending order.
        URLs equal by both keys keep their original relative order.
        """
        # one stable sort on a compound key gives the same order as sorting
        # by the secondary key and then by the primary one
        return sorted(self.urls_selected(),
                      key=lambda u: (u.total_reqs, u.avg_duration),
                      reverse=True)

    def fill_in_disabled_users(self) -> None:
        """
        Fill internal list of disabled users
        """
        self.disabled_users = disabled_users()

    def save_iteration_stats(self) -> None:
        """
        Save collected statistics for current iteration to file.
        Does nothing if no statistics were gathered; a failure to write
        the file is logged as a warning and otherwise ignored.
        """
        if self.stats is not None:
            try:
                with open(autotracing_stats_file, 'w') as stats_file:
                    json.dump(asdict(self.stats), stats_file)
            except OSError as e:
                self.logger.warning(
                    'Unable to save iteration stats to file: %s', str(e))

    def load_iteration_stats(self) -> dict:
        """
        Load statistics for latest iteration from file.

        :return: statistics as a dict; empty statistics for the current
                 rules version if the file is missing, unreadable or has
                 an unexpected structure
        """
        try:
            with open(autotracing_stats_file) as stats_file:
                _data = json.load(stats_file)
            stat_data = Stats(_data['rules_version'],
                              _data['urls_processed'],
                              _data['urls_selected'],
                              RulesRejects(**_data['rejects']))
        except (OSError, json.JSONDecodeError, KeyError, TypeError):
            # TypeError covers a file with a different set of reject
            # counters or with a non-object JSON document
            stat_data = Stats(self.rules_version)
        return asdict(stat_data)

    def get_stats(self) -> dict:
        """
        Get statistics of the latest iteration extended with
        the autotracing status (first item of status() result)
        and the number of users with autotracing disabled.
        """
        # load saved stats
        stats_loaded = self.load_iteration_stats()
        stats_loaded.update(dict(
            status=status()[0],
            disabled_users_quantity=len(disabled_users())
        ))
        return stats_loaded
if __name__ == "__main__":
    # Standalone run of one autotracing iteration:
    # errors go to Sentry, debug log goes to a file in the current directory
    sentry_init()
    logging.basicConfig(level=logging.DEBUG,
                        filename='auto_tracer_standalone.log')
    t = AutoTracer()
    t()