List:Internals« Previous MessageNext Message »
From:tomas Date:April 13 2005 11:18pm
Subject:bk commit into 5.1 tree (tomas:1.1870)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1870 05/04/14 01:18:02 tomas@stripped +1 -0
  ha_ndbcluster.cc:
    using openfrm instead of create_virtual_tmp_table to get a TABLE object (less bugs to fix)
    added mutex around usage if "injector_ndb"

  sql/ha_ndbcluster.cc
    1.214 05/04/14 01:14:50 tomas@stripped +211 -238
    using openfrm instead of create_virtual_tmp_table to get a TABLE object (less bugs to fix)
    added mutex around usage if "injector_ndb"

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/mysql-5.1-wl2325

--- 1.213/sql/ha_ndbcluster.cc	2005-04-13 14:28:23 +02:00
+++ 1.214/sql/ha_ndbcluster.cc	2005-04-14 01:14:50 +02:00
@@ -101,7 +101,7 @@
 static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
                                 my_bool not_used __attribute__((unused)));
 static NDB_SHARE *get_share(const char *table_name,
-			    bool create_if_not_exists= 1,
+			    bool create_if_not_exists= TRUE,
 			    TABLE *table= 0);
 static void free_share(NDB_SHARE **share);
 
@@ -129,8 +129,6 @@
 #define INJECTOR_EVENT_LEN 200
 // NDB Injector thread (used for replication)
 static pthread_t ndb_injector_thread;
-
-static int add_table_to_share(NDB_SHARE *share, TABLE *table= 0);
 static int ndbcluster_create_event(Ndb *ndb, const NDBTAB* table,
 				   const char* event_name);
 static int ndbcluster_drop_event(Ndb *ndb, const NDBTAB* table,
@@ -138,12 +136,12 @@
 static int ndbcluster_create_event_ops(Ndb *ndb, NDB_SHARE *share, const NDBTAB* ndbtab,
 				       const char* event_name);
 static int replication_handle_drop_table(Ndb *ndb, const char* event_name,
-					 NDB_SHARE **share);
+					 NDB_SHARE *share);
 inline void ndb_rep_event_name(String *event_name, const char* db, const char* tbl);
 
 extern "C" pthread_handler_decl(ndb_injector_thread_func, arg);
 
-pthread_mutex_t injector_startup_mutex;
+pthread_mutex_t injector_mutex;
 pthread_cond_t  injector_cond;
 static int ndb_injector_thread_running= 0;
 
