8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2025-01-22 16:14:28 +00:00
re2o/logs/models.py

980 lines
31 KiB
Python
Raw Normal View History

2020-04-22 16:17:06 +00:00
# -*- mode: python; coding: utf-8 -*-
2020-11-23 16:06:37 +00:00
# Re2o est un logiciel d'administration développé initialement au Rézo Metz. Il
2020-04-22 16:17:06 +00:00
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
# Copyright © 2020 Jean-Romain Garnier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""logs.models
The models definitions for the logs app
2020-04-22 16:17:06 +00:00
"""
2021-02-10 10:06:09 +00:00
from django.apps import apps
from django.contrib.auth.models import Group
2020-04-24 13:37:05 +00:00
from django.db.models import Q
2021-02-10 10:06:09 +00:00
from django.utils.translation import ugettext_lazy as _
2020-04-24 21:14:08 +00:00
from macaddress.fields import default_dialect
2021-02-10 10:06:09 +00:00
from netaddr import EUI
from reversion.models import Revision, Version
2021-02-10 10:06:09 +00:00
from machines.models import Interface, IpList, Machine, MachineType
from topologie.models import Port, Room
from users.models import Adherent, Club, User
2020-04-22 16:17:06 +00:00
from .forms import classes_for_action_type
2020-04-22 16:17:06 +00:00
2020-04-24 14:43:11 +00:00
def make_version_filter(key, value):
    """Build a filter matching Version objects on one serialized field.

    The lookup is a substring search inside the JSON string stored in
    ``serialized_data``, so the value has to be rendered the way it appears
    in that string: strings are wrapped in double quotes, any other value is
    passed through ``str()``.

    :param key: str, The argument's key
    :param value: str or int, The argument's value
    :returns: A Q filter
    """
    # isinstance() rather than an exact type comparison, so that subclasses
    # of str are quoted like plain strings instead of being emitted bare.
    if isinstance(value, str):
        formatted_value = '"{}"'.format(value)
    else:
        formatted_value = str(value)

    # The key/value pair is either followed by another pair (",") or is the
    # last one of its JSON object ("}").
    followed_by_field = Q(
        serialized_data__contains='"{}": {},'.format(key, formatted_value)
    )
    last_field = Q(
        serialized_data__contains='"{}": {}}}'.format(key, formatted_value)
    )
    return followed_by_field | last_field
############################
# Machine history search #
############################
2021-02-10 10:06:09 +00:00
class MachineHistorySearchEvent:
    """A period during which an interface version was attached to a machine."""

    def __init__(self, user, machine, interface, start=None, end=None):
        """Initialise an instance of MachineHistorySearchEvent.

        Args:
            user: User, the user owning the machine at the time of the event.
            machine: Version, the machine version related to the interface.
            interface: Version, the interface targeted by this event.
            start: datetime, the date at which this version was created
                (default: None).
            end: datetime, the date at which this version was replaced by a
                new one (default: None).

        Raises:
            IpList.DoesNotExist: if the IP referenced by the interface
                version cannot be found.
        """
        self.user = user
        self.machine = machine
        self.interface = interface
        # The interface version only stores the id of the IP; resolve it to
        # the actual address.
        self.ipv4 = IpList.objects.get(id=interface.field_dict["ipv4_id"]).ipv4
        self.mac = self.interface.field_dict["mac_address"]
        self.start_date = start
        self.end_date = end
        # An empty comment is normalised to None
        self.comment = interface.revision.get_comment() or None

    def is_similar(self, elt2):
        """Check whether two events are similar enough to be merged.

        Args:
            elt2: MachineHistorySearchEvent, the event to compare with self
                (may be None).

        Returns:
            A boolean, True if the events can be merged and False otherwise.
        """
        return (
            elt2 is not None
            and self.user.id == elt2.user.id
            and self.ipv4 == elt2.ipv4
            and self.machine.field_dict["id"] == elt2.machine.field_dict["id"]
            and self.interface.field_dict["id"] == elt2.interface.field_dict["id"]
        )

    def __repr__(self):
        # One placeholder per argument: the previous format string lacked the
        # one for the IPv4, which shifted every following value and silently
        # dropped the comment.
        return "{} ({} - {}): from {} to {} ({})".format(
            self.machine,
            self.mac,
            self.ipv4,
            self.start_date,
            self.end_date,
            self.comment or "No comment",
        )
class MachineHistorySearch:
    """Search the machine histories for events tied to an IP or MAC address."""

    def __init__(self):
        self.events = []
        self._last_evt = None

    def get(self, search, params):
        """Get the events in machine histories related to the search.

        Args:
            search: the IP or MAC address used in the search.
            params: the dictionary built by the search view. The keys read
                here are "s" (start bound), "e" (end bound) and "t" (search
                type, "ip" or "mac"). The bounds are compared with
                ``datetime.date()`` values, so they are presumably dates.

        Returns:
            A list of MachineHistorySearchEvent in reverse chronological
            order. The list is empty when the search type is unknown or when
            the lookup fails.
        """
        self.start = params.get("s", None)
        self.end = params.get("e", None)
        search_type = params.get("t", 0)

        # Start from a clean state so that an instance can be reused
        self.events = []
        self._last_evt = None

        try:
            if search_type == "ip":
                return self._get_by_ip(search)[::-1]
            if search_type == "mac":
                mac = EUI(search, dialect=default_dialect())
                return self._get_by_mac(mac)[::-1]
        except Exception:
            # Best effort: any failure while parsing the address or walking
            # the versions results in an empty answer. Unlike a bare
            # ``except``, this lets KeyboardInterrupt/SystemExit through.
            pass

        return []

    def _add_revision(self, user, machine, interface):
        """Add a new revision to the chronological order.

        Args:
            user: User, the user owning the machine at the time of the event.
            machine: Version, the machine version related to the interface.
            interface: Version, the interface targeted by this event.
        """
        evt = MachineHistorySearchEvent(user, machine, interface)
        evt.start_date = interface.revision.date_created

        # Try not to recreate events if it's unnecessary
        if evt.is_similar(self._last_evt):
            return

        # Mark the end of validity of the last element
        if self._last_evt and not self._last_evt.end_date:
            self._last_evt.end_date = evt.start_date

        # If the previous event ended before the given start date, remove it
        if self.start and evt.start_date.date() < self.start:
            self._last_evt = None
            # There is nothing to remove while no event has been saved yet;
            # popping unconditionally raised IndexError and aborted the
            # whole search.
            if self.events:
                self.events.pop()

        # Make sure the new event starts before the given end date
        if self.end and evt.start_date.date() > self.end:
            return

        # Save the new element
        self.events.append(evt)
        self._last_evt = evt

    def _get_interfaces_for_ip(self, ip):
        """Get the Version objects of interfaces with the given IP address.

        Args:
            ip: the string corresponding to the IP address.

        Returns:
            An iterable object with the Version objects of interfaces with
            the given IP address, oldest first (empty if the address is not
            in IpList).
        """
        # TODO: What if ip list was deleted?
        try:
            ip_id = IpList.objects.get(ipv4=ip).id
        except IpList.DoesNotExist:
            return []

        return (
            Version.objects.get_for_model(Interface)
            .filter(make_version_filter("ipv4", ip_id))
            .order_by("revision__date_created")
        )

    def _get_interfaces_for_mac(self, mac):
        """Get the Version objects of interfaces with the given MAC address.

        Args:
            mac: the MAC address; it is converted with ``str()`` before the
                lookup.

        Returns:
            An iterable object with the Version objects of interfaces with
            the given MAC address, oldest first.
        """
        return (
            Version.objects.get_for_model(Interface)
            .filter(make_version_filter("mac_address", str(mac)))
            .order_by("revision__date_created")
        )

    def _get_machines_for_interface(self, interface):
        """Get the Version objects of machines with the given interface.

        Args:
            interface: Version, the interface used to find machines.

        Returns:
            An iterable object with the Version objects of machines to which
            the given interface was assigned, oldest first.
        """
        machine_id = interface.field_dict["machine_id"]
        return (
            Version.objects.get_for_model(Machine)
            .filter(make_version_filter("pk", machine_id))
            .order_by("revision__date_created")
        )

    def _get_user_for_machine(self, machine):
        """Get the User instance owning the given machine.

        Args:
            machine: Version, the machine used to find its owner.

        Returns:
            The User instance of the owner of the given machine.

        Raises:
            User.DoesNotExist: if the owner no longer exists.
        """
        # TODO: What if user was deleted?
        user_id = machine.field_dict["user_id"]
        return User.objects.get(id=user_id)

    def _collect_events(self, interfaces):
        """Register an event for every machine version of each interface.

        Args:
            interfaces: iterable of Version objects of interfaces.

        Returns:
            The list of events accumulated in ``self.events``.
        """
        for interface in interfaces:
            for machine in self._get_machines_for_interface(interface):
                user = self._get_user_for_machine(machine)
                self._add_revision(user, machine, interface)
        return self.events

    def _get_by_ip(self, ip):
        """Get events related to the given IP address.

        Args:
            ip: the string corresponding to the IP address.

        Returns:
            A list of MachineHistorySearchEvent related to the given IP
            address.
        """
        return self._collect_events(self._get_interfaces_for_ip(ip))

    def _get_by_mac(self, mac):
        """Get events related to the given MAC address.

        Args:
            mac: the MAC address to look for.

        Returns:
            A list of MachineHistorySearchEvent related to the given MAC
            address.
        """
        return self._collect_events(self._get_interfaces_for_mac(mac))
2020-04-23 11:18:16 +00:00
############################
# Generic history classes #
############################
2021-02-10 10:06:09 +00:00
2020-04-23 17:06:59 +00:00
class RelatedHistory:
    """Reference to an object whose history is related to another one.

    Two instances are equal (and hash alike) when they designate the same
    model name and object id, which allows removing duplicates.
    """

    def __init__(self, version):
        """Initialise an instance of RelatedHistory.

        Args:
            version: Version, the version related to the history.
        """
        self.version = version
        self.app_name = version.content_type.app_label
        self.model_name = version.content_type.model
        self.object_id = version.object_id
        self.name = version.object_repr

        # Prefix the display name with the model name when there is one
        if self.model_name:
            self.name = "{}: {}".format(self.model_name.title(), self.name)

    def __eq__(self, other):
        # Let Python handle comparisons with unrelated objects instead of
        # failing on a missing attribute.
        if not isinstance(other, RelatedHistory):
            return NotImplemented
        return self.model_name == other.model_name and self.object_id == other.object_id

    def __hash__(self):
        return hash((self.model_name, self.object_id))
2020-04-23 17:06:59 +00:00
2020-04-23 15:39:45 +00:00
class HistoryEvent:
    """A change between two consecutive versions of an object."""

    def __init__(self, version, previous_version=None, edited_fields=None):
        """Initialise an instance of HistoryEvent.

        Args:
            version: Version, the version of the object for this event.
            previous_version: Version, the version of the object before this
                event (default: None).
            edited_fields: list, The list of modified fields by this event
                (default: None, stored as an empty list).
        """
        self.version = version
        self.previous_version = previous_version
        self.edited_fields = edited_fields or []
        self.date = version.revision.date_created
        self.performed_by = version.revision.user
        # An empty comment is normalised to None
        self.comment = version.revision.get_comment() or None

    def _repr(self, name, value):
        """Get the appropriate representation of the given field.

        Args:
            name: the name of the field (unused here, available to
                subclasses).
            value: the value of the field

        Returns:
            The translated string "None" if the value is None, the value
            itself otherwise.
        """
        if value is None:
            return _("None")

        return value

    def edits(self, hide=("password", "pwd_ntlm")):
        """Get the list of the changes performed during this event.

        Args:
            hide: the fields for which not to show details (default:
                ("password", "pwd_ntlm")).

        Returns:
            A list of (field, old value, new value) tuples, one per edited
            field. Both values are None for hidden fields; a value is also
            None when the field is missing from the matching version.
        """
        edits = []

        for field in self.edited_fields:
            # Sensitive fields keep both values at None
            old_value = None
            new_value = None

            if field not in hide:
                # Take into account keys that may exist in only one dict, and
                # the absence of any previous version.
                if (
                    self.previous_version is not None
                    and field in self.previous_version.field_dict
                ):
                    old_value = self._repr(
                        field, self.previous_version.field_dict[field]
                    )

                if field in self.version.field_dict:
                    new_value = self._repr(field, self.version.field_dict[field])

            edits.append((field, old_value, new_value))

        return edits
class History:
    """Build the list of events describing the successive versions of an object."""

    def __init__(self):
        self.name = None
        self.events = []
        self.related = []  # For example, a machine has a list of its interfaces
        self._last_version = None
        # Class used to instantiate events; subclasses replace it
        self.event_type = HistoryEvent

    def get(self, instance_id, model):
        """Get the list of history events of the given object.

        Args:
            instance_id: int, the id of the instance to lookup.
            model: class, the type of object to lookup.

        Returns:
            A list of HistoryEvent, in reverse chronological order, related to
            the given object or None if no version was found.
        """
        self.events = []

        # Get all the versions for this instance, with the oldest first
        self._last_version = None
        instance_versions = (
            Version.objects.get_for_model(model)
            .filter(make_version_filter("pk", instance_id))
            .order_by("revision__date_created")
        )

        for version in instance_versions:
            self._add_revision(version)

        # Return None if no version was found
        if self._last_version is None:
            return None

        # The name is the representation stored with the most recent version
        self.name = self._last_version.object_repr
        return self.events[::-1]

    def _compute_diff(self, v1, v2, ignoring=()):
        """Find the edited fields between two versions.

        Args:
            v1: Version to compare.
            v2: Version to compare.
            ignoring: the fields to ignore (default: none).

        Returns:
            The sorted list of field names that exist in only one of the two
            versions, or whose values differ between them.
        """
        v1_keys = {k for k in v1.field_dict if k not in ignoring}
        v2_keys = {k for k in v2.field_dict if k not in ignoring}

        # Fields present on one side only are considered edited
        fields = v1_keys ^ v2_keys
        fields.update(
            key for key in v1_keys & v2_keys if v1.field_dict[key] != v2.field_dict[key]
        )

        # Sorted so that the result does not depend on set iteration order
        return sorted(fields)

    def _add_revision(self, version):
        """Add a new revision to the chronological order.

        Args:
            version: Version, the version of the object for this event.
        """
        diff = None
        if self._last_version is not None:
            diff = self._compute_diff(version, self._last_version)

        # Ignore "empty" events
        # but always keep the first event
        if not diff and self._last_version is not None:
            self._last_version = version
            return

        evt = self.event_type(version, self._last_version, diff)
        self.events.append(evt)
        self._last_version = version
2020-04-23 15:39:45 +00:00
############################
# Revision history #
############################
2021-02-10 10:06:09 +00:00
2020-04-24 16:15:53 +00:00
class VersionAction(HistoryEvent):
    """A single Version of a Revision, with lazily computed edits.

    Unlike its parent class, the constructor only stores the version: the
    previous version and the edited fields are computed when ``edits`` is
    called.
    """

    def __init__(self, version):
        self.version = version

    def name(self):
        """Return the cached object of the version, or its stored representation."""
        return self.version._object_cache or self.version.object_repr

    def application(self):
        """Return the app label of the version's content type."""
        return self.version.content_type.app_label

    def model_name(self):
        """Return the model name of the version's content type."""
        return self.version.content_type.model

    def object_id(self):
        """Return the id of the object targeted by the version."""
        return self.version.object_id

    def object_type(self):
        """Return the model class of the object targeted by the version."""
        return apps.get_model(self.application(), self.model_name())

    def edits(self, hide=("password", "pwd_ntlm", "gpg_fingerprint")):
        """Get the list of the changes performed during this event.

        Args:
            hide: the fields for which not to show details (default:
                ("password", "pwd_ntlm", "gpg_fingerprint")).

        Returns:
            The list of fields edited by the event to display, or the tuple
            (None, None, None) when no previous version could be found.
        """
        self.previous_version = self._previous_version()

        if self.previous_version is None:
            # NOTE(review): this is a 3-tuple rather than a list of edits;
            # kept as is since callers may rely on it.
            return None, None, None

        self.edited_fields = self._compute_diff(self.version, self.previous_version)
        return super().edits(hide)

    def _previous_version(self):
        """Get the previous version of self.

        Returns:
            The Version corresponding to the previous version of self, or None
            in case of exception (including when there is no older version).
        """
        model = self.object_type()
        try:
            # Versions of the same object created strictly before this one
            query = make_version_filter("pk", self.object_id()) & Q(
                revision__date_created__lt=self.version.revision.date_created
            )
            # Most recent first: the first item is the direct predecessor
            return (
                Version.objects.get_for_model(model)
                .filter(query)
                .order_by("-revision__date_created")[0]
            )
        except Exception:
            return None

    def _compute_diff(self, v1, v2, ignoring=("pwd_ntlm",)):
        """Find the edited fields between two versions.

        Args:
            v1: Version to compare.
            v2: Version to compare.
            ignoring: the fields to ignore (default: ("pwd_ntlm",)).

        Returns:
            The sorted list of field names that exist in only one of the two
            versions, or whose values differ between them.
        """
        v1_keys = {k for k in v1.field_dict if k not in ignoring}
        v2_keys = {k for k in v2.field_dict if k not in ignoring}

        # Fields present on one side only are considered edited
        fields = v1_keys ^ v2_keys
        fields.update(
            key for key in v1_keys & v2_keys if v1.field_dict[key] != v2.field_dict[key]
        )

        # Sorted so that the result does not depend on set iteration order
        return sorted(fields)
