From: Pekka Nousiainen Date: May 29 2011 10:55am Subject: bzr commit into mysql-5.1-telco-7.0-wl4124 branch (pekka.nousiainen:4384) List-Archive: http://lists.mysql.com/commits/138376 Message-Id: <20110529105540.BB46D5586A@sama.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit #At file:///export/space/pekka/ms/ms-wl4124-70/ based on revid:pekka.nousiainen@stripped 4384 Pekka Nousiainen 2011-05-29 [merge] merge from main into wl4124 modified: storage/ndb/include/kernel/ndb_limits.h storage/ndb/include/kernel/signaldata/FireTrigOrd.hpp storage/ndb/include/kernel/signaldata/SumaImpl.hpp storage/ndb/include/kernel/signaldata/TupCommit.hpp storage/ndb/include/ndbapi/NdbEventOperation.hpp storage/ndb/src/common/debugger/signaldata/SumaImpl.cpp storage/ndb/src/kernel/blocks/LocalProxy.cpp storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.cpp storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp storage/ndb/src/kernel/blocks/suma/Suma.cpp storage/ndb/src/ndbapi/NdbEventOperation.cpp storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/test/include/HugoOperations.hpp storage/ndb/test/ndbapi/test_event.cpp storage/ndb/test/run-test/daily-basic-tests.txt storage/ndb/test/run-test/daily-devel-tests.txt storage/ndb/test/src/HugoOperations.cpp === modified file 'storage/ndb/include/kernel/ndb_limits.h' --- a/storage/ndb/include/kernel/ndb_limits.h 2011-05-19 09:16:32 +0000 +++ b/storage/ndb/include/kernel/ndb_limits.h 2011-05-29 10:55:32 +0000 @@ -197,6 +197,7 @@ #define MAX_NDBMT_LQH_WORKERS 4 #define MAX_NDBMT_LQH_THREADS 4 +#define MAX_NDBMT_TC_THREADS 2 #define NDB_FILE_BUFFER_SIZE (256*1024) === modified file 'storage/ndb/include/kernel/signaldata/FireTrigOrd.hpp' --- a/storage/ndb/include/kernel/signaldata/FireTrigOrd.hpp 2011-04-28 07:47:53 +0000 +++ b/storage/ndb/include/kernel/signaldata/FireTrigOrd.hpp 2011-05-25 14:31:47 +0000 @@ -57,7 +57,7 @@ class FireTrigOrd { public: STATIC_CONST( SignalLength = 11 ); STATIC_CONST( SignalWithGCILength = 9 ); - STATIC_CONST( SignalLengthSuma = 12 ); + STATIC_CONST( SignalLengthSuma = 14 ); private: Uint32 m_connectionPtr; @@ -72,15 +72,11 @@ private: Uint32 m_gci_hi; Uint32 m_triggerType; }; - union { - Uint32 m_hashValue; - Uint32 m_transId1; - }; - union { - Uint32 m_any_value; - Uint32 m_transId2; - }; + Uint32 m_transId1; + Uint32 m_transId2; Uint32 m_gci_lo; + Uint32 m_hashValue; + Uint32 m_any_value; // Public methods public: Uint32 getConnectionPtr() const; === modified file 'storage/ndb/include/kernel/signaldata/SumaImpl.hpp' --- a/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2011-05-19 09:38:03 +0000 +++ b/storage/ndb/include/kernel/signaldata/SumaImpl.hpp 2011-05-29 10:55:32 +0000 @@ -311,6 +311,7 @@ struct SubSyncConf { struct SubTableData { friend bool printSUB_TABLE_DATA(FILE *, const Uint32 *, Uint32, Uint16); STATIC_CONST( SignalLength = 8 ); + STATIC_CONST( SignalLengthWithTransId = 10 ); SECTION( DICT_TAB_INFO = 0 ); SECTION( ATTR_INFO = 0 ); SECTION( AFTER_VALUES = 1 ); @@ -334,6 +335,8 @@ struct SubTableData { }; Uint32 totalLen; Uint32 gci_lo; + Uint32 transId1; + Uint32 transId2; static void setOperation(Uint32& ri, Uint32 val) { ri = (ri & 0xFFFFFF00) | val; === modified file 'storage/ndb/include/kernel/signaldata/TupCommit.hpp' --- a/storage/ndb/include/kernel/signaldata/TupCommit.hpp 2011-02-01 23:27:25 +0000 +++ b/storage/ndb/include/kernel/signaldata/TupCommit.hpp 2011-05-25 14:31:47 +0000 @@ -38,7 +38,7 @@ class TupCommitReq { friend bool printTUPCOMMITREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo); public: - STATIC_CONST( SignalLength = 5 ); + STATIC_CONST( SignalLength = 7 ); private: @@ -50,6 +50,8 @@ private: Uint32 hashValue; Uint32 diskpage; Uint32 gci_lo; + Uint32 transId1; + Uint32 transId2; }; #endif === modified file 'storage/ndb/include/ndbapi/NdbEventOperation.hpp' --- a/storage/ndb/include/ndbapi/NdbEventOperation.hpp 2011-02-01 23:27:25 +0000 +++ b/storage/ndb/include/ndbapi/NdbEventOperation.hpp 2011-05-25 14:31:47 +0000 @@ -221,6 +221,17 @@ public: Uint64 getLatestGCI() const; /** + * Retrieve the TransId of the latest retrieved event + * + * Only valid for data events. If the kernel does not + * support transaction ids with events, the max Uint64 + * value is returned. + * + * @return TransId + */ + Uint64 getTransId() const; + + /** * Get the latest error * * @return Error object. === modified file 'storage/ndb/src/common/debugger/signaldata/SumaImpl.cpp' --- a/storage/ndb/src/common/debugger/signaldata/SumaImpl.cpp 2011-02-01 23:27:25 +0000 +++ b/storage/ndb/src/common/debugger/signaldata/SumaImpl.cpp 2011-05-25 14:31:47 +0000 @@ -182,6 +182,11 @@ printSUB_TABLE_DATA(FILE * output, const fprintf(output, " tableId: %x\n", sig->tableId); fprintf(output, " operation: %x\n", SubTableData::getOperation(sig->requestInfo)); + if (len == SubTableData::SignalLengthWithTransId) + { + fprintf(output, " TransId : %x %x\n", + sig->transId1, sig->transId2); + } return false; } === modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp' --- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2011-04-27 10:48:16 +0000 +++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2011-05-26 11:52:38 +0000 @@ -661,6 +661,19 @@ LocalProxy::sendNF_COMPLETEREP(Signal* s sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal, NFCompleteRep::SignalLength, JBB); + + if (number() == DBTC) + { + /** + * DBTC send NF_COMPLETEREP "early" to QMGR + * so that it can allow api to handle node-failure of + * transactions eariler... + * See Qmgr::execNF_COMPLETEREP + */ + jam(); + sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal, + NFCompleteRep::SignalLength, JBB); + } } } === modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp' --- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2011-05-25 09:30:37 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2011-05-26 11:49:47 +0000 @@ -1314,6 +1314,7 @@ private: struct DIVERIFY_queue { DIVERIFY_queue() { + m_ref = 0; cfirstVerifyQueue = clastVerifyQueue = 0; apiConnectRecord = 0; m_empty_done = 1; @@ -1322,6 +1323,7 @@ private: Uint32 cfirstVerifyQueue; Uint32 clastVerifyQueue; Uint32 m_empty_done; + Uint32 m_ref; }; bool isEmpty(const DIVERIFY_queue&); @@ -1330,7 +1332,7 @@ private: void emptyverificbuffer(Signal *, Uint32 q, bool aContintueB); void emptyverificbuffer_check(Signal*, Uint32, Uint32); - DIVERIFY_queue c_diverify_queue[1]; + DIVERIFY_queue c_diverify_queue[MAX_NDBMT_LQH_THREADS]; Uint32 c_diverify_queue_cnt; /*------------------------------------------------------------------------*/ === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-05-25 09:30:37 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-05-26 11:49:47 +0000 @@ -9241,6 +9241,16 @@ inline void Dbdih::enqueue(DIVERIFY_queue & q, Uint32 senderData, Uint64 gci) { +#ifndef NDEBUG + /** + * - assert only + * - we must read first *before* "publishing last + * or else DIH-thread could already have consumed entry + * when we call assert + */ + Uint32 first = q.cfirstVerifyQueue; +#endif + Uint32 last = q.clastVerifyQueue; ApiConnectRecord * apiConnectRecord = q.apiConnectRecord; @@ -9255,7 +9265,7 @@ Dbdih::enqueue(DIVERIFY_queue & q, Uint3 { q.clastVerifyQueue = last + 1; } - assert(q.clastVerifyQueue != q.cfirstVerifyQueue); + assert(q.clastVerifyQueue != first); } inline @@ -9294,10 +9304,13 @@ void Dbdih::execDIVERIFYREQ(Signal* sign { EmulatedJamBuffer * jambuf = * (EmulatedJamBuffer**)(signal->theData+2); thrjamEntry(jambuf); + Uint32 qno = signal->theData[1]; + ndbassert(qno < NDB_ARRAY_SIZE(c_diverify_queue)); + DIVERIFY_queue & q = c_diverify_queue[qno]; loop: Uint32 val = m_micro_gcp.m_lock.read_lock(); Uint32 blocked = getBlockCommit() == true ? 1 : 0; - if (blocked == 0 && isEmpty(c_diverify_queue[0])) + if (blocked == 0 && isEmpty(q)) { thrjam(jambuf); /*-----------------------------------------------------------------------*/ @@ -9318,7 +9331,6 @@ loop: // Since we are blocked we need to put this operation last in the verify // queue to ensure that operation starts up in the correct order. /*-------------------------------------------------------------------------*/ - DIVERIFY_queue & q = c_diverify_queue[0]; enqueue(q, signal->theData[0], m_micro_gcp.m_new_gci); if (blocked == 0 && jambuf == jamBuffer()) { @@ -14775,7 +14787,7 @@ Dbdih::emptyverificbuffer(Signal* signal signal->theData[1] = (Uint32)(m_micro_gcp.m_current_gci >> 32); signal->theData[2] = (Uint32)(m_micro_gcp.m_current_gci & 0xFFFFFFFF); signal->theData[3] = 0; - sendSignal(clocaltcblockref, GSN_DIVERIFYCONF, signal, 4, JBB); + sendSignal(c_diverify_queue[q].m_ref, GSN_DIVERIFYCONF, signal, 4, JBB); } else if (aContinueB == true) { @@ -15522,8 +15534,13 @@ void Dbdih::initialiseRecordsLab(Signal* case 1:{ ApiConnectRecordPtr apiConnectptr; jam(); + c_diverify_queue[0].m_ref = calcTcBlockRef(getOwnNodeId()); for (Uint32 i = 0; i < c_diverify_queue_cnt; i++) { + if (c_diverify_queue_cnt > 1) + { + c_diverify_queue[i].m_ref = numberToRef(DBTC, i + 1, 0); + } /******** INTIALIZING API CONNECT RECORDS ********/ for (apiConnectptr.i = 0; apiConnectptr.i < capiConnectFileSize; apiConnectptr.i++) === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-05-25 15:03:11 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-05-29 10:55:32 +0000 @@ -3159,7 +3159,8 @@ private: public: bool is_same_trans(Uint32 opId, Uint32 trid1, Uint32 trid2); - void get_op_info(Uint32 opId, Uint32 *hash, Uint32* gci_hi, Uint32* gci_lo); + void get_op_info(Uint32 opId, Uint32 *hash, Uint32* gci_hi, Uint32* gci_lo, + Uint32* transId1, Uint32* transId2); void accminupdate(Signal*, Uint32 opPtrI, const Local_key*); void accremoverow(Signal*, Uint32 opPtrI, const Local_key*); @@ -3332,7 +3333,8 @@ Dblqh::is_same_trans(Uint32 opId, Uint32 inline void -Dblqh::get_op_info(Uint32 opId, Uint32 *hash, Uint32* gci_hi, Uint32* gci_lo) +Dblqh::get_op_info(Uint32 opId, Uint32 *hash, Uint32* gci_hi, Uint32* gci_lo, + Uint32* transId1, Uint32* transId2) { TcConnectionrecPtr regTcPtr; regTcPtr.i= opId; @@ -3340,6 +3342,8 @@ Dblqh::get_op_info(Uint32 opId, Uint32 * *hash = regTcPtr.p->hashValue; *gci_hi = regTcPtr.p->gci_hi; *gci_lo = regTcPtr.p->gci_lo; + *transId1 = regTcPtr.p->transid[0]; + *transId2 = regTcPtr.p->transid[1]; } #include "../dbacc/Dbacc.hpp" === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-05-25 15:03:11 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-05-29 10:55:32 +0000 @@ -8072,6 +8072,8 @@ void Dblqh::commitContinueAfterBlockedLa tupCommitReq->hashValue = regTcPtr.p->hashValue; tupCommitReq->diskpage = RNIL; tupCommitReq->gci_lo = regTcPtr.p->gci_lo; + tupCommitReq->transId1 = regTcPtr.p->transid[0]; + tupCommitReq->transId2 = regTcPtr.p->transid[1]; EXECUTE_DIRECT(tup, GSN_TUP_COMMITREQ, signal, TupCommitReq::SignalLength); @@ -10131,7 +10133,11 @@ Dblqh::seize_acc_ptr_list(ScanRecord* sc Uint32 segments= (new_batch_size + (SectionSegment::DataLength -2 )) / SectionSegment::DataLength; - ndbassert(segments >= scanP->scan_acc_segments); + if (segments <= scanP->scan_acc_segments) + { + // No need to allocate more segments. + return true; + } if (new_batch_size > 1) { === modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp' --- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-05-25 13:19:02 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-05-26 11:52:38 +0000 @@ -144,6 +144,13 @@ class Dbtc: public SimulatedBlock { public: + + /** + * Incase of mt-TC...only one instance will perform actual take-over + * let this be TAKE_OVER_INSTANCE + */ + STATIC_CONST( TAKE_OVER_INSTANCE = 1 ); + enum ConnectionState { CS_CONNECTED = 0, CS_DISCONNECTED = 1, === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-05-25 15:03:11 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-05-29 10:55:32 +0000 @@ -4887,6 +4887,7 @@ void Dbtc::diverify010Lab(Signal* signal UintR TfirstfreeApiConnectCopy = cfirstfreeApiConnectCopy; ApiConnectRecord * const regApiPtr = apiConnectptr.p; signal->theData[0] = apiConnectptr.i; + signal->theData[1] = instance() ? instance() - 1 : 0; if (ERROR_INSERTED(8022)) { jam(); systemErrorLab(signal, __LINE__); @@ -8478,11 +8479,23 @@ Dbtc::checkNodeFailComplete(Signal* sign nfRep->blockNo = DBTC; nfRep->nodeId = cownNodeid; nfRep->failedNodeId = hostptr.i; - sendSignal(cdihblockref, GSN_NF_COMPLETEREP, signal, - NFCompleteRep::SignalLength, JBB); - sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal, - NFCompleteRep::SignalLength, JBB); + if (instance() == 0) + { + jam(); + sendSignal(cdihblockref, GSN_NF_COMPLETEREP, signal, + NFCompleteRep::SignalLength, JBB); + sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal, + NFCompleteRep::SignalLength, JBB); + } + else + { + /** + * Send to proxy + */ + sendSignal(DBTC_REF, GSN_NF_COMPLETEREP, signal, + NFCompleteRep::SignalLength, JBB); + } } CRASH_INSERTION(8058); @@ -8632,7 +8645,7 @@ Dbtc::checkScanFragList(Signal* signal, DEBUG("checkScanActiveInFailedLqh: scanFragError"); } -void Dbtc::execTAKE_OVERTCCONF(Signal* signal) +void Dbtc::execTAKE_OVERTCCONF(Signal* signal) { jamEntry(); @@ -8712,7 +8725,10 @@ void Dbtc::execTAKE_OVERTCREQ(Signal* si tcNodeFailptr.i = 0; ptrAss(tcNodeFailptr, tcFailRecord); if (tcNodeFailptr.p->failStatus != FS_IDLE || - cmasterNodeId != getOwnNodeId()) + cmasterNodeId != getOwnNodeId() || + (! (instance() == 0 /* single TC */ || + instance() == TAKE_OVER_INSTANCE))) /* in mt-TC case let 1 instance + do take-over */ { jam(); /*------------------------------------------------------------*/ @@ -8727,6 +8743,7 @@ void Dbtc::execTAKE_OVERTCREQ(Signal* si tcNodeFailptr.p->queueIndex = tcNodeFailptr.p->queueIndex + 1; return; }//if + ndbrequire(instance() == 0 || instance() == TAKE_OVER_INSTANCE); startTakeOverLab(signal); }//Dbtc::execTAKE_OVERTCREQ() === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.cpp 2011-04-27 10:48:16 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.cpp 2011-05-26 11:54:16 +0000 @@ -79,6 +79,9 @@ DbtcProxy::DbtcProxy(Block_context& ctx) addRecSignal(GSN_DROP_INDX_IMPL_CONF,&DbtcProxy::execDROP_INDX_IMPL_CONF); addRecSignal(GSN_DROP_INDX_IMPL_REF, &DbtcProxy::execDROP_INDX_IMPL_REF); + // GSN_TAKE_OVERTCCONF + addRecSignal(GSN_TAKE_OVERTCCONF,&DbtcProxy::execTAKE_OVERTCCONF); + m_tc_seize_req_instance = 0; } @@ -500,7 +503,7 @@ DbtcProxy::execTCSEIZEREQ(Signal* signal return; } - signal->theData[2] = m_tc_seize_req_instance; + signal->theData[2] = 1 + m_tc_seize_req_instance; sendSignal(workerRef(m_tc_seize_req_instance), GSN_TCSEIZEREQ, signal, signal->getLength(), JBB); m_tc_seize_req_instance = (m_tc_seize_req_instance + 1) % c_workers; @@ -922,4 +925,25 @@ DbtcProxy::sendDROP_INDX_IMPL_CONF(Signa ssRelease(ssId); } +void +DbtcProxy::execTAKE_OVERTCCONF(Signal* signal) +{ + jamEntry(); + + if (!checkNodeFailSequence(signal)) + { + jam(); + return; + } + + for (Uint32 i = 0; i < c_workers; i++) + { + jam(); + Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId()); + sendSignal(ref, GSN_TAKE_OVERTCCONF, signal, + signal->getLength(), + JBB); + } +} + BLOCK_FUNCTIONS(DbtcProxy) === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp 2011-04-27 10:48:16 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcProxy.hpp 2011-05-26 11:52:38 +0000 @@ -304,6 +304,9 @@ protected: void execDROP_INDX_IMPL_CONF(Signal*); void execDROP_INDX_IMPL_REF(Signal*); void sendDROP_INDX_IMPL_CONF(Signal*, Uint32 ssId); + + // GSN_TAKE_OVERTCCONF + void execTAKE_OVERTCCONF(Signal*); }; #endif === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-05-25 13:19:02 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-05-25 14:31:47 +0000 @@ -485,12 +485,14 @@ Dbtup::disk_page_commit_callback(Signal* { Uint32 hash_value; Uint32 gci_hi, gci_lo; + Uint32 transId1, transId2; OperationrecPtr regOperPtr; jamEntry(); c_operation_pool.getPtr(regOperPtr, opPtrI); - c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo); + c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo, + &transId1, &transId2); TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr(); @@ -499,6 +501,8 @@ Dbtup::disk_page_commit_callback(Signal* tupCommitReq->gci_hi= gci_hi; tupCommitReq->gci_lo= gci_lo; tupCommitReq->diskpage = page_id; + tupCommitReq->transId1 = transId1; + tupCommitReq->transId2 = transId2; regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0; regOperPtr.p->m_commit_disk_callback_page= page_id; @@ -526,12 +530,14 @@ Dbtup::disk_page_log_buffer_callback(Sig { Uint32 hash_value; Uint32 gci_hi, gci_lo; + Uint32 transId1, transId2; OperationrecPtr regOperPtr; jamEntry(); c_operation_pool.getPtr(regOperPtr, opPtrI); - c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo); + c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo, + &transId1, &transId2); Uint32 page= regOperPtr.p->m_commit_disk_callback_page; TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr(); @@ -541,6 +547,8 @@ Dbtup::disk_page_log_buffer_callback(Sig tupCommitReq->gci_hi= gci_hi; tupCommitReq->gci_lo= gci_lo; tupCommitReq->diskpage = page; + tupCommitReq->transId1 = transId1; + tupCommitReq->transId2 = transId2; ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0); regOperPtr.p->op_struct.m_wait_log_buffer= 0; @@ -667,6 +675,8 @@ void Dbtup::execTUP_COMMITREQ(Signal* si Uint32 hash_value= tupCommitReq->hashValue; Uint32 gci_hi = tupCommitReq->gci_hi; Uint32 gci_lo = tupCommitReq->gci_lo; + Uint32 transId1 = tupCommitReq->transId1; + Uint32 transId2 = tupCommitReq->transId2; jamEntry(); @@ -687,6 +697,9 @@ void Dbtup::execTUP_COMMITREQ(Signal* si req_struct.hash_value= hash_value; req_struct.gci_hi = gci_hi; req_struct.gci_lo = gci_lo; + /* Put transid in req_struct, so detached triggers can access it */ + req_struct.trans_id1 = transId1; + req_struct.trans_id2 = transId2; regOperPtr.p->m_commit_disk_callback_page = tupCommitReq->diskpage; #ifdef VM_TRACE === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2011-05-25 13:19:02 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2011-05-25 14:31:47 +0000 @@ -1510,6 +1510,8 @@ out: break; case (TriggerType::SUBSCRIPTION_BEFORE): // Only Suma jam(); + fireTrigOrd->m_transId1 = req_struct->trans_id1; + fireTrigOrd->m_transId2 = req_struct->trans_id2; fireTrigOrd->setGCI(req_struct->gci_hi); fireTrigOrd->setHashValue(req_struct->hash_value); fireTrigOrd->m_any_value = regOperPtr->m_any_value; === modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp' --- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2011-05-19 09:38:03 +0000 +++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2011-05-29 10:55:32 +0000 @@ -4359,6 +4359,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32); const Uint32 event = trg->getTriggerEvent(); const Uint32 any_value = trg->getAnyValue(); + const Uint32 transId1 = trg->m_transId1; + const Uint32 transId2 = trg->m_transId2; Ptr subPtr; c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF); @@ -4431,6 +4433,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) data->flags = 0; data->anyValue = any_value; data->totalLen = ptrLen; + data->transId1 = transId1; + data->transId2 = transId2; { LocalDLList list(c_subscriberPool, subPtr.p->m_subscribers); @@ -4439,13 +4443,13 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) { data->senderData = subbPtr.p->m_senderData; sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, - SubTableData::SignalLength, JBB, ptr, nptr); + SubTableData::SignalLengthWithTransId, JBB, ptr, nptr); } } } else { - const uint buffer_header_sz = 4; + const uint buffer_header_sz = 6; Uint32* dst; Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz; if((dst = get_buffer_ptr(signal, bucket, gci, sz))) @@ -4454,6 +4458,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) * dst++ = schemaVersion; * dst++ = (event << 16) | f_trigBufferSize; * dst++ = any_value; + * dst++ = transId1; + * dst++ = transId2; memcpy(dst, f_buffer, f_trigBufferSize << 2); dst += f_trigBufferSize; memcpy(dst, b_buffer, b_trigBufferSize << 2); @@ -6432,13 +6438,15 @@ Suma::resend_bucket(Signal* signal, Uint } else { - const uint buffer_header_sz = 4; + const uint buffer_header_sz = 6; g_cnt++; Uint32 subPtrI = * src++ ; Uint32 schemaVersion = * src++; Uint32 event = * src >> 16; Uint32 sz_1 = (* src ++) & 0xFFFF; Uint32 any_value = * src++; + Uint32 transId1 = * src++; + Uint32 transId2 = * src++; ndbassert(sz - buffer_header_sz >= sz_1); @@ -6470,6 +6478,8 @@ Suma::resend_bucket(Signal* signal, Uint data->flags = 0; data->anyValue = any_value; data->totalLen = ptrLen; + data->transId1 = transId1; + data->transId2 = transId2; { LocalDLList list(c_subscriberPool, @@ -6479,7 +6489,7 @@ Suma::resend_bucket(Signal* signal, Uint { data->senderData = subbPtr.p->m_senderData; sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, - SubTableData::SignalLength, JBB, ptr, nptr); + SubTableData::SignalLengthWithTransId, JBB, ptr, nptr); } } } === modified file 'storage/ndb/src/ndbapi/NdbEventOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbEventOperation.cpp 2011-02-01 23:27:25 +0000 +++ b/storage/ndb/src/ndbapi/NdbEventOperation.cpp 2011-05-25 14:31:47 +0000 @@ -137,6 +137,12 @@ NdbEventOperation::getLatestGCI() const return m_impl.getLatestGCI(); } +Uint64 +NdbEventOperation::getTransId() const +{ + return m_impl.getTransId(); +} + NdbDictionary::Event::TableEvent NdbEventOperation::getEventType() const { === modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp' --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2011-02-23 12:15:04 +0000 +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2011-05-25 14:31:47 +0000 @@ -791,6 +791,15 @@ NdbEventOperationImpl::getLatestGCI() return m_ndb->theEventBuffer->getLatestGCI(); } +Uint64 +NdbEventOperationImpl::getTransId() const +{ + /* Return 64 bit composite */ + Uint32 transId1 = m_data_item->sdata->transId1; + Uint32 transId2 = m_data_item->sdata->transId2; + return Uint64(transId1) << 32 | transId2; +} + bool NdbEventOperationImpl::execSUB_TABLE_DATA(const NdbApiSignal * signal, const LinearSectionPtr ptr[3]) @@ -2763,6 +2772,12 @@ NdbEventBuffer::copy_data(const SubTable { data->sdata->gci_lo = 0; } + if (len < SubTableData::SignalLengthWithTransId) + { + /* No TransId, set to uninit value */ + data->sdata->transId1 = ~Uint32(0); + data->sdata->transId2 = ~Uint32(0); + } int i; for (i = 0; i <= 2; i++) @@ -2838,6 +2853,11 @@ NdbEventBuffer::merge_data(const SubTabl { DBUG_ENTER_EVENT("NdbEventBuffer::merge_data"); + /* TODO : Consider how/if to merge multiple events/key with different + * transid + * Same consideration probably applies to AnyValue! + */ + Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys; int t1 = SubTableData::getOperation(data->sdata->requestInfo); === modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2011-02-01 23:27:25 +0000 +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2011-05-25 14:31:47 +0000 @@ -382,6 +382,7 @@ public: Uint64 getGCI(); Uint32 getAnyValue() const; Uint64 getLatestGCI(); + Uint64 getTransId() const; bool execSUB_TABLE_DATA(const NdbApiSignal * signal, const LinearSectionPtr ptr[3]); === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-05-11 13:31:44 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-05-26 14:44:59 +0000 @@ -3186,18 +3186,19 @@ NdbQueryImpl::OrderedFragSet::reorganize while(firstgetTransactionId(); + return transId ^ (transId >> 32); +} + +bool checkAnyValueTransId(Uint64 transId, Uint32 anyValue) +{ + return transId && (anyValue == Uint32(transId ^ (transId >> 32))); +} + struct receivedEvent { Uint32 pk; Uint32 count; @@ -295,6 +307,24 @@ eventOperation(Ndb* pNdb, const NdbDicti abort(); } + /* Check event transaction id */ + Uint32 anyValue = pOp->getAnyValue(); + Uint64 transId = pOp->getTransId(); + if (anyValue) + { + if (!checkAnyValueTransId(transId, anyValue)) + { + g_err << "ERROR : TransId and AnyValue mismatch. " + << "Transid : " << transId + << ", AnyValue : " << anyValue + << ", Expected AnyValue : " + << (Uint32) ((transId >> 32) ^ transId) + << endl; + abort(); + return NDBT_FAILED; + } + } + if ((int)pk < records) { recEvent[pk].pk = pk; recEvent[pk].count++; @@ -498,6 +528,8 @@ int runEventLoad(NDBT_Context* ctx, NDBT int records = ctx->getNumRecords(); HugoTransactions hugoTrans(*ctx->getTab()); + hugoTrans.setAnyValueCallback(setAnyValue); + sleep(1); #if 0 sleep(5); @@ -520,6 +552,7 @@ int runEventMixedLoad(NDBT_Context* ctx, int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); HugoTransactions hugoTrans(*ctx->getTab()); + hugoTrans.setAnyValueCallback(setAnyValue); if(ctx->getPropertyWait("LastGCI_hi", ~(Uint32)0)) { @@ -721,6 +754,24 @@ int runEventApplier(NDBT_Context* ctx, N abort(); } + /* Check event transaction id */ + Uint32 anyValue = pOp->getAnyValue(); + Uint64 transId = pOp->getTransId(); + if (anyValue) + { + if (!checkAnyValueTransId(transId, anyValue)) + { + g_err << "ERROR : TransId and AnyValue mismatch. " + << "Transid : " << transId + << ", AnyValue : " << anyValue + << ", Expected AnyValue : " + << (Uint32) ((transId >> 32) ^ transId) + << endl; + abort(); + return NDBT_FAILED; + } + } + for (i= 0; i < n_columns; i++) { if (recAttr[i]->isNULL()) === modified file 'storage/ndb/test/run-test/daily-basic-tests.txt' --- a/storage/ndb/test/run-test/daily-basic-tests.txt 2011-05-17 23:29:55 +0000 +++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2011-05-27 07:53:31 +0000 @@ -1719,3 +1719,16 @@ max-time: 600 cmd: testNodeRestart args: -n ClusterSplitLatency T1 +# Refresh tuple +max-time: 300 +cmd: testBasic +args: -n RefreshTuple T6 D1 + +max-time: 300 +cmd: testIndex +args: -n RefreshWithOrderedIndex T2 D2 + +max-time: 300 +cmd: testBasic +args: -n RefreshLocking D1 + === modified file 'storage/ndb/test/run-test/daily-devel-tests.txt' --- a/storage/ndb/test/run-test/daily-devel-tests.txt 2011-05-25 13:19:02 +0000 +++ b/storage/ndb/test/run-test/daily-devel-tests.txt 2011-05-27 07:53:31 +0000 @@ -129,16 +129,3 @@ max-time: 1800 cmd: testDict args: -n SchemaTrans -l 1 -# Refresh tuple -max-time: 300 -cmd: testBasic -args: -n RefreshTuple T6 D1 - -max-time: 300 -cmd: testIndex -args: -n RefreshWithOrderedIndex T2 D2 - -max-time: 300 -cmd: testBasic -args: -n RefreshLocking D1 - === modified file 'storage/ndb/test/src/HugoOperations.cpp' --- a/storage/ndb/test/src/HugoOperations.cpp 2011-05-25 13:19:02 +0000 +++ b/storage/ndb/test/src/HugoOperations.cpp 2011-05-25 14:31:47 +0000 @@ -400,6 +400,8 @@ int HugoOperations::pkUpdateRecord(Ndb* Uint32 partId; if(getPartIdForRow(pOp, r+recordNo, partId)) pOp->setPartitionId(partId); + + pOp->setAnyValue(getAnyValueForRowUpd(r+recordNo, updatesValue)); } return NDBT_OK; @@ -803,7 +805,8 @@ HugoOperations::HugoOperations(const Ndb UtilTransactions(_tab, idx), pIndexScanOp(NULL), calc(_tab), - m_quiet(false) + m_quiet(false), + avCallback(NULL) { } @@ -1201,5 +1204,21 @@ HugoOperations::setNdbError(const NdbErr m_error.code = error.code ? error.code : 1; } +void +HugoOperations::setAnyValueCallback(AnyValueCallback avc) +{ + avCallback = avc; +} + +Uint32 +HugoOperations::getAnyValueForRowUpd(int row, int update) +{ + if (avCallback == NULL) + return 0; + + return (avCallback)(pTrans->getNdb(), pTrans, + row, update); +} + template class Vector; template class Vector; No bundle (reason: revision is a merge (you can force generation of a bundle with env var BZR_FORCE_BUNDLE=1)).