# Copyright (C) 2021-2022 VMware Inc.
#
# Author: Shreenidhi Shedi <yesshedi@gmail.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
import logging
from collections import OrderedDict
from typing import Optional
from cloudinit import subp, util
from cloudinit.net import renderer
from cloudinit.net.network_state import NetworkState
LOG = logging.getLogger(__name__)
class CfgParser:
def __init__(self):
self.conf_dict = OrderedDict(
{
"Match": [],
"Link": [],
"Network": [],
"DHCPv4": [],
"DHCPv6": [],
"Address": [],
"Route": {},
}
)
def update_section(self, sec, key, val):
for k in self.conf_dict.keys():
if k == sec:
self.conf_dict[k].append(key + "=" + str(val))
# remove duplicates from list
self.conf_dict[k] = list(dict.fromkeys(self.conf_dict[k]))
self.conf_dict[k].sort()
def update_route_section(self, sec, rid, key, val):
"""
For each route section we use rid as a key, this allows us to isolate
this route from others on subsequent calls.
"""
for k in self.conf_dict.keys():
if k == sec:
if rid not in self.conf_dict[k]:
self.conf_dict[k][rid] = []
self.conf_dict[k][rid].append(key + "=" + str(val))
# remove duplicates from list
self.conf_dict[k][rid] = list(
dict.fromkeys(self.conf_dict[k][rid])
)
self.conf_dict[k][rid].sort()
def get_final_conf(self):
contents = ""
for k, v in sorted(self.conf_dict.items()):
if not v:
continue
if k == "Address":
for e in sorted(v):
contents += "[" + k + "]\n"
contents += e + "\n"
contents += "\n"
elif k == "Route":
for n in sorted(v):
contents += "[" + k + "]\n"
for e in sorted(v[n]):
contents += e + "\n"
contents += "\n"
else:
contents += "[" + k + "]\n"
for e in sorted(v):
contents += e + "\n"
contents += "\n"
return contents
class Renderer(renderer.Renderer):
"""
Renders network information in /etc/systemd/network
This Renderer is currently experimental and doesn't support all the
use cases supported by the other renderers yet.
"""
def __init__(self, config=None):
if not config:
config = {}
self.resolve_conf_fn = config.get(
"resolve_conf_fn", "/etc/systemd/resolved.conf"
)
self.network_conf_dir = config.get(
"network_conf_dir", "/etc/systemd/network/"
)
def generate_match_section(self, iface, cfg: CfgParser):
sec = "Match"
match_dict = {
"name": "Name",
"driver": "Driver",
"mac_address": "MACAddress",
}
if not iface:
return
for k, v in match_dict.items():
if k in iface and iface[k]:
cfg.update_section(sec, v, iface[k])
return iface["name"]
def generate_link_section(self, iface, cfg: CfgParser):
sec = "Link"
if not iface:
return
if "mtu" in iface and iface["mtu"]:
cfg.update_section(sec, "MTUBytes", iface["mtu"])
def parse_routes(self, rid, conf, cfg: CfgParser):
"""
Parse a route and use rid as a key in order to isolate the route from
others in the route dict.
"""
sec = "Route"
route_cfg_map = {
"gateway": "Gateway",
"network": "Destination",
"metric": "Metric",
}
# prefix is derived using netmask by network_state
prefix = ""
if "prefix" in conf:
prefix = "/" + str(conf["prefix"])
for k, v in conf.items():
if k not in route_cfg_map:
continue
if k == "network":
v += prefix
cfg.update_route_section(sec, rid, route_cfg_map[k], v)
def parse_subnets(self, iface, cfg: CfgParser):
dhcp = "no"
sec = "Network"
rid = 0
for e in iface.get("subnets", []):
t = e["type"]
if t == "dhcp4" or t == "dhcp":
if dhcp == "no":
dhcp = "ipv4"
elif dhcp == "ipv6":
dhcp = "yes"
elif t == "dhcp6":
if dhcp == "no":
dhcp = "ipv6"
elif dhcp == "ipv4":
dhcp = "yes"
if "routes" in e and e["routes"]:
for i in e["routes"]:
# Use "r" as a dict key prefix for this route to isolate
# it from other sources of routes
self.parse_routes(f"r{rid}", i, cfg)
rid = rid + 1
if "address" in e:
subnet_cfg_map = {
"address": "Address",
"gateway": "Gateway",
"dns_nameservers": "DNS",
"dns_search": "Domains",
}
for k, v in e.items():
if k == "address":
if "prefix" in e:
v += "/" + str(e["prefix"])
cfg.update_section("Address", subnet_cfg_map[k], v)
elif k == "gateway":
# Use "a" as a dict key prefix for this route to
# isolate it from other sources of routes
cfg.update_route_section(
"Route", f"a{rid}", subnet_cfg_map[k], v
)
rid = rid + 1
elif k == "dns_nameservers" or k == "dns_search":
cfg.update_section(sec, subnet_cfg_map[k], " ".join(v))
cfg.update_section(sec, "DHCP", dhcp)
if dhcp in ["ipv6", "yes"] and isinstance(
iface.get("accept-ra", ""), bool
):
cfg.update_section(sec, "IPv6AcceptRA", iface["accept-ra"])
return dhcp
# This is to accommodate extra keys present in VMware config
def dhcp_domain(self, d, cfg: CfgParser):
for item in ["dhcp4domain", "dhcp6domain"]:
if item not in d:
continue
ret = str(d[item]).casefold()
try:
ret = util.translate_bool(ret)
ret = "yes" if ret else "no"
except ValueError:
if ret != "route":
LOG.warning("Invalid dhcp4domain value - %s", ret)
ret = "no"
if item == "dhcp4domain":
section = "DHCPv4"
else:
section = "DHCPv6"
cfg.update_section(section, "UseDomains", ret)
def parse_dns(self, iface, cfg: CfgParser, ns: NetworkState):
sec = "Network"
dns_cfg_map = {
"search": "Domains",
"nameservers": "DNS",
"addresses": "DNS",
}
dns = iface.get("dns")
if not dns and ns.version == 1:
dns = {
"search": ns.dns_searchdomains,
"nameservers": ns.dns_nameservers,
}
elif not dns and ns.version == 2:
return
for k, v in dns_cfg_map.items():
if k in dns and dns[k]:
cfg.update_section(sec, v, " ".join(dns[k]))
def parse_dhcp_overrides(self, cfg: CfgParser, device, dhcp, version):
dhcp_config_maps = {
"UseDNS": "use-dns",
"UseDomains": "use-domains",
"UseHostname": "use-hostname",
"UseNTP": "use-ntp",
}
if version == "4":
dhcp_config_maps.update(
{
"SendHostname": "send-hostname",
"Hostname": "hostname",
"RouteMetric": "route-metric",
"UseMTU": "use-mtu",
"UseRoutes": "use-routes",
}
)
if f"dhcp{version}-overrides" in device and dhcp in [
"yes",
f"ipv{version}",
]:
dhcp_overrides = device[f"dhcp{version}-overrides"]
for k, v in dhcp_config_maps.items():
if v in dhcp_overrides:
cfg.update_section(f"DHCPv{version}", k, dhcp_overrides[v])
def create_network_file(self, link, conf, nwk_dir):
net_fn_owner = "systemd-network"
LOG.debug("Setting Networking Config for %s", link)
net_fn = nwk_dir + "10-cloud-init-" + link + ".network"
util.write_file(net_fn, conf)
util.chownbyname(net_fn, net_fn_owner, net_fn_owner)
def render_network_state(
self,
network_state: NetworkState,
templates: Optional[dict] = None,
target=None,
) -> None:
network_dir = self.network_conf_dir
if target:
network_dir = subp.target_path(target) + network_dir
util.ensure_dir(network_dir)
ret_dict = self._render_content(network_state)
for k, v in ret_dict.items():
self.create_network_file(k, v, network_dir)
def _render_content(self, ns: NetworkState) -> dict:
ret_dict = {}
for iface in ns.iter_interfaces():
cfg = CfgParser()
link = self.generate_match_section(iface, cfg)
self.generate_link_section(iface, cfg)
dhcp = self.parse_subnets(iface, cfg)
self.parse_dns(iface, cfg, ns)
rid = 0
for route in ns.iter_routes():
# Use "c" as a dict key prefix for this route to isolate it
# from other sources of routes
self.parse_routes(f"c{rid}", route, cfg)
rid = rid + 1
if ns.version == 2:
name: Optional[str] = iface["name"]
# network state doesn't give dhcp domain info
# using ns.config as a workaround here
# Check to see if this interface matches against an interface
# from the network state that specified a set-name directive.
# If there is a device with a set-name directive and it has
# set-name value that matches the current name, then update the
# current name to the device's name. That will be the value in
# the ns.config['ethernets'] dict below.
for dev_name, dev_cfg in ns.config["ethernets"].items():
if "set-name" in dev_cfg:
if dev_cfg.get("set-name") == name:
name = dev_name
break
if name in ns.config["ethernets"]:
device = ns.config["ethernets"][name]
# dhcp{version}domain are extra keys only present in
# VMware config
self.dhcp_domain(device, cfg)
for version in ["4", "6"]:
if (
f"dhcp{version}domain" in device
and "use-domains"
in device.get(f"dhcp{version}-overrides", {})
):
exception = (
f"{name} has both dhcp{version}domain"
f" and dhcp{version}-overrides.use-domains"
f" configured. Use one"
)
raise RuntimeError(exception)
self.parse_dhcp_overrides(cfg, device, dhcp, version)
ret_dict.update({link: cfg.get_final_conf()})
return ret_dict
def available(target=None):
expected = ["ip", "systemctl"]
search = ["/usr/sbin", "/bin"]
for p in expected:
if not subp.which(p, search=search, target=target):
return False
return True