class RevisionAction:
    """A Revision may group multiple Version objects together."""

    def __init__(self, revision):
        """Wrap a revision and each of its versions.

        Args:
            revision: Revision, the revision to expose.
        """
        self.revision = revision
        self.performed_by = revision.user
        # One VersionAction per version attached to the revision
        self.versions = list(map(VersionAction, revision.version_set.all()))

    def id(self):
        """Return the id of the wrapped revision."""
        wrapped = self.revision
        return wrapped.id

    def date_created(self):
        """Return the creation date of the wrapped revision."""
        wrapped = self.revision
        return wrapped.date_created

    def comment(self):
        """Return the comment of the wrapped revision."""
        wrapped = self.revision
        return wrapped.get_comment()
class ActionsSearch:
    """Search the revisions matching the criteria of the search view."""

    def get(self, params):
        """Get the Revision objects corresponding to the search.

        Args:
            params: dictionary built by the search view. The keys read here
                are "user", "start_date", "end_date" and "action_type"; each
                missing or empty criterion is ignored.

        Returns:
            The QuerySet of Revision objects corresponding to the search.
        """
        user = params.get("user", None)
        start = params.get("start_date", None)
        end = params.get("end_date", None)
        action_types = params.get("action_type", None)

        query = Q()

        if user:
            query &= Q(user__pseudo=user)

        if start:
            query &= Q(date_created__gte=start)

        if end:
            query &= Q(date_created__lte=end)

        action_models = self.models_for_action_types(action_types)
        if action_models:
            query &= Q(version__content_type__model__in=action_models)

        revisions = Revision.objects.all().filter(query)
        if action_models:
            # The model filter joins on the versions of each revision: a
            # revision with several matching versions would otherwise be
            # returned once per version.
            revisions = revisions.distinct()

        return revisions.select_related("user").prefetch_related(
            "version_set__object"
        )

    def models_for_action_types(self, action_types):
        """Get the lowercase model names matching the given action types.

        Args:
            action_types: iterable of action types, or None.

        Returns:
            The list of lowercase model names to filter on, or None when no
            filter must be applied (no action type given, or one of them maps
            to no class list).
        """
        if action_types is None:
            return None

        classes = []
        for action_type in action_types:
            names = classes_for_action_type(action_type)

            # Selecting "all" removes the filter
            if names is None:
                return None

            classes.extend(name.lower() for name in names)

        return classes
