import logging
import functools
import os
import secrets
import shutil
import string
import subprocess
from typing import Any, Callable, Type
from defence360agent.utils import LazyLock
# Module-level logger; return_value_on_error() uses it to report every
# exception it replaces with a fallback value.
logger = logging.getLogger(__name__)
class RulesLock(LazyLock):
    """
    Distinct, named LazyLock subclass that adds no behaviour of its own.

    NOTE(review): presumably serializes operations on rules -- confirm
    against the callers that acquire it.
    """
class ModSecLock(LazyLock):
    """
    Distinct, named LazyLock subclass that adds no behaviour of its own.

    NOTE(review): presumably serializes ModSecurity-related operations --
    confirm against the callers that acquire it.
    """
def return_value_on_error(error: Type[Exception], value: Any) -> Callable:
    """
    Build a decorator that makes a coroutine function fall back to *value*.

    The decorated callable is awaited as usual; if it raises *error*
    the exception is logged at INFO level and *value* is returned
    in its place.  Any other exception propagates unchanged.

    In other words:

        return await return_value_on_error(FileNotFoundError, X)(coro)(*args)

    behaves like:

        try:
            return await coro(*args)
        except FileNotFoundError:
            return X
    """

    def wrap_coroutine(func):
        async def guarded(*call_args, **call_kwargs):
            try:
                outcome = await func(*call_args, **call_kwargs)
            except error as exc:
                logger.info("Replacing %s from %s with %s", exc, func, value)
                outcome = value
            return outcome

        # Copy name, docstring, etc. of the wrapped callable onto the wrapper.
        return functools.wraps(func)(guarded)

    return wrap_coroutine
class PasswordChangeError(Exception):
    """
    Raised when the password-changing process exits with a non-zero code.

    Attributes:
        stderr: error output of the process, exactly as passed in
            (``bytes`` or ``str``).
        returncode: exit code of the process.
    """

    def __init__(self, stderr, returncode):
        # Keep the raw values so handlers do not have to parse the message.
        self.stderr = stderr
        self.returncode = returncode
        # subprocess returns captured output as bytes unless text mode is
        # requested; decode it so the message does not contain a b'...'
        # repr (and does not trigger BytesWarning under ``python -b``).
        if isinstance(stderr, (bytes, bytearray)):
            stderr = bytes(stderr).decode(errors="replace")
        super().__init__(
            f"Password change process exited with code {returncode}: {stderr}"
        )
def generate_strong_password(
    length=20,
    required_char_groups=(
        string.ascii_lowercase,
        string.ascii_uppercase,
        string.digits,
        string.punctuation,
    ),
):
    """
    Return a random password that is *length* characters long.

    The password is guaranteed to contain at least one character from
    each of the given *required_char_groups*.  Characters are drawn with
    ``secrets.choice`` from the concatenation of all groups.

    Raises ValueError if *length* is smaller than the number of groups
    and AssertionError if any of the groups is empty.
    """
    if length < len(required_char_groups):
        raise ValueError(
            f"Given {length=} is too short. "
            "Can't get chars from all required groups "
            f"{len(required_char_groups)=}"
        )
    # Raise explicitly instead of using an ``assert`` statement: asserts
    # are stripped under ``python -O`` and an empty group would then make
    # the loop below spin forever (no password can ever satisfy it).
    # AssertionError is kept for compatibility with existing callers.
    if not all(map(len, required_char_groups)):
        raise AssertionError("got empty char group")
    alphabet = "".join(required_char_groups)
    while True:
        # generate password with given *length*
        # take characters from *alphabet* randomly
        # (uniformly/with equal probability)
        password = "".join(secrets.choice(alphabet) for _ in range(length))
        # retry if the password is missing some required char groups
        if all(
            any(c in group for c in password) for group in required_char_groups
        ):
            return password
def change_system_password(username, new_password, *, passwd_cmd=None):
    """
    Set *new_password* for *username* by running a ``passwd``-like command.

    The command is started as ``[*passwd_cmd, username]`` and receives the
    new password on stdin twice, separated by a newline.  When *passwd_cmd*
    is not given, ``passwd`` is looked up in ``os.defpath``.

    Raises:
        ValueError: *username* starts with "-" and would be parsed by
            the command as an option.
        FileNotFoundError: *passwd_cmd* is not given and no ``passwd``
            executable is found in ``os.defpath``.
        PasswordChangeError: the command exited with a non-zero code.
    """
    # The username is passed as a plain argv element; refuse anything the
    # command could mistake for an option instead of a user name.
    if os.fsdecode(username).startswith("-"):
        raise ValueError(f"Invalid username: {username!r}")
    if passwd_cmd is None:
        passwd_path = shutil.which("passwd", path=os.defpath)
        if passwd_path is None:
            # Fail with a clear message instead of handing ``None`` to
            # subprocess, which would raise an opaque TypeError.
            raise FileNotFoundError(
                f"passwd executable not found in {os.defpath!r}"
            )
        passwd_cmd = [passwd_path]
    assert isinstance(passwd_cmd, list)
    try:
        subprocess.run(
            [*passwd_cmd, username],
            # the password is sent twice: once to set it, once to confirm
            input=b"\n".join([new_password.encode()] * 2),
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise PasswordChangeError(e.stderr, e.returncode) from e