# Author: Eric Benner <ebenner@vultr.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
import json
import logging
from functools import lru_cache
from requests import exceptions
from cloudinit import dmi, net, subp, url_helper, util
from cloudinit.net.dhcp import NoDHCPLeaseError
from cloudinit.net.ephemeral import EphemeralDHCPv4
# Get LOG
LOG = logging.getLogger(__name__)
@lru_cache()
def get_metadata(
distro, url, timeout, retries, sec_between, agent, tmp_dir=None
):
# Bring up interface (and try untill one works)
exception = RuntimeError("Failed to DHCP")
# Seek iface with DHCP
for iface in get_interface_list():
try:
with EphemeralDHCPv4(
distro,
iface=iface,
connectivity_url_data={"url": url},
):
# Fetch the metadata
v1 = read_metadata(url, timeout, retries, sec_between, agent)
metadata = json.loads(v1)
refactor_metadata(metadata)
return metadata
except (
NoDHCPLeaseError,
subp.ProcessExecutionError,
RuntimeError,
exceptions.RequestException,
) as exc:
LOG.error("DHCP Exception: %s", exc)
exception = exc
raise exception
# Refactor metadata into acceptable format
def refactor_metadata(metadata):
metadata["instance-id"] = metadata["instance-v2-id"]
metadata["local-hostname"] = metadata["hostname"]
region = metadata["region"]["regioncode"]
if "countrycode" in metadata["region"]:
region = metadata["region"]["countrycode"]
metadata["region"] = region.lower()
# Get interface list, sort, and clean
def get_interface_list():
ifaces = []
for iface in net.find_candidate_nics():
# Skip dummy
if "dummy" in iface:
continue
ifaces.append(iface)
return ifaces
# Read the system information from SMBIOS
def get_sysinfo():
return {
"manufacturer": dmi.read_dmi_data("system-manufacturer"),
"subid": dmi.read_dmi_data("system-serial-number"),
}
# Assumes is Vultr is already checked
def is_baremetal():
if get_sysinfo()["manufacturer"] != "Vultr":
return True
return False
# Confirm is Vultr
def is_vultr():
# VC2, VDC, and HFC use DMI
sysinfo = get_sysinfo()
if sysinfo["manufacturer"] == "Vultr":
return True
# Baremetal requires a kernel parameter
if "vultr" in util.get_cmdline().split():
return True
return False
# Read Metadata endpoint
def read_metadata(url, timeout, retries, sec_between, agent):
url = "%s/v1.json" % url
# Announce os details so we can handle non Vultr origin
# images and provide correct vendordata generation.
headers = {"Metadata-Token": "cloudinit", "User-Agent": agent}
response = url_helper.readurl(
url,
timeout=timeout,
retries=retries,
headers=headers,
sec_between=sec_between,
)
if not response.ok():
raise RuntimeError(
"Failed to connect to %s: Code: %s" % url, response.code
)
return response.contents.decode()
# Wrapped for caching
@lru_cache()
def get_interface_map():
return net.get_interfaces_by_mac()
# Convert macs to nics
def get_interface_name(mac):
macs_to_nic = get_interface_map()
if mac not in macs_to_nic:
return None
return macs_to_nic.get(mac)
# Generate network configs
def generate_network_config(interfaces):
network = {
"version": 1,
"config": [
{
"type": "nameserver",
"address": ["108.61.10.10", "2001:19f0:300:1704::6"],
}
],
}
# Prepare interface 0, public
if len(interfaces) > 0:
public = generate_interface(interfaces[0], primary=True)
network["config"].append(public)
# Prepare additional interfaces, private
for i in range(1, len(interfaces)):
interface = interfaces[i]
# Skip interfaces set not to be configured
if interface.get("unconfigured"):
continue
private = generate_interface(interface)
network["config"].append(private)
return network
def generate_interface(interface, primary=False):
interface_name = get_interface_name(interface["mac"])
if not interface_name:
raise RuntimeError(
"Interface: %s could not be found on the system" % interface["mac"]
)
netcfg = {
"name": interface_name,
"type": "physical",
"mac_address": interface["mac"],
}
if primary:
netcfg["accept-ra"] = 1
netcfg["subnets"] = [
{"type": "dhcp", "control": "auto"},
{"type": "ipv6_slaac", "control": "auto"},
]
if not primary:
netcfg["subnets"] = [
{
"type": "static",
"control": "auto",
"address": interface["ipv4"]["address"],
"netmask": interface["ipv4"]["netmask"],
}
]
generate_interface_routes(interface, netcfg)
generate_interface_additional_addresses(interface, netcfg)
# Add config to template
return netcfg
def generate_interface_routes(interface, netcfg):
# Options that may or may not be used
if "mtu" in interface:
netcfg["mtu"] = interface["mtu"]
if "accept-ra" in interface:
netcfg["accept-ra"] = interface["accept-ra"]
if "routes" in interface:
netcfg["subnets"][0]["routes"] = interface["routes"]
def generate_interface_additional_addresses(interface, netcfg):
# Check for additional IP's
additional_count = len(interface["ipv4"]["additional"])
if "ipv4" in interface and additional_count > 0:
for additional in interface["ipv4"]["additional"]:
add = {
"type": "static",
"control": "auto",
"address": additional["address"],
"netmask": additional["netmask"],
}
if "routes" in additional:
add["routes"] = additional["routes"]
netcfg["subnets"].append(add)
# Check for additional IPv6's
additional_count = len(interface["ipv6"]["additional"])
if "ipv6" in interface and additional_count > 0:
for additional in interface["ipv6"]["additional"]:
add = {
"type": "static6",
"control": "auto",
"address": "%s/%s"
% (additional["network"], additional["prefix"]),
}
if "routes" in additional:
add["routes"] = additional["routes"]
netcfg["subnets"].append(add)
# Make required adjustments to the network configs provided
def add_interface_names(netcfg):
for interface in netcfg["config"]:
if interface["type"] != "physical":
continue
interface_name = get_interface_name(interface["mac_address"])
if not interface_name:
raise RuntimeError(
"Interface: %s could not be found on the system"
% interface["mac_address"]
)
interface["name"] = interface_name