SQLAlchemy Bulk Insert 2026: 40x Faster with Core & ORM 2.0 Methods

Introduction

When you need to insert thousands or millions of rows into a database, the default SQLAlchemy ORM approach becomes painfully slow. A naive loop inserting one row at a time can take minutes for what should be a seconds-long operation.

This guide covers every bulk insert method in SQLAlchemy 2.0, from the new ORM-style session.execute(insert(...)) to raw Core performance and async support, with real benchmarks to help you choose the right approach.

Performance Summary

Method Speed vs session.add() Notes
PostgreSQL COPY ~240x Max performance, PostgreSQL only
Core insert().values() ~40x Best general approach
ORM 2.0 session.execute(insert()) ~20x Recommended for ORM usage
bulk_insert_mappings ~10x Legacy, still works in 2.0
session.add_all() 1x Only for small datasets

Why Bulk Insert Matters: The Performance Gap

Consider inserting 100,000 rows. Here's what you might see:

Method Time Relative Speed
Individual session.add() in loop ~120s 1x (baseline)
bulk_save_objects() ~15s 8x faster
bulk_insert_mappings() ~8s 15x faster
ORM 2.0 session.execute(insert()) ~6s ~20x faster
Core insert().values() ~3s 40x faster
PostgreSQL COPY ~0.5s 240x faster

The difference is dramatic. Let's explore each method.

Setting Up Our Example

All examples use this model:

from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(100), unique=True, nullable=False)
    email = Column(String(255))
    created_at = Column(DateTime, default=datetime.utcnow)
    role_id = Column(Integer, ForeignKey('roles.id'))

    role = relationship('Role', back_populates='users')

class Role(Base):
    __tablename__ = 'roles'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)

    users = relationship('User', back_populates='role')

# Database setup
engine = create_engine('postgresql://user:pass@localhost/mydb')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

Method 1: SQLAlchemy 2.0 ORM Bulk Insert — session.execute(insert())

SQLAlchemy 2.0 introduces a new recommended way to do bulk inserts using the ORM. It combines the clarity of ORM models with Core-level performance, and it works in both synchronous and async sessions.

from sqlalchemy import insert
from sqlalchemy.orm import Session

def insert_orm_2_style(session: Session, users_data: list[dict]) -> None:
    """SQLAlchemy 2.0 recommended ORM bulk insert."""
    session.execute(insert(User), users_data)
    session.commit()

# Usage
users_data = [
    {"username": "alice", "email": "[email protected]", "role_id": 1},
    {"username": "bob",   "email": "[email protected]",   "role_id": 2},
    {"username": "carol", "email": "[email protected]", "role_id": 1},
]
insert_orm_2_style(session, users_data)

This is the preferred pattern in SQLAlchemy 2.0 for ORM-based bulk inserts. It uses INSERT ... VALUES under the hood via the ORM mapper, giving roughly 20x the speed of session.add() loops while still honouring ORM hooks.

Why prefer this over the legacy methods:

- Works with both Session and AsyncSession
- Supported in SQLAlchemy 2.0+ (not deprecated)
- Compatible with RETURNING on PostgreSQL
- Cleaner integration with type annotations and mapped classes

# With RETURNING (PostgreSQL) to get back generated IDs
from sqlalchemy import insert

stmt = insert(User).returning(User.id, User.username)
result = session.execute(stmt, users_data)
session.commit()

for row in result:
    print(f"Inserted user id={row.id} username={row.username}")

Method 2: Async SQLAlchemy Bulk Insert with AsyncSession

If your application uses asyncio (FastAPI, Starlette, async Django, etc.), use AsyncSession with the same insert() construct.

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import insert

# Create an async engine (using asyncpg driver for PostgreSQL)
async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    echo=False,
)
AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)

async def async_bulk_insert(users_data: list[dict]) -> None:
    """Async bulk insert using SQLAlchemy 2.0 AsyncSession."""
    async with AsyncSessionLocal() as session:
        await session.execute(insert(User), users_data)
        await session.commit()

# Call from an async context
import asyncio

async def main():
    data = [
        {"username": f"user_{i}", "email": f"user_{i}@example.com"}
        for i in range(10_000)
    ]
    await async_bulk_insert(data)

asyncio.run(main())

Batching with AsyncSession:

async def async_bulk_insert_batched(
    users_data: list[dict],
    batch_size: int = 500,
) -> None:
    """Async bulk insert with explicit batching."""
    async with AsyncSessionLocal() as session:
        for i in range(0, len(users_data), batch_size):
            batch = users_data[i : i + batch_size]
            await session.execute(insert(User), batch)
            await session.flush()  # flush each batch, single commit at end
        await session.commit()

Recommended batch sizes for async inserts:

- 500–1,000 rows per batch keeps memory usage low and avoids parameter count limits
- Flush after each batch, commit once at the end (or per logical checkpoint)


Method 3: Disabling Autoflush During Bulk Operations

SQLAlchemy's autoflush flushes pending changes to the database whenever you execute a query while the session holds unflushed ORM objects. During bulk inserts this can silently introduce thousands of extra round-trips.

# BAD: autoflush fires every time session.query() is called
with Session(engine) as session:
    for row in large_dataset:
        session.add(User(**row))
        count = session.query(User).count()  # triggers autoflush every iteration!

# GOOD: disable autoflush for the duration of the bulk operation
with Session(engine) as session:
    with session.no_autoflush:
        for row in large_dataset:
            session.add(User(**row))
    session.commit()

For bulk operations that use session.execute(insert(...)) you typically do not need no_autoflush because insert() does not create pending ORM objects — but it is still a good habit when mixing ORM adds with queries in the same session:

# Disable autoflush when mixing add() with queries
with Session(engine) as session:
    session.autoflush = False
    try:
        session.execute(insert(User), users_batch)
        # safe to query without surprise flushes
        role = session.get(Role, 1)
        session.commit()
    finally:
        session.autoflush = True  # restore default

You can also configure the session factory to never autoflush:

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine, autoflush=False)

Batching Strategy: Recommended Chunk Sizes

Large bulk inserts should always be split into chunks. The optimal chunk size depends on row width, available RAM, and database parameters.

from sqlalchemy import insert

def bulk_insert_chunked(
    session,
    model,
    data: list[dict],
    chunk_size: int = 1000,
) -> int:
    """
    Insert data in chunks. Returns total rows inserted.
    chunk_size of 500-1000 is safe for most schemas.
    """
    total = 0
    for i in range(0, len(data), chunk_size):
        chunk = data[i : i + chunk_size]
        session.execute(insert(model), chunk)
        session.flush()
        total += len(chunk)
    session.commit()
    return total

# Usage
n = bulk_insert_chunked(session, User, users_data, chunk_size=750)
print(f"Inserted {n} rows")

Chunk size guidelines:

Database Recommended Chunk Size Notes
PostgreSQL 500–1,000 Safe default; push to 5,000 for narrow rows
MySQL / MariaDB 500–1,000 Limited by max_allowed_packet
SQLite 100–500 Lower concurrency, smaller batches safer
SQL Server 500–1,000 Limited to 2,100 parameters per statement

Why 500–1,000 and not 10,000?

- SQLAlchemy 2.0's insert() with a list of dicts binds one parameter per column per row. With a wide table (20 columns × 1,000 rows = 20,000 parameters) you can hit driver limits.
- Smaller chunks also mean a crashed import can resume from the last committed chunk rather than re-doing everything.
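That parameter arithmetic can be wrapped in a small helper. This is purely an illustration: the function name and the 2,100-parameter default (SQL Server's per-statement cap, the tightest common limit) are our own choices, not SQLAlchemy API.

```python
def safe_chunk_size(num_columns: int, param_limit: int = 2_100, cap: int = 1_000) -> int:
    """Largest chunk that keeps total bind parameters under a driver limit.

    param_limit defaults to SQL Server's 2,100-parameter cap; cap keeps
    chunks within the recommended 500-1,000 range even for narrow tables.
    """
    return max(1, min(cap, param_limit // num_columns))

# A 20-column table must stay at or below 105 rows per chunk on SQL Server,
# while a 2-column table can safely use the full 1,000-row default.
print(safe_chunk_size(20))  # 105
print(safe_chunk_size(2))   # 1000
```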

For a streaming / generator-based import where the full dataset does not fit in memory:

def chunked(iterable, size):
    """Yield successive chunks from iterable."""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def stream_insert(session, model, data_iterator, chunk_size=500):
    """Insert from a lazy iterator without loading all data into RAM."""
    for chunk in chunked(data_iterator, chunk_size):
        session.execute(insert(model), chunk)
        session.flush()
    session.commit()

Method 4: Legacy bulk_insert_mappings() — Fast Dictionary Inserts

The bulk_insert_mappings() method is still available in SQLAlchemy 2.0 but is considered legacy. Prefer session.execute(insert(Model), data) for new code. It is documented here because you will find it in many existing codebases.

The method accepts a list of dictionaries. Dictionary keys must match model column names.

def insert_with_mappings(session, users_data):
    """Insert using bulk_insert_mappings - recommended for most cases."""
    session.bulk_insert_mappings(User, users_data)
    session.commit()

# Usage
users_to_insert = [
    {'username': 'john', 'email': '[email protected]', 'role_id': 1},
    {'username': 'mary', 'email': '[email protected]', 'role_id': 2},
    {'username': 'susan', 'email': '[email protected]', 'role_id': 1},
]
insert_with_mappings(session, users_to_insert)

Pros:

- Fast - skips ORM object instantiation overhead
- Memory efficient - dictionaries use less RAM than ORM objects
- Clean syntax for data from external sources (CSV, JSON, APIs)

Cons:

- No relationship handling
- No default value population (except database defaults)
- Objects not tracked by session after insert

Handling Relationships with Mappings

For relationships, use foreign key IDs directly:

# First insert roles
roles = [{'name': 'admin'}, {'name': 'user'}]
session.bulk_insert_mappings(Role, roles)
session.commit()

# Get role IDs
admin_id = session.query(Role).filter_by(name='admin').first().id

# Insert users with role_id
users = [
    {'username': 'john', 'role_id': admin_id},
    {'username': 'mary', 'role_id': admin_id},
]
session.bulk_insert_mappings(User, users)
session.commit()

Method 5: bulk_save_objects() — ORM Object Bulk Insert (Legacy)

When you already have ORM objects or need SQLAlchemy features like defaults:

def insert_with_objects(session, users):
    """Insert using bulk_save_objects - when you have ORM instances."""
    session.bulk_save_objects(users)
    session.commit()

# Usage
users = [
    User(username='alice', email='[email protected]'),
    User(username='bob', email='[email protected]'),
    User(username='charlie', email='[email protected]'),
]
insert_with_objects(session, users)

Getting Generated IDs Back

If you need the generated primary keys:

users = [User(username='test1'), User(username='test2')]
session.bulk_save_objects(users, return_defaults=True)
session.commit()

for user in users:
    print(f"Created user with ID: {user.id}")

Warning: return_defaults=True significantly reduces performance because retrieving each generated ID may fall back to individual INSERT statements on backends without efficient RETURNING support.

Method 6: Core insert().values() — The Fastest ORM-Compatible Method

SQLAlchemy Core provides the fastest insert while staying within SQLAlchemy:

from sqlalchemy import insert

def insert_with_core(session, users_data):
    """Insert using Core - fastest SQLAlchemy method."""
    stmt = insert(User).values(users_data)
    session.execute(stmt)
    session.commit()

# Usage
users_data = [
    {'username': 'user1', 'email': '[email protected]'},
    {'username': 'user2', 'email': '[email protected]'},
    {'username': 'user3', 'email': '[email protected]'},
]
insert_with_core(session, users_data)

Alternative: Using connection.execute()

For maximum performance, bypass the session entirely:

from sqlalchemy import insert

def insert_with_connection(engine, users_data):
    """Insert using connection directly - minimal overhead."""
    with engine.connect() as conn:
        stmt = insert(User).values(users_data)
        conn.execute(stmt)
        conn.commit()

# Or using the table directly
def insert_with_table(engine, users_data):
    """Insert using table object."""
    with engine.connect() as conn:
        conn.execute(User.__table__.insert(), users_data)
        conn.commit()

PostgreSQL: Using RETURNING

Get inserted IDs with PostgreSQL's RETURNING clause:

from sqlalchemy import insert

def insert_with_returning(session, users_data):
    """Insert and return generated IDs (PostgreSQL)."""
    stmt = insert(User).values(users_data).returning(User.id, User.username)
    result = session.execute(stmt)
    session.commit()

    for row in result:
        print(f"Inserted: id={row.id}, username={row.username}")

Method 7: executemany() with Batching — Ultimate Control

For maximum control over batching:

from sqlalchemy import text

def insert_with_executemany(engine, users_data, batch_size=10000):
    """Insert with explicit batching using executemany."""
    with engine.connect() as conn:
        for i in range(0, len(users_data), batch_size):
            batch = users_data[i:i + batch_size]
            conn.execute(
                text("INSERT INTO users (username, email) VALUES (:username, :email)"),
                batch
            )
        conn.commit()

Performance Comparison: Real Benchmarks

Here's a benchmark inserting 100,000 rows into PostgreSQL:

import time
from sqlalchemy import insert

def benchmark(name, func, *args):
    start = time.perf_counter()
    func(*args)
    elapsed = time.perf_counter() - start
    print(f"{name}: {elapsed:.2f}s")

# Generate test data
test_data = [{'username': f'user_{i}', 'email': f'user_{i}@test.com'} 
             for i in range(100_000)]

# Run benchmarks (each insert_* helper wraps one of the methods shown above)
benchmark("Individual adds", insert_individual, session, test_data)      # ~120s
benchmark("bulk_save_objects", insert_bulk_objects, session, test_data)  # ~15s  
benchmark("bulk_insert_mappings", insert_bulk_mappings, session, test_data)  # ~8s
benchmark("Core insert().values()", insert_core, session, test_data)     # ~3s
benchmark("PostgreSQL COPY", insert_copy, engine, test_data)             # ~0.5s

Results Summary

Method 100K Rows Memory Usage ORM Features SQLAlchemy 2.0 Status
Individual session.add() 120s High Full Current
bulk_save_objects() 15s Medium Partial Legacy
bulk_insert_mappings() 8s Low Minimal Legacy
ORM 2.0 session.execute(insert()) ~6s Low Partial Recommended
Core insert().values() 3s Very Low None Current
PostgreSQL COPY 0.5s Minimal None Current

Batch Size Considerations

Batch size significantly impacts performance. With SQLAlchemy 2.0's session.execute(insert(...)), use chunks of 500–1,000 rows as a safe default. Larger batches may hit driver parameter limits for wide tables.

from sqlalchemy import insert

def insert_batched(session, data, batch_size=1000):
    """Insert with configurable batch size — SQLAlchemy 2.0 style."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        session.execute(insert(User), batch)
        session.flush()  # Flush each batch
    session.commit()

Optimal Batch Sizes by Database

Database Recommended Chunk Size Notes
PostgreSQL 500–1,000 Push to 5,000 for narrow tables
MySQL / MariaDB 500–1,000 Limited by max_allowed_packet
SQLite 100–500 Lower concurrency
SQL Server 500–1,000 Hard limit: 2,100 parameters/statement

Factors affecting optimal batch size:

- Available RAM
- Number of columns (more columns = fewer rows per chunk)
- Network latency (remote databases need smaller batches)
- Transaction log size limits
- Database parameter limits (e.g., max_allowed_packet in MySQL)

# Memory-conscious batching for very large imports (SQLAlchemy 2.0)
from sqlalchemy import insert

def insert_memory_efficient(session, data_iterator, batch_size=500):
    """Process data lazily without loading everything into memory."""
    batch = []
    for item in data_iterator:
        batch.append(item)
        if len(batch) >= batch_size:
            session.execute(insert(User), batch)
            session.flush()
            batch = []

    if batch:  # Don't forget remaining items
        session.execute(insert(User), batch)
    session.commit()

Common Pitfalls and Solutions

Pitfall 1: Autoflush Killing Performance

SQLAlchemy's autoflush can trigger unexpected database roundtrips:

# BAD: Autoflush triggers on each query
for user in users_data:
    session.add(User(**user))
    count = session.query(User).count()  # Triggers autoflush!

# GOOD: Disable autoflush during bulk operations
with session.no_autoflush:
    for user in users_data:
        session.add(User(**user))
session.commit()

# OR: Configure session without autoflush
Session = sessionmaker(bind=engine, autoflush=False)

Pitfall 2: Relationships Not Working

Bulk operations bypass relationship handling:

# This WON'T work with bulk_save_objects
user = User(username='john')
user.role = Role(name='admin')  # Relationship ignored!
session.bulk_save_objects([user])

# Instead, insert separately with foreign keys
session.bulk_insert_mappings(Role, [{'name': 'admin'}])
session.flush()
admin_id = session.query(Role).filter_by(name='admin').first().id
session.bulk_insert_mappings(User, [{'username': 'john', 'role_id': admin_id}])

Pitfall 3: Duplicate Key Errors

Handle conflicts gracefully with PostgreSQL's ON CONFLICT:

from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_users(session, users_data):
    """Insert or update on conflict (PostgreSQL)."""
    stmt = pg_insert(User).values(users_data)
    stmt = stmt.on_conflict_do_update(
        index_elements=['username'],
        set_={'email': stmt.excluded.email}
    )
    session.execute(stmt)
    session.commit()

Pitfall 4: Transaction Timeouts

Long-running bulk inserts can timeout. Commit in chunks to stay within transaction limits:

from sqlalchemy import insert

def insert_with_checkpoints(session, data, batch_size=1000):
    """Commit in chunks to avoid transaction timeouts (SQLAlchemy 2.0)."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        session.execute(insert(User), batch)
        session.commit()  # Commit each chunk
        print(f"Inserted {min(i + batch_size, len(data))}/{len(data)} rows")

Pitfall 5: Python Datetime Not Working

Bulk operations may not apply Python defaults:

from datetime import datetime

# BAD: Python default won't be applied
users = [{'username': 'john'}]  # created_at will be NULL!
session.bulk_insert_mappings(User, users)

# GOOD: Include all values explicitly
users = [{'username': 'john', 'created_at': datetime.utcnow()}]
session.bulk_insert_mappings(User, users)

# OR: Use database default in model (requires: from sqlalchemy import func)
created_at = Column(DateTime, server_default=func.now())

PostgreSQL COPY: The Nuclear Option

For massive datasets (millions of rows), PostgreSQL's COPY command is unbeatable:

import io
import csv

def insert_with_copy(engine, users_data, table_name='users'):
    """Ultra-fast insert using PostgreSQL COPY."""
    # Prepare CSV in memory
    output = io.StringIO()
    writer = csv.writer(output, delimiter='\t')

    for user in users_data:
        writer.writerow([user['username'], user['email']])

    output.seek(0)

    # Use raw connection for COPY
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        cursor.copy_from(
            output,
            table_name,
            columns=('username', 'email'),
            sep='\t',
            null=''
        )
        raw_conn.commit()
    finally:
        raw_conn.close()

Using psycopg2's copy_expert for More Control

def insert_with_copy_expert(engine, csv_file_path):
    """Load directly from a CSV file using COPY."""
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        with open(csv_file_path, 'r') as f:
            cursor.copy_expert(
                "COPY users (username, email) FROM STDIN WITH CSV HEADER",
                f
            )
        raw_conn.commit()
    finally:
        raw_conn.close()

Using StringIO for CSV Generation

import io
import csv

def bulk_copy_from_dicts(engine, table, data, columns):
    """Generic COPY helper for any table."""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=columns, extrasaction='ignore')

    for row in data:
        writer.writerow(row)

    output.seek(0)

    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        cursor.copy_from(output, table, columns=columns, sep=',', null='')
        raw_conn.commit()
    finally:
        raw_conn.close()

# Usage
bulk_copy_from_dicts(
    engine, 
    'users',
    users_data,
    columns=['username', 'email', 'role_id']
)

Loading CSV Files: Complete Example

Here's a production-ready CSV loader with progress tracking:

import csv
from sqlalchemy import insert
from tqdm import tqdm  # pip install tqdm

def load_csv_bulk(session, csv_path, model, batch_size=1000, columns=None):
    """
    Load a CSV file using bulk inserts with progress bar (SQLAlchemy 2.0).

    Args:
        session: SQLAlchemy session
        csv_path: Path to CSV file
        model: SQLAlchemy model class
        batch_size: Rows per batch (500-1000 recommended for SQLAlchemy 2.0)
        columns: Optional column mapping (CSV header -> model attribute)
    """
    with open(csv_path, 'r', encoding='utf-8') as f:
        # Count data rows for the progress bar (subtract the header line)
        total = sum(1 for _ in f) - 1
        f.seek(0)
        reader = csv.DictReader(f)

        batch = []
        for row in tqdm(reader, total=total, desc="Importing"):
            # Apply column mapping if provided
            if columns:
                row = {columns.get(k, k): v for k, v in row.items()}

            # Clean empty strings to None
            row = {k: (v if v != '' else None) for k, v in row.items()}

            batch.append(row)

            if len(batch) >= batch_size:
                session.execute(insert(model), batch)
                session.flush()
                batch = []

        # Insert remaining rows
        if batch:
            session.execute(insert(model), batch)

        session.commit()

# Usage
load_csv_bulk(
    session,
    'million_users.csv',
    User,
    batch_size=1000,
    columns={'user_name': 'username', 'user_email': 'email'}  # CSV -> model mapping
)

Decision Tree: Which Method Should You Use?

Need to insert bulk data?
│
├─ < 1,000 rows
│   └─ Use regular session.add_all() — simplicity wins
│
├─ 1,000 - 100,000 rows
│   ├─ Using async (FastAPI, etc.)? → AsyncSession + session.execute(insert(Model), data)
│   ├─ Using SQLAlchemy 2.0 ORM?  → session.execute(insert(Model), data)   [recommended]
│   ├─ Have ORM objects already?  → bulk_save_objects()                    [legacy]
│   └─ Have dictionaries/raw data?→ bulk_insert_mappings()                 [legacy]
│
├─ 100,000 - 1,000,000 rows
│   └─ Use Core insert().values() with batching (chunk_size=500-1000)
│
└─ > 1,000,000 rows (PostgreSQL)
    └─ Use COPY command — nothing else comes close
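
For teams that want the tree encoded in code, it can be sketched as a small dispatcher. The function name, parameters, and returned labels are illustrative only; they mirror the thresholds above and are not part of SQLAlchemy.

```python
def choose_insert_method(n_rows: int, dialect: str = "postgresql", use_async: bool = False) -> str:
    """Map a row count to the bulk-insert method suggested by the decision tree."""
    if n_rows < 1_000:
        return "session.add_all()"
    if n_rows > 1_000_000 and dialect == "postgresql":
        return "COPY"
    if n_rows > 100_000:
        return "Core insert().values() with batching"
    # 1,000 - 100,000 rows: the SQLAlchemy 2.0 ORM path, sync or async
    return "AsyncSession.execute(insert())" if use_async else "session.execute(insert())"

print(choose_insert_method(500))        # session.add_all()
print(choose_insert_method(50_000))     # session.execute(insert())
print(choose_insert_method(5_000_000))  # COPY
```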

Summary

  1. SQLAlchemy 2.0 recommended: Use session.execute(insert(Model), data) — ~20x faster than session.add() and works with both Session and AsyncSession
  2. Async support: AsyncSession with await session.execute(insert(Model), data) for asyncio-based applications (FastAPI, etc.)
  3. For maximum SQLAlchemy speed: Use Core insert().values() — 40x faster
  4. For truly massive data: Use PostgreSQL COPY — 240x faster
  5. Always batch in chunks of 500–1,000 rows: Avoids driver parameter limits and allows resumable imports
  6. Disable autoflush: Use session.no_autoflush or session.autoflush = False during bulk operations
  7. Legacy APIs still work: bulk_insert_mappings() and bulk_save_objects() remain functional in SQLAlchemy 2.0 but are not recommended for new code
  8. Don't expect ORM features: Bulk methods bypass relationships and Python defaults

The performance difference between methods can mean the difference between a 2-hour import and a 30-second one. Choose wisely based on your data volume and requirements.