List:Commits« Previous MessageNext Message »
From:pekka Date:February 20 2006 12:07pm
Subject:bk commit into 5.1 tree (pekka:1.2138)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2138 06/02/20 12:07:12 pekka@stripped +5 -0
  ndb - rbr blobs etc: set db/schema in injector_ndb before calling NDB

  sql/ha_ndbcluster_binlog.cc
    1.20 06/02/20 12:05:37 pekka@stripped +21 -2
    set injector_ndb db/schema before calling NDB (may be more cases..).  only place to get it is table internal name

  storage/ndb/src/ndbapi/Ndb.cpp
    1.63 06/02/20 12:01:20 pekka@stripped +33 -10
    method to set db/schema from table name + format check in internalize_table_name

  storage/ndb/include/ndbapi/NdbDictionary.hpp
    1.69 06/02/20 12:01:20 pekka@stripped +1 -0
    method to set db/schema from table name + format check in internalize_table_name

  storage/ndb/include/ndbapi/Ndb.hpp
    1.48 06/02/20 12:01:20 pekka@stripped +9 -0
    method to set db/schema from table name + format check in internalize_table_name

  storage/ndb/src/ndbapi/NdbBlob.cpp
    1.37 06/02/20 11:55:47 pekka@stripped +8 -4
    DBUG

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my51

--- 1.47/storage/ndb/include/ndbapi/Ndb.hpp	2006-02-16 14:52:11 +01:00
+++ 1.48/storage/ndb/include/ndbapi/Ndb.hpp	2006-02-20 12:01:20 +01:00
@@ -1147,6 +1147,15 @@
    */
   void setDatabaseSchemaName(const char * aDatabaseSchemaName);
 
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+  /** Set database and schema name to match previously retrieved table
+   *
+   * Returns non-zero if table internal name does not contain
+   * non-empty database and schema names
+   */
+  int setDatabaseAndSchemaName(const NdbDictionary::Table* t);
+#endif
+
   /**
    * Initializes the Ndb object
    *

--- 1.68/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-02-17 11:50:19 +01:00
+++ 1.69/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-02-20 12:01:20 +01:00
@@ -884,6 +884,7 @@
 
   private:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    friend class Ndb;
     friend class NdbDictionaryImpl;
     friend class NdbTableImpl;
     friend class NdbEventOperationImpl;

--- 1.62/storage/ndb/src/ndbapi/Ndb.cpp	2006-02-16 14:52:11 +01:00
+++ 1.63/storage/ndb/src/ndbapi/Ndb.cpp	2006-02-20 12:01:20 +01:00
@@ -1035,39 +1035,37 @@
   return Data;
 #endif
 }
+
+// <internal>
 const char * Ndb::getCatalogName() const
 {
   return theImpl->m_dbname.c_str();
 }
 
-
 void Ndb::setCatalogName(const char * a_catalog_name)
 {
-  if (a_catalog_name)
-  {
+  // TODO can table_name_separator be escaped?
+  if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
     theImpl->m_dbname.assign(a_catalog_name);
     theImpl->update_prefix();
   }
 }
 
-
 const char * Ndb::getSchemaName() const
 {
   return theImpl->m_schemaname.c_str();
 }
 
-
 void Ndb::setSchemaName(const char * a_schema_name)
 {
-  if (a_schema_name) {
+  // TODO can table_name_separator be escaped?
+  if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
     theImpl->m_schemaname.assign(a_schema_name);
     theImpl->update_prefix();
   }
 }
+// </internal>
  
-/*
-Deprecated functions
-*/
 const char * Ndb::getDatabaseName() const
 {
   return getCatalogName();
@@ -1087,6 +1085,24 @@
 {
   setSchemaName(a_schema_name);
 }
+
+int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
+{
+  const char* s0 = t->m_impl.m_internalName.c_str();
+  const char* s1 = strchr(s0, table_name_separator);
+  if (s1 && s1 != s0) {
+    const char* s2 = strchr(s1 + 1, table_name_separator);
+    if (s2 && s2 != s1 + 1) {
+      char buf[200];
+      sprintf(buf, "%.*s", s1 - s0, s0);
+      setDatabaseName(buf);
+      sprintf(buf, "%.*s", s2 - (s1 + 1), s1 + 1);
+      setDatabaseSchemaName(buf);
+      return 0;
+    }
+  }
+  return -1;
+}
  
 bool Ndb::usingFullyQualifiedNames()
 {
@@ -1149,9 +1165,16 @@
   if (fullyQualifiedNames)
   {
     /* Internal table name format <db>/<schema>/<table>
-       <db>/<schema> is already available in m_prefix
+       <db>/<schema>/ is already available in m_prefix
        so just concat the two strings
      */
+#ifdef VM_TRACE
+    // verify that m_prefix looks like abc/def/
+    const char* s0 = theImpl->m_prefix.c_str();
+    const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
+    const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
+    assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 + 1) == 0);
+#endif
     ret.assfmt("%s%s",
                theImpl->m_prefix.c_str(),
                external_name);

--- 1.36/storage/ndb/src/ndbapi/NdbBlob.cpp	2006-02-09 11:34:38 +01:00
+++ 1.37/storage/ndb/src/ndbapi/NdbBlob.cpp	2006-02-20 11:55:47 +01:00
@@ -61,22 +61,26 @@
 int
 NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
 {
+  DBUG_ENTER("NdbBlob::getBlobTableName");
   NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
   if (t == NULL)
-    return -1;
+    DBUG_RETURN(-1);
   NdbColumnImpl* c = t->getColumn(columnName);
   if (c == NULL)
-    return -1;
+    DBUG_RETURN(-1);
   getBlobTableName(btname, t, c);
-  return 0;
+  DBUG_RETURN(0);
 }
 
 void
 NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
 {
-  assert(t != 0 && c != 0 && c->getBlobType());
+  DBUG_ENTER("NdbBlob::getBlobTableName");
+  assert(t != 0 && c != 0 && c->getBlobType() && c->getPartSize() != 0);
   memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
   sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_id, (int)c->m_column_no);
+  DBUG_PRINT("info", ("blob table name: %s", btname));
+  DBUG_VOID_RETURN;
 }
 
 void

