8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-24 12:23:11 +00:00
re2o/search/engine.py

574 lines
20 KiB
Python
Raw Permalink Normal View History

2020-11-23 16:06:37 +00:00
# Re2o est un logiciel d'administration développé initialement au Rézo Metz. Il
2020-02-18 17:16:08 +00:00
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
# Copyright © 2017 Gabriel Détraz
# Copyright © 2017 Lara Kermarec
# Copyright © 2017 Augustin Lemesle
# Copyright © 2019 Jean-Romain Garnier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""The views for the search app, responsible for finding the matches
Augustin lemesle, Gabriel Détraz, Lara Kermarec, Maël Kervella,
Jean-Romain Garnier
Gplv2"""
from __future__ import unicode_literals
2021-02-10 10:06:09 +00:00
from django.db.models import Q, Value
from django.db.models.functions import Concat
2021-02-10 10:06:09 +00:00
from netaddr import EUI, AddrFormatError
2020-02-18 17:16:08 +00:00
from cotisations.models import Facture
2021-02-10 10:06:09 +00:00
from machines.models import Machine
2020-02-18 17:16:08 +00:00
from preferences.models import GeneralOption
from re2o.base import SortTable, re2o_paginator
2021-02-10 10:06:09 +00:00
from topologie.models import Port, Room, Switch
from users.models import Adherent, Ban, Club, User, Whitelist
2020-02-18 17:16:08 +00:00
# Names of the result categories the search applies to. Each name is used as
# a key of the per-category filter dictionaries built in this module.
FILTER_FIELDS = [
    "users",
    "clubs",
    "machines",
    "factures",
    "bans",
    "whitelists",
    "rooms",
    "ports",
    "switches",
]
2020-02-18 17:16:08 +00:00
class Query:
    """A search query: either a plain word or a combination of subqueries.

    Attributes:
        text: the string typed by the user for the part currently being
            built.
        operator: the character linking the subqueries (e.g. "+"), or None
            for a plain query.
        subqueries: list of Query objects once an operator has been added,
            None before that.
        case_sensitive: bool, True if `text` is case sensitive.
    """

    def __init__(self, text="", case_sensitive=False):
        """Initialise an instance of Query.

        Args:
            text: the content of the query (default: "").
            case_sensitive: bool, True if the query is case sensitive and
                False if not (default: False).
        """
        self.case_sensitive = case_sensitive
        self.subqueries = None
        self.operator = None
        self.text = text

    def add_char(self, char):
        """Append the given character to the query's text.

        Args:
            char: the character to be added.
        """
        self.text = self.text + char

    def add_operator(self, operator):
        """Register an operator and close the part being built.

        The current text (with its case sensitivity) is stored in
        self.subqueries as a plain Query object, then the text and the case
        sensitivity are reset so that the next part starts from scratch.

        Args:
            operator: the operator to be added.
        """
        finished_part = Query(self.text, self.case_sensitive)
        if self.subqueries is None:
            self.subqueries = [finished_part]
        else:
            self.subqueries.append(finished_part)
        self.operator = operator
        self.text = ""
        self.case_sensitive = False

    @property
    def plaintext(self):
        """Return the textual representation of the query's content."""
        if self.operator is None:
            # Plain query: case sensitive text is shown between quotes
            return '"{}"'.format(self.text) if self.case_sensitive else self.text
        return self.operator.join(part.plaintext for part in self.subqueries)
def empty_filters():
    """Build the empty filters used by Django.

    Returns:
        A dict mapping every name of FILTER_FIELDS to a new, empty Q object.
    """
    blank = {}
    for field in FILTER_FIELDS:
        blank[field] = Q()
    return blank
2020-02-18 17:16:08 +00:00
def is_int(variable):
    """Check if the variable can be cast to an integer.

    Args:
        variable: the value to test, typically a word typed by the user.

    Returns:
        True if int(variable) succeeds, False otherwise.
    """
    try:
        int(variable)
    except (TypeError, ValueError):
        # ValueError: malformed string (e.g. "abc"); TypeError: a value that
        # int() does not accept at all (e.g. None)
        return False
    return True