@@ -3858,9 +3856,6 @@
 {
   NDBTAB tab;
   NDBCOL col;
-#ifdef HAVE_REPLICATION
-  NDB_SHARE *share= 0;
-#endif
   uint pack_length, length, i, pk_length= 0;
   const void *data, *pack_data;
   char name2[FN_HEADLEN];
@@ -3872,29 +3867,6 @@
   set_dbname(name2);
   set_tabname(name2);    
 
-#ifdef HAVE_REPLICATION
-  if(ndb_injector_thread_running > 0)
-  {
-    /**
-     * Share is needed to hold replication information
-     */
-    if (!(share=get_share(name2,true,form)))
-    {
-      sql_print_error("NDB Replication: allocating table share for %s failed", name2);
-      // ToDo: return error here;
-    }
-#if 0
-    if (share &&
-	(share->use_count > 1 && !share->op ||
-	 share->use_count > 2))
-    {
-      // we have a share since before, no need for us to keep it open
-      free_share(&share);
-    }
-#endif
-  }
-#endif
-
   if (create_from_engine)
   {
     /*
@@ -3902,18 +3874,26 @@
       caller.
       Do Ndb specific stuff, such as create a .ndb file
     */
-    my_errno= write_ndb_file();
-#ifdef HAVE_REPLICATION
-    if (my_errno)
-    {
-      if (share)
-	free(share);
+    if ((my_errno= write_ndb_file()))
       DBUG_RETURN(my_errno);
-    }
-    if (share && m_tabname[0] != '#')
+#ifdef HAVE_REPLICATION
+    while(ndb_injector_thread_running > 0)
     {
       if ((my_errno= check_ndb_connection()))
 	DBUG_RETURN(my_errno);
+      /**
+       * Share is needed to hold replication information
+       */
+      NDB_SHARE *share;
+      if (!(share=get_share(name2,true)))
+      {
+	sql_print_error("NDB Replication: "
+			"allocating table share for %s failed", name2);
+	break;
+	// ToDo: return error here;
+      }
+      if (m_tabname[0] == '#') // ToDo keep the share or not?
+	break;                 // to enable rename if kept?
 
       // Create the table in NDB     
       Ndb *ndb= get_ndb();
@@ -3926,7 +3906,8 @@
       {
 	ndb->setDatabaseName(m_dbname);
 	ndbcluster_create_event(ndb,ndbtab,event_name.c_ptr());
-	sql_print_information("NDB Replication: CREATE (DISCOVER) TABLE Event: %s",
+	sql_print_information("NDB Replication: "
+			      "CREATE (DISCOVER) TABLE Event: %s",
 			      event_name.c_ptr());
       }
       else
@@ -3935,12 +3916,16 @@
       if (ndbcluster_create_event_ops(injector_ndb, share, ndbtab,
 				      event_name.c_ptr()) < 0)
       {
+	free(&share);
 	; /* ToDo: handle error? */
+	DBUG_RETURN(0)
       }
+      // keep lock on share, injector thread now has responsability
+      DBUG_RETURN(0)
     }
 #endif
     DBUG_RETURN(my_errno);
-  }
+  } // if (create_from_engine)
 
   DBUG_PRINT("table", ("name: %s", m_tabname));  
   tab.setName(m_tabname);
@@ -4033,6 +4018,14 @@
   }
 
 #ifdef HAVE_REPLICATION
+  NDB_SHARE *share= 0;
+  if (ndb_injector_thread_running > 0 &&
+      !(share=get_share(name2,true)))
+  {
+    sql_print_error("NDB Replication: "
+		    "allocating table share for %s failed", name2);
+    // ToDo: return error here;
+  }
   if (m_tabname[0] != '#') // FIXME tabname hack
   {
     const NDBTAB *t= dict->getTable(m_tabname);
@@ -4046,10 +4039,10 @@
 			    event_name.c_ptr());
       if(share)
       {
-	if (injector_ndb &&
-	    ndbcluster_create_event_ops(injector_ndb, share, t,
+	if (ndbcluster_create_event_ops(injector_ndb, share, t,
 					event_name.c_ptr() ) < 0)
 	{
+	  free(&share);
 	  /* ToDo; handle error? */
 	}
       }
@@ -4073,6 +4066,12 @@
   // ToDo remove replication stuff if error
 
   DBUG_RETURN(my_errno);
+create_err:
+#ifdef HAVE_REPLICATION
+  if (share)
+    free(&share);
+#endif
+  DBUG_RETURN(my_errno);
 }
 
 
@@ -4185,6 +4184,18 @@
   set_dbname(to);
   ndb->setDatabaseName(m_dbname);
 
+  if (!(result= alter_table_name(new_tabname)))
+  {
+#ifdef HAVE_REPLICATION
+    if (wait_for_drop)
+    {
+      dict->forceGCPWait();
+    }
+#endif
+    // Rename .ndb file
+    result= handler::rename_table(from, to);
+  }
+
 #ifdef HAVE_REPLICATION
   NDB_SHARE *new_share= 0;
   if ( old_share )
@@ -4194,32 +4205,40 @@
     // or because we explicitly did a get_share at ::create
     char new_share_key[FN_REFLEN];
     (void)strxnmov(new_share_key, FN_REFLEN,share_prefix, m_dbname,"/",new_tabname,NullS);
-    new_share= get_share(new_share_key,true,old_share->table);
+    new_share= get_share(new_share_key,true,old_share->table); // handover table to new_share
     DBUG_ASSERT(new_share->no_use_count_check || new_share->use_count == 1);
 
     if (wait_for_drop == 0)
     {
       DBUG_ASSERT(old_share->no_use_count_check || old_share->use_count == 2);
-      free_share(&old_share);
-      free_share(&old_share);
+      old_share->table= 0;
+      free_share(&old_share); // for the get_share in this routine
+      free_share(&old_share); // sice there is no event op to free the other lock
       old_share=0;
+      new_share->table->s->db= new_share->db;
+      new_share->table->s->table_name= new_share->table_name;
     }
   }
 #endif
 
