#!/opt/cloudlinux/venv/bin/python3 -bb
import asyncio
import os
import sys
import aiohttp
import signal
import time
import concurrent.futures
from datetime import datetime, timedelta
from typing import Optional, Iterable
from dataclasses import dataclass
from sqlalchemy import or_
from wmt.db import (
ScrapeResult,
DomainAlerts,
setup_database,
session_scope,
cleanup_old_data
)
from wmt.common.utils import get_domains, setup_logger, save_pid_and_lock, intersect
from clsentry import init_sentry_client
from clsentry.utils import get_pkg_version
from wmt.common.const import (
PING_TIMEOUT_STATUS_CODE,
SENTRY_DNS,
ERROR_DOMAINS_PING_RETRY_INTERVAL,
ERROR_DOMAINS_ALERT_INTERVAL,
WMT_LOCK_FILE,
PING_CONNECTIONS,
LICENSE_EXPIRED_FAREWELL_LETTER_MARKER,
LICENSE_CHECK_PAUSE
)
from wmt.common.notification import Notifier, SupportedNotificationTypes
from wmt.common.report import ErrorReport
from wmt.common import cfg
from cllicense import CloudlinuxLicenseLib
# Module-level logger named 'wmt_scanner'; the functions in this file log through it.
logger = setup_logger('wmt_scanner')
def reload_conf(sig_number, frame):
    """
    Signal handler (registered for SIGUSR1 in the entry point):
    re-reads the configuration and logs the values now in effect.

    sig_number, frame: standard signal-handler arguments, not used
    """
    cfg.reload()
    current_settings = cfg.to_dict()
    logger.info('Reloading config: %s', str(current_settings))
def shutdown(sig_number, frame):
    """
    Signal handler (registered for SIGTERM in the entry point).

    Leaves the process through SystemExit with code 0 instead of being
    killed, so that the finally block gets called to close all fds and
    remove lock and file
    see: save_pid_and_lock()

    sig_number, frame: standard signal-handler arguments, not used
    """
    raise SystemExit(0)
@dataclass
class ScrapeCoroResult:
    """
    Outcome of a single ping() call for one url.
    """
    # address that was requested
    url: str
    # HTTP status of the response, or the substitute code chosen by ping()
    # when there was no response; None when not set
    response_code: Optional[int] = None
    # request duration in milliseconds; None when not set
    response_time_ms: Optional[int] = None
async def ping(url, ping_timeout, semaphore):
    """
    Main 'pinger'
    1. Requests domains
    - if domain responded - keep status code
    - if no response for timeout - keep Timeout status code
    - if unreachable (ConnectionError or so) - keep 523 status code
    (same logic as go implementation)

    url: address to request
    ping_timeout: request timeout in seconds
    semaphore: limits how many pings run at the same time
    returns: ScrapeCoroResult for the url
    """
    # Take the semaphore slot first, so a client session is only created
    # for pings that are actually allowed to run.
    async with semaphore, aiohttp.ClientSession() as session:
        start = time.time()
        try:
            async with session.get(url, timeout=ping_timeout) as resp:
                return ScrapeCoroResult(
                    url, response_code=resp.status,
                    response_time_ms=int(1000 * (time.time() - start))
                )
        # asyncio.TimeoutError is what the request timeout raises; it is not
        # the same class as concurrent.futures.TimeoutError on every Python
        # version, so both are listed.
        except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
            return ScrapeCoroResult(url,
                                    response_code=PING_TIMEOUT_STATUS_CODE,
                                    response_time_ms=int(ping_timeout * 1000))
        except aiohttp.client_exceptions.ClientError:
            # 523 is code for unreachable resource
            # same logic as in go implementation
            return ScrapeCoroResult(url, response_code=523)
def executors(ping_timeout, semaphore, ping_target_domains=None):
    """
    Generator of ping() coroutines, one per domain that is not ignored
    in the config.

    ping_timeout: specified in config timeout time (s) for request
    semaphore: semaphore obj to handle asyncio tasks
    ping_target_domains: when given, only domains that are also in this
                         collection are pinged (mostly needed for
                         re-pinging error domains)
    """
    candidates = get_domains()
    if ping_target_domains is not None:
        candidates &= set(ping_target_domains)
    logger.debug('Those domains will be pinged: %s', str(candidates))
    yield from (
        ping(site, ping_timeout, semaphore)
        for site in candidates
        if not cfg.is_domain_ignored(site)
    )