############################
# Class-specific history #
############################
2021-02-10 10:06:09 +00:00
2020-04-23 15:39:45 +00:00
class UserHistoryEvent(HistoryEvent):
    """History event of a user, with readable values for user fields."""

    def _repr(self, name, value):
        """Get the appropriate representation of the given field.

        Args:
            name: the name of the field
            value: the value of the field

        Returns:
            The appropriate representation of the given field: names instead
            of ids for groups, members and administrators, the label of the
            state or email state, the Room object for a room id, and the
            default representation for any other field.
        """
        if name == "groups":
            if not value:
                # Removed all the user's groups
                return _("None")

            # value is a list of ints
            groups = []
            for gid in value:
                # Try to get the group name, if it's not deleted
                try:
                    groups.append(Group.objects.get(id=gid).name)
                except Group.DoesNotExist:
                    # TODO: Find the group name in the versions?
                    groups.append("{} ({})".format(_("Deleted"), gid))

            return ", ".join(groups)
        elif name == "state":
            if value is not None:
                return User.STATES[value][1]
            else:
                return _("Unknown")
        elif name == "email_state":
            if value is not None:
                return User.EMAIL_STATES[value][1]
            else:
                return _("Unknown")
        elif name == "room_id" and value is not None:
            # Try to get the room name, if it's not deleted
            try:
                return Room.objects.get(id=value)
            except Room.DoesNotExist:
                # TODO: Find the room name in the versions?
                return "{} ({})".format(_("Deleted"), value)
        elif name == "members" or name == "administrators":
            if not value:
                # Removed all the club's members
                return _("None")

            # value is a list of ints
            users = []
            for uid in value:
                # Try to get the user's name, if they're not deleted
                try:
                    users.append(User.objects.get(id=uid).pseudo)
                except User.DoesNotExist:
                    # TODO: Find the user's name in the versions?
                    users.append("{} ({})".format(_("Deleted"), uid))

            return ", ".join(users)

        return super()._repr(name, value)

    def edits(self, hide=("password", "pwd_ntlm", "gpg_fingerprint")):
        """Get the list of the changes performed during this event.

        Args:
            hide: the fields for which not to show details (default:
                ("password", "pwd_ntlm", "gpg_fingerprint")).

        Returns:
            The list of fields edited by the event to display.
        """
        return super().edits(hide)

    def __eq__(self, other):
        # Let Python handle comparisons with unrelated objects instead of
        # failing on a missing attribute.
        if not isinstance(other, UserHistoryEvent):
            return NotImplemented
        return (
            self.edited_fields == other.edited_fields
            and self.date == other.date
            and self.performed_by == other.performed_by
            and self.comment == other.comment
        )

    def __hash__(self):
        return hash(
            (frozenset(self.edited_fields), self.date, self.performed_by, self.comment)
        )

    def __repr__(self):
        return "{} edited fields {} ({})".format(
            self.performed_by,
            self.edited_fields or "nothing",
            self.comment or "No comment",
        )
