import datetime
import ftplib
import functools
import inspect
import os
import re
import shutil
import socket
import warnings
from itertools import chain
from dateutil import parser
def from_env(**param_to_env):
def decorator(f):
spec = inspect.getfullargspec(f)
assert all(
param in chain(spec.args, spec.kwonlyargs)
for param in param_to_env
)
f.from_env = param_to_env
return f
return decorator
class DateTime(datetime.datetime):
def __new__(cls, s):
return parser.parse(s, ignoretz=True)
fromtimestamp = datetime.datetime.fromtimestamp
now = datetime.datetime.now
utcfromtimestamp = datetime.datetime.utcfromtimestamp
def reconnect_ftp(func):
@functools.wraps(func)
def wrapper(ftp, *args, **kwargs):
try:
return func(ftp, *args, **kwargs)
except ftplib.error_temp:
ftp.connect()
return func(ftp, *args, **kwargs)
return wrapper
class FtpError(Exception):
pass
def _handle_ftp_error(func):
@functools.wraps(func)
def wrapper(ftp, *args, **kwargs):
try:
return func(ftp, *args, **kwargs)
except (ftplib.Error, socket.error) as e:
raise FtpError(*e.args)
return wrapper
@functools.lru_cache(maxsize=None)
class Ftp:
ftp = None
def __init__(self, host, login, password, use_ftps, passive_mode=True,
port=21, timeout=10, **_):
self.host = host
self.login = login
self.password = password
self.passive_mode = passive_mode
self.use_ftps = use_ftps
self.port = port
self.timeout = timeout
@property
def _address(self):
return self.login + '@' + self.host + ':' + str(self.port)
def __str__(self):
protocol = 'ftps' if self.use_ftps else 'ftp'
return protocol + '://' + self._address
@_handle_ftp_error
def connect(self):
if self.use_ftps:
ftp = ftplib.FTP_TLS()
else:
ftp = ftplib.FTP()
ftp.connect(self.host, self.port, self.timeout)
ftp.login(self.login, self.password)
if self.use_ftps:
ftp.prot_p()
ftp.set_pasv(self.passive_mode)
self.ftp = ftp
@_handle_ftp_error
@reconnect_ftp
def listdir(self, *args):
return self.ftp.nlst(*args)
@_handle_ftp_error
@reconnect_ftp
def mlistdir(self, *args, **kwargs):
return self.ftp.mlsd(*args, **kwargs)
@_handle_ftp_error
@reconnect_ftp
def retrieve(self, path, destination):
"""
:raises FtpError:
:raises IsNotDirError:
:raises DirNotEmptyError:
"""
if not os.path.isabs(destination):
destination = os.path.abspath(destination)
destination = os.path.join(destination, self._address)
target_name = os.path.join(destination, path.lstrip(os.path.sep))
target_dir = os.path.dirname(target_name)
mkdir(target_dir)
free_space = shutil.disk_usage(target_dir).free
try:
target_size = self.size(path)
except FtpError:
# Skip this step if SIZE command is not supported on the server
pass
else:
if target_size > free_space:
raise FtpError('Disk is full', free_space, target_size)
with open(target_name, 'wb') as w:
self.ftp.retrbinary('RETR ' + path, w.write)
return target_name
@_handle_ftp_error
@reconnect_ftp
def size(self, dir_path):
""" Get the size of the file on the server """
try:
self.ftp.sendcmd('TYPE I')
return self.ftp.size(dir_path)
finally:
# Fallback to the default ASCII mode
self.ftp.sendcmd('TYPE A')
class DirNotEmptyError(FileExistsError):
def __init__(self, message=None):
message = message or 'destination directory exists and it is not empty'
super().__init__(message)
class IsNotDirError(FileExistsError):
def __init__(self, message=None):
message = message or 'destination exists but it is not a directory'
super().__init__(message)
class ActionError(Exception):
def __init__(self, message=None, code=1):
self.message = message
self.code = code
def mkdir(path, check_empty=False):
if os.path.isdir(path):
if check_empty and os.listdir(path):
raise DirNotEmptyError()
elif os.path.exists(path):
raise IsNotDirError()
else:
os.makedirs(path)
def read(fileobj, size=2 ** 20):
for chunk in iter(lambda: fileobj.read(size), b''):
yield chunk
optional_arg_match = re.compile(r'--(?P<key>\w+)(=(?P<value>\S+))?')
def parse_extra_args(arg_list):
# FIXME: move this logic into a special ArgumentParser argument and
# get rid of `extra_args` magic
args = []
kwargs = {}
for arg in arg_list:
match = optional_arg_match.match(arg)
if match:
kw = match.groupdict()
value = kw['value']
if value is None or value.lower() in ('1', 'true', 'yes', 'y'):
value = True
elif value.lower() in ('0', 'false', 'no', 'n'):
value = False
kwargs[kw['key']] = value
else:
args.append(arg)
return args, kwargs
def _fill_out_positional_args(parsed_args, env_vars, positional_parameters):
if len(parsed_args) < len(positional_parameters):
provided_args_it = iter(parsed_args)
result = []
for param in positional_parameters:
if param in env_vars:
result.append(env_vars[param])
else:
try:
result.append(next(provided_args_it))
except StopIteration:
continue
result.extend(provided_args_it)
return result
return parsed_args
def _fill_out_keyword_args(parsed_kwargs, env_vars, keyword_parameters):
return {
**{
arg: env_vars[arg] for arg in keyword_parameters if arg in env_vars
},
**parsed_kwargs,
}
def fill_args_from_env(spec, parsed_args, parsed_kwargs, env_vars):
defaults_len = len(spec.defaults) if spec.defaults else 0
required = spec.args[: -defaults_len]
optional = spec.args[-defaults_len:]
return (
_fill_out_positional_args(parsed_args, env_vars, required),
_fill_out_keyword_args(parsed_kwargs, env_vars, optional),
)
def validate_params(spec, args, kwargs):
"""
Return tuple of lists with missing and unknown arguments of a func
:param spec: func spec
:param args: list of args to validate
:param kwargs: list of args to validate
:return: tuple of missing and unknown lists of arguments
"""
defaults_len = -len(spec.defaults) if spec.defaults else None
required = spec.args[:defaults_len]
required_len = len(required)
args_len = len(args)
varargs = spec.varargs
if not varargs and required_len != args_len or required_len > args_len:
missing = required
else:
missing = []
optional = spec.args[defaults_len:]
unknown = list(set(kwargs) - set(optional))
return missing, unknown
def _formatwarning(message, *_, **__):
return 'Warning: %s%s' % (message, os.linesep)
warnings.formatwarning = _formatwarning
def warning(message, once=False):
if once:
warnings.warn(message)
else:
with warnings.catch_warnings():
warnings.simplefilter('always')
warnings.warn(message)