List:Commits« Previous MessageNext Message »
From:Luis Soares Date:November 19 2010 11:33am
Subject:bzr commit into mysql-next-mr branch (luis.soares:3206) WL#5597
View as plain text  
#At file:///home/lsoares/Workspace/bzr/work/features/wl5597/mysql-next-mr/ based on revid:luis.soares@stripped

 3206 Luis Soares	2010-11-19
      WL#5597
      
      Work-in-progress!
      
      Improvements on this cset:
      - fixes to idempotency handling.
      - fixes to m_curr_row pointer in exec row main loop.
      - refactory on main loop: we check what apply method to use only 
        once keep the pointer and use it inside the exec row main loop
      - added some DBUG macros
      - fixes to bugs inside do_index_scan_and_update

    modified:
      sql/log_event.cc
      sql/log_event.h
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-12 13:59:15 +0000
+++ b/sql/log_event.cc	2010-11-19 11:33:10 +0000
@@ -7592,6 +7592,7 @@ int Rows_log_event::do_add_row_data(ucha
 static
 my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
 {
+  DBUG_ENTER("is_any_column_signaled_for_table");
 
   int nfields_set= 0;
   for (Field **ptr=table->field ;
@@ -7602,7 +7603,7 @@ my_bool is_any_column_signaled_for_table
       nfields_set++;
   }
 
-  return (nfields_set != 0);
+  DBUG_RETURN (nfields_set != 0);
 }
 
 /**
@@ -7639,15 +7640,17 @@ my_bool is_any_column_signaled_for_table
 static
 my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
 {
+  DBUG_ENTER("are_all_columns_signaled_for_key");
+
   for (uint i=0 ; i < keyinfo->key_parts ;i++)
   {
     uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
     if (fieldnr >= cols->n_bits ||
         !bitmap_is_set(cols, fieldnr))
-      return FALSE;
+      DBUG_RETURN(FALSE);
   }
 
-  return TRUE;
+  DBUG_RETURN(TRUE);
 }
 
 /**
@@ -7693,6 +7696,8 @@ static
 uint
 search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
 {
+  DBUG_ENTER("search_key_in_table");
+
   KEY *keyinfo;
   uint res= MAX_KEY;
   uint key;
@@ -7701,7 +7706,7 @@ search_key_in_table(TABLE *table, MY_BIT
   {
     keyinfo= table->s->key_info + (uint) table->s->primary_key;
     if (are_all_columns_signaled_for_key(keyinfo, bi_cols))
-      return table->s->primary_key;
+      DBUG_RETURN(table->s->primary_key);
   }
 
   if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
@@ -7722,7 +7727,7 @@ search_key_in_table(TABLE *table, MY_BIT
            key : MAX_KEY;
 
       if (res < MAX_KEY)
-        return res;
+        DBUG_RETURN(res);
     }
   }
 
@@ -7746,11 +7751,11 @@ search_key_in_table(TABLE *table, MY_BIT
            key : MAX_KEY;
 
       if (res < MAX_KEY)
-        return res;
+        DBUG_RETURN(res);
     }
   }
 
-  return res;
+  DBUG_RETURN(res);
 }
 
 static uint decide_row_lookup_method(TABLE* table, MY_BITMAP *cols, uint event_type)
@@ -7782,6 +7787,8 @@ static uint decide_row_lookup_method(TAB
 */
 static bool record_compare(TABLE *table, MY_BITMAP *cols)
 {
+  DBUG_ENTER("record_compare");
+
   /*
     Need to set the X bit and the filler bits in both records since
     there are engines that do not set it correctly.
@@ -7880,7 +7887,7 @@ record_compare_exit:
     }
   }
 
-  return result;
+  DBUG_RETURN(result);
 }
 
 /**
@@ -7904,6 +7911,7 @@ struct row_entry
      place).
   */
   const uchar *m_curr_row;
+
 } typedef row_entry;
 
 extern "C" uchar *rows_log_event_get_key(const uchar *record, size_t *length,
@@ -7926,6 +7934,9 @@ static void rows_log_event_free_entry(ro
 
 int remove_blobs_from_record(TABLE* table)
 { 
+  
+  DBUG_ENTER("remove_blobs_from_record");
+
   int res= 0;
 
   if (table->s->blob_fields)
@@ -7942,7 +7953,7 @@ int remove_blobs_from_record(TABLE* tabl
     }
   }
 
-  return res;
+  DBUG_RETURN(res);
 }
 
 bool
@@ -7971,10 +7982,7 @@ int Rows_log_event::hash_row(Relay_log_i
   /**
      Unpack the row to be hashed.
    */ 
-  if ((error= unpack_current_row(rli, &m_cols)))
-    goto err;
-
-  else
+  if (!(error= unpack_current_row(rli, &m_cols)))
   {
     /**
        Save a copy of the original record unpacked.
@@ -8008,14 +8016,42 @@ int Rows_log_event::hash_row(Relay_log_i
       m_curr_row=m_curr_row_end;
       error= unpack_current_row(rli, &m_cols_ai);
     }
-
-    m_curr_row=m_curr_row_end;
   }
 
-err:
   return error;
 }
 
+void Rows_log_event::do_post_row_operations(Relay_log_info const *rli, int error)
+{
+        
+  /*
+    If m_curr_row_end  was not set during event execution (e.g., because
+    of errors) we can't proceed to the next row. If the error is transient
+    (i.e., error==0 at this point) we must call unpack_current_row() to set
+    m_curr_row_end.
+  */
+  
+  DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
+                      (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
+  
+  if (!m_curr_row_end && !error)
+  {
+    error= unpack_current_row(rli, &m_cols);
+  }
+
+  // at this moment m_curr_row_end should be set
+  DBUG_ASSERT(error || m_curr_row_end != NULL);
+  DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
+  DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
+  
+  m_curr_row= m_curr_row_end;
+  
+  if (error == 0 && !m_table->file->has_transactions())
+    thd->transaction.all.modified_non_trans_table=
+      thd->transaction.stmt.modified_non_trans_table= TRUE;
+  
+}
+
 int Rows_log_event::handle_idempotent_errors(Relay_log_info const *rli, int *err)
 {
   int error= *err;
@@ -8037,57 +8073,32 @@ int Rows_log_event::handle_idempotent_er
       clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
       *err= 0;
       if (idempotent_error == 0)
-        return true;
+        return *err;
     }
   }
 
-  return false;
+  return *err;
 }
 
 int Rows_log_event::do_apply_row(Relay_log_info const *rli)
 {
-   int error= 0;
-
-   /* in_use can have been set to NULL in close_tables_for_reopen */
-   THD* old_thd= m_table->in_use;
-   if (!m_table->in_use)
-     m_table->in_use= thd;
+  DBUG_ENTER("Rows_log_event::do_apply_row");
 
-   error= do_exec_row(rli);
-
-   DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
-   DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
-
-   m_table->in_use = old_thd;
-
-   if (handle_idempotent_errors(rli, &error))
-     return error;
-
-   /*
-    If m_curr_row_end  was not set during event execution (e.g., because
-    of errors) we can't proceed to the next row. If the error is transient
-    (i.e., error==0 at this point) we must call unpack_current_row() to set
-    m_curr_row_end.
-   */
-
-   DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
-                       (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
-
-   if (!m_curr_row_end && !error)
-     error= unpack_current_row(rli, &m_cols);
-
-   // at this moment m_curr_row_end should be set
-   DBUG_ASSERT(error || m_curr_row_end != NULL);
-   DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
-   DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
-
-   m_curr_row= m_curr_row_end;
-
-   if (error == 0 && !m_table->file->has_transactions())
-     thd->transaction.all.modified_non_trans_table=
-       thd->transaction.stmt.modified_non_trans_table= TRUE;
+  int error= 0;
+  
+  /* in_use can have been set to NULL in close_tables_for_reopen */
+  THD* old_thd= m_table->in_use;
+  if (!m_table->in_use)
+    m_table->in_use= thd;
+  
+  error= do_exec_row(rli);
+  
+  DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
+  DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
+  
+  m_table->in_use = old_thd;
 
-   return error;
+  DBUG_RETURN(error);
 }
 
 
@@ -8100,6 +8111,7 @@ int Rows_log_event::do_index_scan_and_up
   int error= 0;
   KEY *keyinfo;
   uint key;
+  const uchar *saved_m_curr_row= m_curr_row;
 
   /*
     rpl_row_tabledefs.test specifies that
@@ -8109,7 +8121,10 @@ int Rows_log_event::do_index_scan_and_up
   */
 
   prepare_record(table, &m_cols, FALSE);
-  error= unpack_current_row(rli, &m_cols);
+  if ((error= unpack_current_row(rli, &m_cols)))
+    goto end;
+
+  saved_m_curr_row= m_curr_row;
 
   // Temporary fix to find out why it fails [/Matz]
   memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
@@ -8117,7 +8132,7 @@ int Rows_log_event::do_index_scan_and_up
   if (!is_any_column_signaled_for_table(table, &m_cols))
   {
     error= HA_ERR_END_OF_FILE;
-    goto err;
+    goto end;
   }
 
 #ifndef DBUG_OFF
@@ -8149,12 +8164,13 @@ int Rows_log_event::do_index_scan_and_up
                                  table->s->reclength) == 0);
 
     */
+
     DBUG_PRINT("info",("locating record using primary key (position)"));
-    int error;
     if (table->file->inited && (error= table->file->ha_index_end()))
-      DBUG_RETURN(error);
+      goto end;
+
     if ((error= table->file->ha_rnd_init(FALSE)))
-      DBUG_RETURN(error);
+      goto end;
 
     error= table->file->rnd_pos_by_record(table->record[0]);
 
@@ -8166,7 +8182,8 @@ int Rows_log_event::do_index_scan_and_up
         error= HA_ERR_KEY_NOT_FOUND;
       table->file->print_error(error, MYF(0));
     }
