Source code for django.db.backends.base.schema

import logging
from datetime import datetime

from django.db.backends.ddl_references import (
    Columns, Expressions, ForeignKeyName, IndexName, Statement, Table,
)
from django.db.backends.utils import names_digest, split_identifier
from django.db.models import Deferrable, Index
from django.db.models.sql import Query
from django.db.transaction import TransactionManagementError, atomic
from django.utils import timezone

logger = logging.getLogger('django.db.backends.schema')


def _is_relevant_relation(relation, altered_field):
    """
    When altering the given field, must constraints on its model from the given
    relation be temporarily dropped?
    """
    field = relation.field
    if field.many_to_many:
        # M2M reverse field
        return False
    if altered_field.primary_key and field.to_fields == [None]:
        # Foreign key constraint on the primary key, which is being altered.
        return True
    # Is the constraint targeting the field being altered?
    return altered_field.name in field.to_fields


def _all_related_fields(model):
    return model._meta._get_fields(forward=False, reverse=True, include_hidden=True)


def _related_non_m2m_objects(old_field, new_field):
    # Filter out m2m objects from reverse relations.
    # Return (old_relation, new_relation) tuples.
    return zip(
        (obj for obj in _all_related_fields(old_field.model) if _is_relevant_relation(obj, old_field)),
        (obj for obj in _all_related_fields(new_field.model) if _is_relevant_relation(obj, new_field)),
    )


