Source code for flask_graphql_auth.decorators
from flask import _app_ctx_stack as ctx_stack, current_app, request
from functools import wraps
import jwt
from .exceptions import *
from .fields import *
def decode_jwt(encoded_token, secret, algorithm, identity_claim_key, user_claims_key):
    """
    Decode an encoded JWT and check that the claims this module relies on exist.

    :param encoded_token: The encoded JWT string to decode
    :param secret: Secret key handed to ``jwt.decode`` to verify the token
    :param algorithm: The only algorithm accepted while decoding
    :param identity_claim_key: Key expected to hold the identity claim
    :param user_claims_key: Key expected to hold the user claims; when the
        token does not carry it, an empty dict is stored under this key
    :return: Dictionary containing contents of the JWT
    :raises JWTDecodeError: If ``jti`` or the identity claim is missing, or
        if ``type`` is missing or is neither ``"refresh"`` nor ``"access"``
    """
    # PyJWT does its own validation in this call (including the exp, iat and
    # nbf claims) and raises its own exceptions when that validation fails.
    claims = jwt.decode(encoded_token, secret, algorithms=[algorithm])

    # Mandatory claims are checked in a fixed order so that the first missing
    # one is the one reported to the caller.
    for required_claim in ("jti", identity_claim_key):
        if required_claim not in claims:
            raise JWTDecodeError("Missing claim: {}".format(required_claim))

    # A missing "type" yields None, which is rejected like any unknown value.
    if claims.get("type") not in ("refresh", "access"):
        raise JWTDecodeError("Missing or invalid claim: type")

    # User claims are optional: default them so callers can always index them.
    claims.setdefault(user_claims_key, {})
    return claims
def get_jwt_data(token, token_type):
    """
    Decode a JWT using the current application's settings and check its type.

    The secret and the claim keys are read from ``current_app.config``
    (``JWT_SECRET_KEY``, ``JWT_IDENTITY_CLAIM``, ``JWT_USER_CLAIMS``); the
    algorithm is fixed to ``HS256``.

    :param token: The encoded JWT string to decode
    :param token_type: Required value of the token's ``type`` claim
        (access or refresh)
    :return: Dictionary containing contents of the JWT
    :raises WrongTokenError: If the decoded ``type`` claim differs from
        ``token_type``
    """
    config = current_app.config
    claims = decode_jwt(
        encoded_token=token,
        secret=config["JWT_SECRET_KEY"],
        algorithm="HS256",
        identity_claim_key=config["JWT_IDENTITY_CLAIM"],
        user_claims_key=config["JWT_USER_CLAIMS"],
    )

    # decode_jwt only guarantees the type is one of the known values; here we
    # additionally require the specific type the caller asked for.
    if claims["type"] == token_type:
        return claims
    raise WrongTokenError("Only {} tokens are allowed".format(token_type))
def verify_jwt_in_argument(token):
    """
    Verify an access token and remember its contents for the current context.

    The decoded data is stored as the ``jwt`` attribute of the object on top
    of Flask's application context stack. Any exception raised by
    ``get_jwt_data`` propagates to the caller.

    :param token: The encoded access type JWT string to decode
    :return: None
    """
    ctx_stack.top.jwt = get_jwt_data(token, "access")
def verify_refresh_jwt_in_argument(token):
    """
    Verify a refresh token and remember its contents for the current context.

    The decoded data is stored as the ``jwt`` attribute of the object on top
    of Flask's application context stack. Any exception raised by
    ``get_jwt_data`` propagates to the caller.

    :param token: The encoded refresh type JWT string to decode
    :return: None
    """
    ctx_stack.top.jwt = get_jwt_data(token, "refresh")
def _extract_header_token_value(request_headers):
    """
    Pull the token out of the configured authorization header.

    The header name comes from the ``JWT_HEADER_NAME`` setting. The header
    value must start (case-insensitively) with the prefix from the
    ``JWT_HEADER_TOKEN_PREFIX`` setting; the token is then taken to be the
    last whitespace-separated piece of the header value.

    :param request_headers: Request headers; only ``.get(name)`` is used
    :return: Token value as a string (None if the header is absent, empty or
        does not start with the expected prefix)
    """
    config = current_app.config
    header_value = request_headers.get(config["JWT_HEADER_NAME"])
    expected_prefix = config["JWT_HEADER_TOKEN_PREFIX"].lower()

    if not header_value:
        return None
    if not header_value.lower().startswith(expected_prefix):
        return None

    # Everything before the final whitespace-separated piece is discarded.
    return header_value.split()[-1]
