List:Commits« Previous MessageNext Message »
From:Dmitry Shulga Date:November 1 2011 8:48am
Subject:bzr push into mysql-trunk branch (Dmitry.Shulga:3467 to 3468) WL#6030
View as plain text  
 3468 Dmitry Shulga	2011-10-31
      This is core patch that implements WL#6030 (The BEFORE triggers are not
      processed for NOT NULL columns).
      
      The main idea for WL implemantation is to add possibility for table's fields
      to store NULL value temporary even if that field is defined as NOT NULL.
      Such possibility is turned on before start trigger processing and turned off
      after it completed. When BEFORE trigger has been executed we start check
      NOT NULL constraints prior disable temporary nullbility for fields.
      
      In order to implement such behaviour we fill duplicated set of Field's
      objects when create new TABLE's object. This duplicated set of fields
      is referenced by pointer TABLE::nullable_field and is used when make handling
      of INSERT/UPDATE statements. Also interface of Field class is extended
      by including method set_temporary_nullable() and check_constraints(). The last
      method is used to check against NOT NULL constraints after finish execution
      of BEFORE triggers.
      
      This patch causes two test (main.warning and main.trigger) be faield
      therefore new testcase that should cover new functional is not included.

    modified:
      include/mysql_com.h
      sql/field.cc
      sql/field.h
      sql/field_conv.cc
      sql/ha_ndbcluster_cond.cc
      sql/item.cc
      sql/item.h
      sql/item_cmpfunc.cc
      sql/item_func.h
      sql/item_sum.cc
      sql/partition_info.cc
      sql/sp_head.cc
      sql/sql_base.cc
      sql/sql_base.h
      sql/sql_handler.cc
      sql/sql_insert.cc
      sql/sql_load.cc
      sql/sql_select.cc
      sql/sql_select.h
      sql/sql_show.cc
      sql/sql_table.cc
      sql/sql_trigger.cc
      sql/sql_union.cc
      sql/sql_update.cc
      sql/sql_view.cc
      sql/table.cc
      sql/table.h
 3467 Dmitry Shulga	2011-10-12
      Prerequisite patch for WL#6030.
      
      This patch minimizes a direct access to the data member Field::null_ptr.
      Direct access to null_ptr was replaced by call methods null_offset(),
      null_default_value_offset(), is_null_in_record().
      
      Redundant static functions field_in_record_is_null() were removed from 
      ha_federated.cc, ha_innodb.cc.
     @ sql/field.h
        The methods Field::null_offset() and Field::null_default_value_offset 
        were added. Added overloaded method Field::new_key_field without
        arguments new_null_ptr, new_nullbit, is_nullable.
        This method is used when values for these missed arguments may be 
        get from Field's instance.
     @ sql/item_func.cc
        Item_func_sp::init_result_field modified: pass pointer to null value
        as a last argument of sp_head::create_result_field instead of 
        explicit use of attributes Field::null_ptr, Field::null_bit.
     @ sql/sp_head.cc
        sp_head::create_result_field() was midified: added pointer to byte that
        stores a null value as the last argument. Subsequently this pointer is passed
        to make_field() to initialize datamember Field::nul_ptr.
     @ sql/sp_head.h
        Added pointer to byte that stores a null value as a last argument
        of sp_head::create_result_field.
     @ sql/sql_select.cc
        Use Field::null_offset() when makes calculation of null byte offset
        for record. Call of Field::new_kye_field() was replaced by overloaded
        method that doesn't get arguments new_null_ptr, new_null_bit, is_nullable.
     @ sql/sql_update.cc
        Use Field::null_offset() when makes calculation of null byte offset
        for record.
     @ sql/table.cc
        Use Field::null_offset() and Field::null_default_value_offset() when
        makes calculation of null byte offset for record.
     @ storage/federated/ha_federated.cc
        Static function field_in_record_is_null() was removed since
        its implementation is a duplication of functional
        of Field::is_null_in_record().
     @ storage/heap/ha_heap.cc
        Use Field::null_offset() when makes calculation of null byte offset
        for record.
     @ storage/innobase/handler/ha_innodb.cc
        Static function field_in_record_is_null() was removed since
        its implementation is a duplication of functional
        of Field::is_null_in_record().
        Use Field::null_offset() when makes calculation of null byte offset
        for record.
     @ storage/myisam/ha_myisam.cc
        Use Field::null_offset() when makes calculation of null byte offset
        for record.

    modified:
      sql/field.cc
      sql/field.h
      sql/item_func.cc
      sql/sp.cc
      sql/sp_head.cc
      sql/sp_head.h
      sql/sql_select.cc
      sql/sql_update.cc
      sql/table.cc
      storage/federated/ha_federated.cc
      storage/heap/ha_heap.cc
      storage/innobase/handler/ha_innodb.cc
      storage/myisam/ha_myisam.cc
=== modified file 'include/mysql_com.h'
--- a/include/mysql_com.h	2011-08-09 19:09:51 +0000
+++ b/include/mysql_com.h	2011-10-31 17:59:04 +0000
@@ -118,6 +118,7 @@ enum enum_server_command
                                            reserved by MySQL Cluster */
 #define FIELD_FLAGS_COLUMN_FORMAT 24    /* Field column format, bit 24-25,
                                            reserved by MySQL Cluster */
+#define TEMPORARY_NULLABLE_FLAG (1<< 26) /* Field is temporary nullable for trigger processing */
 
 #define REFRESH_GRANT		1	/* Refresh grant tables */
 #define REFRESH_LOG		2	/* Start on new log file */

=== modified file 'sql/field.cc'
--- a/sql/field.cc	2011-10-12 15:35:10 +0000
+++ b/sql/field.cc	2011-10-31 17:59:04 +0000
@@ -8549,6 +8549,49 @@ Field_bit::Field_bit(uchar *ptr_arg, uin
 }
 
 
