import logging
import re
import socket
from functools import lru_cache
from ipaddress import (
IPV4LENGTH,
IPV6LENGTH,
IPv4Address,
IPv4Network,
IPv6Address,
IPv6Network,
ip_network,
)
from typing import Iterable, Optional, Tuple, Union
import psutil
from defence360agent.utils import get_kernel_module_parameter
from im360.utils.validate import IP, IPVersion
logger = logging.getLogger(__name__)
DNS_IP_TEMPLATE = re.compile(rb"^nameserver ([\w.:]+)", re.M)
TCP, UDP, ALL = "tcp", "udp", "all"
IN, OUT = "in", "out"
IPNetwork = Union[IPv4Network, IPv6Network]
RESOLV_CONF_FILE = "/etc/resolv.conf"
def local_dns_from_resolv_conf(ip_version: Optional[IPVersion] = None):
try:
with open(RESOLV_CONF_FILE, "rb") as f:
content = f.read()
except OSError:
return []
for ip in DNS_IP_TEMPLATE.findall(content):
ip = ip.decode("ascii") # regex guarantees ascii-only chars here
if IP.is_valid_ip(ip):
if (
ip_version is not None and IP.type_of(ip) == ip_version
) or ip_version is None:
yield ip
def local_ip_addresses() -> Iterable[Union[IPv4Address, IPv6Address]]:
"""Return a list of IP address assigned to local interfaces"""
for ips in psutil.net_if_addrs().values():
for ip in ips:
if ip.family == socket.AF_INET:
yield IPv4Address(ip.address)
elif ip.family == socket.AF_INET6:
# for some ipv6 addresses psutil returns them with interface
# name, like 'fe80::e9ff:fef3:da37%eth1'.
yield IPv6Address(ip.address.split("%", maxsplit=1)[0])
def listening_ports(proto):
"""
Return listening ports for ipv4 services
:param proto: tcp/udp
:return:
"""
assert proto in (TCP, UDP)
if proto == TCP:
kind = "tcp4"
else:
kind = "udp4"
return set(
c.laddr[1]
for c in psutil.net_connections(kind)
if (c.laddr[0] != "127.0.0.1" and c.status == psutil.CONN_LISTEN)
)
@lru_cache(maxsize=1)
def is_ipv6_enabled():
"""
Checks whether ipv6 kernel module is not disabled or absent
https://www.kernel.org/doc/Documentation/networking/ipv6.txt
:return:
"""
try:
disable = get_kernel_module_parameter(
module_name="ipv6", parameter="disable"
)
# '0' means false, '1' means true
return disable == "0"
except ValueError as e:
logger.warning(e)
# ipv6 module is absent
return False
def _signed_to_unsigned(ip: int):
return int.from_bytes(
ip.to_bytes(8, "big", signed=True), "big", signed=False
)
def _prefix_from_mask(ip_int, prefix_maxlength):
"""Return prefix length from the bitwise netmask.
Copied from ipaddress private method
Args:
ip_int: An integer, the netmask in expanded bitwise format
Returns:
An integer, the prefix length.
Raises:
ValueError: If the input intermingles zeroes & ones
"""
if ip_int == 0:
trailing_zeroes = prefix_maxlength
else:
trailing_zeroes = min(
prefix_maxlength, (~ip_int & (ip_int - 1)).bit_length()
)
prefixlen = prefix_maxlength - trailing_zeroes
leading_ones = ip_int >> trailing_zeroes
all_ones = (1 << prefixlen) - 1
if leading_ones != all_ones:
byteslen = prefix_maxlength // 8
details = ip_int.to_bytes(byteslen, "big")
msg = "Netmask pattern %r mixes zeroes & ones"
raise ValueError(msg % details)
return prefixlen
def pack_ip_address(ip_address: Union[IPv4Address, IPv6Address]):
if ip_address.version == 6:
return int.from_bytes(ip_address.packed[:8], "big", signed=True)
else:
return int(ip_address)
def pack_ip_network(
ip_network: Union[IPv4Network, IPv6Network]
) -> Tuple[int, int, int]:
net = pack_ip_address(ip_network.network_address)
mask = pack_ip_address(ip_network.netmask)
return net, mask, ip_network.version
def unpack_ip_network(net: int, mask: int, version: int) -> IPNetwork:
if version == 6:
net = _signed_to_unsigned(net) << 64
mask = _signed_to_unsigned(mask) << 64
prefix = _prefix_from_mask(mask, IPV6LENGTH)
return IPv6Network((net, prefix))
else:
prefix = _prefix_from_mask(mask, IPV4LENGTH)
return IPv4Network((net, prefix))
def is_net(ip: str) -> Union[None, IPv4Network, IPv6Network]:
try:
return ip_network(ip)
except ValueError:
return None