-    DBUG_RETURN(error);
+    
+    goto end;
   }
 
   // We can't use position() - try other methods.
@@ -8185,7 +8202,7 @@ INDEX_SCAN:
     /* we dont have a key, or no key is suitable for the BI values */
   {
     error= HA_ERR_KEY_NOT_FOUND;
-    goto err;
+    goto end;
   }
 
   {
@@ -8199,7 +8216,7 @@ INDEX_SCAN:
     {
       DBUG_PRINT("info",("ha_index_init returns error %d",error));
       table->file->print_error(error, MYF(0));
-      goto err;
+      goto end;
     }
 
     /* Fill key data for the row */
@@ -8233,8 +8250,7 @@ INDEX_SCAN:
       if (error == HA_ERR_RECORD_DELETED)
         error= HA_ERR_KEY_NOT_FOUND;
       table->file->print_error(error, MYF(0));
-      table->file->ha_index_end();
-      goto err;
+      goto end;
     }
 
   /*
@@ -8262,11 +8278,8 @@ INDEX_SCAN:
     if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
     {
       /* Unique does not have non nullable part */
-      if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
-      {
-        table->file->ha_index_end();
-        goto record_found;
-      }
+      if (!(table->key_info->flags & (HA_NULL_PART_KEY)))      
+        goto end;  // record found
       else
       {
         KEY *keyinfo= table->key_info;
@@ -8283,10 +8296,7 @@ INDEX_SCAN:
         }
 
         if (!null_found)
