etesync-server/myauth/ldap.py

105 lines
3.7 KiB
Python
Raw Normal View History

import logging
from django.utils import timezone
from django.conf import settings
from django.core.exceptions import PermissionDenied as DjangoPermissionDenied
2021-02-05 11:23:32 +00:00
from django_etebase.utils import CallbackContext
from myauth.models import get_typed_user_model, UserType
from etebase_fastapi.dependencies import get_authenticated_user
from etebase_fastapi.exceptions import PermissionDenied as FastAPIPermissionDenied
from fastapi import Depends
import ldap
2021-02-05 11:23:32 +00:00
User = get_typed_user_model()
2021-02-05 16:00:55 +00:00
def ldap_setting(name, default):
2020-11-14 15:44:45 +00:00
"""Wrapper around django.conf.settings"""
return getattr(settings, f"LDAP_{name}", default)
class LDAPConnection:
__instance__ = None
2020-11-14 15:44:45 +00:00
__user_cache = {} # Username -> Valid until
@staticmethod
def get_instance():
2020-11-14 15:44:45 +00:00
"""To get a Singleton"""
if not LDAPConnection.__instance__:
return LDAPConnection()
else:
return LDAPConnection.__instance__
def __init__(self):
# Cache some settings
2020-11-14 15:44:45 +00:00
self.__LDAP_FILTER = ldap_setting("FILTER", "")
self.__LDAP_SEARCH_BASE = ldap_setting("SEARCH_BASE", "")
2021-12-09 12:18:55 +00:00
password = ldap_setting("BIND_PW", "")
if not password:
pwfile = ldap_setting("BIND_PW_FILE", "")
if pw_file:
with open(pwfile, "r") as f:
password = f.read().replace("\n", "")
else:
logging.error("No bind password specified")
2020-11-14 15:44:45 +00:00
self.__ldap_connection = ldap.initialize(ldap_setting("SERVER", ""))
try:
2021-12-09 12:18:55 +00:00
self.__ldap_connection.simple_bind_s(ldap_setting("BIND_DN", ""), password)
except ldap.LDAPError as err:
2020-11-14 15:44:45 +00:00
logging.error(f"LDAP Error occuring during bind: {err.desc}")
def __is_cache_valid(self, username):
2020-11-14 15:44:45 +00:00
"""Returns True if the cache entry is still valid. Returns False otherwise."""
if username in self.__user_cache:
if timezone.now() <= self.__user_cache[username]:
# Cache entry is still valid
return True
return False
def __remove_cache(self, username):
del self.__user_cache[username]
def has_user(self, username):
2020-11-14 15:44:45 +00:00
"""
Since we don't care about the password and so authentication
another way, all we care about is whether the user exists.
2020-11-14 15:44:45 +00:00
"""
if self.__is_cache_valid(username):
return True
if username in self.__user_cache:
self.__remove_cache(username)
2020-11-14 15:44:45 +00:00
filterstr = self.__LDAP_FILTER.replace("%s", username)
try:
2020-11-14 15:44:45 +00:00
result = self.__ldap_connection.search_s(self.__LDAP_SEARCH_BASE, ldap.SCOPE_SUBTREE, filterstr=filterstr)
except ldap.NO_RESULTS_RETURNED:
# We handle the specific error first and the the generic error, as
# we may expect ldap.NO_RESULTS_RETURNED, but not any other error
return False
except ldap.LDAPError as err:
2020-11-14 15:44:45 +00:00
logging.error(f"Error occured while performing an LDAP query: {err.desc}")
return False
if len(result) == 1:
self.__user_cache[username] = timezone.now() + timezone.timedelta(hours=1)
return True
return False
2021-02-05 16:00:55 +00:00
def is_user_in_ldap(user: UserType = Depends(get_authenticated_user)):
if not LDAPConnection.get_instance().has_user(user.username):
raise FastAPIPermissionDenied(detail="User not in LDAP directory.")
2021-02-05 16:00:55 +00:00
2021-02-05 11:23:32 +00:00
def create_user(context: CallbackContext, *args, **kwargs):
2020-11-14 15:44:45 +00:00
"""
A create_user function which first checks if the user already exists in the
configured LDAP directory.
2020-11-14 15:44:45 +00:00
"""
if not LDAPConnection.get_instance().has_user(kwargs["username"]):
raise DjangoPermissionDenied("User not in the LDAP directory.")
return User.objects.create_user(*args, **kwargs)