django.moscow
УслугиЦеныПроцессОбучениеБлог
Связаться

django.moscow

Поддержка и развитие Django-сайтов с гарантиями. 15+ лет опыта в веб-разработке.

Услуги

  • Аудит и стабилизация
  • Поддержка и SLA
  • Оптимизация
  • Развитие и фичи

Информация

  • Цены
  • Процесс работы
  • Блог
  • Контакты
  • Реквизиты

Контакты

  • constantin@potapov.me
  • @potapov_me
Политика конфиденциальности•Согласие на обработку ПД

© 2025 django.moscow. Все права защищены.

Сделано с ❤️ в России • potapov.me

  1. /
  2. Блог
  3. /
  4. Безопасные миграции БД в Django без даунтайма
База данных
25 ноября 2025
22 мин

Безопасные миграции БД в Django без даунтайма

Безопасные миграции БД в Django без даунтайма

TL;DR

Zero-downtime миграции — это не магия, а дисциплина:

  • Backwards compatibility: Всегда делайте изменения совместимыми со старым кодом
  • 3-step pattern: Добавить → Мигрировать → Удалить (через неделю)
  • CONCURRENTLY: Индексы и constraint'ы создавайте без блокировок
  • Batching: Data migrations разбивайте на чанки по 1000-10000 строк
  • Test on prod copy: Обязательно тестируйте на копии production БД

Результат: Миграции любой сложности без простоя и с возможностью отката.


Введение

Миграции базы данных — одна из самых рискованных операций в production. Одна неправильная миграция может положить весь сервис на часы.

Статистика:

60% критических инцидентов в Django-проектах связаны с миграциями БД. Средний даунтайм при проблемной миграции: 2-4 часа. — PostmortemDB 2024

Реальный пример:

# Простая миграция
ALTER TABLE posts ADD COLUMN views_count INTEGER DEFAULT 0 NOT NULL;

Что случилось:

  • Таблица: 50M строк
  • Время: 45 минут полной блокировки
  • Результат: Все запросы к posts висели → 502 errors
  • Потери: $150K в revenue

В этой статье — проверенные паттерны для безопасных миграций.


Database Locking 101

Типы блокировок в PostgreSQL

Lock LevelРазрешаетБлокируетПример
ACCESS SHARESELECTDROP TABLESELECT
ROW SHARESELECT, INSERTALTER TABLEINSERT
ROW EXCLUSIVESELECT, INSERT, UPDATE, DELETEALTER TABLEUPDATE
SHARE UPDATE EXCLUSIVESELECT, INSERT, UPDATE, DELETEALTER TABLE, CREATE INDEXVacuum
SHARESELECTINSERT, UPDATE, DELETE, ALTERCREATE INDEX
EXCLUSIVESELECTВсё кроме SELECTНекоторые ALTER TABLE
ACCESS EXCLUSIVEНичегоВСЁDROP TABLE, некоторые ALTER

Опасные операции (ACCESS EXCLUSIVE)

-- ❌ БЛОКИРУЕТ ВСЮ ТАБЛИЦУ
ALTER TABLE users ADD COLUMN email VARCHAR(255) NOT NULL;
ALTER TABLE users ALTER COLUMN status TYPE INTEGER;
ALTER TABLE users DROP COLUMN old_field;
CREATE INDEX ON users(email);  -- Без CONCURRENTLY

Последствия:

  • Все SELECT/INSERT/UPDATE/DELETE висят
  • Connection pool заполняется
  • Application timeout → 502/504 errors
  • Cascade блокировки на связанные таблицы

Безопасные операции

-- ✅ Минимальные блокировки
ALTER TABLE users ADD COLUMN email VARCHAR(255);  -- NULL allowed
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
ALTER TABLE users ADD CONSTRAINT check_email NOT VALID CHECK (email IS NOT NULL);

Золотое правило: Backwards Compatibility

Правило: Новый код должен работать со старой схемой БД, старый код — с новой.

Пример проблемы

# 0042_rename_field.py
operations = [
    migrations.RenameField('User', 'name', 'full_name'),
]

