404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@3.147.74.100: ~ $
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import json
import logging
import os
from contextlib import suppress
from functools import partial
from itertools import chain
from typing import Callable

from peewee import SqliteDatabase, TextField

from defence360agent.contracts.config import LocalConfig
from imav.migration_utils.other import im360_present
from imav.migration_utils.plesk_sdk import PleskSdk
from imav.migration_utils.revisium import find_revisium_db

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# SQLite handle created without a database (None); it is bound to a concrete
# database later by db.init() inside migrate_imav4plesk_settings().
db = SqliteDatabase(None)


class KeyValue(db.Model):
    """
    ex-Revisium settings database model

    Each row holds one setting: its name in `key` and its stored text in
    `value`. Rows are looked up by `key` (see `Revisium._get`).
    """

    # name of the setting
    key = TextField()
    # stored setting text; SettingsExtractor.get() tries to JSON-decode it
    value = TextField()

    class Meta:
        # the model is declared without a primary key
        primary_key = False


class SettingsExtractor:
    """
    Common behaviour for ex-Revisium and Plesk settings extractors

    Subclasses implement `_get()` to fetch the raw value stored for `key`;
    `get()` layers the fallback to `default` and JSON decoding on top of it.
    """

    def __init__(self, key, default):
        """
        :param key: name of the setting to read
        :param default: value returned by `get()` when reading fails
        """
        self.default = default
        self.key = key

    def _get(self):
        """Fetch the raw setting value; must be overridden by subclasses"""
        raise NotImplementedError

    def get(self):
        """
        Read the setting, falling back to `default` on any error

        A raw value that decodes as JSON is returned decoded; anything else
        is returned exactly as `_get()` produced it.
        """
        try:
            raw = self._get()
        except Exception as exc:
            logger.warning("Fail to get %r value: %r", self.key, exc)
            return self.default

        try:
            return json.loads(raw)
        except Exception:
            # not JSON (or not a string at all): keep the raw value untouched
            return raw


class Revisium(SettingsExtractor):
    """Settings extractor backed by the ex-Revisium database"""

    def _get(self):
        """Return the stored value of the row whose key equals `self.key`"""
        row = KeyValue.get(KeyValue.key == self.key)
        return row.value


class Plesk(SettingsExtractor):
    """Settings extractor that reads Plesk settings through the PHP wrapper"""

    def _get(self):
        """Ask PleskSdk for `self.key`, handing `self.default` along"""
        key, fallback = self.key, self.default
        return PleskSdk.settings__get(key, fallback)


class ConfigMapping:
    """
    Map one or several dependent ex-Revisium / Plesk settings onto one or
    several ImunifyAV/360 config keys; every target key receives the same
    converted value
    """

    def __init__(
        self,
        source: SettingsExtractor | tuple[SettingsExtractor, ...],
        target: str | tuple[str, ...],
        *,
        converter: Callable,
    ):
        """
        :param source: settings extractor(s) for ex-Revisium / Plesk
        :param target: name(s) of target config parameter(s) for ImunifyAV/360
        :param converter: a callable taking the source value(s) positionally
            and returning the single target value
        """
        # a lone extractor / key is normalised to a one-element tuple
        self.source = source if isinstance(source, tuple) else (source,)
        self.target = target if isinstance(target, tuple) else (target,)
        self.converter = converter

    def convert(self):
        """
        Read every source, convert them into one value and pair that value
        with each target name

        :return: tuple of `(target, value)` pairs, or an empty tuple when
            the converter raises (the failure is logged)
        """
        values = []
        for extractor in self.source:
            values.append(extractor.get())

        try:
            converted = self.converter(*values)
        except Exception as exc:
            logger.warning(
                "Fail to convert %r value(s) (%r): %r",
                self.source,
                values,
                exc,
            )
            return ()

        return tuple((name, converted) for name in self.target)


def clamp(minimum: int, maximum: int, value: int) -> int:
    """
    Limit a value to the `minimum`..`maximum` range

    The upper bound is applied first, then the lower one.
    """
    capped = maximum if maximum < value else value
    return capped if capped > minimum else minimum


def intensity_cpu(value, max_value) -> int:
    """
    Derive the ImunifyAV/360 CPU intensity from a value within a range

    The middle intensity (4) is scaled by `value / max_value` using integer
    floor division and the result is limited to 2..7. When `max_value` is
    zero the middle intensity is used unscaled.
    """
    midpoint = 4  # the middle point of the CPU intensity
    try:
        scaled = midpoint * value // max_value
    except ZeroDivisionError:
        scaled = midpoint

    return clamp(2, 7, scaled)


# scan period name read from Plesk -> MALWARE_SCAN_SCHEDULE.interval value
schedule_interval_mapping = dict(
    never="none",
    daily="day",
    weekly="week",
    monthly="month",
)


def get_max_possible_cpu() -> int:
    """Return half of the CPUs/cores reported by the OS, never less than one"""
    detected = os.cpu_count()
    if not detected:
        # the count could not be determined: assume a single CPU
        detected = 1

    half = detected // 2
    return half if half > 1 else 1


