404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@3.17.166.233: ~ $
# Copyright (C) 2017 Canonical Ltd.
#
# Author: Ryan Harper <ryan.harper@canonical.com>
#
# This file is part of cloud-init. See LICENSE file for license information.

import copy
import functools
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional

from cloudinit import safeyaml, util
from cloudinit.net import (
    find_interface_name_from_mac,
    get_interfaces_by_mac,
    ipv4_mask_to_net_prefix,
    ipv6_mask_to_net_prefix,
    is_ip_network,
    is_ipv4_network,
    is_ipv6_address,
    is_ipv6_network,
    net_prefix_to_ipv4_mask,
)

if TYPE_CHECKING:
    from cloudinit.net.renderer import Renderer

LOG = logging.getLogger(__name__)

NETWORK_STATE_VERSION = 1
NETWORK_STATE_REQUIRED_KEYS = {
    1: ["version", "config", "network_state"],
}
NETWORK_V2_KEY_FILTER = [
    "addresses",
    "dhcp4",
    "dhcp4-overrides",
    "dhcp6",
    "dhcp6-overrides",
    "gateway4",
    "gateway6",
    "interfaces",
    "match",
    "mtu",
    "nameservers",
    "renderer",
    "set-name",
    "wakeonlan",
    "accept-ra",
]

NET_CONFIG_TO_V2: Dict[str, Dict[str, Any]] = {
    "bond": {
        "bond-ad-select": "ad-select",
        "bond-arp-interval": "arp-interval",
        "bond-arp-ip-target": "arp-ip-target",
        "bond-arp-validate": "arp-validate",
        "bond-downdelay": "down-delay",
        "bond-fail-over-mac": "fail-over-mac-policy",
        "bond-lacp-rate": "lacp-rate",
        "bond-miimon": "mii-monitor-interval",
        "bond-min-links": "min-links",
        "bond-mode": "mode",
        "bond-num-grat-arp": "gratuitous-arp",
        "bond-primary": "primary",
        "bond-primary-reselect": "primary-reselect-policy",
        "bond-updelay": "up-delay",
        "bond-xmit-hash-policy": "transmit-hash-policy",
    },
    "bridge": {
        "bridge_ageing": "ageing-time",
        "bridge_bridgeprio": "priority",
        "bridge_fd": "forward-delay",
        "bridge_gcint": None,
        "bridge_hello": "hello-time",
        "bridge_maxage": "max-age",
        "bridge_maxwait": None,
        "bridge_pathcost": "path-cost",
        "bridge_portprio": "port-priority",
        "bridge_stp": "stp",
        "bridge_waitport": None,
    },
}


def warn_deprecated_all_devices(dikt: dict) -> None:
    """Warn about deprecations of v2 properties for all devices"""
    if "gateway4" in dikt or "gateway6" in dikt:
        util.deprecate(
            deprecated="The use of `gateway4` and `gateway6`",
            deprecated_version="22.4",
            extra_message="For more info check out: "
            "https://cloudinit.readthedocs.io/en/latest/topics/network-config-format-v2.html",  # noqa: E501
        )


def diff_keys(expected, actual):
    missing = set(expected)
    for key in actual:
        missing.discard(key)
    return missing


class InvalidCommand(Exception):
    pass


def ensure_command_keys(required_keys):
    def wrapper(func):
        @functools.wraps(func)
        def decorator(self, command, *args, **kwargs):
            if required_keys:
                missing_keys = diff_keys(required_keys, command)
                if missing_keys:
                    raise InvalidCommand(
                        "Command missing %s of required keys %s"
                        % (missing_keys, required_keys)
                    )
            return func(self, command, *args, **kwargs)

        return decorator

    return wrapper


