#!/opt/cloudlinux/venv/bin/python3 -sbb
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
import contextlib
import os
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Iterator

from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Integer,
    String,
    create_engine,
    event,
    func,
    text,
)
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import DatabaseError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy.orm.session import close_all_sessions
# Path of the SQLite database file holding the stored request results.
SSA_DB = '/var/lve/ssa.db'
# Name the current database file is moved to while restore_database() salvages records from it.
OLD_SSA_DB = SSA_DB + '.old'
# Age (in days) after which cleanup_old_data() deletes a record.
RETENTION_TIME_DAYS = 1
# Declarative base shared by the ORM models of this module.
Base = declarative_base()
class RequestResult(Base):
    """
    One processed request, stored as a row of the ``scrape_result`` table.

    Example of an incoming request description:
    {
        "timestamp": "1650008727",
        "url": "http://mydomain.com/index.php",
        "duration": 162077,
        "hitting_limits": false,
        "throttled_time": 0,
        "io_throttled_time": 0,
        "wordpress": true
    }
    NOTE(review): the table has separate ``domain`` and ``path`` columns rather
    than ``url``; presumably the writer splits the URL — confirm against the caller.

    created_at / updated_at are filled by the database (``server_default=func.now()``).
    NOTE(review): an earlier note said they are saved in local TZ, but on SQLite
    ``func.now()`` renders as CURRENT_TIMESTAMP, which SQLite documents as UTC —
    confirm which one is intended before relying on it.
    """
    __tablename__ = 'scrape_result'
    # Integer primary key of the row.
    id = Column(Integer, primary_key=True)
    # Indexed lookup columns identifying the requested resource.
    domain = Column(String, index=True, nullable=False)
    path = Column(String, index=True, nullable=False)
    # Request timestamp, stored as an integer (the example above shows it as a string).
    timestamp = Column(Integer, nullable=False)
    duration = Column(Integer, nullable=False)
    is_slow_request = Column(Boolean, nullable=False)
    hitting_limits = Column(Boolean, nullable=False)
    throttled_time = Column(Integer, nullable=False)
    io_throttled_time = Column(Integer, nullable=False)
    wordpress = Column(Boolean, nullable=False)
    # Set by the database on insert; used by the retention cleanup.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Set by the database on insert and refreshed on every ORM update.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now(), server_default=func.now())
def cleanup_old_data(engine):
    """
    Remove records older than RETENTION_TIME_DAYS, saving disk space.

    When ``created_at`` comes from its server default (``func.now()``), SQLite
    stores CURRENT_TIMESTAMP, a naive UTC "YYYY-MM-DD HH:MM:SS" value. The
    cut-off is therefore computed as naive UTC as well. Comparing against local
    time, as before, shifted the retention window by the host's UTC offset.

    :param engine: SQLAlchemy engine bound to the SSA database
    """
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - timedelta(days=RETENTION_TIME_DAYS)
    with session_scope(engine) as session:
        (
            session.query(RequestResult)
            .filter(RequestResult.created_at < cutoff)
            .delete()
        )
def create_db_if_not_exist(engine):
    """
    Create every table declared on Base unless the database already has tables.

    :param engine: SQLAlchemy engine bound to the SSA database
    """
    if is_db_present(engine):
        return
    Base.metadata.create_all(engine)
def is_db_present(engine):
    """
    Tell whether the SSA database file exists and already contains tables.

    Note that the file check uses the module-level SSA_DB path, while the table
    check goes through *engine*.

    :param engine: SQLAlchemy engine to inspect
    :return: False if SSA_DB is not a regular file or has no tables, else True
    """
    if not os.path.isfile(SSA_DB):
        return False
    # Any table at all counts as "present"; no need to copy the name list.
    return bool(Inspector.from_engine(engine).get_table_names())
def setup_wal_mode(dbapi_con, con_record):
    """
    'connect' event listener: switch each new DB-API connection to WAL journaling.

    :param dbapi_con: raw DB-API connection just opened (sqlite3 in this module)
    :param con_record: pool connection record; not used
    """
    wal_pragma = 'PRAGMA journal_mode = WAL'
    dbapi_con.execute(wal_pragma)
def _setup_database(readonly):
    """
    Build an SQLAlchemy engine for SSA_DB and make sure its tables exist.

    Connections are opened through sqlite3's URI interface so that read-only
    access can be requested with ``mode=ro``. Every new connection is passed to
    setup_wal_mode via the engine's 'connect' event.

    :param readonly: open connections with ``?mode=ro`` when true
    :return: the configured engine
    """
    db_uri = f'file:{SSA_DB}?mode=ro' if readonly else f'file:{SSA_DB}'

    def _open_connection():
        return sqlite3.connect(db_uri, uri=True)

    db_engine = create_engine('sqlite:////', creator=_open_connection, echo=False)
    event.listen(db_engine, 'connect', setup_wal_mode)
    create_db_if_not_exist(db_engine)
    return db_engine
def setup_database(readonly=False):
    """
    Public entry point returning an engine for the SSA database.

    :param readonly: open the database in read-only mode
    :return: engine created by _setup_database
    """
    ssa_engine = _setup_database(readonly)
    return ssa_engine
def _remove_if_exists(path):
    """Delete the file at *path*; a missing file is not an error."""
    with contextlib.suppress(FileNotFoundError):
        os.remove(path)


