# Using Redis with FastAPI
## 1. Why Redis with FastAPI?
- Caching → Reduce DB load, speed up API responses.
- Sessions / Tokens → Store short-lived authentication tokens.
- Rate Limiting → Prevent abuse by tracking requests.
- Background Jobs / Queues → Async task execution (Celery, RQ).
- Pub/Sub → Real-time notifications, chat apps.
## 2. Setup
### Install Redis & Python Client
```bash
brew install redis             # macOS
sudo apt install redis-server  # Linux
```

Start the Redis server:
```bash
redis-server
```

Install the Python client:
```bash
pip install redis "fastapi[all]"
```

(The async client now ships inside the `redis` package as `redis.asyncio`, so the deprecated `aioredis` package is no longer needed.)

## 3. Connecting Redis in FastAPI
### `core/redis.py`
Section titled “core/redis.py”import redis.asyncio as redis
redis_client = redis.from_url( "redis://localhost:6379", decode_responses=True)👉 Use redis_client.set, get, delete, etc.
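One way (not shown in the original) to tie the client's lifecycle to the app is FastAPI's lifespan hook. A minimal sketch, assuming redis-py ≥ 5 (older versions use `close()` instead of `aclose()`):

```python
# A minimal sketch (assumption, not from the original): close the Redis
# connection pool cleanly when the app shuts down, via FastAPI's lifespan.
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.core.redis import redis_client

@asynccontextmanager
async def lifespan(app: FastAPI):
    await redis_client.ping()    # fail fast at startup if Redis is unreachable
    yield
    await redis_client.aclose()  # redis-py >= 5; older versions use close()

app = FastAPI(lifespan=lifespan)
```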
## 4. Example Use Cases
### a) Caching API Responses
```python
from fastapi import FastAPI

from app.core.redis import redis_client

app = FastAPI()

@app.get("/items/{item_id}")
async def get_item(item_id: int):
    # Check the cache first
    cached = await redis_client.get(f"item:{item_id}")
    if cached:
        return {"item": cached, "source": "cache"}

    # Simulate a DB call, then cache the result
    data = f"Item data for {item_id}"
    await redis_client.set(f"item:{item_id}", data, ex=60)  # 60s expiry
    return {"item": data, "source": "db"}
```
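The example above caches a plain string. Real endpoints usually return dicts, and a common approach is to serialize with `json` before caching. A sketch (the `/detail` route and its fake DB row are illustrative assumptions):

```python
import json

@app.get("/items/{item_id}/detail")
async def get_item_detail(item_id: int):
    key = f"item:{item_id}:detail"
    cached = await redis_client.get(key)
    if cached:
        return {"item": json.loads(cached), "source": "cache"}

    data = {"id": item_id, "name": f"Item {item_id}"}  # stand-in for a DB row
    await redis_client.set(key, json.dumps(data), ex=60)
    return {"item": data, "source": "db"}
```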
### b) Storing Session Tokens

```python
@app.post("/login")
async def login(username: str):
    session_token = f"token-{username}"
    await redis_client.set(f"session:{username}", session_token, ex=3600)  # 1h session
    return {"token": session_token}

@app.get("/validate")
async def validate(username: str, token: str):
    saved = await redis_client.get(f"session:{username}")
    if saved == token:
        return {"status": "valid"}
    return {"status": "invalid"}
```
### c) Rate Limiting Example

```python
from fastapi import HTTPException

@app.get("/limited")
async def limited(username: str):
    key = f"rate:{username}"
    count = await redis_client.incr(key)

    if count == 1:
        await redis_client.expire(key, 60)  # window resets after 60s

    if count > 5:
        raise HTTPException(status_code=429, detail="Too many requests")

    return {"message": f"Request {count} allowed"}
```
### d) Background Tasks with RQ

```bash
pip install rq
```

Worker:
```python
# worker.py
import redis
from rq import Queue, Worker

redis_conn = redis.Redis()
queue = Queue(connection=redis_conn)

if __name__ == "__main__":
    # Pass the connection directly; the old Connection context manager
    # was removed in RQ 2.0.
    worker = Worker([queue], connection=redis_conn)
    worker.work()
```
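Run the worker in its own process, e.g. `python worker.py`; RQ also ships an `rq worker` CLI that does the same for the default queue.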
Producer in FastAPI:

```python
from redis import Redis
from rq import Queue

redis_conn = Redis()
queue = Queue(connection=redis_conn)

def background_job(x, y):
    return x + y

@app.post("/task")
def create_task(x: int, y: int):
    job = queue.enqueue(background_job, x, y)
    return {"job_id": job.get_id()}
```
## 5. Project Structure

```
project/
│── app/
│   ├── core/
│   │   ├── config.py
│   │   ├── redis.py          # Redis connection
│   │
│   ├── api/
│   │   ├── routes/
│   │   │   ├── cache.py      # caching endpoints
│   │   │   ├── auth.py       # sessions with Redis
│   │   │   ├── limit.py      # rate limiting
│   │   │   ├── tasks.py      # background tasks
│   │
│   ├── main.py               # FastAPI entrypoint
│
│── worker.py                 # RQ worker
```

## 6. Best Practices
✅ Use the async Redis client (`redis.asyncio`).
✅ Always set an expiry (`ex`) on cache and session keys.
✅ Use key prefixes (`item:`, `session:`, `rate:`) to keep the keyspace organized.
✅ For distributed tasks, use RQ or Celery with Redis as the broker.
✅ Run Redis Cluster or Sentinel in production for high availability.
✅ Monitor with RedisInsight or Prometheus exporters.