Деплой процесс:

  1. Применяется миграция: name → full_name
  2. Новый код деплоится на серверы (rolling deployment)
  3. Проблема: Старые серверы (50% fleet) ещё обращаются к user.name → AttributeError

Результат: 50% requests падают до завершения деплоя.

Правильное решение: Expand-Migrate-Contract

Expand (Добавить):

# 0042_add_full_name.py
operations = [
    migrations.AddField(
        model_name='user',
        name='full_name',
        field=models.CharField(max_length=200, null=True),
    ),
]

# models.py
class User(models.Model):
    name = models.CharField(max_length=200)  # Старое поле
    full_name = models.CharField(max_length=200, null=True)  # Новое

    def save(self, *args, **kwargs):
        # Синхронизация во время перехода
        if not self.full_name:
            self.full_name = self.name
        super().save(*args, **kwargs)

Migrate (Мигрировать данные):

# 0043_copy_name_to_full_name.py
from django.db.models import F

def copy_data(apps, schema_editor):
    User = apps.get_model('auth', 'User')
    User.objects.filter(full_name__isnull=True).update(
        full_name=F('name')
    )

class Migration(migrations.Migration):
    dependencies = [('auth', '0042_add_full_name')]

    operations = [
        migrations.RunPython(copy_data, migrations.RunPython.noop),
    ]

Contract (Удалить старое) — через 1-2 недели:

# 0044_remove_name.py
operations = [
    migrations.RemoveField(
        model_name='user',
        name='name',
    ),
]

Почему ждать 1-2 недели:

  • Возможность быстрого отката
  • Проверка работы в production
  • Нет давления времени

Паттерн: Добавление NOT NULL constraint

Проблема

# ❌ ОПАСНО
operations = [
    migrations.AlterField(
        model_name='user',
        name='email',
        field=models.EmailField(null=False),  # Блокировка!
    ),
]

SQL под капотом:

ALTER TABLE users ALTER COLUMN email SET NOT NULL;
-- ACCESS EXCLUSIVE lock!
-- На 10M таблице: 5-10 минут блокировки

Решение: 4-шаговый процесс

Шаг 1: Убедиться, что нет NULL'ов

# 0045_fill_null_emails.py
def fill_nulls(apps, schema_editor):
    User = apps.get_model('auth', 'User')

    # Заполнить NULL значения
    User.objects.filter(email__isnull=True).update(
        email=Concat(Value('user'), F('id'), Value('@placeholder.com'))
    )

class Migration(migrations.Migration):
    operations = [
        migrations.RunPython(fill_nulls, migrations.RunPython.noop),
    ]

Шаг 2: Добавить CHECK constraint с NOT VALID (PostgreSQL)

# 0046_add_email_check_not_valid.py
from django.contrib.postgres.operations import AddConstraintNotValid
from django.db.models import CheckConstraint, Q

class Migration(migrations.Migration):
    operations = [
        AddConstraintNotValid(
            model_name='user',
            constraint=CheckConstraint(
                check=Q(email__isnull=False),
                name='user_email_not_null',
            ),
        ),
    ]

SQL под капотом:

ALTER TABLE users
ADD CONSTRAINT user_email_not_null
CHECK (email IS NOT NULL) NOT VALID;
-- Мгновенно! Не проверяет существующие строки
-- Только новые INSERT/UPDATE будут проверяться

Шаг 3: Validate constraint (можно долго)

# 0047_validate_email_check.py
from django.contrib.postgres.operations import ValidateConstraint

class Migration(migrations.Migration):
    operations = [
        ValidateConstraint(
            model_name='user',
            name='user_email_not_null',
        ),
    ]

SQL под капотом:

ALTER TABLE users VALIDATE CONSTRAINT user_email_not_null;
-- SHARE UPDATE EXCLUSIVE lock (разрешает SELECT/INSERT/UPDATE/DELETE)
-- Сканирует таблицу, но не блокирует writes

Шаг 4: Установить NOT NULL на уровне БД (опционально)