[docs]class BaseDatabaseSchemaEditor: """ This class and its subclasses are responsible for emitting schema-changing statements to the databases - model creation/removal/alteration, field renaming, index fiddling, and so on. """ # Overrideable SQL templates sql_create_table = "CREATE TABLE %(table)s (%(definition)s)" sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s" sql_retablespace_table = "ALTER TABLE %(table)s SET TABLESPACE %(new_tablespace)s" sql_delete_table = "DROP TABLE %(table)s CASCADE" sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s" sql_alter_column = "ALTER TABLE %(table)s %(changes)s" sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s" sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL" sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL" sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s" sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT" sql_alter_column_no_default_null = sql_alter_column_no_default sql_alter_column_collate = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s" sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE" sql_rename_column = "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s" sql_update_with_default = "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL" sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s" sql_check_constraint = "CHECK (%(check)s)" sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s" sql_constraint = "CONSTRAINT %(name)s %(constraint)s" sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)" sql_delete_check = sql_delete_constraint sql_create_unique = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s UNIQUE (%(columns)s)%(deferrable)s" sql_delete_unique = sql_delete_constraint sql_create_fk = ( "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) " "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s" ) sql_create_inline_fk = None sql_create_column_inline_fk = None sql_delete_fk = sql_delete_constraint sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(include)s%(extra)s%(condition)s" sql_create_unique_index = "CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)%(include)s%(condition)s" sql_delete_index = "DROP INDEX %(name)s" sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)" sql_delete_pk = sql_delete_constraint sql_delete_procedure = 'DROP PROCEDURE %(procedure)s' def __init__(self, connection, collect_sql=False, atomic=True): self.connection = connection self.collect_sql = collect_sql if self.collect_sql: self.collected_sql = [] self.atomic_migration = self.connection.features.can_rollback_ddl and atomic # State-managing methods def __enter__(self): self.deferred_sql = [] if self.atomic_migration: self.atomic = atomic(self.connection.alias) self.atomic.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): if exc_type is None: for sql in self.deferred_sql: self.execute(sql) if self.atomic_migration: self.atomic.__exit__(exc_type, exc_value, traceback) # Core utility functions
[docs] def execute(self, sql, params=()): """Execute the given SQL statement, with optional parameters.""" # Don't perform the transactional DDL check if SQL is being collected # as it's not going to be executed anyway. if not self.collect_sql and self.connection.in_atomic_block and not self.connection.features.can_rollback_ddl: raise TransactionManagementError( "Executing DDL statements while in a transaction on databases " "that can't perform a rollback is prohibited." ) # Account for non-string statement objects. sql = str(sql) # Log the command we're running, then run it logger.debug("%s; (params %r)", sql, params, extra={'params': params, 'sql': sql}) if self.collect_sql: ending = "" if sql.endswith(";") else ";" if params is not None: self.collected_sql.append((sql % tuple(map(self.quote_value, params))) + ending) else: self.collected_sql.append(sql + ending) else: with self.connection.cursor() as cursor: cursor.execute(sql, params)
def quote_name(self, name): return self.connection.ops.quote_name(name) def table_sql(self, model): """Take a model and return its table definition.""" # Add any unique_togethers (always deferred, as some fields might be # created afterwards, like geometry fields with some backends). for fields in model._meta.unique_together: columns = [model._meta.get_field(field).column for field in fields] self.deferred_sql.append(self._create_unique_sql(model, columns)) # Create column SQL, add FK deferreds if needed. column_sqls = [] params = [] for field in model._meta.local_fields: # SQL. definition, extra_params = self.column_sql(model, field) if definition is None: continue # Check constraints can go on the column SQL here. db_params = field.db_parameters(connection=self.connection) if db_params['check']: definition += ' ' + self.sql_check_constraint % db_params # Autoincrement SQL (for backends with inline variant). col_type_suffix = field.db_type_suffix(connection=self.connection) if col_type_suffix: definition += ' %s' % col_type_suffix params.extend(extra_params) # FK. if field.remote_field and field.db_constraint: to_table = field.remote_field.model._meta.db_table to_column = field.remote_field.model._meta.get_field(field.remote_field.field_name).column if self.sql_create_inline_fk: definition += ' ' + self.sql_create_inline_fk % { 'to_table': self.quote_name(to_table), 'to_column': self.quote_name(to_column), } elif self.connection.features.supports_foreign_keys: self.deferred_sql.append(self._create_fk_sql(model, field, '_fk_%(to_table)s_%(to_column)s')) # Add the SQL to our big list. column_sqls.append('%s %s' % ( self.quote_name(field.column), definition, )) # Autoincrement SQL (for backends with post table definition # variant). if field.get_internal_type() in ('AutoField', 'BigAutoField', 'SmallAutoField'): autoinc_sql = self.connection.ops.autoinc_sql(model._meta.db_table, field.column) if autoinc_sql: self.deferred_sql.extend(autoinc_sql) constraints = [constraint.constraint_sql(model, self) for constraint in model._meta.constraints] sql = self.sql_create_table % { 'table': self.quote_name(model._meta.db_table), 'definition': ', '.join(constraint for constraint in (*column_sqls, *constraints) if constraint), } if model._meta.db_tablespace: tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace) if tablespace_sql: sql += ' ' + tablespace_sql return sql, params # Field <-> database mapping functions def column_sql(self, model, field, include_default=False): """ Take a field and return its column definition. The field must already have had set_attributes_from_name() called. """ # Get the column's type and use that as the basis of the SQL db_params = field.db_parameters(connection=self.connection) sql = db_params['type'] params = [] # Check for fields that aren't actually columns (e.g. M2M) if sql is None: return None, None # Collation. collation = getattr(field, 'db_collation', None) if collation: sql += self._collate_sql(collation) # Work out nullability null = field.null # If we were told to include a default value, do so include_default = ( include_default and not self.skip_default(field) and # Don't include a default value if it's a nullable field and the # default cannot be dropped in the ALTER COLUMN statement (e.g. # MySQL longtext and longblob). not (null and self.skip_default_on_alter(field)) ) if include_default: default_value = self.effective_default(field) column_default = ' DEFAULT ' + self._column_default_sql(field) if default_value is not None: if self.connection.features.requires_literal_defaults: # Some databases can't take defaults as a parameter (oracle) # If this is the case, the individual schema backend should # implement prepare_default sql += column_default % self.prepare_default(default_value) else: sql += column_default params += [default_value] # Oracle treats the empty string ('') as null, so coerce the null # option whenever '' is a possible value. if (field.empty_strings_allowed and not field.primary_key and self.connection.features.interprets_empty_strings_as_nulls): null = True if null and not self.connection.features.implied_column_null: sql += " NULL" elif not null: sql += " NOT NULL" # Primary key/unique outputs if field.primary_key: sql += " PRIMARY KEY" elif field.unique: sql += " UNIQUE" # Optionally add the tablespace if it's an implicitly indexed column tablespace = field.db_tablespace or model._meta.db_tablespace if tablespace and self.connection.features.supports_tablespaces and field.unique: sql += " %s" % self.connection.ops.tablespace_sql(tablespace, inline=True) # Return the sql return sql, params def skip_default(self, field): """ Some backends don't accept default values for certain columns types (i.e. MySQL longtext and longblob). """ return False def skip_default_on_alter(self, field): """ Some backends don't accept default values for certain columns types (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement. """ return False def prepare_default(self, value): """ Only used for backends which have requires_literal_defaults feature """ raise NotImplementedError( 'subclasses of BaseDatabaseSchemaEditor for backends which have ' 'requires_literal_defaults must provide a prepare_default() method' ) def _column_default_sql(self, field): """ Return the SQL to use in a DEFAULT clause. The resulting string should contain a '%s' placeholder for a default value. """ return '%s' @staticmethod def _effective_default(field): # This method allows testing its logic without a connection. if field.has_default(): default = field.get_default() elif not field.null and field.blank and field.empty_strings_allowed: if field.get_internal_type() == "BinaryField": default = b'' else: default = '' elif getattr(field, 'auto_now', False) or getattr(field, 'auto_now_add', False): default = datetime.now() internal_type = field.get_internal_type() if internal_type == 'DateField': default = default.date() elif internal_type == 'TimeField': default = default.time() elif internal_type == 'DateTimeField': default = timezone.now() else: default = None return default def effective_default(self, field): """Return a field's effective database default value.""" return field.get_db_prep_save(self._effective_default(field), self.connection) def quote_value(self, value): """ Return a quoted version of the value so it's safe to use in an SQL string. This is not safe against injection from user code; it is intended only for use in making SQL scripts or preparing default values for particularly tricky backends (defaults are not user-defined, though, so this is safe). """ raise NotImplementedError() # Actions
[docs] def create_model(self, model): """ Create a table and any accompanying indexes or unique constraints for the given `model`. """ sql, params = self.table_sql(model) # Prevent using [] as params, in the case a literal '%' is used in the definition self.execute(sql, params or None) # Add any field index and index_together's (deferred as SQLite _remake_table needs it) self.deferred_sql.extend(self._model_indexes_sql(model)) # Make M2M tables for field in model._meta.local_many_to_many: if field.remote_field.through._meta.auto_created: self.create_model(field.remote_field.through)
[docs] def delete_model(self, model): """Delete a model from the database.""" # Handle auto-created intermediary models for field in model._meta.local_many_to_many: if field.remote_field.through._meta.auto_created: self.delete_model(field.remote_field.through) # Delete the table self.execute(self.sql_delete_table % { "table": self.quote_name(model._meta.db_table), }) # Remove all deferred statements referencing the deleted table. for sql in list(self.deferred_sql): if isinstance(sql, Statement) and sql.references_table(model._meta.db_table): self.deferred_sql.remove(sql)
[docs] def add_index(self, model, index): """Add an index on a model.""" if ( index.contains_expressions and not self.connection.features.supports_expression_indexes ): return None # Index.create_sql returns interpolated SQL which makes params=None a # necessity to avoid escaping attempts on execution. self.execute(index.create_sql(model, self), params=None)
[docs] def remove_index(self, model, index): """Remove an index from a model.""" if ( index.contains_expressions and not self.connection.features.supports_expression_indexes ): return None self.execute(index.remove_sql(model, self))
[docs] def add_constraint(self, model, constraint): """Add a constraint to a model.""" sql = constraint.create_sql(model, self) if sql: # Constraint.create_sql returns interpolated SQL which makes # params=None a necessity to avoid escaping attempts on execution. self.execute(sql, params=None)
[docs] def remove_constraint(self, model, constraint): """Remove a constraint from a model.""" sql = constraint.remove_sql(model, self) if sql: self.execute(sql)
[docs] def alter_unique_together(self, model, old_unique_together, new_unique_together): """ Deal with a model changing its unique_together. The input unique_togethers must be doubly-nested, not the single-nested ["foo", "bar"] format. """ olds = {tuple(fields) for fields in old_unique_together} news = {tuple(fields) for fields in new_unique_together} # Deleted uniques for fields in olds.difference(news): self._delete_composed_index(model, fields, {'unique': True}, self.sql_delete_unique) # Created uniques for fields in news.difference(olds): columns = [model._meta.get_field(field).column for field in fields] self.execute(self._create_unique_sql(model, columns))
[docs] def alter_index_together(self, model, old_index_together, new_index_together): """ Deal with a model changing its index_together. The input index_togethers must be doubly-nested, not the single-nested ["foo", "bar"] format. """ olds = {tuple(fields) for fields in old_index_together} news = {tuple(fields) for fields in new_index_together} # Deleted indexes for fields in olds.difference(news): self._delete_composed_index( model, fields, {'index': True, 'unique': False}, self.sql_delete_index, ) # Created indexes for field_names in news.difference(olds): fields = [model._meta.get_field(field) for field in field_names] self.execute(self._create_index_sql(model, fields=fields, suffix='_idx'))
def _delete_composed_index(self, model, fields, constraint_kwargs, sql): meta_constraint_names = {constraint.name for constraint in model._meta.constraints} meta_index_names = {constraint.name for constraint in model._meta.indexes} columns = [model._meta.get_field(field).column for field in fields] constraint_names = self._constraint_names( model, columns, exclude=meta_constraint_names | meta_index_names, **constraint_kwargs ) if len(constraint_names) != 1: raise ValueError("Found wrong number (%s) of constraints for %s(%s)" % ( len(constraint_names), model._meta.db_table, ", ".join(columns), )) self.execute(self._delete_constraint_sql(sql, model, constraint_names[0]))
[docs] def alter_db_table(self, model, old_db_table, new_db_table): """Rename the table a model points to.""" if (old_db_table == new_db_table or (self.connection.features.ignores_table_name_case and old_db_table.lower() == new_db_table.lower())): return self.execute(self.sql_rename_table % { "old_table": self.quote_name(old_db_table), "new_table": self.quote_name(new_db_table), }) # Rename all references to the old table name. for sql in self.deferred_sql: if isinstance(sql, Statement): sql.rename_table_references(old_db_table, new_db_table)
[docs] def alter_db_tablespace(self, model, old_db_tablespace, new_db_tablespace): """Move a model's table between tablespaces.""" self.execute(self.sql_retablespace_table % { "table": self.quote_name(model._meta.db_table), "old_tablespace": self.quote_name(old_db_tablespace), "new_tablespace": self.quote_name(new_db_tablespace), })
[docs] def add_field(self, model, field): """ Create a field on a model. Usually involves adding a column, but may involve adding a table instead (for M2M fields). """ # Special-case implicit M2M tables if field.many_to_many and field.remote_field.through._meta.auto_created: return self.create_model(field.remote_field.through) # Get the column's definition definition, params = self.column_sql(model, field, include_default=True) # It might not actually have a column behind it if definition is None: return # Check constraints can go on the column SQL here db_params = field.db_parameters(connection=self.connection) if db_params['check']: definition += " " + self.sql_check_constraint % db_params if field.remote_field and self.connection.features.supports_foreign_keys and field.db_constraint: constraint_suffix = '_fk_%(to_table)s_%(to_column)s' # Add FK constraint inline, if supported. if self.sql_create_column_inline_fk: to_table = field.remote_field.model._meta.db_table to_column = field.remote_field.model._meta.get_field(field.remote_field.field_name).column namespace, _ = split_identifier(model._meta.db_table) definition += " " + self.sql_create_column_inline_fk % { 'name': self._fk_constraint_name(model, field, constraint_suffix), 'namespace': '%s.' % self.quote_name(namespace) if namespace else '', 'column': self.quote_name(field.column), 'to_table': self.quote_name(to_table), 'to_column': self.quote_name(to_column), 'deferrable': self.connection.ops.deferrable_sql() } # Otherwise, add FK constraints later. else: self.deferred_sql.append(self._create_fk_sql(model, field, constraint_suffix)) # Build the SQL and run it sql = self.sql_create_column % { "table": self.quote_name(model._meta.db_table), "column": self.quote_name(field.column), "definition": definition, } self.execute(sql, params) # Drop the default if we need to # (Django usually does not use in-database defaults) if not self.skip_default_on_alter(field) and self.effective_default(field) is not None: changes_sql, params = self._alter_column_default_sql(model, None, field, drop=True) sql = self.sql_alter_column % { "table": self.quote_name(model._meta.db_table), "changes": changes_sql, } self.execute(sql, params) # Add an index, if required self.deferred_sql.extend(self._field_indexes_sql(model, field)) # Reset connection if required if self.connection.features.connection_persists_old_columns: self.connection.close()
[docs] def remove_field(self, model, field): """ Remove a field from a model. Usually involves deleting a column, but for M2Ms may involve deleting a table. """ # Special-case implicit M2M tables if field.many_to_many and field.remote_field.through._meta.auto_created: return self.delete_model(field.remote_field.through) # It might not actually have a column behind it if field.db_parameters(connection=self.connection)['type'] is None: return # Drop any FK constraints, MySQL requires explicit deletion if field.remote_field: fk_names = self._constraint_names(model, [field.column], foreign_key=True) for fk_name in fk_names: self.execute(self._delete_fk_sql(model, fk_name)) # Delete the column sql = self.sql_delete_column % { "table": self.quote_name(model._meta.db_table), "column": self.quote_name(field.column), } self.execute(sql) # Reset connection if required if self.connection.features.connection_persists_old_columns: self.connection.close() # Remove all deferred statements referencing the deleted column. for sql in list(self.deferred_sql): if isinstance(sql, Statement) and sql.references_column(model._meta.db_table, field.column): self.deferred_sql.remove(sql)
[docs] def alter_field(self, model, old_field, new_field, strict=False): """ Allow a field's type, uniqueness, nullability, default, column, constraints, etc. to be modified. `old_field` is required to compute the necessary changes. If `strict` is True, raise errors if the old column does not match `old_field` precisely. """ if not self._field_should_be_altered(old_field, new_field): return # Ensure this field is even column-based old_db_params = old_field.db_parameters(connection=self.connection) old_type = old_db_params['type'] new_db_params = new_field.db_parameters(connection=self.connection) new_type = new_db_params['type'] if ((old_type is None and old_field.remote_field is None) or (new_type is None and new_field.remote_field is None)): raise ValueError( "Cannot alter field %s into %s - they do not properly define " "db_type (are you using a badly-written custom field?)" % (old_field, new_field), ) elif old_type is None and new_type is None and ( old_field.remote_field.through and new_field.remote_field.through and old_field.remote_field.through._meta.auto_created and new_field.remote_field.through._meta.auto_created): return self._alter_many_to_many(model, old_field, new_field, strict) elif old_type is None and new_type is None and ( old_field.remote_field.through and new_field.remote_field.through and not old_field.remote_field.through._meta.auto_created and not new_field.remote_field.through._meta.auto_created): # Both sides have through models; this is a no-op. return elif old_type is None or new_type is None: raise ValueError( "Cannot alter field %s into %s - they are not compatible types " "(you cannot alter to or from M2M fields, or add or remove " "through= on M2M fields)" % (old_field, new_field) ) self._alter_field(model, old_field, new_field, old_type, new_type, old_db_params, new_db_params, strict)
def _alter_field(self, model, old_field, new_field, old_type, new_type, old_db_params, new_db_params, strict=False): """Perform a "physical" (non-ManyToMany) field update.""" # Drop any FK constraints, we'll remake them later fks_dropped = set() if ( self.connection.features.supports_foreign_keys and old_field.remote_field and old_field.db_constraint ): fk_names = self._constraint_names(model, [old_field.column], foreign_key=True) if strict and len(fk_names) != 1: raise ValueError("Found wrong number (%s) of foreign key constraints for %s.%s" % ( len(fk_names), model._meta.db_table, old_field.column, )) for fk_name in fk_names: fks_dropped.add((old_field.column,)) self.execute(self._delete_fk_sql(model, fk_name)) # Has unique been removed? if old_field.unique and (not new_field.unique or self._field_became_primary_key(old_field, new_field)): # Find the unique constraint for this field meta_constraint_names = {constraint.name for constraint in model._meta.constraints} constraint_names = self._constraint_names( model, [old_field.column], unique=True, primary_key=False, exclude=meta_constraint_names, ) if strict and len(constraint_names) != 1: raise ValueError("Found wrong number (%s) of unique constraints for %s.%s" % ( len(constraint_names), model._meta.db_table, old_field.column, )) for constraint_name in constraint_names: self.execute(self._delete_unique_sql(model, constraint_name)) # Drop incoming FK constraints if the field is a primary key or unique, # which might be a to_field target, and things are going to change. drop_foreign_keys = ( self.connection.features.supports_foreign_keys and ( (old_field.primary_key and new_field.primary_key) or (old_field.unique and new_field.unique) ) and old_type != new_type ) if drop_foreign_keys: # '_meta.related_field' also contains M2M reverse fields, these # will be filtered out for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field): rel_fk_names = self._constraint_names( new_rel.related_model, [new_rel.field.column], foreign_key=True ) for fk_name in rel_fk_names: self.execute(self._delete_fk_sql(new_rel.related_model, fk_name)) # Removed an index? (no strict check, as multiple indexes are possible) # Remove indexes if db_index switched to False or a unique constraint # will now be used in lieu of an index. The following lines from the # truth table show all True cases; the rest are False: # # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique # ------------------------------------------------------------------------------ # True | False | False | False # True | False | False | True # True | False | True | True if old_field.db_index and not old_field.unique and (not new_field.db_index or new_field.unique): # Find the index for this field meta_index_names = {index.name for index in model._meta.indexes} # Retrieve only BTREE indexes since this is what's created with # db_index=True. index_names = self._constraint_names( model, [old_field.column], index=True, type_=Index.suffix, exclude=meta_index_names, ) for index_name in index_names: # The only way to check if an index was created with # db_index=True or with Index(['field'], name='foo') # is to look at its name (refs #28053). self.execute(self._delete_index_sql(model, index_name)) # Change check constraints? if old_db_params['check'] != new_db_params['check'] and old_db_params['check']: meta_constraint_names = {constraint.name for constraint in model._meta.constraints} constraint_names = self._constraint_names( model, [old_field.column], check=True, exclude=meta_constraint_names, ) if strict and len(constraint_names) != 1: raise ValueError("Found wrong number (%s) of check constraints for %s.%s" % ( len(constraint_names), model._meta.db_table, old_field.column, )) for constraint_name in constraint_names: self.execute(self._delete_check_sql(model, constraint_name)) # Have they renamed the column? if old_field.column != new_field.column: self.execute(self._rename_field_sql(model._meta.db_table, old_field, new_field, new_type)) # Rename all references to the renamed column. for sql in self.deferred_sql: if isinstance(sql, Statement): sql.rename_column_references(model._meta.db_table, old_field.column, new_field.column) # Next, start accumulating actions to do actions = [] null_actions = [] post_actions = [] # Collation change? old_collation = getattr(old_field, 'db_collation', None) new_collation = getattr(new_field, 'db_collation', None) if old_collation != new_collation: # Collation change handles also a type change. fragment = self._alter_column_collation_sql(model, new_field, new_type, new_collation) actions.append(fragment) # Type change? elif old_type != new_type: fragment, other_actions = self._alter_column_type_sql(model, old_field, new_field, new_type) actions.append(fragment) post_actions.extend(other_actions) # When changing a column NULL constraint to NOT NULL with a given # default value, we need to perform 4 steps: # 1. Add a default for new incoming writes # 2. Update existing NULL rows with new default # 3. Replace NULL constraint with NOT NULL # 4. Drop the default again. # Default change? needs_database_default = False if old_field.null and not new_field.null: old_default = self.effective_default(old_field) new_default = self.effective_default(new_field) if ( not self.skip_default_on_alter(new_field) and old_default != new_default and new_default is not None ): needs_database_default = True actions.append(self._alter_column_default_sql(model, old_field, new_field)) # Nullability change? if old_field.null != new_field.null: fragment = self._alter_column_null_sql(model, old_field, new_field) if fragment: null_actions.append(fragment) # Only if we have a default and there is a change from NULL to NOT NULL four_way_default_alteration = ( new_field.has_default() and (old_field.null and not new_field.null) ) if actions or null_actions: if not four_way_default_alteration: # If we don't have to do a 4-way default alteration we can # directly run a (NOT) NULL alteration actions = actions + null_actions # Combine actions together if we can (e.g. postgres) if self.connection.features.supports_combined_alters and actions: sql, params = tuple(zip(*actions)) actions = [(", ".join(sql), sum(params, []))] # Apply those actions for sql, params in actions: self.execute( self.sql_alter_column % { "table": self.quote_name(model._meta.db_table), "changes": sql, }, params, ) if four_way_default_alteration: # Update existing rows with default value self.execute( self.sql_update_with_default % { "table": self.quote_name(model._meta.db_table), "column": self.quote_name(new_field.column), "default": "%s", }, [new_default], ) # Since we didn't run a NOT NULL change before we need to do it # now for sql, params in null_actions: self.execute( self.sql_alter_column % { "table": self.quote_name(model._meta.db_table), "changes": sql, }, params, ) if post_actions: for sql, params in post_actions: self.execute(sql, params) # If primary_key changed to False, delete the primary key constraint. if old_field.primary_key and not new_field.primary_key: self._delete_primary_key(model, strict) # Added a unique? if self._unique_should_be_added(old_field, new_field): self.execute(self._create_unique_sql(model, [new_field.column])) # Added an index? Add an index if db_index switched to True or a unique # constraint will no longer be used in lieu of an index. The following # lines from the truth table show all True cases; the rest are False: # # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique # ------------------------------------------------------------------------------ # False | False | True | False # False | True | True | False # True | True | True | False if (not old_field.db_index or old_field.unique) and new_field.db_index and not new_field.unique: self.execute(self._create_index_sql(model, fields=[new_field])) # Type alteration on primary key? Then we need to alter the column # referring to us. rels_to_update = [] if drop_foreign_keys: rels_to_update.extend(_related_non_m2m_objects(old_field, new_field)) # Changed to become primary key? if self._field_became_primary_key(old_field, new_field): # Make the new one self.execute(self._create_primary_key_sql(model, new_field)) # Update all referencing columns rels_to_update.extend(_related_non_m2m_objects(old_field, new_field)) # Handle our type alters on the other end of rels from the PK stuff above for old_rel, new_rel in rels_to_update: rel_db_params = new_rel.field.db_parameters(connection=self.connection) rel_type = rel_db_params['type'] fragment, other_actions = self._alter_column_type_sql( new_rel.related_model, old_rel.field, new_rel.field, rel_type ) self.execute( self.sql_alter_column % { "table": self.quote_name(new_rel.related_model._meta.db_table), "changes": fragment[0], }, fragment[1], ) for sql, params in other_actions: self.execute(sql, params) # Does it have a foreign key? if (self.connection.features.supports_foreign_keys and new_field.remote_field and (fks_dropped or not old_field.remote_field or not old_field.db_constraint) and new_field.db_constraint): self.execute(self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")) # Rebuild FKs that pointed to us if we previously had to drop them if drop_foreign_keys: for rel in new_field.model._meta.related_objects: if _is_relevant_relation(rel, new_field) and rel.field.db_constraint: self.execute(self._create_fk_sql(rel.related_model, rel.field, "_fk")) # Does it have check constraints we need to add? if old_db_params['check'] != new_db_params['check'] and new_db_params['check']: constraint_name = self._create_index_name(model._meta.db_table, [new_field.column], suffix='_check') self.execute(self._create_check_sql(model, constraint_name, new_db_params['check'])) # Drop the default if we need to # (Django usually does not use in-database defaults) if needs_database_default: changes_sql, params = self._alter_column_default_sql(model, old_field, new_field, drop=True) sql = self.sql_alter_column % { "table": self.quote_name(model._meta.db_table), "changes": changes_sql, } self.execute(sql, params) # Reset connection if required if self.connection.features.connection_persists_old_columns: self.connection.close() def _alter_column_null_sql(self, model, old_field, new_field): """ Hook to specialize column null alteration. Return a (sql, params) fragment to set a column to null or non-null as required by new_field, or None if no changes are required. """ if (self.connection.features.interprets_empty_strings_as_nulls and new_field.get_internal_type() in ("CharField", "TextField")): # The field is nullable in the database anyway, leave it alone. return else: new_db_params = new_field.db_parameters(connection=self.connection) sql = self.sql_alter_column_null if new_field.null else self.sql_alter_column_not_null return ( sql % { 'column': self.quote_name(new_field.column), 'type': new_db_params['type'], }, [], ) def _alter_column_default_sql(self, model, old_field, new_field, drop=False): """ Hook to specialize column default alteration. Return a (sql, params) fragment to add or drop (depending on the drop argument) a default to new_field's column. """ new_default = self.effective_default(new_field) default = self._column_default_sql(new_field) params = [new_default] if drop: params = [] elif self.connection.features.requires_literal_defaults: # Some databases (Oracle) can't take defaults as a parameter # If this is the case, the SchemaEditor for that database should # implement prepare_default(). default = self.prepare_default(new_default) params = [] new_db_params = new_field.db_parameters(connection=self.connection) if drop: if new_field.null: sql = self.sql_alter_column_no_default_null else: sql = self.sql_alter_column_no_default else: sql = self.sql_alter_column_default return ( sql % { 'column': self.quote_name(new_field.column), 'type': new_db_params['type'], 'default': default, }, params, ) def _alter_column_type_sql(self, model, old_field, new_field, new_type): """ Hook to specialize column type alteration for different backends, for cases when a creation type is different to an alteration type (e.g. SERIAL in PostgreSQL, PostGIS fields). Return a two-tuple of: an SQL fragment of (sql, params) to insert into an ALTER TABLE statement and a list of extra (sql, params) tuples to run once the field is altered. """ return ( ( self.sql_alter_column_type % { "column": self.quote_name(new_field.column), "type": new_type, }, [], ), [], ) def _alter_column_collation_sql(self, model, new_field, new_type, new_collation): return ( self.sql_alter_column_collate % { 'column': self.quote_name(new_field.column), 'type': new_type, 'collation': self._collate_sql(new_collation) if new_collation else '', }, [], ) def _alter_many_to_many(self, model, old_field, new_field, strict): """Alter M2Ms to repoint their to= endpoints.""" # Rename the through table if old_field.remote_field.through._meta.db_table != new_field.remote_field.through._meta.db_table: self.alter_db_table(old_field.remote_field.through, old_field.remote_field.through._meta.db_table, new_field.remote_field.through._meta.db_table) # Repoint the FK to the other side self.alter_field( new_field.remote_field.through, # We need the field that points to the target model, so we can tell alter_field to change it - # this is m2m_reverse_field_name() (as opposed to m2m_field_name, which points to our model) old_field.remote_field.through._meta.get_field(old_field.m2m_reverse_field_name()), new_field.remote_field.through._meta.get_field(new_field.m2m_reverse_field_name()), ) self.alter_field( new_field.remote_field.through, # for self-referential models we need to alter field from the other end too old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()), new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()), ) def _create_index_name(self, table_name, column_names, suffix=""): """ Generate a unique name for an index/unique constraint. The name is divided into 3 parts: the table name, the column names, and a unique digest and suffix. """ _, table_name = split_identifier(table_name) hash_suffix_part = '%s%s' % (names_digest(table_name, *column_names, length=8), suffix) max_length = self.connection.ops.max_name_length() or 200 # If everything fits into max_length, use that name. index_name = '%s_%s_%s' % (table_name, '_'.join(column_names), hash_suffix_part) if len(index_name) <= max_length: return index_name # Shorten a long suffix. if len(hash_suffix_part) > max_length / 3: hash_suffix_part = hash_suffix_part[:max_length // 3] other_length = (max_length - len(hash_suffix_part)) // 2 - 1 index_name = '%s_%s_%s' % ( table_name[:other_length], '_'.join(column_names)[:other_length], hash_suffix_part, ) # Prepend D if needed to prevent the name from starting with an # underscore or a number (not permitted on Oracle). if index_name[0] == "_" or index_name[0].isdigit(): index_name = "D%s" % index_name[:-1] return index_name def _get_index_tablespace_sql(self, model, fields, db_tablespace=None): if db_tablespace is None: if len(fields) == 1 and fields[0].db_tablespace: db_tablespace = fields[0].db_tablespace elif model._meta.db_tablespace: db_tablespace = model._meta.db_tablespace if db_tablespace is not None: return ' ' + self.connection.ops.tablespace_sql(db_tablespace) return '' def _index_condition_sql(self, condition): if condition: return ' WHERE ' + condition return '' def _index_include_sql(self, model, columns): if not columns or not self.connection.features.supports_covering_indexes: return '' return Statement( ' INCLUDE (%(columns)s)', columns=Columns(model._meta.db_table, columns, self.quote_name), ) def _create_index_sql(self, model, *, fields=None, name=None, suffix='', using='', db_tablespace=None, col_suffixes=(), sql=None, opclasses=(), condition=None, include=None, expressions=None): """ Return the SQL statement to create the index for one or several fields or expressions. `sql` can be specified if the syntax differs from the standard (GIS indexes, ...). """ fields = fields or [] expressions = expressions or [] compiler = Query(model, alias_cols=False).get_compiler( connection=self.connection, ) tablespace_sql = self._get_index_tablespace_sql(model, fields, db_tablespace=db_tablespace) columns = [field.column for field in fields] sql_create_index = sql or self.sql_create_index table = model._meta.db_table def create_index_name(*args, **kwargs): nonlocal name if name is None: name = self._create_index_name(*args, **kwargs) return self.quote_name(name) return Statement( sql_create_index, table=Table(table, self.quote_name), name=IndexName(table, columns, suffix, create_index_name), using=using, columns=( self._index_columns(table, columns, col_suffixes, opclasses) if columns else Expressions(table, expressions, compiler, self.quote_value) ), extra=tablespace_sql, condition=self._index_condition_sql(condition), include=self._index_include_sql(model, include), ) def _delete_index_sql(self, model, name, sql=None): return Statement( sql or self.sql_delete_index, table=Table(model._meta.db_table, self.quote_name), name=self.quote_name(name), ) def _index_columns(self, table, columns, col_suffixes, opclasses): return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes) def _model_indexes_sql(self, model): """ Return a list of all index SQL statements (field indexes, index_together, Meta.indexes) for the specified model. """ if not model._meta.managed or model._meta.proxy or model._meta.swapped: return [] output = [] for field in model._meta.local_fields: output.extend(self._field_indexes_sql(model, field)) for field_names in model._meta.index_together: fields = [model._meta.get_field(field) for field in field_names] output.append(self._create_index_sql(model, fields=fields, suffix='_idx')) for index in model._meta.indexes: if ( not index.contains_expressions or self.connection.features.supports_expression_indexes ): output.append(index.create_sql(model, self)) return output def _field_indexes_sql(self, model, field): """ Return a list of all index SQL statements for the specified field. """ output = [] if self._field_should_be_indexed(model, field): output.append(self._create_index_sql(model, fields=[field])) return output def _field_should_be_altered(self, old_field, new_field): _, old_path, old_args, old_kwargs = old_field.deconstruct() _, new_path, new_args, new_kwargs = new_field.deconstruct() # Don't alter when: # - changing only a field name # - changing an attribute that doesn't affect the schema # - adding only a db_column and the column name is not changed non_database_attrs = [ 'blank', 'db_column', 'editable', 'error_messages', 'help_text', 'limit_choices_to', # Database-level options are not supported, see #21961. 'on_delete', 'related_name', 'related_query_name', 'validators', 'verbose_name', ] for attr in non_database_attrs: old_kwargs.pop(attr, None) new_kwargs.pop(attr, None) return ( self.quote_name(old_field.column) != self.quote_name(new_field.column) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs) ) def _field_should_be_indexed(self, model, field): return field.db_index and not field.unique def _field_became_primary_key(self, old_field, new_field): return not old_field.primary_key and new_field.primary_key def _unique_should_be_added(self, old_field, new_field): return (not old_field.unique and new_field.unique) or ( old_field.primary_key and not new_field.primary_key and new_field.unique ) def _rename_field_sql(self, table, old_field, new_field, new_type): return self.sql_rename_column % { "table": self.quote_name(table), "old_column": self.quote_name(old_field.column), "new_column": self.quote_name(new_field.column), "type": new_type, } def _create_fk_sql(self, model, field, suffix): table = Table(model._meta.db_table, self.quote_name) name = self._fk_constraint_name(model, field, suffix) column = Columns(model._meta.db_table, [field.column], self.quote_name) to_table = Table(field.target_field.model._meta.db_table, self.quote_name) to_column = Columns(field.target_field.model._meta.db_table, [field.target_field.column], self.quote_name) deferrable = self.connection.ops.deferrable_sql() return Statement( self.sql_create_fk, table=table, name=name, column=column, to_table=to_table, to_column=to_column, deferrable=deferrable, ) def _fk_constraint_name(self, model, field, suffix): def create_fk_name(*args, **kwargs): return self.quote_name(self._create_index_name(*args, **kwargs)) return ForeignKeyName( model._meta.db_table, [field.column], split_identifier(field.target_field.model._meta.db_table)[1], [field.target_field.column], suffix, create_fk_name, ) def _delete_fk_sql(self, model, name): return self._delete_constraint_sql(self.sql_delete_fk, model, name) def _deferrable_constraint_sql(self, deferrable): if deferrable is None: return '' if deferrable == Deferrable.DEFERRED: return ' DEFERRABLE INITIALLY DEFERRED' if deferrable == Deferrable.IMMEDIATE: return ' DEFERRABLE INITIALLY IMMEDIATE' def _unique_sql( self, model, fields, name, condition=None, deferrable=None, include=None, opclasses=None, ): if ( deferrable and not self.connection.features.supports_deferrable_unique_constraints ): return None if condition or include or opclasses: # Databases support conditional and covering unique constraints via # a unique index. sql = self._create_unique_sql( model, fields, name=name, condition=condition, include=include, opclasses=opclasses, ) if sql: self.deferred_sql.append(sql) return None constraint = self.sql_unique_constraint % { 'columns': ', '.join(map(self.quote_name, fields)), 'deferrable': self._deferrable_constraint_sql(deferrable), } return self.sql_constraint % { 'name': self.quote_name(name), 'constraint': constraint, } def _create_unique_sql( self, model, columns, name=None, condition=None, deferrable=None, include=None, opclasses=None, ): if ( ( deferrable and not self.connection.features.supports_deferrable_unique_constraints ) or (condition and not self.connection.features.supports_partial_indexes) or (include and not self.connection.features.supports_covering_indexes) ): return None def create_unique_name(*args, **kwargs): return self.quote_name(self._create_index_name(*args, **kwargs)) table = Table(model._meta.db_table, self.quote_name) if name is None: name = IndexName(model._meta.db_table, columns, '_uniq', create_unique_name) else: name = self.quote_name(name) columns = self._index_columns(table, columns, col_suffixes=(), opclasses=opclasses) if condition or include or opclasses: sql = self.sql_create_unique_index else: sql = self.sql_create_unique return Statement( sql, table=table, name=name, columns=columns, condition=self._index_condition_sql(condition), deferrable=self._deferrable_constraint_sql(deferrable), include=self._index_include_sql(model, include), ) def _delete_unique_sql( self, model, name, condition=None, deferrable=None, include=None, opclasses=None, ): if ( ( deferrable and not self.connection.features.supports_deferrable_unique_constraints ) or (condition and not self.connection.features.supports_partial_indexes) or (include and not self.connection.features.supports_covering_indexes) ): return None if condition or include or opclasses: sql = self.sql_delete_index else: sql = self.sql_delete_unique return self._delete_constraint_sql(sql, model, name) def _check_sql(self, name, check): return self.sql_constraint % { 'name': self.quote_name(name), 'constraint': self.sql_check_constraint % {'check': check}, } def _create_check_sql(self, model, name, check): return Statement( self.sql_create_check, table=Table(model._meta.db_table, self.quote_name), name=self.quote_name(name), check=check, ) def _delete_check_sql(self, model, name): return self._delete_constraint_sql(self.sql_delete_check, model, name) def _delete_constraint_sql(self, template, model, name): return Statement( template, table=Table(model._meta.db_table, self.quote_name), name=self.quote_name(name), ) def _constraint_names(self, model, column_names=None, unique=None, primary_key=None, index=None, foreign_key=None, check=None, type_=None, exclude=None): """Return all constraint names matching the columns and conditions.""" if column_names is not None: column_names = [ self.connection.introspection.identifier_converter(name) for name in column_names ] with self.connection.cursor() as cursor: constraints = self.connection.introspection.get_constraints(cursor, model._meta.db_table) result = [] for name, infodict in constraints.items(): if column_names is None or column_names == infodict['columns']: if unique is not None and infodict['unique'] != unique: continue if primary_key is not None and infodict['primary_key'] != primary_key: continue if index is not None and infodict['index'] != index: continue if check is not None and infodict['check'] != check: continue if foreign_key is not None and not infodict['foreign_key']: continue if type_ is not None and infodict['type'] != type_: continue if not exclude or name not in exclude: result.append(name) return result def _delete_primary_key(self, model, strict=False): constraint_names = self._constraint_names(model, primary_key=True) if strict and len(constraint_names) != 1: raise ValueError('Found wrong number (%s) of PK constraints for %s' % ( len(constraint_names), model._meta.db_table, )) for constraint_name in constraint_names: self.execute(self._delete_primary_key_sql(model, constraint_name)) def _create_primary_key_sql(self, model, field): return Statement( self.sql_create_pk, table=Table(model._meta.db_table, self.quote_name), name=self.quote_name( self._create_index_name(model._meta.db_table, [field.column], suffix="_pk") ), columns=Columns(model._meta.db_table, [field.column], self.quote_name), ) def _delete_primary_key_sql(self, model, name): return self._delete_constraint_sql(self.sql_delete_pk, model, name) def _collate_sql(self, collation): return ' COLLATE ' + self.quote_name(collation) def remove_procedure(self, procedure_name, param_types=()): sql = self.sql_delete_procedure % { 'procedure': self.quote_name(procedure_name), 'param_types': ','.join(param_types), } self.execute(sql)
Back to Top