94 lines
3.3 KiB
Python
94 lines
3.3 KiB
Python
|
import logging
|
||
|
|
||
|
from django.utils import timezone
|
||
|
from django.conf import settings
|
||
|
from django.core.exceptions import PermissionDenied
|
||
|
from rest_framework.permissions import BasePermission
|
||
|
|
||
|
import ldap
|
||
|
|
||
|
|
||
|
def ldap_setting(name, default):
|
||
|
'''Wrapper around django.conf.settings'''
|
||
|
return getattr(settings, f'LDAP_{name}', default)
|
||
|
|
||
|
class LDAPConnection:
|
||
|
__instance__ = None
|
||
|
__user_cache = {} # Username -> Valid until
|
||
|
|
||
|
@staticmethod
|
||
|
def get_instance():
|
||
|
'''To get a Singleton'''
|
||
|
if not LDAPConnection.__instance__:
|
||
|
return LDAPConnection()
|
||
|
else:
|
||
|
return LDAPConnection.__instance__
|
||
|
|
||
|
def __init__(self):
|
||
|
# Cache some settings
|
||
|
self.__LDAP_FILTER = ldap_setting('FILTER', '')
|
||
|
self.__LDAP_SEARCH_BASE = ldap_setting('SEARCH_BASE', '')
|
||
|
|
||
|
self.__ldap_connection = ldap.initialize(ldap_setting('SERVER', ''))
|
||
|
try:
|
||
|
self.__ldap_connection.simple_bind_s(ldap_setting('BIND_DN', ''),
|
||
|
ldap_setting('BIND_PW', ''))
|
||
|
except ldap.LDAPError as err:
|
||
|
logging.error(f'LDAP Error occuring during bind: {err.desc}')
|
||
|
|
||
|
def __is_cache_valid(self, username):
|
||
|
'''Returns True if the cache entry is still valid. Returns False otherwise.'''
|
||
|
if username in self.__user_cache:
|
||
|
if timezone.now() <= self.__user_cache[username]:
|
||
|
# Cache entry is still valid
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
def __remove_cache(self, username):
|
||
|
del self.__user_cache[username]
|
||
|
|
||
|
def has_user(self, username):
|
||
|
'''
|
||
|
Since we don't care about the password and so authentication
|
||
|
another way, all we care about is whether the user exists.
|
||
|
'''
|
||
|
if self.__is_cache_valid(username):
|
||
|
return True
|
||
|
if username in self.__user_cache:
|
||
|
self.__remove_cache(username)
|
||
|
|
||
|
filterstr = self.__LDAP_FILTER.replace('%s', username)
|
||
|
try:
|
||
|
result = self.__ldap_connection.search_s(self.__LDAP_SEARCH_BASE,
|
||
|
ldap.SCOPE_SUBTREE,
|
||
|
filterstr=filterstr)
|
||
|
except ldap.NO_RESULTS_RETURNED:
|
||
|
# We handle the specific error first and the the generic error, as
|
||
|
# we may expect ldap.NO_RESULTS_RETURNED, but not any other error
|
||
|
return False
|
||
|
except ldap.LDAPError as err:
|
||
|
logging.error(f'Error occured while performing an LDAP query: {err.desc}')
|
||
|
return False
|
||
|
|
||
|
if len(result) == 1:
|
||
|
self.__user_cache[username] = timezone.now() + timezone.timedelta(hours=1)
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
class LDAPUserExists(BasePermission):
|
||
|
'''
|
||
|
A permission check which first checks with the LDAP directory if the user
|
||
|
exists.
|
||
|
'''
|
||
|
def has_permission(self, request, view):
|
||
|
return LDAPConnection.get_instance().has_user(request.user.username):
|
||
|
|
||
|
def create_user(*args, **kwargs):
|
||
|
'''
|
||
|
A create_user function which first checks if the user already exists in the
|
||
|
configured LDAP directory.
|
||
|
'''
|
||
|
if not LDAPConnection.get_instance().has_user(kwargs['username']):
|
||
|
raise PermissionDenied('User not in the LDAP directory.')
|
||
|
return User.objects.create_user(*args, **kwargs)
|