-  if (!(result= alter_table_name(new_tabname)))
-  {
 #ifdef HAVE_REPLICATION
-    if (wait_for_drop)
+  // handle old table
+  if ( m_tabname[0] != '#' ) // FIXME tabname hack
+  {
+    String event_name(INJECTOR_EVENT_LEN);
+    ndb_rep_event_name(&event_name,old_share_key+sizeof(share_prefix)-1,0);
+
+    replication_handle_drop_table(ndb, event_name.c_ptr(), old_share);
+    if (old_share)
     {
-      dict->forceGCPWait();
+      old_share->table= 0;
+      free_share(&old_share);
+      old_share= 0;
+      new_share->table->s->db= new_share->db;
+      new_share->table->s->table_name= new_share->table_name;
     }
-#endif
-    // Rename .ndb file
-    result= handler::rename_table(from, to);
   }
 
-#ifdef HAVE_REPLICATION
   if( !result && new_tabname[0] != '#')
   {
     /* always create an event for the table */
@@ -4245,15 +4264,6 @@
       /* ToDo; handle error? */
     }
   }
-
-  // handle old table
-  if ( m_tabname[0] != '#' ) // FIXME tabname hack
-  {
-    String event_name(INJECTOR_EVENT_LEN);
-    ndb_rep_event_name(&event_name,old_share_key+sizeof(share_prefix)-1,0);
-
-    replication_handle_drop_table(ndb, event_name.c_ptr(), &old_share);
-  }
 #endif
 
   DBUG_RETURN(result);
@@ -4334,11 +4344,13 @@
     String event_name(INJECTOR_EVENT_LEN);
     ndb_rep_event_name(&event_name,name+sizeof(share_prefix)-1,0);
     Ndb *ndb= get_ndb();
-    if ( share )
+    if ( share && share->op )
     {
       ndb->getDictionary()->forceGCPWait();
     }
-    replication_handle_drop_table(ndb,event_name.c_ptr(),&share);
+    replication_handle_drop_table(ndb,event_name.c_ptr(),share);
+    if (share)
+      free_share(&share);
   }
 #if 0
   // make sure we don't have a share left
@@ -4992,7 +5004,7 @@
 #ifdef HAVE_REPLICATION
   if (opt_bin_log)
   {
-    pthread_mutex_init(&injector_startup_mutex,MY_MUTEX_INIT_FAST);
+    pthread_mutex_init(&injector_mutex,MY_MUTEX_INIT_FAST);
     pthread_cond_init(&injector_cond,NULL);
 
 
@@ -5002,19 +5014,17 @@
     {
       DBUG_PRINT("error", ("Could not create ndb injector thread"));
       pthread_cond_destroy(&injector_cond);
-      pthread_mutex_destroy(&injector_startup_mutex);
+      pthread_mutex_destroy(&injector_mutex);
       goto ndbcluster_init_error;
     }
     /**
      * Wait for the ndb injector thread to finish starting up.
      * Afterwards, we no longer need this mutex
      */
-    pthread_mutex_lock(&injector_startup_mutex);
+    pthread_mutex_lock(&injector_mutex);
     while(!ndb_injector_thread_running)
-      pthread_cond_wait(&injector_cond,&injector_startup_mutex);
-    pthread_mutex_unlock(&injector_startup_mutex);
-
-    pthread_mutex_destroy(&injector_startup_mutex);
+      pthread_cond_wait(&injector_cond,&injector_mutex);
+    pthread_mutex_unlock(&injector_mutex);
 
     if(ndb_injector_thread_running<0)
       goto ndbcluster_init_error;
@@ -5082,6 +5092,7 @@
       DBUG_PRINT("info",("pthread_join(ndb_injector_thread, NULL) %d", r));
       DBUG_PRINT("info",("%d %d %d", ESRCH, EINVAL, EDEADLK));
     }