def finish_results(request, results, col, order):
    """Sort the results, paginate them and record the page size.

    Every category of results is sorted with its own SortTable index, then
    made distinct and paginated. Finally the maximum number of results per
    page is added to the dictionary.

    Args:
        request: django request, corresponding to the search.
        results: dict, the results of the search.
        col: the column used to sort the results.
        order: the order used to sort the results.

    Returns:
        The dictionary of results sorted and paginated.
    """
    # Sorting index to use for each category, in processing order
    sort_indexes = (
        ("users", SortTable.USERS_INDEX),
        ("clubs", SortTable.USERS_INDEX),
        ("machines", SortTable.MACHINES_INDEX),
        ("factures", SortTable.COTISATIONS_INDEX),
        ("bans", SortTable.USERS_INDEX_BAN),
        ("whitelists", SortTable.USERS_INDEX_WHITE),
        ("rooms", SortTable.TOPOLOGIE_INDEX_ROOM),
        ("ports", SortTable.TOPOLOGIE_INDEX_PORT),
        ("switches", SortTable.TOPOLOGIE_INDEX),
    )
    for category, index in sort_indexes:
        results[category] = SortTable.sort(results[category], col, order, index)

    max_result = GeneralOption.get_cached_value("search_display_page")
    # Each category gets its own page argument, e.g. "users_page"
    for category in list(results):
        results[category] = re2o_paginator(
            request,
            results[category].distinct(),
            max_result,
            page_arg=category + "_page",
        )

    results["max_result"] = max_result
    return results
def contains_filter(attribute, word, case_sensitive=False):
    """Create a django model filter checking whether the given attribute
    contains the specified value.

    Args:
        attribute: the name of the attribute that must contain the word.
        word: the word that must be contained in the attribute.
        case_sensitive: bool, True if the check is case sensitive and
            False if not (default: False).

    Returns:
        A Q object using the "contains" lookup when case sensitive and the
        "icontains" lookup otherwise.
    """
    lookup = "contains" if case_sensitive else "icontains"
    return Q(**{"{}__{}".format(attribute, lookup): word})
def search_single_word(
    word, filters, user, start, end, user_state, email_state, aff, case_sensitive=False
):
    """Construct the filters matching the given word against several fields
    of some models, according to the display filters.

    The matched fields are either CharField or IntegerField that will be
    displayed on the results page (else, one might not see why a result has
    matched the query). IntegerField are matched against the word only if it
    can be cast to an int.

    Args:
        word: the word to look for.
        filters: dict of Q objects indexed by category name; the new filters
            are OR-ed into it.
        user: the user performing the search, used for permission checks.
        start: lower date bound for factures, bans and whitelists, or None.
        end: upper date bound for factures, bans and whitelists, or None.
        user_state: the accepted values for the state of the related user.
        email_state: the accepted values for the email state of the related
            user.
        aff: the enabled display filters: "0" users and clubs, "1" machines,
            "2" factures, "3" bans, "4" whitelists, "5" rooms, "6" switch
            ports, "7" switches.
        case_sensitive: bool, True if the match is case sensitive
            (default: False).

    Returns:
        The updated `filters` dictionary.
    """

    def contains(field):
        """Filter checking that `field` contains the searched word."""
        return contains_filter(field, word, case_sensitive)

    def contains_any(*fields):
        """Filter checking that at least one of `fields` contains the word."""
        combined = contains(fields[0])
        for field in fields[1:]:
            combined = combined | contains(field)
        return combined

    def owner_matches():
        """Filter on the pseudo of the related user, restricted to the
        requested user and email states."""
        return (
            contains("user__pseudo")
            & Q(user__state__in=user_state)
            & Q(user__email_state__in=email_state)
        )

    def within_dates(base):
        """Add the date_start/date_end conditions shared by bans and
        whitelists to `base`, for each bound that is not None."""
        if start is not None:
            base = base & (
                (Q(date_start__gte=start) & Q(date_end__gte=start))
                | (Q(date_start__lte=start) & Q(date_end__gte=start))
                | (Q(date_start__gte=start) & Q(date_end__lte=start))
            )
        if end is not None:
            base = base & (
                (Q(date_start__lte=end) & Q(date_end__lte=end))
                | (Q(date_start__lte=end) & Q(date_end__gte=end))
                | (Q(date_start__gte=end) & Q(date_end__lte=end))
            )
        return base

    # Users and clubs
    if "0" in aff:
        filter_clubs = contains_any(
            "surname",
            "pseudo",
            "email",
            "telephone",
            # Added through annotate
            "room_full_name",
            "room_full_name_stuck",
        )
        # Users have a name whereas clubs only have a surname
        filter_users = filter_clubs | contains("name")

        limits = []
        if not User.can_view_all(user)[0]:
            # Without the global right, one can only find oneself
            limits.append(Q(id=user.id))
        limits.append(Q(state__in=user_state))
        limits.append(Q(email_state__in=email_state))
        for limit in limits:
            filter_clubs = filter_clubs & limit
            filter_users = filter_users & limit

        filters["users"] |= filter_users
        filters["clubs"] |= filter_clubs

    # Machines
    if "1" in aff:
        filter_machines = contains("name") | owner_matches()
        for field in (
            "interface__domain__name",
            "interface__domain__related_domain__name",
            "interface__mac_address",
            "interface__ipv4__ipv4",
        ):
            filter_machines = filter_machines | contains(field)

        try:
            # Only used to check that the word is a valid MAC address
            EUI(word, 48)
        except AddrFormatError:
            pass
        else:
            filter_machines |= Q(interface__mac_address=word)

        if not Machine.can_view_all(user)[0]:
            # Without the global right, one can only find one's own machines
            filter_machines &= Q(user__id=user.id)
        filters["machines"] |= filter_machines

    # Factures
    if "2" in aff:
        filter_factures = owner_matches()
        if start is not None:
            filter_factures &= Q(date__gte=start)
        if end is not None:
            filter_factures &= Q(date__lte=end)
        filters["factures"] |= filter_factures

    # Bans
    if "3" in aff:
        filters["bans"] |= within_dates(owner_matches() | contains("raison"))

    # Whitelists
    if "4" in aff:
        filters["whitelists"] |= within_dates(owner_matches() | contains("raison"))

    # NOTE(review): the three permission checks below use the return value of
    # can_view_all() as a whole, whereas the users and machines sections
    # index it with [0]. If it returns a tuple, these checks are always
    # truthy - to be confirmed. Any change here has to stay coordinated with
    # the gating done in apply_filters, because a filter left empty matches
    # every row.

    # Rooms
    if "5" in aff and Room.can_view_all(user):
        filter_rooms = contains_any(
            "details",
            # Added through annotate
            "full_name",
            "full_name_stuck",
        ) | Q(port__details=word)
        filters["rooms"] |= filter_rooms

    # Switch ports
    if "6" in aff and User.can_view_all(user):
        filter_ports = contains_any(
            "machine_interface__domain__name",
            "related__switch__interface__domain__name",
            "custom_profile__name",
            "custom_profile__profil_default",
            "details",
            # Added through annotate
            "room_full_name",
            "room_full_name_stuck",
        )
        if is_int(word):
            filter_ports |= Q(port=word)
        filters["ports"] |= filter_ports

    # Switches
    if "7" in aff and Switch.can_view_all(user):
        filter_switches = contains_any(
            "interface__domain__name",
            "interface__ipv4__ipv4",
            "switchbay__building__name",
            "stack__name",
            "model__reference",
            "model__constructor__name",
            "interface__details",
        )
        if is_int(word):
            filter_switches |= Q(number=word) | Q(stack_member_id=word)
        filters["switches"] |= filter_switches

    return filters
