Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-10-26 18:58:51+02:00, jonas@eel.(none) +4 -0
Merge joreland@stripped:/home/bk/mysql-5.1-wl2325-5.0
into eel.(none):/home/jonas/src/mysql-5.1-wl2325-5.0
MERGE: 1.2049.1.22
storage/ndb/src/common/debugger/EventLogger.cpp@stripped, 2006-10-26 18:58:44+02:00, jonas@eel.(none) +0 -0
Auto merged
MERGE: 1.27.1.1
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp@stripped, 2006-10-26 18:58:44+02:00, jonas@eel.(none) +0 -0
Auto merged
MERGE: 1.47.1.6
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp@stripped, 2006-10-26 18:58:45+02:00, jonas@eel.(none) +0 -0
Auto merged
MERGE: 1.86.1.3
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2006-10-26 18:58:45+02:00, jonas@eel.(none) +0 -0
Auto merged
MERGE: 1.100.1.2
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: eel.(none)
# Root: /home/jonas/src/mysql-5.1-wl2325-5.0/RESYNC
--- 1.28/storage/ndb/src/common/debugger/EventLogger.cpp 2006-10-26 18:58:57 +02:00
+++ 1.29/storage/ndb/src/common/debugger/EventLogger.cpp 2006-10-26 18:58:57 +02:00
@@ -872,7 +872,7 @@
ROW(NDBStopCompleted, LogLevel::llStartUp, 1, Logger::LL_INFO ),
ROW(NDBStopForced, LogLevel::llStartUp, 1, Logger::LL_ALERT ),
ROW(NDBStopAborted, LogLevel::llStartUp, 1, Logger::LL_INFO ),
- ROW(StartREDOLog, LogLevel::llStartUp, 10, Logger::LL_INFO ),
+ ROW(StartREDOLog, LogLevel::llStartUp, 4, Logger::LL_INFO ),
ROW(StartLog, LogLevel::llStartUp, 10, Logger::LL_INFO ),
ROW(UNDORecordsExecuted, LogLevel::llStartUp, 15, Logger::LL_INFO ),
ROW(StartReport, LogLevel::llStartUp, 4, Logger::LL_INFO ),
--- 1.48/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2006-10-26 18:58:57 +02:00
+++ 1.49/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2006-10-26 18:58:57 +02:00
@@ -626,22 +626,48 @@
ndbrequire(c_copyGCISlave.m_copyReason == CopyGCIReq::IDLE);
ndbrequire(c_copyGCISlave.m_expectedNextWord == tstart);
ndbrequire(reason != CopyGCIReq::IDLE);
-
+ bool isdone = (tstart + CopyGCIReq::DATA_SIZE) >= Sysfile::SYSFILE_SIZE32;
+
+ if (ERROR_INSERTED(7177))
+ {
+ jam();
+
+ if (signal->getLength() == 3)
+ {
+ jam();
+ goto done;
+ }
+ }
+
arrGuard(tstart + CopyGCIReq::DATA_SIZE, sizeof(sysfileData)/4);
for(Uint32 i = 0; i<CopyGCIReq::DATA_SIZE; i++)
cdata[tstart+i] = copyGCI->data[i];
- if ((tstart + CopyGCIReq::DATA_SIZE) >= Sysfile::SYSFILE_SIZE32) {
+ if (ERROR_INSERTED(7177) && isMaster() && isdone)
+ {
+ sendSignalWithDelay(reference(), GSN_COPY_GCIREQ, signal, 1000, 3);
+ return;
+ }
+
+done:
+ if (isdone)
+ {
jam();
c_copyGCISlave.m_expectedNextWord = 0;
- } else {
+ }
+ else
+ {
jam();
c_copyGCISlave.m_expectedNextWord += CopyGCIReq::DATA_SIZE;
return;
- }//if
-
- memcpy(sysfileData, cdata, sizeof(sysfileData));
+ }
+ if (cmasterdihref != reference())
+ {
+ jam();
+ memcpy(sysfileData, cdata, sizeof(sysfileData));
+ }
+
c_copyGCISlave.m_copyReason = reason;
c_copyGCISlave.m_senderRef = signal->senderBlockRef();
c_copyGCISlave.m_senderData = copyGCI->anyData;
@@ -1273,9 +1299,9 @@
if (isMaster()) {
jam();
systemRestartTakeOverLab(signal);
- if (anyActiveTakeOver() && false) {
+ if (anyActiveTakeOver())
+ {
jam();
- ndbout_c("1 - anyActiveTakeOver == true");
return;
}
}
@@ -2253,6 +2279,8 @@
// NOT ACTIVE NODES THAT HAVE NOT YET BEEN TAKEN OVER NEEDS TAKE OVER
// IMMEDIATELY. IF WE ARE ALIVE WE TAKE OVER OUR OWN NODE.
/*-------------------------------------------------------------------*/
+ infoEvent("Take over of node %d started",
+ nodePtr.i);
startTakeOver(signal, RNIL, nodePtr.i, nodePtr.i);
}//if
break;
@@ -2365,6 +2393,12 @@
*--------------------------------------------------------------------*/
Uint32 takeOverNode = Sysfile::getTakeOverNode(startNodeId,
SYSFILE->takeOver);
+ if(takeOverNode == 0){
+ jam();
+ warningEvent("Bug in take-over code restarting");
+ takeOverNode = startNodeId;
+ }
+
startTakeOver(signal, RNIL, startNodeId, takeOverNode);
break;
}
@@ -2518,7 +2552,14 @@
Sysfile::setTakeOverNode(takeOverPtr.p->toFailedNode, SYSFILE->takeOver,
startNode);
takeOverPtr.p->toMasterStatus = TakeOverRecord::TO_START_COPY;
-
+
+ if (getNodeState().getSystemRestartInProgress())
+ {
+ jam();
+ checkToCopy();
+ checkToCopyCompleted(signal);
+ return;
+ }
cstartGcpNow = true;
}//Dbdih::startTakeOver()
@@ -3266,6 +3307,18 @@
signal->theData[1] = takeOverPtr.p->toStartingNode;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+ if (getNodeState().getSystemRestartInProgress())
+ {
+ jam();
+ infoEvent("Take over of node %d complete", takeOverPtr.p->toStartingNode);
+ setNodeActiveStatus(takeOverPtr.p->toStartingNode, Sysfile::NS_Active);
+ takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
+ takeOverCompleted(takeOverPtr.p->toStartingNode);
+ checkToCopy();
+ checkToCopyCompleted(signal);
+ return;
+ }
+
c_lcpState.immediateLcpStart = true;
takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
@@ -3372,16 +3425,12 @@
}//if
endTakeOver(takeOverPtr.i);
- ndbout_c("2 - endTakeOver");
if (cstartPhase == ZNDB_SPH4) {
jam();
- ndbrequire(false);
if (anyActiveTakeOver()) {
jam();
- ndbout_c("4 - anyActiveTakeOver == true");
return;
}//if
- ndbout_c("5 - anyActiveTakeOver == false -> ndbsttorry10Lab");
ndbsttorry10Lab(signal, __LINE__);
return;
}//if
@@ -8379,14 +8428,33 @@
resetReplicaLcp(replicaPtr.p, newestRestorableGCI);
- /* -----------------------------------------------------------------
- * LINK THE REPLICA INTO THE STORED REPLICA LIST. WE WILL USE THIS
- * NODE AS A STORED REPLICA.
- * WE MUST FIRST LINK IT OUT OF THE LIST OF OLD STORED REPLICAS.
- * --------------------------------------------------------------- */
- removeOldStoredReplica(fragPtr, replicaPtr);
- linkStoredReplica(fragPtr, replicaPtr);
-
+ /**
+ * Make sure we can also find REDO for restoring replica...
+ */
+ {
+ CreateReplicaRecord createReplica;
+ ConstPtr<ReplicaRecord> constReplicaPtr;
+ constReplicaPtr.i = replicaPtr.i;
+ constReplicaPtr.p = replicaPtr.p;
+ if (tabPtr.p->storedTable == 0 ||
+ setup_create_replica(fragPtr,
+ &createReplica, constReplicaPtr))
+ {
+ jam();
+ removeOldStoredReplica(fragPtr, replicaPtr);
+ linkStoredReplica(fragPtr, replicaPtr);
+ }
+ else
+ {
+ jam();
+ infoEvent("Forcing take-over of node %d due to unsufficient REDO"
+ " for table %d fragment: %d",
+ nodePtr.i, tabPtr.i, i);
+
+ setNodeActiveStatus(nodePtr.i,
+ Sysfile::NS_NotActive_NotTakenOver);
+ }
+ }
}
default:
jam();
@@ -9507,6 +9575,7 @@
FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, fragId, fragPtr);
checkKeepGci(tabPtr, fragId, fragPtr.p, fragPtr.p->storedReplicas);
+ checkKeepGci(tabPtr, fragId, fragPtr.p, fragPtr.p->oldStoredReplicas);
fragId++;
if (fragId >= tabPtr.p->totalfragments) {
jam();
@@ -9692,73 +9761,84 @@
nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
- if (replicaPtr.p->lcpOngoingFlag &&
- replicaPtr.p->lcpIdStarted < lcpId) {
- jam();
- //-------------------------------------------------------------------
- // We have found a replica on a node that performs local checkpoint
- // that is alive and that have not yet been started.
- //-------------------------------------------------------------------
-
- if (nodePtr.p->noOfStartedChkpt < 2) {
- jam();
- /**
- * Send LCP_FRAG_ORD to LQH
- */
-
- /**
- * Mark the replica so with lcpIdStarted == true
- */
- replicaPtr.p->lcpIdStarted = lcpId;
-
- Uint32 i = nodePtr.p->noOfStartedChkpt;
- nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
- nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
- nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
- nodePtr.p->noOfStartedChkpt = i + 1;
-
- sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
- } else if (nodePtr.p->noOfQueuedChkpt < 2) {
- jam();
- /**
- * Put LCP_FRAG_ORD "in queue"
- */
-
- /**
- * Mark the replica so with lcpIdStarted == true
- */
- replicaPtr.p->lcpIdStarted = lcpId;
+ if (c_lcpState.m_participatingLQH.get(nodePtr.i))
+ {
+ if (replicaPtr.p->lcpOngoingFlag &&
+ replicaPtr.p->lcpIdStarted < lcpId)
+ {
+ jam();
+ //-------------------------------------------------------------------
+ // We have found a replica on a node that performs local checkpoint
+ // that is alive and that have not yet been started.
+ //-------------------------------------------------------------------
- Uint32 i = nodePtr.p->noOfQueuedChkpt;
- nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
- nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
- nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
- nodePtr.p->noOfQueuedChkpt = i + 1;
- } else {
- jam();
+ if (nodePtr.p->noOfStartedChkpt < 2)
+ {
+ jam();
+ /**
+ * Send LCP_FRAG_ORD to LQH
+ */
+
+ /**
+ * Mark the replica so with lcpIdStarted == true
+ */
+ replicaPtr.p->lcpIdStarted = lcpId;
- if(save){
+ Uint32 i = nodePtr.p->noOfStartedChkpt;
+ nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
+ nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
+ nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
+ nodePtr.p->noOfStartedChkpt = i + 1;
+
+ sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
+ }
+ else if (nodePtr.p->noOfQueuedChkpt < 2)
+ {
+ jam();
/**
- * Stop increasing value on first that was "full"
+ * Put LCP_FRAG_ORD "in queue"
*/
- c_lcpState.currentFragment = curr;
- save = false;
- }
-
- busyNodes.set(nodePtr.i);
- if(busyNodes.count() == lcpNodes){
+
/**
- * There were no possibility to start the local checkpoint
- * and it was not possible to queue it up. In this case we
- * stop the start of local checkpoints until the nodes with a
- * backlog have performed more checkpoints. We will return and
- * will not continue the process of starting any more checkpoints.
+ * Mark the replica so with lcpIdStarted == true
*/
- return;
+ replicaPtr.p->lcpIdStarted = lcpId;
+
+ Uint32 i = nodePtr.p->noOfQueuedChkpt;
+ nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
+ nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
+ nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
+ nodePtr.p->noOfQueuedChkpt = i + 1;
+ }
+ else
+ {
+ jam();
+
+ if(save)
+ {
+ /**
+ * Stop increasing value on first that was "full"
+ */
+ c_lcpState.currentFragment = curr;
+ save = false;
+ }
+
+ busyNodes.set(nodePtr.i);
+ if(busyNodes.count() == lcpNodes)
+ {
+ /**
+ * There were no possibility to start the local checkpoint
+ * and it was not possible to queue it up. In this case we
+ * stop the start of local checkpoints until the nodes with a
+ * backlog have performed more checkpoints. We will return and
+ * will not continue the process of starting any more checkpoints.
+ */
+ return;
+ }//if
}//if
- }//if
- }
- }//while
+ }
+ }//while
+ }
curr.fragmentId++;
if (curr.fragmentId >= tabPtr.p->totalfragments) {
jam();
@@ -12484,16 +12564,75 @@
/* CHECKPOINT WITHOUT NEEDING ANY EXTRA LOGGING FACILITIES.*/
/* A MAXIMUM OF FOUR NODES IS RETRIEVED. */
/*************************************************************************/
+bool
+Dbdih::setup_create_replica(FragmentstorePtr fragPtr,
+ CreateReplicaRecord* createReplicaPtrP,
+ ConstPtr<ReplicaRecord> replicaPtr)
+{
+ createReplicaPtrP->dataNodeId = replicaPtr.p->procNode;
+ createReplicaPtrP->replicaRec = replicaPtr.i;
+
+ /* ----------------------------------------------------------------- */
+ /* WE NEED TO SEARCH FOR A PROPER LOCAL CHECKPOINT TO USE FOR THE */
+ /* SYSTEM RESTART. */
+ /* ----------------------------------------------------------------- */
+ Uint32 startGci;
+ Uint32 startLcpNo;
+ Uint32 stopGci = SYSFILE->newestRestorableGCI;
+ bool result = findStartGci(replicaPtr,
+ stopGci,
+ startGci,
+ startLcpNo);
+ if (!result)
+ {
+ jam();
+ /* --------------------------------------------------------------- */
+ /* WE COULD NOT FIND ANY LOCAL CHECKPOINT. THE FRAGMENT THUS DO NOT*/
+ /* CONTAIN ANY VALID LOCAL CHECKPOINT. IT DOES HOWEVER CONTAIN A */
+ /* VALID FRAGMENT LOG. THUS BY FIRST CREATING THE FRAGMENT AND THEN*/
+ /* EXECUTING THE FRAGMENT LOG WE CAN CREATE THE FRAGMENT AS */
+ /* DESIRED. THIS SHOULD ONLY OCCUR AFTER CREATING A FRAGMENT. */
+ /* */
+ /* TO INDICATE THAT NO LOCAL CHECKPOINT IS TO BE USED WE SET THE */
+ /* LOCAL CHECKPOINT TO ZNIL. */
+ /* --------------------------------------------------------------- */
+ createReplicaPtrP->lcpNo = ZNIL;
+ }
+ else
+ {
+ jam();
+ /* --------------------------------------------------------------- */
+ /* WE FOUND A PROPER LOCAL CHECKPOINT TO RESTART FROM. */
+ /* SET LOCAL CHECKPOINT ID AND LOCAL CHECKPOINT NUMBER. */
+ /* --------------------------------------------------------------- */
+ createReplicaPtrP->lcpNo = startLcpNo;
+ arrGuard(startLcpNo, MAX_LCP_STORED);
+ createReplicaPtrP->createLcpId = replicaPtr.p->lcpId[startLcpNo];
+ }//if
+
+
+ /* ----------------------------------------------------------------- */
+ /* WE HAVE EITHER FOUND A LOCAL CHECKPOINT OR WE ARE PLANNING TO */
+ /* EXECUTE THE LOG FROM THE INITIAL CREATION OF THE TABLE. IN BOTH */
+ /* CASES WE NEED TO FIND A SET OF LOGS THAT CAN EXECUTE SUCH THAT */
+ /* WE RECOVER TO THE SYSTEM RESTART GLOBAL CHECKPOINT. */
+ /* -_--------------------------------------------------------------- */
+ return findLogNodes(createReplicaPtrP, fragPtr, startGci, stopGci);
+}
+
void Dbdih::searchStoredReplicas(FragmentstorePtr fragPtr)
{
Uint32 nextReplicaPtrI;
- ConstPtr<ReplicaRecord> replicaPtr;
+ Ptr<ReplicaRecord> replicaPtr;
replicaPtr.i = fragPtr.p->storedReplicas;
while (replicaPtr.i != RNIL) {
jam();
ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
nextReplicaPtrI = replicaPtr.p->nextReplica;
+ ConstPtr<ReplicaRecord> constReplicaPtr;
+ constReplicaPtr.i = replicaPtr.i;
+ constReplicaPtr.p = replicaPtr.p;
NodeRecordPtr nodePtr;
nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
@@ -12513,69 +12652,13 @@
createReplicaPtr.i = cnoOfCreateReplicas;
ptrCheckGuard(createReplicaPtr, 4, createReplicaRecord);
cnoOfCreateReplicas++;
- createReplicaPtr.p->dataNodeId = replicaPtr.p->procNode;
- createReplicaPtr.p->replicaRec = replicaPtr.i;
- /* ----------------------------------------------------------------- */
- /* WE NEED TO SEARCH FOR A PROPER LOCAL CHECKPOINT TO USE FOR THE */
- /* SYSTEM RESTART. */
- /* ----------------------------------------------------------------- */
- Uint32 startGci;
- Uint32 startLcpNo;
- Uint32 stopGci = SYSFILE->newestRestorableGCI;
- bool result = findStartGci(replicaPtr,
- stopGci,
- startGci,
- startLcpNo);
- if (!result) {
- jam();
- /* --------------------------------------------------------------- */
- /* WE COULD NOT FIND ANY LOCAL CHECKPOINT. THE FRAGMENT THUS DO NOT*/
- /* CONTAIN ANY VALID LOCAL CHECKPOINT. IT DOES HOWEVER CONTAIN A */
- /* VALID FRAGMENT LOG. THUS BY FIRST CREATING THE FRAGMENT AND THEN*/
- /* EXECUTING THE FRAGMENT LOG WE CAN CREATE THE FRAGMENT AS */
- /* DESIRED. THIS SHOULD ONLY OCCUR AFTER CREATING A FRAGMENT. */
- /* */
- /* TO INDICATE THAT NO LOCAL CHECKPOINT IS TO BE USED WE SET THE */
- /* LOCAL CHECKPOINT TO ZNIL. */
- /* --------------------------------------------------------------- */
- createReplicaPtr.p->lcpNo = ZNIL;
- } else {
- jam();
- /* --------------------------------------------------------------- */
- /* WE FOUND A PROPER LOCAL CHECKPOINT TO RESTART FROM. */
- /* SET LOCAL CHECKPOINT ID AND LOCAL CHECKPOINT NUMBER. */
- /* --------------------------------------------------------------- */
- createReplicaPtr.p->lcpNo = startLcpNo;
- arrGuard(startLcpNo, MAX_LCP_STORED);
- createReplicaPtr.p->createLcpId = replicaPtr.p->lcpId[startLcpNo];
- }//if
-
- if(ERROR_INSERTED(7073) || ERROR_INSERTED(7074)){
- jam();
- nodePtr.p->nodeStatus = NodeRecord::DEAD;
- }
-
- /* ----------------------------------------------------------------- */
- /* WE HAVE EITHER FOUND A LOCAL CHECKPOINT OR WE ARE PLANNING TO */
- /* EXECUTE THE LOG FROM THE INITIAL CREATION OF THE TABLE. IN BOTH */
- /* CASES WE NEED TO FIND A SET OF LOGS THAT CAN EXECUTE SUCH THAT */
- /* WE RECOVER TO THE SYSTEM RESTART GLOBAL CHECKPOINT. */
- /* -_--------------------------------------------------------------- */
- if (!findLogNodes(createReplicaPtr.p, fragPtr, startGci, stopGci)) {
- jam();
- /* --------------------------------------------------------------- */
- /* WE WERE NOT ABLE TO FIND ANY WAY OF RESTORING THIS REPLICA. */
- /* THIS IS A POTENTIAL SYSTEM ERROR. */
- /* --------------------------------------------------------------- */
- cnoOfCreateReplicas--;
- return;
- }//if
-
- if(ERROR_INSERTED(7073) || ERROR_INSERTED(7074)){
- jam();
- nodePtr.p->nodeStatus = NodeRecord::ALIVE;
- }
+ /**
+ * Should have been checked in resetReplicaSr
+ */
+ ndbrequire(setup_create_replica(fragPtr,
+ createReplicaPtr.p,
+ constReplicaPtr));
break;
}
default:
--- 1.87/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2006-10-26 18:58:57 +02:00
+++ 1.88/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2006-10-26 18:58:57 +02:00
@@ -4923,6 +4923,8 @@
ptrCheckGuard(nextHashptr, ctcConnectrecFileSize, tcConnectionrec);
nextHashptr.p->prevHashRec = prevHashptr.i;
}//if
+
+ regTcPtr->prevHashRec = regTcPtr->nextHashRec = RNIL;
}//Dblqh::deleteTransidHash()
/* --------------------------------------------------------------------------
@@ -7375,15 +7377,15 @@
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal);
+ } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
+ jam();
+ closeScanLab(signal);
+ return;
} else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
- } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
- jam();
- closeScanLab(signal);
- return;
} else {
jam();
/*
@@ -7591,6 +7593,7 @@
ZNIL);
tcConnectptr.p->save1 = 4;
tcConnectptr.p->primKeyLen = keyLen + 4; // hard coded in execKEYINFO
+ tcConnectptr.p->applRef = scanFragReq->resultRef;
errorCode = initScanrec(scanFragReq);
if (errorCode != ZOK) {
jam();
@@ -9144,6 +9147,7 @@
tcConnectptr.p->copyCountWords = 0;
tcConnectptr.p->tcOprec = tcConnectptr.i;
tcConnectptr.p->schemaVersion = scanptr.p->scanSchemaVersion;
+ tcConnectptr.p->applRef = 0;
scanptr.p->scanState = ScanRecord::WAIT_ACC_COPY;
AccScanReq * req = (AccScanReq*)&signal->theData[0];
req->senderData = scanptr.i;
@@ -18351,9 +18355,224 @@
}//if
}//Dblqh::writeNextLog()
+bool
+Dblqh::validate_filter(Signal* signal)
+{
+ Uint32 * start = signal->theData + 1;
+ Uint32 * end = signal->theData + signal->getLength();
+ if (start == end)
+ {
+ infoEvent("No filter specified, not listing...");
+ return false;
+ }
+
+ while(start < end)
+ {
+ switch(* start){
+ case 0: // Table
+ case 1: // API Node
+ case 3: // TC Node
+ start += 2;
+ break;
+ case 2: // Transid
+ start += 3;
+ break;
+ default:
+ infoEvent("Invalid filter op: 0x%x pos: %d",
+ * start,
+ start - (signal->theData + 1));
+ return false;
+ }
+ }
+
+ if (start != end)
+ {
+ infoEvent("Invalid filter, unexpected end");
+ return false;
+ }
+
+ return true;
+}
+
+bool
+Dblqh::match_and_print(Signal* signal, Ptr<TcConnectionrec> tcRec)
+{
+ Uint32 len = signal->getLength();
+ Uint32* start = signal->theData + 3;
+ Uint32* end = signal->theData + len;
+ while (start < end)
+ {
+ switch(* start){
+ case 0:
+ if (tcRec.p->tableref != * (start + 1))
+ return false;
+ start += 2;
+ break;
+ case 1:
+ if (refToNode(tcRec.p->applRef) != * (start + 1))
+ return false;
+ start += 2;
+ break;
+ case 2:
+ if (tcRec.p->transid[0] != * (start + 1) ||
+ tcRec.p->transid[1] != * (start + 2))
+ return false;
+ start += 3;
+ break;
+ case 3:
+ if (refToNode(tcRec.p->tcBlockref) != * (start + 1))
+ return false;
+ start += 2;
+ break;
+ default:
+ ndbassert(false);
+ return false;
+ }
+ }
+
+ if (start != end)
+ {
+ ndbassert(false);
+ return false;
+ }
+
+ /**
+ * Do print
+ */
+ Uint32 *temp = signal->theData + 25;
+ memcpy(temp, signal->theData, 4 * len);
+
+ char state[20];
+ const char* op = "<Unknown>";
+ if (tcRec.p->tcScanRec != RNIL)
+ {
+ ScanRecordPtr sp;
+ sp.i = tcRec.p->tcScanRec;
+ c_scanRecordPool.getPtr(sp);
+
+ if (sp.p->scanLockMode)
+ op = "SCAN-EX";
+ else if(sp.p->scanLockHold)
+ op = "SCAN-SH";
+ else
+ op = "SCAN";
+
+ switch(sp.p->scanState){
+ case ScanRecord::WAIT_NEXT_SCAN:
+ BaseString::snprintf(state, sizeof(state), "WaitNextScan");
+ break;
+ case ScanRecord::IN_QUEUE:
+ BaseString::snprintf(state, sizeof(state), "InQueue");
+ break;
+ case ScanRecord::SCAN_FREE:
+ case ScanRecord::WAIT_STORED_PROC_COPY:
+ case ScanRecord::WAIT_STORED_PROC_SCAN:
+ case ScanRecord::WAIT_NEXT_SCAN_COPY:
+ case ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN:
+ case ScanRecord::WAIT_DELETE_STORED_PROC_ID_COPY:
+ case ScanRecord::WAIT_ACC_COPY:
+ case ScanRecord::WAIT_ACC_SCAN:
+ case ScanRecord::WAIT_SCAN_NEXTREQ:
+ case ScanRecord::WAIT_CLOSE_SCAN:
+ case ScanRecord::WAIT_CLOSE_COPY:
+ case ScanRecord::WAIT_RELEASE_LOCK:
+ case ScanRecord::WAIT_TUPKEY_COPY:
+ case ScanRecord::WAIT_LQHKEY_COPY:
+ BaseString::snprintf(state, sizeof(state), "%u", sp.p->scanState);
+ }
+ }
+ else
+ {
+ switch(tcRec.p->operation){
+ case ZREAD:
+ if (tcRec.p->lockType)
+ op = "READ-EX";
+ else if(!tcRec.p->dirtyOp)
+ op = "READ-SH";
+ else
+ op = "READ";
+ break;
+ case ZINSERT: op = "INSERT"; break;
+ case ZUPDATE: op = "UPDATE"; break;
+ case ZDELETE: op = "DELETE"; break;
+ case ZWRITE: op = "WRITE"; break;
+ }
+
+ switch(tcRec.p->transactionState){
+ case TcConnectionrec::IDLE:
+ case TcConnectionrec::WAIT_ACC:
+ BaseString::snprintf(state, sizeof(state), "In lock queue");
+ break;
+ case TcConnectionrec::WAIT_TUPKEYINFO:
+ case TcConnectionrec::WAIT_ATTR:
+ BaseString::snprintf(state, sizeof(state), "WaitData");
+ break;
+ case TcConnectionrec::WAIT_TUP:
+ BaseString::snprintf(state, sizeof(state), "Running");
+ break;
+ case TcConnectionrec::PREPARED:
+ BaseString::snprintf(state, sizeof(state), "Prepared");
+ break;
+ case TcConnectionrec::COMMITTED:
+ BaseString::snprintf(state, sizeof(state), "Committed");
+ break;
+ case TcConnectionrec::STOPPED:
+ case TcConnectionrec::LOG_QUEUED:
+ case TcConnectionrec::LOG_COMMIT_WRITTEN_WAIT_SIGNAL:
+ case TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL:
+ case TcConnectionrec::COMMIT_STOPPED:
+ case TcConnectionrec::LOG_COMMIT_QUEUED:
+ case TcConnectionrec::COMMIT_QUEUED:
+ case TcConnectionrec::WAIT_ACC_ABORT:
+ case TcConnectionrec::ABORT_QUEUED:
+ case TcConnectionrec::ABORT_STOPPED:
+ case TcConnectionrec::WAIT_AI_AFTER_ABORT:
+ case TcConnectionrec::LOG_ABORT_QUEUED:
+ case TcConnectionrec::WAIT_TUP_TO_ABORT:
+ case TcConnectionrec::WAIT_SCAN_AI:
+ case TcConnectionrec::SCAN_STATE_USED:
+ case TcConnectionrec::SCAN_FIRST_STOPPED:
+ case TcConnectionrec::SCAN_CHECK_STOPPED:
+ case TcConnectionrec::SCAN_STOPPED:
+ case TcConnectionrec::SCAN_RELEASE_STOPPED:
+ case TcConnectionrec::SCAN_CLOSE_STOPPED:
+ case TcConnectionrec::COPY_CLOSE_STOPPED:
+ case TcConnectionrec::COPY_FIRST_STOPPED:
+ case TcConnectionrec::COPY_STOPPED:
+ case TcConnectionrec::SCAN_TUPKEY:
+ case TcConnectionrec::COPY_TUPKEY:
+ case TcConnectionrec::TC_NOT_CONNECTED:
+ case TcConnectionrec::PREPARED_RECEIVED_COMMIT:
+ case TcConnectionrec::LOG_COMMIT_WRITTEN:
+ BaseString::snprintf(state, sizeof(state), "%u",
+ tcRec.p->transactionState);
+ }
+ }
+
+ char buf[100];
+ BaseString::snprintf(buf, sizeof(buf),
+ "OP[%u]: Tab: %d frag: %d TC: %u API: %d(0x%x)"
+ "transid: 0x%x 0x%x op: %s state: %s",
+ tcRec.i,
+ tcRec.p->tableref,
+ tcRec.p->fragmentid,
+ refToNode(tcRec.p->tcBlockref),
+ refToNode(tcRec.p->applRef),
+ refToBlock(tcRec.p->applRef),
+ tcRec.p->transid[0], tcRec.p->transid[1],
+ op,
+ state);
+
+ infoEvent(buf);
+
+ memcpy(signal->theData, temp, 4*len);
+ return true;
+}
+
void
Dblqh::execDUMP_STATE_ORD(Signal* signal)
{
+ jamEntry();
DumpStateOrd * const dumpState = (DumpStateOrd *)&signal->theData[0];
if(dumpState->args[0] == DumpStateOrd::CommitAckMarkersSize){
infoEvent("LQH: m_commitAckMarkerPool: %d free size: %d",
@@ -18436,7 +18655,7 @@
ScanRecordPtr sp;
sp.i = recordNo;
- c_scanRecordPool.getPtr(scanptr);
+ c_scanRecordPool.getPtr(sp);
if (sp.p->scanState != ScanRecord::SCAN_FREE){
dumpState->args[0] = DumpStateOrd::LqhDumpOneScanRec;
dumpState->args[1] = recordNo;
@@ -18757,6 +18976,183 @@
ndbrequire(arg != 2308);
}
+ if (arg == 2350)
+ {
+ jam();
+ Uint32 len = signal->getLength() - 1;
+ if (len + 3 > 25)
+ {
+ jam();
+ infoEvent("Too long filter");
+ return;
+ }
+ if (validate_filter(signal))
+ {
+ jam();
+ memmove(signal->theData + 3, signal->theData + 1, 4 * len);
+ signal->theData[0] = 2351;
+ signal->theData[1] = 0; // Bucket
+ signal->theData[2] = RNIL; // Record
+ sendSignal(reference(), GSN_DUMP_STATE_ORD, signal, len + 3, JBB);
+
+ infoEvent("Starting dump of operations");
+ }
+ return;
+ }
+
+ if (arg == 2351)
+ {
+ jam();
+ Uint32 bucket = signal->theData[1];
+ Uint32 record = signal->theData[2];
+ Uint32 len = signal->getLength();
+ TcConnectionrecPtr tcRec;
+ if (record != RNIL)
+ {
+ jam();
+ /**
+ * Check that record is still in use...
+ */
+ tcRec.i = record;
+ ptrCheckGuard(tcRec, ttcConnectrecFileSize, regTcConnectionrec);
+
+ Uint32 hashIndex = (tcRec.p->transid[0] ^ tcRec.p->tcOprec) & 1023;
+ if (hashIndex != bucket)
+ {
+ jam();
+ record = RNIL;
+ }
+ else
+ {
+ jam();
+ if (tcRec.p->nextHashRec == RNIL &&
+ tcRec.p->prevHashRec == RNIL &&
+ ctransidHash[hashIndex] != record)
+ {
+ jam();
+ record = RNIL;
+ }
+ }
+
+ if (record == RNIL)
+ {
+ jam();
+ signal->theData[2] = RNIL;
+ sendSignal(reference(), GSN_DUMP_STATE_ORD, signal,
+ signal->getLength(), JBB);
+ return;
+ }
+ }
+ else if ((record = ctransidHash[bucket]) == RNIL)
+ {
+ jam();
+ bucket++;
+ if (bucket < 1024)
+ {
+ jam();
+ signal->theData[1] = bucket;
+ signal->theData[2] = RNIL;
+ sendSignal(reference(), GSN_DUMP_STATE_ORD, signal,
+ signal->getLength(), JBB);
+ }
+ else
+ {
+ jam();
+ infoEvent("End of operation dump");
+ }
+
+ return;
+ }
+ else
+ {
+ jam();
+ tcRec.i = record;
+ ptrCheckGuard(tcRec, ttcConnectrecFileSize, regTcConnectionrec);
+ }
+
+ for (Uint32 i = 0; i<32; i++)
+ {
+ jam();
+ bool print = match_and_print(signal, tcRec);
+
+ tcRec.i = tcRec.p->nextHashRec;
+ if (tcRec.i == RNIL || print)
+ {
+ jam();
+ break;
+ }
+
+ ptrCheckGuard(tcRec, ttcConnectrecFileSize, regTcConnectionrec);
+ }
+
+ if (tcRec.i == RNIL)
+ {
+ jam();
+ bucket++;
+ if (bucket < 1024)
+ {
+ jam();
+ signal->theData[1] = bucket;
+ signal->theData[2] = RNIL;
+ sendSignal(reference(), GSN_DUMP_STATE_ORD, signal, len, JBB);
+ }
+ else
+ {
+ jam();
+ infoEvent("End of operation dump");
+ }
+
+ return;
+ }
+ else
+ {
+ jam();
+ signal->theData[2] = tcRec.i;
+ sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 200, len);
+ return;
+ }
+ }
+
+ if (arg == 2352 && signal->getLength() == 2)
+ {
+ jam();
+ Uint32 i;
+ Uint32 opNo = signal->theData[1];
+ TcConnectionrecPtr tcRec;
+ if (opNo < ttcConnectrecFileSize)
+ {
+ jam();
+ tcRec.i = opNo;
+ ptrCheckGuard(tcRec, ttcConnectrecFileSize, regTcConnectionrec);
+
+ Uint32 keyLen = tcRec.p->primKeyLen;
+ BaseString key;
+ for(i = 0; i<keyLen && i < 4; i++)
+ {
+ jam();
+ key.appfmt("0x%x ", tcRec.p->tupkeyData[i]);
+ }
+
+ if (keyLen > 4)
+ {
+ jam();
+ tcConnectptr = tcRec;
+ sendKeyinfoAcc(signal, 4);
+ for (i = 4; i<keyLen; i++)
+ {
+ jam();
+ key.appfmt("0x%x ", signal->theData[i]);
+ }
+ }
+
+ char buf[100];
+ BaseString::snprintf(buf, sizeof(buf),
+ "OP[%u]: transid: 0x%x 0x%x key: %s",
+ tcRec.i,
+ tcRec.p->transid[0], tcRec.p->transid[1], key.c_str());
+ infoEvent(buf);
+ }
+ }
}//Dblqh::execDUMP_STATE_ORD()
void Dblqh::execSET_VAR_REQ(Signal* signal)
--- 1.101/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2006-10-26 18:58:57 +02:00
+++ 1.102/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2006-10-26 18:58:57 +02:00
@@ -6330,6 +6330,18 @@
break;
case CS_START_SCAN:{
jam();
+
+ /*
+ We are waiting for application to continue the transaction. In this
+ particular state we will use the application timeout parameter rather
+ than the shorter Deadlock detection timeout.
+ */
+ if (c_appl_timeout_value == 0 ||
+ (ctcTimer - getApiConTimer(apiConnectptr.i)) <= c_appl_timeout_value) {
+ jam();
+ return;
+ }//if
+
ScanRecordPtr scanPtr;
scanPtr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord);
@@ -9847,6 +9859,17 @@
conf->requestInfo = op_count | ScanTabConf::EndOfData;
releaseScanResources(scanPtr);
}
+ else
+ {
+ if (scanPtr.p->m_running_scan_frags.isEmpty())
+ {
+ jam();
+ /**
+ * All scan frags delivered...waiting for API
+ */
+ setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
+ }
+ }
if(4 + 3 * op_count > 25){
jam();
@@ -10600,7 +10623,9 @@
void
Dbtc::execDUMP_STATE_ORD(Signal* signal)
{
+ jamEntry();
DumpStateOrd * const dumpState = (DumpStateOrd *)&signal->theData[0];
+ Uint32 arg = signal->theData[0];
if(signal->theData[0] == DumpStateOrd::CommitAckMarkersSize){
infoEvent("TC: m_commitAckMarkerPool: %d free size: %d",
m_commitAckMarkerPool.getNoOfFree(),
@@ -10927,7 +10952,241 @@
sendSignalWithDelay(cownref, GSN_SYSTEM_ERROR, signal, 300, 1);
return;
}
+
+ if (arg == 2550)
+ {
+ jam();
+ Uint32 len = signal->getLength() - 1;
+ if (len + 2 > 25)
+ {
+ jam();
+ infoEvent("Too long filter");
+ return;
+ }
+ if (validate_filter(signal))
+ {
+ jam();
+ memmove(signal->theData + 2, signal->theData + 1, 4 * len);
+ signal->theData[0] = 2551;
+ signal->theData[1] = 0; // record
+ sendSignal(reference(), GSN_DUMP_STATE_ORD, signal, len + 2, JBB);
+
+ infoEvent("Starting dump of transactions");
+ }
+ return;
+ }
+
+ if (arg == 2551)
+ {
+ jam();
+ Uint32 record = signal->theData[1];
+ Uint32 len = signal->getLength();
+ ndbassert(len > 1);
+
+ ApiConnectRecordPtr ap;
+ ap.i = record;
+ ptrAss(ap, apiConnectRecord);
+
+ bool print = false;
+ for (Uint32 i = 0; i<32; i++)
+ {
+ jam();
+ print = match_and_print(signal, ap);
+
+ ap.i++;
+ if (ap.i == capiConnectFilesize || print)
+ {
+ jam();
+ break;
+ }
+
+ ptrAss(ap, apiConnectRecord);
+ }
+
+ if (ap.i == capiConnectFilesize)
+ {
+ jam();
+ infoEvent("End of transaction dump");
+ return;
+ }
+
+ signal->theData[1] = ap.i;
+ if (print)
+ {
+ jam();
+ sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 200, len);
+ }
+ else
+ {
+ jam();
+ sendSignal(reference(), GSN_DUMP_STATE_ORD, signal, len, JBB);
+ }
+ return;
+ }
}//Dbtc::execDUMP_STATE_ORD()
+
+bool
+Dbtc::validate_filter(Signal* signal)
+{
+ Uint32 * start = signal->theData + 1;
+ Uint32 * end = signal->theData + signal->getLength();
+ if (start == end)
+ {
+ infoEvent("No filter specified, not listing...");
+ return false;
+ }
+
+ while(start < end)
+ {
+ switch(* start){
+ case 1: // API Node
+ case 4: // Inactive time
+ start += 2;
+ break;
+ case 2: // Transid
+ start += 3;
+ break;
+ default:
+ infoEvent("Invalid filter op: 0x%x pos: %d",
+ * start,
+ start - (signal->theData + 1));
+ return false;
+ }
+ }
+
+ if (start != end)
+ {
+ infoEvent("Invalid filter, unexpected end");
+ return false;
+ }
+
+ return true;
+}
+
+bool
+Dbtc::match_and_print(Signal* signal, ApiConnectRecordPtr apiPtr)
+{
+ Uint32 conState = apiPtr.p->apiConnectstate;
+ if (conState == CS_CONNECTED ||
+ conState == CS_DISCONNECTED ||
+ conState == CS_RESTART)
+ return false;
+
+ Uint32 len = signal->getLength();
+ Uint32* start = signal->theData + 2;
+ Uint32* end = signal->theData + len;
+ Uint32 apiTimer = getApiConTimer(apiPtr.i);
+ while (start < end)
+ {
+ jam();
+ switch(* start){
+ case 1:
+ jam();
+ if (refToNode(apiPtr.p->ndbapiBlockref) != * (start + 1))
+ return false;
+ start += 2;
+ break;
+ case 2:
+ jam();
+ if (apiPtr.p->transid[0] != * (start + 1) ||
+ apiPtr.p->transid[1] != * (start + 2))
+ return false;
+ start += 3;
+ break;
+ case 4:{
+ jam();
+ if (apiTimer == 0 || ((ctcTimer - apiTimer) / 100) < * (start + 1))
+ return false;
+ start += 2;
+ break;
+ }
+ default:
+ ndbassert(false);
+ return false;
+ }
+ }
+
+ if (start != end)
+ {
+ ndbassert(false);
+ return false;
+ }
+
+ /**
+ * Do print
+ */
+ Uint32 *temp = signal->theData + 25;
+ memcpy(temp, signal->theData, 4 * len);
+
+ char state[10];
+ const char *stateptr = "";
+
+ switch(apiPtr.p->apiConnectstate){
+ case CS_STARTED:
+ stateptr = "Prepared";
+ break;
+ case CS_RECEIVING:
+ case CS_REC_COMMITTING:
+ case CS_START_COMMITTING:
+ stateptr = "Running";
+ break;
+ case CS_COMMITTING:
+ stateptr = "Committing";
+ break;
+ case CS_COMPLETING:
+ stateptr = "Completing";
+ break;
+ case CS_PREPARE_TO_COMMIT:
+ stateptr = "Prepare to commit";
+ break;
+ case CS_COMMIT_SENT:
+ stateptr = "Commit sent";
+ break;
+ case CS_COMPLETE_SENT:
+ stateptr = "Complete sent";
+ break;
+ case CS_ABORTING:
+ stateptr = "Aborting";
+ break;
+ case CS_START_SCAN:
+ stateptr = "Scanning";
+ break;
+ case CS_WAIT_ABORT_CONF:
+ case CS_WAIT_COMMIT_CONF:
+ case CS_WAIT_COMPLETE_CONF:
+ case CS_FAIL_PREPARED:
+ case CS_FAIL_COMMITTING:
+ case CS_FAIL_COMMITTED:
+ case CS_REC_PREPARING:
+ case CS_START_PREPARING:
+ case CS_PREPARED:
+ case CS_RESTART:
+ case CS_FAIL_ABORTED:
+ case CS_DISCONNECTED:
+ default:
+ BaseString::snprintf(state, sizeof(state),
+ "%u", apiPtr.p->apiConnectstate);
+ stateptr = state;
+ break;
+ }
+
+ char buf[100];
+ BaseString::snprintf(buf, sizeof(buf),
+ "TRX[%u]: API: %d(0x%x)"
+ "transid: 0x%x 0x%x inactive: %u(%d) state: %s",
+ apiPtr.i,
+ refToNode(apiPtr.p->ndbapiBlockref),
+ refToBlock(apiPtr.p->ndbapiBlockref),
+ apiPtr.p->transid[0],
+ apiPtr.p->transid[1],
+ apiTimer ? (ctcTimer - apiTimer) / 100 : 0,
+ c_apiConTimer_line[apiPtr.i],
+ stateptr);
+ infoEvent(buf);
+
+ memcpy(signal->theData, temp, 4*len);
+ return true;
+}
void Dbtc::execSET_VAR_REQ(Signal* signal)
{
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2051) | jonas | 26 Oct |