## Introduction

When you need to insert thousands or millions of rows into a database, the default SQLAlchemy ORM approach becomes painfully slow. A naive loop inserting one row at a time can take minutes for what should be a seconds-long operation.

This guide covers every bulk insert method in SQLAlchemy 2.0, from the new ORM-style `session.execute(insert(...))` to raw Core performance and async support, with real benchmarks to help you choose the right approach.
## Performance Summary
| Method | Speed vs session.add() | Notes |
|---|---|---|
| PostgreSQL COPY | ~240x | Max performance, PostgreSQL only |
| Core insert().values() | ~40x | Best general approach |
| ORM 2.0 session.execute(insert()) | ~20x | Recommended for ORM usage |
| bulk_insert_mappings | ~10x | Legacy, still works in 2.0 |
| session.add_all() | 1x | Only for small datasets |
## Why Bulk Insert Matters: The Performance Gap

Consider inserting 100,000 rows. Here's what you might see:

| Method | Time | Relative Speed |
|---|---|---|
| Individual session.add() in loop | ~120s | 1x (baseline) |
| bulk_save_objects() | ~15s | 8x faster |
| bulk_insert_mappings() | ~8s | 15x faster |
| ORM 2.0 session.execute(insert()) | ~6s | ~20x faster |
| Core insert().values() | ~3s | 40x faster |
| PostgreSQL COPY | ~0.5s | 240x faster |

The difference is dramatic. Let's explore each method.
## Setting Up Our Example

All examples use this model:

```python
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(100), unique=True, nullable=False)
    email = Column(String(255))
    created_at = Column(DateTime, default=datetime.utcnow)
    role_id = Column(Integer, ForeignKey('roles.id'))
    role = relationship('Role', back_populates='users')

class Role(Base):
    __tablename__ = 'roles'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)
    users = relationship('User', back_populates='role')

# Database setup
engine = create_engine('postgresql://user:pass@localhost/mydb')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
```
## Method 1: SQLAlchemy 2.0 ORM Bulk Insert — session.execute(insert())

SQLAlchemy 2.0 introduces a new recommended way to do bulk inserts using the ORM. It combines the clarity of ORM models with Core-level performance, and it works in both synchronous and async sessions.

```python
from sqlalchemy import insert
from sqlalchemy.orm import Session

def insert_orm_2_style(session: Session, users_data: list[dict]) -> None:
    """SQLAlchemy 2.0 recommended ORM bulk insert."""
    session.execute(insert(User), users_data)
    session.commit()

# Usage
session = Session(engine)  # or use the sessionmaker defined above
users_data = [
    {"username": "alice", "email": "[email protected]", "role_id": 1},
    {"username": "bob", "email": "[email protected]", "role_id": 2},
    {"username": "carol", "email": "[email protected]", "role_id": 1},
]
insert_orm_2_style(session, users_data)
```
This is the preferred pattern in SQLAlchemy 2.0 for ORM-based bulk inserts. It uses INSERT ... VALUES under the hood via the ORM mapper, giving roughly 20x the speed of session.add() loops while still honouring ORM hooks.
Why prefer this over the legacy methods:
- Works with both Session and AsyncSession
- Supported in SQLAlchemy 2.0+ (not deprecated)
- Compatible with RETURNING on PostgreSQL
- Cleaner integration with type annotations and mapped classes
```python
# With RETURNING (PostgreSQL) to get back generated IDs
from sqlalchemy import insert

stmt = insert(User).returning(User.id, User.username)
result = session.execute(stmt, users_data)
rows = result.all()  # fetch before commit closes the cursor
session.commit()

for row in rows:
    print(f"Inserted user id={row.id} username={row.username}")
```
## Method 2: Async SQLAlchemy Bulk Insert with AsyncSession

If your application uses asyncio (FastAPI, Starlette, async Django, etc.), use AsyncSession with the same insert() construct.

```python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import insert

# Create an async engine (using asyncpg driver for PostgreSQL)
async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    echo=False,
)
AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)

async def async_bulk_insert(users_data: list[dict]) -> None:
    """Async bulk insert using SQLAlchemy 2.0 AsyncSession."""
    async with AsyncSessionLocal() as session:
        await session.execute(insert(User), users_data)
        await session.commit()

# Call from an async context
import asyncio

async def main():
    data = [
        {"username": f"user_{i}", "email": f"user_{i}@example.com"}
        for i in range(10_000)
    ]
    await async_bulk_insert(data)

asyncio.run(main())
```
Batching with AsyncSession:

```python
async def async_bulk_insert_batched(
    users_data: list[dict],
    batch_size: int = 500,
) -> None:
    """Async bulk insert with explicit batching."""
    async with AsyncSessionLocal() as session:
        for i in range(0, len(users_data), batch_size):
            batch = users_data[i : i + batch_size]
            await session.execute(insert(User), batch)
            await session.flush()  # flush each batch, single commit at end
        await session.commit()
```

Recommended batch sizes for async inserts:

- 500–1,000 rows per batch keeps memory usage low and avoids parameter count limits
- Flush after each batch, commit once at the end (or per logical checkpoint)
## Method 3: Disabling Autoflush During Bulk Operations

SQLAlchemy's autoflush issues a flush to the database whenever you execute a query while there are pending ORM objects. During bulk inserts this can silently introduce thousands of extra round-trips.

```python
from sqlalchemy.orm import Session

# BAD: autoflush fires every time session.query() is called
with Session(engine) as session:
    for row in large_dataset:
        session.add(User(**row))
        count = session.query(User).count()  # triggers autoflush every iteration!

# GOOD: disable autoflush for the duration of the bulk operation
with Session(engine) as session:
    with session.no_autoflush:
        for row in large_dataset:
            session.add(User(**row))
    session.commit()
```
For bulk operations that use session.execute(insert(...)) you typically do not need no_autoflush because insert() does not create pending ORM objects — but it is still a good habit when mixing ORM adds with queries in the same session:

```python
from sqlalchemy.orm import Session

# Disable autoflush when mixing add() with queries
with Session(engine) as session:
    session.autoflush = False
    try:
        session.execute(insert(User), users_batch)
        # safe to query without surprise flushes
        role = session.get(Role, 1)
        session.commit()
    finally:
        session.autoflush = True  # restore default
```
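If you toggle autoflush by hand in several places, the try/finally pattern can be wrapped in a small context manager. This is a generic sketch (the name `temp_no_autoflush` is illustrative, not a SQLAlchemy API); it only assumes the object exposes a writable `autoflush` attribute, which SQLAlchemy sessions do:

```python
from contextlib import contextmanager

@contextmanager
def temp_no_autoflush(session):
    """Disable autoflush inside the block, restoring the prior setting on exit."""
    previous = session.autoflush
    session.autoflush = False
    try:
        yield session
    finally:
        session.autoflush = previous  # restored even if the block raises
```

SQLAlchemy's built-in `session.no_autoflush` does essentially the same thing, so prefer it when available; this helper is mainly useful when you need to restore a non-default prior setting.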
You can also configure the session factory to never autoflush:

```python
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine, autoflush=False)
```
## Batching Strategy: Recommended Chunk Sizes

Large bulk inserts should always be split into chunks. The optimal chunk size depends on row width, available RAM, and database parameters.

```python
from sqlalchemy import insert

def bulk_insert_chunked(
    session,
    model,
    data: list[dict],
    chunk_size: int = 1000,
) -> int:
    """
    Insert data in chunks. Returns total rows inserted.

    chunk_size of 500-1000 is safe for most schemas.
    """
    total = 0
    for i in range(0, len(data), chunk_size):
        chunk = data[i : i + chunk_size]
        session.execute(insert(model), chunk)
        session.flush()
        total += len(chunk)
    session.commit()
    return total

# Usage
n = bulk_insert_chunked(session, User, users_data, chunk_size=750)
print(f"Inserted {n} rows")
```
Chunk size guidelines:
| Database | Recommended Chunk Size | Notes |
|---|---|---|
| PostgreSQL | 500–1,000 | Safe default; push to 5,000 for narrow rows |
| MySQL / MariaDB | 500–1,000 | Limited by max_allowed_packet |
| SQLite | 100–500 | Lower concurrency, smaller batches safer |
| SQL Server | 500–1,000 | Limited to 2,100 parameters per statement |
Why 500–1,000 and not 10,000?

- SQLAlchemy 2.0's insert() with a list of dicts batches rows into INSERT ... VALUES (...) statements (the "insertmanyvalues" feature), consuming one bound parameter per column per row. With a wide table (20 columns × 1,000 rows = 20,000 parameters) you can hit driver limits.
- Smaller chunks also mean a crashed import can resume from the last committed chunk rather than re-doing everything.
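That parameter arithmetic can be checked programmatically. A back-of-the-envelope sketch; `safe_chunk_size` and the limit table are illustrative, not part of SQLAlchemy, and the ceilings are approximate per-driver defaults:

```python
# Approximate bound-parameter ceilings per backend (illustrative defaults)
PARAM_LIMITS = {
    "mssql": 2_100,        # SQL Server's documented per-statement limit
    "sqlite": 32_766,      # SQLITE_MAX_VARIABLE_NUMBER default (SQLite >= 3.32)
    "postgresql": 65_535,  # 16-bit parameter count in the wire protocol
}

def safe_chunk_size(num_columns: int, dialect: str = "postgresql", cap: int = 1_000) -> int:
    """Largest chunk that stays under the driver's parameter budget, capped at `cap` rows."""
    limit = PARAM_LIMITS.get(dialect, 2_100)  # be conservative for unknown backends
    return max(1, min(cap, limit // num_columns))
```

For example, a 20-column table on SQL Server gets 2,100 // 20 = 105 rows per chunk, while the same table on PostgreSQL is simply capped at 1,000.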
For a streaming / generator-based import where the full dataset does not fit in memory:

```python
from sqlalchemy import insert

def chunked(iterable, size):
    """Yield successive chunks from iterable."""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def stream_insert(session, model, data_iterator, chunk_size=500):
    """Insert from a lazy iterator without loading all data into RAM."""
    for chunk in chunked(data_iterator, chunk_size):
        session.execute(insert(model), chunk)
        session.flush()
    session.commit()
```
## Method 4: Legacy bulk_insert_mappings() — Fast Dictionary Inserts

The bulk_insert_mappings() method is still available in SQLAlchemy 2.0 but is considered legacy. Prefer session.execute(insert(Model), data) for new code. It is documented here because you will find it in many existing codebases.

The method accepts a list of dictionaries. Dictionary keys must match model column names.

```python
def insert_with_mappings(session, users_data):
    """Insert using bulk_insert_mappings (legacy API)."""
    session.bulk_insert_mappings(User, users_data)
    session.commit()

# Usage
users_to_insert = [
    {'username': 'john', 'email': '[email protected]', 'role_id': 1},
    {'username': 'mary', 'email': '[email protected]', 'role_id': 2},
    {'username': 'susan', 'email': '[email protected]', 'role_id': 1},
]
insert_with_mappings(session, users_to_insert)
```
Pros:

- Fast: skips ORM object instantiation overhead
- Memory efficient: dictionaries use less RAM than ORM objects
- Clean syntax for data from external sources (CSV, JSON, APIs)

Cons:

- No relationship handling
- No default value population (except database defaults)
- Objects not tracked by session after insert
### Handling Relationships with Mappings

For relationships, use foreign key IDs directly:

```python
# First insert roles
roles = [{'name': 'admin'}, {'name': 'user'}]
session.bulk_insert_mappings(Role, roles)
session.commit()

# Get role IDs
admin_id = session.query(Role).filter_by(name='admin').first().id

# Insert users with role_id
users = [
    {'username': 'john', 'role_id': admin_id},
    {'username': 'mary', 'role_id': admin_id},
]
session.bulk_insert_mappings(User, users)
session.commit()
```
## Method 5: bulk_save_objects() — ORM Object Bulk Insert (Legacy)

When you already have ORM objects or need SQLAlchemy features like defaults:

```python
def insert_with_objects(session, users):
    """Insert using bulk_save_objects - when you have ORM instances."""
    session.bulk_save_objects(users)
    session.commit()

# Usage
users = [
    User(username='alice', email='[email protected]'),
    User(username='bob', email='[email protected]'),
    User(username='charlie', email='[email protected]'),
]
insert_with_objects(session, users)
```
### Getting Generated IDs Back

If you need the generated primary keys:

```python
users = [User(username='test1'), User(username='test2')]
session.bulk_save_objects(users, return_defaults=True)
session.commit()

for user in users:
    print(f"Created user with ID: {user.id}")
```

Warning: return_defaults=True significantly reduces performance because it requires individual INSERT statements to retrieve each ID.
## Method 6: Core insert().values() — The Fastest ORM-Compatible Method

SQLAlchemy Core provides the fastest insert while staying within SQLAlchemy:

```python
from sqlalchemy import insert

def insert_with_core(session, users_data):
    """Insert using Core - fastest SQLAlchemy method."""
    stmt = insert(User).values(users_data)
    session.execute(stmt)
    session.commit()

# Usage
users_data = [
    {'username': 'user1', 'email': '[email protected]'},
    {'username': 'user2', 'email': '[email protected]'},
    {'username': 'user3', 'email': '[email protected]'},
]
insert_with_core(session, users_data)
```
### Alternative: Using connection.execute()

For maximum performance, bypass the session entirely:

```python
from sqlalchemy import insert

def insert_with_connection(engine, users_data):
    """Insert using connection directly - minimal overhead."""
    with engine.connect() as conn:
        stmt = insert(User).values(users_data)
        conn.execute(stmt)
        conn.commit()

# Or using the table directly
def insert_with_table(engine, users_data):
    """Insert using table object."""
    with engine.connect() as conn:
        conn.execute(User.__table__.insert(), users_data)
        conn.commit()
```
### PostgreSQL: Using RETURNING

Get inserted IDs with PostgreSQL's RETURNING clause:

```python
from sqlalchemy import insert

def insert_with_returning(session, users_data):
    """Insert and return generated IDs (PostgreSQL)."""
    stmt = insert(User).values(users_data).returning(User.id, User.username)
    result = session.execute(stmt)
    rows = result.all()  # fetch before committing
    session.commit()
    for row in rows:
        print(f"Inserted: id={row.id}, username={row.username}")
```
## Method 7: executemany() with Batching — Ultimate Control

For maximum control over batching:

```python
from sqlalchemy import text

def insert_with_executemany(engine, users_data, batch_size=10000):
    """Insert with explicit batching using executemany."""
    with engine.connect() as conn:
        for i in range(0, len(users_data), batch_size):
            batch = users_data[i:i + batch_size]
            conn.execute(
                text("INSERT INTO users (username, email) VALUES (:username, :email)"),
                batch
            )
        conn.commit()
```
## Performance Comparison: Real Benchmarks

Here's a benchmark inserting 100,000 rows into PostgreSQL:

```python
import time
from sqlalchemy import insert

def benchmark(name, func, *args):
    start = time.perf_counter()
    func(*args)
    elapsed = time.perf_counter() - start
    print(f"{name}: {elapsed:.2f}s")

# Generate test data
test_data = [{'username': f'user_{i}', 'email': f'user_{i}@test.com'}
             for i in range(100_000)]

# Run benchmarks (insert_* are wrappers around the methods shown above)
benchmark("Individual adds", insert_individual, session, test_data)          # ~120s
benchmark("bulk_save_objects", insert_bulk_objects, session, test_data)      # ~15s
benchmark("bulk_insert_mappings", insert_bulk_mappings, session, test_data)  # ~8s
benchmark("Core insert().values()", insert_core, session, test_data)         # ~3s
benchmark("PostgreSQL COPY", insert_copy, engine, test_data)                 # ~0.5s
```
### Results Summary
| Method | 100K Rows | Memory Usage | ORM Features | SQLAlchemy 2.0 Status |
|---|---|---|---|---|
| Individual session.add() | 120s | High | Full | Current |
| bulk_save_objects() | 15s | Medium | Partial | Legacy |
| bulk_insert_mappings() | 8s | Low | Minimal | Legacy |
| ORM 2.0 session.execute(insert()) | ~6s | Low | Partial | Recommended |
| Core insert().values() | 3s | Very Low | None | Current |
| PostgreSQL COPY | 0.5s | Minimal | None | Current |
## Batch Size Considerations

Batch size significantly impacts performance. With SQLAlchemy 2.0's session.execute(insert(...)), use chunks of 500–1,000 rows as a safe default. Larger batches may hit driver parameter limits for wide tables.

```python
from sqlalchemy import insert

def insert_batched(session, data, batch_size=1000):
    """Insert with configurable batch size — SQLAlchemy 2.0 style."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        session.execute(insert(User), batch)
        session.flush()  # Flush each batch
    session.commit()
```
### Optimal Batch Sizes by Database
| Database | Recommended Chunk Size | Notes |
|---|---|---|
| PostgreSQL | 500–1,000 | Push to 5,000 for narrow tables |
| MySQL / MariaDB | 500–1,000 | Limited by max_allowed_packet |
| SQLite | 100–500 | Lower concurrency |
| SQL Server | 500–1,000 | Hard limit: 2,100 parameters/statement |
Factors affecting optimal batch size:
- Available RAM
- Number of columns (more columns = fewer rows per chunk)
- Network latency (remote databases need smaller batches)
- Transaction log size limits
- Database parameter limits (e.g., max_allowed_packet in MySQL)
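For MySQL specifically, the max_allowed_packet bound can be estimated from a sample row. A rough sketch only; `estimate_rows_per_packet` is a made-up helper and the JSON length is merely a crude proxy for the encoded statement size:

```python
import json

def estimate_rows_per_packet(sample_row: dict,
                             packet_bytes: int = 16 * 1024 * 1024,
                             overhead: float = 1.5) -> int:
    """Estimate rows per statement under max_allowed_packet (16 MB assumed here).

    `overhead` pads the serialized row size to account for SQL syntax and escaping.
    """
    row_bytes = len(json.dumps(sample_row).encode("utf-8"))  # crude size proxy
    return max(1, int(packet_bytes / (row_bytes * overhead)))
```

Treat the result as an upper bound to compare against the 500–1,000 row defaults above, not as a precise limit.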
```python
# Memory-conscious batching for very large imports (SQLAlchemy 2.0)
from sqlalchemy import insert

def insert_memory_efficient(session, data_iterator, batch_size=500):
    """Process data lazily without loading everything into memory."""
    batch = []
    for item in data_iterator:
        batch.append(item)
        if len(batch) >= batch_size:
            session.execute(insert(User), batch)
            session.flush()
            batch = []
    if batch:  # Don't forget remaining items
        session.execute(insert(User), batch)
    session.commit()
```
## Common Pitfalls and Solutions

### Pitfall 1: Autoflush Killing Performance

SQLAlchemy's autoflush can trigger unexpected database roundtrips:

```python
# BAD: autoflush triggers on each query
for user in users_data:
    session.add(User(**user))
    count = session.query(User).count()  # Triggers autoflush!

# GOOD: disable autoflush during bulk operations
with session.no_autoflush:
    for user in users_data:
        session.add(User(**user))
session.commit()

# OR: configure the session factory without autoflush
Session = sessionmaker(bind=engine, autoflush=False)
```
### Pitfall 2: Relationships Not Working

Bulk operations bypass relationship handling:

```python
# This WON'T work with bulk_save_objects
user = User(username='john')
user.role = Role(name='admin')  # Relationship ignored!
session.bulk_save_objects([user])

# Instead, insert separately with foreign keys
session.bulk_insert_mappings(Role, [{'name': 'admin'}])
session.flush()
admin_id = session.query(Role).filter_by(name='admin').first().id
session.bulk_insert_mappings(User, [{'username': 'john', 'role_id': admin_id}])
```
### Pitfall 3: Duplicate Key Errors

Handle conflicts gracefully with PostgreSQL's ON CONFLICT:

```python
from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_users(session, users_data):
    """Insert or update on conflict (PostgreSQL)."""
    stmt = pg_insert(User).values(users_data)
    stmt = stmt.on_conflict_do_update(
        index_elements=['username'],
        set_={'email': stmt.excluded.email}
    )
    session.execute(stmt)
    session.commit()
```
### Pitfall 4: Transaction Timeouts

Long-running bulk inserts can time out. Commit in chunks to stay within transaction limits:

```python
from sqlalchemy import insert

def insert_with_checkpoints(session, data, batch_size=1000):
    """Commit in chunks to avoid transaction timeouts (SQLAlchemy 2.0)."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        session.execute(insert(User), batch)
        session.commit()  # Commit each chunk
        print(f"Inserted {min(i + batch_size, len(data))}/{len(data)} rows")
```
### Pitfall 5: Python Datetime Defaults Not Applied

Bulk operations may not apply Python-side defaults:

```python
from datetime import datetime
from sqlalchemy import func

# BAD: the Python-side default won't be applied
users = [{'username': 'john'}]  # created_at will be NULL!
session.bulk_insert_mappings(User, users)

# GOOD: include all values explicitly
users = [{'username': 'john', 'created_at': datetime.utcnow()}]
session.bulk_insert_mappings(User, users)

# OR: use a database-side default in the model definition
created_at = Column(DateTime, server_default=func.now())
```
## PostgreSQL COPY: The Nuclear Option

For massive datasets (millions of rows), PostgreSQL's COPY command is unbeatable. Note that copy_from() is a psycopg2 API; psycopg 3 replaced it with cursor.copy().

```python
import io
import csv

def insert_with_copy(engine, users_data, table_name='users'):
    """Ultra-fast insert using PostgreSQL COPY (psycopg2)."""
    # Prepare CSV in memory
    output = io.StringIO()
    writer = csv.writer(output, delimiter='\t')
    for user in users_data:
        writer.writerow([user['username'], user['email']])
    output.seek(0)

    # Use raw connection for COPY
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        cursor.copy_from(
            output,
            table_name,
            columns=('username', 'email'),
            sep='\t',
            null=''
        )
        raw_conn.commit()
    finally:
        raw_conn.close()
```
### Using psycopg2's copy_expert for More Control

```python
def insert_with_copy_expert(engine, csv_file_path):
    """Load directly from a CSV file using COPY."""
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        with open(csv_file_path, 'r') as f:
            cursor.copy_expert(
                "COPY users (username, email) FROM STDIN WITH CSV HEADER",
                f
            )
        raw_conn.commit()
    finally:
        raw_conn.close()
```
### Using StringIO for CSV Generation

```python
import io
import csv

def bulk_copy_from_dicts(engine, table, data, columns):
    """Generic COPY helper for any table."""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=columns, extrasaction='ignore')
    for row in data:
        writer.writerow(row)
    output.seek(0)

    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        cursor.copy_from(output, table, columns=columns, sep=',', null='')
        raw_conn.commit()
    finally:
        raw_conn.close()

# Usage
bulk_copy_from_dicts(
    engine,
    'users',
    users_data,
    columns=['username', 'email', 'role_id']
)
```

Caveat: copy_from() reads PostgreSQL's text COPY format, which does not understand CSV quoting; values containing the separator, newlines, or backslashes must be escaped, or you should switch to copy_expert() with the CSV format.
## Loading CSV Files: Complete Example

Here's a production-ready CSV loader with progress tracking:

```python
import csv
from sqlalchemy import insert
from tqdm import tqdm  # pip install tqdm

def load_csv_bulk(session, csv_path, model, batch_size=1000, columns=None):
    """
    Load a CSV file using bulk inserts with progress bar (SQLAlchemy 2.0).

    Args:
        session: SQLAlchemy session
        csv_path: Path to CSV file
        model: SQLAlchemy model class
        batch_size: Rows per batch (500-1000 recommended for SQLAlchemy 2.0)
        columns: Optional column mapping (CSV header -> model attribute)
    """
    with open(csv_path, 'r', encoding='utf-8') as f:
        # Count data rows for the progress bar (minus the header line)
        total = sum(1 for _ in f) - 1
        f.seek(0)
        reader = csv.DictReader(f)

        batch = []
        for row in tqdm(reader, total=total, desc="Importing"):
            # Apply column mapping if provided
            if columns:
                row = {columns.get(k, k): v for k, v in row.items()}
            # Clean empty strings to None
            row = {k: (v if v != '' else None) for k, v in row.items()}
            batch.append(row)
            if len(batch) >= batch_size:
                session.execute(insert(model), batch)
                session.flush()
                batch = []

        # Insert remaining rows
        if batch:
            session.execute(insert(model), batch)
        session.commit()

# Usage
load_csv_bulk(
    session,
    'million_users.csv',
    User,
    batch_size=1000,
    columns={'user_name': 'username', 'user_email': 'email'}  # CSV -> model mapping
)
```
## Decision Tree: Which Method Should You Use?

```
Need to insert bulk data?
│
├─ < 1,000 rows
│   └─ Use regular session.add_all() — simplicity wins
│
├─ 1,000 - 100,000 rows
│   ├─ Using async (FastAPI, etc.)?  → AsyncSession + session.execute(insert(Model), data)
│   ├─ Using SQLAlchemy 2.0 ORM?     → session.execute(insert(Model), data) [recommended]
│   ├─ Have ORM objects already?     → bulk_save_objects() [legacy]
│   └─ Have dictionaries/raw data?   → bulk_insert_mappings() [legacy]
│
├─ 100,000 - 1,000,000 rows
│   └─ Use Core insert().values() with batching (chunk_size=500-1000)
│
└─ > 1,000,000 rows (PostgreSQL)
    └─ Use COPY command — nothing else comes close
```
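The tree above can be mirrored in code, which is handy for a generic import utility. A sketch only; `choose_insert_method` and its return labels are illustrative, and the thresholds simply restate the tree:

```python
def choose_insert_method(row_count: int, dialect: str = "postgresql",
                         is_async: bool = False) -> str:
    """Pick a bulk-insert strategy from row count, backend, and sync/async context."""
    if row_count < 1_000:
        return "session.add_all()"
    if row_count > 1_000_000 and dialect == "postgresql":
        return "COPY"
    if row_count > 100_000:
        return "core insert() with batching"
    if is_async:
        return "AsyncSession + execute(insert(Model), data)"
    return "session.execute(insert(Model), data)"
```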
## Summary

- SQLAlchemy 2.0 recommended: use `session.execute(insert(Model), data)` — ~20x faster than `session.add()` and works with both `Session` and `AsyncSession`
- Async support: `AsyncSession` with `await session.execute(insert(Model), data)` for asyncio-based applications (FastAPI, etc.)
- For maximum SQLAlchemy speed: use Core `insert().values()` — 40x faster
- For truly massive data: use PostgreSQL `COPY` — 240x faster
- Always batch in chunks of 500–1,000 rows: avoids driver parameter limits and allows resumable imports
- Disable autoflush: use `session.no_autoflush` or `session.autoflush = False` during bulk operations
- Legacy APIs still work: `bulk_insert_mappings()` and `bulk_save_objects()` remain functional in SQLAlchemy 2.0 but are not recommended for new code
- Don't expect ORM features: bulk methods bypass relationships and Python-side defaults

The performance difference between methods can mean the difference between a 2-hour import and a 30-second one. Choose wisely based on your data volume and requirements.