Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2008-05-09 09:09:22+02:00, jonas@stripped +3 -0
ndb - wl3600++
- misc cleanup
1) remove unused variables and struct in dict
2) replace the awful c_schemaRecord.schemaPage != 0 with a enum
and replaced the "ugly" assignment for masterCheckSchemaStatus
byt a logical memcpy
- remove backup ongoing
This patch was originally planned to be the grand unified object state-patch
However, lazyness made into simply removing BACKUP_ONGOING and replacing
this with a variable on the table-record.
a step in the right direction, although not grand
- flush schema phases
This patch introduces 3 new transaction phases
flush (prepare/commit/complete)
They are all signal ping-ponged and state checked, but does not actually
do anything serious.
storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp@stripped, 2008-05-09
09:09:19+02:00, jonas@stripped +9 -8
introduce new request types for flushing schema
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp@stripped, 2008-05-09 09:09:20+02:00,
jonas@stripped +278 -88
remove backup ongoing
remove unused variables/structs
introduce 3 new transaction phases
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp@stripped, 2008-05-09 09:09:20+02:00,
jonas@stripped +22 -26
remove backup ongoing
remove unused variables/structs
introduce 3 new transaction phases
diff -Nrup a/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp
b/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp
--- a/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp 2008-04-28 12:45:51 +02:00
+++ b/storage/ndb/include/kernel/signaldata/SchemaTransImpl.hpp 2008-05-09 09:09:19 +02:00
@@ -26,14 +26,15 @@ struct SchemaTransImplReq
{
RT_START = 0x0,
RT_PARSE = 0x1,
- RT_PREPARE = 0x2,
- RT_ABORT_PARSE = 0x3,
- RT_ABORT_PREPARE = 0x4,
- RT_COMMIT = 0x5,
-
- RT_COMPLETE = 0x6,// Not yet used
- RT_FLUSH_SCHEMA = 0x7,// Not yet used
- RT_END = 0x8 // release...
+ RT_FLUSH_PREPARE = 0x2,
+ RT_PREPARE = 0x3,
+ RT_ABORT_PARSE = 0x4,
+ RT_ABORT_PREPARE = 0x5,
+ RT_FLUSH_COMMIT = 0x6,
+ RT_COMMIT = 0x7,
+ RT_FLUSH_COMPLETE= 0x8,
+ RT_COMPLETE = 0x9,
+ RT_END = 0xa // release...
};
STATIC_CONST( SignalLength = 9 );
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-04-29 19:39:47 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-05-09 09:09:20 +02:00
@@ -708,8 +708,7 @@ Dbdict::execCREATE_FRAGMENTATION_REQ(Sig
TableRecordPtr tablePtr;
c_tableRecordPool.getPtr(tablePtr, req->primaryTableId);
- if (tablePtr.p->tabState == TableRecord::DEFINED ||
- tablePtr.p->tabState == TableRecord::BACKUP_ONGOING) {
+ if (tablePtr.p->tabState == TableRecord::DEFINED) {
jam();
EXECUTE_DIRECT(DBDIH, GSN_CREATE_FRAGMENTATION_REQ, signal,
CreateFragmentationReq::SignalLength);
@@ -1218,7 +1217,7 @@ Dbdict::updateSchemaState(Signal* signal
bool savetodisk, bool dicttrans)
{
jam();
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, tableId);
if (!dicttrans) {
@@ -2053,14 +2052,11 @@ void Dbdict::initTableRecords()
void Dbdict::initialiseTableRecord(TableRecordPtr tablePtr)
{
new (tablePtr.p) TableRecord();
- tablePtr.p->activePage = RNIL;
tablePtr.p->filePtr[0] = RNIL;
tablePtr.p->filePtr[1] = RNIL;
- tablePtr.p->firstPage = RNIL;
tablePtr.p->tableId = tablePtr.i;
tablePtr.p->tableVersion = (Uint32)-1;
tablePtr.p->tabState = TableRecord::NOT_DEFINED;
- tablePtr.p->tabReturnState = TableRecord::TRS_IDLE;
tablePtr.p->fragmentType = DictTabInfo::AllNodesSmallTable;
tablePtr.p->gciTableCreated = 0;
tablePtr.p->noOfAttributes = ZNIL;
@@ -2135,7 +2131,7 @@ Uint32 Dbdict::getFsConnRecord()
*/
Uint32 Dbdict::getFreeObjId(Uint32 minId)
{
- const XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ const XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
Uint32 noOfPages = xsf->noOfPages;
Uint32 n, i;
for (n = 0; n < noOfPages; n++) {
@@ -2514,12 +2510,12 @@ void Dbdict::execHOT_SPAREREP(Signal* si
void Dbdict::initSchemaFile(Signal* signal)
{
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
xsf->noOfPages = (c_tableRecordPool.getSize() + NDB_SF_PAGE_ENTRIES - 1)
/ NDB_SF_PAGE_ENTRIES;
initSchemaFile(xsf, 0, xsf->noOfPages, true);
// init alt copy too for INR
- XSchemaFile * oldxsf = &c_schemaFile[c_schemaRecord.oldSchemaPage != 0];
+ XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
oldxsf->noOfPages = xsf->noOfPages;
memcpy(&oldxsf->schemaPage[0], &xsf->schemaPage[0],
xsf->schemaPage[0].FileSize);
@@ -2944,17 +2940,29 @@ void Dbdict::execDICTSTARTREQ(Signal* si
c_restartRecord.m_pass = 0;
c_restartRecord.activeTable = 0;
- c_schemaRecord.schemaPage = c_schemaRecord.oldSchemaPage; // ugly
+
+ /**
+ * master has same new/old schema file...
+ * copy old(read from disk) to new
+ */
+ {
+ XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
+ XSchemaFile * newxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
+ newxsf->noOfPages = oldxsf->noOfPages;
+ memcpy(&newxsf->schemaPage[0],
+ &oldxsf->schemaPage[0],
+ oldxsf->schemaPage[0].FileSize);
+ }
+
checkSchemaStatus(signal);
}//execDICTSTARTREQ()
void
Dbdict::masterRestart_checkSchemaStatusComplete(Signal* signal,
Uint32 callbackData,
- Uint32 returnCode){
-
- c_schemaRecord.schemaPage = 0; // ugly
- XSchemaFile * oldxsf = &c_schemaFile[c_schemaRecord.oldSchemaPage != 0];
+ Uint32 returnCode)
+{
+ XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
ndbrequire(oldxsf->noOfPages != 0);
LinearSectionPtr ptr[3];
@@ -2975,7 +2983,7 @@ Dbdict::masterRestart_checkSchemaStatusC
1,
c);
- XSchemaFile * newxsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * newxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
newxsf->noOfPages = oldxsf->noOfPages;
memcpy(&newxsf->schemaPage[0], &oldxsf->schemaPage[0],
oldxsf->noOfPages * NDB_SF_PAGE_SIZE);
@@ -2995,7 +3003,7 @@ Dbdict::execGET_SCHEMA_INFOREQ(Signal* s
LinearSectionPtr ptr[3];
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
ndbrequire(xsf->noOfPages != 0);
ptr[0].p = (Uint32*)&xsf->schemaPage[0];
@@ -3042,7 +3050,7 @@ void Dbdict::execSCHEMA_INFO(Signal* sig
SegmentedSectionPtr schemaDataPtr;
handle.getSection(schemaDataPtr, 0);
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
ndbrequire(schemaDataPtr.sz % NDB_SF_PAGE_SIZE_IN_WORDS == 0);
xsf->noOfPages = schemaDataPtr.sz / NDB_SF_PAGE_SIZE_IN_WORDS;
copy((Uint32*)&xsf->schemaPage[0], schemaDataPtr);
@@ -3056,7 +3064,7 @@ void Dbdict::execSCHEMA_INFO(Signal* sig
validateChecksum(xsf);
- XSchemaFile * oldxsf = &c_schemaFile[c_schemaRecord.oldSchemaPage != 0];
+ XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
resizeSchemaFile(xsf, oldxsf->noOfPages);
ndbrequire(signal->getSendersBlockRef() != reference());
@@ -3087,7 +3095,7 @@ Dbdict::restart_checkSchemaStatusComplet
ndbrequire(c_writeSchemaRecord.inUse == false);
c_writeSchemaRecord.inUse = true;
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
c_writeSchemaRecord.pageId = c_schemaRecord.schemaPage;
c_writeSchemaRecord.newFile = true;
c_writeSchemaRecord.firstPage = 0;
@@ -3215,8 +3223,8 @@ operator<<(NdbOut& out, const SchemaFile
void Dbdict::checkSchemaStatus(Signal* signal)
{
- XSchemaFile * newxsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
- XSchemaFile * oldxsf = &c_schemaFile[c_schemaRecord.oldSchemaPage != 0];
+ XSchemaFile * newxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
+ XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
ndbrequire(newxsf->noOfPages == oldxsf->noOfPages);
const Uint32 noOfEntries = newxsf->noOfPages * NDB_SF_PAGE_ENTRIES;
@@ -3622,7 +3630,7 @@ Dbdict::execGET_TABINFO_CONF(Signal* sig
// save to disk
ndbrequire(tableId < c_tableRecordPool.getSize());
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, tableId);
tableEntry->m_info_words= tabInfoPtr.sz;
@@ -5140,7 +5148,7 @@ Dbdict::createTable_parse(Signal* signal
// update table version
{
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tabEntry = getTableEntry(xsf, tabPtr.i);
impl_req->tableVersion =
@@ -5240,7 +5248,7 @@ Dbdict::createTable_parse(Signal* signal
// save original schema file entry
{
- XSchemaFile* xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile* xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry* tableEntry = getTableEntry(xsf, tableId);
createTabPtr.p->m_orig_entry = *tableEntry;
}
@@ -6137,7 +6145,7 @@ Dbdict::createTable_abortLocalConf(Signa
// write schema file wl3600_todo why not use updateSchemaState
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, tableId);
tableEntry->m_tableState = SchemaFile::DROP_TABLE_COMMITTED;
tableEntry->m_transId = 0;
@@ -6412,9 +6420,16 @@ Dbdict::dropTable_parse(Signal* signal,
return;
}
+ if (tablePtr.p->m_read_locked)
+ {
+ jam();
+ setError(error, DropTableRef::BackupInProgress, __LINE__);
+ return;
+ }
+
// check and save original schema file entry
{
- XSchemaFile* xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile* xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry* te = getTableEntry(xsf, tableId);
if (te->m_transId != 0 && te->m_transId != trans_ptr.p->m_transId) {
@@ -6447,10 +6462,6 @@ Dbdict::dropTable_parse(Signal* signal,
jam();
setError(error, DropTableRef::DropInProgress, __LINE__);
return;
- case TableRecord::BACKUP_ONGOING:
- jam();
- setError(error, DropTableRef::BackupInProgress, __LINE__);
- return;
}
ndbrequire(ok);
@@ -6576,10 +6587,10 @@ Dbdict::dropTable_backup_mutex_locked(Si
Mutex mutex(signal, c_mutexMgr, dropTabPtr.p->m_define_backup_mutex);
mutex.unlock(); // ignore response
- if (tablePtr.p->tabState == TableRecord::BACKUP_ONGOING)
+ if (tablePtr.p->m_read_locked)
{
jam();
- setError(op_ptr, DropTableRef::BackupInProgress, __LINE__);
+ setError(op_ptr, AlterTableRef::BackupInProgress, __LINE__);
sendTransRef(signal, op_ptr);
return;
}
@@ -6681,7 +6692,7 @@ Dbdict::prepDropTab_writeSchema(Signal*
/**
* Modify schema wl3600_todo use updateSchemaState
*/
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, tablePtr.i);
SchemaFile::TableState tabState =
(SchemaFile::TableState)tableEntry->m_tableState;
@@ -6960,7 +6971,7 @@ Dbdict::dropTab_complete(Signal* signal,
/**
* Write to schema file
*/
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, tableId);
SchemaFile::TableState tabState =
(SchemaFile::TableState)tableEntry->m_tableState;
@@ -7056,7 +7067,7 @@ Dbdict::dropTable_abortParse(Signal* sig
// update schema state in memory XXX temp
if (dropTabPtr.p->m_curr_entry.m_tableState != SchemaFile::INIT)
{
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, tableId);
*tableEntry = dropTabPtr.p->m_orig_entry;
computeChecksum(xsf, tableId / NDB_SF_PAGE_ENTRIES);
@@ -7242,6 +7253,13 @@ Dbdict::alterTable_parse(Signal* signal,
}
c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+ if (tablePtr.p->m_read_locked)
+ {
+ jam();
+ setError(error, tablePtr.p->m_read_locked, __LINE__);
+ return;
+ }
+
const TableRecord::TabState tabState = tablePtr.p->tabState;
bool ok = false;
switch (tabState) {
@@ -7254,10 +7272,6 @@ Dbdict::alterTable_parse(Signal* signal,
ok = true;
jam();
break;
- case TableRecord::BACKUP_ONGOING:
- jam();
- setError(error, AlterTableRef::BackupInProgress, __LINE__);
- return;
case TableRecord::PREPARE_DROPPING:
case TableRecord::DROPPING:
jam();
@@ -7358,7 +7372,7 @@ Dbdict::alterTable_parse(Signal* signal,
// save original schema file entry
{
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry * tableEntry = getTableEntry(xsf, impl_req->tableId);
alterTabPtr.p->m_orig_entry = *tableEntry;
}
@@ -7475,10 +7489,10 @@ Dbdict::alterTable_backup_mutex_locked(S
Mutex mutex(signal, c_mutexMgr, alterTabPtr.p->m_define_backup_mutex);
mutex.unlock(); // ignore response
- if (tablePtr.p->tabState == TableRecord::BACKUP_ONGOING)
+ if (tablePtr.p->m_read_locked)
{
jam();
- setError(op_ptr, AlterTableRef::BackupInProgress, __LINE__);
+ setError(op_ptr, tablePtr.p->m_read_locked, __LINE__);
sendTransRef(signal, op_ptr);
return;
}
@@ -7926,10 +7940,7 @@ Dbdict::alterTable_abortParse(Signal* si
if (!tablePtr.isNull()) {
jam();
// wl3600_todo tabState should be rationalized
- if (tablePtr.p->tabState != TableRecord::BACKUP_ONGOING) {
- jam();
- tablePtr.p->tabState = TableRecord::DEFINED;
- }
+ tablePtr.p->tabState = TableRecord::DEFINED;
tablePtr.setNull();
}
@@ -8274,7 +8285,7 @@ void Dbdict::execGET_TABINFOREQ(Signal*
SchemaFile::TableEntry *objEntry = 0;
if(obj_id != RNIL){
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
objEntry = getTableEntry(xsf, obj_id);
}
@@ -8313,8 +8324,7 @@ void Dbdict::execGET_TABINFOREQ(Signal*
jam();
TableRecordPtr tabPtr;
c_tableRecordPool.getPtr(tabPtr, obj_id);
- if (tabPtr.p->tabState != TableRecord::DEFINED &&
- tabPtr.p->tabState != TableRecord::BACKUP_ONGOING)
+ if (tabPtr.p->tabState != TableRecord::DEFINED)
{
jam();
sendGET_TABINFOREF(signal, req, GetTabInfoRef::TableNotDefined, __LINE__);
@@ -8476,7 +8486,16 @@ Dbdict::execLIST_TABLES_REQ(Signal* sign
if(DictTabInfo::isTable(type)){
switch (tablePtr.p->tabState) {
case TableRecord::DEFINING:
- conf->setTableState(pos, DictTabInfo::StateBuilding);
+ if (tablePtr.p->m_read_locked)
+ {
+ jam();
+ conf->setTableState(pos, DictTabInfo::StateBackup);
+ }
+ else
+ {
+ jam();
+ conf->setTableState(pos, DictTabInfo::StateBuilding);
+ }
break;
case TableRecord::PREPARE_DROPPING:
case TableRecord::DROPPING:
@@ -8485,9 +8504,6 @@ Dbdict::execLIST_TABLES_REQ(Signal* sign
case TableRecord::DEFINED:
conf->setTableState(pos, DictTabInfo::StateOnline);
break;
- case TableRecord::BACKUP_ONGOING:
- conf->setTableState(pos, DictTabInfo::StateBackup);
- break;
default:
conf->setTableState(pos, DictTabInfo::StateBroken);
break;
@@ -8789,8 +8805,7 @@ Dbdict::createIndex_parse(Signal* signal
c_obj_pool.getPtr(obj_ptr, tablePtr.p->m_obj_ptr_i);
if (obj_ptr.p->m_trans_key == 0) {
- if (tablePtr.p->tabState != TableRecord::DEFINED &&
- tablePtr.p->tabState != TableRecord::BACKUP_ONGOING) {
+ if (tablePtr.p->tabState != TableRecord::DEFINED) {
jam();
setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
return;
@@ -13919,8 +13934,7 @@ Dbdict::createTrigger_parse(Signal* sign
c_tableRecordPool.getPtr(tablePtr, tableId);
#if wl3600_todo // no PARSE state
if (tablePtr.p->tabState != TableRecord::PARSE &&
- tablePtr.p->tabState != TableRecord::DEFINED &&
- tablePtr.p->tabState != TableRecord::BACKUP_ONGOING)
+ tablePtr.p->tabState != TableRecord::DEFINED)
{
jam();
setError(error, CreateTrigRef::InvalidTable, __LINE__);
@@ -15611,12 +15625,14 @@ Dbdict::execBACKUP_LOCK_TAB_REQ(Signal*
if(lock == BackupLockTab::LOCK_TABLE)
{
+ jam();
+ tablePtr.p->m_read_locked = 1;
ndbrequire(tablePtr.p->tabState == TableRecord::DEFINED);
- tablePtr.p->tabState = TableRecord::BACKUP_ONGOING;
}
- else if(tablePtr.p->tabState == TableRecord::BACKUP_ONGOING)
+ else
{
- tablePtr.p->tabState = TableRecord::DEFINED;
+ jam();
+ tablePtr.p->m_read_locked = 0;
}
sendSignal(senderRef, GSN_BACKUP_LOCK_TAB_CONF, signal,
@@ -15827,7 +15843,7 @@ Dbdict::execCREATE_FILE_REQ(Signal* sign
trans_ptr.p->m_op.m_obj_id = objId;
create_obj->gci = 0;
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry *objEntry = getTableEntry(xsf, objId);
create_obj->objVersion =
create_obj_inc_schema_version(objEntry->m_tableVersion);
@@ -15948,7 +15964,7 @@ Dbdict::execCREATE_FILEGROUP_REQ(Signal*
trans_ptr.p->m_op.m_obj_id = objId;
create_obj->gci = 0;
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry *objEntry = getTableEntry(xsf, objId);
create_obj->objVersion =
create_obj_inc_schema_version(objEntry->m_tableVersion);
@@ -16865,7 +16881,7 @@ Dbdict::createObj_commit_start_done(Sign
ndbrequire(c_opCreateObj.find(createObjPtr, callbackData));
Uint32 objId = createObjPtr.p->m_obj_id;
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry objEntry = * getTableEntry(xsf, objId);
objEntry.m_tableState = SchemaFile::TABLE_ADD_COMMITTED;
@@ -16952,7 +16968,7 @@ Dbdict::createObj_abort_start_done(Signa
jam();
ndbrequire(c_opCreateObj.find(createObjPtr, callbackData));
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry objEntry = * getTableEntry(xsf,
createObjPtr.p->m_obj_id);
objEntry.m_tableState = SchemaFile::DROP_TABLE_COMMITTED;
@@ -17103,7 +17119,7 @@ Dbdict::dropObj_prepare_start_done(Signa
}
jam();
Uint32 objId = dropObjPtr.p->m_obj_id;
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry objEntry = *getTableEntry(xsf, objId);
objEntry.m_tableState = SchemaFile::DROP_TABLE_STARTED;
updateSchemaState(signal, objId, &objEntry, &cb);
@@ -17198,7 +17214,7 @@ Dbdict::dropObj_commit_start_done(Signal
ndbrequire(c_opDropObj.find(dropObjPtr, callbackData));
Uint32 objId = dropObjPtr.p->m_obj_id;
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry objEntry = * getTableEntry(xsf, objId);
objEntry.m_tableState = SchemaFile::DROP_TABLE_COMMITTED;
@@ -17287,7 +17303,7 @@ Dbdict::dropObj_abort_start_done(Signal*
ndbrequire(returnCode == 0);
ndbrequire(c_opDropObj.find(dropObjPtr, callbackData));
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
SchemaFile::TableEntry objEntry = * getTableEntry(xsf,
dropObjPtr.p->m_obj_id);
@@ -18213,7 +18229,7 @@ Dbdict::drop_fg_commit_start(Signal* sig
*/
Ptr<File> filePtr;
Local_file_list list(c_file_pool, fg_ptr.p->m_logfilegroup.m_files);
- XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.schemaPage != 0];
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
for(list.first(filePtr); !filePtr.isNull(); list.next(filePtr))
{
jam();
@@ -19101,6 +19117,11 @@ Dbdict::trans_start_recv_reply(Signal* s
trans_ptr.p->m_state = SchemaTrans::TS_ABORT_START;
/**
+ * Clear nodes that did not reply START-CONF
+ */
+ trans_ptr.p->m_nodes.bitANDC(trans_ptr.p->m_ref_nodes);
+
+ /**
* Abort before replying to client
*/
trans_end_start(signal, trans_ptr);
@@ -19291,6 +19312,7 @@ Dbdict::handleClientReq(Signal* signal,
req->transId = trans_ptr.p->m_transId;
req->gsn = gsn;
+ trans_ptr.p->m_ref_nodes.clear();
trans_ptr.p->m_nodes.bitAND(c_aliveNodes);
NdbNodeBitmask nodes = trans_ptr.p->m_nodes;
NodeReceiverGroup rg(DBDICT, nodes);
@@ -19445,6 +19467,9 @@ Dbdict::trans_recv_reply(Signal* signal,
case SchemaTrans::TS_ROLLBACK_SP:
trans_rollback_sp_recv_reply(signal, trans_ptr);
return;
+ case SchemaTrans::TS_FLUSH_PREPARE:
+ trans_prepare_first(signal, trans_ptr);
+ return;
case SchemaTrans::TS_PREPARING:
jam();
trans_prepare_recv_reply(signal, trans_ptr);
@@ -19457,9 +19482,17 @@ Dbdict::trans_recv_reply(Signal* signal,
jam();
trans_abort_parse_recv_reply(signal, trans_ptr);
return;
+ case SchemaTrans::TS_FLUSH_COMMIT:
+ jam();
+ trans_commit_first(signal, trans_ptr);
+ return;
case SchemaTrans::TS_COMMITTING:
trans_commit_recv_reply(signal, trans_ptr);
return;
+ case SchemaTrans::TS_FLUSH_COMPLETE:
+ jam();
+ trans_complete_first(signal, trans_ptr);
+ return;
case SchemaTrans::TS_COMPLETING:
trans_complete_recv_reply(signal, trans_ptr);
return;
@@ -19468,7 +19501,6 @@ Dbdict::trans_recv_reply(Signal* signal,
return;
case SchemaTrans::TS_STARTED: // These states are waiting for client
jam(); // And should not get a "internal" reply
- case SchemaTrans::TS_COMMITTED:
ndbrequire(false);
}
}
@@ -19637,6 +19669,33 @@ Dbdict::abortSubOps(Signal* signal, Sche
void
Dbdict::trans_prepare_start(Signal* signal, SchemaTransPtr trans_ptr)
{
+ trans_ptr.p->m_state = SchemaTrans::TS_FLUSH_PREPARE;
+
+ SchemaTransImplReq* req = (SchemaTransImplReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->transKey = trans_ptr.p->trans_key;
+ req->opKey = RNIL;
+ req->requestInfo = SchemaTransImplReq::RT_FLUSH_PREPARE;
+ req->clientRef = 0;
+ req->transId = trans_ptr.p->m_transId;
+ req->gsn = 0;
+
+ trans_ptr.p->m_nodes.bitAND(c_aliveNodes);
+ NdbNodeBitmask nodes = trans_ptr.p->m_nodes;
+ NodeReceiverGroup rg(DBDICT, nodes);
+ {
+ SafeCounter sc(c_counterMgr, trans_ptr.p->m_counter);
+ bool ok = sc.init<SchemaTransImplRef>(rg, trans_ptr.p->trans_key);
+ ndbrequire(ok);
+ }
+
+ sendSignal(rg, GSN_SCHEMA_TRANS_IMPL_REQ, signal,
+ SchemaTransImplReq::SignalLength, JBB);
+}
+
+void
+Dbdict::trans_prepare_first(Signal* signal, SchemaTransPtr trans_ptr)
+{
trans_ptr.p->m_state = SchemaTrans::TS_PREPARING;
SchemaOpPtr op_ptr;
@@ -19795,8 +19854,6 @@ Dbdict::trans_abort_parse_next(Signal* s
trans_ptr.p->m_nodes.bitAND(c_aliveNodes);
NdbNodeBitmask nodes = trans_ptr.p->m_nodes;
- nodes.bitANDC(trans_ptr.p->m_ref_nodes);
- trans_ptr.p->m_ref_nodes.clear();
NodeReceiverGroup rg(DBDICT, nodes);
{
SafeCounter sc(c_counterMgr, trans_ptr.p->m_counter);
@@ -20029,7 +20086,7 @@ Dbdict::trans_rollback_sp_next(Signal* s
trans_ptr.p->m_nodes.bitAND(c_aliveNodes);
NdbNodeBitmask nodes = trans_ptr.p->m_nodes;
- nodes.bitANDC(trans_ptr.p->m_ref_nodes); // Note: uses m_ref_nodes
+ nodes.bitANDC(trans_ptr.p->m_ref_nodes);
trans_ptr.p->m_ref_nodes.clear();
NodeReceiverGroup rg(DBDICT, nodes);
{
@@ -20065,6 +20122,33 @@ Dbdict::trans_rollback_sp_done(Signal* s
void
Dbdict::trans_commit_start(Signal* signal, SchemaTransPtr trans_ptr)
{
+ trans_ptr.p->m_state = SchemaTrans::TS_FLUSH_COMMIT;
+
+ SchemaTransImplReq* req = (SchemaTransImplReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->transKey = trans_ptr.p->trans_key;
+ req->opKey = RNIL;
+ req->requestInfo = SchemaTransImplReq::RT_FLUSH_COMMIT;
+ req->clientRef = 0;
+ req->transId = trans_ptr.p->m_transId;
+ req->gsn = 0;
+
+ trans_ptr.p->m_nodes.bitAND(c_aliveNodes);
+ NdbNodeBitmask nodes = trans_ptr.p->m_nodes;
+ NodeReceiverGroup rg(DBDICT, nodes);
+ {
+ SafeCounter sc(c_counterMgr, trans_ptr.p->m_counter);
+ bool ok = sc.init<SchemaTransImplRef>(rg, trans_ptr.p->trans_key);
+ ndbrequire(ok);
+ }
+
+ sendSignal(rg, GSN_SCHEMA_TRANS_IMPL_REQ, signal,
+ SchemaTransImplReq::SignalLength, JBB);
+}
+
+void
+Dbdict::trans_commit_first(Signal* signal, SchemaTransPtr trans_ptr)
+{
jam();
ndbout_c("trans_commit");
@@ -20208,7 +20292,34 @@ Dbdict::trans_commit_mutex_unlocked(Sign
}
void
-Dbdict::trans_complete_start(Signal * signal, SchemaTransPtr trans_ptr)
+Dbdict::trans_complete_start(Signal* signal, SchemaTransPtr trans_ptr)
+{
+ trans_ptr.p->m_state = SchemaTrans::TS_FLUSH_COMPLETE;
+
+ SchemaTransImplReq* req = (SchemaTransImplReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->transKey = trans_ptr.p->trans_key;
+ req->opKey = RNIL;
+ req->requestInfo = SchemaTransImplReq::RT_FLUSH_COMPLETE;
+ req->clientRef = 0;
+ req->transId = trans_ptr.p->m_transId;
+ req->gsn = 0;
+
+ trans_ptr.p->m_nodes.bitAND(c_aliveNodes);
+ NdbNodeBitmask nodes = trans_ptr.p->m_nodes;
+ NodeReceiverGroup rg(DBDICT, nodes);
+ {
+ SafeCounter sc(c_counterMgr, trans_ptr.p->m_counter);
+ bool ok = sc.init<SchemaTransImplRef>(rg, trans_ptr.p->trans_key);
+ ndbrequire(ok);
+ }
+
+ sendSignal(rg, GSN_SCHEMA_TRANS_IMPL_REQ, signal,
+ SchemaTransImplReq::SignalLength, JBB);
+}
+
+void
+Dbdict::trans_complete_first(Signal * signal, SchemaTransPtr trans_ptr)
{
jam();
trans_ptr.p->m_state = SchemaTrans::TS_COMPLETING;
@@ -20379,12 +20490,12 @@ Dbdict::execSCHEMA_TRANS_IMPL_REQ(Signal
slave_run_parse(signal, trans_ptr, req);
return;
case SchemaTransImplReq::RT_END:
+ case SchemaTransImplReq::RT_FLUSH_PREPARE:
+ case SchemaTransImplReq::RT_FLUSH_COMMIT:
+ case SchemaTransImplReq::RT_FLUSH_COMPLETE:
jam();
- slave_run_end(signal, trans_ptr);
- return;
- case SchemaTransImplReq::RT_FLUSH_SCHEMA:
- jam();
- slave_flush_schema(signal, trans_ptr);
+ jamLine(rt);
+ slave_run_flush(signal, trans_ptr, req);
return;
default:
break;
@@ -20404,7 +20515,9 @@ Dbdict::execSCHEMA_TRANS_IMPL_REQ(Signal
switch(rt){
case SchemaTransImplReq::RT_START:
case SchemaTransImplReq::RT_PARSE:
- case SchemaTransImplReq::RT_FLUSH_SCHEMA:
+ case SchemaTransImplReq::RT_FLUSH_PREPARE:
+ case SchemaTransImplReq::RT_FLUSH_COMMIT:
+ case SchemaTransImplReq::RT_FLUSH_COMPLETE:
ndbrequire(false); // handled above
case SchemaTransImplReq::RT_PREPARE:
jam();
@@ -20469,6 +20582,7 @@ Dbdict::slave_run_start(Signal *signal,
trans_ptr.p->m_isMaster = false;
trans_ptr.p->m_masterRef = req->senderRef;
trans_ptr.p->m_requestInfo = req->requestInfo;
+ trans_ptr.p->m_state = SchemaTrans::TS_STARTED;
}
else
{
@@ -20559,19 +20673,98 @@ Dbdict::slave_run_parse(Signal *signal,
}
void
-Dbdict::slave_flush_schema(Signal *signal,
- SchemaTransPtr trans_ptr)
+Dbdict::slave_run_flush(Signal *signal,
+ SchemaTransPtr trans_ptr,
+ const SchemaTransImplReq* req)
{
- sendTransConf(signal, trans_ptr);
+ if (!trans_ptr.p->m_isMaster)
+ {
+ jam();
+ jamLine(trans_ptr.p->m_state);
+ const Uint32 rt = DictSignal::getRequestType(req->requestInfo);
+ switch(rt){
+ case SchemaTransImplReq::RT_FLUSH_PREPARE:
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_STARTED);
+ trans_ptr.p->m_state = SchemaTrans::TS_FLUSH_PREPARE;
+ break;
+ case SchemaTransImplReq::RT_FLUSH_COMMIT:
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_PREPARING);
+ trans_ptr.p->m_state = SchemaTrans::TS_FLUSH_COMMIT;
+ break;
+ case SchemaTransImplReq::RT_FLUSH_COMPLETE:
+ ndbrequire(trans_ptr.p->m_state == SchemaTrans::TS_COMMITTING);
+ trans_ptr.p->m_state = SchemaTrans::TS_FLUSH_COMPLETE;
+ break;
+ case SchemaTransImplReq::RT_END:
+ /**
+ * No state check here, cause we get here regardless if transaction
+ * succeded or not...
+ */
+ trans_ptr.p->m_state = SchemaTrans::TS_ENDING;
+ break;
+ default:
+ ndbrequire(false);
+ }
+ }
+
+ XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
+ ndbrequire(c_writeSchemaRecord.inUse == false);
+ c_writeSchemaRecord.pageId = c_schemaRecord.schemaPage;
+ c_writeSchemaRecord.newFile = false;
+ c_writeSchemaRecord.firstPage = 0;
+ c_writeSchemaRecord.noOfPages = xsf->noOfPages;
+
+ c_writeSchemaRecord.m_callback.m_callbackData = trans_ptr.p->trans_key;
+ c_writeSchemaRecord.m_callback.m_callbackFunction =
+ safe_cast(&Dbdict::slave_writeSchema_conf);
+
+#if TODO
+ c_writeSchemaRecord.inUse = true;
+ startWriteSchemaFile(signal);
+#else
+ execute(signal, c_writeSchemaRecord.m_callback, 0);
+#endif
}
void
-Dbdict::slave_run_end(Signal* signal,
- SchemaTransPtr trans_ptr)
+Dbdict::slave_writeSchema_conf(Signal* signal,
+ Uint32 trans_key,
+ Uint32 ret)
{
- sendTransConf(signal, trans_ptr);
+ jamEntry();
+ ndbrequire(ret == 0);
+ SchemaTransPtr trans_ptr;
+ ndbrequire(findSchemaTrans(trans_ptr, trans_key));
+
+ bool release = false;
if (!trans_ptr.p->m_isMaster)
{
+ switch(trans_ptr.p->m_state){
+ case SchemaTrans::TS_FLUSH_PREPARE:
+ jam();
+ trans_ptr.p->m_state = SchemaTrans::TS_PREPARING;
+ break;
+ case SchemaTrans::TS_FLUSH_COMMIT:
+ jam();
+ trans_ptr.p->m_state = SchemaTrans::TS_COMMITTING;
+ break;
+ case SchemaTrans::TS_FLUSH_COMPLETE:
+ jam();
+ trans_ptr.p->m_state = SchemaTrans::TS_COMPLETING;
+ break;
+ case SchemaTrans::TS_ENDING:
+ jam();
+ release = true;
+ break;
+ default:
+ jamLine(trans_ptr.p->m_state);
+ ndbrequire(false);
+ }
+ }
+ sendTransConf(signal, trans_ptr);
+
+ if (release)
+ {
jam();
releaseSchemaTrans(trans_ptr);
}
@@ -21400,9 +21593,6 @@ Dbdict::check_consistency_table(TableRec
switch (tablePtr.p->tabState) {
case TableRecord::DEFINED:
- jam();
- break;
- case TableRecord::BACKUP_ONGOING: // should not be a "TabState"
jam();
break;
default:
diff -Nrup a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-04-29 19:39:47 +02:00
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2008-05-09 09:09:20 +02:00
@@ -325,18 +325,12 @@ public:
* Support variables for table handling
****************************************************/
- /* Active page which is sent to disk */
- Uint32 activePage;
-
/** File pointer received from disk */
Uint32 filePtr[2];
/** Pointer to first attribute in table */
DLFifoList<AttributeRecord>::Head m_attributes;
- /* Pointer to first page of table description */
- Uint32 firstPage;
-
Uint32 nextPool;
enum TabState {
@@ -344,19 +338,10 @@ public:
DEFINING = 2,
DEFINED = 4,
PREPARE_DROPPING = 5,
- DROPPING = 6,
- BACKUP_ONGOING = 7
+ DROPPING = 6
};
TabState tabState;
-
- /* State when returning from TC_SCHVERREQ */
- enum TabReturnState {
- TRS_IDLE = 0,
- ADD_TABLE = 1,
- SLAVE_SYSTEM_RESTART = 2,
- MASTER_SYSTEM_RESTART = 3
- };
- TabReturnState tabReturnState;
+ Uint32 m_read_locked; // BACKUP_ONGOING
/** Number of words */
Uint32 packedSize;
@@ -1045,6 +1030,12 @@ private:
* Word 4: Currently zero
****************************************************************************/
struct SchemaRecord {
+ enum
+ {
+ NEW_SCHEMA_FILE = 0, // Index in c_schemaFile
+ OLD_SCHEMA_FILE = 1
+ };
+
/** Schema file first page (0) */
Uint32 schemaPage;
@@ -1605,13 +1596,15 @@ private:
TS_PARSING = 4, // Parsing at participants
TS_SUBOP = 5, // Creating subop
TS_ROLLBACK_SP = 6, // Rolling back to SP (supported before prepare)
- TS_PREPARING = 7, // Preparing operations
- TS_ABORTING_PREPARE = 8, // Aborting prepared operations
- TS_ABORTING_PARSE = 9, // Aborting parsed operations
- TS_COMMITTING = 10,// Committing
- TS_COMMITTED = 11,// Committed
- TS_COMPLETING = 12,// Completing
- TS_ENDING = 13
+ TS_FLUSH_PREPARE = 7,
+ TS_PREPARING = 8, // Preparing operations
+ TS_ABORTING_PREPARE = 9, // Aborting prepared operations
+ TS_ABORTING_PARSE = 10,// Aborting parsed operations
+ TS_FLUSH_COMMIT = 11,
+ TS_COMMITTING = 12,// Committing
+ TS_FLUSH_COMPLETE = 13,// Committed
+ TS_COMPLETING = 14,// Completing
+ TS_ENDING = 15
};
Uint32 m_state;
@@ -1711,6 +1704,7 @@ private:
void trans_parse_recv_reply(Signal*, SchemaTransPtr);
void trans_prepare_start(Signal*, SchemaTransPtr);
+ void trans_prepare_first(Signal*, SchemaTransPtr);
void trans_prepare_recv_reply(Signal*, SchemaTransPtr);
void trans_prepare_next(Signal*, SchemaTransPtr, SchemaOpPtr);
void trans_prepare_done(Signal*, SchemaTransPtr);
@@ -1732,12 +1726,14 @@ private:
void trans_commit_start(Signal*, SchemaTransPtr);
void trans_commit_mutex_locked(Signal*, Uint32, Uint32);
+ void trans_commit_first(Signal*, SchemaTransPtr);
void trans_commit_recv_reply(Signal*, SchemaTransPtr);
void trans_commit_next(Signal*, SchemaTransPtr, SchemaOpPtr);
void trans_commit_done(Signal* signal, SchemaTransPtr);
void trans_commit_mutex_unlocked(Signal*, Uint32, Uint32);
void trans_complete_start(Signal* signal, SchemaTransPtr);
+ void trans_complete_first(Signal* signal, SchemaTransPtr);
void trans_complete_next(Signal*, SchemaTransPtr, SchemaOpPtr);
void trans_complete_recv_reply(Signal*, SchemaTransPtr);
void trans_complete_done(Signal*, SchemaTransPtr);
@@ -1759,8 +1755,8 @@ private:
void slave_run_start(Signal*, const SchemaTransImplReq*);
void slave_run_parse(Signal*, SchemaTransPtr, const SchemaTransImplReq*);
- void slave_run_end(Signal*, SchemaTransPtr);
- void slave_flush_schema(Signal*, SchemaTransPtr);
+ void slave_run_flush(Signal*, SchemaTransPtr, const SchemaTransImplReq*);
+ void slave_writeSchema_conf(Signal*, Uint32, Uint32);
// reply to trans client for begin/end trans
void sendTransClientReply(Signal*, SchemaTransPtr);
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2592) | jonas | 9 May 2008 |