8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-12-23 07:23:46 +00:00

Pep8 compliance on freeradius

This commit is contained in:
Maël Kervella 2018-04-13 20:11:55 +00:00
parent 0dace45c5e
commit 9d7a7fa1f5

View file

@ -1,4 +1,4 @@
# *- mode: python; coding: utf-8 -*-
# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
@ -37,11 +37,20 @@ Inspiré du travail de Daniel Stan au Crans
import logging
import netaddr
import radiusd # Module magique freeradius (radiusd.py is dummy)
import radiusd # Module magique freeradius (radiusd.py is dummy)
import binascii
import hashlib
import os, sys
import os
import sys
from django.core.wsgi import get_wsgi_application
import argparse
from django.db.models import Q
from machines.models import Interface, IpList, Nas, Domain
from topologie.models import Room, Port, Switch
from users.models import User
from preferences.models import OptionalTopologie
proj_path = "/var/www/re2o/"
# This is so Django knows where to find stuff.
@ -52,28 +61,17 @@ sys.path.append(proj_path)
os.chdir(proj_path)
# This is so models get loaded.
from django.core.wsgi import get_wsgi_application
application = get_wsgi_application()
import argparse
from django.db.models import Q
from machines.models import Interface, IpList, Nas, Domain
from topologie.models import Room, Port, Switch
from users.models import User
from preferences.models import OptionalTopologie
options, created = OptionalTopologie.objects.get_or_create()
VLAN_NOK = options.vlan_decision_nok.vlan_id
VLAN_OK = options.vlan_decision_ok.vlan_id
#: Serveur radius de test (pas la prod)
TEST_SERVER = bool(os.getenv('DBG_FREERADIUS', False))
## -*- Logging -*-
# Logging
class RadiusdHandler(logging.Handler):
"""Handler de logs pour freeradius"""
@ -87,6 +85,7 @@ class RadiusdHandler(logging.Handler):
rad_sig = radiusd.L_DBG
radiusd.radlog(rad_sig, record.msg)
# Initialisation d'un logger (pour logguer unifié)
logger = logging.getLogger('auth.py')
logger.setLevel(logging.DEBUG)
@ -95,10 +94,11 @@ handler = RadiusdHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
def radius_event(fun):
"""Décorateur pour les fonctions d'interfaces avec radius.
Une telle fonction prend un uniquement argument, qui est une liste de tuples
(clé, valeur) et renvoie un triplet dont les composantes sont :
Une telle fonction prend un uniquement argument, qui est une liste de
tuples (clé, valeur) et renvoie un triplet dont les composantes sont :
* le code de retour (voir radiusd.RLM_MODULE_* )
* un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok
et autres trucs du genre)
@ -118,8 +118,8 @@ def radius_event(fun):
# Ex: Calling-Station-Id: "une_adresse_mac"
data[key] = value.replace('"', '')
try:
# TODO s'assurer ici que les tuples renvoyés sont bien des (str,str)
# rlm_python ne digère PAS les unicodes
# TODO s'assurer ici que les tuples renvoyés sont bien des
# (str,str) : rlm_python ne digère PAS les unicodes
return fun(data)
except Exception as err:
logger.error('Failed %r on data %r' % (err, auth_data))
@ -128,7 +128,6 @@ def radius_event(fun):
return new_f
@radius_event
def instantiate(*_):
"""Utile pour initialiser les connexions ldap une première fois (otherwise,
@ -137,12 +136,15 @@ def instantiate(*_):
if TEST_SERVER:
logger.info(u'DBG_FREERADIUS is enabled')
@radius_event
def authorize(data):
"""On test si on connait le calling nas:
- si le nas est inconnue, on suppose que c'est une requète 802.1X, on la traite
- si le nas est inconnue, on suppose que c'est une requète 802.1X, on la
traite
- si le nas est connu, on applique 802.1X si le mode est activé
- si le nas est connu et si il s'agit d'un nas auth par mac, on repond accept en authorize
- si le nas est connu et si il s'agit d'un nas auth par mac, on repond
accept en authorize
"""
# Pour les requetes proxifiees, on split
nas = data.get('NAS-IP-Address', data.get('NAS-Identifier', None))
@ -155,28 +157,35 @@ def authorize(data):
user = data.get('User-Name', '').decode('utf-8', errors='replace')
user = user.split('@', 1)[0]
mac = data.get('Calling-Station-Id', '')
result, log, password = check_user_machine_and_register(nas_type, user, mac)
result, log, password = check_user_machine_and_register(
nas_type,
user,
mac
)
logger.info(log.encode('utf-8'))
logger.info(user.encode('utf-8'))
if not result:
return radiusd.RLM_MODULE_REJECT
else:
return (radiusd.RLM_MODULE_UPDATED,
(),
(
(str("NT-Password"), str(password)),
),
return (
radiusd.RLM_MODULE_UPDATED,
(),
(
(str("NT-Password"), str(password)),
),
)
else:
return (radiusd.RLM_MODULE_UPDATED,
(),
(
("Auth-Type", "Accept"),
),
return (
radiusd.RLM_MODULE_UPDATED,
(),
(
("Auth-Type", "Accept"),
),
)
@radius_event
def post_auth(data):
nas = data.get('NAS-IP-Address', data.get('NAS-Identifier', None))
@ -187,61 +196,87 @@ def post_auth(data):
return radiusd.RLM_MODULE_OK
nas_type = Nas.objects.filter(nas_type=nas_instance.type).first()
if not nas_type:
logger.info(u"Type de nas non enregistré dans la bdd!".encode('utf-8'))
logger.info(
u"Type de nas non enregistré dans la bdd!".encode('utf-8')
)
return radiusd.RLM_MODULE_OK
mac = data.get('Calling-Station-Id', None)
# Switch et bornes héritent de machine et peuvent avoir plusieurs interfaces filles
# Switch et bornes héritent de machine et peuvent avoir plusieurs
# interfaces filles
nas_machine = nas_instance.machine
# Si il s'agit d'un switch
if hasattr(nas_machine, 'switch'):
port = data.get('NAS-Port-Id', data.get('NAS-Port', None))
#Pour les infrastructures possédant des switchs Juniper :
#On vérifie si le switch fait partie d'un stack Juniper
# Pour les infrastructures possédant des switchs Juniper :
# On vérifie si le switch fait partie d'un stack Juniper
instance_stack = nas_machine.switch.stack
if instance_stack:
# Si c'est le cas, on resélectionne le bon switch dans la stack
id_stack_member = port.split("-")[1].split('/')[0]
nas_machine = Switch.objects.filter(stack=instance_stack).filter(stack_member_id=id_stack_member).prefetch_related('interface_set__domain__extension').first()
# On récupère le numéro du port sur l'output de freeradius. La ligne suivante fonctionne pour cisco, HP et Juniper
nas_machine = (Switch.objects
.filter(stack=instance_stack)
.filter(stack_member_id=id_stack_member)
.prefetch_related(
'interface_set__domain__extension'
)
.first())
# On récupère le numéro du port sur l'output de freeradius.
# La ligne suivante fonctionne pour cisco, HP et Juniper
port = port.split(".")[0].split('/')[-1][-2:]
out = decide_vlan_and_register_switch(nas_machine, nas_type, port, mac)
sw_name, room, reason, vlan_id = out
log_message = '(fil) %s -> %s [%s%s]' % \
(sw_name + u":" + port + u"/" + unicode(room), mac, vlan_id, (reason and u': ' + reason).encode('utf-8'))
log_message = '(fil) %s -> %s [%s%s]' % (
sw_name + u":" + port + u"/" + unicode(room),
mac,
vlan_id,
(reason and u': ' + reason).encode('utf-8')
)
logger.info(log_message)
# Filaire
return (radiusd.RLM_MODULE_UPDATED,
return (
radiusd.RLM_MODULE_UPDATED,
(
("Tunnel-Type", "VLAN"),
("Tunnel-Medium-Type", "IEEE-802"),
("Tunnel-Private-Group-Id", '%d' % int(vlan_id)),
),
()
)
)
else:
return radiusd.RLM_MODULE_OK
@radius_event
def dummy_fun(_):
"""Do nothing, successfully. (C'est pour avoir un truc à mettre)"""
return radiusd.RLM_MODULE_OK
def detach(_=None):
"""Appelé lors du déchargement du module (enfin, normalement)"""
print "*** goodbye from auth.py ***"
return radiusd.RLM_MODULE_OK
def find_nas_from_request(nas_id):
nas = Interface.objects.filter(Q(domain=Domain.objects.filter(name=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id))).select_related('type').select_related('machine__switch__stack')
nas = (Interface.objects
.filter(
Q(domain=Domain.objects.filter(name=nas_id)) |
Q(ipv4=IpList.objects.filter(ipv4=nas_id))
)
.select_related('type')
.select_related('machine__switch__stack'))
return nas.first()
def check_user_machine_and_register(nas_type, username, mac_address):
""" Verifie le username et la mac renseignee. L'enregistre si elle est inconnue.
"""Verifie le username et la mac renseignee. L'enregistre si elle est
inconnue.
Renvoie le mot de passe ntlm de l'user si tout est ok
Utilise pour les authentifications en 802.1X"""
interface = Interface.objects.filter(mac_address=mac_address).first()
@ -252,7 +287,10 @@ def check_user_machine_and_register(nas_type, username, mac_address):
return (False, u"Adhérent non cotisant", '')
if interface:
if interface.machine.user != user:
return (False, u"Machine enregistrée sur le compte d'un autre user...", '')
return (False,
u"Machine enregistrée sur le compte d'un autre "
"user...",
'')
elif not interface.is_active:
return (False, u"Machine desactivée", '')
elif not interface.ipv4:
@ -264,7 +302,9 @@ def check_user_machine_and_register(nas_type, username, mac_address):
if nas_type.autocapture_mac:
result, reason = user.autoregister_machine(mac_address, nas_type)
if result:
return (True, u'Access Ok, Capture de la mac...', user.pwd_ntlm)
return (True,
u'Access Ok, Capture de la mac...',
user.pwd_ntlm)
else:
return (False, u'Erreur dans le register mac %s' % reason, '')
else:
@ -273,8 +313,10 @@ def check_user_machine_and_register(nas_type, username, mac_address):
return (False, u"Machine inconnue", '')
def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_address):
"""Fonction de placement vlan pour un switch en radius filaire auth par mac.
def decide_vlan_and_register_switch(nas_machine, nas_type, port_number,
mac_address):
"""Fonction de placement vlan pour un switch en radius filaire auth par
mac.
Plusieurs modes :
- nas inconnu, port inconnu : on place sur le vlan par defaut VLAN_OK
- pas de radius sur le port : VLAN_OK
@ -292,7 +334,8 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr
- interface inconnue :
- register mac désactivé : VLAN_NOK
- register mac activé :
- dans la chambre associé au port, pas d'user ou non à jour : VLAN_NOK
- dans la chambre associé au port, pas d'user ou non à
jour : VLAN_NOK
- user à jour, autocapture de la mac et VLAN_OK
"""
# Get port from switch and port number
@ -303,8 +346,13 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr
sw_name = str(nas_machine)
port = Port.objects.filter(switch=Switch.objects.filter(machine_ptr=nas_machine), port=port_number).first()
#Si le port est inconnu, on place sur le vlan defaut
port = (Port.objects
.filter(
switch=Switch.objects.filter(machine_ptr=nas_machine),
port=port_number
)
.first())
# Si le port est inconnu, on place sur le vlan defaut
if not port:
return (sw_name, "Chambre inconnue", u'Port inconnu', VLAN_OK)
@ -316,7 +364,10 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr
DECISION_VLAN = VLAN_OK
if port.radius == 'NO':
return (sw_name, "", u"Pas d'authentification sur ce port" + extra_log, DECISION_VLAN)
return (sw_name,
"",
u"Pas d'authentification sur ce port" + extra_log,
DECISION_VLAN)
if port.radius == 'BLOQ':
return (sw_name, port.room, u'Port desactive', VLAN_NOK)
@ -326,7 +377,9 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr
if not room:
return (sw_name, "Inconnue", u'Chambre inconnue', VLAN_NOK)
room_user = User.objects.filter(Q(club__room=port.room) | Q(adherent__room=port.room))
room_user = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
if not room_user:
return (sw_name, room, u'Chambre non cotisante', VLAN_NOK)
for user in room_user:
@ -336,35 +389,78 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr
if port.radius == 'COMMON' or port.radius == 'STRICT':
# Authentification par mac
interface = Interface.objects.filter(mac_address=mac_address).select_related('machine__user').select_related('ipv4').first()
interface = (Interface.objects
.filter(mac_address=mac_address)
.select_related('machine__user')
.select_related('ipv4')
.first())
if not interface:
room = port.room
# On essaye de register la mac
if not nas_type.autocapture_mac:
return (sw_name, "", u'Machine inconnue', VLAN_NOK)
elif not room:
return (sw_name, "Inconnue", u'Chambre et machine inconnues', VLAN_NOK)
return (sw_name,
"Inconnue",
u'Chambre et machine inconnues',
VLAN_NOK)
else:
if not room_user:
room_user = User.objects.filter(Q(club__room=port.room) | Q(adherent__room=port.room))
room_user = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
if not room_user:
return (sw_name, room, u'Machine et propriétaire de la chambre inconnus', VLAN_NOK)
return (sw_name,
room,
u'Machine et propriétaire de la chambre '
'inconnus',
VLAN_NOK)
elif room_user.count() > 1:
return (sw_name, room, u'Machine inconnue, il y a au moins 2 users dans la chambre/local -> ajout de mac automatique impossible', VLAN_NOK)
return (sw_name,
room,
u'Machine inconnue, il y a au moins 2 users '
'dans la chambre/local -> ajout de mac '
'automatique impossible',
VLAN_NOK)
elif not room_user.first().has_access():
return (sw_name, room, u'Machine inconnue et adhérent non cotisant', VLAN_NOK)
return (sw_name,
room,
u'Machine inconnue et adhérent non cotisant',
VLAN_NOK)
else:
result, reason = room_user.first().autoregister_machine(mac_address, nas_type)
result, reason = (room_user
.first()
.autoregister_machine(
mac_address,
nas_type
))
if result:
return (sw_name, room, u'Access Ok, Capture de la mac...' + extra_log, DECISION_VLAN)
return (sw_name,
room,
u'Access Ok, Capture de la mac: ' + extra_log,
DECISION_VLAN)
else:
return (sw_name, room, u'Erreur dans le register mac %s' % reason + unicode(mac_address), VLAN_NOK)
return (sw_name,
room,
u'Erreur dans le register mac %s' % (
reason + unicode(mac_address)
),
VLAN_NOK)
else:
room = port.room
if not interface.is_active:
return (sw_name, room, u'Machine non active / adherent non cotisant', VLAN_NOK)
return (sw_name,
room,
u'Machine non active / adherent non cotisant',
VLAN_NOK)
elif not interface.ipv4:
interface.assign_ipv4()
return (sw_name, room, u"Ok, Reassignation de l'ipv4" + extra_log, DECISION_VLAN)
return (sw_name,
room,
u"Ok, Reassignation de l'ipv4" + extra_log,
DECISION_VLAN)
else:
return (sw_name, room, u'Machine OK' + extra_log, DECISION_VLAN)
return (sw_name,
room,
u'Machine OK' + extra_log,
DECISION_VLAN)