def query_jwt_required(fn):
    """
    Decorator that guards a query resolver behind a valid access token.

    The token is removed from the resolver's keyword arguments (under the
    name given by the ``JWT_TOKEN_ARGUMENT_NAME`` setting) and verified
    before the resolver runs. If verification raises, the resolver is not
    called and an ``AuthInfoField`` carrying the error text is returned
    instead. Token freshness is not checked.

    :param fn: The resolver to protect
    :return: The wrapped resolver
    """

    @wraps(fn)
    def guarded(*args, **kwargs):
        # pop() has no default: a call without the token argument raises KeyError.
        argument_name = current_app.config["JWT_TOKEN_ARGUMENT_NAME"]
        access_token = kwargs.pop(argument_name)

        try:
            verify_jwt_in_argument(access_token)
        except Exception as error:
            # Verification failures are reported as data, not raised.
            return AuthInfoField(message=str(error))
        else:
            return fn(*args, **kwargs)

    return guarded
def query_jwt_refresh_token_required(fn):
    """
    Decorator that guards a query resolver behind a valid refresh token.

    The token is removed from the resolver's keyword arguments (under the
    name given by the ``JWT_REFRESH_TOKEN_ARGUMENT_NAME`` setting) and
    verified before the resolver runs. If verification raises, the resolver
    is not called and an ``AuthInfoField`` carrying the error text is
    returned instead.

    :param fn: The resolver to protect
    :return: The wrapped resolver
    """

    @wraps(fn)
    def guarded(*args, **kwargs):
        # pop() has no default: a call without the token argument raises KeyError.
        argument_name = current_app.config["JWT_REFRESH_TOKEN_ARGUMENT_NAME"]
        refresh_token = kwargs.pop(argument_name)

        try:
            verify_refresh_jwt_in_argument(refresh_token)
        except Exception as error:
            # Verification failures are reported as data, not raised.
            return AuthInfoField(message=str(error))
        else:
            return fn(*args, **kwargs)

    return guarded
def mutation_jwt_required(fn):
    """
    Decorator that guards a mutation behind a valid access token.

    The token is removed from the mutation's keyword arguments (under the
    name given by the ``JWT_TOKEN_ARGUMENT_NAME`` setting) and verified
    before the mutation runs. If verification raises, the mutation is not
    called; instead ``cls`` is instantiated with an ``AuthInfoField``
    carrying the error text and that instance is returned. Token freshness
    is not checked.

    :param fn: The mutation function to protect; it is called with ``cls``
        as its first argument
    :return: The wrapped mutation function
    """

    @wraps(fn)
    def guarded(cls, *args, **kwargs):
        # pop() has no default: a call without the token argument raises KeyError.
        argument_name = current_app.config["JWT_TOKEN_ARGUMENT_NAME"]
        access_token = kwargs.pop(argument_name)

        try:
            verify_jwt_in_argument(access_token)
        except Exception as error:
            # Verification failures are reported as a mutation payload, not raised.
            return cls(AuthInfoField(message=str(error)))
        else:
            return fn(cls, *args, **kwargs)

    return guarded
def mutation_jwt_refresh_token_required(fn):
    """
    Decorator that guards a mutation behind a valid refresh token.

    The token is removed from the mutation's keyword arguments (under the
    name given by the ``JWT_REFRESH_TOKEN_ARGUMENT_NAME`` setting) and
    verified before the mutation runs. If verification raises, the mutation
    is not called; instead ``cls`` is instantiated with an ``AuthInfoField``
    carrying the error text and that instance is returned.

    :param fn: The mutation function to protect; it is called WITHOUT
        ``cls`` (only the remaining positional and keyword arguments)
    :return: The wrapped mutation function
    """

    @wraps(fn)
    def guarded(cls, *args, **kwargs):
        # pop() has no default: a call without the token argument raises KeyError.
        argument_name = current_app.config["JWT_REFRESH_TOKEN_ARGUMENT_NAME"]
        refresh_token = kwargs.pop(argument_name)

        try:
            verify_refresh_jwt_in_argument(refresh_token)
        except Exception as error:
            # Verification failures are reported as a mutation payload, not raised.
            return cls(AuthInfoField(message=str(error)))
        else:
            # NOTE(review): cls is not forwarded here, whereas
            # mutation_jwt_required calls fn(cls, ...). Existing mutations may
            # be written against this calling convention -- confirm with
            # callers before aligning the two decorators.
            return fn(*args, **kwargs)

    return guarded