+/**
+   Check that the current value stored in the Field does not break
+   any constraints defined for the Field.
+
+   Currently it only checks NOT NULL constraints.
+
+   @retval
+     0   Constraints not broken, or warning reported.
+   @retval
+     -1  Constraints broken, error reported.
+*/
+int Field::check_constraints()
+{
+  set_temporary_nullable(FALSE);
+  // Check for NULL in NOT NULL fields
+  if (!real_is_nullable() && null_ptr && (null_ptr[0] & null_bit))
+  {
+    if (this == table->next_number_field ||
+        (table->s->found_next_number_field &&
+         this == table->field[table->s->found_next_number_field -
+                              table->s->field]))
+      return 0;
+    if (type() == MYSQL_TYPE_TIMESTAMP)
+      return 0;
+    set_notnull();
+    switch (table->in_use->count_cuted_fields) {
+    case CHECK_FIELD_WARN:
+      set_warning(Sql_condition::WARN_LEVEL_WARN, ER_BAD_NULL_ERROR, 1);
+      /* fall through */
+    case CHECK_FIELD_IGNORE:
+      return 0;
+    case CHECK_FIELD_ERROR_FOR_NULL:
+      if (!table->in_use->no_errors)
+        my_error(ER_BAD_NULL_ERROR, MYF(0), field_name);
+      return -1;
+    }
+    DBUG_ASSERT(0); // impossible
+    return -1;
+  }
+  return 0;
+}
+
+
 void Field_bit::hash(ulong *nr, ulong *nr2)
 {
   if (is_null())

=== modified file 'sql/field.h'
--- a/sql/field.h	2011-10-12 15:35:10 +0000
+++ b/sql/field.h	2011-10-31 17:59:04 +0000
@@ -300,7 +300,16 @@ public:
      Signals that this field is NULL-able.
   */
   inline bool real_is_nullable(void) const
-  { return !(flags & NOT_NULL_FLAG); }
+  { return !(flags & NOT_NULL_FLAG) || (flags & TEMPORARY_NULLABLE_FLAG); }
+
+  inline void set_temporary_nullable(bool temporary_nullable)
+  {
+    if (!temporary_nullable)
+      flags &= ~TEMPORARY_NULLABLE_FLAG;
+    else if (null_ptr)
+      flags |= TEMPORARY_NULLABLE_FLAG;
+  }
+  int check_constraints();
 
   enum {
     LAST_NULL_BYTE_UNDEF= 0

=== modified file 'sql/field_conv.cc'
--- a/sql/field_conv.cc	2011-10-06 07:10:19 +0000
+++ b/sql/field_conv.cc	2011-10-31 17:59:04 +0000
@@ -117,25 +117,13 @@ static void do_outer_field_to_null_str(C
 int
 set_field_to_null(Field *field)
 {
+  field->reset();
+  field->set_temporary_nullable(true);
   if (field->real_is_nullable())
   {
     field->set_null();
-    field->reset();
     return 0;
   }
-  field->reset();
-  switch (field->table->in_use->count_cuted_fields) {
-  case CHECK_FIELD_WARN:
-    field->set_warning(Sql_condition::WARN_LEVEL_WARN, WARN_DATA_TRUNCATED, 1);
-    /* fall through */
-  case CHECK_FIELD_IGNORE:
-    return 0;
-  case CHECK_FIELD_ERROR_FOR_NULL:
-    if (!field->table->in_use->no_errors)
-      my_error(ER_BAD_NULL_ERROR, MYF(0), field->field_name);
-    return -1;
-  }
-  DBUG_ASSERT(0); // impossible
   return -1;
 }
 
@@ -164,9 +152,8 @@ set_field_to_null_with_conversions(Field
   {
     field->set_null();
     field->reset();
-    return 0;
   }
-  if (no_conversions)
+  else if (no_conversions)
     return -1;
 
   /*
@@ -174,33 +161,21 @@ set_field_to_null_with_conversions(Field
     when set to NULL (TIMESTAMP fields which allow setting to NULL
     are handled by first check).
   */
-  if (field->type() == MYSQL_TYPE_TIMESTAMP)
-  {
+  else if (field->type() == MYSQL_TYPE_TIMESTAMP)
     ((Field_timestamp*) field)->set_time();
-    return 0;					// Ok to set time to NULL
-  }
   
-  // Note: we ignore any potential failure of reset() here.
-  field->reset();
-
-  if (field == field->table->next_number_field)
+  else if (field == field->table->next_number_field ||
+           (field->table->s->found_next_number_field &&
+            field == field->table->nullable_field[field->table->s->found_next_number_field -
+                                                  field->table->s->field]))
   {
+    field->reset();
     field->table->auto_increment_field_not_null= FALSE;
-    return 0;				  // field is set in fill_record()
   }
-  switch (field->table->in_use->count_cuted_fields) {
-  case CHECK_FIELD_WARN:
-    field->set_warning(Sql_condition::WARN_LEVEL_WARN, ER_BAD_NULL_ERROR, 1);
-    /* fall through */
-  case CHECK_FIELD_IGNORE:
-    return 0;
-  case CHECK_FIELD_ERROR_FOR_NULL:
-    if (!field->table->in_use->no_errors)
-      my_error(ER_BAD_NULL_ERROR, MYF(0), field->field_name);
-    return -1;
-  }
-  DBUG_ASSERT(0); // impossible
-  return -1;
+  else
+    return set_field_to_null(field);
+
+  return 0;
 }
 
 

=== modified file 'sql/ha_ndbcluster_cond.cc'
--- a/sql/ha_ndbcluster_cond.cc	2011-07-28 10:54:44 +0000
+++ b/sql/ha_ndbcluster_cond.cc	2011-10-31 17:59:04 +0000
@@ -1051,7 +1051,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
     {
       if (!value || !field) break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       DBUG_PRINT("info", ("Generating EQ filter"));
       if (filter->cmp(NdbScanFilter::COND_EQ, 
                       field->get_field_no(),
@@ -1065,7 +1067,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
     {
       if (!value || !field) break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       DBUG_PRINT("info", ("Generating NE filter"));
       if (filter->cmp(NdbScanFilter::COND_NE, 
                       field->get_field_no(),
@@ -1079,7 +1083,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
     {
       if (!value || !field) break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       if (a == field)
       {
         DBUG_PRINT("info", ("Generating LT filter")); 
@@ -1105,7 +1111,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
     {
       if (!value || !field) break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       if (a == field)
       {
         DBUG_PRINT("info", ("Generating LE filter")); 
@@ -1131,7 +1139,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
     {
       if (!value || !field) break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       if (a == field)
       {
         DBUG_PRINT("info", ("Generating GE filter")); 
@@ -1157,7 +1167,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
     {
       if (!value || !field) break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       if (a == field)
       {
         DBUG_PRINT("info", ("Generating GT filter"));
@@ -1186,7 +1198,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
           (value->qualification.value_type != Item::VARBIN_ITEM))
           break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       DBUG_PRINT("info", ("Generating LIKE filter: like(%d,%s,%d)", 
                           field->get_field_no(), value->get_val(), 
                           value->pack_length()));
@@ -1205,7 +1219,9 @@ ha_ndbcluster_cond::build_scan_filter_pr
           (value->qualification.value_type != Item::VARBIN_ITEM))
           break;
       // Save value in right format for the field type
-      value->save_in_field(field);
+      if (!value->save_in_field(field) && field->check_constraints())
+        DBUG_RETURN(1);
+
       DBUG_PRINT("info", ("Generating NOTLIKE filter: notlike(%d,%s,%d)", 
                           field->get_field_no(), value->get_val(), 
                           value->pack_length()));

=== modified file 'sql/item.cc'
--- a/sql/item.cc	2011-10-06 07:10:19 +0000
+++ b/sql/item.cc	2011-10-31 17:59:04 +0000
@@ -1074,7 +1074,8 @@ int Item::save_in_field_no_warnings(Fiel
   thd->count_cuted_fields= CHECK_FIELD_IGNORE;
 
   res= save_in_field(field, no_conversions);
-
+  if (!res)
+    res= field->check_constraints();
   thd->count_cuted_fields= tmp;
   dbug_tmp_restore_column_map(table->write_set, old_map);
   thd->variables.sql_mode= sql_mode;
@@ -7466,6 +7467,11 @@ bool Item_trigger_field::fix_fields(THD 
 
     field= (row_version == OLD_ROW) ? triggers->old_field[field_idx] :
                                       triggers->new_field[field_idx];
+
+    /* Allow new fields to temporarily store NULL during trigger processing. */
+    if (row_version == NEW_ROW)
+      field->set_temporary_nullable(true);
+
     set_field(field);
     fixed= 1;
     return FALSE;

=== modified file 'sql/item.h'
--- a/sql/item.h	2011-09-07 12:41:53 +0000
+++ b/sql/item.h	2011-10-31 17:59:04 +0000
@@ -624,7 +624,10 @@ public:
   int save_in_field_no_warnings(Field *field, bool no_conversions);
   virtual int save_in_field(Field *field, bool no_conversions);
   virtual void save_org_in_field(Field *field)
-  { (void) save_in_field(field, 1); }
+  {
+    if (!save_in_field(field, 1))
+      field->check_constraints();
+  }
   virtual int save_safe_in_field(Field *field)
   { return save_in_field(field, 1); }
   virtual bool send(Protocol *protocol, String *str);
@@ -1936,7 +1939,8 @@ public:
   bool is_result_field() { return result_field != 0; }
   void save_in_result_field(bool no_conversions)
   {
-    save_in_field(result_field, no_conversions);
+    if (!save_in_field(result_field, no_conversions))
+      result_field->check_constraints();
   }
   bool check_partition_func_processor(uchar *int_arg) {return TRUE;}
 };  
@@ -2541,7 +2545,8 @@ public:
   bool is_result_field() { return 1; }
   void save_in_result_field(bool no_conversions)
   {
-    save_in_field(result_field, no_conversions);
+    if (!save_in_field(result_field, no_conversions))
+      result_field->check_constraints();
   }
   void cleanup();
   /*
@@ -2646,7 +2651,8 @@ public:
   bool is_result_field() { return 1; }
   void save_in_result_field(bool no_conversions)
   {
-    (*ref)->save_in_field(result_field, no_conversions);
+    if (!((*ref)->save_in_field(result_field, no_conversions)))
+      result_field->check_constraints();
   }
   Item *real_item()
   {

=== modified file 'sql/item_cmpfunc.cc'
--- a/sql/item_cmpfunc.cc	2011-09-08 12:48:08 +0000
+++ b/sql/item_cmpfunc.cc	2011-10-31 17:59:04 +0000
@@ -444,7 +444,8 @@ static bool convert_constant_item(THD *t
                                (STATUS_GARBAGE | STATUS_NOT_FOUND))));
     if (save_field_value)
       orig_field_val= field->val_int();
-    if (!(*item)->is_null() && !(*item)->save_in_field(field, 1))
+    if (!(*item)->is_null() && !(*item)->save_in_field(field, 1) &&
+        !field->check_constraints())
     {
       Item *tmp= new Item_int_with_ref(field->val_int(), *item,
                                        test(field->flags & UNSIGNED_FLAG));

=== modified file 'sql/item_func.h'
--- a/sql/item_func.h	2011-07-04 00:25:46 +0000
+++ b/sql/item_func.h	2011-10-31 17:59:04 +0000
@@ -1514,7 +1514,11 @@ public:
   {
     return save_in_field(field, no_conversions, 1);
   }
-  void save_org_in_field(Field *field) { (void)save_in_field(field, 1, 0); }
+  void save_org_in_field(Field *field)
+  {
+    if (!save_in_field(field, 1, 0))
+      field->check_constraints();
+  }
   bool register_field_in_read_map(uchar *arg);
   bool set_entry(THD *thd, bool create_if_not_exists);
   void cleanup();

=== modified file 'sql/item_sum.cc'
--- a/sql/item_sum.cc	2011-07-29 09:29:11 +0000
+++ b/sql/item_sum.cc	2011-10-31 17:59:04 +0000
@@ -990,7 +990,9 @@ bool Aggregator_distinct::add()
   }
   else
   {
-    item_sum->get_arg(0)->save_in_field(table->field[0], FALSE);
+    if (!item_sum->get_arg(0)->save_in_field(table->field[0], FALSE))
+      table->field[0]->check_constraints();
+
     if (table->field[0]->is_null())
       return 0;
     DBUG_ASSERT(tree);

=== modified file 'sql/partition_info.cc'
--- a/sql/partition_info.cc	2011-07-28 10:54:44 +0000
+++ b/sql/partition_info.cc	2011-10-31 17:59:04 +0000
@@ -2282,6 +2282,7 @@ bool partition_info::fix_column_value_fu
         save_got_warning= thd->got_warning;
         thd->got_warning= 0;
         if (column_item->save_in_field(field, TRUE) ||
+            field->check_constraints() ||
             thd->got_warning)
         {
           my_error(ER_WRONG_TYPE_COLUMN_VALUE_ERROR, MYF(0));

=== modified file 'sql/sp_head.cc'
--- a/sql/sp_head.cc	2011-10-12 15:35:10 +0000
+++ b/sql/sp_head.cc	2011-10-31 17:59:04 +0000
@@ -400,7 +400,8 @@ sp_eval_expr(THD *thd, Field *result_fie
 
   /* Save the value in the field. Convert the value if needed. */
 
-  expr_item->save_in_field(result_field, 0);
+  if (!expr_item->save_in_field(result_field, 0))
+    result_field->check_constraints();
 
   thd->count_cuted_fields= save_count_cuted_fields;
   thd->abort_on_warning= save_abort_on_warning;

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-09-28 07:42:55 +0000
+++ b/sql/sql_base.cc	2011-10-31 17:59:04 +0000
@@ -6563,16 +6563,21 @@ Field *
 find_field_in_table(THD *thd, TABLE *table, const char *name, uint length,
                     bool allow_rowid, uint *cached_field_index_ptr)
 {
-  Field **field_ptr, *field;
+  Field **field_ptr, *field, **table_field;
   uint cached_field_index= *cached_field_index_ptr;
   DBUG_ENTER("find_field_in_table");
   DBUG_PRINT("enter", ("table: '%s', field name: '%s'", table->alias, name));
 
+  if (thd->mark_used_columns == MARK_COLUMNS_WRITE && table->nullable_field)
+    table_field= table->nullable_field;
+  else
+    table_field= table->field;
+
   /* We assume here that table->field < NO_CACHED_FIELD_INDEX = UINT_MAX */
   if (cached_field_index < table->s->fields &&
       !my_strcasecmp(system_charset_info,
                      table->field[cached_field_index]->field_name, name))
-    field_ptr= table->field + cached_field_index;
+    field_ptr= table_field + cached_field_index;
   else if (table->s->name_hash.records)
   {
     field_ptr= (Field**) my_hash_search(&table->s->name_hash, (uchar*) name,
@@ -6583,12 +6588,12 @@ find_field_in_table(THD *thd, TABLE *tab
         field_ptr points to field in TABLE_SHARE. Convert it to the matching
         field in table
       */
-      field_ptr= (table->field + (field_ptr - table->s->field));
+      field_ptr= (table_field + (field_ptr - table->s->field));
     }
   }
   else
   {
-    if (!(field_ptr= table->field))
+    if (!(field_ptr= table_field))
       DBUG_RETURN((Field *)0);
     for (; *field_ptr; ++field_ptr)
       if (!my_strcasecmp(system_charset_info, (*field_ptr)->field_name, name))
@@ -6597,7 +6602,7 @@ find_field_in_table(THD *thd, TABLE *tab
 
   if (field_ptr && *field_ptr)
   {
-    *cached_field_index_ptr= field_ptr - table->field;
+    *cached_field_index_ptr= field_ptr - table_field;
     field= *field_ptr;
   }
   else
@@ -6606,7 +6611,7 @@ find_field_in_table(THD *thd, TABLE *tab
         my_strcasecmp(system_charset_info, name, "_rowid") ||
         table->s->rowid_field_offset == 0)
       DBUG_RETURN((Field*) 0);
-    field= table->field[table->s->rowid_field_offset-1];
+    field= table_field[table->s->rowid_field_offset-1];
   }
 
   update_field_dependencies(thd, field, table);
@@ -8864,6 +8869,40 @@ err:
 
 
 /*
+  Check that field values does not break constraints.
+
+  SYNOPSIS
+    check_record()
+    thd           thread handler
+    fields        Item_fields list to be checked
+    ignore_errors TRUE if we should ignore errors
+
+  RETURN
+    FALSE   OK
+    TRUE    error occured
+*/
+
+static bool
+check_record(THD *thd, List<Item> &fields, bool ignore_errors)
+{
+  List_iterator_fast<Item> f(fields);
+  Item *fld;
+  Item_field *field;
+
+  while ((fld= f++))
+  {
+    field= fld->filed_for_view_update();
+    if (field->field->check_constraints() && !ignore_errors)
+    {
+      my_message(ER_UNKNOWN_ERROR, ER(ER_UNKNOWN_ERROR), MYF(0));
+      return TRUE;
+    }
+  }
+  return thd->is_error();
+}
+
+
+/*
   Fill fields in list with values from the list of items and invoke
   before triggers.
 
@@ -8892,9 +8931,16 @@ fill_record_n_invoke_before_triggers(THD
                                      Table_triggers_list *triggers,
                                      enum trg_event_type event)
 {
-  return (fill_record(thd, fields, values, ignore_errors) ||
-          (triggers && triggers->process_triggers(thd, event,
-                                                 TRG_ACTION_BEFORE, TRUE)));
+  bool error_fill= (fill_record(thd, fields, values, ignore_errors) ||
+                    (triggers && triggers->process_triggers(thd, event,
+                                                            TRG_ACTION_BEFORE,
+                                                            true)));
+  /*
+    We have to call check_record even if fill_record failed in order
+    to revert any illegal values (such as NULL in NOT NULL fields).
+  */
+  bool error_check= check_record(thd, fields, ignore_errors);
+  return (error_fill || error_check);
 }
 
 
@@ -8959,6 +9005,32 @@ err:
 
 
 /*
+  Check that field values does not break constraints.
+
+  SYNOPSIS
+    check_record()
+    thd           thread handler
+    ptr           pointer on pointer to record
+
+  RETURN
+    FALSE   OK
+    TRUE    error occured
+*/
+
+bool
+check_record(THD *thd, Field **ptr)
+{
+  Field *field;
+  while ((field = *ptr++) && !thd->is_error())
+  {
+    if (field->check_constraints())
+      return TRUE;
+  }
+  return thd->is_error();
+}
+
+
+/*
   Fill fields in array with values from the list of items and invoke
   before triggers.
 
@@ -8987,9 +9059,16 @@ fill_record_n_invoke_before_triggers(THD
                                      Table_triggers_list *triggers,
                                      enum trg_event_type event)
 {
-  return (fill_record(thd, ptr, values, ignore_errors) ||
-          (triggers && triggers->process_triggers(thd, event,
-                                                 TRG_ACTION_BEFORE, TRUE)));
+  bool error_fill= (fill_record(thd, ptr, values, ignore_errors) ||
+                    (triggers && triggers->process_triggers(thd, event,
+                                                            TRG_ACTION_BEFORE,
+                                                            true)));
+  /*
+    We have to call check_record even if fill_record failed in order
+    to revert any illegal values (such as NULL in NOT NULL fields).
+  */
+  bool error_check= check_record(thd, ptr);
+  return (error_fill || error_check);
 }
 
 

=== modified file 'sql/sql_base.h'
--- a/sql/sql_base.h	2011-07-28 10:54:44 +0000
+++ b/sql/sql_base.h	2011-10-31 17:59:04 +0000
@@ -189,6 +189,7 @@ bool setup_fields(THD *thd, Ref_ptr_arra
                   List<Item> *sum_func_list, bool allow_sum_func);
 bool fill_record(THD *thd, Field **field, List<Item> &values,
                  bool ignore_errors);
+bool check_record(THD *thd, Field **field);
 
 Field *
 find_field_in_tables(THD *thd, Item_ident *item,

=== modified file 'sql/sql_handler.cc'
--- a/sql/sql_handler.cc	2011-07-28 10:54:44 +0000
+++ b/sql/sql_handler.cc	2011-10-31 17:59:04 +0000
@@ -747,7 +747,8 @@ retry:
 	  goto err;
         }
         old_map= dbug_tmp_use_all_columns(table, table->write_set);
-	(void) item->save_in_field(key_part->field, 1);
+	if (!item->save_in_field(key_part->field, 1))
+	  key_part->field->check_constraints();
         dbug_tmp_restore_column_map(table->write_set, old_map);
 	key_len+=key_part->store_length;
         keypart_map= (keypart_map << 1) | 1;

=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc	2011-08-16 19:33:03 +0000
+++ b/sql/sql_insert.cc	2011-10-31 17:59:04 +0000
@@ -909,8 +909,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *t
             share->default_values[share->null_bytes - 1];
         }
       }
-      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
-                                               table->triggers,
+      if (fill_record_n_invoke_before_triggers(thd, table->nullable_field,
+                                               *values, 0, table->triggers,
                                                TRG_EVENT_INSERT))
       {
 	if (values_list.elements != 1 && ! thd->is_error())
@@ -2229,7 +2229,9 @@ TABLE *Delayed_insert::get_local_table(T
   Field **field,**org_field, *found_next_number_field;
   TABLE *copy;
   TABLE_SHARE *share;
-  uchar *bitmap;
+  uchar *bitmap, *extra_null_ptr;
+  uint extra_null_bytes, extra_null_bit_pos;
+
   DBUG_ENTER("Delayed_insert::get_local_table");
 
   /* First request insert thread to get a lock */
@@ -2273,17 +2275,19 @@ TABLE *Delayed_insert::get_local_table(T
   share= table->s;
 
   /*
-    Allocate memory for the TABLE object, the field pointers array, and
+    Allocate memory for the TABLE object, the field pointers arrays, and
     one record buffer of reclength size. Normally a table has three
     record buffers of rec_buff_length size, which includes alignment
     bytes. Since the table copy is used for creating one record only,
     the other record buffers and alignment are unnecessary.
   */
   THD_STAGE_INFO(client_thd, stage_allocating_local_table);
-  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
-				   (share->fields+1)*sizeof(Field**)+
-				   share->reclength +
-                                   share->column_bitmap_size*2);
+  // Extra bytes to temporarily store null_bits for NOT NULL fields
+  extra_null_bytes= (share->fields - share->null_fields + 7) / 8;
+  copy= (TABLE*) client_thd->alloc(sizeof(*copy) +
+                                   2 * (share->fields+1) * sizeof(Field**) +
+                                   share->reclength + extra_null_bytes +
+                                   share->column_bitmap_size * 2);
   if (!copy)
     goto error;
 
@@ -2292,9 +2296,12 @@ TABLE *Delayed_insert::get_local_table(T
   /* We don't need to change the file handler here */
   /* Assign the pointers for the field pointers array and the record. */
   field= copy->field= (Field**) (copy + 1);
-  bitmap= (uchar*) (field + share->fields + 1);
+  bitmap= (uchar*) (field + 2 * (share->fields + 1));
   copy->record[0]= (bitmap + share->column_bitmap_size * 2);
   memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
+  extra_null_ptr= copy->record[0] + share->reclength;
+  memset(extra_null_ptr, 0, extra_null_bytes);
+  extra_null_bit_pos= 0;
   /*
     Make a copy of all fields.
     The copied fields need to point into the copied record. This is done
@@ -2310,17 +2317,60 @@ TABLE *Delayed_insert::get_local_table(T
       goto error;
     (*field)->orig_table= copy;			// Remove connection
     (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
+
+    if (!(*field)->real_is_nullable())
+    {
+      (*field)->null_ptr= extra_null_ptr;
+      (*field)->null_bit= ((uchar) 1) << extra_null_bit_pos;
+      if (++extra_null_bit_pos == 8)
+      {
+        extra_null_bit_pos= 0;
+        extra_null_ptr++;
+      }
+    }
+    if (*org_field == found_next_number_field)
+      (*field)->table->found_next_number_field= *field;
+  }
+  *field=0;
+
+  /* nullable Fields */
+  copy->nullable_field= ++field;
+  extra_null_ptr= copy->record[0] + share->reclength;
+  extra_null_bit_pos= 0;
+
+  for (org_field= table->field; *org_field; org_field++, field++)
+  {
+    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
+      goto error;
+    (*field)->orig_table= copy;         // Remove connection
+    (*field)->move_field_offset(adjust_ptrs);   // Point at copy->record[0]
+
+    if (!(*field)->real_is_nullable())
+    {
+      (*field)->null_ptr= extra_null_ptr;
+      (*field)->null_bit= ((uchar) 1) << extra_null_bit_pos;
+      if (++extra_null_bit_pos == 8)
+      {
+        extra_null_bit_pos= 0;
+        extra_null_ptr++;
+      }
+    }
     if (*org_field == found_next_number_field)
       (*field)->table->found_next_number_field= *field;
   }
   *field=0;
 
+  if (table->found_next_number_field)
+    copy->found_next_number_field=
+      copy->nullable_field[(uint) (share->found_next_number_field -
+                                   share->field)];
+
   /* Adjust timestamp */
   if (table->timestamp_field)
   {
     /* Restore offset as this may have been reset in handle_inserts */
     copy->timestamp_field=
-      (Field_timestamp*) copy->field[share->timestamp_field_offset];
+      (Field_timestamp*) copy->nullable_field[share->timestamp_field_offset];
     copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
     copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
   }
@@ -3513,7 +3563,7 @@ void select_insert::store_values(List<It
     fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                          table->triggers, TRG_EVENT_INSERT);
   else
-    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
+    fill_record_n_invoke_before_triggers(thd, table->nullable_field, values, 1,
                                          table->triggers, TRG_EVENT_INSERT);
 }
 

=== modified file 'sql/sql_load.cc'
--- a/sql/sql_load.cc	2011-10-06 07:10:19 +0000
+++ b/sql/sql_load.cc	2011-10-31 17:59:04 +0000
@@ -263,7 +263,7 @@ int mysql_load(THD *thd,sql_exchange *ex
   if (!fields_vars.elements)
   {
     Field **field;
-    for (field=table->field; *field ; field++)
+    for (field=table->nullable_field; *field ; field++)
       fields_vars.push_back(new Item_field(*field));
     bitmap_set_all(table->write_set);
     table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;

=== modified file 'sql/sql_select.cc'
--- a/sql/sql_select.cc	2011-10-12 15:35:10 +0000
+++ b/sql/sql_select.cc	2011-10-31 17:59:04 +0000
@@ -10202,6 +10202,8 @@ store_val_in_field(Field *field, Item *i
   enum_check_fields old_count_cuted_fields= thd->count_cuted_fields;
   thd->count_cuted_fields= check_flag;
   error= item->save_in_field(field, 1);
+  if (!error)
+    error= field->check_constraints();
   thd->count_cuted_fields= old_count_cuted_fields;
   dbug_tmp_restore_column_map(table->write_set, old_map);
   return error || cuted_fields != thd->cuted_fields;

=== modified file 'sql/sql_select.h'
--- a/sql/sql_select.h	2011-10-06 07:10:19 +0000
+++ b/sql/sql_select.h	2011-10-31 17:59:04 +0000
@@ -2328,6 +2328,8 @@ public:
     my_bitmap_map *old_map= dbug_tmp_use_all_columns(table,
                                                      table->write_set);
     int res= item->save_in_field(to_field, 1);
+    if (!res)
+      res= to_field->check_constraints();
     /*
      Item::save_in_field() may call Item::val_xxx(). And if this is a subquery
      we need to check for errors executing it and react accordingly

=== modified file 'sql/sql_show.cc'
--- a/sql/sql_show.cc	2011-10-06 07:10:19 +0000
+++ b/sql/sql_show.cc	2011-10-31 17:59:04 +0000
@@ -1451,6 +1451,10 @@ int store_create_info(THD *thd, TABLE_LI
   */
   old_map= tmp_use_all_columns(table, table->read_set);
 
+  Field *timestamp_field= table->timestamp_field;
+  if (timestamp_field)
+    timestamp_field= table->field[share->timestamp_field_offset];
+
   for (ptr=table->field ; (field= *ptr); ptr++)
   {
     if (ptr != table->field)
@@ -1498,14 +1502,14 @@ int store_create_info(THD *thd, TABLE_LI
       packet->append(STRING_WITH_LEN(" NULL"));
     }
 
-    if (get_field_default_value(thd, table->timestamp_field,
+    if (get_field_default_value(thd, timestamp_field,
                                 field, &def_value, 1))
     {
       packet->append(STRING_WITH_LEN(" DEFAULT "));
       packet->append(def_value.ptr(), def_value.length(), system_charset_info);
     }
 
-    if (!limited_mysql_mode && table->timestamp_field == field && 
+    if (!limited_mysql_mode && timestamp_field == field &&
         field->unireg_check != Field::TIMESTAMP_DN_FIELD)
       packet->append(STRING_WITH_LEN(" ON UPDATE CURRENT_TIMESTAMP"));
 
@@ -4540,6 +4544,9 @@ static int get_schema_column_record(THD 
   count= 0;
   ptr= show_table->field;
   timestamp_field= show_table->timestamp_field;
+  if (timestamp_field)
+    timestamp_field= show_table->field[show_table->s->timestamp_field_offset];
+
   show_table->use_all_columns();               // Required for default
   restore_record(show_table, s->default_values);
 

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2011-10-06 07:10:19 +0000
+++ b/sql/sql_table.cc	2011-10-31 17:59:04 +0000
@@ -5576,7 +5576,10 @@ mysql_prepare_alter_table(THD *thd, TABL
     First collect all fields from table which isn't in drop_list
   */
   Field **f_ptr,*field;
-  for (f_ptr=table->field ; (field= *f_ptr) ; f_ptr++)
+  Field **table_fields= table->field;
+  if (table->nullable_field)
+    table_fields= table->nullable_field;
+  for (f_ptr=table_fields ; (field= *f_ptr) ; f_ptr++)
   {
     if (field->type() == MYSQL_TYPE_STRING)
       create_info->varchar= TRUE;
@@ -7298,7 +7301,10 @@ copy_data_between_tables(TABLE *from,TAB
   List_iterator<Create_field> it(create);
   Create_field *def;
   copy_end=copy;
-  for (Field **ptr=to->field ; *ptr ; ptr++)
+  Field** table_fields= to->field;
+  if (to->nullable_field)
+    table_fields= to->nullable_field;
+  for (Field **ptr=table_fields ; *ptr ; ptr++)
   {
     def=it++;
     if (def->field)

=== modified file 'sql/sql_trigger.cc'
--- a/sql/sql_trigger.cc	2011-08-19 13:04:28 +0000
+++ b/sql/sql_trigger.cc	2011-10-31 17:59:04 +0000
@@ -2123,7 +2123,7 @@ bool Table_triggers_list::process_trigge
   if (old_row_is_record1)
   {
     old_field= record1_field;
-    new_field= trigger_table->field;
+    new_field= trigger_table->nullable_field;
   }
   else
   {
@@ -2155,6 +2155,18 @@ bool Table_triggers_list::process_trigge
 
   thd->restore_sub_statement_state(&statement_state);
 
+  /*
+    Turn off temporal nullability of new fields.
+    Also check that fields to be written conform to constraints.
+  */
+  Field *field;
+  while ((field = *new_field++))
+  {
+    field->set_temporary_nullable(FALSE);
+    if (field->table == trigger_table)
+      err_status |= field->check_constraints();
+  }
+
   return err_status;
 }
 

=== modified file 'sql/sql_union.cc'
--- a/sql/sql_union.cc	2011-09-21 11:01:41 +0000
+++ b/sql/sql_union.cc	2011-10-31 17:59:04 +0000
@@ -60,6 +60,7 @@ bool select_union::send_data(List<Item> 
     return 0;
   }
   fill_record(thd, table->field, values, 1);
+  check_record(thd, table->field);
   if (thd->is_error())
     return 1;
 

=== modified file 'sql/sql_update.cc'
--- a/sql/sql_update.cc	2011-10-12 15:35:10 +0000
+++ b/sql/sql_update.cc	2011-10-31 17:59:04 +0000
@@ -2025,7 +2025,8 @@ bool multi_update::send_data(List<Item> 
       fill_record(thd,
                   tmp_table->field + 1 + unupdated_check_opt_tables.elements,
                   *values_for_table[offset], 1);
-
+      check_record(thd,
+                   tmp_table->field + 1 + unupdated_check_opt_tables.elements);
       /* Write row, ignoring duplicated updates to a row */
       error= tmp_table->file->ha_write_row(tmp_table->record[0]);
       if (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)

=== modified file 'sql/sql_view.cc'
--- a/sql/sql_view.cc	2011-08-19 13:04:28 +0000
+++ b/sql/sql_view.cc	2011-10-31 17:59:04 +0000
@@ -1857,9 +1857,9 @@ bool check_key_in_view(THD *thd, TABLE_L
         Field_translator *k;
         for (k= trans; k < end_of_trans; k++)
         {
-          Item_field *field;
-          if ((field= k->item->filed_for_view_update()) &&
-              field->field == key_part->field)
+          Item_field *field= k->item->filed_for_view_update();
+          if (field && ((field->field == table->nullable_field[key_part->fieldnr-1]) ||
+              (field->field == table->field[key_part->fieldnr-1])))
             break;
         }
         if (k == end_of_trans)
@@ -1875,7 +1875,7 @@ bool check_key_in_view(THD *thd, TABLE_L
   {
     Field **field_ptr;
     Field_translator *fld;
-    for (field_ptr= table->field; *field_ptr; field_ptr++)
+    for (field_ptr= table->nullable_field; *field_ptr; field_ptr++)
     {
       for (fld= trans; fld < end_of_trans; fld++)
       {

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2011-10-12 15:35:10 +0000
+++ b/sql/table.cc	2011-10-31 17:59:04 +0000
@@ -1908,9 +1908,9 @@ int open_table_from_share(THD *thd, TABL
                           TABLE *outparam, bool is_create_table)
 {
   int error;
-  uint records, i, bitmap_size;
+  uint records, i, bitmap_size, extra_null_bytes, extra_null_bit_pos;
   bool error_reported= FALSE;
-  uchar *record, *bitmaps;
+  uchar *record, *bitmaps, *extra_null_ptr;
   Field **field_ptr;
   DBUG_ENTER("open_table_from_share");
   DBUG_PRINT("enter",("name: '%s.%s'  form: 0x%lx", share->db.str,
@@ -1954,8 +1954,12 @@ int open_table_from_share(THD *thd, TABL
   if (prgflag & (READ_ALL+EXTRA_RECORD))
     records++;
 
-  if (!(record= (uchar*) alloc_root(&outparam->mem_root,
-                                   share->rec_buff_length * records)))
+  // Extra bytes to temporarily store null_bits for NOT NULL fields
+  extra_null_bytes= 0;
+  if (records)
+    extra_null_bytes= (share->fields - share->null_fields + 7) / 8;
+  if (!(record= (uchar*) alloc_root(&outparam->mem_root, extra_null_bytes +
+                                    share->rec_buff_length * records)))
     goto err;                                   /* purecov: inspected */
 
   if (records == 0)
@@ -1965,6 +1969,7 @@ int open_table_from_share(THD *thd, TABL
   }
   else
   {
+    memset((char*)(record + (share->rec_buff_length * records)), 0, extra_null_bytes);
     outparam->record[0]= record;
     if (records > 1)
       outparam->record[1]= record+ share->rec_buff_length;
@@ -1985,17 +1990,18 @@ int open_table_from_share(THD *thd, TABL
 #endif
 
   if (!(field_ptr = (Field **) alloc_root(&outparam->mem_root,
-                                          (uint) ((share->fields+1)*
+                                          (uint) (2 * (share->fields+1) *
                                                   sizeof(Field*)))))
     goto err;                                   /* purecov: inspected */
 
   outparam->field= field_ptr;
+  extra_null_ptr= outparam->record[0] + (share->rec_buff_length * records);
+  extra_null_bit_pos= 0;
 
-  record= (uchar*) outparam->record[0]-1;	/* Fieldstart = 1 */
   if (share->null_field_first)
-    outparam->null_flags= (uchar*) record+1;
+    outparam->null_flags= outparam->record[0];
   else
-    outparam->null_flags= (uchar*) (record+ 1+ share->reclength -
+    outparam->null_flags= (uchar*) (outparam->record[0] + share->reclength -
                                     share->null_bytes);
 
   /* Setup copy of fields from share, but use the right alias and record */
@@ -2008,14 +2014,54 @@ int open_table_from_share(THD *thd, TABL
     new_field->init(outparam);
     new_field->move_field_offset((my_ptrdiff_t) (outparam->record[0] -
                                                  outparam->s->default_values));
+    if (!(new_field->real_is_nullable()))
+    {
+      new_field->null_ptr= extra_null_ptr;
+      new_field->null_bit= ((uchar) 1) << extra_null_bit_pos;
+      if (++extra_null_bit_pos == 8)
+      {
+        extra_null_bit_pos= 0;
+        extra_null_ptr++;
+      }
+    }
+  }
+  (*field_ptr)= 0;                              // End marker
+
+  /* nullable Fields */
+  outparam->nullable_field= ++field_ptr;
+  extra_null_ptr= outparam->record[0] + (share->rec_buff_length * records);
+  extra_null_bit_pos= 0;
+
+  /* Setup copy of fields from share, but use the right alias and record */
+  for (i=0 ; i < share->fields; i++, field_ptr++)
+  {
+    Field *new_field= share->field[i]->clone(&outparam->mem_root);
+    *field_ptr= new_field;
+    if (new_field == NULL)
+      goto err;
+    new_field->init(outparam);
+    new_field->move_field_offset((my_ptrdiff_t) (outparam->record[0] -
+                                                 outparam->s->default_values));
+
+    if (!((*field_ptr)->real_is_nullable()))
+    {
+      (*field_ptr)->null_ptr= extra_null_ptr;
+      (*field_ptr)->null_bit= ((uchar) 1) << extra_null_bit_pos;
+      if (++extra_null_bit_pos == 8)
+      {
+        extra_null_bit_pos= 0;
+        extra_null_ptr++;
+      }
+    }
   }
   (*field_ptr)= 0;                              // End marker
 
   if (share->found_next_number_field)
     outparam->found_next_number_field=
-      outparam->field[(uint) (share->found_next_number_field - share->field)];
+      outparam->nullable_field[(uint) (share->found_next_number_field - share->field)];
   if (share->timestamp_field)
-    outparam->timestamp_field= (Field_timestamp*) outparam->field[share->timestamp_field_offset];
+    outparam->timestamp_field=
+        (Field_timestamp*) outparam->nullable_field[share->timestamp_field_offset];
 
 
   /* Fix key->name and key_part->field */
@@ -2248,6 +2294,12 @@ int closefrm(register TABLE *table, bool
       delete *ptr;
     table->field= 0;
   }
+  if (table->nullable_field)
+  {
+    for (Field **ptr=table->nullable_field ; *ptr ; ptr++)
+      delete *ptr;
+    table->nullable_field= 0;
+  }
   delete table->file;
   table->file= 0;				/* For easier errorchecking */
 #ifdef WITH_PARTITION_STORAGE_ENGINE

=== modified file 'sql/table.h'
--- a/sql/table.h	2011-09-16 10:12:49 +0000
+++ b/sql/table.h	2011-10-31 17:59:04 +0000
@@ -958,7 +958,8 @@ public:
 
   THD	*in_use;                        /* Which thread uses this */
   Field **field;			/* Pointer to fields */
-
+  Field **nullable_field;         /* Pointer to nullable copy of fields
+                                     used during trigger processing */
   uchar *record[2];			/* Pointer to records */
   uchar *write_row_record;		/* Used as optimisation in
 					   THD::write_row */

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-trunk branch (Dmitry.Shulga:3467 to 3468) WL#6030Dmitry Shulga1 Nov