Token authentication issue with FastAPI and JWT tokens – "Could not validate credentials"

I am building an API using Python 3.10.8 and FastAPI 0.95.1, and I’m experiencing an issue with user authentication, specifically related to JWT tokens. I have followed the guide provided in FastAPI’s security documentation.

The problem arises when I make a request to an endpoint that requires user authentication. Instead of receiving a valid JWT token in the get_current_user() function, the token is being passed as the string "undefined". As a result, I encounter a 401 Error with the message "Could not validate credentials." In the Chrome developer tools, the Authorization header also shows "Bearer undefined".

# --- dependencies.py ---

from database import recipes_db
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from utils.models import UserInternal, UserExternal, TokenData
from settings import SECRET_KEY, ALGORITHM
from typing import Annotated


SELECT_USER = """
SELECT
    id, 
    email, 
    first_name as firstName, 
    hashed_password as hashedPassword
FROM
    users
WHERE
    email = :email
"""


password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# This context can be used to hash passwords and verify them later on.
# The deprecated argument is set to "auto", which means that the library
# will automatically deprecate old algorithms and switch to new ones as needed.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# This is a security scheme used for authenticating users with OAuth2.
# The tokenUrl parameter is set to "token", which is the endpoint where the
# user can obtain an access token by providing their credentials. The oauth2_scheme
# object can be used as a dependency in FastAPI endpoints to enforce authentication.


def verify_hash(plain_text: str, hashed_text: str) -> bool:
    """
    Verifies that a plain text matches a hashed text.

    Args:
        plain_text (str): The plain text to verify.
        hashed_text (str): The hashed text to compare against.

    Returns:
        bool: True if the plain text matches the hashed text, False otherwise.
    """
    return password_context.verify(plain_text, hashed_text)


def create_hash(text: str) -> str:
    """
    Hashes a string (e.g., password or token)

    Args:
        text (str): The plain text to hash.

    Returns:
        str: The hashed text.
    """
    return password_context.hash(text)


async def get_user(email: str, internal: bool = True) -> UserExternal | UserInternal | None:
    """
    Fetches a user from the database by their email.

    Args:
        email (str): The email of the user.
        internal (bool, optional): If True, returns an internal user object.
            If False, returns an external user object. Defaults to True.

    Returns:
        UserInternal or UserExternal: The user object, or None if no user has that email.
    """
    data = await recipes_db.fetch_one(
        query=SELECT_USER,
        values={'email': email}
    )
    if data:
        if internal:
            user = UserInternal(**data)
        else:
            user = UserExternal(**data)
        return user


async def authenticate_user(email: str, password: str):
    """
    Authenticates a user.

    Args:
        email (str): The email of the user.
        password (str): The user's password.

    Returns:
        UserInternal: The authenticated user object if successful, otherwise False.
    """
    # Retrieve a UserInternal object
    user = await get_user(email)
    if not user:
        return False
    if not verify_hash(password, user.hashedPassword):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    Generates a JWT access token.

    Args:
        data (dict): The data to include in the token.
        expires_delta (timedelta | None): The expiration time for the token, or None for default.

    Returns:
        str: The encoded access token.
    """
    # Make a copy of the data so we don't modify the original dictionary
    to_encode = data.copy()

    # If an expiration time is provided, set the 'exp' claim to that time
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})

    # Encode the token using the JWT library
    encoded_jwt = jwt.encode(to_encode, str(SECRET_KEY), algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """
    Gets the current user from an authentication token.

    Args:
        token (Annotated[str, Depends(oauth2_scheme)]): The authentication token.

    Raises:
        HTTPException: If the credentials cannot be validated.

    Returns:
        UserInternal: The current user.
    """
    # Define the exception to raise if credentials cannot be validated
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        # Decode the JWT token
        payload = jwt.decode(token, str(SECRET_KEY), algorithms=[ALGORITHM])
        email: str = payload.get("sub")

        # Raise exception if the email is missing from the payload
        if email is None:
            raise credentials_exception

        # Create TokenData object with email
        token_data = TokenData(email=email)
    except JWTError:
        # Raise exception if there is a JWT error
        raise credentials_exception

    # Get user from database using email from token data
    user = await get_user(email=token_data.email)

    # Raise exception if user not found
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: Annotated[UserInternal, Depends(get_current_user)]):
    # if current_user.disabled:
    #     raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# --- main.py ---

@app.post("/token", response_model=Token)
async def login_for_access_token(
    formData: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    """
    Logs in and receives an access token.

    ### Arguments

    - `formData` (`Annotated[OAuth2PasswordRequestForm, Depends()]`): The OAuth2 password request form.

    ### Returns

    - `Token`: The access token.
    """
    # Authenticate the user credentials, i.e. check username and password in database
    user = await dependencies.authenticate_user(
        formData.username, formData.password
    )
    # If user credentials are invalid, raise an HTTPException with a 401 status code
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Define access token expiry time
    access_token_expires = timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    # Create access token using user email and expiry time
    access_token = dependencies.create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )

    return {"accessToken": access_token, "tokenType": "bearer"}

When I request an endpoint that requires a signed-in user, e.g.:

@router.get("/")
async def get_user(
    currentUser: Annotated[UserInternal, Depends(get_current_active_user)]
):
    return currentUser

I expected the get_current_user() function to receive a valid JWT token from the Authorization header, decode it, and extract the email address from the payload. The JWT is created correctly in the login_for_access_token() endpoint, but it does not seem to be passed in the request headers when I call the endpoint from the automatically generated OpenAPI docs.

What can I do to fix this?

Solution:

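You wrote:

return {"accessToken": access_token, "tokenType": "bearer"}

But according to the documentation you followed, it should be:

return {"access_token": access_token, "token_type": "bearer"}

The keys must be snake case, not camel case. The interactive docs read the token from the access_token field of the /token response; with accessToken that field is missing, so the Authorization header is sent as "Bearer undefined" and get_current_user() rejects it with a 401.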
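
To see why the key names matter, here is a minimal, standard-library-only sketch that imitates what a client such as the interactive docs does after login: it reads access_token from the token response and builds the Authorization header. The helper names build_token_response and authorization_header are hypothetical and not part of FastAPI, and the "undefined" fallback only mimics what JavaScript yields for a missing property.

```python
def build_token_response(access_token: str) -> dict[str, str]:
    """Return the token payload with the snake_case keys OAuth2 clients expect."""
    return {"access_token": access_token, "token_type": "bearer"}


def authorization_header(token_response: dict[str, str]) -> str:
    """Hypothetical stand-in for a client reading the /token response.

    A JavaScript client reading a missing property gets `undefined`;
    the default value below imitates that behaviour.
    """
    token = token_response.get("access_token", "undefined")
    return f"Bearer {token}"


# The response shape from the question (camelCase keys).
camel_case = {"accessToken": "abc.def.ghi", "tokenType": "bearer"}
# The corrected response shape (snake_case keys).
snake_case = build_token_response("abc.def.ghi")

print(authorization_header(camel_case))  # Bearer undefined
print(authorization_header(snake_case))  # Bearer abc.def.ghi
```

The Token response model is not shown in the question. If it declares camelCase fields, it would presumably need the same rename so that the serialized JSON carries access_token and token_type.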