def restore_database(engine):
    """
    Restore the database by moving the current file aside and salvaging what is readable.

    Steps:
    1. Move the current file to OLD_SSA_DB.
    2. Create a fresh database at SSA_DB.
    3. Merge every record that can still be read from the old file into it.
    4. Delete the old file together with its SQLite sidecar files.

    :param engine: engine bound to SSA_DB; it is disposed so that its later
        connections are opened against the newly created database file.
    """
    old_db_present = os.path.exists(SSA_DB)
    if old_db_present:
        # Closing all sessions to ensure that no session is using the database during replacing
        close_all_sessions()
    # Dispose of the existing engine before any file is moved. Its pooled
    # connections to the old file are closed now, and every later connection
    # is opened against the new database.
    engine.dispose()
    if old_db_present:
        os.replace(SSA_DB, OLD_SSA_DB)
        # The WAL/SHM sidecars still carry the SSA_DB name but belong to the
        # moved database. Drop them before a new database is created under
        # that name and starts using these paths for its own journal.
        _remove_if_exists(SSA_DB + '-wal')
        _remove_if_exists(SSA_DB + '-shm')

    new_engine = setup_database()
    # Three slashes + absolute path ('/var/...') gives the canonical 'sqlite:////var/...'.
    old_engine = create_engine(f'sqlite:///{OLD_SSA_DB}')
    try:
        with session_scope(old_engine) as session_old, session_scope(new_engine) as session_new:
            # Check if old database is able to read and it makes sense to try to save unharmed data
            session_old.query(RequestResult).first()
            merge_unharmed_data_from_database(session_old, session_new)
    except DatabaseError:
        # The old database cannot be read at all; keep the new, empty one.
        pass
    finally:
        # Release pooled connections so no handle to the files below stays open.
        old_engine.dispose()
        new_engine.dispose()

    # Only the old database's files are removed; the new database's WAL is left alone.
    for path in (OLD_SSA_DB, OLD_SSA_DB + '-wal', OLD_SSA_DB + '-shm'):
        _remove_if_exists(path)
def _merge_rows_individually(query, offset, batch_size, session_new):
    """
    Retry one failed batch row by row, merging every row that can still be loaded.

    :return: ``(readable, exhausted)``. ``readable`` is the number of rows that
        loaded successfully. ``exhausted`` tells whether the end of the source
        table was reached inside this batch.
    """
    readable = 0
    for pos_in_batch in range(batch_size):
        try:
            record_to_save = query.offset(offset + pos_in_batch).first()
            if record_to_save is None:
                return readable, True
            readable += 1
            session_new.merge(record_to_save)
        except DatabaseError:
            # Damaged row (or failed merge): skip it and try the next one.
            pass
    return readable, False


def merge_unharmed_data_from_database(session_old, session_new, batch_size=10):
    """
    Scrape all unharmed records from a malformed database and merge them into the new database.

    Records are read from *session_old* in OFFSET/LIMIT batches of *batch_size*.
    If a batch cannot be read as a whole, its rows are retried one by one, so a
    single damaged row does not discard its neighbours. Each batch is committed
    to *session_new* separately.

    Reading stops when the old table is exhausted, or when no row of a failed
    batch can be read at all. To honour OFFSET, SQLite walks past all preceding
    rows, so later offsets would most likely hit the same damage forever; the
    previous version looped endlessly in that case. The trade-off is that
    readable rows located after a fully unreadable batch are given up.

    :param session_old: session bound to the damaged (source) database
    :param session_new: session bound to the fresh (target) database
    :param batch_size: number of rows requested per query
    """
    offset = 0
    while True:
        query = session_old.query(RequestResult).offset(offset).limit(batch_size)
        finished = False
        try:
            records_to_save = query.all()
            if not records_to_save:
                break
            for record in records_to_save:
                session_new.merge(record)
        except DatabaseError:
            readable, exhausted = _merge_rows_individually(query, offset, batch_size, session_new)
            finished = exhausted or readable == 0
        session_new.commit()
        if finished:
            break
        offset += batch_size
def is_malformed_database(engine):
    """
    Run SQLite's integrity check to decide whether the database file is malformed.

    Side effect: a leftover OLD_SSA_DB is deleted when SSA_DB exists alongside it.

    :param engine: SQLAlchemy engine bound to the SSA database
    :return: True if ``PRAGMA integrity_check`` reports anything other than
        ``ok``, or if it cannot be executed at all (DatabaseError); else False
    """
    leftover_backup = os.path.exists(OLD_SSA_DB) and os.path.exists(SSA_DB)
    if leftover_backup:
        os.remove(OLD_SSA_DB)
    try:
        with session_scope(engine) as db:
            first_row = db.execute(text("PRAGMA integrity_check")).fetchall()[0]
            verdict = first_row[0]
    except DatabaseError:
        return True
    return verdict != 'ok'
@contextlib.contextmanager
def session_scope(engine) -> Iterator[Session]:
    """
    Provide a transactional scope around a series of operations.

    Yields a new Session bound to *engine*. If the ``with`` body completes,
    the session is committed. If anything is raised, including a failing
    commit, the session is rolled back and the exception is re-raised. The
    session is closed in every case.

    :param engine: SQLAlchemy engine to bind the session to
    """
    session = Session(bind=engine)
    try:
        yield session
        session.commit()
    except BaseException:
        # Explicit BaseException (same semantics as the former bare ``except:``):
        # interrupts also roll the transaction back before propagating.
        session.rollback()
        raise
    finally:
        session.close()