#!/opt/cloudlinux/venv/bin/python3
import re
from dataclasses import dataclass, asdict
from sqlalchemy import func
from sqlalchemy.sql.expression import literal_column
from wmt.common import cfg
from wmt.db import ScrapeResult, session_scope
@dataclass
class SummaryReport:
    """Aggregate request counters and overall average time for a report.

    ``average_time`` is divided by 1000 when rendered for templates; the
    template value is meant to be in milliseconds, so the stored value is
    presumably in microseconds -- TODO confirm against the producer.
    """

    count_all: int
    count_successful: int
    count_failed: int
    count_undone: int
    average_time: float

    def to_template(self, *args):
        """Return the row of values used by the report templates.

        Any positional arguments are accepted and ignored, so the method
        can be called the same way as the other reports' ``to_template``.

        Returns:
            ``[count_all, count_successful, count_failed, count_undone,
            average]`` where ``average`` is ``average_time / 1000``
            truncated to an int.
        """
        # emails need time in ms
        average_ms = int(self.average_time / 10**3)
        counters = [
            self.count_all,
            self.count_successful,
            self.count_failed,
            self.count_undone,
        ]
        return counters + [average_ms]
@dataclass
class ErrorReport:
    """Error statistics for a single website.

    Attributes are declared in the order ``code``, ``count_errors``,
    ``url``; ``to_template`` emits them in the reverse order.
    """

    code: str
    count_errors: int
    url: str

    def to_template(self, alternative):
        """Return ``[site, count_errors, code]`` for the report templates.

        Args:
            alternative: template flavour; when it equals ``'html'`` the
                site is rendered as an ``<a>`` link pointing at the full
                URL, otherwise the output of ``url_to_domain`` is used
                as is.
        """
        domain = url_to_domain(self.url)
        if alternative == 'html':
            # NOTE(review): the URL is interpolated into the markup
            # without HTML escaping -- confirm stored URLs are trusted.
            site_cell = f'<a href="{self.url}">{domain}</a>'
        else:
            site_cell = domain
        return [site_cell, self.count_errors, self.code]
@dataclass
class DurationReport:
    """Average response time for a single website.

    ``average_time`` is divided by 1000 when rendered for templates; the
    template value is meant to be in milliseconds, so the stored value is
    presumably in microseconds -- TODO confirm against the producer.
    """

    url: str
    average_time: float

    def to_template(self, alternative):
        """Return ``[site, average]`` for the report templates.

        Args:
            alternative: template flavour; when it equals ``'html'`` the
                site is rendered as an ``<a>`` link pointing at the full
                URL, otherwise the output of ``url_to_domain`` is used
                as is.

        Returns:
            A two-item list; ``average`` is ``average_time / 1000``
            truncated to an int.
        """
        domain = url_to_domain(self.url)
        if alternative == 'html':
            # NOTE(review): the URL is interpolated into the markup
            # without HTML escaping -- confirm stored URLs are trusted.
            site_cell = f'<a href="{self.url}">{domain}</a>'
        else:
            site_cell = domain
        # emails need time in ms
        average_ms = int(self.average_time / 10**3)
        return [site_cell, average_ms]
def url_to_domain(url):
    """Return *url* with a leading ``http://`` or ``https://`` removed.

    Only the scheme prefix is stripped; everything after it (host, path,
    query string) is returned unchanged. A string without such a prefix
    is returned as is.

    Args:
        url: the URL string to shorten.

    Returns:
        The URL without its leading scheme.
    """
    # Anchored with ^ so that scheme-like text appearing later in the URL
    # (for example inside a redirect query parameter) is left untouched.
    return re.sub(r'^https?://', '', url)
