
Zero-downtime миграции — это не магия, а дисциплина:
Результат: Миграции любой сложности без простоя и с возможностью отката.
Миграции базы данных — одна из самых рискованных операций в production. Одна неправильная миграция может положить весь сервис на часы.
Статистика:
60% критических инцидентов в Django-проектах связаны с миграциями БД. Средний даунтайм при проблемной миграции: 2-4 часа. — PostmortemDB 2024
Реальный пример:
# Простая миграция
ALTER TABLE posts ADD COLUMN views_count INTEGER DEFAULT 0 NOT NULL;
Что случилось:
posts висели → 502 errorsВ этой статье — проверенные паттерны для безопасных миграций.
| Lock Level | Разрешает | Блокирует | Пример |
|---|---|---|---|
| ACCESS SHARE | SELECT | DROP TABLE | SELECT |
| ROW SHARE | SELECT, INSERT | ALTER TABLE | INSERT |
| ROW EXCLUSIVE | SELECT, INSERT, UPDATE, DELETE | ALTER TABLE | UPDATE |
| SHARE UPDATE EXCLUSIVE | SELECT, INSERT, UPDATE, DELETE | ALTER TABLE, CREATE INDEX | Vacuum |
| SHARE | SELECT | INSERT, UPDATE, DELETE, ALTER | CREATE INDEX |
| EXCLUSIVE | SELECT | Всё кроме SELECT | Некоторые ALTER TABLE |
| ACCESS EXCLUSIVE | Ничего | ВСЁ | DROP TABLE, некоторые ALTER |
-- ❌ БЛОКИРУЕТ ВСЮ ТАБЛИЦУ
ALTER TABLE users ADD COLUMN email VARCHAR(255) NOT NULL;
ALTER TABLE users ALTER COLUMN status TYPE INTEGER;
ALTER TABLE users DROP COLUMN old_field;
CREATE INDEX ON users(email); -- Без CONCURRENTLY
Последствия:
-- ✅ Минимальные блокировки
ALTER TABLE users ADD COLUMN email VARCHAR(255); -- NULL allowed
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
ALTER TABLE users ADD CONSTRAINT check_email NOT VALID CHECK (email IS NOT NULL);
Правило: Новый код должен работать со старой схемой БД, старый код — с новой.
# 0042_rename_field.py
operations = [
migrations.RenameField('User', 'name', 'full_name'),
]
Деплой процесс:
name → full_nameuser.name → AttributeErrorРезультат: 50% requests падают до завершения деплоя.
Expand (Добавить):
# 0042_add_full_name.py
operations = [
migrations.AddField(
model_name='user',
name='full_name',
field=models.CharField(max_length=200, null=True),
),
]
# models.py
class User(models.Model):
name = models.CharField(max_length=200) # Старое поле
full_name = models.CharField(max_length=200, null=True) # Новое
def save(self, *args, **kwargs):
# Синхронизация во время перехода
if not self.full_name:
self.full_name = self.name
super().save(*args, **kwargs)
Migrate (Мигрировать данные):
# 0043_copy_name_to_full_name.py
from django.db.models import F
def copy_data(apps, schema_editor):
User = apps.get_model('auth', 'User')
User.objects.filter(full_name__isnull=True).update(
full_name=F('name')
)
class Migration(migrations.Migration):
dependencies = [('auth', '0042_add_full_name')]
operations = [
migrations.RunPython(copy_data, migrations.RunPython.noop),
]
Contract (Удалить старое) — через 1-2 недели:
# 0044_remove_name.py
operations = [
migrations.RemoveField(
model_name='user',
name='name',
),
]
Почему ждать 1-2 недели:
# ❌ ОПАСНО
operations = [
migrations.AlterField(
model_name='user',
name='email',
field=models.EmailField(null=False), # Блокировка!
),
]
SQL под капотом:
ALTER TABLE users ALTER COLUMN email SET NOT NULL;
-- ACCESS EXCLUSIVE lock!
-- На 10M таблице: 5-10 минут блокировки
Шаг 1: Убедиться, что нет NULL'ов
# 0045_fill_null_emails.py
def fill_nulls(apps, schema_editor):
User = apps.get_model('auth', 'User')
# Заполнить NULL значения
User.objects.filter(email__isnull=True).update(
email=Concat(Value('user'), F('id'), Value('@placeholder.com'))
)
class Migration(migrations.Migration):
operations = [
migrations.RunPython(fill_nulls, migrations.RunPython.noop),
]
Шаг 2: Добавить CHECK constraint с NOT VALID (PostgreSQL)
# 0046_add_email_check_not_valid.py
from django.contrib.postgres.operations import AddConstraintNotValid
from django.db.models import CheckConstraint, Q
class Migration(migrations.Migration):
operations = [
AddConstraintNotValid(
model_name='user',
constraint=CheckConstraint(
check=Q(email__isnull=False),
name='user_email_not_null',
),
),
]
SQL под капотом:
ALTER TABLE users
ADD CONSTRAINT user_email_not_null
CHECK (email IS NOT NULL) NOT VALID;
-- Мгновенно! Не проверяет существующие строки
-- Только новые INSERT/UPDATE будут проверяться
Шаг 3: Validate constraint (можно долго)
# 0047_validate_email_check.py
from django.contrib.postgres.operations import ValidateConstraint
class Migration(migrations.Migration):
operations = [
ValidateConstraint(
model_name='user',
name='user_email_not_null',
),
]
SQL под капотом:
ALTER TABLE users VALIDATE CONSTRAINT user_email_not_null;
-- SHARE UPDATE EXCLUSIVE lock (разрешает SELECT/INSERT/UPDATE/DELETE)
-- Сканирует таблицу, но не блокирует writes
Шаг 4: Установить NOT NULL на уровне БД (опционально)
# 0048_set_email_not_null.py
operations = [
migrations.AlterField(
model_name='user',
name='email',
field=models.EmailField(null=False),
),
]
Почему 4 шага безопаснее:
# ❌ БЛОКИРУЕТ таблицу на минуты
class Migration(migrations.Migration):
operations = [
migrations.AddIndex(
model_name='post',
index=models.Index(fields=['created_at'], name='post_created_idx'),
),
]
SQL под капотом:
CREATE INDEX post_created_idx ON posts(created_at);
-- SHARE lock: блокирует INSERT/UPDATE/DELETE
-- На 50M таблице: 10-15 минут
# 0049_add_created_index_concurrent.py
from django.contrib.postgres.operations import AddIndexConcurrently
from django.db import models
class Migration(migrations.Migration):
atomic = False # КРИТИЧНО! CONCURRENTLY не работает в транзакции
operations = [
AddIndexConcurrently(
model_name='post',
index=models.Index(fields=['created_at'], name='post_created_idx'),
),
]
SQL под капотом:
CREATE INDEX CONCURRENTLY post_created_idx ON posts(created_at);
-- Минимальная блокировка
-- На 50M таблице: 20 минут, но НЕ блокирует writes
Плюсы:
Минусы:
atomic = False (нет rollback)Проверка состояния индекса:
SELECT
schemaname, tablename, indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as size,
idx_scan,
indisvalid -- TRUE если индекс валиден
FROM pg_indexes
JOIN pg_stat_user_indexes USING (indexname)
JOIN pg_index ON indexrelid = indexrelid
WHERE indexname = 'post_created_idx';
Удаление INVALID индекса:
DROP INDEX CONCURRENTLY post_created_idx;
# ❌ ПОЛНАЯ БЛОКИРОВКА
operations = [
migrations.AlterField(
model_name='product',
name='price',
field=models.DecimalField(max_digits=10, decimal_places=2),
),
]
SQL под капотом:
ALTER TABLE products ALTER COLUMN price TYPE NUMERIC(10,2);
-- ACCESS EXCLUSIVE lock
-- Переписывает ВСЮ таблицу
-- 100M строк: 30+ минут полной блокировки
Шаг 1: Добавить новую колонку
# 0050_add_price_decimal.py
operations = [
migrations.AddField(
model_name='product',
name='price_new',
field=models.DecimalField(
max_digits=10,
decimal_places=2,
null=True
),
),
]
Шаг 2: Копировать данные батчами
# 0051_copy_price_to_decimal.py
import time
def copy_in_batches(apps, schema_editor):
Product = apps.get_model('shop', 'Product')
batch_size = 10000
last_id = 0
while True:
# Найти следующий batch
batch = Product.objects.filter(
id__gt=last_id,
price__isnull=False,
price_new__isnull=True
).order_by('id')[:batch_size]
batch_list = list(batch)
if not batch_list:
break
# Обновить batch
for product in batch_list:
product.price_new = Decimal(str(product.price))
Product.objects.bulk_update(batch_list, ['price_new'], batch_size=batch_size)
last_id = batch_list[-1].id
print(f"Migrated up to ID {last_id}")
# Пауза для снижения load
time.sleep(0.1)
class Migration(migrations.Migration):
dependencies = [('shop', '0050_add_price_decimal')]
operations = [
migrations.RunPython(copy_in_batches, migrations.RunPython.noop),
]
Шаг 3: Обновить код
# models.py
class Product(models.Model):
price = models.IntegerField(null=True) # Deprecated
price_new = models.DecimalField(max_digits=10, decimal_places=2)
def save(self, *args, **kwargs):
# Синхронизация во время transition
if self.price_new is None and self.price is not None:
self.price_new = Decimal(str(self.price))
super().save(*args, **kwargs)
Деплой → Тестирование в production (1-2 недели)
Шаг 4: Swap колонок
# 0052_swap_price_columns.py
operations = [
migrations.RemoveField('Product', 'price'),
migrations.RenameField('Product', 'price_new', 'price'),
]
Результат: Миграция без блокировок, с возможностью отката на любом этапе.
# ❌ ПЛОХО: Одной транзакцией
def migrate_all(apps, schema_editor):
User = apps.get_model('auth', 'User')
for user in User.objects.all(): # 10M users!
user.email = user.email.lower()
user.save()
# Timeout! Lock escalation! OOM!
# 0053_normalize_emails_batched.py
from django.db.models import F
import time
from django.core.cache import cache
def migrate_in_batches(apps, schema_editor):
User = apps.get_model('auth', 'User')
batch_size = 5000
last_id = cache.get('migration_0053_last_id', 0)
total = User.objects.count()
processed = 0
while True:
# Получить batch
user_ids = list(
User.objects
.filter(id__gt=last_id)
.order_by('id')
.values_list('id', flat=True)[:batch_size]
)
if not user_ids:
break
# Обновить batch атомарно
User.objects.filter(id__in=user_ids).update(
email=Lower(F('email'))
)
last_id = user_ids[-1]
processed += len(user_ids)
# Сохранить прогресс для возможности resume
cache.set('migration_0053_last_id', last_id, timeout=86400)
print(f"Progress: {processed}/{total} ({processed/total*100:.1f}%)")
# Пауза для снижения load
time.sleep(0.5)
# Очистить cache после завершения
cache.delete('migration_0053_last_id')
class Migration(migrations.Migration):
dependencies = [('auth', '0052_swap_price_columns')]
operations = [
migrations.RunPython(
migrate_in_batches,
reverse_code=migrations.RunPython.noop,
),
]
# migrations/0053_email_normalize_trigger.py
class Migration(migrations.Migration):
operations = [] # Пустая миграция
# tasks.py
@shared_task
def migrate_emails_background():
from django.apps import apps
User = apps.get_model('auth', 'User')
# То же самое, что выше
# Но запускается вручную: migrate_emails_background.delay()
# management/commands/migrate_emails.py
class Command(BaseCommand):
help = 'Migrate emails to lowercase'
def handle(self, *args, **options):
from myapp.tasks import migrate_emails_background
migrate_emails_background.delay()
self.stdout.write('Migration task queued')
Плюсы Celery подхода:
# ❌ ОПАСНО
operations = [
migrations.RemoveField('User', 'old_field'),
]
SQL:
ALTER TABLE users DROP COLUMN old_field;
-- ACCESS EXCLUSIVE lock
-- Если старый код обращается к полю → AttributeError
Шаг 1: Удалить из Django state, оставить в БД
# 0054_remove_old_field_from_state.py
class Migration(migrations.Migration):
operations = [
migrations.SeparateDatabaseAndState(
state_operations=[
# Django думает, что поле удалено
migrations.RemoveField('User', 'old_field'),
],
database_operations=[
# НО в БД ничего не делаем!
],
),
]
Код:
# models.py
class User(models.Model):
# old_field удалено из модели
pass
Деплой → Если нужен rollback → просто откатить код (поле ещё в БД!)
Шаг 2: Через 1-2 недели — удалить физически
# 0055_drop_old_field_from_db.py
class Migration(migrations.Migration):
operations = [
migrations.SeparateDatabaseAndState(
state_operations=[], # State уже обновлён
database_operations=[
migrations.RunSQL(
"ALTER TABLE users DROP COLUMN IF EXISTS old_field;",
reverse_sql=migrations.RunSQL.noop,
),
],
),
]
# middleware.py
import time
from django.db import connection
class MigrationMonitoringMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start = time.time()
response = self.get_response(request)
duration = time.time() - start
# Alert если запросы тормозят
if duration > 5: # 5 seconds
logger.warning(
f"Slow request: {request.path} took {duration:.2f}s",
extra={
'duration': duration,
'path': request.path,
'db_queries': len(connection.queries),
}
)
return response
-- Активные блокировки
SELECT
pid,
usename,
pg_blocking_pids(pid) as blocked_by,
query,
state,
wait_event_type,
wait_event
FROM pg_stat_activity
WHERE state != 'idle'
AND pid != pg_backend_pid()
ORDER BY state_change;
-- Долгие запросы (>30s)
SELECT
pid,
now() - query_start as duration,
query,
state
FROM pg_stat_activity
WHERE state != 'idle'
AND now() - query_start > interval '30 seconds'
ORDER BY duration DESC;
-- Прогресс VACUUM/CREATE INDEX
SELECT * FROM pg_stat_progress_create_index;
SELECT * FROM pg_stat_progress_vacuum;
# settings.py
import sentry_sdk
sentry_sdk.init(
dsn="your-dsn",
traces_sample_rate=1.0, # 100% во время миграции
)
# Перед миграцией
sentry_sdk.capture_message(
"Starting migration 0053",
level="info",
extras={'migration': '0053', 'table': 'users'}
)
# tests/test_migrations.py
from django.test import TestCase
from django.core.management import call_command
class TestMigration0053(TestCase):
"""Test email normalization migration"""
def test_backwards_compatible(self):
# 1. Откатиться
call_command('migrate', 'auth', '0052')
# 2. Создать test data
from django.contrib.auth.models import User
user = User.objects.create(username='test', email='Test@Example.COM')
# 3. Применить миграцию
call_command('migrate', 'auth', '0053')
# 4. Проверить результат
user.refresh_from_db()
self.assertEqual(user.email, 'test@example.com')
def test_rollback(self):
# Проверить, что rollback работает
call_command('migrate', 'auth', '0053')
call_command('migrate', 'auth', '0052')
# Не должно быть ошибок
# locustfile.py
from locust import HttpUser, task, between
class ProductUser(HttpUser):
wait_time = between(0.5, 1.5)
@task
def view_product(self):
self.client.get("/api/products/1/")
@task
def create_order(self):
self.client.post("/api/orders/", json={
"product_id": 1,
"quantity": 1,
})
# Запустить во время миграции
# locust -f locustfile.py --users 1000 --spawn-rate 10
EXPLAIN ANALYZE)SELECT * FROM pg_locks WHERE NOT grantedpip install django-migration-linter
# Проверить все миграции
python manage.py lintmigrations
# Автоматически найдёт:
# - Небезопасные ADD COLUMN NOT NULL
# - DROP COLUMN без предупреждения
# - CREATE INDEX без CONCURRENTLY
# - ALTER TYPE на больших таблицах
В CI/CD:
# .github/workflows/migrations.yml
- name: Lint migrations
run: |
python manage.py lintmigrations --warnings-as-errors
pip install django-zero-downtime-migrations
# settings.py
INSTALLED_APPS = [
'django_zero_downtime_migrations',
# ...
]
Автоматически превращает небезопасные миграции в безопасные.
# Объединить миграции 0001-0050
python manage.py squashmigrations app_name 0050
# Результат:
# - Старые БД: применяют все 50 миграций
# - Новые БД: одна squashed миграция
1. [Blue] Old app + Old schema
2. Apply backwards-compatible migration
3. [Green] New app + New schema (совместимо с Blue)
4. Health checks на Green
5. Switch traffic: Blue → Green
6. [Optional] Apply breaking changes
Плюсы:
Минусы:
1. Миграция (backwards-compatible)
2. Deploy 10% серверов
3. Health checks + мониторинг 15 мин
4. Deploy 50% серверов
5. Health checks + мониторинг 30 мин
6. Deploy 100%
7. [Опционально] Breaking changes через неделю
Ключевые принципы:
Помните:
"Лучшая миграция — та, которую можно откатить одной командой"
Порядок действий:
Следуя этим принципам, вы сможете делать миграции любой сложности без риска простоя!
Берём проекты на поддержку с чётким SLA. Стабилизируем за 2 недели, даём план развития на 90 дней. 15+ лет опыта с Django.