def apply_filters(filters, user, aff):
    """Apply the filters constructed by search_single_word.

    It also takes into account the visual filters defined during
    the search query.

    Args:
        filters: dict of Q objects indexed by category name.
        user: the user performing the search, used for permission checks.
        aff: the enabled display filters: "0" users and clubs, "1" machines,
            "2" factures, "3" bans, "4" whitelists, "5" rooms, "6" switch
            ports, "7" switches.

    Returns:
        A dict of querysets indexed by category name. A category that is not
        displayed, or that the user is not allowed to view, is an empty
        queryset.
    """
    # Results are later filled-in depending on the display filter
    # In some cases, annotations are used to match what is displayed in the
    # results. For example, the displayed room is actually
    # "room__building__name room__name", so queries wouldn't match what the
    # user expects if we just kept the database's format
    results = {
        "users": Adherent.objects.none(),
        "clubs": Club.objects.none(),
        "machines": Machine.objects.none(),
        "factures": Facture.objects.none(),
        "bans": Ban.objects.none(),
        "whitelists": Whitelist.objects.none(),
        "rooms": Room.objects.none(),
        "ports": Port.objects.none(),
        "switches": Switch.objects.none(),
    }

    # Users and clubs
    if "0" in aff:
        results["users"] = Adherent.objects.annotate(
            room_full_name=Concat("room__building__name", Value(" "), "room__name"),
            room_full_name_stuck=Concat("room__building__name", "room__name"),
        ).filter(filters["users"])
        results["clubs"] = Club.objects.annotate(
            room_full_name=Concat("room__building__name", Value(" "), "room__name"),
            room_full_name_stuck=Concat("room__building__name", "room__name"),
        ).filter(filters["clubs"])

    # Machines
    if "1" in aff:
        results["machines"] = Machine.objects.filter(filters["machines"])

    # Factures
    if "2" in aff:
        results["factures"] = Facture.objects.filter(filters["factures"])

    # Bans
    if "3" in aff:
        results["bans"] = Ban.objects.filter(filters["bans"])

    # Whitelists
    if "4" in aff:
        results["whitelists"] = Whitelist.objects.filter(filters["whitelists"])

    # can_view_all() is indexed with [0] wherever its verdict is needed in
    # this module: taken as a whole, a non-empty return value would always be
    # truthy and the checks below would never deny anything.

    # Rooms
    if "5" in aff and Room.can_view_all(user)[0]:
        results["rooms"] = Room.objects.annotate(
            full_name=Concat("building__name", Value(" "), "name"),
            full_name_stuck=Concat("building__name", "name"),
        ).filter(filters["rooms"])

    # Switch ports
    # NOTE(review): the right is checked on User, not on Port - kept as is,
    # confirm that this is intended.
    if "6" in aff and User.can_view_all(user)[0]:
        results["ports"] = Port.objects.annotate(
            room_full_name=Concat("room__building__name", Value(" "), "room__name"),
            room_full_name_stuck=Concat("room__building__name", "room__name"),
        ).filter(filters["ports"])

    # Switches
    if "7" in aff and Switch.can_view_all(user)[0]:
        results["switches"] = Switch.objects.filter(filters["switches"])

    return results
