From ce862c2853ad25d1a7d4caf79f0414e6c0e085d0 Mon Sep 17 00:00:00 2001 From: chapeau Date: Thu, 13 May 2021 19:30:52 +0200 Subject: [PATCH] merge --- freeradius_utils/auth.py | 222 +++++++++++++++++++++----------------- radius/api/serializers.py | 6 ++ radius/api/urls.py | 3 +- radius/api/views.py | 68 ++++++++++-- 4 files changed, 189 insertions(+), 110 deletions(-) diff --git a/freeradius_utils/auth.py b/freeradius_utils/auth.py index ce947bea..328d2cd0 100644 --- a/freeradius_utils/auth.py +++ b/freeradius_utils/auth.py @@ -61,10 +61,10 @@ from topologie.models import Port, Switch from users.models import User class RadiusdHandler(logging.Handler): - """Handler de logs pour freeradius""" + """Logs handler for freeradius""" def emit(self, record): - """Process un message de log, en convertissant les niveaux""" + """Log message processing, level are converted""" if record.levelno >= logging.WARN: rad_sig = radiusd.L_ERR elif record.levelno >= logging.INFO: @@ -84,17 +84,15 @@ logger.addHandler(handler) def radius_event(fun): - """Décorateur pour les fonctions d'interfaces avec radius. - Une telle fonction prend un uniquement argument, qui est une liste de - tuples (clé, valeur) et renvoie un triplet dont les composantes sont : - * le code de retour (voir radiusd.RLM_MODULE_* ) - * un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok - et autres trucs du genre) - * un tuple de couples (clé, valeur) pour les valeurs internes à mettre à - jour (mot de passe par exemple) + """Decorator for freeradius fonction with radius. + This function take a unique argument which is a list of tuples (key, value) + and return a tuple of 3 values which are: + * return code (see radiusd.RLM_MODULE_* ) + * a tuple of 2 elements for response value (access ok , etc) + * a tuple of 2 elements for internal value to update (password for example) - On se contente avec ce décorateur (pour l'instant) de convertir la liste de - tuples en entrée en un dictionnaire.""" + Here, we convert the list of tuples into a dictionnary. + """ def new_f(auth_data): """ The function transforming the tuples as dict """ @@ -123,10 +121,23 @@ def radius_event(fun): @radius_event def instantiate(*_): - """Usefull for instantiate ldap connexions otherwise, - do nothing""" + """Instantiate api connection + """ logger.info("Instantiation") + path = (os.path.dirname(os.path.abspath(__file__))) + + config = ConfigParser() + config.read(path+'/config.ini') + + api_hostname = config.get('Re2o', 'hostname') + api_password = config.get('Re2o', 'password') + api_username = config.get('Re2o', 'username') + + global api_client + api_client = Re2oAPIClient( + api_hostname, api_username, api_password, use_tls=True) + @radius_event def authorize(data): @@ -178,8 +189,9 @@ def post_auth(data): logger.info("Proxified request, nas unknown") return radiusd.RLM_MODULE_OK - # If it is a switch + # If the request is from a switch (wired connection) if switch: + # For logging sw_name = switch["name"] or "?" room = "Unknown port" if port: @@ -197,7 +209,7 @@ def post_auth(data): ) logger.info(log_message) - # Wired connexion + # Apply vlan from decide_vlan_switch return ( radiusd.RLM_MODULE_UPDATED, ( @@ -218,6 +230,7 @@ def post_auth(data): return (radiusd.RLM_MODULE_REJECT, tuple(attributes), ()) + # Else it is from wifi else: return radiusd.RLM_MODULE_OK @@ -225,11 +238,16 @@ def post_auth(data): def check_user_machine_and_register(nas_type, user, user_interface, nas_id, username, mac_address): """Check if username and mac are registered. Register it if unknown. Return the user ntlm password if everything is ok. - Used for 802.1X auth""" + Used for 802.1X auth + """ + if not user: + # No username provided return (False, "User unknown", "") + if not user["access"]: return (False, "Invalid connexion (non-contributing user)", "") + if user_interface: if user_interface["user_pk"] != user["pk"]: return ( @@ -237,9 +255,12 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user "Mac address registered on another user account", "", ) + elif not user_interface["active"]: return (False, "Interface/Machine disabled", "") + elif not user_interface["ipv4"]: + # Try to autoassign ip try: api_client.view( "radius/assign_ip/{0}".format( @@ -250,7 +271,9 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user return (False, "Error during ip assignement %s" % err.response.text, "") else: return (True, "Access ok", user.get("pwd_ntlm", "")) + elif nas_type: + # The interface is not yet registred, try to autoregister if enabled if nas_type["autocapture_mac"]: try: api_client.view( @@ -262,7 +285,7 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user return (True, "Access Ok, Registering mac...", user["pwd_ntlm"]) except HTTPError as err: return (False, "Error during mac register %s" % err.response.text, "") - return (False, "L'auto capture est désactivée", "") + return (False, "Autoregistering is disabled", "") else: return (False, "Unknown interface/machine", "") else: @@ -270,6 +293,7 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user def set_radius_attributes_values(attributes, values): + """Set values of parameters in radius attributes""" return ( (str(attribute["attribute"]), str(attribute["value"] % values)) for attribute in attributes @@ -278,12 +302,14 @@ def set_radius_attributes_values(attributes, values): def decide_vlan_switch(data_from_api, user_mac, nas_port): """Function for selecting vlan for a switch with wired mac auth radius. - Several modes are available : + Two modes exist : in strict mode, a registered user cannot connect with + their machines in a non-registered user room + Sequentially : - all modes: - - unknown NAS : VLAN_OK, - - unknown port : Decision set in Re2o RadiusOption - - No radius on this port : VLAN_OK - - force : returns vlan provided by the database + - unknown NAS : VLAN_OK, + - unknown port : Decision set in Re2o RadiusOption + - No radius on this port : VLAN_OK + - force : replace VLAN_OK with vlan provided by the database - mode strict: - no room : Decision set in Re2o RadiusOption, - no user in this room : Reject, @@ -293,23 +319,22 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port): - mode common : - mac-address already registered: - related user non contributor / interface disabled: - Decision set in Re2o RadiusOption + Decision set in Re2o RadiusOption - related user is banned: - Decision set in Re2o RadiusOption + Decision set in Re2o RadiusOption - user contributing : VLAN_OK (can assign ipv4 if needed) - unknown interface : - register mac disabled : Decision set in Re2o RadiusOption - - register mac enabled : redirect to webauth + - register mac enabled : redirect to webauth (not implemented) Returns: tuple with : - - Switch name (str) - - Room (str) - Reason of the decision (str) - vlan_id (int) - decision (bool) - - Other Attributs (attribut:str, operator:str, value:str) + - Other Attributs (attribut:str, value:str) """ + # Get values from api nas_type = data_from_api["nas"] room_users = data_from_api["room_users"] port = data_from_api["port"] @@ -321,13 +346,14 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port): RADIUS_OPTION_REJECT = data_from_api["RADIUS_OPTION_REJECT"] USER_STATE_ACTIVE = data_from_api["USER_STATE_ACTIVE"] + # Values which can be used as parameters in radius attributes attributes_kwargs = { "client_mac": str(user_mac), + # magic split "switch_port": str(nas_port.split(".")[0].split("/")[-1][-2:]), "switch_ip": str(switch["ipv4"]) } - # Get port from switch and port number extra_log = "" # If NAS is unknown, go to default vlan if not nas_machine: @@ -346,8 +372,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port): attributes_kwargs["switch_ip"] = str(switch.ipv4) port = Port.objects.filter(switch=switch, port=port_number).first() - # If the port is unknwon, go to default vlan - # We don't have enought information to make a better decision + # If the port is unknown, do as in RadiusOption if not port or not port_profile: return ( "Unknown port", @@ -395,7 +420,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port): # Otherwise, we are in mac radius. # If strict mode is enabled, we check every user related with this port. If - # one user or more is not enabled, we reject to prevent from sharing or + # all users and clubs are disabled, we reject to prevent from sharing or # spoofing mac. if port_profile["radius_mode"] == "STRICT": if not port["room"]: @@ -443,76 +468,75 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port): ) # else: user OK, so we check MAC now - # If we are authenticating with mac, we look for the interfaces and its mac address - if port_profile["radius_mode"] == "COMMON" or port_profile["radius_mode"] == "STRICT": - # If mac is unknown, - if not user_interface: - # We try to register mac, if autocapture is enabled - # Final decision depend on RADIUSOption set in re2o - if nas_type["autocapture_mac"]: - return ( - "Unknown mac/interface", - radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None, - radius_option["unknown_machine"] != RADIUS_OPTION_REJECT, - set_radius_attributes_values( - radius_option["unknown_machine_attributes"], attributes_kwargs), - ) - # Otherwise, if autocapture mac is not enabled, - else: - return ( - "Unknown mac/interface", - radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None, - radius_option["unknown_machine"] != RADIUS_OPTION_REJECT, - set_radius_attributes_values( - radius_option["unknown_machine_attributes"], attributes_kwargs), - ) - - # Mac/Interface is found, check if related user is contributing and ok - # If needed, set ipv4 to it + # If mac is unknown, + if not user_interface: + # We try to register mac, if autocapture is enabled + # Final decision depend on RADIUSOption set in re2o + # Something is not implemented here... + if nas_type["autocapture_mac"]: + return ( + "Unknown mac/interface", + radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None, + radius_option["unknown_machine"] != RADIUS_OPTION_REJECT, + set_radius_attributes_values( + radius_option["unknown_machine_attributes"], attributes_kwargs), + ) + # Otherwise, if autocapture mac is not enabled, else: - if user_interface["is_ban"]: - return ( - "Banned user", - radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None, - radius_option["banned"] != RADIUS_OPTION_REJECT, - set_radius_attributes_values( - radius_option["banned_attributes"], attributes_kwargs), - ) - if not user_interface["active"]: - return ( - "Disabled interface / non-contributing member", - radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None, - radius_option["non_member"] != RADIUS_OPTION_REJECT, - set_radius_attributes_values( - radius_option["non_member_attributes"], attributes_kwargs), - ) - # If settings is set to related interface vlan policy based on interface type: - if radius_option["radius_general_policy"] == "MACHINE": - DECISION_VLAN = user_interface["vlan_id"] - if not user_interface["ipv4"]: - try: - api_client.view( - "radius/assign_ip/{0}".format( - urllib.parse.quote(user_mac or "None", safe="") - )) - return ( - "Ok, assigning new ipv4" + extra_log, - DECISION_VLAN, - True, - attributes, - ) - except HTTPError as err: - return ( - "Error during ip assignement %s" % err.response.text + extra_log, - DECISION_VLAN, - True, - attributes, - ) + return ( + "Unknown mac/interface", + radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None, + radius_option["unknown_machine"] != RADIUS_OPTION_REJECT, + set_radius_attributes_values( + radius_option["unknown_machine_attributes"], attributes_kwargs), + ) - else: + # Mac/Interface is found, check if related user is contributing and ok + # If needed, set ipv4 to it + else: + if user_interface["is_ban"]: + return ( + "Banned user", + radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None, + radius_option["banned"] != RADIUS_OPTION_REJECT, + set_radius_attributes_values( + radius_option["banned_attributes"], attributes_kwargs), + ) + if not user_interface["active"]: + return ( + "Disabled interface / non-contributing member", + radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None, + radius_option["non_member"] != RADIUS_OPTION_REJECT, + set_radius_attributes_values( + radius_option["non_member_attributes"], attributes_kwargs), + ) + # If settings is set to related interface vlan policy based on interface type: + if radius_option["radius_general_policy"] == "MACHINE": + DECISION_VLAN = user_interface["vlan_id"] + if not user_interface["ipv4"]: + try: + api_client.view( + "radius/assign_ip/{0}".format( + urllib.parse.quote(user_mac or "None", safe="") + )) return ( - "Interface OK" + extra_log, + "Ok, assigning new ipv4" + extra_log, DECISION_VLAN, True, attributes, ) + except HTTPError as err: + return ( + "Error during ip assignement %s" % err.response.text + extra_log, + DECISION_VLAN, + True, + attributes, + ) + + else: + return ( + "Interface OK" + extra_log, + DECISION_VLAN, + True, + attributes, + ) diff --git a/radius/api/serializers.py b/radius/api/serializers.py index 33b4127a..867f7a54 100644 --- a/radius/api/serializers.py +++ b/radius/api/serializers.py @@ -105,12 +105,18 @@ class RadiusOptionSerializer(Serializer): class AuthorizeResponseSerializer(Serializer): + """Serializer for AuthorizeResponse objects + See views.py for the declaration of AuthorizeResponse + """ nas = NasSerializer(read_only=True) user = UserSerializer(read_only=True) user_interface = InterfaceSerializer(read_only=True) class PostAuthResponseSerializer(Serializer): + """Serializer for PostAuthResponse objects + See views.py for the declaration of PostAuthResponse + """ nas = NasSerializer(read_only=True) room_users = UserSerializer(many=True) port = PortSerializer() diff --git a/radius/api/urls.py b/radius/api/urls.py index 528e015b..d24b89eb 100644 --- a/radius/api/urls.py +++ b/radius/api/urls.py @@ -28,6 +28,5 @@ urls_functional_view = [ views.post_auth, None), (r"radius/autoregister/(?P[^/]+)/(?P.+)/(?P[0-9a-fA-F\:\-]{17})$", views.autoregister_machine, None), - (r"radius/assign_ip/(?P[0-9a-fA-F\:\-]{17})$", - views.assign_ip, None), + (r"radius/assign_ip/(?P[0-9a-fA-F\:\-]{17})$", views.assign_ip, None), ] diff --git a/radius/api/views.py b/radius/api/views.py index 72576348..7ace6013 100644 --- a/radius/api/views.py +++ b/radius/api/views.py @@ -33,18 +33,23 @@ from topologie.models import Port, Switch class AuthorizeResponse: + """Contains objects the radius needs for the Authorize step + """ + def __init__(self, nas, user, user_interface): self.nas = nas self.user = user self.user_interface = user_interface def can_view(self, user): + """Temp method to bypass ACL + """ return [True] @api_view(['GET']) def authorize(request, nas_id, username, mac_address): - """Return objects the radius need for the Authorize step + """Return objects the radius needs for the Authorize step Parameters: nas_id (string): NAS name or ipv4 @@ -52,9 +57,10 @@ def authorize(request, nas_id, username, mac_address): mac_address (string): mac address of the device which is trying to connect Return: - AuthorizeResponse: contains all the informations + AuthorizeResponse: contains all required informations """ + # get the Nas object which made the request (if exists) nas_interface = Interface.objects.filter( Q(domain=Domain.objects.filter(name=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id)) @@ -64,7 +70,11 @@ def authorize(request, nas_id, username, mac_address): nas_type = Nas.objects.filter( nas_type=nas_interface.machine_type).first() + # get the User corresponding to the username in the URL + # If no username was provided (wired connection), username="None" user = User.objects.filter(pseudo__iexact=username).first() + + # get the interface which is trying to connect (if already created) user_interface = Interface.objects.filter(mac_address=mac_address).first() serialized = serializers.AuthorizeResponseSerializer( @@ -74,6 +84,9 @@ def authorize(request, nas_id, username, mac_address): class PostAuthResponse: + """Contains objects the radius needs for the Post-Auth step + """ + def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE): self.nas = nas self.room_users = room_users @@ -92,7 +105,18 @@ class PostAuthResponse: @api_view(['GET']) def post_auth(request, nas_id, nas_port, user_mac): - # get nas_type + """Return objects the radius needs for the Post-Auth step + + Parameters: + nas_id (string): NAS name or ipv4 + nas_port (string): NAS port from wich the request came. Work with Cisco, HP and Juniper convention + user_mac (string): mac address of the device which is trying to connect + + Return: + PostAuthResponse: contains all required informations + """ + + # get the Nas object which made the request (if exists) nas_interface = Interface.objects.prefetch_related("machine__switch__stack").filter( Q(domain=Domain.objects.filter(name=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id)) @@ -102,14 +126,17 @@ def post_auth(request, nas_id, nas_port, user_mac): nas_type = Nas.objects.filter( nas_type=nas_interface.machine_type).first() - # get switch + # get the switch (if wired connection) switch = None if nas_interface: switch = Switch.objects.filter( machine_ptr=nas_interface.machine).first() + + # If the switch is part of a stack, get the correct object if hasattr(nas_interface.machine, "switch"): stack = nas_interface.machine.switch.stack if stack: + # magic split id_stack_member = nas_port.split("-")[1].split("/")[0] switch = ( Switch.objects.filter(stack=stack) @@ -117,9 +144,10 @@ def post_auth(request, nas_id, nas_port, user_mac): .first() ) - # get port + # get the switch port port = None if nas_port and nas_port != "None": + # magic split port_number = nas_port.split(".")[0].split("/")[-1][-2:] port = Port.objects.filter(switch=switch, port=port_number).first() @@ -127,7 +155,7 @@ def post_auth(request, nas_id, nas_port, user_mac): if port: port_profile = port.get_port_profile - # get user_interface + # get the interface which is trying to connect (if already created) user_interface = ( Interface.objects.filter(mac_address=user_mac) .select_related("machine__user") @@ -135,20 +163,22 @@ def post_auth(request, nas_id, nas_port, user_mac): .first() ) - # get room users + # get all users and clubs of the room room_users = [] if port: room_users = User.objects.filter( Q(club__room=port.room) | Q(adherent__room=port.room) ) - # get radius options + # get all radius options radius_option = RadiusOption.objects.first() print(radius_option) + # get a few class constants the radius will need EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED RADIUS_OPTION_REJECT = RadiusOption.REJECT USER_STATE_ACTIVE = User.STATE_ACTIVE + serialized = serializers.PostAuthResponseSerializer( PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE)) @@ -157,6 +187,17 @@ def post_auth(request, nas_id, nas_port, user_mac): @api_view(['GET']) def autoregister_machine(request, nas_id, username, mac_address): + """Autoregister machine in the Authorize step of the radius + + Parameters: + nas_id (string): NAS name or ipv4 + username (string): username of the user who is trying to connect + mac_address (string): mac address of the device which is trying to connect + + Return: + 200 if autoregistering worked + 400 if it failed, and the reason why + """ nas_interface = Interface.objects.filter( Q(domain=Domain.objects.filter(name=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id)) @@ -170,12 +211,21 @@ def autoregister_machine(request, nas_id, username, mac_address): result, reason = user.autoregister_machine(mac_address, nas_type) if result: - return Response(data=reason) + return Response(reason) return Response(reason, status=400) @api_view(['GET']) def assign_ip(request, mac_address): + """Autoassign ip in the Authorize and Post-Auth steps of the Radius + + Parameters: + mac_address (string): mac address of the device which is trying to connect + + Return: + 200 if it worked + 400 if it failed, and the reason why + """ interface = ( Interface.objects.filter(mac_address=mac_address) .first()