List:Commits« Previous MessageNext Message »
From:Martin Skold Date:August 29 2007 10:28am
Subject:bk commit into 5.1 tree (mskold:1.2613)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of marty. When marty does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-08-29 10:27:57+02:00, mskold@stripped +4 -0
  WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE to
Ndb_event_data so each NdbEventOperation have dedicated buffers

  sql/ha_ndbcluster.cc@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +3 -1
    WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers

  sql/ha_ndbcluster.h@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +1 -2
    WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers

  sql/ha_ndbcluster_binlog.cc@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +81 -53
    WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers

  sql/ha_ndbcluster_binlog.h@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +13 -7
    WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	mskold
# Host:	linux.site
# Root:	/windows/Linux_space/MySQL/mysql-5.1-telco-6.2

--- 1.489/sql/ha_ndbcluster.cc	2007-08-29 10:28:08 +02:00
+++ 1.490/sql/ha_ndbcluster.cc	2007-08-29 10:28:08 +02:00
@@ -8995,9 +8995,9 @@ void ndbcluster_real_free_share(NDB_SHAR
 
   hash_delete(&ndbcluster_open_tables, (byte*) *share);
   thr_lock_delete(&(*share)->lock);
-  pthread_mutex_destroy(&(*share)->mutex);
 
 #ifdef HAVE_NDB_BINLOG
+  (void) pthread_mutex_lock(&(*share)->mutex);
   if ((*share)->table)
   {
     // (*share)->table->mem_root is freed by closefrm
@@ -9021,7 +9021,9 @@ void ndbcluster_real_free_share(NDB_SHAR
     (*share)->table= 0;
 #endif
   }
+  (void) pthread_mutex_unlock(&(*share)->mutex);
 #endif
+  pthread_mutex_destroy(&(*share)->mutex);
   free_root(&(*share)->mem_root, MYF(0));
   my_free((gptr) *share, MYF(0));
   *share= 0;

--- 1.188/sql/ha_ndbcluster.h	2007-08-29 10:28:08 +02:00
+++ 1.189/sql/ha_ndbcluster.h	2007-08-29 10:28:08 +02:00
@@ -138,12 +138,11 @@ typedef struct st_ndbcluster_share {
   uint32 flags;
   NdbEventOperation *op;
   Uint64 op_gci;
-  List<NdbEventOperation> old_ops; // for on-line alter table
   char *old_names; // for rename table
   TABLE_SHARE *table_share;
   TABLE *table;
-  byte *record[2]; // pointer to allocated records for receiving data
   MY_BITMAP *subscriber_bitmap;
+  List<NdbEventOperation> old_ops; // for on-line alter table
 #endif
 } NDB_SHARE;
 

--- 1.130/sql/ha_ndbcluster_binlog.cc	2007-08-29 10:28:08 +02:00
+++ 1.131/sql/ha_ndbcluster_binlog.cc	2007-08-29 10:28:08 +02:00
@@ -344,8 +344,7 @@ ndbcluster_binlog_close_table(THD *thd, 
 
 static int
 ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
-                             TABLE_SHARE *table_share, TABLE *table,
-                             int reopen)
+                             TABLE_SHARE *table_share, TABLE *table)
 {
   int error;
   DBUG_ENTER("ndbcluster_binlog_open_table");
@@ -361,7 +360,7 @@ ndbcluster_binlog_open_table(THD *thd, N
     free_table_share(table_share);
     DBUG_RETURN(error);
   }
-  if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,

+  if ((error= open_table_from_share(thd, table_share, "", 0 /* don't allocate buffers */,

                                     (uint) READ_ALL, 0, table, OTM_OPEN)))
   {
     sql_print_error("Unable to open table for %s, error=%d(%d)",
@@ -372,23 +371,6 @@ ndbcluster_binlog_open_table(THD *thd, N
   }
   assign_new_table_id(table_share);
 
-  if (!reopen)
-  {
-    // allocate memory on ndb share so it can be reused after online alter table
-    (void)multi_alloc_root(&share->mem_root,
-                           &(share->record[0]), table->s->rec_buff_length,
-                           &(share->record[1]), table->s->rec_buff_length,
-                           NULL);
-  }
-  {
-    my_ptrdiff_t row_offset= share->record[0] - table->record[0];
-    Field **p_field;
-    for (p_field= table->field; *p_field; p_field++)
-      (*p_field)->move_field_offset(row_offset);
-    table->record[0]= share->record[0];
-    table->record[1]= share->record[1];
-  }
-
   table->in_use= injector_thd;
   
   table->s->db.str= share->db;
@@ -423,6 +405,7 @@ void ndbcluster_binlog_init_share(NDB_SH
 
   share->op= 0;
   share->table= 0;
+  share->old_ops.empty();
 
   if (!ndb_schema_share &&
       strcmp(share->db, NDB_REP_DB) == 0 &&
@@ -459,7 +442,6 @@ void ndbcluster_binlog_init_share(NDB_SH
     {
       share->flags|= NSF_NO_BINLOG;
     }
-    share->old_ops.empty();
     DBUG_VOID_RETURN;
   }
   while (1) 
@@ -467,14 +449,13 @@ void ndbcluster_binlog_init_share(NDB_SH
     int error;
     TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
     TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
-    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
+    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
       break;
 
     if (table->s->primary_key == MAX_KEY)
       share->flags|= NSF_HIDDEN_PK;
     if (table->s->blob_fields != 0)
       share->flags|= NSF_BLOB_FLAG;
-    share->old_ops.empty();
     break;
   }
   DBUG_VOID_RETURN;
@@ -1609,6 +1590,7 @@ inline
 bool
 ndb_is_old_event_op(NDB_SHARE *share, NdbEventOperation *pOp)
 {
+  safe_mutex_assert_owner(&share->mutex);
   List_iterator_fast<NdbEventOperation> it(share->old_ops);
   NdbEventOperation *op;
   if (share->old_ops.is_empty())
@@ -1625,6 +1607,7 @@ inline
 void
 ndb_remove_old_event_op(NDB_SHARE *share, NdbEventOperation *pOp)
 {
+  safe_mutex_assert_owner(&share->mutex);
   List_iterator<NdbEventOperation> it(share->old_ops);
   NdbEventOperation *op;
   if (share->old_ops.is_empty())
@@ -1642,6 +1625,7 @@ ndb_remove_old_event_op(NDB_SHARE *share
 void
 ndb_remove_old_event_ops(NDB_SHARE *share)
 {
+  safe_mutex_assert_owner(&share->mutex);
   List_iterator<NdbEventOperation> it(share->old_ops);
   NdbEventOperation *op;
   if (share->old_ops.is_empty())
@@ -1654,15 +1638,15 @@ ndb_remove_old_event_ops(NDB_SHARE *shar
       delete event_data;
       op->setCustomData(NULL);
     }
-    it.remove();
-    return;
   }
+  share->old_ops.empty();
 }
 
 inline
 void
 ndb_drop_old_event_ops(Ndb *ndb, NDB_SHARE *share)
 {
+  safe_mutex_assert_owner(&share->mutex);
   List_iterator<NdbEventOperation> it(share->old_ops);
   NdbEventOperation *op;
   if (share->old_ops.is_empty())
@@ -1676,9 +1660,9 @@ ndb_drop_old_event_ops(Ndb *ndb, NDB_SHA
         op->setCustomData(NULL);
       }
       ndb->dropEventOperation(op);
-      it.remove();
       free_share(&share);
   }
+  share->old_ops.empty();
 }
 
 
@@ -1692,6 +1676,7 @@ ndb_handle_schema_change(THD *thd, Ndb *
   DBUG_ENTER("ndb_handle_schema_change");
   MEM_ROOT *mem_root= &share->mem_root;
   TABLE_SHARE *table_share= share->table_share;
+  TABLE *table= share->table;
   const char *tabname= table_share->table_name.str;
   const char *dbname= table_share->db.str;
   const NDBTAB *altered_table= pOp->getTable();
@@ -1804,11 +1789,15 @@ ndb_handle_schema_change(THD *thd, Ndb *
 
     ndbcluster_binlog_close_table(thd, share);
 
-    TABLE *table= (share->table) ?
-	             share->table // allready allocated
-	          : (TABLE*) alloc_root(mem_root, sizeof(*table));
+    TABLE_SHARE *new_table_share= 
+      (share->table_share) ?
+        share->table_share // allready allocated
+      : (TABLE_SHARE *) alloc_root(mem_root, sizeof(*new_table_share));
+    TABLE *new_table= (share->table) ?
+                        share->table // allready allocated
+                      : (TABLE*) alloc_root(mem_root, sizeof(*new_table));
     if ((error= ndbcluster_binlog_open_table(thd, share,
-					     table_share, table, 1)))
+					     new_table_share, new_table)))
 	 sql_print_information("NDB: Failed to re-open table %s.%s",
 			       dbname, tabname);
     pthread_mutex_unlock(&LOCK_open);
@@ -2330,7 +2319,11 @@ ndb_binlog_thread_handle_schema_event_po
         // fall through
       case SOT_ALTER_TABLE:
         if (share && schema_type == SOT_ALTER_TABLE)
+        {
+          (void) pthread_mutex_lock(&share->mutex);
           ndb_drop_old_event_ops(injector_ndb, share);
+          (void) pthread_mutex_unlock(&share->mutex);
+        }
         // invalidation already handled by binlog thread
         if (!share || !share->op)
         {
@@ -3113,11 +3106,42 @@ ndbcluster_create_event_ops(THD *thd, ND
     if (share->flags & NSF_BLOB_FLAG)
       op->mergeEvents(TRUE); // currently not inherited from event
 
-    int n_columns= ndbtab->getNoOfColumns();
-    int n_fields= table ? table->s->fields : 0; // XXX ???
-    Ndb_event_data *event_data= new Ndb_event_data(share, n_columns);
+    uint n_columns= ndbtab->getNoOfColumns();
+    uint n_fields= table ? table->s->fields : 0; // XXX ???
+    Ndb_event_data *event_data= new Ndb_event_data(share, n_fields);
+    uint val_length= sizeof(NdbValue) * n_columns;
+    uint rec_length= table->s->rec_buff_length;
+    /*
+       Allocate memory globally so it can be reused after online alter table
+    */
+    if (my_multi_malloc(MYF(MY_WME),
+                        &event_data->ndb_value[0],
+                        val_length,
+	                &event_data->ndb_value[1],
+                        val_length,
+                        &event_data->record[0],
+                        rec_length,
+                        &event_data->record[1],
+                        rec_length,
+                        NULL) != 0)
+    {
+      /*
+         Set table record pointers to point to new records 
+      */
+      my_ptrdiff_t row_offset= event_data->record[0] - table->record[0];
+      Field **p_field;
+      for (p_field= table->field; *p_field; p_field++)
+        (*p_field)->move_field_offset(row_offset);
+      table->record[0]= event_data->record[0];
+      table->record[1]= event_data->record[1];
+    }
+    else
+    {
+      DBUG_PRINT("info", ("Failed to allocate records for event operation"));
+      DBUG_RETURN(-1);
+    }
 
-    for (int j= 0; j < n_columns; j++)
+    for (uint j= 0; j < n_columns; j++)
     {
       const char *col_name= ndbtab->getColumn(j)->getName();
       NdbValue attr0, attr1;
@@ -3154,9 +3178,10 @@ ndbcluster_create_event_ops(THD *thd, ND
                                 op->getNdbError().code,
                                 op->getNdbError().message,
                                 "NDB");
+            delete event_data;
+            op->setCustomData(NULL);
             ndb->dropEventOperation(op);
             pthread_mutex_unlock(&injector_mutex);
-            delete event_data;
             DBUG_RETURN(-1);
           }
         }
@@ -3206,7 +3231,6 @@ ndbcluster_create_event_ops(THD *thd, ND
       }
       ndb->dropEventOperation(op);
       pthread_mutex_unlock(&injector_mutex);
-      delete event_data;
       if (retries)
       {
         my_sleep(retry_sleep);
@@ -3693,12 +3717,12 @@ ndb_binlog_thread_handle_data_event(Ndb 
     {
       /* unpack data to fetch orig_server_id and orig_epoch */
       TABLE *table= share->table;
-      uint n_fields= table->s->fields;
+      uint n_fields= event_data->no_fields;
       MY_BITMAP b;
       uint32 bitbuf[128 / (sizeof(uint32) * 8)];
       bitmap_init(&b, bitbuf, n_fields, FALSE);
       bitmap_set_all(&b);
-      ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
+      ndb_unpack_record(table, event_data->ndb_value[0], &b,
event_data->record[0]);
       /* store */
       ndb_binlog_index_row *row= ndb_find_binlog_index_row
         (rows, ((Field_long *)table->field[0])->val_int(), 1);
@@ -3750,7 +3774,7 @@ ndb_binlog_thread_handle_data_event(Ndb 
   dbug_print_table("table", table);
 
   TABLE_SHARE *table_s= table->s;
-  uint n_fields= table_s->fields;
+  uint n_fields= event_data->no_fields;
   DBUG_PRINT("info", ("Assuming %u columns for table %s", n_fields,
table_s->table_name.str));
   MY_BITMAP b;
   /* Potential buffer for the bitmap */
@@ -3791,11 +3815,11 @@ ndb_binlog_thread_handle_data_event(Ndb 
                                                ptrdiff);
         DBUG_ASSERT(ret == 0);
       }
-      ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
+      ndb_unpack_record(table, event_data->ndb_value[0], &b,
event_data->record[0]);
       IF_DBUG(int ret=) trans.write_row(originating_server_id,
                                         injector::transaction::table(table,
                                                                      TRUE),
-                                        &b, n_fields, table->record[0]);
+                                        &b, n_fields, event_data->record[0]);
       DBUG_ASSERT(ret == 0);
     }
     break;
@@ -3823,19 +3847,19 @@ ndb_binlog_thread_handle_data_event(Ndb 
 
       if (share->flags & NSF_BLOB_FLAG)
       {
-        my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
+        my_ptrdiff_t ptrdiff= event_data->record[n] - event_data->record[0];
         IF_DBUG(int ret =) get_ndb_blobs_value(table, event_data->ndb_value[n],
                                                blobs_buffer[n],
                                                blobs_buffer_size[n],
                                                ptrdiff);
         DBUG_ASSERT(ret == 0);
       }
-      ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]);
-      DBUG_EXECUTE("info", print_records(table, table->record[n]););
+      ndb_unpack_record(table, event_data->ndb_value[n], &b,
event_data->record[n]);
+      DBUG_EXECUTE("info", print_records(table, event_data->record[n]););
       IF_DBUG(int ret =) trans.delete_row(originating_server_id,
                                           injector::transaction::table(table,
                                                                        TRUE),
-                                          &b, n_fields, table->record[n]);
+                                          &b, n_fields, event_data->record[n]);
       DBUG_ASSERT(ret == 0);
     }
     break;
@@ -3854,8 +3878,8 @@ ndb_binlog_thread_handle_data_event(Ndb 
         DBUG_ASSERT(ret == 0);
       }
       ndb_unpack_record(table, event_data->ndb_value[0],
-                        &b, table->record[0]);
-      DBUG_EXECUTE("info", print_records(table, table->record[0]););
+                        &b, event_data->record[0]);
+      DBUG_EXECUTE("info", print_records(table, event_data->record[0]););
       if (table->s->primary_key != MAX_KEY) 
       {
         /*
@@ -3864,7 +3888,7 @@ ndb_binlog_thread_handle_data_event(Ndb 
         */
         trans.write_row(originating_server_id,
                         injector::transaction::table(table, TRUE),
-                        &b, n_fields, table->record[0]);// after values
+                        &b, n_fields, event_data->record[0]);// after values
       }
       else
       {
@@ -3874,21 +3898,21 @@ ndb_binlog_thread_handle_data_event(Ndb 
         */
         if (share->flags & NSF_BLOB_FLAG)
         {
-          my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
+          my_ptrdiff_t ptrdiff= event_data->record[1] - event_data->record[0];
           IF_DBUG(int ret =) get_ndb_blobs_value(table, event_data->ndb_value[1],
                                                  blobs_buffer[1],
                                                  blobs_buffer_size[1],
                                                  ptrdiff);
           DBUG_ASSERT(ret == 0);
         }
-        ndb_unpack_record(table, event_data->ndb_value[1], &b,
table->record[1]);
-        DBUG_EXECUTE("info", print_records(table, table->record[1]););
+        ndb_unpack_record(table, event_data->ndb_value[1], &b,
event_data->record[1]);
+        DBUG_EXECUTE("info", print_records(table, event_data->record[1]););
         IF_DBUG(int ret =) trans.update_row(originating_server_id,
                                             injector::transaction::table(table,
                                                                          TRUE),
                                             &b, n_fields,
-                                            table->record[1], // before values
-                                            table->record[0]);// after values
+                                            event_data->record[1], // before values
+                                            event_data->record[0]);// after values
         DBUG_ASSERT(ret == 0);
       }
     }
@@ -4690,12 +4714,14 @@ err:
         delete event_data;
         op->setCustomData(NULL);
       }
+      (void) pthread_mutex_lock(&share->mutex);
       share->op= 0;
       ndb_remove_old_event_op(share, op);
       /* ndb_share reference binlog free */
       DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                                share->key, share->use_count));
       free_share(&share);
