diff --git a/oauth2_provider/views/base.py b/oauth2_provider/views/base.py index abaa81f59..62b391b0d 100644 --- a/oauth2_provider/views/base.py +++ b/oauth2_provider/views/base.py @@ -66,7 +66,83 @@ def redirect(self, redirect_to, application): RFC3339 = "%Y-%m-%dT%H:%M:%SZ" -class AuthorizationView(BaseAuthorizationView, FormView): +class AuthorizationMixin: + def get_context(self, request, *args, **kwargs): + try: + scopes, credentials = self.validate_authorization_request(request) + except OAuthToolkitError as error: + # Application is not available at this time. + return self.error_response(error, application=None) + + prompt = request.GET.get("prompt") + if prompt == "login": + return self.handle_prompt_login() + + all_scopes = get_scopes_backend().get_all_scopes() + kwargs["scopes_descriptions"] = [all_scopes[scope] for scope in scopes] + kwargs["scopes"] = scopes + # at this point we know an Application instance with such client_id exists in the database + + # TODO: Cache this! + application = get_application_model().objects.get(client_id=credentials["client_id"]) + + kwargs["application"] = application + kwargs["client_id"] = credentials["client_id"] + kwargs["redirect_uri"] = credentials["redirect_uri"] + kwargs["response_type"] = credentials["response_type"] + kwargs["state"] = credentials["state"] + if "code_challenge" in credentials: + kwargs["code_challenge"] = credentials["code_challenge"] + if "code_challenge_method" in credentials: + kwargs["code_challenge_method"] = credentials["code_challenge_method"] + if "nonce" in credentials: + kwargs["nonce"] = credentials["nonce"] + if "claims" in credentials: + kwargs["claims"] = json.dumps(credentials["claims"]) + + self.oauth2_data = kwargs + + # Check to see if the user has already granted access and return + # a successful response depending on "approval_prompt" url parameter + require_approval = request.GET.get("approval_prompt", oauth2_settings.REQUEST_APPROVAL_PROMPT) + + try: + # If skip_authorization field is True, skip the authorization screen even + # if this is the first use of the application and there was no previous authorization. + # This is useful for in-house applications-> assume an in-house applications + # are already approved. + if application.skip_authorization: + uri, headers, body, status = self.create_authorization_response( + request=self.request, scopes=" ".join(scopes), credentials=credentials, allow=True + ) + return self.redirect(uri, application) + + elif require_approval == "auto": + tokens = ( + get_access_token_model() + .objects.filter( + user=request.user, application=kwargs["application"], expires__gt=timezone.now() + ) + .all() + ) + + # check past authorizations regarded the same scopes as the current one + for token in tokens: + if token.allow_scopes(scopes): + uri, headers, body, status = self.create_authorization_response( + request=self.request, + scopes=" ".join(scopes), + credentials=credentials, + allow=True, + ) + return self.redirect(uri, application) + + except OAuthToolkitError as error: + return self.error_response(error, application) + return kwargs + + +class AuthorizationView(BaseAuthorizationView, FormView, AuthorizationMixin): """ Implements an endpoint to handle *Authorization Requests* as in :rfc:`4.1.1` and prompting the user with a form to determine if she authorizes the client application to access her data. @@ -128,7 +204,6 @@ def form_valid(self, form): scopes = form.cleaned_data.get("scope") allow = form.cleaned_data.get("allow") - try: uri, headers, body, status = self.create_authorization_response( request=self.request, scopes=scopes, credentials=credentials, allow=allow @@ -141,82 +216,13 @@ def form_valid(self, form): return self.redirect(self.success_url, application) def get(self, request, *args, **kwargs): - try: - scopes, credentials = self.validate_authorization_request(request) - except OAuthToolkitError as error: - # Application is not available at this time. - return self.error_response(error, application=None) - - prompt = request.GET.get("prompt") - if prompt == "login": - return self.handle_prompt_login() - - all_scopes = get_scopes_backend().get_all_scopes() - kwargs["scopes_descriptions"] = [all_scopes[scope] for scope in scopes] - kwargs["scopes"] = scopes - # at this point we know an Application instance with such client_id exists in the database - - # TODO: Cache this! - application = get_application_model().objects.get(client_id=credentials["client_id"]) - - kwargs["application"] = application - kwargs["client_id"] = credentials["client_id"] - kwargs["redirect_uri"] = credentials["redirect_uri"] - kwargs["response_type"] = credentials["response_type"] - kwargs["state"] = credentials["state"] - if "code_challenge" in credentials: - kwargs["code_challenge"] = credentials["code_challenge"] - if "code_challenge_method" in credentials: - kwargs["code_challenge_method"] = credentials["code_challenge_method"] - if "nonce" in credentials: - kwargs["nonce"] = credentials["nonce"] - if "claims" in credentials: - kwargs["claims"] = json.dumps(credentials["claims"]) - - self.oauth2_data = kwargs - # following two loc are here only because of https://code.djangoproject.com/ticket/17795 - form = self.get_form(self.get_form_class()) - kwargs["form"] = form - - # Check to see if the user has already granted access and return - # a successful response depending on "approval_prompt" url parameter - require_approval = request.GET.get("approval_prompt", oauth2_settings.REQUEST_APPROVAL_PROMPT) - - try: - # If skip_authorization field is True, skip the authorization screen even - # if this is the first use of the application and there was no previous authorization. - # This is useful for in-house applications-> assume an in-house applications - # are already approved. - if application.skip_authorization: - uri, headers, body, status = self.create_authorization_response( - request=self.request, scopes=" ".join(scopes), credentials=credentials, allow=True - ) - return self.redirect(uri, application) - - elif require_approval == "auto": - tokens = ( - get_access_token_model() - .objects.filter( - user=request.user, application=kwargs["application"], expires__gt=timezone.now() - ) - .all() - ) - - # check past authorizations regarded the same scopes as the current one - for token in tokens: - if token.allow_scopes(scopes): - uri, headers, body, status = self.create_authorization_response( - request=self.request, - scopes=" ".join(scopes), - credentials=credentials, - allow=True, - ) - return self.redirect(uri, application) - - except OAuthToolkitError as error: - return self.error_response(error, application) - - return self.render_to_response(self.get_context_data(**kwargs)) + context = self.get_context(request, *args, **kwargs) + if isinstance(context, dict): + form = self.get_form(self.get_form_class()) + context["form"] = form + return self.render_to_response(self.get_context_data(**context)) + else: + return context def handle_prompt_login(self): path = self.request.build_absolute_uri()