# Copyright (C) 2007-2012 Red Hat
# see file 'COPYING' for use and warranty information
#
# policygentool is a tool for the initial generation of SELinux policy
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of
# the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# 02111-1307 USA
#
#
import os
import sys
import stat
import re
import sepolicy
from sepolicy import get_all_types, get_all_attributes, get_all_roles
import time
import platform
from .templates import executable
from .templates import boolean
from .templates import etc_rw
from .templates import unit_file
from .templates import var_cache
from .templates import var_spool
from .templates import var_lib
from .templates import var_log
from .templates import var_run
from .templates import tmp
from .templates import rw
from .templates import network
from .templates import script
from .templates import spec
from .templates import user
import sepolgen.interfaces as interfaces
import sepolgen.defaults as defaults
##
## I18N
##
PROGNAME = "selinux-python"
try:
import gettext
kwargs = {}
if sys.version_info < (3,):
kwargs['unicode'] = True
gettext.install(PROGNAME,
localedir="/usr/share/locale",
codeset='utf-8',
**kwargs)
except:
try:
import builtins
builtins.__dict__['_'] = str
except ImportError:
import __builtin__
__builtin__.__dict__['_'] = unicode
def get_rpm_nvr_from_header(hdr):
'Given an RPM header return the package NVR as a string'
name = hdr['name']
version = hdr['version']
release = hdr['release']
release_version = version + "-" + release.split(".")[0]
os_version = release.split(".")[1]
return [name, release_version, os_version]
def get_rpm_nvr_list(package):
try:
import rpm
nvr = None
ts = rpm.ts()
mi = ts.dbMatch(rpm.RPMTAG_NAME, package)
for h in mi:
nvr = get_rpm_nvr_from_header(h)
break
except:
print(("Failed to retrieve rpm info for %s") % package)
nvr = None
return nvr
def get_all_ports():
dict = {}
for p in sepolicy.info(sepolicy.PORT):
if p['type'] == "reserved_port_t" or \
p['type'] == "port_t" or \
p['type'] == "hi_reserved_port_t" or \
p['type'] == "ephemeral_port_t" or \
p['type'] == "unreserved_port_t":
continue
dict[(p['low'], p['high'], p['protocol'])] = (p['type'], p.get('range'))
return dict
def get_all_users():
users = [x['name'] for x in sepolicy.info(sepolicy.USER)]
users.remove("system_u")
users.remove("root")
users.sort()
return users
ALL = 0
RESERVED = 1
UNRESERVED = 2
PORTS = 3
ADMIN_TRANSITION_INTERFACE = "_admin$"
USER_TRANSITION_INTERFACE = "_role$"
DAEMON = 0
DBUS = 1
INETD = 2
CGI = 3
SANDBOX = 4
USER = 5
EUSER = 6
TUSER = 7
XUSER = 8
LUSER = 9
AUSER = 10
RUSER = 11
NEWTYPE = 12
poltype = {}
poltype[DAEMON] = _("Standard Init Daemon")
poltype[DBUS] = _("DBUS System Daemon")
poltype[INETD] = _("Internet Services Daemon")
poltype[CGI] = _("Web Application/Script (CGI)")
poltype[SANDBOX] = _("Sandbox")
poltype[USER] = _("User Application")
poltype[EUSER] = _("Existing Domain Type")
poltype[TUSER] = _("Minimal Terminal Login User Role")
poltype[XUSER] = _("Minimal X Windows Login User Role")
poltype[LUSER] = _("Desktop Login User Role")
poltype[AUSER] = _("Administrator Login User Role")
poltype[RUSER] = _("Confined Root Administrator Role")
poltype[NEWTYPE] = _("Module information for a new type")
def get_poltype_desc():
keys = poltype.keys()
keys.sort()
msg = _("Valid Types:\n")
for k in keys:
msg += "%2s: %s\n" % (k, poltype[k])
return msg
APPLICATIONS = [DAEMON, DBUS, INETD, USER, CGI]
USERS = [XUSER, TUSER, LUSER, AUSER, RUSER]
def verify_ports(ports):
if ports == "":
return []
max_port = 2 ** 16
try:
temp = []
for a in ports.split(","):
r = a.split("-")
if len(r) > 2:
raise ValueError
if len(r) == 1:
begin = int(r[0])
end = int(r[0])
else:
begin = int(r[0])
end = int(r[1])
if begin > end:
raise ValueError
for p in range(begin, end + 1):
if p < 1 or p > max_port:
raise ValueError
temp.append(p)
return temp
except ValueError:
raise ValueError(_("Ports must be numbers or ranges of numbers from 1 to %d ") % max_port)
class policy:
def __init__(self, name, type):
self.rpms = []
self.ports = {}
self.all_roles = get_all_roles()
self.types = []
if type not in poltype:
raise ValueError(_("You must enter a valid policy type"))
if not name:
raise ValueError(_("You must enter a name for your policy module for your '%s'.") % poltype[type])
try:
self.ports = get_all_ports()
except ValueError as e:
print("Can not get port types, must be root for this information")
except RuntimeError as e:
print("Can not get port types", e)
self.symbols = {}
self.symbols["openlog"] = "set_use_kerberos(True)"
self.symbols["openlog"] = "set_use_kerb_rcache(True)"
self.symbols["openlog"] = "set_use_syslog(True)"
self.symbols["gethostby"] = "set_use_resolve(True)"
self.symbols["getaddrinfo"] = "set_use_resolve(True)"
self.symbols["getnameinfo"] = "set_use_resolve(True)"
self.symbols["krb"] = "set_use_kerberos(True)"
self.symbols["gss_accept_sec_context"] = "set_manage_krb5_rcache(True)"
self.symbols["krb5_verify_init_creds"] = "set_manage_krb5_rcache(True)"
self.symbols["krb5_rd_req"] = "set_manage_krb5_rcache(True)"
self.symbols["__syslog_chk"] = "set_use_syslog(True)"
self.symbols["getpwnam"] = "set_use_uid(True)"
self.symbols["getpwuid"] = "set_use_uid(True)"
self.symbols["dbus_"] = "set_use_dbus(True)"
self.symbols["pam_"] = "set_use_pam(True)"
self.symbols["pam_"] = "set_use_audit(True)"
self.symbols["fork"] = "add_process('fork')"
self.symbols["transition"] = "add_process('transition')"
self.symbols["sigchld"] = "add_process('sigchld')"
self.symbols["sigkill"] = "add_process('sigkill')"
self.symbols["sigstop"] = "add_process('sigstop')"
self.symbols["signull"] = "add_process('signull')"
self.symbols["ptrace"] = "add_process('ptrace')"
self.symbols["getsched"] = "add_process('getsched')"
self.symbols["setsched"] = "add_process('setsched')"
self.symbols["getsession"] = "add_process('getsession')"
self.symbols["getpgid"] = "add_process('getpgid')"
self.symbols["setpgid"] = "add_process('setpgid')"
self.symbols["getcap"] = "add_process('getcap')"
self.symbols["setcap"] = "add_process('setcap')"
self.symbols["share"] = "add_process('share')"
self.symbols["getattr"] = "add_process('getattr')"
self.symbols["setexec"] = "add_process('setexec')"
self.symbols["setfscreate"] = "add_process('setfscreate')"
self.symbols["noatsecure"] = "add_process('noatsecure')"
self.symbols["siginh"] = "add_process('siginh')"
self.symbols["kill"] = "add_process('signal_perms')"
self.symbols["setrlimit"] = "add_process('setrlimit')"
self.symbols["rlimitinh"] = "add_process('rlimitinh')"
self.symbols["dyntransition"] = "add_process('dyntransition')"
self.symbols["setcurrent"] = "add_process('setcurrent')"
self.symbols["execmem"] = "add_process('execmem')"
self.symbols["execstack"] = "add_process('execstack')"
self.symbols["execheap"] = "add_process('execheap')"
self.symbols["setkeycreate"] = "add_process('setkeycreate')"
self.symbols["setsockcreate"] = "add_process('setsockcreate')"
self.symbols["chown"] = "add_capability('chown')"
self.symbols["dac_override"] = "add_capability('dac_override')"
self.symbols["dac_read_search"] = "add_capability('dac_read_search')"
self.symbols["fowner"] = "add_capability('fowner')"
self.symbols["fsetid"] = "add_capability('fsetid')"
self.symbols["setgid"] = "add_capability('setgid')"
self.symbols["setegid"] = "add_capability('setgid')"
self.symbols["setresgid"] = "add_capability('setgid')"
self.symbols["setregid"] = "add_capability('setgid')"
self.symbols["setresuid"] = "add_capability('setuid')"
self.symbols["setuid"] = "add_capability('setuid')"
self.symbols["seteuid"] = "add_capability('setuid')"
self.symbols["setreuid"] = "add_capability('setuid')"
self.symbols["setresuid"] = "add_capability('setuid')"
self.symbols["setpcap"] = "add_capability('setpcap')"
self.symbols["linux_immutable"] = "add_capability('linux_immutable')"
self.symbols["net_bind_service"] = "add_capability('net_bind_service')"
self.symbols["net_broadcast"] = "add_capability('net_broadcast')"
self.symbols["net_admin"] = "add_capability('net_admin')"
self.symbols["net_raw"] = "add_capability('net_raw')"
self.symbols["ipc_lock"] = "add_capability('ipc_lock')"
self.symbols["ipc_owner"] = "add_capability('ipc_owner')"
self.symbols["sys_module"] = "add_capability('sys_module')"
self.symbols["sys_rawio"] = "add_capability('sys_rawio')"
self.symbols["chroot"] = "add_capability('sys_chroot')"
self.symbols["sys_chroot"] = "add_capability('sys_chroot')"
self.symbols["sys_ptrace"] = "add_capability('sys_ptrace')"
self.symbols["sys_pacct"] = "add_capability('sys_pacct')"
self.symbols["mount"] = "add_capability('sys_admin')"
self.symbols["unshare"] = "add_capability('sys_admin')"
self.symbols["sys_admin"] = "add_capability('sys_admin')"
self.symbols["sys_boot"] = "add_capability('sys_boot')"
self.symbols["sys_nice"] = "add_capability('sys_nice')"
self.symbols["sys_resource"] = "add_capability('sys_resource')"
self.symbols["sys_time"] = "add_capability('sys_time')"
self.symbols["sys_tty_config"] = "add_capability('sys_tty_config')"
self.symbols["mknod"] = "add_capability('mknod')"
self.symbols["lease"] = "add_capability('lease')"
self.symbols["audit_write"] = "add_capability('audit_write')"
self.symbols["audit_control"] = "add_capability('audit_control')"
self.symbols["setfcap"] = "add_capability('setfcap')"
self.DEFAULT_DIRS = {}
self.DEFAULT_DIRS["/etc"] = ["etc_rw", [], etc_rw]
self.DEFAULT_DIRS["/tmp"] = ["tmp", [], tmp]
self.DEFAULT_DIRS["rw"] = ["rw", [], rw]
self.DEFAULT_DIRS["/usr/lib/systemd/system"] = ["unit_file", [], unit_file]
self.DEFAULT_DIRS["/lib/systemd/system"] = ["unit_file", [], unit_file]
self.DEFAULT_DIRS["/etc/systemd/system"] = ["unit_file", [], unit_file]
self.DEFAULT_DIRS["/var/cache"] = ["var_cache", [], var_cache]
self.DEFAULT_DIRS["/var/lib"] = ["var_lib", [], var_lib]
self.DEFAULT_DIRS["/var/log"] = ["var_log", [], var_log]
self.DEFAULT_DIRS["/var/run"] = ["var_run", [], var_run]
self.DEFAULT_DIRS["/var/spool"] = ["var_spool", [], var_spool]
self.DEFAULT_EXT = {}
self.DEFAULT_EXT["_tmp_t"] = tmp
self.DEFAULT_EXT["_unit_file_t"] = unit_file
self.DEFAULT_EXT["_var_cache_t"] = var_cache
self.DEFAULT_EXT["_var_lib_t"] = var_lib
self.DEFAULT_EXT["_var_log_t"] = var_log
self.DEFAULT_EXT["_var_run_t"] = var_run
self.DEFAULT_EXT["_var_spool_t"] = var_spool
self.DEFAULT_EXT["_port_t"] = network
self.DEFAULT_KEYS = ["/etc", "/var/cache", "/var/log", "/tmp", "rw", "/var/lib", "/var/run", "/var/spool", "/etc/systemd/system", "/usr/lib/systemd/system", "/lib/systemd/system"]
self.DEFAULT_TYPES = (
(self.generate_daemon_types, self.generate_daemon_rules),
(self.generate_dbusd_types, self.generate_dbusd_rules),
(self.generate_inetd_types, self.generate_inetd_rules),
(self.generate_cgi_types, self.generate_cgi_rules),
(self.generate_sandbox_types, self.generate_sandbox_rules),
(self.generate_userapp_types, self.generate_userapp_rules),
(self.generate_existing_user_types, self.generate_existing_user_rules),
(self.generate_min_login_user_types, self.generate_login_user_rules),
(self.generate_x_login_user_types, self.generate_x_login_user_rules),
(self.generate_login_user_types, self.generate_login_user_rules),
(self.generate_admin_user_types, self.generate_login_user_rules),
(self.generate_root_user_types, self.generate_root_user_rules),
(self.generate_new_types, self.generate_new_rules))
if not re.match(r"^[a-zA-Z0-9-_]+$", name):
raise ValueError(_("Name must be alpha numberic with no spaces. Consider using option \"-n MODULENAME\""))
if type == CGI:
self.name = "httpd_%s_script" % name
else:
self.name = name
self.file_name = name
self.capabilities = []
self.processes = []
self.type = type
self.initscript = ""
self.program = None
self.in_tcp = [False, False, False, []]
self.in_udp = [False, False, False, []]
self.out_tcp = [False, False, False, []]
self.out_udp = [False, False, False, []]
self.use_resolve = False
self.use_tmp = False
self.use_uid = False
self.use_syslog = False
self.use_kerberos = False
self.manage_krb5_rcache = False
self.use_pam = False
self.use_dbus = False
self.use_audit = False
self.use_etc = self.type not in [EUSER, NEWTYPE]
self.use_localization = self.type not in [EUSER, NEWTYPE]
self.use_fd = self.type not in [EUSER, NEWTYPE]
self.use_terminal = False
self.use_mail = False
self.booleans = {}
self.files = {}
self.dirs = {}
self.found_tcp_ports = []
self.found_udp_ports = []
self.need_tcp_type = False
self.need_udp_type = False
self.admin_domains = []
self.existing_domains = []
self.transition_domains = []
self.transition_users = []
self.roles = []
def __isnetset(self, l):
return l[ALL] or l[RESERVED] or l[UNRESERVED] or len(l[PORTS]) > 0
def set_admin_domains(self, admin_domains):
self.admin_domains = admin_domains
def set_existing_domains(self, existing_domains):
self.existing_domains = existing_domains
def set_admin_roles(self, roles):
self.roles = roles
def set_transition_domains(self, transition_domains):
self.transition_domains = transition_domains
def set_transition_users(self, transition_users):
self.transition_users = transition_users
def use_in_udp(self):
return self.__isnetset(self.in_udp)
def use_out_udp(self):
return self.__isnetset(self.out_udp)
def use_udp(self):
return self.use_in_udp() or self.use_out_udp()
def use_in_tcp(self):
return self.__isnetset(self.in_tcp)
def use_out_tcp(self):
return self.__isnetset(self.out_tcp)
def use_tcp(self):
return self.use_in_tcp() or self.use_out_tcp()
def use_network(self):
return self.use_tcp() or self.use_udp()
def find_port(self, port, protocol="tcp"):
for begin, end, p in self.ports.keys():
if port >= begin and port <= end and protocol == p:
return self.ports[begin, end, protocol]
return None
def set_program(self, program):
if self.type not in APPLICATIONS:
raise ValueError(_("User Role types can not be assigned executables."))
self.program = program
def set_init_script(self, initscript):
if self.type != DAEMON:
raise ValueError(_("Only Daemon apps can use an init script.."))
self.initscript = initscript
def set_in_tcp(self, all, reserved, unreserved, ports):
self.in_tcp = [all, reserved, unreserved, verify_ports(ports)]
def set_in_udp(self, all, reserved, unreserved, ports):
self.in_udp = [all, reserved, unreserved, verify_ports(ports)]
def set_out_tcp(self, all, ports):
self.out_tcp = [all, False, False, verify_ports(ports)]
def set_out_udp(self, all, ports):
self.out_udp = [all, False, False, verify_ports(ports)]
def set_use_resolve(self, val):
if type(val) is not bool:
raise ValueError(_("use_resolve must be a boolean value "))
self.use_resolve = val
def set_use_syslog(self, val):
if type(val) is not bool:
raise ValueError(_("use_syslog must be a boolean value "))
self.use_syslog = val
def set_use_kerberos(self, val):
if type(val) is not bool:
raise ValueError(_("use_kerberos must be a boolean value "))
self.use_kerberos = val
def set_manage_krb5_rcache(self, val):
if type(val) is not bool:
raise ValueError(_("manage_krb5_rcache must be a boolean value "))
self.manage_krb5_rcache = val
def set_use_pam(self, val):
self.use_pam = (val is True)
def set_use_dbus(self, val):
self.use_dbus = (val is True)
def set_use_audit(self, val):
self.use_audit = (val is True)
def set_use_etc(self, val):
self.use_etc = (val is True)
def set_use_localization(self, val):
self.use_localization = (val is True)
def set_use_fd(self, val):
self.use_fd = (val is True)
def set_use_terminal(self, val):
self.use_terminal = (val is True)
def set_use_mail(self, val):
self.use_mail = (val is True)
def set_use_tmp(self, val):
if self.type in USERS:
raise ValueError(_("USER Types automatically get a tmp type"))
if val:
self.DEFAULT_DIRS["/tmp"][1].append("/tmp")
else:
self.DEFAULT_DIRS["/tmp"][1] = []
def set_use_uid(self, val):
self.use_uid = (val is True)
def generate_uid_rules(self):
if self.use_uid:
return re.sub("TEMPLATETYPE", self.name, executable.te_uid_rules)
else:
return ""
def generate_syslog_rules(self):
if self.use_syslog:
return re.sub("TEMPLATETYPE", self.name, executable.te_syslog_rules)
else:
return ""
def generate_resolve_rules(self):
if self.use_resolve:
return re.sub("TEMPLATETYPE", self.name, executable.te_resolve_rules)
else:
return ""
def generate_kerberos_rules(self):
if self.use_kerberos:
return re.sub("TEMPLATETYPE", self.name, executable.te_kerberos_rules)
else:
return ""
def generate_manage_krb5_rcache_rules(self):
if self.manage_krb5_rcache:
return re.sub("TEMPLATETYPE", self.name, executable.te_manage_krb5_rcache_rules)
else:
return ""
def generate_pam_rules(self):
newte = ""
if self.use_pam:
newte = re.sub("TEMPLATETYPE", self.name, executable.te_pam_rules)
return newte
def generate_audit_rules(self):
newte = ""
if self.use_audit:
newte = re.sub("TEMPLATETYPE", self.name, executable.te_audit_rules)
return newte
def generate_etc_rules(self):
newte = ""
if self.use_etc:
newte = re.sub("TEMPLATETYPE", self.name, executable.te_etc_rules)
return newte
def generate_fd_rules(self):
newte = ""
if self.use_fd:
newte = re.sub("TEMPLATETYPE", self.name, executable.te_fd_rules)
return newte
def generate_localization_rules(self):
newte = ""
if self.use_localization:
newte = re.sub("TEMPLATETYPE", self.name, executable.te_localization_rules)
return newte
def generate_dbus_rules(self):
newte = ""
if self.type != DBUS and self.use_dbus:
newte = re.sub("TEMPLATETYPE", self.name, executable.te_dbus_rules)
return newte
def generate_mail_rules(self):
newte = ""
if self.use_mail:
newte = re.sub("TEMPLATETYPE", self.name, executable.te_mail_rules)
return newte
def generate_network_action(self, protocol, action, port_name):
line = ""
method = "corenet_%s_%s_%s" % (protocol, action, port_name)
if method in sepolicy.get_methods():
line = "%s(%s_t)\n" % (method, self.name)
else:
line = """
gen_require(`
type %s_t;
')
allow %s_t %s_t:%s_socket name_%s;
""" % (port_name, self.name, port_name, protocol, action)
return line
def generate_network_types(self):
for i in self.in_tcp[PORTS]:
rec = self.find_port(int(i), "tcp")
if rec is None:
self.need_tcp_type = True
else:
port_name = rec[0][:-2]
line = self.generate_network_action("tcp", "bind", port_name)
# line = "corenet_tcp_bind_%s(%s_t)\n" % (port_name, self.name)
if line not in self.found_tcp_ports:
self.found_tcp_ports.append(line)
for i in self.out_tcp[PORTS]:
rec = self.find_port(int(i), "tcp")
if rec is None:
self.need_tcp_type = True
else:
port_name = rec[0][:-2]
line = self.generate_network_action("tcp", "connect", port_name)
# line = "corenet_tcp_connect_%s(%s_t)\n" % (port_name, self.name)
if line not in self.found_tcp_ports:
self.found_tcp_ports.append(line)
for i in self.in_udp[PORTS]:
rec = self.find_port(int(i), "udp")
if rec is None:
self.need_udp_type = True
else:
port_name = rec[0][:-2]
line = self.generate_network_action("udp", "bind", port_name)
# line = "corenet_udp_bind_%s(%s_t)\n" % (port_name, self.name)
if line not in self.found_udp_ports:
self.found_udp_ports.append(line)
if self.need_udp_type is True or self.need_tcp_type is True:
return re.sub("TEMPLATETYPE", self.name, network.te_types)
return ""
def __find_path(self, file):
for d in self.DEFAULT_DIRS:
if file.find(d) == 0:
self.DEFAULT_DIRS[d][1].append(file)
return self.DEFAULT_DIRS[d]
self.DEFAULT_DIRS["rw"][1].append(file)
return self.DEFAULT_DIRS["rw"]
def add_capability(self, capability):
if capability not in self.capabilities:
self.capabilities.append(capability)
def set_types(self, types):
self.types = types
def add_process(self, process):
if process not in self.processes:
self.processes.append(process)
def add_boolean(self, name, description):
self.booleans[name] = description
def add_file(self, file):
self.files[file] = self.__find_path(file)
def add_dir(self, file):
self.dirs[file] = self.__find_path(file)
def generate_capabilities(self):
newte = ""
self.capabilities.sort()
if len(self.capabilities) > 0:
newte = "allow %s_t self:capability { %s };\n" % (self.name, " ".join(self.capabilities))
return newte
def generate_process(self):
newte = ""
self.processes.sort()
if len(self.processes) > 0:
newte = "allow %s_t self:process { %s };\n" % (self.name, " ".join(self.processes))
return newte
def generate_network_rules(self):
newte = ""
if self.use_network():
newte = "\n"
newte += re.sub("TEMPLATETYPE", self.name, network.te_network)
if self.use_tcp():
newte += "\n"
newte += re.sub("TEMPLATETYPE", self.name, network.te_tcp)
if self.use_in_tcp():
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_tcp)
if self.need_tcp_type and len(self.in_tcp[PORTS]) > 0:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_need_port_tcp)
if self.need_tcp_type and len(self.out_tcp[PORTS]) > 0:
newte += re.sub("TEMPLATETYPE", self.name, network.te_out_need_port_tcp)
if self.in_tcp[ALL]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_all_ports_tcp)
if self.in_tcp[RESERVED]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_reserved_ports_tcp)
if self.in_tcp[UNRESERVED]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_unreserved_ports_tcp)
if self.out_tcp[ALL]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_out_all_ports_tcp)
if self.out_tcp[RESERVED]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_out_reserved_ports_tcp)
if self.out_tcp[UNRESERVED]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_out_unreserved_ports_tcp)
for i in self.found_tcp_ports:
newte += i
if self.use_udp():
newte += "\n"
newte += re.sub("TEMPLATETYPE", self.name, network.te_udp)
if self.need_udp_type:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_need_port_udp)
if self.use_in_udp():
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_udp)
if self.in_udp[ALL]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_all_ports_udp)
if self.in_udp[RESERVED]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_reserved_ports_udp)
if self.in_udp[UNRESERVED]:
newte += re.sub("TEMPLATETYPE", self.name, network.te_in_unreserved_ports_udp)
for i in self.found_udp_ports:
newte += i
return newte
def generate_transition_rules(self):
newte = ""
for app in self.transition_domains:
tmp = re.sub("TEMPLATETYPE", self.name, user.te_transition_rules)
newte += re.sub("APPLICATION", app, tmp)
if self.type == USER:
for u in self.transition_users:
temp = re.sub("TEMPLATETYPE", self.name, executable.te_run_rules)
newte += re.sub("USER", u.split("_u")[0], temp)
return newte
def generate_admin_rules(self):
newte = ""
if self.type == EUSER:
for d in self.existing_domains:
name = d.split("_t")[0]
role = name + "_r"
for app in self.admin_domains:
tmp = re.sub("TEMPLATETYPE", name, user.te_admin_domain_rules)
if role not in self.all_roles:
tmp = re.sub(role, "system_r", tmp)
newte += re.sub("APPLICATION", app, tmp)
return newte
if self.type == RUSER:
newte += re.sub("TEMPLATETYPE", self.name, user.te_admin_rules)
for app in self.admin_domains:
tmp = re.sub("TEMPLATETYPE", self.name, user.te_admin_domain_rules)
newte += re.sub("APPLICATION", app, tmp)
for u in self.transition_users:
role = u.split("_u")[0]
if (role + "_r") in self.all_roles:
tmp = re.sub("TEMPLATETYPE", self.name, user.te_admin_trans_rules)
newte += re.sub("USER", role, tmp)
return newte
def generate_dbus_if(self):
newif = ""
if self.use_dbus:
newif = re.sub("TEMPLATETYPE", self.name, executable.if_dbus_rules)
return newif
def generate_sandbox_if(self):
newif = ""
if self.type != SANDBOX:
return newif
newif = re.sub("TEMPLATETYPE", self.name, executable.if_sandbox_rules)
return newif
def generate_admin_if(self):
newif = ""
newtypes = ""
if self.initscript != "":
newtypes += re.sub("TEMPLATETYPE", self.name, executable.if_initscript_admin_types)
newif += re.sub("TEMPLATETYPE", self.name, executable.if_initscript_admin)
for d in self.DEFAULT_KEYS:
if len(self.DEFAULT_DIRS[d][1]) > 0:
newtypes += re.sub("TEMPLATETYPE", self.name, self.DEFAULT_DIRS[d][2].if_admin_types)
newif += re.sub("TEMPLATETYPE", self.name, self.DEFAULT_DIRS[d][2].if_admin_rules)
if newif != "":
ret = re.sub("TEMPLATETYPE", self.name, executable.if_begin_admin)
ret += newtypes
ret += re.sub("TEMPLATETYPE", self.name, executable.if_middle_admin)
ret += newif
ret += re.sub("TEMPLATETYPE", self.name, executable.if_end_admin)
return ret
return ""
def generate_cgi_types(self):
return re.sub("TEMPLATETYPE", self.file_name, executable.te_cgi_types)
def generate_sandbox_types(self):
return re.sub("TEMPLATETYPE", self.file_name, executable.te_sandbox_types)
def generate_userapp_types(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_userapp_types)
def generate_inetd_types(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_inetd_types)
def generate_dbusd_types(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_dbusd_types)
def generate_min_login_user_types(self):
return re.sub("TEMPLATETYPE", self.name, user.te_min_login_user_types)
def generate_login_user_types(self):
return re.sub("TEMPLATETYPE", self.name, user.te_login_user_types)
def generate_admin_user_types(self):
return re.sub("TEMPLATETYPE", self.name, user.te_admin_user_types)
def generate_existing_user_types(self):
if len(self.existing_domains) == 0:
raise ValueError(_("'%s' policy modules require existing domains") % poltype[self.type])
newte = re.sub("TEMPLATETYPE", self.name, user.te_existing_user_types)
newte += """gen_require(`"""
for d in self.existing_domains:
newte += """
type %s;""" % d
role = d.split("_t")[0] + "_r"
if role in self.all_roles:
newte += """
role %s;""" % role
newte += """
')
"""
return newte
def generate_x_login_user_types(self):
return re.sub("TEMPLATETYPE", self.name, user.te_x_login_user_types)
def generate_root_user_types(self):
return re.sub("TEMPLATETYPE", self.name, user.te_root_user_types)
def generate_new_types(self):
newte = ""
if len(self.types) == 0:
raise ValueError(_("Type field required"))
for t in self.types:
for i in self.DEFAULT_EXT:
if t.endswith(i):
print(t, t[:-len(i)])
newte += re.sub("TEMPLATETYPE", t[:-len(i)], self.DEFAULT_EXT[i].te_types)
break
if NEWTYPE and newte == "":
default_ext = []
for i in self.DEFAULT_EXT:
default_ext.append(i)
raise ValueError(_("You need to define a new type which ends with: \n %s") % "\n ".join(default_ext))
return newte
def generate_new_rules(self):
return ""
def generate_daemon_types(self):
newte = re.sub("TEMPLATETYPE", self.name, executable.te_daemon_types)
if self.initscript != "":
newte += re.sub("TEMPLATETYPE", self.name, executable.te_initscript_types)
return newte
def generate_tmp_types(self):
if self.use_tmp:
return re.sub("TEMPLATETYPE", self.name, tmp.te_types)
else:
return ""
def generate_booleans(self):
newte = ""
for b in self.booleans:
tmp = re.sub("BOOLEAN", b, boolean.te_boolean)
newte += re.sub("DESCRIPTION", self.booleans[b], tmp)
return newte
def generate_boolean_rules(self):
newte = ""
for b in self.booleans:
newte += re.sub("BOOLEAN", b, boolean.te_rules)
return newte
def generate_sandbox_te(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_sandbox_types)
def generate_cgi_te(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_cgi_types)
def generate_daemon_rules(self):
newif = re.sub("TEMPLATETYPE", self.name, executable.te_daemon_rules)
return newif
def generate_new_type_if(self):
newif = ""
for t in self.types:
for i in self.DEFAULT_EXT:
if t.endswith(i):
reqtype = t[:-len(i)] + "_t"
newif += re.sub("TEMPLATETYPE", t[:-len(i)], self.DEFAULT_EXT[i].if_rules)
break
return newif
def generate_login_user_rules(self):
return re.sub("TEMPLATETYPE", self.name, user.te_login_user_rules)
def generate_existing_user_rules(self):
nerules = re.sub("TEMPLATETYPE", self.name, user.te_existing_user_rules)
return nerules
def generate_x_login_user_rules(self):
return re.sub("TEMPLATETYPE", self.name, user.te_x_login_user_rules)
def generate_root_user_rules(self):
newte = re.sub("TEMPLATETYPE", self.name, user.te_root_user_rules)
return newte
def generate_userapp_rules(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_userapp_rules)
def generate_inetd_rules(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_inetd_rules)
def generate_dbusd_rules(self):
return re.sub("TEMPLATETYPE", self.name, executable.te_dbusd_rules)
def generate_tmp_rules(self):
if self.use_tmp:
return re.sub("TEMPLATETYPE", self.name, tmp.te_rules)
else:
return ""
def generate_cgi_rules(self):
newte = ""
newte += re.sub("TEMPLATETYPE", self.name, executable.te_cgi_rules)
return newte
def generate_sandbox_rules(self):
newte = ""
newte += re.sub("TEMPLATETYPE", self.name, executable.te_sandbox_rules)
return newte
def generate_user_if(self):
newif = ""
if self.use_terminal or self.type == USER:
newif = re.sub("TEMPLATETYPE", self.name, executable.if_user_program_rules)
if self.type in (TUSER, XUSER, AUSER, LUSER):
newif += re.sub("TEMPLATETYPE", self.name, executable.if_role_change_rules)
return newif
def generate_if(self):
newif = ""
newif += re.sub("TEMPLATETYPE", self.name, executable.if_heading_rules)
if self.program:
newif += re.sub("TEMPLATETYPE", self.name, executable.if_program_rules)
if self.initscript != "":
newif += re.sub("TEMPLATETYPE", self.name, executable.if_initscript_rules)
for d in self.DEFAULT_KEYS:
if len(self.DEFAULT_DIRS[d][1]) > 0:
newif += re.sub("TEMPLATETYPE", self.name, self.DEFAULT_DIRS[d][2].if_rules)
for i in self.DEFAULT_DIRS[d][1]:
if os.path.exists(i) and stat.S_ISSOCK(os.stat(i)[stat.ST_MODE]):
newif += re.sub("TEMPLATETYPE", self.name, self.DEFAULT_DIRS[d][2].if_stream_rules)
break
newif += self.generate_user_if()
newif += self.generate_dbus_if()
newif += self.generate_admin_if()
newif += self.generate_sandbox_if()
newif += self.generate_new_type_if()
newif += self.generate_new_rules()
return newif
def generate_default_types(self):
return self.DEFAULT_TYPES[self.type][0]()
def generate_default_rules(self):
if self.DEFAULT_TYPES[self.type][1]:
return self.DEFAULT_TYPES[self.type][1]()
return ""
def generate_roles_rules(self):
newte = ""
if self.type in (TUSER, XUSER, AUSER, LUSER):
roles = ""
if len(self.roles) > 0:
newte += re.sub("TEMPLATETYPE", self.name, user.te_sudo_rules)
newte += re.sub("TEMPLATETYPE", self.name, user.te_newrole_rules)
for role in self.roles:
tmp = re.sub("TEMPLATETYPE", self.name, user.te_roles_rules)
newte += re.sub("ROLE", role, tmp)
return newte
def generate_te(self):
newte = self.generate_default_types()
for d in self.DEFAULT_KEYS:
if len(self.DEFAULT_DIRS[d][1]) > 0:
# CGI scripts already have a rw_t
if self.type != CGI or d != "rw":
newte += re.sub("TEMPLATETYPE", self.name, self.DEFAULT_DIRS[d][2].te_types)
if self.type != EUSER:
newte += """
########################################
#
# %s local policy
#
""" % self.name
newte += self.generate_capabilities()
newte += self.generate_process()
newte += self.generate_network_types()
newte += self.generate_tmp_types()
newte += self.generate_booleans()
newte += self.generate_default_rules()
newte += self.generate_boolean_rules()
for d in self.DEFAULT_KEYS:
if len(self.DEFAULT_DIRS[d][1]) > 0:
if self.type == EUSER:
newte_tmp = ""
for domain in self.existing_domains:
newte_tmp += re.sub("TEMPLATETYPE_t", domain[:-2] + "_t", self.DEFAULT_DIRS[d][2].te_rules)
newte += re.sub("TEMPLATETYPE_rw_t", self.name + "_rw_t", newte_tmp)
else:
newte += re.sub("TEMPLATETYPE", self.name, self.DEFAULT_DIRS[d][2].te_rules)
for i in self.DEFAULT_DIRS[d][1]:
if os.path.exists(i) and stat.S_ISSOCK(os.stat(i)[stat.ST_MODE]):
if self.type == EUSER:
for domain in self.existing_domains:
newte += re.sub("TEMPLATETYPE", domain[:-2], self.DEFAULT_DIRS[d][2].te_stream_rules)
else:
newte += re.sub("TEMPLATETYPE", self.name, self.DEFAULT_DIRS[d][2].te_stream_rules)
break
newte += self.generate_tmp_rules()
newte += self.generate_network_rules()
newte += self.generate_fd_rules()
newte += self.generate_etc_rules()
newte += self.generate_pam_rules()
newte += self.generate_uid_rules()
newte += self.generate_audit_rules()
newte += self.generate_syslog_rules()
newte += self.generate_localization_rules()
newte += self.generate_resolve_rules()
newte += self.generate_roles_rules()
newte += self.generate_mail_rules()
newte += self.generate_transition_rules()
newte += self.generate_admin_rules()
newte += self.generate_dbus_rules()
newte += self.generate_kerberos_rules()
newte += self.generate_manage_krb5_rcache_rules()
return newte
def generate_fc(self):
newfc = ""
fclist = []
for i in self.files.keys():
if os.path.exists(i) and stat.S_ISSOCK(os.stat(i)[stat.ST_MODE]):
t1 = re.sub("TEMPLATETYPE", self.name, self.files[i][2].fc_sock_file)
else:
t1 = re.sub("TEMPLATETYPE", self.name, self.files[i][2].fc_file)
t2 = re.sub("FILENAME", i, t1)
fclist.append(re.sub("FILETYPE", self.files[i][0], t2))
for i in self.dirs.keys():
t1 = re.sub("TEMPLATETYPE", self.name, self.dirs[i][2].fc_dir)
t2 = re.sub("FILENAME", i, t1)
fclist.append(re.sub("FILETYPE", self.dirs[i][0], t2))
if self.type in USERS + [SANDBOX]:
if len(fclist) == 0:
return executable.fc_user
if self.type not in USERS + [SANDBOX, EUSER, NEWTYPE] and not self.program:
raise ValueError(_("You must enter the executable path for your confined process"))
if self.program:
t1 = re.sub("EXECUTABLE", self.program, executable.fc_program)
fclist.append(re.sub("TEMPLATETYPE", self.name, t1))
if self.initscript != "":
t1 = re.sub("EXECUTABLE", self.initscript, executable.fc_initscript)
fclist.append(re.sub("TEMPLATETYPE", self.name, t1))
fclist.sort()
newfc = "\n".join(fclist)
return newfc
def generate_user_sh(self):
newsh = ""
if self.type not in (TUSER, XUSER, AUSER, LUSER, RUSER):
return newsh
roles = ""
for role in self.roles:
roles += " %s_r" % role
if roles != "":
roles += " system_r"
tmp = re.sub("TEMPLATETYPE", self.name, script.users)
newsh += re.sub("ROLES", roles, tmp)
if self.type == RUSER or self.type == AUSER:
for u in self.transition_users:
tmp = re.sub("TEMPLATETYPE", self.name, script.admin_trans)
newsh += re.sub("USER", u, tmp)
if self.type == LUSER:
newsh += re.sub("TEMPLATETYPE", self.name, script.min_login_user_default_context)
else:
newsh += re.sub("TEMPLATETYPE", self.name, script.x_login_user_default_context)
return newsh
def generate_sh(self):
temp = re.sub("TEMPLATETYPE", self.file_name, script.compile)
temp = re.sub("DOMAINTYPE", self.name, temp)
if self.type == EUSER:
newsh = re.sub("TEMPLATEFILE", "%s" % self.file_name, temp)
else:
newsh = re.sub("TEMPLATEFILE", self.file_name, temp)
newsh += re.sub("DOMAINTYPE", self.name, script.manpage)
if self.program:
newsh += re.sub("FILENAME", self.program, script.restorecon)
if self.initscript != "":
newsh += re.sub("FILENAME", self.initscript, script.restorecon)
for i in self.files.keys():
newsh += re.sub("FILENAME", i, script.restorecon)
for i in self.dirs.keys():
newsh += re.sub("FILENAME", i, script.restorecon)
for i in self.in_tcp[PORTS] + self.out_tcp[PORTS]:
if self.find_port(i, "tcp") is None:
t1 = re.sub("PORTNUM", "%d" % i, script.tcp_ports)
newsh += re.sub("TEMPLATETYPE", self.name, t1)
for i in self.in_udp[PORTS]:
if self.find_port(i, "udp") is None:
t1 = re.sub("PORTNUM", "%d" % i, script.udp_ports)
newsh += re.sub("TEMPLATETYPE", self.name, t1)
newsh += self.generate_user_sh()
if (platform.linux_distribution(full_distribution_name=0)[0] in ("redhat", "centos", "SuSE", "fedora", "mandrake", "mandriva")):
newsh += re.sub("TEMPLATEFILE", self.file_name, script.rpm)
return newsh
def generate_spec(self):
newspec = ""
selinux_policynvr = get_rpm_nvr_list("selinux-policy")
if selinux_policynvr is None:
selinux_policyver = "0.0.0"
else:
selinux_policyver = selinux_policynvr[1]
newspec += spec.header_comment_section
if self.type in APPLICATIONS:
newspec += spec.define_relabel_files_begin
if self.program:
newspec += re.sub("FILENAME", self.program, spec.define_relabel_files_end)
if self.initscript != "":
newspec += re.sub("FILENAME", self.initscript, spec.define_relabel_files_end)
for i in self.files.keys():
newspec += re.sub("FILENAME", i, spec.define_relabel_files_end)
for i in self.dirs.keys():
newspec += re.sub("FILENAME", i, spec.define_relabel_files_end)
newspec += re.sub("VERSION", selinux_policyver, spec.base_section)
newspec = re.sub("MODULENAME", self.file_name, newspec)
newspec = re.sub("DOMAINNAME", self.name, newspec)
if len(self.rpms) > 0:
newspec += "Requires(post): %s\n" % ", ".join(self.rpms)
newspec += re.sub("MODULENAME", self.file_name, spec.mid_section)
newspec = re.sub("DOMAINNAME", self.name, newspec)
newspec = re.sub("TODAYSDATE", time.strftime("%a %b %e %Y"), newspec)
if self.type not in APPLICATIONS:
newspec = re.sub("%relabel_files", "", newspec)
# Remove man pages from EUSER spec file
if self.type == EUSER:
newspec = re.sub(".*%s_selinux.8.*" % self.name, "", newspec)
# Remove user context file from non users spec file
if self.type not in (TUSER, XUSER, AUSER, LUSER, RUSER):
newspec = re.sub(".*%s_u.*" % self.name, "", newspec)
return newspec
def write_spec(self, out_dir):
specfile = "%s/%s_selinux.spec" % (out_dir, self.file_name)
fd = open(specfile, "w")
fd.write(self.generate_spec())
fd.close()
return specfile
def write_te(self, out_dir):
tefile = "%s/%s.te" % (out_dir, self.file_name)
fd = open(tefile, "w")
fd.write(self.generate_te())
fd.close()
return tefile
def write_sh(self, out_dir):
shfile = "%s/%s.sh" % (out_dir, self.file_name)
fd = open(shfile, "w")
fd.write(self.generate_sh())
fd.close()
os.chmod(shfile, 0o750)
return shfile
def write_if(self, out_dir):
iffile = "%s/%s.if" % (out_dir, self.file_name)
fd = open(iffile, "w")
fd.write(self.generate_if())
fd.close()
return iffile
def write_fc(self, out_dir):
fcfile = "%s/%s.fc" % (out_dir, self.file_name)
fd = open(fcfile, "w")
fd.write(self.generate_fc())
fd.close()
return fcfile
def __extract_rpms(self):
import dnf
with dnf.Base() as base:
base.read_all_repos()
base.fill_sack(load_system_repo=True)
query = base.sack.query()
pq = query.available()
pq = pq.filter(file=self.program)
for pkg in pq:
self.rpms.append(pkg.name)
for fname in pkg.files:
for b in self.DEFAULT_DIRS:
if b == "/etc":
continue
if fname.startswith(b):
if os.path.isfile(fname):
self.add_file(fname)
else:
self.add_dir(fname)
sq = query.available()
sq = sq.filter(provides=pkg.source_name)
for bpkg in sq:
for fname in bpkg.files:
for b in self.DEFAULT_DIRS:
if b == "/etc":
continue
if fname.startswith(b):
if os.path.isfile(fname):
self.add_file(fname)
else:
self.add_dir(fname)
def gen_writeable(self):
try:
self.__extract_rpms()
except ImportError:
pass
if os.path.isfile("/var/run/%s.pid" % self.name):
self.add_file("/var/run/%s.pid" % self.name)
if os.path.isdir("/var/run/%s" % self.name):
self.add_dir("/var/run/%s" % self.name)
if os.path.isdir("/var/log/%s" % self.name):
self.add_dir("/var/log/%s" % self.name)
if os.path.isfile("/var/log/%s.log" % self.name):
self.add_file("/var/log/%s.log" % self.name)
if os.path.isdir("/var/lib/%s" % self.name):
self.add_dir("/var/lib/%s" % self.name)
if os.path.isfile("/etc/rc.d/init.d/%s" % self.name):
self.set_init_script(r"/etc/rc\.d/init\.d/%s" % self.name)
# we don't want to have subdir in the .fc policy file
# if we already specify labeling for parent dir
temp_basepath = []
for p in self.DEFAULT_DIRS.keys():
temp_dirs = []
try:
temp_basepath = self.DEFAULT_DIRS[p][1][0] + "/"
except IndexError:
continue
for i in self.DEFAULT_DIRS[p][1]:
if i.startswith(temp_basepath):
temp_dirs.append(i)
else:
continue
if len(temp_dirs) != 0:
for i in temp_dirs:
if i in self.dirs.keys():
del(self.dirs[i])
elif i in self.files.keys():
del(self.files[i])
else:
continue
self.DEFAULT_DIRS[p][1] = list(set(self.DEFAULT_DIRS[p][1]) - set(temp_dirs))
def gen_symbols(self):
if self.type not in APPLICATIONS:
return
if not os.path.exists(self.program):
sys.stderr.write("""
***************************************
Warning %s does not exist
***************************************
""" % self.program)
return
fd = os.popen("nm -D %s | grep U" % self.program)
for s in fd.read().split():
for b in self.symbols:
if s.startswith(b):
exec("self.%s" % self.symbols[b])
fd.close()
def generate(self, out_dir=os.getcwd()):
out = "Created the following files:\n"
out += "%s # %s\n" % (self.write_te(out_dir), _("Type Enforcement file"))
out += "%s # %s\n" % (self.write_if(out_dir), _("Interface file"))
out += "%s # %s\n" % (self.write_fc(out_dir), _("File Contexts file"))
if self.type != NEWTYPE:
if (platform.linux_distribution(full_distribution_name=0)[0] in ("redhat", "centos", "SuSE", "fedora", "mandrake", "mandriva")):
out += "%s # %s\n" % (self.write_spec(out_dir), _("Spec file"))
out += "%s # %s\n" % (self.write_sh(out_dir), _("Setup Script"))
return out