#At file:///home/tomas/mysql_src/mysql-5.1-telco-6.4/
3137 Tomas Ulin 2008-12-05 [merge]
merge
added:
storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp
modified:
mysql-test/suite/ndb/t/ndb_dd_dump.test
sql/ha_ndbcluster.cc
sql/table.cc
storage/ndb/include/kernel/NodeInfo.hpp
storage/ndb/include/kernel/ndb_limits.h
storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp
storage/ndb/include/kernel/signaldata/FsOpenReq.hpp
storage/ndb/include/mgmapi/mgmapi_config_parameters.h
storage/ndb/include/ndb_version.h.in
storage/ndb/src/kernel/blocks/ERROR_codes.txt
storage/ndb/src/kernel/blocks/LocalProxy.cpp
storage/ndb/src/kernel/blocks/LocalProxy.hpp
storage/ndb/src/kernel/blocks/Makefile.am
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/lgman.cpp
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt
storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/blocks/record_types.hpp
storage/ndb/src/kernel/blocks/tsman.cpp
storage/ndb/src/kernel/main.cpp
storage/ndb/src/kernel/vm/SafeMutex.cpp
storage/ndb/src/kernel/vm/SafeMutex.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
storage/ndb/src/mgmapi/LocalConfig.cpp
storage/ndb/src/mgmapi/mgmapi.cpp
storage/ndb/src/mgmclient/CommandInterpreter.cpp
storage/ndb/src/mgmsrv/ConfigInfo.cpp
storage/ndb/test/ndbapi/testDict.cpp
storage/ndb/test/ndbapi/testNodeRestart.cpp
storage/ndb/test/run-test/command.cpp
storage/ndb/test/run-test/daily-basic-tests.txt
storage/ndb/test/run-test/daily-devel-tests.txt
storage/ndb/test/run-test/setup.cpp
=== modified file 'mysql-test/suite/ndb/t/ndb_dd_dump.test'
--- a/mysql-test/suite/ndb/t/ndb_dd_dump.test 2007-11-29 10:29:35 +0000
+++ b/mysql-test/suite/ndb/t/ndb_dd_dump.test 2008-12-03 19:48:24 +0000
@@ -260,6 +260,7 @@ CREATE TABLE test.t (
SELECT count(*) FROM test.t;
LOAD DATA INFILE 't_backup' INTO TABLE test.t;
+ --remove_file $MYSQLTEST_VARDIR/master-data/test/t_backup
SELECT * FROM test.t order by a;
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2008-12-05 08:33:43 +0000
+++ b/sql/ha_ndbcluster.cc 2008-12-05 09:09:17 +0000
@@ -4148,7 +4148,7 @@ int ha_ndbcluster::ndb_delete_row(const
/*
Poor approx. let delete ~ tabsize / 4
*/
- uint delete_size= 12 + m_bytes_per_write >> 2;
+ uint delete_size= 12 + (m_bytes_per_write >> 2);
bool need_flush= add_row_check_if_batch_full_size(thd_ndb, delete_size);
if ( allow_batch &&
table_share->primary_key != MAX_KEY &&
=== modified file 'sql/table.cc'
--- a/sql/table.cc 2008-11-21 21:06:44 +0000
+++ b/sql/table.cc 2008-12-02 15:07:57 +0000
@@ -4372,13 +4372,19 @@ void st_table::clear_column_bitmaps()
void st_table::prepare_for_position()
{
DBUG_ENTER("st_table::prepare_for_position");
-
- if ((file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) &&
- s->primary_key < MAX_KEY)
+
+ if (s->primary_key < MAX_KEY)
{
- mark_columns_used_by_index_no_reset(s->primary_key, read_set);
- /* signal change */
- file->column_bitmaps_signal(HA_CHANGE_TABLE_READ_BITMAP);
+ if (file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX)
+ {
+ mark_columns_used_by_index_no_reset(s->primary_key, read_set);
+ }
+ if ((file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) ||
+ (file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
+ {
+ /* signal change */
+ file->column_bitmaps_signal(HA_COMPLETE_TABLE_READ_BITMAP);
+ }
}
DBUG_VOID_RETURN;
}
=== modified file 'storage/ndb/include/kernel/NodeInfo.hpp'
--- a/storage/ndb/include/kernel/NodeInfo.hpp 2008-11-04 08:43:06 +0000
+++ b/storage/ndb/include/kernel/NodeInfo.hpp 2008-12-01 18:04:19 +0000
@@ -37,6 +37,7 @@ public:
Uint32 m_version; ///< Ndb version
Uint32 m_mysql_version; ///< MySQL version
+ Uint32 m_lqh_workers; ///< LQH workers
Uint32 m_type; ///< Node type
Uint32 m_connectCount; ///< No of times connected
bool m_connected; ///< Node is connected
@@ -50,6 +51,7 @@ inline
NodeInfo::NodeInfo(){
m_version = 0;
m_mysql_version = 0;
+ m_lqh_workers = 0;
m_type = INVALID;
m_connectCount = 0;
m_heartbeat_cnt= 0;
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2008-11-13 15:22:59 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2008-12-02 13:10:49 +0000
@@ -187,4 +187,6 @@
#define MAX_NDBMT_LQH_WORKERS 4
#define MAX_NDBMT_LQH_THREADS 4
+#define NDB_FILE_BUFFER_SIZE (256*1024)
+
#endif
=== modified file 'storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp 2007-01-06 00:21:39 +0000
+++ b/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp 2008-12-01 18:04:19 +0000
@@ -161,7 +161,7 @@ class CmNodeInfoReq {
friend class Qmgr;
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 5 );
private:
/**
@@ -171,6 +171,7 @@ private:
Uint32 dynamicId;
Uint32 version;
Uint32 mysql_version;
+ Uint32 lqh_workers; // added in telco-6.4
};
class CmNodeInfoRef {
@@ -198,20 +199,14 @@ class CmNodeInfoConf {
friend class Qmgr;
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 5 );
private:
Uint32 nodeId;
Uint32 dynamicId;
Uint32 version;
Uint32 mysql_version;
+ Uint32 lqh_workers; // added in telco-6.4
};
#endif
-
-
-
-
-
-
-
=== modified file 'storage/ndb/include/kernel/signaldata/FsOpenReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp 2008-08-21 06:38:48 +0000
+++ b/storage/ndb/include/kernel/signaldata/FsOpenReq.hpp 2008-12-02 13:10:49 +0000
@@ -33,6 +33,7 @@ class FsOpenReq {
friend class Win32AsyncFile; // FIXME
friend class Filename;
friend class VoidFs;
+ friend class AsyncIoThread;
/**
* Sender(s)
@@ -90,6 +91,8 @@ private:
STATIC_CONST( OM_CHECK_SIZE = 0x2000 );
STATIC_CONST( OM_DIRECT = 0x4000 );
STATIC_CONST( OM_GZ = 0x8000 );
+ STATIC_CONST( OM_THREAD_POOL = 0x10000 );
+ STATIC_CONST( OM_WRITE_BUFFER = 0x20000 );
enum Suffixes {
S_DATA = 0,
=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h 2008-12-02 13:10:49 +0000
@@ -147,6 +147,7 @@
#define CFG_NDBMT_LQH_WORKERS 188
#define CFG_DB_INIT_REDO 189
+#define CFG_DB_THREAD_POOL 190
#define CFG_DB_SGA 198 /* super pool mem */
#define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */
=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in 2008-11-14 09:12:01 +0000
+++ b/storage/ndb/include/ndb_version.h.in 2008-12-01 18:04:19 +0000
@@ -102,6 +102,7 @@ Uint32 ndbGetOwnVersion();
#define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0)
#define NDBD_MAX_RECVBYTESIZE_32K MAKE_VERSION(6,3,18)
#define NDBD_LONG_SCANFRAGREQ MAKE_VERSION(6,4,0)
+#define NDBD_MT_LQH_VERSION MAKE_VERSION(6,4,0)
static
=== modified file 'storage/ndb/src/kernel/blocks/ERROR_codes.txt'
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt 2008-11-13 14:16:21 +0000
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt 2008-12-03 19:49:40 +0000
@@ -5,7 +5,7 @@ Next DBACC 3002
Next DBTUP 4029
Next DBLQH 5051
Next DBDICT 6013
-Next DBDIH 7215
+Next DBDIH 7216
Next DBTC 8073
Next CMVMI 9000
Next BACKUP 10041
@@ -184,7 +184,9 @@ And crash when all have "not" been sent
7213: in GCP_COMMIT Kill specified node and self, stop processing
7214: in GCP_TCFINISHED kill specified node
-
+
+7215: set c_fragments_per_node = 1 (needs to be done at startup)
+
ERROR CODES FOR TESTING NODE FAILURE, FAILURE IN COPY FRAGMENT PROCESS:
-----------------------------------------------------------------------
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-11-16 12:58:27 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-12-03 19:49:40 +0000
@@ -241,6 +241,12 @@ LocalProxy::setMask(SsParallel& ss)
ss.m_workerMask.set(i);
}
+void
+LocalProxy::setMask(SsParallel& ss, const WorkerMask& mask)
+{
+ ss.m_workerMask.assign(mask);
+}
+
// load workers (before first signal)
void
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-11-16 12:58:27 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-12-03 19:49:40 +0000
@@ -167,8 +167,9 @@ protected:
bool firstReply(const SsParallel& ss);
bool lastReply(const SsParallel& ss);
bool lastExtra(Signal* signal, SsParallel& ss);
- // set all bits in worker mask
+ // set all or given bits in worker mask
void setMask(SsParallel& ss);
+ void setMask(SsParallel& ss, const WorkerMask& mask);
/*
* Ss instances are seized from a pool. Each pool is simply an array
=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am 2008-11-16 15:29:16 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am 2008-12-02 13:10:49 +0000
@@ -41,6 +41,7 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
dbtup/DbtupDebug.cpp dbtup/DbtupScan.cpp \
dbtup/DbtupDiskAlloc.cpp dbtup/DbtupVarAlloc.cpp \
dbtup/tuppage.cpp dbtup/Undo_buffer.cpp \
+ ndbfs/AsyncIoThread.cpp \
ndbfs/PosixAsyncFile.cpp ndbfs/AsyncFile.cpp \
ndbfs/Ndbfs.cpp \
ndbfs/VoidFs.cpp \
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-11-21 11:04:36 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-12-02 14:25:58 +0000
@@ -9771,18 +9771,8 @@ flush:
ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtrSend();
conf->senderData = senderData;
conf->noOfTables = count;
- if (handle.m_cnt)
- {
- jam();
- sendSignal(rg, GSN_LIST_TABLES_CONF, signal,
- sigLen, JBB, &handle);
- }
- else
- {
- jam();
- sendSignal(rg, GSN_LIST_TABLES_CONF, signal,
- sigLen, JBB);
- }
+ sendSignal(rg, GSN_LIST_TABLES_CONF, signal,
+ sigLen, JBB, &handle);
signal->header.m_noOfSections = 0;
signal->header.m_fragmentInfo = 0;
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-10-09 10:11:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-11-28 13:16:25 +0000
@@ -1246,7 +1246,8 @@ private:
Uint32 c_nextNodeGroup;
NodeGroupRecord *nodeGroupRecord;
-
+ RSS_OP_SNAPSHOT(cnghash);
+
NodeRecord *nodeRecord;
PageRecord *pageRecord;
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-11-26 10:37:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-12-03 19:49:40 +0000
@@ -1228,6 +1228,10 @@ void Dbdih::execREAD_CONFIG_REQ(Signal*
{
jam();
c_fragments_per_node = getLqhWorkers();
+ // try to get some LQH workers which initially handle no fragments
+ if (ERROR_INSERTED(7215)) {
+ c_fragments_per_node = 1;
+ }
}
ndbout_c("Using %u fragments per node", c_fragments_per_node);
@@ -7089,7 +7093,7 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
}
}
}
-
+
if (flags & CreateFragmentationReq::RI_GET_FRAGMENTATION)
{
jam();
@@ -7864,6 +7868,8 @@ void Dbdih::execALTER_TAB_REQ(Signal * s
if ((err = add_fragments_to_table(tabPtr, buf)))
{
jam();
+ ndbrequire(tabPtr.p->totalfragments == save);
+ ndbrequire(connectPtr.p->m_alter.m_org_totalfragments == save);
send_alter_tab_ref(signal, connectPtr, err);
return;
}
@@ -8057,8 +8063,7 @@ Dbdih::release_fragment_from_table(Ptr<T
getFragstore(tabPtr.p, fragId, fragPtr);
dec_ng_refcount(getNodeGroup(fragPtr.p->preferredPrimary));
- Uint32 allocated = chunks << LOG_NO_OF_FRAGS_PER_CHUNK;
- if (fragId < allocated)
+ if (fragId == ((chunks - 1) << LOG_NO_OF_FRAGS_PER_CHUNK))
{
jam();
@@ -8067,6 +8072,7 @@ Dbdih::release_fragment_from_table(Ptr<T
fragPtr.p->nextFragmentChunk = cfirstfragstore;
cfirstfragstore = fragPtr.i;
cremainingfrags += NO_OF_FRAGS_PER_CHUNK;
+ tabPtr.p->noOfFragChunks = chunks - 1;
}
tabPtr.p->totalfragments--;
@@ -8111,12 +8117,16 @@ Dbdih::drop_fragments(Signal* signal, Pt
Ptr<TabRecord> tabPtr;
tabPtr.i = connectPtr.p->table;
ptrAss(tabPtr, tabRecord);
- for (Uint32 i = connectPtr.p->m_alter.m_totalfragments - 1;
- i >= connectPtr.p->m_alter.m_org_totalfragments; i--)
+
+ Uint32 new_frags = connectPtr.p->m_alter.m_totalfragments;
+ Uint32 org_frags = connectPtr.p->m_alter.m_org_totalfragments;
+ tabPtr.p->totalfragments = new_frags;
+ for (Uint32 i = new_frags - 1; i >= org_frags; i--)
{
jam();
release_fragment_from_table(tabPtr, i);
}
+ connectPtr.p->m_alter.m_totalfragments = org_frags;
switch(connectPtr.p->connectState){
case ConnectRecord::ALTER_TABLE_ABORT:
@@ -16005,12 +16015,36 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
if (arg == DumpStateOrd::SchemaResourceSnapshot)
{
RSS_OP_SNAPSHOT_SAVE(cremainingfrags);
+
+ {
+ Uint32 cnghash = 0;
+ NodeGroupRecordPtr NGPtr;
+ for (Uint32 i = 0; i<cnoOfNodeGroups; i++)
+ {
+ NGPtr.i = c_node_groups[i];
+ ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+ cnghash = (cnghash * 33) + NGPtr.p->m_ref_count;
+ }
+ RSS_OP_SNAPSHOT_SAVE(cnghash);
+ }
return;
}
if (arg == DumpStateOrd::SchemaResourceCheckLeak)
{
RSS_OP_SNAPSHOT_CHECK(cremainingfrags);
+
+ {
+ Uint32 cnghash = 0;
+ NodeGroupRecordPtr NGPtr;
+ for (Uint32 i = 0; i<cnoOfNodeGroups; i++)
+ {
+ NGPtr.i = c_node_groups[i];
+ ptrCheckGuard(NGPtr, MAX_NDB_NODES, nodeGroupRecord);
+ cnghash = (cnghash * 33) + NGPtr.p->m_ref_count;
+ }
+ RSS_OP_SNAPSHOT_CHECK(cnghash);
+ }
}
DECLARE_DUMP0(DBDIH, 7213, "Set error 7213 with extra arg")
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-12-01 18:05:11 +0000
@@ -859,6 +859,11 @@ public:
* Log part
*/
Uint32 m_log_part_ptr_i;
+
+ /**
+ * Instance key for fast access.
+ */
+ Uint16 lqhInstanceKey;
};
typedef Ptr<Fragrecord> FragrecordPtr;
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp 2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp 2008-12-01 18:05:11 +0000
@@ -72,3 +72,10 @@ NdbLogPartInfo::partNoIndex(Uint32 lpno)
assert(partNo[i] == lpno);
return i;
}
+
+Uint32
+NdbLogPartInfo::instanceKey(Uint32 lpno) const
+{
+ assert(lpno < LogParts);
+ return 1 + lpno;
+}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp 2008-11-16 12:59:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp 2008-12-01 18:05:11 +0000
@@ -47,6 +47,7 @@ struct NdbLogPartInfo {
bool partNoOwner(Uint32 lpno) const;
bool partNoOwner(Uint32 tabId, Uint32 fragId);
Uint32 partNoIndex(Uint32 lpno) const;
+ Uint32 instanceKey(Uint32 lpno) const;
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-12-03 19:49:40 +0000
@@ -720,7 +720,10 @@ void Dblqh::startphase1Lab(Signal* signa
for (Ti = 0; Ti < chostFileSize; Ti++) {
ThostPtr.i = Ti;
ptrCheckGuard(ThostPtr, chostFileSize, hostRecord);
- // wl4391_todo using own instance() does not work with mixed versions
+ /*
+ * Valid only if receiver has same number of LQH workers.
+ * In general full instance key of fragment must be used.
+ */
ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i);
ThostPtr.p->hostTcBlockRef = calcTcBlockRef(ThostPtr.i);
ThostPtr.p->inPackedList = false;
@@ -1594,6 +1597,7 @@ void Dblqh::execLQHFRAGREQ(Signal* signa
ndbrequire(ptr.p->logPartNo == logPartNo);
fragptr.p->m_log_part_ptr_i = ptr.i;
+ fragptr.p->lqhInstanceKey = lpinfo.instanceKey(logPartNo);
}
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
@@ -3012,99 +3016,163 @@ void Dblqh::sendCommitLqh(Signal* signal
HostRecordPtr Thostptr;
Thostptr.i = refToNode(alqhBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[5];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->gci_hi;
+ Tdata[2] = tcConnectptr.p->transid[0];
+ Tdata[3] = tcConnectptr.p->transid[1];
+ Tdata[4] = tcConnectptr.p->gci_lo;
+ Uint32 len = 5;
+
+ if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+ {
+ jam();
+ ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+ len = 4;
+ }
+
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+ if (send_unpacked) {
+ jam();
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = tcConnectptr.p->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ Uint32 Tnode = Thostptr.i;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) {
jam();
sendPackedSignalLqh(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
- Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMIT << 28);
- Uint32 gci_hi = tcConnectptr.p->gci_hi;
- Uint32 gci_lo = tcConnectptr.p->gci_lo;
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsLqh[pos] = ptrAndType;
- Thostptr.p->packedWordsLqh[pos + 1] = gci_hi;
- Thostptr.p->packedWordsLqh[pos + 2] = transid1;
- Thostptr.p->packedWordsLqh[pos + 3] = transid2;
- Thostptr.p->packedWordsLqh[pos + 4] = gci_lo;
- Thostptr.p->noOfPackedWordsLqh = pos + 5;
-
- if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
- {
- jam();
- ndbassert(gci_lo == 0 || getNodeInfo(Thostptr.i).m_connected == false);
- Thostptr.p->noOfPackedWordsLqh = pos + 4;
}
-}//Dblqh::sendCommitLqh()
+
+ Tdata[0] |= (ZCOMMIT << 28);
+ Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
+ memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref)
{
HostRecordPtr Thostptr;
Thostptr.i = refToNode(alqhBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[3];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->transid[0];
+ Tdata[2] = tcConnectptr.p->transid[1];
+ Uint32 len = 3;
+
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
+ if (send_unpacked) {
+ jam();
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = tcConnectptr.p->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ Uint32 Tnode = Thostptr.i;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(lqhRef, GSN_COMPLETE, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsLqh > 22) {
jam();
sendPackedSignalLqh(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMPLETE << 28);
Uint32 pos = Thostptr.p->noOfPackedWordsLqh;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETE << 28);
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsLqh[pos] = ptrAndType;
- Thostptr.p->packedWordsLqh[pos + 1] = transid1;
- Thostptr.p->packedWordsLqh[pos + 2] = transid2;
- Thostptr.p->noOfPackedWordsLqh = pos + 3;
-}//Dblqh::sendCompleteLqh()
+ memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = pos + len;
+}
void Dblqh::sendCommittedTc(Signal* signal, BlockReference atcBlockref)
{
HostRecordPtr Thostptr;
Thostptr.i = refToNode(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[3];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->transid[0];
+ Tdata[2] = tcConnectptr.p->transid[1];
+ Uint32 len = 3;
+
+ // currently TC is single-threaded
+ const bool send_unpacked = false;
+ if (send_unpacked) {
+ jam();
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+ sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsTc > 22) {
jam();
sendPackedSignalTc(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMMITTED << 28);
Uint32 pos = Thostptr.p->noOfPackedWordsTc;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMITTED << 28);
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsTc[pos] = ptrAndType;
- Thostptr.p->packedWordsTc[pos + 1] = transid1;
- Thostptr.p->packedWordsTc[pos + 2] = transid2;
- Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCommittedTc()
+ memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsTc = pos + len;
+}
void Dblqh::sendCompletedTc(Signal* signal, BlockReference atcBlockref)
{
HostRecordPtr Thostptr;
Thostptr.i = refToNode(atcBlockref);
ptrCheckGuard(Thostptr, chostFileSize, hostRecord);
+
+ Uint32 Tdata[3];
+ Tdata[0] = tcConnectptr.p->clientConnectrec;
+ Tdata[1] = tcConnectptr.p->transid[0];
+ Tdata[2] = tcConnectptr.p->transid[1];
+ Uint32 len = 3;
+
+ // currently TC is single-threaded
+ const bool send_unpacked = false;
+ if (send_unpacked) {
+ jam();
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
+ BlockReference tcRef = Thostptr.p->hostTcBlockRef;
+ sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsTc > 22) {
jam();
sendPackedSignalTc(signal, Thostptr.p);
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMPLETED << 28);
Uint32 pos = Thostptr.p->noOfPackedWordsTc;
- Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETED << 28);
- Uint32 transid1 = tcConnectptr.p->transid[0];
- Uint32 transid2 = tcConnectptr.p->transid[1];
- Thostptr.p->packedWordsTc[pos] = ptrAndType;
- Thostptr.p->packedWordsTc[pos + 1] = transid1;
- Thostptr.p->packedWordsTc[pos + 2] = transid2;
- Thostptr.p->noOfPackedWordsTc = pos + 3;
-}//Dblqh::sendCompletedTc()
+ memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsTc = pos + len;
+}
void Dblqh::sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref)
{
@@ -3130,7 +3198,9 @@ void Dblqh::sendLqhkeyconfTc(Signal* sig
lqhKeyConf = (LqhKeyConf *)
&Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc];
Thostptr.p->noOfPackedWordsTc += LqhKeyConf::SignalLength;
- } else if(refToMain(atcBlockref) == DBLQH){
+ } else if(refToMain(atcBlockref) == DBLQH &&
+ refToInstance(atcBlockref) == instance()) {
+ //wl4391_todo check
jam();
/*******************************************************************
// This signal was intended for DBLQH as part of log execution or
@@ -5781,8 +5851,10 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
lqhKeyReq->variableData[nextPos + 0] = sig0;
nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
- // wl4391_todo for mixed versions must recompute full instance key here
- BlockReference lqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+ // pass full instance key for remote to map to real instance
+ BlockReference lqhRef = numberToRef(DBLQH,
+ fragptr.p->lqhInstanceKey,
+ regTcPtr->nextReplica);
if (likely(sendLongReq))
{
@@ -6112,7 +6184,17 @@ void Dblqh::writeAttrinfoLab(Signal* sig
/* ------------------------------------------------------------------------- */
void Dblqh::sendTupkey(Signal* signal)
{
- BlockReference lqhRef = calcInstanceBlockRef(DBLQH, tcConnectptr.p->nextReplica);
+ BlockReference lqhRef = 0;
+ {
+ // wl4391_todo fragptr
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = tcConnectptr.p->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ Uint32 Tnode = tcConnectptr.p->nextReplica;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ }
+
signal->theData[0] = tcConnectptr.p->tcOprec;
signal->theData[1] = tcConnectptr.p->transid[0];
signal->theData[2] = tcConnectptr.p->transid[1];
@@ -7286,7 +7368,12 @@ void Dblqh::execABORT(Signal* signal)
/* ------------------------------------------------------------------------- */
// We will immediately send the ABORT message also to the next LQH node in line.
/* ------------------------------------------------------------------------- */
- BlockReference TLqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
+ FragrecordPtr Tfragptr;
+ Tfragptr.i = regTcPtr->fragmentptr;
+ c_fragment_pool.getPtr(Tfragptr);
+ Uint32 Tnode = regTcPtr->nextReplica;
+ Uint32 instanceKey = Tfragptr.p->lqhInstanceKey;
+ BlockReference TLqhRef = numberToRef(DBLQH, instanceKey, Tnode);
signal->theData[0] = regTcPtr->tcOprec;
signal->theData[1] = regTcPtr->tcBlockref;
signal->theData[2] = regTcPtr->transid[0];
@@ -11909,10 +11996,6 @@ void Dblqh::execLCP_FRAG_ORD(Signal* sig
if (cnoOfFragsCheckpointed > 0) {
jam();
completeLcpRoundLab(signal, lcpId);
- } else if (isNdbMtLqh()) {
- jam();
- // makes proxy code simpler
- completeLcpRoundLab(signal, lcpId);
} else {
jam();
sendLCP_COMPLETE_REP(signal, lcpId);
@@ -14141,7 +14224,7 @@ void Dblqh::openFileRw(Signal* signal, L
signal->theData[3] = olfLogFilePtr.p->fileName[1];
signal->theData[4] = olfLogFilePtr.p->fileName[2];
signal->theData[5] = olfLogFilePtr.p->fileName[3];
- signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+ signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
@@ -14167,7 +14250,7 @@ void Dblqh::openLogfileInit(Signal* sign
signal->theData[3] = logFilePtr.p->fileName[1];
signal->theData[4] = logFilePtr.p->fileName[2];
signal->theData[5] = logFilePtr.p->fileName[3];
- signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC;
+ signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_WRITE_BUFFER;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
@@ -14271,7 +14354,7 @@ void Dblqh::openNextLogfile(Signal* sign
signal->theData[3] = onlLogFilePtr.p->fileName[1];
signal->theData[4] = onlLogFilePtr.p->fileName[2];
signal->theData[5] = onlLogFilePtr.p->fileName[3];
- signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
+ signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE | FsOpenReq::OM_WRITE_BUFFER;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
@@ -15215,10 +15298,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
* WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED
* ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
* --------------------------------------------------------------------- */
- BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
- NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
signal->theData[0] = cownNodeid;
- sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+ if (!isNdbMtLqh()) {
+ NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+ sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+ } else {
+ const Uint32 sz = NdbNodeBitmask::Size;
+ m_sr_nodes.copyto(sz, &signal->theData[1]);
+ sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB);
+ }
return;
} else {
jam();
@@ -15232,7 +15320,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
Uint32 index = csrPhasesCompleted;
arrGuard(index, MAX_LOG_EXEC);
- BlockReference ref = calcInstanceBlockRef(DBLQH, fragptr.p->srLqhLognode[index]);
+ Uint32 Tnode = fragptr.p->srLqhLognode[index];
+ Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+ BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
fragptr.p->srStatus = Fragrecord::SS_STARTED;
/* --------------------------------------------------------------------
@@ -15249,7 +15339,6 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
execFragReq->lastGci = fragptr.p->srLastGci[index];
sendSignal(ref, GSN_EXEC_FRAGREQ, signal,
ExecFragReq::SignalLength, JBB);
-
}
signal->theData[0] = next;
sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
@@ -15339,6 +15428,13 @@ void Dblqh::execEXEC_FRAGREF(Signal* sig
/* *************** */
void Dblqh::execEXEC_SRCONF(Signal* signal)
{
+ // wl4391_todo workaround until timing fixed
+ if (cnoOutstandingExecFragReq != 0) {
+ ndbout << "delay: reqs=" << cnoOutstandingExecFragReq << endl;
+ sendSignalWithDelay(reference(), GSN_EXEC_SRCONF,
+ signal, 10, signal->getLength());
+ return;
+ }
jamEntry();
Uint32 nodeId = signal->theData[0];
arrGuard(nodeId, MAX_NDB_NODES);
@@ -16704,9 +16800,14 @@ void Dblqh::srPhase3Comp(Signal* signal)
jamEntry();
signal->theData[0] = cownNodeid;
- BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
- NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
- sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+ if (!isNdbMtLqh()) {
+ NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+ sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+ } else {
+ const Uint32 sz = NdbNodeBitmask::Size;
+ m_sr_nodes.copyto(sz, &signal->theData[1]);
+ sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB);
+ }
return;
}//Dblqh::srPhase3Comp()
@@ -19543,7 +19644,7 @@ Dblqh::validate_filter(Signal* signal)
default:
infoEvent("Invalid filter op: 0x%x pos: %ld",
* start,
- start - (signal->theData + 1));
+ (long int)(start - (signal->theData + 1)));
return false;
}
}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-12-03 19:49:40 +0000
@@ -84,6 +84,10 @@ DblqhProxy::DblqhProxy(Block_context& ct
// GSN_SUB_GCP_COMPLETE_REP
addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
+
+ // GSN_EXEC_SRREQ
+ addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ);
+ addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF);
}
DblqhProxy::~DblqhProxy()
@@ -443,6 +447,12 @@ DblqhProxy::sendLCP_FRAG_ORD(Signal* sig
return;
}
+ if (!ss.m_active.get(ss.m_worker)) {
+ jam();
+ D("LCP: active" << V(ss.m_worker));
+ ss.m_active.set(ss.m_worker);
+ }
+
sendSignal(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
signal, LcpFragOrd::SignalLength, JBB);
}
@@ -488,6 +498,12 @@ DblqhProxy::execLCP_COMPLETE_ORD(Signal*
Ss_LCP_COMPLETE_ORD& ss = ssSeize<Ss_LCP_COMPLETE_ORD>(ssId);
ss.m_req = *req;
+ Ss_LCP_FRAG_ORD& ssLcp = ssFind<Ss_LCP_FRAG_ORD>(ssId);
+ const Uint32 activeCount = ssLcp.m_active.count();
+ D("LCP: complete" << V(activeCount));
+ // database with no fragments is not handled
+ ndbrequire(activeCount != 0);
+
// seize END_LCP_REQ records
Uint32 i;
for (i = 0; i < ss.BlockCnt; i++) {
@@ -497,9 +513,10 @@ DblqhProxy::execLCP_COMPLETE_ORD(Signal*
Uint32 ssIdEnd = getSsId(&tmp);
Ss_END_LCP_REQ& ssEnd = ssSeize<Ss_END_LCP_REQ>(ssIdEnd);
ss.m_endLcp[i].m_ssId = ssIdEnd;
+ ssEnd.m_ssIdLcp = ssId;
- // set wait-for bitmask in SsParallel
- setMask(ssEnd);
+ // set wait-for bitmask
+ setMask(ssEnd, ssLcp.m_active);
}
sendREQ(signal, ss);
@@ -655,6 +672,13 @@ void
DblqhProxy::sendEND_LCP_CONF(Signal* signal, Uint32 ssId)
{
Ss_END_LCP_REQ& ss = ssFind<Ss_END_LCP_REQ>(ssId);
+ Ss_LCP_FRAG_ORD& ssLcp = ssFind<Ss_LCP_FRAG_ORD>(ss.m_ssIdLcp);
+
+ // workers handling no fragments sent no REQ and get no CONF
+ if (!ssLcp.m_active.get(ss.m_worker)) {
+ jam();
+ return;
+ }
EndLcpConf* conf = (EndLcpConf*)signal->getDataPtrSend();
conf->senderData = ss.m_req[ss.m_worker].senderData;
@@ -1327,6 +1351,114 @@ DblqhProxy::sendEMPTY_LCP_CONF(Signal* s
}
ssRelease<Ss_EMPTY_LCP_REQ>(ssId);
+}
+
+// GSN_EXEC_SR_1 [fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SRREQ(Signal* signal)
+{
+ const BlockReference senderRef = signal->getSendersBlockRef();
+
+ if (refToInstance(senderRef) != 0) {
+ jam();
+ execEXEC_SR_2(signal, GSN_EXEC_SRREQ);
+ return;
+ }
+
+ execEXEC_SR_1(signal, GSN_EXEC_SRREQ);
+}
+
+void
+DblqhProxy::execEXEC_SRCONF(Signal* signal)
+{
+ const BlockReference senderRef = signal->getSendersBlockRef();
+
+ if (refToInstance(senderRef) != 0) {
+ jam();
+ execEXEC_SR_2(signal, GSN_EXEC_SRCONF);
+ return;
+ }
+
+ execEXEC_SR_1(signal, GSN_EXEC_SRCONF);
+}
+
+void
+DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn)
+{
+ ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength);
+
+ const Ss_EXEC_SR_1::Sig* sig =
+ (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr();
+ Uint32 ssId = getSsId(sig);
+ Ss_EXEC_SR_1& ss = ssSeize<Ss_EXEC_SR_1>(ssId);
+ ss.m_gsn = gsn;
+ ss.m_sig = *sig;
+
+ sendREQ(signal, ss);
+ ssRelease<Ss_EXEC_SR_1>(ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId)
+{
+ Ss_EXEC_SR_1& ss = ssFind<Ss_EXEC_SR_1>(ssId);
+ signal->theData[0] = ss.m_sig.nodeId;
+ sendSignal(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB);
+}
+
+// GSN_EXEC_SRREQ_2 [ fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn)
+{
+ ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength);
+
+ const Ss_EXEC_SR_2::Sig* sig =
+ (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr();
+ Uint32 ssId = getSsId(sig);
+
+ bool found = false;
+ Ss_EXEC_SR_2& ss = ssFindSeize<Ss_EXEC_SR_2>(ssId, &found);
+ if (!found) {
+ jam();
+ setMask(ss);
+ }
+
+ ndbrequire(sig->nodeId == getOwnNodeId());
+ if (ss.m_sigcount == 0) {
+ jam();
+ ss.m_gsn = gsn;
+ ss.m_sig = *sig;
+ } else {
+ jam();
+ ndbrequire(ss.m_gsn == gsn);
+ ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0);
+ }
+ ss.m_sigcount++;
+
+ // reversed roles
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId)
+{
+ Ss_EXEC_SR_2& ss = ssFind<Ss_EXEC_SR_2>(ssId);
+
+ if (!lastReply(ss)) {
+ jam();
+ return;
+ }
+
+ NodeBitmask nodes;
+ nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes);
+ NodeReceiverGroup rg(DBLQH, nodes);
+
+ signal->theData[0] = ss.m_sig.nodeId;
+ sendSignal(rg, ss.m_gsn, signal, 1, JBB);
+
+ ssRelease<Ss_EXEC_SR_2>(ssId);
}
BLOCK_FUNCTIONS(DblqhProxy)
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-12-03 19:49:40 +0000
@@ -126,6 +126,7 @@ protected:
* set and is treated as a fictional signal GSN_LCP_COMPLETE_ORD.
*/
static const char* name() { return "LCP_FRAG_ORD"; }
+ WorkerMask m_active; // handled at least 1 fragment
Ss_LCP_FRAG_ORD() {
m_sendREQ = (SsFUNC)&DblqhProxy::sendLCP_FRAG_ORD;
m_sendCONF = (SsFUNC)0;
@@ -187,6 +188,7 @@ protected:
* Note TSMAN sends no END_LCP_CONF.
*/
static const char* name() { return "END_LCP_REQ"; }
+ Uint32 m_ssIdLcp;
Uint32 m_reqcount;
Uint32 m_backupId;
Uint32 m_proxyBlockNo;
@@ -195,6 +197,7 @@ protected:
Ss_END_LCP_REQ() {
m_sendREQ = (SsFUNC)&DblqhProxy::sendEND_LCP_CONF;
m_sendCONF = (SsFUNC)&DblqhProxy::sendEND_LCP_REQ;
+ m_ssIdLcp = 0;
m_reqcount = 0;
m_backupId = 0;
m_proxyBlockNo = 0;
@@ -380,9 +383,7 @@ protected:
// GSN_START_RECREQ_2 [ sub-op, fictional gsn ]
struct Ss_START_RECREQ_2 : SsParallel {
-#ifdef VM_TRACE
static const char* name() { return "START_RECREQ_2"; }
-#endif
struct Req {
enum { SignalLength = 2 };
Uint32 lcpId;
@@ -458,6 +459,71 @@ protected:
void execEMPTY_LCP_CONF(Signal*);
void execEMPTY_LCP_REF(Signal*);
void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId);
+
+ // GSN_EXEC_SR_1 [ fictional gsn ]
+ struct Ss_EXEC_SR_1 : SsParallel {
+ /*
+ * Handle EXEC_SRREQ and EXEC_SRCONF. These are broadcast
+ * signals (not REQ/CONF). EXEC_SR_1 receives one signal and
+ * sends it to its workers. EXEC_SR_2 waits for signal from
+ * all workers and broadcasts it to all nodes. These are
+ * required to handle mixed versions (non-mt, mt-lqh-1,2,4).
+ */
+ static const char* name() { return "EXEC_SR_1"; }
+ struct Sig {
+ enum { SignalLength = 1 };
+ Uint32 nodeId;
+ };
+ GlobalSignalNumber m_gsn;
+ Sig m_sig;
+ Ss_EXEC_SR_1() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendEXEC_SR_1;
+ m_sendCONF = (SsFUNC)0;
+ m_gsn = 0;
+ };
+ enum { poolSize = 1 };
+ static SsPool<Ss_EXEC_SR_1>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_1;
+ }
+ };
+ SsPool<Ss_EXEC_SR_1> c_ss_EXEC_SR_1;
+ Uint32 getSsId(const Ss_EXEC_SR_1::Sig* sig) {
+ return SsIdBase | refToNode(sig->nodeId);
+ };
+ void execEXEC_SRREQ(Signal*);
+ void execEXEC_SRCONF(Signal*);
+ void execEXEC_SR_1(Signal*, GlobalSignalNumber gsn);
+ void sendEXEC_SR_1(Signal*, Uint32 ssId);
+
+ // GSN_EXEC_SR_2 [ fictional gsn ]
+ struct Ss_EXEC_SR_2 : SsParallel {
+ static const char* name() { return "EXEC_SR_2"; }
+ struct Sig {
+ enum { SignalLength = 1 + NdbNodeBitmask::Size };
+ Uint32 nodeId;
+ Uint32 sr_nodes[NdbNodeBitmask::Size]; // local signal so ok to add
+ };
+ GlobalSignalNumber m_gsn;
+ Uint32 m_sigcount;
+ Sig m_sig; // all signals must be identical
+ Ss_EXEC_SR_2() {
+ // reversed roles
+ m_sendREQ = (SsFUNC)0;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendEXEC_SR_2;
+ m_gsn = 0;
+ m_sigcount = 0;
+ };
+ enum { poolSize = 1 };
+ static SsPool<Ss_EXEC_SR_2>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_2;
+ }
+ };
+ SsPool<Ss_EXEC_SR_2> c_ss_EXEC_SR_2;
+ Uint32 getSsId(const Ss_EXEC_SR_2::Sig* sig) {
+ return SsIdBase | refToNode(sig->nodeId);
+ };
+ void execEXEC_SR_2(Signal*, GlobalSignalNumber gsn);
+ void sendEXEC_SR_2(Signal*, Uint32 ssId);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2008-10-30 09:43:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2008-12-03 19:51:33 +0000
@@ -1385,7 +1385,7 @@ private:
TcConnectRecord * const regTcPtr);
void sendCompleteLqh(Signal* signal,
TcConnectRecord * const regTcPtr);
- void sendTCKEY_FAILREF(Signal* signal, const ApiConnectRecord *);
+ void sendTCKEY_FAILREF(Signal* signal, ApiConnectRecord *);
void sendTCKEY_FAILCONF(Signal* signal, ApiConnectRecord *);
void routeTCKEY_FAILREFCONF(Signal* signal, const ApiConnectRecord *,
Uint32 gsn, Uint32 len);
@@ -1462,6 +1462,7 @@ private:
void seizeApiConnect(Signal* signal);
void seizeApiConnectCopy(Signal* signal);
void seizeApiConnectFail(Signal* signal);
+ void crash_gcp(Uint32 line);
void seizeGcp(Ptr<GcpRecord> & dst, Uint64 gci);
void seizeTcConnect(Signal* signal);
void seizeTcConnectFail(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2008-10-05 07:14:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2008-12-03 19:51:33 +0000
@@ -316,7 +316,8 @@ Dbtc::Dbtc(Block_context& ctx):
&cachePtr,
&hostptr,
&timeOutptr,
- &scanFragptr };
+ &scanFragptr,
+ &tcNodeFailptr };
init_globals_list(tmp, sizeof(tmp)/sizeof(tmp[0]));
}
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-11-13 15:05:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-12-03 19:51:33 +0000
@@ -4685,7 +4685,15 @@ void Dbtc::commitGciHandling(Signal* sig
linkApiToGcp(localGcpPointer, regApiPtr);
return;
} else {
- ndbrequire(regApiPtr.p->globalcheckpointid > localGcpPointer.p->gcpId);
+ if (unlikely(! (regApiPtr.p->globalcheckpointid > localGcpPointer.p->gcpId)))
+ {
+ ndbout_c("%u/%u %u/%u",
+ Uint32(regApiPtr.p->globalcheckpointid >> 32),
+ Uint32(regApiPtr.p->globalcheckpointid),
+ Uint32(localGcpPointer.p->gcpId >> 32),
+ Uint32(localGcpPointer.p->gcpId));
+ crash_gcp(__LINE__);
+ }
localGcpPointer.i = localGcpPointer.p->nextGcp;
jam();
if (localGcpPointer.i != RNIL) {
@@ -4734,6 +4742,33 @@ void Dbtc::linkApiToGcp(Ptr<GcpRecord> r
regGcpPtr.p->lastApiConnect = regApiPtr.i;
}//Dbtc::linkApiToGcp()
+void
+Dbtc::crash_gcp(Uint32 line)
+{
+ UintR Tfirstgcp = cfirstgcp;
+ UintR TgcpFilesize = cgcpFilesize;
+ GcpRecord *localGcpRecord = gcpRecord;
+ GcpRecordPtr localGcpPointer;
+
+ localGcpPointer.i = cfirstgcp;
+
+ while (localGcpPointer.i != RNIL)
+ {
+ ptrCheckGuard(localGcpPointer, cgcpFilesize, gcpRecord);
+ ndbout_c("%u : %u/%u nomoretrans: %u api %u %u next: %u",
+ localGcpPointer.i,
+ Uint32(localGcpPointer.p->gcpId >> 32),
+ Uint32(localGcpPointer.p->gcpId),
+ localGcpPointer.p->gcpNomoretransRec,
+ localGcpPointer.p->firstApiConnect,
+ localGcpPointer.p->lastApiConnect,
+ localGcpPointer.p->nextGcp);
+ localGcpPointer.i = localGcpPointer.p->nextGcp;
+ }
+ progError(line, NDBD_EXIT_NDBREQUIRE);
+ ndbrequire(false);
+}
+
void Dbtc::seizeGcp(Ptr<GcpRecord> & dst, Uint64 Tgci)
{
GcpRecordPtr tmpGcpPointer;
@@ -4744,6 +4779,11 @@ void Dbtc::seizeGcp(Ptr<GcpRecord> & dst
GcpRecord *localGcpRecord = gcpRecord;
localGcpPointer.i = cfirstfreeGcp;
+ if (unlikely(localGcpPointer.i > TgcpFilesize))
+ {
+ ndbout_c("%u/%u", Uint32(Tgci >> 32), Uint32(Tgci));
+ crash_gcp(__LINE__);
+ }
ptrCheckGuard(localGcpPointer, TgcpFilesize, localGcpRecord);
UintR TfirstfreeGcp = localGcpPointer.p->nextGcp;
localGcpPointer.p->gcpId = Tgci;
@@ -4829,25 +4869,30 @@ void Dbtc::sendCommitLqh(Signal* signal,
Thostptr.i = regTcPtr->lastLqhNodeId;
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
- UintR Tdata1 = regTcPtr->lastLqhCon;
- UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32);
- UintR Tdata3 = regApiPtr->transid[0];
- UintR Tdata4 = regApiPtr->transid[1];
- UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid);
+ Uint32 Tdata[5];
+ Tdata[0] = regTcPtr->lastLqhCon;
+ Tdata[1] = Uint32(regApiPtr->globalcheckpointid >> 32);
+ Tdata[2] = regApiPtr->transid[0];
+ Tdata[3] = regApiPtr->transid[1];
+ Tdata[4] = Uint32(regApiPtr->globalcheckpointid);
+ Uint32 len = 5;
+
+ if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
+ {
+ jam();
+ ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false);
+ len = 4;
+ }
- // wl4391_todo testing own config is wrong for mixed versions
- bool send_unpacked = isNdbMtLqh();
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
if (send_unpacked) {
Uint32* data = signal->getDataPtrSend();
- data[0] = Tdata1;
- data[1] = Tdata2;
- data[2] = Tdata3;
- data[3] = Tdata4;
- data[4] = Tdata5;
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
Uint32 Tnode = Thostptr.i;
Uint32 instanceKey = regTcPtr->lqhInstanceKey;
BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
- sendSignal(lqhRef, GSN_COMMIT, signal, 5, JBB);
+ sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB);
return;
}
@@ -4857,23 +4902,14 @@ void Dbtc::sendCommitLqh(Signal* signal,
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMMIT << 28);
UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
- TDataPtr[0] = Tdata1 | (ZCOMMIT << 28);
- TDataPtr[1] = Tdata2;
- TDataPtr[2] = Tdata3;
- TDataPtr[3] = Tdata4;
- TDataPtr[4] = Tdata5;
- Thostptr.p->noOfPackedWordsLqh = Tindex + 5;
-
- if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version)))
- {
- jam();
- ndbassert(Tdata5 == 0 || getNodeInfo(Thostptr.i).m_connected == false);
- Thostptr.p->noOfPackedWordsLqh = Tindex + 4; // no gci_lo
- }
-}//Dbtc::sendCommitLqh()
+ memcpy(TDataPtr, &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
void
Dbtc::DIVER_node_fail_handling(Signal* signal, Uint64 Tgci)
@@ -5104,7 +5140,6 @@ void Dbtc::copyApi(ApiConnectRecordPtr c
UintR Tlqhkeyconfrec = regApiPtr.p->lqhkeyconfrec;
UintR TgcpPointer = regApiPtr.p->gcpPointer;
UintR TgcpFilesize = cgcpFilesize;
- UintR TcommitAckMarker = regApiPtr.p->commitAckMarker;
NdbNodeBitmask Tnodes = regApiPtr.p->m_transaction_nodes;
GcpRecord *localGcpRecord = gcpRecord;
@@ -5115,7 +5150,7 @@ void Dbtc::copyApi(ApiConnectRecordPtr c
copyPtr.p->transid[0] = Ttransid1;
copyPtr.p->transid[1] = Ttransid2;
copyPtr.p->lqhkeyconfrec = Tlqhkeyconfrec;
- copyPtr.p->commitAckMarker = TcommitAckMarker;
+ copyPtr.p->commitAckMarker = RNIL;
copyPtr.p->m_transaction_nodes = Tnodes;
copyPtr.p->singleUserMode = 0;
@@ -5222,16 +5257,16 @@ void Dbtc::sendCompleteLqh(Signal* signa
Thostptr.i = regTcPtr->lastLqhNodeId; //last???
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
- UintR Tdata1 = regTcPtr->lastLqhCon;
- UintR Tdata2 = regApiPtr->transid[0];
- UintR Tdata3 = regApiPtr->transid[1];
+ Uint32 Tdata[3];
+ Tdata[0] = regTcPtr->lastLqhCon;
+ Tdata[1] = regApiPtr->transid[0];
+ Tdata[2] = regApiPtr->transid[1];
+ Uint32 len = 3;
- bool send_unpacked = isNdbMtLqh();
+ // currently packed signal cannot address specific instance
+ const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0;
if (send_unpacked) {
- Uint32* data = signal->getDataPtrSend();
- data[0] = Tdata1;
- data[1] = Tdata2;
- data[2] = Tdata3;
+ memcpy(&signal->theData[0], &Tdata[0], len << 2);
Uint32 Tnode = Thostptr.i;
Uint32 instanceKey = regTcPtr->lqhInstanceKey;
BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
@@ -5245,14 +5280,14 @@ void Dbtc::sendCompleteLqh(Signal* signa
} else {
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
- }//if
+ }
+
+ Tdata[0] |= (ZCOMPLETE << 28);
UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
- TDataPtr[0] = Tdata1 | (ZCOMPLETE << 28);
- TDataPtr[1] = Tdata2;
- TDataPtr[2] = Tdata3;
- Thostptr.p->noOfPackedWordsLqh = Tindex + 3;
-}//Dbtc::sendCompleteLqh()
+ memcpy(TDataPtr, &Tdata[0], len << 2);
+ Thostptr.p->noOfPackedWordsLqh = Tindex + len;
+}
void
Dbtc::execTC_COMMIT_ACK(Signal* signal){
@@ -5301,22 +5336,25 @@ Dbtc::sendRemoveMarker(Signal* signal,
hostPtr.i = nodeId;
ptrCheckGuard(hostPtr, ThostFilesize, hostRecord);
- UintR Tdata1 = 0;
- UintR Tdata2 = transid1;
- UintR Tdata3 = transid2;
+ Uint32 Tdata[3];
+ Tdata[0] = 0;
+ Tdata[1] = transid1;
+ Tdata[2] = transid2;
+ Uint32 len = 3;
- bool send_unpacked = isNdbMtLqh();
+ // currently packed signals can not address specific instance
+ bool send_unpacked = getNodeInfo(hostPtr.i).m_lqh_workers != 0;
if (send_unpacked) {
- Uint32* data = signal->getDataPtrSend();
- data[0] = Tdata2;
- data[1] = Tdata3;
+ jam();
+ // first word omitted
+ memcpy(&signal->theData[0], &Tdata[1], (len - 1) << 2);
Uint32 Tnode = hostPtr.i;
Uint32 i;
for (i = 0; i < MAX_NDBMT_LQH_WORKERS; i++) {
// wl4391_todo skip workers not part of tx
Uint32 instanceKey = 1 + i;
BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
- sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, 2, JBB);
+ sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB);
}
return;
}
@@ -5327,14 +5365,13 @@ Dbtc::sendRemoveMarker(Signal* signal,
} else {
jam();
updatePackedList(signal, hostPtr.p, hostPtr.i);
- }//if
+ }
UintR numWord = hostPtr.p->noOfPackedWordsLqh;
UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord];
- dataPtr[0] = Tdata1 | (ZREMOVE_MARKER << 28);
- dataPtr[1] = Tdata2;
- dataPtr[2] = Tdata3;
+ Tdata[0] |= (ZREMOVE_MARKER << 28);
+ memcpy(dataPtr, &Tdata[0], len << 2);
hostPtr.p->noOfPackedWordsLqh = numWord + 3;
}
@@ -5476,6 +5513,7 @@ void Dbtc::releaseApiConCopy(Signal* sig
regApiPtr->nextApiConnect = TfirstfreeApiConnectCopyOld;
setApiConTimer(apiConnectptr.i, 0, __LINE__);
regApiPtr->apiConnectstate = CS_RESTART;
+ ndbrequire(regApiPtr->commitAckMarker == RNIL);
}//Dbtc::releaseApiConCopy()
/* ========================================================================= */
@@ -7864,6 +7902,10 @@ void Dbtc::execTAKE_OVERTCCONF(Signal* s
if (signal->getSendersBlockRef() != reference())
{
jam();
+
+ tcNodeFailptr.i = 0;
+ ptrAss(tcNodeFailptr, tcFailRecord);
+
/**
* Node should be in queue
*/
@@ -8303,7 +8345,7 @@ void Dbtc::completeTransAtTakeOverDoOne(
}//Dbtc::completeTransAtTakeOverDoOne()
void
-Dbtc::sendTCKEY_FAILREF(Signal* signal, const ApiConnectRecord * regApiPtr){
+Dbtc::sendTCKEY_FAILREF(Signal* signal, ApiConnectRecord * regApiPtr){
jam();
const Uint32 ref = regApiPtr->ndbapiBlockref;
@@ -8326,6 +8368,14 @@ Dbtc::sendTCKEY_FAILREF(Signal* signal,
routeTCKEY_FAILREFCONF(signal, regApiPtr, GSN_TCKEY_FAILREF, 3);
}
}
+
+ const Uint32 marker = regApiPtr->commitAckMarker;
+ if(marker != RNIL)
+ {
+ jam();
+ m_commitAckMarkerHash.release(marker);
+ regApiPtr->commitAckMarker = RNIL;
+ }
}
void
@@ -8554,12 +8604,6 @@ void Dbtc::toAbortHandlingLab(Signal* si
if (apiConnectptr.p->takeOverRec != (Uint8)Z8NIL) {
jam();
sendTCKEY_FAILREF(signal, apiConnectptr.p);
- const Uint32 marker = apiConnectptr.p->commitAckMarker;
- if(marker != RNIL){
- jam();
- m_commitAckMarkerHash.release(marker);
- apiConnectptr.p->commitAckMarker = RNIL;
- }
/*------------------------------------------------------------*/
/* WE HAVE COMPLETED THIS TRANSACTION NOW AND CAN */
@@ -11607,6 +11651,7 @@ void Dbtc::releaseApiConnectFail(Signal*
setApiConTimer(apiConnectptr.i, 0, __LINE__);
apiConnectptr.p->nextApiConnect = cfirstfreeApiConnectFail;
cfirstfreeApiConnectFail = apiConnectptr.i;
+ ndbrequire(apiConnectptr.p->commitAckMarker == RNIL);
}//Dbtc::releaseApiConnectFail()
void Dbtc::releaseKeys()
@@ -12274,7 +12319,7 @@ Dbtc::validate_filter(Signal* signal)
default:
infoEvent("Invalid filter op: 0x%x pos: %ld",
* start,
- start - (signal->theData + 1));
+ (long int)(start - (signal->theData + 1)));
return false;
}
}
=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2008-12-02 13:10:49 +0000
@@ -26,7 +26,6 @@
#include <signaldata/SumaImpl.hpp>
#include <signaldata/LgmanContinueB.hpp>
#include <signaldata/GetTabInfo.hpp>
-#include "ndbfs/Ndbfs.hpp"
#include "dbtup/Dbtup.hpp"
#include <EventLogger.hpp>
@@ -56,11 +55,7 @@ Lgman::Lgman(Block_context & ctx) :
m_tup(0),
m_logfile_group_list(m_logfile_group_pool),
m_logfile_group_hash(m_logfile_group_pool),
-#ifdef __sun // temp
- m_client_mutex(1, false)
-#else
- m_client_mutex(2, true)
-#endif
+ m_client_mutex("lgman-client", 2, true)
{
BLOCK_CONSTRUCTOR(Lgman);
=== modified file 'storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2008-10-05 07:12:56 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2008-12-02 13:10:49 +0000
@@ -98,8 +98,8 @@ static BlockInfo ALL_BLOCKS[] = {
static const Uint32 ALL_BLOCKS_SZ = sizeof(ALL_BLOCKS)/sizeof(BlockInfo);
static BlockReference readConfigOrder[ALL_BLOCKS_SZ] = {
- NDBFS_REF, // let it run first to make sure it can start the threads
CMVMI_REF,
+ NDBFS_REF,
DBINFO_REF,
DBTUP_REF,
DBACC_REF,
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp 2008-12-02 13:10:49 +0000
@@ -27,203 +27,44 @@
#include <signaldata/FsReadWriteReq.hpp>
#include <Configuration.hpp>
-const char *actionName[] = {
- "open",
- "close",
- "closeRemove",
- "read",
- "readv",
- "write",
- "writev",
- "writeSync",
- "writevSync",
- "sync",
- "end" };
-
-static int numAsyncFiles = 0;
-
-extern "C" void * runAsyncFile(void* arg)
-{
- ((AsyncFile*)arg)->run();
- return (NULL);
-}
-
-
AsyncFile::AsyncFile(SimulatedBlock& fs) :
theFileName(),
- theReportTo(0),
- theMemoryChannelPtr(NULL),
m_fs(fs)
{
- m_page_ptr.setNull();
- m_current_request= m_last_request= 0;
- m_auto_sync_freq = 0;
-}
-
-struct NdbThread*
-AsyncFile::doStart()
-{
- // Stacksize for filesystem threads
- // An 8k stack should be enough
- const NDB_THREAD_STACKSIZE stackSize = 8192;
-
- char buf[16];
- numAsyncFiles++;
- BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);
-
- theStartMutexPtr = NdbMutex_Create();
- theStartConditionPtr = NdbCondition_Create();
- NdbMutex_Lock(theStartMutexPtr);
- theStartFlag = false;
-
- theThreadPtr = NdbThread_Create(runAsyncFile,
- (void**)this,
- stackSize,
- (char*)&buf,
- NDB_THREAD_PRIO_MEAN);
-
- if (theThreadPtr == 0)
- {
- ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
- "","Could not allocate file system thread");
- }
- NdbCondition_Wait(theStartConditionPtr,
- theStartMutexPtr);
- NdbMutex_Unlock(theStartMutexPtr);
- NdbMutex_Destroy(theStartMutexPtr);
- NdbCondition_Destroy(theStartConditionPtr);
-
- return theThreadPtr;
-}
+ m_thread = 0;
-void AsyncFile::shutdown()
-{
- void *status;
- Request request;
- request.action = Request::end;
- this->theMemoryChannelPtr->writeChannel( &request );
- NdbThread_WaitFor(theThreadPtr, &status);
- NdbThread_Destroy(&theThreadPtr);
- delete theMemoryChannelPtr;
+ m_page_cnt = 0;
+ m_page_ptr.setNull();
+ theWriteBuffer = 0;
+ theWriteBufferSize = 0;
}
void
-AsyncFile::reportTo( MemoryChannel<Request> *reportTo )
-{
- theReportTo = reportTo;
-}
-
-void AsyncFile::execute(Request* request)
-{
- theMemoryChannelPtr->writeChannel( request );
-}
-
-int AsyncFile::init()
+AsyncFile::attach(AsyncIoThread* thr)
{
- // Create write buffer for bigger writes
- theWriteBufferSize = WRITEBUFFERSIZE;
- theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize);
-
- return 0;
+#if 0
+ ndbout_c("%p:%s attach to %p (m_thread: %p)", this, theFileName.c_str(), thr,
+ m_thread);
+#endif
+ assert(m_thread == 0);
+ m_thread = thr;
}
void
-AsyncFile::run()
+AsyncFile::detach(AsyncIoThread* thr)
{
- Request *request;
-
- // Create theMemoryChannel in the thread that will wait for it
- NdbMutex_Lock(theStartMutexPtr);
- theMemoryChannelPtr = new MemoryChannel<Request>();
- theStartFlag = true;
-
- int r= this->init();
-
- NdbMutex_Unlock(theStartMutexPtr);
- NdbCondition_Signal(theStartConditionPtr);
-
- if(r!=0)
- {
- DEBUG(ndbout_c("AsyncFile::init() failed"));
- return;
- }
-
- while (1) {
- request = theMemoryChannelPtr->readChannel();
- if (!request) {
- DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
- endReq();
- return;
- }//if
- m_current_request= request;
- switch (request->action) {
- case Request:: open:
- openReq(request);
- break;
- case Request:: close:
- closeReq(request);
- break;
- case Request:: closeRemove:
- closeReq(request);
- removeReq(request);
- break;
- case Request:: readPartial:
- case Request:: read:
- readReq(request);
- break;
- case Request:: readv:
- readvReq(request);
- break;
- case Request:: write:
- writeReq(request);
- break;
- case Request:: writev:
- writevReq(request);
- break;
- case Request:: writeSync:
- writeReq(request);
- syncReq(request);
- break;
- case Request:: writevSync:
- writevReq(request);
- syncReq(request);
- break;
- case Request:: sync:
- syncReq(request);
- break;
- case Request:: append:
- appendReq(request);
- break;
- case Request:: append_synch:
- appendReq(request);
- syncReq(request);
- break;
- case Request::rmrf:
- rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
- break;
- case Request:: end:
- if (isOpen())
- closeReq(request);
- endReq();
- return;
- default:
- DEBUG(ndbout_c("Invalid Request"));
- abort();
- break;
- }//switch
- m_last_request= request;
- m_current_request= 0;
-
- // No need to signal as ndbfs only uses tryRead
- theReportTo->writeChannelNoSignal(request);
- }//while
-}//AsyncFile::run()
-
+#if 0
+ ndbout_c("%p:%s detach from %p", this, theFileName.c_str(), thr);
+#endif
+ assert(m_thread == thr);
+ m_thread = 0;
+}
void
AsyncFile::readReq( Request * request)
{
- for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {
+ for(int i = 0; i < request->par.readWrite.numberOfPages ; i++)
+ {
off_t offset = request->par.readWrite.pages[i].offset;
size_t size = request->par.readWrite.pages[i].size;
char * buf = request->par.readWrite.pages[i].buf;
@@ -244,76 +85,90 @@ AsyncFile::readvReq( Request * request)
}
void
-AsyncFile::writeReq( Request * request)
+AsyncFile::writeReq(Request * request)
{
- int page_num = 0;
- bool write_not_complete = true;
+ Uint32 cnt = request->par.readWrite.numberOfPages;
+ if (theWriteBuffer == 0 || cnt == 1)
+ {
+ for (Uint32 i = 0; i<cnt; i++)
+ {
+ int err = writeBuffer(request->par.readWrite.pages[i].buf,
+ request->par.readWrite.pages[i].size,
+ request->par.readWrite.pages[i].offset);
+ if (err)
+ {
+ request->error = err;
+ return;
+ }
+ goto done;
+ }
+ }
- while(write_not_complete) {
- int totsize = 0;
- off_t offset = request->par.readWrite.pages[page_num].offset;
- char* bufptr = theWriteBuffer;
-
- write_not_complete = false;
- if (request->par.readWrite.numberOfPages > 1) {
- off_t page_offset = offset;
-
- // Multiple page write, copy to buffer for one write
- for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
- memcpy(bufptr,
- request->par.readWrite.pages[i].buf,
- request->par.readWrite.pages[i].size);
- bufptr += request->par.readWrite.pages[i].size;
- totsize += request->par.readWrite.pages[i].size;
- if (((i + 1) < request->par.readWrite.numberOfPages)) {
- // There are more pages to write
- // Check that offsets are consequtive
- off_t tmp = page_offset + request->par.readWrite.pages[i].size;
- if (tmp != request->par.readWrite.pages[i+1].offset) {
- // Next page is not aligned with previous, not allowed
- DEBUG(ndbout_c("Page offsets are not aligned"));
- request->error = EINVAL;
- return;
- }
- if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
- // We are not finished and the buffer is full
- write_not_complete = true;
- // Start again with next page
- page_num = i + 1;
- break;
+ {
+ int page_num = 0;
+ bool write_not_complete = true;
+
+ while(write_not_complete) {
+ int totsize = 0;
+ off_t offset = request->par.readWrite.pages[page_num].offset;
+ char* bufptr = theWriteBuffer;
+
+ write_not_complete = false;
+ if (request->par.readWrite.numberOfPages > 1) {
+ off_t page_offset = offset;
+
+ // Multiple page write, copy to buffer for one write
+ for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
+ memcpy(bufptr,
+ request->par.readWrite.pages[i].buf,
+ request->par.readWrite.pages[i].size);
+ bufptr += request->par.readWrite.pages[i].size;
+ totsize += request->par.readWrite.pages[i].size;
+ if (((i + 1) < request->par.readWrite.numberOfPages)) {
+ // There are more pages to write
+ // Check that offsets are consequtive
+ off_t tmp = page_offset + request->par.readWrite.pages[i].size;
+ if (tmp != request->par.readWrite.pages[i+1].offset) {
+ // Next page is not aligned with previous, not allowed
+ DEBUG(ndbout_c("Page offsets are not aligned"));
+ request->error = EINVAL;
+ return;
+ }
+ if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
+ // We are not finished and the buffer is full
+ write_not_complete = true;
+ // Start again with next page
+ page_num = i + 1;
+ break;
+ }
}
+ page_offset += request->par.readWrite.pages[i].size;
}
- page_offset += request->par.readWrite.pages[i].size;
+ bufptr = theWriteBuffer;
+ } else {
+ // One page write, write page directly
+ bufptr = request->par.readWrite.pages[0].buf;
+ totsize = request->par.readWrite.pages[0].size;
}
- bufptr = theWriteBuffer;
- } else {
- // One page write, write page directly
- bufptr = request->par.readWrite.pages[0].buf;
- totsize = request->par.readWrite.pages[0].size;
- }
- int err = writeBuffer(bufptr, totsize, offset);
- if(err != 0){
- request->error = err;
- return;
- }
- } // while(write_not_complete)
-
- if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
+ int err = writeBuffer(bufptr, totsize, offset);
+ if(err != 0){
+ request->error = err;
+ return;
+ }
+ } // while(write_not_complete)
+ }
+done:
+ if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq)
+ {
syncReq(request);
}
}
void
-AsyncFile::writevReq( Request * request)
+AsyncFile::writevReq(Request * request)
{
// WriteFileGather on WIN32?
writeReq(request);
-}
-
-void AsyncFile::endReq()
-{
- if (theWriteBuffer)
- ndbd_free(theWriteBuffer, theWriteBufferSize);
}
#ifdef DEBUG_ASYNCFILE
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2008-12-02 13:10:49 +0000
@@ -16,166 +16,33 @@
#ifndef AsyncFile_H
#define AsyncFile_H
-/**
- AsyncFile
-
- All file operations executed in thread-per-file, away from the DB threads.
- */
-
-
-// void execute(Request *request);
-// Description:
-// performens the requered action.
-// Parameters:
-// request: request to be called when open is finished.
-// action= open|close|read|write|sync
-// if action is open then:
-// par.open.flags= UNIX open flags, see man open
-// par.open.name= name of the file to open
-// if action is read or write then:
-// par.readWrite.buf= user provided buffer to read/write
-// the data from/to
-// par.readWrite.size= how many bytes must be read/written
-// par.readWrite.offset= absolute offset in file in bytes
-// return:
-// return values are stored in the request error field:
-// error= return state of the action, UNIX error see man open/errno
-// userData= is untouched can be used be user.
-
-
-
-// void reportTo( MemoryChannel<Request> *reportTo );
-// Description:
-// set the channel where the file must report the result of the
-// actions back to.
-// Parameters:
-// reportTo: the memory channel to use use MemoryChannelMultipleWriter
-// if more
-// than one file uses this channel to report back.
-
#include <kernel_types.h>
-#include "MemoryChannel.hpp"
+#include "AsyncIoThread.hpp"
#include "Filename.hpp"
-// Use this define if you want printouts from AsyncFile class
-//#define DEBUG_ASYNCFILE
-
-#ifdef DEBUG_ASYNCFILE
-#include <NdbOut.hpp>
-#define DEBUG(x) x
-#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
-void printErrorAndFlags(Uint32 used_flags);
-#else
-#define DEBUG(x)
-#define PRINT_ERRORANDFLAGS(f)
-#endif
-
-// Define the size of the write buffer (for each thread)
-#define WRITEBUFFERSIZE 262144
-
-const int ERR_ReadUnderflow = 1000;
-
-const int WRITECHUNK = 262144;
-
-class AsyncFile;
-
-class Request
-{
-public:
- Request() {}
-
- enum Action {
- open,
- close,
- closeRemove,
- read, // Allways leave readv directly after
- // read because SimblockAsyncFileSystem depends on it
- readv,
- write,// Allways leave writev directly after
- // write because SimblockAsyncFileSystem depends on it
- writev,
- writeSync,// Allways leave writevSync directly after
- // writeSync because SimblockAsyncFileSystem depends on it
- writevSync,
- sync,
- end,
- append,
- append_synch,
- rmrf,
- readPartial
- };
- Action action;
- union {
- struct {
- Uint32 flags;
- Uint32 page_size;
- Uint64 file_size;
- Uint32 auto_sync_size;
- } open;
- struct {
- int numberOfPages;
- struct{
- char *buf;
- size_t size;
- off_t offset;
- } pages[16];
- } readWrite;
- struct {
- const char * buf;
- size_t size;
- } append;
- struct {
- bool directory;
- bool own_directory;
- } rmrf;
- } par;
- int error;
-
- void set(BlockReference userReference,
- Uint32 userPointer,
- Uint16 filePointer);
- BlockReference theUserReference;
- Uint32 theUserPointer;
- Uint16 theFilePointer;
- // Information for open, needed if the first open action fails.
- AsyncFile* file;
- Uint32 theTrace;
-};
-
-NdbOut& operator <<(NdbOut&, const Request&);
-
-inline
-void
-Request::set(BlockReference userReference,
- Uint32 userPointer, Uint16 filePointer)
-{
- theUserReference= userReference;
- theUserPointer= userPointer;
- theFilePointer= filePointer;
-}
-
class AsyncFile
{
friend class Ndbfs;
+ friend class AsyncIoThread;
+
public:
AsyncFile(SimulatedBlock& fs);
virtual ~AsyncFile() {};
- void reportTo( MemoryChannel<Request> *reportTo );
-
- void execute( Request* request );
-
- virtual struct NdbThread* doStart();
+ virtual int init() = 0;
- virtual void shutdown();
-
- // its a thread so its always running
- virtual void run();
+ void reportTo( MemoryChannel<Request> *reportTo );
virtual bool isOpen() = 0;
Filename theFileName;
Request *m_current_request, *m_last_request;
+
+ void set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt);
+ bool has_buffer() const;
+ void clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt);
+
+ AsyncIoThread* getThread() const { return m_thread;}
private:
/**
@@ -184,14 +51,6 @@ private:
*/
/**
- * init()
- *
- * Initialise buffers etc. After init(), ready to execute()
- * Called with theStartMutexPtr held.
- */
- virtual int init();
-
- /**
* openReq() - open a file.
*/
virtual void openReq(Request *request) = 0;
@@ -204,15 +63,13 @@ private:
/**
* writeBuffer() - write into file
*/
- virtual int writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size = WRITECHUNK)=0;
-
+ virtual int writeBuffer(const char * buf, size_t size, off_t offset)=0;
virtual void closeReq(Request *request)=0;
virtual void syncReq(Request *request)=0;
virtual void removeReq(Request *request)=0;
virtual void appendReq(Request *request)=0;
- virtual void rmrfReq(Request *request, char * path, bool removePath)=0;
+ virtual void rmrfReq(Request *request, const char * path, bool removePath)=0;
virtual void createDirectories()=0;
/**
@@ -229,38 +86,60 @@ protected:
virtual void writeReq(Request *request);
virtual void writevReq(Request *request);
- /**
- * endReq()
- *
- * Inverse to ::init(). Cleans up thread before it exits.
- */
- virtual void endReq();
-
private:
- /**
- * (end of what implementors need)
- */
+ void attach(AsyncIoThread* thr);
+ void detach(AsyncIoThread* thr);
- MemoryChannel<Request> *theReportTo;
- MemoryChannel<Request>* theMemoryChannelPtr;
-
- struct NdbThread* theThreadPtr;
- NdbMutex* theStartMutexPtr;
- NdbCondition* theStartConditionPtr;
- bool theStartFlag;
+ AsyncIoThread* m_thread; // For bound files
protected:
- int theWriteBufferSize;
- char* theWriteBuffer;
-
size_t m_write_wo_sync; // Writes wo/ sync
size_t m_auto_sync_freq; // Auto sync freq in bytes
+ Uint32 m_open_flags;
-public:
- SimulatedBlock& m_fs;
-
+ /**
+ * file buffers
+ */
Uint32 m_page_cnt;
Ptr<GlobalPage> m_page_ptr;
+
+ char* theWriteBuffer;
+ Uint32 theWriteBufferSize;
+
+public:
+ SimulatedBlock& m_fs;
};
+
+inline
+void
+AsyncFile::set_buffer(Ptr<GlobalPage> ptr, Uint32 cnt)
+{
+ assert(!has_buffer());
+ m_page_ptr = ptr;
+ m_page_cnt = cnt;
+ theWriteBuffer = (char*)ptr.p;
+ theWriteBufferSize = cnt * sizeof(GlobalPage);
+}
+
+inline
+bool
+AsyncFile::has_buffer() const
+{
+ return m_page_cnt > 0;
+}
+
+inline
+void
+AsyncFile::clear_buffer(Ptr<GlobalPage> & ptr, Uint32 & cnt)
+{
+ assert(has_buffer());
+ ptr = m_page_ptr;
+ cnt = m_page_cnt;
+ m_page_cnt = 0;
+ m_page_ptr.setNull();
+ theWriteBuffer = 0;
+ theWriteBufferSize = 0;
+}
+
#endif
=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp 2008-12-02 13:10:49 +0000
@@ -0,0 +1,203 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <my_pthread.h>
+
+#include "AsyncIoThread.hpp"
+#include "AsyncFile.hpp"
+#include <ErrorHandlingMacros.hpp>
+#include <kernel_types.h>
+#include <ndbd_malloc.hpp>
+#include <NdbThread.h>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+#include <Configuration.hpp>
+#include "Ndbfs.hpp"
+
+AsyncIoThread::AsyncIoThread(class Ndbfs& fs, AsyncFile* file)
+ : m_fs(fs)
+{
+ m_current_file = file;
+ if (file)
+ {
+ theMemoryChannelPtr = &theMemoryChannel;
+ }
+ else
+ {
+ theMemoryChannelPtr = &m_fs.theToThreads;
+ }
+ theReportTo = &m_fs.theFromThreads;
+}
+
+static int numAsyncFiles = 0;
+
+extern "C"
+void *
+runAsyncIoThread(void* arg)
+{
+ ((AsyncIoThread*)arg)->run();
+ return (NULL);
+}
+
+
+struct NdbThread*
+AsyncIoThread::doStart()
+{
+ // Stacksize for filesystem threads
+ // An 8k stack should be enough
+ const NDB_THREAD_STACKSIZE stackSize = 8192;
+
+ char buf[16];
+ numAsyncFiles++;
+ BaseString::snprintf(buf, sizeof(buf), "AsyncIoThread%d", numAsyncFiles);
+
+ theStartMutexPtr = NdbMutex_Create();
+ theStartConditionPtr = NdbCondition_Create();
+ NdbMutex_Lock(theStartMutexPtr);
+ theStartFlag = false;
+
+ theThreadPtr = NdbThread_Create(runAsyncIoThread,
+ (void**)this,
+ stackSize,
+ buf,
+ NDB_THREAD_PRIO_MEAN);
+
+ if (theThreadPtr == 0)
+ {
+ ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
+ "","Could not allocate file system thread");
+ }
+
+ do
+ {
+ NdbCondition_Wait(theStartConditionPtr,
+ theStartMutexPtr);
+ }
+ while (theStartFlag == false);
+
+ NdbMutex_Unlock(theStartMutexPtr);
+ NdbMutex_Destroy(theStartMutexPtr);
+ NdbCondition_Destroy(theStartConditionPtr);
+
+ return theThreadPtr;
+}
+
+void
+AsyncIoThread::shutdown()
+{
+ void *status;
+ Request request;
+ request.action = Request::end;
+ this->theMemoryChannelPtr->writeChannel( &request );
+ NdbThread_WaitFor(theThreadPtr, &status);
+ NdbThread_Destroy(&theThreadPtr);
+}
+
+void
+AsyncIoThread::dispatch(Request *request)
+{
+ assert(m_current_file);
+ assert(m_current_file->getThread() == this);
+ assert(theMemoryChannelPtr == &theMemoryChannel);
+ theMemoryChannelPtr->writeChannel(request);
+}
+
+void
+AsyncIoThread::run()
+{
+ Request *request;
+
+ // Create theMemoryChannel in the thread that will wait for it
+ NdbMutex_Lock(theStartMutexPtr);
+ theStartFlag = true;
+ NdbMutex_Unlock(theStartMutexPtr);
+ NdbCondition_Signal(theStartConditionPtr);
+
+ while (1)
+ {
+ request = theMemoryChannelPtr->readChannel();
+ if (!request || request->action == Request::end)
+ {
+ DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
+ theStartFlag = false;
+ return;
+ }//if
+
+ AsyncFile * file = request->file;
+ m_current_request= request;
+ switch (request->action) {
+ case Request::open:
+ file->openReq(request);
+ break;
+ case Request::close:
+ file->closeReq(request);
+ break;
+ case Request::closeRemove:
+ file->closeReq(request);
+ file->removeReq(request);
+ break;
+ case Request::readPartial:
+ case Request::read:
+ file->readReq(request);
+ break;
+ case Request::readv:
+ file->readvReq(request);
+ break;
+ case Request::write:
+ file->writeReq(request);
+ break;
+ case Request::writev:
+ file->writevReq(request);
+ break;
+ case Request::writeSync:
+ file->writeReq(request);
+ file->syncReq(request);
+ break;
+ case Request::writevSync:
+ file->writevReq(request);
+ file->syncReq(request);
+ break;
+ case Request::sync:
+ file->syncReq(request);
+ break;
+ case Request::append:
+ file->appendReq(request);
+ break;
+ case Request::append_synch:
+ file->appendReq(request);
+ file->syncReq(request);
+ break;
+ case Request::rmrf:
+ file->rmrfReq(request, file->theFileName.c_str(),
+ request->par.rmrf.own_directory);
+ break;
+ case Request::end:
+ theStartFlag = false;
+ return;
+ default:
+ DEBUG(ndbout_c("Invalid Request"));
+ abort();
+ break;
+ }//switch
+ m_last_request = request;
+ m_current_request = 0;
+
+ // No need to signal as ndbfs only uses tryRead
+ theReportTo->writeChannelNoSignal(request);
+ }
+}
=== added file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp 2008-12-02 13:10:49 +0000
@@ -0,0 +1,151 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef AsyncIoThread_H
+#define AsyncIoThread_H
+
+#include <kernel_types.h>
+#include <Pool.hpp>
+#include "MemoryChannel.hpp"
+
+// Use this define if you want printouts from AsyncFile class
+//#define DEBUG_ASYNCFILE
+
+#ifdef DEBUG_ASYNCFILE
+#include <NdbOut.hpp>
+#define DEBUG(x) x
+#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
+void printErrorAndFlags(Uint32 used_flags);
+#else
+#define DEBUG(x)
+#define PRINT_ERRORANDFLAGS(f)
+#endif
+
+const int ERR_ReadUnderflow = 1000;
+
+class AsyncFile;
+
+class Request
+{
+public:
+ Request() {}
+
+ enum Action {
+ open,
+ close,
+ closeRemove,
+ read, // Allways leave readv directly after
+ // read because SimblockAsyncFileSystem depends on it
+ readv,
+ write,// Allways leave writev directly after
+ // write because SimblockAsyncFileSystem depends on it
+ writev,
+ writeSync,// Allways leave writevSync directly after
+ // writeSync because SimblockAsyncFileSystem depends on it
+ writevSync,
+ sync,
+ end,
+ append,
+ append_synch,
+ rmrf,
+ readPartial
+ };
+ Action action;
+ union {
+ struct {
+ Uint32 flags;
+ Uint32 page_size;
+ Uint64 file_size;
+ Uint32 auto_sync_size;
+ } open;
+ struct {
+ int numberOfPages;
+ struct{
+ char *buf;
+ size_t size;
+ off_t offset;
+ } pages[16];
+ } readWrite;
+ struct {
+ const char * buf;
+ size_t size;
+ } append;
+ struct {
+ bool directory;
+ bool own_directory;
+ } rmrf;
+ } par;
+ int error;
+
+ void set(BlockReference userReference,
+ Uint32 userPointer,
+ Uint16 filePointer);
+ BlockReference theUserReference;
+ Uint32 theUserPointer;
+ Uint16 theFilePointer;
+ // Information for open, needed if the first open action fails.
+ AsyncFile* file;
+ Uint32 theTrace;
+};
+
+NdbOut& operator <<(NdbOut&, const Request&);
+
+inline
+void
+Request::set(BlockReference userReference,
+ Uint32 userPointer, Uint16 filePointer)
+{
+ theUserReference= userReference;
+ theUserPointer= userPointer;
+ theFilePointer= filePointer;
+}
+
+class AsyncIoThread
+{
+ friend class Ndbfs;
+ friend class AsyncFile;
+public:
+ AsyncIoThread(class Ndbfs&, AsyncFile* file);
+ virtual ~AsyncIoThread() {};
+
+ struct NdbThread* doStart();
+ void shutdown();
+
+ // its a thread so its always running
+ void run();
+
+ /**
+ * Add a request to a thread,
+ * should only be used with bound threads
+ */
+ void dispatch(Request*);
+
+ AsyncFile * m_current_file;
+ Request *m_current_request, *m_last_request;
+
+private:
+ Ndbfs & m_fs;
+
+ MemoryChannel<Request> *theReportTo;
+ MemoryChannel<Request> *theMemoryChannelPtr;
+ MemoryChannel<Request> theMemoryChannel; // If file-bound
+
+ bool theStartFlag;
+ struct NdbThread* theThreadPtr;
+ NdbMutex* theStartMutexPtr;
+ NdbCondition* theStartConditionPtr;
+};
+
+#endif
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt'
--- a/storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt 2008-08-20 13:22:09 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/CMakeLists.txt 2008-12-02 13:19:42 +0000
@@ -18,5 +18,5 @@ INCLUDE(${CMAKE_SOURCE_DIR}/storage/ndb/
ADD_LIBRARY(ndbndbfs STATIC
AsyncFile.cpp Ndbfs.cpp VoidFs.cpp Filename.cpp CircularIndex.cpp
- Win32AsyncFile.cpp
+ Win32AsyncFile.cpp AsyncIoThread.cpp
)
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp 2008-08-22 11:02:38 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp 2008-12-02 13:10:49 +0000
@@ -131,8 +131,8 @@ template <class T> void MemoryChannel<T>
if(full(theWriteIndex, theReadIndex) || theChannel == NULL) abort();
theChannel[theWriteIndex]= t;
++theWriteIndex;
- NdbMutex_Unlock(theMutexPtr);
NdbCondition_Signal(theConditionPtr);
+ NdbMutex_Unlock(theMutexPtr);
}
template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t)
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-11-18 10:28:03 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-12-02 13:10:49 +0000
@@ -53,7 +53,6 @@ int pageSize( const NewVARIABLE* baseAdd
return (1 << log_psize);
}
-
Ndbfs::Ndbfs(Block_context& ctx) :
SimulatedBlock(NDBFS, ctx),
scanningInProgress(false),
@@ -80,16 +79,47 @@ Ndbfs::Ndbfs(Block_context& ctx) :
Ndbfs::~Ndbfs()
{
- // Delete all files
- // AsyncFile destuctor will take care of deleting
- // the thread it has created
+ /**
+ * Stop all unbound threads
+ */
+
+ /**
+ * Post enought Request::end to saturate all unbound threads
+ */
+ Request request;
+ request.action = Request::end;
+ for (unsigned i = 0; i < theThreads.size(); i++)
+ {
+ theToThreads.writeChannel(&request);
+ }
+
+ for (unsigned i = 0; i < theThreads.size(); i++)
+ {
+ AsyncIoThread * thr = theThreads[i];
+ thr->shutdown();
+ }
+
+ /**
+ * delete all threads
+ */
+ for (unsigned i = 0; i < theThreads.size(); i++)
+ {
+ AsyncIoThread * thr = theThreads[i];
+ delete thr;
+ theThreads[i] = 0;
+ }
+ theThreads.clear();
+
+ /**
+ * Delete all files
+ */
for (unsigned i = 0; i < theFiles.size(); i++){
AsyncFile* file = theFiles[i];
- file->shutdown();
delete file;
theFiles[i] = NULL;
}//for
theFiles.clear();
+
if (theRequestPool)
delete theRequestPool;
}
@@ -114,12 +144,34 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
m_maxFiles = 0;
ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles);
Uint32 noIdleFiles = 27;
+
ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
m_maxFiles = noIdleFiles;
+
// Create idle AsyncFiles
- for (Uint32 i = 0; i < noIdleFiles; i++){
- theIdleFiles.push_back(createAsyncFile());
+ for (Uint32 i = 0; i < noIdleFiles; i++)
+ {
+ theIdleBoundFiles.push_back(createAsyncFile(true /* bound */));
+ }
+
+ Uint32 threadpool = 8;
+ ndb_mgm_get_int_parameter(p, CFG_DB_THREAD_POOL, &threadpool);
+
+ // Create IoThreads
+ for (Uint32 i = 0; i < threadpool; i++)
+ {
+ AsyncIoThread * thr = createIoThread(0);
+ if (thr)
+ {
+ jam();
+ theThreads.push_back(thr);
+ }
+ else
+ {
+ jam();
+ break;
+ }
}
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -176,7 +228,15 @@ int
Ndbfs::forward( AsyncFile * file, Request* request)
{
jam();
- file->execute(request);
+ AsyncIoThread* thr = file->getThread();
+ if (thr) // bound
+ {
+ thr->dispatch(request);
+ }
+ else
+ {
+ theToThreads.writeChannel(request);
+ }
return 1;
}
@@ -186,13 +246,27 @@ Ndbfs::execFSOPENREQ(Signal* signal)
jamEntry();
const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
const BlockReference userRef = fsOpenReq->userReference;
- AsyncFile* file = getIdleFile();
+
+ bool bound = (fsOpenReq->fileFlags & FsOpenReq::OM_THREAD_POOL) == 0;
+ AsyncFile* file = getIdleFile(bound);
ndbrequire(file != NULL);
Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
Uint32 userPointer = fsOpenReq->userPointer;
+
+ if(signal->getNoOfSections() == 0){
+ jam();
+ file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
+ } else {
+ jam();
+ SectionHandle handle(this, signal);
+ SegmentedSectionPtr ptr;
+ handle.getSection(ptr, FsOpenReq::FILENAME);
+ file->theFileName.set(spec, ptr, g_sectionSegmentPool);
+ releaseSections(handle);
+ }
- if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
+ if (fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
{
jam();
Uint32 cnt = 16; // 512k
@@ -211,9 +285,29 @@ Ndbfs::execFSOPENREQ(Signal* signal)
return;
}
m_shared_page_pool.getPtr(page_ptr);
- file->m_page_ptr = page_ptr;
- file->m_page_cnt = cnt;
+ file->set_buffer(page_ptr, cnt);
}
+ else if (fsOpenReq->fileFlags & FsOpenReq::OM_WRITE_BUFFER)
+ {
+ jam();
+ Uint32 cnt = NDB_FILE_BUFFER_SIZE / GLOBAL_PAGE_SIZE; // 256k
+ Ptr<GlobalPage> page_ptr;
+ m_ctx.m_mm.alloc_pages(RT_FILE_BUFFER, &page_ptr.i, &cnt, 1);
+ if(cnt == 0)
+ {
+ file->m_page_ptr.setNull();
+ file->m_page_cnt = 0;
+
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
+ fsRef->osErrorCode = ~0; // Indicate local error
+ sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
+ return;
+ }
+ m_shared_page_pool.getPtr(page_ptr);
+ file->set_buffer(page_ptr, cnt);
+ }
else
{
ndbassert(file->m_page_ptr.isNull());
@@ -221,20 +315,8 @@ Ndbfs::execFSOPENREQ(Signal* signal)
file->m_page_cnt = 0;
}
- if(signal->getNoOfSections() == 0){
- jam();
- file->theFileName.set(spec, userRef, fsOpenReq->fileNumber);
- } else {
- jam();
- SectionHandle handle(this, signal);
- SegmentedSectionPtr ptr;
- handle.getSection(ptr, FsOpenReq::FILENAME);
- file->theFileName.set(spec, ptr, g_sectionSegmentPool);
- releaseSections(handle);
- }
- file->reportTo(&theFromThreads);
if (getenv("NDB_TRACE_OPEN"))
- ndbout_c("open(%s)", file->theFileName.c_str());
+ ndbout_c("open(%s) bound: %u", file->theFileName.c_str(), bound);
Request* request = theRequestPool->get();
request->action = Request::open;
@@ -258,12 +340,11 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
jamEntry();
const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
const BlockReference userRef = req->userReference;
- AsyncFile* file = getIdleFile();
+ AsyncFile* file = getIdleFile(true);
ndbrequire(file != NULL);
Filename::NameSpec spec(theFileSystemPath, theBackupFilePath);
file->theFileName.set(spec, userRef, req->fileNumber, req->directory);
- file->reportTo(&theFromThreads);
Request* request = theRequestPool->get();
request->action = Request::rmrf;
@@ -303,6 +384,9 @@ Ndbfs::execFSCLOSEREQ(Signal * signal)
return;
}
+ if (getenv("NDB_TRACE_OPEN"))
+ ndbout_c("close(%s)", openFile->theFileName.c_str());
+
Request *request = theRequestPool->get();
if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
jam();
@@ -669,10 +753,11 @@ Ndbfs::newId()
}
AsyncFile*
-Ndbfs::createAsyncFile(){
+Ndbfs::createAsyncFile(bool bound){
// Check limit of open files
- if (m_maxFiles !=0 && theFiles.size() == m_maxFiles) {
+ if (m_maxFiles !=0 && theFiles.size() == m_maxFiles)
+ {
// Print info about all open files
for (unsigned i = 0; i < theFiles.size(); i++){
AsyncFile* file = theFiles[i];
@@ -687,29 +772,76 @@ Ndbfs::createAsyncFile(){
AsyncFile* file = new PosixAsyncFile(* this);
#endif
- struct NdbThread* thr = file->doStart();
- globalEmulatorData.theConfiguration->addThread(thr, NdbfsThread);
+ if (file->init())
+ {
+ ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
+ }
- // Put the file in list of all files
- theFiles.push_back(file);
+ if (bound)
+ {
+ AsyncIoThread * thr = createIoThread(file);
+ theThreads.push_back(thr);
+ file->attach(thr);
#ifdef VM_TRACE
- infoEvent("NDBFS: Created new file thread %d", theFiles.size());
+ ndbout_c("NDBFS: Created new file thread %d", theFiles.size());
#endif
+ }
+
+ theFiles.push_back(file);
return file;
}
+void
+Ndbfs::pushIdleFile(AsyncFile* file)
+{
+ if (file->getThread())
+ {
+ theIdleBoundFiles.push_back(file);
+ }
+ else
+ {
+ theIdleUnboundFiles.push_back(file);
+ }
+}
+
+AsyncIoThread*
+Ndbfs::createIoThread(AsyncFile* file)
+{
+ AsyncIoThread* thr = new AsyncIoThread(*this, file);
+
+ struct NdbThread* thrptr = thr->doStart();
+ globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+
+ return thr;
+}
+
AsyncFile*
-Ndbfs::getIdleFile(){
- AsyncFile* file;
- if (theIdleFiles.size() > 0){
- file = theIdleFiles[0];
- theIdleFiles.erase(0);
- } else {
- file = createAsyncFile();
- }
- return file;
+Ndbfs::getIdleFile(bool bound)
+{
+ if (bound)
+ {
+ Uint32 sz = theIdleBoundFiles.size();
+ if (sz)
+ {
+ AsyncFile* file = theIdleBoundFiles[sz - 1];
+ theIdleBoundFiles.erase(sz - 1);
+ return file;
+ }
+ }
+ else
+ {
+ Uint32 sz = theIdleUnboundFiles.size();
+ if (sz)
+ {
+ AsyncFile* file = theIdleUnboundFiles[sz - 1];
+ theIdleUnboundFiles.erase(sz - 1);
+ return file;
+ }
+ }
+
+ return createAsyncFile(bound);
}
@@ -721,14 +853,28 @@ Ndbfs::report(Request * request, Signal*
signal->setTrace(request->theTrace);
const BlockReference ref = request->theUserReference;
- if(!request->file->m_page_ptr.isNull())
+ if(request->file->has_buffer())
{
- assert(request->file->m_page_cnt > 0);
- m_ctx.m_mm.release_pages(RT_DBTUP_PAGE,
- request->file->m_page_ptr.i,
- request->file->m_page_cnt);
- request->file->m_page_ptr.setNull();
- request->file->m_page_cnt = 0;
+ Uint32 cnt;
+ Ptr<GlobalPage> ptr;
+ if (request->file->m_open_flags & FsOpenReq::OM_INIT)
+ {
+ jam();
+ request->file->clear_buffer(ptr, cnt);
+ m_ctx.m_mm.release_pages(RT_DBTUP_PAGE, ptr.i, cnt);
+ }
+ else if (request->file->m_open_flags & FsOpenReq::OM_WRITE_BUFFER)
+ {
+ jam();
+ if ((request->action == Request::open && request->error) ||
+ (request->action == Request::close ||
+ request->action == Request::closeRemove))
+ {
+ jam();
+ request->file->clear_buffer(ptr, cnt);
+ m_ctx.m_mm.release_pages(RT_FILE_BUFFER, ptr.i, cnt);
+ }
+ }
}
if (request->error) {
@@ -750,7 +896,7 @@ Ndbfs::report(Request * request, Signal*
case Request:: open: {
jam();
// Put the file back in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
break;
}
@@ -790,7 +936,7 @@ Ndbfs::report(Request * request, Signal*
case Request::rmrf: {
jam();
// Put the file back in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
break;
}
@@ -823,7 +969,7 @@ Ndbfs::report(Request * request, Signal*
// removes the file from OpenFiles list
theOpenFiles.erase(request->theFilePointer);
// Put the file in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
break;
}
@@ -863,7 +1009,7 @@ Ndbfs::report(Request * request, Signal*
case Request::rmrf: {
jam();
// Put the file in idle files list
- theIdleFiles.push_back(request->file);
+ pushIdleFile(request->file);
sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
break;
}
@@ -1055,9 +1201,10 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
infoEvent("NDBFS: Files: %d Open files: %d",
theFiles.size(),
theOpenFiles.size());
- infoEvent(" Idle files: %d Max opened files: %d",
- theIdleFiles.size(),
- m_maxOpenedFiles);
+ infoEvent(" Idle files: (bound: %u unbound: %u) Max opened files: %d",
+ theIdleBoundFiles.size(),
+ theIdleUnboundFiles.size(),
+ m_maxOpenedFiles);
infoEvent(" Max files: %d",
m_maxFiles);
infoEvent(" Requests: %d",
@@ -1084,10 +1231,16 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
return;
}
if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
- infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
+ infoEvent("NDBFS: Dump idle files: %d %u",
+ theIdleBoundFiles.size(), theIdleUnboundFiles.size());
- for (unsigned i = 0; i < theIdleFiles.size(); i++){
- AsyncFile* file = theIdleFiles[i];
+ for (unsigned i = 0; i < theIdleBoundFiles.size(); i++){
+ AsyncFile* file = theIdleBoundFiles[i];
+ infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
+ }
+
+ for (unsigned i = 0; i < theIdleUnboundFiles.size(); i++){
+ AsyncFile* file = theIdleUnboundFiles[i];
infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
}
return;
@@ -1095,6 +1248,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
if(signal->theData[0] == 404)
{
+#if 0
ndbrequire(signal->getLength() == 2);
Uint32 file= signal->theData[1];
AsyncFile* openFile = theOpenFiles.find(file);
@@ -1115,6 +1269,7 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
AsyncFile* file = theFiles[i];
ndbout_c("%2d (0x%lx): %s", i, (long) file, file->isOpen()?"OPEN":"CLOSED");
}
+#endif
}
}//Ndbfs::execDUMP_STATE_ORD()
@@ -1132,6 +1287,7 @@ Ndbfs::get_filename(Uint32 fd) const
BLOCK_FUNCTIONS(Ndbfs)
template class Vector<AsyncFile*>;
+template class Vector<AsyncIoThread*>;
template class Vector<OpenFiles::OpenFileItem>;
template class MemoryChannel<Request>;
template class Pool<Request>;
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp 2008-12-02 13:10:49 +0000
@@ -22,21 +22,21 @@
#include "AsyncFile.hpp"
#include "OpenFiles.hpp"
-
+class AsyncIoThread;
// Because one NDB Signal request can result in multiple requests to
// AsyncFile one class must be made responsible to keep track
// of all out standing request and when all are finished the result
// must be reported to the sending block.
-
class Ndbfs : public SimulatedBlock
{
+ friend class AsyncIoThread;
public:
Ndbfs(Block_context&);
virtual ~Ndbfs();
-
virtual const char* get_filename(Uint32 fd) const;
+
protected:
BLOCK_DEFINES(Ndbfs);
@@ -69,17 +69,22 @@ private:
Uint16 theLastId;
BlockReference cownref;
- // Communication from files
+ // Communication from/to files
MemoryChannel<Request> theFromThreads;
+ MemoryChannel<Request> theToThreads;
Pool<Request>* theRequestPool;
- AsyncFile* createAsyncFile();
- AsyncFile* getIdleFile();
-
- Vector<AsyncFile*> theFiles; // List all created AsyncFiles
- Vector<AsyncFile*> theIdleFiles; // List of idle AsyncFiles
- OpenFiles theOpenFiles; // List of open AsyncFiles
+ AsyncIoThread* createIoThread(AsyncFile* file);
+ AsyncFile* createAsyncFile(bool bound);
+ AsyncFile* getIdleFile(bool bound);
+ void pushIdleFile(AsyncFile*);
+
+ Vector<AsyncIoThread*> theThreads;// List of all created threads
+ Vector<AsyncFile*> theFiles; // List all created AsyncFiles
+ Vector<AsyncFile*> theIdleBoundFiles; // List of idle AsyncFiles
+ Vector<AsyncFile*> theIdleUnboundFiles; // List of idle AsyncFiles
+ OpenFiles theOpenFiles; // List of open AsyncFiles
BaseString theFileSystemPath;
BaseString theBackupFilePath;
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-12-02 20:01:33 +0000
@@ -21,6 +21,7 @@
#include <xfs/xfs.h>
#endif
+#include "Ndbfs.hpp"
#include "AsyncFile.hpp"
#include "PosixAsyncFile.hpp"
@@ -34,17 +35,8 @@
#include <NdbTick.h>
-// use this to test broken pread code
-//#define HAVE_BROKEN_PREAD
-
-#ifdef HAVE_BROKEN_PREAD
-#undef HAVE_PWRITE
-#undef HAVE_PREAD
-#endif
-
// For readv and writev
#include <sys/uio.h>
-
#include <dirent.h>
PosixAsyncFile::PosixAsyncFile(SimulatedBlock& fs) :
@@ -53,18 +45,12 @@ PosixAsyncFile::PosixAsyncFile(Simulated
use_gz(0)
{
memset(&azf,0,sizeof(azf));
+ init_mutex();
}
int PosixAsyncFile::init()
{
// Create write buffer for bigger writes
- theWriteBufferSize = WRITEBUFFERSIZE;
- theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
- NDB_O_DIRECT_WRITE_ALIGNMENT-1);
- theWriteBuffer = (char *)
- (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
- ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
-
azfBufferUnaligned= (Byte*)ndbd_malloc((AZ_BUFSIZE_READ+AZ_BUFSIZE_WRITE)
+NDB_O_DIRECT_WRITE_ALIGNMENT-1);
@@ -82,13 +68,8 @@ int PosixAsyncFile::init()
azf.stream.opaque= &az_mempool;
- if (!theWriteBuffer) {
- DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
- return -1;
- }//if
-
return 0;
-}//AsyncFile::init()
+}
#ifdef O_DIRECT
static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
@@ -229,8 +210,14 @@ void PosixAsyncFile::openReq(Request *re
break;
return;
}
- if(flags & FsOpenReq::OM_GZ)
- use_gz= 1;
+ if (flags & FsOpenReq::OM_GZ)
+ {
+ use_gz = 1;
+ }
+ else
+ {
+ use_gz = 0;
+ }
// allow for user to choose any permissionsa with umask
const int mode = S_IRUSR | S_IWUSR |
@@ -363,7 +350,8 @@ no_odirect:
retry:
off_t save_size = size;
char* buf = (char*)m_page_ptr.p;
- while(size > 0){
+ while(size > 0)
+ {
#ifdef TRACE_INIT
write_cnt++;
#endif
@@ -504,8 +492,9 @@ int PosixAsyncFile::readBuffer(Request *
{
int return_value;
req->par.readWrite.pages[0].size = 0;
-#if ! defined(HAVE_PREAD)
off_t seek_val;
+#if ! defined(HAVE_PREAD)
+ FileGuard guard(this);
if(!use_gz)
{
while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
@@ -516,7 +505,6 @@ int PosixAsyncFile::readBuffer(Request *
}
}
#endif
- off_t seek_val;
if(use_gz)
{
while((seek_val= azseek(&azf, offset, SEEK_SET)) == (off_t)-1
@@ -611,15 +599,16 @@ void PosixAsyncFile::readvReq(Request *r
#endif
}
-int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset,
- size_t chunk_size)
+int PosixAsyncFile::writeBuffer(const char *buf, size_t size, off_t offset)
{
+ size_t chunk_size = 256*1024;
size_t bytes_to_write = chunk_size;
int return_value;
m_write_wo_sync += size;
#if ! defined(HAVE_PWRITE)
+ FileGuard guard(this);
off_t seek_val;
while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
&& errno == EINTR);
@@ -772,56 +761,66 @@ void PosixAsyncFile::removeReq(Request *
}
}
-void PosixAsyncFile::rmrfReq(Request *request, char *path, bool removePath)
+void
+PosixAsyncFile::rmrfReq(Request *request, const char * src, bool removePath)
{
- Uint32 path_len = strlen(path);
- Uint32 path_max_copy = PATH_MAX - path_len;
- char* path_add = &path[path_len];
-
- if(!request->par.rmrf.directory){
+ if(!request->par.rmrf.directory)
+ {
// Remove file
- if(unlink((const char *)path) != 0 && errno != ENOENT)
+ if(unlink(src) != 0 && errno != ENOENT)
request->error = errno;
return;
}
- // Remove directory
- DIR* dirp = opendir((const char *)path);
- if(dirp == 0){
+
+ char path[PATH_MAX];
+ strcpy(path, src);
+ strcat(path, "/");
+
+ DIR* dirp;
+ struct dirent * dp;
+loop:
+ dirp = opendir(path);
+ if(dirp == 0)
+ {
if(errno != ENOENT)
request->error = errno;
return;
}
- struct dirent * dp;
- while ((dp = readdir(dirp)) != NULL){
- if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) {
- BaseString::snprintf(path_add, (size_t)path_max_copy, "%s%s",
- DIR_SEPARATOR, dp->d_name);
- if(remove((const char*)path) == 0){
- path[path_len] = 0;
- continue;
- }
- rmrfReq(request, path, true);
- path[path_len] = 0;
- if(request->error != 0){
- closedir(dirp);
- return;
+ while ((dp = readdir(dirp)) != NULL)
+ {
+ if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0))
+ {
+ int len = strlen(path);
+ strcat(path, dp->d_name);
+ if (remove(path) == 0)
+ {
+ path[len] = 0;
+ continue;
}
+
+ closedir(dirp);
+ strcat(path, "/");
+ goto loop;
}
}
closedir(dirp);
- if(removePath && rmdir((const char *)path) != 0){
+ path[strlen(path)-1] = 0; // remove /
+ if (strcmp(src, path) != 0)
+ {
+ char * t = strrchr(path, '/');
+ t[1] = 0;
+ goto loop;
+ }
+
+ if(removePath && rmdir(src) != 0)
+ {
request->error = errno;
}
- return;
}
-void PosixAsyncFile::endReq()
+PosixAsyncFile::~PosixAsyncFile()
{
- // Thread is ended with return
- if (theWriteBufferUnaligned)
- ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
-
if (azfBufferUnaligned)
ndbd_free(azfBufferUnaligned, (AZ_BUFSIZE_READ*AZ_BUFSIZE_WRITE)
+NDB_O_DIRECT_WRITE_ALIGNMENT-1);
@@ -830,10 +829,9 @@ void PosixAsyncFile::endReq()
ndbd_free(az_mempool.mem,az_mempool.size);
az_mempool.mem = NULL;
- theWriteBufferUnaligned = NULL;
azfBufferUnaligned = NULL;
+ destroy_mutex();
}
-
void PosixAsyncFile::createDirectories()
{
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp 2007-11-15 00:30:00 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.hpp 2008-12-02 13:10:49 +0000
@@ -24,15 +24,26 @@
#include <azlib.h>
+/**
+ * PREAD/PWRITE is needed to use file != thread
+ * therefor it's defined/checked here
+ */
+#ifdef HAVE_BROKEN_PREAD
+#undef HAVE_PWRITE
+#undef HAVE_PREAD
+#elif defined (HAVE_PREAD)
+#define HAVE_PWRITE
+#endif
+
class PosixAsyncFile : public AsyncFile
{
friend class Ndbfs;
public:
PosixAsyncFile(SimulatedBlock& fs);
+ virtual ~PosixAsyncFile();
- int init();
-
- bool isOpen();
+ virtual int init();
+ virtual bool isOpen();
virtual void openReq(Request *request);
virtual void readvReq(Request *request);
@@ -41,32 +52,59 @@ public:
virtual void syncReq(Request *request);
virtual void removeReq(Request *request);
virtual void appendReq(Request *request);
- virtual void rmrfReq(Request *request, char * path, bool removePath);
- void endReq();
+ virtual void rmrfReq(Request *request, const char * path, bool removePath);
virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
- virtual int writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size = WRITECHUNK);
+ virtual int writeBuffer(const char * buf, size_t size, off_t offset);
virtual void createDirectories();
private:
int theFd;
- Uint32 m_open_flags; // OM_ flags from request to open file
-
int use_gz;
azio_stream azf;
struct az_alloc_rec az_mempool;
-
- void* theWriteBufferUnaligned;
void* azfBufferUnaligned;
- size_t m_write_wo_sync; // Writes wo/ sync
- size_t m_auto_sync_freq; // Auto sync freq in bytes
-
int check_odirect_read(Uint32 flags, int&new_flags, int mode);
int check_odirect_write(Uint32 flags, int&new_flags, int mode);
+
+#ifndef HAVE_PREAD
+ struct FileGuard;
+ friend struct FileGuard;
+ NdbMutex * m_mutex;
+ void init_mutex() { m_mutex = NdbMutex_Create();}
+ void destroy_mutex() { NdbMutex_Destroy(m_mutex);}
+
+ /**
+ * If dont HAVE_PREAD and using file != thread
+ */
+ struct FileGuard
+ {
+ PosixAsyncFile* m_file;
+ FileGuard (PosixAsyncFile* file) : m_file(file) {
+ if (m_file->getThread() == 0)
+ {
+ NdbMutex_Lock(m_file->m_mutex);
+ }
+ }
+ ~FileGuard() {
+ if (m_file->getThread() == 0)
+ {
+ NdbMutex_Unlock(m_file->m_mutex);
+ }
+ }
+ };
+#else
+ void init_mutex() {}
+ void destroy_mutex() {}
+ struct FileGuard
+ {
+ FileGuard (PosixAsyncFile* file){}
+ ~FileGuard () {}
+ };
+#endif
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp 2008-12-02 20:01:33 +0000
@@ -37,6 +37,12 @@ Win32AsyncFile::~Win32AsyncFile()
}
+int
+Win32AsyncFile::init()
+{
+ return 0;
+}
+
void Win32AsyncFile::openReq(Request* request)
{
m_auto_sync_freq = 0;
@@ -171,7 +177,7 @@ void Win32AsyncFile::openReq(Request* re
);
Uint32 size = request->par.open.page_size;
char* buf = (char*)m_page_ptr.p;
- DWORD dwWritten;
+ DWORD dwWritten;
while(size > 0){
BOOL bWrite= WriteFile(hFile, buf, size, &dwWritten, 0);
if(!bWrite || dwWritten!=size)
@@ -205,7 +211,8 @@ void Win32AsyncFile::openReq(Request* re
}
int
-Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
+Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset)
+{
req->par.readWrite.pages[0].size = 0;
while (size > 0) {
@@ -259,9 +266,9 @@ Win32AsyncFile::readBuffer(Request* req,
}
int
-Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size)
+Win32AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset)
{
+ size_t chunk_size = 256 * 1024;
size_t bytes_to_write = chunk_size;
m_write_wo_sync += size;
@@ -365,55 +372,65 @@ Win32AsyncFile::removeReq(Request * requ
}
void
-Win32AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
- Uint32 path_len = strlen(path);
- Uint32 path_max_copy = PATH_MAX - path_len;
- char* path_add = &path[path_len];
-
- if(!request->par.rmrf.directory){
+Win32AsyncFile::rmrfReq(Request * request, const char * src, bool removePath){
+ if (!request->par.rmrf.directory)
+ {
// Remove file
- if(!DeleteFile(path)){
+ if (!DeleteFile(src))
+ {
DWORD dwError = GetLastError();
- if(dwError!=ERROR_FILE_NOT_FOUND)
+ if (dwError != ERROR_FILE_NOT_FOUND)
request->error = dwError;
}
return;
}
+ char path[PATH_MAX];
+ strcpy(path, src);
strcat(path, "\\*");
+
WIN32_FIND_DATA ffd;
- HANDLE hFindFile = FindFirstFile(path, &ffd);
- path[path_len] = 0;
- if(INVALID_HANDLE_VALUE==hFindFile){
+ HANDLE hFindFile;
+loop:
+ hFindFile = FindFirstFile(path, &ffd);
+ if (INVALID_HANDLE_VALUE == hFindFile)
+ {
DWORD dwError = GetLastError();
- if(dwError!=ERROR_PATH_NOT_FOUND)
+ if (dwError != ERROR_PATH_NOT_FOUND)
request->error = dwError;
return;
}
+ path[strlen(path) - 1] = 0; // remove '*'
do {
- if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){
- strcat(path, "\\");
+ if (0 != strcmp(".", ffd.cFileName) && 0 != strcmp("..", ffd.cFileName))
+ {
+ int len = strlen(path);
strcat(path, ffd.cFileName);
- if(DeleteFile(path)) {
- path[path_len] = 0;
+ if(DeleteFile(path))
+ {
+ path[len] = 0;
continue;
}//if
- rmrfReq(request, path, true);
- path[path_len] = 0;
- if(request->error != 0){
- FindClose(hFindFile);
- return;
- }
+ FindClose(hFindFile);
+ strcat(path, "\\*");
+ goto loop;
}
} while(FindNextFile(hFindFile, &ffd));
-
+
FindClose(hFindFile);
+ path[strlen(path)-1] = 0; // remove '\'
+ if (strcmp(src, path) != 0)
+ {
+ char * t = strrchr(path, '\\');
+ t[1] = '*';
+ t[2] = 0;
+ goto loop;
+ }
- if(removePath && !RemoveDirectory(path))
+ if(removePath && !RemoveDirectory(src))
request->error = GetLastError();
-
}
void Win32AsyncFile::createDirectories()
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp 2008-08-21 06:38:48 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.hpp 2008-12-02 17:02:41 +0000
@@ -30,32 +30,25 @@ class Win32AsyncFile : public AsyncFile
friend class Ndbfs;
public:
Win32AsyncFile(SimulatedBlock& fs);
- ~Win32AsyncFile();
+ virtual ~Win32AsyncFile();
- void reportTo( MemoryChannel<Request> *reportTo );
+ virtual int init();
+ virtual bool isOpen();
+ virtual void openReq(Request *request);
+ virtual void closeReq(Request *request);
+ virtual void syncReq(Request *request);
+ virtual void removeReq(Request *request);
+ virtual void appendReq(Request *request);
+ virtual void rmrfReq(Request *request, const char * path, bool removePath);
- void execute( Request* request );
-
- bool isOpen();
+ virtual int readBuffer(Request*, char * buf, size_t size, off_t offset);
+ virtual int writeBuffer(const char * buf, size_t size, off_t offset);
private:
-
- void openReq(Request *request);
- void closeReq(Request *request);
- void syncReq(Request *request);
- void removeReq(Request *request);
- void appendReq(Request *request);
- void rmrfReq(Request *request, char * path, bool removePath);
-
- int readBuffer(Request*, char * buf, size_t size, off_t offset);
- int writeBuffer(const char * buf, size_t size, off_t offset,
- size_t chunk_size = WRITECHUNK);
-
int extendfile(Request* request);
void createDirectories();
HANDLE hFile;
- Uint32 m_open_flags; // OM_ flags from request to open file
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-11-13 13:36:29 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-12-01 18:04:19 +0000
@@ -991,6 +991,9 @@ void Qmgr::execCM_REGCONF(Signal* signal
c_clusterNodes.assign(NdbNodeBitmask::Size, cmRegConf->allNdbNodes);
myNodePtr.p->ndynamicId = TdynamicId;
+
+ // set own MT config here or in REF, and others in CM_NODEINFOREQ/CONF
+ setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
/*--------------------------------------------------------------*/
// Send this as an EVENT REPORT to inform about hearing about
@@ -1109,6 +1112,7 @@ Qmgr::sendCmNodeInfoReq(Signal* signal,
req->dynamicId = self->ndynamicId;
req->version = getNodeInfo(getOwnNodeId()).m_version;
req->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+ req->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
const Uint32 ref = calcQmgrBlockRef(nodeId);
sendSignal(ref,GSN_CM_NODEINFOREQ, signal, CmNodeInfoReq::SignalLength, JBB);
DEBUG_START(GSN_CM_NODEINFOREQ, nodeId, "");
@@ -1214,6 +1218,9 @@ void Qmgr::execCM_REGREF(Signal* signal)
skip_nodes.bitAND(c_definedNodes);
c_start.m_skip_nodes.bitOR(skip_nodes);
+
+ // set own MT config here or in CONF, and others in CM_NODEINFOREQ/CONF
+ setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers;
char buf[100];
switch (TrefuseReason) {
@@ -1661,11 +1668,17 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
const Uint32 dynamicId = conf->dynamicId;
const Uint32 version = conf->version;
Uint32 mysql_version = conf->mysql_version;
+ Uint32 lqh_workers = conf->lqh_workers;
if (version < NDBD_SPLIT_VERSION)
{
jam();
mysql_version = 0;
}
+ if (version < NDBD_MT_LQH_VERSION)
+ {
+ jam();
+ lqh_workers = 0;
+ }
NodeRecPtr nodePtr;
nodePtr.i = getOwnNodeId();
@@ -1684,6 +1697,7 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s
replyNodePtr.p->blockRef = signal->getSendersBlockRef();
setNodeInfo(replyNodePtr.i).m_version = version;
setNodeInfo(replyNodePtr.i).m_mysql_version = mysql_version;
+ setNodeInfo(replyNodePtr.i).m_lqh_workers = lqh_workers;
recompute_version_info(NodeInfo::DB, version);
@@ -1741,8 +1755,13 @@ void Qmgr::execCM_NODEINFOREQ(Signal* si
Uint32 mysql_version = req->mysql_version;
if (req->version < NDBD_SPLIT_VERSION)
mysql_version = 0;
-
setNodeInfo(addNodePtr.i).m_mysql_version = mysql_version;
+
+ Uint32 lqh_workers = req->lqh_workers;
+ if (req->version < NDBD_MT_LQH_VERSION)
+ lqh_workers = 0;
+ setNodeInfo(addNodePtr.i).m_lqh_workers = lqh_workers;
+
c_maxDynamicId = req->dynamicId;
cmAddPrepare(signal, addNodePtr, nodePtr.p);
@@ -1799,6 +1818,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR
conf->dynamicId = self->ndynamicId;
conf->version = getNodeInfo(getOwnNodeId()).m_version;
conf->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version;
+ conf->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers;
sendSignal(nodePtr.p->blockRef, GSN_CM_NODEINFOCONF, signal,
CmNodeInfoConf::SignalLength, JBB);
DEBUG_START(GSN_CM_NODEINFOCONF, refToNode(nodePtr.p->blockRef), "");
=== modified file 'storage/ndb/src/kernel/blocks/record_types.hpp'
--- a/storage/ndb/src/kernel/blocks/record_types.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/src/kernel/blocks/record_types.hpp 2008-12-02 13:10:49 +0000
@@ -45,6 +45,11 @@
#define RG_JOBBUFFER 4
/**
+ * File-thread buffers
+ */
+#define RG_FILE_BUFFERS 5
+
+/**
*
*/
#define RG_RESERVED 0
@@ -69,5 +74,7 @@
#define RT_DBTUP_PAGE_MAP MAKE_TID( 2, RG_DATAMEM)
#define RT_JOB_BUFFER MAKE_TID( 1, RG_JOBBUFFER)
+
+#define RT_FILE_BUFFER MAKE_TID( 1, RG_FILE_BUFFERS)
#endif
=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2008-12-02 13:10:49 +0000
@@ -45,11 +45,7 @@ Tsman::Tsman(Block_context& ctx) :
m_pgman(0),
m_lgman(0),
m_tup(0),
-#ifdef __sun // temp
- m_client_mutex(1, false)
-#else
- m_client_mutex(2, true)
-#endif
+ m_client_mutex("tsman-client", 2, true)
{
BLOCK_CONSTRUCTOR(Tsman);
@@ -771,6 +767,7 @@ Tsman::open_file(Signal* signal,
req->fileFlags = 0;
req->fileFlags |= FsOpenReq::OM_READWRITE;
req->fileFlags |= FsOpenReq::OM_DIRECT;
+ req->fileFlags |= FsOpenReq::OM_THREAD_POOL;
switch(requestInfo){
case CreateFileImplReq::Create:
req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE;
=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp 2008-11-09 18:37:29 +0000
+++ b/storage/ndb/src/kernel/main.cpp 2008-12-02 13:10:49 +0000
@@ -250,20 +250,34 @@ init_global_memory_manager(EmulatorData
"config, exiting.");
return -1;
}
+
if (tupmem)
{
Resource_limit rl;
rl.m_min = tupmem;
rl.m_max = tupmem;
- rl.m_resource_id = 3;
+ rl.m_resource_id = RG_DATAMEM;
+ ed.m_mem_manager->set_resource_limit(rl);
+ }
+
+ Uint32 maxopen = 4 * 4; // 4 redo parts, max 4 files per part
+ Uint32 filebuffer = NDB_FILE_BUFFER_SIZE;
+ Uint32 filepages = (filebuffer / GLOBAL_PAGE_SIZE) * maxopen;
+
+ if (filepages)
+ {
+ Resource_limit rl;
+ rl.m_min = filepages;
+ rl.m_max = filepages;
+ rl.m_resource_id = RG_FILE_BUFFERS;
ed.m_mem_manager->set_resource_limit(rl);
}
- if (shared_mem+tupmem)
+ if (shared_mem + tupmem + filepages)
{
Resource_limit rl;
rl.m_min = 0;
- rl.m_max = shared_mem + tupmem;
+ rl.m_max = shared_mem + tupmem + filepages;
rl.m_resource_id = 0;
ed.m_mem_manager->set_resource_limit(rl);
}
@@ -280,7 +294,7 @@ init_global_memory_manager(EmulatorData
ndb_mgm_get_db_parameter_info(CFG_DB_SGA, &sga, &size);
g_eventLogger->alert("Malloc (%lld bytes) for %s and %s failed, exiting",
- Uint64(shared_mem + tupmem) * 32768,
+ Uint64(shared_mem + tupmem) * GLOBAL_PAGE_SIZE,
dm.m_name, sga.m_name);
return -1;
}
@@ -293,8 +307,10 @@ get_multithreaded_config(EmulatorData& e
{
// multithreaded is compiled in ndbd/ndbmtd for now
globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
- if (!globalData.isNdbMt)
+ if (!globalData.isNdbMt) {
+ ndbout << "NDBMT: non-mt" << endl;
return 0;
+ }
const ndb_mgm_configuration_iterator * p =
ed.theConfiguration->getOwnConfigIterator();
=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.cpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.cpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.cpp 2008-12-01 18:03:48 +0000
@@ -15,94 +15,151 @@
#include "SafeMutex.hpp"
-NdbOut&
-operator<<(NdbOut& out, const SafeMutex& dm)
-{
- out << "level=" << dm.m_level << "," << "usage=" << dm.m_usage;
- return out;
-}
-
int
SafeMutex::create()
{
- if (m_init)
- return ErrState;
- int ret = -1;
-#ifdef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifndef __WIN__
- if (m_limit > 1 || m_debug) {
- pthread_mutexattr_t attr;
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
- ret = pthread_mutex_init(&m_mutex, &attr);
- } else {
- // error-check mutex does not work right on my linux, skip it
- ret = pthread_mutex_init(&m_mutex, 0);
- }
-#else
+ int ret;
+ if (m_initdone)
+ return err(ErrState, __LINE__);
ret = pthread_mutex_init(&m_mutex, 0);
-#endif
-#else
- if (m_limit > 1 || m_debug)
- return ErrUnsupp;
- ret = pthread_mutex_init(&m_mutex, 0);
-#endif
if (ret != 0)
- return ret;
- m_init = true;
+ return err(ret, __LINE__);
+ ret = pthread_cond_init(&m_cond, 0);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ m_initdone = true;
return 0;
}
int
SafeMutex::destroy()
{
- if (!m_init)
- return ErrState;
- int ret = pthread_mutex_destroy(&m_mutex);
+ int ret;
+ if (!m_initdone)
+ return err(ErrState, __LINE__);
+ ret = pthread_cond_destroy(&m_cond);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ ret = pthread_mutex_destroy(&m_mutex);
if (ret != 0)
- return ret;
- m_init = false;
+ return err(ret, __LINE__);
+ m_initdone = false;
return 0;
}
int
SafeMutex::lock()
{
- pthread_t self = pthread_self();
- int ret = pthread_mutex_lock(&m_mutex);
- /* have mutex */
+ int ret;
+ if (m_simple) {
+ ret = pthread_mutex_lock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ return 0;
+ }
+ ret = pthread_mutex_lock(&m_mutex);
if (ret != 0)
- return ret;
- if (!(m_level < m_limit))
- return ErrLevel;
- m_level++;
- if (m_level > m_usage)
- m_usage = m_level;
- if (m_level == 1 && m_owner != 0)
- return ErrOwner1;
- if (m_level >= 2 && m_owner != self)
- return ErrOwner2;
- m_owner = self;
+ return err(ret, __LINE__);
+ return lock_impl();
+}
+
+int
+SafeMutex::lock_impl()
+{
+ int ret;
+ pthread_t self = pthread_self();
+ assert(self != 0);
+ while (1) {
+ if (m_level == 0) {
+ assert(m_owner == 0);
+ m_owner = self;
+ } else if (m_owner != self) {
+ ret = pthread_cond_wait(&m_cond, &m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ continue;
+ }
+ if (!(m_level < m_limit))
+ return err(ErrLevel, __LINE__);
+ m_level++;
+ if (m_usage < m_level)
+ m_usage = m_level;
+ ret = pthread_cond_signal(&m_cond);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ ret = pthread_mutex_unlock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ break;
+ }
return 0;
}
int
SafeMutex::unlock()
{
+ int ret;
+ if (m_simple) {
+ ret = pthread_mutex_unlock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ return 0;
+ }
+ ret = pthread_mutex_lock(&m_mutex);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ return unlock_impl();
+}
+
+int
+SafeMutex::unlock_impl()
+{
+ int ret;
pthread_t self = pthread_self();
- if (!(m_level > 0))
- return ErrState;
+ assert(self != 0);
if (m_owner != self)
- return ErrOwner3;
- if (m_level == 1)
- m_owner = 0;
+ return err(ErrOwner, __LINE__);
+ if (m_level == 0)
+ return err(ErrNolock, __LINE__);
m_level--;
- int ret = pthread_mutex_unlock(&m_mutex);
- /* lose mutex */
+ if (m_level == 0) {
+ m_owner = 0;
+ ret = pthread_cond_signal(&m_cond);
+ if (ret != 0)
+ return err(ret, __LINE__);
+ }
+ ret = pthread_mutex_unlock(&m_mutex);
if (ret != 0)
- return ret;
+ return err(ret, __LINE__);
return 0;
}
+int
+SafeMutex::err(int errcode, int errline)
+{
+ assert(errcode != 0);
+ m_errcode = errcode;
+ m_errline = errline;
+ ndbout << *this << endl;
+#ifdef UNIT_TEST
+ abort();
+#endif
+ return errcode;
+}
+
+NdbOut&
+operator<<(NdbOut& out, const SafeMutex& sm)
+{
+ out << sm.m_name << ":";
+ out << " level=" << sm.m_level;
+ out << " usage=" << sm.m_usage;
+ if (sm.m_errcode != 0) {
+ out << " errcode=" << sm.m_errcode;
+ out << " errline=" << sm.m_errline;
+ }
+ return out;
+}
+
#ifdef UNIT_TEST
struct sm_thr {
@@ -144,10 +201,12 @@ sm_run(void* arg)
}
if (op == +1) {
assert(level < thr.limit);
+ //ndbout << thr.index << ": lock" << endl;
int ret = sm.lock();
assert(ret == 0);
level++;
} else if (op == -1) {
+ //ndbout << thr.index << ": unlock" << endl;
int ret = sm.unlock();
assert(ret == 0);
assert(level != 0);
@@ -161,6 +220,7 @@ sm_run(void* arg)
assert(ret == 0);
level--;
}
+ return 0;
}
int
@@ -169,18 +229,19 @@ main(int argc, char** argv)
const uint max_thr = 128;
struct sm_thr thr[max_thr];
- // threads - loops - max level
+ // threads - loops - max level - debug
uint num_thr = argc > 1 ? atoi(argv[1]) : 4;
assert(num_thr != 0 && num_thr <= max_thr);
uint loops = argc > 2 ? atoi(argv[2]) : 1000000;
uint limit = argc > 3 ? atoi(argv[3]) : 10;
assert(limit != 0);
+ bool debug = argc > 4 ? atoi(argv[4]) : true;
ndbout << "threads=" << num_thr;
ndbout << " loops=" << loops;
ndbout << " max level=" << limit << endl;
- SafeMutex sm(limit, true);
+ SafeMutex sm("test-mutex", limit, debug);
int ret;
ret = sm.create();
assert(ret == 0);
=== modified file 'storage/ndb/src/kernel/vm/SafeMutex.hpp'
--- a/storage/ndb/src/kernel/vm/SafeMutex.hpp 2008-11-20 13:32:13 +0000
+++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp 2008-12-01 18:03:48 +0000
@@ -26,61 +26,70 @@
#include <ndb_types.h>
#include <NdbOut.hpp>
-#undef HAVE_PTHREAD_MUTEX_RECURSIVE
-#ifdef __linux
-#define HAVE_PTHREAD_MUTEX_RECURSIVE
-#endif
-
/*
- * Recursive mutex with a recursion limit >= 1. Can be useful for
- * debugging. If a recursive mutex is not wanted, one must rewrite
- * caller code until limit 1 works.
+ * Recursive mutex with recursion limit >= 1. Intended for debugging.
+ * One should rewrite caller code until limit 1 works.
*
- * Implementation for limit > 1 uses a real OS recursive mutex. Should
- * work on linux and solaris 10. There is a unit test testSafeMutex.
+ * The implementation uses a default mutex. If limit is > 1 or debug
+ * is specified then a recursive mutex is simulated. Operating system
+ * recursive mutex (if any) is not used. The simulation is several
+ * times slower. There is a unit test testSafeMutex.
*
* The caller currently is multi-threaded disk data. Here it is easy
* to verify that the mutex is released within a time-slice.
*/
class SafeMutex {
+ const char* const m_name;
+ const Uint32 m_limit; // error if usage exceeds this
+ const bool m_debug; // use recursive implementation even for limit 1
+ const bool m_simple;
pthread_mutex_t m_mutex;
+ pthread_cond_t m_cond;
pthread_t m_owner;
- bool m_init;
+ bool m_initdone;
Uint32 m_level;
Uint32 m_usage; // max level used so far
- const Uint32 m_limit; // error if usage exceeds this
- const bool m_debug; // use recursive mutex even for limit 1
+ int m_errcode;
+ int m_errline;
+ int err(int errcode, int errline);
friend class NdbOut& operator<<(NdbOut&, const SafeMutex&);
public:
- SafeMutex(Uint32 limit, bool debug) :
+ SafeMutex(const char* name, Uint32 limit, bool debug) :
+ m_name(name),
m_limit(limit),
- m_debug(debug)
+ m_debug(debug),
+ m_simple(!(limit > 1 || debug))
{
assert(m_limit >= 1),
m_owner = 0; // wl4391_todo assuming numeric non-zero
- m_init = false;
+ m_initdone = false;
m_level = 0;
m_usage = 0;
+ m_errcode = 0;
+ m_errline = 0;
};
~SafeMutex() {
- (void)destroy();
+ if (m_initdone)
+ (void)destroy();
}
enum {
- // caller must crash on any error
- ErrUnsupp = -101, // limit > 1 or debug, and not supported by OS
- ErrState = -102, // user error
- ErrLevel = -103, // level exceeded limit
- ErrOwner1 = -104, // owner not 0 at first lock (OS error)
- ErrOwner2 = -105, // owner not self at recursive lock (OS error)
- ErrOwner3 = -106 // owner not self at unlock (OS error)
+ // caller must crash on any error - recovery is not possible
+ ErrState = -101, // user error
+ ErrLevel = -102, // level exceeded limit
+ ErrOwner = -103, // unlock when not owner
+ ErrNolock = -104 // unlock when no lock
};
int create();
int destroy();
int lock();
int unlock();
+
+private:
+ int lock_impl();
+ int unlock_impl();
};
#endif
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-12-01 18:05:11 +0000
@@ -544,6 +544,7 @@ protected:
BlockReference calcInstanceBlockRef(BlockNumber aBlock);
// matching instance on another node e.g. LQH-LQH
+ // valid only if receiver has same number of workers
BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode);
/**
=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2008-02-08 14:35:31 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2008-12-02 13:10:49 +0000
@@ -84,7 +84,7 @@ public:
private:
void grow(Uint32 start, Uint32 cnt);
-#define XX_RL_COUNT 5
+#define XX_RL_COUNT 6
/**
* Return pointer to free page data on page
*/
=== modified file 'storage/ndb/src/mgmapi/LocalConfig.cpp'
--- a/storage/ndb/src/mgmapi/LocalConfig.cpp 2008-11-18 16:33:59 +0000
+++ b/storage/ndb/src/mgmapi/LocalConfig.cpp 2008-12-02 14:25:58 +0000
@@ -275,7 +275,8 @@ LocalConfig::parseString(const char * co
err.assfmt("Unexpected entry: \"%s\"", tok);
return false;
}
-
+ bind_address_port= 0;
+ bind_address.assign("");
return true;
}
@@ -343,6 +344,15 @@ char *
LocalConfig::makeConnectString(char *buf, int sz)
{
int p= BaseString::snprintf(buf,sz,"nodeid=%d", _ownNodeId);
+ if (p < sz && bind_address.length())
+ {
+ int new_p= p+BaseString::snprintf(buf+p,sz-p,",bind-address=%s:%d",
+ bind_address.c_str(), bind_address_port);
+ if (new_p < sz)
+ p= new_p;
+ else
+ buf[p]= 0;
+ }
if (p < sz)
for (unsigned i = 0; i < ids.size(); i++)
{
@@ -356,6 +366,18 @@ LocalConfig::makeConnectString(char *buf
{
buf[p]= 0;
break;
+ }
+ if (!bind_address.length() && ids[i].bind_address.length())
+ {
+ new_p= p+BaseString::snprintf(buf+p,sz-p,",bind-address=%s:%d",
+ ids[i].bind_address.c_str(), ids[i].bind_address_port);
+ if (new_p < sz)
+ p= new_p;
+ else
+ {
+ buf[p]= 0;
+ break;
+ }
}
}
buf[sz-1]=0;
=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp 2008-11-19 14:47:19 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp 2008-12-02 14:25:58 +0000
@@ -238,6 +238,8 @@ ndb_mgm_set_connectstring(NdbMgmHandle h
DBUG_RETURN(-1);
}
handle->cfg_i= -1;
+ handle->cfg.bind_address_port= handle->m_bindaddress_port;
+ handle->cfg.bind_address.assign(handle->m_bindaddress ? handle->m_bindaddress : "");
DBUG_RETURN(0);
}
@@ -265,6 +267,11 @@ ndb_mgm_set_bindaddress(NdbMgmHandle han
{
handle->m_bindaddress = 0;
handle->m_bindaddress_port = 0;
+ }
+ if (handle->cfg.ids.size() != 0)
+ {
+ handle->cfg.bind_address_port= handle->m_bindaddress_port;
+ handle->cfg.bind_address.assign(handle->m_bindaddress ? handle->m_bindaddress : "");
}
DBUG_RETURN(0);
}
=== modified file 'storage/ndb/src/mgmclient/CommandInterpreter.cpp'
--- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2008-11-19 10:34:01 +0000
+++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp 2008-12-03 14:14:38 +0000
@@ -2289,10 +2289,10 @@ print_status(const ndb_mgm_node_state *
<< ": " << status_string(state->node_status);
switch(state->node_status){
case NDB_MGM_NODE_STATUS_STARTING:
- ndbout << " (Phase " << state->start_phase << ")";
+ ndbout << " (Last completed phase " << state->start_phase << ")";
break;
case NDB_MGM_NODE_STATUS_SHUTTING_DOWN:
- ndbout << " (Phase " << state->start_phase << ")";
+ ndbout << " (Last completed phase " << state->start_phase << ")";
break;
default:
break;
=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-11-19 11:01:17 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp 2008-12-02 13:10:49 +0000
@@ -1074,6 +1074,18 @@ const ConfigInfo::ParamInfo ConfigInfo::
0, 0 },
{
+ CFG_DB_THREAD_POOL,
+ "ThreadPool",
+ DB_TOKEN,
+ "No of unbound threads for file access (currently only for DD)",
+ ConfigInfo::CI_USED,
+ false,
+ ConfigInfo::CI_INT,
+ "8",
+ "0",
+ STR_VALUE(MAX_INT_RNIL) },
+
+ {
CFG_DB_MAX_OPEN_FILES,
"MaxNoOfOpenFiles",
DB_TOKEN,
=== modified file 'storage/ndb/test/ndbapi/testDict.cpp'
--- a/storage/ndb/test/ndbapi/testDict.cpp 2008-10-31 14:19:25 +0000
+++ b/storage/ndb/test/ndbapi/testDict.cpp 2008-12-02 12:05:54 +0000
@@ -1243,86 +1243,6 @@ int runGetPrimaryKey(NDBT_Context* ctx,
return result;
}
-struct ErrorCodes { int error_id; bool crash;};
-ErrorCodes
-NF_codes[] = {
- {6003, true}
- ,{6004, true}
- //,6005, true,
- //{7173, false}
-};
-
-int
-runNF1(NDBT_Context* ctx, NDBT_Step* step){
- NdbRestarter restarter;
- if(restarter.getNumDbNodes() < 2)
- return NDBT_OK;
-
- myRandom48Init((long)NdbTick_CurrentMillisecond());
-
- Ndb* pNdb = GETNDB(step);
- const NdbDictionary::Table* pTab = ctx->getTab();
-
- NdbDictionary::Dictionary* dict = pNdb->getDictionary();
- dict->dropTable(pTab->getName());
-
- int result = NDBT_OK;
-
- const int loops = ctx->getNumLoops();
- for (int l = 0; l < loops && result == NDBT_OK ; l++){
- const int sz = sizeof(NF_codes)/sizeof(NF_codes[0]);
- for(int i = 0; i<sz; i++){
- int rand = myRandom48(restarter.getNumDbNodes());
- int nodeId = restarter.getRandomNotMasterNodeId(rand);
- struct ErrorCodes err_struct = NF_codes[i];
- int error = err_struct.error_id;
- bool crash = err_struct.crash;
-
- g_info << "NF1: node = " << nodeId << " error code = " << error << endl;
-
- int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 3};
-
- CHECK2(restarter.dumpStateOneNode(nodeId, val2, 2) == 0,
- "failed to set RestartOnErrorInsert");
-
- CHECK2(restarter.insertErrorInNode(nodeId, error) == 0,
- "failed to set error insert");
-
- CHECK2(dict->createTable(* pTab) == 0,
- "failed to create table");
-
- if (crash) {
- CHECK2(restarter.waitNodesNoStart(&nodeId, 1) == 0,
- "waitNodesNoStart failed");
-
- if(myRandom48(100) > 50){
- CHECK2(restarter.startNodes(&nodeId, 1) == 0,
- "failed to start node");
-
- CHECK2(restarter.waitClusterStarted() == 0,
- "waitClusterStarted failed");
-
- CHECK2(dict->dropTable(pTab->getName()) == 0,
- "drop table failed");
- } else {
- CHECK2(dict->dropTable(pTab->getName()) == 0,
- "drop table failed");
-
- CHECK2(restarter.startNodes(&nodeId, 1) == 0,
- "failed to start node");
-
- CHECK2(restarter.waitClusterStarted() == 0,
- "waitClusterStarted failed");
- }
- }
- }
- }
- end:
- dict->dropTable(pTab->getName());
-
- return result;
-}
-
#define APIERROR(error) \
{ g_err << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
<< error.code << ", msg: " << error.message << "." << endl; \
@@ -1468,102 +1388,6 @@ runTableRename(NDBT_Context* ctx, NDBT_S
}
int
-runTableRenameNF(NDBT_Context* ctx, NDBT_Step* step){
- NdbRestarter restarter;
- if(restarter.getNumDbNodes() < 2)
- return NDBT_OK;
-
- int result = NDBT_OK;
-
- Ndb* pNdb = GETNDB(step);
- NdbDictionary::Dictionary* dict = pNdb->getDictionary();
- int records = ctx->getNumRecords();
- const int loops = ctx->getNumLoops();
-
- ndbout << "|- " << ctx->getTab()->getName() << endl;
-
- for (int l = 0; l < loops && result == NDBT_OK ; l++){
- const NdbDictionary::Table* pTab = ctx->getTab();
-
- // Try to create table in db
- if (pTab->createTableInDb(pNdb) != 0){
- return NDBT_FAILED;
- }
-
- // Verify that table is in db
- const NdbDictionary::Table* pTab2 =
- NDBT_Table::discoverTableFromDb(pNdb, pTab->getName());
- if (pTab2 == NULL){
- ndbout << pTab->getName() << " was not found in DB"<< endl;
- return NDBT_FAILED;
- }
- ctx->setTab(pTab2);
-
- // Load table
- HugoTransactions hugoTrans(*ctx->getTab());
- if (hugoTrans.loadTable(pNdb, records) != 0){
- return NDBT_FAILED;
- }
-
- BaseString pTabName(pTab->getName());
- BaseString pTabNewName(pTabName);
- pTabNewName.append("xx");
-
- const NdbDictionary::Table * oldTable = dict->getTable(pTabName.c_str());
- if (oldTable) {
- NdbDictionary::Table newTable = *oldTable;
- newTable.setName(pTabNewName.c_str());
- CHECK2(dict->alterTable(*oldTable, newTable) == 0,
- "TableRename failed");
- }
- else {
- result = NDBT_FAILED;
- }
-
- // Restart one node at a time
-
- /**
- * Need to run LCP at high rate otherwise
- * packed replicas become "to many"
- */
- int val = DumpStateOrd::DihMinTimeBetweenLCP;
- if(restarter.dumpStateAllNodes(&val, 1) != 0){
- do { CHECK(0); } while(0);
- g_err << "Failed to set LCP to min value" << endl;
- return NDBT_FAILED;
- }
-
- const int numNodes = restarter.getNumDbNodes();
- for(int i = 0; i<numNodes; i++){
- int nodeId = restarter.getDbNodeId(i);
- int error = NF_codes[i].error_id;
-
- g_info << "NF1: node = " << nodeId << " error code = " << error << endl;
-
- CHECK2(restarter.restartOneDbNode(nodeId) == 0,
- "failed to set restartOneDbNode");
-
- CHECK2(restarter.waitClusterStarted() == 0,
- "waitClusterStarted failed");
-
- }
-
- // Verify table contents
- NdbDictionary::Table pNewTab(pTabNewName.c_str());
-
- UtilTransactions utilTrans(pNewTab);
- if (utilTrans.clearTable(pNdb, records) != 0){
- continue;
- }
-
- // Drop table
- dict->dropTable(pTabNewName.c_str());
- }
- end:
- return result;
-}
-
-int
runTableRenameSR(NDBT_Context* ctx, NDBT_Step* step){
NdbRestarter restarter;
if(restarter.getNumDbNodes() < 2)
@@ -6675,17 +6499,24 @@ runFailAddPartition(NDBT_Context* ctx, N
NdbDictionary::Table altered = * org;
altered.setFragmentCount(org->getFragmentCount() + 2);
- NdbDictionary::HashMap hm;
- pDic->initDefaultHashMap(hm, altered.getFragmentCount());
- if (pDic->getHashMap(hm, hm.getName()) == -1)
+ if (pDic->beginSchemaTrans())
{
- if (pDic->createHashMap(hm) != 0)
- {
- ndbout << "Failed to create hashmap: " << pDic->getNdbError() << endl;
- return NDBT_FAILED;
- }
+ ndbout << "Failed to beginSchemaTrans()" << pDic->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ if (pDic->prepareHashMap(*org, altered) == -1)
+ {
+ ndbout << "Failed to create hashmap: " << pDic->getNdbError() << endl;
+ return NDBT_FAILED;
}
+ if (pDic->endSchemaTrans())
+ {
+ ndbout << "Failed to endSchemaTrans()" << pDic->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
int dump1 = DumpStateOrd::SchemaResourceSnapshot;
int dump2 = DumpStateOrd::SchemaResourceCheckLeak;
@@ -6702,9 +6533,16 @@ runFailAddPartition(NDBT_Context* ctx, N
"failed to set error insert");
CHECK(restarter.dumpStateAllNodes(&dump1, 1) == 0);
- CHECK2(pDic->alterTable(*org, altered) != 0,
+ int res = pDic->alterTable(*org, altered);
+ if (res)
+ {
+ ndbout << pDic->getNdbError() << endl;
+ }
+ CHECK2(res != 0,
"failed to fail after error insert " << errval);
CHECK(restarter.dumpStateAllNodes(&dump2, 1) == 0);
+ CHECK2(restarter.insertErrorInNode(nodeId, 0) == 0,
+ "failed to clear error insert");
const NdbDictionary::Table* check = pDic->getTable(tab.getName());
@@ -6926,17 +6764,9 @@ TESTCASE("StoreFrmError",
"Test that a frm file with too long length can't be stored."){
INITIALIZER(runStoreFrmError);
}
-TESTCASE("NF1",
- "Test that create table can handle NF (not master)"){
- INITIALIZER(runNF1);
-}
TESTCASE("TableRename",
"Test basic table rename"){
INITIALIZER(runTableRename);
-}
-TESTCASE("TableRenameNF",
- "Test that table rename can handle node failure"){
- INITIALIZER(runTableRenameNF);
}
TESTCASE("TableRenameSR",
"Test that table rename can handle system restart"){
=== modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp'
--- a/storage/ndb/test/ndbapi/testNodeRestart.cpp 2008-11-03 08:38:27 +0000
+++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp 2008-12-03 19:51:33 +0000
@@ -292,6 +292,9 @@ int runRestarter(NDBT_Context* ctx, NDBT
int result = NDBT_OK;
int loops = ctx->getNumLoops();
int sync_threads = ctx->getProperty("SyncThreads", (unsigned)0);
+ int sleep0 = ctx->getProperty("Sleep0", (unsigned)0);
+ int sleep1 = ctx->getProperty("Sleep1", (unsigned)0);
+ int randnode = ctx->getProperty("RandNode", (unsigned)0);
NdbRestarter restarter;
int i = 0;
int lastId = 0;
@@ -310,6 +313,10 @@ int runRestarter(NDBT_Context* ctx, NDBT
while(i<loops && result != NDBT_FAILED && !ctx->isTestStopped()){
int id = lastId % restarter.getNumDbNodes();
+ if (randnode == 1)
+ {
+ id = rand() % restarter.getNumDbNodes();
+ }
int nodeId = restarter.getDbNodeId(id);
ndbout << "Restart node " << nodeId << endl;
if(restarter.restartOneDbNode(nodeId, false, true, true) != 0){
@@ -325,6 +332,9 @@ int runRestarter(NDBT_Context* ctx, NDBT
break;
}
+ if (sleep1)
+ NdbSleep_MilliSleep(sleep1);
+
if (restarter.startNodes(&nodeId, 1))
{
g_err << "Failed to start node" << endl;
@@ -338,6 +348,9 @@ int runRestarter(NDBT_Context* ctx, NDBT
break;
}
+ if (sleep0)
+ NdbSleep_MilliSleep(sleep0);
+
ctx->sync_up_and_wait("PauseThreads", sync_threads);
lastId++;
@@ -3243,6 +3256,65 @@ loop2:
return NDBT_OK;
}
+int
+runHammer(NDBT_Context* ctx, NDBT_Step* step)
+{
+ int result = NDBT_OK;
+ int records = ctx->getNumRecords();
+ Ndb* pNdb = GETNDB(step);
+ HugoOperations hugoOps(*ctx->getTab());
+ while (!ctx->isTestStopped())
+ {
+ int r = rand() % records;
+ if (hugoOps.startTransaction(pNdb) != 0)
+ goto err;
+
+ if ((rand() % 100) < 50)
+ {
+ if (hugoOps.pkUpdateRecord(pNdb, r, 1, rand()) != 0)
+ goto err;
+ }
+ else
+ {
+ if (hugoOps.pkWriteRecord(pNdb, r, 1, rand()) != 0)
+ goto err;
+ }
+
+ if (hugoOps.execute_NoCommit(pNdb) != 0)
+ goto err;
+
+ if (hugoOps.pkDeleteRecord(pNdb, r, 1) != 0)
+ goto err;
+
+ if (hugoOps.execute_NoCommit(pNdb) != 0)
+ goto err;
+
+ if ((rand() % 100) < 50)
+ {
+ if (hugoOps.pkInsertRecord(pNdb, r, 1, rand()) != 0)
+ goto err;
+ }
+ else
+ {
+ if (hugoOps.pkWriteRecord(pNdb, r, 1, rand()) != 0)
+ goto err;
+ }
+
+ if ((rand() % 100) < 90)
+ {
+ hugoOps.execute_Commit(pNdb);
+ }
+ else
+ {
+ err:
+ hugoOps.execute_Rollback(pNdb);
+ }
+
+ hugoOps.closeTransaction(pNdb);
+ }
+ return NDBT_OK;
+}
+
NDBT_TESTSUITE(testNodeRestart);
TESTCASE("NoLoad",
"Test that one node at a time can be stopped and then restarted "\
@@ -3695,6 +3767,15 @@ TESTCASE("Bug36276", ""){
TESTCASE("Bug36245", ""){
INITIALIZER(runLoadTable);
STEP(runBug36245);
+ VERIFIER(runClearTable);
+}
+TESTCASE("NF_Hammer", ""){
+ TC_PROPERTY("Sleep0", 9000);
+ TC_PROPERTY("Sleep1", 3000);
+ TC_PROPERTY("Rand", 1);
+ INITIALIZER(runLoadTable);
+ STEPS(runHammer, 25);
+ STEP(runRestarter);
VERIFIER(runClearTable);
}
NDBT_TESTSUITE_END(testNodeRestart);
=== modified file 'storage/ndb/test/run-test/command.cpp'
--- a/storage/ndb/test/run-test/command.cpp 2008-02-21 13:57:42 +0000
+++ b/storage/ndb/test/run-test/command.cpp 2008-11-27 18:03:09 +0000
@@ -66,6 +66,9 @@ static
bool
do_change_version(atrt_config& config, SqlResultSet& command,
AtrtClient& atrtdb){
+ /**
+ * TODO make option to restart "not" initial
+ */
uint process_id= command.columnAsInt("process_id");
const char* process_args= command.column("process_args");
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt 2008-11-08 21:43:03 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2008-12-03 20:14:25 +0000
@@ -321,7 +321,7 @@ max-time: 500
cmd: testScan
args: -n ScanRead488Timeout -l 10 T6 D1 D2
-max-time: 600
+max-time: 1200
cmd: testScan
args: -n ScanRead40 -l 100 T6 D1 D2
@@ -337,10 +337,6 @@ max-time: 1800
cmd: testScan
args: -n ScanRead40RandomTable -l 100 T1
-max-time: 3600
-cmd: testScan
-args: -n ScanRead40RandomTable -l 1000 T6
-
max-time: 500
cmd: testScan
args: -n ScanWithLocksAndInserts T6 D1 D2
@@ -954,10 +950,6 @@ cmd: DbAsyncGenerator
args: -time 60 -p 1 -proc 25
type: bench
-max-time: 300
-cmd: testMgm
-args:
-
max-time: 5000
cmd: testNodeRestart
args: -n GCP T1
@@ -1157,7 +1149,6 @@ cmd: testDict
args: -n FailAddPartition T1 I3
# EOF 2008-06-05
-
# Test data buffering for TCKEYREQ
max-time: 500
cmd: testLimits
@@ -1178,28 +1169,28 @@ cmd: testBasic
args: -n PkUpdate WIDE_MAXKEY_HUGO WIDE_MAXATTR_HUGO WIDE_MAXKEYMAXCOLS_HUGO WIDE_MINKEYMAXCOLS_HUGO
# EOF 2008-06-30
-
max-time: 500
cmd: test_event
args -n bug37672 T1
#EOF 2008-07-04
-
max-time: 500
cmd: testScanFilter
args:
#EOF 2008-07-09
-
max-time: 600
cmd: test_event
args -r 5000 -n Bug30780 T1
#EOF 2008-08-11
-
# Test data buffering for SCANTABREQ
max-time: 500
cmd: testLimits
args: -n ExhaustSegmentedSectionScan WIDE_2COL
#EOF 2008-08-20
+max-time: 300
+cmd: testMgm
+args:
+
=== modified file 'storage/ndb/test/run-test/daily-devel-tests.txt'
--- a/storage/ndb/test/run-test/daily-devel-tests.txt 2008-08-30 05:26:09 +0000
+++ b/storage/ndb/test/run-test/daily-devel-tests.txt 2008-12-03 19:51:33 +0000
@@ -133,6 +133,10 @@ max-time: 2500
cmd: testNodeRestart
args: -n FiftyPercentStopAndWait T6 T13
+max-time: 2500
+cmd: testNodeRestart
+args: -n NF_Hammer -r 5 T1
+
#max-time: 500
#cmd: testNodeRestart
#args: -n StopOnError T1
@@ -181,10 +185,6 @@ args: -l 2 -n SR1_O T6 T13
max-time: 500
cmd: testIndex
args: -n MixedTransaction T1
-
-max-time: 2500
-cmd: testDict
-args: -n NF1 T1 T6 T13
#
max-time: 1500
=== modified file 'storage/ndb/test/run-test/setup.cpp'
--- a/storage/ndb/test/run-test/setup.cpp 2008-11-03 12:33:34 +0000
+++ b/storage/ndb/test/run-test/setup.cpp 2008-11-27 19:42:21 +0000
@@ -317,7 +317,7 @@ load_process(atrt_config& config, atrt_c
proc.m_host->m_basedir.c_str());
proc.m_proc.m_args.appfmt(" --defaults-group-suffix=%s",
cluster.m_name.c_str());
- proc.m_proc.m_args.append(" --nodaemon -n");
+ proc.m_proc.m_args.append(" --nodaemon --initial -n");
if (g_fix_nodeid)
proc.m_proc.m_args.appfmt(" --ndb-nodeid=%d", proc.m_nodeid);
proc.m_proc.m_cwd.assfmt("%sndbd.%d", dir.c_str(), proc.m_index);
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (tomas.ulin:3137) | Tomas Ulin | 5 Dec |