+    pthread_mutex_destroy(&injector_mutex);
     pthread_cond_destroy(&injector_cond);
     ndb_injector_thread_running= 0;
   }
@@ -5460,7 +5471,7 @@
   {
     Field * f= table->field[i];
     DBUG_PRINT("info",
-	       ("[%d] \"%s\"(%x:%s%s%s%s%s%s) type: %d pack_length: %d ptr: %x[+%d]",
+	       ("[%d] \"%s\"(%x:%s%s%s%s%s%s) type: %d pack_length: %d ptr: %x[+%d] null_bit: %u null_ptr: %x[+%d]",
 		i,
 		f->field_name,
 		f->flags,
@@ -5472,7 +5483,15 @@
 		(f->flags & BINARY_FLAG)   ? ",binary"   : "",
 		f->real_type(),
 		f->pack_length(),
-		f->ptr, f->ptr - table->record[0]));
+		f->ptr, f->ptr - table->record[0],
+		f->null_bit,
+		f->null_ptr, (byte*)f->null_ptr - table->record[0]));
+    if (f->type() == MYSQL_TYPE_BIT)
+    {
+      Field_bit *g= (Field_bit*)f;
+      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: %x[+%d] bit_ofs: %u bit_len: %u",
+				   g->field_length,g->bit_ptr,(byte*)g->bit_ptr-table->record[0],g->bit_ofs,g->bit_len));
+    }
   }
 }
 
@@ -5480,7 +5499,13 @@
   DBUG_ASSERT( old_table->THE_VAR == new_table->THE_VAR)
 #define CHECK_S_VAR(THE_VAR) \
   DBUG_ASSERT( old_table->s->THE_VAR == new_table->s->THE_VAR)
