8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-22 11:23:10 +00:00

Pep8 et nettoyage

This commit is contained in:
Gabriel Detraz 2017-10-15 22:10:33 +02:00 committed by root
parent 643c8235b0
commit a9dcc6d9c4

View file

@ -23,44 +23,59 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from datetime import timedelta
import re
from netaddr import mac_bare, EUI, IPSet, IPRange, IPNetwork, IPAddress
from django.db import models from django.db import models
from django.db.models.signals import post_save, pre_delete, post_delete from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver from django.dispatch import receiver
from django.forms import ValidationError from django.forms import ValidationError
from django.utils.functional import cached_property from django.utils.functional import cached_property
from django.utils import timezone from django.utils import timezone
from django.core.validators import MaxValueValidator
from macaddress.fields import MACAddressField from macaddress.fields import MACAddressField
from netaddr import mac_bare, EUI, IPSet, IPRange, IPNetwork, IPAddress
from django.core.validators import MinValueValidator,MaxValueValidator
import re
from reversion import revisions as reversion
from datetime import timedelta
class Machine(models.Model): class Machine(models.Model):
""" Class définissant une machine, object parent user, objets fils interfaces""" """ Class définissant une machine, object parent user, objets fils
interfaces"""
PRETTY_NAME = "Machine" PRETTY_NAME = "Machine"
user = models.ForeignKey('users.User', on_delete=models.PROTECT) user = models.ForeignKey('users.User', on_delete=models.PROTECT)
name = models.CharField(max_length=255, help_text="Optionnel", blank=True, null=True) name = models.CharField(
max_length=255,
help_text="Optionnel",
blank=True,
null=True
)
active = models.BooleanField(default=True) active = models.BooleanField(default=True)
def __str__(self): def __str__(self):
return str(self.user) + ' - ' + str(self.id) + ' - ' + str(self.name) return str(self.user) + ' - ' + str(self.id) + ' - ' + str(self.name)
class MachineType(models.Model): class MachineType(models.Model):
""" Type de machine, relié à un type d'ip, affecté aux interfaces""" """ Type de machine, relié à un type d'ip, affecté aux interfaces"""
PRETTY_NAME = "Type de machine" PRETTY_NAME = "Type de machine"
type = models.CharField(max_length=255) type = models.CharField(max_length=255)
ip_type = models.ForeignKey('IpType', on_delete=models.PROTECT, blank=True, null=True) ip_type = models.ForeignKey(
'IpType',
on_delete=models.PROTECT,
blank=True,
null=True
)
def all_interfaces(self): def all_interfaces(self):
""" Renvoie toutes les interfaces (cartes réseaux) de type machinetype""" """ Renvoie toutes les interfaces (cartes réseaux) de type
machinetype"""
return Interface.objects.filter(type=self) return Interface.objects.filter(type=self)
def __str__(self): def __str__(self):
return self.type return self.type
class IpType(models.Model): class IpType(models.Model):
""" Type d'ip, définissant un range d'ip, affecté aux machine types""" """ Type d'ip, définissant un range d'ip, affecté aux machine types"""
@ -71,14 +86,27 @@ class IpType(models.Model):
need_infra = models.BooleanField(default=False) need_infra = models.BooleanField(default=False)
domaine_ip_start = models.GenericIPAddressField(protocol='IPv4') domaine_ip_start = models.GenericIPAddressField(protocol='IPv4')
domaine_ip_stop = models.GenericIPAddressField(protocol='IPv4') domaine_ip_stop = models.GenericIPAddressField(protocol='IPv4')
prefix_v6 = models.GenericIPAddressField(protocol='IPv6', null=True, blank=True) prefix_v6 = models.GenericIPAddressField(
vlan = models.ForeignKey('Vlan', on_delete=models.PROTECT, blank=True, null=True) protocol='IPv6',
ouverture_ports = models.ForeignKey('OuverturePortList', blank=True, null=True) null=True,
blank=True
)
vlan = models.ForeignKey(
'Vlan',
on_delete=models.PROTECT,
blank=True,
null=True
)
ouverture_ports = models.ForeignKey(
'OuverturePortList',
blank=True,
null=True
)
@cached_property @cached_property
def ip_range(self): def ip_range(self):
""" Renvoie un objet IPRange à partir de l'objet IpType""" """ Renvoie un objet IPRange à partir de l'objet IpType"""
return IPRange(self.domaine_ip_start, end=self.domaine_ip_stop) return IPRange(self.domaine_ip_start, end=self.domaine_ip_stop)
@cached_property @cached_property
def ip_set(self): def ip_set(self):
@ -96,18 +124,22 @@ class IpType(models.Model):
def free_ip(self): def free_ip(self):
""" Renvoie toutes les ip libres associées au type donné (self)""" """ Renvoie toutes les ip libres associées au type donné (self)"""
return IpList.objects.filter(interface__isnull=True).filter(ip_type=self) return IpList.objects.filter(
interface__isnull=True
).filter(ip_type=self)
def gen_ip_range(self): def gen_ip_range(self):
""" Cree les IpList associées au type self. Parcours pédestrement et crée """ Cree les IpList associées au type self. Parcours pédestrement et
les ip une par une. Si elles existent déjà, met à jour le type associé crée les ip une par une. Si elles existent déjà, met à jour le type
à l'ip""" associé à l'ip"""
# Creation du range d'ip dans les objets iplist # Creation du range d'ip dans les objets iplist
networks = [] networks = []
for net in self.ip_range.cidrs(): for net in self.ip_range.cidrs():
networks += net.iter_hosts() networks += net.iter_hosts()
ip_obj = [IpList(ip_type=self, ipv4=str(ip)) for ip in networks] ip_obj = [IpList(ip_type=self, ipv4=str(ip)) for ip in networks]
listes_ip = IpList.objects.filter(ipv4__in=[str(ip) for ip in networks]) listes_ip = IpList.objects.filter(
ipv4__in=[str(ip) for ip in networks]
)
# Si il n'y a pas d'ip, on les crée # Si il n'y a pas d'ip, on les crée
if not listes_ip: if not listes_ip:
IpList.objects.bulk_create(ip_obj) IpList.objects.bulk_create(ip_obj)
@ -117,9 +149,11 @@ class IpType(models.Model):
return return
def del_ip_range(self): def del_ip_range(self):
""" Methode dépréciée, IpList est en mode cascade et supprimé automatiquement""" """ Methode dépréciée, IpList est en mode cascade et supprimé
automatiquement"""
if Interface.objects.filter(ipv4__in=self.ip_objects()): if Interface.objects.filter(ipv4__in=self.ip_objects()):
raise ValidationError("Une ou plusieurs ip du range sont affectées, impossible de supprimer le range") raise ValidationError("Une ou plusieurs ip du range sont\
affectées, impossible de supprimer le range")
for ip in self.ip_objects(): for ip in self.ip_objects():
ip.delete() ip.delete()
@ -133,11 +167,13 @@ class IpType(models.Model):
raise ValidationError("Domaine end doit être après start...") raise ValidationError("Domaine end doit être après start...")
# On ne crée pas plus grand qu'un /16 # On ne crée pas plus grand qu'un /16
if self.ip_range.size > 65536: if self.ip_range.size > 65536:
raise ValidationError("Le range est trop gros, vous ne devez pas créer plus grand qu'un /16") raise ValidationError("Le range est trop gros, vous ne devez\
pas créer plus grand qu'un /16")
# On check que les / ne se recoupent pas # On check que les / ne se recoupent pas
for element in IpType.objects.all().exclude(pk=self.pk): for element in IpType.objects.all().exclude(pk=self.pk):
if not self.ip_set.isdisjoint(element.ip_set): if not self.ip_set.isdisjoint(element.ip_set):
raise ValidationError("Le range indiqué n'est pas disjoint des ranges existants") raise ValidationError("Le range indiqué n'est pas disjoint\
des ranges existants")
# On formate le prefix v6 # On formate le prefix v6
if self.prefix_v6: if self.prefix_v6:
self.prefix_v6 = str(IPNetwork(self.prefix_v6 + '/64').network) self.prefix_v6 = str(IPNetwork(self.prefix_v6 + '/64').network)
@ -150,6 +186,7 @@ class IpType(models.Model):
def __str__(self): def __str__(self):
return self.type return self.type
class Vlan(models.Model): class Vlan(models.Model):
""" Un vlan : vlan_id et nom """ Un vlan : vlan_id et nom
On limite le vlan id entre 0 et 4096, comme défini par la norme""" On limite le vlan id entre 0 et 4096, comme défini par la norme"""
@ -162,8 +199,9 @@ class Vlan(models.Model):
def __str__(self): def __str__(self):
return self.name return self.name
class Nas(models.Model): class Nas(models.Model):
""" Les nas. Associé à un machine_type. """ Les nas. Associé à un machine_type.
Permet aussi de régler le port_access_mode (802.1X ou mac-address) pour Permet aussi de régler le port_access_mode (802.1X ou mac-address) pour
le radius. Champ autocapture de la mac à true ou false""" le radius. Champ autocapture de la mac à true ou false"""
PRETTY_NAME = "Correspondance entre les nas et les machines connectées" PRETTY_NAME = "Correspondance entre les nas et les machines connectées"
@ -175,33 +213,53 @@ class Nas(models.Model):
) )
name = models.CharField(max_length=255, unique=True) name = models.CharField(max_length=255, unique=True)
nas_type = models.ForeignKey('MachineType', on_delete=models.PROTECT, related_name='nas_type') nas_type = models.ForeignKey(
machine_type = models.ForeignKey('MachineType', on_delete=models.PROTECT, related_name='machinetype_on_nas') 'MachineType',
port_access_mode = models.CharField(choices=AUTH, default=default_mode, max_length=32) on_delete=models.PROTECT,
related_name='nas_type'
)
machine_type = models.ForeignKey(
'MachineType',
on_delete=models.PROTECT,
related_name='machinetype_on_nas'
)
port_access_mode = models.CharField(
choices=AUTH,
default=default_mode,
max_length=32
)
autocapture_mac = models.BooleanField(default=False) autocapture_mac = models.BooleanField(default=False)
def __str__(self): def __str__(self):
return self.name return self.name
class Extension(models.Model): class Extension(models.Model):
""" Extension dns type example.org. Précise si tout le monde peut l'utiliser, """ Extension dns type example.org. Précise si tout le monde peut
associé à un origin (ip d'origine)""" l'utiliser, associé à un origin (ip d'origine)"""
PRETTY_NAME = "Extensions dns" PRETTY_NAME = "Extensions dns"
name = models.CharField(max_length=255, unique=True) name = models.CharField(max_length=255, unique=True)
need_infra = models.BooleanField(default=False) need_infra = models.BooleanField(default=False)
origin = models.OneToOneField('IpList', on_delete=models.PROTECT, blank=True, null=True) origin = models.OneToOneField(
'IpList',
on_delete=models.PROTECT,
blank=True,
null=True
)
@cached_property @cached_property
def dns_entry(self): def dns_entry(self):
""" Une entrée DNS A""" """ Une entrée DNS A"""
return "@ IN A " + str(self.origin) return "@ IN A " + str(self.origin)
def __str__(self): def __str__(self):
return self.name return self.name
class Mx(models.Model): class Mx(models.Model):
""" Entrées des MX. Enregistre la zone (extension) associée et la priorité """ Entrées des MX. Enregistre la zone (extension) associée et la
priorité
Todo : pouvoir associer un MX à une interface """ Todo : pouvoir associer un MX à une interface """
PRETTY_NAME = "Enregistrements MX" PRETTY_NAME = "Enregistrements MX"
@ -211,12 +269,16 @@ class Mx(models.Model):
@cached_property @cached_property
def dns_entry(self): def dns_entry(self):
return "@ IN MX " + str(self.priority) + " " + str(self.name) """Renvoie l'entrée DNS complète pour un MX à mettre dans les
fichiers de zones"""
return "@ IN MX " + str(self.priority) + " " + str(self.name)
def __str__(self): def __str__(self):
return str(self.zone) + ' ' + str(self.priority) + ' ' + str(self.name) return str(self.zone) + ' ' + str(self.priority) + ' ' + str(self.name)
class Ns(models.Model): class Ns(models.Model):
"""Liste des enregistrements name servers par zone considéérée"""
PRETTY_NAME = "Enregistrements NS" PRETTY_NAME = "Enregistrements NS"
zone = models.ForeignKey('Extension', on_delete=models.PROTECT) zone = models.ForeignKey('Extension', on_delete=models.PROTECT)
@ -224,11 +286,13 @@ class Ns(models.Model):
@cached_property @cached_property
def dns_entry(self): def dns_entry(self):
return "@ IN NS " + str(self.ns) """Renvoie un enregistrement NS complet pour les filezones"""
return "@ IN NS " + str(self.ns)
def __str__(self): def __str__(self):
return str(self.zone) + ' ' + str(self.ns) return str(self.zone) + ' ' + str(self.ns)
class Text(models.Model): class Text(models.Model):
""" Un enregistrement TXT associé à une extension""" """ Un enregistrement TXT associé à une extension"""
PRETTY_NAME = "Enregistrement text" PRETTY_NAME = "Enregistrement text"
@ -236,24 +300,33 @@ class Text(models.Model):
zone = models.ForeignKey('Extension', on_delete=models.PROTECT) zone = models.ForeignKey('Extension', on_delete=models.PROTECT)
field1 = models.CharField(max_length=255) field1 = models.CharField(max_length=255)
field2 = models.CharField(max_length=255) field2 = models.CharField(max_length=255)
def __str__(self): def __str__(self):
return str(self.zone) + " : " + str(self.field1) + " " + str(self.field2) return str(self.zone) + " : " + str(self.field1) + " " +\
str(self.field2)
@cached_property @cached_property
def dns_entry(self): def dns_entry(self):
"""Renvoie l'enregistrement TXT complet pour le fichier de zone"""
return str(self.field1) + " IN TXT " + str(self.field2) return str(self.field1) + " IN TXT " + str(self.field2)
class Interface(models.Model): class Interface(models.Model):
""" Une interface. Objet clef de l'application machine : """ Une interface. Objet clef de l'application machine :
- une address mac unique. Possibilité de la rendre unique avec le typemachine - une address mac unique. Possibilité de la rendre unique avec le
typemachine
- une onetoone vers IpList pour attribution ipv4 - une onetoone vers IpList pour attribution ipv4
- le type parent associé au range ip et à l'extension - le type parent associé au range ip et à l'extension
- un objet domain associé contenant son nom - un objet domain associé contenant son nom
- la liste des ports oiuvert""" - la liste des ports oiuvert"""
PRETTY_NAME = "Interface" PRETTY_NAME = "Interface"
ipv4 = models.OneToOneField('IpList', on_delete=models.PROTECT, blank=True, null=True) ipv4 = models.OneToOneField(
'IpList',
on_delete=models.PROTECT,
blank=True,
null=True
)
mac_address = MACAddressField(integer=False, unique=True) mac_address = MACAddressField(integer=False, unique=True)
machine = models.ForeignKey('Machine', on_delete=models.CASCADE) machine = models.ForeignKey('Machine', on_delete=models.CASCADE)
type = models.ForeignKey('MachineType', on_delete=models.PROTECT) type = models.ForeignKey('MachineType', on_delete=models.PROTECT)
@ -267,12 +340,14 @@ class Interface(models.Model):
user = self.machine.user user = self.machine.user
return machine.active and user.has_access() return machine.active and user.has_access()
@cached_property @cached_property
def ipv6_object(self): def ipv6_object(self):
""" Renvoie un objet type ipv6 à partir du prefix associé à l'iptype parent""" """ Renvoie un objet type ipv6 à partir du prefix associé à
l'iptype parent"""
if self.type.ip_type.prefix_v6: if self.type.ip_type.prefix_v6:
return EUI(self.mac_address).ipv6(IPNetwork(self.type.ip_type.prefix_v6).network) return EUI(self.mac_address).ipv6(
IPNetwork(self.type.ip_type.prefix_v6).network
)
else: else:
return None return None
@ -286,10 +361,11 @@ class Interface(models.Model):
return str(EUI(self.mac_address, dialect=mac_bare)).lower() return str(EUI(self.mac_address, dialect=mac_bare)).lower()
def filter_macaddress(self): def filter_macaddress(self):
""" Tente un formatage mac_bare, si échoue, lève une erreur de validation""" """ Tente un formatage mac_bare, si échoue, lève une erreur de
validation"""
try: try:
self.mac_address = str(EUI(self.mac_address)) self.mac_address = str(EUI(self.mac_address))
except : except:
raise ValidationError("La mac donnée est invalide") raise ValidationError("La mac donnée est invalide")
def clean(self, *args, **kwargs): def clean(self, *args, **kwargs):
@ -307,7 +383,8 @@ class Interface(models.Model):
if free_ips: if free_ips:
self.ipv4 = free_ips[0] self.ipv4 = free_ips[0]
else: else:
raise ValidationError("Il n'y a plus d'ip disponibles dans le slash") raise ValidationError("Il n'y a plus d'ip disponibles\
dans le slash")
return return
def unassign_ipv4(self): def unassign_ipv4(self):
@ -323,7 +400,8 @@ class Interface(models.Model):
self.filter_macaddress() self.filter_macaddress()
# On verifie la cohérence en forçant l'extension par la méthode # On verifie la cohérence en forçant l'extension par la méthode
if self.type.ip_type != self.ipv4.ip_type: if self.type.ip_type != self.ipv4.ip_type:
raise ValidationError("L'ipv4 et le type de la machine ne correspondent pas") raise ValidationError("L'ipv4 et le type de la machine ne\
correspondent pas")
super(Interface, self).save(*args, **kwargs) super(Interface, self).save(*args, **kwargs)
def __str__(self): def __str__(self):
@ -342,18 +420,34 @@ class Interface(models.Model):
def may_have_port_open(self): def may_have_port_open(self):
""" True si l'interface a une ip et une ip publique. """ True si l'interface a une ip et une ip publique.
Permet de ne pas exporter des ouvertures sur des ip privées (useless)""" Permet de ne pas exporter des ouvertures sur des ip privées
(useless)"""
return self.ipv4 and not self.has_private_ip() return self.ipv4 and not self.has_private_ip()
class Domain(models.Model): class Domain(models.Model):
""" Objet domain. Enregistrement A et CNAME en même temps : permet de stocker les """ Objet domain. Enregistrement A et CNAME en même temps : permet de
alias et les nom de machines, suivant si interface_parent ou cname sont remplis""" stocker les alias et les nom de machines, suivant si interface_parent
ou cname sont remplis"""
PRETTY_NAME = "Domaine dns" PRETTY_NAME = "Domaine dns"
interface_parent = models.OneToOneField('Interface', on_delete=models.CASCADE, blank=True, null=True) interface_parent = models.OneToOneField(
name = models.CharField(help_text="Obligatoire et unique, ne doit pas comporter de points", max_length=255) 'Interface',
on_delete=models.CASCADE,
blank=True,
null=True
)
name = models.CharField(
help_text="Obligatoire et unique, ne doit pas comporter de points",
max_length=255
)
extension = models.ForeignKey('Extension', on_delete=models.PROTECT) extension = models.ForeignKey('Extension', on_delete=models.PROTECT)
cname = models.ForeignKey('self', null=True, blank=True, related_name='related_domain') cname = models.ForeignKey(
'self',
null=True,
blank=True,
related_name='related_domain'
)
class Meta: class Meta:
unique_together = (("name", "extension"),) unique_together = (("name", "extension"),)
@ -363,30 +457,35 @@ class Domain(models.Model):
Retourne l'extension propre si c'est un cname, renvoie None sinon""" Retourne l'extension propre si c'est un cname, renvoie None sinon"""
if self.interface_parent: if self.interface_parent:
return self.interface_parent.type.ip_type.extension return self.interface_parent.type.ip_type.extension
elif hasattr(self,'extension'): elif hasattr(self, 'extension'):
return self.extension return self.extension
else: else:
return None return None
def clean(self): def clean(self):
""" Validation : """ Validation :
- l'objet est bien soit A soit CNAME - l'objet est bien soit A soit CNAME
- le cname est pas pointé sur lui-même - le cname est pas pointé sur lui-même
- le nom contient bien les caractères autorisés par la norme dns et moins de 63 caractères au total - le nom contient bien les caractères autorisés par la norme
dns et moins de 63 caractères au total
- le couple nom/extension est bien unique""" - le couple nom/extension est bien unique"""
if self.get_extension(): if self.get_extension():
self.extension=self.get_extension() self.extension = self.get_extension()
""" Validation du nom de domaine, extensions dans type de machine, prefixe pas plus long que 63 caractères """
if self.interface_parent and self.cname: if self.interface_parent and self.cname:
raise ValidationError("On ne peut créer à la fois A et CNAME") raise ValidationError("On ne peut créer à la fois A et CNAME")
if self.cname==self: if self.cname == self:
raise ValidationError("On ne peut créer un cname sur lui même") raise ValidationError("On ne peut créer un cname sur lui même")
HOSTNAME_LABEL_PATTERN = re.compile("(?!-)[A-Z\d-]+(?<!-)$", re.IGNORECASE) HOSTNAME_LABEL_PATTERN = re.compile(
"(?!-)[A-Z\d-]+(?<!-)$",
re.IGNORECASE
)
dns = self.name.lower() dns = self.name.lower()
if len(dns) > 63: if len(dns) > 63:
raise ValidationError("Le nom de domaine %s est trop long (maximum de 63 caractères)." % dns) raise ValidationError("Le nom de domaine %s est trop long\
(maximum de 63 caractères)." % dns)
if not HOSTNAME_LABEL_PATTERN.match(dns): if not HOSTNAME_LABEL_PATTERN.match(dns):
raise ValidationError("Ce nom de domaine %s contient des carractères interdits." % dns) raise ValidationError("Ce nom de domaine %s contient des\
carractères interdits." % dns)
self.validate_unique() self.validate_unique()
super(Domain, self).clean() super(Domain, self).clean()
@ -394,10 +493,11 @@ class Domain(models.Model):
def dns_entry(self): def dns_entry(self):
""" Une entrée DNS""" """ Une entrée DNS"""
if self.cname: if self.cname:
return str(self.name) + " IN CNAME " + str(self.cname) + "." return str(self.name) + " IN CNAME " + str(self.cname) + "."
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
""" Empèche le save sans extension valide. Force à avoir appellé clean avant""" """ Empèche le save sans extension valide.
Force à avoir appellé clean avant"""
if not self.get_extension(): if not self.get_extension():
raise ValidationError("Extension invalide") raise ValidationError("Extension invalide")
self.full_clean() self.full_clean()
@ -406,6 +506,7 @@ class Domain(models.Model):
def __str__(self): def __str__(self):
return str(self.name) + str(self.extension) return str(self.name) + str(self.extension)
class IpList(models.Model): class IpList(models.Model):
PRETTY_NAME = "Addresses ipv4" PRETTY_NAME = "Addresses ipv4"
@ -414,13 +515,15 @@ class IpList(models.Model):
@cached_property @cached_property
def need_infra(self): def need_infra(self):
""" Permet de savoir si un user basique peut assigner cette ip ou non""" """ Permet de savoir si un user basique peut assigner cette ip ou
non"""
return self.ip_type.need_infra return self.ip_type.need_infra
def clean(self): def clean(self):
""" Erreur si l'ip_type est incorrect""" """ Erreur si l'ip_type est incorrect"""
if not str(self.ipv4) in self.ip_type.ip_set_as_str: if not str(self.ipv4) in self.ip_type.ip_set_as_str:
raise ValidationError("L'ipv4 et le range de l'iptype ne correspondent pas!") raise ValidationError("L'ipv4 et le range de l'iptype ne\
correspondent pas!")
return return
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
@ -430,24 +533,36 @@ class IpList(models.Model):
def __str__(self): def __str__(self):
return self.ipv4 return self.ipv4
class Service(models.Model): class Service(models.Model):
""" Definition d'un service (dhcp, dns, etc)""" """ Definition d'un service (dhcp, dns, etc)"""
service_type = models.CharField(max_length=255, blank=True, unique=True) service_type = models.CharField(max_length=255, blank=True, unique=True)
min_time_regen = models.DurationField(default=timedelta(minutes=1), help_text="Temps minimal avant nouvelle génération du service") min_time_regen = models.DurationField(
regular_time_regen = models.DurationField(default=timedelta(hours=1), help_text="Temps maximal avant nouvelle génération du service") default=timedelta(minutes=1),
help_text="Temps minimal avant nouvelle génération du service"
)
regular_time_regen = models.DurationField(
default=timedelta(hours=1),
help_text="Temps maximal avant nouvelle génération du service"
)
servers = models.ManyToManyField('Interface', through='Service_link') servers = models.ManyToManyField('Interface', through='Service_link')
def ask_regen(self): def ask_regen(self):
""" Marque à True la demande de régénération pour un service x """ """ Marque à True la demande de régénération pour un service x """
Service_link.objects.filter(service=self).exclude(asked_regen=True).update(asked_regen=True) Service_link.objects.filter(service=self).exclude(asked_regen=True)\
.update(asked_regen=True)
return return
def process_link(self, servers): def process_link(self, servers):
""" Django ne peut créer lui meme les relations manytomany avec table intermediaire explicite""" """ Django ne peut créer lui meme les relations manytomany avec table
for serv in servers.exclude(pk__in=Interface.objects.filter(service=self)): intermediaire explicite"""
for serv in servers.exclude(
pk__in=Interface.objects.filter(service=self)
):
link = Service_link(service=self, server=serv) link = Service_link(service=self, server=serv)
link.save() link.save()
Service_link.objects.filter(service=self).exclude(server__in=servers).delete() Service_link.objects.filter(service=self).exclude(server__in=servers)\
.delete()
return return
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
@ -456,13 +571,16 @@ class Service(models.Model):
def __str__(self): def __str__(self):
return str(self.service_type) return str(self.service_type)
def regen(service): def regen(service):
""" Fonction externe pour régérération d'un service, prend un objet service en arg""" """ Fonction externe pour régérération d'un service, prend un objet service
en arg"""
obj = Service.objects.filter(service_type=service) obj = Service.objects.filter(service_type=service)
if obj: if obj:
obj[0].ask_regen() obj[0].ask_regen()
return return
class Service_link(models.Model): class Service_link(models.Model):
""" Definition du lien entre serveurs et services""" """ Definition du lien entre serveurs et services"""
service = models.ForeignKey('Service', on_delete=models.CASCADE) service = models.ForeignKey('Service', on_delete=models.CASCADE)
@ -477,11 +595,16 @@ class Service_link(models.Model):
self.save() self.save()
def need_regen(self): def need_regen(self):
""" Décide si le temps minimal écoulé est suffisant pour provoquer une régénération de service""" """ Décide si le temps minimal écoulé est suffisant pour provoquer une
if (self.asked_regen and (self.last_regen + self.service.min_time_regen) < timezone.now()) or (self.last_regen + self.service.regular_time_regen) < timezone.now(): régénération de service"""
return True return bool(
else: (self.asked_regen and (
return False self.last_regen + self.service.min_time_regen
) < timezone.now()
) or (
self.last_regen + self.service.regular_time_regen
) < timezone.now()
)
def __str__(self): def __str__(self):
return str(self.server) + " " + str(self.service) return str(self.server) + " " + str(self.service)
@ -489,29 +612,48 @@ class Service_link(models.Model):
class OuverturePortList(models.Model): class OuverturePortList(models.Model):
"""Liste des ports ouverts sur une interface.""" """Liste des ports ouverts sur une interface."""
name = models.CharField(help_text="Nom de la configuration des ports.", max_length=255) name = models.CharField(
help_text="Nom de la configuration des ports.",
max_length=255
)
def __str__(self): def __str__(self):
return self.name return self.name
def tcp_ports_in(self): def tcp_ports_in(self):
return self.ouvertureport_set.filter(protocole=OuverturePort.TCP, io=OuverturePort.IN) """Renvoie la liste des ports ouverts en TCP IN pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.TCP,
io=OuverturePort.IN
)
def udp_ports_in(self): def udp_ports_in(self):
return self.ouvertureport_set.filter(protocole=OuverturePort.UDP, io=OuverturePort.IN) """Renvoie la liste des ports ouverts en UDP IN pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.UDP,
io=OuverturePort.IN
)
def tcp_ports_out(self): def tcp_ports_out(self):
return self.ouvertureport_set.filter(protocole=OuverturePort.TCP, io=OuverturePort.OUT) """Renvoie la liste des ports ouverts en TCP OUT pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.TCP,
io=OuverturePort.OUT
)
def udp_ports_out(self): def udp_ports_out(self):
return self.ouvertureport_set.filter(protocole=OuverturePort.UDP, io=OuverturePort.OUT) """Renvoie la liste des ports ouverts en UDP OUT pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.UDP,
io=OuverturePort.OUT
)
class OuverturePort(models.Model): class OuverturePort(models.Model):
""" """
Représente un simple port ou une plage de ports. Représente un simple port ou une plage de ports.
Les ports de la plage sont compris entre begin et en inclus. Les ports de la plage sont compris entre begin et en inclus.
Si begin == end alors on ne représente qu'un seul port. Si begin == end alors on ne représente qu'un seul port.
On limite les ports entre 0 et 65535, tels que défini par la RFC On limite les ports entre 0 et 65535, tels que défini par la RFC
@ -522,112 +664,150 @@ class OuverturePort(models.Model):
OUT = 'O' OUT = 'O'
begin = models.PositiveIntegerField(validators=[MaxValueValidator(65535)]) begin = models.PositiveIntegerField(validators=[MaxValueValidator(65535)])
end = models.PositiveIntegerField(validators=[MaxValueValidator(65535)]) end = models.PositiveIntegerField(validators=[MaxValueValidator(65535)])
port_list = models.ForeignKey('OuverturePortList', on_delete=models.CASCADE) port_list = models.ForeignKey(
'OuverturePortList',
on_delete=models.CASCADE
)
protocole = models.CharField( protocole = models.CharField(
max_length=1, max_length=1,
choices=( choices=(
(TCP, 'TCP'), (TCP, 'TCP'),
(UDP, 'UDP'), (UDP, 'UDP'),
), ),
default=TCP, default=TCP,
) )
io = models.CharField( io = models.CharField(
max_length=1, max_length=1,
choices=( choices=(
(IN, 'IN'), (IN, 'IN'),
(OUT, 'OUT'), (OUT, 'OUT'),
), ),
default=OUT, default=OUT,
) )
def __str__(self): def __str__(self):
if self.begin == self.end : if self.begin == self.end:
return str(self.begin) return str(self.begin)
return '-'.join([str(self.begin), str(self.end)]) return '-'.join([str(self.begin), str(self.end)])
def show_port(self): def show_port(self):
"""Formatage plus joli, alias pour str"""
return str(self) return str(self)
@receiver(post_save, sender=Machine) @receiver(post_save, sender=Machine)
def machine_post_save(sender, **kwargs): def machine_post_save(sender, **kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la modification
d'une machine"""
user = kwargs['instance'].user user = kwargs['instance'].user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
regen('dhcp') regen('dhcp')
regen('mac_ip_list') regen('mac_ip_list')
@receiver(post_delete, sender=Machine) @receiver(post_delete, sender=Machine)
def machine_post_delete(sender, **kwargs): def machine_post_delete(sender, **kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la suppression
d'une machine"""
machine = kwargs['instance'] machine = kwargs['instance']
user = machine.user user = machine.user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
regen('dhcp') regen('dhcp')
regen('mac_ip_list') regen('mac_ip_list')
@receiver(post_save, sender=Interface) @receiver(post_save, sender=Interface)
def interface_post_save(sender, **kwargs): def interface_post_save(sender, **kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la modification
d'une interface"""
interface = kwargs['instance'] interface = kwargs['instance']
user = interface.machine.user user = interface.machine.user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
if not interface.may_have_port_open() and interface.port_lists.all():
interface.port_lists.clear()
# Regen services # Regen services
regen('dhcp') regen('dhcp')
regen('mac_ip_list') regen('mac_ip_list')
@receiver(post_delete, sender=Interface) @receiver(post_delete, sender=Interface)
def interface_post_delete(sender, **kwargs): def interface_post_delete(sender, **kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la suppression
d'une interface"""
interface = kwargs['instance'] interface = kwargs['instance']
user = interface.machine.user user = interface.machine.user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
@receiver(post_save, sender=IpType) @receiver(post_save, sender=IpType)
def iptype_post_save(sender, **kwargs): def iptype_post_save(sender, **kwargs):
"""Generation des objets ip après modification d'un range ip"""
iptype = kwargs['instance'] iptype = kwargs['instance']
iptype.gen_ip_range() iptype.gen_ip_range()
@receiver(post_save, sender=MachineType) @receiver(post_save, sender=MachineType)
def machine_post_save(sender, **kwargs): def machine_post_save(sender, **kwargs):
"""Mise à jour des interfaces lorsque changement d'attribution
d'une machinetype (changement iptype parent)"""
machinetype = kwargs['instance'] machinetype = kwargs['instance']
for interface in machinetype.all_interfaces(): for interface in machinetype.all_interfaces():
interface.update_type() interface.update_type()
@receiver(post_save, sender=Domain) @receiver(post_save, sender=Domain)
def domain_post_save(sender, **kwargs): def domain_post_save(sender, **kwargs):
"""Regeneration dns après modification d'un domain object"""
regen('dns') regen('dns')
@receiver(post_delete, sender=Domain) @receiver(post_delete, sender=Domain)
def domain_post_delete(sender, **kwargs): def domain_post_delete(sender, **kwargs):
"""Regeneration dns après suppression d'un domain object"""
regen('dns') regen('dns')
@receiver(post_save, sender=Extension) @receiver(post_save, sender=Extension)
def extension_post_save(sender, **kwargs): def extension_post_save(sender, **kwargs):
"""Regeneration dns après modification d'une extension"""
regen('dns') regen('dns')
@receiver(post_delete, sender=Extension) @receiver(post_delete, sender=Extension)
def extension_post_selete(sender, **kwargs): def extension_post_selete(sender, **kwargs):
"""Regeneration dns après suppression d'une extension"""
regen('dns') regen('dns')
@receiver(post_save, sender=Mx) @receiver(post_save, sender=Mx)
def mx_post_save(sender, **kwargs): def mx_post_save(sender, **kwargs):
"""Regeneration dns après modification d'un MX"""
regen('dns') regen('dns')
@receiver(post_delete, sender=Mx) @receiver(post_delete, sender=Mx)
def mx_post_delete(sender, **kwargs): def mx_post_delete(sender, **kwargs):
"""Regeneration dns après suppresson d'un MX"""
regen('dns') regen('dns')
@receiver(post_save, sender=Ns) @receiver(post_save, sender=Ns)
def ns_post_save(sender, **kwargs): def ns_post_save(sender, **kwargs):
"""Regeneration dns après modification d'un NS"""
regen('dns') regen('dns')
@receiver(post_delete, sender=Ns) @receiver(post_delete, sender=Ns)
def ns_post_delete(sender, **kwargs): def ns_post_delete(sender, **kwargs):
"""Regeneration dns après modification d'un NS"""
regen('dns') regen('dns')
@receiver(post_save, sender=Text) @receiver(post_save, sender=Text)
def text_post_save(sender, **kwargs): def text_post_save(sender, **kwargs):
"""Regeneration dns après modification d'un TXT"""
regen('dns') regen('dns')
@receiver(post_delete, sender=Text) @receiver(post_delete, sender=Text)
def text_post_delete(sender, **kwargs): def text_post_delete(sender, **kwargs):
"""Regeneration dns après modification d'un TX"""
regen('dns') regen('dns')