2020-04-23 15:39:45 +00:00
class UserHistory(History):
    """History of a user, merging the versions of the User with the ones of
    its Adherent or Club."""

    def __init__(self):
        super().__init__()
        self.event_type = UserHistoryEvent

    def get(self, user_id, model):
        """Get the list of UserHistoryEvent related to the object.

        Args:
            user_id: int, the id of the user to lookup.
            model: ignored, the model is detected (Adherent, then Club) from
                the existing versions.

        Returns:
            The list of UserHistoryEvent, in reverse chronological order,
            related to the object, or None if nothing was found.
        """
        self.events = []

        # Try to find an Adherent object, and fallback on a Club
        # If it exists, its id will be the same as the user's
        for candidate in (Adherent, Club):
            has_versions = (
                Version.objects.get_for_model(candidate)
                .filter(make_version_filter("pk", user_id))
                .exists()
            )
            if has_versions:
                model = candidate
                break
        else:
            # If nothing was found, abort
            return None

        # Add in "related" elements the list of objects
        # that were once owned by this user
        related_versions = (
            Version.objects.all()
            .filter(make_version_filter("user", user_id))
            .order_by("content_type__model")
        )
        # Create RelatedHistory objects and remove duplicates
        self.related = list(
            dict.fromkeys(RelatedHistory(v) for v in related_versions)
        )

        # Get all the versions for this user, with the oldest first
        self._last_version = None
        user_versions = (
            Version.objects.get_for_model(User)
            .filter(make_version_filter("pk", user_id))
            .order_by("revision__date_created")
        )

        for version in user_versions:
            self._add_revision(version)

        # Update name (only possible if at least one User version exists)
        if self._last_version is not None:
            self.name = self._last_version.field_dict["pseudo"]

        # Do the same thing for the Adherent or Club
        self._last_version = None
        obj_versions = (
            Version.objects.get_for_model(model)
            .filter(make_version_filter("pk", user_id))
            .order_by("revision__date_created")
        )

        for version in obj_versions:
            self._add_revision(version)

        # Remove duplicates and sort
        self.events = list(dict.fromkeys(self.events))
        return sorted(self.events, key=lambda e: e.date, reverse=True)

    def _add_revision(self, version):
        """Add a new revision to the chronological order.

        Args:
            version: Version, the version of the user for this event.
        """
        diff = None
        if self._last_version is not None:
            diff = self._compute_diff(
                version,
                self._last_version,
                ignoring=["last_login", "pwd_ntlm", "email_change_date"],
            )

        # Ignore "empty" events like login
        # but always keep the first event
        if not diff and self._last_version is not None:
            self._last_version = version
            return

        # event_type is set to UserHistoryEvent by the constructor
        evt = self.event_type(version, self._last_version, diff)
        self.events.append(evt)
        self._last_version = version