async def scrape_sites(ping_site_timeout, ping_interval, semaphore, ping_target_domains=None) -> Iterable:
    """
    Runs all ping coroutines produced by executors() concurrently.

    ping_site_timeout: timeout (s) of a single request
    ping_interval: overall time budget (s) for the whole batch
    semaphore: semaphore obj shared by the ping coroutines
    ping_target_domains: optional restriction of the domains to ping
    returns: the tasks that completed within ping_interval
             (empty list when there was nothing to ping)
    """
    tasks = [asyncio.create_task(coroutine)
             for coroutine in executors(ping_site_timeout, semaphore, ping_target_domains)]
    if not tasks:
        return []
    pinged, pending = await asyncio.wait(tasks, timeout=ping_interval)
    # asyncio.wait() does not cancel what is still running on timeout;
    # without this the leftovers would keep running (and keep holding
    # semaphore slots) after their results have already been given up on.
    for leftover in pending:
        leftover.cancel()
    return pinged
def manage_ping_results(engine, pinged, ping_target_domains=None):
    """
    - obtains all scrape coro results from asyncio tasks
    - saves ping results to ScrapeResult table
    - updates 'is_resolved' field in DomainAlerts table, in case
    error domain`s status code was changed to 200
    - saves a ScrapeResult with is_finished=False for every expected
    domain that has no result
    - returns domains with non-200 status code

    engine: database engine passed to session_scope()
    pinged: completed asyncio tasks whose results are ScrapeCoroResult
    ping_target_domains: optional restriction of the expected domains
    returns: dict {url: response_code} for non-200 responses
    """
    finished_domains = set()
    errors_domains = {}
    resolved = []
    with session_scope(engine) as session:
        for task in pinged:
            try:
                result = task.result()  # type: ScrapeCoroResult
            except Exception:
                # A single crashed ping must not throw away the results of
                # all the others; its domain ends up among the unfinished.
                logger.exception('Unexpected error while pinging a domain')
                continue
            session.add(ScrapeResult(
                website=result.url,
                is_finished=True,
                response_code=result.response_code,
                response_time_ms=result.response_time_ms
            ))
            finished_domains.add(result.url)
            if result.response_code != 200:
                errors_domains[result.url] = result.response_code
            else:
                resolved.append(result.url)
        # mark no-more failing domains as resolved
        # (nothing to update when no domain answered with 200)
        if resolved:
            session.query(DomainAlerts) \
                .filter(DomainAlerts.website.in_(resolved)) \
                .update(dict(is_resolved=True), synchronize_session=False)

    domains = get_domains()
    if ping_target_domains is not None:
        domains &= set(ping_target_domains)
    # NOTE(review): executors() skips domains for which
    # cfg.is_domain_ignored() is true, so those always count as
    # unfinished here - confirm this is intended.
    unfinished_domains = set(domains) - finished_domains
    with session_scope(engine) as session:
        for unfinished in unfinished_domains:
            session.add(ScrapeResult(
                website=unfinished,
                is_finished=False
            ))
    return errors_domains
def get_recent_alerts(engine, alert_domains):
    """
    Returns the websites among alert_domains that must NOT be included
    in alert email. A website qualifies when its DomainAlerts row either
    - has alert_time newer than ERROR_DOMAINS_ALERT_INTERVAL hours ago, or
    - still has is_resolved set to False (marker was not changed from
    last alerting)

    engine: database engine passed to session_scope()
    alert_domains: dict keyed by website
    """
    threshold = datetime.now() - timedelta(hours=ERROR_DOMAINS_ALERT_INTERVAL)
    candidate_sites = list(alert_domains.keys())
    with session_scope(engine) as session:
        alerted_too_recently = DomainAlerts.alert_time > threshold
        still_unresolved = DomainAlerts.is_resolved == False
        query = session.query(DomainAlerts.website).filter(
            DomainAlerts.website.in_(candidate_sites),
            or_(alerted_too_recently, still_unresolved),
        )
        # consume the query while the session scope is still open
        return [row.website for row in query]
