Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-10-20 16:25:14+02:00, jonas@stripped +3 -0
Merge perch.ndb.mysql.com:/home/jonas/src/50-work
into perch.ndb.mysql.com:/home/jonas/src/51-work
MERGE: 1.1810.2123.20
storage/ndb/include/kernel/ndb_limits.h@stripped, 2006-10-20 16:23:35+02:00, jonas@stripped +0 -0
Auto merged
MERGE: 1.14.4.2
storage/ndb/include/kernel/ndb_limits.h@stripped, 2006-10-20 16:23:34+02:00, jonas@stripped +0 -0
Merge rename: ndb/include/kernel/ndb_limits.h -> storage/ndb/include/kernel/ndb_limits.h
storage/ndb/src/kernel/blocks/backup/Backup.cpp@stripped, 2006-10-20 16:23:35+02:00, jonas@stripped +0 -0
Auto merged
MERGE: 1.15.24.2
storage/ndb/src/kernel/blocks/backup/Backup.cpp@stripped, 2006-10-20 16:23:34+02:00, jonas@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/backup/Backup.cpp -> storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/backup/Backup.hpp@stripped, 2006-10-20 16:25:12+02:00, jonas@stripped +3 -3
merge
MERGE: 1.7.7.2
storage/ndb/src/kernel/blocks/backup/Backup.hpp@stripped, 2006-10-20 16:23:35+02:00, jonas@stripped +0 -0
Merge rename: ndb/src/kernel/blocks/backup/Backup.hpp -> storage/ndb/src/kernel/blocks/backup/Backup.hpp
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/51-work/RESYNC
--- 1.14.4.1/ndb/include/kernel/ndb_limits.h 2006-10-20 16:25:18 +02:00
+++ 1.25/storage/ndb/include/kernel/ndb_limits.h 2006-10-20 16:25:18 +02:00
@@ -27,6 +27,7 @@
*/
#define MAX_NDB_NODES 49
#define MAX_NODES 64
+#define UNDEF_NODEGROUP 0xFFFF
/**
* MAX_API_NODES = MAX_NODES - No of NDB Nodes in use
@@ -63,6 +64,9 @@
#define MAX_FRM_DATA_SIZE 6000
#define MAX_NULL_BITS 4096
#define MAX_FRAGMENT_DATA_BYTES (4+(2 * 8 * MAX_REPLICAS * MAX_NDB_NODES))
+#define MAX_NDB_PARTITIONS 1024
+#define MAX_RANGE_DATA (131072+MAX_NDB_PARTITIONS) //0.5 MByte of list data
+#define MAX_WORDS_META_FILE 16382
#define MAX_WORDS_META_FILE 24576
@@ -126,9 +130,26 @@
*/
#define MAX_XFRM_MULTIPLY 8 /* max expansion when normalizing */
+/**
+ * Disk data
+ */
+#define MAX_FILES_PER_FILEGROUP 1024
+
+/**
+ * Page size in global page pool
+ */
+#define GLOBAL_PAGE_SIZE 32768
+#define GLOBAL_PAGE_SIZE_WORDS 8192
+
/*
* Long signals
*/
#define NDB_SECTION_SEGMENT_SZ 60
+
+/*
+ * Restore Buffer in pages
+ * 4M
+ */
+#define LCP_RESTORE_BUFFER (4*32)
#endif
--- 1.15.24.1/ndb/src/kernel/blocks/backup/Backup.cpp 2006-10-20 16:25:18 +02:00
+++ 1.56/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2006-10-20 16:25:18 +02:00
@@ -14,6 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include <my_config.h>
#include "Backup.hpp"
#include <ndb_version.h>
@@ -24,6 +25,7 @@
#include <signaldata/NodeFailRep.hpp>
#include <signaldata/ReadNodesConf.hpp>
+#include <signaldata/DihFragCount.hpp>
#include <signaldata/ScanFrag.hpp>
#include <signaldata/GetTabInfo.hpp>
@@ -52,6 +54,7 @@
#include <AttributeHeader.hpp>
#include <signaldata/WaitGCP.hpp>
+#include <signaldata/LCP.hpp>
#include <NdbTick.h>
@@ -66,112 +69,13 @@
#endif
//#define DEBUG_ABORT
+//#define dbg globalSignalLoggers.log
static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
#define SEND_BACKUP_STARTED_FLAG(A) (((A) & 0x3) > 0)
#define SEND_BACKUP_COMPLETED_FLAG(A) (((A) & 0x3) > 1)
-void
-Backup::execREAD_CONFIG_REQ(Signal* signal)
-{
- jamEntry();
-
- const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
-
- Uint32 ref = req->senderRef;
- Uint32 senderData = req->senderData;
-
- const ndb_mgm_configuration_iterator * p =
- theConfiguration.getOwnConfigIterator();
- ndbrequire(p != 0);
-
- c_nodePool.setSize(MAX_NDB_NODES);
-
- Uint32 noBackups = 0, noTables = 0, noAttribs = 0, noFrags = 0;
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &m_diskless));
- ndb_mgm_get_int_parameter(p, CFG_DB_PARALLEL_BACKUPS, &noBackups);
- // ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, &noTables));
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &noTables));
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES, &noAttribs));
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DIH_FRAG_CONNECT, &noFrags));
-
- noAttribs++; //RT 527 bug fix
-
- c_backupPool.setSize(noBackups);
- c_backupFilePool.setSize(3 * noBackups);
- c_tablePool.setSize(noBackups * noTables);
- c_attributePool.setSize(noBackups * noAttribs);
- c_triggerPool.setSize(noBackups * 3 * noTables);
- c_fragmentPool.setSize(noBackups * noFrags);
-
- Uint32 szMem = 0;
- ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MEM, &szMem);
- Uint32 noPages = (szMem + sizeof(Page32) - 1) / sizeof(Page32);
- // We need to allocate an additional of 2 pages. 1 page because of a bug in
- // ArrayPool and another one for DICTTAINFO.
- c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2);
-
- Uint32 szDataBuf = (2 * 1024 * 1024);
- Uint32 szLogBuf = (2 * 1024 * 1024);
- Uint32 szWrite = 32768, maxWriteSize = (256 * 1024);
- ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf);
- ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
- ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
- ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MAX_WRITE_SIZE, &maxWriteSize);
-
- c_defaults.m_logBufferSize = szLogBuf;
- c_defaults.m_dataBufferSize = szDataBuf;
- c_defaults.m_minWriteSize = szWrite;
- c_defaults.m_maxWriteSize = maxWriteSize;
-
- { // Init all tables
- ArrayList<Table> tables(c_tablePool);
- TablePtr ptr;
- while(tables.seize(ptr)){
- new (ptr.p) Table(c_attributePool, c_fragmentPool);
- }
- tables.release();
- }
-
- {
- ArrayList<BackupFile> ops(c_backupFilePool);
- BackupFilePtr ptr;
- while(ops.seize(ptr)){
- new (ptr.p) BackupFile(* this, c_pagePool);
- }
- ops.release();
- }
-
- {
- ArrayList<BackupRecord> recs(c_backupPool);
- BackupRecordPtr ptr;
- while(recs.seize(ptr)){
- new (ptr.p) BackupRecord(* this, c_pagePool, c_tablePool,
- c_backupFilePool, c_triggerPool);
- }
- recs.release();
- }
-
- // Initialize BAT for interface to file system
- {
- Page32Ptr p;
- ndbrequire(c_pagePool.seizeId(p, 0));
- c_startOfPages = (Uint32 *)p.p;
- c_pagePool.release(p);
-
- NewVARIABLE* bat = allocateBat(1);
- bat[0].WA = c_startOfPages;
- bat[0].nrr = c_pagePool.getSize()*sizeof(Page32)/sizeof(Uint32);
- }
-
- ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
- conf->senderRef = reference();
- conf->senderData = senderData;
- sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
- ReadConfigConf::SignalLength, JBB);
-}
-
void
Backup::execSTTOR(Signal* signal)
{
@@ -180,6 +84,16 @@
const Uint32 startphase = signal->theData[1];
const Uint32 typeOfStart = signal->theData[7];
+ if (startphase == 1)
+ {
+ m_curr_disk_write_speed = c_defaults.m_disk_write_speed_sr;
+ m_overflow_disk_write = 0;
+ m_reset_disk_speed_time = NdbTick_CurrentMillisecond();
+ m_reset_delay_used = Backup::DISK_SPEED_CHECK_DELAY;
+ signal->theData[0] = BackupContinueB::RESET_DISK_SPEED_COUNTER;
+ sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal,
+ Backup::DISK_SPEED_CHECK_DELAY, 1);
+ }
if (startphase == 3) {
jam();
g_TypeOfStart = typeOfStart;
@@ -188,6 +102,11 @@
return;
}//if
+ if (startphase == 7)
+ {
+ m_curr_disk_write_speed = c_defaults.m_disk_write_speed;
+ }
+
if(startphase == 7 && g_TypeOfStart == NodeState::ST_INITIAL_START &&
c_masterNodeId == getOwnNodeId()){
jam();
@@ -266,8 +185,45 @@
const Uint32 Tdata2 = signal->theData[2];
switch(Tdata0) {
+ case BackupContinueB::RESET_DISK_SPEED_COUNTER:
+ {
+ /*
+ Adjust for upto 10 millisecond delay of this signal. Longer
+ delays will not be handled, in this case the system is most
+ likely under too high load and it won't matter very much that
+ we decrease the speed of checkpoints.
+
+ We use a technique where we allow an overflow write in one
+ period. This overflow will be removed from the next period
+ such that the load will at average be as specified.
+ */
+ int delay_time = m_reset_delay_used;
+ NDB_TICKS curr_time = NdbTick_CurrentMillisecond();
+ int sig_delay = curr_time - m_reset_disk_speed_time;
+
+ m_words_written_this_period = m_overflow_disk_write;
+ m_overflow_disk_write = 0;
+ m_reset_disk_speed_time = curr_time;
+
+ if (sig_delay > delay_time + 10)
+ delay_time = Backup::DISK_SPEED_CHECK_DELAY - 10;
+ else if (sig_delay < delay_time - 10)
+ delay_time = Backup::DISK_SPEED_CHECK_DELAY + 10;
+ else
+ delay_time = Backup::DISK_SPEED_CHECK_DELAY - (sig_delay - delay_time);
+ m_reset_delay_used= delay_time;
+ signal->theData[0] = BackupContinueB::RESET_DISK_SPEED_COUNTER;
+ sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, delay_time, 1);
+#if 0
+ ndbout << "Signal delay was = " << sig_delay;
+ ndbout << " Current time = " << curr_time << endl;
+ ndbout << " Delay time will be = " << delay_time << endl << endl;
+#endif
+ break;
+ }
case BackupContinueB::BACKUP_FRAGMENT_INFO:
{
+ jam();
const Uint32 ptr_I = Tdata1;
Uint32 tabPtr_I = Tdata2;
Uint32 fragPtr_I = signal->theData[3];
@@ -276,48 +232,56 @@
c_backupPool.getPtr(ptr, ptr_I);
TablePtr tabPtr;
ptr.p->tables.getPtr(tabPtr, tabPtr_I);
- FragmentPtr fragPtr;
- tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I);
- BackupFilePtr filePtr;
- ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
-
- const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2;
- Uint32 * dst;
- if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
+ if (fragPtr_I != tabPtr.p->fragments.getSize())
{
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4);
- return;
+ jam();
+ FragmentPtr fragPtr;
+ tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I);
+
+ BackupFilePtr filePtr;
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+
+ const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2;
+ Uint32 * dst;
+ if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
+ {
+ sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4);
+ return;
+ }
+
+ BackupFormat::CtlFile::FragmentInfo * fragInfo =
+ (BackupFormat::CtlFile::FragmentInfo*)dst;
+ fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO);
+ fragInfo->SectionLength = htonl(sz);
+ fragInfo->TableId = htonl(fragPtr.p->tableId);
+ fragInfo->FragmentNo = htonl(fragPtr_I);
+ fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF);
+ fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32);
+ fragInfo->FilePosLow = htonl(0);
+ fragInfo->FilePosHigh = htonl(0);
+
+ filePtr.p->operation.dataBuffer.updateWritePtr(sz);
+
+ fragPtr_I++;
}
-
- BackupFormat::CtlFile::FragmentInfo * fragInfo =
- (BackupFormat::CtlFile::FragmentInfo*)dst;
- fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO);
- fragInfo->SectionLength = htonl(sz);
- fragInfo->TableId = htonl(fragPtr.p->tableId);
- fragInfo->FragmentNo = htonl(fragPtr_I);
- fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF);
- fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32);
- fragInfo->FilePosLow = htonl(0 & 0xFFFFFFFF);
- fragInfo->FilePosHigh = htonl(0 >> 32);
-
- filePtr.p->operation.dataBuffer.updateWritePtr(sz);
-
- fragPtr_I++;
+
if (fragPtr_I == tabPtr.p->fragments.getSize())
{
signal->theData[0] = tabPtr.p->tableId;
signal->theData[1] = 0; // unlock
EXECUTE_DIRECT(DBDICT, GSN_BACKUP_FRAGMENT_REQ, signal, 2);
-
+
fragPtr_I = 0;
ptr.p->tables.next(tabPtr);
if ((tabPtr_I = tabPtr.i) == RNIL)
{
- closeFiles(signal, ptr);
- return;
+ jam();
+ closeFiles(signal, ptr);
+ return;
}
}
+
signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO;
signal->theData[1] = ptr_I;
signal->theData[2] = tabPtr_I;
@@ -461,10 +425,9 @@
for(ptr.p->files.first(filePtr); filePtr.i != RNIL;
ptr.p->files.next(filePtr)){
jam();
- infoEvent(" file %d: type: %d open: %d running: %d done: %d scan: %d",
- filePtr.i, filePtr.p->fileType, filePtr.p->fileOpened,
- filePtr.p->fileRunning,
- filePtr.p->fileClosing, filePtr.p->scanRunning);
+ infoEvent(" file %d: type: %d flags: H'%x",
+ filePtr.i, filePtr.p->fileType,
+ filePtr.p->m_flags);
}
}
}
@@ -485,13 +448,26 @@
if(signal->getLength() == 2 && signal->theData[1] == 2424)
{
- ndbrequire(c_tablePool.getSize() == c_tablePool.getNoOfFree());
- ndbrequire(c_attributePool.getSize() == c_attributePool.getNoOfFree());
- ndbrequire(c_backupPool.getSize() == c_backupPool.getNoOfFree());
- ndbrequire(c_backupFilePool.getSize() == c_backupFilePool.getNoOfFree());
- ndbrequire(c_pagePool.getSize() == c_pagePool.getNoOfFree());
- ndbrequire(c_fragmentPool.getSize() == c_fragmentPool.getNoOfFree());
- ndbrequire(c_triggerPool.getSize() == c_triggerPool.getNoOfFree());
+ /**
+ * Handle LCP
+ */
+ BackupRecordPtr lcp;
+ ndbrequire(c_backups.first(lcp));
+
+ ndbrequire(c_backupPool.getSize() == c_backupPool.getNoOfFree() + 1);
+ if(lcp.p->tables.isEmpty())
+ {
+ ndbrequire(c_tablePool.getSize() == c_tablePool.getNoOfFree());
+ ndbrequire(c_attributePool.getSize() == c_attributePool.getNoOfFree());
+ ndbrequire(c_fragmentPool.getSize() == c_fragmentPool.getNoOfFree());
+ ndbrequire(c_triggerPool.getSize() == c_triggerPool.getNoOfFree());
+ }
+ ndbrequire(c_backupFilePool.getSize() == c_backupFilePool.getNoOfFree() + 1);
+ BackupFilePtr lcp_file;
+ c_backupFilePool.getPtr(lcp_file, lcp.p->dataFilePtr);
+ ndbrequire(c_pagePool.getSize() ==
+ c_pagePool.getNoOfFree() +
+ lcp_file.p->pages.getSize());
}
}
}
@@ -644,12 +620,6 @@
TriggerEvent::TE_DELETE
};
-const char* triggerNameFormat[] = {
- "NDB$BACKUP_%d_%d_INSERT",
- "NDB$BACKUP_%d_%d_UPDATE",
- "NDB$BACKUP_%d_%d_DELETE"
-};
-
const Backup::State
Backup::validSlaveTransitions[] = {
INITIAL, DEFINING,
@@ -919,7 +889,6 @@
ref->backupPtr = ptr.i;
ref->backupId = ptr.p->backupId;
ref->errorCode = AbortBackupOrd::BackupFailureDueToNodeFail;
- ref->signalNo = ptr.p->masterData.startBackup.signalNo;
gsn= GSN_START_BACKUP_REF;
len= StartBackupRef::SignalLength;
pos= &ref->nodeId - signal->getDataPtr();
@@ -1028,7 +997,7 @@
return;
}//if
- if (m_diskless)
+ if (c_defaults.m_diskless)
{
sendBackupRef(senderRef, flags, signal, senderData,
BackupRef::CannotBackupDiskless);
@@ -1056,7 +1025,6 @@
return;
}//if
- ndbrequire(ptr.p->pages.empty());
ndbrequire(ptr.p->tables.isEmpty());
ptr.p->m_gsn = 0;
@@ -1071,9 +1039,7 @@
ptr.p->backupKey[1] = 0;
ptr.p->backupDataLen = 0;
ptr.p->masterData.errorCode = 0;
- ptr.p->masterData.dropTrig.tableId = RNIL;
- ptr.p->masterData.alterTrig.tableId = RNIL;
-
+
UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
ptr.p->masterData.gsn = GSN_UTIL_SEQUENCE_REQ;
@@ -1384,13 +1350,18 @@
signal->theData[2] = ptr.p->backupId;
ptr.p->nodes.copyto(NdbNodeBitmask::Size, signal->theData+3);
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3+NdbNodeBitmask::Size, JBB);
-
+
/**
- * Prepare Trig
+ * We've received GSN_DEFINE_BACKUP_CONF from all participants.
+ *
+ * Our next step is to send START_BACKUP_REQ to all participants,
+ * who will then send CREATE_TRIG_REQ for all tables to their local
+ * DBTUP.
*/
TablePtr tabPtr;
- ndbrequire(ptr.p->tables.first(tabPtr));
- sendCreateTrig(signal, ptr, tabPtr);
+ ptr.p->tables.first(tabPtr);
+
+ sendStartBackup(signal, ptr, tabPtr);
}
/*****************************************************************************
@@ -1404,11 +1375,12 @@
{
mask.clear();
Table & table = * tabPtr.p;
- for(Uint32 i = 0; i<table.noOfAttributes; i++) {
+ Ptr<Attribute> attrPtr;
+ table.attributes.first(attrPtr);
+ for(; !attrPtr.isNull(); table.attributes.next(attrPtr))
+ {
jam();
- AttributePtr attr;
- table.attributes.getPtr(attr, i);
- mask.set(i);
+ mask.set(attrPtr.p->data.attrId);
}
}
@@ -1417,42 +1389,72 @@
BackupRecordPtr ptr, TablePtr tabPtr)
{
CreateTrigReq * req =(CreateTrigReq *)signal->getDataPtrSend();
-
- ptr.p->masterData.gsn = GSN_CREATE_TRIG_REQ;
- ptr.p->masterData.sendCounter = 3;
- ptr.p->masterData.createTrig.tableId = tabPtr.p->tableId;
+
+ /*
+ * First, setup the structures
+ */
+ for(Uint32 j=0; j<3; j++) {
+ jam();
+
+ TriggerPtr trigPtr;
+ if(!ptr.p->triggers.seize(trigPtr)) {
+ jam();
+ ptr.p->m_gsn = GSN_START_BACKUP_REF;
+ StartBackupRef* ref = (StartBackupRef*)signal->getDataPtrSend();
+ ref->backupPtr = ptr.i;
+ ref->backupId = ptr.p->backupId;
+ ref->errorCode = StartBackupRef::FailedToAllocateTriggerRecord;
+ ref->nodeId = getOwnNodeId();
+ sendSignal(ptr.p->masterRef, GSN_START_BACKUP_REF, signal,
+ StartBackupRef::SignalLength, JBB);
+ return;
+ } // if
+
+ const Uint32 triggerId= trigPtr.i;
+ tabPtr.p->triggerIds[j] = triggerId;
+ tabPtr.p->triggerAllocated[j] = true;
+ trigPtr.p->backupPtr = ptr.i;
+ trigPtr.p->tableId = tabPtr.p->tableId;
+ trigPtr.p->tab_ptr_i = tabPtr.i;
+ trigPtr.p->logEntry = 0;
+ trigPtr.p->event = j;
+ trigPtr.p->maxRecordSize = 4096;
+ trigPtr.p->operation =
+ &ptr.p->files.getPtr(ptr.p->logFilePtr)->operation;
+ trigPtr.p->operation->noOfBytes = 0;
+ trigPtr.p->operation->noOfRecords = 0;
+ trigPtr.p->errorCode = 0;
+ } // for
+
+ /*
+ * now ask DBTUP to create
+ */
+ ptr.p->slaveData.gsn = GSN_CREATE_TRIG_REQ;
+ ptr.p->slaveData.trigSendCounter = 3;
+ ptr.p->slaveData.createTrig.tableId = tabPtr.p->tableId;
req->setUserRef(reference());
+ req->setReceiverRef(reference());
req->setConnectionPtr(ptr.i);
req->setRequestType(CreateTrigReq::RT_USER);
-
+
Bitmask<MAXNROFATTRIBUTESINWORDS> attrMask;
createAttributeMask(tabPtr, attrMask);
req->setAttributeMask(attrMask);
req->setTableId(tabPtr.p->tableId);
req->setIndexId(RNIL); // not used
- req->setTriggerId(RNIL); // to be created
req->setTriggerType(TriggerType::SUBSCRIPTION);
req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
req->setMonitorReplicas(true);
req->setMonitorAllAttributes(false);
- req->setOnline(false); // leave trigger offline
+ req->setOnline(true);
- char triggerName[MAX_TAB_NAME_SIZE];
- Uint32 nameBuffer[2 + ((MAX_TAB_NAME_SIZE + 3) >> 2)]; // SP string
- LinearWriter w(nameBuffer, sizeof(nameBuffer) >> 2);
- LinearSectionPtr lsPtr[3];
-
for (int i=0; i < 3; i++) {
+ req->setTriggerId(tabPtr.p->triggerIds[i]);
req->setTriggerEvent(triggerEventValues[i]);
- BaseString::snprintf(triggerName, sizeof(triggerName), triggerNameFormat[i],
- ptr.p->backupId, tabPtr.p->tableId);
- w.reset();
- w.add(CreateTrigReq::TriggerNameKey, triggerName);
- lsPtr[0].p = nameBuffer;
- lsPtr[0].sz = w.getWordsUsed();
- sendSignal(DBDICT_REF, GSN_CREATE_TRIG_REQ,
- signal, CreateTrigReq::SignalLength, JBB, lsPtr, 1);
+
+ sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
+ signal, CreateTrigReq::SignalLength, JBB);
}
}
@@ -1472,25 +1474,25 @@
/**
* Verify that I'm waiting for this conf
- */
- ndbrequire(ptr.p->masterRef == reference());
- ndbrequire(ptr.p->masterData.gsn == GSN_CREATE_TRIG_REQ);
- ndbrequire(ptr.p->masterData.sendCounter.done() == false);
- ndbrequire(ptr.p->masterData.createTrig.tableId == tableId);
-
+ *
+ * ptr.p->masterRef != reference()
+ * as slaves and masters have triggers now.
+ */
+ ndbrequire(ptr.p->slaveData.gsn == GSN_CREATE_TRIG_REQ);
+ ndbrequire(ptr.p->slaveData.trigSendCounter.done() == false);
+ ndbrequire(ptr.p->slaveData.createTrig.tableId == tableId);
+
TablePtr tabPtr;
ndbrequire(findTable(ptr, tabPtr, tableId));
ndbrequire(type < 3); // if some decides to change the enums
- ndbrequire(tabPtr.p->triggerIds[type] == ILLEGAL_TRIGGER_ID);
- tabPtr.p->triggerIds[type] = triggerId;
-
createTrigReply(signal, ptr);
}
void
Backup::execCREATE_TRIG_REF(Signal* signal)
{
+ jamEntry();
CreateTrigRef* ref = (CreateTrigRef*)signal->getDataPtr();
const Uint32 ptrI = ref->getConnectionPtr();
@@ -1501,14 +1503,16 @@
/**
* Verify that I'm waiting for this ref
- */
- ndbrequire(ptr.p->masterRef == reference());
- ndbrequire(ptr.p->masterData.gsn == GSN_CREATE_TRIG_REQ);
- ndbrequire(ptr.p->masterData.sendCounter.done() == false);
- ndbrequire(ptr.p->masterData.createTrig.tableId == tableId);
+ *
+ * ptr.p->masterRef != reference()
+ * as slaves and masters have triggers now
+ */
+ ndbrequire(ptr.p->slaveData.gsn == GSN_CREATE_TRIG_REQ);
+ ndbrequire(ptr.p->slaveData.trigSendCounter.done() == false);
+ ndbrequire(ptr.p->slaveData.createTrig.tableId == tableId);
ptr.p->setErrorCode(ref->getErrorCode());
-
+
createTrigReply(signal, ptr);
}
@@ -1520,26 +1524,33 @@
/**
* Check finished with table
*/
- ptr.p->masterData.sendCounter--;
- if(ptr.p->masterData.sendCounter.done() == false){
+ ptr.p->slaveData.trigSendCounter--;
+ if(ptr.p->slaveData.trigSendCounter.done() == false){
jam();
return;
}//if
- if (ERROR_INSERTED(10025))
+ if (ERROR_INSERTED(10025))
{
ptr.p->errorCode = 325;
}
if(ptr.p->checkError()) {
jam();
- masterAbort(signal, ptr);
+ ptr.p->m_gsn = GSN_START_BACKUP_REF;
+ StartBackupRef* ref = (StartBackupRef*)signal->getDataPtrSend();
+ ref->backupPtr = ptr.i;
+ ref->backupId = ptr.p->backupId;
+ ref->errorCode = ptr.p->errorCode;
+ ref->nodeId = getOwnNodeId();
+ sendSignal(ptr.p->masterRef, GSN_START_BACKUP_REF, signal,
+ StartBackupRef::SignalLength, JBB);
return;
}//if
TablePtr tabPtr;
- ndbrequire(findTable(ptr, tabPtr, ptr.p->masterData.createTrig.tableId));
-
+ ndbrequire(findTable(ptr, tabPtr, ptr.p->slaveData.createTrig.tableId));
+
/**
* Next table
*/
@@ -1551,14 +1562,16 @@
}//if
/**
- * Finished with all tables, send StartBackupReq
+ * We've finished creating triggers.
+ *
+ * send conf and wait
*/
- ptr.p->tables.first(tabPtr);
- ptr.p->masterData.startBackup.signalNo = 0;
- ptr.p->masterData.startBackup.noOfSignals =
- (ptr.p->tables.noOfElements() + StartBackupReq::MaxTableTriggers - 1) /
- StartBackupReq::MaxTableTriggers;
- sendStartBackup(signal, ptr, tabPtr);
+ ptr.p->m_gsn = GSN_START_BACKUP_CONF;
+ StartBackupConf* conf = (StartBackupConf*)signal->getDataPtrSend();
+ conf->backupPtr = ptr.i;
+ conf->backupId = ptr.p->backupId;
+ sendSignal(ptr.p->masterRef, GSN_START_BACKUP_CONF, signal,
+ StartBackupConf::SignalLength, JBB);
}
/*****************************************************************************
@@ -1571,33 +1584,23 @@
{
ptr.p->masterData.startBackup.tablePtr = tabPtr.i;
-
+
StartBackupReq* req = (StartBackupReq*)signal->getDataPtrSend();
req->backupId = ptr.p->backupId;
req->backupPtr = ptr.i;
- req->signalNo = ptr.p->masterData.startBackup.signalNo;
- req->noOfSignals = ptr.p->masterData.startBackup.noOfSignals;
- Uint32 i;
- for(i = 0; i<StartBackupReq::MaxTableTriggers; i++) {
- jam();
- req->tableTriggers[i].tableId = tabPtr.p->tableId;
- req->tableTriggers[i].triggerIds[0] = tabPtr.p->triggerIds[0];
- req->tableTriggers[i].triggerIds[1] = tabPtr.p->triggerIds[1];
- req->tableTriggers[i].triggerIds[2] = tabPtr.p->triggerIds[2];
- if(!ptr.p->tables.next(tabPtr)){
- jam();
- i++;
- break;
- }//if
- }//for
- req->noOfTableTriggers = i;
+ /**
+ * We use trigger Ids that are unique to BACKUP.
+ * These don't interfere with other triggers (e.g. from DBDICT)
+ * as there is a special case in DBTUP.
+ *
+ * Consequently, backups during online upgrade won't work
+ */
ptr.p->masterData.gsn = GSN_START_BACKUP_REQ;
ptr.p->masterData.sendCounter = ptr.p->nodes;
NodeReceiverGroup rg(BACKUP, ptr.p->nodes);
- sendSignal(rg, GSN_START_BACKUP_REQ, signal,
- StartBackupReq::HeaderLength +
- (i * StartBackupReq::TableTriggerLength), JBB);
+ sendSignal(rg, GSN_START_BACKUP_REQ, signal,
+ StartBackupReq::SignalLength, JBB);
}
void
@@ -1608,14 +1611,13 @@
StartBackupRef* ref = (StartBackupRef*)signal->getDataPtr();
const Uint32 ptrI = ref->backupPtr;
//const Uint32 backupId = ref->backupId;
- const Uint32 signalNo = ref->signalNo;
const Uint32 nodeId = ref->nodeId;
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptrI);
ptr.p->setErrorCode(ref->errorCode);
- startBackupReply(signal, ptr, nodeId, signalNo);
+ startBackupReply(signal, ptr, nodeId);
}
void
@@ -1626,23 +1628,20 @@
StartBackupConf* conf = (StartBackupConf*)signal->getDataPtr();
const Uint32 ptrI = conf->backupPtr;
//const Uint32 backupId = conf->backupId;
- const Uint32 signalNo = conf->signalNo;
const Uint32 nodeId = refToNode(signal->senderBlockRef());
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptrI);
- startBackupReply(signal, ptr, nodeId, signalNo);
+ startBackupReply(signal, ptr, nodeId);
}
void
-Backup::startBackupReply(Signal* signal, BackupRecordPtr ptr,
- Uint32 nodeId, Uint32 signalNo)
+Backup::startBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId)
{
CRASH_INSERTION((10004));
- ndbrequire(ptr.p->masterData.startBackup.signalNo == signalNo);
if (!haveAllSignals(ptr, GSN_START_BACKUP_REQ, nodeId)) {
jam();
return;
@@ -1659,149 +1658,21 @@
return;
}
- TablePtr tabPtr;
- c_tablePool.getPtr(tabPtr, ptr.p->masterData.startBackup.tablePtr);
- for(Uint32 i = 0; i<StartBackupReq::MaxTableTriggers; i++) {
- jam();
- if(!ptr.p->tables.next(tabPtr)) {
- jam();
- break;
- }//if
- }//for
-
- if(tabPtr.i != RNIL) {
- jam();
- ptr.p->masterData.startBackup.signalNo++;
- sendStartBackup(signal, ptr, tabPtr);
- return;
- }
-
- sendAlterTrig(signal, ptr);
-}
-
-/*****************************************************************************
- *
- * Master functionallity - Activate triggers
- *
- *****************************************************************************/
-void
-Backup::sendAlterTrig(Signal* signal, BackupRecordPtr ptr)
-{
- AlterTrigReq * req =(AlterTrigReq *)signal->getDataPtrSend();
-
- ptr.p->masterData.gsn = GSN_ALTER_TRIG_REQ;
- ptr.p->masterData.sendCounter = 0;
-
- req->setUserRef(reference());
- req->setConnectionPtr(ptr.i);
- req->setRequestType(AlterTrigReq::RT_USER);
- req->setTriggerInfo(0); // not used on ALTER via DICT
- req->setOnline(true);
- req->setReceiverRef(reference());
-
- TablePtr tabPtr;
-
- if (ptr.p->masterData.alterTrig.tableId == RNIL) {
- jam();
- ptr.p->tables.first(tabPtr);
- } else {
- jam();
- ndbrequire(findTable(ptr, tabPtr, ptr.p->masterData.alterTrig.tableId));
- ptr.p->tables.next(tabPtr);
- }//if
- if (tabPtr.i != RNIL) {
- jam();
- ptr.p->masterData.alterTrig.tableId = tabPtr.p->tableId;
- req->setTableId(tabPtr.p->tableId);
-
- req->setTriggerId(tabPtr.p->triggerIds[0]);
- sendSignal(DBDICT_REF, GSN_ALTER_TRIG_REQ,
- signal, AlterTrigReq::SignalLength, JBB);
-
- req->setTriggerId(tabPtr.p->triggerIds[1]);
- sendSignal(DBDICT_REF, GSN_ALTER_TRIG_REQ,
- signal, AlterTrigReq::SignalLength, JBB);
-
- req->setTriggerId(tabPtr.p->triggerIds[2]);
- sendSignal(DBDICT_REF, GSN_ALTER_TRIG_REQ,
- signal, AlterTrigReq::SignalLength, JBB);
-
- ptr.p->masterData.sendCounter += 3;
- return;
- }//if
- ptr.p->masterData.alterTrig.tableId = RNIL;
-
/**
- * Finished with all tables
+ * Wait for GCP
*/
ptr.p->masterData.gsn = GSN_WAIT_GCP_REQ;
ptr.p->masterData.waitGCP.startBackup = true;
-
+
WaitGCPReq * waitGCPReq = (WaitGCPReq*)signal->getDataPtrSend();
waitGCPReq->senderRef = reference();
waitGCPReq->senderData = ptr.i;
waitGCPReq->requestType = WaitGCPReq::CompleteForceStart;
- sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
+ sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
WaitGCPReq::SignalLength,JBB);
}
void
-Backup::execALTER_TRIG_CONF(Signal* signal)
-{
- jamEntry();
-
- AlterTrigConf* conf = (AlterTrigConf*)signal->getDataPtr();
- const Uint32 ptrI = conf->getConnectionPtr();
-
- BackupRecordPtr ptr;
- c_backupPool.getPtr(ptr, ptrI);
-
- alterTrigReply(signal, ptr);
-}
-
-void
-Backup::execALTER_TRIG_REF(Signal* signal)
-{
- jamEntry();
-
- AlterTrigRef* ref = (AlterTrigRef*)signal->getDataPtr();
- const Uint32 ptrI = ref->getConnectionPtr();
-
- BackupRecordPtr ptr;
- c_backupPool.getPtr(ptr, ptrI);
-
- ptr.p->setErrorCode(ref->getErrorCode());
-
- alterTrigReply(signal, ptr);
-}
-
-void
-Backup::alterTrigReply(Signal* signal, BackupRecordPtr ptr)
-{
-
- CRASH_INSERTION((10005));
-
- ndbrequire(ptr.p->masterRef == reference());
- ndbrequire(ptr.p->masterData.gsn == GSN_ALTER_TRIG_REQ);
- ndbrequire(ptr.p->masterData.sendCounter.done() == false);
-
- ptr.p->masterData.sendCounter--;
-
- if(ptr.p->masterData.sendCounter.done() == false){
- jam();
- return;
- }//if
-
- if(ptr.p->checkError()){
- jam();
- masterAbort(signal, ptr);
- return;
- }//if
-
- sendAlterTrig(signal, ptr);
-}
-
-void
Backup::execWAIT_GCP_REF(Signal* signal)
{
jamEntry();
@@ -1861,7 +1732,12 @@
{
CRASH_INSERTION((10009));
ptr.p->stopGCP = gcp;
- sendDropTrig(signal, ptr); // regular dropping of triggers
+ /**
+ * Backup is complete - begin cleanup
+ * STOP_BACKUP_REQ is sent to participants.
+ * They then drop the local triggers
+ */
+ sendStopBackup(signal, ptr);
return;
}//if
@@ -2115,8 +1991,8 @@
}
/*****************************************************************************
- *
- * Master functionallity - Drop triggers
+ *
+ * Slave functionallity - Drop triggers
*
*****************************************************************************/
@@ -2124,23 +2000,77 @@
Backup::sendDropTrig(Signal* signal, BackupRecordPtr ptr)
{
TablePtr tabPtr;
- if (ptr.p->masterData.dropTrig.tableId == RNIL) {
+ ptr.p->slaveData.gsn = GSN_DROP_TRIG_REQ;
+
+ if (ptr.p->slaveData.dropTrig.tableId == RNIL) {
jam();
- ptr.p->tables.first(tabPtr);
+ if(ptr.p->tables.count())
+ ptr.p->tables.first(tabPtr);
+ else
+ {
+ // Early abort, go to close files
+ jam();
+ closeFiles(signal, ptr);
+ return;
+ }
} else {
jam();
- ndbrequire(findTable(ptr, tabPtr, ptr.p->masterData.dropTrig.tableId));
+ ndbrequire(findTable(ptr, tabPtr, ptr.p->slaveData.dropTrig.tableId));
ptr.p->tables.next(tabPtr);
}//if
if (tabPtr.i != RNIL) {
jam();
sendDropTrig(signal, ptr, tabPtr);
} else {
- jam();
- ptr.p->masterData.dropTrig.tableId = RNIL;
+ /**
+ * Insert footers
+ */
+ {
+ BackupFilePtr filePtr;
+ ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
+ Uint32 * dst;
+ ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, 1));
+ * dst = 0;
+ filePtr.p->operation.dataBuffer.updateWritePtr(1);
+ }
- sendStopBackup(signal, ptr);
- }//if
+ {
+ BackupFilePtr filePtr;
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+
+ const Uint32 gcpSz = sizeof(BackupFormat::CtlFile::GCPEntry) >> 2;
+
+ Uint32 * dst;
+ ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, gcpSz));
+
+ BackupFormat::CtlFile::GCPEntry * gcp =
+ (BackupFormat::CtlFile::GCPEntry*)dst;
+
+ gcp->SectionType = htonl(BackupFormat::GCP_ENTRY);
+ gcp->SectionLength = htonl(gcpSz);
+ gcp->StartGCP = htonl(ptr.p->startGCP);
+ gcp->StopGCP = htonl(ptr.p->stopGCP - 1);
+ filePtr.p->operation.dataBuffer.updateWritePtr(gcpSz);
+
+ {
+ TablePtr tabPtr;
+ if (ptr.p->tables.first(tabPtr))
+ {
+ jam();
+ signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO;
+ signal->theData[1] = ptr.i;
+ signal->theData[2] = tabPtr.i;
+ signal->theData[3] = 0;
+ sendSignal(BACKUP_REF, GSN_CONTINUEB, signal, 4, JBB);
+ }
+ else
+ {
+ jam();
+ closeFiles(signal, ptr);
+ }
+ }
+ }
+ }
}
void
@@ -2149,40 +2079,26 @@
jam();
DropTrigReq * req = (DropTrigReq *)signal->getDataPtrSend();
- ptr.p->masterData.gsn = GSN_DROP_TRIG_REQ;
- ptr.p->masterData.sendCounter = 0;
-
+ ptr.p->slaveData.gsn = GSN_DROP_TRIG_REQ;
+ ptr.p->slaveData.trigSendCounter = 0;
req->setConnectionPtr(ptr.i);
req->setUserRef(reference()); // Sending to myself
req->setRequestType(DropTrigReq::RT_USER);
req->setIndexId(RNIL);
- req->setTriggerInfo(0); // not used on DROP via DICT
+ req->setTriggerInfo(0); // not used on DROP
+ req->setTriggerType(TriggerType::SUBSCRIPTION);
+ req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
- char triggerName[MAX_TAB_NAME_SIZE];
- Uint32 nameBuffer[2 + ((MAX_TAB_NAME_SIZE + 3) >> 2)]; // SP string
- LinearWriter w(nameBuffer, sizeof(nameBuffer) >> 2);
- LinearSectionPtr lsPtr[3];
-
- ptr.p->masterData.dropTrig.tableId = tabPtr.p->tableId;
+ ptr.p->slaveData.dropTrig.tableId = tabPtr.p->tableId;
req->setTableId(tabPtr.p->tableId);
for (int i = 0; i < 3; i++) {
Uint32 id = tabPtr.p->triggerIds[i];
req->setTriggerId(id);
- if (id != ILLEGAL_TRIGGER_ID) {
- sendSignal(DBDICT_REF, GSN_DROP_TRIG_REQ,
- signal, DropTrigReq::SignalLength, JBB);
- } else {
- BaseString::snprintf(triggerName, sizeof(triggerName), triggerNameFormat[i],
- ptr.p->backupId, tabPtr.p->tableId);
- w.reset();
- w.add(CreateTrigReq::TriggerNameKey, triggerName);
- lsPtr[0].p = nameBuffer;
- lsPtr[0].sz = w.getWordsUsed();
- sendSignal(DBDICT_REF, GSN_DROP_TRIG_REQ,
- signal, DropTrigReq::SignalLength, JBB, lsPtr, 1);
- }
- ptr.p->masterData.sendCounter ++;
+ req->setTriggerEvent(triggerEventValues[i]);
+ sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
+ signal, DropTrigReq::SignalLength, JBB);
+ ptr.p->slaveData.trigSendCounter ++;
}
}
@@ -2193,11 +2109,16 @@
DropTrigRef* ref = (DropTrigRef*)signal->getDataPtr();
const Uint32 ptrI = ref->getConnectionPtr();
-
+
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptrI);
-
- //ndbrequire(ref->getErrorCode() == DropTrigRef::NoSuchTrigger);
+
+ if(ref->getConf()->getTriggerId() != -1)
+ {
+ ndbout << "ERROR DROPPING TRIGGER: " << ref->getConf()->getTriggerId();
+ ndbout << " Err: " << (Uint32)ref->getErrorCode() << endl << endl;
+ }
+
dropTrigReply(signal, ptr);
}
@@ -2208,29 +2129,29 @@
DropTrigConf* conf = (DropTrigConf*)signal->getDataPtr();
const Uint32 ptrI = conf->getConnectionPtr();
-
+ const Uint32 triggerId= conf->getTriggerId();
+
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptrI);
-
+
dropTrigReply(signal, ptr);
}
void
Backup::dropTrigReply(Signal* signal, BackupRecordPtr ptr)
{
-
CRASH_INSERTION((10012));
- ndbrequire(ptr.p->masterRef == reference());
- ndbrequire(ptr.p->masterData.gsn == GSN_DROP_TRIG_REQ);
- ndbrequire(ptr.p->masterData.sendCounter.done() == false);
-
- ptr.p->masterData.sendCounter--;
- if(ptr.p->masterData.sendCounter.done() == false){
+ ndbrequire(ptr.p->slaveData.gsn == GSN_DROP_TRIG_REQ);
+ ndbrequire(ptr.p->slaveData.trigSendCounter.done() == false);
+
+ // move from .masterData to .slaveData
+ ptr.p->slaveData.trigSendCounter--;
+ if(ptr.p->slaveData.trigSendCounter.done() == false){
jam();
return;
}//if
-
+
sendDropTrig(signal, ptr); // recursive next
}
@@ -2357,6 +2278,9 @@
#ifdef DEBUG_ABORT
ndbout_c("************ masterAbort");
#endif
+
+ ndbassert(ptr.p->masterRef == reference());
+
if(ptr.p->masterData.errorCode != 0)
{
jam();
@@ -2400,13 +2324,13 @@
case GSN_BACKUP_FRAGMENT_REQ:
jam();
ptr.p->stopGCP= ptr.p->startGCP + 1;
- sendDropTrig(signal, ptr); // dropping due to error
+ sendStopBackup(signal, ptr); // dropping due to error
return;
case GSN_UTIL_SEQUENCE_REQ:
case GSN_UTIL_LOCK_REQ:
- case GSN_DROP_TRIG_REQ:
ndbrequire(false);
return;
+ case GSN_DROP_TRIG_REQ:
case GSN_STOP_BACKUP_REQ:
return;
}
@@ -2453,8 +2377,46 @@
void
Backup::defineBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errCode)
{
- ptr.p->m_gsn = GSN_DEFINE_BACKUP_REF;
+ jam();
ptr.p->setErrorCode(errCode);
+ if(ptr.p->is_lcp())
+ {
+ jam();
+
+ BackupFilePtr filePtr;
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+ if (filePtr.p->m_flags & BackupFile::BF_LCP_META)
+ {
+ jam();
+ ndbrequire(! (filePtr.p->m_flags & BackupFile::BF_FILE_THREAD));
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_LCP_META;
+ if (filePtr.p->m_flags & BackupFile::BF_OPEN)
+ {
+ closeFile(signal, ptr, filePtr);
+ return;
+ }
+ }
+
+ ndbrequire(filePtr.p->m_flags == 0);
+
+ TablePtr tabPtr;
+ FragmentPtr fragPtr;
+
+ ndbrequire(ptr.p->tables.first(tabPtr));
+ tabPtr.p->fragments.getPtr(fragPtr, 0);
+
+ LcpPrepareRef* ref= (LcpPrepareRef*)signal->getDataPtrSend();
+ ref->senderData = ptr.p->clientData;
+ ref->senderRef = reference();
+ ref->tableId = tabPtr.p->tableId;
+ ref->fragmentId = fragPtr.p->fragmentId;
+ ref->errorCode = errCode;
+ sendSignal(ptr.p->masterRef, GSN_LCP_PREPARE_REF,
+ signal, LcpPrepareRef::SignalLength, JBB);
+ return;
+ }
+
+ ptr.p->m_gsn = GSN_DEFINE_BACKUP_REF;
ndbrequire(ptr.p->errorCode != 0);
DefineBackupRef* ref = (DefineBackupRef*)signal->getDataPtrSend();
@@ -2500,6 +2462,7 @@
ptr.p->m_gsn = GSN_DEFINE_BACKUP_REQ;
ptr.p->slaveState.forceState(INITIAL);
ptr.p->slaveState.setState(DEFINING);
+ ptr.p->slaveData.dropTrig.tableId = RNIL;
ptr.p->errorCode = 0;
ptr.p->clientRef = req->clientRef;
ptr.p->clientData = req->clientData;
@@ -2516,15 +2479,15 @@
ptr.p->backupKey[0] = req->backupKey[0];
ptr.p->backupKey[1] = req->backupKey[1];
ptr.p->backupDataLen = req->backupDataLen;
- ptr.p->masterData.dropTrig.tableId = RNIL;
- ptr.p->masterData.alterTrig.tableId = RNIL;
ptr.p->masterData.errorCode = 0;
ptr.p->noOfBytes = 0;
ptr.p->noOfRecords = 0;
ptr.p->noOfLogBytes = 0;
ptr.p->noOfLogRecords = 0;
ptr.p->currGCP = 0;
-
+ ptr.p->startGCP = 0;
+ ptr.p->stopGCP = 0;
+
/**
* Allocate files
*/
@@ -2558,9 +2521,22 @@
maxWrite[2] = c_defaults.m_maxWriteSize;
noOfPages[2] = (c_defaults.m_dataBufferSize + sizeof(Page32) - 1) /
sizeof(Page32);
+
+ if (ptr.p->is_lcp())
+ {
+ noOfPages[2] = (c_defaults.m_lcp_buffer_size + sizeof(Page32) - 1) /
+ sizeof(Page32);
+ }
+ ptr.p->ctlFilePtr = ptr.p->logFilePtr = ptr.p->dataFilePtr = RNIL;
+
for(Uint32 i = 0; i<3; i++) {
jam();
+ if(ptr.p->is_lcp() && i != 2)
+ {
+ files[i].i = RNIL;
+ continue;
+ }
if(!ptr.p->files.seize(files[i])) {
jam();
defineBackupRef(signal, ptr,
@@ -2571,13 +2547,11 @@
files[i].p->tableId = RNIL;
files[i].p->backupPtr = ptr.i;
files[i].p->filePointer = RNIL;
- files[i].p->fileClosing = 0;
- files[i].p->fileOpened = 0;
- files[i].p->fileRunning = 0;
- files[i].p->scanRunning = 0;
+ files[i].p->m_flags = 0;
files[i].p->errorCode = 0;
-
- if(files[i].p->pages.seize(noOfPages[i]) == false) {
+
+ if(ERROR_INSERTED(10035) || files[i].p->pages.seize(noOfPages[i]) == false)
+ {
jam();
DEBUG_OUT("Failed to seize " << noOfPages[i] << " pages");
defineBackupRef(signal, ptr, DefineBackupRef::FailedToAllocateBuffers);
@@ -2598,15 +2572,22 @@
defineBackupRef(signal, ptr, DefineBackupRef::FailedToSetupFsBuffers);
return;
}//if
+
+ switch(i){
+ case 0:
+ files[i].p->fileType = BackupFormat::CTL_FILE;
+ ptr.p->ctlFilePtr = files[i].i;
+ break;
+ case 1:
+ files[i].p->fileType = BackupFormat::LOG_FILE;
+ ptr.p->logFilePtr = files[i].i;
+ break;
+ case 2:
+ files[i].p->fileType = BackupFormat::DATA_FILE;
+ ptr.p->dataFilePtr = files[i].i;
+ }
}//for
- files[0].p->fileType = BackupFormat::CTL_FILE;
- files[1].p->fileType = BackupFormat::LOG_FILE;
- files[2].p->fileType = BackupFormat::DATA_FILE;
-
- ptr.p->ctlFilePtr = files[0].i;
- ptr.p->logFilePtr = files[1].i;
- ptr.p->dataFilePtr = files[2].i;
-
+
if (!verifyNodesAlive(ptr, ptr.p->nodes)) {
jam();
defineBackupRef(signal, ptr, DefineBackupRef::Undefined);
@@ -2624,6 +2605,13 @@
return;
}//if
+ if(ptr.p->is_lcp())
+ {
+ jam();
+ getFragmentInfoDone(signal, ptr);
+ return;
+ }
+
/**
* Not implemented
*/
@@ -2660,15 +2648,22 @@
Uint32 tableId = ListTablesConf::getTableId(conf->tableData[i]);
Uint32 tableType = ListTablesConf::getTableType(conf->tableData[i]);
Uint32 state= ListTablesConf::getTableState(conf->tableData[i]);
- if (!DictTabInfo::isTable(tableType) && !DictTabInfo::isIndex(tableType)){
+
+ if (! (DictTabInfo::isTable(tableType) ||
+ DictTabInfo::isIndex(tableType) ||
+ DictTabInfo::isFilegroup(tableType) ||
+ DictTabInfo::isFile(tableType)))
+ {
jam();
continue;
- }//if
+ }
+
if (state != DictTabInfo::StateOnline)
{
jam();
continue;
- }//if
+ }
+
TablePtr tabPtr;
ptr.p->tables.seize(tabPtr);
if(tabPtr.i == RNIL) {
@@ -2708,15 +2703,14 @@
FsOpenReq::OM_TRUNCATE |
FsOpenReq::OM_CREATE |
FsOpenReq::OM_APPEND |
- FsOpenReq::OM_SYNC;
+ FsOpenReq::OM_AUTOSYNC;
FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF);
-
+ req->auto_sync_size = c_defaults.m_disk_synch_size;
/**
* Ctl file
*/
c_backupFilePool.getPtr(filePtr, ptr.p->ctlFilePtr);
- ndbrequire(filePtr.p->fileRunning == 0);
- filePtr.p->fileRunning = 1;
+ filePtr.p->m_flags |= BackupFile::BF_OPENING;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2);
@@ -2729,8 +2723,7 @@
* Log file
*/
c_backupFilePool.getPtr(filePtr, ptr.p->logFilePtr);
- ndbrequire(filePtr.p->fileRunning == 0);
- filePtr.p->fileRunning = 1;
+ filePtr.p->m_flags |= BackupFile::BF_OPENING;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2);
@@ -2743,8 +2736,7 @@
* Data file
*/
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
- ndbrequire(filePtr.p->fileRunning == 0);
- filePtr.p->fileRunning = 1;
+ filePtr.p->m_flags |= BackupFile::BF_OPENING;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2);
@@ -2790,8 +2782,8 @@
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
- ndbrequire(filePtr.p->fileOpened == 0);
- filePtr.p->fileOpened = 1;
+ ndbrequire(! (filePtr.p->m_flags & BackupFile::BF_OPEN));
+ filePtr.p->m_flags |= BackupFile::BF_OPEN;
openFilesReply(signal, ptr, filePtr);
}
@@ -2804,16 +2796,16 @@
/**
* Mark files as "opened"
*/
- ndbrequire(filePtr.p->fileRunning == 1);
- filePtr.p->fileRunning = 0;
-
+ ndbrequire(filePtr.p->m_flags & BackupFile::BF_OPENING);
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_OPENING;
+ filePtr.p->m_flags |= BackupFile::BF_OPEN;
/**
* Check if all files have recived open_reply
*/
for(ptr.p->files.first(filePtr); filePtr.i!=RNIL;ptr.p->files.next(filePtr))
{
jam();
- if(filePtr.p->fileRunning == 1) {
+ if(filePtr.p->m_flags & BackupFile::BF_OPENING) {
jam();
return;
}//if
@@ -2828,48 +2820,71 @@
return;
}//if
+ if(!ptr.p->is_lcp())
+ {
+ /**
+ * Insert file headers
+ */
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+ if(!insertFileHeader(BackupFormat::CTL_FILE, ptr.p, filePtr.p)) {
+ jam();
+ defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+ return;
+ }//if
+
+ ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
+ if(!insertFileHeader(BackupFormat::LOG_FILE, ptr.p, filePtr.p)) {
+ jam();
+ defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+ return;
+ }//if
+
+ ptr.p->files.getPtr(filePtr, ptr.p->dataFilePtr);
+ if(!insertFileHeader(BackupFormat::DATA_FILE, ptr.p, filePtr.p)) {
+ jam();
+ defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+ return;
+ }//if
+ }
+ else
+ {
+ ptr.p->files.getPtr(filePtr, ptr.p->dataFilePtr);
+ if(!insertFileHeader(BackupFormat::LCP_FILE, ptr.p, filePtr.p)) {
+ jam();
+ defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+ return;
+ }//if
+
+ ptr.p->ctlFilePtr = ptr.p->dataFilePtr;
+ }
+
/**
- * Insert file headers
+ * Start CTL file thread
*/
- ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
- if(!insertFileHeader(BackupFormat::CTL_FILE, ptr.p, filePtr.p)) {
- jam();
- defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
- return;
- }//if
-
- ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
- if(!insertFileHeader(BackupFormat::LOG_FILE, ptr.p, filePtr.p)) {
+ if (!ptr.p->is_lcp())
+ {
jam();
- defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
- return;
- }//if
-
- ptr.p->files.getPtr(filePtr, ptr.p->dataFilePtr);
- if(!insertFileHeader(BackupFormat::DATA_FILE, ptr.p, filePtr.p)) {
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+ filePtr.p->m_flags |= BackupFile::BF_FILE_THREAD;
+
+ signal->theData[0] = BackupContinueB::START_FILE_THREAD;
+ signal->theData[1] = filePtr.i;
+ sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
+ }
+ else
+ {
jam();
- defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
- return;
- }//if
-
- /**
- * Start CTL file thread
- */
- ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
- filePtr.p->fileRunning = 1;
+ filePtr.p->m_flags |= BackupFile::BF_LCP_META;
+ }
- signal->theData[0] = BackupContinueB::START_FILE_THREAD;
- signal->theData[1] = ptr.p->ctlFilePtr;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
-
/**
* Insert table list in ctl file
*/
FsBuffer & buf = filePtr.p->operation.dataBuffer;
-
+
const Uint32 sz =
(sizeof(BackupFormat::CtlFile::TableList) >> 2) +
- ptr.p->tables.noOfElements() - 1;
+ ptr.p->tables.count() - 1;
Uint32 * dst;
ndbrequire(sz < buf.getMaxWrite());
@@ -2900,7 +2915,7 @@
* Start getting table definition data
*/
ndbrequire(ptr.p->tables.first(tabPtr));
-
+
signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
@@ -2948,6 +2963,10 @@
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, senderData);
+ BackupFilePtr filePtr;
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_FILE_THREAD;
+
defineBackupRef(signal, ptr, ref->errorCode);
}
@@ -2965,6 +2984,8 @@
//const Uint32 senderRef = info->senderRef;
const Uint32 len = conf->totalLen;
const Uint32 senderData = conf->senderData;
+ const Uint32 tableType = conf->tableType;
+ const Uint32 tableId = conf->tableId;
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, senderData);
@@ -2973,28 +2994,15 @@
signal->getSection(dictTabInfoPtr, GetTabInfoConf::DICT_TAB_INFO);
ndbrequire(dictTabInfoPtr.sz == len);
- /**
- * No of pages needed
- */
- const Uint32 noPages = (len + sizeof(Page32) - 1) / sizeof(Page32);
- if(ptr.p->pages.getSize() < noPages) {
- jam();
- ptr.p->pages.release();
- if(ptr.p->pages.seize(noPages) == false) {
- jam();
- ptr.p->setErrorCode(DefineBackupRef::FailedAllocateTableMem);
- ndbrequire(false);
- releaseSections(signal);
- defineBackupRef(signal, ptr);
- return;
- }//if
- }//if
-
+ TablePtr tabPtr ;
+ ndbrequire(findTable(ptr, tabPtr, tableId));
+
BackupFilePtr filePtr;
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
FsBuffer & buf = filePtr.p->operation.dataBuffer;
+ Uint32* dst = 0;
{ // Write into ctl file
- Uint32* dst, dstLen = len + 2;
+ Uint32 dstLen = len + 3;
if(!buf.getWritePtr(&dst, dstLen)) {
jam();
ndbrequire(false);
@@ -3009,61 +3017,77 @@
BackupFormat::CtlFile::TableDescription * desc =
(BackupFormat::CtlFile::TableDescription*)dst;
desc->SectionType = htonl(BackupFormat::TABLE_DESCRIPTION);
- desc->SectionLength = htonl(len + 2);
- dst += 2;
-
+ desc->SectionLength = htonl(len + 3);
+ desc->TableType = htonl(tableType);
+ dst += 3;
+
copy(dst, dictTabInfoPtr);
buf.updateWritePtr(dstLen);
}//if
}
-
- ndbrequire(ptr.p->pages.getSize() >= noPages);
- Page32Ptr pagePtr;
- ptr.p->pages.getPtr(pagePtr, 0);
- copy(&pagePtr.p->data[0], dictTabInfoPtr);
+
releaseSections(signal);
-
+
if(ptr.p->checkError()) {
jam();
defineBackupRef(signal, ptr);
return;
}//if
- TablePtr tabPtr = parseTableDescription(signal, ptr, len);
- if(tabPtr.i == RNIL) {
+ if (!DictTabInfo::isTable(tabPtr.p->tableType))
+ {
jam();
- defineBackupRef(signal, ptr);
- return;
- }//if
- TablePtr tmp = tabPtr;
- ptr.p->tables.next(tabPtr);
- if(DictTabInfo::isIndex(tmp.p->tableType))
+ TablePtr tmp = tabPtr;
+ ptr.p->tables.next(tabPtr);
+ ptr.p->tables.release(tmp);
+ goto next;
+ }
+
+ if (!parseTableDescription(signal, ptr, tabPtr, dst, len))
{
jam();
- ptr.p->tables.release(tmp);
+ defineBackupRef(signal, ptr);
+ return;
}
- else
+
+ if(!ptr.p->is_lcp())
{
jam();
- signal->theData[0] = tmp.p->tableId;
+ signal->theData[0] = tabPtr.p->tableId;
signal->theData[1] = 1; // lock
EXECUTE_DIRECT(DBDICT, GSN_BACKUP_FRAGMENT_REQ, signal, 2);
}
-
- if(tabPtr.i == RNIL) {
+
+ ptr.p->tables.next(tabPtr);
+
+next:
+ if(tabPtr.i == RNIL)
+ {
+ /**
+ * Done with all tables...
+ */
jam();
- ptr.p->pages.release();
+ if(ptr.p->is_lcp())
+ {
+ lcp_open_file_done(signal, ptr);
+ return;
+ }
ndbrequire(ptr.p->tables.first(tabPtr));
- signal->theData[0] = RNIL;
- signal->theData[1] = tabPtr.p->tableId;
- signal->theData[2] = ptr.i;
- sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
+ DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+ req->m_connectionData = RNIL;
+ req->m_tableRef = tabPtr.p->tableId;
+ req->m_senderData = ptr.i;
+ sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal,
+ DihFragCountReq::SignalLength, JBB);
return;
}//if
+ /**
+ * Fetch next table...
+ */
signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
@@ -3071,14 +3095,14 @@
return;
}
-Backup::TablePtr
-Backup::parseTableDescription(Signal* signal, BackupRecordPtr ptr, Uint32 len)
+bool
+Backup::parseTableDescription(Signal* signal,
+ BackupRecordPtr ptr,
+ TablePtr tabPtr,
+ const Uint32 * tabdescptr,
+ Uint32 len)
{
-
- Page32Ptr pagePtr;
- ptr.p->pages.getPtr(pagePtr, 0);
-
- SimplePropertiesLinearReader it(&pagePtr.p->data[0], len);
+ SimplePropertiesLinearReader it(tabdescptr, len);
it.first();
@@ -3089,13 +3113,15 @@
DictTabInfo::TableMappingSize,
true, true);
ndbrequire(stat == SimpleProperties::Break);
+
+ bool lcp = ptr.p->is_lcp();
+
+ ndbrequire(tabPtr.p->tableId == tmpTab.TableId);
+ ndbrequire(lcp || (tabPtr.p->tableType == tmpTab.TableType));
- TablePtr tabPtr;
- ndbrequire(findTable(ptr, tabPtr, tmpTab.TableId));
- if(DictTabInfo::isIndex(tabPtr.p->tableType)){
- jam();
- return tabPtr;
- }
+ /**
+ * LCP should not save disk attributes but only mem attributes
+ */
/**
* Initialize table object
@@ -3113,13 +3139,7 @@
tabPtr.p->triggerAllocated[1] = false;
tabPtr.p->triggerAllocated[2] = false;
- if(tabPtr.p->attributes.seize(tabPtr.p->noOfAttributes) == false) {
- jam();
- ptr.p->setErrorCode(DefineBackupRef::FailedToAllocateAttributeRecord);
- tabPtr.i = RNIL;
- return tabPtr;
- }//if
-
+ Uint32 disk = 0;
const Uint32 count = tabPtr.p->noOfAttributes;
for(Uint32 i = 0; i<count; i++) {
jam();
@@ -3130,59 +3150,116 @@
true, true);
ndbrequire(stat == SimpleProperties::Break);
+ it.next(); // Move Past EndOfAttribute
const Uint32 arr = tmp.AttributeArraySize;
const Uint32 sz = 1 << tmp.AttributeSize;
const Uint32 sz32 = (sz * arr + 31) >> 5;
+ if(lcp && tmp.AttributeStorageType == NDB_STORAGETYPE_DISK)
+ {
+ disk++;
+ continue;
+ }
+
AttributePtr attrPtr;
- tabPtr.p->attributes.getPtr(attrPtr, tmp.AttributeId);
+ if(!tabPtr.p->attributes.seize(attrPtr))
+ {
+ jam();
+ ptr.p->setErrorCode(DefineBackupRef::FailedToAllocateAttributeRecord);
+ return false;
+ }
- attrPtr.p->data.nullable = tmp.AttributeNullableFlag;
- attrPtr.p->data.fixed = (tmp.AttributeArraySize != 0);
- attrPtr.p->data.sz32 = sz32;
+ attrPtr.p->data.m_flags = 0;
+ attrPtr.p->data.attrId = tmp.AttributeId;
+ attrPtr.p->data.m_flags |=
+ (tmp.AttributeNullableFlag ? Attribute::COL_NULLABLE : 0);
+ attrPtr.p->data.m_flags |= (tmp.AttributeArrayType == NDB_ARRAYTYPE_FIXED)?
+ Attribute::COL_FIXED : 0;
+ attrPtr.p->data.sz32 = sz32;
+
/**
- * Either
- * 1) Fixed
- * 2) Nullable
- * 3) Variable
+ * 1) Fixed non-nullable
+ * 2) Other
*/
- if(attrPtr.p->data.fixed == true && attrPtr.p->data.nullable == false) {
+ if(attrPtr.p->data.m_flags & Attribute::COL_FIXED &&
+ !(attrPtr.p->data.m_flags & Attribute::COL_NULLABLE)) {
jam();
attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32;
- }//if
-
- if(attrPtr.p->data.fixed == true && attrPtr.p->data.nullable == true) {
- jam();
- attrPtr.p->data.offset = 0;
-
- attrPtr.p->data.offsetNull = tabPtr.p->noOfNull;
- tabPtr.p->noOfNull++;
- tabPtr.p->noOfVariable++;
- }//if
-
- if(attrPtr.p->data.fixed == false) {
- jam();
+ } else {
+ attrPtr.p->data.offset = ~0;
tabPtr.p->noOfVariable++;
- ndbrequire(0);
- }//if
-
- it.next(); // Move Past EndOfAttribute
+ }
}//for
- return tabPtr;
+
+
+ if(lcp)
+ {
+ if (disk)
+ {
+ /**
+ * Remove all disk attributes
+ */
+ tabPtr.p->noOfAttributes -= disk;
+
+ {
+ AttributePtr attrPtr;
+ ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+
+ Uint32 sz32 = 2;
+ attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
+ attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+ attrPtr.p->data.sz32 = 2;
+
+ attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+ tabPtr.p->sz_FixedAttributes += sz32;
+ tabPtr.p->noOfAttributes ++;
+ }
+ }
+
+ {
+ AttributePtr attrPtr;
+ ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+
+ Uint32 sz32 = 2;
+ attrPtr.p->data.attrId = AttributeHeader::ROWID;
+ attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+ attrPtr.p->data.sz32 = 2;
+
+ attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+ tabPtr.p->sz_FixedAttributes += sz32;
+ tabPtr.p->noOfAttributes ++;
+ }
+
+ if (tmpTab.RowGCIFlag)
+ {
+ AttributePtr attrPtr;
+ ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+
+ Uint32 sz32 = 2;
+ attrPtr.p->data.attrId = AttributeHeader::ROW_GCI;
+ attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+ attrPtr.p->data.sz32 = 2;
+
+ attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+ tabPtr.p->sz_FixedAttributes += sz32;
+ tabPtr.p->noOfAttributes ++;
+ }
+ }
+ return true;
}
void
Backup::execDI_FCOUNTCONF(Signal* signal)
{
jamEntry();
-
- const Uint32 userPtr = signal->theData[0];
- const Uint32 fragCount = signal->theData[1];
- const Uint32 tableId = signal->theData[2];
- const Uint32 senderData = signal->theData[3];
+ DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+ const Uint32 userPtr = conf->m_connectionData;
+ const Uint32 fragCount = conf->m_fragmentCount;
+ const Uint32 tableId = conf->m_tableRef;
+ const Uint32 senderData = conf->m_senderData;
ndbrequire(userPtr == RNIL && signal->length() == 5);
@@ -3200,6 +3277,7 @@
fragPtr.p->scanned = 0;
fragPtr.p->scanning = 0;
fragPtr.p->tableId = tableId;
+ fragPtr.p->fragmentId = i;
fragPtr.p->node = RNIL;
}//for
@@ -3208,10 +3286,12 @@
*/
if(ptr.p->tables.next(tabPtr)) {
jam();
- signal->theData[0] = RNIL;
- signal->theData[1] = tabPtr.p->tableId;
- signal->theData[2] = ptr.i;
- sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
+ DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+ req->m_connectionData = RNIL;
+ req->m_tableRef = tabPtr.p->tableId;
+ req->m_senderData = ptr.i;
+ sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal,
+ DihFragCountReq::SignalLength, JBB);
return;
}//if
@@ -3271,7 +3351,7 @@
FragmentPtr fragPtr;
tabPtr.p->fragments.getPtr(fragPtr, fragNo);
-
+
fragPtr.p->node = signal->theData[2];
getFragmentInfo(signal, ptr, tabPtr, fragNo + 1);
@@ -3301,81 +3381,39 @@
jamEntry();
CRASH_INSERTION((10015));
-
+
StartBackupReq* req = (StartBackupReq*)signal->getDataPtr();
const Uint32 ptrI = req->backupPtr;
- //const Uint32 backupId = req->backupId;
- const Uint32 signalNo = req->signalNo;
-
+
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptrI);
-
+
ptr.p->slaveState.setState(STARTED);
ptr.p->m_gsn = GSN_START_BACKUP_REQ;
- for(Uint32 i = 0; i<req->noOfTableTriggers; i++) {
- jam();
- TablePtr tabPtr;
- ndbrequire(findTable(ptr, tabPtr, req->tableTriggers[i].tableId));
- for(Uint32 j = 0; j<3; j++) {
- jam();
- const Uint32 triggerId = req->tableTriggers[i].triggerIds[j];
- tabPtr.p->triggerIds[j] = triggerId;
-
- TriggerPtr trigPtr;
- if(!ptr.p->triggers.seizeId(trigPtr, triggerId)) {
- jam();
- ptr.p->m_gsn = GSN_START_BACKUP_REF;
- StartBackupRef* ref = (StartBackupRef*)signal->getDataPtrSend();
- ref->backupPtr = ptr.i;
- ref->backupId = ptr.p->backupId;
- ref->signalNo = signalNo;
- ref->errorCode = StartBackupRef::FailedToAllocateTriggerRecord;
- ref->nodeId = getOwnNodeId();
- sendSignal(ptr.p->masterRef, GSN_START_BACKUP_REF, signal,
- StartBackupRef::SignalLength, JBB);
- return;
- }//if
-
- tabPtr.p->triggerAllocated[j] = true;
- trigPtr.p->backupPtr = ptr.i;
- trigPtr.p->tableId = tabPtr.p->tableId;
- trigPtr.p->tab_ptr_i = tabPtr.i;
- trigPtr.p->logEntry = 0;
- trigPtr.p->event = j;
- trigPtr.p->maxRecordSize = 4096;
- trigPtr.p->operation =
- &ptr.p->files.getPtr(ptr.p->logFilePtr)->operation;
- trigPtr.p->operation->noOfBytes = 0;
- trigPtr.p->operation->noOfRecords = 0;
- trigPtr.p->errorCode = 0;
- }//for
- }//for
-
/**
* Start file threads...
*/
BackupFilePtr filePtr;
- for(ptr.p->files.first(filePtr);
- filePtr.i!=RNIL;
- ptr.p->files.next(filePtr)){
+ for(ptr.p->files.first(filePtr); filePtr.i!=RNIL;ptr.p->files.next(filePtr))
+ {
jam();
- if(filePtr.p->fileRunning == 0) {
+ if(! (filePtr.p->m_flags & BackupFile::BF_FILE_THREAD))
+ {
jam();
- filePtr.p->fileRunning = 1;
+ filePtr.p->m_flags |= BackupFile::BF_FILE_THREAD;
signal->theData[0] = BackupContinueB::START_FILE_THREAD;
signal->theData[1] = filePtr.i;
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
}//if
}//for
-
- ptr.p->m_gsn = GSN_START_BACKUP_CONF;
- StartBackupConf* conf = (StartBackupConf*)signal->getDataPtrSend();
- conf->backupPtr = ptr.i;
- conf->backupId = ptr.p->backupId;
- conf->signalNo = signalNo;
- sendSignal(ptr.p->masterRef, GSN_START_BACKUP_CONF, signal,
- StartBackupConf::SignalLength, JBB);
+
+ /**
+ * Tell DBTUP to create triggers
+ */
+ TablePtr tabPtr;
+ ndbrequire(ptr.p->tables.first(tabPtr));
+ sendCreateTrig(signal, ptr, tabPtr);
}
/*****************************************************************************
@@ -3396,7 +3434,7 @@
const Uint32 tableId = req->tableId;
const Uint32 fragNo = req->fragmentNo;
const Uint32 count = req->count;
-
+
/**
* Get backup record
*/
@@ -3413,10 +3451,8 @@
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
ndbrequire(filePtr.p->backupPtr == ptrI);
- ndbrequire(filePtr.p->fileOpened == 1);
- ndbrequire(filePtr.p->fileRunning == 1);
- ndbrequire(filePtr.p->scanRunning == 0);
- ndbrequire(filePtr.p->fileClosing == 0);
+ ndbrequire(filePtr.p->m_flags ==
+ (BackupFile::BF_OPEN | BackupFile::BF_FILE_THREAD));
/**
* Get table
@@ -3433,7 +3469,7 @@
ndbrequire(fragPtr.p->scanned == 0);
ndbrequire(fragPtr.p->scanning == 0 ||
refToNode(ptr.p->masterRef) == getOwnNodeId());
-
+
/**
* Init operation
*/
@@ -3446,7 +3482,7 @@
/**
* Check for space in buffer
*/
- if(!filePtr.p->operation.newFragment(tableId, fragNo)) {
+ if(!filePtr.p->operation.newFragment(tableId, fragPtr.p->fragmentId)) {
jam();
req->count = count + 1;
sendSignalWithDelay(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal, 50,
@@ -3459,13 +3495,13 @@
* Mark things as "in use"
*/
fragPtr.p->scanning = 1;
- filePtr.p->fragmentNo = fragNo;
-
+ filePtr.p->fragmentNo = fragPtr.p->fragmentId;
+
/**
* Start scan
*/
{
- filePtr.p->scanRunning = 1;
+ filePtr.p->m_flags |= BackupFile::BF_SCAN_THREAD;
Table & table = * tabPtr.p;
ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
@@ -3475,7 +3511,7 @@
req->senderData = filePtr.i;
req->resultRef = reference();
req->schemaVersion = table.schemaVersion;
- req->fragmentNoKeyLen = fragNo;
+ req->fragmentNoKeyLen = fragPtr.p->fragmentId;
req->requestInfo = 0;
req->savePointId = 0;
req->tableId = table.tableId;
@@ -3484,6 +3520,13 @@
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo,attrLen);
+ if (ptr.p->is_lcp())
+ {
+ ScanFragReq::setScanPrio(req->requestInfo, 1);
+ ScanFragReq::setTupScanFlag(req->requestInfo, 1);
+ ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
+ ScanFragReq::setLcpScanFlag(req->requestInfo, 1);
+ }
req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
req->clientOpPtr= filePtr.i;
@@ -3504,13 +3547,20 @@
signal->theData[7] = 0;
Uint32 dataPos = 8;
- Uint32 i;
- for(i = 0; i<table.noOfAttributes; i++) {
+ Ptr<Attribute> attrPtr;
+ table.attributes.first(attrPtr);
+ for(; !attrPtr.isNull(); table.attributes.next(attrPtr))
+ {
jam();
- AttributePtr attr;
- table.attributes.getPtr(attr, i);
- AttributeHeader::init(&signal->theData[dataPos], i, 0);
+ /**
+ * LCP should not save disk attributes
+ */
+ ndbrequire(! (ptr.p->is_lcp() &&
+ attrPtr.p->data.m_flags & Attribute::COL_DISK));
+
+ AttributeHeader::init(&signal->theData[dataPos],
+ attrPtr.p->data.attrId, 0);
dataPos++;
if(dataPos == 25) {
jam();
@@ -3557,65 +3607,57 @@
op.attrSzTotal += dataLen;
Uint32 srcSz = dataLen;
+ Uint32 usedSz = 0;
const Uint32 * src = &signal->theData[3];
- Uint32 * dst = op.dst;
- Uint32 dstSz = op.attrSzLeft;
+ Ptr<Attribute> attrPtr;
+ table.attributes.first(attrPtr);
+ Uint32 columnNo = 0;
- while(srcSz > 0) {
+ while (usedSz < srcSz)
+ {
jam();
-
- if(dstSz == 0) {
- jam();
-
- /**
- * Finished with one attribute now find next
- */
- const AttributeHeader attrHead(* src);
- const Uint32 attrId = attrHead.getAttributeId();
- const bool null = attrHead.isNULL();
- const Attribute::Data attr = table.attributes.getPtr(attrId)->data;
-
- srcSz -= attrHead.getHeaderSize();
- src += attrHead.getHeaderSize();
-
- if(null) {
- jam();
- ndbrequire(attr.nullable);
- op.nullAttribute(attr.offsetNull);
- dstSz = 0;
- continue;
- }//if
+
+ /**
+ * Finished with one attribute now find next
+ */
+ const AttributeHeader attrHead(* src);
+ const Uint32 attrId = attrHead.getAttributeId();
+ const bool null = attrHead.isNULL();
+ const Attribute::Data attr = attrPtr.p->data;
+ ndbrequire(attrId == attr.attrId);
+
+ usedSz += attrHead.getHeaderSize();
+ src += attrHead.getHeaderSize();
- dstSz = attrHead.getDataSize();
- ndbrequire(dstSz == attr.sz32);
- if(attr.fixed && ! attr.nullable) {
- jam();
- dst = op.newAttrib(attr.offset, dstSz);
- } else if (attr.fixed && attr.nullable) {
- jam();
- dst = op.newNullable(attrId, dstSz);
+ if (null) {
+ jam();
+ ndbrequire(attr.m_flags & Attribute::COL_NULLABLE);
+ op.nullVariable();
+ } else {
+ Uint32* dst;
+ Uint32 dstSz = attrHead.getDataSize();
+ if (attr.m_flags & Attribute::COL_FIXED &&
+ ! (attr.m_flags & Attribute::COL_NULLABLE)) {
+ jam();
+ dst = op.newAttrib(attr.offset, dstSz);
+ ndbrequire(dstSz == attr.sz32);
} else {
- ndbrequire(false);
- //dst = op.newVariable(attrId, attrSize);
- }//if
- }//if
-
- const Uint32 szCopy = (dstSz > srcSz) ? srcSz : dstSz;
- memcpy(dst, src, (szCopy << 2));
-
- srcSz -= szCopy;
- dstSz -= szCopy;
- src += szCopy;
- dst += szCopy;
- }//while
- op.dst = dst;
- op.attrSzLeft = dstSz;
-
- if(op.finished()){
- jam();
- op.newRecord(op.dst);
+ dst = op.newVariable(columnNo, attrHead.getByteSize());
+ ndbrequire(dstSz <= attr.sz32);
+ }
+
+ memcpy(dst, src, (dstSz << 2));
+ src += dstSz;
+ usedSz += dstSz;
+ }
+ table.attributes.next(attrPtr);
+ columnNo++;
}
+
+ ndbrequire(usedSz == srcSz);
+ ndbrequire(op.finished());
+ op.newRecord(op.dst);
}
void
@@ -3744,7 +3786,7 @@
c_backupFilePool.getPtr(filePtr, filePtrI);
filePtr.p->errorCode = ref->errorCode;
- filePtr.p->scanRunning = 0;
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
backupFragmentRef(signal, filePtr);
}
@@ -3784,7 +3826,7 @@
if(filePtr.p->errorCode != 0)
{
jam();
- filePtr.p->scanRunning = 0;
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
backupFragmentRef(signal, filePtr); // Scan completed
return;
}//if
@@ -3798,25 +3840,33 @@
return;
}//if
- filePtr.p->scanRunning = 0;
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
- BackupFragmentConf * conf = (BackupFragmentConf*)signal->getDataPtrSend();
- conf->backupId = ptr.p->backupId;
- conf->backupPtr = ptr.i;
- conf->tableId = filePtr.p->tableId;
- conf->fragmentNo = filePtr.p->fragmentNo;
- conf->noOfRecordsLow = (Uint32)(op.noOfRecords & 0xFFFFFFFF);
- conf->noOfRecordsHigh = (Uint32)(op.noOfRecords >> 32);
- conf->noOfBytesLow = (Uint32)(op.noOfBytes & 0xFFFFFFFF);
- conf->noOfBytesHigh = (Uint32)(op.noOfBytes >> 32);
- sendSignal(ptr.p->masterRef, GSN_BACKUP_FRAGMENT_CONF, signal,
- BackupFragmentConf::SignalLength, JBB);
-
- ptr.p->m_gsn = GSN_BACKUP_FRAGMENT_CONF;
- ptr.p->slaveState.setState(STARTED);
+ if (ptr.p->is_lcp())
+ {
+ ptr.p->slaveState.setState(STOPPING);
+ filePtr.p->operation.dataBuffer.eof();
+ }
+ else
+ {
+ BackupFragmentConf * conf = (BackupFragmentConf*)signal->getDataPtrSend();
+ conf->backupId = ptr.p->backupId;
+ conf->backupPtr = ptr.i;
+ conf->tableId = filePtr.p->tableId;
+ conf->fragmentNo = filePtr.p->fragmentNo;
+ conf->noOfRecordsLow = (Uint32)(op.noOfRecords & 0xFFFFFFFF);
+ conf->noOfRecordsHigh = (Uint32)(op.noOfRecords >> 32);
+ conf->noOfBytesLow = (Uint32)(op.noOfBytes & 0xFFFFFFFF);
+ conf->noOfBytesHigh = (Uint32)(op.noOfBytes >> 32);
+ sendSignal(ptr.p->masterRef, GSN_BACKUP_FRAGMENT_CONF, signal,
+ BackupFragmentConf::SignalLength, JBB);
+
+ ptr.p->m_gsn = GSN_BACKUP_FRAGMENT_CONF;
+ ptr.p->slaveState.setState(STARTED);
+ }
return;
}
@@ -3913,7 +3963,7 @@
BackupFilePtr filePtr;
c_backupFilePool.getPtr(filePtr, filePtrI);
- filePtr.p->fileRunning = 0;
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_FILE_THREAD;
filePtr.p->errorCode = errCode;
checkFile(signal, filePtr);
@@ -3923,7 +3973,7 @@
Backup::execFSAPPENDCONF(Signal* signal)
{
jamEntry();
-
+
CRASH_INSERTION((10018));
//FsConf * conf = (FsConf*)signal->getDataPtr();
@@ -3940,6 +3990,69 @@
checkFile(signal, filePtr);
}
+/*
+ This routine handles two problems with writing to disk during local
+ checkpoints and backups. The first problem is that we need to limit
+ the writing to ensure that we don't use too much CPU and disk resources
+ for backups and checkpoints. The perfect solution to this is to use
+ a dynamic algorithm that adapts to the environment. Until we have
+ implemented this we can satisfy ourselves with an algorithm that
+ uses a configurable limit.
+
+ The second problem is that in Linux we can get severe problems if we
+ write very much to the disk without synching. In the worst case we
+ can have Gigabytes of data in the Linux page cache before we reach
+ the limit of how much we can write. If this happens the performance
+ will drop significantly when we reach this limit since the Linux flush
+ daemon will spend a few minutes on writing out the page cache to disk.
+ To avoid this we ensure that a file never have more than a certain
+ amount of data outstanding before synch. This variable is also
+ configurable.
+*/
+bool
+Backup::ready_to_write(bool ready, Uint32 sz, bool eof, BackupFile *fileP)
+{
+#if 0
+ ndbout << "ready_to_write: ready = " << ready << " eof = " << eof;
+ ndbout << " sz = " << sz << endl;
+ ndbout << "words this period = " << m_words_written_this_period;
+ ndbout << endl << "overflow disk write = " << m_overflow_disk_write;
+ ndbout << endl << "Current Millisecond is = ";
+ ndbout << NdbTick_CurrentMillisecond() << endl;
+#endif
+ if ((ready || eof) &&
+ m_words_written_this_period <= m_curr_disk_write_speed)
+ {
+ /*
+ We have a buffer ready to write or we have reached end of
+ file and thus we must write the last before closing the
+ file.
+ We have already check that we are allowed to write at this
+ moment. We only worry about history of last 100 milliseconds.
+ What happened before that is of no interest since a disk
+ write that was issued more than 100 milliseconds should be
+ completed by now.
+ */
+ int overflow;
+ m_words_written_this_period += sz;
+ overflow = m_words_written_this_period - m_curr_disk_write_speed;
+ if (overflow > 0)
+ m_overflow_disk_write = overflow;
+#if 0
+ ndbout << "Will write with " << endl;
+ ndbout << endl;
+#endif
+ return true;
+ }
+ else
+ {
+#if 0
+ ndbout << "Will not write now" << endl << endl;
+#endif
+ return false;
+ }
+}
+
void
Backup::checkFile(Signal* signal, BackupFilePtr filePtr)
{
@@ -3949,35 +4062,23 @@
#endif
OperationRecord & op = filePtr.p->operation;
-
- Uint32 * tmp, sz; bool eof;
- if(op.dataBuffer.getReadPtr(&tmp, &sz, &eof))
+ Uint32 *tmp = NULL;
+ Uint32 sz = 0;
+ bool eof = FALSE;
+ bool ready = op.dataBuffer.getReadPtr(&tmp, &sz, &eof);
+#if 0
+ ndbout << "Ptr to data = " << hex << tmp << endl;
+#endif
+ if (!ready_to_write(ready, sz, eof, filePtr.p))
{
jam();
-
- jam();
- FsAppendReq * req = (FsAppendReq *)signal->getDataPtrSend();
- req->filePointer = filePtr.p->filePointer;
- req->userPointer = filePtr.i;
- req->userReference = reference();
- req->varIndex = 0;
- req->offset = tmp - c_startOfPages;
- req->size = sz;
-
- sendSignal(NDBFS_REF, GSN_FSAPPENDREQ, signal,
- FsAppendReq::SignalLength, JBA);
- return;
- }
-
- if(!eof) {
- jam();
signal->theData[0] = BackupContinueB::BUFFER_UNDERFLOW;
signal->theData[1] = filePtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 50, 2);
+ sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 20, 2);
return;
- }//if
-
- if(sz > 0) {
+ }
+ else if (sz > 0)
+ {
jam();
FsAppendReq * req = (FsAppendReq *)signal->getDataPtrSend();
req->filePointer = filePtr.p->filePointer;
@@ -3985,25 +4086,23 @@
req->userReference = reference();
req->varIndex = 0;
req->offset = tmp - c_startOfPages;
- req->size = sz; // Round up
+ req->size = sz;
+ req->synch_flag = 0;
sendSignal(NDBFS_REF, GSN_FSAPPENDREQ, signal,
FsAppendReq::SignalLength, JBA);
return;
- }//if
+ }
+
+ Uint32 flags = filePtr.p->m_flags;
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_FILE_THREAD;
- filePtr.p->fileRunning = 0;
- filePtr.p->fileClosing = 1;
+ ndbrequire(flags & BackupFile::BF_OPEN);
+ ndbrequire(flags & BackupFile::BF_FILE_THREAD);
- FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
- req->filePointer = filePtr.p->filePointer;
- req->userPointer = filePtr.i;
- req->userReference = reference();
- req->fileFlag = 0;
-#ifdef DEBUG_ABORT
- ndbout_c("***** a FSCLOSEREQ filePtr.i = %u", filePtr.i);
-#endif
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA);
+ BackupRecordPtr ptr;
+ c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
+ closeFile(signal, ptr, filePtr);
}
@@ -4026,10 +4125,13 @@
Uint32 result;
jamEntry();
+
c_triggerPool.getPtr(trigPtr, trigger_id);
+
c_tablePool.getPtr(tabPtr, trigPtr.p->tab_ptr_i);
tabPtr.p->fragments.getPtr(fragPtr, frag_id);
if (fragPtr.p->node != getOwnNodeId()) {
+
jam();
result = ZFALSE;
} else {
@@ -4050,12 +4152,12 @@
TriggerPtr trigPtr;
c_triggerPool.getPtr(trigPtr, trg->getTriggerId());
ndbrequire(trigPtr.p->event != ILLEGAL_TRIGGER_ID); // Online...
-
+
if(trigPtr.p->errorCode != 0) {
jam();
return;
}//if
-
+
if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES) {
jam();
/**
@@ -4092,18 +4194,29 @@
memcpy(signal->getDataPtrSend(), save, 4*TrigAttrInfo::StaticLength);
return;
}//if
-
+
logEntry = (BackupFormat::LogFile::LogEntry *)dst;
trigPtr.p->logEntry = logEntry;
logEntry->Length = 0;
logEntry->TableId = htonl(trigPtr.p->tableId);
- logEntry->TriggerEvent = htonl(trigPtr.p->event);
+
+
+ if(trigPtr.p->event==0)
+ logEntry->TriggerEvent= htonl(TriggerEvent::TE_INSERT);
+ else if(trigPtr.p->event==1)
+ logEntry->TriggerEvent= htonl(TriggerEvent::TE_UPDATE);
+ else if(trigPtr.p->event==2)
+ logEntry->TriggerEvent= htonl(TriggerEvent::TE_DELETE);
+ else {
+ ndbout << "Bad Event: " << trigPtr.p->event << endl;
+ ndbrequire(false);
+ }
} else {
ndbrequire(logEntry->TableId == htonl(trigPtr.p->tableId));
- ndbrequire(logEntry->TriggerEvent == htonl(trigPtr.p->event));
+// ndbrequire(logEntry->TriggerEvent == htonl(trigPtr.p->event));
}//if
-
- const Uint32 pos = logEntry->Length;
+
+ const Uint32 pos = logEntry->Length;
const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
memcpy(&logEntry->Data[pos], trg->getData(), dataLen << 2);
@@ -4118,6 +4231,7 @@
const Uint32 gci = trg->getGCI();
const Uint32 trI = trg->getTriggerId();
+ const Uint32 fragId = trg->fragId;
TriggerPtr trigPtr;
c_triggerPool.getPtr(trigPtr, trI);
@@ -4131,19 +4245,19 @@
ndbrequire(trigPtr.p->logEntry != 0);
Uint32 len = trigPtr.p->logEntry->Length;
+ trigPtr.p->logEntry->FragId = htonl(fragId);
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, trigPtr.p->backupPtr);
- if(gci != ptr.p->currGCP)
+ if(gci != ptr.p->currGCP)
{
jam();
-
- trigPtr.p->logEntry->TriggerEvent = htonl(trigPtr.p->event | 0x10000);
+ trigPtr.p->logEntry->TriggerEvent|= htonl(0x10000);
trigPtr.p->logEntry->Data[len] = htonl(gci);
- len ++;
+ len++;
ptr.p->currGCP = gci;
- }//if
-
+ }
+
len += (sizeof(BackupFormat::LogFile::LogEntry) >> 2) - 2;
trigPtr.p->logEntry->Length = htonl(len);
@@ -4199,7 +4313,7 @@
* At least one GCP must have passed
*/
ndbrequire(stopGCP > startGCP);
-
+
/**
* Get backup record
*/
@@ -4208,48 +4322,13 @@
ptr.p->slaveState.setState(STOPPING);
ptr.p->m_gsn = GSN_STOP_BACKUP_REQ;
+ ptr.p->startGCP= startGCP;
+ ptr.p->stopGCP= stopGCP;
/**
- * Insert footers
+ * Destroy the triggers in local DBTUP we created
*/
- {
- BackupFilePtr filePtr;
- ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
- Uint32 * dst;
- ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, 1));
- * dst = 0;
- filePtr.p->operation.dataBuffer.updateWritePtr(1);
- }
-
- {
- BackupFilePtr filePtr;
- ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
-
- const Uint32 gcpSz = sizeof(BackupFormat::CtlFile::GCPEntry) >> 2;
-
- Uint32 * dst;
- ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, gcpSz));
-
- BackupFormat::CtlFile::GCPEntry * gcp =
- (BackupFormat::CtlFile::GCPEntry*)dst;
-
- gcp->SectionType = htonl(BackupFormat::GCP_ENTRY);
- gcp->SectionLength = htonl(gcpSz);
- gcp->StartGCP = htonl(startGCP);
- gcp->StopGCP = htonl(stopGCP - 1);
- filePtr.p->operation.dataBuffer.updateWritePtr(gcpSz);
-
- {
- TablePtr tabPtr;
- ptr.p->tables.first(tabPtr);
-
- signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO;
- signal->theData[1] = ptr.i;
- signal->theData[2] = tabPtr.i;
- signal->theData[3] = 0;
- sendSignal(BACKUP_REF, GSN_CONTINUEB, signal, 4, JBB);
- }
- }
+ sendDropTrig(signal, ptr);
}
void
@@ -4262,7 +4341,8 @@
int openCount = 0;
for(ptr.p->files.first(filePtr); filePtr.i!=RNIL; ptr.p->files.next(filePtr))
{
- if(filePtr.p->fileOpened == 0) {
+ if(! (filePtr.p->m_flags & BackupFile::BF_OPEN))
+ {
jam();
continue;
}
@@ -4270,34 +4350,26 @@
jam();
openCount++;
- if(filePtr.p->fileClosing == 1){
+ if(filePtr.p->m_flags & BackupFile::BF_CLOSING)
+ {
jam();
continue;
}//if
- filePtr.p->fileClosing = 1;
-
- if(filePtr.p->fileRunning == 1){
+ filePtr.p->operation.dataBuffer.eof();
+ if(filePtr.p->m_flags & BackupFile::BF_FILE_THREAD)
+ {
jam();
#ifdef DEBUG_ABORT
ndbout_c("Close files fileRunning == 1, filePtr.i=%u", filePtr.i);
#endif
- filePtr.p->operation.dataBuffer.eof();
- } else {
+ }
+ else
+ {
jam();
-
- FsCloseReq * req = (FsCloseReq *)sig->getDataPtrSend();
- req->filePointer = filePtr.p->filePointer;
- req->userPointer = filePtr.i;
- req->userReference = reference();
- req->fileFlag = 0;
-#ifdef DEBUG_ABORT
- ndbout_c("***** b FSCLOSEREQ filePtr.i = %u", filePtr.i);
-#endif
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, sig,
- FsCloseReq::SignalLength, JBA);
- }//if
- }//for
+ closeFile(sig, ptr, filePtr);
+ }
+ }
if(openCount == 0){
jam();
@@ -4306,6 +4378,33 @@
}
void
+Backup::closeFile(Signal* signal, BackupRecordPtr ptr, BackupFilePtr filePtr)
+{
+ ndbrequire(filePtr.p->m_flags & BackupFile::BF_OPEN);
+ ndbrequire(! (filePtr.p->m_flags & BackupFile::BF_OPENING));
+ ndbrequire(! (filePtr.p->m_flags & BackupFile::BF_CLOSING));
+ filePtr.p->m_flags |= BackupFile::BF_CLOSING;
+
+ FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
+ req->filePointer = filePtr.p->filePointer;
+ req->userPointer = filePtr.i;
+ req->userReference = reference();
+ req->fileFlag = 0;
+
+ if (ptr.p->errorCode)
+ {
+ FsCloseReq::setRemoveFileFlag(req->fileFlag, 1);
+ }
+
+#ifdef DEBUG_ABORT
+ ndbout_c("***** a FSCLOSEREQ filePtr.i = %u flags: %x",
+ filePtr.i, filePtr.p->m_flags);
+#endif
+ sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA);
+
+}
+
+void
Backup::execFSCLOSEREF(Signal* signal)
{
jamEntry();
@@ -4319,7 +4418,6 @@
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
- filePtr.p->fileOpened = 1;
FsConf * conf = (FsConf*)signal->getDataPtr();
conf->userPointer = filePtrI;
@@ -4340,44 +4438,49 @@
#ifdef DEBUG_ABORT
ndbout_c("***** FSCLOSECONF filePtrI = %u", filePtrI);
#endif
-
- ndbrequire(filePtr.p->fileClosing == 1);
- ndbrequire(filePtr.p->fileOpened == 1);
- ndbrequire(filePtr.p->fileRunning == 0);
- ndbrequire(filePtr.p->scanRunning == 0);
- filePtr.p->fileOpened = 0;
+ ndbrequire(filePtr.p->m_flags == (BackupFile::BF_OPEN |
+ BackupFile::BF_CLOSING));
+
+ filePtr.p->m_flags &= ~(Uint32)(BackupFile::BF_OPEN |BackupFile::BF_CLOSING);
+ filePtr.p->operation.dataBuffer.reset();
+
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
- for(ptr.p->files.first(filePtr); filePtr.i!=RNIL;ptr.p->files.next(filePtr))
- {
- jam();
- if(filePtr.p->fileOpened == 1) {
- jam();
-#ifdef DEBUG_ABORT
- ndbout_c("waiting for more FSCLOSECONF's filePtr.i = %u", filePtr.i);
-#endif
- return; // we will be getting more FSCLOSECONF's
- }//if
- }//for
- closeFilesDone(signal, ptr);
+ closeFiles(signal, ptr);
}
void
Backup::closeFilesDone(Signal* signal, BackupRecordPtr ptr)
{
jam();
+
+ if(ptr.p->is_lcp())
+ {
+ lcp_close_file_conf(signal, ptr);
+ return;
+ }
jam();
- BackupFilePtr filePtr;
- ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
-
+
StopBackupConf* conf = (StopBackupConf*)signal->getDataPtrSend();
conf->backupId = ptr.p->backupId;
conf->backupPtr = ptr.i;
- conf->noOfLogBytes = filePtr.p->operation.noOfBytes;
- conf->noOfLogRecords = filePtr.p->operation.noOfRecords;
+
+ BackupFilePtr filePtr;
+ if(ptr.p->logFilePtr != RNIL)
+ {
+ ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
+ conf->noOfLogBytes= filePtr.p->operation.noOfBytes;
+ conf->noOfLogRecords= filePtr.p->operation.noOfRecords;
+ }
+ else
+ {
+ conf->noOfLogBytes= 0;
+ conf->noOfLogRecords= 0;
+ }
+
sendSignal(ptr.p->masterRef, GSN_STOP_BACKUP_CONF, signal,
StopBackupConf::SignalLength, JBB);
@@ -4508,19 +4611,10 @@
ptr.p->masterRef = reference();
ptr.p->nodes.clear();
ptr.p->nodes.set(getOwnNodeId());
-
- if(ref == reference())
- {
- ptr.p->stopGCP= ptr.p->startGCP + 1;
- sendDropTrig(signal, ptr);
- }
- else
- {
- ptr.p->masterData.gsn = GSN_STOP_BACKUP_REQ;
- ptr.p->masterData.sendCounter.clearWaitingFor();
- ptr.p->masterData.sendCounter.setWaitingFor(getOwnNodeId());
- closeFiles(signal, ptr);
- }
+
+
+ ptr.p->stopGCP= ptr.p->startGCP + 1;
+ sendStopBackup(signal, ptr);
}
@@ -4560,12 +4654,8 @@
filePtr.i != RNIL;
ptr.p->files.next(filePtr)) {
jam();
- ndbout_c("filePtr.i = %u, filePtr.p->fileOpened=%u fileRunning=%u "
- "scanRunning=%u",
- filePtr.i,
- filePtr.p->fileOpened,
- filePtr.p->fileRunning,
- filePtr.p->scanRunning);
+ ndbout_c("filePtr.i = %u, flags: H'%x ",
+ filePtr.i, filePtr.p->m_flags);
}//for
}
}
@@ -4599,20 +4689,16 @@
}//for
BackupFilePtr filePtr;
- for(ptr.p->files.first(filePtr);
- filePtr.i != RNIL;
- ptr.p->files.next(filePtr)) {
- jam();
- ndbrequire(filePtr.p->fileOpened == 0);
- ndbrequire(filePtr.p->fileRunning == 0);
- ndbrequire(filePtr.p->scanRunning == 0);
+ for(ptr.p->files.first(filePtr);filePtr.i != RNIL;ptr.p->files.next(filePtr))
+ {
+ jam();
+ ndbrequire(filePtr.p->m_flags == 0);
filePtr.p->pages.release();
}//for
ptr.p->files.release();
ptr.p->tables.release();
ptr.p->triggers.release();
- ptr.p->pages.release();
ptr.p->backupId = ~0;
if(ptr.p->checkError())
@@ -4667,3 +4753,215 @@
c_backups.release(ptr);
}
+/**
+ * LCP
+ */
+void
+Backup::execLCP_PREPARE_REQ(Signal* signal)
+{
+ jamEntry();
+ LcpPrepareReq req = *(LcpPrepareReq*)signal->getDataPtr();
+
+ BackupRecordPtr ptr;
+ c_backupPool.getPtr(ptr, req.backupPtr);
+
+ ptr.p->m_gsn = GSN_LCP_PREPARE_REQ;
+
+ TablePtr tabPtr;
+ FragmentPtr fragPtr;
+ if (!ptr.p->tables.isEmpty())
+ {
+ jam();
+ ndbrequire(ptr.p->errorCode);
+ ptr.p->tables.first(tabPtr);
+ if (tabPtr.p->tableId == req.tableId)
+ {
+ jam();
+ ndbrequire(!tabPtr.p->fragments.empty());
+ tabPtr.p->fragments.getPtr(fragPtr, 0);
+ fragPtr.p->fragmentId = req.fragmentId;
+ defineBackupRef(signal, ptr, ptr.p->errorCode);
+ return;
+ }
+ else
+ {
+ jam();
+ tabPtr.p->attributes.release();
+ tabPtr.p->fragments.release();
+ ptr.p->tables.release();
+ ptr.p->errorCode = 0;
+ // fall-through
+ }
+ }
+
+ if(!ptr.p->tables.seize(tabPtr) || !tabPtr.p->fragments.seize(1))
+ {
+ if(!tabPtr.isNull())
+ ptr.p->tables.release();
+ ndbrequire(false); // TODO
+ }
+ tabPtr.p->tableId = req.tableId;
+ tabPtr.p->fragments.getPtr(fragPtr, 0);
+ tabPtr.p->tableType = DictTabInfo::UserTable;
+ fragPtr.p->fragmentId = req.fragmentId;
+ fragPtr.p->lcp_no = req.lcpNo;
+ fragPtr.p->scanned = 0;
+ fragPtr.p->scanning = 0;
+ fragPtr.p->tableId = req.tableId;
+
+ ptr.p->backupId= req.backupId;
+ lcp_open_file(signal, ptr);
+}
+
+void
+Backup::lcp_close_file_conf(Signal* signal, BackupRecordPtr ptr)
+{
+ jam();
+
+ TablePtr tabPtr;
+ ndbrequire(ptr.p->tables.first(tabPtr));
+ Uint32 tableId = tabPtr.p->tableId;
+
+ BackupFilePtr filePtr;
+ c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+ ndbrequire(filePtr.p->m_flags == 0);
+
+ if (ptr.p->m_gsn == GSN_LCP_PREPARE_REQ)
+ {
+ jam();
+ defineBackupRef(signal, ptr, ptr.p->errorCode);
+ return;
+ }
+
+ FragmentPtr fragPtr;
+ tabPtr.p->fragments.getPtr(fragPtr, 0);
+ Uint32 fragmentId = fragPtr.p->fragmentId;
+
+ tabPtr.p->attributes.release();
+ tabPtr.p->fragments.release();
+ ptr.p->tables.release();
+ ptr.p->errorCode = 0;
+
+ BackupFragmentConf * conf = (BackupFragmentConf*)signal->getDataPtrSend();
+ conf->backupId = ptr.p->backupId;
+ conf->backupPtr = ptr.i;
+ conf->tableId = tableId;
+ conf->fragmentNo = fragmentId;
+ conf->noOfRecordsLow = 0;
+ conf->noOfRecordsHigh = 0;
+ conf->noOfBytesLow = 0;
+ conf->noOfBytesHigh = 0;
+ sendSignal(ptr.p->masterRef, GSN_BACKUP_FRAGMENT_CONF, signal,
+ BackupFragmentConf::SignalLength, JBB);
+}
+
+void
+Backup::lcp_open_file(Signal* signal, BackupRecordPtr ptr)
+{
+ FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
+ req->userReference = reference();
+ req->fileFlags =
+ FsOpenReq::OM_WRITEONLY |
+ FsOpenReq::OM_TRUNCATE |
+ FsOpenReq::OM_CREATE |
+ FsOpenReq::OM_APPEND |
+ FsOpenReq::OM_AUTOSYNC;
+ FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF);
+ req->auto_sync_size = c_defaults.m_disk_synch_size;
+
+ TablePtr tabPtr;
+ FragmentPtr fragPtr;
+
+ ndbrequire(ptr.p->tables.first(tabPtr));
+ tabPtr.p->fragments.getPtr(fragPtr, 0);
+
+ /**
+ * Lcp file
+ */
+ BackupFilePtr filePtr;
+ c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+ ndbrequire(filePtr.p->m_flags == 0);
+ filePtr.p->m_flags |= BackupFile::BF_OPENING;
+ filePtr.p->tableId = RNIL; // Will force init
+ req->userPointer = filePtr.i;
+ FsOpenReq::setVersion(req->fileNumber, 5);
+ FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
+ FsOpenReq::v5_setLcpNo(req->fileNumber, fragPtr.p->lcp_no);
+ FsOpenReq::v5_setTableId(req->fileNumber, tabPtr.p->tableId);
+ FsOpenReq::v5_setFragmentId(req->fileNumber, fragPtr.p->fragmentId);
+ sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
+}
+
+void
+Backup::lcp_open_file_done(Signal* signal, BackupRecordPtr ptr)
+{
+ TablePtr tabPtr;
+ FragmentPtr fragPtr;
+
+ ndbrequire(ptr.p->tables.first(tabPtr));
+ tabPtr.p->fragments.getPtr(fragPtr, 0);
+
+ BackupFilePtr filePtr;
+ c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+ ndbrequire(filePtr.p->m_flags ==
+ (BackupFile::BF_OPEN | BackupFile::BF_LCP_META));
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_LCP_META;
+
+ ptr.p->slaveState.setState(STARTED);
+
+ LcpPrepareConf* conf= (LcpPrepareConf*)signal->getDataPtrSend();
+ conf->senderData = ptr.p->clientData;
+ conf->senderRef = reference();
+ conf->tableId = tabPtr.p->tableId;
+ conf->fragmentId = fragPtr.p->fragmentId;
+ sendSignal(ptr.p->masterRef, GSN_LCP_PREPARE_CONF,
+ signal, LcpPrepareConf::SignalLength, JBB);
+
+ /**
+ * Start file thread
+ */
+ filePtr.p->m_flags |= BackupFile::BF_FILE_THREAD;
+
+ signal->theData[0] = BackupContinueB::START_FILE_THREAD;
+ signal->theData[1] = filePtr.i;
+ signal->theData[2] = __LINE__;
+ sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 3);
+}
+
+void
+Backup::execEND_LCPREQ(Signal* signal)
+{
+ EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
+
+ BackupRecordPtr ptr;
+ c_backupPool.getPtr(ptr, req->backupPtr);
+ ndbrequire(ptr.p->backupId == req->backupId);
+
+ BackupFilePtr filePtr;
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+ ndbrequire(filePtr.p->m_flags == 0);
+
+ if (!ptr.p->tables.isEmpty())
+ {
+ jam();
+ ndbrequire(ptr.p->errorCode);
+ TablePtr tabPtr;
+ ptr.p->tables.first(tabPtr);
+ tabPtr.p->attributes.release();
+ tabPtr.p->fragments.release();
+ ptr.p->tables.release();
+ ptr.p->errorCode = 0;
+ }
+
+ ptr.p->errorCode = 0;
+ ptr.p->slaveState.setState(CLEANING);
+ ptr.p->slaveState.setState(INITIAL);
+ ptr.p->slaveState.setState(DEFINING);
+ ptr.p->slaveState.setState(DEFINED);
+
+ EndLcpConf* conf= (EndLcpConf*)signal->getDataPtr();
+ conf->senderData = ptr.p->clientData;
+ conf->senderRef = reference();
+ sendSignal(ptr.p->masterRef, GSN_END_LCPCONF,
+ signal, EndLcpConf::SignalLength, JBB);
+}
--- 1.7.7.1/ndb/src/kernel/blocks/backup/Backup.hpp 2006-10-20 16:25:18 +02:00
+++ 1.26/storage/ndb/src/kernel/blocks/backup/Backup.hpp 2006-10-20 16:25:18 +02:00
@@ -27,11 +27,14 @@
#include <SimpleProperties.hpp>
#include <SLList.hpp>
-#include <ArrayList.hpp>
+#include <DLFifoList.hpp>
+#include <DLCFifoList.hpp>
#include <SignalCounter.hpp>
#include <blocks/mutexes.hpp>
#include <NdbTCP.h>
+#include <NdbTick.h>
+#include <Array.hpp>
/**
* Backup - This block manages database backup and restore
@@ -39,7 +42,7 @@
class Backup : public SimulatedBlock
{
public:
- Backup(const Configuration & conf);
+ Backup(Block_context& ctx);
virtual ~Backup();
BLOCK_DEFINES(Backup);
@@ -96,8 +99,6 @@
void execGET_TABINFO_CONF(Signal* signal);
void execCREATE_TRIG_REF(Signal* signal);
void execCREATE_TRIG_CONF(Signal* signal);
- void execALTER_TRIG_REF(Signal* signal);
- void execALTER_TRIG_CONF(Signal* signal);
void execDROP_TRIG_REF(Signal* signal);
void execDROP_TRIG_CONF(Signal* signal);
@@ -148,7 +149,9 @@
void execWAIT_GCP_REF(Signal* signal);
void execWAIT_GCP_CONF(Signal* signal);
-
+ void execLCP_PREPARE_REQ(Signal* signal);
+ void execLCP_FRAGMENT_REQ(Signal*);
+ void execEND_LCPREQ(Signal* signal);
private:
void defineBackupMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal);
void dictCommitTableMutex_locked(Signal* signal, Uint32 ptrI,Uint32 retVal);
@@ -170,26 +173,34 @@
typedef Ptr<Page32> Page32Ptr;
struct Attribute {
+ enum Flags {
+ COL_NULLABLE = 0x1,
+ COL_FIXED = 0x2,
+ COL_DISK = 0x4
+ };
struct Data {
- Uint8 nullable;
- Uint8 fixed;
- Uint8 unused;
- Uint8 unused2;
+ Uint16 m_flags;
+ Uint16 attrId;
Uint32 sz32; // No of 32 bit words
Uint32 offset; // Relative DataFixedAttributes/DataFixedKeys
Uint32 offsetNull; // In NullBitmask
} data;
- Uint32 nextPool;
+ union {
+ Uint32 nextPool;
+ Uint32 nextList;
+ };
+ Uint32 prevList;
};
typedef Ptr<Attribute> AttributePtr;
struct Fragment {
Uint64 noOfRecords;
Uint32 tableId;
- Uint8 node;
- Uint8 scanned; // 0 = not scanned x = scanned by node x
- Uint8 scanning; // 0 = not scanning x = scanning on node x
- Uint8 unused1;
+ Uint16 node;
+ Uint16 fragmentId;
+ Uint8 scanned; // 0 = not scanned x = scanned by node x
+ Uint8 scanning; // 0 = not scanning x = scanning on node x
+ Uint8 lcp_no;
Uint32 nextPool;
};
typedef Ptr<Fragment> FragmentPtr;
@@ -209,7 +220,7 @@
Uint32 triggerIds[3];
bool triggerAllocated[3];
- Array<Attribute> attributes;
+ DLFifoList<Attribute> attributes;
Array<Fragment> fragments;
Uint32 nextList;
@@ -248,6 +259,7 @@
/**
* Per attribute
*/
+ void nullVariable();
void nullAttribute(Uint32 nullOffset);
Uint32 * newNullable(Uint32 attrId, Uint32 sz);
Uint32 * newAttrib(Uint32 offset, Uint32 sz);
@@ -269,7 +281,6 @@
public:
Uint32* dst;
- Uint32 attrSzLeft; // No of words missing for current attribute
Uint32 attrSzTotal; // No of AI words received
Uint32 tablePtr; // Ptr.i to current table
@@ -334,10 +345,16 @@
Uint32 nextList;
union { Uint32 prevList; Uint32 nextPool; };
- Uint8 fileOpened;
- Uint8 fileRunning;
- Uint8 fileClosing;
- Uint8 scanRunning;
+ enum {
+ BF_OPEN = 0x1
+ ,BF_OPENING = 0x2
+ ,BF_CLOSING = 0x4
+ ,BF_FILE_THREAD = 0x8
+ ,BF_SCAN_THREAD = 0x10
+ ,BF_LCP_META = 0x20
+ };
+ Uint32 m_flags;
+ Uint32 m_pos;
};
typedef Ptr<BackupFile> BackupFilePtr;
@@ -346,14 +363,14 @@
* State for BackupRecord
*/
enum State {
- INITIAL,
- DEFINING, // Defining backup content and parameters
- DEFINED, // DEFINE_BACKUP_CONF sent in slave, received all in master
- STARTED, // Creating triggers
- SCANNING, // Scanning fragments
- STOPPING, // Closing files
- CLEANING, // Cleaning resources
- ABORTING // Aborting backup
+ INITIAL = 0,
+ DEFINING = 1, // Defining backup content and parameters
+ DEFINED = 2, // DEFINE_BACKUP_CONF sent in slave, received all in master
+ STARTED = 3, // Creating triggers
+ SCANNING = 4, // Scanning fragments
+ STOPPING = 5, // Closing files
+ CLEANING = 6, // Cleaning resources
+ ABORTING = 7 // Aborting backup
};
static const Uint32 validSlaveTransitionsCount;
@@ -403,13 +420,14 @@
* One record per backup
*/
struct BackupRecord {
- BackupRecord(Backup& b, ArrayPool<Page32> & pp,
+ BackupRecord(Backup& b,
ArrayPool<Table> & tp,
ArrayPool<BackupFile> & bp,
ArrayPool<TriggerRecord> & trp)
: slaveState(b, validSlaveTransitions, validSlaveTransitionsCount,1)
- , tables(tp), triggers(trp), files(bp), pages(pp)
+ , tables(tp), triggers(trp), files(bp)
, masterData(b), backup(b)
+ , ctlFilePtr(RNIL), logFilePtr(RNIL), dataFilePtr(RNIL)
{
}
@@ -419,6 +437,7 @@
Uint32 clientRef;
Uint32 clientData;
Uint32 flags;
+ Uint32 signalNo;
Uint32 backupId;
Uint32 backupKey[2];
Uint32 masterRef;
@@ -433,7 +452,7 @@
Uint32 startGCP;
Uint32 currGCP;
Uint32 stopGCP;
- DLList<Table> tables;
+ DLCFifoList<Table> tables;
SLList<TriggerRecord> triggers;
SLList<BackupFile> files;
@@ -442,9 +461,19 @@
Uint32 dataFilePtr; // Ptr.i to first data-file
Uint32 backupDataLen; // Used for (un)packing backup request
- Array<Page32> pages; // Used for (un)packing backup request
SimpleProperties props;// Used for (un)packing backup request
-
+
+ struct SlaveData {
+ SignalCounter trigSendCounter;
+ Uint32 gsn;
+ struct {
+ Uint32 tableId;
+ } createTrig;
+ struct {
+ Uint32 tableId;
+ } dropTrig;
+ } slaveData;
+
struct MasterData {
MasterData(Backup & b)
{
@@ -455,15 +484,6 @@
Uint32 gsn;
SignalCounter sendCounter;
Uint32 errorCode;
- struct {
- Uint32 tableId;
- } createTrig;
- struct {
- Uint32 tableId;
- } dropTrig;
- struct {
- Uint32 tableId;
- } alterTrig;
union {
struct {
Uint32 startBackup;
@@ -491,6 +511,10 @@
return errorCode != 0;
}
+ bool is_lcp() const {
+ return backupDataLen == ~(Uint32)0;
+ }
+
Backup & backup;
BlockNumber number() const { return backup.number(); }
void progError(int line, int cause, const char * extra) {
@@ -505,6 +529,12 @@
Uint32 m_logBufferSize;
Uint32 m_minWriteSize;
Uint32 m_maxWriteSize;
+ Uint32 m_lcp_buffer_size;
+
+ Uint32 m_disk_write_speed_sr;
+ Uint32 m_disk_write_speed;
+ Uint32 m_disk_synch_size;
+ Uint32 m_diskless;
};
/**
@@ -516,12 +546,21 @@
NdbNodeBitmask c_aliveNodes;
DLList<BackupRecord> c_backups;
Config c_defaults;
- Uint32 m_diskless;
- STATIC_CONST(NO_OF_PAGES_META_FILE =
- (MAX_WORDS_META_FILE + BACKUP_WORDS_PER_PAGE - 1) /
- BACKUP_WORDS_PER_PAGE);
+ /*
+ Variables that control checkpoint to disk speed
+ */
+ Uint32 m_curr_disk_write_speed;
+ Uint32 m_words_written_this_period;
+ Uint32 m_overflow_disk_write;
+ Uint32 m_reset_delay_used;
+ NDB_TICKS m_reset_disk_speed_time;
+ static const int DISK_SPEED_CHECK_DELAY = 100;
+ STATIC_CONST(NO_OF_PAGES_META_FILE =
+ (MAX_WORDS_META_FILE + BACKUP_WORDS_PER_PAGE - 1) /
+ BACKUP_WORDS_PER_PAGE);
+
/**
* Pools
*/
@@ -546,6 +585,7 @@
void openFiles(Signal* signal, BackupRecordPtr ptr);
void openFilesReply(Signal*, BackupRecordPtr ptr, BackupFilePtr);
void closeFiles(Signal*, BackupRecordPtr ptr);
+ void closeFile(Signal*, BackupRecordPtr, BackupFilePtr);
void closeFilesDone(Signal*, BackupRecordPtr ptr);
void sendDefineBackupReq(Signal *signal, BackupRecordPtr ptr);
@@ -553,7 +593,7 @@
void defineBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId);
void createTrigReply(Signal* signal, BackupRecordPtr ptr);
void alterTrigReply(Signal* signal, BackupRecordPtr ptr);
- void startBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32, Uint32);
+ void startBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32);
void stopBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId);
void defineBackupRef(Signal*, BackupRecordPtr, Uint32 errCode = 0);
@@ -597,7 +637,7 @@
NodeId getMasterNodeId() const { return c_masterNodeId; }
bool findTable(const BackupRecordPtr &, TablePtr &, Uint32 tableId) const;
- TablePtr parseTableDescription(Signal*, BackupRecordPtr ptr, Uint32 len);
+ bool parseTableDescription(Signal*, BackupRecordPtr ptr, TablePtr, const Uint32*, Uint32);
bool insertFileHeader(BackupFormat::FileType, BackupRecord*, BackupFile*);
void sendBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errorCode);
@@ -611,6 +651,12 @@
void sendSTTORRY(Signal*);
void createSequence(Signal* signal);
void createSequenceReply(Signal*, class UtilSequenceConf *);
+
+ void lcp_open_file(Signal* signal, BackupRecordPtr ptr);
+ void lcp_open_file_done(Signal*, BackupRecordPtr);
+ void lcp_close_file_conf(Signal* signal, BackupRecordPtr);
+
+ bool ready_to_write(bool ready, Uint32 sz, bool eof, BackupFile *fileP);
};
inline
@@ -623,14 +669,13 @@
dst_VariableData = (BackupFormat::DataFile::VariableData*)p;
BitmaskImpl::clear(sz_Bitmask, dst_Bitmask);
attrLeft = noOfAttributes;
- attrSzLeft = attrSzTotal = 0;
+ attrSzTotal = 0;
}
inline
Uint32 *
Backup::OperationRecord::newAttrib(Uint32 offset, Uint32 sz){
attrLeft--;
- attrSzLeft = sz;
dst = dst_FixedAttribs + offset;
return dst;
}
@@ -643,16 +688,24 @@
}
inline
+void
+Backup::OperationRecord::nullVariable()
+{
+ attrLeft --;
+}
+
+inline
Uint32 *
Backup::OperationRecord::newNullable(Uint32 id, Uint32 sz){
+ Uint32 sz32 = (sz + 3) >> 2;
+
attrLeft--;
- attrSzLeft = sz;
dst = &dst_VariableData->Data[0];
dst_VariableData->Sz = htonl(sz);
dst_VariableData->Id = htonl(id);
- dst_VariableData = (BackupFormat::DataFile::VariableData *)(dst + sz);
+ dst_VariableData = (BackupFormat::DataFile::VariableData *)(dst + sz32);
// Clear all bits on newRecord -> dont need to clear this
// BitmaskImpl::clear(sz_Bitmask, dst_Bitmask, offsetNull);
@@ -662,21 +715,22 @@
inline
Uint32 *
Backup::OperationRecord::newVariable(Uint32 id, Uint32 sz){
+ Uint32 sz32 = (sz + 3) >> 2;
+
attrLeft--;
- attrSzLeft = sz;
dst = &dst_VariableData->Data[0];
dst_VariableData->Sz = htonl(sz);
dst_VariableData->Id = htonl(id);
- dst_VariableData = (BackupFormat::DataFile::VariableData *)(dst + sz);
+ dst_VariableData = (BackupFormat::DataFile::VariableData *)(dst + sz32);
return dst;
}
inline
bool
Backup::OperationRecord::finished(){
- if(attrLeft != 0 || attrSzLeft != 0){
+ if(attrLeft != 0){
return false;
}
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2321) | jonas | 20 Oct |