List:Commits« Previous MessageNext Message »
From:tomas Date:April 10 2006 11:40am
Subject:bk commit into 5.1 tree (tomas:1.2009) BUG#18932
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2009 06/04/10 11:40:41 tomas@stripped +1 -0
  Bug #18932 Cluster binlog mysqld accepts updating although binlog not setup

  sql/ha_ndbcluster.cc
    1.243 06/04/10 11:40:35 tomas@stripped +151 -114
    Bug #18932 Cluster binlog mysqld accepts updating although binlog not setup

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/wl2325-alcatel

--- 1.242/sql/ha_ndbcluster.cc	2006-03-29 18:18:44 +02:00
+++ 1.243/sql/ha_ndbcluster.cc	2006-04-10 11:40:35 +02:00
@@ -224,13 +224,12 @@
   Flag showing if the ndb injector thread is running, if so == 1
 */
 static int ndb_binlog_thread_running= 0;
+static my_bool ndb_binlog_tables_inited= FALSE;
 
 /*
   table cluster_replication.apply_status
 */
-static int ndbcluster_create_apply_status_table(THD *thd);
-static NDB_SHARE *ndbcluster_check_apply_status_share();
-static NDB_SHARE *ndbcluster_get_apply_status_share();
+static int ndbcluster_setup_binlog_table_shares(THD *thd);
 static NDB_SHARE *apply_status_share= 0;
 
 /*
@@ -4734,6 +4733,11 @@
   if (!res)
     info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
 
+#ifdef HAVE_NDB_BINLOG
+  if (!ndb_binlog_tables_inited && ndb_binlog_thread_running > 0)
+    table->db_stat|= HA_READ_ONLY;
+#endif
+
   if (table->s->part_info)
   {
     m_part_info= table->s->part_info;
@@ -5102,7 +5106,7 @@
   }
   while (unhandled && retries--);
 
-  DBUG_RETURN(0);
+  DBUG_RETURN(-(unhandled));
 }
 
 int ndbcluster_find_files(THD *thd,const char *db,const char *path,
@@ -7122,6 +7126,8 @@
     pthread_cond_wait(&COND_server_started, &LOCK_server_started);
   pthread_mutex_unlock(&LOCK_server_started);
 
+  ndbcluster_util_inited= 1;
+
   /*
     Wait for cluster to start
   */
@@ -7141,22 +7147,11 @@
   }
   pthread_mutex_unlock(&LOCK_ndb_util_thread);
 
-  /*
-    Get all table definitions from the storage node
-  */
-  ndbcluster_find_all_files(thd);
-
-#ifdef HAVE_NDB_BINLOG
-  /* create tables needed by the replication */
-  ndbcluster_create_apply_status_table(thd);
-#endif
-
-  ndbcluster_util_inited= 1;
-
 #ifdef HAVE_NDB_BINLOG
-  /* If running, signal injector thread that all is setup */
   if (ndb_binlog_thread_running > 0)
-    pthread_cond_signal(&injector_cond);
+    sql_print_information("NDB Binlog: Ndb tables initially read only.");
+  /* create tables needed by the replication */
+  ndbcluster_setup_binlog_table_shares(thd);
 #endif
 
   set_timespec(abstime, 0);
@@ -7180,13 +7175,8 @@
       Check that the apply_status_share has been created.
       If not try to create it
     */
-    if ((ndb_binlog_thread_running > 0 || active_mi) &&
-        !apply_status_share &&
-        ndbcluster_check_apply_status_share() == 0)
-    {
-      ndbcluster_find_all_files(thd);
-      ndbcluster_create_apply_status_table(thd);
-    }
+    if (!ndb_binlog_tables_inited)
+      ndbcluster_setup_binlog_table_shares(thd);
 #endif
 
     if (ndb_cache_check_time == 0)
@@ -8939,16 +8929,6 @@
 }
 
 /*
-  Get the share for the cluster_replication.apply_status share
-
-  - return 0 if share does not exist
-*/
-static NDB_SHARE *ndbcluster_get_apply_status_share()
-{
-  return get_share(NDB_APPLY_TABLE_FILE, false);
-}
-
-/*
   Create the cluster_replication.apply_status table
 */
 static int ndbcluster_create_apply_status_table(THD *thd)
@@ -8998,6 +8978,37 @@
   DBUG_RETURN(0);
 }
 