def alert(domains_data):
    """
    Builds one ErrorReport per error domain and sends them in a single
    ALERT notification.

    domains_data: dict {domain: collection of response codes}
    returns: domains_data unchanged (the alerted domains)
    """
    logger.info('Alerts will be sent for %s',
                str(list(domains_data.keys())))
    error_report = []
    for domain, codes in domains_data.items():
        # each distinct code is listed once, count keeps every occurrence
        distinct_codes = ', '.join(map(str, set(codes)))
        error_report.append(
            ErrorReport(url=domain, code=distinct_codes, count_errors=len(codes))
        )
    notifier = Notifier(
        target_email=cfg.target_email,
        from_email=cfg.from_email,
        report={
            'error_report': error_report
        },
        notification_type=SupportedNotificationTypes.ALERT)
    notifier.notify()
    return domains_data
def flush_alerts(engine, alert_domains):
    """
    - gets recently alerted domains (those that must not be alerted again)
    and does not include them for alerting
    - calls alerting for left domains
    - updates DomainAlerts table:
    if website was not alerted -> adds new record
    if website was alerted before -> updates alert time and is_resolved marker

    engine: database engine passed to session_scope()
    alert_domains: dict keyed by website
    """
    # sets: membership is tested once per domain below
    recently_alerted = set(get_recent_alerts(engine, alert_domains))
    domains_to_alert = {k: v for k, v in alert_domains.items() if k not in recently_alerted}
    if not domains_to_alert:
        logger.info('All domains "%s" were alerted or still not resolved in last %d hours',
                    str(list(alert_domains.keys())),
                    ERROR_DOMAINS_ALERT_INTERVAL)
        return
    alert(domains_to_alert)
    now = datetime.now()
    with session_scope(engine) as session:
        websites = session.query(DomainAlerts) \
            .with_entities(DomainAlerts.website) \
            .all()
        known_urls = {row.website for row in websites}
        for domain in domains_to_alert:
            if domain in known_urls:
                session.query(DomainAlerts) \
                    .filter(DomainAlerts.website == domain) \
                    .update(dict(alert_time=now, is_resolved=False))
            else:
                session.add(DomainAlerts(website=domain, alert_time=now))
def should_be_repinged(error_domains):
    """
    True only when there are error domains and alert notifications are
    enabled in the config; False otherwise.
    """
    return bool(error_domains and cfg.cfg.alert_notifications_enabled)
def cleanup_farewell_letter_marker():
    """
    Removes the farewell-letter marker file if it exists and logs that
    the license was updated; does nothing when there is no marker.
    """
    if not os.path.exists(LICENSE_EXPIRED_FAREWELL_LETTER_MARKER):
        return
    logger.info('CloudLinux license was updated')
    os.remove(LICENSE_EXPIRED_FAREWELL_LETTER_MARKER)
def manage_license_farewell():
    """
    Sends farewell letter once (if it was not sent before):
    nothing is sent while the farewell marker file exists.
    Any error is logged and never propagated to the caller.
    """
    try:
        if os.path.exists(LICENSE_EXPIRED_FAREWELL_LETTER_MARKER):
            return
        logger.warning('Going to send last email about expired license!')
        # NOTE(review): the marker is not created here - presumably the
        # notifier does it; confirm.
        farewell = Notifier(
            target_email=cfg.target_email,
            from_email=cfg.from_email,
            report={},
            notification_type=SupportedNotificationTypes.FAREWELL)
        farewell.notify()
    except Exception:
        logger.exception('Error while managing farewell letter')
