Redis Tutorial for Python Developers 2026: Caching, Rate Limiting, Pub/Sub, and Streams
Redis has become one of the most widely deployed infrastructure components in modern backend systems. Ask any Python engineer what they use Redis for and most will say "caching" — but Redis is far more capable than that. This tutorial walks through four production use cases: API response caching with cache invalidation, a sliding-window rate limiter, pub/sub for real-time cross-instance notifications, and Redis Streams for event sourcing and audit logs. All code examples use redis-py (the official Python client) and redis.asyncio for FastAPI-compatible async code.
Prerequisites: Python 3.11+, Docker (for local Redis), basic familiarity with FastAPI or any Python web framework.
What Is Redis?
Redis (Remote Dictionary Server) is an open-source, in-memory data structure store that can function as a database, cache, message broker, and streaming engine. The key design decisions that make Redis special:
- In-memory by default — all data lives in RAM, making reads and writes sub-millisecond at typical workloads.
- Rich data structures — Strings, Lists, Sets, Sorted Sets, Hashes, Bitmaps, HyperLogLogs, Geospatial indexes, and Streams. You are not limited to key/value strings.
- Optional persistence — Redis is not a pure cache that discards data on restart. It supports RDB snapshots, Append-Only File (AOF) journaling, or both simultaneously.
- Atomic operations — single-command operations (INCR, LPUSH, ZADD, etc.) are atomic by design; Lua scripts or MULTI/EXEC transactions give you multi-command atomicity.
- Pub/Sub and Streams — built-in message passing primitives that replace lightweight message brokers for many use cases.
Official documentation: redis.io/docs
Installation
Running Redis with Docker
The fastest way to get a local Redis instance is Docker:
docker run -d -p 6379:6379 --name redis-dev redis:7-alpine
Redis 7 is the current stable major version. The alpine variant keeps the image small (~10 MB). Port 6379 is the default Redis port.
Verify it is running:
docker exec -it redis-dev redis-cli ping
# PONG
redis-cli Basics
redis-cli is the interactive command-line client bundled with every Redis installation:
docker exec -it redis-dev redis-cli
127.0.0.1:6379> SET greeting "hello"
OK
127.0.0.1:6379> GET greeting
"hello"
127.0.0.1:6379> EXPIRE greeting 60
(integer) 1
127.0.0.1:6379> TTL greeting
(integer) 58
127.0.0.1:6379> DEL greeting
(integer) 1
127.0.0.1:6379> EXISTS greeting
(integer) 0
These six commands — SET, GET, EXPIRE, TTL, DEL, EXISTS — cover the majority of basic cache operations.
Installing the Python Client
pip install redis
This installs redis-py, the official Python client maintained by Redis Ltd. It also ships the redis.asyncio submodule for async support (no separate aioredis package is needed since redis-py 4.2).
import redis
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
# Basic operations
r.set("framework", "FastAPI")
print(r.get("framework")) # "FastAPI"
r.set("counter", 0)
r.incr("counter") # atomic increment → 1
r.incr("counter") # → 2
r.set("session:abc123", "user:42", ex=3600) # expires in 1 hour
print(r.ttl("session:abc123")) # seconds remaining
r.delete("framework")
print(r.exists("framework")) # 0 (False)
decode_responses=True tells the client to automatically decode byte responses to Python strings, which is almost always what you want.
Connection Pooling
For any server application, use a connection pool rather than a new connection per request:
pool = redis.ConnectionPool(host="localhost", port=6379, db=0, decode_responses=True, max_connections=20)
r = redis.Redis(connection_pool=pool)
Redis Data Types Cheatsheet
| Type | Key commands | Typical use case |
|---|---|---|
| String | SET, GET, INCR, APPEND, SETEX | Cached values, counters, sessions |
| List | LPUSH, RPUSH, LRANGE, LLEN, BRPOP | Queues, activity feeds |
| Set | SADD, SMEMBERS, SISMEMBER, SUNION | Unique tags, online user sets |
| Sorted Set | ZADD, ZRANGE, ZRANGEBYSCORE, ZCARD | Leaderboards, sliding-window rate limit |
| Hash | HSET, HGET, HMGET, HGETALL | Object fields without serialization |
| Stream | XADD, XREAD, XREADGROUP | Event log, audit trail, message bus |
Use Case 1: API Response Caching
The Cache-Aside Pattern
Cache-aside (also called lazy loading) is the most common caching strategy:
- Incoming request arrives.
- Check the cache. On a hit, return the cached value immediately.
- On a miss, fetch from the database, store the result in the cache with a TTL, then return it.
import json
import redis
import psycopg2 # or any DB driver
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def get_user_profile(user_id: int) -> dict | None:
    cache_key = f"user:{user_id}:profile"
    # 1. Check cache
    cached = r.get(cache_key)
    if cached is not None:
        return json.loads(cached)  # cache hit
    # 2. Cache miss — fetch from database
    conn = psycopg2.connect("postgresql://localhost/mydb")
    cur = conn.cursor()
    cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
    row = cur.fetchone()
    conn.close()
    if row is None:
        return None
    profile = {"id": row[0], "name": row[1], "email": row[2]}
    # 3. Store in cache with 15-minute TTL
    r.setex(cache_key, 900, json.dumps(profile))
    return profile
Cache Key Design
Good cache key design prevents collisions and makes debugging straightforward. Use a consistent naming convention:
{entity}:{id}:{field_or_view}
Examples:
user:42:profile # full profile object
user:42:orders:page:1 # paginated order list
product:9901:inventory # stock count
session:abc123xyz # session token
Avoid generic keys like "profile" or "data". Always include enough context to make keys unique across the entire application.
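The convention above is easy to enforce with a tiny helper so every call site builds keys the same way. This is a sketch — `make_key` is an illustrative name, not part of redis-py:

```python
def make_key(entity: str, entity_id, *parts) -> str:
    """Build a namespaced cache key like 'user:42:profile'."""
    return ":".join([entity, str(entity_id), *(str(p) for p in parts)])

# make_key("user", 42, "profile")           -> "user:42:profile"
# make_key("user", 42, "orders", "page", 1) -> "user:42:orders:page:1"
```

Centralizing key construction also gives you one place to change the naming scheme later without hunting down f-strings across the codebase.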
Cache Invalidation on Write
Stale cache entries cause subtle bugs. Invalidate (delete) the cache entry whenever the underlying data changes:
def update_user_profile(user_id: int, name: str, email: str) -> None:
    # 1. Update the database (source of truth)
    conn = psycopg2.connect("postgresql://localhost/mydb")
    cur = conn.cursor()
    cur.execute(
        "UPDATE users SET name=%s, email=%s WHERE id=%s",
        (name, email, user_id)
    )
    conn.commit()
    conn.close()
    # 2. Invalidate the cache — next read will repopulate it
    cache_key = f"user:{user_id}:profile"
    r.delete(cache_key)

def delete_user(user_id: int) -> None:
    conn = psycopg2.connect("postgresql://localhost/mydb")
    cur = conn.cursor()
    cur.execute("DELETE FROM users WHERE id=%s", (user_id,))
    conn.commit()
    conn.close()
    # Remove all cache keys related to this user
    r.delete(f"user:{user_id}:profile")
    r.delete(f"user:{user_id}:orders:page:1")
Decorator Pattern for Easy Caching
Wrapping the cache-aside logic in a decorator keeps your business functions clean:
import functools
import hashlib
def redis_cache(ttl: int = 300, key_prefix: str = "cache"):
    """Cache the return value of a function in Redis."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a stable cache key from function name + arguments,
            # keeping the prefix outside the hash so keys stay debuggable
            raw_key = f"{func.__name__}:{args}:{sorted(kwargs.items())}"
            cache_key = f"{key_prefix}:{hashlib.md5(raw_key.encode()).hexdigest()}"
            cached = r.get(cache_key)
            if cached is not None:
                return json.loads(cached)
            result = func(*args, **kwargs)
            r.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

@redis_cache(ttl=600, key_prefix="user")
def get_user_orders(user_id: int, page: int = 1) -> list:
    # ... database query here
    return []
Use Case 2: Rate Limiting
Rate limiting protects your API from abuse, controls costs, and ensures fair use. Redis is the standard tool for distributed rate limiting because it provides atomic counters accessible from every application instance.
Fixed Window Counter
The simplest approach uses INCR and EXPIRE:
def is_allowed_fixed_window(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    key = f"ratelimit:fixed:{user_id}"
    count = r.incr(key)  # atomic increment; creates key if absent
    if count == 1:
        r.expire(key, window_seconds)  # set expiry only on first request
    return count <= limit
Limitation: A user can send limit requests in the last second of window N and limit requests in the first second of window N+1, effectively doubling the allowed rate across the window boundary.
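The boundary effect is easy to verify without Redis. The toy simulation below (illustrative, not part of the limiter itself) buckets timestamps the way the fixed-window counter does; 100 requests just before the window boundary and 100 just after are all admitted, even though 200 arrive within one second:

```python
def fixed_window_sim(timestamps, limit: int, window: int) -> int:
    """Count how many of the given requests a fixed-window limiter admits."""
    counts: dict[int, int] = {}
    admitted = 0
    for ts in sorted(timestamps):
        bucket = int(ts // window)  # same bucketing as one INCR key per window
        counts[bucket] = counts.get(bucket, 0) + 1
        if counts[bucket] <= limit:
            admitted += 1
    return admitted

# 100 requests at t=59.5s and 100 at t=60.5s: all 200 admitted —
# double the intended 100-per-60s rate within a single second
burst = [59.5] * 100 + [60.5] * 100
print(fixed_window_sim(burst, limit=100, window=60))  # 200
```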
Sliding Window with Sorted Sets
A sliding window avoids the boundary problem by tracking individual request timestamps in a sorted set:
import time
def is_allowed_sliding_window(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = time.time()
    window_start = now - window_seconds
    pipe = r.pipeline()
    # Remove timestamps older than the window
    pipe.zremrangebyscore(key, "-inf", window_start)
    # Add the current request (score = timestamp; under heavy concurrency,
    # append a random suffix to the member so simultaneous requests don't collide)
    pipe.zadd(key, {str(now): now})
    # Count requests in the window
    pipe.zcard(key)
    # Reset expiry to avoid orphaned keys
    pipe.expire(key, window_seconds + 1)
    results = pipe.execute()
    request_count = results[2]
    return request_count <= limit
Using a pipeline() sends all four commands in a single round-trip, which is critical for performance on rate-limit checks that happen on every request.
FastAPI Middleware Integration
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Identify user by IP or auth token
    user_id = request.client.host
    if not is_allowed_sliding_window(user_id, limit=60, window_seconds=60):
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded. Try again later."},
            headers={"Retry-After": "60"},
        )
    response = await call_next(request)
    return response
Use Case 3: Pub/Sub for Real-Time Notifications
Redis Pub/Sub lets processes publish messages to named channels and receive them in subscribers. It is ideal for lightweight fan-out scenarios: broadcasting events, invalidating cache across app instances, or sending real-time notifications.
Publishing a Message
def publish_cache_invalidation(user_id: int) -> None:
    channel = "cache:invalidate:user"
    message = json.dumps({"user_id": user_id})
    r.publish(channel, message)
    print(f"Published cache invalidation for user {user_id}")
Subscribing in a Background Thread
import threading
def cache_invalidation_listener():
    """Run in a background thread; listens for invalidation events."""
    subscriber = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    pubsub = subscriber.pubsub()
    pubsub.subscribe("cache:invalidate:user")
    print("Cache invalidation listener started.")
    for message in pubsub.listen():
        if message["type"] != "message":
            continue
        data = json.loads(message["data"])
        user_id = data["user_id"]
        # Delete the local cache entry
        subscriber.delete(f"user:{user_id}:profile")
        print(f"Cache invalidated for user {user_id}")

# Start the listener when the application boots
listener_thread = threading.Thread(target=cache_invalidation_listener, daemon=True)
listener_thread.start()
Cross-Instance Cache Invalidation
When you run multiple instances of a service behind a load balancer, each instance has its own in-process state (e.g., a local LRU cache or even a Redis connection reading from a replica). Pub/Sub solves the coordination problem:
- Instance A receives a PUT /users/42 request and updates the database.
- Instance A calls r.publish("cache:invalidate:user", '{"user_id": 42}').
- All running instances (A, B, C, …) receive the message and delete their local cached copy.
- The next read on any instance triggers a fresh database fetch.
This pattern is sometimes called "cache invalidation via message bus" and is far simpler than trying to synchronize caches with a distributed lock.
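The write path from the steps above can be tied together in one small function. A sketch with the Redis client and the DB write injected as parameters so it composes with the publisher and listener shown earlier; `update_and_broadcast` is an illustrative name, not an established API:

```python
import json

def update_and_broadcast(redis_client, user_id: int, update_db) -> str:
    """Write to the source of truth, then tell every instance to drop its copy."""
    update_db(user_id)  # 1. persist the change (DB call injected for clarity)
    payload = json.dumps({"user_id": user_id})
    redis_client.publish("cache:invalidate:user", payload)  # 2. fan out
    return payload
```

Every instance that runs the background listener from the previous section will then delete its cached copy, regardless of which instance handled the write.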
Use Case 4: Redis Streams for Event Sourcing and Audit Logs
Redis Streams (introduced in Redis 5.0) provide a durable, append-only log structure similar to Apache Kafka, but built into Redis. Each entry has an auto-generated ID (millisecond timestamp + sequence number), key-value fields, and is retained until explicitly trimmed.
Writing Events with XADD
def log_user_event(user_id: int, event_type: str, metadata: dict) -> str:
    stream_key = "events:user"
    entry_id = r.xadd(
        stream_key,
        {
            "user_id": str(user_id),
            "event": event_type,
            "metadata": json.dumps(metadata),
            "ts": str(time.time()),
        },
        maxlen=10_000,  # trim to the last ~10,000 entries
        approximate=True,
    )
    return entry_id  # e.g., "1715510400000-0"

# Log a profile update
log_user_event(42, "profile.updated", {"fields_changed": ["email"]})
log_user_event(42, "password.changed", {})
Reading Events with XREAD
def read_events(stream_key: str = "events:user", count: int = 50) -> list:
    # Read up to `count` entries starting from the beginning of the stream
    # ("0-0" means "everything after the zero ID"); use XREVRANGE instead
    # if you want the newest entries first
    entries = r.xread({stream_key: "0-0"}, count=count)
    if not entries:
        return []
    results = []
    for stream_name, messages in entries:
        for entry_id, fields in messages:
            results.append({"id": entry_id, **fields})
    return results
Consumer Groups for Parallel Processing
Consumer groups let multiple workers share the workload — each message is delivered to exactly one worker in the group:
# Create the consumer group once (usually at application startup)
try:
    r.xgroup_create("events:user", "audit-processors", id="0", mkstream=True)
except redis.exceptions.ResponseError as exc:
    if "BUSYGROUP" not in str(exc):
        raise  # only "group already exists" is safe to ignore

def process_events_worker(worker_name: str):
    while True:
        # Read up to 10 undelivered messages for this worker
        messages = r.xreadgroup(
            "audit-processors",
            worker_name,
            {"events:user": ">"},
            count=10,
            block=5000,  # block for up to 5 seconds if no messages
        )
        if not messages:
            continue
        for stream_name, entries in messages:
            for entry_id, fields in entries:
                print(f"[{worker_name}] Processing event {entry_id}: {fields}")
                # Acknowledge the message after successful processing
                r.xack("events:user", "audit-processors", entry_id)
Streams vs. Pub/Sub: Pub/Sub messages are ephemeral — if no subscriber is listening when a message is published, it is lost. Streams persist messages and support replay, making them suitable for audit logs, event sourcing, and reliable task queues.
Async Support with redis.asyncio
For FastAPI and other asyncio-based frameworks, use the redis.asyncio interface:
import redis.asyncio as aioredis
from fastapi import FastAPI, Depends

app = FastAPI()

# Create an async Redis client (singleton, module-level)
async_redis: aioredis.Redis = aioredis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,
)

async def get_redis() -> aioredis.Redis:
    return async_redis

@app.get("/users/{user_id}")
async def get_user(user_id: int, redis: aioredis.Redis = Depends(get_redis)):
    cache_key = f"user:{user_id}:profile"
    cached = await redis.get(cache_key)
    if cached:
        return json.loads(cached)
    # Simulate DB fetch
    profile = {"id": user_id, "name": "Ada Lovelace", "email": "[email protected]"}
    await redis.setex(cache_key, 900, json.dumps(profile))
    return profile

@app.on_event("shutdown")
async def shutdown():
    await async_redis.aclose()
The async client uses the same API as the synchronous client — every method just needs to be awaited. Connection pooling is handled automatically.
Async Pipeline
async def async_sliding_window(redis: aioredis.Redis, user_id: str, limit: int = 100) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = time.time()
    window_start = now - 60
    async with redis.pipeline() as pipe:
        # Queueing commands on a pipeline is synchronous; only execute() is awaited
        pipe.zremrangebyscore(key, "-inf", window_start)
        pipe.zadd(key, {str(now): now})
        pipe.zcard(key)
        pipe.expire(key, 61)
        results = await pipe.execute()
    return results[2] <= limit
Production Tips
Connection Pooling
Always use a connection pool in production. redis-py's pool is effectively unbounded by default (max_connections falls back to 2**31 when unset), so set an explicit limit sized to your concurrency:
pool = redis.ConnectionPool(
    host="redis-prod.internal",
    port=6379,
    db=0,
    decode_responses=True,
    max_connections=50,
    socket_connect_timeout=2,
    socket_timeout=2,
)
r = redis.Redis(connection_pool=pool)
For async:
async_redis = aioredis.Redis.from_pool(
    aioredis.ConnectionPool(host="redis-prod.internal", port=6379, max_connections=50)
)
maxmemory Policy
When Redis reaches its configured memory limit, it needs to know which keys to evict. Set the policy in redis.conf or at runtime:
redis-cli CONFIG SET maxmemory 2gb
redis-cli CONFIG SET maxmemory-policy allkeys-lru
Common policies:
| Policy | Behavior |
|---|---|
| noeviction | Return errors when memory is full. Safe for primary DBs. |
| allkeys-lru | Evict least recently used keys from all keys. Good for caches. |
| volatile-lru | Evict LRU keys among those with an expiry set. Protects persistent data. |
| allkeys-lfu | Evict least frequently used keys (Redis 4+). |
For pure caching workloads, allkeys-lru or allkeys-lfu are the right choices.
Persistence: RDB vs AOF
Redis offers two persistence mechanisms:
RDB (Redis Database Snapshots)
- Periodically saves a point-in-time snapshot of the dataset to disk.
- Fast to restore; compact files.
- Risk: data written since the last snapshot is lost on crash.
- Enable in redis.conf: save 900 1 (snapshot if at least 1 key changed in 900 seconds).
AOF (Append-Only File)
- Logs every write command. On restart, Redis replays the log.
- More durable (configure appendfsync everysec for at-most-1-second data loss).
- Larger files, slower startup on very large datasets.
- Enable: appendonly yes.
Recommendation: For caching-only workloads, you can disable persistence entirely (save "", appendonly no) since losing the cache on restart is acceptable. For Streams-based event logs or session storage that must survive restarts, enable AOF with appendfsync everysec.
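Put together, a cache-only instance might use a redis.conf fragment like the following (values are illustrative; size maxmemory to your host):

```
# redis.conf — cache-only instance
# Disable RDB snapshots and AOF; evict with LRU at the memory cap
save ""
appendonly no
maxmemory 2gb
maxmemory-policy allkeys-lru
```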
Health Checks and Monitoring
try:
    r.ping()
except redis.exceptions.ConnectionError as exc:
    print(f"Redis unavailable: {exc}")
    # fall back to database or return 503
Key metrics to monitor:
- INFO memory → used_memory_human, mem_fragmentation_ratio
- INFO stats → instantaneous_ops_per_sec, evicted_keys, keyspace_hits, keyspace_misses
- Hit rate formula: keyspace_hits / (keyspace_hits + keyspace_misses)
A hit rate below 80% suggests your TTLs are too short or your key design is causing cache thrashing.
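The hit-rate formula is simple enough to wire into a health endpoint. A sketch, with the stats dict taken from r.info("stats"); `cache_hit_rate` is an illustrative helper name:

```python
def cache_hit_rate(stats: dict) -> float:
    """Compute hit rate from the INFO stats section; 0.0 when no traffic yet."""
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0

# In production: rate = cache_hit_rate(r.info("stats"))
```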
Summary
This tutorial covered four production-grade use cases for Redis in Python applications:
- API Response Caching — cache-aside pattern, TTL expiration, cache invalidation on write, decorator-based caching, and cache key namespacing conventions.
- Rate Limiting — fixed-window counters with INCR/EXPIRE and a more accurate sliding-window implementation using sorted sets (ZADD, ZREMRANGEBYSCORE, ZCARD), integrated as FastAPI middleware.
- Pub/Sub for Real-Time Notifications — publishing and subscribing to channels, running a subscriber in a background thread, and using pub/sub to invalidate caches across multiple application instances.
- Redis Streams — append-only event log with XADD/XREAD, consumer groups for parallel processing with XREADGROUP/XACK, and a comparison with Pub/Sub.
Beyond these four, Redis supports many other patterns: distributed locks (via SET NX EX), full-text search (RediSearch module), time-series data (RedisTimeSeries), and vector similarity search — making it one of the most versatile infrastructure components available.
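The distributed-lock pattern mentioned above fits in a few lines with redis-py. This is a single-node sketch (for multi-node guarantees, see the Redlock algorithm); `acquire_lock`/`release_lock` are illustrative names, and the client is passed in explicitly:

```python
import uuid

# Delete the key only if it still holds our token (atomic compare-and-delete)
RELEASE_LUA = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

def acquire_lock(client, name: str, ttl: int = 10):
    """SET lock:<name> <token> NX EX <ttl>; returns the token, or None if held."""
    token = str(uuid.uuid4())
    if client.set(f"lock:{name}", token, nx=True, ex=ttl):
        return token
    return None

def release_lock(client, name: str, token: str) -> bool:
    """Release only if we still own the lock, so a holder whose TTL expired
    cannot accidentally delete someone else's lock."""
    return bool(client.eval(RELEASE_LUA, 1, f"lock:{name}", token))
```

The TTL guarantees the lock is released even if the holder crashes; the Lua check-and-delete prevents a slow holder from releasing a lock that has already expired and been re-acquired by another worker.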
Further Reading
- Redis Official Documentation — comprehensive reference for every command and configuration option.
- redis-py Documentation — Python client API reference, connection pool configuration, and async guide.
- Redis University — free, self-paced courses covering Redis fundamentals through advanced patterns, including a Python-focused course.
- Redis Streams Introduction — deep dive into Streams, consumer groups, and delivery guarantees.