+static int ndbcluster_setup_binlog_table_shares(THD *thd)
+{
+  if (!apply_status_share &&
+      ndbcluster_check_apply_status_share() == 0)
+  {
+    pthread_mutex_lock(&LOCK_open);
+    ha_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
+    pthread_mutex_unlock(&LOCK_open);
+    if (!apply_status_share)
+    {
+      ndbcluster_create_apply_status_table(thd);
+      if (!apply_status_share)
+        return 1;
+    }
+  }
+  if (!ndbcluster_find_all_files(thd))
+  {
+    pthread_mutex_lock(&LOCK_open);
+    ndb_binlog_tables_inited= TRUE;
+    if (ndb_binlog_thread_running > 0)
+    {
+      sql_print_information("NDB Binlog: ndb tables writable");
+      close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
+    }
+    pthread_mutex_unlock(&LOCK_open);
+    /* Signal injector thread that all is setup */
+    pthread_cond_signal(&injector_cond);
+  }
+  return 0;
+}
+
 
 /*********************************************************************
   Functions for start, stop, wait for ndbcluster binlog thread
@@ -9401,78 +9412,102 @@
     }
   }
 
-  pthread_mutex_lock(&injector_mutex);
-  if (injector_ndb == 0)
-  {
-    pthread_mutex_unlock(&injector_mutex);
-    DBUG_RETURN(-1);
-  }
+  int do_apply_status_share= 0;
+  int retries= 100;
+  if (!apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
+      strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
+    do_apply_status_share= 1;
 
-  NdbEventOperation *op= injector_ndb->createEventOperation(event_name);
-  if (!op)
+  while (1)
   {
-    pthread_mutex_unlock(&injector_mutex);
-    sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
-                    " %s",event_name);
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                        injector_ndb->getNdbError().code,
-                        injector_ndb->getNdbError().message,
-                        "NDB");
-    DBUG_RETURN(-1);
-  }
+    pthread_mutex_lock(&injector_mutex);
+    if (injector_ndb == 0)
+    {
+      pthread_mutex_unlock(&injector_mutex);
+      DBUG_RETURN(-1);
+    }
+
+    NdbEventOperation *op= injector_ndb->createEventOperation(event_name);
+    if (!op)
+    {
+      pthread_mutex_unlock(&injector_mutex);
+      sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
+                      " %s",event_name);
+      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          injector_ndb->getNdbError().code,
+                          injector_ndb->getNdbError().message,
+                          "NDB");
+      DBUG_RETURN(-1);
+    }
 
-  int n_columns= ndbtab->getNoOfColumns();
-  int n_fields= table ? table->s->fields : 0;
-  for (int j= 0; j < n_columns; j++)
-  {
-    const char *col_name= ndbtab->getColumn(j)->getName();
-    NdbRecAttr *attr0, *attr1;
-    if (j < n_fields)
-    {
-      Field *f= share->table->field[j];
-      if (is_ndb_compatible_type(f))
-      {
-        DBUG_PRINT("info", ("%s compatible", col_name));
-        attr0= op->getValue(col_name, f->ptr);
-        attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
-                               share->table->record[1]);
+    int n_columns= ndbtab->getNoOfColumns();
+    int n_fields= table ? table->s->fields : 0;
+    for (int j= 0; j < n_columns; j++)
+    {
+      const char *col_name= ndbtab->getColumn(j)->getName();
+      NdbRecAttr *attr0, *attr1;
+      if (j < n_fields)
+      {
+        Field *f= share->table->field[j];
+        if (is_ndb_compatible_type(f))
+        {
+          DBUG_PRINT("info", ("%s compatible", col_name));
+          attr0= op->getValue(col_name, f->ptr);
+          attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
+                                 share->table->record[1]);
+        }
+        else
+        {
+          DBUG_PRINT("info", ("%s non compatible", col_name));
+          attr0= op->getValue(col_name);
+          attr1= op->getPreValue(col_name);
+        }
       }
       else
       {
-        DBUG_PRINT("info", ("%s non compatible", col_name));
+        DBUG_PRINT("info", ("%s hidden key", col_name));
         attr0= op->getValue(col_name);
         attr1= op->getPreValue(col_name);
       }
+      share->ndb_value[0][j].rec= attr0;
+      share->ndb_value[1][j].rec= attr1;
+    }
+    op->setCustomData((void *) share); // set before execute
+    share->op= op; // assign op in NDB_SHARE
+    if (op->execute())
+    {
+      share->op= NULL;
+      retries--;
+      if (op->getNdbError().status != NdbError::TemporaryError &&
+          op->getNdbError().code != 1407)
+        retries= 0;
+      if (retries == 0)
+      {
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 
+                            op->getNdbError().code, op->getNdbError().message,
+                            "NDB");
+        sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
+                        event_name,
+                        op->getNdbError().code, op->getNdbError().message);
+      }
+      injector_ndb->dropEventOperation(op);
+      pthread_mutex_unlock(&injector_mutex);
+      if (retries)
+        continue;
+      DBUG_RETURN(-1);
     }
-    else
-    {
-      DBUG_PRINT("info", ("%s hidden key", col_name));
-      attr0= op->getValue(col_name);
-      attr1= op->getPreValue(col_name);
-    }
-    share->ndb_value[0][j].rec= attr0;
-    share->ndb_value[1][j].rec= attr1;
-  }
-  op->setCustomData((void *) share); // set before execute
-  share->op= op; // assign op in NDB_SHARE
-  if (op->execute())
-  {
-    share->op= NULL;
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 
-                        op->getNdbError().code, op->getNdbError().message,
-                        "NDB");
-    sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
-                    event_name,
-                    op->getNdbError().code, op->getNdbError().message);
-    injector_ndb->dropEventOperation(op);
     pthread_mutex_unlock(&injector_mutex);
-    DBUG_RETURN(-1);
+    break;
   }
-  pthread_mutex_unlock(&injector_mutex);
 
   get_share(share);
+  if (do_apply_status_share)
+  {
+    apply_status_share= get_share(share);
+    (void) pthread_cond_signal(&injector_cond);
+  }
 
   DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
                      share->key, share->op, share->use_count));
@@ -9632,7 +9667,9 @@
   switch (type)
   {
   case NDBEVENT::TE_CLUSTER_FAILURE:
-    sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
+    sql_print_information("NDB Binlog: cluster failure for %s. "
+                          "Binlog may be incomplete.",
+                          share->key);
 
     DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
                         "%s  received share: 0x%lx  op: %lx  share op: %lx  "
@@ -9642,6 +9679,10 @@
     {
       free_share(&apply_status_share);
       apply_status_share= 0;
+      ndb_binlog_tables_inited= FALSE;
+      if (ndb_binlog_thread_running > 0)
+        sql_print_information("NDB Binlog: ndb tables initially "
+                              "read only on reconnect.");
     }
     break;
   case NDBEVENT::TE_ALTER:
@@ -9727,6 +9768,8 @@
                                     injector::transaction &trans)
 {
   NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
+  if (share == apply_status_share)
+    return 0;
   TABLE *table= share->table;
   
   assert(table != 0);
@@ -9888,6 +9931,15 @@
   thd->init_for_queries();
   thd->command= COM_DAEMON;
   injector_thd= thd;
+  thd->lex->start_transaction_opt= 0;
+  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
+  thd->version= refresh_version;
+  thd->set_time();
+  thd->main_security_ctx.host_or_ip= "";
+  thd->client_capabilities= 0;
+  my_net_init(&thd->net, 0);
+  thd->main_security_ctx.master_access= ~0;
+  thd->main_security_ctx.priv_user= 0;
 
   /*
     Set up ndb binlog
@@ -9900,7 +9952,6 @@
   thd->thread_id= thread_id++;
   threads.append(thd);
   pthread_mutex_unlock(&LOCK_thread_count);
-  thd->lex->start_transaction_opt= 0;
 
   if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
       ndb->init())
@@ -9929,19 +9980,10 @@
   pthread_mutex_unlock(&injector_mutex);
   pthread_cond_signal(&injector_cond);
 
-  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
-  thd->version= refresh_version;
-  thd->set_time();
-  thd->main_security_ctx.host_or_ip= "";
-  thd->client_capabilities= 0;
-  my_net_init(&thd->net, 0);
-  thd->main_security_ctx.master_access= ~0;
-  thd->main_security_ctx.priv_user= 0;
-
   thd->proc_info= "Waiting for ndbcluster to start";
 
   pthread_mutex_lock(&injector_mutex);
-  while (!ndbcluster_util_inited)
+  while (!apply_status_share)
   {
     /* ndb not connected yet */
     struct timespec abstime;
@@ -9964,10 +10006,6 @@
     static char db[]= "";
     thd->db= db;
     open_binlog_index(thd, &binlog_tables, &binlog_index);
-    if (!(apply_status_share= ndbcluster_get_apply_status_share()))
-    {
-      sql_print_error("NDB: Could not get apply status share");
-    }
     thd->db= db;
   }
 
@@ -10025,11 +10063,6 @@
         ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
 
         assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
-        if (!apply_status_share)
-        {
-          if (!(apply_status_share= ndbcluster_get_apply_status_share()))
-            sql_print_error("NDB: Could not get apply status share");
-        }
         bzero((char*) &row, sizeof(row));
         injector::transaction trans= inj->new_trans(thd);
         gci= pOp->getGCI();
@@ -10090,6 +10123,10 @@
         if (row.n_inserts || row.n_updates
             || row.n_deletes || row.n_schemaops)
         {
+          if (!apply_status_share)
+          {
+            sql_print_error("DB binlog: no apply_status table");
+          }
           injector::transaction::binlog_pos start= trans.start_pos();
           if (int r= trans.commit())
           {
Thread
bk commit into 5.1 tree (tomas:1.2009) BUG#18932tomas10 Apr