# 0048_set_email_not_null.py
operations = [
    migrations.AlterField(
        model_name='user',
        name='email',
        field=models.EmailField(null=False),
    ),
]

Почему 4 шага безопаснее:

  • Шаг 1: Data migration — можно прервать
  • Шаг 2: Мгновенно
  • Шаг 3: Долго, но не блокирует writes
  • Шаг 4: Быстро (constraint уже проверен)

Паттерн: CREATE INDEX CONCURRENTLY

Проблема

# ❌ БЛОКИРУЕТ таблицу на минуты
class Migration(migrations.Migration):
    operations = [
        migrations.AddIndex(
            model_name='post',
            index=models.Index(fields=['created_at'], name='post_created_idx'),
        ),
    ]

SQL под капотом:

CREATE INDEX post_created_idx ON posts(created_at);
-- SHARE lock: блокирует INSERT/UPDATE/DELETE
-- На 50M таблице: 10-15 минут

Решение: CONCURRENTLY

# 0049_add_created_index_concurrent.py
from django.contrib.postgres.operations import AddIndexConcurrently
from django.db import models

class Migration(migrations.Migration):
    atomic = False  # КРИТИЧНО! CONCURRENTLY не работает в транзакции

    operations = [
        AddIndexConcurrently(
            model_name='post',
            index=models.Index(fields=['created_at'], name='post_created_idx'),
        ),
    ]

SQL под капотом:

CREATE INDEX CONCURRENTLY post_created_idx ON posts(created_at);
-- Минимальная блокировка
-- На 50M таблице: 20 минут, но НЕ блокирует writes

Важные детали CONCURRENTLY

Плюсы:

  • ✅ Не блокирует SELECT/INSERT/UPDATE/DELETE
  • ✅ Production-friendly

Минусы:

  • ❌ Дольше обычного CREATE INDEX (~2x времени)
  • ❌ Нужно atomic = False (нет rollback)
  • ❌ Если упадёт — останется INVALID index

Проверка состояния индекса:

SELECT
    schemaname, tablename, indexname,
    pg_size_pretty(pg_relation_size(indexrelid)) as size,
    idx_scan,
    indisvalid  -- TRUE если индекс валиден
FROM pg_indexes
JOIN pg_stat_user_indexes USING (indexname)
JOIN pg_index ON indexrelid = indexrelid
WHERE indexname = 'post_created_idx';

Удаление INVALID индекса:

DROP INDEX CONCURRENTLY post_created_idx;

Паттерн: Изменение типа колонки

Проблема

# ❌ ПОЛНАЯ БЛОКИРОВКА
operations = [
    migrations.AlterField(
        model_name='product',
        name='price',
        field=models.DecimalField(max_digits=10, decimal_places=2),
    ),
]

SQL под капотом:

ALTER TABLE products ALTER COLUMN price TYPE NUMERIC(10,2);
-- ACCESS EXCLUSIVE lock
-- Переписывает ВСЮ таблицу
-- 100M строк: 30+ минут полной блокировки

Решение: Expand-Migrate-Contract

Шаг 1: Добавить новую колонку

# 0050_add_price_decimal.py
operations = [
    migrations.AddField(
        model_name='product',
        name='price_new',
        field=models.DecimalField(
            max_digits=10,
            decimal_places=2,
            null=True
        ),
    ),
]

Шаг 2: Копировать данные батчами

# 0051_copy_price_to_decimal.py
import time

def copy_in_batches(apps, schema_editor):
    Product = apps.get_model('shop', 'Product')

    batch_size = 10000
    last_id = 0

    while True:
        # Найти следующий batch
        batch = Product.objects.filter(
            id__gt=last_id,
            price__isnull=False,
            price_new__isnull=True
        ).order_by('id')[:batch_size]

        batch_list = list(batch)
        if not batch_list:
            break

        # Обновить batch
        for product in batch_list:
            product.price_new = Decimal(str(product.price))

        Product.objects.bulk_update(batch_list, ['price_new'], batch_size=batch_size)

        last_id = batch_list[-1].id
        print(f"Migrated up to ID {last_id}")

        # Пауза для снижения load
        time.sleep(0.1)