-        {
-          table->file->ha_index_end();
-          goto record_found;
-        }
+          goto end;           // record found
 
         /* else fall through to index scan */
       }
@@ -8323,21 +8333,25 @@ INDEX_SCAN:
           continue;
         DBUG_PRINT("info",("no record matching the given row found"));
         table->file->print_error(error, MYF(0));
-        table->file->ha_index_end();
-        goto err;
+        goto end;
       }
     }
+  }
 
-    /*
-      Have to restart the scan to be able to fetch the next row.
-    */
+end:
+  if (!error)
+    error= do_apply_row(rli);
+
+  if (table->file->inited)
     table->file->ha_index_end();
-  }
 
-record_found:
-  error= do_apply_row(rli);
+  if ((get_type_code() == UPDATE_ROWS_EVENT) && 
+      (saved_m_curr_row== m_curr_row)) // we need to unpack the AI
+  {
+    m_curr_row= m_curr_row_end;
+    unpack_current_row(rli, &m_cols);
+  }
 
-err:
   table->default_column_bitmaps();
   DBUG_RETURN(error);
 
@@ -8353,14 +8367,12 @@ int Rows_log_event::do_hash_scan_and_upd
     goto err;
 
   /**
-    Last row hashed.
-
-    Now do the table scan and update according to the hash table
-    matches.
+    Last row hashed. We are handling the last (pair of) row(s).  So
+    now we do the table scan and match against the entries in the hash
+    table.
    */
