Implement authentication with LDAP
Enable an additional lookup against an LDAP directory during login and user creation to ensure that only specific users can login and register on the EteBase server instance.
This commit is contained in:
parent
801826b8b6
commit
1fef1e2b7a
@ -79,5 +79,33 @@ class AppSettings:
|
||||
def CHALLENGE_VALID_SECONDS(self): # pylint: disable=invalid-name
|
||||
return self._setting("CHALLENGE_VALID_SECONDS", 60)
|
||||
|
||||
@cached_property
|
||||
def USE_LDAP(self): # pylint: disable=invalid-name
|
||||
return self._setting("USE_LDAP", False)
|
||||
|
||||
@cached_property
|
||||
def LDAP_FILTER(self): # pylint: disable=invalid-name
|
||||
return self._setting("LDAP_FILTER", "")
|
||||
|
||||
@cached_property
|
||||
def LDAP_SEARCH_BASE(self): # pylint: disable=invalid-name
|
||||
return self._setting("LDAP_SEARCH_BASE", "")
|
||||
|
||||
@cached_property
|
||||
def LDAP_SERVER(self): # pylint: disable=invalid-name
|
||||
return self._setting("LDAP_SERVER", "")
|
||||
|
||||
@cached_property
|
||||
def LDAP_BIND_DN(self): # pylint: disable=invalid-name
|
||||
return self._setting("LDAP_BIND_DN", "")
|
||||
|
||||
@cached_property
|
||||
def LDAP_BIND_PASSWORD(self): # pylint: disable=invalid-name
|
||||
return self._setting("LDAP_BIND_PW", "")
|
||||
|
||||
@cached_property
|
||||
def LDAP_SEARCH_BASE(self): # pylint: disable=invalid-name
|
||||
return self._setting("LDAP_SEARCH_BASE", "")
|
||||
|
||||
|
||||
app_settings = AppSettings('ETEBASE_')
|
||||
|
45
django_etebase/ldap.py
Normal file
45
django_etebase/ldap.py
Normal file
@ -0,0 +1,45 @@
|
||||
import logging
|
||||
|
||||
from . import app_settings
|
||||
|
||||
import ldap
|
||||
|
||||
class LDAPConnection:
|
||||
__instance__ = None
|
||||
|
||||
@staticmethod
|
||||
def get_instance():
|
||||
'''To get a Singleton'''
|
||||
if not LDAPConnection.__instance__:
|
||||
return LDAPConnection()
|
||||
else:
|
||||
return LDAPConnection.__instance__
|
||||
|
||||
def __init__(self):
|
||||
# Pull settings from django.conf.settings
|
||||
# NOTE: We assume that settings.USE_LDAP is True
|
||||
self.__ldap_connection = ldap.initialize(app_settings.LDAP_SERVER)
|
||||
try:
|
||||
self.__ldap_connection.simple_bind_s(app_settings.LDAP_BIND_DN,
|
||||
app_settings.LDAP_BIND_PASSWORD)
|
||||
except ldap.LDAPError as err:
|
||||
logging.error(f'LDAP Error occuring during initialization: {err.desc}')
|
||||
|
||||
def has_user(self, username):
|
||||
'''
|
||||
Since we don't care about the password and so authentication
|
||||
another way, all we care about is whether the user exists.
|
||||
'''
|
||||
filterstr = app_settings.LDAP_FILTER.replace('%s', username)
|
||||
try:
|
||||
result = self.__ldap_connection.search_s(app_settings.LDAP_SEARCH_BASE,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
filterstr=filterstr)
|
||||
except ldap.NO_RESULTS_RETURNED:
|
||||
# We handle the specific error first and the the generic error, as
|
||||
# we may expect ldap.NO_RESULTS_RETURNED, but not any other error
|
||||
return False
|
||||
except ldap.LDAPError as err:
|
||||
logging.error(f'Error occured while performing an LDAP query: {err.desc}')
|
||||
return False
|
||||
return len(result) == 1
|
@ -1,16 +1,19 @@
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from .. import app_settings
|
||||
|
||||
from rest_framework import exceptions
|
||||
from rest_framework.authentication import TokenAuthentication as DRFTokenAuthentication
|
||||
|
||||
from .models import AuthToken, get_default_expiry
|
||||
|
||||
if app_settings.USE_LDAP:
|
||||
from ..ldap import LDAPConnection
|
||||
|
||||
|
||||
AUTO_REFRESH = True
|
||||
MIN_REFRESH_INTERVAL = 60
|
||||
|
||||
|
||||
class TokenAuthentication(DRFTokenAuthentication):
|
||||
keyword = 'Token'
|
||||
model = AuthToken
|
||||
@ -23,6 +26,10 @@ class TokenAuthentication(DRFTokenAuthentication):
|
||||
except model.DoesNotExist:
|
||||
raise exceptions.AuthenticationFailed(msg)
|
||||
|
||||
if app_settings.USE_LDAP:
|
||||
if not LDAPConnection.get_instance().has_user(token.user.username):
|
||||
raise exceptions.AuthenticationFailed('User is not listed in the LDAP registry.')
|
||||
|
||||
if not token.user.is_active:
|
||||
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
|
||||
|
||||
|
@ -2,6 +2,7 @@ from django.contrib.auth import get_user_model
|
||||
from django.core.exceptions import PermissionDenied
|
||||
|
||||
from . import app_settings
|
||||
from .ldap import LDAPConnection
|
||||
|
||||
|
||||
User = get_user_model()
|
||||
@ -15,6 +16,11 @@ def get_user_queryset(queryset, view):
|
||||
|
||||
|
||||
def create_user(*args, **kwargs):
|
||||
# Check if the LDAP query returns exactly one user
|
||||
if app_settings.USE_LDAP:
|
||||
if not LDAPConnection.get_instance().has_user(kwargs['username']):
|
||||
raise PermissionDenied('User is not listed in the LDAP registry.')
|
||||
|
||||
custom_func = app_settings.CREATE_USER_FUNC
|
||||
if custom_func is not None:
|
||||
return custom_func(*args, **kwargs)
|
||||
|
@ -164,6 +164,17 @@ if any(os.path.isfile(x) for x in config_locations):
|
||||
if 'database' in config:
|
||||
DATABASES = { 'default': { x.upper(): y for x, y in config.items('database') } }
|
||||
|
||||
if 'ldap' in config:
|
||||
ldap = config['ldap']
|
||||
ETEBASE_USE_LDAP = True
|
||||
ETEBASE_LDAP_SERVER = ldap.get('server', '')
|
||||
ETEBASE_LDAP_SEARCH_BASE = ldap.get('search_base', '')
|
||||
ETEBASE_LDAP_FILTER = ldap.get('filter', '')
|
||||
ETEBASE_LDAP_BIND_DN = ldap.get('bind_dn', '')
|
||||
ETEBASE_LDAP_BIND_PW = ldap.get('bind_pw', '')
|
||||
else:
|
||||
ETEBASE_USE_LDAP = False
|
||||
|
||||
ETEBASE_API_PERMISSIONS = ('rest_framework.permissions.IsAuthenticated', )
|
||||
ETEBASE_API_AUTHENTICATORS = ('django_etebase.token_auth.authentication.TokenAuthentication',
|
||||
'rest_framework.authentication.SessionAuthentication')
|
||||
|
Loading…
Reference in New Issue
Block a user