8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-22 03:13:12 +00:00

première version du script, pre debug

This commit is contained in:
chapeau 2020-11-28 15:42:08 +01:00
parent c213703a30
commit 51e312e4c9
4 changed files with 254 additions and 329 deletions

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
@ -7,6 +7,7 @@
# Copyright © 2017 Gabriel Détraz
# Copyright © 2017 Lara Kermarec
# Copyright © 2017 Augustin Lemesle
# Copyright © 2020 Corentin Canebier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -34,49 +35,36 @@ https://github.com/FreeRADIUS/freeradius-server/blob/master/src/modules/rlm_pyth
Inspired by Daniel Stan in Crans
"""
import os
from configparser import ConfigParser
from re2oapi import Re2oAPIClient
import sys
import os
import subprocess
import logging
import traceback
import radiusd # Magic module freeradius (radiusd.py is dummy)
import radiusd
import urllib.parse
from django.core.wsgi import get_wsgi_application
from django.db.models import Q
proj_path = "/var/www/re2o/"
# This is so Django knows where to find stuff.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "re2o.settings")
sys.path.append(proj_path)
# This is so my local_settings.py gets loaded.
os.chdir(proj_path)
# This is so models get loaded.
application = get_wsgi_application()
from machines.models import Interface, IpList, Nas, Domain
from topologie.models import Port, Switch
from users.models import User
from preferences.models import RadiusOption
api_client = None
# Logging
class RadiusdHandler(logging.Handler):
"""Logs handler for freeradius"""
"""Handler de logs pour freeradius"""
def emit(self, record):
"""Log message processing, level are converted"""
"""Process un message de log, en convertissant les niveaux"""
if record.levelno >= logging.WARN:
rad_sig = radiusd.L_ERR
elif record.levelno >= logging.INFO:
rad_sig = radiusd.L_INFO
else:
rad_sig = radiusd.L_DBG
radiusd.radlog(rad_sig, str(record.msg))
radiusd.radlog(rad_sig, record.msg.encode("utf-8"))
# Init for logging
# Initialisation d'un logger (pour logguer unifi )
logger = logging.getLogger("auth.py")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s: [%(levelname)s] %(message)s")
@ -86,15 +74,17 @@ logger.addHandler(handler)
def radius_event(fun):
"""Decorator for freeradius fonction with radius.
This function take a unique argument which is a list of tuples (key, value)
and return a tuple of 3 values which are:
* return code (see radiusd.RLM_MODULE_* )
* a tuple of 2 elements for response value (access ok , etc)
* a tuple of 2 elements for internal value to update (password for example)
"""Décorateur pour les fonctions d'interfaces avec radius.
Une telle fonction prend un uniquement argument, qui est une liste de
tuples (clé, valeur) et renvoie un triplet dont les composantes sont :
* le code de retour (voir radiusd.RLM_MODULE_* )
* un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok
et autres trucs du genre)
* un tuple de couples (clé, valeur) pour les valeurs internes à mettre à
jour (mot de passe par exemple)
Here, we convert the list of tuples into a dictionnary.
"""
On se contente avec ce décorateur (pour l'instant) de convertir la liste de
tuples en entrée en un dictionnaire."""
def new_f(auth_data):
""" The function transforming the tuples as dict """
@ -107,12 +97,15 @@ def radius_event(fun):
# Ex: Calling-Station-Id: "une_adresse_mac"
data[key] = value.replace('"', "")
try:
# TODO s'assurer ici que les tuples renvoy s sont bien des
# (str,str) : rlm_python ne dig re PAS les unicodes
return fun(data)
except Exception as err:
exc_type, exc_instance, exc_traceback = sys.exc_info()
formatted_traceback = "".join(traceback.format_tb(exc_traceback))
logger.error("Failed %r on data %r" % (err, auth_data))
logger.error("Function %r, Traceback : %r" % (fun, formatted_traceback))
logger.error("Function %r, Traceback : %r" %
(fun, formatted_traceback))
return radiusd.RLM_MODULE_FAIL
return new_f
@ -123,29 +116,40 @@ def instantiate(*_):
"""Usefull for instantiate ldap connexions otherwise,
do nothing"""
logger.info("Instantiation")
path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
@radius_event
def authorize(data):
"""Here, we test if the Nas is known.
- If the nas is unknown, we assume that it is a 802.1X request,
- If the nas is known, we apply the 802.1X if enabled,
- It the nas is known AND nas auth is enabled with mac address, returns
accept here"""
# For proxified request, split
# Pour les requetes proxifiees, on split
nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None))
nas_instance = find_nas_from_request(nas)
# For none proxified requests
nas_type = None
if nas_instance:
nas_type = Nas.objects.filter(nas_type=nas_instance.machine_type).first()
if not nas_type or nas_type.port_access_mode == "802.1X":
user = data.get("User-Name", "")
user = user.split("@", 1)[0]
mac = data.get("Calling-Station-Id", "")
result, log, password = check_user_machine_and_register(nas_type, user, mac)
logger.info(str(log))
logger.info(str(user))
user = data.get("User-Name", "").decode("utf-8", errors="replace")
user = user.split("@", 1)[0]
mac = data.get("Calling-Station-Id", "")
data_from_api = api_client.view(
"radius/authorize/{0}/{1}/{2}".format(nas, user, mac))
nas_type = data_from_api["nas"]
user = data_from_api["user"]
user_interface = data_from_api["user_interface"]
if nas_type and nas_type["port_access_mode"] == "802.1X":
result, log, password = check_user_machine_and_register(
nas_type, user, user_interface)
logger.info(log.encode("utf-8"))
logger.info(user.encode("utf-8"))
if not result:
return radiusd.RLM_MODULE_REJECT
@ -166,39 +170,33 @@ def post_auth(data):
"""
nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None))
nas_instance = find_nas_from_request(nas)
# All non proxified requests
if not nas_instance:
logger.info("Proxified request, nas unknown")
return radiusd.RLM_MODULE_OK
nas_type = Nas.objects.filter(nas_type=nas_instance.machine_type).first()
if not nas_type:
logger.info("This kind of nas is not registered in the database!")
return radiusd.RLM_MODULE_OK
nas_port = data.get("NAS-Port-Id", data.get("NAS-Port", None))
mac = data.get("Calling-Station-Id", None)
# Switchs and access point can have several interfaces
nas_machine = nas_instance.machine
# If it is a switchs
if hasattr(nas_machine, "switch"):
port = data.get("NAS-Port-Id", data.get("NAS-Port", None))
# If the switch is part of a stack, calling ip is different from calling switch.
instance_stack = nas_machine.switch.stack
if instance_stack:
# If it is a stack, we select the correct switch in the stack
id_stack_member = port.split("-")[1].split("/")[0]
nas_machine = (
Switch.objects.filter(stack=instance_stack)
.filter(stack_member_id=id_stack_member)
.prefetch_related("interface_set__domain__extension")
.first()
)
# Find the port number from freeradius, works both with HP, Cisco
# and juniper output
port = port.split(".")[0].split("/")[-1][-2:]
out = decide_vlan_switch(nas_machine, nas_type, port, mac)
sw_name, room, reason, vlan_id, decision, attributes = out
data_from_api = api_client.view(
"radius/post_auth/{0}/{1}/{2}".format(
urllib.parse.quote(nas),
urllib.parse.quote(nas_port),
urllib.parse.quote(mac)
))
nas_type = data_from_api["nas"]
port = data_from_api["port"]
switch = data_from_api["switch"]
if not nas_type:
logger.info("Proxified request, nas unknown")
return radiusd.RLM_MODULE_OK
# If it is a switch
if switch:
sw_name = switch["name"] or "?"
room = "Unknown port"
if port:
room = port.room or "Unknown room"
out = decide_vlan_switch(data_from_api, mac, nas_port)
reason, vlan_id, decision, attributes = out
if decision:
log_message = "(wired) %s -> %s [%s%s]" % (
@ -234,70 +232,50 @@ def post_auth(data):
return radiusd.RLM_MODULE_OK
# TODO : remove this function
@radius_event
def dummy_fun(_):
"""Do nothing, successfully. """
return radiusd.RLM_MODULE_OK
def detach(_=None):
"""Detatch the auth"""
print("*** goodbye from auth.py ***")
return radiusd.RLM_MODULE_OK
def find_nas_from_request(nas_id):
""" Get the nas object from its ID """
nas = (
Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
)
.select_related("machine_type")
.select_related("machine__switch__stack")
)
return nas.first()
def check_user_machine_and_register(nas_type, username, mac_address):
def check_user_machine_and_register(nas_type, user, user_interface):
"""Check if username and mac are registered. Register it if unknown.
Return the user ntlm password if everything is ok.
Used for 802.1X auth"""
interface = Interface.objects.filter(mac_address=mac_address).first()
user = User.objects.filter(pseudo__iexact=username).first()
if not user:
return (False, "User unknown", "")
if not user.has_access():
if not user["access"]:
return (False, "Invalid connexion (non-contributing user)", "")
if interface:
if interface.machine.user != user:
if user_interface:
if user_interface["user_pk"] != user["pk"]:
return (
False,
"Mac address registered on another user account",
"",
)
elif not interface.is_active:
elif not user_interface["active"]:
return (False, "Interface/Machine disabled", "")
elif not interface.ipv4:
interface.assign_ipv4()
return (True, "Ok, new ipv4 assignement...", user.pwd_ntlm)
elif not user_interface["ipv4"]:
# interface.assign_ipv4()
return (True, "Ok, new ipv4 assignement...", user.get("pwd_ntlm", ""))
else:
return (True, "Access ok", user.pwd_ntlm)
return (True, "Access ok", user.get("pwd_ntlm", ""))
elif nas_type:
if nas_type.autocapture_mac:
result, reason = user.autoregister_machine(mac_address, nas_type)
if result:
return (True, "Access Ok, Registering mac...", user.pwd_ntlm)
else:
return (False, "Error during mac register %s" % reason, "")
if nas_type["autocapture_mac"]:
# result, reason = user.autoregister_machine(mac_address, nas_type)
# if result:
# return (True, "Access Ok, Registering mac...", user.pwd_ntlm)
# else:
# return (False, "Error during mac register %s" % reason, "")
return (False, "L'auto capture est désactivée", "")
else:
return (False, "Unknown interface/machine", "")
else:
return (False, "Unknown interface/machine", "")
def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
def set_radius_attributes_values(attributes, values):
return (
(str(attribute.attribute), str(attribute.value % values))
for attribute in attributes
)
def decide_vlan_switch(data_from_api, user_mac, nas_port):
"""Function for selecting vlan for a switch with wired mac auth radius.
Several modes are available :
- all modes:
@ -330,64 +308,55 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
- decision (bool)
- Other Attributs (attribut:str, operator:str, value:str)
"""
nas_type = data_from_api["nas"]
room_users = data_from_api["room_users"]
port = data_from_api["port"]
port_profile = data_from_api["port_profile"]
switch = data_from_api["switch"]
user_interface = data_from_api["user_interface"]
radius_option = data_from_api["radius_option"]
EMAIL_STATE_UNVERIFIED = data_from_api["EMAIL_STATE_UNVERIFIED"]
RADIUS_OPTION_REJECT = data_from_api["RADIUS_OPTION_REJECT"]
USER_STATE_ACTIVE = data_from_api["USER_STATE_ACTIVE"]
attributes_kwargs = {
"client_mac": str(mac_address),
"switch_port": str(port_number),
"client_mac": str(user_mac),
"switch_port": str(nas_port.split(".")[0].split("/")[-1][-2:]),
"switch_ip": str(switch.ipv4)
}
# Get port from switch and port number
extra_log = ""
# If NAS is unknown, go to default vlan
if not nas_machine:
return (
"?",
"Unknown room",
"Unknown NAS",
RadiusOption.get_cached_value("vlan_decision_ok").vlan_id,
True,
RadiusOption.get_attributes("ok_attributes", attributes_kwargs),
)
sw_name = str(getattr(nas_machine, "short_name", str(nas_machine)))
switch = Switch.objects.filter(machine_ptr=nas_machine).first()
attributes_kwargs["switch_ip"] = str(switch.ipv4)
port = Port.objects.filter(switch=switch, port=port_number).first()
# If the port is unknwon, go to default vlan
# We don't have enought information to make a better decision
if not port:
if not port or not port_profile:
return (
sw_name,
"Unknown port",
"PUnknown port",
getattr(
RadiusOption.get_cached_value("unknown_port_vlan"), "vlan_id", None
),
RadiusOption.get_cached_value("unknown_port") != RadiusOption.REJECT,
RadiusOption.get_attributes("unknown_port_attributes", attributes_kwargs),
radius_option["unknown_port_vlan"] and radius_option["unknown_port_vlan"]["vlan_id"] or None,
radius_option["unknown_port"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["unknown_port_attributes"], attributes_kwargs),
)
# Retrieve port profile
port_profile = port.get_port_profile
# If a vlan is precised in port config, we use it
if port_profile.vlan_untagged:
DECISION_VLAN = int(port_profile.vlan_untagged.vlan_id)
if port_profile["vlan_untagged"]:
DECISION_VLAN = int(port_profile["vlan_untagged"]["vlan_id"])
extra_log = "Force sur vlan " + str(DECISION_VLAN)
attributes = ()
else:
DECISION_VLAN = RadiusOption.get_cached_value("vlan_decision_ok").vlan_id
attributes = RadiusOption.get_attributes("ok_attributes", attributes_kwargs)
DECISION_VLAN = radius_option["vlan_decision_ok"]["vlan_id"]
attributes = set_radius_attributes_values(
radius_option["ok_attributes"], attributes_kwargs)
# If the port is disabled in re2o, REJECT
if not port.state:
return (sw_name, port.room, "Port disabled", None, False, ())
if not port["state"]:
return ("Port disabled", None, False, ())
# If radius is disabled, decision is OK
if port_profile.radius_type == "NO":
if port_profile["radius_type"] == "NO":
return (
sw_name,
"",
"No Radius auth enabled on this port" + extra_log,
DECISION_VLAN,
True,
@ -396,11 +365,8 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
# If 802.1X is enabled, people has been previously accepted.
# Go to the decision vlan
if (nas_type.port_access_mode, port_profile.radius_type) == ("802.1X", "802.1X"):
room = port.room or "Room unknown"
if (nas_type["port_access_mode"], port_profile["radius_type"]) == ("802.1X", "802.1X"):
return (
sw_name,
room,
"Accept authentication 802.1X",
DECISION_VLAN,
True,
@ -411,166 +377,101 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
# If strict mode is enabled, we check every user related with this port. If
# one user or more is not enabled, we reject to prevent from sharing or
# spoofing mac.
if port_profile.radius_mode == "STRICT":
room = port.room
if not room:
if port_profile["radius_mode"] == "STRICT":
if not port["room"]:
return (
sw_name,
"Unknown",
"Unkwown room",
getattr(
RadiusOption.get_cached_value("unknown_room_vlan"), "vlan_id", None
),
RadiusOption.get_cached_value("unknown_room") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_room_attributes", attributes_kwargs
),
radius_option["unknown_room_vlan"] and radius_option["unknown_room_vlan"]["vlan_id"] or None,
radius_option["unknown_room"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["unknown_room_attributes"], attributes_kwargs),
)
room_user = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
if not room_user:
if not room_users:
return (
sw_name,
room,
"Non-contributing room",
getattr(
RadiusOption.get_cached_value("non_member_vlan"), "vlan_id", None
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes("non_member_attributes", attributes_kwargs),
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
radius_option["non_member"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["non_member_attributes"], attributes_kwargs),
)
all_user_ban = True
at_least_one_active_user = False
for user in room_users:
if not user["is_ban"] and user["state"] == USER_STATE_ACTIVE:
all_user_ban = False
elif user["email_state"] != EMAIL_STATE_UNVERIFIED and (user["is_connected"] or user["is_whitelisted"]):
at_least_one_active_user = True
if all_user_ban:
return (
"User is banned or disabled",
radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None,
radius_option["banned"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["banned_attributes"], attributes_kwargs),
)
if not at_least_one_active_user:
return (
"Non-contributing member or unconfirmed mail",
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
radius_option["non_member"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["non_member_attributes"], attributes_kwargs),
)
for user in room_user:
if user.is_ban() or user.state != User.STATE_ACTIVE:
return (
sw_name,
room,
"User is banned or disabled",
getattr(
RadiusOption.get_cached_value("banned_vlan"), "vlan_id", None
),
RadiusOption.get_cached_value("banned") != RadiusOption.REJECT,
RadiusOption.get_attributes("banned_attributes", attributes_kwargs),
)
elif user.email_state == User.EMAIL_STATE_UNVERIFIED:
return (
sw_name,
room,
"User is suspended (mail has not been confirmed)",
getattr(
RadiusOption.get_cached_value("non_member_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
)
elif not (user.is_connected() or user.is_whitelisted()):
return (
sw_name,
room,
"Non-contributing member",
getattr(
RadiusOption.get_cached_value("non_member_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
)
# else: user OK, so we check MAC now
# If we are authenticating with mac, we look for the interfaces and its mac address
if port_profile.radius_mode == "COMMON" or port_profile.radius_mode == "STRICT":
# Mac auth
interface = (
Interface.objects.filter(mac_address=mac_address)
.select_related("machine__user")
.select_related("ipv4")
.first()
)
# If mac is unknown,
if not interface:
room = port.room
if not user_interface:
# We try to register mac, if autocapture is enabled
# Final decision depend on RADIUSOption set in re2o
if nas_type.autocapture_mac:
if nas_type["autocapture_mac"]:
return (
sw_name,
room,
"Unknown mac/interface",
getattr(
RadiusOption.get_cached_value("unknown_machine_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("unknown_machine")
!= RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_machine_attributes", attributes_kwargs
),
radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None,
radius_option["unknown_machine"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["unknown_machine_attributes"], attributes_kwargs),
)
# Otherwise, if autocapture mac is not enabled,
else:
return (
sw_name,
"",
"Unknown mac/interface",
getattr(
RadiusOption.get_cached_value("unknown_machine_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("unknown_machine")
!= RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_machine_attributes", attributes_kwargs
),
radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None,
radius_option["unknown_machine"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["unknown_machine_attributes"], attributes_kwargs),
)
# Mac/Interface is found, check if related user is contributing and ok
# If needed, set ipv4 to it
else:
room = port.room
if interface.machine.user.is_ban():
if user_interface["is_ban"]:
return (
sw_name,
room,
"Banned user",
getattr(
RadiusOption.get_cached_value("banned_vlan"), "vlan_id", None
),
RadiusOption.get_cached_value("banned") != RadiusOption.REJECT,
RadiusOption.get_attributes("banned_attributes", attributes_kwargs),
radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None,
radius_option["banned"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["banned_attributes"], attributes_kwargs),
)
if not interface.is_active:
if not user_interface["active"]:
return (
sw_name,
room,
"Disabled interface / non-contributing member",
getattr(
RadiusOption.get_cached_value("non_member_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
radius_option["non_member"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["non_member_attributes"], attributes_kwargs),
)
# If settings is set to related interface vlan policy based on interface type:
if RadiusOption.get_cached_value("radius_general_policy") == "MACHINE":
DECISION_VLAN = interface.machine_type.ip_type.vlan.vlan_id
if not interface.ipv4:
interface.assign_ipv4()
if radius_option["radius_general_policy"] == "MACHINE":
DECISION_VLAN = user_interface["vlan_id"]
if not user_interface["ipv4"]:
# interface.assign_ipv4()
return (
sw_name,
room,
"Ok, assigning new ipv4" + extra_log,
DECISION_VLAN,
True,
@ -578,8 +479,6 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
)
else:
return (
sw_name,
room,
"Interface OK" + extra_log,
DECISION_VLAN,
True,

View file

@ -1,3 +1,24 @@
# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
# Copyright © 2020 Corentin Canebier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from rest_framework import serializers
import machines.models as machines
@ -6,13 +27,15 @@ from api.serializers import NamespacedHMSerializer
from rest_framework.serializers import Serializer
class Ipv4Serializer(Serializer):
ipv4 = serializers.CharField()
class InterfaceSerializer(Serializer):
mac_address = serializers.CharField()
ipv4 = serializers.CharField(source="ipv4.ipv4")
ipv4 = Ipv4Serializer()
active = serializers.BooleanField(source="is_active")
user_pk = serializers.CharField(source="machine.user.pk")
# machine_type_pk = serializers.CharField(source="machine_type.pk")
# switch_stack = serializers.CharField(source="machine.switch.stack")
machine_short_name = serializers.CharField(source="machine.short_name")
is_ban = serializers.BooleanField(source="machine.user.is_ban")
vlan_id = serializers.IntegerField(
@ -96,3 +119,4 @@ class PostAuthResponseSerializer(Serializer):
radius_option = RadiusOptionSerializer()
EMAIL_STATE_UNVERIFIED = serializers.IntegerField()
RADIUS_OPTION_REJECT = serializers.CharField()
USER_STATE_ACTIVE = serializers.CharField()

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
@ -21,15 +21,9 @@
from . import views
urls_view = [
# (r"radius/nas-interface-from-id/(?P<nas_id>.+)$", views.nas_from_id_view),
# (r"radius/nas-from-machine-type/(?P<machine_type>.+)$", views.NasFromMachineTypeView),
# (r"radius/user-from-username/(?P<username>.+)$", views.UserFromUsernameView),
# (r"radius/interface-from-mac-address/(?P<mac_address>.+)$", views.InterfaceFromMacAddressView),
]
urls_functional_view = [
(r"radius/authorize/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F:\-]{17})$",
(r"radius/authorize/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.authorize, None),
(r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F:\-]{17})$",
(r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$",
views.post_auth, None),
]

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
@ -62,7 +62,7 @@ def authorize(request, nas_id, username, mac_address):
class PostAuthResponse:
def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT):
def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE):
self.nas = nas
self.room_users = room_users
self.port = port
@ -72,6 +72,7 @@ class PostAuthResponse:
self.radius_option = radius_option
self.EMAIL_STATE_UNVERIFIED = EMAIL_STATE_UNVERIFIED
self.RADIUS_OPTION_REJECT = RADIUS_OPTION_REJECT
self.USER_STATE_ACTIVE = USER_STATE_ACTIVE
def can_view(self, user):
return [True]
@ -84,29 +85,33 @@ def post_auth(request, nas_id, nas_port, user_mac):
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
).first()
print(nas_id)
nas_type = None
if nas_interface:
nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first()
# get switch
switch = Switch.objects.filter(machine_ptr=nas_interface.machine).first()
if hasattr(nas_interface.machine, "switch"):
stack = nas_interface.machine.switch.stack
if stack:
id_stack_member = nas_port.split("-")[1].split("/")[0]
switch = (
Switch.objects.filter(stack=stack)
.filter(stack_member_id=id_stack_member)
.first()
)
switch = None
if nas_interface:
switch = Switch.objects.filter(
machine_ptr=nas_interface.machine).first()
if hasattr(nas_interface.machine, "switch"):
stack = nas_interface.machine.switch.stack
if stack:
id_stack_member = nas_port.split("-")[1].split("/")[0]
switch = (
Switch.objects.filter(stack=stack)
.filter(stack_member_id=id_stack_member)
.first()
)
# get port
port_number = nas_port.split(".")[0].split("/")[-1][-2:]
port = Port.objects.filter(switch=switch, port=port_number).first()
port_profile = port.get_port_profile
port_profile = None
if port:
port_profile = port.get_port_profile
# get user_interface
user_interface = (
@ -117,9 +122,11 @@ def post_auth(request, nas_id, nas_port, user_mac):
)
# get room users
room_users = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
room_users = []
if port:
room_users = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
# get radius options
radius_option = RadiusOption.objects.first()
@ -127,7 +134,8 @@ def post_auth(request, nas_id, nas_port, user_mac):
EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED
RADIUS_OPTION_REJECT = RadiusOption.REJECT
USER_STATE_ACTIVE = User.STATE_ACTIVE
serialized = serializers.PostAuthResponseSerializer(
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT))
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE))
return Response(data=serialized.data)