8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-28 15:42:25 +00:00
This commit is contained in:
chapeau 2021-05-13 19:30:52 +02:00
parent d1a1d6613d
commit ce862c2853
4 changed files with 189 additions and 110 deletions

View file

@ -61,10 +61,10 @@ from topologie.models import Port, Switch
from users.models import User
class RadiusdHandler(logging.Handler):
"""Handler de logs pour freeradius"""
"""Logs handler for freeradius"""
def emit(self, record):
"""Process un message de log, en convertissant les niveaux"""
"""Log message processing, level are converted"""
if record.levelno >= logging.WARN:
rad_sig = radiusd.L_ERR
elif record.levelno >= logging.INFO:
@ -84,17 +84,15 @@ logger.addHandler(handler)
def radius_event(fun):
"""Décorateur pour les fonctions d'interfaces avec radius.
Une telle fonction prend un uniquement argument, qui est une liste de
tuples (clé, valeur) et renvoie un triplet dont les composantes sont :
* le code de retour (voir radiusd.RLM_MODULE_* )
* un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok
et autres trucs du genre)
* un tuple de couples (clé, valeur) pour les valeurs internes à mettre à
jour (mot de passe par exemple)
"""Decorator for freeradius fonction with radius.
This function take a unique argument which is a list of tuples (key, value)
and return a tuple of 3 values which are:
* return code (see radiusd.RLM_MODULE_* )
* a tuple of 2 elements for response value (access ok , etc)
* a tuple of 2 elements for internal value to update (password for example)
On se contente avec ce décorateur (pour l'instant) de convertir la liste de
tuples en entrée en un dictionnaire."""
Here, we convert the list of tuples into a dictionnary.
"""
def new_f(auth_data):
""" The function transforming the tuples as dict """
@ -123,10 +121,23 @@ def radius_event(fun):
@radius_event
def instantiate(*_):
"""Usefull for instantiate ldap connexions otherwise,
do nothing"""
"""Instantiate api connection
"""
logger.info("Instantiation")
path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
@radius_event
def authorize(data):
@ -178,8 +189,9 @@ def post_auth(data):
logger.info("Proxified request, nas unknown")
return radiusd.RLM_MODULE_OK
# If it is a switch
# If the request is from a switch (wired connection)
if switch:
# For logging
sw_name = switch["name"] or "?"
room = "Unknown port"
if port:
@ -197,7 +209,7 @@ def post_auth(data):
)
logger.info(log_message)
# Wired connexion
# Apply vlan from decide_vlan_switch
return (
radiusd.RLM_MODULE_UPDATED,
(
@ -218,6 +230,7 @@ def post_auth(data):
return (radiusd.RLM_MODULE_REJECT, tuple(attributes), ())
# Else it is from wifi
else:
return radiusd.RLM_MODULE_OK
@ -225,11 +238,16 @@ def post_auth(data):
def check_user_machine_and_register(nas_type, user, user_interface, nas_id, username, mac_address):
"""Check if username and mac are registered. Register it if unknown.
Return the user ntlm password if everything is ok.
Used for 802.1X auth"""
Used for 802.1X auth
"""
if not user:
# No username provided
return (False, "User unknown", "")
if not user["access"]:
return (False, "Invalid connexion (non-contributing user)", "")
if user_interface:
if user_interface["user_pk"] != user["pk"]:
return (
@ -237,9 +255,12 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
"Mac address registered on another user account",
"",
)
elif not user_interface["active"]:
return (False, "Interface/Machine disabled", "")
elif not user_interface["ipv4"]:
# Try to autoassign ip
try:
api_client.view(
"radius/assign_ip/{0}".format(
@ -250,7 +271,9 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
return (False, "Error during ip assignement %s" % err.response.text, "")
else:
return (True, "Access ok", user.get("pwd_ntlm", ""))
elif nas_type:
# The interface is not yet registred, try to autoregister if enabled
if nas_type["autocapture_mac"]:
try:
api_client.view(
@ -262,7 +285,7 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
return (True, "Access Ok, Registering mac...", user["pwd_ntlm"])
except HTTPError as err:
return (False, "Error during mac register %s" % err.response.text, "")
return (False, "L'auto capture est désactivée", "")
return (False, "Autoregistering is disabled", "")
else:
return (False, "Unknown interface/machine", "")
else:
@ -270,6 +293,7 @@ def check_user_machine_and_register(nas_type, user, user_interface, nas_id, user
def set_radius_attributes_values(attributes, values):
"""Set values of parameters in radius attributes"""
return (
(str(attribute["attribute"]), str(attribute["value"] % values))
for attribute in attributes
@ -278,12 +302,14 @@ def set_radius_attributes_values(attributes, values):
def decide_vlan_switch(data_from_api, user_mac, nas_port):
"""Function for selecting vlan for a switch with wired mac auth radius.
Several modes are available :
Two modes exist : in strict mode, a registered user cannot connect with
their machines in a non-registered user room
Sequentially :
- all modes:
- unknown NAS : VLAN_OK,
- unknown port : Decision set in Re2o RadiusOption
- No radius on this port : VLAN_OK
- force : returns vlan provided by the database
- force : replace VLAN_OK with vlan provided by the database
- mode strict:
- no room : Decision set in Re2o RadiusOption,
- no user in this room : Reject,
@ -299,17 +325,16 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
- user contributing : VLAN_OK (can assign ipv4 if needed)
- unknown interface :
- register mac disabled : Decision set in Re2o RadiusOption
- register mac enabled : redirect to webauth
- register mac enabled : redirect to webauth (not implemented)
Returns:
tuple with :
- Switch name (str)
- Room (str)
- Reason of the decision (str)
- vlan_id (int)
- decision (bool)
- Other Attributs (attribut:str, operator:str, value:str)
- Other Attributs (attribut:str, value:str)
"""
# Get values from api
nas_type = data_from_api["nas"]
room_users = data_from_api["room_users"]
port = data_from_api["port"]
@ -321,13 +346,14 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
RADIUS_OPTION_REJECT = data_from_api["RADIUS_OPTION_REJECT"]
USER_STATE_ACTIVE = data_from_api["USER_STATE_ACTIVE"]
# Values which can be used as parameters in radius attributes
attributes_kwargs = {
"client_mac": str(user_mac),
# magic split
"switch_port": str(nas_port.split(".")[0].split("/")[-1][-2:]),
"switch_ip": str(switch["ipv4"])
}
# Get port from switch and port number
extra_log = ""
# If NAS is unknown, go to default vlan
if not nas_machine:
@ -346,8 +372,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
attributes_kwargs["switch_ip"] = str(switch.ipv4)
port = Port.objects.filter(switch=switch, port=port_number).first()
# If the port is unknwon, go to default vlan
# We don't have enought information to make a better decision
# If the port is unknown, do as in RadiusOption
if not port or not port_profile:
return (
"Unknown port",
@ -395,7 +420,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
# Otherwise, we are in mac radius.
# If strict mode is enabled, we check every user related with this port. If
# one user or more is not enabled, we reject to prevent from sharing or
# all users and clubs are disabled, we reject to prevent from sharing or
# spoofing mac.
if port_profile["radius_mode"] == "STRICT":
if not port["room"]:
@ -443,12 +468,11 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
)
# else: user OK, so we check MAC now
# If we are authenticating with mac, we look for the interfaces and its mac address
if port_profile["radius_mode"] == "COMMON" or port_profile["radius_mode"] == "STRICT":
# If mac is unknown,
if not user_interface:
# We try to register mac, if autocapture is enabled
# Final decision depend on RADIUSOption set in re2o
# Something is not implemented here...
if nas_type["autocapture_mac"]:
return (
"Unknown mac/interface",

View file

@ -105,12 +105,18 @@ class RadiusOptionSerializer(Serializer):
class AuthorizeResponseSerializer(Serializer):
"""Serializer for AuthorizeResponse objects
See views.py for the declaration of AuthorizeResponse
"""
nas = NasSerializer(read_only=True)
user = UserSerializer(read_only=True)
user_interface = InterfaceSerializer(read_only=True)
class PostAuthResponseSerializer(Serializer):
"""Serializer for PostAuthResponse objects
See views.py for the declaration of PostAuthResponse
"""
nas = NasSerializer(read_only=True)
room_users = UserSerializer(many=True)
port = PortSerializer()

View file

@ -28,6 +28,5 @@ urls_functional_view = [
views.post_auth, None),
(r"radius/autoregister/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.autoregister_machine, None),
(r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.assign_ip, None),
(r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$", views.assign_ip, None),
]

View file

@ -33,18 +33,23 @@ from topologie.models import Port, Switch
class AuthorizeResponse:
"""Contains objects the radius needs for the Authorize step
"""
def __init__(self, nas, user, user_interface):
self.nas = nas
self.user = user
self.user_interface = user_interface
def can_view(self, user):
"""Temp method to bypass ACL
"""
return [True]
@api_view(['GET'])
def authorize(request, nas_id, username, mac_address):
"""Return objects the radius need for the Authorize step
"""Return objects the radius needs for the Authorize step
Parameters:
nas_id (string): NAS name or ipv4
@ -52,9 +57,10 @@ def authorize(request, nas_id, username, mac_address):
mac_address (string): mac address of the device which is trying to connect
Return:
AuthorizeResponse: contains all the informations
AuthorizeResponse: contains all required informations
"""
# get the Nas object which made the request (if exists)
nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
@ -64,7 +70,11 @@ def authorize(request, nas_id, username, mac_address):
nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first()
# get the User corresponding to the username in the URL
# If no username was provided (wired connection), username="None"
user = User.objects.filter(pseudo__iexact=username).first()
# get the interface which is trying to connect (if already created)
user_interface = Interface.objects.filter(mac_address=mac_address).first()
serialized = serializers.AuthorizeResponseSerializer(
@ -74,6 +84,9 @@ def authorize(request, nas_id, username, mac_address):
class PostAuthResponse:
"""Contains objects the radius needs for the Post-Auth step
"""
def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE):
self.nas = nas
self.room_users = room_users
@ -92,7 +105,18 @@ class PostAuthResponse:
@api_view(['GET'])
def post_auth(request, nas_id, nas_port, user_mac):
# get nas_type
"""Return objects the radius needs for the Post-Auth step
Parameters:
nas_id (string): NAS name or ipv4
nas_port (string): NAS port from wich the request came. Work with Cisco, HP and Juniper convention
user_mac (string): mac address of the device which is trying to connect
Return:
PostAuthResponse: contains all required informations
"""
# get the Nas object which made the request (if exists)
nas_interface = Interface.objects.prefetch_related("machine__switch__stack").filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
@ -102,14 +126,17 @@ def post_auth(request, nas_id, nas_port, user_mac):
nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first()
# get switch
# get the switch (if wired connection)
switch = None
if nas_interface:
switch = Switch.objects.filter(
machine_ptr=nas_interface.machine).first()
# If the switch is part of a stack, get the correct object
if hasattr(nas_interface.machine, "switch"):
stack = nas_interface.machine.switch.stack
if stack:
# magic split
id_stack_member = nas_port.split("-")[1].split("/")[0]
switch = (
Switch.objects.filter(stack=stack)
@ -117,9 +144,10 @@ def post_auth(request, nas_id, nas_port, user_mac):
.first()
)
# get port
# get the switch port
port = None
if nas_port and nas_port != "None":
# magic split
port_number = nas_port.split(".")[0].split("/")[-1][-2:]
port = Port.objects.filter(switch=switch, port=port_number).first()
@ -127,7 +155,7 @@ def post_auth(request, nas_id, nas_port, user_mac):
if port:
port_profile = port.get_port_profile
# get user_interface
# get the interface which is trying to connect (if already created)
user_interface = (
Interface.objects.filter(mac_address=user_mac)
.select_related("machine__user")
@ -135,20 +163,22 @@ def post_auth(request, nas_id, nas_port, user_mac):
.first()
)
# get room users
# get all users and clubs of the room
room_users = []
if port:
room_users = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
# get radius options
# get all radius options
radius_option = RadiusOption.objects.first()
print(radius_option)
# get a few class constants the radius will need
EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED
RADIUS_OPTION_REJECT = RadiusOption.REJECT
USER_STATE_ACTIVE = User.STATE_ACTIVE
serialized = serializers.PostAuthResponseSerializer(
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE))
@ -157,6 +187,17 @@ def post_auth(request, nas_id, nas_port, user_mac):
@api_view(['GET'])
def autoregister_machine(request, nas_id, username, mac_address):
"""Autoregister machine in the Authorize step of the radius
Parameters:
nas_id (string): NAS name or ipv4
username (string): username of the user who is trying to connect
mac_address (string): mac address of the device which is trying to connect
Return:
200 if autoregistering worked
400 if it failed, and the reason why
"""
nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
@ -170,12 +211,21 @@ def autoregister_machine(request, nas_id, username, mac_address):
result, reason = user.autoregister_machine(mac_address, nas_type)
if result:
return Response(data=reason)
return Response(reason)
return Response(reason, status=400)
@api_view(['GET'])
def assign_ip(request, mac_address):
"""Autoassign ip in the Authorize and Post-Auth steps of the Radius
Parameters:
mac_address (string): mac address of the device which is trying to connect
Return:
200 if it worked
400 if it failed, and the reason why
"""
interface = (
Interface.objects.filter(mac_address=mac_address)
.first()