#At file:///export/space/pekka/ndb/version/my51-wl4391/
2764 Pekka Nousiainen 2008-09-16
wl#4391 20.diff
User-BACKUP. Single-worker version.
modified:
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/backup/Backup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-09-12 12:55:29 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-09-16 18:29:47 +0000
@@ -250,7 +250,7 @@ Backup::execCONTINUEB(Signal* signal)
Uint32 * dst;
if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
{
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 4);
return;
}
@@ -288,7 +288,7 @@ Backup::execCONTINUEB(Signal* signal)
signal->theData[1] = ptr_I;
signal->theData[2] = tabPtr_I;
signal->theData[3] = fragPtr_I;
- sendSignal(BACKUP_REF, GSN_CONTINUEB, signal, 4, JBB);
+ sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
return;
}
case BackupContinueB::START_FILE_THREAD:
@@ -365,7 +365,7 @@ Backup::execCONTINUEB(Signal* signal)
if (ERROR_INSERTED(10039))
{
jam();
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 300,
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300,
signal->getLength());
return;
}
@@ -374,9 +374,29 @@ Backup::execCONTINUEB(Signal* signal)
jam();
CLEAR_ERROR_INSERT_VALUE;
ndbout_c("Resuming backup");
- memmove(signal->theData, signal->theData + 1,
+
+ Uint32 filePtr_I = Tdata1;
+ BackupFilePtr filePtr;
+ c_backupFilePool.getPtr(filePtr, filePtr_I);
+ BackupRecordPtr ptr;
+ c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
+ TablePtr tabPtr;
+ findTable(ptr, tabPtr, filePtr.p->tableId);
+ FragmentPtr fragPtr;
+ tabPtr.p->fragments.getPtr(fragPtr, filePtr.p->fragmentNo);
+
+ BlockReference lqhRef = 0;
+ if (ptr.p->is_lcp()) {
+ lqhRef = calcInstanceBlockRef(DBLQH);
+ } else {
+ const Uint32 instanceKey = fragPtr.p->lqhInstanceKey;
+ ndbrequire(instanceKey != 0);
+ lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
+ }
+
+ memmove(signal->theData, signal->theData + 2,
4*ScanFragNextReq::SignalLength);
- BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+
sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
return ;
@@ -412,7 +432,7 @@ Backup::execBACKUP_LOCK_TAB_CONF(Signal
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
signal->theData[3] = 0; // Start from first fragment of next table
- sendSignal(BACKUP_REF, GSN_CONTINUEB, signal, 4, JBB);
+ sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
return;
}
case BackupLockTab::GET_TABINFO_CONF:
@@ -496,7 +516,7 @@ Backup::execDUMP_STATE_ORD(Signal* signa
BackupReq * req = (BackupReq*)signal->getDataPtrSend();
req->senderData = 23;
req->backupDataLen = 0;
- sendSignal(BACKUP_REF, GSN_BACKUP_REQ,signal,BackupReq::SignalLength, JBB);
+ sendSignal(reference(), GSN_BACKUP_REQ,signal,BackupReq::SignalLength, JBB);
startTime = NdbTick_CurrentMillisecond();
return;
}
@@ -1415,7 +1435,8 @@ Backup::sendDefineBackupReq(Signal *sign
ptr.p->masterData.gsn = GSN_DEFINE_BACKUP_REQ;
ptr.p->masterData.sendCounter = ptr.p->nodes;
- NodeReceiverGroup rg(BACKUP, ptr.p->nodes);
+ BlockNumber backupBlockNo = numberToBlock(BACKUP, instance());
+ NodeReceiverGroup rg(backupBlockNo, ptr.p->nodes);
sendSignal(rg, GSN_DEFINE_BACKUP_REQ, signal,
DefineBackupReq::SignalLength, JBB);
@@ -1774,7 +1795,8 @@ Backup::sendStartBackup(Signal* signal,
*/
ptr.p->masterData.gsn = GSN_START_BACKUP_REQ;
ptr.p->masterData.sendCounter = ptr.p->nodes;
- NodeReceiverGroup rg(BACKUP, ptr.p->nodes);
+ BlockNumber backupBlockNo = numberToBlock(BACKUP, instance());
+ NodeReceiverGroup rg(backupBlockNo, ptr.p->nodes);
sendSignal(rg, GSN_START_BACKUP_REQ, signal,
StartBackupReq::SignalLength, JBB);
}
@@ -1977,7 +1999,7 @@ Backup::nextFragment(Signal* signal, Bac
req->count = 0;
ptr.p->masterData.sendCounter++;
- const BlockReference ref = numberToRef(BACKUP, nodeId);
+ const BlockReference ref = numberToRef(BACKUP, instance(), nodeId);
sendSignal(ref, GSN_BACKUP_FRAGMENT_REQ, signal,
BackupFragmentReq::SignalLength, JBB);
}//if
@@ -2077,7 +2099,8 @@ Backup::execBACKUP_FRAGMENT_CONF(Signal*
rep->noOfTableRowsHigh = (Uint32)(tabPtr.p->noOfRecords >> 32);
rep->noOfFragmentRowsLow = (Uint32)(noOfRecords & 0xFFFFFFFF);
rep->noOfFragmentRowsHigh = (Uint32)(noOfRecords >> 32);
- NodeReceiverGroup rg(BACKUP, ptr.p->nodes);
+ BlockNumber backupBlockNo = numberToBlock(BACKUP, instance());
+ NodeReceiverGroup rg(backupBlockNo, ptr.p->nodes);
sendSignal(rg, GSN_BACKUP_FRAGMENT_COMPLETE_REP, signal,
BackupFragmentCompleteRep::SignalLength, JBB);
}
@@ -2248,7 +2271,7 @@ Backup::sendDropTrig(Signal* signal, Bac
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
signal->theData[3] = 0;
- sendSignal(BACKUP_REF, GSN_CONTINUEB, signal, 4, JBB);
+ sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
}
else
{
@@ -2387,7 +2410,8 @@ Backup::sendStopBackup(Signal* signal, B
ptr.p->masterData.gsn = GSN_STOP_BACKUP_REQ;
ptr.p->masterData.sendCounter = ptr.p->nodes;
- NodeReceiverGroup rg(BACKUP, ptr.p->nodes);
+ BlockNumber backupBlockNo = numberToBlock(BACKUP, instance());
+ NodeReceiverGroup rg(backupBlockNo, ptr.p->nodes);
sendSignal(rg, GSN_STOP_BACKUP_REQ, signal,
StopBackupReq::SignalLength, JBB);
}
@@ -2577,7 +2601,8 @@ Backup::masterAbort(Signal* signal, Back
ord->backupId = ptr.p->backupId;
ord->backupPtr = ptr.i;
ord->senderData= ptr.i;
- NodeReceiverGroup rg(BACKUP, ptr.p->nodes);
+ BlockNumber backupBlockNo = numberToBlock(BACKUP, instance());
+ NodeReceiverGroup rg(backupBlockNo, ptr.p->nodes);
switch(ptr.p->masterData.gsn){
case GSN_DEFINE_BACKUP_REQ:
@@ -2628,7 +2653,7 @@ Backup::abort_scan(Signal * signal, Back
if(fragPtr.p->scanning != 0 && ptr.p->nodes.get(nodeId)) {
jam();
- const BlockReference ref = numberToRef(BACKUP, nodeId);
+ const BlockReference ref = calcInstanceBlockRef(BACKUP, nodeId);
sendSignal(ref, GSN_ABORT_BACKUP_ORD, signal,
AbortBackupOrd::SignalLength, JBB);
@@ -3184,7 +3209,7 @@ Backup::openFilesReply(Signal* signal,
signal->theData[0] = BackupContinueB::START_FILE_THREAD;
signal->theData[1] = filePtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
}
else
{
@@ -3581,6 +3606,7 @@ Backup::execDIH_SCAN_TAB_CONF(Signal* si
fragPtr.p->scanning = 0;
fragPtr.p->tableId = tableId;
fragPtr.p->fragmentId = i;
+ fragPtr.p->lqhInstanceKey = 0;
fragPtr.p->node = 0;
}//for
@@ -3654,6 +3680,7 @@ Backup::execDIH_SCAN_GET_NODES_CONF(Sign
const Uint32 nodeCount = conf->count;
const Uint32 tableId = conf->tableId;
const Uint32 fragNo = conf->fragId;
+ const Uint32 instanceKey = conf->instanceKey;
ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
@@ -3665,6 +3692,7 @@ Backup::execDIH_SCAN_GET_NODES_CONF(Sign
FragmentPtr fragPtr;
tabPtr.p->fragments.getPtr(fragPtr, fragNo);
+ fragPtr.p->lqhInstanceKey = instanceKey;
fragPtr.p->node = conf->nodes[0];
@@ -3718,7 +3746,7 @@ Backup::execSTART_BACKUP_REQ(Signal* sig
filePtr.p->m_flags |= BackupFile::BF_FILE_THREAD;
signal->theData[0] = BackupContinueB::START_FILE_THREAD;
signal->theData[1] = filePtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
}//if
}//for
@@ -3799,7 +3827,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal*
if(!filePtr.p->operation.newFragment(tableId, fragPtr.p->fragmentId)) {
jam();
req->count = count + 1;
- sendSignalWithDelay(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal, 50,
+ sendSignalWithDelay(reference(), GSN_BACKUP_FRAGMENT_REQ, signal, 50,
signal->length());
ptr.p->slaveState.setState(STARTED);
return;
@@ -3846,7 +3874,14 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal*
req->clientOpPtr= filePtr.i;
req->batch_size_rows= parallelism;
req->batch_size_bytes= 0;
- BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+ BlockReference lqhRef = 0;
+ if (ptr.p->is_lcp()) {
+ lqhRef = calcInstanceBlockRef(DBLQH);
+ } else {
+ const Uint32 instanceKey = fragPtr.p->lqhInstanceKey;
+ ndbrequire(instanceKey != 0);
+ lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
+ }
sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
@@ -4105,7 +4140,7 @@ Backup::fragmentCompleted(Signal* signal
jam();
signal->theData[0] = BackupContinueB::BUFFER_FULL_FRAG_COMPLETE;
signal->theData[1] = filePtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 50, 2);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 50, 2);
return;
}//if
@@ -4157,7 +4192,21 @@ void
Backup::checkScan(Signal* signal, BackupFilePtr filePtr)
{
OperationRecord & op = filePtr.p->operation;
- BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+ BlockReference lqhRef = 0;
+ {
+ BackupRecordPtr ptr LINT_SET_PTR;
+ c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
+ if (ptr.p->is_lcp()) {
+ lqhRef = calcInstanceBlockRef(DBLQH);
+ } else {
+ TablePtr tabPtr;
+ ndbrequire(findTable(ptr, tabPtr, filePtr.p->tableId));
+ FragmentPtr fragPtr;
+ tabPtr.p->fragments.getPtr(fragPtr, filePtr.p->fragmentNo);
+ const Uint32 instanceKey = fragPtr.p->lqhInstanceKey;
+ lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
+ }
+ }
if(filePtr.p->errorCode != 0)
{
@@ -4196,11 +4245,12 @@ Backup::checkScan(Signal* signal, Backup
filePtr.p->tableId,
filePtr.p->fragmentNo,
filePtr.p->operation.noOfRecords);
- memmove(signal->theData+1, signal->theData,
+ memmove(signal->theData+2, signal->theData,
4*ScanFragNextReq::SignalLength);
signal->theData[0] = BackupContinueB::ZDELAY_SCAN_NEXT;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal,
- 300, 1+ScanFragNextReq::SignalLength);
+ signal->theData[1] = filePtr.i;
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
+ 300, 2+ScanFragNextReq::SignalLength);
return;
}
if(ERROR_INSERTED(10032))
@@ -4240,7 +4290,7 @@ Backup::checkScan(Signal* signal, Backup
signal->theData[0] = BackupContinueB::BUFFER_FULL_SCAN;
signal->theData[1] = filePtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 50, 2);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 50, 2);
}
void
@@ -4612,7 +4662,8 @@ Backup::sendAbortBackupOrd(Signal* signa
const Uint32 nodeId = node.p->nodeId;
if(node.p->alive && ptr.p->nodes.get(nodeId)) {
jam();
- sendSignal(numberToRef(BACKUP, nodeId), GSN_ABORT_BACKUP_ORD, signal,
+ BlockReference ref = calcInstanceBlockRef(BACKUP, nodeId);
+ sendSignal(ref, GSN_ABORT_BACKUP_ORD, signal,
AbortBackupOrd::SignalLength, JBB);
}//if
}//for
@@ -4868,7 +4919,8 @@ Backup::execABORT_BACKUP_ORD(Signal* sig
#ifdef DEBUG_ABORT
ndbout_c("---- Forward to master nodeId = %u", getMasterNodeId());
#endif
- sendSignal(calcBackupBlockRef(getMasterNodeId()), GSN_ABORT_BACKUP_ORD,
+ BlockReference ref = calcInstanceBlockRef(BACKUP, getMasterNodeId());
+ sendSignal(ref, GSN_ABORT_BACKUP_ORD,
signal, AbortBackupOrd::SignalLength, JBB);
return;
}
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2008-09-16 18:29:47 +0000
@@ -180,6 +180,7 @@ public:
Uint32 tableId;
Uint16 node;
Uint16 fragmentId;
+ Uint8 lqhInstanceKey;
Uint8 scanned; // 0 = not scanned x = scanned by node x
Uint8 scanning; // 0 = not scanning x = scanning on node x
Uint8 lcp_no;
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-08-11 11:48:10 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-09-16 18:29:47 +0000
@@ -246,10 +246,11 @@ void Dbtup::sendReadAttrinfo(Signal* sig
* The UTIL/TC blocks are in another thread (in multi-threaded ndbd), so
* must use sendSignal().
*
- * In MT LQH only LQH and BACKUP are in same thread.
+ * In MT LQH only LQH and BACKUP are in same thread, and BACKUP only
+ * in LCP case since user-backup uses single worker.
*/
BlockNumber blockMain = blockToMain(block);
- if (blockMain == DBLQH || blockMain == BACKUP)
+ if (blockMain == DBLQH)
{
EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
jamEntry();
@@ -260,6 +261,13 @@ void Dbtup::sendReadAttrinfo(Signal* sig
EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex, 0);
jamEntry();
}
+ else if (blockMain == BACKUP)
+ {
+ Uint32 iNo = blockToInstance(block);
+ // wl4391_todo maybe not MT safe in non-LCP case
+ EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex, iNo);
+ jamEntry();
+ }
else
{
jam();
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-09-12 12:55:29 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-09-16 18:29:47 +0000
@@ -2900,14 +2900,19 @@ MgmtSrvr::startBackup(Uint32& backupId,
SimpleSignal ssig;
BackupReq* req = CAST_PTR(BackupReq, ssig.getDataPtrSend());
+ /*
+ * Single-threaded backup. Set instance key 1. In the kernel
+ * this maps to main instance 0 or worker instance 1 (if MT LQH).
+ */
+ BlockNumber backupBlockNo = numberToBlock(BACKUP, 1);
if(input_backupId > 0)
{
- ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ,
+ ssig.set(ss, TestOrd::TraceAPI, backupBlockNo, GSN_BACKUP_REQ,
BackupReq::SignalLength);
req->inputBackupId = input_backupId;
}
else
- ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ,
+ ssig.set(ss, TestOrd::TraceAPI, backupBlockNo, GSN_BACKUP_REQ,
BackupReq::SignalLength - 1);
req->senderData = 19;
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (pekka:2764) WL#4391 | Pekka Nousiainen | 16 Sep |