Below is the list of changes that have just been committed into jonas's
local 5.1 repository. When jonas does a push, these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository,
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1949 05/07/25 15:49:58 joreland@stripped +4 -0
Merge mysql.com:/home/jonas/src/mysql-5.0
into mysql.com:/home/jonas/src/mysql-5.1-ndb-dd
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
1.37 05/07/25 15:49:55 joreland@stripped +0 -4
merge
storage/ndb/src/mgmsrv/ConfigInfo.cpp
1.65 05/07/25 15:47:37 joreland@stripped +0 -0
Auto merged
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
1.60 05/07/25 15:47:36 joreland@stripped +0 -0
Auto merged
storage/ndb/include/mgmapi/mgmapi_config_parameters.h
1.20 05/07/25 15:47:36 joreland@stripped +0 -0
Auto merged
storage/ndb/src/mgmsrv/ConfigInfo.cpp
1.60.3.2 05/07/25 15:47:36 joreland@stripped +0 -0
Merge rename: ndb/src/mgmsrv/ConfigInfo.cpp -> storage/ndb/src/mgmsrv/ConfigInfo.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
1.3.6.2 05/07/25 15:47:36 joreland@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/dbtup/DbtupGen.cpp -> storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
1.17.17.2 05/07/25 15:47:36 joreland@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/dbacc/DbaccMain.cpp -> storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
storage/ndb/include/mgmapi/mgmapi_config_parameters.h
1.17.1.2 05/07/25 15:47:36 joreland@stripped +0 -0
Merge rename: ndb/include/mgmapi/mgmapi_config_parameters.h -> storage/ndb/include/mgmapi/mgmapi_config_parameters.h
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: joreland
# Host: eel.(none)
# Root: /home/jonas/src/mysql-5.1-ndb-dd/RESYNC
--- 1.17.17.1/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2005-07-25 13:23:29 +02:00
+++ 1.60/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2005-07-25 15:47:36 +02:00
@@ -28,7 +28,9 @@
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/DropTab.hpp>
#include <signaldata/DumpStateOrd.hpp>
+#include <signaldata/TuxMaint.hpp>
#include <SectionReader.hpp>
+#include <md5_hash.hpp>
// TO_DO_RONM is a label for comments on what needs to be improved in future versions
// when more time is given.
@@ -40,55 +42,6 @@
#endif
-Uint32
-Dbacc::remainingUndoPages(){
- Uint32 HeadPage = cundoposition >> ZUNDOPAGEINDEXBITS;
- Uint32 TailPage = clastUndoPageIdWritten;
-
- // Head must be larger or same as tail
- ndbrequire(HeadPage>=TailPage);
-
- Uint32 UsedPages = HeadPage - TailPage;
- Int32 Remaining = cundopagesize - UsedPages;
-
- // There can not be more than cundopagesize remaining
- if (Remaining <= 0){
- // No more undolog, crash node
- progError(__LINE__,
- ERR_NO_MORE_UNDOLOG,
- "There are more than 1Mbyte undolog writes outstanding");
- }
- return Remaining;
-}
-
-void
-Dbacc::updateLastUndoPageIdWritten(Signal* signal, Uint32 aNewValue){
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_COMMIT) {
- clastUndoPageIdWritten = aNewValue;
- if (remainingUndoPages() >= ZMIN_UNDO_PAGES_AT_COMMIT) {
- jam();
- EXECUTE_DIRECT(DBLQH, GSN_ACC_COM_UNBLOCK, signal, 1);
- jamEntry();
- }//if
- } else {
- clastUndoPageIdWritten = aNewValue;
- }//if
-}//Dbacc::updateLastUndoPageIdWritten()
-
-void
-Dbacc::updateUndoPositionPage(Signal* signal, Uint32 aNewValue){
- if (remainingUndoPages() >= ZMIN_UNDO_PAGES_AT_COMMIT) {
- cundoposition = aNewValue;
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_COMMIT) {
- jam();
- EXECUTE_DIRECT(DBLQH, GSN_ACC_COM_BLOCK, signal, 1);
- jamEntry();
- }//if
- } else {
- cundoposition = aNewValue;
- }//if
-}//Dbacc::updateUndoPositionPage()
-
// Signal entries and statement blocks
/* --------------------------------------------------------------------------------- */
/* --------------------------------------------------------------------------------- */
@@ -135,18 +88,6 @@
initialiseRecordsLab(signal, signal->theData[3], signal->theData[4]);
return;
break;
- case ZSR_READ_PAGES_ALLOC:
- jam();
- fragrecptr.i = tdata0;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- srReadPagesAllocLab(signal);
- return;
- break;
- case ZSTART_UNDO:
- jam();
- startUndoLab(signal);
- return;
- break;
case ZSEND_SCAN_HBREP:
jam();
sendScanHbRep(signal, tdata0);
@@ -200,17 +141,6 @@
return;
}
- case ZLCP_OP_WRITE_RT_BREAK:
- {
- operationRecPtr.i= signal->theData[1];
- fragrecptr.i= signal->theData[2];
- lcpConnectptr.i= signal->theData[3];
- ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- lcp_write_op_to_undolog(signal);
- return;
- }
default:
ndbrequire(false);
break;
@@ -218,280 +148,6 @@
return;
}//Dbacc::execCONTINUEB()
-/* ******************--------------------------------------------------------------- */
-/* FSCLOSECONF CLOSE FILE CONF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSCLOSECONF(Signal* signal)
-{
- jamEntry();
- fsConnectptr.i = signal->theData[0];
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- tresult = 0;
- switch (fsConnectptr.p->fsState) {
- case WAIT_CLOSE_UNDO:
- jam();
- releaseFsConnRec(signal);
- break;
- case LCP_CLOSE_DATA:
- jam();
- checkSyncUndoPagesLab(signal);
- return;
- break;
- case SR_CLOSE_DATA:
- jam();
- sendaccSrconfLab(signal);
- return;
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- return;
-}//Dbacc::execFSCLOSECONF()
-
-/* ******************--------------------------------------------------------------- */
-/* FSCLOSEREF OPENFILE CONF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSCLOSEREF(Signal* signal)
-{
- jamEntry();
- ndbrequire(false);
-}//Dbacc::execFSCLOSEREF()
-
-/* ******************--------------------------------------------------------------- */
-/* FSOPENCONF OPENFILE CONF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSOPENCONF(Signal* signal)
-{
- jamEntry();
- fsConnectptr.i = signal->theData[0];
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- tuserptr = signal->theData[1];
- tresult = 0; /* RESULT CHECK VALUE */
- switch (fsConnectptr.p->fsState) {
- case WAIT_OPEN_UNDO_LCP:
- jam();
- lcpOpenUndofileConfLab(signal);
- return;
- break;
- case WAIT_OPEN_UNDO_LCP_NEXT:
- jam();
- fsConnectptr.p->fsPtr = tuserptr;
- return;
- break;
- case OPEN_UNDO_FILE_SR:
- jam();
- fsConnectptr.p->fsPtr = tuserptr;
- srStartUndoLab(signal);
- return;
- break;
- case WAIT_OPEN_DATA_FILE_FOR_WRITE:
- jam();
- lcpFsOpenConfLab(signal);
- return;
- break;
- case WAIT_OPEN_DATA_FILE_FOR_READ:
- jam();
- fsConnectptr.p->fsPtr = tuserptr;
- srFsOpenConfLab(signal);
- return;
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- return;
-}//Dbacc::execFSOPENCONF()
-
-/* ******************--------------------------------------------------------------- */
-/* FSOPENREF OPENFILE REF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSOPENREF(Signal* signal)
-{
- jamEntry();
- ndbrequire(false);
-}//Dbacc::execFSOPENREF()
-
-/* ******************--------------------------------------------------------------- */
-/* FSREADCONF OPENFILE CONF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSREADCONF(Signal* signal)
-{
- jamEntry();
- fsConnectptr.i = signal->theData[0];
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- tresult = 0; /* RESULT CHECK VALUE */
- switch (fsConnectptr.p->fsState) {
- case WAIT_READ_PAGE_ZERO:
- jam();
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- srReadPageZeroLab(signal);
- return;
- break;
- case WAIT_READ_DATA:
- jam();
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- storeDataPageInDirectoryLab(signal);
- return;
- break;
- case READ_UNDO_PAGE:
- jam();
- srDoUndoLab(signal);
- return;
- break;
- case READ_UNDO_PAGE_AND_CLOSE:
- jam();
- fsConnectptr.p->fsState = WAIT_CLOSE_UNDO;
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 0;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- /* FLAG = DO NOT DELETE FILE */
- srDoUndoLab(signal);
- return;
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- return;
-}//Dbacc::execFSREADCONF()
-
-/* ******************--------------------------------------------------------------- */
-/* FSREADRREF OPENFILE CONF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSREADREF(Signal* signal)
-{
- jamEntry();
- progError(0, __LINE__, "Read of file refused");
- return;
-}//Dbacc::execFSREADREF()
-
-/* ******************--------------------------------------------------------------- */
-/* FSWRITECONF OPENFILE CONF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSWRITECONF(Signal* signal)
-{
- jamEntry();
- fsOpptr.i = signal->theData[0];
- ptrCheckGuard(fsOpptr, cfsOpsize, fsOprec);
- /* FS_OPERATION PTR */
- tresult = 0; /* RESULT CHECK VALUE */
- fsConnectptr.i = fsOpptr.p->fsConptr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fragrecptr.i = fsOpptr.p->fsOpfragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- switch (fsOpptr.p->fsOpstate) {
- case WAIT_WRITE_UNDO:
- jam();
- updateLastUndoPageIdWritten(signal, fsOpptr.p->fsOpMemPage);
- releaseFsOpRec(signal);
- if (fragrecptr.p->nrWaitWriteUndoExit == 0) {
- jam();
- checkSendLcpConfLab(signal);
- return;
- } else {
- jam();
- fragrecptr.p->lastUndoIsStored = ZTRUE;
- }//if
- return;
- break;
- case WAIT_WRITE_UNDO_EXIT:
- jam();
- updateLastUndoPageIdWritten(signal, fsOpptr.p->fsOpMemPage);
- releaseFsOpRec(signal);
- if (fragrecptr.p->nrWaitWriteUndoExit > 0) {
- jam();
- fragrecptr.p->nrWaitWriteUndoExit--;
- }//if
- if (fsConnectptr.p->fsState == WAIT_CLOSE_UNDO) {
- jam();
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = ZFALSE;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- }//if
- if (fragrecptr.p->nrWaitWriteUndoExit == 0) {
- if (fragrecptr.p->lastUndoIsStored == ZTRUE) {
- jam();
- fragrecptr.p->lastUndoIsStored = ZFALSE;
- checkSendLcpConfLab(signal);
- return;
- }//if
- }//if
- return;
- break;
- case WAIT_WRITE_DATA:
- jam();
- releaseFsOpRec(signal);
- fragrecptr.p->activeDataFilePage += ZWRITEPAGESIZE;
- fragrecptr.p->activeDataPage = 0;
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- switch (fragrecptr.p->fragState) {
- case LCP_SEND_PAGES:
- jam();
- savepagesLab(signal);
- return;
- break;
- case LCP_SEND_OVER_PAGES:
- jam();
- saveOverPagesLab(signal);
- return;
- break;
- case LCP_SEND_ZERO_PAGE:
- jam();
- saveZeroPageLab(signal);
- return;
- break;
- case WAIT_ZERO_PAGE_STORED:
- jam();
- lcpCloseDataFileLab(signal);
- return;
- break;
- default:
- ndbrequire(false);
- return;
- break;
- }//switch
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- return;
-}//Dbacc::execFSWRITECONF()
-
-/* ******************--------------------------------------------------------------- */
-/* FSWRITEREF OPENFILE CONF */
-/* ******************------------------------------+ */
-/* SENDER: FS, LEVEL B */
-void Dbacc::execFSWRITEREF(Signal* signal)
-{
- jamEntry();
- progError(0, __LINE__, "Write to file refused");
- return;
-}//Dbacc::execFSWRITEREF()
-
/* ------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------- */
@@ -620,12 +276,7 @@
initialiseTableRec(signal);
break;
case 1:
- jam();
- initialiseFsConnectionRec(signal);
- break;
case 2:
- jam();
- initialiseFsOpRec(signal);
break;
case 3:
jam();
@@ -657,7 +308,6 @@
break;
case 10:
jam();
- initialiseRootfragRec(signal);
break;
case 11:
jam();
@@ -665,7 +315,6 @@
break;
case 12:
jam();
- initialiseSrVerRec(signal);
{
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -723,8 +372,6 @@
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_OVERFLOW_RECS,
&coverflowrecsize));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_PAGE8, &cpagesize));
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_ROOT_FRAG,
- &crootfragmentsize));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_TABLE, &ctablesize));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_SCAN, &cscanRecSize));
initRecords();
@@ -824,46 +471,6 @@
}//Dbacc::initialiseFragRec()
/* --------------------------------------------------------------------------------- */
-/* INITIALISE_FS_CONNECTION_REC */
-/* INITIALATES THE FS_CONNECTION RECORDS */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseFsConnectionRec(Signal* signal)
-{
- ndbrequire(cfsConnectsize > 0);
- for (fsConnectptr.i = 0; fsConnectptr.i < cfsConnectsize; fsConnectptr.i++) {
- ptrAss(fsConnectptr, fsConnectrec);
- fsConnectptr.p->fsNext = fsConnectptr.i + 1;
- fsConnectptr.p->fsPrev = RNIL;
- fsConnectptr.p->fragrecPtr = RNIL;
- fsConnectptr.p->fsState = WAIT_NOTHING;
- }//for
- fsConnectptr.i = cfsConnectsize - 1;
- ptrAss(fsConnectptr, fsConnectrec);
- fsConnectptr.p->fsNext = RNIL; /* INITIALITES THE LAST CONNECTRECORD */
- cfsFirstfreeconnect = 0; /* INITIATES THE FIRST FREE CONNECT RECORD */
-}//Dbacc::initialiseFsConnectionRec()
-
-/* --------------------------------------------------------------------------------- */
-/* INITIALISE_FS_OP_REC */
-/* INITIALATES THE FS_OP RECORDS */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseFsOpRec(Signal* signal)
-{
- ndbrequire(cfsOpsize > 0);
- for (fsOpptr.i = 0; fsOpptr.i < cfsOpsize; fsOpptr.i++) {
- ptrAss(fsOpptr, fsOprec);
- fsOpptr.p->fsOpnext = fsOpptr.i + 1;
- fsOpptr.p->fsOpfragrecPtr = RNIL;
- fsOpptr.p->fsConptr = RNIL;
- fsOpptr.p->fsOpstate = WAIT_NOTHING;
- }//for
- fsOpptr.i = cfsOpsize - 1;
- ptrAss(fsOpptr, fsOprec);
- fsOpptr.p->fsOpnext = RNIL;
- cfsFirstfreeop = 0;
-}//Dbacc::initialiseFsOpRec()
-
-/* --------------------------------------------------------------------------------- */
/* INITIALISE_LCP_CONNECTION_REC */
/* INITIALATES THE LCP_CONNECTION RECORDS */
/* --------------------------------------------------------------------------------- */
@@ -874,7 +481,7 @@
ptrAss(lcpConnectptr, lcpConnectrec);
lcpConnectptr.p->nextLcpConn = lcpConnectptr.i + 1;
lcpConnectptr.p->lcpUserptr = RNIL;
- lcpConnectptr.p->rootrecptr = RNIL;
+ lcpConnectptr.p->fragrecptr = RNIL;
lcpConnectptr.p->lcpstate = LCP_FREE;
}//for
lcpConnectptr.i = clcpConnectsize - 1;
@@ -971,22 +578,6 @@
/* INITIALISE_ROOTFRAG_REC */
/* INITIALATES THE ROOTFRAG RECORDS. */
/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseRootfragRec(Signal* signal)
-{
- ndbrequire(crootfragmentsize > 0);
- for (rootfragrecptr.i = 0; rootfragrecptr.i < crootfragmentsize; rootfragrecptr.i++) {
- refresh_watch_dog();
- ptrAss(rootfragrecptr, rootfragmentrec);
- rootfragrecptr.p->nextroot = rootfragrecptr.i + 1;
- rootfragrecptr.p->fragmentptr[0] = RNIL;
- rootfragrecptr.p->fragmentptr[1] = RNIL;
- }//for
- rootfragrecptr.i = crootfragmentsize - 1;
- ptrAss(rootfragrecptr, rootfragmentrec);
- rootfragrecptr.p->nextroot = RNIL;
- cfirstfreerootfrag = 0;
-}//Dbacc::initialiseRootfragRec()
-
/* --------------------------------------------------------------------------------- */
/* INITIALISE_SCAN_REC */
/* INITIALATES THE QUE_SCAN RECORDS. */
@@ -1007,21 +598,6 @@
cfirstFreeScanRec = 0;
}//Dbacc::initialiseScanRec()
-/* --------------------------------------------------------------------------------- */
-/* INITIALISE_SR_VER_REC */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseSrVerRec(Signal* signal)
-{
- ndbrequire(csrVersionRecSize > 0);
- for (srVersionPtr.i = 0; srVersionPtr.i < csrVersionRecSize; srVersionPtr.i++) {
- ptrAss(srVersionPtr, srVersionRec);
- srVersionPtr.p->nextFreeSr = srVersionPtr.i + 1;
- }//for
- srVersionPtr.i = csrVersionRecSize - 1;
- ptrAss(srVersionPtr, srVersionRec);
- srVersionPtr.p->nextFreeSr = RNIL;
- cfirstFreeSrVersionRec = 0;
-}//Dbacc::initialiseSrVerRec()
/* --------------------------------------------------------------------------------- */
/* INITIALISE_TABLE_REC */
@@ -1063,18 +639,7 @@
/* --------------------------------------------------------------------------------- */
/* --------------------------------------------------------------------------------- */
-void Dbacc::initRootfragrec(Signal* signal)
-{
- const AccFragReq * const req = (AccFragReq*)&signal->theData[0];
- rootfragrecptr.p->mytabptr = req->tableId;
- rootfragrecptr.p->roothashcheck = req->kValue + req->lhFragBits;
- rootfragrecptr.p->noOfElements = 0;
- rootfragrecptr.p->m_commit_count = 0;
- for (Uint32 i = 0; i < MAX_PARALLEL_SCANS_PER_FRAG; i++) {
- rootfragrecptr.p->scan[i] = RNIL;
- }//for
-}//Dbacc::initRootfragrec()
-
+// JONAS This methods "aer ett saall"
void Dbacc::execACCFRAGREQ(Signal* signal)
{
const AccFragReq * const req = (AccFragReq*)&signal->theData[0];
@@ -1096,91 +661,85 @@
#endif
ptrCheckGuard(tabptr, ctablesize, tabrec);
ndbrequire((req->reqInfo & 0xF) == ZADDFRAG);
- ndbrequire(!getrootfragmentrec(signal, rootfragrecptr, req->fragId));
- if (cfirstfreerootfrag == RNIL) {
+ ndbrequire(!getfragmentrec(signal, fragrecptr, req->fragId));
+ if (cfirstfreefrag == RNIL) {
jam();
- addFragRefuse(signal, ZFULL_ROOTFRAGRECORD_ERROR);
+ addFragRefuse(signal, ZFULL_FRAGRECORD_ERROR);
return;
}//if
- seizeRootfragrec(signal);
- if (!addfragtotab(signal, rootfragrecptr.i, req->fragId)) {
+
+ seizeFragrec(signal);
+ initFragGeneral(fragrecptr);
+ initFragAdd(signal, fragrecptr);
+
+ if (!addfragtotab(signal, fragrecptr.i, req->fragId)) {
jam();
- releaseRootFragRecord(signal, rootfragrecptr);
- addFragRefuse(signal, ZFULL_ROOTFRAGRECORD_ERROR);
+ releaseFragRecord(signal, fragrecptr);
+ addFragRefuse(signal, ZFULL_FRAGRECORD_ERROR);
return;
}//if
- initRootfragrec(signal);
- for (Uint32 i = 0; i < 2; i++) {
+ if (cfirstfreeDirrange == RNIL) {
jam();
- if (cfirstfreefrag == RNIL) {
- jam();
- addFragRefuse(signal, ZFULL_FRAGRECORD_ERROR);
- return;
- }//if
- seizeFragrec(signal);
- initFragGeneral(fragrecptr);
- initFragAdd(signal, i, rootfragrecptr.i, fragrecptr);
- rootfragrecptr.p->fragmentptr[i] = fragrecptr.i;
- rootfragrecptr.p->fragmentid[i] = fragrecptr.p->myfid;
- if (cfirstfreeDirrange == RNIL) {
- jam();
- addFragRefuse(signal, ZDIR_RANGE_ERROR);
- return;
- } else {
- jam();
- seizeDirrange(signal);
- }//if
- fragrecptr.p->directory = newDirRangePtr.i;
- seizeDirectory(signal);
- if (tresult < ZLIMIT_OF_ERROR) {
- jam();
- newDirRangePtr.p->dirArray[0] = sdDirptr.i;
- } else {
- jam();
- addFragRefuse(signal, tresult);
- return;
- }//if
- seizePage(signal);
- if (tresult > ZLIMIT_OF_ERROR) {
- jam();
- addFragRefuse(signal, tresult);
- return;
- }//if
- sdDirptr.p->pagep[0] = spPageptr.i;
- tipPageId = 0;
- inpPageptr = spPageptr;
- initPage(signal);
- if (cfirstfreeDirrange == RNIL) {
- jam();
- addFragRefuse(signal, ZDIR_RANGE_ERROR);
- return;
- } else {
- jam();
- seizeDirrange(signal);
- }//if
- fragrecptr.p->overflowdir = newDirRangePtr.i;
- seizeDirectory(signal);
- if (tresult < ZLIMIT_OF_ERROR) {
- jam();
- newDirRangePtr.p->dirArray[0] = sdDirptr.i;
- } else {
- jam();
- addFragRefuse(signal, tresult);
- return;
- }//if
- }//for
+ releaseFragRecord(signal, fragrecptr);
+ addFragRefuse(signal, ZDIR_RANGE_ERROR);
+ return;
+ } else {
+ jam();
+ seizeDirrange(signal);
+ }//if
+
+ fragrecptr.p->directory = newDirRangePtr.i;
+ seizeDirectory(signal);
+ if (tresult < ZLIMIT_OF_ERROR) {
+ jam();
+ newDirRangePtr.p->dirArray[0] = sdDirptr.i;
+ } else {
+ jam();
+ addFragRefuse(signal, tresult);
+ return;
+ }//if
+
+ seizePage(signal);
+ if (tresult > ZLIMIT_OF_ERROR) {
+ jam();
+ addFragRefuse(signal, tresult);
+ return;
+ }//if
+ sdDirptr.p->pagep[0] = spPageptr.i;
+ tipPageId = 0;
+ inpPageptr = spPageptr;
+ initPage(signal);
+ if (cfirstfreeDirrange == RNIL) {
+ jam();
+ addFragRefuse(signal, ZDIR_RANGE_ERROR);
+ return;
+ } else {
+ jam();
+ seizeDirrange(signal);
+ }//if
+ fragrecptr.p->overflowdir = newDirRangePtr.i;
+ seizeDirectory(signal);
+ if (tresult < ZLIMIT_OF_ERROR) {
+ jam();
+ newDirRangePtr.p->dirArray[0] = sdDirptr.i;
+ } else {
+ jam();
+ addFragRefuse(signal, tresult);
+ return;
+ }//if
+
Uint32 userPtr = req->userPtr;
BlockReference retRef = req->userRef;
- rootfragrecptr.p->rootState = ACTIVEROOT;
+ fragrecptr.p->rootState = ACTIVEROOT;
AccFragConf * const conf = (AccFragConf*)&signal->theData[0];
conf->userPtr = userPtr;
- conf->rootFragPtr = rootfragrecptr.i;
- conf->fragId[0] = rootfragrecptr.p->fragmentid[0];
- conf->fragId[1] = rootfragrecptr.p->fragmentid[1];
- conf->fragPtr[0] = rootfragrecptr.p->fragmentptr[0];
- conf->fragPtr[1] = rootfragrecptr.p->fragmentptr[1];
- conf->rootHashCheck = rootfragrecptr.p->roothashcheck;
+ conf->rootFragPtr = RNIL;
+ conf->fragId[0] = fragrecptr.p->fragmentid;
+ conf->fragId[1] = RNIL;
+ conf->fragPtr[0] = fragrecptr.i;
+ conf->fragPtr[1] = RNIL;
+ conf->rootHashCheck = fragrecptr.p->roothashcheck;
sendSignal(retRef, GSN_ACCFRAGCONF, signal, AccFragConf::SignalLength, JBB);
}//Dbacc::execACCFRAGREQ()
@@ -1224,12 +783,17 @@
jam();
break;
}
+ csNumber >>= 16;
CHARSET_INFO* cs = 0;
if (csNumber != 0) {
cs = all_charsets[csNumber];
ndbrequire(cs != 0);
hasCharAttr = 1;
}
+ if (AttributeDescriptor::getArrayType(attributeDescriptor) !=
+ NDB_ARRAYTYPE_FIXED) {
+ hasCharAttr = 1; // has Var attr
+ }
tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor;
tabptr.p->keyAttr[i].charsetInfo = cs;
i++;
@@ -1244,13 +808,9 @@
for (Uint32 i1 = 0; i1 < MAX_FRAG_PER_NODE; i1++) {
jam();
if (tabptr.p->fragptrholder[i1] != RNIL) {
- rootfragrecptr.i = tabptr.p->fragptrholder[i1];
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- for (Uint32 i2 = 0; i2 < 2; i2++) {
- fragrecptr.i = rootfragrecptr.p->fragmentptr[i2];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->hasCharAttr = hasCharAttr;
- }
+ fragrecptr.i = tabptr.p->fragptrholder[i1];
+ ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+ fragrecptr.p->hasCharAttr = hasCharAttr;
}
}
@@ -1276,7 +836,7 @@
void Dbacc::releaseRootFragResources(Signal* signal, Uint32 tableId)
{
- RootfragmentrecPtr rootPtr;
+ FragmentrecPtr rootPtr;
TabrecPtr tabPtr;
tabPtr.i = tableId;
ptrCheckGuard(tabPtr, ctablesize, tabrec);
@@ -1284,41 +844,18 @@
jam();
if (tabPtr.p->fragholder[i] != RNIL) {
jam();
- Uint32 fragIndex;
- rootPtr.i = tabPtr.p->fragptrholder[i];
- ptrCheckGuard(rootPtr, crootfragmentsize, rootfragmentrec);
- if (rootPtr.p->fragmentptr[0] != RNIL) {
- jam();
- fragIndex = rootPtr.p->fragmentptr[0];
- rootPtr.p->fragmentptr[0] = RNIL;
- } else if (rootPtr.p->fragmentptr[1] != RNIL) {
- jam();
- fragIndex = rootPtr.p->fragmentptr[1];
- rootPtr.p->fragmentptr[1] = RNIL;
- } else {
- jam();
- releaseRootFragRecord(signal, rootPtr);
- tabPtr.p->fragholder[i] = RNIL;
- tabPtr.p->fragptrholder[i] = RNIL;
- continue;
- }//if
- releaseFragResources(signal, fragIndex);
+ tabPtr.p->fragholder[i] = RNIL;
+ releaseFragResources(signal, tabPtr.p->fragptrholder[i]);
return;
}//if
}//for
-
+
/**
* Finished...
*/
sendFSREMOVEREQ(signal, tableId);
}//Dbacc::releaseRootFragResources()
-void Dbacc::releaseRootFragRecord(Signal* signal, RootfragmentrecPtr rootPtr)
-{
- rootPtr.p->nextroot = cfirstfreerootfrag;
- cfirstfreerootfrag = rootPtr.i;
-}//Dbacc::releaseRootFragRecord()
-
void Dbacc::releaseFragResources(Signal* signal, Uint32 fragIndex)
{
FragmentrecPtr regFragPtr;
@@ -1340,13 +877,11 @@
jam();
releaseDirIndexResources(signal, regFragPtr);
} else {
- RootfragmentrecPtr rootPtr;
jam();
- rootPtr.i = regFragPtr.p->myroot;
- ptrCheckGuard(rootPtr, crootfragmentsize, rootfragmentrec);
+ Uint32 tab = regFragPtr.p->mytabptr;
releaseFragRecord(signal, regFragPtr);
signal->theData[0] = ZREL_ROOT_FRAG;
- signal->theData[1] = rootPtr.p->mytabptr;
+ signal->theData[1] = tab;
sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2, JBB);
}//if
}//Dbacc::releaseFragResources()
@@ -1358,13 +893,8 @@
ndbrequire(regFragPtr.p->datapages[i] == RNIL);
}//for
ndbrequire(regFragPtr.p->lockOwnersList == RNIL);
- ndbrequire(regFragPtr.p->firstWaitInQueOp == RNIL);
- ndbrequire(regFragPtr.p->lastWaitInQueOp == RNIL);
- ndbrequire(regFragPtr.p->sentWaitInQueOp == RNIL);
//ndbrequire(regFragPtr.p->fsConnPtr == RNIL);
ndbrequire(regFragPtr.p->zeroPagePtr == RNIL);
- ndbrequire(regFragPtr.p->nrWaitWriteUndoExit == 0);
- ndbrequire(regFragPtr.p->sentWaitInQueOp == RNIL);
}//Dbacc::verifyFragCorrect()
void Dbacc::releaseDirResources(Signal* signal,
@@ -1659,9 +1189,16 @@
// bit to mark lock operation
operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1;
-
// undo log is not run via ACCKEYREQ
operationRecPtr.p->isUndoLogReq = 0;
+
+ if(ERROR_INSERTED(5900) || ERROR_INSERTED(5901))
+ {
+ for(unsigned i = 0; i<8 && i<signal->theData[4]; i++){
+ operationRecPtr.p->keydata[i] = signal->theData[i+7];
+ }
+ }
+
}//Dbacc::initOpRec()
/* --------------------------------------------------------------------------------- */
@@ -1854,12 +1391,20 @@
while (i < noOfKeyAttr) {
const Tabrec::KeyAttr& keyAttr = tabptr.p->keyAttr[i];
+ const uchar* srcPtr = (const uchar*)&src[srcPos];
- Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor);
+ Uint32 typeId =
+ AttributeDescriptor::getType(keyAttr.attributeDescriptor);
+ Uint32 maxBytes =
+ AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor);
+ Uint32 lb, len;
+ bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, maxBytes, lb, len);
+ ndbrequire(ok);
+
+ Uint32 srcBytes = lb + len;
Uint32 srcWords = (srcBytes + 3) / 4;
Uint32 dstWords = ~0;
uchar* dstPtr = (uchar*)&dst[dstPos];
- const uchar* srcPtr = (const uchar*)&src[srcPos];
CHARSET_INFO* cs = keyAttr.charsetInfo;
if (cs == 0) {
@@ -1868,15 +1413,11 @@
dstWords = srcWords;
} else {
jam();
- Uint32 typeId = AttributeDescriptor::getType(keyAttr.attributeDescriptor);
- Uint32 lb, len;
- bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
- ndbrequire(ok);
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
// see comment in DbtcMain.cpp
- Uint32 dstLen = xmul * (srcBytes - lb);
+ Uint32 dstLen = xmul * (maxBytes - lb);
ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
ndbrequire(n != -1);
@@ -1965,13 +1506,6 @@
/* --------------------------------------------------------------------------------- */
void Dbacc::insertelementLab(Signal* signal)
{
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_OPERATION) {
- jam();
- acckeyref1Lab(signal, ZTEMPORARY_ACC_UNDO_FAILURE);
- return;
- }//if
- }//if
if (fragrecptr.p->firstOverflowRec == RNIL) {
jam();
allocOverflowPage(signal);
@@ -1981,23 +1515,28 @@
return;
}//if
}//if
- if (fragrecptr.p->keyLength != operationRecPtr.p->tupkeylen) {
- // historical
- ndbrequire(fragrecptr.p->keyLength == 0);
- }//if
-
- signal->theData[0] = operationRecPtr.p->userptr;
- Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref);
- EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1);
- jamEntry();
- if (signal->theData[0] != 0) {
- jam();
- Uint32 result_code = signal->theData[0];
- acckeyref1Lab(signal, result_code);
- return;
- }//if
- Uint32 localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2];
+ ndbrequire(operationRecPtr.p->tupkeylen <= fragrecptr.p->keyLength);
+ Uint32 localKey;
+ if(!operationRecPtr.p->isAccLockReq)
+ {
+ signal->theData[0] = operationRecPtr.p->userptr;
+ Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref);
+ EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1);
+ jamEntry();
+ if (signal->theData[0] != 0) {
+ jam();
+ Uint32 result_code = signal->theData[0];
+ acckeyref1Lab(signal, result_code);
+ return;
+ }//if
+ localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2];
+ }
+ else
+ {
+ localKey = signal->theData[7];
+ }
+
insertLockOwnersList(signal, operationRecPtr);
const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
@@ -2093,7 +1632,6 @@
jam();
queOperPtr.p->nextSerialQue = operationRecPtr.i;
operationRecPtr.p->prevSerialQue = queOperPtr.i;
- putOpInFragWaitQue(signal);
break;
}//switch
} else {
@@ -2161,7 +1699,6 @@
operationRecPtr.p->lockMode = readWriteOpPtr.p->lockMode;
break;
}//switch
- putOpInFragWaitQue(signal);
return;
}//if
}//if
@@ -2198,7 +1735,6 @@
operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
break;
}//switch
- putOpInFragWaitQue(signal);
}//Dbacc::checkOnlyReadEntry()
/* --------------------------------------------------------------------------------- */
@@ -2313,7 +1849,6 @@
/* --------------------------------------------------------------------------------- */
readWriteOpPtr.p->nextSerialQue = operationRecPtr.i;
operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
- putOpInFragWaitQue(signal);
return;
}//if
readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
@@ -2341,7 +1876,6 @@
readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- putOpInFragWaitQue(signal);
return;
}//if
}//if
@@ -2396,23 +1930,6 @@
tulkLocalPtr = operationRecPtr.p->elementPointer + operationRecPtr.p->elementIsforward;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(ulkPageidptr, cpagesize, page8);
- if (fragrecptr.p->createLcp == ZTRUE) {
- //----------------------------------------------------------
- // To avoid undo log the element header we take care to only
- // undo log the local key part.
- //----------------------------------------------------------
- if (operationRecPtr.p->elementIsforward == 1) {
- jam();
- TlogStart = tulkLocalPtr;
- } else {
- jam();
- TlogStart = tulkLocalPtr - fragrecptr.p->localkeylen + 1;
- }//if
- datapageptr.p = ulkPageidptr.p;
- cundoinfolength = fragrecptr.p->localkeylen;
- cundoElemIndex = TlogStart;
- undoWritingProcess(signal);
- }//if
dbgWord32(ulkPageidptr, tulkLocalPtr, tlocalkey1);
arrGuard(tulkLocalPtr, 2048);
ulkPageidptr.p->word32[tulkLocalPtr] = tlocalkey1;
@@ -2453,15 +1970,13 @@
operationRecPtr.p->transactionstate = IDLE;
operationRecPtr.p->operation = ZUNDEFINED_OP;
if(Toperation != ZREAD){
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- rootfragrecptr.p->m_commit_count++;
+ fragrecptr.p->m_commit_count++;
if (Toperation != ZINSERT) {
if (Toperation != ZDELETE) {
return;
} else {
jam();
- rootfragrecptr.p->noOfElements--;
+ fragrecptr.p->noOfElements--;
fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen;
if (fragrecptr.p->slack > fragrecptr.p->slackCheck) {
/* TIME FOR JOIN BUCKETS PROCESS */
@@ -2479,8 +1994,8 @@
}//if
}//if
} else {
- jam(); /* EXPAND PROCESS HANDLING */
- rootfragrecptr.p->noOfElements++;
+ jam(); /* EXPAND PROCESS HANDLING */
+ fragrecptr.p->noOfElements++;
fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen;
if (fragrecptr.p->slack >= (1u << 31)) {
/* IT MEANS THAT IF SLACK < ZERO */
@@ -2493,7 +2008,7 @@
sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
}//if
}//if
- }//if
+ }
}
return;
}//Dbacc::execACC_COMMITREQ()
@@ -2557,20 +2072,11 @@
if (req->fragPtrI == RNIL) {
for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
jam();
- if (tabptr.p->fragptrholder[i] != RNIL) {
- rootfragrecptr.i = tabptr.p->fragptrholder[i];
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == req->fragId) {
- jam();
- req->fragPtrI = rootfragrecptr.p->fragmentptr[0];
- break;
- }
- if (rootfragrecptr.p->fragmentid[1] == req->fragId) {
- jam();
- req->fragPtrI = rootfragrecptr.p->fragmentptr[1];
- break;
- }
- }
+ if (tabptr.p->fragholder[i] == req->fragId){
+ jam();
+ req->fragPtrI = tabptr.p->fragptrholder[i];
+ break;
+ }
}
}
fragrecptr.i = req->fragPtrI;
@@ -2904,13 +2410,6 @@
return;
}//if
tidrContainerlen = tidrContainerlen + fragrecptr.p->elementLength;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = idrPageptr.p;
- cundoElemIndex = tidrContainerptr;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
if (tidrNextConLen == 0) {
/* EACH SIDE OF THE BUFFER WHICH BELONG TO A FREE */
/* LIST, HAS ZERO AS LENGTH. */
@@ -2955,15 +2454,6 @@
/* STRUCTURE. IT IS RATHER DIFFICULT TO MAINTAIN A LOGICAL STRUCTURE WHERE */
/* DELETES ARE INSERTS AND INSERTS ARE PURELY DELETES. */
/* --------------------------------------------------------------------------------- */
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (tidrForward == ZTRUE) {
- cundoElemIndex = tidrIndex;
- } else {
- cundoElemIndex = (tidrIndex + 1) - fragrecptr.p->elementLength;
- }//if
- cundoinfolength = fragrecptr.p->elementLength;
- undoWritingProcess(signal);
- }//if
dbgWord32(idrPageptr, tidrIndex, tidrElemhead);
idrPageptr.p->word32[tidrIndex] = tidrElemhead; /* INSERTS THE HEAD OF THE ELEMENT */
tidrIndex += tidrForward;
@@ -2999,12 +2489,6 @@
{
Uint32 tancTmp1;
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = tancContainerptr;
- datapageptr.p = ancPageptr.p;
- cundoinfolength = 2;
- undoWritingProcess(signal); /* WHEN UNDO PROCESS HAS STARTED, */
- }//if
/* THE OLD DATA IS STORED ON AN UNDO PAGE */
/* --------------------------------------------------------------------------------- */
/* KEEP LENGTH INFORMATION IN BIT 26-31. */
@@ -3108,18 +2592,6 @@
arrGuard(tsllHeadIndex + 1, 2048);
tslNextfree = slPageptr.p->word32[tsllHeadIndex];
tslPrevfree = slPageptr.p->word32[tsllHeadIndex + 1];
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = slPageptr.p;
- cundoElemIndex = tsllHeadIndex;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = ZPOS_EMPTY_LIST;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
if (tslPrevfree == ZEMPTYLIST) {
jam();
/* UPDATE FREE LIST OF LEFT CONTAINER IN PAGE HEAD */
@@ -3133,22 +2605,12 @@
ndbrequire(tslPrevfree < ZEMPTYLIST);
jam();
tsllTmp = ((tslPrevfree << ZSHIFT_PLUS) - (tslPrevfree << ZSHIFT_MINUS)) + ZHEAD_SIZE;
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = tsllTmp;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
dbgWord32(slPageptr, tsllTmp, tslNextfree);
slPageptr.p->word32[tsllTmp] = tslNextfree;
}//if
if (tslNextfree < ZEMPTYLIST) {
jam();
tsllTmp = (((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ZHEAD_SIZE) + 1;
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = tsllTmp;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
dbgWord32(slPageptr, tsllTmp, tslPrevfree);
slPageptr.p->word32[tsllTmp] = tslPrevfree;
} else {
@@ -3184,11 +2646,6 @@
if (tslNextfree < ZEMPTYLIST) {
jam();
tsllTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ZHEAD_SIZE;
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = tsllTmp;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
tsllTmp1 = slPageptr.p->word32[tsllTmp] & 0xfe03ffff;
tsllTmp1 = tsllTmp1 | (tslPageindex << 18);
dbgWord32(slPageptr, tsllTmp, tsllTmp1);
@@ -3220,18 +2677,6 @@
arrGuard(tsrlHeadIndex + 1, 2048);
tslNextfree = slPageptr.p->word32[tsrlHeadIndex];
tslPrevfree = slPageptr.p->word32[tsrlHeadIndex + 1];
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = slPageptr.p;
- cundoElemIndex = tsrlHeadIndex;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = ZPOS_EMPTY_LIST;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
if (tslPrevfree == ZEMPTYLIST) {
jam();
tsrlTmp = slPageptr.p->word32[ZPOS_EMPTY_LIST];
@@ -3241,22 +2686,12 @@
ndbrequire(tslPrevfree < ZEMPTYLIST);
jam();
tsrlTmp = ((tslPrevfree << ZSHIFT_PLUS) - (tslPrevfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = tsrlTmp;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
dbgWord32(slPageptr, tsrlTmp, tslNextfree);
slPageptr.p->word32[tsrlTmp] = tslNextfree;
}//if
if (tslNextfree < ZEMPTYLIST) {
jam();
tsrlTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
- if (fragrecptr.p->createLcp == ZTRUE) {
- cundoElemIndex = tsrlTmp;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
dbgWord32(slPageptr, tsrlTmp, tslPrevfree);
slPageptr.p->word32[tsrlTmp] = tslPrevfree;
} else {
@@ -3290,12 +2725,6 @@
if (tslNextfree < ZEMPTYLIST) {
jam();
tsrlTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = tsrlTmp;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
tsrlTmp1 = slPageptr.p->word32[tsrlTmp] & 0xfe03ffff;
dbgWord32(slPageptr, tsrlTmp, tsrlTmp1 | (tslPageindex << 18));
slPageptr.p->word32[tsrlTmp] = tsrlTmp1 | (tslPageindex << 18);
@@ -3588,21 +3017,23 @@
/* REPLACING IT WITH THE LAST ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT */
/* IS ALSO THE LAST ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT. */
/* --------------------------------------------------------------------------------- */
-void Dbacc::commitdelete(Signal* signal, bool systemRestart)
+void Dbacc::commitdelete(Signal* signal)
{
- if (!systemRestart) {
- jam();
- signal->theData[0] = fragrecptr.p->myfid;
- signal->theData[1] = fragrecptr.p->myTableId;
- signal->theData[2] = operationRecPtr.p->localdata[0];
- Uint32 localKey = operationRecPtr.p->localdata[0];
- Uint32 pageId = localKey >> MAX_TUPLES_BITS;
- Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
- signal->theData[2] = pageId;
- signal->theData[3] = pageIndex;
- EXECUTE_DIRECT(DBTUP, GSN_TUP_DEALLOCREQ, signal, 4);
- jamEntry();
- }//if
+ jam();
+ Uint32 localKey = operationRecPtr.p->localdata[0];
+ Uint32 userptr= operationRecPtr.p->userptr;
+
+ signal->theData[0] = fragrecptr.p->myfid;
+ signal->theData[1] = fragrecptr.p->myTableId;
+ signal->theData[2] = operationRecPtr.p->localdata[0];
+ Uint32 pageId = localKey >> MAX_TUPLES_BITS;
+ Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
+ signal->theData[2] = pageId;
+ signal->theData[3] = pageIndex;
+ signal->theData[4] = userptr;
+ EXECUTE_DIRECT(DBTUP, GSN_TUP_DEALLOCREQ, signal, 5);
+ jamEntry();
+
getdirindex(signal);
tlastPageindex = tgdiPageindex;
lastPageptr.i = gdiPageptr.i;
@@ -3673,18 +3104,6 @@
goto deleteElement_index_error1;
{
const Uint32 tdeElemhead = lastPageptr.p->word32[tlastElementptr];
- if (fragrecptr.p->createLcp == ZTRUE) {
- datapageptr.p = delPageptr.p;
- cundoinfolength = fragrecptr.p->elementLength;
- if (tdelForward == ZTRUE) {
- jam();
- cundoElemIndex = tdelElementptr;
- } else {
- jam();
- cundoElemIndex = (tdelElementptr + 1) - fragrecptr.p->elementLength;
- }//if
- undoWritingProcess(signal);
- }//if
tlastMoveElemptr = tlastElementptr;
tdelMoveElemptr = tdelElementptr;
guard31 = fragrecptr.p->elementLength - 1;
@@ -3704,16 +3123,10 @@
/* --------------------------------------------------------------------------------- */
deOperationRecPtr.i = ElementHeader::getOpPtrI(tdeElemhead);
ptrCheckGuard(deOperationRecPtr, coprecsize, operationrec);
- if (cundoLogActive == ZFALSE) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WE DO NOT BOTHER WITH THIS INFORMATION DURING EXECUTION OF THE UNDO LOG. */
- /* --------------------------------------------------------------------------------- */
- deOperationRecPtr.p->elementPage = delPageptr.i;
- deOperationRecPtr.p->elementContainer = tdelContainerptr;
- deOperationRecPtr.p->elementPointer = tdelElementptr;
- deOperationRecPtr.p->elementIsforward = tdelForward;
- }//if
+ deOperationRecPtr.p->elementPage = delPageptr.i;
+ deOperationRecPtr.p->elementContainer = tdelContainerptr;
+ deOperationRecPtr.p->elementPointer = tdelElementptr;
+ deOperationRecPtr.p->elementIsforward = tdelForward;
/* --------------------------------------------------------------------------------- */
// We need to take extreme care to not install locked records after system restart.
// An undo of the delete will reinstall the moved record. We have to ensure that the
@@ -3841,13 +3254,6 @@
/* DELETE THE LAST CONTAINER AND UPDATE THE PREVIOUS CONTAINER. ALSO PUT THIS */
/* CONTAINER IN FREE CONTAINER LIST OF THE PAGE. */
/* --------------------------------------------------------------------------------- */
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = lastPrevpageptr.p;
- cundoElemIndex = tlastPrevconptr;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
ndbrequire(tlastPrevconptr < 2048);
tglrTmp = lastPrevpageptr.p->word32[tlastPrevconptr] >> 9;
dbgWord32(lastPrevpageptr, tlastPrevconptr, tglrTmp << 9);
@@ -3868,13 +3274,6 @@
tglrHead = tlastContainerhead << 6;
tglrHead = tglrHead >> 6;
tglrHead = tglrHead | (tlastContainerlen << 26);
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = lastPageptr.p;
- cundoElemIndex = tlastContainerptr;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
dbgWord32(lastPageptr, tlastContainerptr, tglrHead);
arrGuard(tlastContainerptr, 2048);
lastPageptr.p->word32[tlastContainerptr] = tglrHead;
@@ -3900,19 +3299,6 @@
Uint32 tullTmp;
Uint32 tullTmp1;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = rlPageptr.p;
- cundoElemIndex = tullIndex;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = ZPOS_EMPTY_LIST;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
/* --------------------------------------------------------------------------------- */
/* IF A CONTAINER IS RELEASED AND NOT ONLY A PART THEN WE HAVE TO REMOVE IT */
/* FROM THE LIST OF USED CONTAINERS IN THE PAGE. THIS IN ORDER TO ENSURE THAT */
@@ -3927,12 +3313,6 @@
jam();
tullTmp1 = (trlNextused << ZSHIFT_PLUS) - (trlNextused << ZSHIFT_MINUS);
tullTmp1 = tullTmp1 + ZHEAD_SIZE;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = tullTmp1;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
tullTmp = rlPageptr.p->word32[tullTmp1] & 0xfe03ffff;
dbgWord32(rlPageptr, tullTmp1, tullTmp | (trlPrevused << 18));
rlPageptr.p->word32[tullTmp1] = tullTmp | (trlPrevused << 18);
@@ -3944,12 +3324,6 @@
jam();
tullTmp1 = (trlPrevused << ZSHIFT_PLUS) - (trlPrevused << ZSHIFT_MINUS);
tullTmp1 = tullTmp1 + ZHEAD_SIZE;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = tullTmp1;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
tullTmp = rlPageptr.p->word32[tullTmp1] & 0xfffc07ff;
dbgWord32(rlPageptr, tullTmp1, tullTmp | (trlNextused << 11));
rlPageptr.p->word32[tullTmp1] = tullTmp | (trlNextused << 11);
@@ -3975,12 +3349,6 @@
jam();
tullTmp1 = (tullTmp1 << ZSHIFT_PLUS) - (tullTmp1 << ZSHIFT_MINUS);
tullTmp1 = (tullTmp1 + ZHEAD_SIZE) + 1;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = tullTmp1;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
dbgWord32(rlPageptr, tullTmp1, trlPageindex);
rlPageptr.p->word32[tullTmp1] = trlPageindex; /* UPDATES PREV POINTER IN THE NEXT FREE */
} else {
@@ -4023,19 +3391,6 @@
Uint32 turlTmp1;
Uint32 turlTmp;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = rlPageptr.p;
- cundoElemIndex = turlIndex;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = ZPOS_EMPTY_LIST;
- cundoinfolength = 2;
- undoWritingProcess(signal);
- }//if
/* --------------------------------------------------------------------------------- */
/* IF A CONTAINER IS RELEASED AND NOT ONLY A PART THEN WE HAVE TO REMOVE IT */
/* FROM THE LIST OF USED CONTAINERS IN THE PAGE. THIS IN ORDER TO ENSURE THAT */
@@ -4051,12 +3406,6 @@
jam();
turlTmp1 = (trlNextused << ZSHIFT_PLUS) - (trlNextused << ZSHIFT_MINUS);
turlTmp1 = turlTmp1 + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = turlTmp1;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
turlTmp = rlPageptr.p->word32[turlTmp1] & 0xfe03ffff;
dbgWord32(rlPageptr, turlTmp1, turlTmp | (trlPrevused << 18));
rlPageptr.p->word32[turlTmp1] = turlTmp | (trlPrevused << 18);
@@ -4068,12 +3417,6 @@
jam();
turlTmp1 = (trlPrevused << ZSHIFT_PLUS) - (trlPrevused << ZSHIFT_MINUS);
turlTmp1 = turlTmp1 + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = turlTmp1;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
turlTmp = rlPageptr.p->word32[turlTmp1] & 0xfffc07ff;
dbgWord32(rlPageptr, turlTmp1, turlTmp | (trlNextused << 11));
rlPageptr.p->word32[turlTmp1] = turlTmp | (trlNextused << 11);
@@ -4100,12 +3443,6 @@
jam();
turlTmp = (turlTmp1 << ZSHIFT_PLUS) - (turlTmp1 << ZSHIFT_MINUS);
turlTmp = turlTmp + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- cundoElemIndex = turlTmp;
- cundoinfolength = 1;
- undoWritingProcess(signal);
- }//if
dbgWord32(rlPageptr, turlTmp, trlPageindex);
rlPageptr.p->word32[turlTmp] = trlPageindex; /* UPDATES PREV POINTER IN THE NEXT FREE */
} else {
@@ -4136,26 +3473,24 @@
{
Uint32 tcolTmp;
- if (fragrecptr.p->loadingFlag == ZFALSE) {
- tcolTmp = colPageptr.p->word32[ZPOS_ALLOC_CONTAINERS];
- if (tcolTmp <= ZFREE_LIMIT) {
- if (tcolTmp == 0) {
- jam();
- ropPageptr = colPageptr;
- releaseOverpage(signal);
- } else {
- jam();
- if (colPageptr.p->word32[ZPOS_OVERFLOWREC] == RNIL) {
- ndbrequire(cfirstfreeoverrec != RNIL);
- jam();
- seizeOverRec(signal);
- sorOverflowRecPtr.p->dirindex = colPageptr.p->word32[ZPOS_PAGE_ID];
- sorOverflowRecPtr.p->overpage = colPageptr.i;
- dbgWord32(colPageptr, ZPOS_OVERFLOWREC, sorOverflowRecPtr.i);
- colPageptr.p->word32[ZPOS_OVERFLOWREC] = sorOverflowRecPtr.i;
- porOverflowRecPtr = sorOverflowRecPtr;
- putOverflowRecInFrag(signal);
- }//if
+ tcolTmp = colPageptr.p->word32[ZPOS_ALLOC_CONTAINERS];
+ if (tcolTmp <= ZFREE_LIMIT) {
+ if (tcolTmp == 0) {
+ jam();
+ ropPageptr = colPageptr;
+ releaseOverpage(signal);
+ } else {
+ jam();
+ if (colPageptr.p->word32[ZPOS_OVERFLOWREC] == RNIL) {
+ ndbrequire(cfirstfreeoverrec != RNIL);
+ jam();
+ seizeOverRec(signal);
+ sorOverflowRecPtr.p->dirindex = colPageptr.p->word32[ZPOS_PAGE_ID];
+ sorOverflowRecPtr.p->overpage = colPageptr.i;
+ dbgWord32(colPageptr, ZPOS_OVERFLOWREC, sorOverflowRecPtr.i);
+ colPageptr.p->word32[ZPOS_OVERFLOWREC] = sorOverflowRecPtr.i;
+ porOverflowRecPtr = sorOverflowRecPtr;
+ putOverflowRecInFrag(signal);
}//if
}//if
}//if
@@ -4222,7 +3557,7 @@
return;
} else {
jam();
- commitdelete(signal, false);
+ commitdelete(signal);
}//if
}//if
} else {
@@ -4230,7 +3565,6 @@
// We are not the lock owner.
/* --------------------------------------------------------------- */
jam();
- takeOutFragWaitQue(signal);
if (operationRecPtr.p->prevParallelQue != RNIL) {
jam();
/* ---------------------------------------------------------------------------------- */
@@ -4316,28 +3650,31 @@
do {
if (deleteOpPtr.p->operation == ZDELETE) {
jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO A */
- /* SCAN-TAKEOVER THE ACTUAL DELETE WILL BE PERFORMED BY THE PREVIOUS OPERATION (SCAN)*/
- /* IN THE PARALLEL QUEUE WHICH OWNS THE LOCK.THE PROBLEM IS THAT THE SCAN OPERATION */
- /* DOES NOT HAVE A HASH VALUE ASSIGNED TO IT SO WE COPY IT FROM THIS OPERATION. */
- /* */
- /* WE ASSUME THAT THIS SOLUTION WILL WORK BECAUSE THE ONLY WAY A SCAN CAN PERFORM */
- /* A DELETE IS BY BEING FOLLOWED BY A NORMAL DELETE-OPERATION THAT HAS A HASH VALUE. */
- /* --------------------------------------------------------------------------------- */
+ /* -------------------------------------------------------------------
+ * IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO
+ * A SCAN-TAKEOVER THE ACTUAL DELETE WILL BE PERFORMED BY THE PREVIOUS
+ * OPERATION (SCAN) IN THE PARALLEL QUEUE WHICH OWNS THE LOCK.
+ * THE PROBLEM IS THAT THE SCAN OPERATION DOES NOT HAVE A HASH VALUE
+ * ASSIGNED TO IT SO WE COPY IT FROM THIS OPERATION.
+ *
+ * WE ASSUME THAT THIS SOLUTION WILL WORK BECAUSE THE ONLY WAY A
+ * SCAN CAN PERFORM A DELETE IS BY BEING FOLLOWED BY A NORMAL
+ * DELETE-OPERATION THAT HAS A HASH VALUE.
+ * ----------------------------------------------------------------- */
hashValue = deleteOpPtr.p->hashValue;
elementDeleted = true;
deleteCheckOngoing = false;
} else if ((deleteOpPtr.p->operation == ZREAD) ||
(deleteOpPtr.p->operation == ZSCAN_OP)) {
- /* --------------------------------------------------------------------------------- */
- /* We are trying to find out whether the commit will in the end delete the tuple. */
- /* Normally the delete will be the last operation in the list of operations on this */
- /* It is however possible to issue reads and scans in the same savepoint as the */
- /* delete operation was issued and these can end up after the delete in the list of */
- /* operations in the parallel queue. Thus if we discover a read or a scan we have to */
- /* continue scanning the list looking for a delete operation. */
- /* --------------------------------------------------------------------------------- */
+ /* -------------------------------------------------------------------
+ * We are trying to find out whether the commit will in the end delete
+ * the tuple. Normally the delete will be the last operation in the
+ * list of operations on this tuple. It is however possible to issue reads
+ * and scans in the same savepoint as the delete operation was issued
+ * and these can end up after the delete in the list of operations
+ * in the parallel queue. Thus if we discover a read or a scan
+ * we have to continue scanning the list looking for a delete operation.
+ */
deleteOpPtr.i = deleteOpPtr.p->prevParallelQue;
if (deleteOpPtr.i == RNIL) {
jam();
@@ -4348,10 +3685,10 @@
}//if
} else {
jam();
- /* --------------------------------------------------------------------------------- */
- /* Finding an UPDATE or INSERT before finding a DELETE means we cannot be deleting */
- /* as the end result of this transaction. */
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------ */
+ /* Finding an UPDATE or INSERT before finding a DELETE
+ * means we cannot be deleting as the end result of this transaction.
+ */
deleteCheckOngoing = false;
}//if
} while (deleteCheckOngoing);
@@ -4434,7 +3771,7 @@
No queue and elementIsDisappeared is true. We perform the actual delete
operation.
*/
- commitdelete(signal, false);
+ commitdelete(signal);
return;
}//if
} else {
@@ -4616,10 +3953,11 @@
trlOperPtr.i = RNIL;
if (operationRecPtr.p->nextParallelQue != RNIL) {
jam();
- /* --------------------------------------------------------------------------------- */
- /* NEXT OPERATION TAKES OVER THE LOCK. We will simply move the info from the leader */
- // to the new queue leader.
- /* --------------------------------------------------------------------------------- */
+ /** ---------------------------------------------------------------------
+ * NEXT OPERATION TAKES OVER THE LOCK.
+ * We will simply move the info from the leader
+ * to the new queue leader.
+ * -------------------------------------------------------------------- */
trlOperPtr.i = operationRecPtr.p->nextParallelQue;
ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
copyInOperPtr = trlOperPtr;
@@ -4628,9 +3966,10 @@
trlOperPtr.p->prevParallelQue = RNIL;
if (operationRecPtr.p->nextSerialQue != RNIL) {
jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE NEW LOCK OWNER. */
- /* --------------------------------------------------------------------------------- */
+ /* -----------------------------------------------------------------
+ * THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE
+ * NEW LOCK OWNER.
+ * ------------------------------------------------------------------ */
trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue;
ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
@@ -4638,17 +3977,13 @@
}//if
check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
- /* --------------------------------------------------------------------------------- */
- /* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */
- /* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */
- /* --------------------------------------------------------------------------------- */
} else {
ndbrequire(operationRecPtr.p->nextSerialQue != RNIL);
jam();
- /* --------------------------------------------------------------------------------- */
- /* THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY. WE NEED TO */
- /* REARRANGE LISTS AND START A NUMBER OF OPERATIONS. */
- /* --------------------------------------------------------------------------------- */
+ /** ---------------------------------------------------------------------
+ * THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY.
+ * WE NEED TO REARRANGE LISTS AND START A NUMBER OF OPERATIONS.
+ * -------------------------------------------------------------------- */
trlOperPtr.i = operationRecPtr.p->nextSerialQue;
ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
copyOperPtr = operationRecPtr;
@@ -4656,48 +3991,53 @@
copyOpInfo(signal);
trlOperPtr.p->prevSerialQue = RNIL;
ndbrequire(trlOperPtr.p->prevParallelQue == RNIL);
- /* --------------------------------------------------------------------------------- */
- /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */
- /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */
- /* --------------------------------------------------------------------------------- */
+ /* --------------------------------------------------------------------- */
+ /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */
+ /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */
+ /* --------------------------------------------------------------------- */
rloOperPtr = operationRecPtr;
trlTmpOperPtr = trlOperPtr;
TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared;
Uint32 ThashValue = trlOperPtr.p->hashValue;
do {
- /* --------------------------------------------------------------------------------- */
- // Ensure that all operations in the queue are assigned with the elementIsDisappeared
- // to ensure that the element is removed after a previous delete. An insert does
- // however revert this decision since the element is put back again. Local checkpoints
- // complicate life here since they do not execute the next operation but simply change
- // the state on the operation. We need to set-up the variable elementIsDisappeared
- // properly even when local checkpoints and inserts/writes after deletes occur.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------ */
+ // Ensure that all operations in the queue are assigned with the
+ // elementIsDisappeared to ensure that the element is removed after
+ // a previous delete. An insert does however revert this decision
+ // since the element is put back again.
+ // Local checkpoints complicate life here since they do not
+ // execute the next operation but simply change
+ // the state on the operation.
+ // We need to set up the variable elementIsDisappeared
+ // properly even when local checkpoints and inserts/writes after
+ // deletes occur.
+ /* ------------------------------------------------------------------- */
trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared;
if (TelementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the hashValue is also set
- // since it always originates from a committing abort or a aborting insert. Scans
- // do not initialise the hashValue and must have this value initialised if they are
- // to successfully commit the delete.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------- */
+ // If the elementIsDisappeared is set then we know that the
+ // hashValue is also set since it always originates from a
+ // committing abort or an aborting insert.
+ // Scans do not initialise the hashValue and must have this
+ // value initialised if they are to successfully commit the delete.
+ /* ----------------------------------------------------------------- */
jam();
trlTmpOperPtr.p->hashValue = ThashValue;
}//if
trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0];
trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1];
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------- */
// Restart the queued operation.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------- */
operationRecPtr = trlTmpOperPtr;
TelementIsDisappeared = executeNextOperation(signal);
ThashValue = operationRecPtr.p->hashValue;
if (trlTmpOperPtr.p->nextParallelQue != RNIL) {
jam();
- /* --------------------------------------------------------------------------------- */
- // We will continue with the next operation in the parallel queue and start this as
- // well.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------- */
+ // We will continue with the next operation in the parallel
+ // queue and start this as well.
+ /* ----------------------------------------------------------------- */
trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue;
ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
} else {
@@ -4757,68 +4097,44 @@
Uint32 Dbacc::executeNextOperation(Signal* signal)
{
ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
- if (fragrecptr.p->stopQueOp == ZTRUE) {
- Uint32 TelemDisappeared;
- jam();
- TelemDisappeared = operationRecPtr.p->elementIsDisappeared;
- if ((operationRecPtr.p->elementIsDisappeared == ZTRUE) &&
- (operationRecPtr.p->prevParallelQue == RNIL) &&
- ((operationRecPtr.p->operation == ZINSERT) ||
- (operationRecPtr.p->operation == ZWRITE))) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // In this case we do not wish to change the elementIsDisappeared since that would
- // create an error the next time this method is called for this operation after local
- // checkpoint starts up operations again. We must however ensure that operations
- // that follow in the queue do not get the value ZTRUE when actually an INSERT/WRITE
- // precedes them (only if the INSERT/WRITE is the first operation).
- /* --------------------------------------------------------------------------------- */
- TelemDisappeared = ZFALSE;
- }//if
- /* --------------------------------------------------------------------------------- */
- /* A LOCAL CHECKPOINT HAS STOPPED OPERATIONS. WE MUST NOT START THE OPERATION */
- /* AT THIS TIME. WE SET THE STATE TO INDICATE THAT WE ARE READY TO START AS */
- /* SOON AS WE ARE ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->opState = WAIT_EXE_OP;
- return TelemDisappeared;
- }//if
- takeOutFragWaitQue(signal);
if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS ALREADY DELETED. */
- /* --------------------------------------------------------------------------------- */
+ /* --------------------------------------------------------------------- */
+ /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS DELETED. */
+ /* --------------------------------------------------------------------- */
if (((operationRecPtr.p->operation != ZINSERT) &&
(operationRecPtr.p->operation != ZWRITE)) ||
(operationRecPtr.p->prevParallelQue != RNIL)) {
if (operationRecPtr.p->operation != ZSCAN_OP ||
operationRecPtr.p->isAccLockReq) {
jam();
- /* --------------------------------------------------------------------------------- */
- // Updates and reads with a previous delete simply aborts with read error indicating
- // that tuple did not exist. Also inserts and writes not being the first operation.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------- */
+ // Updates and reads with a previous delete simply abort with read
+ // error indicating that tuple did not exist.
+ // Also inserts and writes not being the first operation.
+ /* ----------------------------------------------------------------- */
operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
signal->theData[0] = operationRecPtr.p->userptr;
signal->theData[1] = ZREAD_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
+ sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
return operationRecPtr.p->elementIsDisappeared;
} else {
- /* --------------------------------------------------------------------------------- */
- /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A SCAN => SPECIAL TREATMENT. */
- /* IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION FROM THE SCAN */
- /* LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED THEN WE SHOULD RESTART THE */
- /* SCAN PROCESS. OTHERWISE WE SIMPLY RELEASE THE OPERATION AND DECREASE THE */
- /* NUMBER OF LOCKS HELD. */
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------- */
+ /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A
+ * SCAN => SPECIAL TREATMENT.
+ * IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION
+ * FROM THE SCAN LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED
+ * THEN WE SHOULD RESTART THE SCAN PROCESS. OTHERWISE WE SIMPLY
+ * RELEASE THE OPERATION AND DECREASE THE NUMBER OF LOCKS HELD.
+ * ----------------------------------------------------------------- */
takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
return operationRecPtr.p->elementIsDisappeared;
}//if
}//if
- /* --------------------------------------------------------------------------------- */
+ /* --------------------------------------------------------------------- */
// Insert and writes can continue but need to be converted to inserts.
- /* --------------------------------------------------------------------------------- */
+ /* --------------------------------------------------------------------- */
jam();
operationRecPtr.p->elementIsDisappeared = ZFALSE;
operationRecPtr.p->operation = ZINSERT;
@@ -4837,14 +4153,15 @@
}//if
if (abortFlag) {
jam();
- /* --------------------------------------------------------------------------------- */
- /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. THIS IS CLEARLY */
- /* NOT A GOOD IDEA. */
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------- */
+ /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. */
+ /* THIS IS CLEARLY NOT A GOOD IDEA. */
+ /* ------------------------------------------------------------------- */
operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
signal->theData[0] = operationRecPtr.p->userptr;
signal->theData[1] = ZWRITE_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
+ sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
return operationRecPtr.p->elementIsDisappeared;
}//if
}
@@ -4873,52 +4190,12 @@
} else {
jam();
sendAcckeyconf(signal);
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF, signal, 6, JBB);
+ sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF,
+ signal, 6, JBB);
}//if
return operationRecPtr.p->elementIsDisappeared;
}//Dbacc::executeNextOperation()
-/* --------------------------------------------------------------------------------- */
-/* TAKE_OUT_FRAG_WAIT_QUE */
-/* DESCRIPTION: AN OPERATION WHICH OWNS A LOCK OF AN ELEMENT, IS IN A LIST */
-/* OF THE FRAGMENT. THIS LIST IS USED TO STOP THE QUEUE OPERATION */
-/* DURING CREATE CHECK POINT PROSESS FOR STOP AND RESTART OF THE */
-/* OPERATIONS. THIS SUBRUTIN TAKES A OPERATION RECORD OUT OF THE LIST */
-/* -------------------------------------------------------------------------------- */
-void Dbacc::takeOutFragWaitQue(Signal* signal)
-{
- OperationrecPtr tofwqOperRecPtr;
-
- if (operationRecPtr.p->opState == WAIT_IN_QUEUE) {
- if (fragrecptr.p->sentWaitInQueOp == operationRecPtr.i) {
- jam();
- fragrecptr.p->sentWaitInQueOp = operationRecPtr.p->nextQueOp;
- }//if
- if (operationRecPtr.p->prevQueOp != RNIL) {
- jam();
- tofwqOperRecPtr.i = operationRecPtr.p->prevQueOp;
- ptrCheckGuard(tofwqOperRecPtr, coprecsize, operationrec);
- tofwqOperRecPtr.p->nextQueOp = operationRecPtr.p->nextQueOp;
- } else {
- jam();
- fragrecptr.p->firstWaitInQueOp = operationRecPtr.p->nextQueOp;
- }//if
- if (operationRecPtr.p->nextQueOp != RNIL) {
- jam();
- tofwqOperRecPtr.i = operationRecPtr.p->nextQueOp;
- ptrCheckGuard(tofwqOperRecPtr, coprecsize, operationrec);
- tofwqOperRecPtr.p->prevQueOp = operationRecPtr.p->prevQueOp;
- } else {
- jam();
- fragrecptr.p->lastWaitInQueOp = operationRecPtr.p->prevQueOp;
- }//if
- operationRecPtr.p->opState = FREE_OP;
- return;
- } else {
- ndbrequire(operationRecPtr.p->opState == FREE_OP);
- }//if
-}//Dbacc::takeOutFragWaitQue()
-
/**
* takeOutLockOwnersList
*
@@ -4931,7 +4208,7 @@
{
const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp;
const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp;
-
+
#ifdef VM_TRACE
// Check that operation is already in the list
OperationrecPtr tmpOperPtr;
@@ -4945,10 +4222,10 @@
}
ndbrequire(inList == true);
#endif
-
+
ndbrequire(outOperPtr.p->lockOwner == ZTRUE);
outOperPtr.p->lockOwner = ZFALSE;
-
+
// Fast path through the code for the common case.
if ((Tprev == RNIL) && (Tnext == RNIL)) {
ndbrequire(fragrecptr.p->lockOwnersList == outOperPtr.i);
@@ -5004,8 +4281,12 @@
insOperPtr.p->lockOwner = ZTRUE;
insOperPtr.p->prevLockOwnerOp = RNIL;
tmpOperPtr.i = fragrecptr.p->lockOwnersList;
- fragrecptr.p->lockOwnersList = insOperPtr.i;
+ const Uint32 seq = fragrecptr.p->m_current_sequence_no;
insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i;
+ insOperPtr.p->m_sequence_no = seq;
+
+ fragrecptr.p->lockOwnersList = insOperPtr.i;
+ fragrecptr.p->m_current_sequence_no = seq+1;
if (tmpOperPtr.i == RNIL) {
return;
} else {
@@ -5141,19 +4422,16 @@
DirRangePtr TDirRangePtr;
Page8Ptr TPageptr;
ScanRecPtr TscanPtr;
- RootfragmentrecPtr Trootfragrecptr;
- Trootfragrecptr.i = fragrecptr.p->myroot;
TSplit = fragrecptr.p->p;
- ptrCheckGuard(Trootfragrecptr, crootfragmentsize, rootfragmentrec);
for (Ti = 0; Ti < 4; Ti++) {
TreleaseScanIndicator[Ti] = 0;
- if (Trootfragrecptr.p->scan[Ti] != RNIL) {
+ if (fragrecptr.p->scan[Ti] != RNIL) {
//-------------------------------------------------------------
// A scan is ongoing on this particular local fragment. We have
// to check its current state.
//-------------------------------------------------------------
- TscanPtr.i = Trootfragrecptr.p->scan[Ti];
+ TscanPtr.i = fragrecptr.p->scan[Ti];
ptrCheckGuard(TscanPtr, cscanRecSize, scanRec);
if (TscanPtr.p->activeLocalFrag == fragrecptr.i) {
if (TscanPtr.p->scanBucketState == ScanRec::FIRST_LAP) {
@@ -5211,7 +4489,7 @@
for (Ti = 0; Ti < 4; Ti++) {
if (TreleaseScanIndicator[Ti] == 1) {
jam();
- scanPtr.i = Trootfragrecptr.p->scan[Ti];
+ scanPtr.i = fragrecptr.p->scan[Ti];
ptrCheckGuard(scanPtr, cscanRecSize, scanRec);
rsbPageidptr = TPageptr;
trsbPageindex = TPageIndex;
@@ -5280,16 +4558,6 @@
/*--------------------------------------------------------------*/
return;
}//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_EXPAND) {
- jam();
- /*--------------------------------------------------------------*/
- // We did not have enough undo log buffers to start up an
- // expand operation
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
/*--------------------------------------------------------------------------*/
/* WE START BY FINDING THE PAGE, THE PAGE INDEX AND THE PAGE DIRECTORY*/
@@ -5410,38 +4678,34 @@
Uint32 fragId = signal->theData[1];
ptrCheckGuard(tabptr, ctablesize, tabrec);
- ndbrequire(getrootfragmentrec(signal, rootfragrecptr, fragId));
+ ndbrequire(getfragmentrec(signal, fragrecptr, fragId));
#if 0
ndbout_c("reenable expand check for table %d fragment: %d",
tabptr.i, fragId);
#endif
- for (Uint32 i = 0; i < 2; i++) {
- fragrecptr.i = rootfragrecptr.p->fragmentptr[i];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- switch(fragrecptr.p->expandFlag){
- case 0:
- /**
- * Hmm... this means that it's alreay has been reenabled...
- */
- ndbassert(false);
- continue;
- case 1:
- /**
- * Nothing is going on start expand check
- */
- case 2:
- /**
- * A shrink is running, do expand check anyway
- * (to reset expandFlag)
- */
- fragrecptr.p->expandFlag = 2;
- signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
- break;
- }
+ switch(fragrecptr.p->expandFlag){
+ case 0:
+ /**
+ * Hmm... this means that it has already been reenabled...
+ */
+ ndbassert(false);
+ break;
+ case 1:
+ /**
+ * Nothing is going on, start expand check
+ */
+ case 2:
+ /**
+ * A shrink is running, do expand check anyway
+ * (to reset expandFlag)
+ */
+ fragrecptr.p->expandFlag = 2;
+ signal->theData[0] = fragrecptr.i;
+ signal->theData[1] = fragrecptr.p->p;
+ signal->theData[2] = fragrecptr.p->maxp;
+ sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
+ break;
}
}
@@ -5515,20 +4779,6 @@
idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
- if ((fragrecptr.p->createLcp == ZTRUE) &&
- (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) != 0)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // During local checkpoints we must ensure that we restore the element header in
- // unlocked state and with the hash value part there with tuple status zeroed.
- // Otherwise a later insert over the same element will write an UNDO log that will
- // ensure that the now removed element is restored together with its locked element
- // header and without the hash value part.
- /* --------------------------------------------------------------------------------- */
- const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
- const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
- excPageptr.p->word32[cexcElementptr] = eh;
- }//if
}//if
if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
jam();
@@ -5604,20 +4854,6 @@
idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
- if ((fragrecptr.p->createLcp == ZTRUE) &&
- (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) != 0)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // During local checkpoints we must ensure that we restore the element header in
- // unlocked state and with the hash value part there with tuple status zeroed.
- // Otherwise a later insert over the same element will write an UNDO log that will
- // ensure that the now removed element is restored together with its locked element
- // header and without the hash value part.
- /* --------------------------------------------------------------------------------- */
- const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
- const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
- lastPageptr.p->word32[tlastElementptr] = eh;
- }//if
}//if
if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
jam();
@@ -5708,10 +4944,7 @@
DirRangePtr TDirRangePtr;
Page8Ptr TPageptr;
ScanRecPtr TscanPtr;
- RootfragmentrecPtr Trootfragrecptr;
- Trootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(Trootfragrecptr, crootfragmentsize, rootfragmentrec);
if (fragrecptr.p->p == 0) {
jam();
TmergeDest = fragrecptr.p->maxp >> 1;
@@ -5722,8 +4955,8 @@
TmergeSource = fragrecptr.p->maxp + fragrecptr.p->p;
for (Ti = 0; Ti < 4; Ti++) {
TreleaseScanIndicator[Ti] = 0;
- if (Trootfragrecptr.p->scan[Ti] != RNIL) {
- TscanPtr.i = Trootfragrecptr.p->scan[Ti];
+ if (fragrecptr.p->scan[Ti] != RNIL) {
+ TscanPtr.i = fragrecptr.p->scan[Ti];
ptrCheckGuard(TscanPtr, cscanRecSize, scanRec);
if (TscanPtr.p->activeLocalFrag == fragrecptr.i) {
//-------------------------------------------------------------
@@ -5785,7 +5018,7 @@
for (Ti = 0; Ti < 4; Ti++) {
if (TreleaseScanIndicator[Ti] == 1) {
jam();
- scanPtr.i = Trootfragrecptr.p->scan[Ti];
+ scanPtr.i = fragrecptr.p->scan[Ti];
ptrCheckGuard(scanPtr, cscanRecSize, scanRec);
rsbPageidptr.i = TPageptr.i;
rsbPageidptr.p = TPageptr.p;
@@ -5840,22 +5073,6 @@
return;
}//if
texpDirInd = (fragrecptr.p->maxp + fragrecptr.p->p) >> fragrecptr.p->k;
- if (((fragrecptr.p->maxp + fragrecptr.p->p) & ((1 << fragrecptr.p->k) - 1)) == 0) {
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
- if (fragrecptr.p->lcpMaxDirIndex > texpDirInd) {
- if (fragrecptr.p->lcpDirIndex <= texpDirInd) {
- jam();
- /*--------------------------------------------------------------*/
- /* WE DO NOT ALLOW ANY SHRINKS THAT REMOVE PAGES THAT ARE */
- /* NEEDED AS PART OF THE LOCAL CHECKPOINT. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
- }//if
- }//if
- }//if
if (fragrecptr.p->firstOverflowRec == RNIL) {
jam();
allocOverflowPage(signal);
@@ -5883,16 +5100,6 @@
/*--------------------------------------------------------------*/
return;
}//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_EXPAND) {
- jam();
- /*--------------------------------------------------------------*/
- // We did not have enough undo log buffers to start up an
- // shrink operation
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
if (fragrecptr.p->p == 0) {
jam();
fragrecptr.p->maxp = fragrecptr.p->maxp >> 1;
@@ -5969,13 +5176,6 @@
}//if
tshrTmp1 = ZCON_HEAD_SIZE;
tshrTmp1 = tshrTmp1 << 26;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = excPageptr.p;
- cundoinfolength = 1;
- cundoElemIndex = cexcContainerptr;
- undoWritingProcess(signal);
- }//if
dbgWord32(excPageptr, cexcContainerptr, tshrTmp1);
arrGuard(cexcContainerptr, 2048);
excPageptr.p->word32[cexcContainerptr] = tshrTmp1;
@@ -6157,19 +5357,6 @@
/* --------------------------------------------------------------------------------- */
idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // During local checkpoints we must ensure that we restore the element header in
- // unlocked state and with the hash value part there with tuple status zeroed.
- // Otherwise a later insert over the same element will write an UNDO log that will
- // ensure that the now removed element is restored together with its locked element
- // header and without the hash value part.
- /* --------------------------------------------------------------------------------- */
- const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
- const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
- excPageptr.p->word32[tshrElementptr] = eh;
- }//if
}//if
tshrTmp = tshrElementptr + cexcForward;
guard21 = fragrecptr.p->localkeylen - 1;
@@ -6245,1739 +5432,7 @@
}//if
}//Dbacc::nextcontainerinfoExp()
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* END OF EXPAND/SHRINK MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* LOCAL CHECKPOINT MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* LCP_FRAGIDREQ */
-/* SENDER: LQH, LEVEL B */
-/* ENTER LCP_FRAGIDREQ WITH */
-/* TUSERPTR LQH CONNECTION PTR */
-/* TUSERBLOCKREF, LQH BLOCK REFERENCE */
-/* TCHECKPOINTID, THE CHECKPOINT NUMBER TO USE */
-/* (E.G. 1,2 OR 3) */
-/* TABPTR, TABLE ID = TABLE RECORD POINTER */
-/* TFID ROOT FRAGMENT ID */
-/* CACTIVE_UNDO_FILE_VERSION UNDO FILE VERSION 0,1,2 OR 3. */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* LCP_FRAGIDREQ REQUEST FOR LIST OF STOPED OPERATION */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-void Dbacc::execLCP_FRAGIDREQ(Signal* signal)
-{
- jamEntry();
- tuserptr = signal->theData[0]; /* LQH CONNECTION PTR */
- tuserblockref = signal->theData[1]; /* LQH BLOCK REFERENCE */
- tcheckpointid = signal->theData[2]; /* THE CHECKPOINT NUMBER TO USE */
- /* (E.G. 1,2 OR 3) */
- tabptr.i = signal->theData[3]; /* TABLE ID = TABLE RECORD POINTER */
- ptrCheck(tabptr, ctablesize, tabrec);
- tfid = signal->theData[4]; /* ROOT FRAGMENT ID */
- cactiveUndoFileVersion = signal->theData[5]; /* UNDO FILE VERSION 0,1,2 OR 3. */
- tresult = 0;
- ndbrequire(getrootfragmentrec(signal, rootfragrecptr, tfid));
- ndbrequire(rootfragrecptr.p->rootState == ACTIVEROOT);
- seizeLcpConnectRec(signal);
- initLcpConnRec(signal);
- lcpConnectptr.p->rootrecptr = rootfragrecptr.i;
- rootfragrecptr.p->lcpPtr = lcpConnectptr.i;
- lcpConnectptr.p->localCheckPid = tcheckpointid;
- lcpConnectptr.p->lcpstate = LCP_ACTIVE;
- rootfragrecptr.p->rootState = LCP_CREATION;
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- /* D6 AT FSOPENREQ =#010003FF. */
- tlfrTmp1 = 0x010003ff; /* FILE TYPE = .DATA ,VERSION OF FILENAME = 1 */
- tlfrTmp2 = 0x301; /* D7 CREATE, WRITE ONLY, TRUNCATE TO ZERO */
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
- fsConnectptr.p->fragrecPtr = fragrecptr.i;
- fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_WRITE;
- /* ----------- FILENAME (FILESYSTEM)/D3/DBACC/"T"TABID/"F"FRAGID/"S"VERSIONID.DATA ------------ */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = tabptr.i; /* TABLE IDENTITY */
- signal->theData[3] = rootfragrecptr.p->fragmentid[0]; /* FRAGMENT IDENTITY */
- signal->theData[4] = lcpConnectptr.p->localCheckPid; /* CHECKPOINT ID */
- signal->theData[5] = tlfrTmp1;
- signal->theData[6] = tlfrTmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
-}//Dbacc::execLCP_FRAGIDREQ()
-
-/* ******************--------------------------------------------------------------- */
-/* FSOPENCONF OPENFILE CONF */
-/* SENDER: FS, LEVEL B */
-/* ENTER FSOPENCONF WITH */
-/* FS_CONNECTPTR, FS_CONNECTION PTR */
-/* TUSERPOINTER, FILE POINTER */
-/* ******************--------------------------------------------------------------- */
-void Dbacc::lcpFsOpenConfLab(Signal* signal)
-{
- fsConnectptr.p->fsPtr = tuserptr;
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- rootfragrecptr.i = fragrecptr.p->myroot;
- fragrecptr.p->activeDataFilePage = 1; /* ZERO IS KEPT FOR PAGE_ZERO */
- fragrecptr.p->fsConnPtr = fsConnectptr.i;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if (rootfragrecptr.p->fragmentptr[0] == fragrecptr.i) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheck(fragrecptr, cfragmentsize, fragmentrec);
- /* ----------- FILENAME (FILESYSTEM)/D3/DBACC/"T"TABID/"F"FRAGID/"S"VERSIONID.DATA ------------ */
- /* D6 AT FSOPENREQ =#010003FF. */
- tlfrTmp1 = 0x010003ff; /* FILE TYPE = .DATA ,VERSION OF FILENAME = 1 */
- tlfrTmp2 = 0x301; /* D7 CREATE, WRITE ONLY, TRUNCATE TO ZERO */
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
- fsConnectptr.p->fragrecPtr = fragrecptr.i;
- fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_WRITE;
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = rootfragrecptr.p->mytabptr; /* TABLE IDENTITY */
- signal->theData[3] = rootfragrecptr.p->fragmentid[1]; /* FRAGMENT IDENTITY */
- signal->theData[4] = lcpConnectptr.p->localCheckPid; /* CHECKPOINT ID */
- signal->theData[5] = tlfrTmp1;
- signal->theData[6] = tlfrTmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
- } else {
- ndbrequire(rootfragrecptr.p->fragmentptr[1] == fragrecptr.i);
- }//if
- /*---- BOTH DATA FILES ARE OPEN------*/
- /* ----IF THE UNDO FILE IS CLOSED , OPEN IT.----- */
- if (cactiveOpenUndoFsPtr != RNIL) {
- jam();
- sendLcpFragidconfLab(signal);
- return;
- }//if
- cactiveUndoFilePage = 0;
- cprevUndoaddress = cminusOne;
- cundoposition = 0;
- clastUndoPageIdWritten = 0;
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
- fsConnectptr.p->fsState = WAIT_OPEN_UNDO_LCP;
- fsConnectptr.p->fsPart = 0; /* FILE INDEX, SECOND FILE IN THE DIRECTORY */
- cactiveOpenUndoFsPtr = fsConnectptr.i;
- cactiveRootfrag = rootfragrecptr.i;
- tlfrTmp1 = 1; /* FILE VERSION */
- tlfrTmp1 = (tlfrTmp1 << 8) + ZLOCALLOGFILE; /* .LOCLOG = 2 */
- tlfrTmp1 = (tlfrTmp1 << 8) + 4; /* ROOT DIRECTORY = D4 */
- tlfrTmp1 = (tlfrTmp1 << 8) + fsConnectptr.p->fsPart; /* P2 */
- tlfrTmp2 = 0x302; /* D7 CREATE , READ / WRITE , TRUNCATE TO ZERO */
- /* ---FILE NAME "D4"/"DBACC"/LCP_CONNECTPTR:LOCAL_CHECK_PID/FS_CONNECTPTR:FS_PART".LOCLOG-- */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = cminusOne; /* #FFFFFFFF */
- signal->theData[3] = cminusOne; /* #FFFFFFFF */
- signal->theData[4] = cactiveUndoFileVersion;
- /* A GROUP OF UNDO FILES WHICH ARE UPDATED */
- signal->theData[5] = tlfrTmp1;
- signal->theData[6] = tlfrTmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
-}//Dbacc::lcpFsOpenConfLab()
-
-void Dbacc::lcpOpenUndofileConfLab(Signal* signal)
-{
- ptrGuard(fsConnectptr);
- fsConnectptr.p->fsState = WAIT_NOTHING;
- rootfragrecptr.i = cactiveRootfrag;
- ptrCheck(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- fsConnectptr.p->fsPtr = tuserptr;
- sendLcpFragidconfLab(signal);
- return;
-}//Dbacc::lcpOpenUndofileConfLab()
-
-void Dbacc::sendLcpFragidconfLab(Signal* signal)
-{
- ptrGuard(rootfragrecptr);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- /* ************************ */
- /* LCP_FRAGIDCONF */
- /* ************************ */
- signal->theData[0] = lcpConnectptr.p->lcpUserptr;
- signal->theData[1] = lcpConnectptr.i;
- signal->theData[2] = 2;
- /* NO OF LOCAL FRAGMENTS */
- signal->theData[3] = rootfragrecptr.p->fragmentid[0];
- signal->theData[4] = rootfragrecptr.p->fragmentid[1];
- signal->theData[5] = RNIL;
- signal->theData[6] = RNIL;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_LCP_FRAGIDCONF, signal, 7, JBB);
- return;
-}//Dbacc::sendLcpFragidconfLab()
-
-/* ******************--------------------------------------------------------------- */
-/* LCP_HOLDOPERATION REQUEST FOR LIST OF STOPED OPERATION */
-/* SENDER: LQH, LEVEL B */
-/* ENTER LCP_HOLDOPREQ WITH */
-/* LCP_CONNECTPTR CONNECTION POINTER */
-/* TFID, LOCAL FRAGMENT ID */
-/* THOLD_PREV_SENT_OP NR OF SENT OPERATIONS AT */
-/* PREVIOUS SIGNALS */
-/* TLQH_POINTER LQH USER POINTER */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* LCP_HOLDOPERATION REQUEST FOR LIST OF STOPED OPERATION */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-void Dbacc::execLCP_HOLDOPREQ(Signal* signal)
-{
- Uint32 tholdPrevSentOp;
-
- jamEntry();
- lcpConnectptr.i = signal->theData[0]; /* CONNECTION POINTER */
- tfid = signal->theData[1]; /* LOCAL FRAGMENT ID */
- tholdPrevSentOp = signal->theData[2]; /* NR OF SENT OPERATIONS AT */
- /* PREVIOUS SIGNALS */
- tlqhPointer = signal->theData[3]; /* LQH USER POINTER */
-
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == tfid) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == tfid);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->lcpLqhPtr = tlqhPointer;
- if (tholdPrevSentOp != 0) {
- ndbrequire(fragrecptr.p->fragState == SEND_QUE_OP);
- } else if (tholdPrevSentOp == 0) {
- jam();
- fragrecptr.p->fragState = SEND_QUE_OP;
- fragrecptr.p->stopQueOp = ZTRUE;
- fragrecptr.p->sentWaitInQueOp = fragrecptr.p->firstWaitInQueOp;
- }//if
- tholdSentOp = 0; /* NR OF OPERATION WHICH ARE SENT THIS TIME */
- operationRecPtr.i = fragrecptr.p->sentWaitInQueOp;
-
- /* --------------------------------------------- */
- /* GO THROUGH ALL OPERATION IN THE WAIT */
- /* LIST AND SEND THE LQH CONNECTION PTR OF THE */
- /* OPERATIONS TO THE LQH BLOCK. MAX 23 0PERATION */
- /* PER SIGNAL */
- /* --------------------------------------------- */
- while (operationRecPtr.i != RNIL) {
- jam();
- ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- ckeys[tholdSentOp] = operationRecPtr.p->userptr;
- operationRecPtr.i = operationRecPtr.p->nextQueOp;
- tholdSentOp++;
- if ((tholdSentOp >= 23) &&
- (operationRecPtr.i != RNIL)) {
- jam();
- /* ----------------------------------------------- */
- /* THERE IS MORE THAN 23 WAIT OPERATION. WE */
- /* HAVE TO SEND THESE 23 AND WAITE FOR NEXT SIGNAL */
- /* ----------------------------------------------- */
- tholdMore = ZTRUE; /* SECOUND DATA AT THE CONF SIGNAL , = MORE */
- fragrecptr.p->sentWaitInQueOp = operationRecPtr.i;
- sendholdconfsignalLab(signal);
- return;
- }//if
- }//while
- /* ----------------------------------------------- */
- /* OPERATION_REC_PTR = RNIL */
- /* THERE IS NO MORE WAITING OPERATION, STATE OF */
- /* THE FRAGMENT RRECORD IS CHANGED AND RETURN */
- /* SIGNAL IS SENT */
- /* ----------------------------------------------- */
- fragrecptr.p->sentWaitInQueOp = RNIL;
- tholdMore = ZFALSE; /* SECOND DATA AT THE CONF SIGNAL , = NOT MORE */
- fragrecptr.p->fragState = WAIT_ACC_LCPREQ;
- sendholdconfsignalLab(signal);
- return;
-}//Dbacc::execLCP_HOLDOPREQ()
-
-void Dbacc::sendholdconfsignalLab(Signal* signal)
-{
- tholdMore = (tholdMore << 16) + tholdSentOp;
- /* SECOND SIGNAL DATA, LENGTH + MORE */
- /* ************************ */
- /* LCP_HOLDOPCONF */
- /* ************************ */
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- signal->theData[1] = tholdMore;
- signal->theData[2] = ckeys[0];
- signal->theData[3] = ckeys[1];
- signal->theData[4] = ckeys[2];
- signal->theData[5] = ckeys[3];
- signal->theData[6] = ckeys[4];
- signal->theData[7] = ckeys[5];
- signal->theData[8] = ckeys[6];
- signal->theData[9] = ckeys[7];
- signal->theData[10] = ckeys[8];
- signal->theData[11] = ckeys[9];
- signal->theData[12] = ckeys[10];
- signal->theData[13] = ckeys[11];
- signal->theData[14] = ckeys[12];
- signal->theData[15] = ckeys[13];
- signal->theData[16] = ckeys[14];
- signal->theData[17] = ckeys[15];
- signal->theData[18] = ckeys[16];
- signal->theData[19] = ckeys[17];
- signal->theData[20] = ckeys[18];
- signal->theData[21] = ckeys[19];
- signal->theData[22] = ckeys[20];
- signal->theData[23] = ckeys[21];
- signal->theData[24] = ckeys[22];
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_LCP_HOLDOPCONF, signal, 25, JBA);
- return;
-}//Dbacc::sendholdconfsignalLab()
-
-/**
- * execACC_LCPREQ
- * Perform local checkpoint of a fragment
- *
- * SENDER: LQH, LEVEL B
- * ENTER ACC_LCPREQ WITH
- * LCP_CONNECTPTR, OPERATION RECORD PTR
- * TLCP_LQH_CHECK_V, LQH'S LOCAL FRAG CHECK VALUE
- * TLCP_LOCAL_FRAG_ID, LOCAL FRAG ID
- *
- */
-void Dbacc::execACC_LCPREQ(Signal* signal)
-{
- Uint32 tlcpLocalFragId;
- Uint32 tlcpLqhCheckV;
-
- jamEntry();
- lcpConnectptr.i = signal->theData[0]; // CONNECTION PTR
- tlcpLqhCheckV = signal->theData[1]; // LQH'S LOCAL FRAG CHECK VALUE
- tlcpLocalFragId = signal->theData[2]; // LOCAL FRAG ID
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
-
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == tlcpLocalFragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == tlcpLocalFragId);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- ndbrequire(fragrecptr.p->fragState == WAIT_ACC_LCPREQ);
- fragrecptr.p->lcpLqhPtr = tlcpLqhCheckV;
-
- Page8Ptr zeroPagePtr;
- seizeLcpPage(zeroPagePtr);
- fragrecptr.p->zeroPagePtr = zeroPagePtr.i;
- fragrecptr.p->prevUndoposition = cminusOne;
- initRootFragPageZero(rootfragrecptr, zeroPagePtr);
- initFragPageZero(fragrecptr, zeroPagePtr);
- /*-----------------------------------------------------------------*/
- /* SEIZE ZERO PAGE FIRST AND THEN SEIZE DATA PAGES IN */
- /* BACKWARDS ORDER. THIS IS TO ENSURE THAT WE GET THE PAGES */
- /* IN ORDER. ON WINDOWS NT THIS WILL BE A BENEFIT SINCE WE */
- /* CAN THEN DO 1 WRITE_FILE INSTEAD OF 8. */
- /* WHEN WE RELEASE THE PAGES WE RELEASE THEM IN THE OPPOSITE */
- /* ORDER. */
- /*-----------------------------------------------------------------*/
- for (Uint32 taspTmp = ZWRITEPAGESIZE - 1; (Uint32)~taspTmp; taspTmp--) {
- Page8Ptr dataPagePtr;
- jam();
- ndbrequire(fragrecptr.p->datapages[taspTmp] == RNIL);
- seizeLcpPage(dataPagePtr);
- fragrecptr.p->datapages[taspTmp] = dataPagePtr.i;
- }//for
- fragrecptr.p->lcpMaxDirIndex = fragrecptr.p->dirsize;
- fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex;
- fragrecptr.p->createLcp = ZTRUE;
- operationRecPtr.i = fragrecptr.p->lockOwnersList;
- lcp_write_op_to_undolog(signal);
-}
-
-void
-Dbacc::lcp_write_op_to_undolog(Signal* signal)
-{
- bool delay_continueb= false;
- Uint32 i, j;
- for (i= 0; i < 16; i++) {
- jam();
- if (remainingUndoPages() <= ZMIN_UNDO_PAGES_AT_COMMIT) {
- jam();
- delay_continueb= true;
- break;
- }
- for (j= 0; j < 32; j++) {
- if (operationRecPtr.i == RNIL) {
- jam();
- break;
- }
- jam();
- ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
-
- if ((operationRecPtr.p->operation == ZINSERT) ||
- (operationRecPtr.p->elementIsDisappeared == ZTRUE)){
- /*******************************************************************
- * Only log inserts and elements that are marked as dissapeared.
- * All other operations update the element header and that is handled
- * when pages are written to disk
- ********************************************************************/
- undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
-
- writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
- /* IN OP REC, IS WRITTEN AT UNDO PAGES */
- cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
- writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
- checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */
- /* UNDO PAGES,CURRENTLY 8, IS FILLED */
- }
- operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
- }
- if (operationRecPtr.i == RNIL) {
- jam();
- break;
- }
- }
- if (operationRecPtr.i != RNIL) {
- jam();
- signal->theData[0]= ZLCP_OP_WRITE_RT_BREAK;
- signal->theData[1]= operationRecPtr.i;
- signal->theData[2]= fragrecptr.i;
- signal->theData[3]= lcpConnectptr.i;
- if (delay_continueb) {
- jam();
- sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 10, 4);
- } else {
- jam();
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
- }
- return;
- }
-
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED,
- signal, 1, JBA);
-
- fragrecptr.p->activeDataPage = 0;
- fragrecptr.p->lcpDirIndex = 0;
- fragrecptr.p->fragState = LCP_SEND_PAGES;
-
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
-}
-
-/* ******************--------------------------------------------------------------- */
-/* ACC_SAVE_PAGES A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW */
-/* PAGES OF THE FRAGMENT ARE COPIED IN THEM AND IS SEND TO */
-/* THE DATA FILE OF THE CHECK POINT. */
-/* SENDER: ACC, LEVEL B */
-/* ENTER ACC_SAVE_PAGES WITH */
-/* LCP_CONNECTPTR, CONNECTION RECORD PTR */
-/* FRAGRECPTR FRAGMENT RECORD PTR */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* ACC_SAVE_PAGES REQUEST TO SEND THE PAGE TO DISK */
-/* ******************------------------------------+ UNDO PAGES */
-/* SENDER: ACC, LEVEL B */
-void Dbacc::execACC_SAVE_PAGES(Signal* signal)
-{
- jamEntry();
- lcpConnectptr.i = signal->theData[0];
- /* CONNECTION RECORD PTR */
- fragrecptr.i = signal->theData[1];
- /* FRAGMENT RECORD PTR */
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if (lcpConnectptr.p->lcpstate != LCP_ACTIVE) {
- jam();
- sendSystemerror(signal);
- return;
- }//if
- if (ERROR_INSERTED(3000)) {
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->mytabptr == c_errorInsert3000_TableId){
- ndbout << "Delay writing of datapages" << endl;
- // Delay writing of pages
- jam();
- sendSignalWithDelay(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 1000, 2);
- return;
- }
- }
- if (clblPageCounter == 0) {
- jam();
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignalWithDelay(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 100, 2);
- return;
- } else {
- jam();
- clblPageCounter = clblPageCounter - 1;
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
- jam();
- savepagesLab(signal);
- return;
- } else {
- if (fragrecptr.p->fragState == LCP_SEND_OVER_PAGES) {
- jam();
- saveOverPagesLab(signal);
- return;
- } else {
- ndbrequire(fragrecptr.p->fragState == LCP_SEND_ZERO_PAGE);
- jam();
- saveZeroPageLab(signal);
- return;
- }//if
- }//if
-}//Dbacc::execACC_SAVE_PAGES()
-
-void Dbacc::savepagesLab(Signal* signal)
-{
- DirRangePtr spDirRangePtr;
- DirectoryarrayPtr spDirptr;
- Page8Ptr aspPageptr;
- Page8Ptr aspCopyPageptr;
- Uint32 taspDirindex;
- Uint32 taspDirIndex;
- Uint32 taspIndex;
-
- if ((fragrecptr.p->lcpDirIndex >= fragrecptr.p->dirsize) ||
- (fragrecptr.p->lcpDirIndex >= fragrecptr.p->lcpMaxDirIndex)) {
- jam();
- endsavepageLab(signal);
- return;
- }//if
- /* SOME EXPAND PROCESSES HAVE BEEN PERFORMED. */
- /* THE ADDED PAGE ARE NOT SENT TO DISK */
- arrGuard(fragrecptr.p->activeDataPage, 8);
- aspCopyPageptr.i = fragrecptr.p->datapages[fragrecptr.p->activeDataPage];
- ptrCheckGuard(aspCopyPageptr, cpagesize, page8);
- taspDirindex = fragrecptr.p->lcpDirIndex; /* DIRECTORY OF ACTIVE PAGE */
- spDirRangePtr.i = fragrecptr.p->directory;
- taspDirIndex = taspDirindex >> 8;
- taspIndex = taspDirindex & 0xff;
- ptrCheckGuard(spDirRangePtr, cdirrangesize, dirRange);
- arrGuard(taspDirIndex, 256);
- spDirptr.i = spDirRangePtr.p->dirArray[taspDirIndex];
- ptrCheckGuard(spDirptr, cdirarraysize, directoryarray);
- aspPageptr.i = spDirptr.p->pagep[taspIndex];
- ptrCheckGuard(aspPageptr, cpagesize, page8);
- ndbrequire(aspPageptr.p->word32[ZPOS_PAGE_ID] == fragrecptr.p->lcpDirIndex);
- lcnPageptr = aspPageptr;
- lcnCopyPageptr = aspCopyPageptr;
- lcpCopyPage(signal);
- fragrecptr.p->lcpDirIndex++;
- fragrecptr.p->activeDataPage++;
- if (fragrecptr.p->activeDataPage < ZWRITEPAGESIZE) {
- jam();
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
- return;
- }//if
- senddatapagesLab(signal);
- return;
-}//Dbacc::savepagesLab()
-
-/* FRAGRECPTR:ACTIVE_DATA_PAGE = ZWRITEPAGESIZE */
-/* SEND A GROUP OF PAGES TO DISK */
-void Dbacc::senddatapagesLab(Signal* signal)
-{
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_DATA;
- ndbrequire(fragrecptr.p->activeDataPage <= 8);
- for (Uint32 i = 0; i < fragrecptr.p->activeDataPage; i++) {
- signal->theData[i + 6] = fragrecptr.p->datapages[i];
- }//for
- signal->theData[fragrecptr.p->activeDataPage + 6] = fragrecptr.p->activeDataFilePage;
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- signal->theData[3] = 0x2;
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = fragrecptr.p->activeDataPage;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 15, JBA);
- return;
-}//Dbacc::senddatapagesLab()
-
-void Dbacc::endsavepageLab(Signal* signal)
-{
- Page8Ptr espPageidptr;
-
- espPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(espPageidptr, cpagesize, page8);
- dbgWord32(espPageidptr, ZPAGEZERO_NO_PAGES, fragrecptr.p->lcpDirIndex);
- espPageidptr.p->word32[ZPAGEZERO_NO_PAGES] = fragrecptr.p->lcpDirIndex;
- fragrecptr.p->fragState = LCP_SEND_OVER_PAGES;
- fragrecptr.p->noOfStoredOverPages = 0;
- fragrecptr.p->lcpDirIndex = 0;
- saveOverPagesLab(signal);
- return;
-}//Dbacc::endsavepageLab()
-
-/* ******************--------------------------------------------------------------- */
-/* ACC_SAVE_OVER_PAGES CONTINUE SAVING THE LEFT OVERPAGES. */
-/* ******************--------------------------------------------------------------- */
-void Dbacc::saveOverPagesLab(Signal* signal)
-{
- DirRangePtr sopDirRangePtr;
- DirectoryarrayPtr sopOverflowDirptr;
- Page8Ptr sopPageptr;
- Page8Ptr sopCopyPageptr;
- Uint32 tsopDirindex;
- Uint32 tsopDirInd;
- Uint32 tsopIndex;
-
- if ((fragrecptr.p->lcpDirIndex >= fragrecptr.p->lastOverIndex) ||
- (fragrecptr.p->lcpDirIndex >= fragrecptr.p->lcpMaxOverDirIndex)) {
- jam();
- endsaveoverpageLab(signal);
- return;
- }//if
- arrGuard(fragrecptr.p->activeDataPage, 8);
- sopCopyPageptr.i = fragrecptr.p->datapages[fragrecptr.p->activeDataPage];
- ptrCheckGuard(sopCopyPageptr, cpagesize, page8);
- tsopDirindex = fragrecptr.p->lcpDirIndex;
- sopDirRangePtr.i = fragrecptr.p->overflowdir;
- tsopDirInd = tsopDirindex >> 8;
- tsopIndex = tsopDirindex & 0xff;
- ptrCheckGuard(sopDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tsopDirInd, 256);
- sopOverflowDirptr.i = sopDirRangePtr.p->dirArray[tsopDirInd];
- ptrCheckGuard(sopOverflowDirptr, cdirarraysize, directoryarray);
- sopPageptr.i = sopOverflowDirptr.p->pagep[tsopIndex];
- fragrecptr.p->lcpDirIndex++;
- if (sopPageptr.i != RNIL) {
- jam();
- ptrCheckGuard(sopPageptr, cpagesize, page8);
- ndbrequire(sopPageptr.p->word32[ZPOS_PAGE_ID] == tsopDirindex);
- ndbrequire(((sopPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != ZNORMAL_PAGE_TYPE);
- lcnPageptr = sopPageptr;
- lcnCopyPageptr = sopCopyPageptr;
- lcpCopyPage(signal);
- fragrecptr.p->noOfStoredOverPages++;
- fragrecptr.p->activeDataPage++;
- if ((sopPageptr.p->word32[ZPOS_ALLOC_CONTAINERS] == 0)) {
- //ndbrequire(((sopPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == ZOVERFLOW_PAGE_TYPE);
- if (((sopPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) ==
- ZOVERFLOW_PAGE_TYPE) {
- /*--------------------------------------------------------------------------------*/
- /* THE PAGE IS EMPTY AND WAITING TO BE RELEASED. IT COULD NOT BE RELEASED */
- /* EARLIER SINCE IT WAS PART OF A LOCAL CHECKPOINT. */
- /*--------------------------------------------------------------------------------*/
- jam();
- ropPageptr = sopPageptr;
- releaseOverpage(signal);
- } else {
- jam();
- sendSystemerror(signal);
- }
- }//if
- }
- if (fragrecptr.p->activeDataPage == ZWRITEPAGESIZE) {
- jam();
- senddatapagesLab(signal);
- return;
- }//if
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
- return;
-}//Dbacc::saveOverPagesLab()
-
-void Dbacc::endsaveoverpageLab(Signal* signal)
-{
- Page8Ptr esoPageidptr;
-
- esoPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(esoPageidptr, cpagesize, page8);
- dbgWord32(esoPageidptr, ZPAGEZERO_NO_OVER_PAGE, fragrecptr.p->noOfStoredOverPages);
- esoPageidptr.p->word32[ZPAGEZERO_NO_OVER_PAGE] = fragrecptr.p->noOfStoredOverPages;
- fragrecptr.p->fragState = LCP_SEND_ZERO_PAGE;
- if (fragrecptr.p->activeDataPage != 0) {
- jam();
- senddatapagesLab(signal); /* SEND LEFT PAGES TO DISK */
- return;
- }//if
- saveZeroPageLab(signal);
- return;
-}//Dbacc::endsaveoverpageLab()
-
-/* ******************--------------------------------------------------------------- */
-/* ACC_SAVE_ZERO_PAGE PAGE ZERO IS SENT TO DISK.IT IS THE LAST STAGE AT THE */
-/* CREATION LCP. ACC_LCPCONF IS RETURND. */
-/* ******************--------------------------------------------------------------- */
-void Dbacc::saveZeroPageLab(Signal* signal)
-{
- Page8Ptr szpPageidptr;
- Uint32 Tchs;
- Uint32 Ti;
-
- fragrecptr.p->createLcp = ZFALSE;
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- szpPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(szpPageidptr, cpagesize, page8);
- dbgWord32(szpPageidptr, ZPAGEZERO_PREV_UNDOP, fragrecptr.p->prevUndoposition);
- szpPageidptr.p->word32[ZPAGEZERO_PREV_UNDOP] = fragrecptr.p->prevUndoposition;
- dbgWord32(szpPageidptr, ZPAGEZERO_NEXT_UNDO_FILE, cactiveUndoFileVersion);
- szpPageidptr.p->word32[ZPAGEZERO_NEXT_UNDO_FILE] = cactiveUndoFileVersion;
- fragrecptr.p->fragState = WAIT_ZERO_PAGE_STORED;
-
- /* --------------------------------------------------------------------------------- */
- // Calculate the checksum and store it for the zero page of the fragment.
- /* --------------------------------------------------------------------------------- */
- szpPageidptr.p->word32[ZPOS_CHECKSUM] = 0;
- Tchs = 0;
- for (Ti = 0; Ti < 2048; Ti++) {
- Tchs = Tchs ^ szpPageidptr.p->word32[Ti];
- }//for
- szpPageidptr.p->word32[ZPOS_CHECKSUM] = Tchs;
- dbgWord32(szpPageidptr, ZPOS_CHECKSUM, Tchs);
-
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_DATA;
- if (clblPageCounter > 0) {
- jam();
- clblPageCounter = clblPageCounter - 1;
- } else {
- jam();
- clblPageOver = clblPageOver + 1;
- }//if
- /* ************************ */
- /* FSWRITEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- signal->theData[3] = 0x10;
- /* FLAG = LIST MEM PAGES, LIST FILE PAGES */
- /* SYNC FILE AFTER WRITING */
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = 1;
- /* NO OF PAGES */
- signal->theData[6] = fragrecptr.p->zeroPagePtr;
- /* ZERO PAGE */
- signal->theData[7] = 0;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);
- /* ZERO PAGE AT DATA FILE */
- return;
-}//Dbacc::saveZeroPageLab()
-
-/* ******************--------------------------------------------------------------- */
-/* FSWRITECONF OPENFILE CONF */
-/* ENTER FSWRITECONF WITH SENDER: FS, LEVEL B */
-/* FS_OPPTR FS_CONNECTION PTR */
-/* ******************--------------------------------------------------------------- */
-void Dbacc::lcpCloseDataFileLab(Signal* signal)
-{
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- fsConnectptr.p->fsState = LCP_CLOSE_DATA;
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- /* CLOSE DATA FILE */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = ZFALSE;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- /* FLAG = 0, DO NOT DELETE FILE */
- return;
-}//Dbacc::lcpCloseDataFileLab()
-
-void Dbacc::checkSyncUndoPagesLab(Signal* signal)
-{
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- releaseFsConnRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- switch (lcpConnectptr.p->syncUndopageState) {
- case WAIT_NOTHING:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_ONE_CONF;
- break;
- case WAIT_ONE_CONF:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_TWO_CONF;
- break;
- default:
- jam();
- sendSystemerror(signal);
- return;
- break;
- }//switch
-
- /* ACTIVE UNDO PAGE ID */
- Uint32 tundoPageId = cundoposition >> ZUNDOPAGEINDEXBITS;
- tmp1 = tundoPageId - (tundoPageId & (ZWRITE_UNDOPAGESIZE - 1));
- /* START PAGE OF THE LAST UNDO PAGES GROUP */
- tmp2 = (tundoPageId - tmp1) + 1; /* NO OF LEFT UNDO PAGES */
- tmp1 = tmp1 & (cundopagesize - 1); /* 1 MBYTE PAGE WINDOW IN MEMORY */
- fsConnectptr.i = cactiveOpenUndoFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_UNDO;
- fsOpptr.p->fsOpMemPage = tundoPageId; /* RECORD MEMORY PAGE WRITTEN */
- if (clblPageCounter >= (4 * tmp2)) {
- jam();
- clblPageCounter = clblPageCounter - (4 * tmp2);
- } else {
- jam();
- clblPageOver = clblPageOver + ((4 * tmp2) - clblPageCounter);
- clblPageCounter = 0;
- }//if
- /* ************************ */
- /* FSWRITEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- /* FLAG = START MEM PAGES, START FILE PAGES */
- /* SYNC FILE AFTER WRITING */
- signal->theData[3] = 0x11;
- signal->theData[4] = ZUNDOPAGE_BASE_ADD;
- /* NO OF UNDO PAGES */
- signal->theData[5] = tmp2;
- /* FIRST MEMORY PAGE */
- signal->theData[6] = tmp1;
- /* ACTIVE PAGE AT UNDO FILE */
- signal->theData[7] = cactiveUndoFilePage;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);
-
- return;
-}//Dbacc::checkSyncUndoPagesLab()
-
-void Dbacc::checkSendLcpConfLab(Signal* signal)
-{
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- switch (lcpConnectptr.p->syncUndopageState) {
- case WAIT_ONE_CONF:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_NOTHING;
- break;
- case WAIT_TWO_CONF:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_ONE_CONF;
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- lcpConnectptr.p->noOfLcpConf++;
- ndbrequire(lcpConnectptr.p->noOfLcpConf <= 4);
- fragrecptr.p->fragState = ACTIVEFRAG;
- rlpPageptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(rlpPageptr, cpagesize, page8);
- releaseLcpPage(signal);
- fragrecptr.p->zeroPagePtr = RNIL;
- for (Uint32 i = 0; i < ZWRITEPAGESIZE; i++) {
- jam();
- if (fragrecptr.p->datapages[i] != RNIL) {
- jam();
- rlpPageptr.i = fragrecptr.p->datapages[i];
- ptrCheckGuard(rlpPageptr, cpagesize, page8);
- releaseLcpPage(signal);
- fragrecptr.p->datapages[i] = RNIL;
- }//if
- }//for
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB);
- if (lcpConnectptr.p->noOfLcpConf == 4) {
- jam();
- releaseLcpConnectRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- rootfragrecptr.p->rootState = ACTIVEROOT;
- }//if
-}//Dbacc::checkSendLcpConfLab()
-
-/* ******************--------------------------------------------------------------- */
-/* ACC_CONTOPREQ */
-/* SENDER: LQH, LEVEL B */
-/* ENTER ACC_CONTOPREQ WITH */
-/* LCP_CONNECTPTR */
-/* TMP1 LOCAL FRAG ID */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* ACC_CONTOPREQ COMMIT TRANSACTION */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-void Dbacc::execACC_CONTOPREQ(Signal* signal)
-{
- Uint32 tcorLocalFrag;
-
- jamEntry();
- lcpConnectptr.i = signal->theData[0];
- /* CONNECTION PTR */
- tcorLocalFrag = signal->theData[1];
- /* LOCAL FRAG ID */
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if(ERROR_INSERTED(3002) && lcpConnectptr.p->noOfLcpConf < 2)
- {
- sendSignalWithDelay(cownBlockref, GSN_ACC_CONTOPREQ, signal, 300,
- signal->getLength());
- return;
- }
-
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == tcorLocalFrag) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == tcorLocalFrag);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- }//if
- operationRecPtr.i = fragrecptr.p->firstWaitInQueOp;
- fragrecptr.p->sentWaitInQueOp = RNIL;
- fragrecptr.p->stopQueOp = ZFALSE;
- while (operationRecPtr.i != RNIL) {
- jam();
- ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- if (operationRecPtr.p->opState == WAIT_EXE_OP) {
- jam();
- //------------------------------------------------------------
- // Indicate that we are now a normal waiter in the queue. We
- // will remove the operation from the queue as part of starting
- // operation again.
- //------------------------------------------------------------
- operationRecPtr.p->opState = WAIT_IN_QUEUE;
- executeNextOperation(signal);
- }//if
- operationRecPtr.i = operationRecPtr.p->nextQueOp;
- }//while
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA);
-
- lcpConnectptr.p->noOfLcpConf++;
- if (lcpConnectptr.p->noOfLcpConf == 4) {
- jam();
- releaseLcpConnectRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- rootfragrecptr.p->rootState = ACTIVEROOT;
- }//if
- return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */
-}//Dbacc::execACC_CONTOPREQ()
-
-/* ******************--------------------------------------------------------------- */
-/* END_LCPREQ END OF LOCAL CHECK POINT */
-/* ENTER END_LCPREQ WITH SENDER: LQH, LEVEL B */
-/* CLQH_PTR, LQH PTR */
-/* CLQH_BLOCK_REF LQH BLOCK REF */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* END_LCPREQ PERFORM A LOCAL CHECK POINT */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-void Dbacc::execEND_LCPREQ(Signal* signal)
-{
- jamEntry();
- clqhPtr = signal->theData[0];
- /* LQH PTR */
- clqhBlockRef = signal->theData[1];
- /* LQH BLOCK REF */
- tresult = 0;
- fsConnectptr.i = cactiveOpenUndoFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = WAIT_CLOSE_UNDO; /* CLOSE FILE AFTER WRITTING */
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = ZFALSE;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- /* FLAG = 0, DO NOT DELETE FILE */
- cactiveUndoFileVersion = RNIL;
- cactiveOpenUndoFsPtr = RNIL;
- /* ************************ */
- /* END_LCPCONF */
- /* ************************ */
- signal->theData[0] = clqhPtr;
- sendSignal(clqhBlockRef, GSN_END_LCPCONF, signal, 1, JBB);
- return;
-}//Dbacc::execEND_LCPREQ()
-
-/*-----------------------------------------------------------------*/
-/* WHEN WE COPY THE PAGE WE ALSO WRITE THE ELEMENT HEADER AS */
-/* UNLOCKED IF THEY ARE CURRENTLY LOCKED. */
-/*-----------------------------------------------------------------*/
-void Dbacc::lcpCopyPage(Signal* signal)
-{
- Uint32 tlcnNextContainer;
- Uint32 tlcnTmp;
- Uint32 tlcnConIndex;
- Uint32 tlcnIndex;
- Uint32 Tmp1;
- Uint32 Tmp2;
- Uint32 Tmp3;
- Uint32 Tmp4;
- Uint32 Ti;
- Uint32 Tchs;
- Uint32 Tlimit;
-
- Tchs = 0;
- lupPageptr.p = lcnCopyPageptr.p;
- lcnPageptr.p->word32[ZPOS_CHECKSUM] = Tchs;
- for (Ti = 0; Ti < 32 ; Ti++) {
- Tlimit = 16 + (Ti << 6);
- for (tlcnTmp = (Ti << 6); tlcnTmp < Tlimit; tlcnTmp ++) {
- Tmp1 = lcnPageptr.p->word32[tlcnTmp];
- Tmp2 = lcnPageptr.p->word32[tlcnTmp + 16];
- Tmp3 = lcnPageptr.p->word32[tlcnTmp + 32];
- Tmp4 = lcnPageptr.p->word32[tlcnTmp + 48];
-
- lcnCopyPageptr.p->word32[tlcnTmp] = Tmp1;
- lcnCopyPageptr.p->word32[tlcnTmp + 16] = Tmp2;
- lcnCopyPageptr.p->word32[tlcnTmp + 32] = Tmp3;
- lcnCopyPageptr.p->word32[tlcnTmp + 48] = Tmp4;
-
- Tchs = Tchs ^ Tmp1;
- Tchs = Tchs ^ Tmp2;
- Tchs = Tchs ^ Tmp3;
- Tchs = Tchs ^ Tmp4;
- }//for
- }//for
- tlcnChecksum = Tchs;
- if (((lcnCopyPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == ZNORMAL_PAGE_TYPE) {
- jam();
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL 64 BUFFERS ADDRESSED BY ALGORITHM IN */
- /* FIRST PAGE. IF THEY ARE EMPTY THEY STILL HAVE A CONTAINER */
- /* HEADER OF 2 WORDS. */
- /*-----------------------------------------------------------------*/
- tlcnConIndex = ZHEAD_SIZE;
- tlupForward = 1;
- for (tlcnIndex = 0; tlcnIndex <= ZNO_CONTAINERS - 1; tlcnIndex++) {
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- lcpUpdatePage(signal);
- tlcnConIndex = tlcnConIndex + ZBUF_SIZE;
- }//for
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE LEFT SIDE. */
- /*-----------------------------------------------------------------*/
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 23) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ZHEAD_SIZE;
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- tlupForward = 1;
- lcpUpdatePage(signal);
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- sendSystemerror(signal);
- return;
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE RIGHT SIDE. */
- /*-----------------------------------------------------------------*/
- tlupForward = cminusOne;
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 16) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex - 1;
- lcpUpdatePage(signal);
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- sendSystemerror(signal);
- return;
- }//if
- lcnCopyPageptr.p->word32[ZPOS_CHECKSUM] = tlcnChecksum;
-}//Dbacc::lcpCopyPage()
-
-/* --------------------------------------------------------------------------------- */
-/* THIS SUBROUTINE GOES THROUGH ONE CONTAINER TO CHECK FOR LOCKED ELEMENTS AND */
-/* UPDATING THEM TO ENSURE ALL ELEMENTS ARE UNLOCKED ON DISK. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::lcpUpdatePage(Signal* signal)
-{
- OperationrecPtr lupOperationRecPtr;
- Uint32 tlupElemHead;
- Uint32 tlupElemLen;
- Uint32 tlupElemStep;
- Uint32 tlupConLen;
-
- tlupConLen = lupPageptr.p->word32[tlupIndex] >> 26;
- tlupElemLen = fragrecptr.p->elementLength;
- tlupElemStep = tlupForward * tlupElemLen;
- while (tlupConLen > ZCON_HEAD_SIZE) {
- jam();
- tlupElemHead = lupPageptr.p->word32[tlupElemIndex];
- if (ElementHeader::getLocked(tlupElemHead)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WHEN CHANGING THE ELEMENT HEADER WE ALSO HAVE TO UPDATE THE CHECKSUM. IN */
- /* DOING THIS WE USE THE FORMULA (A XOR B) XOR B = A WHICH MEANS THAT IF WE */
- /* XOR SOMETHING TWICE WITH THE SAME OPERAND THEN WE RETURN TO THE ORIGINAL */
- /* VALUE. THEN WE ALSO HAVE TO USE THE NEW ELEMENT HEADER IN THE CHECKSUM */
- /* CALCULATION. */
- /* --------------------------------------------------------------------------------- */
- tlcnChecksum = tlcnChecksum ^ tlupElemHead;
- lupOperationRecPtr.i = ElementHeader::getOpPtrI(tlupElemHead);
- ptrCheckGuard(lupOperationRecPtr, coprecsize, operationrec);
- const Uint32 hv = lupOperationRecPtr.p->hashvaluePart;
- tlupElemHead = ElementHeader::setUnlocked(hv , 0);
- arrGuard(tlupElemIndex, 2048);
- lupPageptr.p->word32[tlupElemIndex] = tlupElemHead;
- tlcnChecksum = tlcnChecksum ^ tlupElemHead;
- }//if
- tlupConLen = tlupConLen - tlupElemLen;
- tlupElemIndex = tlupElemIndex + tlupElemStep;
- }//while
- if (tlupConLen < ZCON_HEAD_SIZE) {
- jam();
- sendSystemerror(signal);
- }//if
-}//Dbacc::lcpUpdatePage()
-
-/*-----------------------------------------------------------------*/
-// At a system restart we check that the page do not contain any
-// locks that hinder the system restart procedure.
-/*-----------------------------------------------------------------*/
-void Dbacc::srCheckPage(Signal* signal)
-{
- Uint32 tlcnNextContainer;
- Uint32 tlcnConIndex;
- Uint32 tlcnIndex;
-
- lupPageptr.p = lcnCopyPageptr.p;
- if (((lcnCopyPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == ZNORMAL_PAGE_TYPE) {
- jam();
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL 64 BUFFERS ADDRESSED BY ALGORITHM IN */
- /* FIRST PAGE. IF THEY ARE EMPTY THEY STILL HAVE A CONTAINER */
- /* HEADER OF 2 WORDS. */
- /*-----------------------------------------------------------------*/
- tlcnConIndex = ZHEAD_SIZE;
- tlupForward = 1;
- for (tlcnIndex = 0; tlcnIndex <= ZNO_CONTAINERS - 1; tlcnIndex++) {
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- srCheckContainer(signal);
- if (tresult != 0) {
- jam();
- return;
- }//if
- tlcnConIndex = tlcnConIndex + ZBUF_SIZE;
- }//for
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE LEFT SIDE. */
- /*-----------------------------------------------------------------*/
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 23) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ZHEAD_SIZE;
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- tlupForward = 1;
- srCheckContainer(signal);
- if (tresult != 0) {
- jam();
- return;
- }//if
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- tresult = 4;
- return;
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE RIGHT SIDE. */
- /*-----------------------------------------------------------------*/
- tlupForward = cminusOne;
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 16) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex - 1;
- srCheckContainer(signal);
- if (tresult != 0) {
- jam();
- return;
- }//if
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- tresult = 4;
- return;
- }//if
-}//Dbacc::srCheckPage()
-
-/* --------------------------------------------------------------------------------- */
-/* THIS SUBROUTINE GOES THROUGH ONE CONTAINER TO CHECK FOR LOCKED ELEMENTS AND */
-/* UPDATING THEM TO ENSURE ALL ELEMENTS ARE UNLOCKED ON DISK. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::srCheckContainer(Signal* signal)
-{
- Uint32 tlupElemLen;
- Uint32 tlupElemStep;
- Uint32 tlupConLen;
-
- tlupConLen = lupPageptr.p->word32[tlupIndex] >> 26;
- tlupElemLen = fragrecptr.p->elementLength;
- tlupElemStep = tlupForward * tlupElemLen;
- while (tlupConLen > ZCON_HEAD_SIZE) {
- jam();
- const Uint32 tlupElemHead = lupPageptr.p->word32[tlupElemIndex];
- if (ElementHeader::getLocked(tlupElemHead)){
- jam();
- //-------------------------------------------------------
- // This is absolutely undesirable. We have a lock remaining
- // after the system restart. We send a crash signal that will
- // enter the trace file.
- //-------------------------------------------------------
- tresult = 2;
- return;
- }//if
- tlupConLen = tlupConLen - tlupElemLen;
- tlupElemIndex = tlupElemIndex + tlupElemStep;
- }//while
- if (tlupConLen < ZCON_HEAD_SIZE) {
- jam();
- tresult = 3;
- }//if
- return;
-}//Dbacc::srCheckContainer()
-
-/* ------------------------------------------------------------------------- */
-/* CHECK_UNDO_PAGES */
-/* DESCRIPTION: CHECKS WHEN A PAGE OR A GROUP OF UNDO PAGES IS FILLED.WHEN */
-/* A PAGE IS FILLED, CUNDOPOSITION WILL BE UPDATE, THE NEW */
-/* POSITION IS THE BEGNING OF THE NEXT UNDO PAGE. */
-/* IN CASE THAT A GROUP IS FILLED THE PAGES ARE SENT TO DISK, */
-/* AND A NEW GROUP IS CHOSEN. */
-/* ------------------------------------------------------------------------- */
-void Dbacc::checkUndoPages(Signal* signal)
-{
-
- fragrecptr.p->prevUndoposition = cundoposition;
- cprevUndoaddress = cundoposition;
-
- // Calculate active undo page id
- Uint32 tundoPageId = cundoposition >> ZUNDOPAGEINDEXBITS;
-
- /**
- * WE WILL WRITE UNTIL WE HAVE ABOUT 8 KBYTE REMAINING ON THE 32 KBYTE
- * PAGE. THIS IS TO ENSURE THAT WE DO NOT HAVE ANY UNDO LOG RECORDS THAT PASS
- * A PAGE BOUNDARIE. THIS SIMPLIFIES CODING TRADING SOME INEFFICIENCY.
- */
- static const Uint32 ZMAXUNDOPAGEINDEX = 7100;
- if (tundoindex < ZMAXUNDOPAGEINDEX) {
- jam();
- cundoposition = (tundoPageId << ZUNDOPAGEINDEXBITS) + tundoindex;
- return;
- }//if
-
- /**
- * WE CHECK IF MORE THAN 1 MBYTE OF WRITES ARE OUTSTANDING TO THE UNDO FILE.
- * IF SO WE HAVE TO CRASH SINCE WE HAVE NO MORE SPACE TO WRITE UNDO LOG
- * RECORDS IN
- */
- Uint16 nextUndoPageId = tundoPageId + 1;
- updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS);
-
- if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) {
- jam();
- /* ---------- SEND A GROUP OF UNDO PAGES TO DISK --------- */
- fsConnectptr.i = cactiveOpenUndoFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- Uint32 tcupTmp1 = (tundoPageId - ZWRITE_UNDOPAGESIZE) + 1;
- tcupTmp1 = tcupTmp1 & (cundopagesize - 1); /* 1 MBYTE PAGE WINDOW */
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_UNDO_EXIT;
- fsOpptr.p->fsOpMemPage = tundoPageId;
- fragrecptr.p->nrWaitWriteUndoExit++;
- if (clblPageCounter >= 8) {
- jam();
- clblPageCounter = clblPageCounter - 8;
- } else {
- jam();
- clblPageOver = clblPageOver + (8 - clblPageCounter);
- clblPageCounter = 0;
- }//if
- /* ************************ */
- /* FSWRITEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- signal->theData[3] = 0x1;
- /* FLAG = START MEM PAGES, START FILE PAGES */
- signal->theData[4] = ZUNDOPAGE_BASE_ADD;
- signal->theData[5] = ZWRITE_UNDOPAGESIZE;
- signal->theData[6] = tcupTmp1;
- signal->theData[7] = cactiveUndoFilePage;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);
- cactiveUndoFilePage = cactiveUndoFilePage + ZWRITE_UNDOPAGESIZE;
- }//if
-}//Dbacc::checkUndoPages()
-
-/* --------------------------------------------------------------------------------- */
-/* UNDO_WRITING_PROCESS */
-/* INPUT: FRAGRECPTR, CUNDO_ELEM_INDEX, DATAPAGEPTR, CUNDOINFOLENGTH */
-/* DESCRIPTION: WHEN THE PROCESS OF CREATION LOCAL CHECK POINT HAS */
-/* STARTED. IF THE ACTIVE PAGE IS NOT ALREADY SENT TO DISK, THE */
-/* OLD VALUE OF THE ITEM WHICH IS GOING TO BE CHECKED IS STORED ON */
-/* THE ACTIVE UNDO PAGE. INFORMATION ABOUT UNDO PROCESS IN THE */
-/* BLOCK AND IN THE FRAGMENT WILL BE UPDATED. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::undoWritingProcess(Signal* signal)
-{
- const Uint32 tactivePageDir = datapageptr.p->word32[ZPOS_PAGE_ID];
- const Uint32 tpageType = (datapageptr.p->word32[ZPOS_EMPTY_LIST] >> ZPOS_PAGE_TYPE_BIT) & 3;
- if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
- if (tpageType == ZNORMAL_PAGE_TYPE) {
- /* --------------------------------------------------------------------------- */
- /* HANDLING OF LOG OF NORMAL PAGES DURING WRITE OF NORMAL PAGES. */
- /* --------------------------------------------------------------------------- */
- if (tactivePageDir < fragrecptr.p->lcpDirIndex) {
- jam();
- /* ------------------------------------------------------------------- */
- /* THIS PAGE HAS ALREADY BEEN WRITTEN IN THE LOCAL CHECKPOINT. */
- /* ------------------------------------------------------------------- */
- /*empty*/;
- } else {
- if (tactivePageDir >= fragrecptr.p->lcpMaxDirIndex) {
- jam();
- /* --------------------------------------------------------------------------- */
- /* OBVIOUSLY THE FRAGMENT HAS EXPANDED SINCE THE START OF THE LOCAL CHECKPOINT.*/
- /* WE NEED NOT LOG ANY UPDATES OF PAGES THAT DID NOT EXIST AT START OF LCP. */
- /* --------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- jam();
- /* --------------------------------------------------------------------------- */
- /* IN ALL OTHER CASES WE HAVE TO WRITE TO THE UNDO LOG. */
- /* --------------------------------------------------------------------------- */
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZPAGE_INFO);
- tundoElemIndex = cundoElemIndex;
- writeUndoDataInfo(signal);
- checkUndoPages(signal);
- }//if
- }//if
- } else if (tpageType == ZOVERFLOW_PAGE_TYPE) {
- /* --------------------------------------------------------------------------------- */
- /* OVERFLOW PAGE HANDLING DURING WRITE OF NORMAL PAGES. */
- /* --------------------------------------------------------------------------------- */
- if (tactivePageDir >= fragrecptr.p->lcpMaxOverDirIndex) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* OBVIOUSLY THE FRAGMENT HAS EXPANDED THE NUMBER OF OVERFLOW PAGES SINCE THE */
- /* START OF THE LOCAL CHECKPOINT. WE NEED NOT LOG ANY UPDATES OF PAGES THAT DID*/
- /* NOT EXIST AT START OF LCP. */
- /* --------------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- jam();
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZOVER_PAGE_INFO);
- tundoElemIndex = cundoElemIndex;
- writeUndoDataInfo(signal);
- checkUndoPages(signal);
- }//if
- } else {
- jam();
- /* --------------------------------------------------------------------------- */
- /* ONLY PAGE INFO AND OVERFLOW PAGE INFO CAN BE LOGGED BY THIS ROUTINE. A */
- /* SERIOUS ERROR. */
- /* --------------------------------------------------------------------------- */
- sendSystemerror(signal);
- }
- } else {
- if (fragrecptr.p->fragState == LCP_SEND_OVER_PAGES) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* DURING WRITE OF OVERFLOW PAGES WE NEED NOT WORRY ANYMORE ABOUT NORMAL PAGES.*/
- /* --------------------------------------------------------------------------------- */
- if (tpageType == ZOVERFLOW_PAGE_TYPE) {
- if (tactivePageDir < fragrecptr.p->lcpDirIndex) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THIS PAGE HAS ALREADY BEEN WRITTEN IN THE LOCAL CHECKPOINT. */
- /* --------------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- if (tactivePageDir >= fragrecptr.p->lcpMaxOverDirIndex) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* OBVIOUSLY THE FRAGMENT HAS EXPANDED THE NUMBER OF OVERFLOW PAGES SINCE THE */
- /* START OF THE LOCAL CHECKPOINT. WE NEED NOT LOG ANY UPDATES OF PAGES THAT DID*/
- /* NOT EXIST AT START OF LCP. */
- /* --------------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- jam();
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZOVER_PAGE_INFO);
- tundoElemIndex = cundoElemIndex;
- writeUndoDataInfo(signal);
- checkUndoPages(signal);
- }//if
- }//if
- }
- }//if
- }//if
-}//Dbacc::undoWritingProcess()
-
-/* --------------------------------------------------------------------------------- */
-/* OTHER STATES MEANS THAT WE HAVE ALREADY WRITTEN ALL PAGES BUT NOT YET RESET */
-/* THE CREATE_LCP FLAG. */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* WRITE_UNDO_DATA_INFO */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::writeUndoDataInfo(Signal* signal)
-{
- Uint32 twudiIndex;
- Uint32 guard22;
-
- guard22 = cundoinfolength;
- arrGuard((tundoindex + guard22 - 1), 8192);
- arrGuard((tundoElemIndex + guard22 - 1), 2048);
- for (twudiIndex = 1; twudiIndex <= guard22; twudiIndex++) {
- undopageptr.p->undoword[tundoindex] = datapageptr.p->word32[tundoElemIndex];
- tundoindex++;
- tundoElemIndex++;
- }//for
-}//Dbacc::writeUndoDataInfo()
-
-/* --------------------------------------------------------------------------------- */
-/* WRITE_UNDO_HEADER */
-/* THE HEAD OF UNDO ELEMENT IS 24 BYTES AND CONTAINS THE FOLLOWING INFORMATION: */
-/* TABLE IDENTITY 32 BITS */
-/* ROOT FRAGMENT IDENTITY 32 BITS */
-/* LOCAL FRAGMENT IDENTITY 32 BITS */
-/* LENGTH OF ELEMENT INF0 (BIT 31 - 18) 14 BITS */
-/* INFO TYPE (BIT 17 - 14) 4 BITS */
-/* PAGE INDEX OF THE FIRST FIELD IN THE FRAGMENT (BIT 13 - 0) 14 BITS */
-/* DIRECTORY INDEX OF THE PAGE IN THE FRAGMENT 32 BITS */
-/* ADDRESS OF THE PREVIOUS ELEMENT OF THE FRAGMENT 64 BITS */
-/* ADDRESS OF THE PREVIOUS ELEMENT IN THE UNDO PAGES 64 BITS */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::writeUndoHeader(Signal* signal,
- Uint32 logicalPageId,
- UndoHeader::UndoHeaderType pageType)
-{
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- arrGuard(theadundoindex + 6, 8192);
-
- // Set the structpointer to point at the undo page at the right address.
- UndoHeader * const & undoHeaderPtr =
- (UndoHeader *) &undopageptr.p->undoword[theadundoindex];
-
- undoHeaderPtr->tableId = rootfragrecptr.p->mytabptr;
- undoHeaderPtr->rootFragId = rootfragrecptr.p->fragmentid[0] >> 1;
- undoHeaderPtr->localFragId = fragrecptr.p->myfid;
- ndbrequire((undoHeaderPtr->localFragId >> 1) == undoHeaderPtr->rootFragId);
- Uint32 Ttmp = cundoinfolength;
- Ttmp = (Ttmp << 4) + pageType;
- Ttmp = Ttmp << 14;
- undoHeaderPtr->variousInfo = Ttmp + cundoElemIndex;
- undoHeaderPtr->logicalPageId = logicalPageId;
- undoHeaderPtr->prevUndoAddressForThisFrag = fragrecptr.p->prevUndoposition;
- undoHeaderPtr->prevUndoAddress = cprevUndoaddress;
-}//Dbacc::writeUndoHeader()
-
-/* --------------------------------------------------------------------------------- */
-/* WRITE_UNDO_OP_INFO */
-/* FOR A LOCKED ELEMENT, OPERATION TYPE, UNDO OF ELEMENT HEADER AND THE LENGTH OF*/
-/* THE TUPLE KEY HAVE TO BE SAVED IN UNDO PAGES. IN THIS CASE AN UNDO ELEMENT */
-/* INCLUDES THE FLLOWING ITEMS. */
-/* OPERATION TYPE 32 BITS */
-/* HASH VALUE 32 BITS */
-/* LENGTH OF THE TUPLE = N 32 BITS */
-/* TUPLE KEYS N * 32 BITS */
-/* */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::writeUndoOpInfo(Signal* signal)
-{
- Page8Ptr locPageptr;
-
- arrGuard((tundoindex + 3), 8192);
- undopageptr.p->undoword[tundoindex] = operationRecPtr.p->operation;
- undopageptr.p->undoword[tundoindex + 1] = operationRecPtr.p->hashValue;
- undopageptr.p->undoword[tundoindex + 2] = operationRecPtr.p->tupkeylen;
- tundoindex = tundoindex + 3;
- // log localkey1
- locPageptr.i = operationRecPtr.p->elementPage;
- ptrCheckGuard(locPageptr, cpagesize, page8);
- Uint32 Tforward = operationRecPtr.p->elementIsforward;
- Uint32 TelemPtr = operationRecPtr.p->elementPointer;
- TelemPtr += Tforward; // ZELEM_HEAD_SIZE
- arrGuard(tundoindex+1, 8192);
- undopageptr.p->undoword[tundoindex] = locPageptr.p->word32[TelemPtr];
- tundoindex++;
- cundoinfolength = ZOP_HEAD_INFO_LN + 1;
-}//Dbacc::writeUndoOpInfo()
-
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* END OF LOCAL CHECKPOINT MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* SYSTEM RESTART MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* SR_FRAGIDREQ REQUEST FOR RESTART OF A FRAGMENT */
-/* SENDER: LQH, LEVEL B */
-/* ENTER SR_FRAGIDREQ WITH */
-/* TUSERPTR, LQH CONNECTION PTR */
-/* TUSERBLOCKREF, LQH BLOCK REFERENCE */
-/* TCHECKPOINTID, THE CHECKPOINT NUMBER TO USE */
-/* (E.G. 1,2 OR 3) */
-/* TABPTR, TABLE ID = TABLE RECORD POINTER */
-/* TFID, ROOT FRAGMENT ID */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* SR_FRAGIDREQ REQUEST FOR LIST OF STOPED OPERATION */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-void Dbacc::execSR_FRAGIDREQ(Signal* signal)
-{
- jamEntry();
- tuserptr = signal->theData[0]; /* LQH CONNECTION PTR */
- tuserblockref = signal->theData[1]; /* LQH BLOCK REFERENCE */
- tcheckpointid = signal->theData[2]; /* THE CHECKPOINT NUMBER TO USE */
- /* (E.G. 1,2 OR 3) */
- tabptr.i = signal->theData[3];
- ptrCheckGuard(tabptr, ctablesize, tabrec);
- /* TABLE ID = TABLE RECORD POINTER */
- tfid = signal->theData[4]; /* ROOT FRAGMENT ID */
- tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
- seizeLcpConnectRec(signal);
- initLcpConnRec(signal);
-
- ndbrequire(getrootfragmentrec(signal, rootfragrecptr, tfid));
- rootfragrecptr.p->lcpPtr = lcpConnectptr.i;
- lcpConnectptr.p->rootrecptr = rootfragrecptr.i;
- lcpConnectptr.p->localCheckPid = tcheckpointid;
- for (Uint32 i = 0; i < 2; i++) {
- Page8Ptr zeroPagePtr;
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[i];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- seizeLcpPage(zeroPagePtr);
- fragrecptr.p->zeroPagePtr = zeroPagePtr.i;
- }//for
-
- /* ---------------------------OPEN THE DATA FILE WHICH BELONGS TO TFID AND TCHECK POINT ---- */
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- tfid = rootfragrecptr.p->fragmentid[0];
- tmp = 0;
- srOpenDataFileLoopLab(signal);
-
- return;
-}//Dbacc::execSR_FRAGIDREQ()
-
-void Dbacc::srOpenDataFileLoopLab(Signal* signal)
-{
- /* D6 AT FSOPENREQ. FILE TYPE = .DATA */
- tmp1 = 0x010003ff; /* VERSION OF FILENAME = 1 */
- tmp2 = 0x0; /* D7 DON'T CREATE, READ ONLY */
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
-
- fragrecptr.p->fsConnPtr = fsConnectptr.i;
- fsConnectptr.p->fragrecPtr = fragrecptr.i;
- fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_READ;
- fsConnectptr.p->activeFragId = tmp; /* LOCAL FRAG INDEX */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = rootfragrecptr.p->mytabptr; /* TABLE IDENTITY */
- signal->theData[3] = tfid; /* FRAGMENT IDENTITY */
- signal->theData[4] = lcpConnectptr.p->localCheckPid; /* CHECKPOINT ID */
- signal->theData[5] = tmp1;
- signal->theData[6] = tmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
-}//Dbacc::srOpenDataFileLoopLab()
-
-void Dbacc::srFsOpenConfLab(Signal* signal)
-{
- fsConnectptr.p->fsState = WAIT_READ_PAGE_ZERO;
- /* ------------------------ READ ZERO PAGE ---------- */
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 0x0;
- /* FLAG = LIST MEM PAGES, LIST FILE PAGES */
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = 1; /* NO OF PAGES */
- signal->theData[6] = fragrecptr.p->zeroPagePtr; /* ZERO PAGE */
- signal->theData[7] = 0; /* PAGE ZERO OF THE DATA FILE */
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 8, JBA);
- return;
-}//Dbacc::srFsOpenConfLab()
-
-void Dbacc::srReadPageZeroLab(Signal* signal)
-{
- Page8Ptr srzPageptr;
-
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- fragrecptr.p->activeDataFilePage = 1;
- srzPageptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(srzPageptr, cpagesize, page8);
- /* --------------------------------------------------------------------------------- */
- // Check that the checksum of the zero page is ok.
- /* --------------------------------------------------------------------------------- */
- ccoPageptr.p = srzPageptr.p;
- checksumControl(signal, (Uint32)0);
- if (tresult > 0) {
- jam();
- return; // We will crash through a DEBUG_SIG
- }//if
-
- ndbrequire(srzPageptr.p->word32[ZPAGEZERO_FRAGID0] == rootfragrecptr.p->fragmentid[0]);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if (fsConnectptr.p->activeFragId == 0) {
- jam();
- rootfragrecptr.p->fragmentid[1] = srzPageptr.p->word32[ZPAGEZERO_FRAGID1];
- /* ---------------------------OPEN THE DATA FILE FOR NEXT LOCAL FRAGMENT ----------- ---- */
- tfid = rootfragrecptr.p->fragmentid[1];
- tmp = 1; /* LOCAL FRAG INDEX */
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- srOpenDataFileLoopLab(signal);
- return;
- } else {
- jam();
- lcpConnectptr.p->lcpstate = LCP_ACTIVE;
- signal->theData[0] = lcpConnectptr.p->lcpUserptr;
- signal->theData[1] = lcpConnectptr.i;
- signal->theData[2] = 2; /* NO OF LOCAL FRAGMENTS */
- signal->theData[3] = srzPageptr.p->word32[ZPAGEZERO_FRAGID0];
- /* ROOTFRAGRECPTR:FRAGMENTID(0) */
- signal->theData[4] = srzPageptr.p->word32[ZPAGEZERO_FRAGID1];
- /* ROOTFRAGRECPTR:FRAGMENTID(1) */
- signal->theData[5] = RNIL;
- signal->theData[6] = RNIL;
- signal->theData[7] = rootfragrecptr.p->fragmentptr[0];
- signal->theData[8] = rootfragrecptr.p->fragmentptr[1];
- signal->theData[9] = srzPageptr.p->word32[ZPAGEZERO_HASH_CHECK];
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_SR_FRAGIDCONF, signal, 10, JBB);
- }//if
- return;
-}//Dbacc::srReadPageZeroLab()
-
void Dbacc::initFragAdd(Signal* signal,
- Uint32 rootFragIndex,
- Uint32 rootIndex,
FragmentrecPtr regFragPtr)
{
const AccFragReq * const req = (AccFragReq*)&signal->theData[0];
@@ -7990,8 +5445,7 @@
}//if
regFragPtr.p->fragState = ACTIVEFRAG;
// NOTE: next line must match calculation in Dblqh::execLQHFRAGREQ
- regFragPtr.p->myfid = (req->fragId << 1) | rootFragIndex;
- regFragPtr.p->myroot = rootIndex;
+ regFragPtr.p->myfid = req->fragId;
regFragPtr.p->myTableId = req->tableId;
ndbrequire(req->kValue == 6);
regFragPtr.p->k = req->kValue; /* TK_SIZE = 6 IN THIS VERSION */
@@ -8016,7 +5470,6 @@
regFragPtr.p->nodetype = (req->reqInfo >> 4) & 0x3;
regFragPtr.p->lastOverIndex = 0;
regFragPtr.p->dirsize = 1;
- regFragPtr.p->loadingFlag = ZFALSE;
regFragPtr.p->keyLength = req->keyLength;
ndbrequire(req->keyLength != 0);
regFragPtr.p->elementLength = ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen;
@@ -8024,66 +5477,34 @@
Uint32 Tmp2 = regFragPtr.p->maxloadfactor - regFragPtr.p->minloadfactor;
Tmp2 = Tmp1 * Tmp2;
regFragPtr.p->slackCheck = Tmp2;
+ regFragPtr.p->mytabptr = req->tableId;
+ regFragPtr.p->roothashcheck = req->kValue + req->lhFragBits;
+ regFragPtr.p->noOfElements = 0;
+ for (Uint32 i = 0; i < MAX_PARALLEL_SCANS_PER_FRAG; i++) {
+ regFragPtr.p->scan[i] = RNIL;
+ }//for
+
}//Dbacc::initFragAdd()
void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)
{
regFragPtr.p->directory = RNIL;
regFragPtr.p->overflowdir = RNIL;
- regFragPtr.p->fsConnPtr = RNIL;
regFragPtr.p->firstOverflowRec = RNIL;
regFragPtr.p->lastOverflowRec = RNIL;
- regFragPtr.p->firstWaitInQueOp = RNIL;
- regFragPtr.p->lastWaitInQueOp = RNIL;
- regFragPtr.p->sentWaitInQueOp = RNIL;
regFragPtr.p->lockOwnersList = RNIL;
regFragPtr.p->firstFreeDirindexRec = RNIL;
regFragPtr.p->zeroPagePtr = RNIL;
regFragPtr.p->activeDataPage = 0;
- regFragPtr.p->createLcp = ZFALSE;
- regFragPtr.p->stopQueOp = ZFALSE;
regFragPtr.p->hasCharAttr = ZFALSE;
regFragPtr.p->nextAllocPage = 0;
- regFragPtr.p->nrWaitWriteUndoExit = 0;
- regFragPtr.p->lastUndoIsStored = ZFALSE;
- regFragPtr.p->loadingFlag = ZFALSE;
regFragPtr.p->fragState = FREEFRAG;
for (Uint32 i = 0; i < ZWRITEPAGESIZE; i++) {
regFragPtr.p->datapages[i] = RNIL;
}//for
- for (Uint32 j = 0; j < 4; j++) {
- regFragPtr.p->longKeyPageArray[j] = RNIL;
- }//for
}//Dbacc::initFragGeneral()
-void Dbacc::initFragSr(FragmentrecPtr regFragPtr, Page8Ptr regPagePtr)
-{
- regFragPtr.p->prevUndoposition = regPagePtr.p->word32[ZPAGEZERO_PREV_UNDOP];
- regFragPtr.p->noOfStoredOverPages = regPagePtr.p->word32[ZPAGEZERO_NO_OVER_PAGE];
- regFragPtr.p->noStoredPages = regPagePtr.p->word32[ZPAGEZERO_NO_PAGES];
- regFragPtr.p->dirsize = regPagePtr.p->word32[ZPAGEZERO_DIRSIZE];
- regFragPtr.p->expandCounter = regPagePtr.p->word32[ZPAGEZERO_EXPCOUNTER];
- regFragPtr.p->slack = regPagePtr.p->word32[ZPAGEZERO_SLACK];
- regFragPtr.p->hashcheckbit = regPagePtr.p->word32[ZPAGEZERO_HASHCHECKBIT];
- regFragPtr.p->k = regPagePtr.p->word32[ZPAGEZERO_K];
- regFragPtr.p->lhfragbits = regPagePtr.p->word32[ZPAGEZERO_LHFRAGBITS];
- regFragPtr.p->lhdirbits = regPagePtr.p->word32[ZPAGEZERO_LHDIRBITS];
- regFragPtr.p->localkeylen = regPagePtr.p->word32[ZPAGEZERO_LOCALKEYLEN];
- regFragPtr.p->maxp = regPagePtr.p->word32[ZPAGEZERO_MAXP];
- regFragPtr.p->maxloadfactor = regPagePtr.p->word32[ZPAGEZERO_MAXLOADFACTOR];
- regFragPtr.p->minloadfactor = regPagePtr.p->word32[ZPAGEZERO_MINLOADFACTOR];
- regFragPtr.p->myfid = regPagePtr.p->word32[ZPAGEZERO_MYFID];
- regFragPtr.p->lastOverIndex = regPagePtr.p->word32[ZPAGEZERO_LAST_OVER_INDEX];
- regFragPtr.p->nodetype = regPagePtr.p->word32[ZPAGEZERO_NODETYPE];
- regFragPtr.p->p = regPagePtr.p->word32[ZPAGEZERO_P];
- regFragPtr.p->elementLength = regPagePtr.p->word32[ZPAGEZERO_ELEMENT_LENGTH];
- regFragPtr.p->keyLength = regPagePtr.p->word32[ZPAGEZERO_KEY_LENGTH];
- regFragPtr.p->slackCheck = regPagePtr.p->word32[ZPAGEZERO_SLACK_CHECK];
-
- regFragPtr.p->loadingFlag = ZTRUE;
-
-}//Dbacc::initFragSr()
void Dbacc::initFragPageZero(FragmentrecPtr regFragPtr, Page8Ptr regPagePtr)
{
@@ -8111,104 +5532,15 @@
regPagePtr.p->word32[ZPAGEZERO_SLACK_CHECK] = regFragPtr.p->slackCheck;
}//Dbacc::initFragPageZero()
-void Dbacc::initRootFragPageZero(RootfragmentrecPtr rootPtr, Page8Ptr regPagePtr)
+void Dbacc::initRootFragPageZero(FragmentrecPtr rootPtr, Page8Ptr regPagePtr)
{
regPagePtr.p->word32[ZPAGEZERO_TABID] = rootPtr.p->mytabptr;
- regPagePtr.p->word32[ZPAGEZERO_FRAGID0] = rootPtr.p->fragmentid[0];
- regPagePtr.p->word32[ZPAGEZERO_FRAGID1] = rootPtr.p->fragmentid[1];
+ regPagePtr.p->word32[ZPAGEZERO_FRAGID0] = rootPtr.p->fragmentid;
+ regPagePtr.p->word32[ZPAGEZERO_FRAGID1] = RNIL;
regPagePtr.p->word32[ZPAGEZERO_HASH_CHECK] = rootPtr.p->roothashcheck;
regPagePtr.p->word32[ZPAGEZERO_NO_OF_ELEMENTS] = rootPtr.p->noOfElements;
}//Dbacc::initRootFragPageZero()
-void Dbacc::initRootFragSr(RootfragmentrecPtr rootPtr, Page8Ptr regPagePtr)
-{
- rootPtr.p->roothashcheck = regPagePtr.p->word32[ZPAGEZERO_HASH_CHECK];
- rootPtr.p->noOfElements = regPagePtr.p->word32[ZPAGEZERO_NO_OF_ELEMENTS];
-}//Dbacc::initRootFragSr()
-
-/* ******************--------------------------------------------------------------- */
-/* ACC_SRREQ SYSTEM RESTART OF A LOCAL CHECK POINT */
-/* SENDER: LQH, LEVEL B */
-/* ENTER ACC_SRREQ WITH */
-/* LCP_CONNECTPTR, OPERATION RECORD PTR */
-/* TMP2, LQH'S LOCAL FRAG CHECK VALUE */
-/* TFID, LOCAL FRAG ID */
-/* TMP1, LOCAL CHECKPOINT ID */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* ACC_SRREQ PERFORM A LOCAL CHECK POINT */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-void Dbacc::execACC_SRREQ(Signal* signal)
-{
- Page8Ptr asrPageidptr;
- jamEntry();
- lcpConnectptr.i = signal->theData[0];
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- Uint32 lqhPtr = signal->theData[1];
- Uint32 fragId = signal->theData[2];
- Uint32 lcpId = signal->theData[3];
- tresult = 0;
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == fragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == fragId);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->lcpLqhPtr = lqhPtr;
- fragrecptr.p->localCheckpId = lcpId;
- asrPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(asrPageidptr, cpagesize, page8);
- ndbrequire(asrPageidptr.p->word32[ZPAGEZERO_TABID] == rootfragrecptr.p->mytabptr);
- ndbrequire(asrPageidptr.p->word32[ZPAGEZERO_FRAGID0] == rootfragrecptr.p->fragmentid[0]);
- ndbrequire(asrPageidptr.p->word32[ZPAGEZERO_FRAGID1] == rootfragrecptr.p->fragmentid[1]);
- initRootFragSr(rootfragrecptr, asrPageidptr);
- initFragSr(fragrecptr, asrPageidptr);
- for (Uint32 i = 0; i < ZMAX_UNDO_VERSION; i++) {
- jam();
- if (csrVersList[i] != RNIL) {
- jam();
- srVersionPtr.i = csrVersList[i];
- ptrCheckGuard(srVersionPtr, csrVersionRecSize, srVersionRec);
- if (fragrecptr.p->localCheckpId == srVersionPtr.p->checkPointId) {
- jam();
- ndbrequire(srVersionPtr.p->checkPointId == asrPageidptr.p->word32[ZPAGEZERO_NEXT_UNDO_FILE]);
- /*--------------------------------------------------------------------------------*/
- /* SINCE -1 IS THE END OF LOG CODE WE MUST TREAT THIS CODE WITH CARE. WHEN */
- /* COMPARING IT IS LARGER THAN EVERYTHING ELSE BUT SHOULD BE TREATED AS THE */
- /* SMALLEST POSSIBLE VALUE, MEANING EMPTY. */
- /*--------------------------------------------------------------------------------*/
- if (fragrecptr.p->prevUndoposition != cminusOne) {
- if (srVersionPtr.p->prevAddress < fragrecptr.p->prevUndoposition) {
- jam();
- srVersionPtr.p->prevAddress = fragrecptr.p->prevUndoposition;
- } else if (srVersionPtr.p->prevAddress == cminusOne) {
- jam();
- srVersionPtr.p->prevAddress = fragrecptr.p->prevUndoposition;
- }//if
- }//if
- srAllocPage0011Lab(signal);
- return;
- }//if
- } else {
- jam();
- seizeSrVerRec(signal);
- srVersionPtr.p->checkPointId = fragrecptr.p->localCheckpId;
- srVersionPtr.p->prevAddress = fragrecptr.p->prevUndoposition;
- csrVersList[i] = srVersionPtr.i;
- srAllocPage0011Lab(signal);
- return;
- }//if
- }//for
- ndbrequire(false);
-}//Dbacc::execACC_SRREQ()
-
void
Dbacc::releaseLogicalPage(Fragmentrec * fragP, Uint32 logicalPageId){
Ptr<struct DirRange> dirRangePtr;
@@ -8232,1020 +5564,6 @@
dirArrPtr.p->pagep[lp2] = RNIL;
}
-void Dbacc::srAllocPage0011Lab(Signal* signal)
-{
- releaseLogicalPage(fragrecptr.p, 0);
-
-#if JONAS
- ndbrequire(cfirstfreeDirrange != RNIL);
- seizeDirrange(signal);
- fragrecptr.p->directory = newDirRangePtr.i;
- ndbrequire(cfirstfreeDirrange != RNIL);
- seizeDirrange(signal);
- fragrecptr.p->overflowdir = newDirRangePtr.i;
- seizeDirectory(signal);
- ndbrequire(tresult < ZLIMIT_OF_ERROR);
- newDirRangePtr.p->dirArray[0] = sdDirptr.i;
-#endif
-
- fragrecptr.p->nextAllocPage = 0;
- fragrecptr.p->fragState = SR_READ_PAGES;
- srReadPagesLab(signal);
- return;
-}//Dbacc::srAllocPage0011Lab()
-
-void Dbacc::srReadPagesLab(Signal* signal)
-{
- if (fragrecptr.p->nextAllocPage >= fragrecptr.p->noStoredPages) {
- /*--------------------------------------------------------------------------------*/
- /* WE HAVE NOW READ ALL NORMAL PAGES FROM THE FILE. */
- /*--------------------------------------------------------------------------------*/
- if (fragrecptr.p->nextAllocPage == fragrecptr.p->dirsize) {
- jam();
- /*--------------------------------------------------------------------------------*/
- /* WE HAVE NOW READ ALL NORMAL PAGES AND ALLOCATED ALL THE NEEDED PAGES. */
- /*--------------------------------------------------------------------------------*/
- fragrecptr.p->nextAllocPage = 0; /* THE NEXT OVER FLOW PAGE WHICH WILL BE READ */
- fragrecptr.p->fragState = SR_READ_OVER_PAGES;
- srReadOverPagesLab(signal);
- } else {
- ndbrequire(fragrecptr.p->nextAllocPage < fragrecptr.p->dirsize);
- jam();
- /*--------------------------------------------------------------------------------*/
- /* WE NEEDED TO ALLOCATE PAGES THAT WERE DEALLOCATED DURING THE LOCAL */
- /* CHECKPOINT. */
- /* ALLOCATE THE PAGE AND INITIALISE IT. THEN WE INSERT A REAL-TIME BREAK. */
- /*--------------------------------------------------------------------------------*/
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- tipPageId = fragrecptr.p->nextAllocPage;
- inpPageptr.i = spPageptr.i;
- ptrCheckGuard(inpPageptr, cpagesize, page8);
- initPage(signal);
- fragrecptr.p->noOfExpectedPages = 1;
- fragrecptr.p->datapages[0] = spPageptr.i;
- signal->theData[0] = ZSR_READ_PAGES_ALLOC;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2, JBB);
- }//if
- return;
- }//if
- Uint32 limitLoop;
- if ((fragrecptr.p->noStoredPages - fragrecptr.p->nextAllocPage) < ZWRITEPAGESIZE) {
- jam();
- limitLoop = fragrecptr.p->noStoredPages - fragrecptr.p->nextAllocPage;
- } else {
- jam();
- limitLoop = ZWRITEPAGESIZE;
- }//if
- ndbrequire(limitLoop <= 8);
- for (Uint32 i = 0; i < limitLoop; i++) {
- jam();
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- fragrecptr.p->datapages[i] = spPageptr.i;
- signal->theData[i + 6] = spPageptr.i;
- }//for
- signal->theData[limitLoop + 6] = fragrecptr.p->activeDataFilePage;
- fragrecptr.p->noOfExpectedPages = limitLoop;
- /* -----------------SEND READ PAGES SIGNAL TO THE FILE MANAGER --------- */
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = WAIT_READ_DATA;
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 2;
- /* FLAG = LIST MEM PAGES, RANGE OF FILE PAGES */
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = fragrecptr.p->noOfExpectedPages;
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 15, JBA);
- return;
-}//Dbacc::srReadPagesLab()
-
-void Dbacc::storeDataPageInDirectoryLab(Signal* signal)
-{
- fragrecptr.p->activeDataFilePage += fragrecptr.p->noOfExpectedPages;
- srReadPagesAllocLab(signal);
- return;
-}//Dbacc::storeDataPageInDirectoryLab()
-
-void Dbacc::srReadPagesAllocLab(Signal* signal)
-{
- DirRangePtr srpDirRangePtr;
- DirectoryarrayPtr srpDirptr;
- DirectoryarrayPtr srpOverflowDirptr;
- Page8Ptr srpPageidptr;
-
- if (fragrecptr.p->fragState == SR_READ_PAGES) {
- jam();
- for (Uint32 i = 0; i < fragrecptr.p->noOfExpectedPages; i++) {
- jam();
- tmpP = fragrecptr.p->nextAllocPage;
- srpDirRangePtr.i = fragrecptr.p->directory;
- tmpP2 = tmpP >> 8;
- tmp = tmpP & 0xff;
- ptrCheckGuard(srpDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- if (srpDirRangePtr.p->dirArray[tmpP2] == RNIL) {
- seizeDirectory(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- srpDirptr.i = sdDirptr.i;
- srpDirRangePtr.p->dirArray[tmpP2] = srpDirptr.i;
- } else {
- jam();
- srpDirptr.i = srpDirRangePtr.p->dirArray[tmpP2];
- }//if
- ptrCheckGuard(srpDirptr, cdirarraysize, directoryarray);
- arrGuard(i, 8);
- srpDirptr.p->pagep[tmp] = fragrecptr.p->datapages[i];
- srpPageidptr.i = fragrecptr.p->datapages[i];
- ptrCheckGuard(srpPageidptr, cpagesize, page8);
- ndbrequire(srpPageidptr.p->word32[ZPOS_PAGE_ID] == fragrecptr.p->nextAllocPage);
- ndbrequire(((srpPageidptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == 0);
- ccoPageptr.p = srpPageidptr.p;
- checksumControl(signal, (Uint32)1);
- if (tresult > 0) {
- jam();
- return; // We will crash through a DEBUG_SIG
- }//if
- dbgWord32(srpPageidptr, ZPOS_OVERFLOWREC, RNIL);
- srpPageidptr.p->word32[ZPOS_OVERFLOWREC] = RNIL;
- fragrecptr.p->datapages[i] = RNIL;
- fragrecptr.p->nextAllocPage++;
- }//for
- srReadPagesLab(signal);
- return;
- } else {
- ndbrequire(fragrecptr.p->fragState == SR_READ_OVER_PAGES);
- for (Uint32 i = 0; i < fragrecptr.p->noOfExpectedPages; i++) {
- jam();
- arrGuard(i, 8);
- srpPageidptr.i = fragrecptr.p->datapages[i];
- ptrCheckGuard(srpPageidptr, cpagesize, page8);
- tmpP = srpPageidptr.p->word32[ZPOS_PAGE_ID]; /* DIR INDEX OF THE OVERFLOW PAGE */
- /*--------------------------------------------------------------------------------*/
- /* IT IS POSSIBLE THAT WE HAVE LOGICAL PAGES WHICH ARE NOT PART OF THE LOCAL*/
- /* CHECKPOINT. THUS WE USE THE LOGICAL PAGE ID FROM THE PAGE HERE. */
- /*--------------------------------------------------------------------------------*/
- srpDirRangePtr.i = fragrecptr.p->overflowdir;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(srpDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- if (srpDirRangePtr.p->dirArray[tmpP2] == RNIL) {
- jam();
- seizeDirectory(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- srpDirRangePtr.p->dirArray[tmpP2] = sdDirptr.i;
- }//if
- srpOverflowDirptr.i = srpDirRangePtr.p->dirArray[tmpP2];
- ndbrequire(((srpPageidptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != 0);
- ndbrequire(((srpPageidptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != 3);
- ptrCheckGuard(srpOverflowDirptr, cdirarraysize, directoryarray);
- ndbrequire(srpOverflowDirptr.p->pagep[tmpP] == RNIL);
- srpOverflowDirptr.p->pagep[tmpP] = srpPageidptr.i;
- ccoPageptr.p = srpPageidptr.p;
- checksumControl(signal, (Uint32)1);
- ndbrequire(tresult == 0);
- dbgWord32(srpPageidptr, ZPOS_OVERFLOWREC, RNIL);
- srpPageidptr.p->word32[ZPOS_OVERFLOWREC] = RNIL;
- fragrecptr.p->nextAllocPage++;
- }//for
- srReadOverPagesLab(signal);
- return;
- }//if
-}//Dbacc::srReadPagesAllocLab()
-
-void Dbacc::srReadOverPagesLab(Signal* signal)
-{
- if (fragrecptr.p->nextAllocPage >= fragrecptr.p->noOfStoredOverPages) {
- fragrecptr.p->nextAllocPage = 0;
- if (fragrecptr.p->prevUndoposition == cminusOne) {
- jam();
- /* ************************ */
- /* ACC_OVER_REC */
- /* ************************ */
- /*--------------------------------------------------------------------------------*/
- /* UPDATE FREE LIST OF OVERFLOW PAGES AS PART OF SYSTEM RESTART AFTER */
- /* READING PAGES AND EXECUTING THE UNDO LOG. */
- /*--------------------------------------------------------------------------------*/
- signal->theData[0] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_OVER_REC, signal, 1, JBB);
- } else {
- jam();
- srCloseDataFileLab(signal);
- }//if
- return;
- }//if
- Uint32 limitLoop;
- if ((fragrecptr.p->noOfStoredOverPages - fragrecptr.p->nextAllocPage) < ZWRITEPAGESIZE) {
- jam();
- limitLoop = fragrecptr.p->noOfStoredOverPages - fragrecptr.p->nextAllocPage;
- } else {
- jam();
- limitLoop = ZWRITEPAGESIZE;
- }//if
- ndbrequire(limitLoop <= 8);
- for (Uint32 i = 0; i < limitLoop; i++) {
- jam();
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- fragrecptr.p->datapages[i] = spPageptr.i;
- signal->theData[i + 6] = spPageptr.i;
- }//for
- fragrecptr.p->noOfExpectedPages = limitLoop;
- signal->theData[limitLoop + 6] = fragrecptr.p->activeDataFilePage;
- /* -----------------SEND READ PAGES SIGNAL TO THE FILE MANAGER --------- */
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = WAIT_READ_DATA;
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 2;
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = fragrecptr.p->noOfExpectedPages;
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 15, JBA);
- return;
-}//Dbacc::srReadOverPagesLab()
-
-void Dbacc::srCloseDataFileLab(Signal* signal)
-{
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = SR_CLOSE_DATA;
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 0;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- return;
-}//Dbacc::srCloseDataFileLab()
-
-/* ************************ */
-/* ACC_SRCONF */
-/* ************************ */
-void Dbacc::sendaccSrconfLab(Signal* signal)
-{
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- releaseFsConnRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- fragrecptr.p->fragState = ACTIVEFRAG;
- fragrecptr.p->fsConnPtr = RNIL;
- for (Uint32 i = 0; i < ZWRITEPAGESIZE; i++) {
- fragrecptr.p->datapages[i] = RNIL;
- }//for
- rlpPageptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(rlpPageptr, cpagesize, page8);
- releaseLcpPage(signal);
- fragrecptr.p->zeroPagePtr = RNIL;
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_SRCONF, signal, 1, JBB);
- lcpConnectptr.p->noOfLcpConf++;
- if (lcpConnectptr.p->noOfLcpConf == 2) {
- jam();
- releaseLcpConnectRec(signal);
- rootfragrecptr.p->lcpPtr = RNIL;
- rootfragrecptr.p->rootState = ACTIVEROOT;
- }//if
- return;
-}//Dbacc::sendaccSrconfLab()
-
-/* --------------------------------------------------------------------------------- */
-/* CHECKSUM_CONTROL */
-/* INPUT: CCO_PAGEPTR */
-/* OUTPUT: TRESULT */
-/* */
-/* CHECK THAT CHECKSUM IN PAGE IS CORRECT TO ENSURE THAT NO ONE HAS CORRUPTED */
-/* THE PAGE INFORMATION. WHEN CALCULATING THE CHECKSUM WE REMOVE THE CHECKSUM */
-/* ITSELF FROM THE CHECKSUM BY XOR'ING THE CHECKSUM TWICE. WHEN CALCULATING */
-/* THE CHECKSUM THE CHECKSUM WORD IS ZERO WHICH MEANS NO CHANGE FROM XOR'ING. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::checksumControl(Signal* signal, Uint32 checkPage)
-{
- Uint32 Tchs;
- Uint32 tccoIndex;
- Uint32 Ti;
- Uint32 Tmp1;
- Uint32 Tmp2;
- Uint32 Tmp3;
- Uint32 Tmp4;
- Uint32 Tlimit;
-
- Tchs = 0;
- for (Ti = 0; Ti < 32 ; Ti++) {
- Tlimit = 16 + (Ti << 6);
- for (tccoIndex = (Ti << 6); tccoIndex < Tlimit; tccoIndex ++) {
- Tmp1 = ccoPageptr.p->word32[tccoIndex];
- Tmp2 = ccoPageptr.p->word32[tccoIndex + 16];
- Tmp3 = ccoPageptr.p->word32[tccoIndex + 32];
- Tmp4 = ccoPageptr.p->word32[tccoIndex + 48];
-
- Tchs = Tchs ^ Tmp1;
- Tchs = Tchs ^ Tmp2;
- Tchs = Tchs ^ Tmp3;
- Tchs = Tchs ^ Tmp4;
- }//for
- }//for
- if (Tchs == 0) {
- tresult = 0;
- if (checkPage != 0) {
- jam();
- lcnCopyPageptr.p = ccoPageptr.p;
- srCheckPage(signal);
- }//if
- } else {
- tresult = 1;
- }//if
- if (tresult != 0) {
- jam();
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- signal->theData[0] = RNIL;
- signal->theData[1] = rootfragrecptr.p->mytabptr;
- signal->theData[2] = fragrecptr.p->myfid;
- signal->theData[3] = ccoPageptr.p->word32[ZPOS_PAGE_ID];
- signal->theData[4] = tlupElemIndex;
- signal->theData[5] = ccoPageptr.p->word32[ZPOS_PAGE_TYPE];
- signal->theData[6] = tresult;
- sendSignal(cownBlockref, GSN_DEBUG_SIG, signal, 7, JBA);
- }//if
-}//Dbacc::checksumControl()
-
-/* ******************--------------------------------------------------------------- */
-/* START_RECREQ REQUEST TO START UNDO PROCESS */
-/* SENDER: LQH, LEVEL B */
-/* ENTER START_RECREQ WITH */
-/* CLQH_PTR, LQH CONNECTION PTR */
-/* CLQH_BLOCK_REF, LQH BLOCK REFERENCE */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* START_RECREQ REQUEST TO START UNDO PROCESS */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-void Dbacc::execSTART_RECREQ(Signal* signal)
-{
- jamEntry();
- clqhPtr = signal->theData[0]; /* LQH CONNECTION PTR */
- clqhBlockRef = signal->theData[1]; /* LQH BLOCK REFERENCE */
- tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
- for (int i = 0; i < UndoHeader::ZNO_UNDORECORD_TYPES; i++)
- cSrUndoRecords[i] = 0;
- startUndoLab(signal);
- return;
-}//Dbacc::execSTART_RECREQ()
-
-void Dbacc::startUndoLab(Signal* signal)
-{
- cundoLogActive = ZTRUE;
- /* ----- OPEN UNDO FILES --------- */
- for (tmp = 0; tmp <= ZMAX_UNDO_VERSION - 1; tmp++) {
- jam();
- if (csrVersList[tmp] != RNIL) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* SELECT THE NEXT SYSTEM RESTART RECORD WHICH CONTAINS AN UNDO LOG */
- /* THAT NEEDS TO BE EXECUTED AND SET UP THE DATA TO EXECUTE IT. */
- /*---------------------------------------------------------------------------*/
- srVersionPtr.i = csrVersList[tmp];
- csrVersList[tmp] = RNIL;
- ptrCheckGuard(srVersionPtr, csrVersionRecSize, srVersionRec);
- cactiveUndoFilePage = srVersionPtr.p->prevAddress >> 13;
- cprevUndoaddress = srVersionPtr.p->prevAddress;
- cactiveCheckpId = srVersionPtr.p->checkPointId;
-
- releaseSrRec(signal);
- startActiveUndo(signal);
- return;
- }//if
- }//for
-
- // Send report of how many undo log records where executed
- signal->theData[0] = NDB_LE_UNDORecordsExecuted;
- signal->theData[1] = DBACC; // From block
- signal->theData[2] = 0; // Total records executed
- for (int i = 0; i < 10; i++){
- if (i < UndoHeader::ZNO_UNDORECORD_TYPES){
- signal->theData[i+3] = cSrUndoRecords[i];
- signal->theData[2] += cSrUndoRecords[i];
- }else{
- signal->theData[i+3] = 0;
- }
- }
- sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 12, JBB);
-
- /* ******************************< */
- /* START_RECCONF */
- /* ******************************< */
- /*---------------------------------------------------------------------------*/
- /* REPORT COMPLETION OF UNDO LOG EXECUTION. */
- /*---------------------------------------------------------------------------*/
- cundoLogActive = ZFALSE;
- signal->theData[0] = clqhPtr;
- sendSignal(clqhBlockRef, GSN_START_RECCONF, signal, 1, JBB);
- /* LQH CONNECTION PTR */
- return;
-}//Dbacc::startUndoLab()
-
-/*---------------------------------------------------------------------------*/
-/* START THE UNDO OF AN UNDO LOG FILE BY OPENING THE UNDO LOG FILE. */
-/*---------------------------------------------------------------------------*/
-void Dbacc::startActiveUndo(Signal* signal)
-{
- if (cprevUndoaddress == cminusOne) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THERE WAS NO UNDO LOG INFORMATION IN THIS LOG FILE. WE GET THE NEXT */
- /* OR REPORT COMPLETION. */
- /*---------------------------------------------------------------------------*/
- signal->theData[0] = ZSTART_UNDO;
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 1, JBB);
- } else {
- jam();
- /*---------------------------------------------------------------------------*/
- /* OPEN THE LOG FILE PERTAINING TO THIS UNDO LOG. */
- /*---------------------------------------------------------------------------*/
- if (cfsFirstfreeconnect == RNIL) {
- jam();
- sendSystemerror(signal);
- }//if
- seizeFsConnectRec(signal);
- cactiveSrFsPtr = fsConnectptr.i;
- fsConnectptr.p->fsState = OPEN_UNDO_FILE_SR;
- fsConnectptr.p->fsPart = 0;
- tmp1 = 1; /* FILE VERSION ? */
- tmp1 = (tmp1 << 8) + ZLOCALLOGFILE; /* .LOCLOG = 2 */
- tmp1 = (tmp1 << 8) + 4; /* ROOT DIRECTORY = D4 */
- tmp1 = (tmp1 << 8) + fsConnectptr.p->fsPart; /* P2 */
- tmp2 = 0x0; /* D7 DON'T CREATE , READ ONLY */
- /* DON'T TRUNCATE TO ZERO */
- /* ---FILE NAME "D4"/"DBACC"/LCP_CONNECTPTR:LOCAL_CHECK_PID/FS_CONNECTPTR:FS_PART".LOCLOG-- */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = cminusOne; /* #FFFFFFFF */
- signal->theData[3] = cminusOne; /* #FFFFFFFF */
- signal->theData[4] = cactiveCheckpId; /* CHECKPOINT VERSION */
- signal->theData[5] = tmp1;
- signal->theData[6] = tmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- }//if
-}//Dbacc::startActiveUndo()
-
-/* ------- READ A GROUP OF UNDO PAGES --------------- */
-void Dbacc::srStartUndoLab(Signal* signal)
-{
- /*---------------------------------------------------------------------------*/
- /* ALL LOG FILES HAVE BEEN OPENED. WE CAN NOW READ DATA FROM THE LAST */
- /* PAGE IN THE LAST LOG FILE AND BACKWARDS UNTIL WE REACH THE VERY */
- /* FIRST UNDO LOG RECORD. */
- /*---------------------------------------------------------------------------*/
- if (cactiveUndoFilePage >= ZWRITE_UNDOPAGESIZE) {
- jam();
- tmp1 = ZWRITE_UNDOPAGESIZE; /* NO OF READ UNDO PAGES */
- cactiveSrUndoPage = ZWRITE_UNDOPAGESIZE - 1; /* LAST PAGE */
- } else {
- jam();
- tmp1 = cactiveUndoFilePage + 1; /* NO OF READ UNDO PAGES */
- cactiveSrUndoPage = cactiveUndoFilePage;
- }//if
- fsConnectptr.i = cactiveSrFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 0;
- /* FLAG = LIST MEM PAGES, LIST FILE PAGES */
- signal->theData[4] = ZUNDOPAGE_BASE_ADD;
- signal->theData[5] = tmp1;
- signal->theData[6] = 0;
- signal->theData[7] = (cactiveUndoFilePage - tmp1) + 1;
- signal->theData[8] = 1;
- signal->theData[9] = cactiveUndoFilePage;
-
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 10, JBA);
- if (tmp1 > cactiveUndoFilePage) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THIS IS THE LAST READ IN THIS LOG FILE. WE SET THE ACTIVE FILE */
- /* POINTER. IF IT IS THE FIRST WE SHOULD NEVER ATTEMPT ANY MORE READS */
- /* SINCE WE SHOULD ENCOUNTER A FIRST LOG RECORD WITH PREVIOUS PAGE ID */
- /* EQUAL TO RNIL. */
- /*---------------------------------------------------------------------------*/
- cactiveSrFsPtr = RNIL;
- fsConnectptr.p->fsState = READ_UNDO_PAGE_AND_CLOSE;
- } else {
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE STILL HAVE MORE INFORMATION IN THIS LOG FILE. WE ONLY MOVE BACK */
- /* THE FILE PAGE. */
- /*---------------------------------------------------------------------------*/
- cactiveUndoFilePage = cactiveUndoFilePage - tmp1;
- fsConnectptr.p->fsState = READ_UNDO_PAGE;
- }//if
- return;
-}//Dbacc::srStartUndoLab()
-
-/* ------- DO UNDO ---------------------------*/
-/* ******************--------------------------------------------------------------- */
-/* NEXTOPERATION ORD FOR EXECUTION OF NEXT OP */
-/* ******************------------------------------+ */
-/* SENDER: ACC, LEVEL B */
-void Dbacc::execNEXTOPERATION(Signal* signal)
-{
- jamEntry();
- tresult = 0;
- srDoUndoLab(signal);
- return;
-}//Dbacc::execNEXTOPERATION()
-
-void Dbacc::srDoUndoLab(Signal* signal)
-{
- DirRangePtr souDirRangePtr;
- DirectoryarrayPtr souDirptr;
- Page8Ptr souPageidptr;
- Uint32 tundoPageindex;
- UndoHeader *undoHeaderPtr;
- Uint32 tmpindex;
-
- jam();
- undopageptr.i = cactiveSrUndoPage;
- ptrCheckGuard(undopageptr, cundopagesize, undopage);
- /*---------------------------------------------------------------------------*/
- /* LAYOUT OF AN UNDO LOG RECORD: */
- /* ***************************** */
- /* */
- /* |----------------------------------------------------| */
- /* | TABLE ID | */
- /* |----------------------------------------------------| */
- /* | ROOT FRAGMENT ID | */
- /* |----------------------------------------------------| */
- /* | LOCAL FRAGMENT ID | */
- /* |----------------------------------------------------| */
- /* | UNDO INFO LEN 14 b | TYPE 4 b | PAGE INDEX 14 b | */
- /* |----------------------------------------------------| */
- /* | INDEX INTO PAGE DIRECTORY (LOGICAL PAGE ID) | */
- /* |----------------------------------------------------| */
- /* | PREVIOUS UNDO LOG RECORD FOR THE FRAGMENT | */
- /* |----------------------------------------------------| */
- /* | PREVIOUS UNDO LOG RECORD FOR ALL FRAGMENTS | */
- /* |----------------------------------------------------| */
- /* | TYPE SPECIFIC PART | */
- /* |----------------------------------------------------| */
- /*---------------------------------------------------------------------------*/
- /*---------------------------------------------------------------------------*/
- /* SET THE PAGE POINTER. WE ONLY WORK WITH TWO PAGES IN THIS RESTART */
- /* ACTIVITY. GET THE PAGE POINTER AND THE PAGE INDEX TO READ FROM. */
- /*---------------------------------------------------------------------------*/
- tundoindex = cprevUndoaddress & ZUNDOPAGEINDEX_MASK; //0x1fff, 13 bits.
- undoHeaderPtr = (UndoHeader *) &undopageptr.p->undoword[tundoindex];
- tundoindex = tundoindex + ZUNDOHEADSIZE;
-
- /*------------------------------------------------------------------------*/
- /* READ TABLE ID AND ROOT FRAGMENT ID AND USE THIS TO GET ROOT RECORD. */
- /*------------------------------------------------------------------------*/
- arrGuard((tundoindex + 6), 8192);
-
- // TABLE ID
- tabptr.i = undoHeaderPtr->tableId;
- ptrCheckGuard(tabptr, ctablesize, tabrec);
-
- // ROOT FRAGMENT ID
- tfid = undoHeaderPtr->rootFragId;
- ndbrequire((undoHeaderPtr->localFragId >> 1) == undoHeaderPtr->rootFragId);
- if (!getrootfragmentrec(signal, rootfragrecptr, tfid)) {
- jam();
- /*---------------------------------------------------------------------*/
- /* THE ROOT RECORD WAS NOT FOUND. OBVIOUSLY WE ARE NOT RESTARTING THIS */
- /* FRAGMENT. WE THUS IGNORE THIS LOG RECORD AND PROCEED WITH THE NEXT. */
- /*---------------------------------------------------------------------*/
- creadyUndoaddress = cprevUndoaddress;
- // PREVIOUS UNDO LOG RECORD FOR ALL FRAGMENTS
- cprevUndoaddress = undoHeaderPtr->prevUndoAddress;
- undoNext2Lab(signal);
-#ifdef VM_TRACE
- ndbout_c("ignoring root fid %d", (int)tfid);
-#endif
- return;
- }//if
- /*-----------------------------------------------------------------------*/
- /* READ THE LOCAL FRAGMENT ID AND VERIFY THAT IT IS CORRECT. */
- /*-----------------------------------------------------------------------*/
- if (rootfragrecptr.p->fragmentid[0] == undoHeaderPtr->localFragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- if (rootfragrecptr.p->fragmentid[1] == undoHeaderPtr->localFragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- jam();
- progError(__LINE__, 0, "Invalid local fragment id in undo log");
- return;
- }//if
- }//if
- /*------------------------------------------------------------------------*/
- /* READ UNDO INFO LENGTH, TYPE OF LOG RECORD AND PAGE INDEX WHERE TO */
- /* APPLY THIS LOG RECORD. ALSO STEP INDEX TO PREPARE READ OF LOGICAL */
- /* PAGE ID. SET TMPINDEX TO INDEX THE FIRST WORD IN THE TYPE SPECIFIC */
- /* PART. */
- /*------------------------------------------------------------------------*/
- // UNDO INFO LENGTH 14 b | TYPE 4 b | PAGE INDEX 14 b
- const Uint32 tmp1 = undoHeaderPtr->variousInfo;
- cundoinfolength = tmp1 >> 18;
- const Uint32 tpageType = (tmp1 >> 14) & 0xf;
- tundoPageindex = tmp1 & 0x3fff;
-
- // INDEX INTO PAGE DIRECTORY (LOGICAL PAGE ID)
- tmpP = undoHeaderPtr->logicalPageId ;
- tmpindex = tundoindex;
- arrGuard((tmpindex + cundoinfolength - 1), 8192);
- if (fragrecptr.p->localCheckpId != cactiveCheckpId) {
- jam();
- /*-----------------------------------------------------------------------*/
- /* THE FRAGMENT DID EXIST BUT IS NOT AFFECTED BY THIS UNDO LOG */
- /* EXECUTION. EITHER IT BELONGS TO ANOTHER OR IT IS CREATED AND ONLY IN */
- /* NEED OF EXECUTION OF REDO LOG RECORDS FROM LQH. */
- /*-----------------------------------------------------------------------*/
- creadyUndoaddress = cprevUndoaddress;
- // PREVIOUS UNDO LOG RECORD FOR ALL FRAGMENTS
- cprevUndoaddress = undoHeaderPtr->prevUndoAddress;
-
- undoNext2Lab(signal);
- return;
- }//if
- /*-----------------------------------------------------------------------*/
- /* VERIFY CONSISTENCY OF UNDO LOG RECORDS. */
- /*-----------------------------------------------------------------------*/
- ndbrequire(fragrecptr.p->prevUndoposition == cprevUndoaddress);
- cSrUndoRecords[tpageType]++;
- switch(tpageType){
-
- case UndoHeader::ZPAGE_INFO:{
- jam();
- /*----------------------------------------------------------------------*/
- /* WE HAVE TO UNDO UPDATES IN A NORMAL PAGE. GET THE PAGE POINTER BY */
- /* USING THE LOGICAL PAGE ID. THEN RESET THE OLD VALUE IN THE PAGE BY */
- /* USING THE OLD DATA WHICH IS STORED IN THIS UNDO LOG RECORD. */
- /*----------------------------------------------------------------------*/
- souDirRangePtr.i = fragrecptr.p->directory;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(souDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- souDirptr.i = souDirRangePtr.p->dirArray[tmpP2];
- ptrCheckGuard(souDirptr, cdirarraysize, directoryarray);
- souPageidptr.i = souDirptr.p->pagep[tmpP];
- ptrCheckGuard(souPageidptr, cpagesize, page8);
- Uint32 loopLimit = tundoPageindex + cundoinfolength;
- ndbrequire(loopLimit <= 2048);
- for (Uint32 tmp = tundoPageindex; tmp < loopLimit; tmp++) {
- dbgWord32(souPageidptr, tmp, undopageptr.p->undoword[tmpindex]);
- souPageidptr.p->word32[tmp] = undopageptr.p->undoword[tmpindex];
- tmpindex = tmpindex + 1;
- }//for
- break;
- }
-
- case UndoHeader::ZOVER_PAGE_INFO:{
- jam();
- /*----------------------------------------------------------------------*/
- /* WE HAVE TO UNDO UPDATES IN AN OVERFLOW PAGE. GET THE PAGE POINTER BY*/
- /* USING THE LOGICAL PAGE ID. THEN RESET THE OLD VALUE IN THE PAGE BY */
- /* USING THE OLD DATA WHICH IS STORED IN THIS UNDO LOG RECORD. */
- /*----------------------------------------------------------------------*/
- souDirRangePtr.i = fragrecptr.p->overflowdir;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(souDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- souDirptr.i = souDirRangePtr.p->dirArray[tmpP2];
- ptrCheckGuard(souDirptr, cdirarraysize, directoryarray);
- souPageidptr.i = souDirptr.p->pagep[tmpP];
- ptrCheckGuard(souPageidptr, cpagesize, page8);
- Uint32 loopLimit = tundoPageindex + cundoinfolength;
- ndbrequire(loopLimit <= 2048);
- for (Uint32 tmp = tundoPageindex; tmp < loopLimit; tmp++) {
- dbgWord32(souPageidptr, tmp, undopageptr.p->undoword[tmpindex]);
- souPageidptr.p->word32[tmp] = undopageptr.p->undoword[tmpindex];
- tmpindex = tmpindex + 1;
- }//for
- break;
- }
-
- case UndoHeader::ZOP_INFO: {
- jam();
- /*---------------------------------------------------------------------*/
- /* AN OPERATION WAS ACTIVE WHEN LOCAL CHECKPOINT WAS EXECUTED. WE NEED */
- /* TO RESET THE LOCKS IT HAS SET. IF THE OPERATION WAS AN INSERT OR */
- /* THE ELEMENT WAS MARKED AS DISSAPEARED IT WILL ALSO BE REMOVED */
- /* FROM THE PAGE */
- /* */
- /* BEGIN BY SEARCHING AFTER THE ELEMENT, WHEN FOUND UNDO THE */
- /* CHANGES ON THE ELEMENT HEADER. IF IT WAS AN INSERT OPERATION OR */
- /* MARKED AS DISSAPEARED PROCEED BY REMOVING THE ELEMENT. */
- /*---------------------------------------------------------------------*/
- seizeOpRec(signal);
- // Initialise the opRec
- operationRecPtr.p->transId1 = 0;
- operationRecPtr.p->transId2 = RNIL;
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->lockMode = 0;
- operationRecPtr.p->dirtyRead = 0;
- operationRecPtr.p->nodeType = 0;
- operationRecPtr.p->fid = fragrecptr.p->myfid;
- operationRecPtr.p->nextParallelQue = RNIL;
- operationRecPtr.p->prevParallelQue = RNIL;
- operationRecPtr.p->nextQueOp = RNIL;
- operationRecPtr.p->prevQueOp = RNIL;
- operationRecPtr.p->nextSerialQue = RNIL;
- operationRecPtr.p->prevSerialQue = RNIL;
- operationRecPtr.p->elementPage = RNIL;
- operationRecPtr.p->keyinfoPage = RNIL;
- operationRecPtr.p->insertIsDone = ZFALSE;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength;
- operationRecPtr.p->longPagePtr = RNIL;
- operationRecPtr.p->longKeyPageIndex = RNIL;
- operationRecPtr.p->scanRecPtr = RNIL;
- operationRecPtr.p->isAccLockReq = ZFALSE;
- operationRecPtr.p->isUndoLogReq = ZTRUE;
-
- // Read operation values from undo page
- operationRecPtr.p->operation = undopageptr.p->undoword[tmpindex];
- tmpindex++;
- operationRecPtr.p->hashValue = undopageptr.p->undoword[tmpindex];
- tmpindex++;
- const Uint32 tkeylen = undopageptr.p->undoword[tmpindex];
- tmpindex++;
- operationRecPtr.p->tupkeylen = tkeylen;
- operationRecPtr.p->xfrmtupkeylen = 0; // not used
- operationRecPtr.p->fragptr = fragrecptr.i;
-
- ndbrequire(fragrecptr.p->keyLength != 0 &&
- fragrecptr.p->keyLength == tkeylen);
-
- // Read localkey1 from undo page
- signal->theData[7 + 0] = undopageptr.p->undoword[tmpindex];
- tmpindex = tmpindex + 1;
- arrGuard((tmpindex - 1), 8192);
- getElement(signal);
- if (tgeResult != ZTRUE) {
- jam();
- signal->theData[0] = RNIL;
- signal->theData[1] = tabptr.i;
- signal->theData[2] = cactiveCheckpId;
- signal->theData[3] = cprevUndoaddress;
- signal->theData[4] = operationRecPtr.p->operation;
- signal->theData[5] = operationRecPtr.p->hashValue;
- signal->theData[6] = operationRecPtr.p->tupkeylen;
- sendSignal(cownBlockref, GSN_DEBUG_SIG, signal, 11, JBA);
- return;
- }//if
-
- operationRecPtr.p->elementPage = gePageptr.i;
- operationRecPtr.p->elementContainer = tgeContainerptr;
- operationRecPtr.p->elementPointer = tgeElementptr;
- operationRecPtr.p->elementIsforward = tgeForward;
-
- commitdelete(signal, true);
- releaseOpRec(signal);
- break;
- }
-
- default:
- jam();
- progError(__LINE__, 0, "Invalid pagetype in undo log");
- break;
-
- }//switch(tpageType)
-
- /*----------------------------------------------------------------------*/
- /* READ THE PAGE ID AND THE PAGE INDEX OF THE PREVIOUS UNDO LOG RECORD */
- /* FOR THIS FRAGMENT. */
- /*----------------------------------------------------------------------*/
- fragrecptr.p->prevUndoposition = undoHeaderPtr->prevUndoAddressForThisFrag;
- /*----------------------------------------------------------------------*/
- /* READ THE PAGE ID AND THE PAGE INDEX OF THE PREVIOUS UNDO LOG RECORD */
- /* FOR THIS UNDO LOG. */
- /*----------------------------------------------------------------------*/
- creadyUndoaddress = cprevUndoaddress;
- cprevUndoaddress = undoHeaderPtr->prevUndoAddress;
-
- if (fragrecptr.p->prevUndoposition == cminusOne) {
- jam();
- /*---------------------------------------------------------------------*/
- /* WE HAVE NOW EXECUTED ALL UNDO LOG RECORDS FOR THIS FRAGMENT. WE */
- /* NOW NEED TO UPDATE THE FREE LIST OF OVERFLOW PAGES. */
- /*---------------------------------------------------------------------*/
- ndbrequire(fragrecptr.p->nextAllocPage == 0);
-
- signal->theData[0] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_OVER_REC, signal, 1, JBB);
- return;
- }//if
- undoNext2Lab(signal);
- return;
-}//Dbacc::srDoUndoLab()
-
-void Dbacc::undoNext2Lab(Signal* signal)
-{
- /*---------------------------------------------------------------------------*/
- /* EXECUTE NEXT UNDO LOG RECORD. */
- /*---------------------------------------------------------------------------*/
- if (cprevUndoaddress == cminusOne) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE HAVE EXECUTED THIS UNDO LOG TO COMPLETION. IT IS NOW TIME TO TAKE*/
- /* OF THE NEXT UNDO LOG OR REPORT COMPLETION OF UNDO LOG EXECUTION. */
- /*---------------------------------------------------------------------------*/
- signal->theData[0] = ZSTART_UNDO;
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 1, JBB);
- return;
- }//if
- if ((creadyUndoaddress >> 13) != (cprevUndoaddress >> 13)) {
- /*---------------------------------------------------------------------------*/
- /* WE ARE CHANGING PAGE. */
- /*---------------------------------------------------------------------------*/
- if (cactiveSrUndoPage == 0) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE HAVE READ AND EXECUTED ALL UNDO LOG INFORMATION IN THE CURRENTLY */
- /* READ PAGES. WE STILL HAVE MORE INFORMATION TO READ FROM FILE SINCE */
- /* WE HAVEN'T FOUND THE FIRST LOG RECORD IN THE LOG FILE YET. */
- /*---------------------------------------------------------------------------*/
- srStartUndoLab(signal);
- return;
- } else {
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE HAVE ANOTHER PAGE READ THAT WE NEED TO EXECUTE. */
- /*---------------------------------------------------------------------------*/
- cactiveSrUndoPage = cactiveSrUndoPage - 1;
- }//if
- }//if
- /*---------------------------------------------------------------------------*/
- /* REAL-TIME BREAK */
- /*---------------------------------------------------------------------------*/
- /* ******************************< */
- /* NEXTOPERATION */
- /* ******************************< */
- sendSignal(cownBlockref, GSN_NEXTOPERATION, signal, 1, JBB);
- return;
-}//Dbacc::undoNext2Lab()
-
-/*-----------------------------------------------------------------------------------*/
-/* AFTER COMPLETING THE READING OF DATA PAGES FROM DISK AND EXECUTING THE UNDO */
-/* LOG WE ARE READY TO UPDATE THE FREE LIST OF OVERFLOW PAGES. THIS LIST MUST */
-/* BE BUILT AGAIN SINCE IT IS NOT CHECKPOINTED. WHEN THE PAGES ARE ALLOCATED */
-/* THEY ARE NOT PART OF ANY LIST. PAGES CAN EITHER BE PUT IN FREE LIST, NOT */
-/* IN FREE LIST OR BE PUT INTO LIST OF LONG KEY PAGES. */
-/*-----------------------------------------------------------------------------------*/
-void Dbacc::execACC_OVER_REC(Signal* signal)
-{
- DirRangePtr pnoDirRangePtr;
- DirectoryarrayPtr pnoOverflowDirptr;
- Page8Ptr pnoPageidptr;
- Uint32 tpnoPageType;
- Uint32 toverPageCheck;
-
- jamEntry();
- fragrecptr.i = signal->theData[0];
- toverPageCheck = 0;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- ndbrequire((fragrecptr.p->nextAllocPage != 0) ||
- (fragrecptr.p->firstOverflowRec == RNIL));
- /*-----------------------------------------------------------------------------------*/
- /* WHO HAS PUT SOMETHING INTO THE LIST BEFORE WE EVEN STARTED PUTTING THINGS */
- /* THERE. */
- /*-----------------------------------------------------------------------------------*/
- ndbrequire(fragrecptr.p->loadingFlag == ZTRUE);
- /*---------------------------------------------------------------------------*/
- /* LOADING HAS STOPPED BEFORE WE HAVE LOADED, SYSTEM ERROR. */
- /*---------------------------------------------------------------------------*/
- while (toverPageCheck < ZNO_OF_OP_PER_SIGNAL) {
- jam();
- if (fragrecptr.p->nextAllocPage >= fragrecptr.p->lastOverIndex) {
- jam();
- fragrecptr.p->loadingFlag = ZFALSE;
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->lcpPtr != RNIL) {
- jam();
- srCloseDataFileLab(signal);
- } else {
- jam();
- undoNext2Lab(signal);
- }//if
- return;
- }//if
- tmpP = fragrecptr.p->nextAllocPage;
- pnoDirRangePtr.i = fragrecptr.p->overflowdir;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- arrGuard(tmpP2, 256);
- ptrCheckGuard(pnoDirRangePtr, cdirrangesize, dirRange);
- if (pnoDirRangePtr.p->dirArray[tmpP2] == RNIL) {
- jam();
- pnoPageidptr.i = RNIL;
- } else {
- pnoOverflowDirptr.i = pnoDirRangePtr.p->dirArray[tmpP2];
- if (pnoOverflowDirptr.i == RNIL) {
- jam();
- pnoPageidptr.i = RNIL;
- } else {
- jam();
- ptrCheckGuard(pnoOverflowDirptr, cdirarraysize, directoryarray);
- pnoPageidptr.i = pnoOverflowDirptr.p->pagep[tmpP];
- }//if
- }//if
- if (pnoPageidptr.i == RNIL) {
- jam();
- seizeOverRec(signal);
- sorOverflowRecPtr.p->dirindex = fragrecptr.p->nextAllocPage;
- sorOverflowRecPtr.p->overpage = RNIL;
- priOverflowRecPtr = sorOverflowRecPtr;
- putRecInFreeOverdir(signal);
- } else {
- ptrCheckGuard(pnoPageidptr, cpagesize, page8);
- tpnoPageType = pnoPageidptr.p->word32[ZPOS_PAGE_TYPE];
- tpnoPageType = (tpnoPageType >> ZPOS_PAGE_TYPE_BIT) & 3;
- if (pnoPageidptr.p->word32[ZPOS_ALLOC_CONTAINERS] > ZFREE_LIMIT) {
- jam();
- dbgWord32(pnoPageidptr, ZPOS_OVERFLOWREC, RNIL);
- pnoPageidptr.p->word32[ZPOS_OVERFLOWREC] = RNIL;
- ndbrequire(pnoPageidptr.p->word32[ZPOS_PAGE_ID] == fragrecptr.p->nextAllocPage);
- } else {
- jam();
- seizeOverRec(signal);
- sorOverflowRecPtr.p->dirindex = pnoPageidptr.p->word32[ZPOS_PAGE_ID];
- ndbrequire(sorOverflowRecPtr.p->dirindex == fragrecptr.p->nextAllocPage);
- dbgWord32(pnoPageidptr, ZPOS_OVERFLOWREC, sorOverflowRecPtr.i);
- pnoPageidptr.p->word32[ZPOS_OVERFLOWREC] = sorOverflowRecPtr.i;
- sorOverflowRecPtr.p->overpage = pnoPageidptr.i;
- porOverflowRecPtr = sorOverflowRecPtr;
- putOverflowRecInFrag(signal);
- if (pnoPageidptr.p->word32[ZPOS_ALLOC_CONTAINERS] == 0) {
- jam();
- ropPageptr = pnoPageidptr;
- releaseOverpage(signal);
- }//if
- }//if
- }//if
- fragrecptr.p->nextAllocPage++;
- toverPageCheck++;
- }//while
- signal->theData[0] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_OVER_REC, signal, 1, JBB);
-}//Dbacc::execACC_OVER_REC()
-
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* END OF SYSTEM RESTART MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* SCAN MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* ACC_SCANREQ START OF A SCAN PROCESS */
-/* SENDER: LQH, LEVEL B */
-/* ENTER ACC_SCANREQ WITH */
-/* TUSERPTR, LQH SCAN_CONNECT POINTER */
-/* TUSERBLOCKREF, LQH BLOCK REFERENCE */
-/* TABPTR, TABLE IDENTITY AND PTR */
-/* TFID ROOT FRAGMENT IDENTITY */
-/* TSCAN_FLAG , = ZCOPY, ZSCAN, ZSCAN_LOCK_ALL */
-/* ZREADLOCK, ZWRITELOCK */
-/* TSCAN_TRID1 , TRANSACTION ID PART 1 */
-/* TSCAN_TRID2 TRANSACTION ID PART 2 */
-/* ******************--------------------------------------------------------------- */
-/* ******************--------------------------------------------------------------- */
-/* ACC_SCANREQ START OF A SCAN PROCESS */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
void Dbacc::execACC_SCANREQ(Signal* signal)
{
jamEntry();
@@ -9260,12 +5578,12 @@
tresult = 0;
ptrCheckGuard(tabptr, ctablesize, tabrec);
- ndbrequire(getrootfragmentrec(signal,rootfragrecptr, tfid));
+ ndbrequire(getfragmentrec(signal, fragrecptr, tfid));
Uint32 i;
for (i = 0; i < MAX_PARALLEL_SCANS_PER_FRAG; i++) {
jam();
- if (rootfragrecptr.p->scan[i] == RNIL) {
+ if (fragrecptr.p->scan[i] == RNIL) {
jam();
break;
}
@@ -9274,7 +5592,7 @@
ndbrequire(cfirstFreeScanRec != RNIL);
seizeScanRec(signal);
- rootfragrecptr.p->scan[i] = scanPtr.i;
+ fragrecptr.p->scan[i] = scanPtr.i;
scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
scanPtr.p->scanLockMode = AccScanReq::getLockMode(tscanFlag);
scanPtr.p->scanReadCommittedFlag = AccScanReq::getReadCommittedFlag(tscanFlag);
@@ -9287,7 +5605,6 @@
scanPtr.p->scanUserblockref = tuserblockref;
scanPtr.p->scanTrid1 = tscanTrid1;
scanPtr.p->scanTrid2 = tscanTrid2;
- scanPtr.p->rootPtr = rootfragrecptr.i;
scanPtr.p->scanLockHeld = 0;
scanPtr.p->scanOpsAllocated = 0;
scanPtr.p->scanFirstActiveOp = RNIL;
@@ -9296,8 +5613,6 @@
scanPtr.p->scanFirstLockedOp = RNIL;
scanPtr.p->scanLastLockedOp = RNIL;
scanPtr.p->scanState = ScanRec::WAIT_NEXT;
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
initScanFragmentPart(signal);
/*------------------------------------------------------*/
@@ -9317,10 +5632,9 @@
/* ************************ */
signal->theData[0] = scanPtr.p->scanUserptr;
signal->theData[1] = scanPtr.i;
- signal->theData[2] = 2;
- /* NR OF LOCAL FRAGMENT */
- signal->theData[3] = rootfragrecptr.p->fragmentid[0];
- signal->theData[4] = rootfragrecptr.p->fragmentid[1];
+ signal->theData[2] = 1; /* NR OF LOCAL FRAGMENT */
+ signal->theData[3] = fragrecptr.p->fragmentid;
+ signal->theData[4] = RNIL;
signal->theData[7] = AccScanConf::ZNOT_EMPTY_FRAGMENT;
sendSignal(scanPtr.p->scanUserblockref, GSN_ACC_SCANCONF, signal, 8, JBB);
/* NOT EMPTY FRAGMENT */
@@ -9360,24 +5674,14 @@
case ZCOPY_NEXT_COMMIT:
case ZCOPY_COMMIT:
jam();
- /* --------------------------------------------------------------------------------- */
- /* COMMIT ACTIVE OPERATION. SEND NEXT SCAN ELEMENT IF IT IS ZCOPY_NEXT_COMMIT. */
- /* --------------------------------------------------------------------------------- */
+ /* --------------------------------------------------------------------- */
+ /* COMMIT ACTIVE OPERATION.
+ * SEND NEXT SCAN ELEMENT IF IT IS ZCOPY_NEXT_COMMIT.
+ * --------------------------------------------------------------------- */
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (!scanPtr.p->scanReadCommittedFlag) {
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_COMMIT) {
- jam();
- /*--------------------------------------------------------------*/
- // We did not have enough undo log buffers to safely commit an
- // operation. Try again in 10 milliseconds.
- /*--------------------------------------------------------------*/
- sendSignalWithDelay(cownBlockref, GSN_NEXT_SCANREQ, signal, 10, 3);
- return;
- }//if
- }//if
commitOperation(signal);
}//if
takeOutActiveScanOp(signal);
@@ -9395,19 +5699,6 @@
jam();
fragrecptr.i = scanPtr.p->activeLocalFrag;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- if (!scanPtr.p->scanReadCommittedFlag) {
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_OPERATION) {
- jam();
- /*--------------------------------------------------------------*/
- // We did not have enough undo log buffers to commit a set of
- // operations. Try again in 10 milliseconds.
- /*--------------------------------------------------------------*/
- sendSignalWithDelay(cownBlockref, GSN_NEXT_SCANREQ, signal, 10, 3);
- return;
- }//if
- }//if
- }//if
/* --------------------------------------------------------------------------------- */
/* THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. RELESE ALL INVOLVED REC. */
/* --------------------------------------------------------------------------------- */
@@ -9639,38 +5930,10 @@
void Dbacc::checkNextFragmentLab(Signal* signal)
{
- RootfragmentrecPtr cnfRootfragrecptr;
-
- cnfRootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(cnfRootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (scanPtr.p->activeLocalFrag == cnfRootfragrecptr.p->fragmentptr[0]) {
- jam();
- fragrecptr.i = cnfRootfragrecptr.p->fragmentptr[1];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- initScanFragmentPart(signal);
- signal->theData[0] = scanPtr.i;
- signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
- sendSignal(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
- return;
- } else {
- if (scanPtr.p->activeLocalFrag == cnfRootfragrecptr.p->fragmentptr[1]) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // Both fragments have completed their scan part and we can indicate that the scan is
- // now completed.
- /* --------------------------------------------------------------------------------- */
- scanPtr.p->scanBucketState = ScanRec::SCAN_COMPLETED;
- /*empty*/;
- } else {
- jam();
- /* ALL ELEMENTS ARE SENT */
- sendSystemerror(signal);
- }//if
- }//if
- /* --------------------------------------------------------------------------------- */
- // The scan is completed. ACC_CHECK_SCAN will perform all the necessary checks to see
+ scanPtr.p->scanBucketState = ScanRec::SCAN_COMPLETED;
+ // The scan is completed. ACC_CHECK_SCAN will perform all the necessary
+ // checks to see
// what the next step is.
- /* --------------------------------------------------------------------------------- */
signal->theData[0] = scanPtr.i;
signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
execACC_CHECK_SCAN(signal);
@@ -9719,13 +5982,13 @@
releaseAndCommitQueuedOps(signal);
releaseAndAbortLockedOps(signal);
- rootfragrecptr.i = scanPtr.p->rootPtr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
+ fragrecptr.i = scanPtr.p->activeLocalFrag;
+ ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
for (tmp = 0; tmp < MAX_PARALLEL_SCANS_PER_FRAG; tmp++) {
jam();
- if (rootfragrecptr.p->scan[tmp] == scanPtr.i) {
+ if (fragrecptr.p->scan[tmp] == scanPtr.i) {
jam();
- rootfragrecptr.p->scan[tmp] = RNIL;
+ fragrecptr.p->scan[tmp] = RNIL;
}//if
}//for
// Stops the heartbeat.
@@ -9821,10 +6084,11 @@
ptrCheckGuard(scanPtr, cscanRecSize, scanRec);
while (scanPtr.p->scanFirstQueuedOp != RNIL) {
jam();
- //----------------------------------------------------------------------------
- // An operation has been released from the lock queue. We are in the parallel
- // queue of this tuple. We are ready to report the tuple now.
- //----------------------------------------------------------------------------
+ //---------------------------------------------------------------------
+ // An operation has been released from the lock queue.
+ // We are in the parallel queue of this tuple. We are
+ // ready to report the tuple now.
+ //------------------------------------------------------------------------
operationRecPtr.i = scanPtr.p->scanFirstQueuedOp;
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
takeOutReadyScanQueue(signal);
@@ -9832,17 +6096,6 @@
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
jam();
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_COMMIT) {
- jam();
- /*--------------------------------------------------------------*/
- // We did not have enough undo log buffers to safely abort an
- // operation. Try again in 10 milliseconds.
- /*--------------------------------------------------------------*/
- sendSignalWithDelay(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 10, 2);
- return;
- }//if
- }//if
abortOperation(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -10604,14 +6857,14 @@
/* --------------------------------------------------------------------------------- */
/* --------------------------------------------------------------------------------- */
-bool Dbacc::getrootfragmentrec(Signal* signal, RootfragmentrecPtr& rootPtr, Uint32 fid)
+bool Dbacc::getfragmentrec(Signal* signal, FragmentrecPtr& rootPtr, Uint32 fid)
{
for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
jam();
if (tabptr.p->fragholder[i] == fid) {
jam();
- rootPtr.i = tabptr.p->fragptrholder[i];
- ptrCheckGuard(rootPtr, crootfragmentsize, rootfragmentrec);
+ fragrecptr.i = tabptr.p->fragptrholder[i];
+ ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
return true;
}//if
}//for
@@ -10619,22 +6872,12 @@
}//Dbacc::getrootfragmentrec()
/* --------------------------------------------------------------------------------- */
-/* INIT_FS_OP_REC */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initFsOpRec(Signal* signal)
-{
- fsOpptr.p->fsOpfragrecPtr = fragrecptr.i;
- fsOpptr.p->fsConptr = fsConnectptr.i;
-}//Dbacc::initFsOpRec()
-
-/* --------------------------------------------------------------------------------- */
/* INIT_LCP_CONN_REC */
/* --------------------------------------------------------------------------------- */
void Dbacc::initLcpConnRec(Signal* signal)
{
lcpConnectptr.p->lcpUserblockref = tuserblockref;
lcpConnectptr.p->lcpUserptr = tuserptr;
- lcpConnectptr.p->noOfLcpConf = 0; /* NO OF RETUREND CONF SIGNALS */
lcpConnectptr.p->syncUndopageState = WAIT_NOTHING;
}//Dbacc::initLcpConnRec()
@@ -10796,40 +7039,6 @@
}//Dbacc::initPage()
/* --------------------------------------------------------------------------------- */
-/* PUT_OP_IN_FRAG_WAIT_QUE */
-/* DESCRIPTION: AN OPERATION WHICH OWNS A LOCK OF AN ELEMENT, IS PUT IN A */
-/* LIST OF THE FRAGMENT. THIS LIST IS USED TO STOP THE QUEUE */
-/* OPERATION DURING CREATE CHECK POINT PROSESS FOR STOP AND */
-/* RESTART OF THE OPERATIONS. */
-/* */
-/* IF CONTINUEB SIGNALS ARE INTRODUCED AFTER STARTING TO EXECUTE ACCKEYREQ WE */
-/* MUST PUT IT IN THIS LIST BEFORE EXITING TO ENSURE THAT WE ARE NOT BEING */
-/* LOCKED AFTER THAT LQH HAS RECEIVED ALL LCP_HOLDOP'S. THEN THE LCP WILL NEVER*/
-/* PROCEED. WE ALSO PUT IT INTO THIS LIST WHEN WAITING FOR LONG KEYS. THIS IS */
-/* ONLY NEEDED IF SIGNALS CAN ENTER BETWEEN THE KEYDATA CARRYING SIGNALS. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::putOpInFragWaitQue(Signal* signal)
-{
- OperationrecPtr tpiwOperRecPtr;
-
- if (operationRecPtr.p->operation != ZSCAN_OP) {
- if (fragrecptr.p->firstWaitInQueOp == RNIL) {
- jam();
- fragrecptr.p->firstWaitInQueOp = operationRecPtr.i;
- } else {
- jam();
- tpiwOperRecPtr.i = fragrecptr.p->lastWaitInQueOp;
- ptrCheckGuard(tpiwOperRecPtr, coprecsize, operationrec);
- tpiwOperRecPtr.p->nextQueOp = operationRecPtr.i;
- }//if
- operationRecPtr.p->opState = WAIT_IN_QUEUE;
- operationRecPtr.p->nextQueOp = RNIL;
- operationRecPtr.p->prevQueOp = fragrecptr.p->lastWaitInQueOp;
- fragrecptr.p->lastWaitInQueOp = operationRecPtr.i;
- }//if
-}//Dbacc::putOpInFragWaitQue()
-
-/* --------------------------------------------------------------------------------- */
/* PUT_OVERFLOW_REC_IN_FRAG */
/* DESCRIPTION: AN OVERFLOW RECORD WITCH IS USED TO KEEP INFORMATION ABOUT */
/* OVERFLOW PAGE WILL BE PUT IN A LIST OF OVERFLOW RECORDS IN */
@@ -10948,26 +7157,6 @@
}//Dbacc::releaseDirrange()
/* --------------------------------------------------------------------------------- */
-/* RELEASE_FS_CONN_REC */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::releaseFsConnRec(Signal* signal)
-{
- fsConnectptr.p->fsNext = cfsFirstfreeconnect;
- cfsFirstfreeconnect = fsConnectptr.i;
- fsConnectptr.p->fsState = WAIT_NOTHING;
-}//Dbacc::releaseFsConnRec()
-
-/* --------------------------------------------------------------------------------- */
-/* RELEASE_FS_OP_REC */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::releaseFsOpRec(Signal* signal)
-{
- fsOpptr.p->fsOpnext = cfsFirstfreeop;
- cfsFirstfreeop = fsOpptr.i;
- fsOpptr.p->fsOpstate = WAIT_NOTHING;
-}//Dbacc::releaseFsOpRec()
-
-/* --------------------------------------------------------------------------------- */
/* RELEASE_LCP_CONNECT_REC */
/* --------------------------------------------------------------------------------- */
void Dbacc::releaseLcpConnectRec(Signal* signal)
@@ -11047,30 +7236,6 @@
jam();
return; /* THERE IS ONLY ONE OVERFLOW PAGE */
}//if
- if ((fragrecptr.p->createLcp == ZTRUE) &&
- (fragrecptr.p->lcpMaxOverDirIndex > ropPageptr.p->word32[ZPOS_PAGE_ID])) {
- /* --------------------------------------------------------------------------------- */
- /* THE PAGE PARTICIPATES IN THE LOCAL CHECKPOINT. */
- /* --------------------------------------------------------------------------------- */
- if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THE PAGE PARTICIPATES IN THE LOCAL CHECKPOINT AND THE WRITE TO DISK HAS NOT */
- /* YET BEEN COMPLETED. WE MUST KEEP IT A WHILE LONGER SINCE AN EMPTY PAGE IS */
- /* NOT EQUIVALENT TO AN INITIALISED PAGE SINCE THE FREE LISTS CAN DIFFER. */
- /* --------------------------------------------------------------------------------- */
- return;
- } else {
- if ((fragrecptr.p->fragState == LCP_SEND_OVER_PAGES) &&
- (fragrecptr.p->lcpDirIndex <= ropPageptr.p->word32[ZPOS_PAGE_ID])) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* SEE COMMENT ABOVE */
- /* --------------------------------------------------------------------------------- */
- return;
- }//if
- }//if
- }//if
#if kalle
logicalPage = 0;
@@ -11094,9 +7259,9 @@
#endif
- /* --------------------------------------------------------------------------------- */
- /* IT WAS OK TO RELEASE THE PAGE. */
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ /* IT WAS OK TO RELEASE THE PAGE. */
+ /* ----------------------------------------------------------------------- */
ptrCheckGuard(ropOverflowRecPtr, coverflowrecsize, overflowRecord);
tfoOverflowRecPtr = ropOverflowRecPtr;
takeRecOutOfFreeOverpage(signal);
@@ -11118,10 +7283,10 @@
jam();
return;
}//if
- /* --------------------------------------------------------------------------------- */
- /* THE LAST PAGE IN THE DIRECTORY WAS RELEASED IT IS NOW NECESSARY TO REMOVE */
- /* ALL RELEASED OVERFLOW DIRECTORIES AT THE END OF THE LIST. */
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ /* THE LAST PAGE IN THE DIRECTORY WAS RELEASED. IT IS NOW NECESSARY
+ * TO REMOVE ALL RELEASED OVERFLOW DIRECTORIES AT THE END OF THE LIST.
+ * ---------------------------------------------------------------------- */
do {
fragrecptr.p->lastOverIndex--;
if (tropTmp2 == 0) {
@@ -11139,10 +7304,10 @@
ropOverflowDirptr.i = ropOverflowrangeptr.p->dirArray[tropTmp1];
ptrCheckGuard(ropOverflowDirptr, cdirarraysize, directoryarray);
} while (ropOverflowDirptr.p->pagep[tropTmp2] == RNIL);
- /* --------------------------------------------------------------------------------- */
- /* RELEASE ANY OVERFLOW RECORDS THAT ARE PART OF THE FREE INDEX LIST WHICH */
- /* DIRECTORY INDEX NOW HAS BEEN RELEASED. */
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ /* RELEASE ANY OVERFLOW RECORDS THAT ARE PART OF THE FREE INDEX LIST WHOSE */
+ /* DIRECTORY INDEX HAS NOW BEEN RELEASED. */
+ /* ----------------------------------------------------------------------- */
tuodOverflowRecPtr.i = fragrecptr.p->firstFreeDirindexRec;
jam();
while (tuodOverflowRecPtr.i != RNIL) {
@@ -11162,9 +7327,9 @@
}//while
}//Dbacc::releaseOverpage()
-/* --------------------------------------------------------------------------------- */
-/* RELEASE_PAGE */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* RELEASE_PAGE */
+/* ------------------------------------------------------------------------- */
void Dbacc::releasePage(Signal* signal)
{
#ifdef VM_TRACE
@@ -11189,9 +7354,9 @@
cnoOfAllocatedPages--;
}//Dbacc::releasePage()
-/* --------------------------------------------------------------------------------- */
-/* RELEASE_LCP_PAGE */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------ */
+/* RELEASE_LCP_PAGE */
+/* ------------------------------------------------------------------------ */
void Dbacc::releaseLcpPage(Signal* signal)
{
rlpPageptr.p->word32[0] = cfirstfreeLcpPage;
@@ -11199,15 +7364,6 @@
}//Dbacc::releaseLcpPage()
/* --------------------------------------------------------------------------------- */
-/* RELEASE_SR_REC */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::releaseSrRec(Signal* signal)
-{
- srVersionPtr.p->nextFreeSr = cfirstFreeSrVersionRec;
- cfirstFreeSrVersionRec = srVersionPtr.i;
-}//Dbacc::releaseSrRec()
-
-/* --------------------------------------------------------------------------------- */
/* SEIZE_DIRECTORY */
/* DESCRIPTION: A DIRECTORY BLOCK (ZDIRBLOCKSIZE NUMBERS OF DIRECTORY */
/* RECORDS WILL BE ALLOCATED AND RETURNED. */
@@ -11269,29 +7425,6 @@
}//Dbacc::seizeFragrec()
/* --------------------------------------------------------------------------------- */
-/* SEIZE_FS_CONNECT_REC */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeFsConnectRec(Signal* signal)
-{
- fsConnectptr.i = cfsFirstfreeconnect;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- cfsFirstfreeconnect = fsConnectptr.p->fsNext;
- fsConnectptr.p->fsNext = RNIL;
- fsConnectptr.p->fsState = WAIT_NOTHING;
-}//Dbacc::seizeFsConnectRec()
-
-/* --------------------------------------------------------------------------------- */
-/* SEIZE_FS_OP_REC */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeFsOpRec(Signal* signal)
-{
- fsOpptr.i = cfsFirstfreeop;
- ptrCheckGuard(fsOpptr, cfsOpsize, fsOprec);
- cfsFirstfreeop = fsOpptr.p->fsOpnext;
- fsOpptr.p->fsOpnext = RNIL;
-}//Dbacc::seizeFsOpRec()
-
-/* --------------------------------------------------------------------------------- */
/* SEIZE_LCP_CONNECT_REC */
/* --------------------------------------------------------------------------------- */
void Dbacc::seizeLcpConnectRec(Signal* signal)
@@ -11382,13 +7515,6 @@
/* --------------------------------------------------------------------------------- */
/* SEIZE_ROOTFRAGREC */
/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeRootfragrec(Signal* signal)
-{
- rootfragrecptr.i = cfirstfreerootfrag;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- cfirstfreerootfrag = rootfragrecptr.p->nextroot;
- rootfragrecptr.p->nextroot = RNIL;
-}//Dbacc::seizeRootfragrec()
/* --------------------------------------------------------------------------------- */
/* SEIZE_SCAN_REC */
@@ -11404,13 +7530,6 @@
/* --------------------------------------------------------------------------------- */
/* SEIZE_SR_VERSION_REC */
/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeSrVerRec(Signal* signal)
-{
- srVersionPtr.i = cfirstFreeSrVersionRec;
- ptrCheckGuard(srVersionPtr, csrVersionRecSize, srVersionRec);
- cfirstFreeSrVersionRec = srVersionPtr.p->nextFreeSr;
-}//Dbacc::seizeSrVerRec()
-
/* --------------------------------------------------------------------------------- */
/* SEND_SYSTEMERROR */
/* --------------------------------------------------------------------------------- */
@@ -11506,11 +7625,10 @@
scanPtr.i, scanPtr.p->scanState,scanPtr.p->scanTrid1,
scanPtr.p->scanTrid2);
infoEvent(" timer=%d, continueBCount=%d, "
- "activeLocalFrag=%d, root=%d, nextBucketIndex=%d",
+ "activeLocalFrag=%d, nextBucketIndex=%d",
scanPtr.p->scanTimer,
scanPtr.p->scanContinuebCounter,
scanPtr.p->activeLocalFrag,
- scanPtr.p->rootPtr,
scanPtr.p->nextBucketIndex);
infoEvent(" scanNextfreerec=%d firstActOp=%d firstLockedOp=%d, "
"scanLastLockedOp=%d firstQOp=%d lastQOp=%d",
@@ -11807,20 +7925,18 @@
}//execSET_VAR_REQ()
void
-Dbacc::execREAD_PSUEDO_REQ(Signal* signal){
+Dbacc::execREAD_PSEUDO_REQ(Signal* signal){
jamEntry();
fragrecptr.i = signal->theData[0];
Uint32 attrId = signal->theData[1];
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
Uint64 tmp;
switch(attrId){
case AttributeHeader::ROW_COUNT:
- tmp = rootfragrecptr.p->noOfElements;
+ tmp = fragrecptr.p->noOfElements;
break;
case AttributeHeader::COMMIT_COUNT:
- tmp = rootfragrecptr.p->m_commit_count;
+ tmp = fragrecptr.p->m_commit_count;
break;
default:
tmp = 0;
@@ -11834,3 +7950,68 @@
// signal->theData[1] = src[1];
}
+void
+Dbacc::execTUX_MAINT_REQ(Signal* signal) // TUX_MAINT_REQ handler: reads a PK through c_tup->tuxReadPk, hashes it, then runs an insert (ACCKEYREQ) and commit (ACC_COMMITREQ) in this block; result is returned in req->errorCode
+{
+ jamEntry();
+ TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend(); // req is the pointer returned by getDataPtrSend(); errorCode is written through it below
+
+ tabptr.i = req->tableId; // NOTE(review): req presumably aliases signal->theData, so these fields are copied out before theData[0..7] is rewritten below - confirm
+ Uint32 fragId = req->fragId;
+ Uint32 pageId = req->pageId;
+ Uint32 tupFragPtrI = req->tupFragPtrI;
+ Uint32 localKey = (req->fragPageId << MAX_TUPLES_BITS) | req->pageIndex; // fragPageId in the high bits, pageIndex in the low bits
+
+ int ret = c_tup->tuxReadPk(tupFragPtrI, pageId, req->pageIndex,
+ signal->theData+24, true); // output buffer starts at theData[24]
+
+ if(ret <= 0) // NOTE(review): ret == 0 returns with errorCode 0, the same value the success path sets - confirm intended
+ {
+ req->errorCode = (Uint32)-ret; // negated return value is handed back as the error code
+ return;
+ }
+
+ Uint32 hashValue = md5_hash((Uint64*)(signal->theData+24), ret); // hash over the buffer filled above; ret is passed as its length
+ ptrCheckGuard(tabptr, ctablesize, tabrec);
+
+ fragrecptr.i = RNIL; // stays RNIL if no fragholder entry matches fragId
+ for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+ {
+ jam();
+ if (tabptr.p->fragholder[i] == fragId)
+ {
+ fragrecptr.i = tabptr.p->fragptrholder[i]; // fragment record index stored at the same slot as the matching id
+ break;
+ }
+ }
+ ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); // presumably fails the guard when the lookup above found nothing (i == RNIL) - confirm
+
+ ndbrequire(cfreeopRec != RNIL); // checked immediately before seizeOpRec(); presumably the free operation-record list head
+ seizeOpRec(signal);
+ // init as in ACCSEIZEREQ
+ operationRecPtr.p->userptr = 0;
+ operationRecPtr.p->userblockref = 0;
+ operationRecPtr.p->operation = ZUNDEFINED_OP;
+ operationRecPtr.p->transactionstate = IDLE;
+
+ // do insert via ACCKEYREQ
+ signal->theData[0] = operationRecPtr.i;
+ signal->theData[1] = fragrecptr.i;
+ signal->theData[2] = ZINSERT | (1u << 31); // NOTE(review): meaning of bit 31 is not visible here - confirm against the ACCKEYREQ handler
+ signal->theData[3] = hashValue;
+ signal->theData[4] = fragrecptr.p->keyLength;
+ signal->theData[5] = 0; // TransId1
+ signal->theData[6] = 0; // TransId2
+ // enter local key in place of PK
+ signal->theData[7] = localKey;
+ EXECUTE_DIRECT(DBACC, GSN_ACCKEYREQ, signal, 8); // 8 words: theData[0..7] set above
+
+ jamEntry();
+ ndbrequire(signal->theData[0] == 0); // userptr
+
+ signal->theData[0] = operationRecPtr.i;
+ EXECUTE_DIRECT(DBACC, GSN_ACC_COMMITREQ, signal, 1);
+
+ releaseOpRec(signal);
+ req->errorCode = 0; // success is reported as error code 0
+}
--- 1.3.6.1/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2005-07-25 13:23:29 +02:00
+++ 1.37/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2005-07-25 15:49:55 +02:00
@@ -31,6 +31,7 @@
#include <signaldata/TupKey.hpp>
#include <signaldata/DropTab.hpp>
+#include <SLList.hpp>
#define DEBUG(x) { ndbout << "TUP::" << x << endl; }
@@ -40,71 +41,49 @@
void Dbtup::initData()
{
cnoOfAttrbufrec = ZNO_OF_ATTRBUFREC;
- cnoOfLcpRec = ZNO_OF_LCP_REC;
- cnoOfConcurrentOpenOp = ZNO_OF_CONCURRENT_OPEN_OP;
- cnoOfConcurrentWriteOp = ZNO_OF_CONCURRENT_WRITE_OP;
- cnoOfFragoprec = 2 * MAX_FRAG_PER_NODE;
+ cnoOfFragrec = MAX_FRAG_PER_NODE;
+ cnoOfPage = ZNO_OF_PAGE;
+ cnoOfFragoprec = MAX_FRAG_PER_NODE;
cnoOfPageRangeRec = ZNO_OF_PAGE_RANGE_REC;
- cnoOfParallellUndoFiles = ZNO_OF_PARALLELL_UNDO_FILES;
- cnoOfRestartInfoRec = ZNO_OF_RESTART_INFO_REC;
c_maxTriggersPerTable = ZDEFAULT_MAX_NO_TRIGGERS_PER_TABLE;
c_noOfBuildIndexRec = 32;
attrbufrec = 0;
- checkpointInfo = 0;
- diskBufferSegmentInfo = 0;
fragoperrec = 0;
fragrecord = 0;
hostBuffer = 0;
- localLogInfo = 0;
- operationrec = 0;
- page = 0;
+ cpage = 0;
pageRange = 0;
- pendingFileOpenInfo = 0;
- restartInfoRecord = 0;
tablerec = 0;
tableDescriptor = 0;
- undoPage = 0;
totNoOfPagesAllocated = 0;
cnoOfAllocatedPages = 0;
// Records with constant sizes
+ init_list_sizes();
}//Dbtup::initData()
-Dbtup::Dbtup(const class Configuration & conf)
+Dbtup::Dbtup(const class Configuration & conf, Pgman* pgman)
: SimulatedBlock(DBTUP, conf),
- c_storedProcPool(),
- c_buildIndexList(c_buildIndexPool)
+ c_lqh(0),
+ c_storedProcPool(),
+ c_buildIndexList(c_buildIndexPool),
+ c_undo_buffer(this),
+ m_pgman(this, pgman)
{
- Uint32 log_page_size= 0;
BLOCK_CONSTRUCTOR(Dbtup);
const ndb_mgm_configuration_iterator * p = conf.getOwnConfigIterator();
ndbrequire(p != 0);
- ndb_mgm_get_int_parameter(p, CFG_DB_UNDO_DATA_BUFFER,
- &log_page_size);
-
- /**
- * Always set page size in half MBytes
- */
- cnoOfUndoPage= (log_page_size / sizeof(UndoPage));
- Uint32 mega_byte_part= cnoOfUndoPage & 15;
- if (mega_byte_part != 0) {
- jam();
- cnoOfUndoPage+= (16 - mega_byte_part);
- }
-
addRecSignal(GSN_DEBUG_SIG, &Dbtup::execDEBUG_SIG);
addRecSignal(GSN_CONTINUEB, &Dbtup::execCONTINUEB);
+ addRecSignal(GSN_LCP_FRAG_ORD, &Dbtup::execLCP_FRAG_ORD);
addRecSignal(GSN_DUMP_STATE_ORD, &Dbtup::execDUMP_STATE_ORD);
addRecSignal(GSN_SEND_PACKED, &Dbtup::execSEND_PACKED);
addRecSignal(GSN_ATTRINFO, &Dbtup::execATTRINFO);
addRecSignal(GSN_STTOR, &Dbtup::execSTTOR);
- addRecSignal(GSN_TUP_LCPREQ, &Dbtup::execTUP_LCPREQ);
- addRecSignal(GSN_END_LCPREQ, &Dbtup::execEND_LCPREQ);
- addRecSignal(GSN_START_RECREQ, &Dbtup::execSTART_RECREQ);
addRecSignal(GSN_MEMCHECKREQ, &Dbtup::execMEMCHECKREQ);
addRecSignal(GSN_TUPKEYREQ, &Dbtup::execTUPKEYREQ);
addRecSignal(GSN_TUPSEIZEREQ, &Dbtup::execTUPSEIZEREQ);
@@ -114,16 +93,6 @@
addRecSignal(GSN_TUP_ADD_ATTRREQ, &Dbtup::execTUP_ADD_ATTRREQ);
addRecSignal(GSN_TUP_COMMITREQ, &Dbtup::execTUP_COMMITREQ);
addRecSignal(GSN_TUP_ABORTREQ, &Dbtup::execTUP_ABORTREQ);
- addRecSignal(GSN_TUP_SRREQ, &Dbtup::execTUP_SRREQ);
- addRecSignal(GSN_TUP_PREPLCPREQ, &Dbtup::execTUP_PREPLCPREQ);
- addRecSignal(GSN_FSOPENCONF, &Dbtup::execFSOPENCONF);
- addRecSignal(GSN_FSOPENREF, &Dbtup::execFSOPENREF);
- addRecSignal(GSN_FSCLOSECONF, &Dbtup::execFSCLOSECONF);
- addRecSignal(GSN_FSCLOSEREF, &Dbtup::execFSCLOSEREF);
- addRecSignal(GSN_FSWRITECONF, &Dbtup::execFSWRITECONF);
- addRecSignal(GSN_FSWRITEREF, &Dbtup::execFSWRITEREF);
- addRecSignal(GSN_FSREADCONF, &Dbtup::execFSREADCONF);
- addRecSignal(GSN_FSREADREF, &Dbtup::execFSREADREF);
addRecSignal(GSN_NDB_STTOR, &Dbtup::execNDB_STTOR);
addRecSignal(GSN_READ_CONFIG_REQ, &Dbtup::execREAD_CONFIG_REQ, true);
addRecSignal(GSN_SET_VAR_REQ, &Dbtup::execSET_VAR_REQ);
@@ -158,15 +127,6 @@
sizeof(Attrbufrec),
cnoOfAttrbufrec);
- deallocRecord((void **)&checkpointInfo,"CheckpointInfo",
- sizeof(CheckpointInfo),
- cnoOfLcpRec);
-
- deallocRecord((void **)&diskBufferSegmentInfo,
- "DiskBufferSegmentInfo",
- sizeof(DiskBufferSegmentInfo),
- cnoOfConcurrentWriteOp);
-
deallocRecord((void **)&fragoperrec,"Fragoperrec",
sizeof(Fragoperrec),
cnoOfFragoprec);
@@ -179,15 +139,7 @@
sizeof(HostBuffer),
MAX_NODES);
- deallocRecord((void **)&localLogInfo,"LocalLogInfo",
- sizeof(LocalLogInfo),
- cnoOfParallellUndoFiles);
-
- deallocRecord((void **)&operationrec,"Operationrec",
- sizeof(Operationrec),
- cnoOfOprec);
-
- deallocRecord((void **)&page,"Page",
+ deallocRecord((void **)&cpage,"Page",
sizeof(Page),
cnoOfPage);
@@ -195,16 +147,6 @@
sizeof(PageRange),
cnoOfPageRangeRec);
- deallocRecord((void **)&pendingFileOpenInfo,
- "PendingFileOpenInfo",
- sizeof(PendingFileOpenInfo),
- cnoOfConcurrentOpenOp);
-
- deallocRecord((void **)&restartInfoRecord,
- "RestartInfoRecord",
- sizeof(RestartInfoRecord),
- cnoOfRestartInfoRec);
-
deallocRecord((void **)&tablerec,"Tablerec",
sizeof(Tablerec),
cnoOfTablerec);
@@ -213,338 +155,16 @@
sizeof(TableDescriptor),
cnoOfTabDescrRec);
- deallocRecord((void **)&undoPage,"UndoPage",
- sizeof(UndoPage),
- cnoOfUndoPage);
-
}//Dbtup::~Dbtup()
BLOCK_FUNCTIONS(Dbtup)
-/* **************************************************************** */
-/* ---------------------------------------------------------------- */
-/* ----- GENERAL SIGNAL MULTIPLEXER (FS + CONTINUEB) -------------- */
-/* ---------------------------------------------------------------- */
-/* **************************************************************** */
-void Dbtup::execFSCLOSECONF(Signal* signal)
-{
- PendingFileOpenInfoPtr pfoPtr;
- ljamEntry();
- pfoPtr.i = signal->theData[0];
- ptrCheckGuard(pfoPtr, cnoOfConcurrentOpenOp, pendingFileOpenInfo);
- switch (pfoPtr.p->pfoOpenType) {
- case LCP_DATA_FILE_CLOSE:
- {
- CheckpointInfoPtr ciPtr;
- ljam();
- ciPtr.i = pfoPtr.p->pfoCheckpointInfoP;
- ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
- ciPtr.p->lcpDataFileHandle = RNIL;
- lcpClosedDataFileLab(signal, ciPtr);
- break;
- }
- case LCP_UNDO_FILE_CLOSE:
- {
- LocalLogInfoPtr lliPtr;
- ljam();
- lliPtr.i = pfoPtr.p->pfoCheckpointInfoP;
- ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
- lliPtr.p->lliUndoFileHandle = RNIL;
- lcpEndconfLab(signal);
- break;
- }
- case LCP_UNDO_FILE_READ:
- ljam();
- endExecUndoLogLab(signal, pfoPtr.p->pfoCheckpointInfoP);
- break;
- case LCP_DATA_FILE_READ:
- ljam();
- rfrClosedDataFileLab(signal, pfoPtr.p->pfoCheckpointInfoP);
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- releasePendingFileOpenInfoRecord(pfoPtr);
-}//Dbtup::execFSCLOSECONF()
-
-void Dbtup::execFSCLOSEREF(Signal* signal)
-{
- ljamEntry();
- ndbrequire(false);
-}//Dbtup::execFSCLOSEREF()
-
-void Dbtup::execFSOPENCONF(Signal* signal)
-{
- PendingFileOpenInfoPtr pfoPtr;
-
- ljamEntry();
- pfoPtr.i = signal->theData[0];
- Uint32 fileHandle = signal->theData[1];
- ptrCheckGuard(pfoPtr, cnoOfConcurrentOpenOp, pendingFileOpenInfo);
- switch (pfoPtr.p->pfoOpenType) {
- case LCP_DATA_FILE_READ:
- {
- RestartInfoRecordPtr riPtr;
- ljam();
- riPtr.i = pfoPtr.p->pfoRestartInfoP;
- ptrCheckGuard(riPtr, cnoOfRestartInfoRec, restartInfoRecord);
- riPtr.p->sriDataFileHandle = fileHandle;
- rfrReadRestartInfoLab(signal, riPtr);
- break;
- }
- case LCP_UNDO_FILE_READ:
- {
- RestartInfoRecordPtr riPtr;
- LocalLogInfoPtr lliPtr;
- DiskBufferSegmentInfoPtr dbsiPtr;
-
- ljam();
- riPtr.i = pfoPtr.p->pfoRestartInfoP;
- ptrCheckGuard(riPtr, cnoOfRestartInfoRec, restartInfoRecord);
- lliPtr.i = riPtr.p->sriLocalLogInfoP;
- ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
- lliPtr.p->lliUndoFileHandle = fileHandle;
- dbsiPtr.i = riPtr.p->sriDataBufferSegmentP;
- ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
- rfrLoadDataPagesLab(signal, riPtr, dbsiPtr);
- break;
- }
- case LCP_DATA_FILE_WRITE_WITH_UNDO:
- {
- CheckpointInfoPtr ciPtr;
- LocalLogInfoPtr lliPtr;
-
- ljam();
- ciPtr.i = pfoPtr.p->pfoCheckpointInfoP;
- ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
- lliPtr.i = ciPtr.p->lcpLocalLogInfoP;
- ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
- ciPtr.p->lcpDataFileHandle = fileHandle;
- if (lliPtr.p->lliUndoFileHandle != RNIL) {
- ljam();
- signal->theData[0] = ciPtr.p->lcpUserptr;
- signal->theData[1] = ciPtr.i;
- sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_PREPLCPCONF, signal, 2, JBB);
- }//if
- break;
- }
- case LCP_DATA_FILE_WRITE:
- {
- CheckpointInfoPtr ciPtr;
-
- ljam();
- ciPtr.i = pfoPtr.p->pfoCheckpointInfoP;
- ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
- ciPtr.p->lcpDataFileHandle = fileHandle;
- signal->theData[0] = ciPtr.p->lcpUserptr;
- signal->theData[1] = ciPtr.i;
- sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_PREPLCPCONF, signal, 2, JBB);
- break;
- }
- case LCP_UNDO_FILE_WRITE:
- {
- CheckpointInfoPtr ciPtr;
- LocalLogInfoPtr lliPtr;
-
- ljam();
- ciPtr.i = pfoPtr.p->pfoCheckpointInfoP;
- ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
- lliPtr.i = ciPtr.p->lcpLocalLogInfoP;
- ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
- lliPtr.p->lliUndoFileHandle = fileHandle;
- if (ciPtr.p->lcpDataFileHandle != RNIL) {
- ljam();
- signal->theData[0] = ciPtr.p->lcpUserptr;
- signal->theData[1] = ciPtr.i;
- sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_PREPLCPCONF, signal, 2, JBB);
- }//if
- break;
- }
- default:
- ndbrequire(false);
- break;
- }//switch
- releasePendingFileOpenInfoRecord(pfoPtr);
-}//Dbtup::execFSOPENCONF()
-
-void Dbtup::execFSOPENREF(Signal* signal)
-{
- ljamEntry();
- ndbrequire(false);
-}//Dbtup::execFSOPENREF()
-
-void Dbtup::execFSREADCONF(Signal* signal)
-{
- DiskBufferSegmentInfoPtr dbsiPtr;
- ljamEntry();
- dbsiPtr.i = signal->theData[0];
- ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
- switch (dbsiPtr.p->pdxOperation) {
- case CHECKPOINT_DATA_READ:
- {
- RestartInfoRecordPtr riPtr;
- ljam();
- riPtr.i = dbsiPtr.p->pdxRestartInfoP;
- ptrCheckGuard(riPtr, cnoOfRestartInfoRec, restartInfoRecord);
-/************************************************************/
-/* VERIFY THAT THE PAGES ARE CORRECT, HAVE A CORRECT */
-/* STATE AND A CORRECT PAGE ID. */
-/************************************************************/
- ndbrequire(dbsiPtr.p->pdxNumDataPages <= 16);
- for (Uint32 i = 0; i < dbsiPtr.p->pdxNumDataPages; i++) {
- PagePtr pagePtr;
- ljam();
- pagePtr.i = dbsiPtr.p->pdxDataPage[i];
- ptrCheckGuard(pagePtr, cnoOfPage, page);
- ndbrequire(pagePtr.p->pageWord[ZPAGE_STATE_POS] != 0);
- ndbrequire(pagePtr.p->pageWord[ZPAGE_STATE_POS] <= ZAC_MM_FREE_COPY);
- ndbrequire(pagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS] == ((dbsiPtr.p->pdxFilePage - 1) + i));
- }//for
- rfrLoadDataPagesLab(signal, riPtr, dbsiPtr);
- break;
- }
- case CHECKPOINT_DATA_READ_PAGE_ZERO:
- {
- ljam();
- rfrInitRestartInfoLab(signal, dbsiPtr);
- break;
- }
- case CHECKPOINT_UNDO_READ:
- {
- LocalLogInfoPtr lliPtr;
- ljam();
- lliPtr.i = dbsiPtr.p->pdxCheckpointInfoP;
- ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
- xlcGetNextRecordLab(signal, dbsiPtr, lliPtr);
- break;
- }
- case CHECKPOINT_UNDO_READ_FIRST:
- ljam();
- rfrReadSecondUndoLogLab(signal, dbsiPtr);
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
-}//Dbtup::execFSREADCONF()
-
-void Dbtup::execFSREADREF(Signal* signal)
-{
- ljamEntry();
- ndbrequire(false);
-}//Dbtup::execFSREADREF()
-
-void Dbtup::execFSWRITECONF(Signal* signal)
-{
- DiskBufferSegmentInfoPtr dbsiPtr;
-
- ljamEntry();
- dbsiPtr.i = signal->theData[0];
- ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
- switch (dbsiPtr.p->pdxOperation) {
- case CHECKPOINT_DATA_WRITE:
- ljam();
- lcpSaveDataPageLab(signal, dbsiPtr.p->pdxCheckpointInfoP);
- break;
- case CHECKPOINT_DATA_WRITE_LAST:
- {
- CheckpointInfoPtr ciPtr;
- ljam();
- ciPtr.i = dbsiPtr.p->pdxCheckpointInfoP;
- ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
- lcpFlushLogLab(signal, ciPtr);
- break;
- }
- case CHECKPOINT_DATA_WRITE_FLUSH:
- {
- ljam();
- Uint32 ciIndex = dbsiPtr.p->pdxCheckpointInfoP;
- freeDiskBufferSegmentRecord(signal, dbsiPtr);
- lcpCompletedLab(signal, ciIndex);
- break;
- }
- case CHECKPOINT_UNDO_WRITE_FLUSH:
- {
- ljam();
- Uint32 ciIndex = dbsiPtr.p->pdxCheckpointInfoP;
- freeDiskBufferSegmentRecord(signal, dbsiPtr);
- lcpFlushRestartInfoLab(signal, ciIndex);
- break;
- }
- case CHECKPOINT_UNDO_WRITE:
- ljam();
- freeDiskBufferSegmentRecord(signal, dbsiPtr);
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- return;
-}//Dbtup::execFSWRITECONF()
-
-void Dbtup::execFSWRITEREF(Signal* signal)
-{
- ljamEntry();
- ndbrequire(false);
-}//Dbtup::execFSWRITEREF()
-
void Dbtup::execCONTINUEB(Signal* signal)
{
ljamEntry();
Uint32 actionType = signal->theData[0];
Uint32 dataPtr = signal->theData[1];
switch (actionType) {
- case ZSTART_EXEC_UNDO_LOG:
- ljam();
- startExecUndoLogLab(signal, dataPtr);
- break;
- case ZCONT_SAVE_DP:
- ljam();
- lcpSaveDataPageLab(signal, dataPtr);
- break;
- case ZCONT_START_SAVE_CL:
- {
- CheckpointInfoPtr ciPtr;
-
- ljam();
- ciPtr.i = dataPtr;
- ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
- lcpSaveCopyListLab(signal, ciPtr);
- break;
- }
- case ZCONT_EXECUTE_LC:
- {
- LocalLogInfoPtr lliPtr;
- DiskBufferSegmentInfoPtr dbsiPtr;
-
- ljam();
- lliPtr.i = dataPtr;
- ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
- dbsiPtr.i = lliPtr.p->lliUndoBufferSegmentP;
- ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
- xlcGetNextRecordLab(signal, dbsiPtr, lliPtr);
- break;
- }
- case ZCONT_LOAD_DP:
- {
- DiskBufferSegmentInfoPtr dbsiPtr;
- RestartInfoRecordPtr riPtr;
-
- ljam();
- riPtr.i = dataPtr;
- ptrCheckGuard(riPtr, cnoOfRestartInfoRec, restartInfoRecord);
- dbsiPtr.i = riPtr.p->sriDataBufferSegmentP;
- ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
- rfrLoadDataPagesLab(signal, riPtr, dbsiPtr);
- break;
- }
- case ZLOAD_BAL_LCP_TIMER:
- ljam();
- clblPageCounter = clblPagesPerTick;
- signal->theData[0] = ZLOAD_BAL_LCP_TIMER;
- sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 400, 1);
- break;
case ZINITIALISE_RECORDS:
ljam();
initialiseRecordsLab(signal, dataPtr,
@@ -581,6 +201,18 @@
ljam();
buildIndex(signal, dataPtr);
break;
+ case ZFREE_EXTENT:
+ ljam();
+
+ TablerecPtr tabPtr;
+ tabPtr.i= dataPtr;
+ FragrecordPtr fragPtr;
+ fragPtr.i= signal->theData[2];
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+ ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+ drop_fragment_free_exent(signal, tabPtr, fragPtr, signal->theData[3]);
+ return;
+ break;
default:
ndbrequire(false);
break;
@@ -601,6 +233,9 @@
case ZSTARTPHASE1:
ljam();
CLEAR_ERROR_INSERT_VALUE;
+ ndbrequire((c_lqh= (Dblqh*)globalData.getBlock(DBLQH)) != 0);
+ ndbrequire((c_tsman= (Tsman*)globalData.getBlock(TSMAN)) != 0);
+ ndbrequire((c_lgman= (Lgman*)globalData.getBlock(LGMAN)) != 0);
cownref = calcTupBlockRef(0);
break;
default:
@@ -634,10 +269,8 @@
ndbrequire(p != 0);
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_FRAG, &cnoOfFragrec));
-
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_OP_RECS, &cnoOfOprec));
-
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_PAGE, &cnoOfPage));
+
Uint32 noOfTriggers= 0;
Uint32 tmp= 0;
@@ -657,28 +290,24 @@
c_buildIndexPool.setSize(c_noOfBuildIndexRec);
c_triggerPool.setSize(noOfTriggers);
+ c_extent_pool.setSize(100);
+ c_page_request_pool.setSize(100);
+
Uint32 nScanOp; // use TUX config for now
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp));
- c_scanOpPool.setSize(nScanOp);
+ c_scanOpPool.setSize(nScanOp + 1);
+
+ ScanOpPtr lcp;
+ ndbrequire(c_scanOpPool.seize(lcp));
+ c_lcp_scan_op= lcp.i;
initRecords();
czero = 0;
cminusOne = czero - 1;
clastBitMask = 1;
clastBitMask = clastBitMask << 31;
- cnoOfLocalLogInfo = 0;
- cnoFreeUndoSeg = 0;
initialiseRecordsLab(signal, 0, ref, senderData);
-
- clblPagesPerTick = 50;
- ndb_mgm_get_int_parameter(p, CFG_DB_LCP_DISC_PAGES_TUP_SR,
- &clblPagesPerTick);
-
- clblPagesPerTickAfterSr = 50;
- ndb_mgm_get_int_parameter(p, CFG_DB_LCP_DISC_PAGES_TUP,
- &clblPagesPerTickAfterSr);
-
}//Dbtup::execSIZEALT_REP()
void Dbtup::initRecords()
@@ -690,15 +319,6 @@
sizeof(Attrbufrec),
cnoOfAttrbufrec);
- checkpointInfo = (CheckpointInfo*)allocRecord("CheckpointInfo",
- sizeof(CheckpointInfo),
- cnoOfLcpRec);
-
- diskBufferSegmentInfo = (DiskBufferSegmentInfo*)
- allocRecord("DiskBufferSegmentInfo",
- sizeof(DiskBufferSegmentInfo),
- cnoOfConcurrentWriteOp);
-
fragoperrec = (Fragoperrec*)allocRecord("Fragoperrec",
sizeof(Fragoperrec),
cnoOfFragoprec);
@@ -716,38 +336,26 @@
sizeof(HostBuffer),
MAX_NODES);
- localLogInfo = (LocalLogInfo*)allocRecord("LocalLogInfo",
- sizeof(LocalLogInfo),
- cnoOfParallellUndoFiles);
-
- operationrec = (Operationrec*)allocRecord("Operationrec",
- sizeof(Operationrec),
- cnoOfOprec);
-
- page = (Page*)allocRecord("Page",
- sizeof(Page),
- cnoOfPage,
- false);
+ Uint32 tmp;
+ const ndb_mgm_configuration_iterator * p =
+ theConfiguration.getOwnConfigIterator();
+ ndbrequire(p != 0);
+ ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_OP_RECS, &tmp));
+ c_operation_pool.setSize(tmp);
+
+ cpage = (Page*)allocRecord("Page",
+ sizeof(Page),
+ cnoOfPage,
+ false);
pageRange = (PageRange*)allocRecord("PageRange",
sizeof(PageRange),
cnoOfPageRangeRec);
- pendingFileOpenInfo = (PendingFileOpenInfo*)
- allocRecord("PendingFileOpenInfo",
- sizeof(PendingFileOpenInfo),
- cnoOfConcurrentOpenOp);
-
- restartInfoRecord = (RestartInfoRecord*)
- allocRecord("RestartInfoRecord",
- sizeof(RestartInfoRecord),
- cnoOfRestartInfoRec);
-
-
tablerec = (Tablerec*)allocRecord("Tablerec",
sizeof(Tablerec),
cnoOfTablerec);
-
+
for (i = 0; i<cnoOfTablerec; i++) {
void * p = &tablerec[i];
new (p) Tablerec(c_triggerPool);
@@ -757,24 +365,6 @@
allocRecord("TableDescriptor",
sizeof(TableDescriptor),
cnoOfTabDescrRec);
-
- undoPage = (UndoPage*)allocRecord("UndoPage",
- sizeof(UndoPage),
- cnoOfUndoPage);
-
-
- // Initialize BAT for interface to file system
- NewVARIABLE* bat = allocateBat(3);
- bat[1].WA = &page->pageWord[0];
- bat[1].nrr = cnoOfPage;
- bat[1].ClusterSize = sizeof(Page);
- bat[1].bits.q = 13; /* 8192 words/page */
- bat[1].bits.v = 5;
- bat[2].WA = &undoPage->undoPageWord[0];
- bat[2].nrr = cnoOfUndoPage;
- bat[2].ClusterSize = sizeof(UndoPage);
- bat[2].bits.q = 13; /* 8192 words/page */
- bat[2].bits.v = 5;
}//Dbtup::initRecords()
void Dbtup::initialiseRecordsLab(Signal* signal, Uint32 switchData,
@@ -795,7 +385,6 @@
break;
case 3:
ljam();
- initializeUndoPage();
break;
case 4:
ljam();
@@ -803,7 +392,6 @@
break;
case 5:
ljam();
- initializeCheckpointInfoRec();
break;
case 6:
ljam();
@@ -823,11 +411,9 @@
break;
case 10:
ljam();
- initializeDiskBufferSegmentRecord();
break;
case 11:
ljam();
- initializeLocalLogInfo();
break;
case 12:
ljam();
@@ -835,11 +421,9 @@
break;
case 13:
ljam();
- initializePendingFileOpenInfoRecord();
break;
case 14:
ljam();
- initializeRestartInfoRec();
{
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -890,8 +474,6 @@
/* PAGES PER TICK AFTER SYSTEM */
/* RESTART. */
/*****************************************/
- clblPagesPerTick = clblPagesPerTickAfterSr;
-
signal->theData[0] = ZREPORT_MEMORY_USAGE;
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 2000, 1);
break;
@@ -905,9 +487,6 @@
void Dbtup::startphase3Lab(Signal* signal, Uint32 config1, Uint32 config2)
{
- clblPageCounter = clblPagesPerTick;
- signal->theData[0] = ZLOAD_BAL_LCP_TIMER;
- sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 100, 1);
}//Dbtup::startphase3Lab()
void Dbtup::initializeAttrbufrec()
@@ -926,35 +505,6 @@
cnoFreeAttrbufrec = cnoOfAttrbufrec;
}//Dbtup::initializeAttrbufrec()
-void Dbtup::initializeCheckpointInfoRec()
-{
- CheckpointInfoPtr checkpointInfoPtr;
- for (checkpointInfoPtr.i = 0;
- checkpointInfoPtr.i < cnoOfLcpRec; checkpointInfoPtr.i++) {
- ptrAss(checkpointInfoPtr, checkpointInfo);
- checkpointInfoPtr.p->lcpNextRec = checkpointInfoPtr.i + 1;
- }//for
- checkpointInfoPtr.i = cnoOfLcpRec - 1;
- ptrAss(checkpointInfoPtr, checkpointInfo);
- checkpointInfoPtr.p->lcpNextRec = RNIL;
- cfirstfreeLcp = 0;
-}//Dbtup::initializeCheckpointInfoRec()
-
-void Dbtup::initializeDiskBufferSegmentRecord()
-{
- DiskBufferSegmentInfoPtr diskBufferSegmentPtr;
- for (diskBufferSegmentPtr.i = 0;
- diskBufferSegmentPtr.i < cnoOfConcurrentWriteOp; diskBufferSegmentPtr.i++) {
- ptrAss(diskBufferSegmentPtr, diskBufferSegmentInfo);
- diskBufferSegmentPtr.p->pdxNextRec = diskBufferSegmentPtr.i + 1;
- diskBufferSegmentPtr.p->pdxBuffertype = NOT_INITIALIZED;
- }//for
- diskBufferSegmentPtr.i = cnoOfConcurrentWriteOp - 1;
- ptrAss(diskBufferSegmentPtr, diskBufferSegmentInfo);
- diskBufferSegmentPtr.p->pdxNextRec = RNIL;
- cfirstfreePdx = 0;
-}//Dbtup::initializeDiskBufferSegmentRecord()
-
void Dbtup::initializeFragoperrec()
{
FragoperrecPtr fragoperPtr;
@@ -975,9 +525,6 @@
refresh_watch_dog();
ptrAss(regFragPtr, fragrecord);
regFragPtr.p->nextfreefrag = regFragPtr.i + 1;
- regFragPtr.p->checkpointVersion = RNIL;
- regFragPtr.p->firstusedOprec = RNIL;
- regFragPtr.p->lastusedOprec = RNIL;
regFragPtr.p->fragStatus = IDLE;
}//for
regFragPtr.i = cnoOfFragrec - 1;
@@ -997,71 +544,12 @@
}//for
}//Dbtup::initializeHostBuffer()
-void Dbtup::initializeLocalLogInfo()
-{
- LocalLogInfoPtr localLogInfoPtr;
- for (localLogInfoPtr.i = 0;
- localLogInfoPtr.i < cnoOfParallellUndoFiles; localLogInfoPtr.i++) {
- ptrAss(localLogInfoPtr, localLogInfo);
- localLogInfoPtr.p->lliActiveLcp = 0;
- localLogInfoPtr.p->lliUndoFileHandle = RNIL;
- }//for
-}//Dbtup::initializeLocalLogInfo()
void Dbtup::initializeOperationrec()
{
- OperationrecPtr regOpPtr;
- for (regOpPtr.i = 0; regOpPtr.i < cnoOfOprec; regOpPtr.i++) {
- refresh_watch_dog();
- ptrAss(regOpPtr, operationrec);
- regOpPtr.p->firstAttrinbufrec = RNIL;
- regOpPtr.p->lastAttrinbufrec = RNIL;
- regOpPtr.p->prevOprecInList = RNIL;
- regOpPtr.p->nextOprecInList = regOpPtr.i + 1;
- regOpPtr.p->optype = ZREAD;
- regOpPtr.p->inFragList = ZFALSE;
- regOpPtr.p->inActiveOpList = ZFALSE;
-/* FOR ABORT HANDLING BEFORE ANY SUCCESSFUL OPERATION */
- regOpPtr.p->transstate = DISCONNECTED;
- regOpPtr.p->storedProcedureId = ZNIL;
- regOpPtr.p->prevActiveOp = RNIL;
- regOpPtr.p->nextActiveOp = RNIL;
- regOpPtr.p->tupVersion = ZNIL;
- regOpPtr.p->deleteInsertFlag = 0;
- }//for
- regOpPtr.i = cnoOfOprec - 1;
- ptrAss(regOpPtr, operationrec);
- regOpPtr.p->nextOprecInList = RNIL;
- cfirstfreeOprec = 0;
+ refresh_watch_dog();
}//Dbtup::initializeOperationrec()
-void Dbtup::initializePendingFileOpenInfoRecord()
-{
- PendingFileOpenInfoPtr pendingFileOpenInfoPtr;
- for (pendingFileOpenInfoPtr.i = 0;
- pendingFileOpenInfoPtr.i < cnoOfConcurrentOpenOp; pendingFileOpenInfoPtr.i++) {
- ptrAss(pendingFileOpenInfoPtr, pendingFileOpenInfo);
- pendingFileOpenInfoPtr.p->pfoNextRec = pendingFileOpenInfoPtr.i + 1;
- }//for
- pendingFileOpenInfoPtr.i = cnoOfConcurrentOpenOp - 1;
- ptrAss(pendingFileOpenInfoPtr, pendingFileOpenInfo);
- pendingFileOpenInfoPtr.p->pfoNextRec = RNIL;
- cfirstfreePfo = 0;
-}//Dbtup::initializePendingFileOpenInfoRecord()
-
-void Dbtup::initializeRestartInfoRec()
-{
- RestartInfoRecordPtr restartInfoPtr;
- for (restartInfoPtr.i = 0; restartInfoPtr.i < cnoOfRestartInfoRec; restartInfoPtr.i++) {
- ptrAss(restartInfoPtr, restartInfoRecord);
- restartInfoPtr.p->sriNextRec = restartInfoPtr.i + 1;
- }//for
- restartInfoPtr.i = cnoOfRestartInfoRec - 1;
- ptrAss(restartInfoPtr, restartInfoRecord);
- restartInfoPtr.p->sriNextRec = RNIL;
- cfirstfreeSri = 0;
-}//Dbtup::initializeRestartInfoRec()
-
void Dbtup::initializeTablerec()
{
TablerecPtr regTabPtr;
@@ -1076,7 +564,7 @@
void
Dbtup::initTab(Tablerec* const regTabPtr)
{
- for (Uint32 i = 0; i < (2 * MAX_FRAG_PER_NODE); i++) {
+ for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
regTabPtr->fragid[i] = RNIL;
regTabPtr->fragrec[i] = RNIL;
}//for
@@ -1085,22 +573,12 @@
regTabPtr->charsetArray = NULL;
regTabPtr->tabDescriptor = RNIL;
- regTabPtr->attributeGroupDescriptor = RNIL;
regTabPtr->readKeyArray = RNIL;
regTabPtr->checksumIndicator = false;
- regTabPtr->GCPIndicator = false;
- regTabPtr->noOfAttr = 0;
+ regTabPtr->m_no_of_attributes = 0;
regTabPtr->noOfKeyAttr = 0;
- regTabPtr->noOfNewAttr = 0;
- regTabPtr->noOfAttributeGroups = 0;
-
- regTabPtr->tupheadsize = 0;
- regTabPtr->tupNullIndex = 0;
- regTabPtr->tupNullWords = 0;
- regTabPtr->tupChecksumIndex = 0;
- regTabPtr->tupGCPIndex = 0;
regTabPtr->m_dropTable.tabUserPtr = RNIL;
regTabPtr->m_dropTable.tabUserRef = 0;
@@ -1139,24 +617,6 @@
freeTabDescr(0, cnoOfTabDescrRec);
}//Dbtup::initializeTabDescr()
-void Dbtup::initializeUndoPage()
-{
- UndoPagePtr undoPagep;
- for (undoPagep.i = 0;
- undoPagep.i < cnoOfUndoPage;
- undoPagep.i = undoPagep.i + ZUB_SEGMENT_SIZE) {
- refresh_watch_dog();
- ptrAss(undoPagep, undoPage);
- undoPagep.p->undoPageWord[ZPAGE_NEXT_POS] = undoPagep.i +
- ZUB_SEGMENT_SIZE;
- cnoFreeUndoSeg++;
- }//for
- undoPagep.i = cnoOfUndoPage - ZUB_SEGMENT_SIZE;
- ptrAss(undoPagep, undoPage);
- undoPagep.p->undoPageWord[ZPAGE_NEXT_POS] = RNIL;
- cfirstfreeUndoSeg = 0;
-}//Dbtup::initializeUndoPage()
-
/* ---------------------------------------------------------------- */
/* ---------------------------------------------------------------- */
/* --------------- CONNECT/DISCONNECT MODULE ---------------------- */
@@ -1168,27 +628,36 @@
ljamEntry();
Uint32 userPtr = signal->theData[0];
BlockReference userRef = signal->theData[1];
- if (cfirstfreeOprec != RNIL) {
- ljam();
- seizeOpRec(regOperPtr);
- } else {
+ if (!c_operation_pool.seize(regOperPtr))
+ {
ljam();
signal->theData[0] = userPtr;
signal->theData[1] = ZGET_OPREC_ERROR;
sendSignal(userRef, GSN_TUPSEIZEREF, signal, 2, JBB);
return;
}//if
- regOperPtr.p->optype = ZREAD;
- initOpConnection(regOperPtr.p, 0);
+
+ new (regOperPtr.p) Operationrec();
+ regOperPtr.p->firstAttrinbufrec = RNIL;
+ regOperPtr.p->lastAttrinbufrec = RNIL;
+ regOperPtr.p->op_struct.op_type = ZREAD;
+ regOperPtr.p->op_struct.in_active_list = false;
+ set_trans_state(regOperPtr.p, TRANS_DISCONNECTED);
+ regOperPtr.p->storedProcedureId = ZNIL;
+ regOperPtr.p->prevActiveOp = RNIL;
+ regOperPtr.p->nextActiveOp = RNIL;
+ regOperPtr.p->tupVersion = ZNIL;
+ regOperPtr.p->op_struct.delete_insert_flag = false;
+
+ initOpConnection(regOperPtr.p);
regOperPtr.p->userpointer = userPtr;
- regOperPtr.p->userblockref = userRef;
signal->theData[0] = regOperPtr.p->userpointer;
signal->theData[1] = regOperPtr.i;
sendSignal(userRef, GSN_TUPSEIZECONF, signal, 2, JBB);
return;
}//Dbtup::execTUPSEIZEREQ()
-#define printFragment(t){ for(Uint32 i = 0; i < (2 * MAX_FRAG_PER_NODE);i++){\
+#define printFragment(t){ for(Uint32 i = 0; i < MAX_FRAG_PER_NODE;i++){\
ndbout_c("table = %d fragid[%d] = %d fragrec[%d] = %d", \
t.i, t.p->fragid[i], i, t.p->fragrec[i]); }}
@@ -1197,138 +666,20 @@
OperationrecPtr regOperPtr;
ljamEntry();
regOperPtr.i = signal->theData[0];
- ptrCheckGuard(regOperPtr, cnoOfOprec, operationrec);
- regOperPtr.p->transstate = DISCONNECTED;
- regOperPtr.p->nextOprecInList = cfirstfreeOprec;
- cfirstfreeOprec = regOperPtr.i;
+ c_operation_pool.getPtr(regOperPtr);
+ set_trans_state(regOperPtr.p, TRANS_DISCONNECTED);
+ c_operation_pool.release(regOperPtr);
+
signal->theData[0] = regOperPtr.p->userpointer;
- sendSignal(regOperPtr.p->userblockref, GSN_TUPRELEASECONF, signal, 1, JBB);
+ sendSignal(DBLQH_REF, GSN_TUPRELEASECONF, signal, 1, JBB);
return;
}//Dbtup::execTUPRELEASEREQ()
-/* ---------------------------------------------------------------- */
-/* ---------------- FREE_DISK_BUFFER_SEGMENT_RECORD --------------- */
-/* ---------------------------------------------------------------- */
-/* */
-/* THIS ROUTINE DEALLOCATES A DISK SEGMENT AND ITS DATA PAGES */
-/* */
-/* INPUT: DISK_BUFFER_SEGMENT_PTR THE DISK SEGMENT */
-/* */
-/* -----------------------------------------------------------------*/
-void Dbtup::freeDiskBufferSegmentRecord(Signal* signal, DiskBufferSegmentInfoPtr dbsiPtr)
-{
- switch (dbsiPtr.p->pdxBuffertype) {
- case UNDO_PAGES:
- case COMMON_AREA_PAGES:
- ljam();
- freeUndoBufferPages(signal, dbsiPtr);
- break;
- case UNDO_RESTART_PAGES:
- ljam();
- dbsiPtr.p->pdxDataPage[0] = dbsiPtr.p->pdxUndoBufferSet[0];
- freeUndoBufferPages(signal, dbsiPtr);
- dbsiPtr.p->pdxDataPage[0] = dbsiPtr.p->pdxUndoBufferSet[1];
- freeUndoBufferPages(signal, dbsiPtr);
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- releaseDiskBufferSegmentRecord(dbsiPtr);
-}//Dbtup::freeDiskBufferSegmentRecord()
-
-/* ---------------------------------------------------------------- */
-/* -------------------- FREE_UNDO_BUFFER_PAGES -------------------- */
-/* ---------------------------------------------------------------- */
-/* */
-/* THIS ROUTINE DEALLOCATES A SEGMENT OF UNDO PAGES */
-/* */
-/* INPUT: UNDO_PAGEP POINTER TO FIRST PAGE IN SEGMENT */
-/* */
-/* -----------------------------------------------------------------*/
-void Dbtup::freeUndoBufferPages(Signal* signal, DiskBufferSegmentInfoPtr dbsiPtr)
-{
- UndoPagePtr undoPagePtr;
-
- undoPagePtr.i = dbsiPtr.p->pdxDataPage[0];
- ptrCheckGuard(undoPagePtr, cnoOfUndoPage, undoPage);
- undoPagePtr.p->undoPageWord[ZPAGE_NEXT_POS] = cfirstfreeUndoSeg;
- cfirstfreeUndoSeg = undoPagePtr.i;
- cnoFreeUndoSeg++;
- if (cnoFreeUndoSeg == ZMIN_PAGE_LIMIT_TUP_COMMITREQ) {
- EXECUTE_DIRECT(DBLQH, GSN_TUP_COM_UNBLOCK, signal, 1);
- ljamEntry();
- }//if
-}//Dbtup::freeUndoBufferPages()
-
-void Dbtup::releaseCheckpointInfoRecord(CheckpointInfoPtr ciPtr)
-{
- ciPtr.p->lcpNextRec = cfirstfreeLcp;
- cfirstfreeLcp = ciPtr.i;
-}//Dbtup::releaseCheckpointInfoRecord()
-
-void Dbtup::releaseDiskBufferSegmentRecord(DiskBufferSegmentInfoPtr dbsiPtr)
-{
- dbsiPtr.p->pdxNextRec = cfirstfreePdx;
- cfirstfreePdx = dbsiPtr.i;
-}//Dbtup::releaseDiskBufferSegmentRecord()
-
void Dbtup::releaseFragrec(FragrecordPtr regFragPtr)
{
regFragPtr.p->nextfreefrag = cfirstfreefrag;
cfirstfreefrag = regFragPtr.i;
}//Dbtup::releaseFragrec()
-
-void Dbtup::releasePendingFileOpenInfoRecord(PendingFileOpenInfoPtr pfoPtr)
-{
- pfoPtr.p->pfoNextRec = cfirstfreePfo;
- cfirstfreePfo = pfoPtr.i;
-}//Dbtup::releasePendingFileOpenInfoRecord()
-
-void Dbtup::releaseRestartInfoRecord(RestartInfoRecordPtr riPtr)
-{
- riPtr.p->sriNextRec = cfirstfreeSri;
- cfirstfreeSri = riPtr.i;
-}//Dbtup::releaseRestartInfoRecord()
-
-void Dbtup::seizeCheckpointInfoRecord(CheckpointInfoPtr& ciPtr)
-{
- ciPtr.i = cfirstfreeLcp;
- ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
- cfirstfreeLcp = ciPtr.p->lcpNextRec;
- ciPtr.p->lcpNextRec = RNIL;
-}//Dbtup::seizeCheckpointInfoRecord()
-
-void Dbtup::seizeDiskBufferSegmentRecord(DiskBufferSegmentInfoPtr& dbsiPtr)
-{
- dbsiPtr.i = cfirstfreePdx;
- ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
- cfirstfreePdx = dbsiPtr.p->pdxNextRec;
- dbsiPtr.p->pdxNextRec = RNIL;
- for (Uint32 i = 0; i < 16; i++) {
- dbsiPtr.p->pdxDataPage[i] = RNIL;
- }//for
- dbsiPtr.p->pdxCheckpointInfoP = RNIL;
- dbsiPtr.p->pdxRestartInfoP = RNIL;
- dbsiPtr.p->pdxLocalLogInfoP = RNIL;
- dbsiPtr.p->pdxFilePage = 0;
- dbsiPtr.p->pdxNumDataPages = 0;
-}//Dbtup::seizeDiskBufferSegmentRecord()
-
-void Dbtup::seizeOpRec(OperationrecPtr& regOperPtr)
-{
- regOperPtr.i = cfirstfreeOprec;
- ptrCheckGuard(regOperPtr, cnoOfOprec, operationrec);
- cfirstfreeOprec = regOperPtr.p->nextOprecInList;
-}//Dbtup::seizeOpRec()
-
-void Dbtup::seizePendingFileOpenInfoRecord(PendingFileOpenInfoPtr& pfoiPtr)
-{
- pfoiPtr.i = cfirstfreePfo;
- ptrCheckGuard(pfoiPtr, cnoOfConcurrentOpenOp, pendingFileOpenInfo);
- cfirstfreePfo = pfoiPtr.p->pfoNextRec;
- pfoiPtr.p->pfoNextRec = RNIL;
-}//Dbtup::seizePendingFileOpenInfoRecord()
void Dbtup::execSET_VAR_REQ(Signal* signal)
{
| Thread |
|---|
| • bk commit into 5.1 tree (joreland:1.1949) | jonas.oreland | 25 Jul |