2020-04-19 15:07:29 +00:00
def search_single_query(query, filters, user, start, end, user_state, email_state, aff):
    """Handle the different kinds of queries and construct the correct
    filters using search_single_word.

    Args:
        query: the Query object to process.
        filters: dict of Q objects indexed by category name; the new filters
            are OR-ed into it.
        user, start, end, user_state, email_state, aff: passed unchanged to
            search_single_word.

    Returns:
        The updated `filters` dictionary.
    """
    if query.operator != "+":
        # Standard query: a single word
        return search_single_word(
            query.text,
            filters,
            user,
            start,
            end,
            user_state,
            email_state,
            aff,
            query.case_sensitive,
        )

    # Special queries with "+" operators should use & rather than |
    combined = empty_filters()
    for subquery in query.subqueries:
        # Construct an independent filter for each subquery...
        partial = search_single_query(
            subquery, empty_filters(), user, start, end, user_state, email_state, aff
        )
        # ...and require it in addition to the previous ones
        for field in FILTER_FIELDS:
            combined[field] &= partial[field]

    # Add these filters to the existing ones
    for field in FILTER_FIELDS:
        filters[field] |= combined[field]
    return filters
2020-02-18 17:16:08 +00:00
def create_queries(query):
    """Split the text typed by the user into the different queries to look
    for.

    The rules are the following:
        - backslash ('\\') is used to escape characters
        - anything between quotation marks ('"') is kept intact (not
          interpreted as separators), except backslashes used to escape.
          Values in between quotation marks are not searched across
          multiple fields in the database (contrary to +)
        - spaces (' ') and commas (',') are used to separate words
        - "+" signs are used as "and" operators

    Args:
        query: the string typed by the user.

    Returns:
        A list of Query objects. It always contains at least one query: when
        nothing could be extracted, it holds a single empty Query.
    """
    # The list of the different queries extracted from the user's text
    queries = []
    current_query = None

    # Whether the query is between "
    keep_intact = False
    # Whether the previous char was a \
    escaping_char = False

    for char in query:
        if current_query is None:
            # We are starting a new word
            current_query = Query()

        if escaping_char:
            # The last char was a \ so we escape this char
            escaping_char = False
            current_query.add_char(char)
        elif char == "\\":
            # We need to escape the next char
            escaping_char = True
        elif char == '"':
            # Toggle the keep_intact state, if true, we are between two "
            keep_intact = not keep_intact
            if keep_intact:
                current_query.case_sensitive = True
        elif keep_intact:
            # If we are between two ", ignore separators
            current_query.add_char(char)
        elif char == "+":
            if current_query.text:
                current_query.add_operator("+")
            else:
                # Can't start a query with a "+", consider it escaped
                current_query.add_char(char)
        elif char in (" ", ","):
            # If we encounter a separator outside of ", we create a new word
            if not current_query.text:
                # Discard empty queries
                continue
            if current_query.operator is not None:
                # If we were building a special structure, finish building it
                current_query.add_operator(current_query.operator)
            # Save the query and start a new one
            queries.append(current_query)
            current_query = None
        else:
            # If we haven't encountered any special case, add the char to the
            # word
            current_query.add_char(char)

    # Save the current working query if necessary
    if current_query is not None:
        if current_query.operator is not None:
            # There was an operator supposed to split multiple words
            if current_query.text:
                # Finish the current search
                current_query.add_operator(current_query.operator)
            queries.append(current_query)
        elif current_query.text:
            queries.append(current_query)
        # Otherwise the pending query is empty (e.g. the text ended with
        # several separators, or with an empty pair of quotes): discard it
        # like any other empty query. An empty word is contained in every
        # value, so keeping it would make the whole search match everything.

    # Make sure there is at least one query, even if it's empty
    # Otherwise, display filters (for advanced search) won't work
    # when the search text field is empty
    return queries or [Query()]