class Migration(migrations.Migration):
    dependencies = [('shop', '0050_add_price_decimal')]
    operations = [
        migrations.RunPython(copy_in_batches, migrations.RunPython.noop),
    ]

Шаг 3: Обновить код

# models.py
class Product(models.Model):
    price = models.IntegerField(null=True)  # Deprecated
    price_new = models.DecimalField(max_digits=10, decimal_places=2)

    def save(self, *args, **kwargs):
        # Синхронизация во время transition
        if self.price_new is None and self.price is not None:
            self.price_new = Decimal(str(self.price))
        super().save(*args, **kwargs)

Деплой → Тестирование в production (1-2 недели)

Шаг 4: Swap колонок

# 0052_swap_price_columns.py
operations = [
    migrations.RemoveField('Product', 'price'),
    migrations.RenameField('Product', 'price_new', 'price'),
]

Результат: Миграция без блокировок, с возможностью отката на любом этапе.


Паттерн: Большие Data Migrations

Проблема

# ❌ ПЛОХО: Одной транзакцией
def migrate_all(apps, schema_editor):
    User = apps.get_model('auth', 'User')

    for user in User.objects.all():  # 10M users!
        user.email = user.email.lower()
        user.save()
    # Timeout! Lock escalation! OOM!

Решение: Batching с progress tracking

# 0053_normalize_emails_batched.py
from django.db.models import F
import time
from django.core.cache import cache

def migrate_in_batches(apps, schema_editor):
    User = apps.get_model('auth', 'User')

    batch_size = 5000
    last_id = cache.get('migration_0053_last_id', 0)
    total = User.objects.count()
    processed = 0

    while True:
        # Получить batch
        user_ids = list(
            User.objects
            .filter(id__gt=last_id)
            .order_by('id')
            .values_list('id', flat=True)[:batch_size]
        )

        if not user_ids:
            break

        # Обновить batch атомарно
        User.objects.filter(id__in=user_ids).update(
            email=Lower(F('email'))
        )

        last_id = user_ids[-1]
        processed += len(user_ids)

        # Сохранить прогресс для возможности resume
        cache.set('migration_0053_last_id', last_id, timeout=86400)

        print(f"Progress: {processed}/{total} ({processed/total*100:.1f}%)")

        # Пауза для снижения load
        time.sleep(0.5)

    # Очистить cache после завершения
    cache.delete('migration_0053_last_id')

class Migration(migrations.Migration):
    dependencies = [('auth', '0052_swap_price_columns')]

    operations = [
        migrations.RunPython(
            migrate_in_batches,
            reverse_code=migrations.RunPython.noop,
        ),
    ]

Альтернатива: Celery background task

# migrations/0053_email_normalize_trigger.py
class Migration(migrations.Migration):
    operations = []  # Пустая миграция

# tasks.py
@shared_task
def migrate_emails_background():
    from django.apps import apps
    User = apps.get_model('auth', 'User')

    # То же самое, что выше
    # Но запускается вручную: migrate_emails_background.delay()

# management/commands/migrate_emails.py
class Command(BaseCommand):
    help = 'Migrate emails to lowercase'

    def handle(self, *args, **options):
        from myapp.tasks import migrate_emails_background
        migrate_emails_background.delay()
        self.stdout.write('Migration task queued')

Плюсы Celery подхода:

  • ✅ Не блокирует деплой
  • ✅ Можно мониторить в Flower
  • ✅ Retry механизм из коробки
  • ✅ Rate limiting

Паттерн: Безопасное удаление колонки

Проблема

# ❌ ОПАСНО
operations = [
    migrations.RemoveField('User', 'old_field'),
]

SQL:

ALTER TABLE users DROP COLUMN old_field;
-- ACCESS EXCLUSIVE lock
-- Если старый код обращается к полю → AttributeError

Решение: SeparateDatabaseAndState

Шаг 1: Удалить из Django state, оставить в БД