-  if (m_curr_row == m_rows_end)
+  if (m_curr_row_end == m_rows_end)
   {
-
     TABLE* table= m_table;
     MY_BITMAP* read_set= &m_cols;
 
@@ -8375,6 +8387,7 @@ int Rows_log_event::do_hash_scan_and_upd
     /* Continue until we find the right record or reached the end of the table */
     do
     {
+      /* get the first record from the table */
       error= table->file->ha_rnd_next(table->record[0]);
 
       DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
@@ -8415,22 +8428,26 @@ int Rows_log_event::do_hash_scan_and_upd
                both table->record[0] and table->record[1] should have
                approximately the same contents (except maybe for blobs
                - thence we need to do full comparison below).
-             */
-            m_curr_row= entry->m_curr_row;
-            if ((error= unpack_current_row(rli, &m_cols)))
-              goto close_table_and_err;
-
-            /*
-              compare the row_entry with row taking into account
-              blobs, if there is any. If there is a match do the
-              operation and remove the entry from the hash table.
             */
-            if (!record_compare(table, read_set))
-            {
-              found_in_hash= true;
-
-              my_hash_delete(&m_hash, (uchar *)entry);
-              break;
+            m_curr_row= entry->m_curr_row;
+            if ((error= unpack_current_row(rli, &m_cols))) 
+              /* unpack errors are not elegible for idempotency */                 
+              goto close_table;
+
+            else
+            {              
+              /*
+                compare the row_entry with row taking into account
+                blobs, if there is any. If there is a match do the
+                operation and remove the entry from the hash table.
+              */
+              if (!record_compare(table, read_set))
+              {
+                found_in_hash= true;
+
+                my_hash_delete(&m_hash, (uchar *)entry);
+                break;
+              }
             }
 
             /* Next row for the loop... */
@@ -8441,8 +8458,15 @@ int Rows_log_event::do_hash_scan_and_upd
           }
 
           if (found_in_hash)
+          {
             if ((error= do_apply_row(rli)))
-              goto err;
+            {
+              if (handle_idempotent_errors(rli, &error) || error)
+                goto close_table;
+
+              do_post_row_operations(rli, error);
+            }
+          }
         }
         break;
 
@@ -8453,25 +8477,29 @@ int Rows_log_event::do_hash_scan_and_upd
         case HA_ERR_RECORD_DELETED:
           break;
 
-        case HA_ERR_END_OF_FILE: // to make it clear
+        case HA_ERR_END_OF_FILE:
         default:
           DBUG_PRINT("info", ("Failed to get next record"
               " (ha_rnd_next returns %d)",error));
-          goto close_table_and_err;
+          goto close_table;
       }
     }
 
-    while ((m_hash.records > 0) && (!error || (error == HA_ERR_RECORD_DELETED)));
-  }
+    while ((m_hash.records > 0) && 
+           (!error || (error == HA_ERR_RECORD_DELETED)));
 
