
Redis Tutorial for Python Developers 2026: Caching, Rate Limiting, Pub/Sub, and Streams

Redis has become one of the most widely deployed infrastructure components in modern backend systems. Ask any Python engineer what they use Redis for and most will say "caching" — but Redis is far more capable than that. This tutorial walks through four production use cases: API response caching with cache invalidation, a sliding-window rate limiter, pub/sub for real-time cross-instance notifications, and Redis Streams for event sourcing and audit logs. All code examples use redis-py (the official Python client) and redis.asyncio for FastAPI-compatible async code.

Prerequisites: Python 3.11+, Docker (for local Redis), basic familiarity with FastAPI or any Python web framework.


What Is Redis?

Redis (Remote Dictionary Server) is an open-source, in-memory data structure store that can function as a database, cache, message broker, and streaming engine. The key design decisions that make Redis special:

  • In-memory by default — all data lives in RAM, making reads and writes sub-millisecond under typical workloads.
  • Rich data structures — Strings, Lists, Sets, Sorted Sets, Hashes, Bitmaps, HyperLogLogs, Geospatial indexes, and Streams. You are not limited to key/value strings.
  • Optional persistence — Redis is not a pure cache that discards data on restart. It supports RDB snapshots, Append-Only File (AOF) journaling, or both simultaneously.
  • Atomic operations — single-command operations (INCR, LPUSH, ZADD, etc.) are atomic by design; Lua scripts or MULTI/EXEC transactions give you multi-command atomicity (see the sketch after this list).
  • Pub/Sub and Streams — built-in message passing primitives that replace lightweight message brokers for many use cases.
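
To make the atomicity bullet above concrete, here is a minimal sketch of multi-command atomicity using redis-py's transactional pipeline (MULTI/EXEC under the hood); the key names are invented for illustration:

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Both commands are queued and applied atomically via MULTI/EXEC;
# no other client can observe one without the other.
with r.pipeline(transaction=True) as pipe:
    pipe.incr("stats:logins")
    pipe.lpush("recent:logins", "user:42")
    pipe.execute()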

Official documentation: redis.io/docs


Installation

Running Redis with Docker

The fastest way to get a local Redis instance is Docker:

docker run -d -p 6379:6379 --name redis-dev redis:7-alpine

The redis:7 tag pins the image to a stable major release, and the alpine variant keeps it considerably smaller than the Debian-based default. Port 6379 is the default Redis port.

Verify it is running:

docker exec -it redis-dev redis-cli ping
# PONG

redis-cli Basics

redis-cli is the interactive command-line client bundled with every Redis installation:

docker exec -it redis-dev redis-cli

127.0.0.1:6379> SET greeting "hello"
OK
127.0.0.1:6379> GET greeting
"hello"
127.0.0.1:6379> EXPIRE greeting 60
(integer) 1
127.0.0.1:6379> TTL greeting
(integer) 58
127.0.0.1:6379> DEL greeting
(integer) 1
127.0.0.1:6379> EXISTS greeting
(integer) 0

These six commands — SET, GET, EXPIRE, TTL, DEL, EXISTS — cover the majority of basic cache operations.


Installing the Python Client

pip install redis

This installs redis-py, the official Python client maintained by Redis Ltd. It also ships the redis.asyncio submodule for async support (no separate aioredis package is needed since redis-py 4.2).

import redis

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

# Basic operations
r.set("framework", "FastAPI")
print(r.get("framework"))       # "FastAPI"

r.set("counter", 0)
r.incr("counter")               # atomic increment → 1
r.incr("counter")               # → 2

r.set("session:abc123", "user:42", ex=3600)   # expires in 1 hour
print(r.ttl("session:abc123"))   # seconds remaining

r.delete("framework")
print(r.exists("framework"))    # 0 (False)

decode_responses=True tells the client to automatically decode byte responses to Python strings, which is almost always what you want.

Connection Pooling

For any server application, use a connection pool rather than a new connection per request:

pool = redis.ConnectionPool(
    host="localhost", port=6379, db=0,
    decode_responses=True, max_connections=20,
)
r = redis.Redis(connection_pool=pool)

Redis Data Types Cheatsheet

Type         Key commands                          Typical use case
String       SET, GET, INCR, APPEND, SETEX         Cached values, counters, sessions
List         LPUSH, RPUSH, LRANGE, LLEN, BRPOP     Queues, activity feeds
Set          SADD, SMEMBERS, SISMEMBER, SUNION     Unique tags, online user sets
Sorted Set   ZADD, ZRANGE, ZRANGEBYSCORE, ZCARD    Leaderboards, sliding-window rate limits
Hash         HSET, HGET, HMGET, HGETALL            Object fields without serialization
Stream       XADD, XREAD, XREADGROUP               Event log, audit trail, message bus
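
Strings, Sorted Sets, and Streams all get dedicated examples later in this tutorial; Hashes do not, so here is a quick taste (the key name is hypothetical):

r.hset("user:42", mapping={"name": "Ada", "email": "ada@example.com"})
print(r.hget("user:42", "name"))   # "Ada"
print(r.hgetall("user:42"))        # {"name": "Ada", "email": "ada@example.com"}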

Use Case 1: API Response Caching

The Cache-Aside Pattern

Cache-aside (also called lazy loading) is the most common caching strategy:

  1. Incoming request arrives.
  2. Check the cache. On a hit, return the cached value immediately.
  3. On a miss, fetch from the database, store the result in the cache with a TTL, then return it.
A minimal implementation of these three steps:

import json
import redis
import psycopg2  # or any DB driver

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

def get_user_profile(user_id: int) -> dict | None:
    cache_key = f"user:{user_id}:profile"

    # 1. Check cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)  # cache hit

    # 2. Cache miss — fetch from database
    conn = psycopg2.connect("postgresql://localhost/mydb")
    cur = conn.cursor()
    cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
    row = cur.fetchone()
    conn.close()

    if row is None:
        return None

    profile = {"id": row[0], "name": row[1], "email": row[2]}

    # 3. Store in cache with 15-minute TTL
    r.setex(cache_key, 900, json.dumps(profile))

    return profile

Cache Key Design

Good cache key design prevents collisions and makes debugging straightforward. Use a consistent naming convention:

{entity}:{id}:{field_or_view}

Examples:

user:42:profile          # full profile object
user:42:orders:page:1   # paginated order list
product:9901:inventory   # stock count
session:abc123xyz        # session token

Avoid generic keys like "profile" or "data". Always include enough context to make keys unique across the entire application.

Cache Invalidation on Write

Stale cache entries cause subtle bugs. Invalidate (delete) the cache entry whenever the underlying data changes:

def update_user_profile(user_id: int, name: str, email: str) -> None:
    # 1. Update the database (source of truth)
    conn = psycopg2.connect("postgresql://localhost/mydb")
    cur = conn.cursor()
    cur.execute(
        "UPDATE users SET name=%s, email=%s WHERE id=%s",
        (name, email, user_id)
    )
    conn.commit()
    conn.close()

    # 2. Invalidate the cache — next read will repopulate it
    cache_key = f"user:{user_id}:profile"
    r.delete(cache_key)


def delete_user(user_id: int) -> None:
    conn = psycopg2.connect("postgresql://localhost/mydb")
    cur = conn.cursor()
    cur.execute("DELETE FROM users WHERE id=%s", (user_id,))
    conn.commit()
    conn.close()

    # Remove all cache keys related to this user
    r.delete(f"user:{user_id}:profile")
    r.delete(f"user:{user_id}:orders:page:1")
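
Deleting derived keys one by one becomes brittle as cached views accumulate. Because of the namespacing convention above, a pattern scan can sweep them all; a sketch (note that SCAN walks the entire keyspace incrementally, so reserve this for admin-style operations rather than hot request paths):

def invalidate_user_keys(user_id: int) -> None:
    # scan_iter is cursor-based and non-blocking, unlike the KEYS command
    for key in r.scan_iter(match=f"user:{user_id}:*", count=100):
        r.delete(key)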

Decorator Pattern for Easy Caching

Wrapping the cache-aside logic in a decorator keeps your business functions clean:

import functools
import hashlib

def redis_cache(ttl: int = 300, key_prefix: str = "cache"):
    """Cache the return value of a function in Redis."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a stable cache key from function name + arguments
            raw_key = f"{args}:{sorted(kwargs.items())}"
            digest = hashlib.md5(raw_key.encode()).hexdigest()
            # Keep the prefix and function name readable for debugging; hash only the args
            cache_key = f"{key_prefix}:{func.__name__}:{digest}"

            cached = r.get(cache_key)
            if cached:
                return json.loads(cached)

            result = func(*args, **kwargs)
            r.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator


@redis_cache(ttl=600, key_prefix="user")
def get_user_orders(user_id: int, page: int = 1) -> list:
    # ... database query here
    return []

Use Case 2: Rate Limiting

Rate limiting protects your API from abuse, controls costs, and ensures fair use. Redis is the standard tool for distributed rate limiting because it provides atomic counters accessible from every application instance.

Fixed Window Counter

The simplest approach uses INCR and EXPIRE:

def is_allowed_fixed_window(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    key = f"ratelimit:fixed:{user_id}"
    count = r.incr(key)           # atomic increment; creates key if absent
    if count == 1:
        r.expire(key, window_seconds)   # set expiry only on first request
    return count <= limit

Limitation: a user can send the full limit of requests in the last second of window N and the full limit again in the first second of window N+1, effectively doubling the allowed rate across the window boundary.

Sliding Window with Sorted Sets

A sliding window avoids the boundary problem by tracking individual request timestamps in a sorted set:

import time

def is_allowed_sliding_window(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = time.time()
    window_start = now - window_seconds

    pipe = r.pipeline()
    # Remove timestamps older than the window
    pipe.zremrangebyscore(key, "-inf", window_start)
    # Add the current request timestamp (score = timestamp, member = unique identifier)
    pipe.zadd(key, {str(now): now})
    # Count requests in the window
    pipe.zcard(key)
    # Reset expiry to avoid orphaned keys
    pipe.expire(key, window_seconds + 1)
    results = pipe.execute()

    request_count = results[2]
    return request_count <= limit

Using a pipeline() sends all four commands in a single round-trip, which is critical for performance on rate-limit checks that happen on every request. Note that this implementation records a timestamp even for rejected requests, so a client that keeps hammering the API stays blocked until it genuinely backs off.

FastAPI Middleware Integration

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Identify the caller by IP or auth token (behind a proxy or load
    # balancer, derive the IP from X-Forwarded-For rather than request.client.host)
    user_id = request.client.host

    if not is_allowed_sliding_window(user_id, limit=60, window_seconds=60):
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded. Try again later."},
            headers={"Retry-After": "60"},
        )

    response = await call_next(request)
    return response

Use Case 3: Pub/Sub for Real-Time Notifications

Redis Pub/Sub lets processes publish messages to named channels, where any number of subscribers receive them. It is ideal for lightweight fan-out scenarios: broadcasting events, invalidating caches across app instances, or sending real-time notifications.

Publishing a Message

def publish_cache_invalidation(user_id: int) -> None:
    channel = "cache:invalidate:user"
    message = json.dumps({"user_id": user_id})
    r.publish(channel, message)
    print(f"Published cache invalidation for user {user_id}")

Subscribing in a Background Thread

import threading

def cache_invalidation_listener():
    """Run in a background thread; listens for invalidation events."""
    subscriber = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    pubsub = subscriber.pubsub()
    pubsub.subscribe("cache:invalidate:user")

    print("Cache invalidation listener started.")
    for message in pubsub.listen():
        if message["type"] != "message":
            continue
        data = json.loads(message["data"])
        user_id = data["user_id"]
        # Evict the cached entry (deleting the shared Redis key here for simplicity;
        # with a per-process in-memory cache you would evict the local entry instead)
        subscriber.delete(f"user:{user_id}:profile")
        print(f"Cache invalidated for user {user_id}")


# Start the listener when the application boots
listener_thread = threading.Thread(target=cache_invalidation_listener, daemon=True)
listener_thread.start()

Cross-Instance Cache Invalidation

When you run multiple instances of a service behind a load balancer, each instance has its own in-process state (e.g., a local LRU cache or even a Redis connection reading from a replica). Pub/Sub solves the coordination problem:

  1. Instance A receives a PUT /users/42 request and updates the database.
  2. Instance A calls r.publish("cache:invalidate:user", '{"user_id": 42}').
  3. All running instances (A, B, C, …) receive the message and delete their local cached copy.
  4. The next read on any instance triggers a fresh database fetch.

This pattern is sometimes called "cache invalidation via message bus" and is far simpler than trying to synchronize caches with a distributed lock.


Use Case 4: Redis Streams for Event Sourcing and Audit Logs

Redis Streams (introduced in Redis 5.0) provide a durable, append-only log structure similar to Apache Kafka, but built into Redis. Each entry has an auto-generated ID (millisecond timestamp + sequence number), key-value fields, and is retained until explicitly trimmed.

Writing Events with XADD

def log_user_event(user_id: int, event_type: str, metadata: dict) -> str:
    stream_key = "events:user"
    entry_id = r.xadd(
        stream_key,
        {
            "user_id": str(user_id),
            "event": event_type,
            "metadata": json.dumps(metadata),
            "ts": str(time.time()),
        },
        maxlen=10_000,   # trim to last 10,000 entries (approximate)
        approximate=True,
    )
    return entry_id  # e.g., "1715510400000-0"


# Log a profile update
log_user_event(42, "profile.updated", {"fields_changed": ["email"]})
log_user_event(42, "password.changed", {})

Reading Events with XREAD

def read_recent_events(stream_key: str = "events:user", count: int = 50) -> list:
    # Read up to `count` entries from the beginning
    entries = r.xread({stream_key: "0-0"}, count=count)
    if not entries:
        return []

    results = []
    for stream_name, messages in entries:
        for entry_id, fields in messages:
            results.append({"id": entry_id, **fields})
    return results

Consumer Groups for Parallel Processing

Consumer groups let multiple workers share the workload — each message is delivered to exactly one worker in the group:

# Create the consumer group once (usually at application startup)
try:
    r.xgroup_create("events:user", "audit-processors", id="0", mkstream=True)
except redis.exceptions.ResponseError as exc:
    if "BUSYGROUP" not in str(exc):
        raise  # re-raise anything other than "group already exists"

def process_events_worker(worker_name: str):
    while True:
        # Read up to 10 undelivered messages for this worker
        messages = r.xreadgroup(
            "audit-processors",
            worker_name,
            {"events:user": ">"},
            count=10,
            block=5000,  # block for 5 seconds if no messages
        )
        if not messages:
            continue

        for stream_name, entries in messages:
            for entry_id, fields in entries:
                print(f"[{worker_name}] Processing event {entry_id}: {fields}")
                # Acknowledge the message after successful processing
                r.xack("events:user", "audit-processors", entry_id)

Streams vs. Pub/Sub: Pub/Sub messages are ephemeral — if no subscriber is listening when a message is published, it is lost. Streams persist messages and support replay, making them suitable for audit logs, event sourcing, and reliable task queues.
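
One more reliability piece is worth a sketch: a message that was delivered to a worker but never acknowledged (say, the worker crashed mid-processing) sits in the group's pending-entries list. Another worker can reclaim it with XAUTOCLAIM (Redis 6.2+). The three-element return shape below matches Redis 7, and the one-minute idle threshold is an arbitrary choice:

def reclaim_stale_events(worker_name: str, idle_ms: int = 60_000):
    # Claim messages that have been pending longer than idle_ms
    next_id, entries, _deleted = r.xautoclaim(
        "events:user", "audit-processors", worker_name,
        min_idle_time=idle_ms, start_id="0-0", count=10,
    )
    for entry_id, fields in entries:
        print(f"[{worker_name}] Reprocessing {entry_id}: {fields}")
        r.xack("events:user", "audit-processors", entry_id)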


Async Support with redis.asyncio

For FastAPI and other asyncio-based frameworks, use the redis.asyncio interface:

import redis.asyncio as aioredis
from fastapi import FastAPI, Depends

app = FastAPI()

# Create an async Redis client (singleton, module-level)
async_redis: aioredis.Redis = aioredis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,
)

async def get_redis() -> aioredis.Redis:
    return async_redis


@app.get("/users/{user_id}")
async def get_user(user_id: int, redis: aioredis.Redis = Depends(get_redis)):
    cache_key = f"user:{user_id}:profile"
    cached = await redis.get(cache_key)
    if cached:
        return json.loads(cached)

    # Simulate DB fetch
    profile = {"id": user_id, "name": "Ada Lovelace", "email": "ada@example.com"}
    await redis.setex(cache_key, 900, json.dumps(profile))
    return profile


@app.on_event("shutdown")
async def shutdown():
    await async_redis.aclose()

The async client uses the same API as the synchronous client; every method just needs to be awaited, and connection pooling is handled automatically. One caveat: recent FastAPI releases deprecate @app.on_event in favor of the lifespan context manager, so prefer lifespan in new code (the shutdown hook above still works).

Async Pipeline

async def async_sliding_window(redis: aioredis.Redis, user_id: str, limit: int = 100) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = time.time()
    window_start = now - 60

    async with redis.pipeline() as pipe:
        # Buffered pipeline commands return the pipeline itself, so they are
        # chained without await; only execute() is awaited
        pipe.zremrangebyscore(key, "-inf", window_start)
        pipe.zadd(key, {str(now): now})
        pipe.zcard(key)
        pipe.expire(key, 61)
        results = await pipe.execute()

    return results[2] <= limit

Production Tips

Connection Pooling

Always use a connection pool in production. redis-py does not meaningfully cap pool size by default, so set max_connections explicitly based on your concurrency:

pool = redis.ConnectionPool(
    host="redis-prod.internal",
    port=6379,
    db=0,
    decode_responses=True,
    max_connections=50,
    socket_connect_timeout=2,
    socket_timeout=2,
)
r = redis.Redis(connection_pool=pool)

For async:

async_redis = aioredis.Redis.from_pool(
    aioredis.ConnectionPool(host="redis-prod.internal", port=6379, max_connections=50)
)

maxmemory Policy

When Redis reaches its configured memory limit, it needs to know which keys to evict. Set the policy in redis.conf or at runtime:

redis-cli CONFIG SET maxmemory 2gb
redis-cli CONFIG SET maxmemory-policy allkeys-lru

Common policies:

Policy         Behavior
noeviction     Rejects writes when memory is full. Safe for primary data stores.
allkeys-lru    Evicts the least recently used keys across all keys. Good for caches.
volatile-lru   Evicts LRU keys among those with an expiry set. Protects persistent data.
allkeys-lfu    Evicts the least frequently used keys (Redis 4+).

For pure caching workloads, allkeys-lru or allkeys-lfu are the right choices.

Persistence: RDB vs AOF

Redis offers two persistence mechanisms:

RDB (Redis Database) snapshots

  • Periodically saves a point-in-time snapshot of the dataset to disk.
  • Fast to restore; compact files.
  • Risk: data written since the last snapshot is lost on a crash.
  • Enable in redis.conf: save 900 1 (snapshot if at least 1 key changed in 900 seconds).

AOF (Append-Only File)

  • Logs every write command; on restart, Redis replays the log.
  • More durable: appendfsync everysec bounds data loss to roughly one second.
  • Larger files, slower startup on very large datasets.
  • Enable: appendonly yes.

Recommendation: For caching-only workloads, you can disable persistence entirely (save "", appendonly no) since losing the cache on restart is acceptable. For Streams-based event logs or session storage that must survive restarts, enable AOF with appendfsync everysec.
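
A minimal redis.conf sketch for that second case, using only the directives discussed above:

# Durable event logs / session storage
appendonly yes
appendfsync everysec
# Optionally disable RDB snapshots if AOF alone is sufficient
save ""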

Health Checks and Monitoring

try:
    r.ping()
except redis.exceptions.ConnectionError as exc:
    print(f"Redis unavailable: {exc}")
    # fall back to database or return 503

Key metrics to monitor:

  • INFO memory: used_memory_human, mem_fragmentation_ratio
  • INFO stats: instantaneous_ops_per_sec, evicted_keys, keyspace_hits, keyspace_misses
  • Hit rate formula: keyspace_hits / (keyspace_hits + keyspace_misses)
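
A quick sketch that pulls these numbers via redis-py and computes the hit rate (the field names are exactly as INFO reports them):

def cache_hit_rate(r: redis.Redis) -> float:
    stats = r.info("stats")
    hits = stats["keyspace_hits"]
    misses = stats["keyspace_misses"]
    total = hits + misses
    return hits / total if total else 0.0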

A hit rate below 80% suggests your TTLs are too short or your key design is causing cache thrashing.


Summary

This tutorial covered four production-grade use cases for Redis in Python applications:

  1. API Response Caching — cache-aside pattern, TTL expiration, cache invalidation on write, decorator-based caching, and cache key namespacing conventions.
  2. Rate Limiting — fixed-window counters with INCR/EXPIRE and a more accurate sliding-window implementation using sorted sets (ZADD, ZREMRANGEBYSCORE, ZCARD), integrated as FastAPI middleware.
  3. Pub/Sub for Real-Time Notifications — publishing and subscribing to channels, running a subscriber in a background thread, and using pub/sub to invalidate caches across multiple application instances.
  4. Redis Streams — append-only event log with XADD/XREAD, consumer groups for parallel processing with XREADGROUP/XACK, and a comparison with Pub/Sub.

Beyond these four, Redis supports many other patterns: distributed locks (via SET NX EX), full-text search (RediSearch module), time-series data (RedisTimeSeries), and vector similarity search — making it one of the most versatile infrastructure components available.
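
As one example, a deliberately simplified distributed-lock sketch built on SET NX EX; production code should make the check-and-delete atomic with a Lua script and weigh the Redlock discussion in the Redis docs:

import uuid

def acquire_lock(name: str, ttl: int = 10) -> str | None:
    token = str(uuid.uuid4())
    # NX: set only if the key is absent; EX: auto-expire so a crashed holder cannot deadlock
    if r.set(f"lock:{name}", token, nx=True, ex=ttl):
        return token
    return None

def release_lock(name: str, token: str) -> None:
    # Check-and-delete has a race window here; a Lua script would make it atomic
    if r.get(f"lock:{name}") == token:
        r.delete(f"lock:{name}")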

Further Reading