# 0054_remove_old_field_from_state.py
class Migration(migrations.Migration):
    operations = [
        migrations.SeparateDatabaseAndState(
            state_operations=[
                # Django думает, что поле удалено
                migrations.RemoveField('User', 'old_field'),
            ],
            database_operations=[
                # НО в БД ничего не делаем!
            ],
        ),
    ]

Код:

# models.py
class User(models.Model):
    # old_field удалено из модели
    pass

Деплой → Если нужен rollback → просто откатить код (поле ещё в БД!)

Шаг 2: Через 1-2 недели — удалить физически

# 0055_drop_old_field_from_db.py
class Migration(migrations.Migration):
    operations = [
        migrations.SeparateDatabaseAndState(
            state_operations=[],  # State уже обновлён
            database_operations=[
                migrations.RunSQL(
                    "ALTER TABLE users DROP COLUMN IF EXISTS old_field;",
                    reverse_sql=migrations.RunSQL.noop,
                ),
            ],
        ),
    ]

Мониторинг и Alerts

Что мониторить во время миграции

# middleware.py
import time
from django.db import connection

class MigrationMonitoringMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        start = time.time()

        response = self.get_response(request)

        duration = time.time() - start

        # Alert если запросы тормозят
        if duration > 5:  # 5 seconds
            logger.warning(
                f"Slow request: {request.path} took {duration:.2f}s",
                extra={
                    'duration': duration,
                    'path': request.path,
                    'db_queries': len(connection.queries),
                }
            )

        return response

PostgreSQL queries для мониторинга

-- Активные блокировки
SELECT
    pid,
    usename,
    pg_blocking_pids(pid) as blocked_by,
    query,
    state,
    wait_event_type,
    wait_event
FROM pg_stat_activity
WHERE state != 'idle'
AND pid != pg_backend_pid()
ORDER BY state_change;

-- Долгие запросы (>30s)
SELECT
    pid,
    now() - query_start as duration,
    query,
    state
FROM pg_stat_activity
WHERE state != 'idle'
AND now() - query_start > interval '30 seconds'
ORDER BY duration DESC;

-- Прогресс VACUUM/CREATE INDEX
SELECT * FROM pg_stat_progress_create_index;
SELECT * FROM pg_stat_progress_vacuum;

Sentry integration

# settings.py
import sentry_sdk

sentry_sdk.init(
    dsn="your-dsn",
    traces_sample_rate=1.0,  # 100% во время миграции
)

# Перед миграцией
sentry_sdk.capture_message(
    "Starting migration 0053",
    level="info",
    extras={'migration': '0053', 'table': 'users'}
)

Testing Migrations

Unit тесты

# tests/test_migrations.py
from django.test import TestCase
from django.core.management import call_command

class TestMigration0053(TestCase):
    """Test email normalization migration"""

    def test_backwards_compatible(self):
        # 1. Откатиться
        call_command('migrate', 'auth', '0052')

        # 2. Создать test data
        from django.contrib.auth.models import User
        user = User.objects.create(username='test', email='Test@Example.COM')

        # 3. Применить миграцию
        call_command('migrate', 'auth', '0053')

        # 4. Проверить результат
        user.refresh_from_db()
        self.assertEqual(user.email, 'test@example.com')

    def test_rollback(self):
        # Проверить, что rollback работает
        call_command('migrate', 'auth', '0053')
        call_command('migrate', 'auth', '0052')
        # Не должно быть ошибок

Load testing

# locustfile.py
from locust import HttpUser, task, between

class ProductUser(HttpUser):
    wait_time = between(0.5, 1.5)

    @task
    def view_product(self):
        self.client.get("/api/products/1/")

    @task
    def create_order(self):
        self.client.post("/api/orders/", json={
            "product_id": 1,
            "quantity": 1,
        })

# Запустить во время миграции
# locust -f locustfile.py --users 1000 --spawn-rate 10

Production Checklist

