8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-22 19:33:11 +00:00
re2o/radius/api/views.py

267 lines
9 KiB
Python
Raw Normal View History

2020-11-28 11:09:36 +00:00
# -*- mode: python; coding: utf-8 -*-
2021-05-13 17:28:56 +00:00
# Re2o est un logiciel d'administration développé initialement au Rézo Metz. Il
2020-11-28 11:09:36 +00:00
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
# Copyright © 2020 Corentin Canebier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.db.models import Q
2021-05-13 17:29:49 +00:00
from django.http import HttpResponse
from django.forms import ValidationError
2020-11-29 17:08:20 +00:00
from django.contrib.auth.decorators import login_required
2020-11-28 11:09:36 +00:00
from . import serializers
2020-11-29 17:08:20 +00:00
from machines.models import Domain, IpList, Interface, Nas, Machine
2020-11-28 11:09:36 +00:00
from users.models import User
from preferences.models import RadiusOption
from topologie.models import Port, Switch
2020-11-29 17:08:20 +00:00
from re2o.acl import can_view_all_api, can_edit_all_api, can_create_api
2020-11-28 11:09:36 +00:00
class AuthorizeResponse:
    """Bundle of the objects returned to the radius for the Authorize step.

    Attributes:
        nas: value passed as ``nas`` (the ``authorize`` view passes the Nas
            object it resolved, or None).
        user: value passed as ``user`` (the matching User, or None).
        user_interface: value passed as ``user_interface`` (the Interface
            of the connecting device, or None).
    """

    def __init__(self, nas, user, user_interface):
        # Plain value holder: the fields are stored exactly as given.
        self.nas, self.user, self.user_interface = nas, user, user_interface

    def can_view(self, user):
        """Always grant access.

        Permissions are enforced by the ACL decorators on the views, so
        the per-object API permission check is bypassed here.
        """
        return True, None, None
2021-05-17 11:48:07 +00:00
2020-11-28 11:09:36 +00:00
2021-05-17 19:59:33 +00:00
@api_view(["GET"])
@login_required
@can_view_all_api(Interface, Domain, IpList, Nas, User)
def authorize(request, nas_id, username, mac_address):
    """Return the objects the radius needs for the Authorize step.

    Parameters:
        nas_id (string): NAS name or ipv4
        username (string): username of the user who is trying to connect;
            "None" when no username was provided (wired connection)
        mac_address (string): mac address of the device which is trying
            to connect
    Return:
        Response whose data is the serialized AuthorizeResponse (NAS type,
        user and user interface; each one is None when not found)
    """
    # The NAS identifies itself either by its domain name or by its IPv4.
    nas_lookup = Q(domain__name=nas_id) | Q(ipv4__ipv4=nas_id)
    requesting_interface = Interface.objects.filter(nas_lookup).first()

    # Resolve the Nas entry matching the machine type of that interface.
    nas = (
        Nas.objects.filter(nas_type=requesting_interface.machine_type).first()
        if requesting_interface
        else None
    )

    # Case-insensitive match on the pseudo; None when nobody matches.
    account = User.objects.filter(pseudo__iexact=username).first()

    # Interface of the connecting device, if it has already been created.
    device = Interface.objects.filter(mac_address=mac_address).first()

    payload = AuthorizeResponse(nas, account, device)
    return Response(data=serializers.AuthorizeResponseSerializer(payload).data)
class PostAuthResponse:
    """Bundle of the objects returned to the radius for the Post-Auth step.

    Every constructor argument is stored unchanged on the instance under
    the same name, including the three upper-case constants that the
    ``post_auth`` view forwards from the User and RadiusOption models.
    """

    def __init__(
        self,
        nas,
        room_users,
        port,
        port_profile,
        switch,
        user_interface,
        radius_option,
        EMAIL_STATE_UNVERIFIED,
        RADIUS_OPTION_REJECT,
        USER_STATE_ACTIVE,
    ):
        # Network equipment the request came through.
        self.nas = nas
        self.switch = switch
        self.port = port
        self.port_profile = port_profile

        # Who / what is connecting.
        self.room_users = room_users
        self.user_interface = user_interface

        # Radius configuration and the constants needed to interpret it.
        self.radius_option = radius_option
        self.RADIUS_OPTION_REJECT = RADIUS_OPTION_REJECT
        self.EMAIL_STATE_UNVERIFIED = EMAIL_STATE_UNVERIFIED
        self.USER_STATE_ACTIVE = USER_STATE_ACTIVE

    def can_view(self, user):
        """Always grant access.

        Permissions are enforced by the ACL decorators on the views, so
        the per-object API permission check is bypassed here.
        """
        return True, None, None
2021-05-17 11:48:07 +00:00
2020-11-28 11:09:36 +00:00
2021-05-17 19:59:33 +00:00
def _stack_member_id_from_nas_port(nas_port):
    """Extract the stack member id from a NAS-Port-Id, or None if absent.

    The member id is the text between the first "-" and the following "/",
    e.g. "0" for a Juniper-style "ge-0/0/6.0". Values without a "-" (such
    as "6.0" or the literal "None") carry no member id and yield None
    instead of raising IndexError.
    """
    if not nas_port:
        return None
    dash_parts = nas_port.split("-")
    if len(dash_parts) < 2:
        return None
    return dash_parts[1].split("/")[0] or None