2020-04-23 11:18:16 +00:00
2020-04-23 15:39:45 +00:00
2020-04-23 17:06:59 +00:00
class MachineHistoryEvent(HistoryEvent):
    """History event of a machine, showing the owner by pseudo."""

    def _repr(self, name, value):
        """Return the appropriate representation of the given field.

        Args:
            name: the name of the field
            value: the value of the field

        Returns:
            The pseudo of the owner for the "user_id" field (or a "Deleted"
            marker with the id if the user no longer exists), and the default
            representation for any other field.
        """
        # Only the owner needs a specific treatment
        if name != "user_id":
            return super()._repr(name, value)

        try:
            owner = User.objects.get(id=value)
        except User.DoesNotExist:
            return "{} ({})".format(_("Deleted"), value)

        return owner.pseudo
class MachineHistory(History):
    """History of a machine, with its interfaces as related objects."""

    def __init__(self):
        super().__init__()
        self.event_type = MachineHistoryEvent

    def get(self, machine_id, model):
        """Get the list of MachineHistoryEvent related to the object.

        Args:
            machine_id: int, the id of the machine to lookup.
            model: ignored, the lookup is always done on Machine.

        Returns:
            The list of MachineHistoryEvent, in reverse chronological order,
            related to the object.
        """
        interface_versions = (
            Version.objects.get_for_model(Interface)
            .filter(make_version_filter("machine", machine_id))
            .order_by("content_type__model")
        )

        # Create RelatedHistory objects, keeping only the first occurrence
        # of each of them.
        unique_related = []
        already_seen = set()
        for interface_version in interface_versions:
            entry = RelatedHistory(interface_version)
            if entry in already_seen:
                continue
            already_seen.add(entry)
            unique_related.append(entry)
        self.related = unique_related

        return super().get(machine_id, Machine)