def generate_report(engine, start_date, end_date):
    """Build the summary, error and duration reports for a date window.

    Only ``ScrapeResult`` rows whose ``create_date`` lies between
    *start_date* and *end_date* (both bounds inclusive) are considered.
    Websites for which ``cfg.is_domain_ignored`` returns a true value are
    dropped from the per-site error and success statistics; the count of
    unfinished requests is not filtered by domain.

    Args:
        engine: passed to ``session_scope`` to open the DB session.
        start_date: lower bound for ``ScrapeResult.create_date``.
        end_date: upper bound for ``ScrapeResult.create_date``.

    Returns:
        A dict with the keys ``'summary_report'`` (a ``SummaryReport``),
        ``'error_report'`` (list of ``ErrorReport``) and
        ``'duration_report'`` (list of ``DurationReport``, ordered by
        descending average time as returned by the query).
    """
    with session_scope(engine) as session:
        # Date window shared by every query below.
        in_period = (ScrapeResult.create_date >= start_date,
                     ScrapeResult.create_date <= end_date)

        # Step 1: count finished non-200 responses per
        # (response_code, website) pair,
        # e.g. (404, test.com, 3), (500, test.com, 2)
        per_code_errors = (
            session.query(ScrapeResult.response_code,
                          ScrapeResult.website,
                          func.count().label('err_count'))
            .filter(*in_period,
                    ScrapeResult.response_code != 200,
                    ScrapeResult.is_finished == True)  # noqa: E712 (SQL expression)
            .group_by(ScrapeResult.response_code, ScrapeResult.website)
            .subquery()
        )

        # Step 2: collapse the per-code rows into one row per website:
        #   codes       total  website
        # [('451,500',  3,     'http://www.flightradar24.com'),
        #  ('404',      2,     'http://broken.com')]
        error_rows = (
            session.query(func.group_concat(per_code_errors.c.response_code),
                          func.sum(per_code_errors.c.err_count),
                          per_code_errors.c.website)
            .group_by(per_code_errors.c.website)
            .all()
        )

        # Per-website average response time and number of 200 responses,
        # slowest website first:
        #   website                          average             count
        # [('http://www.stackoverflow.com', 538.0816599732262,  2241),
        #  ('http://www.suser.com',          66.53859883980365, 2241)]
        success_rows = (
            session.query(ScrapeResult.website,
                          func.avg(ScrapeResult.response_time_ms).label('average_time'),
                          func.count())
            .filter(*in_period,
                    ScrapeResult.response_code == 200)
            .group_by(ScrapeResult.website)
            .order_by(literal_column('average_time').desc())
            .all()
        )

        # Requests in the window that never finished.
        count_unsuccessful = (
            session.query(ScrapeResult)
            .filter(*in_period,
                    ScrapeResult.is_finished == False)  # noqa: E712 (SQL expression)
            .count()
        )

        # Drop websites that the configuration marks as ignored.
        kept_success = [(site, avg, hits) for site, avg, hits in success_rows
                        if not cfg.is_domain_ignored(site)]
        kept_errors = [(codes, total, site) for codes, total, site in error_rows
                       if not cfg.is_domain_ignored(site)]

        error_report = [
            ErrorReport(code=codes, count_errors=total, url=site)
            for codes, total, site in kept_errors
        ]
        # Averages are stored multiplied by 1000 and rounded to an int.
        duration_report = [
            DurationReport(url=site, average_time=int(round(avg * 1000)))
            for site, avg, _ in kept_success
        ]

        successful_total = sum(hits for _, _, hits in kept_success)
        failed_total = sum(total for _, total, _ in kept_errors)

        # Overall average is the plain (unweighted) mean of the per-site
        # averages, scaled the same way as the per-site values.
        site_averages = [avg for _, avg, _ in kept_success]
        if site_averages:
            overall_average = int(round(1000 * sum(site_averages) / len(site_averages)))
        else:
            overall_average = 0

        summary_report = SummaryReport(
            count_all=successful_total + failed_total + count_unsuccessful,
            count_successful=successful_total,
            count_failed=failed_total,
            count_undone=count_unsuccessful,
            average_time=overall_average,
        )

        return {
            'summary_report': summary_report,
            'error_report': error_report,
            'duration_report': duration_report
        }
def report_dict(report):
    """Convert a report made of dataclass instances into plain dicts.

    Args:
        report: mapping with a dataclass instance under
            ``'summary_report'`` and iterables of dataclass instances
            under ``'error_report'`` and ``'duration_report'``.

    Returns:
        A new dict with the same three keys, where every dataclass
        instance has been converted with ``dataclasses.asdict``.
    """
    plain = {'summary_report': asdict(report['summary_report'])}
    for section in ('error_report', 'duration_report'):
        plain[section] = [asdict(entry) for entry in report[section]]
    return plain