3403 magnus.blaudd@stripped 2011-10-27 [merge]
Merge 5.5-cluster -> trunk-cluster
added:
sql/ndb_anyvalue.cc
sql/ndb_anyvalue.h
sql/ndb_binlog_extra_row_info.cc
sql/ndb_binlog_extra_row_info.h
sql/ndb_ndbapi_util.cc
sql/ndb_ndbapi_util.h
modified:
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ha_ndbcluster_connection.cc
sql/ndb_share.h
storage/ndb/CMakeLists.txt
3402 magnus.blaudd@stripped 2011-10-27 [merge]
Merge 5.5-cluster -> trunk-cluster
added:
mysql-test/include/not_ndb_is.inc
mysql-test/suite/rpl_ndb/r/rpl_ndb_not_null.result
mysql-test/suite/rpl_ndb/t/rpl_ndb_not_null.test
storage/ndb/cmake/ndb_get_config_value.cmake
modified:
mysql-test/r/information_schema.result
mysql-test/r/information_schema_db.result
mysql-test/suite/funcs_1/r/is_columns_is.result
mysql-test/suite/funcs_1/r/is_tables_is.result
mysql-test/suite/funcs_1/t/is_columns_is.test
mysql-test/suite/funcs_1/t/is_tables_is.test
mysql-test/suite/ndb/include/have_clusterj.inc
mysql-test/suite/ndb/r/ndb_join_pushdown.result
mysql-test/suite/ndb/t/have_ndb_dist_priv.inc
mysql-test/suite/ndb/t/ndb_join_pushdown.test
mysql-test/suite/ndb_big/rqg_spj.test
mysql-test/t/information_schema.test
mysql-test/t/information_schema_db.test
mysql-test/t/mysqlshow.test
sql/abstract_query_plan.cc
sql/abstract_query_plan.h
sql/ha_ndb_index_stat.cc
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ha_ndbcluster_connection.cc
sql/ha_ndbinfo.cc
sql/ha_ndbinfo.h
sql/ndb_local_connection.cc
sql/ndb_thd_ndb.cc
storage/ndb/CMakeLists.txt
storage/ndb/VERSION
storage/ndb/clusterj/CMakeLists.txt
storage/ndb/clusterj/clusterj-api/CMakeLists.txt
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/AndPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/BetweenPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/CandidateIndexImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/ComparativePredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/EqualPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/GreaterEqualPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/GreaterThanPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/InPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/LessEqualPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/LessThanPredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/PredicateImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/PropertyImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryExecutionContextImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/QueryExecutionContext.java
storage/ndb/clusterj/clusterj-jpatest/CMakeLists.txt
storage/ndb/clusterj/clusterj-openjpa/CMakeLists.txt
storage/ndb/clusterj/clusterj-test/CMakeLists.txt
storage/ndb/include/CMakeLists.txt
storage/ndb/include/kernel/signaldata/DiGetNodes.hpp
storage/ndb/include/ndb_global.h
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/ndbapi/NdbScanOperation.hpp
storage/ndb/include/util/OutputStream.hpp
storage/ndb/src/CMakeLists.txt
storage/ndb/src/common/debugger/EventLogger.cpp
storage/ndb/src/common/debugger/SignalLoggerManager.cpp
storage/ndb/src/common/logger/LogHandler.cpp
storage/ndb/src/common/logger/Logger.cpp
storage/ndb/src/common/portlib/NdbConfig.c
storage/ndb/src/common/portlib/NdbDir.cpp
storage/ndb/src/common/portlib/NdbThread.c
storage/ndb/src/common/portlib/ndb_daemon.cc
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/common/util/BaseString.cpp
storage/ndb/src/common/util/ConfigValues.cpp
storage/ndb/src/common/util/File.cpp
storage/ndb/src/common/util/InputStream.cpp
storage/ndb/src/common/util/NdbSqlUtil.cpp
storage/ndb/src/common/util/OutputStream.cpp
storage/ndb/src/common/util/Parser.cpp
storage/ndb/src/common/util/Properties.cpp
storage/ndb/src/common/util/ndb_init.cpp
storage/ndb/src/common/util/ndbzio.c
storage/ndb/src/common/util/socket_io.cpp
storage/ndb/src/cw/cpcd/APIService.cpp
storage/ndb/src/cw/cpcd/CPCD.cpp
storage/ndb/src/cw/cpcd/Monitor.cpp
storage/ndb/src/cw/cpcd/Process.cpp
storage/ndb/src/kernel/blocks/backup/read.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dbdih/printSysfile.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/redoLogReader/reader.cpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/Filename.cpp
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/error/ErrorReporter.cpp
storage/ndb/src/kernel/error/ndbd_exit_codes.c
storage/ndb/src/kernel/vm/NdbinfoTables.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/mgmapi/mgmapi.cpp
storage/ndb/src/mgmapi/ndb_logevent.cpp
storage/ndb/src/mgmclient/CommandInterpreter.cpp
storage/ndb/src/mgmsrv/Defragger.hpp
storage/ndb/src/mgmsrv/InitConfigFileParser.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/Services.cpp
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/NdbBlob.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbImpl.hpp
storage/ndb/src/ndbapi/NdbOperationExec.cpp
storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
storage/ndb/src/ndbapi/NdbTransaction.cpp
storage/ndb/src/ndbapi/NdbWaitGroup.cpp
storage/ndb/src/ndbapi/Ndbinit.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/ndberror.c
storage/ndb/test/include/NDBT_Table.hpp
storage/ndb/test/include/NdbMgmd.hpp
storage/ndb/test/ndbapi/ScanFunctions.hpp
storage/ndb/test/ndbapi/testDict.cpp
storage/ndb/test/ndbapi/testMgm.cpp
storage/ndb/test/ndbapi/testNodeRestart.cpp
storage/ndb/test/rqg/parseargs.sh
storage/ndb/test/rqg/run_rqg.sh
storage/ndb/test/run-test/atrt.hpp
storage/ndb/test/run-test/files.cpp
storage/ndb/test/run-test/main.cpp
storage/ndb/test/src/DbUtil.cpp
storage/ndb/test/src/HugoQueries.cpp
storage/ndb/test/src/HugoQueryBuilder.cpp
storage/ndb/test/src/NDBT_Test.cpp
storage/ndb/test/src/NdbBackup.cpp
storage/ndb/test/src/NdbRestarter.cpp
storage/ndb/test/src/getarg.c
storage/ndb/test/tools/cpcc.cpp
storage/ndb/tools/CMakeLists.txt
storage/ndb/tools/ndb_dump_frm_data.cpp
storage/ndb/tools/ndb_index_stat.cpp
storage/ndb/tools/restore/consumer_restore.cpp
storage/ndb/tools/waiter.cpp
support-files/compiler_warnings.supp
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster.cc 2011-10-27 12:33:39 +0000
@@ -51,6 +51,8 @@
#include <ndb_version.h>
#include "ndb_mi.h"
#include "ndb_conflict_trans.h"
+#include "ndb_anyvalue.h"
+#include "ndb_binlog_extra_row_info.h"
// ndb interface initialization/cleanup
extern "C" void ndb_init_internal();
@@ -1876,87 +1878,6 @@ ha_ndbcluster::set_blob_values(const Ndb
DBUG_RETURN(res);
}
-/*
- This routine is shared by injector. There is no common blobs buffer
- so the buffer and length are passed by reference. Injector also
- passes a record pointer diff.
- */
-int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
- uchar*& buffer, uint& buffer_size,
- my_ptrdiff_t ptrdiff)
-{
- DBUG_ENTER("get_ndb_blobs_value");
-
- // Field has no field number so cannot use TABLE blob_field
- // Loop twice, first only counting total buffer size
- for (int loop= 0; loop <= 1; loop++)
- {
- uint32 offset= 0;
- for (uint i= 0; i < table->s->fields; i++)
- {
- Field *field= table->field[i];
- NdbValue value= value_array[i];
- if (! (field->flags & BLOB_FLAG))
- continue;
- if (value.blob == NULL)
- {
- DBUG_PRINT("info",("[%u] skipped", i));
- continue;
- }
- Field_blob *field_blob= (Field_blob *)field;
- NdbBlob *ndb_blob= value.blob;
- int isNull;
- if (ndb_blob->getNull(isNull) != 0)
- ERR_RETURN(ndb_blob->getNdbError());
- if (isNull == 0) {
- Uint64 len64= 0;
- if (ndb_blob->getLength(len64) != 0)
- ERR_RETURN(ndb_blob->getNdbError());
- // Align to Uint64
- uint32 size= Uint32(len64);
- if (size % 8 != 0)
- size+= 8 - size % 8;
- if (loop == 1)
- {
- uchar *buf= buffer + offset;
- uint32 len= 0xffffffff; // Max uint32
- if (ndb_blob->readData(buf, len) != 0)
- ERR_RETURN(ndb_blob->getNdbError());
- DBUG_PRINT("info", ("[%u] offset: %u buf: 0x%lx len=%u [ptrdiff=%d]",
- i, offset, (long) buf, len, (int)ptrdiff));
- DBUG_ASSERT(len == len64);
- // Ugly hack assumes only ptr needs to be changed
- field_blob->set_ptr_offset(ptrdiff, len, buf);
- }
- offset+= size;
- }
- else if (loop == 1) // undefined or null
- {
- // have to set length even in this case
- uchar *buf= buffer + offset; // or maybe NULL
- uint32 len= 0;
- field_blob->set_ptr_offset(ptrdiff, len, buf);
- DBUG_PRINT("info", ("[%u] isNull=%d", i, isNull));
- }
- }
- if (loop == 0 && offset > buffer_size)
- {
- my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
- buffer_size= 0;
- DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
- buffer= (uchar*) my_malloc(offset, MYF(MY_WME));
- if (buffer == NULL)
- {
- sql_print_error("ha_ndbcluster::get_ndb_blobs_value: "
- "my_malloc(%u) failed", offset);
- DBUG_RETURN(-1);
- }
- buffer_size= offset;
- }
- }
- DBUG_RETURN(0);
-}
-
/**
Check if any set or get of blob value in current query.
@@ -4373,6 +4294,24 @@ ha_ndbcluster::set_auto_inc(THD *thd, Fi
DBUG_RETURN(set_auto_inc_val(thd, next_val));
}
+
+class Ndb_tuple_id_range_guard {
+ NDB_SHARE* m_share;
+public:
+ Ndb_tuple_id_range_guard(NDB_SHARE* share) :
+ m_share(share),
+ range(share->tuple_id_range)
+ {
+ pthread_mutex_lock(&m_share->mutex);
+ }
+ ~Ndb_tuple_id_range_guard()
+ {
+ pthread_mutex_unlock(&m_share->mutex);
+ }
+ Ndb::TupleIdRange& range;
+};
+
+
inline
int
ha_ndbcluster::set_auto_inc_val(THD *thd, Uint64 value)
@@ -12186,137 +12125,6 @@ int ndb_create_table_from_engine(THD *th
return res;
}
-/*
- find all tables in ndb and discover those needed
-*/
-int ndbcluster_find_all_files(THD *thd)
-{
- Ndb* ndb;
- char key[FN_REFLEN + 1];
- NDBDICT *dict;
- int unhandled, retries= 5, skipped;
- DBUG_ENTER("ndbcluster_find_all_files");
-
- if (!(ndb= check_ndb_in_thd(thd)))
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
-
- dict= ndb->getDictionary();
-
- LINT_INIT(unhandled);
- LINT_INIT(skipped);
- do
- {
- NdbDictionary::Dictionary::List list;
- if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
- ERR_RETURN(dict->getNdbError());
- unhandled= 0;
- skipped= 0;
- retries--;
- for (uint i= 0 ; i < list.count ; i++)
- {
- NDBDICT::List::Element& elmt= list.elements[i];
- if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
- {
- DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
- continue;
- }
- DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
- if (elmt.state != NDBOBJ::StateOnline &&
- elmt.state != NDBOBJ::StateBackup &&
- elmt.state != NDBOBJ::StateBuilding)
- {
- sql_print_information("NDB: skipping setup table %s.%s, in state %d",
- elmt.database, elmt.name, elmt.state);
- skipped++;
- continue;
- }
-
- ndb->setDatabaseName(elmt.database);
- Ndb_table_guard ndbtab_g(dict, elmt.name);
- const NDBTAB *ndbtab= ndbtab_g.get_table();
- if (!ndbtab)
- {
- if (retries == 0)
- sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
- elmt.database, elmt.name,
- dict->getNdbError().code,
- dict->getNdbError().message);
- unhandled++;
- continue;
- }
-
- if (ndbtab->getFrmLength() == 0)
- continue;
-
- /* check if database exists */
- char *end= key +
- build_table_filename(key, sizeof(key) - 1, elmt.database, "", "", 0);
- if (my_access(key, F_OK))
- {
- /* no such database defined, skip table */
- continue;
- }
- /* finalize construction of path */
- end+= tablename_to_filename(elmt.name, end,
- (uint)(sizeof(key)-(end-key)));
- uchar *data= 0, *pack_data= 0;
- size_t length, pack_length;
- int discover= 0;
- if (readfrm(key, &data, &length) ||
- packfrm(data, length, &pack_data, &pack_length))
- {
- discover= 1;
- sql_print_information("NDB: missing frm for %s.%s, discovering...",
- elmt.database, elmt.name);
- }
- else if (cmp_frm(ndbtab, pack_data, pack_length))
- {
- /* ndb_share reference temporary */
- NDB_SHARE *share= get_share(key, 0, FALSE);
- if (share)
- {
- DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
- share->key, share->use_count));
- }
- if (!share || get_ndb_share_state(share) != NSS_ALTERED)
- {
- discover= 1;
- sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
- elmt.database, elmt.name);
- }
- if (share)
- {
- /* ndb_share reference temporary free */
- DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- }
- }
- my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
- my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
-
- if (discover)
- {
- /* ToDo 4.1 database needs to be created if missing */
- if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
- {
- /* ToDo 4.1 handle error */
- }
- }
- else
- {
- /* set up replication for this table */
- ndbcluster_create_binlog_setup(thd, ndb, key, (uint)(end-key),
- elmt.database, elmt.name,
- 0);
- }
- }
- }
- while (unhandled && retries);
-
- DBUG_RETURN(-(skipped + unhandled));
-}
-
static int
ndbcluster_find_files(handlerton *hton, THD *thd,
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h 2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster.h 2011-10-27 12:33:39 +0000
@@ -108,71 +108,9 @@ public:
Uint32 old_table_version;
};
-typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
-
-int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
- uchar*& buffer, uint& buffer_size,
- my_ptrdiff_t ptrdiff);
-
+#include "ndb_ndbapi_util.h"
#include "ndb_share.h"
-struct Ndb_tuple_id_range_guard {
- Ndb_tuple_id_range_guard(NDB_SHARE* _share) :
- share(_share),
- range(share->tuple_id_range) {
- pthread_mutex_lock(&share->mutex);
- }
- ~Ndb_tuple_id_range_guard() {
- pthread_mutex_unlock(&share->mutex);
- }
- NDB_SHARE* share;
- Ndb::TupleIdRange& range;
-};
-
-/* NDB_SHARE.flags */
-#define NSF_HIDDEN_PK 1u /* table has hidden primary key */
-#define NSF_BLOB_FLAG 2u /* table has blob attributes */
-#define NSF_NO_BINLOG 4u /* table should not be binlogged */
-#define NSF_BINLOG_FULL 8u /* table should be binlogged with full rows */
-#define NSF_BINLOG_USE_UPDATE 16u /* table update should be binlogged using
- update log event */
-inline void set_binlog_logging(NDB_SHARE *share)
-{
- DBUG_PRINT("info", ("set_binlog_logging"));
- share->flags&= ~NSF_NO_BINLOG;
-}
-inline void set_binlog_nologging(NDB_SHARE *share)
-{
- DBUG_PRINT("info", ("set_binlog_nologging"));
- share->flags|= NSF_NO_BINLOG;
-}
-inline my_bool get_binlog_nologging(NDB_SHARE *share)
-{ return (share->flags & NSF_NO_BINLOG) != 0; }
-inline void set_binlog_updated_only(NDB_SHARE *share)
-{
- DBUG_PRINT("info", ("set_binlog_updated_only"));
- share->flags&= ~NSF_BINLOG_FULL;
-}
-inline void set_binlog_full(NDB_SHARE *share)
-{
- DBUG_PRINT("info", ("set_binlog_full"));
- share->flags|= NSF_BINLOG_FULL;
-}
-inline my_bool get_binlog_full(NDB_SHARE *share)
-{ return (share->flags & NSF_BINLOG_FULL) != 0; }
-inline void set_binlog_use_write(NDB_SHARE *share)
-{
- DBUG_PRINT("info", ("set_binlog_use_write"));
- share->flags&= ~NSF_BINLOG_USE_UPDATE;
-}
-inline void set_binlog_use_update(NDB_SHARE *share)
-{
- DBUG_PRINT("info", ("set_binlog_use_update"));
- share->flags|= NSF_BINLOG_USE_UPDATE;
-}
-inline my_bool get_binlog_use_update(NDB_SHARE *share)
-{ return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
-
enum enum_slave_trans_conflict_apply_state
{
/* Normal with optional row-level conflict detection */
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2011-10-27 12:33:39 +0000
@@ -56,6 +56,8 @@ bool ndb_log_empty_epochs(void);
#include "ha_ndbcluster_tables.h"
#include "ndb_dist_priv_util.h"
+#include "ndb_anyvalue.h"
+#include "ndb_binlog_extra_row_info.h"
/*
Timeout for syncing schema events between
@@ -1313,6 +1315,141 @@ static int ndbcluster_find_all_databases
}
}
+
+/*
+ find all tables in ndb and discover those needed
+*/
+static
+int ndbcluster_find_all_files(THD *thd)
+{
+ Ndb* ndb;
+ char key[FN_REFLEN + 1];
+ NDBDICT *dict;
+ int unhandled, retries= 5, skipped;
+ DBUG_ENTER("ndbcluster_find_all_files");
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ dict= ndb->getDictionary();
+
+ LINT_INIT(unhandled);
+ LINT_INIT(skipped);
+ do
+ {
+ NdbDictionary::Dictionary::List list;
+ if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+ DBUG_RETURN(1);
+ unhandled= 0;
+ skipped= 0;
+ retries--;
+ for (uint i= 0 ; i < list.count ; i++)
+ {
+ NDBDICT::List::Element& elmt= list.elements[i];
+ if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
+ {
+ DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
+ continue;
+ }
+ DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
+ if (elmt.state != NDBOBJ::StateOnline &&
+ elmt.state != NDBOBJ::StateBackup &&
+ elmt.state != NDBOBJ::StateBuilding)
+ {
+ sql_print_information("NDB: skipping setup table %s.%s, in state %d",
+ elmt.database, elmt.name, elmt.state);
+ skipped++;
+ continue;
+ }
+
+ ndb->setDatabaseName(elmt.database);
+ Ndb_table_guard ndbtab_g(dict, elmt.name);
+ const NDBTAB *ndbtab= ndbtab_g.get_table();
+ if (!ndbtab)
+ {
+ if (retries == 0)
+ sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+ elmt.database, elmt.name,
+ dict->getNdbError().code,
+ dict->getNdbError().message);
+ unhandled++;
+ continue;
+ }
+
+ if (ndbtab->getFrmLength() == 0)
+ continue;
+
+ /* check if database exists */
+ char *end= key +
+ build_table_filename(key, sizeof(key) - 1, elmt.database, "", "", 0);
+ if (my_access(key, F_OK))
+ {
+ /* no such database defined, skip table */
+ continue;
+ }
+ /* finalize construction of path */
+ end+= tablename_to_filename(elmt.name, end,
+ (uint)(sizeof(key)-(end-key)));
+ uchar *data= 0, *pack_data= 0;
+ size_t length, pack_length;
+ int discover= 0;
+ if (readfrm(key, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ discover= 1;
+ sql_print_information("NDB: missing frm for %s.%s, discovering...",
+ elmt.database, elmt.name);
+ }
+ else if (cmp_frm(ndbtab, pack_data, pack_length))
+ {
+ /* ndb_share reference temporary */
+ NDB_SHARE *share= get_share(key, 0, FALSE);
+ if (share)
+ {
+ DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
+ share->key, share->use_count));
+ }
+ if (!share || get_ndb_share_state(share) != NSS_ALTERED)
+ {
+ discover= 1;
+ sql_print_information("NDB: mismatch in frm for %s.%s,"
+ " discovering...",
+ elmt.database, elmt.name);
+ }
+ if (share)
+ {
+ /* ndb_share reference temporary free */
+ DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
+ share->key, share->use_count));
+ free_share(&share);
+ }
+ }
+ my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
+
+ if (discover)
+ {
+ /* ToDo 4.1 database needs to be created if missing */
+ if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
+ {
+ /* ToDo 4.1 handle error */
+ }
+ }
+ else
+ {
+ /* set up replication for this table */
+ ndbcluster_create_binlog_setup(thd, ndb, key, (uint)(end-key),
+ elmt.database, elmt.name,
+ 0);
+ }
+ }
+ }
+ while (unhandled && retries);
+
+ DBUG_RETURN(-(skipped + unhandled));
+}
+
+
bool
ndb_binlog_setup(THD *thd)
{
@@ -1391,307 +1528,6 @@ ndb_binlog_setup(THD *thd)
#define SCHEMA_SIZE 9u
#define SCHEMA_SLOCK_SIZE 32u
-struct Cluster_schema
-{
- uchar db_length;
- char db[64];
- uchar name_length;
- char name[64];
- uchar slock_length;
- uint32 slock[SCHEMA_SLOCK_SIZE/4];
- unsigned short query_length;
- char *query;
- Uint64 epoch;
- uint32 node_id;
- uint32 id;
- uint32 version;
- uint32 type;
- uint32 any_value;
-};
-
-static void
-print_could_not_discover_error(THD *thd,
- const Cluster_schema *schema)
-{
- sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
- "binlog schema event '%s' from node %d. "
- "my_errno: %d",
- schema->db, schema->name, schema->query,
- schema->node_id, my_errno);
- print_warning_list("NDB Binlog", thd);
-}
-
-
-/*
- Transfer schema table data into corresponding struct
-*/
-static void ndbcluster_get_schema(Ndb_event_data *event_data,
- Cluster_schema *s)
-{
- TABLE *table= event_data->shadow_table;
- Field **field;
- /* unpack blob values */
- uchar* blobs_buffer= 0;
- uint blobs_buffer_size= 0;
- my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
- {
- ptrdiff_t ptrdiff= 0;
- int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
- blobs_buffer, blobs_buffer_size,
- ptrdiff);
- if (ret != 0)
- {
- my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
- DBUG_PRINT("info", ("blob read error"));
- DBUG_ASSERT(FALSE);
- }
- }
- /* db varchar 1 length uchar */
- field= table->field;
- s->db_length= *(uint8*)(*field)->ptr;
- DBUG_ASSERT(s->db_length <= (*field)->field_length);
- DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
- memcpy(s->db, (*field)->ptr + 1, s->db_length);
- s->db[s->db_length]= 0;
- /* name varchar 1 length uchar */
- field++;
- s->name_length= *(uint8*)(*field)->ptr;
- DBUG_ASSERT(s->name_length <= (*field)->field_length);
- DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
- memcpy(s->name, (*field)->ptr + 1, s->name_length);
- s->name[s->name_length]= 0;
- /* slock fixed length */
- field++;
- s->slock_length= (*field)->field_length;
- DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
- memcpy(s->slock, (*field)->ptr, s->slock_length);
- /* query blob */
- field++;
- {
- Field_blob *field_blob= (Field_blob*)(*field);
- uint blob_len= field_blob->get_length((*field)->ptr);
- uchar *blob_ptr= 0;
- field_blob->get_ptr(&blob_ptr);
- DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
- s->query_length= blob_len;
- s->query= sql_strmake((char*) blob_ptr, blob_len);
- }
- /* node_id */
- field++;
- s->node_id= (Uint32)((Field_long *)*field)->val_int();
- /* epoch */
- field++;
- s->epoch= ((Field_long *)*field)->val_int();
- /* id */
- field++;
- s->id= (Uint32)((Field_long *)*field)->val_int();
- /* version */
- field++;
- s->version= (Uint32)((Field_long *)*field)->val_int();
- /* type */
- field++;
- s->type= (Uint32)((Field_long *)*field)->val_int();
- /* free blobs buffer */
- my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
- dbug_tmp_restore_column_map(table->read_set, old_map);
-}
-
-/*
- helper function to pack a ndb varchar
-*/
-char *ndb_pack_varchar(const NDBCOL *col, char *buf,
- const char *str, int sz)
-{
- switch (col->getArrayType())
- {
- case NDBCOL::ArrayTypeFixed:
- memcpy(buf, str, sz);
- break;
- case NDBCOL::ArrayTypeShortVar:
- *(uchar*)buf= (uchar)sz;
- memcpy(buf + 1, str, sz);
- break;
- case NDBCOL::ArrayTypeMediumVar:
- int2store(buf, sz);
- memcpy(buf + 2, str, sz);
- break;
- }
- return buf;
-}
-
-/*
- acknowledge handling of schema operation
-*/
-static int
-ndbcluster_update_slock(THD *thd,
- const char *db,
- const char *table_name,
- uint32 table_id,
- uint32 table_version)
-{
- DBUG_ENTER("ndbcluster_update_slock");
- if (!ndb_schema_share)
- {
- DBUG_RETURN(0);
- }
-
- const NdbError *ndb_error= 0;
- uint32 node_id= g_ndb_cluster_connection->node_id();
- Ndb *ndb= check_ndb_in_thd(thd);
- char save_db[FN_HEADLEN];
- strcpy(save_db, ndb->getDatabaseName());
-
- char tmp_buf[FN_REFLEN];
- NDBDICT *dict= ndb->getDictionary();
- ndb->setDatabaseName(NDB_REP_DB);
- Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
- const NDBTAB *ndbtab= ndbtab_g.get_table();
- NdbTransaction *trans= 0;
- int retries= 100;
- int retry_sleep= 30; /* 30 milliseconds, transaction */
- const NDBCOL *col[SCHEMA_SIZE];
- unsigned sz[SCHEMA_SIZE];
-
- MY_BITMAP slock;
- uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
- bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
-
- if (ndbtab == 0)
- {
- if (dict->getNdbError().code != 4009)
- abort();
- DBUG_RETURN(0);
- }
-
- {
- uint i;
- for (i= 0; i < SCHEMA_SIZE; i++)
- {
- col[i]= ndbtab->getColumn(i);
- if (i != SCHEMA_QUERY_I)
- {
- sz[i]= col[i]->getLength();
- DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
- }
- }
- }
-
- while (1)
- {
- if ((trans= ndb->startTransaction()) == 0)
- goto err;
- {
- NdbOperation *op= 0;
- int r= 0;
-
- /* read the bitmap exlusive */
- r|= (op= trans->getNdbOperation(ndbtab)) == 0;
- DBUG_ASSERT(r == 0);
- r|= op->readTupleExclusive();
- DBUG_ASSERT(r == 0);
-
- /* db */
- ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
- r|= op->equal(SCHEMA_DB_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* name */
- ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
- (int)strlen(table_name));
- r|= op->equal(SCHEMA_NAME_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* slock */
- r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
- DBUG_ASSERT(r == 0);
- }
- if (trans->execute(NdbTransaction::NoCommit))
- goto err;
-
- if (opt_ndb_extra_logging > 19)
- {
- uint32 copy[SCHEMA_SLOCK_SIZE/4];
- memcpy(copy, bitbuf, sizeof(copy));
- bitmap_clear_bit(&slock, node_id);
- sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
- db, table_name,
- table_id, table_version,
- copy[0], copy[1],
- slock.bitmap[0],
- slock.bitmap[1]);
- }
- else
- {
- bitmap_clear_bit(&slock, node_id);
- }
-
- {
- NdbOperation *op= 0;
- int r= 0;
-
- /* now update the tuple */
- r|= (op= trans->getNdbOperation(ndbtab)) == 0;
- DBUG_ASSERT(r == 0);
- r|= op->updateTuple();
- DBUG_ASSERT(r == 0);
-
- /* db */
- ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
- r|= op->equal(SCHEMA_DB_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* name */
- ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
- (int)strlen(table_name));
- r|= op->equal(SCHEMA_NAME_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* slock */
- r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
- DBUG_ASSERT(r == 0);
- /* node_id */
- r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
- DBUG_ASSERT(r == 0);
- /* type */
- r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
- DBUG_ASSERT(r == 0);
- }
- if (trans->execute(NdbTransaction::Commit,
- NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
- {
- DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
- node_id, db, table_name));
- dict->forceGCPWait(1);
- break;
- }
- err:
- const NdbError *this_error= trans ?
- &trans->getNdbError() : &ndb->getNdbError();
- if (this_error->status == NdbError::TemporaryError && !thd->killed)
- {
- if (retries--)
- {
- if (trans)
- ndb->closeTransaction(trans);
- do_retry_sleep(retry_sleep);
- continue; // retry
- }
- }
- ndb_error= this_error;
- break;
- }
-
- if (ndb_error)
- {
- char buf[1024];
- my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
- db, table_name);
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- ndb_error->code, ndb_error->message, buf);
- }
- if (trans)
- ndb->closeTransaction(trans);
- ndb->setDatabaseName(save_db);
- DBUG_RETURN(0);
-}
-
/*
log query in schema table
@@ -2355,68 +2191,6 @@ ndb_handle_schema_change(THD *thd, Ndb *
DBUG_RETURN(0);
}
-static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
-{
- /* any_value == 0 means local cluster sourced change that
- * should be logged
- */
- if (ndbcluster_anyvalue_is_reserved(schema->any_value))
- {
- /* Originating SQL node did not want this query logged */
- if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
- sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
- "query not logged",
- schema->any_value);
- return;
- }
-
- Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
- /*
- Start with serverId as received AnyValue, in case it's a composite
- (server_id_bits < 31).
- This is for 'future', as currently schema ops do not have composite
- AnyValues.
- In future it may be useful to support *not* mapping composite
- AnyValues to/from Binlogged server-ids.
- */
- Uint32 loggedServerId = schema->any_value;
-
- if (queryServerId)
- {
- /*
- AnyValue has non-zero serverId, must be a query applied by a slave
- mysqld.
- TODO : Assert that we are running in the Binlog injector thread?
- */
- if (! g_ndb_log_slave_updates)
- {
- /* This MySQLD does not log slave updates */
- return;
- }
- }
- else
- {
- /* No ServerId associated with this query, mark it as ours */
- ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
- }
-
- uint32 thd_server_id_save= thd->server_id;
- DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
- char *thd_db_save= thd->db;
- thd->server_id = loggedServerId;
- thd->db= schema->db;
- int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
- thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
- schema->query_length, FALSE,
-#ifdef NDB_THD_BINLOG_QUERY_HAS_DIRECT
- TRUE,
-#endif
- schema->name[0] == 0 || thd->db[0] == 0,
- errcode);
- thd->server_id= thd_server_id_save;
- thd->db= thd_db_save;
-}
-
class Mutex_guard
{
@@ -2434,16 +2208,361 @@ private:
};
+class Ndb_schema_event_handler {
+
+ struct Cluster_schema
+ {
+ uchar db_length;
+ char db[64];
+ uchar name_length;
+ char name[64];
+ uchar slock_length;
+ uint32 slock[SCHEMA_SLOCK_SIZE/4];
+ unsigned short query_length;
+ char *query;
+ Uint64 epoch;
+ uint32 node_id;
+ uint32 id;
+ uint32 version;
+ uint32 type;
+ uint32 any_value;
+ };
+
+
+ static void
+ print_could_not_discover_error(THD *thd,
+ const Cluster_schema *schema)
+ {
+ sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
+ "binlog schema event '%s' from node %d. "
+ "my_errno: %d",
+ schema->db, schema->name, schema->query,
+ schema->node_id, my_errno);
+ print_warning_list("NDB Binlog", thd);
+ }
+
+
+ /*
+ Transfer schema table event data into Cluster_schema struct
+ */
+ static void ndbcluster_get_schema(const Ndb_event_data *event_data,
+ Cluster_schema *s)
+ {
+ TABLE *table= event_data->shadow_table;
+ Field **field;
+ /* unpack blob values */
+ uchar* blobs_buffer= 0;
+ uint blobs_buffer_size= 0;
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ {
+ ptrdiff_t ptrdiff= 0;
+ int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
+ blobs_buffer, blobs_buffer_size,
+ ptrdiff);
+ if (ret != 0)
+ {
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_PRINT("info", ("blob read error"));
+ DBUG_ASSERT(FALSE);
+ }
+ }
+ /* db varchar 1 length uchar */
+ field= table->field;
+ s->db_length= *(uint8*)(*field)->ptr;
+ DBUG_ASSERT(s->db_length <= (*field)->field_length);
+ DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
+ memcpy(s->db, (*field)->ptr + 1, s->db_length);
+ s->db[s->db_length]= 0;
+ /* name varchar 1 length uchar */
+ field++;
+ s->name_length= *(uint8*)(*field)->ptr;
+ DBUG_ASSERT(s->name_length <= (*field)->field_length);
+ DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
+ memcpy(s->name, (*field)->ptr + 1, s->name_length);
+ s->name[s->name_length]= 0;
+ /* slock fixed length */
+ field++;
+ s->slock_length= (*field)->field_length;
+ DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
+ memcpy(s->slock, (*field)->ptr, s->slock_length);
+ /* query blob */
+ field++;
+ {
+ Field_blob *field_blob= (Field_blob*)(*field);
+ uint blob_len= field_blob->get_length((*field)->ptr);
+ uchar *blob_ptr= 0;
+ field_blob->get_ptr(&blob_ptr);
+ DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
+ s->query_length= blob_len;
+ s->query= sql_strmake((char*) blob_ptr, blob_len);
+ }
+ /* node_id */
+ field++;
+ s->node_id= (Uint32)((Field_long *)*field)->val_int();
+ /* epoch */
+ field++;
+ s->epoch= ((Field_long *)*field)->val_int();
+ /* id */
+ field++;
+ s->id= (Uint32)((Field_long *)*field)->val_int();
+ /* version */
+ field++;
+ s->version= (Uint32)((Field_long *)*field)->val_int();
+ /* type */
+ field++;
+ s->type= (Uint32)((Field_long *)*field)->val_int();
+ /* free blobs buffer */
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ }
+
+
+ static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
+ {
+ /* any_value == 0 means local cluster sourced change that
+ * should be logged
+ */
+ if (ndbcluster_anyvalue_is_reserved(schema->any_value))
+ {
+ /* Originating SQL node did not want this query logged */
+ if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
+ sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
+ "query not logged",
+ schema->any_value);
+ return;
+ }
+
+ Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
+ /*
+ Start with serverId as received AnyValue, in case it's a composite
+ (server_id_bits < 31).
+ This is for 'future', as currently schema ops do not have composite
+ AnyValues.
+ In future it may be useful to support *not* mapping composite
+ AnyValues to/from Binlogged server-ids.
+ */
+ Uint32 loggedServerId = schema->any_value;
+
+ if (queryServerId)
+ {
+ /*
+ AnyValue has non-zero serverId, must be a query applied by a slave
+ mysqld.
+ TODO : Assert that we are running in the Binlog injector thread?
+ */
+ if (! g_ndb_log_slave_updates)
+ {
+ /* This MySQLD does not log slave updates */
+ return;
+ }
+ }
+ else
+ {
+ /* No ServerId associated with this query, mark it as ours */
+ ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
+ }
+
+ uint32 thd_server_id_save= thd->server_id;
+ DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
+ char *thd_db_save= thd->db;
+ thd->server_id = loggedServerId;
+ thd->db= schema->db;
+ int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
+ thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
+ schema->query_length, FALSE,
+ #ifdef NDB_THD_BINLOG_QUERY_HAS_DIRECT
+ TRUE,
+ #endif
+ schema->name[0] == 0 || thd->db[0] == 0,
+ errcode);
+ thd->server_id= thd_server_id_save;
+ thd->db= thd_db_save;
+ }
+
+
+
+ /*
+ acknowledge handling of schema operation
+ */
+ static int
+ ndbcluster_update_slock(THD *thd,
+ const char *db,
+ const char *table_name,
+ uint32 table_id,
+ uint32 table_version)
+ {
+ DBUG_ENTER("ndbcluster_update_slock");
+ if (!ndb_schema_share)
+ {
+ DBUG_RETURN(0);
+ }
+
+ const NdbError *ndb_error= 0;
+ uint32 node_id= g_ndb_cluster_connection->node_id();
+ Ndb *ndb= check_ndb_in_thd(thd);
+ char save_db[FN_HEADLEN];
+ strcpy(save_db, ndb->getDatabaseName());
+
+ char tmp_buf[FN_REFLEN];
+ NDBDICT *dict= ndb->getDictionary();
+ ndb->setDatabaseName(NDB_REP_DB);
+ Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
+ const NDBTAB *ndbtab= ndbtab_g.get_table();
+ NdbTransaction *trans= 0;
+ int retries= 100;
+ int retry_sleep= 30; /* 30 milliseconds, transaction */
+ const NDBCOL *col[SCHEMA_SIZE];
+ unsigned sz[SCHEMA_SIZE];
+
+ MY_BITMAP slock;
+ uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
+ bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
+
+ if (ndbtab == 0)
+ {
+ if (dict->getNdbError().code != 4009)
+ abort();
+ DBUG_RETURN(0);
+ }
+
+ {
+ uint i;
+ for (i= 0; i < SCHEMA_SIZE; i++)
+ {
+ col[i]= ndbtab->getColumn(i);
+ if (i != SCHEMA_QUERY_I)
+ {
+ sz[i]= col[i]->getLength();
+ DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
+ }
+ }
+ }
+
+ while (1)
+ {
+ if ((trans= ndb->startTransaction()) == 0)
+ goto err;
+ {
+ NdbOperation *op= 0;
+ int r= 0;
+
+ /* read the bitmap exlusive */
+ r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+ DBUG_ASSERT(r == 0);
+ r|= op->readTupleExclusive();
+ DBUG_ASSERT(r == 0);
+
+ /* db */
+ ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
+ r|= op->equal(SCHEMA_DB_I, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ /* name */
+ ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
+ (int)strlen(table_name));
+ r|= op->equal(SCHEMA_NAME_I, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ /* slock */
+ r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
+ DBUG_ASSERT(r == 0);
+ }
+ if (trans->execute(NdbTransaction::NoCommit))
+ goto err;
+
+ if (opt_ndb_extra_logging > 19)
+ {
+ uint32 copy[SCHEMA_SLOCK_SIZE/4];
+ memcpy(copy, bitbuf, sizeof(copy));
+ bitmap_clear_bit(&slock, node_id);
+ sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
+ db, table_name,
+ table_id, table_version,
+ copy[0], copy[1],
+ slock.bitmap[0],
+ slock.bitmap[1]);
+ }
+ else
+ {
+ bitmap_clear_bit(&slock, node_id);
+ }
+
+ {
+ NdbOperation *op= 0;
+ int r= 0;
+
+ /* now update the tuple */
+ r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+ DBUG_ASSERT(r == 0);
+ r|= op->updateTuple();
+ DBUG_ASSERT(r == 0);
+
+ /* db */
+ ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
+ r|= op->equal(SCHEMA_DB_I, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ /* name */
+ ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
+ (int)strlen(table_name));
+ r|= op->equal(SCHEMA_NAME_I, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ /* slock */
+ r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
+ DBUG_ASSERT(r == 0);
+ /* node_id */
+ r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
+ DBUG_ASSERT(r == 0);
+ /* type */
+ r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
+ DBUG_ASSERT(r == 0);
+ }
+ if (trans->execute(NdbTransaction::Commit,
+ NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
+ {
+ DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
+ node_id, db, table_name));
+ dict->forceGCPWait(1);
+ break;
+ }
+ err:
+ const NdbError *this_error= trans ?
+ &trans->getNdbError() : &ndb->getNdbError();
+ if (this_error->status == NdbError::TemporaryError && !thd->killed)
+ {
+ if (retries--)
+ {
+ if (trans)
+ ndb->closeTransaction(trans);
+ do_retry_sleep(retry_sleep);
+ continue; // retry
+ }
+ }
+ ndb_error= this_error;
+ break;
+ }
+
+ if (ndb_error)
+ {
+ char buf[1024];
+ my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
+ db, table_name);
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ ndb_error->code, ndb_error->message, buf);
+ }
+ if (trans)
+ ndb->closeTransaction(trans);
+ ndb->setDatabaseName(save_db);
+ DBUG_RETURN(0);
+ }
+
+
static int
-ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *s_ndb,
- NdbEventOperation *pOp,
- List<Cluster_schema>
- *post_epoch_log_list,
- List<Cluster_schema>
- *post_epoch_unlock_list,
- MEM_ROOT *mem_root)
+handle_schema_event(THD *thd, Ndb *s_ndb,
+ NdbEventOperation *pOp,
+ List<Cluster_schema> *post_epoch_log_list,
+ List<Cluster_schema> *post_epoch_unlock_list,
+ MEM_ROOT *mem_root)
{
- DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
+ DBUG_ENTER("handle_schema_event");
Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
NDB_SHARE *tmp_share= event_data->share;
if (tmp_share && ndb_schema_share == tmp_share)
@@ -2811,20 +2930,17 @@ ndb_binlog_thread_handle_schema_event(TH
the epoch is complete
*/
static void
-ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
- List<Cluster_schema>
- *post_epoch_log_list,
- List<Cluster_schema>
- *post_epoch_unlock_list)
+handle_schema_log_post_epoch(THD *thd,
+ List<Cluster_schema> *log_list)
{
- if (post_epoch_log_list->elements == 0)
- return;
- DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
- Cluster_schema *schema;
+ DBUG_ENTER("handle_schema_log_post_epoch");
+
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
NDBDICT *dict= ndb->getDictionary();
- while ((schema= post_epoch_log_list->pop()))
+
+ Cluster_schema *schema;
+ while ((schema= log_list->pop()))
{
Thd_ndb_options_guard thd_ndb_options(thd_ndb);
DBUG_PRINT("info",
@@ -3181,7 +3297,18 @@ ndb_binlog_thread_handle_schema_event_po
if (ndb_binlog_running && log_query)
ndb_binlog_query(thd, schema);
}
- while ((schema= post_epoch_unlock_list->pop()))
+ DBUG_VOID_RETURN;
+}
+
+
+static void
+handle_schema_unlock_post_epoch(THD *thd,
+ List<Cluster_schema> *unlock_list)
+{
+ DBUG_ENTER("handle_schema_unlock_post_epoch");
+
+ Cluster_schema *schema;
+ while ((schema= unlock_list->pop()))
{
ndbcluster_update_slock(thd, schema->db, schema->name,
schema->id, schema->version);
@@ -3189,9 +3316,49 @@ ndb_binlog_thread_handle_schema_event_po
DBUG_VOID_RETURN;
}
-/*
- Timer class for doing performance measurements
-*/
+ THD* m_thd;
+ MEM_ROOT* m_mem_root;
+
+ List<Cluster_schema> m_post_epoch_log_list;
+ List<Cluster_schema> m_post_epoch_unlock_list;
+
+public:
+ Ndb_schema_event_handler(); // Not implemented
+ Ndb_schema_event_handler(const Ndb_schema_event_handler&); // Not implemented
+
+ Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root):
+ m_thd(thd), m_mem_root(mem_root)
+ {
+ }
+
+ ~Ndb_schema_event_handler()
+ {
+ // There should be no work left todo...
+ DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
+ DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+ }
+
+ void handle_event(Ndb* s_ndb, NdbEventOperation *pOp)
+ {
+ handle_schema_event(m_thd, s_ndb, pOp,
+ &m_post_epoch_log_list,
+ &m_post_epoch_unlock_list,
+ m_mem_root);
+ }
+
+ void post_epoch()
+ {
+ if (m_post_epoch_log_list.elements > 0)
+ {
+ handle_schema_log_post_epoch(m_thd, &m_post_epoch_log_list);
+ // NOTE post_epoch_unlock_list may not be handled!
+ handle_schema_unlock_post_epoch(m_thd, &m_post_epoch_unlock_list);
+ }
+ // There should be no work left todo...
+ DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
+ DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+ }
+};
/*********************************************************************
Internal helper functions for handeling of the cluster replication tables
@@ -5721,11 +5888,11 @@ static void ndb_unpack_record(TABLE *tab
Handle error states on events from the storage nodes
*/
static int
-ndb_binlog_thread_handle_error(NdbEventOperation *pOp)
+handle_error(NdbEventOperation *pOp)
{
Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
NDB_SHARE *share= event_data->share;
- DBUG_ENTER("ndb_binlog_thread_handle_error");
+ DBUG_ENTER("handle_error");
int overrun= pOp->isOverrun();
if (overrun)
@@ -5762,9 +5929,9 @@ ndb_binlog_thread_handle_error(NdbEventO
}
static int
-ndb_binlog_thread_handle_non_data_event(THD *thd,
- NdbEventOperation *pOp,
- ndb_binlog_index_row &row)
+handle_non_data_event(THD *thd,
+ NdbEventOperation *pOp,
+ ndb_binlog_index_row &row)
{
Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
NDB_SHARE *share= event_data->share;
@@ -5897,11 +6064,11 @@ ndb_find_binlog_index_row(ndb_binlog_ind
static int
-ndb_binlog_thread_handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
- ndb_binlog_index_row **rows,
- injector::transaction &trans,
- unsigned &trans_row_count,
- unsigned &trans_slave_row_count)
+handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
+ ndb_binlog_index_row **rows,
+ injector::transaction &trans,
+ unsigned &trans_row_count,
+ unsigned &trans_slave_row_count)
{
Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
TABLE *table= event_data->shadow_table;
@@ -6243,25 +6410,6 @@ ndb_binlog_thread_handle_data_event(THD*
return 0;
}
-//#define RUN_NDB_BINLOG_TIMER
-#ifdef RUN_NDB_BINLOG_TIMER
-class Timer
-{
-public:
- Timer() { start(); }
- void start() { gettimeofday(&m_start, 0); }
- void stop() { gettimeofday(&m_stop, 0); }
- ulong elapsed_ms()
- {
- return (ulong)
- (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
- ((longlong) m_stop.tv_usec -
- (longlong) m_start.tv_usec + 999) / 1000);
- }
-private:
- struct timeval m_start,m_stop;
-};
-#endif
/****************************************************************
Injector thread main loop
@@ -6409,12 +6557,6 @@ void updateInjectorStats(Ndb* schemaNdb,
dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
}
-enum Binlog_thread_state
-{
- BCCC_running= 0,
- BCCC_exit= 1,
- BCCC_restart= 2
-};
extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
extern ulong opt_ndb_report_thresh_binlog_mem_usage;
@@ -6428,7 +6570,8 @@ ndb_binlog_thread_func(void *arg)
Thd_ndb *thd_ndb=0;
injector *inj= injector::instance();
uint incident_id= 0;
- Binlog_thread_state do_ndbcluster_binlog_close_connection;
+
+ enum { BCCC_running, BCCC_exit, BCCC_restart } binlog_thread_state;
/**
* If we get error after having reported incident
@@ -6437,10 +6580,6 @@ ndb_binlog_thread_func(void *arg)
*/
bool do_incident = true;
-#ifdef RUN_NDB_BINLOG_TIMER
- Timer main_timer;
-#endif
-
pthread_mutex_lock(&injector_mutex);
/*
Set up the Thread
@@ -6502,7 +6641,7 @@ ndb_binlog_thread_func(void *arg)
restart_cluster_failure:
int have_injector_mutex_lock= 0;
- do_ndbcluster_binlog_close_connection= BCCC_exit;
+ binlog_thread_state= BCCC_exit;
if (!(thd_ndb= Thd_ndb::seize(thd)))
{
@@ -6633,7 +6772,7 @@ restart_cluster_failure:
thread to get rid of any garbage on the ndb objects
*/
have_injector_mutex_lock= 1;
- do_ndbcluster_binlog_close_connection= BCCC_restart;
+ binlog_thread_state= BCCC_restart;
goto err;
}
/* ndb not connected yet */
@@ -6739,30 +6878,25 @@ restart_cluster_failure:
thd->db= db;
}
do_incident = true; // If we get disconnected again...do incident report
- do_ndbcluster_binlog_close_connection= BCCC_running;
+ binlog_thread_state= BCCC_running;
for ( ; !((ndbcluster_binlog_terminating ||
- do_ndbcluster_binlog_close_connection) &&
+ binlog_thread_state) &&
ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
- do_ndbcluster_binlog_close_connection != BCCC_restart; )
+ binlog_thread_state != BCCC_restart; )
{
#ifndef DBUG_OFF
- if (do_ndbcluster_binlog_close_connection)
+ if (binlog_thread_state)
{
- DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
+ DBUG_PRINT("info", ("binlog_thread_state: %d, "
"ndb_latest_handled_binlog_epoch: %u/%u, "
"*get_latest_trans_gci(): %u/%u",
- do_ndbcluster_binlog_close_connection,
+ binlog_thread_state,
(uint)(ndb_latest_handled_binlog_epoch >> 32),
(uint)(ndb_latest_handled_binlog_epoch),
(uint)(ndb_get_latest_trans_gci() >> 32),
(uint)(ndb_get_latest_trans_gci())));
}
#endif
-#ifdef RUN_NDB_BINLOG_TIMER
- main_timer.stop();
- sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
- main_timer.start();
-#endif
/*
now we don't want any events before next gci is complete
@@ -6795,7 +6929,7 @@ restart_cluster_failure:
}
if ((ndbcluster_binlog_terminating ||
- do_ndbcluster_binlog_close_connection) &&
+ binlog_thread_state) &&
(ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
!ndb_binlog_running))
break; /* Shutting down server */
@@ -6805,8 +6939,11 @@ restart_cluster_failure:
MEM_ROOT *old_root= *root_ptr;
MEM_ROOT mem_root;
init_sql_alloc(&mem_root, 4096, 0);
- List<Cluster_schema> post_epoch_log_list;
- List<Cluster_schema> post_epoch_unlock_list;
+
+ // The Ndb_schema_event_handler does not necessarily need
+ // to use the same memroot(or vice versa)
+ Ndb_schema_event_handler schema_event_handler(thd, &mem_root);
+
*root_ptr= &mem_root;
if (unlikely(schema_res > 0))
@@ -6822,10 +6959,8 @@ restart_cluster_failure:
{
if (!pOp->hasError())
{
- ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
- &post_epoch_log_list,
- &post_epoch_unlock_list,
- &mem_root);
+ schema_event_handler.handle_event(s_ndb, pOp);
+
DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
"<empty>"));
@@ -6834,10 +6969,10 @@ restart_cluster_failure:
"<empty>"));
if (i_ndb->getEventOperation() == NULL &&
s_ndb->getEventOperation() == NULL &&
- do_ndbcluster_binlog_close_connection == BCCC_running)
+ binlog_thread_state == BCCC_running)
{
- DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
- do_ndbcluster_binlog_close_connection= BCCC_restart;
+ DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
+ binlog_thread_state= BCCC_restart;
if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
{
sql_print_error("NDB Binlog: latest transaction in epoch %u/%u not in binlog "
@@ -6875,15 +7010,15 @@ restart_cluster_failure:
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
{
ndb_binlog_index_row row;
- ndb_binlog_thread_handle_non_data_event(thd, pOp, row);
+ handle_non_data_event(thd, pOp, row);
}
}
if (i_ndb->getEventOperation() == NULL &&
s_ndb->getEventOperation() == NULL &&
- do_ndbcluster_binlog_close_connection == BCCC_running)
+ binlog_thread_state == BCCC_running)
{
- DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
- do_ndbcluster_binlog_close_connection= BCCC_restart;
+ DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
+ binlog_thread_state= BCCC_restart;
}
}
updateInjectorStats(s_ndb, i_ndb);
@@ -6958,11 +7093,6 @@ restart_cluster_failure:
while (pOp != NULL)
{
rows= &_row;
-#ifdef RUN_NDB_BINLOG_TIMER
- Timer gci_timer, write_timer;
- int event_count= 0;
- gci_timer.start();
-#endif
gci= pOp->getGCI();
DBUG_PRINT("info", ("Handling gci: %u/%u",
(uint)(gci >> 32),
@@ -7090,16 +7220,11 @@ restart_cluster_failure:
sql_print_error("NDB: Could not get apply status share");
}
}
-#ifdef RUN_NDB_BINLOG_TIMER
- write_timer.start();
-#endif
+
do
{
-#ifdef RUN_NDB_BINLOG_TIMER
- event_count++;
-#endif
if (pOp->hasError() &&
- ndb_binlog_thread_handle_error(pOp) < 0)
+ handle_error(pOp) < 0)
goto err;
#ifndef DBUG_OFF
@@ -7137,11 +7262,11 @@ restart_cluster_failure:
#endif
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
- ndb_binlog_thread_handle_data_event(thd, i_ndb, pOp, &rows, trans,
- trans_row_count, trans_slave_row_count);
+ handle_data_event(thd, i_ndb, pOp, &rows, trans,
+ trans_row_count, trans_slave_row_count);
else
{
- ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
+ handle_non_data_event(thd, pOp, *rows);
DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
"<empty>"));
@@ -7150,10 +7275,10 @@ restart_cluster_failure:
"<empty>"));
if (i_ndb->getEventOperation() == NULL &&
s_ndb->getEventOperation() == NULL &&
- do_ndbcluster_binlog_close_connection == BCCC_running)
+ binlog_thread_state == BCCC_running)
{
- DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
- do_ndbcluster_binlog_close_connection= BCCC_restart;
+ DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
+ binlog_thread_state= BCCC_restart;
if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
{
sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
@@ -7173,9 +7298,6 @@ restart_cluster_failure:
note! pOp is not referring to an event in the next epoch
or is == 0
*/
-#ifdef RUN_NDB_BINLOG_TIMER
- write_timer.stop();
-#endif
while (trans.good())
{
@@ -7250,18 +7372,8 @@ restart_cluster_failure:
break;
}
ndb_latest_handled_binlog_epoch= gci;
-
-#ifdef RUN_NDB_BINLOG_TIMER
- gci_timer.stop();
- sql_print_information("gci %ld event_count %d write time "
- "%ld(%d e/s), total time %ld(%d e/s)",
- (ulong)gci, event_count,
- write_timer.elapsed_ms(),
- (1000*event_count) / write_timer.elapsed_ms(),
- gci_timer.elapsed_ms(),
- (1000*event_count) / gci_timer.elapsed_ms());
-#endif
}
+
if(!i_ndb->isConsistent(gci))
{
char errmsg[64];
@@ -7277,15 +7389,16 @@ restart_cluster_failure:
}
}
- ndb_binlog_thread_handle_schema_event_post_epoch(thd,
- &post_epoch_log_list,
- &post_epoch_unlock_list);
+ // Notify the schema event handler about post_epoch so it may finish
+ // any outstanding business
+ schema_event_handler.post_epoch();
+
free_root(&mem_root, MYF(0));
*root_ptr= old_root;
ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
}
err:
- if (do_ndbcluster_binlog_close_connection != BCCC_restart)
+ if (binlog_thread_state != BCCC_restart)
{
sql_print_information("Stopping Cluster Binlog");
DBUG_PRINT("info",("Shutting down cluster binlog thread"));
@@ -7411,7 +7524,7 @@ restart_cluster_failure:
}
}
- if (do_ndbcluster_binlog_close_connection == BCCC_restart)
+ if (binlog_thread_state == BCCC_restart)
{
pthread_mutex_lock(&injector_mutex);
goto restart_cluster_failure;
@@ -7471,88 +7584,6 @@ ndbcluster_show_status_binlog(THD* thd,
DBUG_RETURN(FALSE);
}
-/*
- AnyValue carries ServerId or Reserved codes
- Bits from opt_server_id_bits to 30 may carry other data
- so we ignore them when reading/setting AnyValue.
-
- 332 21 10 0
- 10987654321098765432109876543210
- roooooooooooooooooooooooosssssss
-
- r = Reserved bit indicates whether
- bits 0-7+ have ServerId (0) or
- some special reserved code (1).
- o = Optional bits, depending on value
- of server-id-bits will be
- serverid bits or user-specific
- data
- s = Serverid bits or reserved codes
- At least 7 bits will be available
- for serverid or reserved codes
-
-*/
-
-#define NDB_ANYVALUE_RESERVED_BIT 0x80000000
-#define NDB_ANYVALUE_RESERVED_MASK 0x8000007f
-
-#define NDB_ANYVALUE_NOLOGGING_CODE 0x8000007f
-
-#ifndef DBUG_OFF
-void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue)
-{
- /*
- Set userData part of AnyValue (if there is one) to
- all 1s to test that it is ignored
- */
- const Uint32 userDataMask = ~(opt_server_id_mask |
- NDB_ANYVALUE_RESERVED_BIT);
-
- anyValue |= userDataMask;
-}
-#endif
-
-bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue)
-{
- return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0);
-}
-
-bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue)
-{
- return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) ==
- NDB_ANYVALUE_NOLOGGING_CODE);
-}
-
-void ndbcluster_anyvalue_set_nologging(Uint32& anyValue)
-{
- anyValue |= NDB_ANYVALUE_NOLOGGING_CODE;
-}
-
-void ndbcluster_anyvalue_set_normal(Uint32& anyValue)
-{
- /* Clear reserved bit and serverid bits */
- anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT);
- anyValue &= ~(opt_server_id_mask);
-}
-
-bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)
-{
- return ((serverId & ~opt_server_id_mask) == 0);
-}
-
-Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue)
-{
- assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
-
- return (anyValue & opt_server_id_mask);
-}
-
-void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId)
-{
- assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
- anyValue &= ~(opt_server_id_mask);
- anyValue |= (serverId & opt_server_id_mask);
-}
#ifdef NDB_WITHOUT_SERVER_ID_BITS
@@ -7561,121 +7592,5 @@ ulong opt_server_id_mask = ~0;
#endif
-Ndb_binlog_extra_row_info::
-Ndb_binlog_extra_row_info()
-{
- flags = 0;
- transactionId = InvalidTransactionId;
- /* Prepare buffer with extra row info buffer bytes */
- buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = 0;
- buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] = ERIF_NDB;
-}
-
-void
-Ndb_binlog_extra_row_info::
-setFlags(Uint16 _flags)
-{
- flags = _flags;
-}
-
-void
-Ndb_binlog_extra_row_info::
-setTransactionId(Uint64 _transactionId)
-{
- assert(_transactionId != InvalidTransactionId);
- transactionId = _transactionId;
-};
-
-int
-Ndb_binlog_extra_row_info::
-loadFromBuffer(const uchar* extra_row_info)
-{
- assert(extra_row_info);
-
- Uint8 length = extra_row_info[ EXTRA_ROW_INFO_LEN_OFFSET ];
- assert(length >= EXTRA_ROW_INFO_HDR_BYTES);
- Uint8 payload_length = length - EXTRA_ROW_INFO_HDR_BYTES;
- Uint8 format = extra_row_info[ EXTRA_ROW_INFO_FORMAT_OFFSET ];
-
- if (likely(format == ERIF_NDB))
- {
- if (likely(payload_length >= FLAGS_SIZE))
- {
- const uchar* data = &extra_row_info[ EXTRA_ROW_INFO_HDR_BYTES ];
- Uint8 nextPos = 0;
-
- /* Have flags at least */
- Uint16 netFlags;
- memcpy(&netFlags, &data[ nextPos ], FLAGS_SIZE);
- nextPos += FLAGS_SIZE;
- flags = uint2korr((const char*) &netFlags);
-
- if (flags & NDB_ERIF_TRANSID)
- {
- if (likely((nextPos + TRANSID_SIZE) <= payload_length))
- {
- /*
- Correct length, retrieve transaction id, converting from
- little endian if necessary.
- */
- Uint64 netTransId;
- memcpy(&netTransId,
- &data[ nextPos ],
- TRANSID_SIZE);
- nextPos += TRANSID_SIZE;
- transactionId = uint8korr((const char*) &netTransId);
- }
- else
- {
- /*
- Error - supposed to have transaction id, but
- buffer too short
- */
- return -1;
- }
- }
- }
- }
-
- /* We currently ignore other formats of extra binlog info, and
- * different lengths.
- */
-
- return 0;
-}
-
-uchar*
-Ndb_binlog_extra_row_info::generateBuffer()
-{
- /*
- Here we write out the buffer in network format,
- based on the current member settings.
- */
- Uint8 nextPos = EXTRA_ROW_INFO_HDR_BYTES;
-
- if (flags)
- {
- /* Write current flags into buff */
- Uint16 netFlags = uint2korr((const char*) &flags);
- memcpy(&buff[ nextPos ], &netFlags, FLAGS_SIZE);
- nextPos += FLAGS_SIZE;
-
- if (flags & NDB_ERIF_TRANSID)
- {
- Uint64 netTransactionId = uint8korr((const char*) &transactionId);
- memcpy(&buff[ nextPos ], &netTransactionId, TRANSID_SIZE);
- nextPos += TRANSID_SIZE;
- }
-
- assert( nextPos <= MaxLen );
- /* Set length */
- assert( buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] == ERIF_NDB );
- buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = nextPos;
-
- return buff;
- }
- return 0;
-}
-
// #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
#endif
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h 2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster_binlog.h 2011-10-27 12:33:39 +0000
@@ -212,42 +212,6 @@ ndbcluster_show_status_binlog(THD* thd,
*/
int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
size_t pack_length);
-int ndbcluster_find_all_files(THD *thd);
-
-char *ndb_pack_varchar(const NDBCOL *col, char *buf,
- const char *str, int sz);
-
-NDB_SHARE *ndbcluster_get_share(const char *key,
- TABLE *table,
- bool create_if_not_exists,
- bool have_lock);
-NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share);
-void ndbcluster_free_share(NDB_SHARE **share, bool have_lock);
-void ndbcluster_real_free_share(NDB_SHARE **share);
-int handle_trailing_share(THD *thd, NDB_SHARE *share);
-int ndbcluster_prepare_rename_share(NDB_SHARE *share, const char *new_key);
-int ndbcluster_rename_share(THD *thd, NDB_SHARE *share);
-int ndbcluster_undo_rename_share(THD *thd, NDB_SHARE *share);
-void ndbcluster_mark_share_dropped(NDB_SHARE*);
-inline NDB_SHARE *get_share(const char *key,
- TABLE *table,
- bool create_if_not_exists= TRUE,
- bool have_lock= FALSE)
-{
- return ndbcluster_get_share(key, table, create_if_not_exists, have_lock);
-}
-
-inline NDB_SHARE *get_share(NDB_SHARE *share)
-{
- return ndbcluster_get_share(share);
-}
-
-inline void free_share(NDB_SHARE **share, bool have_lock= FALSE)
-{
- ndbcluster_free_share(share, have_lock);
-}
-
-void set_binlog_flags(NDB_SHARE *share);
/*
Helper functions
@@ -257,60 +221,3 @@ ndbcluster_check_if_local_table(const ch
bool
ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
-bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue);
-bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue);
-void ndbcluster_anyvalue_set_nologging(Uint32& anyValue);
-bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId);
-void ndbcluster_anyvalue_set_normal(Uint32& anyValue);
-Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue);
-void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId);
-
-#ifndef DBUG_OFF
-void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue);
-#endif
-
-/*
- Helper for reading/writing Binlog extra row info
- in Ndb format.
- It contains an internal buffer, which can be passed
- in the thd variable when writing binlog entries if
- the object stays in scope around the write.
-*/
-class Ndb_binlog_extra_row_info
-{
-public:
- static const Uint32 FLAGS_SIZE = sizeof(Uint16);
- static const Uint32 TRANSID_SIZE = sizeof(Uint64);
- static const Uint32 MaxLen =
- EXTRA_ROW_INFO_HDR_BYTES +
- FLAGS_SIZE +
- TRANSID_SIZE;
-
- static const Uint64 InvalidTransactionId = ~Uint64(0);
-
- enum Flags
- {
- NDB_ERIF_TRANSID = 0x1
- };
-
- Ndb_binlog_extra_row_info();
-
- int loadFromBuffer(const uchar* extra_row_info_ptr);
-
- Uint16 getFlags() const
- {
- return flags;
- }
- void setFlags(Uint16 _flags);
- Uint64 getTransactionId() const
- { return transactionId; };
- void setTransactionId(Uint64 _transactionId);
- uchar* getBuffPtr()
- { return buff; };
- uchar* generateBuffer();
-private:
- uchar buff[MaxLen];
- Uint16 flags;
- Uint64 transactionId;
-};
-
=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc 2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster_connection.cc 2011-10-27 12:33:39 +0000
@@ -341,7 +341,8 @@ static ST_FIELD_INFO ndb_transid_mysql_c
static
int
-ndb_transid_mysql_connection_map_fill_table(THD* thd, TABLE_LIST* tables, Item* cond)
+ndb_transid_mysql_connection_map_fill_table(THD* thd, TABLE_LIST* tables,
+ Item*)
{
DBUG_ENTER("ndb_transid_mysql_connection_map_fill_table");
=== added file 'sql/ndb_anyvalue.cc'
--- a/sql/ndb_anyvalue.cc 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_anyvalue.cc 2011-10-27 07:42:25 +0000
@@ -0,0 +1,105 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "ndb_anyvalue.h"
+
+/*
+ AnyValue carries ServerId or Reserved codes
+ Bits from opt_server_id_bits to 30 may carry other data
+ so we ignore them when reading/setting AnyValue.
+
+ 332 21 10 0
+ 10987654321098765432109876543210
+ roooooooooooooooooooooooosssssss
+
+ r = Reserved bit indicates whether
+ bits 0-7+ have ServerId (0) or
+ some special reserved code (1).
+ o = Optional bits, depending on value
+ of server-id-bits will be
+ serverid bits or user-specific
+ data
+ s = Serverid bits or reserved codes
+ At least 7 bits will be available
+ for serverid or reserved codes
+
+*/
+
+#include <my_global.h>
+
+extern ulong opt_server_id_mask;
+
+#define NDB_ANYVALUE_RESERVED_BIT 0x80000000
+#define NDB_ANYVALUE_RESERVED_MASK 0x8000007f
+
+#define NDB_ANYVALUE_NOLOGGING_CODE 0x8000007f
+
+#ifndef DBUG_OFF
+void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue)
+{
+ /*
+ Set userData part of AnyValue (if there is one) to
+ all 1s to test that it is ignored
+ */
+ const Uint32 userDataMask = ~(opt_server_id_mask |
+ NDB_ANYVALUE_RESERVED_BIT);
+
+ anyValue |= userDataMask;
+}
+#endif
+
+bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue)
+{
+ return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0);
+}
+
+bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue)
+{
+ return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) ==
+ NDB_ANYVALUE_NOLOGGING_CODE);
+}
+
+void ndbcluster_anyvalue_set_nologging(Uint32& anyValue)
+{
+ anyValue |= NDB_ANYVALUE_NOLOGGING_CODE;
+}
+
+void ndbcluster_anyvalue_set_normal(Uint32& anyValue)
+{
+ /* Clear reserved bit and serverid bits */
+ anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT);
+ anyValue &= ~(opt_server_id_mask);
+}
+
+bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)
+{
+ return ((serverId & ~opt_server_id_mask) == 0);
+}
+
+Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue)
+{
+ assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
+
+ return (anyValue & opt_server_id_mask);
+}
+
+void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId)
+{
+ assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
+ anyValue &= ~(opt_server_id_mask);
+ anyValue |= (serverId & opt_server_id_mask);
+}
=== added file 'sql/ndb_anyvalue.h'
--- a/sql/ndb_anyvalue.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_anyvalue.h 2011-10-27 07:42:25 +0000
@@ -0,0 +1,35 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_ANYVALUE_H
+#define NDB_ANYVALUE_H
+
+#include <ndb_types.h>
+
+bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue);
+bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue);
+void ndbcluster_anyvalue_set_nologging(Uint32& anyValue);
+bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId);
+void ndbcluster_anyvalue_set_normal(Uint32& anyValue);
+Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue);
+void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId);
+
+#ifndef DBUG_OFF
+void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue);
+#endif
+
+#endif
=== added file 'sql/ndb_binlog_extra_row_info.cc'
--- a/sql/ndb_binlog_extra_row_info.cc 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_binlog_extra_row_info.cc 2011-10-27 09:34:06 +0000
@@ -0,0 +1,136 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "ndb_binlog_extra_row_info.h"
+#include <string.h> // memcpy
+
+
+Ndb_binlog_extra_row_info::
+Ndb_binlog_extra_row_info()
+{
+ flags = 0;
+ transactionId = InvalidTransactionId;
+ /* Prepare buffer with extra row info buffer bytes */
+ buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = 0;
+ buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] = ERIF_NDB;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setFlags(Uint16 _flags)
+{
+ flags = _flags;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setTransactionId(Uint64 _transactionId)
+{
+ assert(_transactionId != InvalidTransactionId);
+ transactionId = _transactionId;
+};
+
+int
+Ndb_binlog_extra_row_info::
+loadFromBuffer(const uchar* extra_row_info)
+{
+ assert(extra_row_info);
+
+ Uint8 length = extra_row_info[ EXTRA_ROW_INFO_LEN_OFFSET ];
+ assert(length >= EXTRA_ROW_INFO_HDR_BYTES);
+ Uint8 payload_length = length - EXTRA_ROW_INFO_HDR_BYTES;
+ Uint8 format = extra_row_info[ EXTRA_ROW_INFO_FORMAT_OFFSET ];
+
+ if (likely(format == ERIF_NDB))
+ {
+ if (likely(payload_length >= FLAGS_SIZE))
+ {
+ const uchar* data = &extra_row_info[ EXTRA_ROW_INFO_HDR_BYTES ];
+ Uint8 nextPos = 0;
+
+ /* Have flags at least */
+ Uint16 netFlags;
+ memcpy(&netFlags, &data[ nextPos ], FLAGS_SIZE);
+ nextPos += FLAGS_SIZE;
+ flags = uint2korr((const char*) &netFlags);
+
+ if (flags & NDB_ERIF_TRANSID)
+ {
+ if (likely((nextPos + TRANSID_SIZE) <= payload_length))
+ {
+ /*
+ Correct length, retrieve transaction id, converting from
+ little endian if necessary.
+ */
+ Uint64 netTransId;
+ memcpy(&netTransId,
+ &data[ nextPos ],
+ TRANSID_SIZE);
+ nextPos += TRANSID_SIZE;
+ transactionId = uint8korr((const char*) &netTransId);
+ }
+ else
+ {
+ /*
+ Error - supposed to have transaction id, but
+ buffer too short
+ */
+ return -1;
+ }
+ }
+ }
+ }
+
+ /* We currently ignore other formats of extra binlog info, and
+ * different lengths.
+ */
+
+ return 0;
+}
+
+uchar*
+Ndb_binlog_extra_row_info::generateBuffer()
+{
+ /*
+ Here we write out the buffer in network format,
+ based on the current member settings.
+ */
+ Uint8 nextPos = EXTRA_ROW_INFO_HDR_BYTES;
+
+ if (flags)
+ {
+ /* Write current flags into buff */
+ Uint16 netFlags = uint2korr((const char*) &flags);
+ memcpy(&buff[ nextPos ], &netFlags, FLAGS_SIZE);
+ nextPos += FLAGS_SIZE;
+
+ if (flags & NDB_ERIF_TRANSID)
+ {
+ Uint64 netTransactionId = uint8korr((const char*) &transactionId);
+ memcpy(&buff[ nextPos ], &netTransactionId, TRANSID_SIZE);
+ nextPos += TRANSID_SIZE;
+ }
+
+ assert( nextPos <= MaxLen );
+ /* Set length */
+ assert( buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] == ERIF_NDB );
+ buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = nextPos;
+
+ return buff;
+ }
+ return 0;
+}
=== added file 'sql/ndb_binlog_extra_row_info.h'
--- a/sql/ndb_binlog_extra_row_info.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_binlog_extra_row_info.h 2011-10-27 09:34:06 +0000
@@ -0,0 +1,71 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_BINLOG_EXTRA_ROW_INFO_H
+#define NDB_BINLOG_EXTRA_ROW_INFO_H
+
+#include <my_global.h>
+#include <ndb_types.h>
+#include <rpl_constants.h>
+
+/*
+ Helper for reading/writing Binlog extra row info
+ in Ndb format.
+ It contains an internal buffer, which can be passed
+ in the thd variable when writing binlog entries if
+ the object stays in scope around the write.
+*/
+class Ndb_binlog_extra_row_info
+{
+public:
+ static const Uint32 FLAGS_SIZE = sizeof(Uint16);
+ static const Uint32 TRANSID_SIZE = sizeof(Uint64);
+ static const Uint32 MaxLen =
+ EXTRA_ROW_INFO_HDR_BYTES +
+ FLAGS_SIZE +
+ TRANSID_SIZE;
+
+ static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+ enum Flags
+ {
+ NDB_ERIF_TRANSID = 0x1
+ };
+
+ Ndb_binlog_extra_row_info();
+
+ int loadFromBuffer(const uchar* extra_row_info_ptr);
+
+ Uint16 getFlags() const
+ {
+ return flags;
+ }
+ void setFlags(Uint16 _flags);
+ Uint64 getTransactionId() const
+ { return transactionId; };
+ void setTransactionId(Uint64 _transactionId);
+ uchar* getBuffPtr()
+ { return buff; };
+ uchar* generateBuffer();
+private:
+ uchar buff[MaxLen];
+ Uint16 flags;
+ Uint64 transactionId;
+};
+
+
+#endif
=== added file 'sql/ndb_ndbapi_util.cc'
--- a/sql/ndb_ndbapi_util.cc 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_ndbapi_util.cc 2011-10-27 08:27:44 +0000
@@ -0,0 +1,131 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "ndb_ndbapi_util.h"
+
+#include <string.h> // memcpy
+
+/*
+ helper function to pack a ndb varchar
+*/
+char *ndb_pack_varchar(const NdbDictionary::Column *col, char *buf,
+ const char *str, int sz)
+{
+ switch (col->getArrayType())
+ {
+ case NdbDictionary::Column::ArrayTypeFixed:
+ memcpy(buf, str, sz);
+ break;
+ case NdbDictionary::Column::ArrayTypeShortVar:
+ *(uchar*)buf= (uchar)sz;
+ memcpy(buf + 1, str, sz);
+ break;
+ case NdbDictionary::Column::ArrayTypeMediumVar:
+ int2store(buf, sz);
+ memcpy(buf + 2, str, sz);
+ break;
+ }
+ return buf;
+}
+
+
+#ifndef MYSQL_SERVER
+#define MYSQL_SERVER
+#endif
+
+#include <sql_class.h> // TABLE, Field etc.
+
+/*
+ This routine is shared by injector. There is no common blobs buffer
+ so the buffer and length are passed by reference. Injector also
+ passes a record pointer diff.
+ */
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+ uchar*& buffer, uint& buffer_size,
+ my_ptrdiff_t ptrdiff)
+{
+ DBUG_ENTER("get_ndb_blobs_value");
+
+ // Field has no field number so cannot use TABLE blob_field
+ // Loop twice, first only counting total buffer size
+ for (int loop= 0; loop <= 1; loop++)
+ {
+ uint32 offset= 0;
+ for (uint i= 0; i < table->s->fields; i++)
+ {
+ Field *field= table->field[i];
+ NdbValue value= value_array[i];
+ if (! (field->flags & BLOB_FLAG))
+ continue;
+ if (value.blob == NULL)
+ {
+ DBUG_PRINT("info",("[%u] skipped", i));
+ continue;
+ }
+ Field_blob *field_blob= (Field_blob *)field;
+ NdbBlob *ndb_blob= value.blob;
+ int isNull;
+ if (ndb_blob->getNull(isNull) != 0)
+ DBUG_RETURN(-1);
+ if (isNull == 0) {
+ Uint64 len64= 0;
+ if (ndb_blob->getLength(len64) != 0)
+ DBUG_RETURN(-1);
+ // Align to Uint64
+ uint32 size= Uint32(len64);
+ if (size % 8 != 0)
+ size+= 8 - size % 8;
+ if (loop == 1)
+ {
+ uchar *buf= buffer + offset;
+ uint32 len= 0xffffffff; // Max uint32
+ if (ndb_blob->readData(buf, len) != 0)
+ DBUG_RETURN(-1);
+ DBUG_PRINT("info", ("[%u] offset: %u buf: 0x%lx len=%u [ptrdiff=%d]",
+ i, offset, (long) buf, len, (int)ptrdiff));
+ DBUG_ASSERT(len == len64);
+ // Ugly hack assumes only ptr needs to be changed
+ field_blob->set_ptr_offset(ptrdiff, len, buf);
+ }
+ offset+= size;
+ }
+ else if (loop == 1) // undefined or null
+ {
+ // have to set length even in this case
+ uchar *buf= buffer + offset; // or maybe NULL
+ uint32 len= 0;
+ field_blob->set_ptr_offset(ptrdiff, len, buf);
+ DBUG_PRINT("info", ("[%u] isNull=%d", i, isNull));
+ }
+ }
+ if (loop == 0 && offset > buffer_size)
+ {
+ my_free(buffer);
+ buffer_size= 0;
+ DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
+ buffer= (uchar*) my_malloc(offset, MYF(MY_WME));
+ if (buffer == NULL)
+ {
+ sql_print_error("ha_ndbcluster::get_ndb_blobs_value: "
+ "my_malloc(%u) failed", offset);
+ DBUG_RETURN(-1);
+ }
+ buffer_size= offset;
+ }
+ }
+ DBUG_RETURN(0);
+}
=== added file 'sql/ndb_ndbapi_util.h'
--- a/sql/ndb_ndbapi_util.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_ndbapi_util.h 2011-10-27 08:27:44 +0000
@@ -0,0 +1,41 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_NDBAPI_UTIL_H
+#define NDB_NDBAPI_UTIL_H
+
+#include <my_global.h>
+
+#include <ndbapi/NdbRecAttr.hpp>
+#include <ndbapi/NdbBlob.hpp>
+#include <ndbapi/NdbDictionary.hpp>
+
+union NdbValue
+{
+ const NdbRecAttr *rec;
+ NdbBlob *blob;
+ void *ptr;
+};
+
+int get_ndb_blobs_value(struct TABLE* table, NdbValue* value_array,
+ uchar*& buffer, uint& buffer_size,
+ my_ptrdiff_t ptrdiff);
+
+char *ndb_pack_varchar(const NdbDictionary::Column *col,
+ char *buf, const char *str, int sz);
+
+#endif
=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h 2011-09-09 09:30:43 +0000
+++ b/sql/ndb_share.h 2011-10-27 09:10:08 +0000
@@ -212,4 +212,80 @@ set_ndb_share_state(NDB_SHARE *share, ND
pthread_mutex_unlock(&share->mutex);
}
+
+/* NDB_SHARE.flags */
+#define NSF_HIDDEN_PK 1u /* table has hidden primary key */
+#define NSF_BLOB_FLAG 2u /* table has blob attributes */
+#define NSF_NO_BINLOG 4u /* table should not be binlogged */
+#define NSF_BINLOG_FULL 8u /* table should be binlogged with full rows */
+#define NSF_BINLOG_USE_UPDATE 16u /* table update should be binlogged using
+ update log event */
+inline void set_binlog_logging(NDB_SHARE *share)
+{
+ DBUG_PRINT("info", ("set_binlog_logging"));
+ share->flags&= ~NSF_NO_BINLOG;
+}
+inline void set_binlog_nologging(NDB_SHARE *share)
+{
+ DBUG_PRINT("info", ("set_binlog_nologging"));
+ share->flags|= NSF_NO_BINLOG;
+}
+inline my_bool get_binlog_nologging(NDB_SHARE *share)
+{ return (share->flags & NSF_NO_BINLOG) != 0; }
+inline void set_binlog_updated_only(NDB_SHARE *share)
+{
+ DBUG_PRINT("info", ("set_binlog_updated_only"));
+ share->flags&= ~NSF_BINLOG_FULL;
+}
+inline void set_binlog_full(NDB_SHARE *share)
+{
+ DBUG_PRINT("info", ("set_binlog_full"));
+ share->flags|= NSF_BINLOG_FULL;
+}
+inline my_bool get_binlog_full(NDB_SHARE *share)
+{ return (share->flags & NSF_BINLOG_FULL) != 0; }
+inline void set_binlog_use_write(NDB_SHARE *share)
+{
+ DBUG_PRINT("info", ("set_binlog_use_write"));
+ share->flags&= ~NSF_BINLOG_USE_UPDATE;
+}
+inline void set_binlog_use_update(NDB_SHARE *share)
+{
+ DBUG_PRINT("info", ("set_binlog_use_update"));
+ share->flags|= NSF_BINLOG_USE_UPDATE;
+}
+inline my_bool get_binlog_use_update(NDB_SHARE *share)
+{ return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
+
+
+NDB_SHARE *ndbcluster_get_share(const char *key,
+ struct TABLE *table,
+ bool create_if_not_exists,
+ bool have_lock);
+NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share);
+void ndbcluster_free_share(NDB_SHARE **share, bool have_lock);
+void ndbcluster_real_free_share(NDB_SHARE **share);
+int handle_trailing_share(THD *thd, NDB_SHARE *share);
+int ndbcluster_prepare_rename_share(NDB_SHARE *share, const char *new_key);
+int ndbcluster_rename_share(THD *thd, NDB_SHARE *share);
+int ndbcluster_undo_rename_share(THD *thd, NDB_SHARE *share);
+void ndbcluster_mark_share_dropped(NDB_SHARE*);
+inline NDB_SHARE *get_share(const char *key,
+ struct TABLE *table,
+ bool create_if_not_exists= TRUE,
+ bool have_lock= FALSE)
+{
+ return ndbcluster_get_share(key, table, create_if_not_exists, have_lock);
+}
+
+inline NDB_SHARE *get_share(NDB_SHARE *share)
+{
+ return ndbcluster_get_share(share);
+}
+
+inline void free_share(NDB_SHARE **share, bool have_lock= FALSE)
+{
+ ndbcluster_free_share(share, have_lock);
+}
+
#endif
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt 2011-10-27 12:19:57 +0000
+++ b/storage/ndb/CMakeLists.txt 2011-10-27 12:33:39 +0000
@@ -80,6 +80,9 @@ SET(NDBCLUSTER_SOURCES
../../sql/ndb_global_schema_lock.cc
../../sql/ndb_mi.cc
../../sql/ndb_conflict_trans.cc
+ ../../sql/ndb_anyvalue.cc
+ ../../sql/ndb_ndbapi_util.cc
+ ../../sql/ndb_binlog_extra_row_info.cc
)
# Include directories used when building ha_ndbcluster
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-trunk-cluster branch (magnus.blaudd:3402 to 3403) | magnus.blaudd | 28 Oct |