class NetworkState:
    def __init__(
        self, network_state: dict, version: int = NETWORK_STATE_VERSION
    ):
        self._network_state = copy.deepcopy(network_state)
        self._version = version
        self.use_ipv6 = network_state.get("use_ipv6", False)
        self._has_default_route = None

    @property
    def config(self) -> dict:
        return self._network_state["config"]

    @property
    def version(self):
        return self._version

    @property
    def dns_nameservers(self):
        try:
            return self._network_state["dns"]["nameservers"]
        except KeyError:
            return []

    @property
    def dns_searchdomains(self):
        try:
            return self._network_state["dns"]["search"]
        except KeyError:
            return []

    @property
    def has_default_route(self):
        if self._has_default_route is None:
            self._has_default_route = self._maybe_has_default_route()
        return self._has_default_route

    def iter_interfaces(self, filter_func=None):
        ifaces = self._network_state.get("interfaces", {})
        for iface in ifaces.values():
            if filter_func is None:
                yield iface
            else:
                if filter_func(iface):
                    yield iface

    def iter_routes(self, filter_func=None):
        for route in self._network_state.get("routes", []):
            if filter_func is not None:
                if filter_func(route):
                    yield route
            else:
                yield route

    def _maybe_has_default_route(self):
        for route in self.iter_routes():
            if self._is_default_route(route):
                return True
        for iface in self.iter_interfaces():
            for subnet in iface.get("subnets", []):
                for route in subnet.get("routes", []):
                    if self._is_default_route(route):
                        return True
        return False

    def _is_default_route(self, route):
        default_nets = ("::", "0.0.0.0")
        return (
            route.get("prefix") == 0 and route.get("network") in default_nets
        )

    @classmethod
    def to_passthrough(cls, network_state: dict) -> "NetworkState":
        """Instantiates a `NetworkState` without interpreting its data.

        That means only `config` and `version` are copied.

        :param network_state: Network state data.
        :return: Instance of `NetworkState`.
        """
        kwargs = {}
        if "version" in network_state:
            kwargs["version"] = network_state["version"]
        return cls({"config": network_state}, **kwargs)