-err:
-  DBUG_RETURN(error);
+close_table:
+    m_table->file->print_error(error, MYF(0));
+    m_table->file->ha_rnd_end();
 
-close_table_and_err:
-  m_table->file->print_error(error, MYF(0));
-  m_table->file->ha_rnd_end();
-  DBUG_RETURN (error);
+    /* set back the pointer to the end of the rows */
+//    if (!error || (error == HA_ERR_RECORD_DELETED))
+//      m_curr_row= m_curr_row_end= m_rows_end;
+  }
 
+err:
+  DBUG_ASSERT((m_hash.records == 0) ? (error == 0) : (m_hash.records > 0));
+  DBUG_RETURN(error);  
 }
 
 int Rows_log_event::do_table_scan_and_update(Relay_log_info const *rli)
@@ -8809,35 +8837,42 @@ int Rows_log_event::do_apply_event(Relay
         is_any_column_signaled_for_table(table, &m_cols_ai))
     {
       uint row_lookup_method= decide_row_lookup_method(table, &m_cols, get_type_code());
+      
+      int (Rows_log_event::*do_apply_row_ptr)(Relay_log_info const *)= NULL;
+      switch (row_lookup_method)
+      {
+        case ROW_LOOKUP_HASH_SCAN:
+          do_apply_row_ptr= &Rows_log_event::do_hash_scan_and_update;
+          break;
+        
+        case ROW_LOOKUP_INDEX_SCAN:
+          do_apply_row_ptr= &Rows_log_event::do_index_scan_and_update;
+          break;
+        
+        case ROW_LOOKUP_TABLE_SCAN:
+          do_apply_row_ptr= &Rows_log_event::do_table_scan_and_update;
+          break;
+        
+        case ROW_LOOKUP_NOT_NEEDED:
+          DBUG_ASSERT(get_type_code() == WRITE_ROWS_EVENT);
+        
+          /* No need to scan for rows, just apply it */
+          do_apply_row_ptr= &Rows_log_event::do_apply_row;
+          break;
+      }
 
       // row processing loop
       while (!error && (m_curr_row != m_rows_end))
       {
-        switch (row_lookup_method)
-        {
-          case ROW_LOOKUP_HASH_SCAN:
-            /**
-               scan the table and for each entry in the table, if
-               it exists in the hash, execute the row.
-             */
-            error= do_hash_scan_and_update(rli);
-           break;
-
-          case ROW_LOOKUP_INDEX_SCAN:
-            error= do_index_scan_and_update(rli);
-            break;
-
-          case ROW_LOOKUP_TABLE_SCAN:
-            error= do_table_scan_and_update(rli);
-            break;
-
-          case ROW_LOOKUP_NOT_NEEDED:
-            DBUG_ASSERT(get_type_code() == WRITE_ROWS_EVENT);
-
-            /* No need to scan for rows, just apply it */
-            error= do_apply_row(rli);
-            break;
-        }
+
+        error= (this->*do_apply_row_ptr)(rli);
+
+        if (handle_idempotent_errors(rli, &error))
+          break;
+
+        /* this advances m_curr_row */
+        do_post_row_operations(rli, error);
+
       }
     }
 

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-11-05 00:31:22 +0000
+++ b/sql/log_event.h	2010-11-19 11:33:10 +0000
@@ -3821,6 +3821,7 @@ private:
 
   int hash_row(Relay_log_info const *rli);
   int handle_idempotent_errors(Relay_log_info const *rli, int *err);
+  void do_post_row_operations(Relay_log_info const *rli, int err);
   int do_apply_row(Relay_log_info const *rli);
   int do_index_scan_and_update(Relay_log_info const *rli);
   int do_hash_scan_and_update(Relay_log_info const *rli);


Attachment: [text/bzr-bundle] bzr/luis.soares@oracle.com-20101119113310-suzckexehkbhyev0.bundle
Thread
bzr commit into mysql-next-mr branch (luis.soares:3206) WL#5597Luis Soares19 Nov