@api_view(["GET"])
@login_required
@can_view_all_api(Interface, Domain, IpList, Nas, Switch, Port, User)
def post_auth(request, nas_id, nas_port, user_mac):
    """Return the objects the radius needs for the Post-Auth step.

    Parameters:
        nas_id (string): NAS name or ipv4
        nas_port (string): NAS port from which the request came, either
            Juniper style ("ge-0/0/6.0") or plain "port.unit" style
            ("6.0"). The Cisco NAS-Port-Id format is not supported.
        user_mac (string): mac address of the device which is trying to
            connect
    Return:
        Response whose data is the serialized PostAuthResponse
    """
    # get the Nas object which made the request (if exists)
    nas_interface = (
        Interface.objects.prefetch_related("machine__switch__stack")
        .filter(Q(domain__name=nas_id) | Q(ipv4__ipv4=nas_id))
        .first()
    )

    nas_type = None
    switch = None
    if nas_interface:
        nas_type = Nas.objects.filter(nas_type=nas_interface.machine_type).first()

        # get the switch (if wired connection)
        switch = Switch.objects.filter(machine_ptr=nas_interface.machine).first()

        # If the switch is part of a stack, the request may come from
        # another member than the one owning the NAS interface: pick the
        # member named in the NAS-Port-Id.
        if hasattr(nas_interface.machine, "switch"):
            stack = nas_interface.machine.switch.stack
            if stack:
                id_stack_member = _stack_member_id_from_nas_port(nas_port)
                # Without a parsable member id, keep the switch resolved
                # from the NAS machine rather than failing the request.
                if id_stack_member is not None:
                    switch = (
                        Switch.objects.filter(stack=stack)
                        .filter(stack_member_id=id_stack_member)
                        .first()
                    )

    # get the switch port
    port = None
    if nas_port and nas_port != "None":
        # Drop the ".unit" suffix, keep the last "/"-separated field and
        # use at most its last two characters as the port number
        # ("ge-0/0/6.0" -> "6", "6.0" -> "6").
        port_number = nas_port.split(".")[0].split("/")[-1][-2:]
        port = Port.objects.filter(switch=switch, port=port_number).first()

    port_profile = None
    if port:
        port_profile = port.get_port_profile

    # get the interface which is trying to connect (if already created)
    user_interface = (
        Interface.objects.filter(mac_address=user_mac)
        .select_related("machine__user")
        .select_related("ipv4")
        .first()
    )

    # get all users and clubs of the room
    room_users = []
    if port:
        room_users = User.objects.filter(
            Q(club__room=port.room) | Q(adherent__room=port.room)
        )

    # get all radius options
    radius_option = RadiusOption.objects.first()

    # get a few class constants the radius will need
    EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED
    RADIUS_OPTION_REJECT = RadiusOption.REJECT
    USER_STATE_ACTIVE = User.STATE_ACTIVE

    serialized = serializers.PostAuthResponseSerializer(
        PostAuthResponse(
            nas_type,
            room_users,
            port,
            port_profile,
            switch,
            user_interface,
            radius_option,
            EMAIL_STATE_UNVERIFIED,
            RADIUS_OPTION_REJECT,
            USER_STATE_ACTIVE,
        )
    )
    return Response(data=serialized.data)
2021-05-13 17:29:49 +00:00
2021-05-17 19:59:33 +00:00
@api_view(["GET"])
@login_required
@can_view_all_api(Interface, Domain, IpList, Nas, User)
@can_edit_all_api(User, Domain, Machine, Interface)
def autoregister_machine(request, nas_id, username, mac_address):
    """Autoregister a machine in the Authorize step of the radius.

    Parameters:
        nas_id (string): NAS name or ipv4
        username (string): username of the user who is trying to connect
        mac_address (string): mac address of the device which is trying
            to connect
    Return:
        200 if autoregistering worked
        400 if it failed (including when no user matches ``username``),
        and the reason why
    """
    # get the Nas object which made the request (if exists)
    nas_interface = Interface.objects.filter(
        Q(domain__name=nas_id) | Q(ipv4__ipv4=nas_id)
    ).first()
    nas_type = None
    if nas_interface:
        nas_type = Nas.objects.filter(nas_type=nas_interface.machine_type).first()

    user = User.objects.filter(pseudo__iexact=username).first()
    if user is None:
        # .first() returns None for an unknown username; answer with the
        # documented 400 instead of crashing on a None dereference.
        return Response("Unknown user.", status=400)

    result, reason = user.autoregister_machine(mac_address, nas_type)
    if result:
        return Response(reason)
    return Response(reason, status=400)
2021-05-17 19:59:33 +00:00
# NOTE(review): unlike the other views in this file, this one is not wrapped
# in @login_required -- confirm whether that is intentional.
@api_view(["GET"])
@can_view_all_api(Interface)
@can_edit_all_api(Interface)
def assign_ip(request, mac_address):
    """Autoassign an ip in the Authorize and Post-Auth steps of the radius.

    Parameters:
        mac_address (string): mac address of the device which is trying
            to connect
    Return:
        200 if it worked
        400 if it failed (including when no interface has this mac
        address), and the reason why
    """
    interface = Interface.objects.filter(mac_address=mac_address).first()
    if interface is None:
        # .first() returns None for an unknown mac address; answer with
        # the documented 400 instead of crashing on a None dereference.
        return Response("No interface found for this MAC address.", status=400)

    try:
        interface.assign_ipv4()
    except ValidationError as err:
        # ``message`` only exists on a ValidationError built from a single
        # message; list/dict based errors only expose ``messages``.
        reason = getattr(err, "message", None)
        if reason is None:
            reason = " ".join(err.messages)
        return Response(reason, status=400)
    return Response()