8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-22 11:23:10 +00:00

documentation

This commit is contained in:
chapeau 2020-11-29 10:33:40 +01:00
parent 048140db20
commit fdb93e1759
4 changed files with 204 additions and 129 deletions

View file

@ -48,25 +48,12 @@ import radiusd
from requests import HTTPError from requests import HTTPError
import urllib.parse import urllib.parse
path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
class RadiusdHandler(logging.Handler): class RadiusdHandler(logging.Handler):
"""Handler de logs pour freeradius""" """Logs handler for freeradius"""
def emit(self, record): def emit(self, record):
"""Process un message de log, en convertissant les niveaux""" """Log message processing, level are converted"""
if record.levelno >= logging.WARN: if record.levelno >= logging.WARN:
rad_sig = radiusd.L_ERR rad_sig = radiusd.L_ERR
elif record.levelno >= logging.INFO: elif record.levelno >= logging.INFO:
@ -76,7 +63,7 @@ class RadiusdHandler(logging.Handler):
radiusd.radlog(rad_sig, str(record.msg)) radiusd.radlog(rad_sig, str(record.msg))
# Initialisation d'un logger (pour logguer unifi ) # Init for logging
logger = logging.getLogger("auth.py") logger = logging.getLogger("auth.py")
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s: [%(levelname)s] %(message)s") formatter = logging.Formatter("%(name)s: [%(levelname)s] %(message)s")
@ -86,17 +73,15 @@ logger.addHandler(handler)
def radius_event(fun): def radius_event(fun):
"""Décorateur pour les fonctions d'interfaces avec radius. """Decorator for freeradius fonction with radius.
Une telle fonction prend un uniquement argument, qui est une liste de This function take a unique argument which is a list of tuples (key, value)
tuples (clé, valeur) et renvoie un triplet dont les composantes sont : and return a tuple of 3 values which are:
* le code de retour (voir radiusd.RLM_MODULE_* ) * return code (see radiusd.RLM_MODULE_* )
* un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok * a tuple of 2 elements for response value (access ok , etc)
et autres trucs du genre) * a tuple of 2 elements for internal value to update (password for example)
* un tuple de couples (clé, valeur) pour les valeurs internes à mettre à
jour (mot de passe par exemple)
On se contente avec ce décorateur (pour l'instant) de convertir la liste de Here, we convert the list of tuples into a dictionnary.
tuples en entrée en un dictionnaire.""" """
def new_f(auth_data): def new_f(auth_data):
""" The function transforming the tuples as dict """ """ The function transforming the tuples as dict """
@ -125,19 +110,39 @@ def radius_event(fun):
@radius_event @radius_event
def instantiate(*_): def instantiate(*_):
"""Usefull for instantiate ldap connexions otherwise, """Instantiate api connection
do nothing""" """
logger.info("Instantiation") logger.info("Instantiation")
path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
@radius_event @radius_event
def authorize(data): def authorize(data):
# Pour les requetes proxifiees, on split """Here, we test if the Nas is known.
- If the nas is unknown, we assume that it is a 802.1X request,
- If the nas is known, we apply the 802.1X if enabled,
- It the nas is known AND nas auth is enabled with mac address, returns accept here
"""
nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None)) nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None))
username = data.get("User-Name", "") username = data.get("User-Name", "")
# For proxified request, split
username = username.split("@", 1)[0] username = username.split("@", 1)[0]
mac = data.get("Calling-Station-Id", "") mac = data.get("Calling-Station-Id", "")
# Get all required objects from API
data_from_api = api_client.view( data_from_api = api_client.view(
"radius/authorize/{0}/{1}/{2}".format( "radius/authorize/{0}/{1}/{2}".format(
urllib.parse.quote(nas or "None", safe=""), urllib.parse.quote(nas or "None", safe=""),
@ -149,7 +154,7 @@ def authorize(data):
user = data_from_api["user"] user = data_from_api["user"]
user_interface = data_from_api["user_interface"] user_interface = data_from_api["user_interface"]
if nas_type and nas_type["port_access_mode"] == "802.1X": if not nas_type or nas_type and nas_type["port_access_mode"] == "802.1X":
result, log, password = check_user_machine_and_register( result, log, password = check_user_machine_and_register(
nas_type, user, user_interface, nas, username, mac) nas_type, user, user_interface, nas, username, mac)
logger.info(log.encode("utf-8")) logger.info(log.encode("utf-8"))
@ -177,6 +182,7 @@ def post_auth(data):
nas_port = data.get("NAS-Port-Id", data.get("NAS-Port", None)) nas_port = data.get("NAS-Port-Id", data.get("NAS-Port", None))
mac = data.get("Calling-Station-Id", None) mac = data.get("Calling-Station-Id", None)
# Get all required objects from API
data_from_api = api_client.view( data_from_api = api_client.view(
"radius/post_auth/{0}/{1}/{2}".format( "radius/post_auth/{0}/{1}/{2}".format(
urllib.parse.quote(nas or "None", safe=""), urllib.parse.quote(nas or "None", safe=""),
@ -188,12 +194,14 @@ def post_auth(data):
port = data_from_api["port"] port = data_from_api["port"]
switch = data_from_api["switch"] switch = data_from_api["switch"]
# If proxified request
if not nas_type: if not nas_type:
logger.info("Proxified request, nas unknown") logger.info("Proxified request, nas unknown")
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
# If it is a switch # If the request is from a switch (wired connection)
if switch: if switch:
# For logging
sw_name = switch["name"] or "?" sw_name = switch["name"] or "?"
room = "Unknown port" room = "Unknown port"
if port: if port:
@ -211,7 +219,7 @@ def post_auth(data):
) )
logger.info(log_message) logger.info(log_message)
# Wired connexion # Apply vlan from decide_vlan_switch
return ( return (
radiusd.RLM_MODULE_UPDATED, radiusd.RLM_MODULE_UPDATED,
( (
@ -232,6 +240,7 @@ def post_auth(data):
return (radiusd.RLM_MODULE_REJECT, tuple(attributes), ()) return (radiusd.RLM_MODULE_REJECT, tuple(attributes), ())
# Else it is from wifi
else: else:
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
@ -239,11 +248,16 @@ def post_auth(data):
def check_user_machine_and_register(nas_type, user, user_interface, nas_id, username, mac_address): def check_user_machine_and_register(nas_type, user, user_interface, nas_id, username, mac_address):
"""Check if username and mac are registered. Register it if unknown. """Check if username and mac are registered. Register it if unknown.
Return the user ntlm password if everything is ok. Return the user ntlm password if everything is ok.
Used for 802.1X auth""" Used for 802.1X auth
"""
if not user: if not user:
# No username provided
return (False, "User unknown", "") return (False, "User unknown", "")
if not user["access"]: if not user["access"]:
return (False, "Invalid connexion (non-contributing user)", "") return (False, "Invalid connexion (non-contributing user)", "")
if user_interface: if user_interface:
if user_interface["user_pk"] != user["pk"]: if user_interface["user_pk"] != user["pk"]:
return ( return (
@ -251,9 +265,12 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
"Mac address registered on another user account", "Mac address registered on another user account",
"", "",
) )
elif not user_interface["active"]: elif not user_interface["active"]:
return (False, "Interface/Machine disabled", "") return (False, "Interface/Machine disabled", "")
elif not user_interface["ipv4"]: elif not user_interface["ipv4"]:
# Try to autoassign ip
try: try:
api_client.view( api_client.view(
"radius/assign_ip/{0}".format( "radius/assign_ip/{0}".format(
@ -264,7 +281,9 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
return (False, "Error during ip assignement %s" % err.response.text, "") return (False, "Error during ip assignement %s" % err.response.text, "")
else: else:
return (True, "Access ok", user.get("pwd_ntlm", "")) return (True, "Access ok", user.get("pwd_ntlm", ""))
elif nas_type: elif nas_type:
# The interface is not yet registred, try to autoregister if enabled
if nas_type["autocapture_mac"]: if nas_type["autocapture_mac"]:
try: try:
api_client.view( api_client.view(
@ -276,7 +295,7 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
return (True, "Access Ok, Registering mac...", user["pwd_ntlm"]) return (True, "Access Ok, Registering mac...", user["pwd_ntlm"])
except HTTPError as err: except HTTPError as err:
return (False, "Error during mac register %s" % err.response.text, "") return (False, "Error during mac register %s" % err.response.text, "")
return (False, "L'auto capture est désactivée", "") return (False, "Autoregistering is disabled", "")
else: else:
return (False, "Unknown interface/machine", "") return (False, "Unknown interface/machine", "")
else: else:
@ -284,6 +303,7 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
def set_radius_attributes_values(attributes, values): def set_radius_attributes_values(attributes, values):
"""Set values of parameters in radius attributes"""
return ( return (
(str(attribute["attribute"]), str(attribute["value"] % values)) (str(attribute["attribute"]), str(attribute["value"] % values))
for attribute in attributes for attribute in attributes
@ -292,38 +312,39 @@ def set_radius_attributes_values(attributes, values):
def decide_vlan_switch(data_from_api, user_mac, nas_port): def decide_vlan_switch(data_from_api, user_mac, nas_port):
"""Function for selecting vlan for a switch with wired mac auth radius. """Function for selecting vlan for a switch with wired mac auth radius.
Several modes are available : Two modes exist : in strict mode, a registered user cannot connect with
their machines in a non-registered user room
Sequentially :
- all modes: - all modes:
- unknown NAS : VLAN_OK, - unknown NAS : VLAN_OK,
- unknown port : Decision set in Re2o RadiusOption - unknown port : Decision set in Re2o RadiusOption
- No radius on this port : VLAN_OK - No radius on this port : VLAN_OK
- force : returns vlan provided by the database - force : replace VLAN_OK with vlan provided by the database
- mode strict: - mode strict:
- no room : Decision set in Re2o RadiusOption, - no room : Decision set in Re2o RadiusOption,
- no user in this room : Reject, - no user in this room : Reject,
- user of this room is banned or disable : Reject, - user of this room is banned or disable : Reject,
- user of this room non-contributor and not whitelisted: - user of this room non-contributor and not whitelisted:
Decision set in Re2o RadiusOption Decision set in Re2o RadiusOption
- mode common : - all modes :
- mac-address already registered: - mac-address already registered:
- related user non contributor / interface disabled: - related user non contributor / interface disabled:
Decision set in Re2o RadiusOption Decision set in Re2o RadiusOption
- related user is banned: - related user is banned:
Decision set in Re2o RadiusOption Decision set in Re2o RadiusOption
- user contributing : VLAN_OK (can assign ipv4 if needed) - user contributing : VLAN_OK (can assign ipv4 if needed)
- unknown interface : - unknown interface :
- register mac disabled : Decision set in Re2o RadiusOption - register mac disabled : Decision set in Re2o RadiusOption
- register mac enabled : redirect to webauth - register mac enabled : redirect to webauth (not implemented)
Returns: Returns:
tuple with : tuple with :
- Switch name (str)
- Room (str)
- Reason of the decision (str) - Reason of the decision (str)
- vlan_id (int) - vlan_id (int)
- decision (bool) - decision (bool)
- Other Attributs (attribut:str, operator:str, value:str) - Other Attributs (attribut:str, value:str)
""" """
# Get values from api
nas_type = data_from_api["nas"] nas_type = data_from_api["nas"]
room_users = data_from_api["room_users"] room_users = data_from_api["room_users"]
port = data_from_api["port"] port = data_from_api["port"]
@ -335,17 +356,17 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
RADIUS_OPTION_REJECT = data_from_api["RADIUS_OPTION_REJECT"] RADIUS_OPTION_REJECT = data_from_api["RADIUS_OPTION_REJECT"]
USER_STATE_ACTIVE = data_from_api["USER_STATE_ACTIVE"] USER_STATE_ACTIVE = data_from_api["USER_STATE_ACTIVE"]
# Values which can be used as parameters in radius attributes
attributes_kwargs = { attributes_kwargs = {
"client_mac": str(user_mac), "client_mac": str(user_mac),
# magic split
"switch_port": str(nas_port.split(".")[0].split("/")[-1][-2:]), "switch_port": str(nas_port.split(".")[0].split("/")[-1][-2:]),
"switch_ip": str(switch["ipv4"]) "switch_ip": str(switch["ipv4"])
} }
# Get port from switch and port number
extra_log = "" extra_log = ""
# If the port is unknwon, go to default vlan # If the port is unknown, do as in RadiusOption
# We don't have enought information to make a better decision
if not port or not port_profile: if not port or not port_profile:
return ( return (
"Unknown port", "Unknown port",
@ -355,7 +376,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
radius_option["unknown_port_attributes"], attributes_kwargs), radius_option["unknown_port_attributes"], attributes_kwargs),
) )
# If a vlan is precised in port config, we use it # If a vlan is precised in port config, we use it in place of VLAN_OK
if port_profile["vlan_untagged"]: if port_profile["vlan_untagged"]:
DECISION_VLAN = int(port_profile["vlan_untagged"]["vlan_id"]) DECISION_VLAN = int(port_profile["vlan_untagged"]["vlan_id"])
extra_log = "Force sur vlan " + str(DECISION_VLAN) extra_log = "Force sur vlan " + str(DECISION_VLAN)
@ -390,7 +411,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
# Otherwise, we are in mac radius. # Otherwise, we are in mac radius.
# If strict mode is enabled, we check every user related with this port. If # If strict mode is enabled, we check every user related with this port. If
# one user or more is not enabled, we reject to prevent from sharing or # all users and clubs are disabled, we reject to prevent from sharing or
# spoofing mac. # spoofing mac.
if port_profile["radius_mode"] == "STRICT": if port_profile["radius_mode"] == "STRICT":
if not port["room"]: if not port["room"]:
@ -438,76 +459,75 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
) )
# else: user OK, so we check MAC now # else: user OK, so we check MAC now
# If we are authenticating with mac, we look for the interfaces and its mac address # If mac is unknown,
if port_profile["radius_mode"] == "COMMON" or port_profile["radius_mode"] == "STRICT": if not user_interface:
# If mac is unknown, # We try to register mac, if autocapture is enabled
if not user_interface: # Final decision depend on RADIUSOption set in re2o
# We try to register mac, if autocapture is enabled # Something is not implemented here...
# Final decision depend on RADIUSOption set in re2o if nas_type["autocapture_mac"]:
if nas_type["autocapture_mac"]: return (
return ( "Unknown mac/interface",
"Unknown mac/interface", radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None,
radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None, radius_option["unknown_machine"] != RADIUS_OPTION_REJECT,
radius_option["unknown_machine"] != RADIUS_OPTION_REJECT, set_radius_attributes_values(
set_radius_attributes_values( radius_option["unknown_machine_attributes"], attributes_kwargs),
radius_option["unknown_machine_attributes"], attributes_kwargs), )
) # Otherwise, if autocapture mac is not enabled,
# Otherwise, if autocapture mac is not enabled,
else:
return (
"Unknown mac/interface",
radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None,
radius_option["unknown_machine"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["unknown_machine_attributes"], attributes_kwargs),
)
# Mac/Interface is found, check if related user is contributing and ok
# If needed, set ipv4 to it
else: else:
if user_interface["is_ban"]: return (
return ( "Unknown mac/interface",
"Banned user", radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None,
radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None, radius_option["unknown_machine"] != RADIUS_OPTION_REJECT,
radius_option["banned"] != RADIUS_OPTION_REJECT, set_radius_attributes_values(
set_radius_attributes_values( radius_option["unknown_machine_attributes"], attributes_kwargs),
radius_option["banned_attributes"], attributes_kwargs), )
)
if not user_interface["active"]:
return (
"Disabled interface / non-contributing member",
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
radius_option["non_member"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["non_member_attributes"], attributes_kwargs),
)
# If settings is set to related interface vlan policy based on interface type:
if radius_option["radius_general_policy"] == "MACHINE":
DECISION_VLAN = user_interface["vlan_id"]
if not user_interface["ipv4"]:
try:
api_client.view(
"radius/assign_ip/{0}".format(
urllib.parse.quote(user_mac or "None", safe="")
))
return (
"Ok, assigning new ipv4" + extra_log,
DECISION_VLAN,
True,
attributes,
)
except HTTPError as err:
return (
"Error during ip assignement %s" % err.response.text + extra_log,
DECISION_VLAN,
True,
attributes,
)
else: # Mac/Interface is found, check if related user is contributing and ok
# If needed, set ipv4 to it
else:
if user_interface["is_ban"]:
return (
"Banned user",
radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None,
radius_option["banned"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["banned_attributes"], attributes_kwargs),
)
if not user_interface["active"]:
return (
"Disabled interface / non-contributing member",
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
radius_option["non_member"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["non_member_attributes"], attributes_kwargs),
)
# If settings is set to related interface vlan policy based on interface type:
if radius_option["radius_general_policy"] == "MACHINE":
DECISION_VLAN = user_interface["vlan_id"]
if not user_interface["ipv4"]:
try:
api_client.view(
"radius/assign_ip/{0}".format(
urllib.parse.quote(user_mac or "None", safe="")
))
return ( return (
"Interface OK" + extra_log, "Ok, assigning new ipv4" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,
attributes, attributes,
) )
except HTTPError as err:
return (
"Error during ip assignement %s" % err.response.text + extra_log,
DECISION_VLAN,
True,
attributes,
)
else:
return (
"Interface OK" + extra_log,
DECISION_VLAN,
True,
attributes,
)

View file

@ -105,12 +105,18 @@ class RadiusOptionSerializer(Serializer):
class AuthorizeResponseSerializer(Serializer): class AuthorizeResponseSerializer(Serializer):
"""Serializer for AuthorizeResponse objects
See views.py for the declaration of AuthorizeResponse
"""
nas = NasSerializer(read_only=True) nas = NasSerializer(read_only=True)
user = UserSerializer(read_only=True) user = UserSerializer(read_only=True)
user_interface = InterfaceSerializer(read_only=True) user_interface = InterfaceSerializer(read_only=True)
class PostAuthResponseSerializer(Serializer): class PostAuthResponseSerializer(Serializer):
"""Serializer for PostAuthResponse objects
See views.py for the declaration of PostAuthResponse
"""
nas = NasSerializer(read_only=True) nas = NasSerializer(read_only=True)
room_users = UserSerializer(many=True) room_users = UserSerializer(many=True)
port = PortSerializer() port = PortSerializer()

View file

@ -28,6 +28,5 @@ urls_functional_view = [
views.post_auth, None), views.post_auth, None),
(r"radius/autoregister/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", (r"radius/autoregister/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.autoregister_machine, None), views.autoregister_machine, None),
(r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", (r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", views.assign_ip, None),
views.assign_ip, None),
] ]

View file

@ -33,18 +33,23 @@ from topologie.models import Port, Switch
class AuthorizeResponse: class AuthorizeResponse:
"""Contains objects the radius needs for the Authorize step
"""
def __init__(self, nas, user, user_interface): def __init__(self, nas, user, user_interface):
self.nas = nas self.nas = nas
self.user = user self.user = user
self.user_interface = user_interface self.user_interface = user_interface
def can_view(self, user): def can_view(self, user):
"""Temp method to bypass ACL
"""
return [True] return [True]
@api_view(['GET']) @api_view(['GET'])
def authorize(request, nas_id, username, mac_address): def authorize(request, nas_id, username, mac_address):
"""Return objects the radius need for the Authorize step """Return objects the radius needs for the Authorize step
Parameters: Parameters:
nas_id (string): NAS name or ipv4 nas_id (string): NAS name or ipv4
@ -52,9 +57,10 @@ def authorize(request, nas_id, username, mac_address):
mac_address (string): mac address of the device which is trying to connect mac_address (string): mac address of the device which is trying to connect
Return: Return:
AuthorizeResponse: contains all the informations AuthorizeResponse: contains all required informations
""" """
# get the Nas object which made the request (if exists)
nas_interface = Interface.objects.filter( nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id)) Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id))
@ -64,7 +70,11 @@ def authorize(request, nas_id, username, mac_address):
nas_type = Nas.objects.filter( nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first() nas_type=nas_interface.machine_type).first()
# get the User corresponding to the username in the URL
# If no username was provided (wired connection), username="None"
user = User.objects.filter(pseudo__iexact=username).first() user = User.objects.filter(pseudo__iexact=username).first()
# get the interface which is trying to connect (if already created)
user_interface = Interface.objects.filter(mac_address=mac_address).first() user_interface = Interface.objects.filter(mac_address=mac_address).first()
serialized = serializers.AuthorizeResponseSerializer( serialized = serializers.AuthorizeResponseSerializer(
@ -74,6 +84,9 @@ def authorize(request, nas_id, username, mac_address):
class PostAuthResponse: class PostAuthResponse:
"""Contains objects the radius needs for the Post-Auth step
"""
def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE): def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE):
self.nas = nas self.nas = nas
self.room_users = room_users self.room_users = room_users
@ -92,7 +105,18 @@ class PostAuthResponse:
@api_view(['GET']) @api_view(['GET'])
def post_auth(request, nas_id, nas_port, user_mac): def post_auth(request, nas_id, nas_port, user_mac):
# get nas_type """Return objects the radius needs for the Post-Auth step
Parameters:
nas_id (string): NAS name or ipv4
nas_port (string): NAS port from wich the request came. Work with Cisco, HP and Juniper convention
user_mac (string): mac address of the device which is trying to connect
Return:
PostAuthResponse: contains all required informations
"""
# get the Nas object which made the request (if exists)
nas_interface = Interface.objects.prefetch_related("machine__switch__stack").filter( nas_interface = Interface.objects.prefetch_related("machine__switch__stack").filter(
Q(domain=Domain.objects.filter(name=nas_id)) Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id))
@ -102,14 +126,17 @@ def post_auth(request, nas_id, nas_port, user_mac):
nas_type = Nas.objects.filter( nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first() nas_type=nas_interface.machine_type).first()
# get switch # get the switch (if wired connection)
switch = None switch = None
if nas_interface: if nas_interface:
switch = Switch.objects.filter( switch = Switch.objects.filter(
machine_ptr=nas_interface.machine).first() machine_ptr=nas_interface.machine).first()
# If the switch is part of a stack, get the correct object
if hasattr(nas_interface.machine, "switch"): if hasattr(nas_interface.machine, "switch"):
stack = nas_interface.machine.switch.stack stack = nas_interface.machine.switch.stack
if stack: if stack:
# magic split
id_stack_member = nas_port.split("-")[1].split("/")[0] id_stack_member = nas_port.split("-")[1].split("/")[0]
switch = ( switch = (
Switch.objects.filter(stack=stack) Switch.objects.filter(stack=stack)
@ -117,9 +144,10 @@ def post_auth(request, nas_id, nas_port, user_mac):
.first() .first()
) )
# get port # get the switch port
port = None port = None
if nas_port and nas_port != "None": if nas_port and nas_port != "None":
# magic split
port_number = nas_port.split(".")[0].split("/")[-1][-2:] port_number = nas_port.split(".")[0].split("/")[-1][-2:]
port = Port.objects.filter(switch=switch, port=port_number).first() port = Port.objects.filter(switch=switch, port=port_number).first()
@ -127,7 +155,7 @@ def post_auth(request, nas_id, nas_port, user_mac):
if port: if port:
port_profile = port.get_port_profile port_profile = port.get_port_profile
# get user_interface # get the interface which is trying to connect (if already created)
user_interface = ( user_interface = (
Interface.objects.filter(mac_address=user_mac) Interface.objects.filter(mac_address=user_mac)
.select_related("machine__user") .select_related("machine__user")
@ -135,20 +163,22 @@ def post_auth(request, nas_id, nas_port, user_mac):
.first() .first()
) )
# get room users # get all users and clubs of the room
room_users = [] room_users = []
if port: if port:
room_users = User.objects.filter( room_users = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room) Q(club__room=port.room) | Q(adherent__room=port.room)
) )
# get radius options # get all radius options
radius_option = RadiusOption.objects.first() radius_option = RadiusOption.objects.first()
print(radius_option) print(radius_option)
# get a few class constants the radius will need
EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED
RADIUS_OPTION_REJECT = RadiusOption.REJECT RADIUS_OPTION_REJECT = RadiusOption.REJECT
USER_STATE_ACTIVE = User.STATE_ACTIVE USER_STATE_ACTIVE = User.STATE_ACTIVE
serialized = serializers.PostAuthResponseSerializer( serialized = serializers.PostAuthResponseSerializer(
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE)) PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE))
@ -157,6 +187,17 @@ def post_auth(request, nas_id, nas_port, user_mac):
@api_view(['GET']) @api_view(['GET'])
def autoregister_machine(request, nas_id, username, mac_address): def autoregister_machine(request, nas_id, username, mac_address):
"""Autoregister machine in the Authorize step of the radius
Parameters:
nas_id (string): NAS name or ipv4
username (string): username of the user who is trying to connect
mac_address (string): mac address of the device which is trying to connect
Return:
200 if autoregistering worked
400 if it failed, and the reason why
"""
nas_interface = Interface.objects.filter( nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id)) Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id))
@ -170,12 +211,21 @@ def autoregister_machine(request, nas_id, username, mac_address):
result, reason = user.autoregister_machine(mac_address, nas_type) result, reason = user.autoregister_machine(mac_address, nas_type)
if result: if result:
return Response(data=reason) return Response(reason)
return Response(reason, status=400) return Response(reason, status=400)
@api_view(['GET']) @api_view(['GET'])
def assign_ip(request, mac_address): def assign_ip(request, mac_address):
"""Autoassign ip in the Authorize and Post-Auth steps of the Radius
Parameters:
mac_address (string): mac address of the device which is trying to connect
Return:
200 if it worked
400 if it failed, and the reason why
"""
interface = ( interface = (
Interface.objects.filter(mac_address=mac_address) Interface.objects.filter(mac_address=mac_address)
.first() .first()