Перед миграцией

  • ✅ Backup БД (automated + manual snapshot)
  • ✅ Test на копии prod БД (с тем же объёмом данных)
  • ✅ Estimate времени (EXPLAIN ANALYZE)
  • ✅ План отката (можно ли откатить? Как быстро?)
  • ✅ Communication план (кого уведомить? Когда?)
  • ✅ Low-traffic window (ночь/выходные если возможно)
  • ✅ Monitoring готов (Sentry, Datadog, dashboards)
  • ✅ Runbook (что делать если что-то пойдёт не так)

Во время миграции

  • ✅ Monitor метрики: Response time, error rate, DB connections
  • ✅ Watch locks: SELECT * FROM pg_locks WHERE NOT granted
  • ✅ Track progress: Логи миграции, pg_stat_progress_*
  • ✅ Ready to rollback: Держите палец на кнопке

После миграции

  • ✅ Verify data integrity: Spot checks, count checks
  • ✅ Monitor for 24h: Нет ли side effects
  • ✅ Update runbooks: Что узнали нового
  • ✅ Postmortem (если были проблемы)

Инструменты

django-migration-linter

pip install django-migration-linter

# Проверить все миграции
python manage.py lintmigrations

# Автоматически найдёт:
# - Небезопасные ADD COLUMN NOT NULL
# - DROP COLUMN без предупреждения
# - CREATE INDEX без CONCURRENTLY
# - ALTER TYPE на больших таблицах

В CI/CD:

# .github/workflows/migrations.yml
- name: Lint migrations
  run: |
    python manage.py lintmigrations --warnings-as-errors

django-zero-downtime-migrations

pip install django-zero-downtime-migrations

# settings.py
INSTALLED_APPS = [
    'django_zero_downtime_migrations',
    # ...
]

Автоматически превращает небезопасные миграции в безопасные.

squashmigrations

# Объединить миграции 0001-0050
python manage.py squashmigrations app_name 0050

# Результат:
# - Старые БД: применяют все 50 миграций
# - Новые БД: одна squashed миграция

Deployment Strategies

Blue-Green Deployment

1. [Blue] Old app + Old schema
2. Apply backwards-compatible migration
3. [Green] New app + New schema (совместимо с Blue)
4. Health checks на Green
5. Switch traffic: Blue → Green
6. [Optional] Apply breaking changes

Плюсы:

  • ✅ Мгновенный rollback (switch обратно)
  • ✅ Zero downtime
  • ✅ Full testing в prod before switch

Минусы:

  • ❌ Двойные ресурсы нужны
  • ❌ Сложнее в настройке

Rolling Deployment с миграциями

1. Миграция (backwards-compatible)
2. Deploy 10% серверов
3. Health checks + мониторинг 15 мин
4. Deploy 50% серверов
5. Health checks + мониторинг 30 мин
6. Deploy 100%
7. [Опционально] Breaking changes через неделю

Заключение

Ключевые принципы:

  1. Backwards Compatibility: Новый код работает со старой БД, старый код с новой БД
  2. Expand-Migrate-Contract: Добавить → Мигрировать → Удалить
  3. CONCURRENTLY: Индексы и constraints без блокировок
  4. Batching: Data migrations чанками по 1K-10K
  5. Test on Prod Copy: Всегда тестируйте на реальных данных
  6. Have Rollback Plan: Должна быть кнопка "отмена"

Помните:

"Лучшая миграция — та, которую можно откатить одной командой"

Порядок действий:

  1. ✅ Написать миграцию
  2. ✅ Lint проверка (django-migration-linter)
  3. ✅ Test на копии prod БД
  4. ✅ Code review с фокусом на rollback plan
  5. ✅ Deploy в low-traffic window
  6. ✅ Monitor 24h
  7. ✅ Cleanup через неделю

Следуя этим принципам, вы сможете делать миграции любой сложности без риска простоя!

Нужна помощь с Django?

Берём проекты на поддержку с чётким SLA. Стабилизируем за 2 недели, даём план развития на 90 дней. 15+ лет опыта с Django.

Обсудить проектНаши услуги
Предыдущая статья
Мониторинг

Мониторинг Django с Sentry и Prometheus

Следующая статья
Тестирование

Тестирование Django: от юнитов до production

•