Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an authorize endpoint that uses JSON instead of a Django template… #1306

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
162 changes: 84 additions & 78 deletions oauth2_provider/views/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,83 @@ def redirect(self, redirect_to, application):
RFC3339 = "%Y-%m-%dT%H:%M:%SZ"


class AuthorizationView(BaseAuthorizationView, FormView):
class AuthorizationMixin:
def get_context(self, request, *args, **kwargs):
try:
scopes, credentials = self.validate_authorization_request(request)
except OAuthToolkitError as error:
# Application is not available at this time.
return self.error_response(error, application=None)

prompt = request.GET.get("prompt")
if prompt == "login":
return self.handle_prompt_login()

all_scopes = get_scopes_backend().get_all_scopes()
kwargs["scopes_descriptions"] = [all_scopes[scope] for scope in scopes]
kwargs["scopes"] = scopes
# at this point we know an Application instance with such client_id exists in the database

# TODO: Cache this!
application = get_application_model().objects.get(client_id=credentials["client_id"])

kwargs["application"] = application
kwargs["client_id"] = credentials["client_id"]
kwargs["redirect_uri"] = credentials["redirect_uri"]
kwargs["response_type"] = credentials["response_type"]
kwargs["state"] = credentials["state"]
if "code_challenge" in credentials:
kwargs["code_challenge"] = credentials["code_challenge"]
if "code_challenge_method" in credentials:
kwargs["code_challenge_method"] = credentials["code_challenge_method"]
if "nonce" in credentials:
kwargs["nonce"] = credentials["nonce"]
if "claims" in credentials:
kwargs["claims"] = json.dumps(credentials["claims"])

self.oauth2_data = kwargs

# Check to see if the user has already granted access and return
# a successful response depending on "approval_prompt" url parameter
require_approval = request.GET.get("approval_prompt", oauth2_settings.REQUEST_APPROVAL_PROMPT)

try:
# If skip_authorization field is True, skip the authorization screen even
# if this is the first use of the application and there was no previous authorization.
# This is useful for in-house applications-> assume an in-house applications
# are already approved.
if application.skip_authorization:
uri, headers, body, status = self.create_authorization_response(
request=self.request, scopes=" ".join(scopes), credentials=credentials, allow=True
)
return self.redirect(uri, application)

elif require_approval == "auto":
tokens = (
get_access_token_model()
.objects.filter(
Copy link

@dashdanw dashdanw Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could potentially avoid iteration by filtering by your desired scopes (assuming the scopes var here is the required scopes of the view)

Suggested change
.objects.filter(
scopes_filter = { 'scopes__icontains': scope for scope in scopes }
.objects.filter(
user=request.user,
application=kwargs["application"],
expires__gt=timezone.now(),
**scopes_filter
).first()

user=request.user, application=kwargs["application"], expires__gt=timezone.now()
)
.all()
)

# check past authorizations regarded the same scopes as the current one
for token in tokens:
if token.allow_scopes(scopes):
uri, headers, body, status = self.create_authorization_response(
request=self.request,
scopes=" ".join(scopes),
credentials=credentials,
allow=True,
)
return self.redirect(uri, application)

except OAuthToolkitError as error:
return self.error_response(error, application)
return kwargs


class AuthorizationView(BaseAuthorizationView, FormView, AuthorizationMixin):
"""
Implements an endpoint to handle *Authorization Requests* as in :rfc:`4.1.1` and prompting the
user with a form to determine if she authorizes the client application to access her data.
Expand Down Expand Up @@ -128,7 +204,6 @@ def form_valid(self, form):

scopes = form.cleaned_data.get("scope")
allow = form.cleaned_data.get("allow")

try:
uri, headers, body, status = self.create_authorization_response(
request=self.request, scopes=scopes, credentials=credentials, allow=allow
Expand All @@ -141,82 +216,13 @@ def form_valid(self, form):
return self.redirect(self.success_url, application)

def get(self, request, *args, **kwargs):
try:
scopes, credentials = self.validate_authorization_request(request)
except OAuthToolkitError as error:
# Application is not available at this time.
return self.error_response(error, application=None)

prompt = request.GET.get("prompt")
if prompt == "login":
return self.handle_prompt_login()

all_scopes = get_scopes_backend().get_all_scopes()
kwargs["scopes_descriptions"] = [all_scopes[scope] for scope in scopes]
kwargs["scopes"] = scopes
# at this point we know an Application instance with such client_id exists in the database

# TODO: Cache this!
application = get_application_model().objects.get(client_id=credentials["client_id"])

kwargs["application"] = application
kwargs["client_id"] = credentials["client_id"]
kwargs["redirect_uri"] = credentials["redirect_uri"]
kwargs["response_type"] = credentials["response_type"]
kwargs["state"] = credentials["state"]
if "code_challenge" in credentials:
kwargs["code_challenge"] = credentials["code_challenge"]
if "code_challenge_method" in credentials:
kwargs["code_challenge_method"] = credentials["code_challenge_method"]
if "nonce" in credentials:
kwargs["nonce"] = credentials["nonce"]
if "claims" in credentials:
kwargs["claims"] = json.dumps(credentials["claims"])

self.oauth2_data = kwargs
# following two loc are here only because of https://code.djangoproject.com/ticket/17795
form = self.get_form(self.get_form_class())
kwargs["form"] = form

# Check to see if the user has already granted access and return
# a successful response depending on "approval_prompt" url parameter
require_approval = request.GET.get("approval_prompt", oauth2_settings.REQUEST_APPROVAL_PROMPT)

try:
# If skip_authorization field is True, skip the authorization screen even
# if this is the first use of the application and there was no previous authorization.
# This is useful for in-house applications-> assume an in-house applications
# are already approved.
if application.skip_authorization:
uri, headers, body, status = self.create_authorization_response(
request=self.request, scopes=" ".join(scopes), credentials=credentials, allow=True
)
return self.redirect(uri, application)

elif require_approval == "auto":
tokens = (
get_access_token_model()
.objects.filter(
user=request.user, application=kwargs["application"], expires__gt=timezone.now()
)
.all()
)

# check past authorizations regarded the same scopes as the current one
for token in tokens:
if token.allow_scopes(scopes):
uri, headers, body, status = self.create_authorization_response(
request=self.request,
scopes=" ".join(scopes),
credentials=credentials,
allow=True,
)
return self.redirect(uri, application)

except OAuthToolkitError as error:
return self.error_response(error, application)

return self.render_to_response(self.get_context_data(**kwargs))
context = self.get_context(request, *args, **kwargs)
if isinstance(context, dict):
form = self.get_form(self.get_form_class())
context["form"] = form
return self.render_to_response(self.get_context_data(**context))
else:
return context

def handle_prompt_login(self):
path = self.request.build_absolute_uri()
Expand Down