#!/opt/alt/python38/bin/python3
import argparse
import inspect
import logging
import os
import sys
import yaml
from restore_infected import backup_backends, helpers, log, restore
from restore_infected.backup_backends_lib import BackendNonApplicableError
logger = logging.getLogger(log.CLI_LOGGER)
logger.setLevel(logging.DEBUG)
# The most complicated logic in `action` methods is just another layer
# of parsing. The reason it wasn't build into ArgumentParser is that it
# requires the information about a chosen `backend`. For example, action_list,
# uses backend method's `spec` to filter out unneeded arguments.
def action(func):
spec = inspect.getfullargspec(func)
def wrapper(parser, extra_args, **kwargs):
if "extra_args" not in spec.args and extra_args:
parser.error(
"unrecognized arguments: " + " ".join(extra_args)
)
func(parser=parser, extra_args=extra_args, **kwargs)
return wrapper
@action
def action_init(backend, extra_args, parser, **_):
"""
Convert unparsed arguments to the appropriate format and validate them
"""
spec = inspect.getfullargspec(backend.init)
parsed_args, parsed_kwargs = helpers.parse_extra_args(extra_args)
env_vars = _get_values_for_envvars_init_action(backend.init, spec)
init_args, init_kwargs = helpers.fill_args_from_env(
spec,
parsed_args,
parsed_kwargs,
env_vars=env_vars,
)
missing, unknown = helpers.validate_params(spec, init_args, init_kwargs)
if missing := [param for param in missing if param not in env_vars]:
parser.error("init arguments required: " + " ".join(missing))
# Ignored for backward compatibility
# if forbidden := [
# env_var for env_var in env_vars
# if env_var in parsed_kwargs
# ]:
# parser.error("forbidden keyword arguments: " + " ".join(forbidden))
if unknown:
parser.error("init unknown keys: " + " ".join(unknown))
backend.init(*init_args, **init_kwargs)
@action
def action_list(backend, until, **kwargs):
spec = inspect.getfullargspec(backend.backups)
args = {k: v for k, v in kwargs.items() if k in spec.kwonlyargs}
backups = backend.backups(until, **args)
if backups:
logger.info(os.linesep.join([str(b) for b in backups]))
else:
sys.exit('No backups found')
@action
def action_restore(backend, files, until, **_):
success, failed = restore.restore_infected(backend, files, until)
if success:
logger.info('Restore success:')
logger.info(os.linesep.join(success))
if failed:
logger.info('Restore failed:')
logger.info(os.linesep.join(failed))
@action
def action_cleanup(backend, **_):
backend.cleanup()
@action
def action_info(backend, **_):
info = backend.info()
if info:
logger.info(yaml.safe_dump(info))
@action
def action_extra(backend, extra_action, extra_args, parser, **_):
extra_func = getattr(backend, extra_action, None)
if not getattr(extra_func, 'extra', False):
parser.error('invalid extra action: {0}'.format(extra_action))
result = extra_func(*extra_args)
if result:
logger.info(result)
# @action
def action_none(parser, subparsers, **_):
parser.error(
"one of the action must be specified: " + " ".join(subparsers.choices)
)
def add_parser(name, subparsers, parent):
return subparsers.add_parser(name, parents=[parent])
def setup_parsers():
parent = argparse.ArgumentParser(add_help=False)
parent.add_argument('-v', '--verbose', action='count', default=0,
help='Print more messages depending on flag count: '
'v=verbose, '
'vv=verbose with timestamps, '
'vvv=debug messages')
parent.add_argument('-o', '--output', dest='output', metavar='PATH',
help='Write log to specified file')
parser = argparse.ArgumentParser(parents=[parent])
parser.set_defaults(action=action_none)
parser.add_argument('backend', metavar='BACKEND',
type=backup_backends.backend,
help='Backup backend to use')
return parser, parent
def setup_subparsers(parser, parent):
subparsers = parser.add_subparsers()
parser_init = add_parser('init', subparsers, parent)
parser_init.set_defaults(action=action_init, action_repr='Init')
parser_list = add_parser('list', subparsers, parent)
parser_list.set_defaults(action=action_list,
action_repr='Getting backup list')
parser_list.add_argument('-u', '--until', default=None, metavar='DATE',
type=helpers.DateTime,
help='Dig backups not earlier this date')
parser_restore = add_parser('restore', subparsers, parent)
parser_restore.set_defaults(action=action_restore, action_repr='Restore')
parser_restore.add_argument('files', nargs='+', metavar='FILE',
help='List files to restore')
parser_restore.add_argument('-u', '--until', default=None, metavar='DATE',
type=helpers.DateTime,
help='Dig backups not earlier this date')
parser_cleanup = add_parser('cleanup', subparsers, parent)
parser_cleanup.set_defaults(action=action_cleanup, action_repr='Cleanup')
parser_info = add_parser('info', subparsers, parent)
parser_info.set_defaults(action=action_info, action_repr='Info')
parser_extra = add_parser('extra', subparsers, parent)
parser_extra.set_defaults(action=action_extra, action_repr='Extra actions')
parser_extra.add_argument('extra_action', help='Extra action for backend')
return subparsers
def list_custom_args_parse(namespace, args):
spec = inspect.getfullargspec(namespace.backend.backups)
keys = spec.kwonlyargs
defaults = spec.kwonlydefaults or {}
parser = argparse.ArgumentParser()
for key in keys:
name = '--%s' % key
argument_kw = {}
if key in defaults:
default = defaults[key]
argument_kw['default'] = default
if isinstance(default, tuple):
argument_kw['nargs'] = '+'
else:
argument_kw['required'] = True
parser.add_argument(name, metavar=key.upper(), **argument_kw)
return parser.parse_known_args(args)
def parse_args(parser, subparsers, args):
namespace, extra_args = parser.parse_known_args(args=args)
if namespace.action is action_list:
list_ns, extra_args = list_custom_args_parse(namespace, extra_args)
namespace = argparse.Namespace(**vars(namespace), **vars(list_ns))
namespace.extra_args = extra_args
namespace.parser = parser
namespace.subparsers = subparsers
return namespace
def enable_logs(verbosity, output):
if not verbosity and not output:
log.log_only_cli_module()
return
log.log_to_console(verbosity)
log.log_to_file(output)
def apply_action(func, kwargs):
try:
func(**kwargs)
except helpers.ActionError as e:
if e.message:
logger.error(e.message)
sys.exit(e.code)
except Exception as e:
logger.error('{} error: %s'.format(kwargs['action_repr']), str(e))
sys.exit(1)
def _get_values_for_envvars_init_action(func, spec):
# determine which arguments can be retrieved from environment variables
from_env = getattr(func, "from_env", {})
# get list of arguments that are not defined as environment variables
missing = [
arg for arg, env_name in from_env.items() if env_name not in os.environ
]
# find defaults for missing arguments
defaults = {
k: v
for k, v in zip(
spec.args[-len(spec.defaults) if spec.defaults else 0:],
spec.defaults if spec.defaults else [],
)
if k in missing
}
# Ignored for backward compatibility
#
# update list of missing arguments considering defaults
# missing = [arg.upper() for arg in missing if arg not in defaults]
# if missing:
# parser.error(
# f"environment variables required: {' '.join(missing)}",
# )
# get values for arguments that are defined in environment variables
values_from_env = {
arg: os.environ[env_name]
for arg, env_name in from_env.items()
if arg not in defaults and env_name in os.environ
}
# merge values from the environment variables with values from the defaults
return {**defaults, **values_from_env}
def main(args=None):
if args is None:
args = sys.argv[1:]
arg_parser, parent_parser = setup_parsers()
subs = setup_subparsers(arg_parser, parent_parser)
try:
ns = parse_args(arg_parser, subs, args=args)
except BackendNonApplicableError as err:
sys.exit(err)
enable_logs(ns.verbose, ns.output)
apply_action(ns.action, {**vars(ns)})
if __name__ == '__main__':
main()