
SQLAlchemy Bulk Insert: The Complete Performance Guide

Introduction

When you need to insert thousands or millions of rows into a database, the default SQLAlchemy ORM approach becomes painfully slow. A naive loop inserting one row at a time can take minutes for what should be a seconds-long operation.

This guide covers every bulk insert method in SQLAlchemy, from ORM conveniences to raw Core performance, with real benchmarks to help you choose the right approach.

Why Bulk Insert Matters: The Performance Gap

Consider inserting 100,000 rows. Here's what you might see:

Method                           | Time  | Relative Speed
---------------------------------|-------|---------------
Individual session.add() in loop | ~120s | 1x (baseline)
bulk_save_objects()              | ~15s  | 8x faster
bulk_insert_mappings()           | ~8s   | 15x faster
Core insert().values()           | ~3s   | 40x faster
PostgreSQL COPY                  | ~0.5s | 240x faster

The difference is dramatic. Let's explore each method.

Setting Up Our Example

All examples use this model:

from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(100), unique=True, nullable=False)
    email = Column(String(255))
    created_at = Column(DateTime, default=datetime.utcnow)
    role_id = Column(Integer, ForeignKey('roles.id'))

    role = relationship('Role', back_populates='users')

class Role(Base):
    __tablename__ = 'roles'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)

    users = relationship('User', back_populates='role')

# Database setup
engine = create_engine('postgresql://user:pass@localhost/mydb')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

Method 1: bulk_insert_mappings() — Fast Dictionary Inserts

The bulk_insert_mappings() method accepts a list of dictionaries. Dictionary keys must match model column names.

def insert_with_mappings(session, users_data):
    """Insert using bulk_insert_mappings - recommended for most cases."""
    session.bulk_insert_mappings(User, users_data)
    session.commit()

# Usage
users_to_insert = [
    {'username': 'john', 'email': '[email protected]', 'role_id': 1},
    {'username': 'mary', 'email': '[email protected]', 'role_id': 2},
    {'username': 'susan', 'email': '[email protected]', 'role_id': 1},
]
insert_with_mappings(session, users_to_insert)

Pros:
- Fast: skips ORM object instantiation overhead
- Memory efficient: dictionaries use less RAM than ORM objects
- Clean syntax for data from external sources (CSV, JSON, APIs)

Cons:
- No relationship handling
- No default value population (except database defaults)
- Objects not tracked by the session after insert

Handling Relationships with Mappings

For relationships, use foreign key IDs directly:

# First insert roles
roles = [{'name': 'admin'}, {'name': 'user'}]
session.bulk_insert_mappings(Role, roles)
session.commit()

# Get role IDs
admin_id = session.query(Role).filter_by(name='admin').first().id

# Insert users with role_id
users = [
    {'username': 'john', 'role_id': admin_id},
    {'username': 'mary', 'role_id': admin_id},
]
session.bulk_insert_mappings(User, users)
session.commit()
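
If you are mapping more than one role, you can fetch the whole name-to-ID lookup in a single query instead of one query per role. A minimal sketch (role_lookup is an illustrative helper, not part of SQLAlchemy):

def role_lookup(session):
    """Map role names to primary keys in one round trip."""
    return {name: role_id for role_id, name in session.query(Role.id, Role.name)}

role_ids = role_lookup(session)
users = [
    {'username': 'john', 'role_id': role_ids['admin']},
    {'username': 'mary', 'role_id': role_ids['user']},
]
session.bulk_insert_mappings(User, users)
session.commit()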

Method 2: bulk_save_objects() — ORM Object Bulk Insert

When you already have ORM objects or need SQLAlchemy features like defaults:

def insert_with_objects(session, users):
    """Insert using bulk_save_objects - when you have ORM instances."""
    session.bulk_save_objects(users)
    session.commit()

# Usage
users = [
    User(username='alice', email='[email protected]'),
    User(username='bob', email='[email protected]'),
    User(username='charlie', email='[email protected]'),
]
insert_with_objects(session, users)

Getting Generated IDs Back

If you need the generated primary keys:

users = [User(username='test1'), User(username='test2')]
session.bulk_save_objects(users, return_defaults=True)
session.commit()

for user in users:
    print(f"Created user with ID: {user.id}")

Warning: return_defaults=True significantly reduces performance because SQLAlchemy typically falls back to individual INSERT statements to retrieve each ID. If you need generated IDs at scale on PostgreSQL, prefer the Core RETURNING approach shown in Method 3.

Method 3: Core insert().values() — The Fastest ORM-Compatible Method

SQLAlchemy Core provides the fastest insert while staying within SQLAlchemy:

from sqlalchemy import insert

def insert_with_core(session, users_data):
    """Insert using Core - fastest SQLAlchemy method."""
    stmt = insert(User).values(users_data)
    session.execute(stmt)
    session.commit()

# Usage
users_data = [
    {'username': 'user1', 'email': '[email protected]'},
    {'username': 'user2', 'email': '[email protected]'},
    {'username': 'user3', 'email': '[email protected]'},
]
insert_with_core(session, users_data)
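
Note that .values(users_data) renders a single INSERT statement with one VALUES clause per row, which can run into driver or database parameter limits on very large lists. Passing the list as the second argument to execute() instead sends the rows through the driver's executemany path (batched automatically by SQLAlchemy 2.0's "insertmanyvalues" feature), which is usually the safer choice for six-figure row counts. A minimal sketch of that variant:

from sqlalchemy import insert

def insert_with_core_executemany(session, users_data):
    """Let SQLAlchemy batch the parameters instead of building one giant statement."""
    session.execute(insert(User), users_data)
    session.commit()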

Alternative: Using connection.execute()

For maximum performance, bypass the session entirely:

from sqlalchemy import insert

def insert_with_connection(engine, users_data):
    """Insert using connection directly - minimal overhead."""
    with engine.connect() as conn:
        stmt = insert(User).values(users_data)
        conn.execute(stmt)
        conn.commit()

# Or using the table directly
def insert_with_table(engine, users_data):
    """Insert using table object."""
    with engine.connect() as conn:
        conn.execute(User.__table__.insert(), users_data)
        conn.commit()

PostgreSQL: Using RETURNING

Get inserted IDs with PostgreSQL's RETURNING clause:

from sqlalchemy import insert

def insert_with_returning(session, users_data):
    """Insert and return generated IDs (PostgreSQL)."""
    stmt = insert(User).values(users_data).returning(User.id, User.username)
    result = session.execute(stmt)
    session.commit()

    for row in result:
        print(f"Inserted: id={row.id}, username={row.username}")

Method 4: executemany() with Batching — Ultimate Control

For maximum control over batching:

from sqlalchemy import text

def insert_with_executemany(engine, users_data, batch_size=10000):
    """Insert with explicit batching using executemany."""
    with engine.connect() as conn:
        for i in range(0, len(users_data), batch_size):
            batch = users_data[i:i + batch_size]
            conn.execute(
                text("INSERT INTO users (username, email) VALUES (:username, :email)"),
                batch
            )
        conn.commit()
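
The same explicit batching also works with the Core insert() construct instead of raw SQL, which is the combination the decision tree at the end of this guide recommends for the 100,000 to 1,000,000 row range. A minimal sketch (insert_core_batched is an illustrative name):

from sqlalchemy import insert

def insert_core_batched(engine, users_data, batch_size=20_000):
    """Core insert with explicit batching, all inside one transaction."""
    with engine.begin() as conn:  # commits on success, rolls back on error
        for i in range(0, len(users_data), batch_size):
            conn.execute(insert(User), users_data[i:i + batch_size])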

Performance Comparison: Real Benchmarks

Here's a benchmark inserting 100,000 rows into PostgreSQL:

import time
from sqlalchemy import insert

def benchmark(name, func, *args):
    start = time.perf_counter()
    func(*args)
    elapsed = time.perf_counter() - start
    print(f"{name}: {elapsed:.2f}s")

# Generate test data
test_data = [{'username': f'user_{i}', 'email': f'user_{i}@test.com'} 
             for i in range(100_000)]

# Run benchmarks (each insert_* callable wraps the corresponding approach shown above)
benchmark("Individual adds", insert_individual, session, test_data)      # ~120s
benchmark("bulk_save_objects", insert_bulk_objects, session, test_data)  # ~15s  
benchmark("bulk_insert_mappings", insert_bulk_mappings, session, test_data)  # ~8s
benchmark("Core insert().values()", insert_core, session, test_data)     # ~3s
benchmark("PostgreSQL COPY", insert_copy, engine, test_data)             # ~0.5s

Results Summary

Method                   | 100K Rows | Memory Usage | ORM Features
-------------------------|-----------|--------------|-------------
Individual session.add() | 120s      | High         | Full
bulk_save_objects()      | 15s       | Medium       | Partial
bulk_insert_mappings()   | 8s        | Low          | Minimal
Core insert().values()   | 3s        | Very Low     | None
PostgreSQL COPY          | 0.5s      | Minimal      | None

Batch Size Considerations

Batch size significantly impacts performance. Here's how to choose:

def insert_batched(session, data, batch_size=10000):
    """Insert with configurable batch size."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        session.bulk_insert_mappings(User, batch)
        session.flush()  # Flush each batch
    session.commit()

Optimal Batch Sizes by Database

Database   | Recommended Batch Size
-----------|-----------------------
PostgreSQL | 10,000 - 50,000
MySQL      | 5,000 - 20,000
SQLite     | 1,000 - 5,000
SQL Server | 1,000 - 10,000

Factors affecting optimal batch size:
- Available RAM
- Network latency (remote databases need smaller batches)
- Transaction log size limits
- Database parameter limits (e.g., max_allowed_packet in MySQL)
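
Because these factors vary between deployments, it is worth measuring against your own data rather than guessing. A rough tuning sketch (tune_batch_size and candidate_sizes are illustrative, not a SQLAlchemy API):

import time

def tune_batch_size(session, sample, candidate_sizes=(1_000, 5_000, 10_000, 50_000)):
    """Time bulk_insert_mappings over a data sample at several batch sizes."""
    for batch_size in candidate_sizes:
        start = time.perf_counter()
        for i in range(0, len(sample), batch_size):
            session.bulk_insert_mappings(User, sample[i:i + batch_size])
        elapsed = time.perf_counter() - start
        session.rollback()  # discard the test rows before the next run
        print(f"batch_size={batch_size}: {elapsed:.2f}s")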

# Memory-conscious batching for very large imports
def insert_memory_efficient(session, data_iterator, batch_size=10000):
    """Process data lazily without loading everything into memory."""
    batch = []
    for item in data_iterator:
        batch.append(item)
        if len(batch) >= batch_size:
            session.bulk_insert_mappings(User, batch)
            session.flush()
            batch = []

    if batch:  # Don't forget remaining items
        session.bulk_insert_mappings(User, batch)
    session.commit()

Common Pitfalls and Solutions

Pitfall 1: Autoflush Killing Performance

SQLAlchemy's autoflush can trigger unexpected database roundtrips:

# BAD: Autoflush triggers on each query
for user in users_data:
    session.add(User(**user))
    count = session.query(User).count()  # Triggers autoflush!

# GOOD: Disable autoflush during bulk operations
with session.no_autoflush:
    for user in users_data:
        session.add(User(**user))
    session.commit()

# OR: Configure session without autoflush
Session = sessionmaker(bind=engine, autoflush=False)

Pitfall 2: Relationships Not Working

Bulk operations bypass relationship handling:

# This WON'T work with bulk_save_objects
user = User(username='john')
user.role = Role(name='admin')  # Relationship ignored!
session.bulk_save_objects([user])

# Instead, insert separately with foreign keys
session.bulk_insert_mappings(Role, [{'name': 'admin'}])
session.flush()
admin_id = session.query(Role).filter_by(name='admin').first().id
session.bulk_insert_mappings(User, [{'username': 'john', 'role_id': admin_id}])

Pitfall 3: Duplicate Key Errors

Handle conflicts gracefully with PostgreSQL's ON CONFLICT:

from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_users(session, users_data):
    """Insert or update on conflict (PostgreSQL)."""
    stmt = pg_insert(User).values(users_data)
    stmt = stmt.on_conflict_do_update(
        index_elements=['username'],
        set_={'email': stmt.excluded.email}
    )
    session.execute(stmt)
    session.commit()
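
If you only want to skip duplicates rather than update them, the same dialect-specific construct also offers on_conflict_do_nothing():

from sqlalchemy.dialects.postgresql import insert as pg_insert

def insert_ignore_duplicates(session, users_data):
    """Insert new rows, silently skipping any that violate the username constraint."""
    stmt = pg_insert(User).values(users_data).on_conflict_do_nothing(
        index_elements=['username']
    )
    session.execute(stmt)
    session.commit()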

Pitfall 4: Transaction Timeouts

Long-running bulk inserts can timeout:

def insert_with_checkpoints(session, data, batch_size=50000):
    """Commit in chunks to avoid transaction timeouts."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        session.bulk_insert_mappings(User, batch)
        session.commit()  # Commit each chunk
        print(f"Inserted {min(i + batch_size, len(data))}/{len(data)} rows")

Pitfall 5: Python Datetime Not Working

Bulk operations may not apply Python defaults:

from datetime import datetime

# BAD: Python default won't be applied
users = [{'username': 'john'}]  # created_at will be NULL!
session.bulk_insert_mappings(User, users)

# GOOD: Include all values explicitly
users = [{'username': 'john', 'created_at': datetime.utcnow()}]
session.bulk_insert_mappings(User, users)

# OR: Use a database-side default in the model
# (requires: from sqlalchemy import func)
created_at = Column(DateTime, server_default=func.now())

PostgreSQL COPY: The Nuclear Option

For massive datasets (millions of rows), PostgreSQL's COPY command is unbeatable:

import io
import csv

def insert_with_copy(engine, users_data, table_name='users'):
    """Ultra-fast insert using PostgreSQL COPY."""
    # Prepare tab-separated data in memory. Note: psycopg2's copy_from expects
    # PostgreSQL's text format rather than quoted CSV, so this assumes values
    # contain no tabs, newlines, or backslashes.
    output = io.StringIO()
    writer = csv.writer(output, delimiter='\t')

    for user in users_data:
        writer.writerow([user['username'], user['email']])

    output.seek(0)

    # Use raw connection for COPY
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        cursor.copy_from(
            output,
            table_name,
            columns=('username', 'email'),
            sep='\t',
            null=''
        )
        raw_conn.commit()
    finally:
        raw_conn.close()

Using psycopg2's copy_expert for More Control

def insert_with_copy_expert(engine, csv_file_path):
    """Load directly from a CSV file using COPY."""
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        with open(csv_file_path, 'r') as f:
            cursor.copy_expert(
                "COPY users (username, email) FROM STDIN WITH CSV HEADER",
                f
            )
        raw_conn.commit()
    finally:
        raw_conn.close()

Using StringIO for CSV Generation

import io
import csv

def bulk_copy_from_dicts(engine, table, data, columns):
    """Generic COPY helper for any table."""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=columns, extrasaction='ignore')

    for row in data:
        writer.writerow(row)

    output.seek(0)

    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        # copy_expert with FORMAT csv understands the quoting that csv.DictWriter
        # emits; copy_from with sep=',' would treat quote characters as literal data.
        cursor.copy_expert(
            f"COPY {table} ({', '.join(columns)}) FROM STDIN WITH (FORMAT csv)",
            output
        )
        raw_conn.commit()
    finally:
        raw_conn.close()

# Usage
bulk_copy_from_dicts(
    engine, 
    'users',
    users_data,
    columns=['username', 'email', 'role_id']
)

Loading CSV Files: Complete Example

Here's a production-ready CSV loader with progress tracking:

import csv
from tqdm import tqdm  # pip install tqdm

def load_csv_bulk(session, csv_path, model, batch_size=10000, columns=None):
    """
    Load a CSV file using bulk inserts with progress bar.

    Args:
        session: SQLAlchemy session
        csv_path: Path to CSV file
        model: SQLAlchemy model class
        batch_size: Rows per batch
        columns: Optional column mapping (CSV header -> model attribute)
    """
    with open(csv_path, 'r', encoding='utf-8') as f:
        # Count data rows (minus the header) so tqdm can show progress
        total = sum(1 for _ in f) - 1
        f.seek(0)
        reader = csv.DictReader(f)

        batch = []
        for row in tqdm(reader, total=total, desc="Importing"):
            # Apply column mapping if provided
            if columns:
                row = {columns.get(k, k): v for k, v in row.items()}

            # Clean empty strings to None
            row = {k: (v if v != '' else None) for k, v in row.items()}

            batch.append(row)

            if len(batch) >= batch_size:
                session.bulk_insert_mappings(model, batch)
                session.flush()
                batch = []

        # Insert remaining rows
        if batch:
            session.bulk_insert_mappings(model, batch)

        session.commit()

# Usage
load_csv_bulk(
    session,
    'million_users.csv',
    User,
    batch_size=50000,
    columns={'user_name': 'username', 'user_email': 'email'}  # CSV -> model mapping
)

Decision Tree: Which Method Should You Use?

Need to insert bulk data?
│
├─ < 1,000 rows
│   └─ Use regular session.add_all() — simplicity wins
│
├─ 1,000 - 100,000 rows
│   ├─ Have ORM objects already? → bulk_save_objects()
│   └─ Have dictionaries/raw data? → bulk_insert_mappings()
│
├─ 100,000 - 1,000,000 rows
│   └─ Use Core insert().values() with batching
│
└─ > 1,000,000 rows (PostgreSQL)
    └─ Use COPY command — nothing else comes close
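
Purely as an illustration, the same logic can be expressed as a small dispatcher that reuses the helpers sketched earlier in this guide (the thresholds are rough, and insert_with_copy is PostgreSQL-only):

def bulk_insert(session, engine, users_data):
    """Pick an insert strategy by row count (illustrative thresholds)."""
    n = len(users_data)
    if n < 1_000:
        session.add_all([User(**row) for row in users_data])
        session.commit()
    elif n < 100_000:
        insert_with_mappings(session, users_data)
    elif n < 1_000_000:
        insert_core_batched(engine, users_data)
    else:
        insert_with_copy(engine, users_data)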

Summary

  1. For most cases: Use bulk_insert_mappings() with dictionaries — 15x faster than individual inserts
  2. For maximum SQLAlchemy speed: Use Core insert().values() — 40x faster
  3. For truly massive data: Use PostgreSQL COPY — 240x faster
  4. Always batch: 10,000-50,000 rows per batch for PostgreSQL
  5. Disable autoflush: Use session.no_autoflush during bulk operations
  6. Don't expect ORM features: Bulk methods bypass relationships and Python defaults

The performance difference between methods can mean the difference between a 2-hour import and a 30-second one. Choose wisely based on your data volume and requirements.