# We agreed to keep every setting at its IM360 default except for the scan
# schedule, so this subset is the only one applied when IM360 is present
# (see migrate_imav4plesk_settings).
scan_time_mapping_only = (
    # Scan period name -> schedule interval. A period missing from
    # schedule_interval_mapping raises KeyError, which
    # ConfigMapping.convert() logs before skipping the mapping.
    ConfigMapping(
        Plesk("ra_auto_scan_period", "monthly"),
        "MALWARE_SCAN_SCHEDULE.interval",
        converter=schedule_interval_mapping.__getitem__,
    ),
    # Scan hour, limited to the 0..23 range.
    ConfigMapping(
        Plesk("ra_hour_auto_scan", 4),
        "MALWARE_SCAN_SCHEDULE.hour",
        converter=partial(clamp, 0, 23),
    ),
)

# Full set of mappings, applied when IM360 is not present (see
# migrate_imav4plesk_settings); the scan schedule mappings are appended last.
all_mappings = (
    # Two Plesk values (worker count and its maximum) are combined by
    # intensity_cpu() into one intensity written to both CPU keys.
    # NOTE: the default for the maximum is computed once, at import time.
    ConfigMapping(
        (
            Plesk("ra_max_worker_count", 2),
            Plesk("ra_max_possible_worker_count", get_max_possible_cpu()),
        ),
        (
            "MALWARE_SCAN_INTENSITY.cpu",
            "MALWARE_SCAN_INTENSITY.user_scan_cpu",
        ),
        converter=intensity_cpu,
    ),
    # Backup retention in days; max(1, value) keeps it at one day or more.
    ConfigMapping(
        Plesk("ra_keep_backups_days", 7),
        "MALWARE_CLEANUP.keep_original_files_days",
        converter=partial(max, 1),
    ),
    # The value is coerced to a boolean with bool().
    ConfigMapping(
        Plesk("ra_trim_files", True),
        "MALWARE_CLEANUP.trim_file_instead_of_removal",
        converter=bool,
    ),
    # The only setting read from the ex-Revisium database instead of Plesk.
    ConfigMapping(
        Revisium("ra_use_ignore_list_by_user", True),
        "PERMISSIONS.user_ignore_list",
        converter=bool,
    ),
) + scan_time_mapping_only


def migrate_imav4plesk_settings(database=None):
    """
    Build an ImunifyAV/360 config dict from legacy ImunifyAV for Plesk settings

    :param database: ex-Revisium database handed to `db.init()`
    :return: nested dict of migrated values; a target such as
        `MALWARE_SCAN_INTENSITY.cpu` becomes
        `{"MALWARE_SCAN_INTENSITY": {"cpu": ...}}`
    """
    db.init(database)

    # with IM360 present only the scan schedule is migrated
    if im360_present():
        selected = scan_time_mapping_only
    else:
        selected = all_mappings

    result = {}
    for mapping in selected:
        for dotted_name, converted in mapping.convert():
            # walk (creating as needed) the nested dicts named by the dotted
            # prefix, then store the value under the last component
            sections = dotted_name.split(".")
            option = sections.pop()
            node = result
            for section in sections:
                node = node.setdefault(section, {})
            node[option] = converted

    return result


def migrate(migrator, database, fake=False, **kwargs):
    """
    Copy legacy ImunifyAV for Plesk settings into the local config

    Does nothing when `fake` is true or when no ex-Revisium database is
    found. Any failure during the migration is logged and swallowed.
    """
    if fake:
        return

    legacy_db = find_revisium_db()
    if legacy_db is None:
        logger.info("No legacy ImunifyAV database found. Skipping...")
        return

    try:
        settings = migrate_imav4plesk_settings(legacy_db)
        # an empty result means there is nothing to write
        if settings:
            LocalConfig().dict_to_config(settings, normalize=False)
    except Exception as exc:
        logger.warning(
            "Failed to migrate ImunifyAV for Plesk settings: %r", exc
        )


def rollback(migrator, database, fake=False, **kwargs):
    """Nothing to undo: this migration has no rollback step"""
    return None


if __name__ == "__main__":
    # manual run: print the config that would be migrated, without saving it
    legacy_db_path = find_revisium_db()
    print(migrate_imav4plesk_settings(legacy_db_path))

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
000_noop_migration.py File 968 B 0644
001_fix_scan_unserialization.py File 2.85 KB 0644
002_convert_serialized_scans.py File 2.5 KB 0644
003_add_new_fields_to_malware_history.py File 1.29 KB 0644
003_whmapi1_set_importance_imav.py File 1.64 KB 0644
004_add_malwarehit_owner.py File 1.1 KB 0644
004_plesk_configs.py File 6.71 KB 0644
005_plesk_cleanup_storage.py File 4.01 KB 0644
005_populate_malwarehit_owner.py File 1.12 KB 0644
006_add_malwarehit_snippet.py File 1.11 KB 0644
007_add_malwarehistory_fileuser.py File 1.13 KB 0644
007_revisium_ignore_list.py File 2.62 KB 0644
008_populate_malwarehistory_foleuser.py File 1.16 KB 0644
008_subscription_permissions.py File 3.57 KB 0644
009_revisium_scan_history.py File 8.99 KB 0644
010_add_malwarehistory_scan_id.py File 1.13 KB 0644
011_add_malwarehistory_index.py File 1.49 KB 0644
012_add_rescan_outdated_scan_type.py File 1.4 KB 0644
__init__.py File 0 B 0644