class NetworkStateInterpreter:
    initial_network_state = {
        "interfaces": {},
        "routes": [],
        "dns": {
            "nameservers": [],
            "search": [],
        },
        "use_ipv6": False,
        "config": None,
    }

    def __init__(
        self,
        version=NETWORK_STATE_VERSION,
        config=None,
        renderer: "Optional[Renderer]" = None,
    ):
        self._version = version
        self._config = config
        self._network_state = copy.deepcopy(self.initial_network_state)
        self._network_state["config"] = config
        self._parsed = False
        self._interface_dns_map: dict = {}
        self._renderer = renderer
        self.command_handlers = {
            "bond": self.handle_bond,
            "bonds": self.handle_bonds,
            "bridge": self.handle_bridge,
            "bridges": self.handle_bridges,
            "ethernets": self.handle_ethernets,
            "infiniband": self.handle_infiniband,
            "loopback": self.handle_loopback,
            "nameserver": self.handle_nameserver,
            "physical": self.handle_physical,
            "route": self.handle_route,
            "vlan": self.handle_vlan,
            "vlans": self.handle_vlans,
            "wifis": self.handle_wifis,
        }

    @property
    def network_state(self) -> NetworkState:
        from cloudinit.net.netplan import Renderer as NetplanRenderer

        if self._version == 2 and isinstance(self._renderer, NetplanRenderer):
            LOG.debug("Passthrough netplan v2 config")
            return NetworkState.to_passthrough(self._config)
        return NetworkState(self._network_state, version=self._version)

    @property
    def use_ipv6(self):
        return self._network_state.get("use_ipv6")

    @use_ipv6.setter
    def use_ipv6(self, val):
        self._network_state.update({"use_ipv6": val})

    def dump(self):
        state = {
            "version": self._version,
            "config": self._config,
            "network_state": self._network_state,
        }
        return safeyaml.dumps(state)

    def load(self, state):
        if "version" not in state:
            LOG.error("Invalid state, missing version field")
            raise ValueError("Invalid state, missing version field")

        required_keys = NETWORK_STATE_REQUIRED_KEYS[state["version"]]
        missing_keys = diff_keys(required_keys, state)
        if missing_keys:
            msg = "Invalid state, missing keys: %s" % (missing_keys)
            LOG.error(msg)
            raise ValueError(msg)

        # v1 - direct attr mapping, except version
        for key in [k for k in required_keys if k not in ["version"]]:
            setattr(self, key, state[key])

    def dump_network_state(self):
        return safeyaml.dumps(self._network_state)

    def as_dict(self):
        return {"version": self._version, "config": self._config}

    def parse_config(self, skip_broken=True):
        if self._version == 1:
            self.parse_config_v1(skip_broken=skip_broken)
            self._parsed = True
        elif self._version == 2:
            self.parse_config_v2(skip_broken=skip_broken)
            self._parsed = True

    def parse_config_v1(self, skip_broken=True):
        for command in self._config:
            command_type = command["type"]
            try:
                handler = self.command_handlers[command_type]
            except KeyError as e:
                raise RuntimeError(
                    "No handler found for  command '%s'" % command_type
                ) from e
            try:
                handler(command)
            except InvalidCommand:
                if not skip_broken:
                    raise
                else:
                    LOG.warning(
                        "Skipping invalid command: %s", command, exc_info=True
                    )
                    LOG.debug(self.dump_network_state())
        for interface, dns in self._interface_dns_map.items():
            iface = None
            try:
                iface = self._network_state["interfaces"][interface]
            except KeyError as e:
                raise ValueError(
                    "Nameserver specified for interface {0}, "
                    "but interface {0} does not exist!".format(interface)
                ) from e
            if iface:
                nameservers, search = dns
                iface["dns"] = {
                    "addresses": nameservers,
                    "search": search,
                }

    def parse_config_v2(self, skip_broken=True):
        from cloudinit.net.netplan import Renderer as NetplanRenderer

        if isinstance(self._renderer, NetplanRenderer):
            # Nothing to parse as we are going to perform a Netplan passthrough
            return

        for command_type, command in self._config.items():
            if command_type in ["version", "renderer"]:
                continue
            try:
                handler = self.command_handlers[command_type]
            except KeyError as e:
                raise RuntimeError(
                    "No handler found for command '%s'" % command_type
                ) from e
            try:
                handler(command)
                self._v2_common(command)
            except InvalidCommand:
                if not skip_broken:
                    raise
                else:
                    LOG.warning(
                        "Skipping invalid command: %s", command, exc_info=True
                    )
                    LOG.debug(self.dump_network_state())

    @ensure_command_keys(["name"])
    def handle_loopback(self, command):
        return self.handle_physical(command)

    @ensure_command_keys(["name"])
    def handle_physical(self, command):
        """
        command = {
            'type': 'physical',
            'mac_address': 'c0:d6:9f:2c:e8:80',
            'name': 'eth0',
            'subnets': [
                {'type': 'dhcp4'}
             ],
            'accept-ra': 'true'
        }
        """

        interfaces = self._network_state.get("interfaces", {})
        iface = interfaces.get(command["name"], {})
        for param, val in command.get("params", {}).items():
            iface.update({param: val})

        # convert subnet ipv6 netmask to cidr as needed
        subnets = _normalize_subnets(command.get("subnets"))

        # automatically set 'use_ipv6' if any addresses are ipv6
        if not self.use_ipv6:
            for subnet in subnets:
                if subnet.get("type").endswith("6") or is_ipv6_address(
                    subnet.get("address")
                ):
                    self.use_ipv6 = True
                    break

        accept_ra = command.get("accept-ra", None)
        if accept_ra is not None:
            accept_ra = util.is_true(accept_ra)
        wakeonlan = command.get("wakeonlan", None)
        if wakeonlan is not None:
            wakeonlan = util.is_true(wakeonlan)
        iface.update(
            {
                "name": command.get("name"),
                "type": command.get("type"),
                "mac_address": command.get("mac_address"),
                "inet": "inet",
                "mode": "manual",
                "mtu": command.get("mtu"),
                "address": None,
                "gateway": None,
                "subnets": subnets,
                "accept-ra": accept_ra,
                "wakeonlan": wakeonlan,
            }
        )
        self._network_state["interfaces"].update({command.get("name"): iface})
        self.dump_network_state()

    @ensure_command_keys(["name", "vlan_id", "vlan_link"])
    def handle_vlan(self, command):
        """
        auto eth0.222
        iface eth0.222 inet static
                address 10.10.10.1
                netmask 255.255.255.0
                hwaddress ether BC:76:4E:06:96:B3
                vlan-raw-device eth0
        """
        interfaces = self._network_state.get("interfaces", {})
        self.handle_physical(command)
        iface = interfaces.get(command.get("name"), {})
        iface["vlan-raw-device"] = command.get("vlan_link")
        iface["vlan_id"] = command.get("vlan_id")
        interfaces.update({iface["name"]: iface})

    @ensure_command_keys(["name", "bond_interfaces", "params"])
    def handle_bond(self, command):
        """
        #/etc/network/interfaces
        auto eth0
        iface eth0 inet manual
            bond-master bond0
            bond-mode 802.3ad

        auto eth1
        iface eth1 inet manual
            bond-master bond0
            bond-mode 802.3ad

        auto bond0
        iface bond0 inet static
             address 192.168.0.10
             gateway 192.168.0.1
             netmask 255.255.255.0
             bond-slaves none
             bond-mode 802.3ad
             bond-miimon 100
             bond-downdelay 200
             bond-updelay 200
             bond-lacp-rate 4
        """

        self.handle_physical(command)
        interfaces = self._network_state.get("interfaces")
        iface = interfaces.get(command.get("name"), {})
        for param, val in command.get("params").items():
            iface.update({param: val})
        iface.update({"bond-slaves": "none"})
        self._network_state["interfaces"].update({iface["name"]: iface})

        # handle bond slaves
        for ifname in command.get("bond_interfaces"):
            if ifname not in interfaces:
                cmd = {
                    "name": ifname,
                    "type": "bond",
                }
                # inject placeholder
                self.handle_physical(cmd)

            interfaces = self._network_state.get("interfaces", {})
            bond_if = interfaces.get(ifname)
            bond_if["bond-master"] = command.get("name")
            # copy in bond config into slave
            for param, val in command.get("params").items():
                bond_if.update({param: val})
            self._network_state["interfaces"].update({ifname: bond_if})

    @ensure_command_keys(["name", "bridge_interfaces"])
    def handle_bridge(self, command):
        """
            auto br0
            iface br0 inet static
                    address 10.10.10.1
                    netmask 255.255.255.0
                    bridge_ports eth0 eth1
                    bridge_stp off
                    bridge_fd 0
                    bridge_maxwait 0

        bridge_params = [
            "bridge_ports",
            "bridge_ageing",
            "bridge_bridgeprio",
            "bridge_fd",
            "bridge_gcint",
            "bridge_hello",
            "bridge_hw",
            "bridge_maxage",
            "bridge_maxwait",
            "bridge_pathcost",
            "bridge_portprio",
            "bridge_stp",
            "bridge_waitport",
        ]
        """

        # find one of the bridge port ifaces to get mac_addr
        # handle bridge_slaves
        interfaces = self._network_state.get("interfaces", {})
        for ifname in command.get("bridge_interfaces"):
            if ifname in interfaces:
                continue

            cmd = {
                "name": ifname,
            }
            # inject placeholder
            self.handle_physical(cmd)

        interfaces = self._network_state.get("interfaces", {})
        self.handle_physical(command)
        iface = interfaces.get(command.get("name"), {})
        iface["bridge_ports"] = command["bridge_interfaces"]
        for param, val in command.get("params", {}).items():
            iface.update({param: val})

        # convert value to boolean
        bridge_stp = iface.get("bridge_stp")
        if bridge_stp is not None and not isinstance(bridge_stp, bool):
            if bridge_stp in ["on", "1", 1]:
                bridge_stp = True
            elif bridge_stp in ["off", "0", 0]:
                bridge_stp = False
            else:
                raise ValueError(
                    "Cannot convert bridge_stp value ({stp}) to"
                    " boolean".format(stp=bridge_stp)
                )
            iface.update({"bridge_stp": bridge_stp})

        interfaces.update({iface["name"]: iface})

    @ensure_command_keys(["name"])
    def handle_infiniband(self, command):
        self.handle_physical(command)

    def _parse_dns(self, command):
        nameservers = []
        search = []
        if "address" in command:
            addrs = command["address"]
            if not isinstance(addrs, list):
                addrs = [addrs]
            for addr in addrs:
                nameservers.append(addr)
        if "search" in command:
            paths = command["search"]
            if not isinstance(paths, list):
                paths = [paths]
            for path in paths:
                search.append(path)
        return nameservers, search

    @ensure_command_keys(["address"])
    def handle_nameserver(self, command):
        dns = self._network_state.get("dns")
        nameservers, search = self._parse_dns(command)
        if "interface" in command:
            self._interface_dns_map[command["interface"]] = (
                nameservers,
                search,
            )
        else:
            dns["nameservers"].extend(nameservers)
            dns["search"].extend(search)

    @ensure_command_keys(["address"])
    def _handle_individual_nameserver(self, command, iface):
        _iface = self._network_state.get("interfaces")
        nameservers, search = self._parse_dns(command)
        _iface[iface]["dns"] = {"nameservers": nameservers, "search": search}

    @ensure_command_keys(["destination"])
    def handle_route(self, command):
        self._network_state["routes"].append(_normalize_route(command))

    # V2 handlers
    def handle_bonds(self, command):
        """
        v2_command = {
          bond0: {
            'interfaces': ['interface0', 'interface1'],
            'parameters': {
               'mii-monitor-interval': 100,
               'mode': '802.3ad',
               'xmit_hash_policy': 'layer3+4'}},
          bond1: {
            'bond-slaves': ['interface2', 'interface7'],
            'parameters': {
                'mode': 1,
            }
          }
        }

        v1_command = {
            'type': 'bond'
            'name': 'bond0',
            'bond_interfaces': [interface0, interface1],
            'params': {
                'bond-mode': '802.3ad',
                'bond_miimon: 100,
                'bond_xmit_hash_policy': 'layer3+4',
            }
        }

        """
        self._handle_bond_bridge(command, cmd_type="bond")

    def handle_bridges(self, command):
        """
        v2_command = {
          br0: {
            'interfaces': ['interface0', 'interface1'],
            'forward-delay': 0,
            'stp': False,
            'maxwait': 0,
          }
        }

        v1_command = {
            'type': 'bridge'
            'name': 'br0',
            'bridge_interfaces': [interface0, interface1],
            'params': {
                'bridge_stp': 'off',
                'bridge_fd: 0,
                'bridge_maxwait': 0
            }
        }

        """
        self._handle_bond_bridge(command, cmd_type="bridge")

    def handle_ethernets(self, command):
        """
        ethernets:
          eno1:
            match:
              macaddress: 00:11:22:33:44:55
              driver: hv_netvsc
            wakeonlan: true
            dhcp4: true
            dhcp6: false
            addresses:
              - 192.168.14.2/24
              - 2001:1::1/64
            gateway4: 192.168.14.1
            gateway6: 2001:1::2
            nameservers:
              search: [foo.local, bar.local]
              addresses: [8.8.8.8, 8.8.4.4]
          lom:
            match:
              driver: ixgbe
            set-name: lom1
            dhcp6: true
            accept-ra: true
          switchports:
            match:
              name: enp2*
            mtu: 1280

        command = {
            'type': 'physical',
            'mac_address': 'c0:d6:9f:2c:e8:80',
            'name': 'eth0',
            'subnets': [
                {'type': 'dhcp4'}
             ]
        }
        """

        # Get the interfaces by MAC address to update an interface's
        # device name to the name of the device that matches a provided
        # MAC address when the set-name directive is not present.
        #
        # Please see https://bugs.launchpad.net/cloud-init/+bug/1855945
        # for more information.
        ifaces_by_mac = get_interfaces_by_mac()

        for eth, cfg in command.items():
            phy_cmd = {
                "type": "physical",
            }
            match = cfg.get("match", {})
            mac_address = match.get("macaddress", None)
            if not mac_address:
                LOG.debug(
                    'NetworkState Version2: missing "macaddress" info '
                    "in config entry: %s: %s",
                    eth,
                    str(cfg),
                )
            phy_cmd["mac_address"] = mac_address

            # Determine the name of the interface by using one of the
            # following in the order they are listed:
            #   * set-name
            #   * interface name looked up by mac
            #   * value of "eth" key from this loop
            name = eth
            set_name = cfg.get("set-name")
            if set_name:
                name = set_name
            elif mac_address and ifaces_by_mac:
                lcase_mac_address = mac_address.lower()
                mac = find_interface_name_from_mac(lcase_mac_address)
                if mac:
                    name = mac
            phy_cmd["name"] = name

            driver = match.get("driver", None)
            if driver:
                phy_cmd["params"] = {"driver": driver}
            for key in ["mtu", "match", "wakeonlan", "accept-ra"]:
                if key in cfg:
                    phy_cmd[key] = cfg[key]

            warn_deprecated_all_devices(cfg)

            subnets = self._v2_to_v1_ipcfg(cfg)
            if len(subnets) > 0:
                phy_cmd.update({"subnets": subnets})

            LOG.debug("v2(ethernets) -> v1(physical):\n%s", phy_cmd)
            self.handle_physical(phy_cmd)

    def handle_vlans(self, command):
        """
        v2_vlans = {
            'eth0.123': {
                'id': 123,
                'link': 'eth0',
                'dhcp4': True,
            }
        }

        v1_command = {
            'type': 'vlan',
            'name': 'eth0.123',
            'vlan_link': 'eth0',
            'vlan_id': 123,
            'subnets': [{'type': 'dhcp4'}],
        }
        """
        for vlan, cfg in command.items():
            vlan_cmd = {
                "type": "vlan",
                "name": vlan,
                "vlan_id": cfg.get("id"),
                "vlan_link": cfg.get("link"),
            }
            if "mtu" in cfg:
                vlan_cmd["mtu"] = cfg["mtu"]
            warn_deprecated_all_devices(cfg)
            subnets = self._v2_to_v1_ipcfg(cfg)
            if len(subnets) > 0:
                vlan_cmd.update({"subnets": subnets})
            LOG.debug("v2(vlans) -> v1(vlan):\n%s", vlan_cmd)
            self.handle_vlan(vlan_cmd)

    def handle_wifis(self, command):
        LOG.warning(
            "Wifi configuration is only available to distros with"
            " netplan rendering support."
        )

    def _v2_common(self, cfg) -> None:
        LOG.debug("v2_common: handling config:\n%s", cfg)
        for iface, dev_cfg in cfg.items():
            if "set-name" in dev_cfg:
                set_name_iface = dev_cfg.get("set-name")
                if set_name_iface:
                    iface = set_name_iface
            if "nameservers" in dev_cfg:
                search = dev_cfg.get("nameservers").get("search", [])
                dns = dev_cfg.get("nameservers").get("addresses", [])
                name_cmd = {"type": "nameserver"}
                if len(search) > 0:
                    name_cmd.update({"search": search})
                if len(dns) > 0:
                    name_cmd.update({"address": dns})
                self.handle_nameserver(name_cmd)

                mac_address: Optional[str] = dev_cfg.get("match", {}).get(
                    "macaddress"
                )
                if mac_address:
                    real_if_name = find_interface_name_from_mac(mac_address)
                    if real_if_name:
                        iface = real_if_name

                self._handle_individual_nameserver(name_cmd, iface)

    def _handle_bond_bridge(self, command, cmd_type=None):
        """Common handler for bond and bridge types"""

        # inverse mapping for v2 keynames to v1 keynames
        v2key_to_v1 = dict(
            (v, k) for k, v in NET_CONFIG_TO_V2.get(cmd_type).items()
        )

        for item_name, item_cfg in command.items():
            item_params = dict(
                (key, value)
                for (key, value) in item_cfg.items()
                if key not in NETWORK_V2_KEY_FILTER
            )
            # We accept both spellings (as netplan does).  LP: #1756701
            # Normalize internally to the new spelling:
            params = item_params.get("parameters", {})
            grat_value = params.pop("gratuitious-arp", None)
            if grat_value:
                params["gratuitous-arp"] = grat_value

            v1_cmd = {
                "type": cmd_type,
                "name": item_name,
                cmd_type + "_interfaces": item_cfg.get("interfaces"),
                "params": dict((v2key_to_v1[k], v) for k, v in params.items()),
            }
            if "mtu" in item_cfg:
                v1_cmd["mtu"] = item_cfg["mtu"]

            warn_deprecated_all_devices(item_cfg)
            subnets = self._v2_to_v1_ipcfg(item_cfg)
            if len(subnets) > 0:
                v1_cmd.update({"subnets": subnets})

            LOG.debug("v2(%s) -> v1(%s):\n%s", cmd_type, cmd_type, v1_cmd)
            if cmd_type == "bridge":
                self.handle_bridge(v1_cmd)
            elif cmd_type == "bond":
                self.handle_bond(v1_cmd)
            else:
                raise ValueError(
                    "Unknown command type: {cmd_type}".format(
                        cmd_type=cmd_type
                    )
                )

    def _v2_to_v1_ipcfg(self, cfg):
        """Common ipconfig extraction from v2 to v1 subnets array."""

        def _add_dhcp_overrides(overrides, subnet):
            if "route-metric" in overrides:
                subnet["metric"] = overrides["route-metric"]

        subnets = []
        if cfg.get("dhcp4"):
            subnet = {"type": "dhcp4"}
            _add_dhcp_overrides(cfg.get("dhcp4-overrides", {}), subnet)
            subnets.append(subnet)
        if cfg.get("dhcp6"):
            subnet = {"type": "dhcp6"}
            self.use_ipv6 = True
            _add_dhcp_overrides(cfg.get("dhcp6-overrides", {}), subnet)
            subnets.append(subnet)

        gateway4 = None
        gateway6 = None
        nameservers = {}
        for address in cfg.get("addresses", []):
            subnet = {
                "type": "static",
                "address": address,
            }

            if ":" in address:
                if "gateway6" in cfg and gateway6 is None:
                    gateway6 = cfg.get("gateway6")
                    subnet.update({"gateway": gateway6})
            else:
                if "gateway4" in cfg and gateway4 is None:
                    gateway4 = cfg.get("gateway4")
                    subnet.update({"gateway": gateway4})

            if "nameservers" in cfg and not nameservers:
                addresses = cfg.get("nameservers").get("addresses")
                if addresses:
                    nameservers["dns_nameservers"] = addresses
                search = cfg.get("nameservers").get("search")
                if search:
                    nameservers["dns_search"] = search
                subnet.update(nameservers)

            subnets.append(subnet)

        routes = []
        for route in cfg.get("routes", []):
            routes.append(
                _normalize_route(
                    {
                        "destination": route.get("to"),
                        "gateway": route.get("via"),
                        "metric": route.get("metric"),
                    }
                )
            )

        # v2 routes are bound to the interface, in v1 we add them under
        # the first subnet since there isn't an equivalent interface level.
        if len(subnets) and len(routes):
            subnets[0]["routes"] = routes

        return subnets