+      (void) pthread_mutex_unlock(&share->mutex);
       s_ndb->dropEventOperation(op);
     }
     delete s_ndb;
@@ -4720,12 +4746,14 @@ err:
         delete event_data;
         op->setCustomData(NULL);
       }
+      (void) pthread_mutex_lock(&share->mutex);
       share->op= 0;
       ndb_remove_old_event_op(share, op);
       /* ndb_share reference binlog free */
       DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                                share->key, share->use_count));
       free_share(&share);
+      (void) pthread_mutex_unlock(&share->mutex);
       i_ndb->dropEventOperation(op);
     }
     delete i_ndb;

--- 1.28/sql/ha_ndbcluster_binlog.h	2007-08-29 10:28:08 +02:00
+++ 1.29/sql/ha_ndbcluster_binlog.h	2007-08-29 10:28:08 +02:00
@@ -38,21 +38,27 @@ extern handlerton *ndbcluster_hton;
 
 class Ndb_event_data {
 public:
-  Ndb_event_data(NDB_SHARE *the_share, int no_columns) : share(the_share)
+  Ndb_event_data(NDB_SHARE *the_share, uint n_fields) : 
+  share(the_share), no_fields(n_fields)
   {
-    ndb_value[0]= (NdbValue*) my_malloc(sizeof(NdbValue) * no_columns,
-                                        MYF(MY_WME));
-    ndb_value[1]= (NdbValue*) my_malloc(sizeof(NdbValue) * no_columns,
-                                        MYF(MY_WME));
+    ndb_value[0]= 0;
+    ndb_value[1]= 0;
+    record[0]= 0;
+    record[1]= 0;
   }
   ~Ndb_event_data()
   {
     share= 0;
-    my_free((gptr) ndb_value[0], MYF(MY_WME));
-    my_free((gptr) ndb_value[1], MYF(MY_WME));
+    /*
+       ndbvalue[] and record[] are allocated with my_multi_malloc
+       so only first pointer should be freed  
+    */
+    my_free((gptr) ndb_value[0], MYF(MY_WME|MY_ALLOW_ZERO_PTR));
   }
   NDB_SHARE *share;
+  uint no_fields;
   NdbValue *ndb_value[2];
+  byte *record[2]; // pointer to allocated records for receiving data
 };
 
 /*
Thread
bk commit into 5.1 tree (mskold:1.2613)Martin Skold29 Aug