mirror of
https://gitlab2.federez.net/re2o/re2o
synced 2024-11-24 20:33:11 +00:00
2780 lines
92 KiB
Python
Executable file
2780 lines
92 KiB
Python
Executable file
# -*- mode: python; coding: utf-8 -*-
|
|
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz.
|
|
# Il se veut agnostique au réseau considéré, de manière à être installable
|
|
# en quelques clics.
|
|
#
|
|
# Copyright © 2017-2020 Gabriel Détraz
|
|
# Copyright © 2017-2020 Lara Kermarec
|
|
# Copyright © 2017-2020 Augustin Lemesle
|
|
# Copyright © 2017-2020 Hugo Levy--Falk
|
|
# Copyright © 2017-2020 Jean-Romain Garnier
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along
|
|
# with this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
"""
|
|
The database models for the 'users' app of re2o.
|
|
|
|
The goal is to keep the main actions here, i.e. the 'clean' and 'save'
|
|
function are higly reposnsible for the changes, checking the coherence of the
|
|
data and the good behaviour in general for not breaking the database.
|
|
|
|
For further details on each of those models, see the documentation details for
|
|
each.
|
|
|
|
Here are defined the following django models :
|
|
* Users : Adherent and Club (which inherit from Base User Abstract of django).
|
|
* Whitelists
|
|
* Bans
|
|
* Schools (teaching structures)
|
|
* Rights (Groups and ListRight)
|
|
* ServiceUser (for ldap connexions)
|
|
"""
|
|
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
import datetime
|
|
import re
|
|
import sys
|
|
import traceback
|
|
import uuid
|
|
from datetime import timedelta
|
|
from io import BytesIO
|
|
|
|
from django import forms
|
|
from django.contrib.auth.models import (AbstractBaseUser, BaseUserManager,
|
|
Group, PermissionsMixin)
|
|
from django.core.files.uploadedfile import InMemoryUploadedFile
|
|
from django.core.validators import RegexValidator
|
|
from django.db import models, transaction
|
|
from django.db.models import Q
|
|
from django.db.models.signals import m2m_changed, post_delete, post_save
|
|
from django.dispatch import receiver
|
|
from django.forms import ValidationError
|
|
from django.template import loader
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.functional import cached_property
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from PIL import Image
|
|
from reversion import revisions as reversion
|
|
|
|
from cotisations.models import Cotisation, Facture, Paiement, Vente
|
|
from machines.models import Domain, Interface, Machine, regen
|
|
from preferences.models import (AssoOption, GeneralOption, MailMessageOption,
|
|
OptionalMachine, OptionalUser)
|
|
from re2o.base import smtp_check
|
|
from re2o.field_permissions import FieldPermissionModelMixin
|
|
from re2o.mail_utils import send_mail
|
|
from re2o.mixins import AclMixin, RevMixin
|
|
from re2o.settings import GID_RANGES, LDAP, UID_RANGES
|
|
from users import signals
|
|
|
|
# General utilities
|
|
|
|
|
|
def linux_user_check(login):
|
|
"""Check if a login comply with unix base login policy
|
|
|
|
Parameters:
|
|
login (string): Login to check
|
|
|
|
Returns:
|
|
boolean: True if login comply with policy
|
|
"""
|
|
UNIX_LOGIN_PATTERN = re.compile("^[a-z][a-z0-9-]*[$]?$")
|
|
return UNIX_LOGIN_PATTERN.match(login)
|
|
|
|
|
|
def linux_user_validator(login):
|
|
"""Check if a login comply with unix base login policy, returns
|
|
a standard Django ValidationError if login is not correct
|
|
|
|
Parameters:
|
|
login (string): Login to check
|
|
|
|
Returns:
|
|
ValidationError if login comply with policy
|
|
"""
|
|
if not linux_user_check(login):
|
|
raise forms.ValidationError(
|
|
_('The username "%(label)s" contains forbidden characters.'),
|
|
params={"label": login},
|
|
)
|
|
|
|
|
|
def get_fresh_user_uid():
|
|
"""Return a fresh unused uid.
|
|
|
|
Returns:
|
|
uid (int): The fresh uid available
|
|
"""
|
|
uids = list(range(int(min(UID_RANGES["users"])), int(max(UID_RANGES["users"]))))
|
|
try:
|
|
used_uids = list(User.objects.values_list("uid_number", flat=True))
|
|
except:
|
|
used_uids = []
|
|
free_uids = [id for id in uids if id not in used_uids]
|
|
return min(free_uids)
|
|
|
|
|
|
def get_fresh_gid():
|
|
"""Return a fresh unused gid.
|
|
|
|
Returns:
|
|
uid (int): The fresh gid available
|
|
"""
|
|
gids = list(range(int(min(GID_RANGES["posix"])), int(max(GID_RANGES["posix"]))))
|
|
used_gids = list(ListRight.objects.values_list("gid", flat=True))
|
|
free_gids = [id for id in gids if id not in used_gids]
|
|
return min(free_gids)
|
|
|
|
|
|
class UserManager(BaseUserManager):
|
|
"""User manager basique de django"""
|
|
|
|
def _create_user(self, pseudo, surname, email, password=None, su=False):
|
|
if not pseudo:
|
|
raise ValueError(_("Users must have an username."))
|
|
|
|
if not linux_user_check(pseudo):
|
|
raise ValueError(_("Username should only contain [a-z0-9-]."))
|
|
|
|
user = Adherent(
|
|
pseudo=pseudo,
|
|
surname=surname,
|
|
name=surname,
|
|
email=self.normalize_email(email),
|
|
)
|
|
|
|
user.set_password(password)
|
|
user.confirm_mail()
|
|
if su:
|
|
user.is_superuser = True
|
|
user.save(using=self._db)
|
|
return user
|
|
|
|
def create_user(self, pseudo, surname, email, password=None):
|
|
"""
|
|
Creates and saves a User with the given pseudo, name, surname, email,
|
|
and password.
|
|
"""
|
|
return self._create_user(pseudo, surname, email, password, False)
|
|
|
|
def create_superuser(self, pseudo, surname, email, password):
|
|
"""
|
|
Creates and saves a superuser with the given pseudo, name, surname,
|
|
email, and password.
|
|
"""
|
|
return self._create_user(pseudo, surname, email, password, True)
|
|
|
|
|
|
class User(
|
|
RevMixin, FieldPermissionModelMixin, AbstractBaseUser, PermissionsMixin, AclMixin
|
|
):
|
|
"""Base re2o User model
|
|
|
|
Attributes:
|
|
surname: surname of the user
|
|
pseudo: login of the user
|
|
email: The main email of the user
|
|
local_email_redirect: Option for redirection of all emails to the main email
|
|
local_email_enabled: If True, enable a local email account
|
|
school: Optional field, the school of the user
|
|
shell: User shell linux
|
|
comment: Optionnal comment field
|
|
pwd_ntlm: Hash password in ntlm for freeradius
|
|
state: State of the user, can be active, not yet active, etc (see below)
|
|
email_state: State of the main email (if confirmed or not)
|
|
registered: Date of initial creation
|
|
telephone: Phone number
|
|
uid_number: Linux uid of this user
|
|
legacy_uid: Optionnal legacy user id
|
|
shortcuts_enabled : Option for js shortcuts
|
|
email_change_date: Date of the last email change
|
|
"""
|
|
|
|
STATE_ACTIVE = 0
|
|
STATE_DISABLED = 1
|
|
STATE_ARCHIVE = 2
|
|
STATE_NOT_YET_ACTIVE = 3
|
|
STATE_FULL_ARCHIVE = 4
|
|
STATES = (
|
|
(0, _("Active")),
|
|
(1, _("Disabled")),
|
|
(2, _("Archived")),
|
|
(3, _("Not yet active")),
|
|
(4, _("Fully archived")),
|
|
)
|
|
|
|
EMAIL_STATE_VERIFIED = 0
|
|
EMAIL_STATE_UNVERIFIED = 1
|
|
EMAIL_STATE_PENDING = 2
|
|
EMAIL_STATES = (
|
|
(0, _("Confirmed")),
|
|
(1, _("Not confirmed")),
|
|
(2, _("Waiting for email confirmation")),
|
|
)
|
|
|
|
surname = models.CharField(max_length=255)
|
|
pseudo = models.CharField(
|
|
max_length=32,
|
|
unique=True,
|
|
help_text=_("Must only contain letters, numerals or dashes."),
|
|
validators=[linux_user_validator],
|
|
)
|
|
email = models.EmailField(
|
|
blank=True,
|
|
default="",
|
|
help_text=_("External email address allowing us to contact you."),
|
|
)
|
|
local_email_redirect = models.BooleanField(
|
|
default=False,
|
|
help_text=_(
|
|
"Enable redirection of the local email messages to the"
|
|
" main email address."
|
|
),
|
|
)
|
|
local_email_enabled = models.BooleanField(
|
|
default=False, help_text=_("Enable the local email account.")
|
|
)
|
|
school = models.ForeignKey(
|
|
"School",
|
|
on_delete=models.PROTECT,
|
|
null=True,
|
|
blank=True,
|
|
help_text=_("Education institute."),
|
|
)
|
|
shell = models.ForeignKey(
|
|
"ListShell",
|
|
on_delete=models.PROTECT,
|
|
null=True,
|
|
blank=True,
|
|
help_text=_("Unix shell."),
|
|
)
|
|
comment = models.CharField(
|
|
help_text=_("Comment, school year."), max_length=255, blank=True
|
|
)
|
|
pwd_ntlm = models.CharField(max_length=255)
|
|
state = models.IntegerField(
|
|
choices=STATES, default=STATE_NOT_YET_ACTIVE, help_text=_("Account state.")
|
|
)
|
|
email_state = models.IntegerField(choices=EMAIL_STATES, default=EMAIL_STATE_PENDING)
|
|
registered = models.DateTimeField(auto_now_add=True)
|
|
telephone = models.CharField(max_length=15, blank=True, null=True)
|
|
uid_number = models.PositiveIntegerField(default=get_fresh_user_uid, unique=True)
|
|
legacy_uid = models.PositiveIntegerField(
|
|
unique=True,
|
|
blank=True,
|
|
null=True,
|
|
help_text=_("Optionnal legacy uid, for import and transition purpose"),
|
|
)
|
|
shortcuts_enabled = models.BooleanField(
|
|
verbose_name=_("enable shortcuts on Re2o website"), default=True
|
|
)
|
|
email_change_date = models.DateTimeField(auto_now_add=True)
|
|
theme = models.CharField(max_length=255, default="default.css")
|
|
|
|
USERNAME_FIELD = "pseudo"
|
|
REQUIRED_FIELDS = ["surname", "email"]
|
|
|
|
objects = UserManager()
|
|
request = None
|
|
|
|
class Meta:
|
|
permissions = (
|
|
("change_user_password", _("Can change the password of a user")),
|
|
("change_user_state", _("Can edit the state of a user")),
|
|
("change_user_force", _("Can force the move")),
|
|
("change_user_shell", _("Can edit the shell of a user")),
|
|
("change_user_pseudo", _("Can edit the pseudo of a user")),
|
|
(
|
|
"change_user_groups",
|
|
_("Can edit the groups of rights of a user (critical permission)"),
|
|
),
|
|
("change_all_users", _("Can edit all users, including those with rights")),
|
|
)
|
|
verbose_name = _("user (member or club)")
|
|
verbose_name_plural = _("users (members or clubs)")
|
|
|
|
###### Shortcuts and methods for user instance ######
|
|
|
|
@cached_property
|
|
def name(self):
|
|
"""Shortcuts, returns name attribute if the user is linked with
|
|
an adherent instance.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
name (string): Name value if available
|
|
"""
|
|
if self.is_class_adherent:
|
|
return self.adherent.name
|
|
else:
|
|
return ""
|
|
|
|
@cached_property
|
|
def room(self):
|
|
"""Shortcuts, returns room attribute; unique for adherent
|
|
and multiple (queryset) for club.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
room (room instance): Room instance
|
|
"""
|
|
if self.is_class_adherent:
|
|
return self.adherent.room
|
|
elif self.is_class_club:
|
|
return self.club.room
|
|
else:
|
|
raise NotImplementedError(_("Unknown type."))
|
|
|
|
@cached_property
|
|
def get_mail_addresses(self):
|
|
"""Shortcuts, returns all local email address queryset only if local_email
|
|
global option is enabled.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
emailaddresse_set (queryset): All Email address of the local account
|
|
"""
|
|
if self.local_email_enabled:
|
|
return self.emailaddress_set.all()
|
|
return None
|
|
|
|
@cached_property
|
|
def get_mail(self):
|
|
"""Shortcuts, returns the email address to use to contact the instance user self.
|
|
Depends on if local_email account has been activated, otherwise returns self.email.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
email (string): The correct email to use
|
|
"""
|
|
if (
|
|
not OptionalUser.get_cached_value("local_email_accounts_enabled")
|
|
or not self.local_email_enabled
|
|
or self.local_email_redirect
|
|
):
|
|
return str(self.email)
|
|
else:
|
|
return str(self.emailaddress_set.get(local_part=self.pseudo.lower()))
|
|
|
|
@cached_property
|
|
def class_type(self):
|
|
"""Shortcuts, returns the class string "Adherent" of "Club", related with the
|
|
self instance.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
class (string): The class "Adherent" or "Club"
|
|
"""
|
|
if hasattr(self, "adherent"):
|
|
return "Adherent"
|
|
elif hasattr(self, "club"):
|
|
return "Club"
|
|
else:
|
|
raise NotImplementedError(_("Unknown type."))
|
|
|
|
@cached_property
|
|
def class_display(self):
|
|
"""Shortcuts, returns the pretty string "Member" of "Club", related with the
|
|
self instance.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
class (string): "Member" or "Club"
|
|
"""
|
|
if hasattr(self, "adherent"):
|
|
return _("Member")
|
|
elif hasattr(self, "club"):
|
|
return _("Club")
|
|
else:
|
|
raise NotImplementedError(_("Unknown type."))
|
|
|
|
@cached_property
|
|
def gid_number(self):
|
|
"""Shortcuts, returns the main and default gid for users,
|
|
from settings file
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
gid (int): Default gid number
|
|
"""
|
|
return int(LDAP["user_gid"])
|
|
|
|
@cached_property
|
|
def gid(self):
|
|
"""Shortcuts, returns the main and default gid for users,
|
|
from settings file
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
gid (int): Default gid number
|
|
"""
|
|
return LDAP["user_gid"]
|
|
|
|
@cached_property
|
|
def is_class_club(self):
|
|
"""Shortcuts, returns if the instance related with user is
|
|
a club.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
boolean : Returns true if this user is a club
|
|
"""
|
|
return hasattr(self, "club")
|
|
|
|
@cached_property
|
|
def is_class_adherent(self):
|
|
"""Shortcuts, returns if the instance related with user is
|
|
an adherent.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
boolean : Returns true if this user is an adherent
|
|
"""
|
|
return hasattr(self, "adherent")
|
|
|
|
@property
|
|
def is_active(self):
|
|
"""Shortcuts, used by django for allowing connection from this user.
|
|
Returns True if this user has state active, or not yet active,
|
|
or if preferences allows connection for archived users.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
boolean : Returns true if this user is allow to connect.
|
|
"""
|
|
allow_archived = OptionalUser.get_cached_value("allow_archived_connexion")
|
|
return (
|
|
self.state == self.STATE_ACTIVE
|
|
or self.state == self.STATE_NOT_YET_ACTIVE
|
|
or (
|
|
allow_archived
|
|
and self.state in (self.STATE_ARCHIVE, self.STATE_FULL_ARCHIVE)
|
|
)
|
|
)
|
|
|
|
@property
|
|
def is_staff(self):
|
|
"""Shortcuts, used by django for admin pannel access, shortcuts to
|
|
is_admin.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
boolean : Returns true if this user is_staff.
|
|
"""
|
|
return self.is_admin
|
|
|
|
@property
|
|
def is_admin(self):
|
|
"""Shortcuts, used by django for admin pannel access. Test if user
|
|
instance is_superuser or member of admin group.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
boolean : Returns true if this user is allow to access to admin pannel.
|
|
"""
|
|
admin, _ = Group.objects.get_or_create(name="admin")
|
|
return self.is_superuser or admin in self.groups.all()
|
|
|
|
def get_full_name(self):
|
|
"""Shortcuts, returns pretty full name to display both in case of user
|
|
is a club or an adherent.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
full_name (string) : Returns full name, name + surname.
|
|
"""
|
|
name = self.name
|
|
if name:
|
|
return "%s %s" % (name, self.surname)
|
|
else:
|
|
return self.surname
|
|
|
|
def get_short_name(self):
|
|
"""Shortcuts, returns short name to display both in case of user is
|
|
a club or an adherent.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
surname (string) : Returns surname.
|
|
"""
|
|
return self.surname
|
|
|
|
@property
|
|
def get_shell(self):
|
|
"""Shortcuts, returns linux user shell to use for this user if
|
|
provided, otherwise the default shell defined in preferences.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
shell (linux shell) : Returns linux shell.
|
|
"""
|
|
return self.shell or OptionalUser.get_cached_value("shell_default")
|
|
|
|
@cached_property
|
|
def home_directory(self):
|
|
"""Shortcuts, returns linux user home directory to use.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
home dir (string) : Returns home directory.
|
|
"""
|
|
return "/home/" + self.pseudo
|
|
|
|
@cached_property
|
|
def get_shadow_expire(self):
|
|
"""Shortcuts, returns the shadow expire value : 0 if this account is
|
|
disabled or if the email has not been verified to block the account
|
|
access.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
shadow_expire (int) : Shadow expire value.
|
|
"""
|
|
if (
|
|
self.state == self.STATE_DISABLED
|
|
or self.email_state == self.EMAIL_STATE_UNVERIFIED
|
|
):
|
|
return str(0)
|
|
else:
|
|
return None
|
|
|
|
@cached_property
|
|
def solde(self):
|
|
"""Shortcuts, calculate and returns the balance for this user, as a
|
|
dynamic balance beetween debiti (-) and credit (+) "Vente" objects
|
|
flaged as balance operations.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
solde (float) : The balance of the user.
|
|
"""
|
|
solde_objects = Paiement.objects.filter(is_balance=True)
|
|
somme_debit = (
|
|
Vente.objects.filter(
|
|
facture__in=Facture.objects.filter(
|
|
user=self, paiement__in=solde_objects, valid=True
|
|
)
|
|
).aggregate(
|
|
total=models.Sum(
|
|
models.F("prix") * models.F("number"),
|
|
output_field=models.DecimalField(),
|
|
)
|
|
)[
|
|
"total"
|
|
]
|
|
or 0
|
|
)
|
|
somme_credit = (
|
|
Vente.objects.filter(
|
|
facture__in=Facture.objects.filter(user=self, valid=True), name="solde"
|
|
).aggregate(
|
|
total=models.Sum(
|
|
models.F("prix") * models.F("number"),
|
|
output_field=models.DecimalField(),
|
|
)
|
|
)[
|
|
"total"
|
|
]
|
|
or 0
|
|
)
|
|
return somme_credit - somme_debit
|
|
|
|
@cached_property
|
|
def email_address(self):
|
|
"""Shortcuts, returns all the email addresses (queryset) associated
|
|
with the local account, if the account has been activated,
|
|
otherwise return a none queryset.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
email_address (django queryset) : Returns a queryset containing
|
|
EMailAddress of this user.
|
|
"""
|
|
if (
|
|
OptionalUser.get_cached_value("local_email_accounts_enabled")
|
|
and self.local_email_enabled
|
|
):
|
|
return self.emailaddress_set.all()
|
|
return EMailAddress.objects.none()
|
|
|
|
def end_adhesion(self):
|
|
"""Methods, calculate and returns the end of membership value date of
|
|
this user with aggregation of Cotisation objects linked to user
|
|
instance.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
end_adhesion (date) : Date of the end of the membership.
|
|
"""
|
|
date_max = Cotisation.objects.filter(
|
|
vente__in=Vente.objects.filter(
|
|
facture__in=Facture.objects.filter(user=self).exclude(valid=False)
|
|
)
|
|
).aggregate(models.Max("date_end_memb"))["date_end_memb__max"]
|
|
return date_max
|
|
|
|
def end_connexion(self):
|
|
"""Methods, calculate and returns the end of connection subscription value date
|
|
of this user with aggregation of Cotisation objects linked to user instance.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
end_adhesion (date) : Date of the end of the connection subscription.
|
|
"""
|
|
date_max = Cotisation.objects.filter(
|
|
vente__in=Vente.objects.filter(
|
|
facture__in=Facture.objects.filter(user=self).exclude(valid=False)
|
|
)
|
|
).aggregate(models.Max("date_end_con"))["date_end_con__max"]
|
|
return date_max
|
|
|
|
def is_adherent(self):
|
|
"""Methods, calculate and returns if the user has a valid membership by testing
|
|
if end_adherent is after now or not.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
is_adherent (boolean) : True is user has a valid membership.
|
|
"""
|
|
end = self.end_adhesion()
|
|
if not end:
|
|
return False
|
|
elif end < timezone.now():
|
|
return False
|
|
else:
|
|
return True
|
|
# it looks wrong, we should check if there is a cotisation where
|
|
# were date_start_memb < timezone.now() < date_end_memb,
|
|
# in case the user purshased a cotisation starting in the futur
|
|
# somehow
|
|
|
|
def is_connected(self):
|
|
"""Methods, calculate and returns if the user has a valid membership AND a
|
|
valid connection subscription by testing if end_connexion is after now or not.
|
|
If true, returns is_adherent() method value.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
is_connected (boolean) : True is user has a valid membership and a valid connexion.
|
|
"""
|
|
end = self.end_connexion()
|
|
if not end:
|
|
return False
|
|
elif end < timezone.now():
|
|
return False
|
|
else:
|
|
return self.is_adherent()
|
|
# it looks wrong, we should check if there is a cotisation where
|
|
# were date_start_con < timezone.now() < date_end_con,
|
|
# in case the user purshased a cotisation starting in the futur
|
|
# somehow
|
|
|
|
def end_ban(self):
|
|
"""Methods, calculate and returns the end of a ban value date
|
|
of this user with aggregation of ban objects linked to user instance.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
end_ban (date) : Date of the end of the bans objects.
|
|
"""
|
|
date_max = Ban.objects.filter(user=self).aggregate(models.Max("date_end"))[
|
|
"date_end__max"
|
|
]
|
|
return date_max
|
|
|
|
def end_whitelist(self):
|
|
"""Methods, calculate and returns the end of a whitelist value date
|
|
of this user with aggregation of whitelists objects linked to user instance.
|
|
|
|
Parameters:
|
|
self (user instance): user to return infos
|
|
|
|
Returns:
|
|
end_whitelist (date) : Date of the end of the whitelists objects.
|
|
"""
|
|
date_max = Whitelist.objects.filter(user=self).aggregate(
|
|
models.Max("date_end")
|
|
)["date_end__max"]
|
|
return date_max
|
|
|
|
def is_ban(self):
|
|
"""Methods, calculate and returns if the user is banned by testing
|
|
if end_ban is after now or not.
|
|
|
|
parameters:
|
|
self (user instance): user to return infos
|
|
|
|
returns:
|
|
is_ban (boolean) : true if user is under a ban sanction decision.
|
|
"""
|
|
end = self.end_ban()
|
|
if not end:
|
|
return False
|
|
elif end < timezone.now():
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def is_whitelisted(self):
|
|
"""Methods, calculate and returns if the user has a whitelist free connection
|
|
if end_whitelist is after now or not.
|
|
|
|
parameters:
|
|
self (user instance): user to return infos
|
|
|
|
returns:
|
|
is_whitelisted (boolean) : true if user has a whitelist connection.
|
|
"""
|
|
end = self.end_whitelist()
|
|
if not end:
|
|
return False
|
|
elif end < timezone.now():
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def has_access(self):
|
|
"""Methods, returns if the user has an internet access.
|
|
Return True if user is active and has a verified email, is not under a ban
|
|
decision and has a valid membership and connection or a whitelist.
|
|
|
|
parameters:
|
|
self (user instance): user to return infos
|
|
|
|
returns:
|
|
has_access (boolean) : true if user has an internet connection.
|
|
"""
|
|
return (
|
|
self.state == User.STATE_ACTIVE
|
|
and self.email_state != User.EMAIL_STATE_UNVERIFIED
|
|
and not self.is_ban()
|
|
and (self.is_connected() or self.is_whitelisted())
|
|
) or self == AssoOption.get_cached_value("utilisateur_asso")
|
|
|
|
def end_access(self):
|
|
"""Methods, returns the date of the end of the connection for this user,
|
|
as the maximum date beetween connection (membership objects) and whitelists.
|
|
|
|
parameters:
|
|
self (user instance): user to return infos
|
|
|
|
returns:
|
|
end_access (datetime) : Returns the date of the end_access connection.
|
|
"""
|
|
if not self.end_connexion():
|
|
if not self.end_whitelist():
|
|
return None
|
|
else:
|
|
return self.end_whitelist()
|
|
else:
|
|
if not self.end_whitelist():
|
|
return self.end_connexion()
|
|
else:
|
|
return max(self.end_connexion(), self.end_whitelist())
|
|
|
|
@classmethod
|
|
def users_interfaces(cls, users, active=True, all_interfaces=False):
|
|
"""Class method, returns all interfaces related/belonging to users
|
|
contained in query_sert "users".
|
|
|
|
Parameters:
|
|
users (list of users queryset): users which interfaces
|
|
have to be returned
|
|
active (boolean): If true, filter on interfaces
|
|
all_interfaces (boolean): If true, returns all interfaces
|
|
|
|
returns:
|
|
interfaces (queryset): Queryset of interfaces instances
|
|
|
|
"""
|
|
if all_interfaces:
|
|
return Interface.objects.filter(
|
|
machine__in=Machine.objects.filter(user__in=users)
|
|
).select_related("domain__extension")
|
|
else:
|
|
return Interface.objects.filter(
|
|
machine__in=Machine.objects.filter(user__in=users, active=active)
|
|
).select_related("domain__extension")
|
|
|
|
def user_interfaces(self, active=True, all_interfaces=False):
|
|
"""Method, returns all interfaces related/belonging to an user.
|
|
|
|
Parameters:
|
|
self (user instance): user which interfaces
|
|
have to be returned
|
|
active (boolean): If true, filter on interfaces
|
|
all_interfaces (boolean): If true, returns all interfaces
|
|
|
|
returns:
|
|
interfaces (queryset): Queryset of interfaces instances
|
|
|
|
"""
|
|
return self.users_interfaces(
|
|
[self], active=active, all_interfaces=all_interfaces
|
|
)
|
|
|
|
###### Methods and user edition functions, modify user attributes ######
|
|
|
|
def set_active(self):
|
|
"""Method, make this user active. Called in post-saved of subscription,
|
|
set the state value active if state is not_yet_active, with
|
|
a valid membership.
|
|
Also make an archived user fully active.
|
|
|
|
Parameters:
|
|
self (user instance): user to set active
|
|
|
|
"""
|
|
if self.state == self.STATE_NOT_YET_ACTIVE:
|
|
# Look for ventes with non 0 subscription duration in the invoices set
|
|
not_zero = (
|
|
self.facture_set.filter(valid=True)
|
|
.exclude(Q(vente__duration_membership=0))
|
|
.exists()
|
|
)
|
|
days_not_zero = (
|
|
self.facture_set.filter(valid=True)
|
|
.exclude(Q(vente__duration_days_membership=0))
|
|
.exists()
|
|
)
|
|
if (
|
|
not_zero
|
|
or days_not_zero
|
|
or OptionalUser.get_cached_value("all_users_active")
|
|
):
|
|
self.state = self.STATE_ACTIVE
|
|
self.save()
|
|
if self.state == self.STATE_ARCHIVE or self.state == self.STATE_FULL_ARCHIVE:
|
|
self.unarchive()
|
|
self.state = self.STATE_ACTIVE
|
|
self.save()
|
|
|
|
def set_password(self, password):
|
|
"""Method, overload the basic set_password inherited from django BaseUser.
|
|
Called when setting a new password, to set the classic django password
|
|
hashed, and also the NTLM hashed pwd_ntlm password.
|
|
|
|
Parameters:
|
|
self (user instance): user to set password
|
|
password (string): new password (cleatext) to set.
|
|
|
|
"""
|
|
from re2o.login import hashNT
|
|
|
|
super().set_password(password)
|
|
self.pwd_ntlm = hashNT(password)
|
|
return
|
|
|
|
def confirm_mail(self):
|
|
"""Method, set the email_state to VERIFIED when the email has been verified.
|
|
|
|
Parameters:
|
|
self (user instance): user to set password
|
|
|
|
"""
|
|
self.email_state = self.EMAIL_STATE_VERIFIED
|
|
|
|
def assign_ips(self):
|
|
"""Method, assigns ipv4 to all interfaces related to a user.
|
|
|
|
Parameters:
|
|
self (user instance): user which interfaces have to be assigned
|
|
|
|
"""
|
|
interfaces = self.user_interfaces()
|
|
with transaction.atomic(), reversion.create_revision():
|
|
Interface.mass_assign_ipv4(interfaces)
|
|
reversion.set_comment("IPv4 assignment")
|
|
|
|
def unassign_ips(self):
|
|
"""Method, unassigns and remove ipv4 to all interfaces related to a user.
|
|
(set ipv4 field to null)
|
|
|
|
Parameters:
|
|
self (user instance): user which interfaces have to be assigned
|
|
|
|
"""
|
|
interfaces = self.user_interfaces()
|
|
with transaction.atomic(), reversion.create_revision():
|
|
Interface.mass_unassign_ipv4(interfaces)
|
|
reversion.set_comment("IPv4 unassignment")
|
|
|
|
@classmethod
|
|
def mass_unassign_ips(cls, users_list):
|
|
"""Class method, unassigns and remove ipv4 to all interfaces related
|
|
to a list of users.
|
|
|
|
Parameters:
|
|
users_list (list of users or queryset): users which interfaces
|
|
have to be unassigned
|
|
|
|
"""
|
|
interfaces = cls.users_interfaces(users_list)
|
|
with transaction.atomic(), reversion.create_revision():
|
|
Interface.mass_unassign_ipv4(interfaces)
|
|
reversion.set_comment("IPv4 assignment")
|
|
|
|
def disable_email(self):
|
|
"""Method, disable email account and email redirection for
|
|
an user.
|
|
|
|
Parameters:
|
|
self (user instance): user to disabled email.
|
|
|
|
"""
|
|
self.local_email_enabled = False
|
|
self.local_email_redirect = False
|
|
|
|
@classmethod
|
|
def mass_disable_email(cls, queryset_users):
|
|
"""Class method, disable email accounts and email redirection for
|
|
a list of users (or queryset).
|
|
|
|
Parameters:
|
|
users_list (list of users or queryset): users which email
|
|
account to disable.
|
|
|
|
"""
|
|
queryset_users.update(local_email_enabled=False)
|
|
queryset_users.update(local_email_redirect=False)
|
|
|
|
def delete_data(self):
|
|
"""Method, delete non mandatory data, delete machine,
|
|
and disable email accounts for a list of users (or queryset).
|
|
Called during full archive process.
|
|
|
|
Parameters:
|
|
self (user instance): user to delete data.
|
|
|
|
"""
|
|
self.disable_email()
|
|
self.machine_set.all().delete()
|
|
|
|
@classmethod
|
|
def mass_delete_data(cls, queryset_users):
|
|
"""Class method, delete non mandatory data, delete machine
|
|
and disable email accounts for a list of users (or queryset).
|
|
Called during full archive process.
|
|
|
|
Parameters:
|
|
users_list (list of users or queryset): users to perform
|
|
delete data.
|
|
|
|
"""
|
|
cls.mass_disable_email(queryset_users)
|
|
Machine.mass_delete(Machine.objects.filter(user__in=queryset_users))
|
|
signals.remove_mass.send(sender=cls, queryset=queryset_users)
|
|
|
|
def archive(self):
|
|
"""Method, archive user by unassigning ips.
|
|
|
|
Parameters:
|
|
self (user instance): user to archive.
|
|
|
|
"""
|
|
self.unassign_ips()
|
|
|
|
@classmethod
|
|
def mass_archive(cls, users_list):
|
|
"""Class method, mass archive a queryset of users.
|
|
Called during archive process, unassign ip and set to
|
|
archive state.
|
|
|
|
Parameters:
|
|
users_list (list of users queryset): users to perform
|
|
mass archive.
|
|
|
|
"""
|
|
# Force eval of queryset
|
|
bool(users_list)
|
|
users_list = users_list.all()
|
|
cls.mass_unassign_ips(users_list)
|
|
users_list.update(state=User.STATE_ARCHIVE)
|
|
|
|
def full_archive(self):
|
|
"""Method, full archive an user by unassigning ips, deleting data
|
|
and authentication deletion.
|
|
|
|
Parameters:
|
|
self (user instance): user to full archive.
|
|
|
|
"""
|
|
self.archive()
|
|
self.delete_data()
|
|
signals.remove.send(sender=User, instance=self)
|
|
|
|
@classmethod
|
|
def mass_full_archive(cls, users_list):
|
|
"""Class method, mass full archive a queryset of users.
|
|
Called during full archive process, unassign ip, delete
|
|
non mandatory data and set to full archive state.
|
|
|
|
Parameters:
|
|
users_list (list of users queryset): users to perform
|
|
mass full archive.
|
|
|
|
"""
|
|
# Force eval of queryset
|
|
bool(users_list)
|
|
users_list = users_list.all()
|
|
cls.mass_unassign_ips(users_list)
|
|
cls.mass_delete_data(users_list)
|
|
users_list.update(state=User.STATE_FULL_ARCHIVE)
|
|
|
|
def unarchive(self):
|
|
"""Method, unarchive an user by assigning ips, and recreating
|
|
authentication user associated.
|
|
|
|
Parameters:
|
|
self (user instance): user to unarchive.
|
|
|
|
"""
|
|
self.assign_ips()
|
|
signals.synchronise.send(sender=self.__class__, instance=self)
|
|
|
|
def state_sync(self):
|
|
"""Master Method, call unarchive, full_archive or archive method
|
|
on an user when state is changed, based on previous state.
|
|
|
|
Parameters:
|
|
self (user instance): user to sync state.
|
|
|
|
"""
|
|
if (
|
|
self.__original_state != self.STATE_ACTIVE
|
|
and self.state == self.STATE_ACTIVE
|
|
):
|
|
self.unarchive()
|
|
elif (
|
|
self.__original_state != self.STATE_ARCHIVE
|
|
and self.state == self.STATE_ARCHIVE
|
|
):
|
|
self.archive()
|
|
elif (
|
|
self.__original_state != self.STATE_FULL_ARCHIVE
|
|
and self.state == self.STATE_FULL_ARCHIVE
|
|
):
|
|
self.full_archive()
|
|
|
|
###### Send mail functions ######
|
|
|
|
def notif_inscription(self, request=None):
|
|
"""Method/function, send an email 'welcome' to user instance, after
|
|
successfull register.
|
|
|
|
Parameters:
|
|
self (user instance): user to send the welcome email
|
|
request (optional request): Specify request
|
|
|
|
Returns:
|
|
email: Welcome email after user register
|
|
"""
|
|
template = loader.get_template("users/email_welcome")
|
|
mailmessageoptions, _created = MailMessageOption.objects.get_or_create()
|
|
context = {
|
|
"nom": self.get_full_name(),
|
|
"asso_name": AssoOption.get_cached_value("name"),
|
|
"asso_email": AssoOption.get_cached_value("contact"),
|
|
"welcome_mail_fr": mailmessageoptions.welcome_mail_fr,
|
|
"welcome_mail_en": mailmessageoptions.welcome_mail_en,
|
|
"pseudo": self.pseudo,
|
|
}
|
|
|
|
send_mail(
|
|
request,
|
|
"Bienvenue au %(name)s / Welcome to %(name)s"
|
|
% {"name": AssoOption.get_cached_value("name")},
|
|
"",
|
|
GeneralOption.get_cached_value("email_from"),
|
|
[self.email],
|
|
html_message=template.render(context),
|
|
)
|
|
|
|
def reset_passwd_mail(self, request):
|
|
"""Method/function, makes a Request class instance, and send
|
|
an email to user instance for password change in case of initial
|
|
password set or forget password form.
|
|
|
|
Parameters:
|
|
self (user instance): user to send the welcome email
|
|
request: Specify request, mandatory to build the reset link
|
|
|
|
Returns:
|
|
email: Reset password email for user instance
|
|
"""
|
|
req = Request()
|
|
req.type = Request.PASSWD
|
|
req.user = self
|
|
req.save()
|
|
template = loader.get_template("users/email_passwd_request")
|
|
context = {
|
|
"name": req.user.get_full_name(),
|
|
"asso": AssoOption.get_cached_value("name"),
|
|
"asso_mail": AssoOption.get_cached_value("contact"),
|
|
"site_name": GeneralOption.get_cached_value("site_name"),
|
|
"url": request.build_absolute_uri(
|
|
reverse("users:process", kwargs={"token": req.token})
|
|
),
|
|
"expire_in": str(GeneralOption.get_cached_value("req_expire_hrs")),
|
|
}
|
|
|
|
send_mail(
|
|
request,
|
|
"Changement de mot de passe de %(name)s / Password change for "
|
|
"%(name)s" % {"name": AssoOption.get_cached_value("name")},
|
|
template.render(context),
|
|
GeneralOption.get_cached_value("email_from"),
|
|
[req.user.email],
|
|
fail_silently=False,
|
|
)
|
|
|
|
def send_confirm_email_if_necessary(self, request):
|
|
"""Method/function, check if a confirmation by email is needed,
|
|
and trigger send.
|
|
* If the user changed email, it needs to be confirmed
|
|
* If they're not fully archived, send a confirmation email
|
|
|
|
Parameters:
|
|
self (user instance): user to send the confirmation email
|
|
request: Specify request, mandatory to build the reset link
|
|
|
|
Returns:
|
|
boolean: True if a confirmation of the mail is needed
|
|
"""
|
|
# Only update the state if the email changed
|
|
if self.__original_email == self.email:
|
|
return False
|
|
|
|
# If the user was previously in the PENDING or UNVERIFIED state,
|
|
# we can't update email_change_date otherwise it would push back
|
|
# their due date
|
|
# However, if the user is in the VERIFIED state, we reset the date
|
|
if self.email_state == self.EMAIL_STATE_VERIFIED:
|
|
self.email_change_date = timezone.now()
|
|
|
|
# Remember that the user needs to confirm their email address again
|
|
self.email_state = self.EMAIL_STATE_PENDING
|
|
self.save()
|
|
|
|
# Fully archived users shouldn't get an email, so stop here
|
|
if self.state == self.STATE_FULL_ARCHIVE:
|
|
return False
|
|
|
|
# Send the email
|
|
self.confirm_email_address_mail(request)
|
|
return True
|
|
|
|
def trigger_email_changed_state(self, request):
|
|
"""Method/function, update the value of the last email change,
|
|
and call and send the confirm email link.
|
|
Function called only after a manual of email_state by an admin.
|
|
|
|
Parameters:
|
|
self (user instance): user to send the confirmation email
|
|
request: Specify request, mandatory to build the reset link
|
|
|
|
Returns:
|
|
boolean: True if a confirmation of the mail is needed
|
|
"""
|
|
if self.email_state == self.EMAIL_STATE_VERIFIED:
|
|
return False
|
|
|
|
self.email_change_date = timezone.now()
|
|
self.save()
|
|
|
|
self.confirm_email_address_mail(request)
|
|
return True
|
|
|
|
def confirm_email_before_date(self):
|
|
"""Method/function, calculate the maximum date for confirmation
|
|
of the new email address
|
|
|
|
Parameters:
|
|
self (user instance): user to calculate maximum date
|
|
for confirmation
|
|
|
|
Returns:
|
|
date: Date of the maximum time to perform email confirmation
|
|
"""
|
|
if self.email_state == self.EMAIL_STATE_VERIFIED:
|
|
return None
|
|
|
|
days = OptionalUser.get_cached_value("disable_emailnotyetconfirmed")
|
|
return self.email_change_date + timedelta(days=days)
|
|
|
|
def confirm_email_address_mail(self, request):
|
|
"""Method/function, makes a Request class instance, and send
|
|
an email to user instance to confirm a new email address.
|
|
* If the user changed email, it needs to be confirmed
|
|
* If they're not fully archived, send a confirmation email
|
|
|
|
Parameters:
|
|
self (user instance): user to send the confirmation email
|
|
request: Specify request, mandatory to build the reset link
|
|
|
|
Returns:
|
|
email: An email with a link to confirm the new email address
|
|
"""
|
|
# Delete all older requests for this user, that aren't for this email
|
|
filter = Q(user=self) & Q(type=Request.EMAIL) & ~Q(email=self.email)
|
|
Request.objects.filter(filter).delete()
|
|
|
|
# Create the request and send the email
|
|
req = Request()
|
|
req.type = Request.EMAIL
|
|
req.user = self
|
|
req.email = self.email
|
|
req.save()
|
|
|
|
template = loader.get_template("users/email_confirmation_request")
|
|
context = {
|
|
"name": req.user.get_full_name(),
|
|
"asso": AssoOption.get_cached_value("name"),
|
|
"asso_mail": AssoOption.get_cached_value("contact"),
|
|
"site_name": GeneralOption.get_cached_value("site_name"),
|
|
"url": request.build_absolute_uri(
|
|
reverse("users:process", kwargs={"token": req.token})
|
|
),
|
|
"expire_in": str(GeneralOption.get_cached_value("req_expire_hrs")),
|
|
"confirm_before_fr": self.confirm_email_before_date().strftime("%d/%m/%Y"),
|
|
"confirm_before_en": self.confirm_email_before_date().strftime("%Y-%m-%d"),
|
|
}
|
|
|
|
send_mail(
|
|
request,
|
|
"Confirmation du mail de %(name)s / Email confirmation for "
|
|
"%(name)s" % {"name": AssoOption.get_cached_value("name")},
|
|
template.render(context),
|
|
GeneralOption.get_cached_value("email_from"),
|
|
[req.user.email],
|
|
fail_silently=False,
|
|
)
|
|
return
|
|
|
|
def autoregister_machine(self, mac_address, nas_type, request=None):
|
|
"""Function, register a new interface on the user instance account.
|
|
Called automaticaly mainly by freeradius python backend, for autoregister.
|
|
|
|
Parameters:
|
|
self (user instance): user to register new interface
|
|
mac_address (string): New mac address to add on the new interface
|
|
nas_type (Django Nas object instance): The nas object calling
|
|
request: Optional django request
|
|
|
|
Returns:
|
|
interface (Interface instance): The new interface registered
|
|
|
|
"""
|
|
allowed, _message, _rights = Machine.can_create(self, self.id)
|
|
if not allowed:
|
|
return False, _("Maximum number of registered machines reached.")
|
|
if not nas_type:
|
|
return False, _("Re2o doesn't know wich machine type to assign.")
|
|
machine_type_cible = nas_type.machine_type
|
|
try:
|
|
machine_parent = Machine()
|
|
machine_parent.user = self
|
|
interface_cible = Interface()
|
|
interface_cible.mac_address = mac_address
|
|
interface_cible.machine_type = machine_type_cible
|
|
interface_cible.clean()
|
|
machine_parent.clean()
|
|
domain = Domain()
|
|
domain.name = self.get_next_domain_name()
|
|
domain.interface_parent = interface_cible
|
|
domain.clean()
|
|
machine_parent.save()
|
|
interface_cible.machine = machine_parent
|
|
interface_cible.save()
|
|
domain.interface_parent = interface_cible
|
|
domain.clean()
|
|
domain.save()
|
|
self.notif_auto_newmachine(interface_cible)
|
|
except Exception as error:
|
|
return False, traceback.format_exc()
|
|
return interface_cible, _("OK")
|
|
|
|
def notif_auto_newmachine(self, interface):
|
|
"""Function/method, send an email to notify the new interface
|
|
registered on user instance account.
|
|
|
|
Parameters:
|
|
self (user instance): user to notify new registration
|
|
interface (interface instance): new interface registered
|
|
|
|
Returns:
|
|
boolean: True if a confirmation of the mail is needed
|
|
"""
|
|
template = loader.get_template("users/email_auto_newmachine")
|
|
context = {
|
|
"nom": self.get_full_name(),
|
|
"mac_address": interface.mac_address,
|
|
"asso_name": AssoOption.get_cached_value("name"),
|
|
"interface_name": interface.domain,
|
|
"asso_email": AssoOption.get_cached_value("contact"),
|
|
"pseudo": self.pseudo,
|
|
}
|
|
|
|
send_mail(
|
|
None,
|
|
"Ajout automatique d'une machine / New machine autoregistered",
|
|
"",
|
|
GeneralOption.get_cached_value("email_from"),
|
|
[self.email],
|
|
html_message=template.render(context),
|
|
)
|
|
return
|
|
|
|
def notif_disable(self, request=None):
|
|
"""Function/method, send an email to notify that the account is disabled
|
|
in case of unconfirmed email address.
|
|
|
|
Parameters:
|
|
self (user instance): user to notif disabled decision
|
|
request (django request): request to build email
|
|
|
|
Returns:
|
|
email: Notification email
|
|
"""
|
|
template = loader.get_template("users/email_disable_notif")
|
|
context = {
|
|
"name": self.get_full_name(),
|
|
"asso_name": AssoOption.get_cached_value("name"),
|
|
"asso_email": AssoOption.get_cached_value("contact"),
|
|
"site_name": GeneralOption.get_cached_value("site_name"),
|
|
}
|
|
|
|
send_mail(
|
|
request,
|
|
"Suspension automatique / Automatic suspension",
|
|
template.render(context),
|
|
GeneralOption.get_cached_value("email_from"),
|
|
[self.email],
|
|
fail_silently=False,
|
|
)
|
|
return
|
|
|
|
def get_next_domain_name(self):
|
|
"""Function/method, provide a unique name for a new interface.
|
|
|
|
Parameters:
|
|
self (user instance): user to get a new domain name
|
|
|
|
Returns:
|
|
domain name (string): String of new domain name
|
|
"""
|
|
|
|
def simple_pseudo():
|
|
"""Renvoie le pseudo sans underscore (compat dns)"""
|
|
return self.pseudo.replace("_", "-").lower()
|
|
|
|
def composed_pseudo(name):
|
|
"""Renvoie le resultat de simplepseudo et rajoute le nom"""
|
|
return simple_pseudo() + str(name)
|
|
|
|
num = 0
|
|
while Domain.objects.filter(name=composed_pseudo(num)):
|
|
num += 1
|
|
return composed_pseudo(num)
|
|
|
|
def can_edit(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can edit a user object.
|
|
|
|
:param self: The user which is to be edited.
|
|
:param user_request: The user who requests to edit self.
|
|
:return: a message and a boolean which is True if self is a club and
|
|
user_request one of its member, or if user_request is self, or if
|
|
user_request has the 'cableur' right.
|
|
"""
|
|
if self.state in (self.STATE_ARCHIVE, self.STATE_FULL_ARCHIVE):
|
|
warning_message = _("This user is archived.")
|
|
else:
|
|
warning_message = None
|
|
|
|
if self.is_class_club and user_request.is_class_adherent:
|
|
if (
|
|
self == user_request
|
|
or user_request.has_perm("users.change_user")
|
|
or user_request.adherent in self.club.administrators.all()
|
|
):
|
|
return True, warning_message, None
|
|
else:
|
|
return (
|
|
False,
|
|
_("You don't have the right to edit this club."),
|
|
("users.change_user",),
|
|
)
|
|
else:
|
|
if self == user_request:
|
|
return True, warning_message, None
|
|
elif user_request.has_perm("users.change_all_users"):
|
|
return True, warning_message, None
|
|
elif user_request.has_perm("users.change_user"):
|
|
if self.groups.filter(listright__critical=True):
|
|
return (
|
|
False,
|
|
_("User with critical rights, can't be edited."),
|
|
("users.change_all_users",),
|
|
)
|
|
elif self == AssoOption.get_cached_value("utilisateur_asso"):
|
|
return (
|
|
False,
|
|
_(
|
|
"Impossible to edit the organisation's"
|
|
' user without the "change_all_users" right.'
|
|
),
|
|
("users.change_all_users",),
|
|
)
|
|
else:
|
|
return True, warning_message, None
|
|
elif user_request.has_perm("users.change_all_users"):
|
|
return True, warning_message, None
|
|
else:
|
|
return (
|
|
False,
|
|
_("You don't have the right to edit another user."),
|
|
("users.change_user", "users.change_all_users"),
|
|
)
|
|
|
|
def can_change_password(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can change a user's password
|
|
|
|
:param self: The user which is to be edited
|
|
:param user_request: The user who request to edit self
|
|
:returns: a message and a boolean which is True if self is a club
|
|
and user_request one of it's admins, or if user_request is self,
|
|
or if user_request has the right to change other's password
|
|
"""
|
|
if self.is_class_club and user_request.is_class_adherent:
|
|
if (
|
|
self == user_request
|
|
or user_request.has_perm("users.change_user_password")
|
|
or user_request.adherent in self.club.administrators.all()
|
|
):
|
|
return True, None, None
|
|
else:
|
|
return (
|
|
False,
|
|
_("You don't have the right to edit this club."),
|
|
("users.change_user_password",),
|
|
)
|
|
else:
|
|
if self == user_request or user_request.has_perm(
|
|
"users.change_user_groups"
|
|
):
|
|
# Peut éditer les groupes d'un user,
|
|
# c'est un privilège élevé, True
|
|
return True, None, None
|
|
elif user_request.has_perm("users.change_user") and not self.groups.all():
|
|
return True, None, None
|
|
else:
|
|
return (
|
|
False,
|
|
_("You don't have the right to edit another user."),
|
|
("users.change_user_groups", "users.change_user"),
|
|
)
|
|
|
|
def check_selfpasswd(self, user_request, *_args, **_kwargs):
|
|
"""Returns (True, None, None) if user_request is self, else returns
|
|
(False, None, None)
|
|
"""
|
|
return user_request == self, None, None
|
|
|
|
def can_change_room(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can change a room
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change a state
|
|
"""
|
|
if not (
|
|
(
|
|
self.pk == user_request.pk
|
|
and OptionalUser.get_cached_value("self_room_policy")
|
|
!= OptionalUser.DISABLED
|
|
)
|
|
or user_request.has_perm("users.change_user")
|
|
):
|
|
return (
|
|
False,
|
|
_("You don't have the right to change the room."),
|
|
("users.change_user",),
|
|
)
|
|
else:
|
|
return True, None, None
|
|
|
|
@staticmethod
|
|
def can_change_state(user_request, *_args, **_kwargs):
|
|
"""Check if a user can change a state
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change a state
|
|
"""
|
|
can = user_request.has_perm("users.change_user_state")
|
|
return (
|
|
can,
|
|
_("You don't have the right to change the state.") if not can else None,
|
|
("users.change_user_state",),
|
|
)
|
|
|
|
def can_change_shell(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can change a shell
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change a shell
|
|
"""
|
|
if not (
|
|
(
|
|
self.pk == user_request.pk
|
|
and OptionalUser.get_cached_value("self_change_shell")
|
|
)
|
|
or user_request.has_perm("users.change_user_shell")
|
|
):
|
|
return (
|
|
False,
|
|
_("You don't have the right to change the shell."),
|
|
("users.change_user_shell",),
|
|
)
|
|
else:
|
|
return True, None, None
|
|
|
|
def can_change_pseudo(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can change a pseudo
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change a shell
|
|
"""
|
|
if not (
|
|
(
|
|
self.pk == user_request.pk
|
|
and OptionalUser.get_cached_value("self_change_pseudo")
|
|
)
|
|
or user_request.has_perm("users.change_user_pseudo")
|
|
or not self.pk
|
|
):
|
|
return (
|
|
False,
|
|
_("You don't have the right to change the pseudo."),
|
|
("users.change_user_pseudo",),
|
|
)
|
|
else:
|
|
return True, None, None
|
|
|
|
@staticmethod
|
|
def can_change_local_email_redirect(user_request, *_args, **_kwargs):
|
|
"""Check if a user can change local_email_redirect.
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change a redirection
|
|
"""
|
|
can = OptionalUser.get_cached_value("local_email_accounts_enabled")
|
|
return (
|
|
can,
|
|
_("Local email accounts must be enabled.") if not can else None,
|
|
None,
|
|
)
|
|
|
|
@staticmethod
|
|
def can_change_local_email_enabled(user_request, *_args, **_kwargs):
|
|
"""Check if a user can change internal address.
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change internal address
|
|
"""
|
|
can = OptionalUser.get_cached_value("local_email_accounts_enabled")
|
|
return (
|
|
can,
|
|
_("Local email accounts must be enabled.") if not can else None,
|
|
None,
|
|
)
|
|
|
|
@staticmethod
|
|
def can_change_force(user_request, *_args, **_kwargs):
|
|
"""Check if a user can change a force
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change a force
|
|
"""
|
|
can = user_request.has_perm("users.change_user_force")
|
|
return (
|
|
can,
|
|
_("You don't have the right to force the move.") if not can else None,
|
|
("users.change_user_force",),
|
|
)
|
|
|
|
@staticmethod
|
|
def can_change_groups(user_request, *_args, **_kwargs):
|
|
"""Check if a user can change a group
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if the user has
|
|
the right to change a group
|
|
"""
|
|
can = user_request.has_perm("users.change_user_groups")
|
|
return (
|
|
can,
|
|
_("You don't have the right to edit the user's groups of rights.")
|
|
if not can
|
|
else None,
|
|
("users.change_user_groups"),
|
|
)
|
|
|
|
@staticmethod
|
|
def can_change_is_superuser(user_request, *_args, **_kwargs):
|
|
"""Check if an user can change a is_superuser flag
|
|
|
|
:param user_request: The user who request
|
|
:returns: a message and a boolean which is True if permission is granted.
|
|
|
|
"""
|
|
can = user_request.is_superuser
|
|
return (
|
|
can,
|
|
_('"superuser" right required to edit the superuser flag.')
|
|
if not can
|
|
else None,
|
|
[],
|
|
)
|
|
|
|
def can_view(self, user_request, *_args, **_kwargs):
|
|
"""Check if an user can view an user object.
|
|
|
|
:param self: The targeted user.
|
|
:param user_request: The user who ask for viewing the target.
|
|
:return: A boolean telling if the acces is granted and an explanation
|
|
text
|
|
|
|
"""
|
|
if self.is_class_club and user_request.is_class_adherent:
|
|
if (
|
|
self == user_request
|
|
or user_request.has_perm("users.view_user")
|
|
or user_request.adherent in self.club.administrators.all()
|
|
or user_request.adherent in self.club.members.all()
|
|
):
|
|
return True, None, None
|
|
else:
|
|
return (
|
|
False,
|
|
_("You don't have the right to view this club."),
|
|
("users.view_user",),
|
|
)
|
|
else:
|
|
if self == user_request or user_request.has_perm("users.view_user"):
|
|
return True, None, None
|
|
else:
|
|
return (
|
|
False,
|
|
_("You don't have the right to view another user."),
|
|
("users.view_user",),
|
|
)
|
|
|
|
@staticmethod
|
|
def can_view_all(user_request, *_args, **_kwargs):
|
|
"""Check if an user can access to the list of every user objects
|
|
|
|
:param user_request: The user who wants to view the list.
|
|
:return: True if the user can view the list and an explanation
|
|
message.
|
|
|
|
"""
|
|
can = user_request.has_perm("users.view_user")
|
|
return (
|
|
can,
|
|
_("You don't have the right to view the list of users.")
|
|
if not can
|
|
else None,
|
|
("users.view_user",),
|
|
)
|
|
|
|
def can_delete(self, user_request, *_args, **_kwargs):
|
|
"""Check if an user can delete an user object.
|
|
|
|
:param self: The user who is to be deleted.
|
|
:param user_request: The user who requests deletion.
|
|
:return: True if user_request has the right 'bureau', and a
|
|
message.
|
|
"""
|
|
can = user_request.has_perm("users.delete_user")
|
|
return (
|
|
can,
|
|
_("You don't have the right to delete this user.") if not can else None,
|
|
("users.delete_user",),
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(User, self).__init__(*args, **kwargs)
|
|
self.field_permissions = {
|
|
"shell": self.can_change_shell,
|
|
"pseudo": self.can_change_pseudo,
|
|
"force": self.can_change_force,
|
|
"selfpasswd": self.check_selfpasswd,
|
|
"local_email_redirect": self.can_change_local_email_redirect,
|
|
"local_email_enabled": self.can_change_local_email_enabled,
|
|
"room": self.can_change_room,
|
|
}
|
|
self.__original_state = self.state
|
|
self.__original_email = self.email
|
|
|
|
def clean_pseudo(self, *args, **kwargs):
|
|
"""Method, clean the pseudo value. The pseudo must be unique, but also
|
|
it must not already be used an an email address, so a check is performed.
|
|
|
|
Parameters:
|
|
self (user instance): user to clean pseudo value.
|
|
|
|
Returns:
|
|
Django ValidationError: if the pseudo value can not be used.
|
|
|
|
"""
|
|
if EMailAddress.objects.filter(local_part=self.pseudo.lower()).exclude(
|
|
user_id=self.id
|
|
):
|
|
raise ValidationError(_("This username is already used."))
|
|
|
|
def clean_email(self, *args, **kwargs):
|
|
"""Method, clean the email value.
|
|
Validate that:
|
|
* An email value has been provided; email field can't be nullified.
|
|
(the user must be reachable by email)
|
|
* The provided email is not a local email to avoid loops
|
|
* Set the email as lower.
|
|
|
|
Parameters:
|
|
self (user instance): user to clean email value.
|
|
|
|
Returns:
|
|
Django ValidationError: if the email value can not be used.
|
|
|
|
"""
|
|
is_created = not self.pk
|
|
if not self.email and (self.__original_email or is_created):
|
|
raise forms.ValidationError(_("Email field cannot be empty."))
|
|
|
|
self.email = self.email.lower()
|
|
|
|
if OptionalUser.get_cached_value("local_email_domain") in self.email:
|
|
raise forms.ValidationError(
|
|
_("You can't use a {} address as an external contact address.").format(
|
|
OptionalUser.get_cached_value("local_email_domain")
|
|
)
|
|
)
|
|
|
|
def clean(self, *args, **kwargs):
|
|
"""Method, general clean for User model.
|
|
Clean pseudo and clean email.
|
|
|
|
Parameters:
|
|
self (user instance): user to clean.
|
|
|
|
"""
|
|
super(User, self).clean(*args, **kwargs)
|
|
self.clean_pseudo(*args, **kwargs)
|
|
self.clean_email(*args, **kwargs)
|
|
|
|
def __str__(self):
|
|
return self.pseudo
|
|
|
|
@property
|
|
def theme_name(self):
|
|
"""Return the theme without the extension
|
|
|
|
Returns:
|
|
str: name of theme
|
|
"""
|
|
return self.theme.split(".")[0]
|
|
|
|
|
|
class Adherent(User):
|
|
"""Base re2o Adherent model, inherit from User. Add other attributes.
|
|
|
|
Attributes:
|
|
name: name of the user
|
|
room: room of the user
|
|
gpg_fingerprint: The gpgfp of the user
|
|
|
|
"""
|
|
|
|
name = models.CharField(max_length=255)
|
|
room = models.OneToOneField(
|
|
"topologie.Room", on_delete=models.PROTECT, blank=True, null=True
|
|
)
|
|
gpg_fingerprint = models.CharField(max_length=49, blank=True, null=True)
|
|
|
|
class Meta(User.Meta):
|
|
verbose_name = _("member")
|
|
verbose_name_plural = _("members")
|
|
|
|
def format_gpgfp(self):
|
|
"""Method, format the gpgfp value, with blocks of 4 characters,
|
|
as AAAA BBBB instead of AAAABBBB.
|
|
|
|
Parameters:
|
|
self (user instance): user to clean gpgfp value.
|
|
|
|
"""
|
|
self.gpg_fingerprint = " ".join(
|
|
[
|
|
self.gpg_fingerprint[i : i + 4]
|
|
for i in range(0, len(self.gpg_fingerprint), 4)
|
|
]
|
|
)
|
|
|
|
def validate_gpgfp(self):
|
|
"""Method, clean the gpgfp value, validate if the raw entry is a valid gpg fp.
|
|
|
|
Parameters:
|
|
self (user instance): user to clean gpgfp check.
|
|
|
|
Returns:
|
|
Django ValidationError: if the gpgfp value is invalid.
|
|
|
|
"""
|
|
if self.gpg_fingerprint:
|
|
gpg_fingerprint = self.gpg_fingerprint.replace(" ", "").upper()
|
|
if not re.match("^[0-9A-F]{40}$", gpg_fingerprint):
|
|
raise ValidationError(
|
|
_("A GPG fingerprint must contain 40 hexadecimal characters.")
|
|
)
|
|
self.gpg_fingerprint = gpg_fingerprint
|
|
|
|
@classmethod
|
|
def get_instance(cls, object_id, *_args, **_kwargs):
|
|
"""Try to find an instance of `Adherent` with the given id.
|
|
|
|
:param object_id: The id of the adherent we are looking for.
|
|
:return: An adherent.
|
|
|
|
"""
|
|
return cls.objects.get(pk=object_id)
|
|
|
|
@staticmethod
|
|
def can_create(user_request, *_args, **_kwargs):
|
|
"""Check if an user can create an user object.
|
|
|
|
:param user_request: The user who wants to create a user object.
|
|
:return: a message and a boolean which is True if the user can create
|
|
a user or if the `options.all_can_create` is set.
|
|
|
|
"""
|
|
if not user_request.is_authenticated:
|
|
if not OptionalUser.get_cached_value("self_adhesion"):
|
|
return False, _("Self registration is disabled."), None
|
|
else:
|
|
return True, None, None
|
|
else:
|
|
if OptionalUser.get_cached_value("all_can_create_adherent"):
|
|
return True, None, None
|
|
else:
|
|
can = user_request.has_perm("users.add_user")
|
|
return (
|
|
can,
|
|
_("You don't have the right to create a user.")
|
|
if not can
|
|
else None,
|
|
("users.add_user",),
|
|
)
|
|
|
|
@classmethod
|
|
def can_list(cls, user_request, *_args, **_kwargs):
|
|
"""Users can list adherent only if they are :
|
|
- Members of view acl,
|
|
- Club administrator.
|
|
|
|
:param user_request: The user who wants to view the list.
|
|
:return: True if the user can view the list and an explanation
|
|
message.
|
|
|
|
"""
|
|
can, _message, _group = Club.can_view_all(user_request)
|
|
if user_request.has_perm("users.view_user") or can:
|
|
return (True, None, None, cls.objects.all())
|
|
else:
|
|
return (
|
|
True,
|
|
_("You don't have the right to list all adherents."),
|
|
("users.view_user",),
|
|
cls.objects.none(),
|
|
)
|
|
|
|
def clean(self, *args, **kwargs):
|
|
"""Method, clean and validate the gpgfp value.
|
|
|
|
Parameters:
|
|
self (user instance): user to perform clean.
|
|
|
|
"""
|
|
super(Adherent, self).clean(*args, **kwargs)
|
|
if self.gpg_fingerprint:
|
|
self.validate_gpgfp()
|
|
self.format_gpgfp()
|
|
|
|
|
|
class Club(User):
|
|
"""A class representing a club (it is considered as a user
|
|
with special informations)
|
|
|
|
Attributes:
|
|
administrators: administrators of the club
|
|
members: members of the club
|
|
room: room(s) of the club
|
|
mailing: Boolean, activate mailing list for this club.
|
|
|
|
"""
|
|
|
|
room = models.ForeignKey(
|
|
"topologie.Room", on_delete=models.PROTECT, blank=True, null=True
|
|
)
|
|
administrators = models.ManyToManyField(
|
|
blank=True, to="users.Adherent", related_name="club_administrator"
|
|
)
|
|
members = models.ManyToManyField(
|
|
blank=True, to="users.Adherent", related_name="club_members"
|
|
)
|
|
mailing = models.BooleanField(default=False)
|
|
|
|
class Meta(User.Meta):
|
|
verbose_name = _("club")
|
|
verbose_name_plural = _("clubs")
|
|
|
|
@staticmethod
|
|
def can_create(user_request, *_args, **_kwargs):
|
|
"""Check if an user can create an user object.
|
|
|
|
:param user_request: The user who wants to create a user object.
|
|
:return: a message and a boolean which is True if the user can create
|
|
an user or if the `options.all_can_create` is set.
|
|
"""
|
|
if not user_request.is_authenticated:
|
|
return False, _("You must be authenticated."), None
|
|
else:
|
|
if OptionalUser.get_cached_value("all_can_create_club"):
|
|
return True, None, None
|
|
else:
|
|
can = user_request.has_perm("users.add_user")
|
|
return (
|
|
can,
|
|
_("You don't have the right to create a club.")
|
|
if not can
|
|
else None,
|
|
("users.add_user",),
|
|
)
|
|
|
|
@staticmethod
|
|
def can_view_all(user_request, *_args, **_kwargs):
|
|
"""Check if an user can access to the list of every user objects
|
|
|
|
:param user_request: The user who wants to view the list.
|
|
:return: True if the user can view the list and an explanation
|
|
message.
|
|
"""
|
|
if user_request.has_perm("users.view_user"):
|
|
return True, None, None
|
|
if (
|
|
hasattr(user_request, "is_class_adherent")
|
|
and user_request.is_class_adherent
|
|
):
|
|
if (
|
|
user_request.adherent.club_administrator.all()
|
|
or user_request.adherent.club_members.all()
|
|
):
|
|
return True, None, None
|
|
return (
|
|
False,
|
|
_("You don't have the right to view the list of users."),
|
|
("users.view_user",),
|
|
)
|
|
|
|
@classmethod
|
|
def get_instance(cls, object_id, *_args, **_kwargs):
|
|
"""Try to find an instance of `Club` with the given id.
|
|
|
|
:param object_id: The id of the adherent we are looking for.
|
|
:return: A club.
|
|
"""
|
|
return cls.objects.get(pk=object_id)
|
|
|
|
|
|
@receiver(post_save, sender=Adherent)
|
|
@receiver(post_save, sender=Club)
|
|
@receiver(post_save, sender=User)
|
|
def user_post_save(**kwargs):
|
|
"""Django signal, post save operations on Adherent, Club and User.
|
|
Sync pseudo, sync authentication, create mailalias and send welcome email if needed
|
|
(new user)
|
|
|
|
"""
|
|
is_created = kwargs["created"]
|
|
user = kwargs["instance"]
|
|
EMailAddress.objects.get_or_create(local_part=user.pseudo.lower(), user=user)
|
|
|
|
if is_created:
|
|
user.notif_inscription(user.request)
|
|
user.set_active()
|
|
user.state_sync()
|
|
signals.synchronise.send(
|
|
sender=User,
|
|
instance=user,
|
|
base=True,
|
|
access_refresh=True,
|
|
mac_refresh=False,
|
|
group_refresh=True,
|
|
)
|
|
regen("mailing")
|
|
|
|
|
|
@receiver(m2m_changed, sender=User.groups.through)
|
|
def user_group_relation_changed(**kwargs):
|
|
"""Django signal, used for User Groups change (related models).
|
|
Sync authentication, with calling group_refresh.
|
|
|
|
"""
|
|
action = kwargs["action"]
|
|
if action in ("post_add", "post_remove", "post_clear"):
|
|
user = kwargs["instance"]
|
|
signals.synchronise.send(
|
|
sender=User,
|
|
instance=user,
|
|
base=False,
|
|
access_refresh=False,
|
|
mac_refresh=False,
|
|
group_refresh=True,
|
|
)
|
|
|
|
|
|
@receiver(post_delete, sender=Adherent)
|
|
@receiver(post_delete, sender=Club)
|
|
@receiver(post_delete, sender=User)
|
|
def user_post_delete(**kwargs):
|
|
"""Django signal, post delete operations on Adherent, Club and User.
|
|
Delete user in authentication.
|
|
|
|
"""
|
|
user = kwargs["instance"]
|
|
signals.remove.send(sender=User, instance=user)
|
|
regen("mailing")
|
|
|
|
|
|
class ServiceUser(RevMixin, AclMixin, AbstractBaseUser):
|
|
"""A class representing a serviceuser (it is considered as a user
|
|
with special informations).
|
|
The serviceuser is a special user used with special access to authentication tree. It is
|
|
its only usefullness, and service user can't connect to re2o.
|
|
Each service connected to authentication for auth (ex dokuwiki, owncloud, etc) should
|
|
have a different service user with special acl (readonly, auth) and password.
|
|
|
|
Attributes:
|
|
pseudo: login of the serviceuser
|
|
access_group: acl for this serviceuser
|
|
comment: Comment for this serviceuser.
|
|
|
|
"""
|
|
|
|
readonly = "readonly"
|
|
ACCESS = (("auth", "auth"), ("readonly", "readonly"), ("usermgmt", "usermgmt"))
|
|
|
|
pseudo = models.CharField(
|
|
max_length=32,
|
|
unique=True,
|
|
help_text=_("Must only contain letters, numerals or dashes."),
|
|
validators=[linux_user_validator],
|
|
)
|
|
access_group = models.CharField(choices=ACCESS, default=readonly, max_length=32)
|
|
comment = models.CharField(help_text=_("Comment."), max_length=255, blank=True)
|
|
|
|
USERNAME_FIELD = "pseudo"
|
|
objects = UserManager()
|
|
|
|
class Meta:
|
|
verbose_name = _("service user")
|
|
verbose_name_plural = _("service users")
|
|
|
|
def get_full_name(self):
|
|
"""Shortcuts, return a pretty name for the serviceuser.
|
|
|
|
Parameters:
|
|
self (ServiceUser instance): serviceuser to return infos.
|
|
|
|
"""
|
|
return _("Service user <{name}>").format(name=self.pseudo)
|
|
|
|
def get_short_name(self):
|
|
"""Shortcuts, return the shortname (pseudo) of the serviceuser.
|
|
|
|
Parameters:
|
|
self (ServiceUser instance): serviceuser to return infos.
|
|
|
|
"""
|
|
return self.pseudo
|
|
|
|
def __str__(self):
|
|
return self.pseudo
|
|
|
|
|
|
@receiver(post_save, sender=ServiceUser)
|
|
def service_user_post_save(**kwargs):
|
|
"""Django signal, post save operations on ServiceUser.
|
|
Sync or create serviceuser in authentication.
|
|
|
|
"""
|
|
service_user = kwargs["instance"]
|
|
signals.synchronise.send(sender=ServiceUser, instance=service_user)
|
|
|
|
|
|
@receiver(post_delete, sender=ServiceUser)
|
|
def service_user_post_delete(**kwargs):
|
|
"""Django signal, post delete operations on ServiceUser.
|
|
Delete service user in authentication.
|
|
|
|
"""
|
|
service_user = kwargs["instance"]
|
|
signals.remove.send(sender=ServiceUser, instance=service_user)
|
|
|
|
|
|
class School(RevMixin, AclMixin, models.Model):
|
|
"""A class representing a school; which users are linked.
|
|
|
|
Attributes:
|
|
name: name of the school
|
|
|
|
"""
|
|
|
|
name = models.CharField(max_length=255)
|
|
|
|
class Meta:
|
|
verbose_name = _("school")
|
|
verbose_name_plural = _("schools")
|
|
|
|
@classmethod
|
|
def can_list(cls, user_request, *_args, **_kwargs):
|
|
"""All users can list schools
|
|
|
|
:param user_request: The user who wants to view the list.
|
|
:return: True if the user can view the list and an explanation
|
|
message.
|
|
|
|
"""
|
|
return (True, None, None, cls.objects.all())
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
|
|
class ListRight(RevMixin, AclMixin, Group):
|
|
"""A class representing a listright, inherit from basic django Group object.
|
|
Each listrights/groups gathers several users, and can have individuals django
|
|
rights, like can_view, can_edit, etc.
|
|
Moreover, a ListRight is also a standard unix group, usefull for creating linux
|
|
unix groups for servers access or re2o single rights, or both.
|
|
Gid is used as a primary key, and can't be changed.
|
|
|
|
Attributes:
|
|
name: Inherited from Group, name of the ListRight
|
|
gid: Group id unix
|
|
critical: Boolean, if True the Group can't be changed without special acl
|
|
details: Details and description of the group
|
|
"""
|
|
|
|
unix_name = models.CharField(
|
|
max_length=255,
|
|
unique=True,
|
|
validators=[
|
|
RegexValidator(
|
|
"^[a-z]+$",
|
|
message=(_("UNIX group names can only contain lower case letters.")),
|
|
)
|
|
],
|
|
)
|
|
gid = models.PositiveIntegerField(unique=True, null=True)
|
|
critical = models.BooleanField(default=False)
|
|
details = models.CharField(help_text=_("Description."), max_length=255, blank=True)
|
|
|
|
class Meta:
|
|
verbose_name = _("group of rights")
|
|
verbose_name_plural = _("groups of rights")
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
|
|
@receiver(post_save, sender=ListRight)
|
|
def listright_post_save(**kwargs):
|
|
"""Django signal, post save operations on ListRight/Group objects.
|
|
Sync or create group in authentication.
|
|
|
|
"""
|
|
right = kwargs["instance"]
|
|
signals.synchronise.send(sender=ListRight, instance=right)
|
|
|
|
|
|
@receiver(post_delete, sender=ListRight)
|
|
def listright_post_delete(**kwargs):
|
|
"""Django signal, post delete operations on ListRight/Group objects.
|
|
Delete group in authentication.
|
|
|
|
"""
|
|
right = kwargs["instance"]
|
|
signals.remove.send(sender=ListRight, instance=right)
|
|
|
|
|
|
class ListShell(RevMixin, AclMixin, models.Model):
|
|
"""A class representing a shell; which users are linked.
|
|
A standard linux user shell. (zsh, bash, etc)
|
|
|
|
Attributes:
|
|
shell: name of the shell
|
|
|
|
"""
|
|
|
|
shell = models.CharField(max_length=255, unique=True)
|
|
|
|
class Meta:
|
|
verbose_name = _("shell")
|
|
verbose_name_plural = _("shells")
|
|
|
|
def get_pretty_name(self):
|
|
"""Method, returns a pretty name for a shell like "bash" or "zsh".
|
|
|
|
Parameters:
|
|
self (listshell): Shell to return a pretty name.
|
|
|
|
Returns:
|
|
pretty_name (string): Return a pretty name string for this shell.
|
|
|
|
"""
|
|
return self.shell.split("/")[-1]
|
|
|
|
@classmethod
|
|
def can_list(cls, user_request, *_args, **_kwargs):
|
|
"""All users can list shells
|
|
|
|
:param user_request: The user who wants to view the list.
|
|
:return: True if the user can view the list and an explanation
|
|
message.
|
|
|
|
"""
|
|
return (True, None, None, cls.objects.all())
|
|
|
|
def __str__(self):
|
|
return self.shell
|
|
|
|
|
|
class Ban(RevMixin, AclMixin, models.Model):
|
|
"""A class representing a ban, which cuts internet access,
|
|
as a sanction.
|
|
|
|
Attributes:
|
|
user: related user for this whitelist
|
|
raison: reason of this ban, can be null
|
|
date_start: Date of the start of the ban
|
|
date_end: Date of the end of the ban
|
|
state: Has no effect now, would specify this kind of ban
|
|
(hard, soft)
|
|
"""
|
|
|
|
STATE_HARD = 0
|
|
STATE_SOFT = 1
|
|
STATE_BRIDAGE = 2
|
|
STATES = (
|
|
(0, _("HARD (no access)")),
|
|
(1, _("SOFT (local access only)")),
|
|
(2, _("RESTRICTED (speed limitation)")),
|
|
)
|
|
|
|
user = models.ForeignKey("User", on_delete=models.PROTECT)
|
|
raison = models.CharField(max_length=255)
|
|
date_start = models.DateTimeField(auto_now_add=True)
|
|
date_end = models.DateTimeField()
|
|
state = models.IntegerField(choices=STATES, default=STATE_HARD)
|
|
request = None
|
|
|
|
class Meta:
|
|
verbose_name = _("ban")
|
|
verbose_name_plural = _("bans")
|
|
|
|
def notif_ban(self, request=None):
|
|
"""Function/method, send an email to notify that a ban has been
|
|
decided and internet access disabled.
|
|
|
|
Parameters:
|
|
self (ban instance): ban to notif disabled decision
|
|
request (django request): request to build email
|
|
|
|
Returns:
|
|
email: Notification email
|
|
"""
|
|
template = loader.get_template("users/email_ban_notif")
|
|
context = {
|
|
"name": self.user.get_full_name(),
|
|
"raison": self.raison,
|
|
"date_end": self.date_end,
|
|
"asso_name": AssoOption.get_cached_value("name"),
|
|
}
|
|
|
|
send_mail(
|
|
request,
|
|
"Déconnexion disciplinaire / Disciplinary disconnection",
|
|
template.render(context),
|
|
GeneralOption.get_cached_value("email_from"),
|
|
[self.user.email],
|
|
fail_silently=False,
|
|
)
|
|
return
|
|
|
|
def is_active(self):
|
|
"""Method, return if the ban is active now or not.
|
|
|
|
Parameters:
|
|
self (ban): Ban to test if is active.
|
|
|
|
Returns:
|
|
is_active (boolean): Return True if the ban is active.
|
|
|
|
"""
|
|
return self.date_end > timezone.now()
|
|
|
|
def can_view(self, user_request, *_args, **_kwargs):
|
|
"""Check if an user can view a Ban object.
|
|
|
|
:param self: The targeted object.
|
|
:param user_request: The user who ask for viewing the target.
|
|
:return: A boolean telling if the acces is granted and an explanation
|
|
text
|
|
"""
|
|
if not user_request.has_perm("users.view_ban") and self.user != user_request:
|
|
return (
|
|
False,
|
|
_("You don't have the right to view other bans than yours."),
|
|
("users.view_ban",),
|
|
)
|
|
else:
|
|
return True, None, None
|
|
|
|
def __str__(self):
|
|
return str(self.user) + " " + str(self.raison)
|
|
|
|
|
|
@receiver(post_save, sender=Ban)
|
|
def ban_post_save(**kwargs):
|
|
"""Django signal, post save operations on Ban objects.
|
|
Sync user's access state in authentication, call email notification if needed.
|
|
|
|
"""
|
|
ban = kwargs["instance"]
|
|
is_created = kwargs["created"]
|
|
user = ban.user
|
|
signals.synchronise.send(
|
|
sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False
|
|
)
|
|
regen("mailing")
|
|
if is_created:
|
|
ban.notif_ban(ban.request)
|
|
regen("dhcp")
|
|
regen("mac_ip_list")
|
|
if user.has_access():
|
|
regen("dhcp")
|
|
regen("mac_ip_list")
|
|
|
|
|
|
@receiver(post_delete, sender=Ban)
|
|
def ban_post_delete(**kwargs):
|
|
"""Django signal, post delete operations on Ban objects.
|
|
Sync user's access state in authentication.
|
|
|
|
"""
|
|
user = kwargs["instance"].user
|
|
signals.synchronise.send(
|
|
sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False
|
|
)
|
|
regen("mailing")
|
|
regen("dhcp")
|
|
regen("mac_ip_list")
|
|
|
|
|
|
class Whitelist(RevMixin, AclMixin, models.Model):
|
|
"""A class representing a whitelist, which gives a free internet
|
|
access to a user for special reason.
|
|
Is overrided by a ban object.
|
|
|
|
Attributes:
|
|
user: related user for this whitelist
|
|
raison: reason of this whitelist, can be null
|
|
date_start: Date of the start of the whitelist
|
|
date_end: Date of the end of the whitelist
|
|
"""
|
|
|
|
user = models.ForeignKey("User", on_delete=models.PROTECT)
|
|
raison = models.CharField(max_length=255)
|
|
date_start = models.DateTimeField(auto_now_add=True)
|
|
date_end = models.DateTimeField()
|
|
|
|
class Meta:
|
|
verbose_name = _("whitelist (free of charge access)")
|
|
verbose_name_plural = _("whitelists (free of charge access)")
|
|
|
|
def is_active(self):
|
|
"""Method, returns if the whitelist is active now or not.
|
|
|
|
Parameters:
|
|
self (whitelist): Whitelist to test if is active.
|
|
|
|
Returns:
|
|
is_active (boolean): Return True if the whistelist is active.
|
|
|
|
"""
|
|
return self.date_end > timezone.now()
|
|
|
|
def can_view(self, user_request, *_args, **_kwargs):
|
|
"""Check if an user can view a Whitelist object.
|
|
|
|
:param self: The targeted object.
|
|
:param user_request: The user who ask for viewing the target.
|
|
:return: A boolean telling if the acces is granted and an explanation
|
|
text
|
|
"""
|
|
if (
|
|
not user_request.has_perm("users.view_whitelist")
|
|
and self.user != user_request
|
|
):
|
|
return (
|
|
False,
|
|
_("You don't have the right to view other whitelists than yours."),
|
|
("users.view_whitelist",),
|
|
)
|
|
else:
|
|
return True, None, None
|
|
|
|
def __str__(self):
|
|
return str(self.user) + " " + str(self.raison)
|
|
|
|
|
|
@receiver(post_save, sender=Whitelist)
|
|
def whitelist_post_save(**kwargs):
|
|
"""Django signal, post save operations on Whitelist objects.
|
|
Sync user's access state in authentication.
|
|
|
|
"""
|
|
whitelist = kwargs["instance"]
|
|
user = whitelist.user
|
|
signals.synchronise.send(
|
|
sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False
|
|
)
|
|
is_created = kwargs["created"]
|
|
regen("mailing")
|
|
if is_created:
|
|
regen("dhcp")
|
|
regen("mac_ip_list")
|
|
if user.has_access():
|
|
regen("dhcp")
|
|
regen("mac_ip_list")
|
|
|
|
|
|
@receiver(post_delete, sender=Whitelist)
|
|
def whitelist_post_delete(**kwargs):
|
|
"""Django signal, post delete operations on Whitelist objects.
|
|
Sync user's access state in authentication.
|
|
|
|
"""
|
|
user = kwargs["instance"].user
|
|
signals.synchronise.send(
|
|
sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False
|
|
)
|
|
regen("mailing")
|
|
regen("dhcp")
|
|
regen("mac_ip_list")
|
|
|
|
|
|
class Request(models.Model):
|
|
"""A class representing for user's request of reset password by email, or
|
|
confirm a new email address, with a link.
|
|
|
|
Attributes:
|
|
type: type of request (password, or confirm email address)
|
|
token: single-user token for this request
|
|
user: related user for this request
|
|
email: If needed, related email to send the request and the link
|
|
created_at: Date at the request was created
|
|
expires_at: The request will be invalid after the expires_at date
|
|
"""
|
|
|
|
PASSWD = "PW"
|
|
EMAIL = "EM"
|
|
TYPE_CHOICES = ((PASSWD, _("Password")), (EMAIL, _("Email address")))
|
|
type = models.CharField(max_length=2, choices=TYPE_CHOICES)
|
|
token = models.CharField(max_length=32)
|
|
user = models.ForeignKey("User", on_delete=models.CASCADE)
|
|
email = models.EmailField(blank=True, null=True)
|
|
created_at = models.DateTimeField(auto_now_add=True, editable=False)
|
|
expires_at = models.DateTimeField()
|
|
|
|
def save(self):
|
|
if not self.expires_at:
|
|
self.expires_at = timezone.now() + datetime.timedelta(
|
|
hours=GeneralOption.get_cached_value("req_expire_hrs")
|
|
)
|
|
if not self.token:
|
|
self.token = str(uuid.uuid4()).replace("-", "") # remove hyphens
|
|
super(Request, self).save()
|
|
|
|
|
|
class EMailAddress(RevMixin, AclMixin, models.Model):
|
|
"""A class representing an EMailAddress, for local emailaccounts
|
|
support. Each emailaddress belongs to a user.
|
|
|
|
Attributes:
|
|
user: parent user address for this email
|
|
local_part: local extension of the email
|
|
"""
|
|
|
|
user = models.ForeignKey(
|
|
User, on_delete=models.CASCADE, help_text=_("User of the local email account.")
|
|
)
|
|
local_part = models.CharField(
|
|
unique=True, max_length=128, help_text=_("Local part of the email address.")
|
|
)
|
|
|
|
class Meta:
|
|
verbose_name = _("local email account")
|
|
verbose_name_plural = _("local email accounts")
|
|
|
|
def __str__(self):
|
|
return str(self.local_part) + OptionalUser.get_cached_value(
|
|
"local_email_domain"
|
|
)
|
|
|
|
@cached_property
|
|
def complete_email_address(self):
|
|
"""Shortcuts, returns a complete mailaddress from localpart and emaildomain
|
|
specified in preferences.
|
|
|
|
Parameters:
|
|
self (emailaddress): emailaddress.
|
|
|
|
Returns:
|
|
emailaddress (string): Complete valid emailaddress
|
|
|
|
"""
|
|
return str(self.local_part) + OptionalUser.get_cached_value(
|
|
"local_email_domain"
|
|
)
|
|
|
|
@staticmethod
|
|
def can_create(user_request, userid, *_args, **_kwargs):
|
|
"""Check if a user can create a `EMailAddress` object.
|
|
|
|
Args:
|
|
user_request: The user who wants to create the object.
|
|
userid: The id of the user to whom the account is to be created
|
|
|
|
Returns:
|
|
a message and a boolean which is True if the user can create
|
|
a local email account.
|
|
"""
|
|
if user_request.has_perm("users.add_emailaddress"):
|
|
return True, None, None
|
|
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
|
|
return (False, _("The local email accounts are not enabled."), None)
|
|
if int(user_request.id) != int(userid):
|
|
return (
|
|
False,
|
|
_(
|
|
"You don't have the right to add a local email"
|
|
" account to another user."
|
|
),
|
|
("users.add_emailaddress",),
|
|
)
|
|
elif user_request.email_address.count() >= OptionalUser.get_cached_value(
|
|
"max_email_address"
|
|
):
|
|
return (
|
|
False,
|
|
_("You reached the limit of {} local email accounts.").format(
|
|
OptionalUser.get_cached_value("max_email_address")
|
|
),
|
|
None,
|
|
)
|
|
return True, None, None
|
|
|
|
def can_view(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can view the local email account
|
|
|
|
Args:
|
|
user_request: The user who wants to view the object.
|
|
|
|
Returns:
|
|
a message and a boolean which is True if the user can see
|
|
the local email account.
|
|
"""
|
|
if user_request.has_perm("users.view_emailaddress"):
|
|
return True, None, None
|
|
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
|
|
return (False, _("The local email accounts are not enabled."), None)
|
|
if user_request == self.user:
|
|
return True, None, None
|
|
return (
|
|
False,
|
|
_(
|
|
"You don't have the right to view another user's local"
|
|
" email account."
|
|
),
|
|
("users.view_emailaddress",),
|
|
)
|
|
|
|
def can_delete(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can delete the alias
|
|
|
|
Args:
|
|
user_request: The user who wants to delete the object.
|
|
|
|
Returns:
|
|
a message and a boolean which is True if the user can delete
|
|
the local email account.
|
|
"""
|
|
if self.local_part == self.user.pseudo.lower():
|
|
return (
|
|
False,
|
|
_(
|
|
"You can't delete a local email account whose"
|
|
" local part is the same as the username."
|
|
),
|
|
None,
|
|
)
|
|
if user_request.has_perm("users.delete_emailaddress"):
|
|
return True, None, None
|
|
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
|
|
return False, _("The local email accounts are not enabled."), None
|
|
if user_request == self.user:
|
|
return True, None, None
|
|
return (
|
|
False,
|
|
_(
|
|
"You don't have the right to delete another user's"
|
|
" local email account."
|
|
),
|
|
("users.delete_emailaddress",),
|
|
)
|
|
|
|
def can_edit(self, user_request, *_args, **_kwargs):
|
|
"""Check if a user can edit the alias
|
|
|
|
Args:
|
|
user_request: The user who wants to edit the object.
|
|
|
|
Returns:
|
|
a message and a boolean which is True if the user can edit
|
|
the local email account.
|
|
"""
|
|
if self.local_part == self.user.pseudo.lower():
|
|
return (
|
|
False,
|
|
_(
|
|
"You can't edit a local email account whose local"
|
|
" part is the same as the username."
|
|
),
|
|
None,
|
|
)
|
|
if user_request.has_perm("users.change_emailaddress"):
|
|
return True, None, None
|
|
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
|
|
return False, _("The local email accounts are not enabled."), None
|
|
if user_request == self.user:
|
|
return True, None, None
|
|
return (
|
|
False,
|
|
_(
|
|
"You don't have the right to edit another user's local"
|
|
" email account."
|
|
),
|
|
("users.change_emailaddress",),
|
|
)
|
|
|
|
def clean(self, *args, **kwargs):
|
|
"""Method, general clean for EMailAddres model.
|
|
Clean email local_part field, checking if it is available by calling
|
|
the smtp..
|
|
|
|
Parameters:
|
|
self (emailaddress): emailaddress local_part to clean.
|
|
|
|
Returns:
|
|
Django ValidationError, if the localpart does not comply with the policy.
|
|
|
|
"""
|
|
self.local_part = self.local_part.lower()
|
|
if "@" in self.local_part or "+" in self.local_part:
|
|
raise ValidationError(_("The local part must not contain @ or +."))
|
|
result, reason = smtp_check(self.local_part)
|
|
if result:
|
|
raise ValidationError(reason)
|
|
super(EMailAddress, self).clean(*args, **kwargs)
|