def _normalize_subnet(subnet):
    # Prune all keys with None values.
    subnet = copy.deepcopy(subnet)
    normal_subnet = dict((k, v) for k, v in subnet.items() if v)

    if subnet.get("type") in ("static", "static6"):
        normal_subnet.update(
            _normalize_net_keys(
                normal_subnet,
                address_keys=(
                    "address",
                    "ip_address",
                ),
            )
        )
    normal_subnet["routes"] = [
        _normalize_route(r) for r in subnet.get("routes", [])
    ]

    def listify(snet, name):
        if name in snet and not isinstance(snet[name], list):
            snet[name] = snet[name].split()

    for k in ("dns_search", "dns_nameservers"):
        listify(normal_subnet, k)

    return normal_subnet


def _normalize_net_keys(network, address_keys=()):
    """Normalize dictionary network keys returning prefix and address keys.

    @param network: A dict of network-related definition containing prefix,
        netmask and address_keys.
    @param address_keys: A tuple of keys to search for representing the address
        or cidr. The first address_key discovered will be used for
        normalization.

    @returns: A dict containing normalized prefix and matching addr_key.
    """
    net = {k: v for k, v in network.items() if v or v == 0}
    addr_key = None
    for key in address_keys:
        if net.get(key):
            addr_key = key
            break
    if not addr_key:
        message = "No config network address keys [%s] found in %s" % (
            ",".join(address_keys),
            network,
        )
        LOG.error(message)
        raise ValueError(message)

    addr = str(net.get(addr_key))
    if not is_ip_network(addr):
        LOG.error("Address %s is not a valid ip network", addr)
        raise ValueError(f"Address {addr} is not a valid ip address")

    ipv6 = is_ipv6_network(addr)
    ipv4 = is_ipv4_network(addr)

    netmask = net.get("netmask")
    if "/" in addr:
        addr_part, _, maybe_prefix = addr.partition("/")
        net[addr_key] = addr_part
        if ipv6:
            # this supports input of ffff:ffff:ffff::
            prefix = ipv6_mask_to_net_prefix(maybe_prefix)
        elif ipv4:
            # this supports input of 255.255.255.0
            prefix = ipv4_mask_to_net_prefix(maybe_prefix)
        else:
            # In theory this never happens, is_ip_network() should catch all
            # invalid networks
            LOG.error("Address %s is not a valid ip network", addr)
            raise ValueError(f"Address {addr} is not a valid ip address")
    elif "prefix" in net:
        prefix = int(net["prefix"])
    elif netmask and ipv4:
        prefix = ipv4_mask_to_net_prefix(netmask)
    elif netmask and ipv6:
        prefix = ipv6_mask_to_net_prefix(netmask)
    else:
        prefix = 64 if ipv6 else 24

    if "prefix" in net and str(net["prefix"]) != str(prefix):
        LOG.warning(
            "Overwriting existing 'prefix' with '%s' in network info: %s",
            prefix,
            net,
        )
    net["prefix"] = prefix

    if ipv6:
        # TODO: we could/maybe should add this back with the very uncommon
        # 'netmask' for ipv6.  We need a 'net_prefix_to_ipv6_mask' for that.
        if "netmask" in net:
            del net["netmask"]
    elif ipv4:
        net["netmask"] = net_prefix_to_ipv4_mask(net["prefix"])

    return net