-
+#define CHECK_F_VAR(THE_VAR) \
+  DBUG_ASSERT( old_table->field[i]->THE_VAR == new_table->field[i]->THE_VAR)
+#define CHECK_F_VAR_TYPE(COND, TYPE, THE_VAR) \
+  if (old_table->field[i]->type() == COND) \
+  { \
+    DBUG_ASSERT( ((TYPE *)old_table->field[i])->THE_VAR == ((TYPE *)new_table->field[i])->THE_VAR ); \
+  }
 void dbug_check_tabledup_virtual(TABLE *old_table, TABLE *new_table)
 {
   dbug_print_table("old_table", old_table);
@@ -5489,6 +5514,12 @@
   //CHECK_S_VAR(reclength); mismatch
   CHECK_S_VAR(fields);
   CHECK_S_VAR(blob_fields);
+  for (int i= 0; i < old_table->s->fields; i++)
+  {
+    //CHECK_F_VAR(null_bit); mismatch
+    CHECK_F_VAR_TYPE(MYSQL_TYPE_BIT, Field_bit, bit_ofs);
+    CHECK_F_VAR_TYPE(MYSQL_TYPE_BIT, Field_bit, bit_len);
+  }
   //  ToDo: check null_bytes?
 #if 0 // check this later
   uint* const beg = table->s->blob_field;
@@ -5564,19 +5595,44 @@
 #ifndef DBUG_OFF
       share->no_use_count_check= 1;
 #endif
-      if (table)
+      if (ndb_injector_thread_running > 0)
       {
-	share->ndb_value[0]= 
-	  (NdbValue*)alloc_root(*root_ptr,sizeof(NdbValue)*table->s->fields+1);
-	share->ndb_value[1]= 
-	  (NdbValue*)alloc_root(*root_ptr,sizeof(NdbValue)*table->s->fields+1);
-#ifndef DBUG_OFF
-	dbug_print_table("old_table",table);
-#endif
-	add_table_to_share(share, table);
-#ifndef DBUG_OFF
-	dbug_print_table("new_table",share->table);
-#endif
+	while (table == 0)
+	{
+	  table= (TABLE *)my_malloc(sizeof(*table),MYF(MY_WME));
+	  int r;
+	  if ((r= openfrm(current_thd, share->key,"",0,(uint) READ_ALL,
+			  0, table)))
+	  {
+	    sql_print_error("Unable to open frm for %s, frmerror=%d",
+			    share->key,r);
+	    DBUG_PRINT("error",("openfrm failed %d",r));
+	    my_free((gptr) table,MYF(0));
+	    table= 0;
+	    break;
+	  }
+	  if (!table->record[1])
+	  {
+	    table->record[1]= alloc_root(&table->mem_root,
+					 table->s->rec_buff_length);
+	  }
+	  table->in_use= injector_thd;
+	  break;
+	}
+	share->table= table;
+	if (table)
+	{
+	  // ! do not touch the contents of the table
+	  // it may be in use by the injector thread
+	  share->ndb_value[0]= 
+	    (NdbValue*)alloc_root(*root_ptr,
+				  sizeof(NdbValue)*table->s->fields
+				  +1 /*extra for hidden key*/);
+	  share->ndb_value[1]= 
+	    (NdbValue*)alloc_root(*root_ptr,
+				  sizeof(NdbValue)*table->s->fields
+				  +1 /*extra for hidden key*/);
+	}
       }
 #endif
       *root_ptr= old_root;
@@ -5605,18 +5661,28 @@
   pthread_mutex_lock(&ndbcluster_mutex);
   DBUG_PRINT("free_share",
 	     ("key: %s, key_length: %d, db: %s, table_name: %s, "
-	      "use_count: %d, commit_count: %d",
-	      (*share)->key, (*share)->key_length, (*share)->db, (*share)->table_name,
+	      "use_count: %d, commit_count: %d", (*share)->key,
+	      (*share)->key_length, (*share)->db, (*share)->table_name,
 	      (*share)->use_count-1, (*share)->commit_count));
   if ((*share)->util_lock == current_thd)
     (*share)->util_lock= 0;
-
   if (!--(*share)->use_count)
   {
     hash_delete(&ndbcluster_open_tables, (byte*) *share);
     thr_lock_delete(&(*share)->lock);
     pthread_mutex_destroy(&(*share)->mutex);
     free_root(&(*share)->mem_root, MYF(0));
+    if ((*share)->table)
+    {
+      free_root(&(*share)->table->mem_root, MYF(0));
+#ifndef DBUG_OFF
+      bzero((gptr)(*share)->table,sizeof(*(*share)->table));
+#endif
+      my_free((gptr) (*share)->table,MYF(0));
+#ifndef DBUG_OFF
+      (*share)->table= 0;
+#endif
+    }
     my_free((gptr) *share, MYF(0));
     *share= 0;
   }
@@ -7471,78 +7537,6 @@
   return error;
 }
 
