Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2138 06/02/20 12:07:12 pekka@stripped +5 -0
ndb - rbr blobs etc: set db/schema in injector_ndb before calling NDB
sql/ha_ndbcluster_binlog.cc
1.20 06/02/20 12:05:37 pekka@stripped +21 -2
set injector_ndb db/schema before calling NDB (may be more cases..). only place to get it is table internal name
storage/ndb/src/ndbapi/Ndb.cpp
1.63 06/02/20 12:01:20 pekka@stripped +33 -10
method to set db/schema from table name + format check in internalize_table_name
storage/ndb/include/ndbapi/NdbDictionary.hpp
1.69 06/02/20 12:01:20 pekka@stripped +1 -0
method to set db/schema from table name + format check in internalize_table_name
storage/ndb/include/ndbapi/Ndb.hpp
1.48 06/02/20 12:01:20 pekka@stripped +9 -0
method to set db/schema from table name + format check in internalize_table_name
storage/ndb/src/ndbapi/NdbBlob.cpp
1.37 06/02/20 11:55:47 pekka@stripped +8 -4
DBUG
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: pekka
# Host: orca.ndb.mysql.com
# Root: /space/pekka/ndb/version/my51
--- 1.47/storage/ndb/include/ndbapi/Ndb.hpp 2006-02-16 14:52:11 +01:00
+++ 1.48/storage/ndb/include/ndbapi/Ndb.hpp 2006-02-20 12:01:20 +01:00
@@ -1147,6 +1147,15 @@
*/
void setDatabaseSchemaName(const char * aDatabaseSchemaName);
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+ /** Set database and schema name to match previously retrieved table
+ *
+ * Returns non-zero if table internal name does not contain
+ * non-empty database and schema names
+ */
+ int setDatabaseAndSchemaName(const NdbDictionary::Table* t);
+#endif
+
/**
* Initializes the Ndb object
*
--- 1.68/storage/ndb/include/ndbapi/NdbDictionary.hpp 2006-02-17 11:50:19 +01:00
+++ 1.69/storage/ndb/include/ndbapi/NdbDictionary.hpp 2006-02-20 12:01:20 +01:00
@@ -884,6 +884,7 @@
private:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+ friend class Ndb;
friend class NdbDictionaryImpl;
friend class NdbTableImpl;
friend class NdbEventOperationImpl;
--- 1.62/storage/ndb/src/ndbapi/Ndb.cpp 2006-02-16 14:52:11 +01:00
+++ 1.63/storage/ndb/src/ndbapi/Ndb.cpp 2006-02-20 12:01:20 +01:00
@@ -1035,39 +1035,37 @@
return Data;
#endif
}
+
+// <internal>
const char * Ndb::getCatalogName() const
{
return theImpl->m_dbname.c_str();
}
-
void Ndb::setCatalogName(const char * a_catalog_name)
{
- if (a_catalog_name)
- {
+ // TODO can table_name_separator be escaped?
+ if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
theImpl->m_dbname.assign(a_catalog_name);
theImpl->update_prefix();
}
}
-
const char * Ndb::getSchemaName() const
{
return theImpl->m_schemaname.c_str();
}
-
void Ndb::setSchemaName(const char * a_schema_name)
{
- if (a_schema_name) {
+ // TODO can table_name_separator be escaped?
+ if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
theImpl->m_schemaname.assign(a_schema_name);
theImpl->update_prefix();
}
}
+// </internal>
-/*
-Deprecated functions
-*/
const char * Ndb::getDatabaseName() const
{
return getCatalogName();
@@ -1087,6 +1085,24 @@
{
setSchemaName(a_schema_name);
}
+
+int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
+{
+ const char* s0 = t->m_impl.m_internalName.c_str();
+ const char* s1 = strchr(s0, table_name_separator);
+ if (s1 && s1 != s0) {
+ const char* s2 = strchr(s1 + 1, table_name_separator);
+ if (s2 && s2 != s1 + 1) {
+ char buf[200];
+ sprintf(buf, "%.*s", s1 - s0, s0);
+ setDatabaseName(buf);
+ sprintf(buf, "%.*s", s2 - (s1 + 1), s1 + 1);
+ setDatabaseSchemaName(buf);
+ return 0;
+ }
+ }
+ return -1;
+}
bool Ndb::usingFullyQualifiedNames()
{
@@ -1149,9 +1165,16 @@
if (fullyQualifiedNames)
{
/* Internal table name format <db>/<schema>/<table>
- <db>/<schema> is already available in m_prefix
+ <db>/<schema>/ is already available in m_prefix
so just concat the two strings
*/
+#ifdef VM_TRACE
+ // verify that m_prefix looks like abc/def/
+ const char* s0 = theImpl->m_prefix.c_str();
+ const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
+ const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
+ assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 + 1) == 0);
+#endif
ret.assfmt("%s%s",
theImpl->m_prefix.c_str(),
external_name);
--- 1.36/storage/ndb/src/ndbapi/NdbBlob.cpp 2006-02-09 11:34:38 +01:00
+++ 1.37/storage/ndb/src/ndbapi/NdbBlob.cpp 2006-02-20 11:55:47 +01:00
@@ -61,22 +61,26 @@
int
NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
{
+ DBUG_ENTER("NdbBlob::getBlobTableName");
NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
if (t == NULL)
- return -1;
+ DBUG_RETURN(-1);
NdbColumnImpl* c = t->getColumn(columnName);
if (c == NULL)
- return -1;
+ DBUG_RETURN(-1);
getBlobTableName(btname, t, c);
- return 0;
+ DBUG_RETURN(0);
}
void
NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
{
- assert(t != 0 && c != 0 && c->getBlobType());
+ DBUG_ENTER("NdbBlob::getBlobTableName");
+ assert(t != 0 && c != 0 && c->getBlobType() && c->getPartSize() != 0);
memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_id, (int)c->m_column_no);
+ DBUG_PRINT("info", ("blob table name: %s", btname));
+ DBUG_VOID_RETURN;
}
void
--- 1.19/sql/ha_ndbcluster_binlog.cc 2006-02-17 11:50:19 +01:00
+++ 1.20/sql/ha_ndbcluster_binlog.cc 2006-02-20 12:05:37 +01:00
@@ -2065,7 +2065,19 @@
DBUG_RETURN(-1);
}
- NdbEventOperation *op= ndb->createEventOperation(event_name);
+ NdbEventOperation* op;
+ if (do_schema_share)
+ op= ndb->createEventOperation(event_name);
+ else
+ {
+ // set injector_ndb database/schema from table internal name
+ int ret= ndb->setDatabaseAndSchemaName(ndbtab);
+ assert(ret == 0);
+ op= ndb->createEventOperation(event_name);
+ // reset to catch errors
+ ndb->setDatabaseName("");
+ ndb->setDatabaseSchemaName("");
+ }
if (!op)
{
pthread_mutex_unlock(&injector_mutex);
@@ -2632,7 +2644,8 @@
goto err;
}
- if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+ // empty database and schema
+ if (!(ndb= new Ndb(g_ndb_cluster_connection, "", "")) ||
ndb->init())
{
sql_print_error("NDB Binlog: Getting Ndb object failed");
@@ -2885,11 +2898,17 @@
DBUG_ASSERT(share != 0);
}
#endif
+ // set injector_ndb database/schema from table internal name
+ int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
+ assert(ret == 0);
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
else
ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
+ // reset to catch errors
+ ndb->setDatabaseName("");
+ ndb->setDatabaseSchemaName("");
pOp= ndb->nextEvent();
} while (pOp && pOp->getGCI() == gci);
| Thread |
|---|
| • bk commit into 5.1 tree (pekka:1.2138) | pekka | 20 Feb |