def _normalize_route(route):
    """normalize a route.
    return a dictionary with only:
       'type': 'route' (only present if it was present in input)
       'network': the network portion of the route as a string.
       'prefix': the network prefix for address as an integer.
       'metric': integer metric (only if present in input).
       'netmask': netmask (string) equivalent to prefix iff network is ipv4.
    """
    # Prune None-value keys.  Specifically allow 0 (a valid metric).
    normal_route = dict(
        (k, v) for k, v in route.items() if v not in ("", None)
    )
    if "destination" in normal_route:
        normal_route["network"] = normal_route["destination"]
        del normal_route["destination"]

    normal_route.update(
        _normalize_net_keys(
            normal_route, address_keys=("network", "destination")
        )
    )

    metric = normal_route.get("metric")
    if metric:
        try:
            normal_route["metric"] = int(metric)
        except ValueError as e:
            raise TypeError(
                "Route config metric {} is not an integer".format(metric)
            ) from e
    return normal_route


def _normalize_subnets(subnets):
    if not subnets:
        subnets = []
    return [_normalize_subnet(s) for s in subnets]


def parse_net_config_data(
    net_config: dict,
    skip_broken: bool = True,
    renderer=None,  # type: Optional[Renderer]
) -> NetworkState:
    """Parses the config, returns NetworkState object

    :param net_config: curtin network config dict
    """
    state = None
    version = net_config.get("version")
    config = net_config.get("config")
    if version == 2:
        # v2 does not have explicit 'config' key so we
        # pass the whole net-config as-is
        config = net_config

    if version and config is not None:
        nsi = NetworkStateInterpreter(
            version=version, config=config, renderer=renderer
        )
        nsi.parse_config(skip_broken=skip_broken)
        state = nsi.network_state

    if not state:
        raise RuntimeError(
            "No valid network_state object created from network config. "
            "Did you specify the correct version? Network config:\n"
            f"{net_config}"
        )

    return state

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
netops Folder 0755
__init__.py File 42.24 KB 0644
activators.py File 8.77 KB 0644
bsd.py File 8.43 KB 0644
cmdline.py File 9.04 KB 0644
dhcp.py File 21.06 KB 0644
eni.py File 21.26 KB 0644
ephemeral.py File 14.33 KB 0644
freebsd.py File 3.25 KB 0644
netbsd.py File 1.41 KB 0644
netplan.py File 18.87 KB 0644
network_manager.py File 19.06 KB 0644
network_state.py File 35.77 KB 0644
networkd.py File 12.19 KB 0644
openbsd.py File 2.47 KB 0644
renderer.py File 1.64 KB 0644
renderers.py File 1.78 KB 0644
sysconfig.py File 42.8 KB 0644
udev.py File 1.39 KB 0644