2020-04-23 17:06:59 +00:00
2020-04-23 15:39:45 +00:00
class InterfaceHistoryEvent(HistoryEvent):
    """History event of an interface, with readable values for its relations."""

    def _repr(self, name, value):
        """Get the appropriate representation of the given field.

        Args:
            name: the name of the field
            value: the value of the field

        Returns:
            The appropriate representation of the given field: the IpList
            object for an IP id, the name of the machine type or of the
            machine, the comma-separated names of the ports of a port list,
            and the default representation for any other field. Ids that can
            no longer be resolved are shown with a "Deleted" marker.
        """
        if name == "ipv4_id" and value is not None:
            try:
                return IpList.objects.get(id=value)
            except IpList.DoesNotExist:
                return "{} ({})".format(_("Deleted"), value)
        elif name == "machine_type_id":
            try:
                return MachineType.objects.get(id=value).name
            except MachineType.DoesNotExist:
                return "{} ({})".format(_("Deleted"), value)
        elif name == "machine_id":
            try:
                return Machine.objects.get(id=value).get_name() or _("No name")
            except Machine.DoesNotExist:
                return "{} ({})".format(_("Deleted"), value)
        elif name == "port_lists":
            if not value:
                return _("None")

            ports = []
            for pid in value:
                try:
                    ports.append(Port.objects.get(id=pid).pretty_name())
                except Port.DoesNotExist:
                    # The lookup is on Port, so this is the exception raised
                    # for a deleted port.
                    ports.append("{} ({})".format(_("Deleted"), pid))

            # The list used to be built and then dropped, which displayed
            # the raw ids instead.
            return ", ".join(ports)

        return super()._repr(name, value)