async def scrape_iteration(previously_errored, engine):
    """
    Scanner logic:
    1. Scrapes domains and obtains ping results;
    2. Manage ping results (e.g: saving to DB)
    3. In case error domains found -> start re-pinging
    Re-pinging:
    - in min(ping_interval, 5 mins)
    - flush alerts if needed
    4. Sleep for ping_interval until next ping iteration

    previously_errored: error domains carried over from the previous iteration
    engine: database engine
    returns: error domains to carry over to the next iteration
    """
    start = time.time()
    # ping interval parameter stored in minutes in config
    ping_interval_seconds = cfg.cfg.ping_interval * 60
    try:
        cleanup_farewell_letter_marker()
        cleanup_old_data(engine)
        semaphore = asyncio.Semaphore(PING_CONNECTIONS)
        ping_result = await scrape_sites(cfg.cfg.ping_timeout, ping_interval_seconds, semaphore)
        error_domains = manage_ping_results(engine, ping_result)
        # re-ping
        if should_be_repinged(error_domains):
            logger.info('Those domains are unsuccessful: %s \n Try to re-ping them', str(error_domains))
            # let`s re-ping in ERROR_DOMAINS_PING_RETRY_INTERVAL time
            while True:
                # re-read on every pass: it can be modified with reload
                ping_interval_seconds = cfg.cfg.ping_interval * 60
                if ping_interval_seconds > ERROR_DOMAINS_PING_RETRY_INTERVAL:
                    elapsed_for_ping = time.time() - start
                    # sleep in slices of at most 10 seconds
                    await asyncio.sleep(
                        min(max(ERROR_DOMAINS_PING_RETRY_INTERVAL - elapsed_for_ping, 0), 10)
                    )
                    if elapsed_for_ping > ERROR_DOMAINS_PING_RETRY_INTERVAL:
                        # re-ping during current ping iteration
                        ping_retry_result = await scrape_sites(
                            cfg.cfg.ping_timeout, ping_interval_seconds, semaphore,
                            ping_target_domains=error_domains.keys()
                        )
                        retry_errors = manage_ping_results(engine, ping_retry_result, error_domains.keys())
                        alert_domains = intersect(error_domains, retry_errors)
                        break
                else:
                    # error domains will be re-pinged together with other domains
                    alert_domains = intersect(previously_errored, error_domains)
                    previously_errored = error_domains
                    break
            if alert_domains:
                logger.info('Domains with unsuccessful status code found: "%s"',
                            str(list(alert_domains.keys())))
                flush_alerts(engine, alert_domains)
        else:
            # clean up, no error domains during current iteration
            previously_errored = []
    except Exception:
        logger.exception('Error during ping iteration!')

    # Wait for the rest of the ping interval. This is deliberately NOT in a
    # finally block: awaiting there would hold back SystemExit (raised by the
    # SIGTERM handler) or a task cancellation for up to a whole interval.
    # Ordinary errors were already handled above, so they still get here.
    while True:
        ping_interval_seconds = cfg.cfg.ping_interval * 60  # it can be modified with reload
        elapsed = time.time() - start
        sleep_time = min(max(ping_interval_seconds - elapsed, 0), 10)  # "10" to check "reload"
        await asyncio.sleep(sleep_time)
        if elapsed > ping_interval_seconds:
            break
    return previously_errored
async def scrape_loop():
    """
    Main loop for wmt_scanner_solo service.

    While the license check passes, each pass runs one scrape iteration
    and keeps the errored domains it returns (domains that responded with
    non-200 status code) for the next pass.
    While the check fails, nothing is scraped: failures are counted and on
    every 5th consecutive failure the farewell letter is handled.
    """
    engine = setup_database()
    previously_errored = []
    failed_checks = 0
    while True:
        if CloudlinuxLicenseLib().get_license_status():
            failed_checks = 0
            previously_errored = await scrape_iteration(previously_errored, engine)
            continue

        failed_checks += 1
        if failed_checks == 1:
            logger.warning('Seems your CloudLinux license is expired!')
        # let`s do several attempts to be really
        # sure it is not false-positives from CLN or similar
        if failed_checks >= 5:
            manage_license_farewell()
            failed_checks = 0
        # pause before the next license check, never longer than one ping interval
        pause = min(cfg.cfg.ping_interval * 60, LICENSE_CHECK_PAUSE)
        await asyncio.sleep(pause)
if __name__ == '__main__':
    # Entry point: take the pid/lock file, set up error reporting and signal
    # handlers, then run the scanner loop on the event loop.
    pid = str(os.getpid())
    logger.info("PID: %s", pid)
    with save_pid_and_lock(WMT_LOCK_FILE, pid):
        init_sentry_client('web-monitoring-tool',
                           get_pkg_version('cl-web-monitoring-tool'),
                           SENTRY_DNS)
        # SIGUSR1 -> reload config, SIGTERM -> exit via SystemExit
        for signum, handler in ((signal.SIGUSR1, reload_conf),
                                (signal.SIGTERM, shutdown)):
            signal.signal(signum, handler)
        asyncio.get_event_loop().run_until_complete(scrape_loop())