--- 1.19/sql/ha_ndbcluster_binlog.cc	2006-02-17 11:50:19 +01:00
+++ 1.20/sql/ha_ndbcluster_binlog.cc	2006-02-20 12:05:37 +01:00
@@ -2065,7 +2065,19 @@
       DBUG_RETURN(-1);
     }
 
-    NdbEventOperation *op= ndb->createEventOperation(event_name);
+    NdbEventOperation* op;
+    if (do_schema_share)
+      op= ndb->createEventOperation(event_name);
+    else
+    {
+      // set injector_ndb database/schema from table internal name
+      int ret= ndb->setDatabaseAndSchemaName(ndbtab);
+      assert(ret == 0);
+      op= ndb->createEventOperation(event_name);
+      // reset to catch errors
+      ndb->setDatabaseName("");
+      ndb->setDatabaseSchemaName("");
+    }
     if (!op)
     {
       pthread_mutex_unlock(&injector_mutex);
@@ -2632,7 +2644,8 @@
     goto err;
   }
 
-  if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+  // empty database and schema
+  if (!(ndb= new Ndb(g_ndb_cluster_connection, "", "")) ||
       ndb->init())
   {
     sql_print_error("NDB Binlog: Getting Ndb object failed");
@@ -2885,11 +2898,17 @@
             DBUG_ASSERT(share != 0);
           }
 #endif
+          // set injector_ndb database/schema from table internal name
+          int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
+          assert(ret == 0);
           if ((unsigned) pOp->getEventType() <
               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
             ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
           else
             ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
+          // reset to catch errors
+          ndb->setDatabaseName("");
+          ndb->setDatabaseSchemaName("");
 
           pOp= ndb->nextEvent();
         } while (pOp && pOp->getGCI() == gci);
Thread
bk commit into 5.1 tree (pekka:1.2138)pekka20 Feb