class InterfaceHistory(History):
    """History of an interface."""

    def __init__(self):
        super().__init__()
        # Events of this history are built as InterfaceHistoryEvent
        self.event_type = InterfaceHistoryEvent

    def get(self, interface_id, model):
        """Get the list of InterfaceHistoryEvent related to the object.

        Args:
            interface_id: int, the id of the interface to lookup.
            model: ignored, the lookup is always done on Interface.

        Returns:
            The list of InterfaceHistoryEvent, in reverse chronological order,
            related to the object.
        """
        events = super().get(interface_id, Interface)
        return events
############################
# History auto-detect #
############################
# Maps a model to the History subclass able to represent it; the "default"
# entry is used for every model without a dedicated class.
HISTORY_CLASS_MAPPING = {
    User: UserHistory,
    Machine: MachineHistory,
    Interface: InterfaceHistory,
    "default": History,
}


def get_history_class(model):
    """Get the most appropriate History subclass to represent the given model's
    history.

    Args:
        model: the class for which to get the history.

    Returns:
        A new instance of the most appropriate History subclass for the given
        model's history, or of History if no other was found.
    """
    # The lookup is kept apart from the instantiation, so that a KeyError
    # raised by a constructor is not mistaken for a missing mapping.
    try:
        history_class = HISTORY_CLASS_MAPPING[model]
    except KeyError:
        history_class = HISTORY_CLASS_MAPPING["default"]

    return history_class()