En la mayoría de los escenarios vamos a querer al menos proteger partes de la API; ya sea que emplees un motor de plantillas como Jinja o no, casi siempre va a ser necesario proteger los métodos de la API; para ello, la manera más básica es emplear un sistema de autenticación, ya sea en base a sesión o tokens de autenticación; con estos mecanismos, se le otorga derechos al usuario para que pueda acceder a los métodos privados de la API y podamos conocer en todo momento (realizada esta implementación) cuando el usuario manipula los datos y en qué nivel.
En este apartado veremos algunos métodos de autenticación que se integran directamente a la documentación automática y otros esquemas personalizados que constan de usuario y contraseña integrándose con la base de datos y la generación de tokens de autenticación o accesos.
El esquema que se puede considerar más básico de autenticación, viene siendo en donde las credenciales del usuario se manejan en base a un token que se establece en la cabecera de la petición, específicamente en la cabecera llamada Authorization para luego ser consultadas y evaluadas por la aplicación:
api.py
from fastapi.security import APIKeyHeader
from fastapi import Depends, FastAPI, HTTPException, status
API_KEY_TOKEN = "SECRET_PASSWORD"
api_key_header = APIKeyHeader(name="Token")
@app.get("/protected-route")
async def protected_route(token: str = Depends(api_key_header)):
if token != API_KEY_TOKEN:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return {"hello": "FASTAPI"}
En el ejemplo anterior, se establece un token genérico mediante API_KEY_TOKEN y se verifica si el token suministrado en la cabecera de la petición es igual al del token; se utiliza la clase APIKeyHeader (la cual, devuelve un error de tipo 403 cuando el token no es suministrado) para recuperar un valor de un encabezado (en el ejemplo anterior la variable llamada token). Finalmente, se establece como una dependencia más de la petición.
Si vamos a la documentación, en el método creado anteriormente, veremos un candado, que indica que está protegido:
Si realizamos la petición sin autenticarnos, veremos un error 403 (el establecido anteriormente):
Code Details
403
Undocumented
Error: Forbidden
Response body
Download
{
"detail": "Not authenticated"
}
Si damos un click sobre el candado y establecemos el token de manera correcta (un token válido):
La petición será procesada de manera exitosa con un código de estado de tipo 200.
El problema con este enfoque es que solamente podemos proteger una ruta; podemos crear una función que realice la verificación anterior:
from fastapi.security import APIKeyHeader
from fastapi import Depends, FastAPI, HTTPException, status
API_KEY_TOKEN = "SECRET_PASSWORD"
async def authenticate(token: str = Depends(APIKeyHeader(name="Token"))):
if token != API_KEY_TOKEN:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return token
Y se inyecta en cada una de los métodos de la API en los cuales requerimos que el usuario esté autenticado; por ejemplo:
@app.get("/page/")
def page(db: Session = Depends(get_database_session), dependencies=Depends(authenticate)):
print(getAll(db))
#create_user(db)
print(dependencies)
return {"page": 1}
Estos ejemplos ofrecen un esquema de autenticación muy básico en donde el token recibido se verifica con una constante; no se manejan usuarios autenticados y no hay manera de saber que usuario está realizando la petición.
La autenticación anterior puede servir para definir esquemas simples, pero, en la mayoría de los casos es necesaria manejar esquemas más completos, en los cuales tendremos múltiples usuarios con la combinación ya conocida de usuario/contraseña para poder ingresar a la aplicación, para estos casos, podemos emplear el siguiente esquema.
Para este desarrollo, vamos a necesitar instalar un paquete al proyecto con el cual, podremos convertir textos planos en hash, específicamente lo utilizaremos para generar un hash del password del usuario:
$ pip install 'passlib[bcrypt]'
A continuación, modificaremos el modelo de usuarios para especificar la columna del password (en formato hash) y creamos una entidad relacional al usuario con la cual, manejaremos los tokens de acceso:
database\models.py
class User(Base):
***
hashed_password = Column(String(255))
class AccessToken(Base):
__tablename__ = 'access_tokens'
user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
access_token = Column(String(255))
expiration_date = Column(DateTime(timezone=True))
user = relationship('User',lazy="joined")
Con cada una de las columnas de la relación anterior, tenemos:
Recuerda borrar todas las tablas de la base de datos para crear la nueva relación y verse reflejado el cambio en el modelo de usuario.
Creamos un esquema de la nueva entidad y modificamos la entidad del usuario para indicar el campo de contraseña:
schemas.py
class User(BaseModel):
name: str = Field(min_length=5)
surname: str
email: EmailStr
website: str #HttpUrl
class Config:
from_attributes = True
class UserCreate(User):
password: str
class UserDB(User):
hashed_password: str
class AccessToken(BaseModel):
user_id: int
access_token: str
expiration_date: datetime
class Config:
from_attributes = True
Para el usuario, si se va a crear un usuario, entonces la contraseña esta en texto plano, ya que, es la introducida por el usuario (UserCreate.password), pero, cuando se trata del usuario obtenido desde la base de datos, la contraseña esta en formato hash (UserDB.hashed_password).
Para gestionar la contraseña, usaremos un nuevo archivo:
authentication\password.py
import secrets
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"])
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def generate_token() -> str:
return secrets.token_urlsafe(32)
Explicación de las funciones anteriores
Para realizar el proceso de la autenticación del usuario y generar el token de acceso, usaremos un nuevo archivo:
authentication\authentication.py
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
# from time
from database.models import User, AccessToken
from authentication.password import verify_password, generate_token
def authenticate(email: str, password: str, db: Session) -> User|None:
user = db.query(User).filter(User.email == email).first()
if user is None:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def create_access_token(user:User, db: Session) -> AccessToken:
tomorrow = datetime.now() + timedelta(days=1) # time.time + 60 * 60 * 24
accessToken = AccessToken(user_id=user.id, expiration_date=tomorrow, access_token=generate_token())
db.add(accessToken)
db.commit()
db.refresh(accessToken)
return accessToken
Explicación de las funciones anteriores
Crearemos un nuevo archivo en donde implementamos las opciones de registrar un usuario y crear el token (login) del usuario:
user.py
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from schemes import UserCreate
from authentication import password, authentication
from database import database, models
user_router = APIRouter()
@user_router.post('/token')
def create_token(form_data : OAuth2PasswordRequestForm = Depends(OAuth2PasswordRequestForm), db: Session = Depends(database.get_database_session)):
email = form_data.username
password = form_data.password
user = authentication.authenticate(email,password,db)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
token = authentication.create_access_token(user, db)
return {"access_token": token.access_token}
@user_router.post('/register', status_code=status.HTTP_201_CREATED)
def register(user: UserCreate, db:Session = Depends(database.get_database_session)): # -> models.User
user_exist = db.query(models.User).filter(models.User.email == user.email).first()
if user_exist:
raise HTTPException(status_code=status.HTTP_409_CONFLICT,
detail="User email already exist")
hashed_password = password.get_password_hash(user.password)
print(hashed_password)
userdb = models.User(name=user.name,
email= user.email,
surname = user.surname,
website= user.website,
hashed_password=hashed_password)
db.add(userdb)
db.commit()
db.refresh(userdb)
return userdb
Con la clase OAuth2PasswordRequestForm es inyectada como dependencia de la función para generar el token y permite declarar en un cuerpo de formulario con los siguientes campos:
Claro está, pudieras emplear en su lugar las clases de Pydantic o las clases Body, Form o similares, algo como:
create_token(email: str = Form(), password: str = Form())
Finalmente, son incluidas las rutas anteriores en la aplicación:
api.py
from user import user_router
***
app.include_router(user_router)
***
Y con esto tendremos:
Este material forma parte de mi curso y libro completo sobre FastAPI.
- Andrés Cruz
Desarrollo con Laravel, Django, Flask, CodeIgniter, HTML5, CSS3, MySQL, JavaScript, Vue, Android, iOS, Flutter
Acepto recibir anuncios de interes sobre este Blog.
!Cursos desde!
10$
En Udemy
Quedan 3d 18:19!
!Cursos desde!
4$
En Academia
Ver los cursos!Libros desde!
1$
Ver los libros