In most scenarios we will want to protect at least part of the API. Whether or not you are using a templating engine like Jinja, you will almost always need to protect your API methods. The most basic way to do this is with an authentication system, based either on sessions or on authentication tokens. With these mechanisms, the user is granted rights to access the private methods of the API and, once the scheme is implemented, we can know at all times when a user manipulates the data and at what level.
In this section we will see some authentication methods that are directly integrated into the automatic documentation, as well as custom schemes based on a username and password that integrate with the database and generate authentication (access) tokens.
The most basic authentication scheme that can be considered is one where the user's credentials are a token sent in a request header (a header named Token in the example below), which the application then reads and evaluates:
api.py
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import APIKeyHeader

app = FastAPI()

API_KEY_TOKEN = "SECRET_PASSWORD"
# Reads the token from the "Token" request header
api_key_header = APIKeyHeader(name="Token")

@app.get("/protected-route")
async def protected_route(token: str = Depends(api_key_header)):
    # Reject the request when the supplied token does not match
    if token != API_KEY_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    return {"hello": "FASTAPI"}
In the example above, a fixed token is defined in API_KEY_TOKEN and compared with the token supplied in the request header. The APIKeyHeader class retrieves the value of the header given in its name argument (Token in this example) and returns a 403 error when the header is not supplied. It is declared as one more dependency of the route, and the header value arrives in the parameter called token.
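The comparison above uses a plain != check. As a minimal sketch of an alternative (not part of the original example), the check can be moved into a small helper that uses secrets.compare_digest from the standard library, which is designed to reduce timing differences when comparing secrets. The helper name is_valid_token is hypothetical.

```python
import secrets

API_KEY_TOKEN = "SECRET_PASSWORD"  # same placeholder value as in the example above


def is_valid_token(supplied: str, expected: str = API_KEY_TOKEN) -> bool:
    """Return True only when the supplied token equals the expected one."""
    # Encode to bytes so that non-ASCII input is also accepted by compare_digest
    return secrets.compare_digest(supplied.encode(), expected.encode())


print(is_valid_token("SECRET_PASSWORD"))
print(is_valid_token("something-else"))
```

Inside the route, the condition would then read "if not is_valid_token(token):" instead of "if token != API_KEY_TOKEN:".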
If we go to the documentation, in the method created previously, we will see a lock, which indicates that it is protected:
If we make the request without authenticating ourselves, we will see a 403 error (the one APIKeyHeader returns when the header is missing):
Code: 403 (Undocumented)
Error: Forbidden
Response body:
{
    "detail": "Not authenticated"
}
If we click on the lock and enter a valid token, the request will be processed successfully with a 200 status code.
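Outside the documentation page, any HTTP client can send the same header. The following is a minimal sketch using only the standard library; the base URL is an assumption (a local development server) and build_protected_request is a hypothetical helper name. The request is only built here, not sent.

```python
from urllib.request import Request

BASE_URL = "http://127.0.0.1:8000"  # assumed address of a local development server


def build_protected_request(token: str) -> Request:
    """Build a GET request for /protected-route carrying the Token header."""
    return Request(f"{BASE_URL}/protected-route", headers={"Token": token})


request = build_protected_request("SECRET_PASSWORD")
print(request.get_method(), request.full_url)
print(request.get_header("Token"))
```

To actually send it while the application is running, pass the object to urllib.request.urlopen(request).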
The problem with writing the check inside the route function is that it protects only that one route. To reuse it, we can create a function that performs the same check:
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import APIKeyHeader

API_KEY_TOKEN = "SECRET_PASSWORD"

async def authenticate(token: str = Depends(APIKeyHeader(name="Token"))):
    # Reusable dependency: validates the Token header and returns it
    if token != API_KEY_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    return token
And it is injected into each of the API methods in which we require the user to be authenticated; for example:
@app.get("/page/")
def page(db: Session = Depends(get_database_session), dependencies=Depends(authenticate)):
    print(getAll(db))
    #create_user(db)
    # dependencies holds the token returned by authenticate()
    print(dependencies)
    return {"page": 1}
These examples offer a very basic authentication scheme where the received token is verified against a constant; authenticated users are not handled and there is no way to know which user is making the request.
The previous authentication can be used to define simple schemes but, in most cases, a more complete scheme is needed, in which multiple users enter the application with the familiar username/password combination. For these cases, we can use the following scheme.
For this development, we need to install a package that converts plain text into hashes; specifically, we will use it to generate a hash of the user's password:
$ pip install 'passlib[bcrypt]'
Next, we will modify the users model to specify the password column (in hash format) and create a relational entity to the user with which we will manage the access tokens:
database\models.py
class User(Base):
    ***
    hashed_password = Column(String(255))

class AccessToken(Base):
    __tablename__ = 'access_tokens'
    user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
    access_token = Column(String(255))
    expiration_date = Column(DateTime(timezone=True))
    user = relationship('User', lazy="joined")
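The columns of the previous relation are used as follows. user_id holds the foreign key to the users table; since it is also the primary key, there can be only one access token row per user. access_token stores the token issued to the authenticated user. expiration_date stores the date and time at which the token stops being valid. user is the relationship that loads the related User together with the token.
Remember to delete all the tables in the database so that the new relation is created and the change to the user model is reflected.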
We create a schema of the new entity and modify the user entity to indicate the password field:
schemas.py
from datetime import datetime

from pydantic import BaseModel, EmailStr, Field

class User(BaseModel):
    name: str = Field(min_length=5)
    surname: str
    email: EmailStr
    website: str #HttpUrl

    class Config:
        from_attributes = True

class UserCreate(User):
    # Plain-text password as entered by the user
    password: str

class UserDB(User):
    # Hashed password as stored in the database
    hashed_password: str

class AccessToken(BaseModel):
    user_id: int
    access_token: str
    expiration_date: datetime

    class Config:
        from_attributes = True
When a user is being created, the password is in plain text, since it is entered by the user (UserCreate.password); when the user is obtained from the database, the password is in hash format (UserDB.hashed_password).
To manage the password, we will use a new file:
authentication\password.py
import secrets

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"])

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def generate_token() -> str:
    return secrets.token_urlsafe(32)
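Explanation of the above functions: get_password_hash() creates a bcrypt hash of the plain-text password; verify_password() checks whether a plain-text password corresponds to a stored hash; generate_token() returns a random, URL-safe string generated from 32 random bytes, which is used as the access token.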
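To illustrate what a hash/verify pair does without installing anything, here is a minimal sketch that swaps bcrypt for PBKDF2 from the standard library. It is not what passlib does internally; the function names, the salt$digest storage format, and the iteration count are assumptions chosen for the illustration.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000  # illustrative work factor, not a recommendation


def hash_password(password: str) -> str:
    """Return 'salt$digest' (both hex) for the given plain-text password."""
    salt = secrets.token_bytes(16)  # a fresh random salt for every hash
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare the results."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("s3cret")
print(check_password("s3cret", stored))
print(check_password("wrong", stored))
```

Because the salt is random, hashing the same password twice produces two different strings, which is why verification has to go through a verify function rather than a direct string comparison.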
To carry out the user authentication process and generate the access token, we will use a new file:
authentication\authentication.py
from datetime import datetime, timedelta

from sqlalchemy.orm import Session

from database.models import User, AccessToken
from authentication.password import verify_password, generate_token

def authenticate(email: str, password: str, db: Session) -> User|None:
    # Look up the user by email and check the supplied password
    user = db.query(User).filter(User.email == email).first()
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

def create_access_token(user:User, db: Session) -> AccessToken:
    # The token is valid for one day
    tomorrow = datetime.now() + timedelta(days=1) # time.time + 60 * 60 * 24
    accessToken = AccessToken(user_id=user.id, expiration_date=tomorrow, access_token=generate_token())
    db.add(accessToken)
    db.commit()
    db.refresh(accessToken)
    return accessToken
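Explanation of the above functions: authenticate() searches for the user by email and returns it only if the supplied password matches the stored hash, otherwise it returns None; create_access_token() generates a token for the user with an expiration date of one day from now, stores it in the database, and returns it.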
We will create a new file where we implement the options to register a user and create the user's token (login):
user.py
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from schemas import UserCreate
from authentication import password, authentication
from database import database, models

user_router = APIRouter()

@user_router.post('/token')
def create_token(form_data : OAuth2PasswordRequestForm = Depends(OAuth2PasswordRequestForm), db: Session = Depends(database.get_database_session)):
    # The form's username field carries the user's email
    email = form_data.username
    password = form_data.password
    user = authentication.authenticate(email,password,db)
    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    token = authentication.create_access_token(user, db)
    return {"access_token": token.access_token}

@user_router.post('/register', status_code=status.HTTP_201_CREATED)
def register(user: UserCreate, db:Session = Depends(database.get_database_session)): # -> models.User
    # Reject the registration when the email is already taken
    user_exist = db.query(models.User).filter(models.User.email == user.email).first()
    if user_exist:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT,
                            detail="User email already exists")
    # Store only the hash, never the plain-text password
    hashed_password = password.get_password_hash(user.password)
    userdb = models.User(name=user.name,
                         email=user.email,
                         surname=user.surname,
                         website=user.website,
                         hashed_password=hashed_password)
    db.add(userdb)
    db.commit()
    db.refresh(userdb)
    return userdb
The OAuth2PasswordRequestForm class is injected as a dependency of the function that generates the token and declares a form body with the following fields: username and password, which are required, and grant_type, scope, client_id and client_secret, which are optional.
Of course, you could use Pydantic classes or the Body or Form classes instead, something like:
create_token(email: str = Form(), password: str = Form())
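Since the token route reads a form body rather than JSON, a client has to send the credentials form-encoded. Here is a minimal sketch with the standard library that only builds the request; the base URL, the sample credentials, and the helper name build_token_request are assumptions.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://127.0.0.1:8000"  # assumed address of a local development server


def build_token_request(email: str, password: str) -> Request:
    """Build the form-encoded POST request expected by the /token route."""
    # The email travels in the field called "username"
    body = urlencode({"username": email, "password": password}).encode()
    return Request(
        f"{BASE_URL}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


request = build_token_request("user@example.com", "s3cret")
print(request.get_method(), request.full_url)
print(request.data)
```

Sending it with urllib.request.urlopen(request) against the running application would return the JSON with the access_token key when the credentials are valid.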
Finally, the previous routes are included in the application:
api.py
from user import user_router
***
app.include_router(user_router)
***
And with this, the /token and /register routes become part of the application.
This material is part of my complete course and book on FastAPI.
- Andrés Cruz