-TABLE *create_virtual_tmp_table(THD *thd, List<create_field> &field_list);
-static TABLE *tabledup_virtual(THD *thd, MEM_ROOT *mem_root, TABLE *table)
-{
-  DBUG_ENTER("tabledup_virtual");
-  MEM_ROOT *save_mem_root= thd->mem_root;
-  thd->mem_root= mem_root; // remember to reset before return
-
-  List<create_field> field_list;
-  TABLE *new_table;
-  unsigned i;
-
-  uint blob_columns= 0; // unused
-  int timestamps= 0; // unused
-  int timestamps_with_niladic= 0; // unused
-
-  for (i= 0; i < table->s->fields; i++)
-  {
-    Field *f= table->field[i];
-    create_field *cf=
-      new create_field(f,NULL,1);
-    prepare_create_field(cf,&blob_columns,&timestamps,
-			 &timestamps_with_niladic, HA_NDBCLUSTER_TABLE_FLAGS);
-    DBUG_PRINT("info",("%p(%s)",
-		       cf->field_name,
-		       cf->field_name));
-
-    if (field_list.push_back(cf))
-    {
-      thd->mem_root= save_mem_root;
-      DBUG_RETURN(0);
-    }
-  }
-
-  new_table= create_virtual_tmp_table(thd, field_list);
-  DBUG_ASSERT( new_table != 0 );
-
-  new_table->in_use= injector_thd;
-  if (!new_table->record[1])
-  {
-    new_table->record[1]= alloc_root(mem_root,new_table->s->rec_buff_length);
-  }
-  bzero(new_table->record[0],new_table->s->rec_buff_length);
-  bzero(new_table->record[1],new_table->s->rec_buff_length);
-
-  for (i= 0; i < table->s->fields; i++)
-  {
-    new_table->field[i]->field_name=
-      strdup_root(mem_root,table->field[i]->field_name);
-    DBUG_PRINT("info",("%p(%s) %p(%s)",
-		       table->field[i]->field_name,
-		       table->field[i]->field_name,
-		       new_table->field[i]->field_name,
-		       new_table->field[i]->field_name));
-  }
-#ifndef DBUG_OFF
-  dbug_check_tabledup_virtual(table,new_table);
-#endif
-  thd->mem_root= save_mem_root;
-  DBUG_RETURN(new_table);
-}
-
-static int add_table_to_share(NDB_SHARE *share, TABLE *table)
-{
-  DBUG_ASSERT(table != 0);
-
-  share->table= tabledup_virtual(current_thd, &share->mem_root, table);
-  DBUG_ASSERT( share->table != 0);
-  share->table->s->db= share->db;
-  share->table->s->table_name= share->table_name;
-  return 0;
-}
-
 static int
 ndbcluster_create_event(Ndb *ndb, const NDBTAB *table, const char* event_name)
 {
@@ -7669,7 +7663,7 @@
 
   DBUG_ASSERT(share != 0);
 
-  if (share->op != 0)
+  if (share->op)
   {
     assert(share->op->getCustomData() == (void *)share);
 
@@ -7684,14 +7678,16 @@
   TABLE *table= share->table;
   if (table && table->s->blob_fields)
   {
-    sql_print_error("NDB Replication: replication of blob tables not supported");
+    sql_print_error("NDB Replication: "
+		    "replication of blob tables not supported");
     DBUG_RETURN(0);
   }
 
+  pthread_mutex_lock(&injector_mutex);
   NdbEventOperation *op= ndb->createEventOperation(event_name,100);
-
   if(!op)
   {
+    pthread_mutex_unlock(&injector_mutex);
     sql_print_error("NDB Replication: Creating NdbEventOperation failed for"
 		    " %s",event_name);
     push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
@@ -7733,23 +7729,24 @@
     share->ndb_value[0][j].rec= attr0;
     share->ndb_value[1][j].rec= attr1;
   }
-  
+  op->setCustomData((void *)share); // set before execute
+  share->op= op; // assign op in NDB_SHARE
   if (op->execute())
   {
+    share->op= NULL;
     push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
 			ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 
 			op->getNdbError().code, op->getNdbError().message,
 			"NDB");
     ndb->dropEventOperation(op);
+    pthread_mutex_unlock(&injector_mutex);
     sql_print_error("NDB Replication: ndbevent->execute failed for %s",
 		    event_name);
     DBUG_RETURN(-1);
   }
-
-  op->setCustomData((void *)share);
+  pthread_mutex_unlock(&injector_mutex);
 
   DBUG_ASSERT(share->no_use_count_check || share->use_count == 1);
-  share->op= op; // assign op in NDB_SHARE
   DBUG_PRINT("info",("%s share->op: %x, share->use_count: %u",
 		     share->key, share->op, share->use_count));
 
@@ -7763,22 +7760,16 @@
  * (unless it has already dropped... then share->op == 0)
  */
 static int
-replication_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE **share)
+replication_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE *share)
 {
   DBUG_ENTER("replication_handle_drop_table");
 
-  if ( ndb_injector_thread_running > 0 && *share == 0)
-  {
-    sql_print_error("NDB delete table: share does not exist for %s. Ignoring...",
-		    event_name);
-  }
-
   NdbError ndb_error;
   if ( ndbcluster_drop_event(ndb,0/* ToDo */,event_name, ndb_error) < 0 )
   {
     /* ToDo; handle error? */
-    if (*share && (*share)->op &&
-	(*share)->op->getState() == NdbEventOperation::EO_EXECUTING &&
+    if (share && share->op &&
+	share->op->getState() == NdbEventOperation::EO_EXECUTING &&
 	ndb_error.code != 4009)
     {
       DBUG_ASSERT(false);
@@ -7787,27 +7778,27 @@
     DBUG_RETURN(0);
   }
 
-  if (*share == 0)
+  if (share == 0 || share->op == 0)
   {
     DBUG_RETURN(0);
   }
 
+  (void) pthread_mutex_lock(&share->mutex);
   int max_timeout= 10;
-  (void) pthread_mutex_lock(&(*share)->mutex);
-  while ((*share)->op && max_timeout)
+  while (share->op && max_timeout)
   {
-    DBUG_ASSERT((*share)->no_use_count_check || (*share)->use_count == 2);
+    DBUG_ASSERT(share->no_use_count_check || share->use_count == 2);
     struct timespec abstime;
     set_timespec(abstime, 1);
-    sql_print_information("NDB delete table: waiting max %u sec for drop table %s.",
-			  max_timeout, (*share)->key);
+    sql_print_information("NDB delete table: "
+			  "waiting max %u sec for drop table %s.",
+			  max_timeout, share->key);
     (void) pthread_cond_timedwait(&injector_cond,
-				  &(*share)->mutex,
+				  &share->mutex,
 				  &abstime);
     max_timeout--;
   }
-  (void) pthread_mutex_unlock(&(*share)->mutex);
-  free_share(share);
+  (void) pthread_mutex_unlock(&share->mutex);
   if (max_timeout == 0)
   {
     sql_print_error("NDB delete table: timed out. Ignoring...");
@@ -7840,64 +7831,49 @@
   case NDBEVENT::TE_CLUSTER_FAILURE:
     sql_print_information("NDB Replication: cluster failure for %s.",
 			  ndbtab->getMysqlName());
+    pthread_mutex_lock(&injector_mutex);
     ndb->dropEventOperation(pOp);
-    if (share)
-    {
-      (void) pthread_mutex_lock(&share->mutex);
-      DBUG_PRINT("info",("CLUSTER FAILURE EVENT: %s received op: %x share op: %x",
-			 ndbtab->getMysqlName(), pOp, share->op));
-      assert(share->op == pOp);
-      share->op= 0;
+    pthread_mutex_unlock(&injector_mutex);
+    (void) pthread_mutex_lock(&share->mutex);
+    DBUG_PRINT("info",("CLUSTER FAILURE EVENT: %s received op: %x share op: %x",
+		       ndbtab->getMysqlName(), pOp, share->op));
+    assert(share->op == pOp);
+    share->op= 0;
       
-      // Signal ha_ndbcluster::delete_table that drop is done
-      (void) pthread_mutex_unlock(&share->mutex);
-      (void) pthread_cond_signal(&injector_cond);
+    // Signal ha_ndbcluster::delete_table that drop is done
+    (void) pthread_mutex_unlock(&share->mutex);
+    (void) pthread_cond_signal(&injector_cond);
       
-      // release previous lock
-      free_share(&share);
-    }
-    else
-    {
-      sql_print_error("NDB Replication (handle cluster failure): "
-		      "share does not exist for %s. Ignoring...",
-		      ndbtab->getMysqlName());
-    }
+    // release previous lock
+    free_share(&share);
     break;
   case NDBEVENT::TE_DROP:
   case NDBEVENT::TE_ALTER:
     row.n_schemaops++;
-    if (share)
-    {
-      (void) pthread_mutex_lock(&share->mutex);
-      DBUG_PRINT("info",("TABLE %S EVENT: %s received op: %x share op: %x",
-			 type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
-			 ndbtab->getMysqlName(), pOp, share->op));
-      assert(share->op == pOp);
-      share->op= 0;
-
-      // either just us or drop table handling as well
-      DBUG_ASSERT(share->no_use_count_check ||
-		  share->use_count == 1 || share->use_count == 2);
+    (void) pthread_mutex_lock(&share->mutex);
+    DBUG_PRINT("info",("TABLE %S EVENT: %s received op: %x share op: %x",
+		       type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
+		       ndbtab->getMysqlName(), pOp, share->op));
+    assert(share->op == pOp);
+    share->op= 0;
+
+    // either just us or drop table handling as well
+    DBUG_ASSERT(share->no_use_count_check ||
+		share->use_count == 1 || share->use_count == 2);
       
-      // Signal ha_ndbcluster::delete_table that drop is done
-      (void) pthread_mutex_unlock(&share->mutex);
-      (void) pthread_cond_signal(&injector_cond);
+    // Signal ha_ndbcluster::delete_table that drop is done
+    (void) pthread_mutex_unlock(&share->mutex);
+    (void) pthread_cond_signal(&injector_cond);
       
-      // release previous lock
-      free_share(&share);
-    }
-    else
-    {
-      sql_print_error("NDB Replication (handle %s table): "
-		      "share does not exist for %s. Ignoring...",
-		      type == NDBEVENT::TE_DROP ? "drop" : "alter",
-		      ndbtab->getMysqlName());
-    }
+    // release previous lock
+    free_share(&share);
     // ToDo: remove printout
     sql_print_information("NDB Replication: %s table %s.",
 			  type == NDBEVENT::TE_DROP ? "drop" : "alter",
 			  ndbtab->getMysqlName());
+    pthread_mutex_lock(&injector_mutex);
     ndb->dropEventOperation(pOp);
+    pthread_mutex_unlock(&injector_mutex);
     break;
   default:
     sql_print_error("NDB Replication: unknown non data event %d for %s. "
@@ -7921,7 +7897,7 @@
   Uint64 last_gci_in_binlog_index= 0;
   injector *inj= injector::instance();
 
-  pthread_mutex_lock(&injector_startup_mutex);
+  pthread_mutex_lock(&injector_mutex);
   /*
    * Set up the Thread
    */
@@ -7937,7 +7913,7 @@
     thd->cleanup();
     delete thd;
     ndb_injector_thread_running= -1;
-    pthread_mutex_unlock(&injector_startup_mutex);
+    pthread_mutex_unlock(&injector_mutex);
     pthread_cond_signal(&injector_cond);
     my_thread_end();
     pthread_exit(0);
@@ -7954,7 +7930,7 @@
   {
     sql_print_error("NDB Replication: Getting Ndb object failed");
     ndb_injector_thread_running= -1;
-    pthread_mutex_unlock(&injector_startup_mutex);
+    pthread_mutex_unlock(&injector_mutex);
     pthread_cond_signal(&injector_cond);
     goto err;
   }
@@ -7969,7 +7945,7 @@
    * We signal the thread that started us that we've finished
    * starting up.
    */
-  pthread_mutex_unlock(&injector_startup_mutex);
+  pthread_mutex_unlock(&injector_mutex);
   pthread_cond_signal(&injector_cond);
 
   /**
@@ -8090,9 +8066,6 @@
 	}
 
 	assert(share->table != 0);
-#ifndef DBUG_OFF
-	dbug_print_table("share->table", share->table);
-#endif
 	switch(pOp->getEventType())
 	{
 	case NDBEVENT::TE_INSERT:
Thread
bk commit into 5.1 tree (tomas:1.1870)tomas13 Apr