Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2086 06/02/02 12:23:23 jonas@stripped +17 -0
ndb dd
Fix SR bug that extent pages was scanned before undo was run
Fix bug wrt page flushing/tsman and tup's dirty page list
storage/ndb/tools/delete_all.cpp
1.18 06/02/02 12:23:20 jonas@stripped +2 -1
Fix dd SR + dd free space bugs
storage/ndb/test/tools/hugoLoad.cpp
1.9 06/02/02 12:23:20 jonas@stripped +6 -2
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/vm/DLFifoList.hpp
1.6 06/02/02 12:23:20 jonas@stripped +33 -0
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/tsman.hpp
1.3 06/02/02 12:23:20 jonas@stripped +30 -19
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/tsman.cpp
1.6 06/02/02 12:23:20 jonas@stripped +138 -121
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/pgman.hpp
1.4 06/02/02 12:23:20 jonas@stripped +22 -22
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/pgman.cpp
1.7 06/02/02 12:23:20 jonas@stripped +178 -104
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/lgman.cpp
1.6 06/02/02 12:23:20 jonas@stripped +3 -0
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/diskpage.hpp
1.2 06/02/02 12:23:20 jonas@stripped +1 -1
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
1.8 06/02/02 12:23:19 jonas@stripped +5 -4
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
1.29 06/02/02 12:23:19 jonas@stripped +1 -6
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
1.9 06/02/02 12:23:19 jonas@stripped +355 -480
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
1.12 06/02/02 12:23:19 jonas@stripped +4 -2
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
1.36 06/02/02 12:23:19 jonas@stripped +10 -5
Fix dd SR + dd free space bugs
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
1.88 06/02/02 12:23:19 jonas@stripped +110 -82
Add LCP_PREPARE to pgman
Change order between TSMAN/LGMAN START_RECREQ
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
1.44 06/02/02 12:23:19 jonas@stripped +1 -9
remove some unused code
storage/ndb/include/kernel/signaldata/PgmanContinueB.hpp
1.2 06/02/02 12:23:19 jonas@stripped +2 -1
Fix dd SR + dd free space bugs
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/51-new
--- 1.1/storage/ndb/include/kernel/signaldata/PgmanContinueB.hpp 2005-11-07 12:19:11
+01:00
+++ 1.2/storage/ndb/include/kernel/signaldata/PgmanContinueB.hpp 2006-02-02 12:23:19
+01:00
@@ -29,7 +29,8 @@
STATS_LOOP = 0,
BUSY_LOOP = 1,
CLEANUP_LOOP = 2,
- LCP_LOOP = 3
+ LCP_LOOP = 3,
+ LCP_PREPARE = 4
};
};
--- 1.8/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp 2006-01-27 15:57:57 +01:00
+++ 1.9/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp 2006-02-02 12:23:19 +01:00
@@ -17,6 +17,8 @@
#define DBTUP_C
#include "Dbtup.hpp"
+static bool f_undo_done = true;
+
static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Page> & ptr)
@@ -154,8 +156,6 @@
{
abort();
}
-
-
}
Uint32
@@ -227,6 +227,48 @@
return pos;
}
+void
+Dbtup::update_extent_pos(Disk_alloc_info& alloc,
+ Ptr<Extent_info> extentPtr)
+{
+ Uint32 old = extentPtr.p->m_free_matrix_pos;
+ if (old != RNIL)
+ {
+ Uint32 pos = alloc.calc_extent_pos(extentPtr.p);
+ if (old != pos)
+ {
+ jam();
+ Extent_list old_list(c_extent_pool, alloc.m_free_extents[old]);
+ Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
+ old_list.remove(extentPtr);
+ new_list.add(extentPtr);
+ extentPtr.p->m_free_matrix_pos= pos;
+ }
+ }
+ else
+ {
+ ddassert(alloc.m_curr_extent_info_ptr_i == extentPtr.i);
+ }
+}
+
+void
+Dbtup::restart_setup_page(Ptr<Page> pagePtr)
+{
+ /**
+ * Link to extent, clear uncommitted_used_space
+ */
+ pagePtr.p->uncommitted_used_space = 0;
+ pagePtr.p->m_restart_seq = globalData.m_restart_seq;
+
+ Extent_info key;
+ key.m_key.m_file_no = pagePtr.p->m_file_no;
+ key.m_key.m_page_idx = pagePtr.p->m_extent_no;
+ Ptr<Extent_info> extentPtr;
+ ndbrequire(c_extent_hash.find(extentPtr, key));
+ pagePtr.p->m_extent_info_ptr = extentPtr.i;
+}
+
+
/**
* - Page free bits -
* 0 = 00 - free - 100% free
@@ -236,6 +278,8 @@
*
*/
+#define DBG_DISK 0
+
int
Dbtup::disk_page_prealloc(Signal* signal,
Ptr<Fragrecord> fragPtr,
@@ -252,6 +296,9 @@
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
+ if (DBG_DISK)
+ ndbout << "disk_page_prealloc";
+
/**
* 1) search current dirty pages
*/
@@ -266,6 +313,8 @@
disk_page_prealloc_dirty_page(alloc, *(PagePtr*)&page, i, sz);
key->m_page_no= ((Page*)page.p)->m_page_no;
key->m_file_no= ((Page*)page.p)->m_file_no;
+ if (DBG_DISK)
+ ndbout << " found dirty page " << *key << endl;
return 0; // Page in memory
}
}
@@ -285,7 +334,8 @@
disk_page_prealloc_transit_page(alloc, req, i, sz);
* key = req.p->m_key;
- //ndbout_c("found transit page");
+ if (DBG_DISK)
+ ndbout << " found transit page " << *key << endl;
return 0;
}
}
@@ -346,26 +396,11 @@
if ((pos= alloc.find_extent(sz)) != RNIL)
{
jam();
- Uint32 cnt = 0;
LocalDLList<Extent_info> list(c_extent_pool, alloc.m_free_extents[pos]);
list.first(ext);
- while((pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits)) < 0)
- if(!list.next(ext) || ++cnt == 10)
- break;
-
- if (cnt == 10 || ext.isNull())
- {
- pos = RNIL;
- }
- else
- {
- list.remove(ext);
- alloc.m_curr_extent_info_ptr_i= ext.i;
- ext.p->m_free_matrix_pos= RNIL;
- }
+ list.remove(ext);
}
-
- if (pos == RNIL)
+ else
{
jam();
/**
@@ -386,7 +421,7 @@
c_page_request_pool.release(req);
return err;
}
-
+
int pages= err;
ndbout << "allocated " << pages << " pages: " <<
ext.p->m_key << endl;
ext.p->m_first_page_no = ext.p->m_key.m_page_no;
@@ -398,19 +433,12 @@
LocalSLList<Extent_info, Extent_list_t>
list1(c_extent_pool, alloc.m_extent_list);
list1.add(ext);
-
- alloc.m_curr_extent_info_ptr_i= ext.i;
- ext.p->m_free_matrix_pos= RNIL;
- pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
-#ifdef VM_TRACE
- ddassert(pageBits >= 0);
-#else
- if (unlikely(pageBits < 0))
- {
- return -AllocExtentReq::NoExtentAvailable;
- }
-#endif
- }
+ }
+
+ alloc.m_curr_extent_info_ptr_i= ext.i;
+ ext.p->m_free_matrix_pos= RNIL;
+ pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
+ ddassert(pageBits >= 0);
}
/**
@@ -418,6 +446,9 @@
*/
*key= req.p->m_key= ext.p->m_key;
+ if (DBG_DISK)
+ ndbout << " allocated page " << *key << endl;
+
/**
* We don't know exact free space of page
* but we know what page free bits it has.
@@ -460,7 +491,7 @@
if (pageBits == 0)
{
//XXX empty page -> fast to map
- flags |= Page_cache_client::EMPTY_PAGE | Page_cache_client::NO_HOOK;
+ flags |= Page_cache_client::EMPTY_PAGE;
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::disk_page_prealloc_initial_callback);
}
@@ -514,21 +545,7 @@
pagePtr.p->uncommitted_used_space = used;
ddassert(extentPtr.p->m_free_space >= sz);
extentPtr.p->m_free_space -= sz;
- Uint32 old_pos= extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL) // Current extent
- {
- jam();
- Uint32 new_pos= alloc.calc_extent_pos(extentPtr.p);
- if (old_pos != new_pos)
- {
- jam();
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[new_pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= new_pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
}
@@ -567,27 +584,13 @@
req.p->m_estimated_free_space = free - sz;
ddassert(extentPtr.p->m_free_space >= sz);
extentPtr.p->m_free_space -= sz;
- Uint32 old_pos= extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL) // Current extent
- {
- jam();
- Uint32 new_pos= alloc.calc_extent_pos(extentPtr.p);
- if (old_pos != new_pos)
- {
- jam();
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[new_pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= new_pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
}
void
Dbtup::disk_page_prealloc_callback(Signal* signal,
- Uint32 page_request, Uint32 page_id)
+ Uint32 page_request, Uint32 page_id)
{
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
@@ -600,8 +603,15 @@
Ptr<Fragrecord> fragPtr;
fragPtr.i= req.p->m_frag_ptr_i;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-
- disk_page_prealloc_callback_common(signal, req, fragPtr, gpage);
+
+ Ptr<Page> pagePtr = *(Ptr<Page>*)&gpage;
+
+ if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
+ {
+ restart_setup_page(pagePtr);
+ }
+
+ disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr);
}
void
@@ -622,6 +632,7 @@
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, page_id);
+ Ptr<Page> pagePtr = *(Ptr<Page>*)&gpage;
Ptr<Fragrecord> fragPtr;
fragPtr.i= req.p->m_frag_ptr_i;
@@ -634,37 +645,34 @@
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);
- Page* page= (Page*)gpage.p;
- page->m_page_no= req.p->m_key.m_page_no;
- page->m_file_no= req.p->m_key.m_file_no;
- page->m_table_id= fragPtr.p->fragTableId;
- page->m_fragment_id = fragPtr.p->fragmentId;
- page->m_extent_no = extentPtr.p->m_key.m_page_idx; // logical extent no
- page->m_extent_info_ptr= req.p->m_extent_info_ptr;
- page->m_restart_seq = globalData.m_restart_seq;
- page->list_index = 0x8000;
- page->uncommitted_used_space = 0;
- page->nextList = page->prevList = RNIL;
+ pagePtr.p->m_page_no= req.p->m_key.m_page_no;
+ pagePtr.p->m_file_no= req.p->m_key.m_file_no;
+ pagePtr.p->m_table_id= fragPtr.p->fragTableId;
+ pagePtr.p->m_fragment_id = fragPtr.p->fragmentId;
+ pagePtr.p->m_extent_no = extentPtr.p->m_key.m_page_idx; // logical extent no
+ pagePtr.p->m_extent_info_ptr= req.p->m_extent_info_ptr;
+ pagePtr.p->m_restart_seq = globalData.m_restart_seq;
+ pagePtr.p->list_index = 0x8000;
+ pagePtr.p->uncommitted_used_space = 0;
+ pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
{
- convertThPage((Fix_page*)gpage.p, tabPtr.p, DD);
+ convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
}
else
{
abort();
}
- disk_page_prealloc_callback_common(signal, req, fragPtr, gpage);
+ disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr);
}
void
Dbtup::disk_page_prealloc_callback_common(Signal* signal,
Ptr<Page_request> req,
Ptr<Fragrecord> fragPtr,
- Ptr<GlobalPage> pagePtr)
+ Ptr<Page> pagePtr)
{
- Page* page= (Page*)pagePtr.p;
-
/**
* 1) remove page request from Disk_alloc_info.m_page_requests
* 2) Add page to Disk_alloc_info.m_dirty_pages
@@ -672,31 +680,31 @@
* 4) inform pgman about current users
*/
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
- ddassert((page->list_index & 0x8000) == 0x8000);
- ddassert(page->m_extent_info_ptr == req.p->m_extent_info_ptr);
- ddassert(page->m_page_no == req.p->m_key.m_page_no);
- ddassert(page->m_file_no == req.p->m_key.m_file_no);
+ ddassert((pagePtr.p->list_index & 0x8000) == 0x8000);
+ ddassert(pagePtr.p->m_extent_info_ptr == req.p->m_extent_info_ptr);
+ ddassert(pagePtr.p->m_page_no == req.p->m_key.m_page_no);
+ ddassert(pagePtr.p->m_file_no == req.p->m_key.m_file_no);
Uint32 old_idx = req.p->m_list_index;
Uint32 free= req.p->m_estimated_free_space;
Uint32 ext = req.p->m_extent_info_ptr;
Uint32 used= req.p->m_uncommitted_used_space;
- Uint32 real_free = page->free_space;
- Uint32 real_used = used + page->uncommitted_used_space;
+ Uint32 real_free = pagePtr.p->free_space;
+ Uint32 real_used = used + pagePtr.p->uncommitted_used_space;
ddassert(real_free >= free);
ddassert(real_free >= real_used);
ddassert(alloc.calc_page_free_bits(free) == old_idx);
Uint32 new_idx= alloc.calc_page_free_bits(real_free - real_used);
-
+
/**
* Add to dirty pages
*/
ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(* cheat_pool, alloc.m_dirty_pages[new_idx]);
list.add(*(Ptr<Page>*)&pagePtr);
- page->uncommitted_used_space = real_used;
- page->list_index = new_idx;
+ pagePtr.p->uncommitted_used_space = real_used;
+ pagePtr.p->list_index = new_idx;
if (old_idx != new_idx || free != real_free)
{
@@ -712,21 +720,11 @@
extentPtr.p->m_free_page_count[new_idx]++;
}
- Uint32 old_pos= extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL) // Current extent
- {
- jam();
- Uint32 new_pos= alloc.calc_extent_pos(extentPtr.p);
- if (old_pos != new_pos)
- {
- jam();
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[new_pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= new_pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
+ }
+ else
+ {
+ ndbout << endl;
}
{
@@ -736,103 +734,114 @@
}
}
-int
-Dbtup::disk_page_load_hook(Uint32 page_id)
+void
+Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
{
- Ptr<GlobalPage> gpage;
- m_global_page_pool.getPtr(gpage, page_id);
+ Uint32 idx = pagePtr.p->list_index;
+ if ((idx & 0x8000) == 0)
+ {
+ /**
+ * Already in dirty list
+ */
+ return ;
+ }
- PagePtr pagePtr= *(PagePtr*)&gpage;
- Uint32 type = pagePtr.p->m_page_header.m_page_type;
- if (unlikely(type != File_formats::PT_Tup_fixsize_page &&
- type != File_formats::PT_Tup_varsize_page))
+ if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{
- ndbassert(false);
- return 0;
+ restart_setup_page(pagePtr);
}
- pagePtr.p->list_index |= 0x8000;
- pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
-
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
- if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
- {
- pagePtr.p->m_restart_seq = globalData.m_restart_seq;
- pagePtr.p->uncommitted_used_space = 0;
-
- Ptr<Tablerec> tabPtr;
- tabPtr.i= pagePtr.p->m_table_id;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
-
- Ptr<Fragrecord> fragPtr;
- getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
-
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
- Uint32 idx= alloc.calc_page_free_bits(pagePtr.p->free_space);
-
- pagePtr.p->list_index = idx | 0x8000;
-
- Extent_info key;
- key.m_key.m_file_no = pagePtr.p->m_file_no;
- key.m_key.m_page_idx = pagePtr.p->m_extent_no;
- Ptr<Extent_info> extentPtr;
- ndbrequire(c_extent_hash.find(extentPtr, key));
- pagePtr.p->m_extent_info_ptr = extentPtr.i;
- return 1;
- }
+ if (DBG_DISK)
+ ndbout << " disk_page_set_dirty " << key << endl;
+
+ Uint32 tableId = pagePtr.p->m_table_id;
+ Uint32 fragId = pagePtr.p->m_fragment_id;
+
+ Uint32 free = pagePtr.p->free_space;
+ Uint32 used = pagePtr.p->uncommitted_used_space;
+
+ Ptr<Tablerec> tabPtr;
+ tabPtr.i= pagePtr.p->m_table_id;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+
+ Ptr<Fragrecord> fragPtr;
+ getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
+
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+ Tablespace_client tsman(0, c_tsman,
+ fragPtr.p->fragTableId,
+ fragPtr.p->fragmentId,
+ fragPtr.p->m_tablespace_id);
- return 0;
+ ddassert(free >= used);
+ idx= alloc.calc_page_free_bits(free - used);
+
+ pagePtr.p->list_index = idx;
+ ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
+ LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
+ list.add(pagePtr);
+
+ // Make sure no one will allocate it...
+ tsman.unmap_page(&key, MAX_FREE_LIST - 1);
}
void
-Dbtup::disk_page_unmap_callback(Uint32 page_id)
+Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
{
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, page_id);
PagePtr pagePtr= *(PagePtr*)&gpage;
-
+
Uint32 type = pagePtr.p->m_page_header.m_page_type;
- if (unlikely(type != File_formats::PT_Tup_fixsize_page &&
- type != File_formats::PT_Tup_varsize_page))
+ if (unlikely((type != File_formats::PT_Tup_fixsize_page &&
+ type != File_formats::PT_Tup_varsize_page) ||
+ f_undo_done == false))
{
return ;
}
-
- Uint32 i = pagePtr.p->list_index;
-
+
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
+ Uint32 idx = pagePtr.p->list_index;
- if ((i & 0x8000) == 0)
+ ndbassert((idx & 0x8000) == 0);
+
+ if (DBG_DISK)
+ ndbout << "disk_page_unmap_callback " << key << endl;
+
+ Ptr<Tablerec> tabPtr;
+ tabPtr.i= pagePtr.p->m_table_id;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+
+ Ptr<Fragrecord> fragPtr;
+ getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
+
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+
+ if (dirty_count == 0)
{
- Ptr<Tablerec> tabPtr;
- tabPtr.i= pagePtr.p->m_table_id;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+ Uint32 free = pagePtr.p->free_space;
+ Uint32 used = pagePtr.p->uncommitted_used_space;
+ ddassert(free >= used);
+ ddassert(alloc.calc_page_free_bits(free - used) == idx);
+
+ Tablespace_client tsman(0, c_tsman,
+ fragPtr.p->fragTableId,
+ fragPtr.p->fragmentId,
+ fragPtr.p->m_tablespace_id);
- Ptr<Fragrecord> fragPtr;
- getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
-
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
-
- ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
- LocalDLList<Page> old(*pool, alloc.m_dirty_pages[i]);
- old.remove(pagePtr);
-
- if (pagePtr.p->uncommitted_used_space == 0)
- {
- Tablespace_client tsman(0, c_tsman,
- fragPtr.p->fragTableId,
- fragPtr.p->fragmentId,
- fragPtr.p->m_tablespace_id);
-
- tsman.unmap_page(&key);
- }
+ tsman.unmap_page(&key, idx);
+ pagePtr.p->list_index = idx | 0x8000;
}
- pagePtr.p->list_index = i | 0x8000;
+
+ ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
+ LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
+ list.remove(pagePtr);
}
void
@@ -873,7 +882,7 @@
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
-
+
tsman.update_page_free_bits(key, new_bits, lsn);
}
}
@@ -883,6 +892,9 @@
Tablerec *tabPtrP, Fragrecord * fragPtrP,
Local_key* key, PagePtr pagePtr, Uint32 gci)
{
+ if (DBG_DISK)
+ ndbout << " disk_page_free " << *key << endl;
+
Uint32 page_idx= key->m_page_idx;
Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
@@ -895,6 +907,7 @@
{
sz = 1;
const Uint32 *src= ((Fix_page*)pagePtr.p)->get_ptr(page_idx, 0);
+ ndbassert(* (src + 1) != Tup_fixsize_page::FREE_RECORD);
lsn= disk_page_undo_free(pagePtr.p, key,
src, tabPtrP->m_offsets[DD].m_fix_header_size,
gci, logfile_group_id);
@@ -921,59 +934,40 @@
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
-
+
tsman.update_page_free_bits(key, new_bits, lsn);
}
Uint32 ext = pagePtr.p->m_extent_info_ptr;
Uint32 used = pagePtr.p->uncommitted_used_space;
+ Uint32 old_idx = pagePtr.p->list_index;
ddassert(old_free >= used);
ddassert(new_free >= used);
ddassert(new_free >= old_free);
- page_idx = pagePtr.p->list_index;
- Uint32 old_idx = page_idx & 0x7FFF;
+ ddassert((old_idx & 0x8000) == 0);
+
Uint32 new_idx = alloc.calc_page_free_bits(new_free - used);
ddassert(alloc.calc_page_free_bits(old_free - used) == old_idx);
-
+
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, ext);
-
+
if (old_idx != new_idx)
{
ddassert(extentPtr.p->m_free_page_count[old_idx]);
extentPtr.p->m_free_page_count[old_idx]--;
extentPtr.p->m_free_page_count[new_idx]++;
- if (old_idx == page_idx)
- {
- ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
- LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
- LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
- old_list.remove(pagePtr);
- new_list.add(pagePtr);
- pagePtr.p->list_index = new_idx;
- }
- else
- {
- pagePtr.p->list_index = new_idx | 0x8000;
- }
+ ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
+ LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
+ LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
+ old_list.remove(pagePtr);
+ new_list.add(pagePtr);
+ pagePtr.p->list_index = new_idx;
}
extentPtr.p->m_free_space += sz;
- Uint32 old_pos = extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL)
- {
- Uint32 pos= alloc.calc_extent_pos(extentPtr.p);
-
- if (pos != old_pos)
- {
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
}
void
@@ -1065,22 +1059,9 @@
}
pagePtr.p->uncommitted_used_space = used - sz;
- extentPtr.p->m_free_space += sz;
- Uint32 old_pos = extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL)
- {
- Uint32 pos= alloc.calc_extent_pos(extentPtr.p);
-
- if (pos != old_pos)
- {
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= pos;
- }
- }
+ extentPtr.p->m_free_space += sz;
+ update_extent_pos(alloc, extentPtr);
}
Uint64
@@ -1160,109 +1141,17 @@
return lsn;
}
-int
-Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
- const Local_key* key, Uint32 pages)
-{
- TablerecPtr tabPtr;
- FragrecordPtr fragPtr;
- tabPtr.i = tableId;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- if (tabPtr.p->tableStatus == DEFINED)
- {
- getFragmentrec(fragPtr, fragId, tabPtr.p);
- if (!fragPtr.isNull())
- {
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
-
- Ptr<Extent_info> ext;
- ndbrequire(c_extent_pool.seize(ext));
-
- ext.p->m_key = *key;
- ndbout << "allocated " << pages << " pages: " <<
ext.p->m_key << endl;
- ext.p->m_first_page_no = ext.p->m_key.m_page_no;
- bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
- ext.p->m_free_space= alloc.m_page_free_bits_map[0] * pages;
- ext.p->m_free_page_count[0]= pages; // All pages are "free"-est
-
- if (alloc.m_curr_extent_info_ptr_i != RNIL)
- {
- Ptr<Extent_info> old;
- c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
- ndbassert(old.p->m_free_matrix_pos == RNIL);
- Uint32 pos= alloc.calc_extent_pos(old.p);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- new_list.add(old);
- old.p->m_free_matrix_pos= pos;
- }
-
- alloc.m_curr_extent_info_ptr_i = ext.i;
- ext.p->m_free_matrix_pos = RNIL;
- c_extent_hash.add(ext);
-
- LocalSLList<Extent_info, Extent_list_t>
- list1(c_extent_pool, alloc.m_extent_list);
- list1.add(ext);
- return 0;
- }
- }
-
- return -1;
-}
-
-void
-Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
- const Local_key*, Uint32 old_bits, Uint32 bits)
-{
- TablerecPtr tabPtr;
- FragrecordPtr fragPtr;
- tabPtr.i = tableId;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- getFragmentrec(fragPtr, fragId, tabPtr.p);
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
-
- Ptr<Extent_info> ext;
- c_extent_pool.getPtr(ext, alloc.m_curr_extent_info_ptr_i);
-
- Uint32 size= alloc.calc_page_free_space(bits);
- Uint32 old_size= alloc.calc_page_free_space(old_bits);
-
- if (bits != old_bits)
- {
- ndbassert(ext.p->m_free_page_count[old_bits] > 0);
- ndbassert(ext.p->m_free_space >= old_size);
-
- ext.p->m_free_page_count[bits]++;
- ext.p->m_free_page_count[old_bits]--;
-
- ext.p->m_free_space += size;
- ext.p->m_free_space -= old_size;
-
- Uint32 old_pos = ext.p->m_free_matrix_pos;
- if (old_pos != RNIL)
- {
- Uint32 pos= alloc.calc_extent_pos(ext.p);
-
- if (pos != old_pos)
- {
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- old_list.remove(ext);
- new_list.add(ext);
- ext.p->m_free_matrix_pos= pos;
- }
- }
- }
-}
-
#include <signaldata/LgmanContinueB.hpp>
static Dbtup::Apply_undo f_undo;
+#define DBG_UNDO 0
+
void
Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
Uint32 type, const Uint32 * ptr, Uint32 len)
{
+ f_undo_done = false;
f_undo.m_lsn= lsn;
f_undo.m_ptr= ptr;
f_undo.m_len= len;
@@ -1285,7 +1174,7 @@
Disk_undo::Alloc* rec= (Disk_undo::Alloc*)ptr;
preq.m_page.m_page_no = rec->m_page_no;
preq.m_page.m_file_no = rec->m_file_no_page_idx >> 16;
- preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
+ preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
break;
}
case File_formats::Undofile::UNDO_TUP_UPDATE:
@@ -1319,6 +1208,9 @@
disk_restart_undo_next(signal);
return;
}
+ case File_formats::Undofile::UNDO_END:
+ f_undo_done = true;
+ return;
default:
ndbrequire(false);
}
@@ -1327,7 +1219,7 @@
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::disk_restart_undo_callback);
- int flags = Page_cache_client::NO_HOOK;
+ int flags = 0;
int res= m_pgman.get_page(signal, preq, flags);
switch(res)
{
@@ -1372,150 +1264,93 @@
Uint32 page_id)
{
jamEntry();
- Ptr<GlobalPage> page;
- m_global_page_pool.getPtr(page, page_id);
-
- Page* pageP = (Page*)page.p;
+ Ptr<GlobalPage> gpage;
+ m_global_page_pool.getPtr(gpage, page_id);
+ Ptr<Page> pagePtr = *(Ptr<Page>*)&gpage;
+ Apply_undo* undo = &f_undo;
+
bool update = false;
- if (! (pageP->list_index & 0x8000) ||
- pageP->nextList != RNIL ||
- pageP->prevList != RNIL)
+ if (! (pagePtr.p->list_index & 0x8000) ||
+ pagePtr.p->nextList != RNIL ||
+ pagePtr.p->prevList != RNIL)
{
update = true;
- pageP->list_index |= 0x8000;
- pageP->nextList = pageP->prevList = RNIL;
+ pagePtr.p->list_index |= 0x8000;
+ pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
}
-
- Ptr<Tablerec> tabPtr;
- tabPtr.i= pageP->m_table_id;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- if (tabPtr.p->tableStatus != DEFINED)
+ Uint32 tableId= pagePtr.p->m_table_id;
+ Uint32 fragId = pagePtr.p->m_fragment_id;
+
+ if (tableId >= cnoOfTablerec)
{
disk_restart_undo_next(signal);
return;
}
-
- Ptr<Fragrecord> fragPtr;
- getFragmentrec(fragPtr, pageP->m_fragment_id, tabPtr.p);
- if(fragPtr.isNull())
+ undo->m_table_ptr.i = tableId;
+ ptrCheckGuard(undo->m_table_ptr, cnoOfTablerec, tablerec);
+
+ if (undo->m_table_ptr.p->tableStatus != DEFINED)
+ {
+ disk_restart_undo_next(signal);
+ return;
+ }
+
+ getFragmentrec(undo->m_fragment_ptr, fragId, undo->m_table_ptr.p);
+ if(undo->m_fragment_ptr.isNull())
{
disk_restart_undo_next(signal);
return;
}
-
- Local_key key;
- key.m_page_no = pageP->m_page_no;
- key.m_file_no = pageP->m_file_no;
- if (pageP->m_restart_seq != globalData.m_restart_seq)
+ if (undo->m_fragment_ptr.p->m_undo_complete)
{
- {
- Extent_info key;
- key.m_key.m_file_no = pageP->m_file_no;
- key.m_key.m_page_idx = pageP->m_extent_no;
- Ptr<Extent_info> extentPtr;
- if (c_extent_hash.find(extentPtr, key))
- {
- pageP->m_extent_info_ptr = extentPtr.i;
- }
- else
- {
- /**
- * Extent was not allocated at start of LCP
- * (or was freed during)
- * I.e page does not need to be undoed as it's
- * really free
- */
- disk_restart_undo_next(signal);
- return;
- }
- }
-
- update= true;
- pageP->m_restart_seq = globalData.m_restart_seq;
- pageP->uncommitted_used_space = 0;
-
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
- Uint32 idx= alloc.calc_page_free_bits(pageP->free_space);
-
- pageP->list_index = idx | 0x8000;
-
+ disk_restart_undo_next(signal);
+ return;
}
+ Local_key key;
+ key.m_page_no = pagePtr.p->m_page_no;
+ key.m_file_no = pagePtr.p->m_file_no;
+
Uint64 lsn = 0;
- lsn += pageP->m_page_header.m_page_lsn_hi; lsn <<= 32;
- lsn += pageP->m_page_header.m_page_lsn_lo;
+ lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
+ lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
- if (f_undo.m_lsn <= lsn)
+ if (undo->m_lsn <= lsn)
{
- Uint32 tableId= pageP->m_table_id;
- Uint32 fragId = pageP->m_fragment_id;
+ undo->m_page_ptr = pagePtr;
- f_undo.m_table_ptr.i= tableId;
- if (tableId < cnoOfTablerec)
- {
- ptrCheckGuard(f_undo.m_table_ptr, cnoOfTablerec, tablerec);
-
- if (f_undo.m_table_ptr.p->tableStatus == DEFINED)
- {
- getFragmentrec(f_undo.m_fragment_ptr, fragId, f_undo.m_table_ptr.p);
- if (!f_undo.m_fragment_ptr.isNull())
- {
- if (!f_undo.m_fragment_ptr.p->m_undo_complete)
- {
- f_undo.m_page_ptr.i = page_id;
- f_undo.m_page_ptr.p = pageP;
-
- update = true;
- ndbout_c("applying %lld", f_undo.m_lsn);
- /**
- * Apply undo record
- */
- switch(f_undo.m_type){
- case File_formats::Undofile::UNDO_TUP_ALLOC:
- disk_restart_undo_alloc(&f_undo);
- break;
- case File_formats::Undofile::UNDO_TUP_UPDATE:
- disk_restart_undo_update(&f_undo);
- break;
- case File_formats::Undofile::UNDO_TUP_FREE:
- disk_restart_undo_free(&f_undo);
- break;
- default:
- ndbrequire(false);
- }
-
- disk_restart_undo_page_bits(&f_undo);
-
- lsn = f_undo.m_lsn - 1; // make sure undo isn't run again...
- }
- else
- {
- ndbout_c("lsn %lld frag undo complete", f_undo.m_lsn);
- }
- }
- else
- {
- ndbout_c("lsn %lld table not defined", f_undo.m_lsn);
- }
- }
- else
- {
- ndbout_c("lsn %lld no such table", f_undo.m_lsn);
- }
- }
- else
- {
- ndbout_c("f_undo.m_lsn %lld > lsn %lld -> skip",
- f_undo.m_lsn, lsn);
- }
+ update = true;
+ if (DBG_UNDO)
+ ndbout_c("applying %lld", undo->m_lsn);
+ /**
+ * Apply undo record
+ */
+ switch(undo->m_type){
+ case File_formats::Undofile::UNDO_TUP_ALLOC:
+ disk_restart_undo_alloc(undo);
+ break;
+ case File_formats::Undofile::UNDO_TUP_UPDATE:
+ disk_restart_undo_update(undo);
+ break;
+ case File_formats::Undofile::UNDO_TUP_FREE:
+ disk_restart_undo_free(undo);
+ break;
+ default:
+ ndbrequire(false);
+ }
+
+ if (DBG_UNDO)
+ ndbout << "disk_restart_undo: " << undo->m_type << " "
+ << undo->m_key << endl;
- if (update)
- {
- m_pgman.update_lsn(f_undo.m_key, lsn);
- }
+ disk_restart_undo_page_bits(signal, undo);
+
+ lsn = undo->m_lsn - 1; // make sure undo isn't run again...
+
+ m_pgman.update_lsn(undo->m_key, lsn);
}
disk_restart_undo_next(signal);
@@ -1578,7 +1413,7 @@
}
void
-Dbtup::disk_restart_undo_page_bits(Apply_undo* undo)
+Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
{
Fragrecord* fragPtrP = undo->m_fragment_ptr.p;
Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
@@ -1587,46 +1422,86 @@
* Set alloc.m_curr_extent_info_ptr_i to
* current this extent (and move old extend into free matrix)
*/
- Ptr<Extent_info> extentPtr;
- c_extent_pool.getPtr(extentPtr, undo->m_page_ptr.p->m_extent_info_ptr);
+ Page* pageP = undo->m_page_ptr.p;
+ Uint32 free = pageP->free_space;
+ Uint32 new_bits = alloc.calc_page_free_bits(free);
+ pageP->list_index = 0x8000 | new_bits;
- Uint32 currExtI = alloc.m_curr_extent_info_ptr_i;
- if (extentPtr.i != currExtI && currExtI != RNIL)
- {
- Ptr<Extent_info> currExtPtr;
- c_extent_pool.getPtr(currExtPtr, currExtI);
- ndbrequire(currExtPtr.p->m_free_matrix_pos == RNIL);
-
- Uint32 pos= alloc.calc_extent_pos(currExtPtr.p);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- new_list.add(currExtPtr);
- currExtPtr.p->m_free_matrix_pos= pos;
- //ndbout_c("moving extent from %d to %d", old_pos, new_pos);
- }
+ Tablespace_client tsman(signal, c_tsman,
+ fragPtrP->fragTableId,
+ fragPtrP->fragmentId,
+ fragPtrP->m_tablespace_id);
- if (extentPtr.i != currExtI)
- {
- Uint32 old_pos = extentPtr.p->m_free_matrix_pos;
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- old_list.remove(extentPtr);
- alloc.m_curr_extent_info_ptr_i = extentPtr.i;
- extentPtr.p->m_free_matrix_pos = RNIL;
- }
- else
+ tsman.restart_undo_page_free_bits(&undo->m_key, new_bits, undo->m_lsn);
+}
+
+int
+Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
+ const Local_key* key, Uint32 pages)
+{
+ TablerecPtr tabPtr;
+ FragrecordPtr fragPtr;
+ tabPtr.i = tableId;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+ if (tabPtr.p->tableStatus == DEFINED)
{
- ndbrequire(extentPtr.p->m_free_matrix_pos == RNIL);
+ getFragmentrec(fragPtr, fragId, tabPtr.p);
+ if (!fragPtr.isNull())
+ {
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+
+ Ptr<Extent_info> ext;
+ ndbrequire(c_extent_pool.seize(ext));
+
+ ndbout << "allocated " << pages << " pages: " << *key
<< endl;
+
+ ext.p->m_key = *key;
+ ext.p->m_first_page_no = ext.p->m_key.m_page_no;
+ ext.p->m_free_space= 0;
+ bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
+
+ if (alloc.m_curr_extent_info_ptr_i != RNIL)
+ {
+ Ptr<Extent_info> old;
+ c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
+ ndbassert(old.p->m_free_matrix_pos == RNIL);
+ Uint32 pos= alloc.calc_extent_pos(old.p);
+ Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
+ new_list.add(old);
+ old.p->m_free_matrix_pos= pos;
+ }
+
+ alloc.m_curr_extent_info_ptr_i = ext.i;
+ ext.p->m_free_matrix_pos = RNIL;
+ c_extent_hash.add(ext);
+
+ LocalSLList<Extent_info, Extent_list_t>
+ list1(c_extent_pool, alloc.m_extent_list);
+ list1.add(ext);
+ return 0;
+ }
}
- /**
- * Compute and update free bits for this page
- */
- Uint32 free = undo->m_page_ptr.p->free_space;
- Uint32 bits = alloc.calc_page_free_bits(free);
-
- Tablespace_client tsman(0, c_tsman,
- fragPtrP->fragTableId,
- fragPtrP->fragmentId,
- fragPtrP->m_tablespace_id);
+ return -1;
+}
- tsman.restart_undo_page_free_bits(&undo->m_key, bits, undo->m_lsn);
+void
+Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
+ const Local_key*, Uint32 bits)
+{
+ TablerecPtr tabPtr;
+ FragrecordPtr fragPtr;
+ tabPtr.i = tableId;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+ getFragmentrec(fragPtr, fragId, tabPtr.p);
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+
+ Ptr<Extent_info> ext;
+ c_extent_pool.getPtr(ext, alloc.m_curr_extent_info_ptr_i);
+
+ Uint32 size= alloc.calc_page_free_space(bits);
+
+ ext.p->m_free_space += size;
+ ext.p->m_free_page_count[bits]++;
+ ndbassert(ext.p->m_free_matrix_pos == RNIL);
}
--- 1.1/storage/ndb/src/kernel/blocks/diskpage.hpp 2005-11-07 12:19:14 +01:00
+++ 1.2/storage/ndb/src/kernel/blocks/diskpage.hpp 2006-02-02 12:23:20 +01:00
@@ -222,7 +222,7 @@
for(; words; words--)
sum |= m_page_bitmask[words-1];
- if(sum & 0x7777)
+ if(sum & 0x3333)
return false;
return true;
--- 1.5/storage/ndb/src/kernel/blocks/lgman.cpp 2006-01-27 15:12:02 +01:00
+++ 1.6/storage/ndb/src/kernel/blocks/lgman.cpp 2006-02-02 12:23:20 +01:00
@@ -2887,6 +2887,9 @@
void
Lgman::execEND_LCP_CONF(Signal* signal)
{
+ Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
+ tup->disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
+
/**
* pgman has completed flushing all pages
*
--- 1.6/storage/ndb/src/kernel/blocks/pgman.cpp 2006-01-27 07:34:14 +01:00
+++ 1.7/storage/ndb/src/kernel/blocks/pgman.cpp 2006-02-02 12:23:20 +01:00
@@ -70,9 +70,10 @@
addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true);
addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF);
+ addRecSignal(GSN_LCP_PREPARE_REQ, &Pgman::execLCP_PREPARE_REQ);
addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD);
addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ);
-
+
// loop status
m_stats_loop_on = false;
m_busy_loop_on = false;
@@ -84,8 +85,6 @@
m_last_lcp_complete = 0;
m_lcp_curr_bucket = ~(Uint32)0;
m_lcp_outstanding = 0;
- m_lcp_copy_page = RNIL;
- m_lcp_copy_page_free = false;
// clean-up variables
m_cleanup_ptr.i = RNIL;
@@ -175,10 +174,6 @@
break;
case 3:
{
- Ptr<GlobalPage> page_ptr;
- ndbrequire(m_global_page_pool.seize(page_ptr));
- m_lcp_copy_page = page_ptr.i;
- m_lcp_copy_page_free = true;
// start forever loops
do_stats_loop(signal);
do_cleanup_loop(signal);
@@ -229,6 +224,23 @@
jam();
do_lcp_loop(signal);
break;
+ case PgmanContinueB::LCP_PREPARE:
+ {
+ jam();
+ Ptr<Page_entry> ptr;
+ Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
+ pl.getPtr(ptr, data1);
+ if (pl.next(ptr))
+ {
+ process_lcp_prepare(signal, ptr);
+ }
+ else
+ {
+ signal->theData[0] = 0;
+ sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
+ }
+ return;
+ }
default:
ndbrequire(false);
break;
@@ -242,8 +254,10 @@
m_file_no(file_no),
m_page_no(page_no),
m_real_page_i(RNIL),
+ m_copy_page_i(RNIL),
m_lsn(0),
m_last_lcp(0),
+ m_dirty_count(0),
m_busy_count(0),
m_requests()
{
@@ -252,7 +266,7 @@
// page lists
Uint32
-Pgman::get_sublist_no(Uint16 state)
+Pgman::get_sublist_no(Page_state state)
{
if (state == 0)
{
@@ -290,14 +304,14 @@
}
void
-Pgman::set_page_state(Ptr<Page_entry> ptr, Uint16 new_state)
+Pgman::set_page_state(Ptr<Page_entry> ptr, Page_state new_state)
{
#ifdef VM_TRACE
debugOut << "PGMAN: >set_page_state: state=" << hex << new_state
<< endl;
debugOut << "PGMAN: " << ptr << ": before" << endl;
#endif
- Uint16 old_state = ptr.p->m_state;
+ Page_state old_state = ptr.p->m_state;
if (old_state != new_state)
{
Uint32 old_list_no = get_sublist_no(old_state);
@@ -424,7 +438,7 @@
debugOut << "PGMAN: release_page_entry" << endl;
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(! (state & Page_entry::REQUEST));
ndbrequire(ptr.p->m_requests.isEmpty());
@@ -459,7 +473,7 @@
while (pl_stack.first(ptr)) // first is stack bottom
{
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
if (state & Page_entry::HOT)
{
jam();
@@ -514,7 +528,7 @@
Ptr<Page_entry> ptr;
bool ok = pl_stack.first(ptr);
ndbrequire(ok);
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
#ifdef VM_TRACE
debugOut << "PGMAN: " << ptr << ": pop from stack" << endl;
@@ -557,7 +571,7 @@
Page_stack& pl_stack = m_page_stack;
Page_queue& pl_queue = m_page_queue;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(! (state & Page_entry::LOCKED));
// even non-LIRS cache pages are counted on l.h.s.
@@ -811,7 +825,7 @@
// XXX busy loop
return false;
}
- Uint16 clean_state = clean_ptr.p->m_state;
+ Page_state clean_state = clean_ptr.p->m_state;
// under unusual circumstances it could still be paging in
if (! (clean_state & Page_entry::MAPPED) ||
clean_state & Page_entry::DIRTY ||
@@ -830,6 +844,7 @@
debugOut << "PGMAN: " << clean_ptr << " : evict" << endl;
#endif
+ ndbassert(clean_ptr.p->m_dirty_count == 0);
ndbrequire(clean_state & Page_entry::ONQUEUE);
ndbrequire(clean_state & Page_entry::BOUND);
ndbrequire(clean_state & Page_entry::MAPPED);
@@ -840,7 +855,6 @@
gptr.i = clean_ptr.p->m_real_page_i;
- c_tup->disk_page_unmap_callback(clean_ptr.p->m_real_page_i);
clean_ptr.p->m_real_page_i = RNIL;
clean_state &= ~ Page_entry::BOUND;
clean_state &= ~ Page_entry::MAPPED;
@@ -853,7 +867,7 @@
m_global_page_pool.getPtr(gptr);
}
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ptr.p->m_real_page_i = gptr.i;
state |= Page_entry::BOUND;
@@ -952,7 +966,7 @@
debugOut << "PGMAN: " << ptr << " : process_callback" << endl;
#endif
int max_count = 1;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
while (! ptr.p->m_requests.isEmpty() && --max_count >= 0)
{
@@ -981,19 +995,19 @@
{
jam();
state |= Page_entry::DIRTY;
+ ndbassert(ptr.p->m_dirty_count);
+ ptr.p->m_dirty_count --;
}
}
ndbrequire(state & Page_entry::BOUND);
ndbrequire(state & Page_entry::MAPPED);
-
+
// callback may re-enter PGMAN and change page state
set_page_state(ptr, state);
b->execute(signal, callback, ptr.p->m_real_page_i);
state = ptr.p->m_state;
-
- state &= ~ Page_entry::NO_HOOK;
}
-
+
if (ptr.p->m_requests.isEmpty())
{
jam();
@@ -1040,7 +1054,7 @@
Ptr<Page_entry> ptr = m_cleanup_ptr;
while (max_loop_count != 0 && max_count != 0)
{
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(! (state & Page_entry::LOCKED));
if (state & Page_entry::BUSY)
{
@@ -1057,6 +1071,8 @@
#ifdef VM_TRACE
debugOut << "PGMAN: " << ptr << " : process_cleanup" <<
endl;
#endif
+ c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
+ ptr.p->m_dirty_count);
pageout(signal, ptr);
max_count--;
}
@@ -1090,6 +1106,91 @@
// LCP
void
+Pgman::execLCP_PREPARE_REQ(Signal* signal)
+{
+ jamEntry();
+
+ /**
+ * Reserve pages for all LOCKED pages...
+ */
+ Ptr<Page_entry> ptr;
+ Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
+ if (pl.first(ptr))
+ {
+ process_lcp_prepare(signal, ptr);
+ }
+ else
+ {
+ signal->theData[0] = 0;
+ sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
+ }
+}
+
+void
+Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
+{
+ ndbrequire(ptr.p->m_copy_page_i == RNIL);
+
+ Ptr<GlobalPage> copy;
+ ndbrequire(m_global_page_pool.seize(copy));
+ ptr.p->m_copy_page_i = copy.i;
+
+ DBG_LCP("assigning copy page to " << ptr << endl);
+
+ signal->theData[0] = PgmanContinueB::LCP_PREPARE;
+ signal->theData[1] = ptr.i;
+ sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 2, JBB);
+}
+
+int
+Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
+{
+ DBG_LCP(<< ptr << " create_copy_page ");
+
+ if (! (req_flags & DIRTY_FLAGS) && ! (ptr.p->m_state &
Page_entry::COPY))
+ {
+ DBG_LCP(" return original" << endl);
+ return ptr.p->m_real_page_i;
+ }
+ if (! (ptr.p->m_state & Page_entry::COPY))
+ {
+ ptr.p->m_state |= Page_entry::COPY;
+
+ Ptr<GlobalPage> src;
+ Ptr<GlobalPage> copy;
+ m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
+ m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
+ memcpy(copy.p, src.p, sizeof(GlobalPage));
+ DBG_LCP("making copy... ");
+ }
+ DBG_LCP("return " << ptr.p->m_copy_page_i);
+ return ptr.p->m_copy_page_i;
+}
+
+void
+Pgman::restore_copy_page(Ptr<Page_entry> ptr)
+{
+ DBG_LCP(ptr << " restore_copy_page");
+
+ Uint32 copyPtrI = ptr.p->m_copy_page_i;
+ if (ptr.p->m_state & Page_entry::COPY)
+ {
+ DBG_LCP(" copy back");
+ Ptr<GlobalPage> src;
+ Ptr<GlobalPage> copy;
+ m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
+ m_global_page_pool.getPtr(copy, copyPtrI);
+ memcpy(src.p, copy.p, sizeof(GlobalPage));
+ }
+
+ m_global_page_pool.release(copyPtrI);
+ DBG_LCP(endl);
+
+ ptr.p->m_state &= ~Page_entry::COPY;
+ ptr.p->m_copy_page_i = RNIL;
+}
+
+void
Pgman::execLCP_FRAG_ORD(Signal* signal)
{
LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
@@ -1098,16 +1199,15 @@
DBG_LCP("execLCP_FRAG_ORD" << endl);
ndbrequire(!m_lcp_outstanding);
- ndbrequire(m_lcp_copy_page_free);
m_lcp_curr_bucket = 0;
-
+
#ifdef VM_TRACE
debugOut
<< "PGMAN: execLCP_FRAG_ORD"
<< " this=" << m_last_lcp << " last_complete=" <<
m_last_lcp_complete
<< " bucket=" << m_lcp_curr_bucket << endl;
#endif
-
+
do_lcp_loop(signal, true);
}
@@ -1168,7 +1268,7 @@
(loop ++ < 32 || iter.bucket == m_lcp_curr_bucket))
{
Ptr<Page_entry>& ptr = iter.curr;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
DBG_LCP("LCP "
<< " m_lcp_outstanding: " << m_lcp_outstanding
@@ -1190,28 +1290,7 @@
DBG_LCP(" BUSY" << endl);
break; // wait for it
}
- if (state & Page_entry::LOCKED)
- {
- /**
- * Special handling of LOCKED pages...only write 1 at a time...
- * using copy page (m_lcp_copy_page)
- */
- if (!m_lcp_copy_page_free)
- {
- DBG_LCP(" !m_lcp_copy_page_free" << endl);
- break;
- }
- m_lcp_copy_page_free = false;
- Ptr<GlobalPage> src, copy;
- m_global_page_pool.getPtr(copy, m_lcp_copy_page);
- m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
- memcpy(copy.p, src.p, sizeof(GlobalPage));
- ptr.p->m_real_page_i = copy.i;
- ptr.p->m_copy_real_page_i = src.i;
- ptr.p->m_state |= Page_entry::LCP;
- pageout(signal, ptr);
- }
- else if (state & Page_entry::PAGEOUT)
+ if (state & Page_entry::PAGEOUT)
{
DBG_LCP(" PAGEOUT -> state |= LCP" << endl);
set_page_state(ptr, state | Page_entry::LCP);
@@ -1220,18 +1299,25 @@
{
DBG_LCP(" pageout()" << endl);
ptr.p->m_state |= Page_entry::LCP;
+ c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
+ ptr.p->m_dirty_count);
pageout(signal, ptr);
}
ptr.p->m_last_lcp = m_last_lcp;
m_lcp_outstanding++;
}
+ else if (ptr.p->m_copy_page_i != RNIL)
+ {
+ DBG_LCP(" NOT DIRTY" << endl);
+ restore_copy_page(ptr);
+ }
else
{
DBG_LCP(" NOT DIRTY" << endl);
}
pl_hash.next(iter);
}
-
+
m_lcp_curr_bucket = (iter.curr.i != RNIL ? iter.bucket : ~(Uint32)0);
}
@@ -1278,17 +1364,10 @@
debugOut << "PGMAN: " << ptr << endl;
#endif
ndbrequire(ptr.p->m_state & Page_entry::PAGEIN);
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
- if (!(state & Page_entry::NO_HOOK) &&
- c_tup->disk_page_load_hook(ptr.p->m_real_page_i))
- {
- state |= Page_entry::DIRTY;
- }
-
state &= ~ Page_entry::PAGEIN;
state &= ~ Page_entry::EMPTY;
- state &= ~ Page_entry::NO_HOOK;
state |= Page_entry::MAPPED;
set_page_state(ptr, state);
@@ -1307,16 +1386,14 @@
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::BOUND);
ndbrequire(state & Page_entry::MAPPED);
ndbrequire(! (state & Page_entry::BUSY));
ndbrequire(! (state & Page_entry::PAGEOUT));
- state &= ~ Page_entry::NO_HOOK;
state |= Page_entry::PAGEOUT;
- c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i);
-
+
// update lsn on page prior to write
Ptr<GlobalPage> pagePtr;
m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i);
@@ -1355,7 +1432,7 @@
#endif
// it is OK to be "busy" at this point (the commit is queued)
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::PAGEOUT);
ndbrequire(state & Page_entry::LOGSYNC);
state &= ~ Page_entry::LOGSYNC;
@@ -1373,7 +1450,7 @@
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::PAGEOUT);
state &= ~ Page_entry::PAGEOUT;
@@ -1383,22 +1460,20 @@
ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--;
- if (state & Page_entry::LOCKED)
+ if (ptr.p->m_copy_page_i != RNIL)
{
jam();
- ndbrequire(!m_lcp_copy_page_free);
- m_lcp_copy_page_free = true;
- ptr.p->m_real_page_i = ptr.p->m_copy_real_page_i;
- ptr.p->m_copy_real_page_i = RNIL;
+ restore_copy_page(ptr);
+ state &= ~ Page_entry::COPY;
}
-
+
if (state & Page_entry::LCP)
{
ndbrequire(m_lcp_outstanding);
m_lcp_outstanding--;
+ state &= ~ Page_entry::LCP;
}
- state &= ~ Page_entry::LCP;
-
+
set_page_state(ptr, state);
do_busy_loop(signal, true);
}
@@ -1511,7 +1586,7 @@
//ndbrequire(ptr.p->m_requests.isEmpty());
}
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
bool is_new = (state == 0);
bool busy_count = false;
@@ -1545,8 +1620,21 @@
state = ptr.p->m_state;
}
+ const Page_state LOCKED = Page_entry::LOCKED | Page_entry::MAPPED;
+ if ((state & LOCKED) == LOCKED &&
+ ! (req_flags & Page_request::UNLOCK_PAGE))
+ {
+ ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0);
+ if (m_lcp_loop_on && ptr.p->m_copy_page_i != RNIL)
+ {
+ return create_copy_page(ptr, req_flags);
+ }
+
+ return ptr.p->m_real_page_i;
+ }
+
bool only_request = ptr.p->m_requests.isEmpty();
-
+
if (only_request &&
state & Page_entry::MAPPED)
{
@@ -1565,14 +1653,6 @@
ndbrequire(ptr.p->m_real_page_i != RNIL);
return ptr.p->m_real_page_i;
}
-
- if (state & Page_entry::LOCKED &&
- ! (req_flags & Page_request::UNLOCK_PAGE))
- {
- ndbrequire(ptr.p->m_copy_real_page_i != m_lcp_copy_page);
- ndbrequire(ptr.p->m_copy_real_page_i != RNIL);
- return ptr.p->m_copy_real_page_i;
- }
}
if (! (req_flags & Page_request::LOCK_PAGE))
@@ -1585,9 +1665,12 @@
{
LocalDLFifoList<Page_request>
req_list(m_page_request_pool, ptr.p->m_requests);
- req_list.seize(req_ptr);
+ if (! (req_flags & Page_request::ALLOC_REQ))
+ req_list.seize(req_ptr);
+ else
+ req_list.seizeFront(req_ptr);
}
-
+
if (req_ptr.i == RNIL)
{
if (is_new)
@@ -1607,19 +1690,15 @@
state |= Page_entry::EMPTY;
}
- if (req_flags & Page_request::NO_HOOK)
- {
- state |= Page_entry::NO_HOOK;
- }
-
if (req_flags & Page_request::UNLOCK_PAGE)
{
state &= ~ Page_entry::LOCKED;
}
ptr.p->m_busy_count += busy_count;
+ ptr.p->m_dirty_count += !!(req_flags & DIRTY_FLAGS);
set_page_state(ptr, state);
-
+
do_busy_loop(signal, true);
#ifdef VM_TRACE
@@ -1638,7 +1717,7 @@
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ptr.p->m_lsn = lsn;
if (state & Page_entry::BUSY)
@@ -1745,7 +1824,7 @@
Page_stack& pl_stack = m_page_stack;
Page_queue& pl_queue = m_page_queue;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
if (! (state & (Page_entry::PAGEIN | Page_entry::PAGEOUT)))
{
ndbrequire(state & Page_entry::BOUND);
@@ -1768,7 +1847,6 @@
if (ptr.p->m_real_page_i != RNIL)
{
jam();
- c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i);
release_cache_page(ptr.p->m_real_page_i);
ptr.p->m_real_page_i = RNIL;
}
@@ -1790,7 +1868,7 @@
Pgman::verify_page_entry(Ptr<Page_entry> ptr)
{
Uint32 ptrI = ptr.i;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
bool has_req = state & Page_entry::REQUEST;
bool has_req2 = ! ptr.p->m_requests.isEmpty();
@@ -1879,7 +1957,7 @@
{
verify_page_entry(iter.curr);
- Uint16 state = iter.curr.p->m_state;
+ Page_state state = iter.curr.p->m_state;
if (state & Page_entry::ONSTACK)
stack_count++;
if (state & Page_entry::ONQUEUE)
@@ -1906,7 +1984,7 @@
{
ndbrequire(i1 != ptr.i);
i1 = ptr.i;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::ONSTACK || dump_page_lists());
if (! pl_stack.hasPrev(ptr))
ndbrequire(state & Page_entry::HOT || dump_page_lists());
@@ -1920,7 +1998,7 @@
{
ndbrequire(i2 != ptr.i);
i2 = ptr.i;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::ONQUEUE || dump_page_lists());
ndbrequire(state & Page_entry::BOUND || dump_page_lists());
cold_bound_count++;
@@ -2082,8 +2160,6 @@
out << " flags=" << hex << pr.m_flags;
out << "," << dec << (pr.m_flags & Pgman::Page_request::OP_MASK);
{
- if (pr.m_flags & Pgman::Page_request::STRICT_ORDER)
- out << ",strict_order";
if (pr.m_flags & Pgman::Page_request::LOCK_PAGE)
out << ",lock_page";
if (pr.m_flags & Pgman::Page_request::EMPTY_PAGE)
@@ -2094,8 +2170,6 @@
out << ",commit_req";
if (pr.m_flags & Pgman::Page_request::DIRTY_REQ)
out << ",dirty_req";
- if (pr.m_flags & Pgman::Page_request::NO_HOOK)
- out << ",no_hook";
if (pr.m_flags & Pgman::Page_request::CORR_REQ)
out << ",corr_req";
}
@@ -2132,8 +2206,6 @@
out << ",pageout";
if (pe.m_state & Pgman::Page_entry::LOGSYNC)
out << ",logsync";
- if (pe.m_state & Pgman::Page_entry::NO_HOOK)
- out << ",no_hook";
if (pe.m_state & Pgman::Page_entry::LCP)
out << ",lcp";
if (pe.m_state & Pgman::Page_entry::HOT)
@@ -2237,10 +2309,12 @@
if (pl_hash.find(ptr, key))
{
ndbout << "pageout " << ptr << endl;
+ c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
+ ptr.p->m_dirty_count);
pageout(signal, ptr);
}
}
-
+
if (signal->theData[0] == 11003)
{
--- 1.3/storage/ndb/src/kernel/blocks/pgman.hpp 2006-01-11 09:26:03 +01:00
+++ 1.4/storage/ndb/src/kernel/blocks/pgman.hpp 2006-02-02 12:23:20 +01:00
@@ -249,15 +249,13 @@
struct Page_request {
enum Flags {
OP_MASK = 0x000F // 4 bits for TUP operation
- ,STRICT_ORDER = 0x0010 // maintain request order
,LOCK_PAGE = 0x0020 // lock page in memory
,EMPTY_PAGE = 0x0040 // empty (new) page
,ALLOC_REQ = 0x0080 // part of alloc
,COMMIT_REQ = 0x0100 // part of commit
,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn
- ,NO_HOOK = 0x0400 // dont run load hook
- ,UNLOCK_PAGE = 0x0800
- ,CORR_REQ = 0x1000 // correlated request (no LIRS update)
+ ,UNLOCK_PAGE = 0x0400
+ ,CORR_REQ = 0x0800 // correlated request (no LIRS update)
};
Uint16 m_block;
@@ -286,6 +284,8 @@
Uint32 prevList;
};
+ typedef Uint16 Page_state;
+
struct Page_entry : Page_entry_stack_ptr,
Page_entry_queue_ptr,
Page_entry_sublist_ptr {
@@ -305,7 +305,7 @@
,PAGEIN = 0x0100 // paging in
,PAGEOUT = 0x0200 // paging out
,LOGSYNC = 0x0400 // undo WAL as part of pageout
- ,NO_HOOK = 0x0800 // don't run load hook
+ ,COPY = 0x0800 // Copy page for LCP
,LCP = 0x1000 // page is LCP flushed
,HOT = 0x2000 // page is hot
,ONSTACK = 0x4000 // page is on LIRS stack
@@ -324,26 +324,26 @@
,SUBLIST_COUNT = 8
};
- Uint16 m_state; // flags (0 for new entry)
-
- Uint16 m_file_no; // disk page address set at seize
+ Uint16 m_file_no; // disk page address set at seize
+ Page_state m_state; // flags (0 for new entry)
+
Uint32 m_page_no;
Uint32 m_real_page_i;
- Uint32 m_copy_real_page_i; // used for flushing LOCKED pages
-
Uint64 m_lsn;
- Uint32 m_last_lcp;
+ Uint32 m_last_lcp;
+ Uint32 m_dirty_count;
+ Uint32 m_copy_page_i;
union {
Uint32 m_busy_count; // non-zero means BUSY
Uint32 nextPool;
};
DLFifoList<Page_request>::Head m_requests;
-
+
Uint32 nextHash;
Uint32 prevHash;
-
+
Uint32 hashValue() const { return m_file_no << 16 | m_page_no; }
bool equal(const Page_entry& obj) const {
return
@@ -374,8 +374,6 @@
Uint32 m_last_lcp_complete;
Uint32 m_lcp_curr_bucket;
Uint32 m_lcp_outstanding; // remaining i/o waits
- Uint32 m_lcp_copy_page;
- bool m_lcp_copy_page_free;
EndLcpReq m_end_lcp_req;
// clean-up variables
@@ -421,9 +419,10 @@
void execREAD_CONFIG_REQ(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execLCP_PREPARE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*);
-
+
void execFSREADCONF(Signal*);
void execFSREADREF(Signal*);
void execFSWRITECONF(Signal*);
@@ -432,8 +431,8 @@
void execDUMP_STATE_ORD(Signal* signal);
private:
- static Uint32 get_sublist_no(Uint16 state);
- void set_page_state(Ptr<Page_entry> ptr, Uint16 new_state);
+ static Uint32 get_sublist_no(Page_state state);
+ void set_page_state(Ptr<Page_entry> ptr, Page_state new_state);
bool seize_cache_page(Ptr<GlobalPage>& gptr);
void release_cache_page(Uint32 i);
@@ -463,7 +462,10 @@
void move_cleanup_ptr(Ptr<Page_entry> ptr);
bool process_lcp(Signal*);
-
+ void process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr);
+ int create_copy_page(Ptr<Page_entry>, Uint32 req_flags);
+ void restore_copy_page(Ptr<Page_entry>);
+
void pagein(Signal*, Ptr<Page_entry>);
void fsreadreq(Signal*, Ptr<Page_entry>);
void fsreadconf(Signal*, Ptr<Page_entry>);
@@ -510,13 +512,11 @@
Ptr<GlobalPage> m_ptr; // TODO remove
enum RequestFlags {
- STRICT_ORDER = Pgman::Page_request::STRICT_ORDER
- ,LOCK_PAGE = Pgman::Page_request::LOCK_PAGE
+ LOCK_PAGE = Pgman::Page_request::LOCK_PAGE
,EMPTY_PAGE = Pgman::Page_request::EMPTY_PAGE
,ALLOC_REQ = Pgman::Page_request::ALLOC_REQ
,COMMIT_REQ = Pgman::Page_request::COMMIT_REQ
,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ
- ,NO_HOOK = Pgman::Page_request::NO_HOOK
,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE
,CORR_REQ = Pgman::Page_request::CORR_REQ
};
--- 1.5/storage/ndb/src/kernel/blocks/tsman.cpp 2006-01-27 07:34:14 +01:00
+++ 1.6/storage/ndb/src/kernel/blocks/tsman.cpp 2006-02-02 12:23:20 +01:00
@@ -32,6 +32,12 @@
#define JONAS 0
+#define COMMITTED_MASK ((1 << 0) | (1 << 1))
+#define UNCOMMITTED_MASK ((1 << 2) | (1 << 3))
+#define UNCOMMITTED_SHIFT 2
+
+#define DBG_UNDO 0
+
Tsman::Tsman(const Configuration & conf, class Pgman* pg, class Lgman* lg) :
SimulatedBlock(TSMAN, conf),
m_file_hash(m_file_pool),
@@ -41,6 +47,10 @@
m_lgman(lg)
{
BLOCK_CONSTRUCTOR(Tsman);
+
+ Uint32 SZ = File_formats::Datafile::EXTENT_HEADER_BITMASK_BITS_PER_PAGE;
+ ndbrequire((COMMITTED_MASK & UNCOMMITTED_MASK) == 0);
+ ndbrequire((COMMITTED_MASK | UNCOMMITTED_MASK) == ((1 << SZ) - 1));
// Add received signals
addRecSignal(GSN_STTOR, &Tsman::execSTTOR);
@@ -548,7 +558,7 @@
safe_cast(&Tsman::release_extent_pages_callback);
int page_id;
- int flags = Page_cache_client::UNLOCK_PAGE | Page_cache_client::NO_HOOK;
+ int flags = Page_cache_client::UNLOCK_PAGE;
if((page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
execute(signal, preq.m_callback, page_id);
@@ -1039,7 +1049,7 @@
safe_cast(&Tsman::load_extent_page_callback);
int page_id;
- int flags = Page_cache_client::LOCK_PAGE | Page_cache_client::NO_HOOK;
+ int flags = Page_cache_client::LOCK_PAGE;
if((page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
load_extent_page_callback(signal, ptr.i, (Uint32)page_id);
@@ -1202,7 +1212,7 @@
Uint32 extent_no = extents - j - 1;
File_formats::Datafile::Extent_header* header=
page->get_header(extent_no, size);
- if(header->m_table == RNIL)
+ if (header->m_table == RNIL)
{
header->m_next_free_extent = firstFree;
firstFree = page_no * per_page + extent_no;
@@ -1221,9 +1231,9 @@
ptr.p->m_online.m_used_extent_cnt++;
for(Uint32 i = 0; i<size; i++, key.m_page_no++)
{
- Uint32 bits= header->get_free_bits(i) & ~(1 << (SZ - 1));
- header->update_free_bits(i, bits);
- tup->disk_restart_page_bits(tableId, fragmentId, &key, 0, bits);
+ Uint32 bits= header->get_free_bits(i) & COMMITTED_MASK;
+ header->update_free_bits(i, bits | (bits << UNCOMMITTED_SHIFT));
+ tup->disk_restart_page_bits(tableId, fragmentId, &key, bits);
}
}
else
@@ -1550,7 +1560,9 @@
int
Tsman::update_page_free_bits(Signal* signal,
- Local_key *key, unsigned bit, Uint64 lsn)
+ Local_key *key,
+ unsigned committed_bits,
+ Uint64 lsn)
{
jamEntry();
@@ -1600,20 +1612,12 @@
/**
* Toggle word
*/
- bit |= header->get_free_bits(page_no_in_extent) & (1 << (SZ - 1));
- header->update_free_bits(page_no_in_extent, bit);
+ ndbassert((committed_bits & ~(COMMITTED_MASK)) == 0);
+ Uint32 src = header->get_free_bits(page_no_in_extent) & UNCOMMITTED_MASK;
+ header->update_free_bits(page_no_in_extent, src | committed_bits);
-#ifdef VM_TRACE
- if(! (bit & ((1 << (SZ - 1)) - 1)) &&
header->check_free(eh_words))
- {
- ndbout_c("Extent is now free!!");
- }
-#endif
-
- /**
- * Update lsn
- */
m_page_cache_client.update_lsn(preq.m_page, lsn);
+
return 0;
}
@@ -1621,7 +1625,9 @@
}
int
-Tsman::get_page_free_bits(Signal* signal, Local_key *key, unsigned* bits)
+Tsman::get_page_free_bits(Signal* signal, Local_key *key,
+ unsigned* uncommitted,
+ unsigned* committed)
{
jamEntry();
@@ -1664,7 +1670,9 @@
ndbrequire(header->m_table != RNIL);
Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
- *bits = header->get_free_bits(page_no_in_extent);
+ Uint32 bits = header->get_free_bits(page_no_in_extent);
+ *uncommitted = (bits & UNCOMMITTED_MASK) >> UNCOMMITTED_SHIFT;
+ *committed = (bits & COMMITTED_MASK);
return 0;
}
@@ -1672,7 +1680,7 @@
}
int
-Tsman::unmap_page(Signal* signal, Local_key *key)
+Tsman::unmap_page(Signal* signal, Local_key *key, Uint32 uncommitted_bits)
{
jamEntry();
@@ -1704,7 +1712,7 @@
/**
* Handling of unmapped extent header pages is not implemented
*/
- int flags = Page_cache_client::DIRTY_REQ;
+ int flags = 0;
int real_page_id;
if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
@@ -1722,13 +1730,106 @@
/**
* Toggle word
*/
- Uint32 old = header->get_free_bits(page_no_in_extent);
- unsigned bit =
- (header->get_free_bits(page_no_in_extent) & ((1 << (SZ - 1)) - 1));
- header->update_free_bits(page_no_in_extent, bit);
- if (JONAS)
- ndbout_c("toggle page: (%d, %d, %d) from %x to %x",
- key->m_page_no, extent, page_no_in_extent, old, bit);
+ ndbassert(((uncommitted_bits << UNCOMMITTED_SHIFT) & ~UNCOMMITTED_MASK) ==
0);
+ Uint32 src = header->get_free_bits(page_no_in_extent) & COMMITTED_MASK;
+ header->update_free_bits(page_no_in_extent,
+ src | (uncommitted_bits << UNCOMMITTED_SHIFT));
+ }
+
+ return AllocExtentReq::UnmappedExtentPageIsNotImplemented;
+}
+
+int
+Tsman::restart_undo_page_free_bits(Signal* signal,
+ Uint32 tableId,
+ Uint32 fragId,
+ Local_key *key,
+ unsigned bits,
+ Uint64 undo_lsn)
+{
+ jamEntry();
+
+ /**
+ * 1) Compute which extent_no key belongs to
+ * 2) Find out which page extent_no belongs to
+ * 3) Undo log m_page_bitmask
+ * 4) Update m_page_bitmask
+ */
+ Ptr<Datafile> file_ptr;
+ Datafile file_key;
+ file_key.m_file_no = key->m_file_no;
+ ndbrequire(m_file_hash.find(file_ptr, file_key));
+
+ Uint32 size = file_ptr.p->m_extent_size;
+ Uint32 data_off = file_ptr.p->m_online.m_offset_data_pages;
+ Uint32 eh_words = File_formats::Datafile::extent_header_words(size);
+ Uint32 per_page = File_formats::Datafile::EXTENT_PAGE_WORDS/eh_words;
+ Uint32 SZ= File_formats::Datafile::EXTENT_HEADER_BITMASK_BITS_PER_PAGE;
+
+ Uint32 extent = (key->m_page_no - data_off) / size + per_page;
+ Uint32 page_no = extent / per_page;
+ Uint32 extent_no = extent % per_page;
+
+ Page_cache_client::Request preq;
+ preq.m_page.m_page_no = page_no;
+ preq.m_page.m_file_no = key->m_file_no;
+
+ /**
+ * Handling of unmapped extent header pages is not implemented
+ */
+ int flags = 0;
+ int real_page_id;
+ if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
+ {
+ GlobalPage* ptr_p = m_page_cache_client.m_ptr.p;
+
+ File_formats::Datafile::Extent_page* page =
+ (File_formats::Datafile::Extent_page*)ptr_p;
+ File_formats::Datafile::Extent_header* header =
+ page->get_header(extent_no, size);
+
+ if (header->m_table == RNIL)
+ {
+ if (DBG_UNDO)
+ ndbout_c("tsman: apply undo - skip table == RNIL");
+ return 0;
+ }
+
+ ndbrequire(header->m_table == tableId);
+ ndbrequire(header->m_fragment_id == fragId);
+
+ Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
+ Uint32 src = header->get_free_bits(page_no_in_extent);
+
+ Uint64 lsn = 0;
+ lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
+ lsn += page->m_page_header.m_page_lsn_lo;
+
+ if (undo_lsn <= lsn)
+ {
+ /**
+ * Toggle word
+ */
+ if (DBG_UNDO)
+ ndbout_c("tsman: apply %lld(%lld) %x -> %x",
+ undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
+
+ lsn = undo_lsn;
+ page->m_page_header.m_page_lsn_hi = lsn >> 32;
+ page->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
+ ndbassert((bits & ~(COMMITTED_MASK)) == 0);
+ header->update_free_bits(page_no_in_extent,
+ bits | (bits << UNCOMMITTED_SHIFT));
+
+ m_page_cache_client.update_lsn(preq.m_page, lsn);
+ }
+ else
+ {
+ if (DBG_UNDO)
+ ndbout_c("tsman: apply %lld(%lld) %x -> %x",
+ undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
+ }
+
return 0;
}
@@ -1773,7 +1874,7 @@
/**
* Handling of unmapped extent header pages is not implemented
*/
- int flags = Page_cache_client::DIRTY_REQ;
+ int flags = 0;
int real_page_id;
Uint32 page_no;
Uint32 src_bits;
@@ -1799,6 +1900,8 @@
* 3 = 11 - full - less than pct_free% free, pct_free=10%
*/
+ Uint32 reqbits = req.bits << UNCOMMITTED_SHIFT;
+
/**
* Search
*/
@@ -1806,7 +1909,7 @@
for(page_no= page_no_in_extent; page_no<size; page_no++)
{
src_bits= (* src >> shift) & ((1 << SZ) - 1);
- if(src_bits <= req.bits)
+ if((src_bits & UNCOMMITTED_MASK) <= reqbits)
{
goto found;
}
@@ -1820,7 +1923,7 @@
for(page_no= 0; page_no<page_no_in_extent; page_no++)
{
src_bits= (* src >> shift) & ((1 << SZ) - 1);
- if(src_bits <= req.bits)
+ if((src_bits & UNCOMMITTED_MASK) <= reqbits)
{
goto found;
}
@@ -1844,97 +1947,11 @@
return;
found:
- if (JONAS)
- ndbout_c("alloc page: (%d, %d, %d)",
- data_off + extent * size + page_no, per_page + extent, page_no);
- src_bits |= (1 << (SZ - 1)); // high unlogged, allocated bit
- header->update_free_bits(page_no, src_bits);
- rep->bits= src_bits & ((1 << (SZ - 1)) - 1);
+ header->update_free_bits(page_no, src_bits | UNCOMMITTED_MASK);
+ rep->bits= (src_bits & UNCOMMITTED_MASK) >> UNCOMMITTED_SHIFT;
rep->key.m_page_no= data_off + extent * size + page_no;
rep->reply.errorCode= 0;
return;
-}
-
-int
-Tsman::restart_undo_page_free_bits(Signal* signal,
- Local_key *key, unsigned bit,
- Uint64 undo_lsn)
-{
- jamEntry();
-
- /**
- * 1) Compute which extent_no key belongs to
- * 2) Find out which page extent_no belongs to
- * 3) Undo log m_page_bitmask
- * 4) Update m_page_bitmask
- */
- Ptr<Datafile> file_ptr;
- Datafile file_key;
- file_key.m_file_no = key->m_file_no;
- ndbrequire(m_file_hash.find(file_ptr, file_key));
-
- Uint32 size = file_ptr.p->m_extent_size;
- Uint32 data_off = file_ptr.p->m_online.m_offset_data_pages;
- Uint32 eh_words = File_formats::Datafile::extent_header_words(size);
- Uint32 per_page = File_formats::Datafile::EXTENT_PAGE_WORDS/eh_words;
- Uint32 SZ= File_formats::Datafile::EXTENT_HEADER_BITMASK_BITS_PER_PAGE;
-
- Uint32 extent = (key->m_page_no - data_off) / size + per_page;
- Uint32 page_no = extent / per_page;
- Uint32 extent_no = extent % per_page;
-
- Page_cache_client::Request preq;
- preq.m_page.m_page_no = page_no;
- preq.m_page.m_file_no = key->m_file_no;
-
- /**
- * Handling of unmapped extent header pages is not implemented
- */
- int flags = Page_cache_client::COMMIT_REQ;
- int real_page_id;
- if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
- {
- GlobalPage* ptr_p = m_page_cache_client.m_ptr.p;
-
- File_formats::Datafile::Extent_page* page =
- (File_formats::Datafile::Extent_page*)ptr_p;
-
- Uint64 lsn = 0;
- lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
- lsn += page->m_page_header.m_page_lsn_lo;
-
- if (undo_lsn <= lsn)
- {
- File_formats::Datafile::Extent_header* header =
- page->get_header(extent_no, size);
-
- Uint32 tableId = header->m_table;
- Uint32 fragmentId = header->m_fragment_id;
- ndbrequire(tableId != RNIL);
- ndbrequire(fragmentId != RNIL);
-
- Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
-
- Uint32 old_bits = header->get_free_bits(page_no_in_extent);
- if (old_bits != bit)
- {
- ndbout << "tsman toggle " << *key << " from " << old_bits
<< " to "
- << bit << endl;
- Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
- header->update_free_bits(page_no_in_extent, bit);
- tup->disk_restart_page_bits(tableId, fragmentId, key, old_bits, bit);
- }
- lsn--; // prevent UNDO from being run again...
- }
- else
- {
- ndbout_c("tsman skipping undo %lld %lld", undo_lsn, lsn);
- }
-
- m_page_cache_client.update_lsn(preq.m_page, lsn);
- return 0;
- }
- return AllocExtentReq::UnmappedExtentPageIsNotImplemented;
}
void
--- 1.2/storage/ndb/src/kernel/blocks/tsman.hpp 2006-01-11 09:26:03 +01:00
+++ 1.3/storage/ndb/src/kernel/blocks/tsman.hpp 2006-02-02 12:23:20 +01:00
@@ -195,11 +195,13 @@
void load_extent_page_callback(Signal*, Uint32, Uint32);
void create_file_ref(Signal*, Ptr<Tablespace>, Ptr<Datafile>,
Uint32,Uint32,Uint32);
- int update_page_free_bits(Signal*, Local_key*, unsigned bits, Uint64 lsn);
- int get_page_free_bits(Signal*, Local_key*, unsigned* bits);
- int unmap_page(Signal*, Local_key*);
- int restart_undo_page_free_bits(Signal*, Local_key*, unsigned, Uint64);
-
+ int update_page_free_bits(Signal*, Local_key*, unsigned committed_bits,
+ Uint64 lsn);
+ int get_page_free_bits(Signal*, Local_key*, unsigned*, unsigned*);
+ int unmap_page(Signal*, Local_key*, unsigned uncommitted_bits);
+ int restart_undo_page_free_bits(Signal*, Uint32, Uint32, Local_key*,
+ unsigned committed_bits, Uint64 lsn);
+
int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key);
int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits);
@@ -266,22 +268,23 @@
* Update page free bits
*/
int update_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
-
+
/**
* Get page free bits
*/
- int get_page_free_bits(Local_key*, unsigned* bits);
-
+ int get_page_free_bits(Local_key*,
+ unsigned* uncommitted, unsigned* committed);
+
/**
* Update unlogged page free bit
*/
- int unmap_page(Local_key*);
-
+ int unmap_page(Local_key*, Uint32 bits);
+
/**
* Undo handling of page bits
*/
int restart_undo_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
-
+
/**
* Get tablespace info
*
@@ -353,32 +356,40 @@
inline
int
Tablespace_client::update_page_free_bits(Local_key *key,
- unsigned bits, Uint64 lsn)
+ unsigned committed_bits,
+ Uint64 lsn)
{
- return m_tsman->update_page_free_bits(m_signal, key, bits, lsn);
+ return m_tsman->update_page_free_bits(m_signal, key, committed_bits, lsn);
}
inline
int
-Tablespace_client::get_page_free_bits(Local_key *key, unsigned* bits)
+Tablespace_client::get_page_free_bits(Local_key *key,
+ unsigned* uncommited,
+ unsigned* commited)
{
- return m_tsman->get_page_free_bits(m_signal, key, bits);
+ return m_tsman->get_page_free_bits(m_signal, key, uncommited, commited);
}
inline
int
-Tablespace_client::unmap_page(Local_key *key)
+Tablespace_client::unmap_page(Local_key *key, unsigned uncommitted_bits)
{
- return m_tsman->unmap_page(m_signal, key);
+ return m_tsman->unmap_page(m_signal, key, uncommitted_bits);
}
inline
int
Tablespace_client::restart_undo_page_free_bits(Local_key* key,
- unsigned bits, Uint64 lsn)
+ unsigned committed_bits,
+ Uint64 lsn)
{
return m_tsman->restart_undo_page_free_bits(m_signal,
- key, bits, lsn);
+ m_table_id,
+ m_fragment_id,
+ key,
+ committed_bits,
+ lsn);
}
--- 1.43/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2006-01-11 09:26:03 +01:00
+++ 1.44/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2006-02-02 12:23:19 +01:00
@@ -1009,8 +1009,6 @@
LCP_SR_STARTED = 10,
LCP_SR_COMPLETED = 11
};
- LcpLocRecord m_acc;
- LcpLocRecord m_tup;
LcpState lcpState;
bool firstFragmentFlag;
@@ -1028,6 +1026,7 @@
bool reportEmpty;
NdbNodeBitmask m_EMPTY_LCP_REQ;
+ Uint32 m_error;
Uint32 m_outstanding;
}; // Size 76 bytes
typedef Ptr<LcpRecord> LcpRecordPtr;
@@ -2248,10 +2247,6 @@
bool checkLcpStarted(Signal* signal);
void checkLcpTupprep(Signal* signal);
void getNextFragForLcp(Signal* signal);
- void initLcpLocAcc(Signal* signal, Uint32 fragId);
- void initLcpLocTup(Signal* signal, Uint32 fragId);
- void releaseLocalLcps(Signal* signal);
- void seizeLcpLoc(Signal* signal);
void sendAccContOp(Signal* signal);
void sendStartLcp(Signal* signal);
void setLogTail(Signal* signal, Uint32 keepGci);
@@ -2283,7 +2278,6 @@
void checkNewMbyte(Signal* signal);
void checkReadExecSr(Signal* signal);
void checkScanTcCompleted(Signal* signal);
- void checkSrCompleted(Signal* signal);
void closeFile(Signal* signal, LogFileRecordPtr logFilePtr, Uint32 place);
void completedLogPage(Signal* signal, Uint32 clpType, Uint32 place);
void deleteFragrec(Uint32 fragId);
@@ -2302,7 +2296,6 @@
void initialiseFragrec(Signal* signal);
void initialiseGcprec(Signal* signal);
void initialiseLcpRec(Signal* signal);
- void initialiseLcpLocrec(Signal* signal);
void initialiseLfo(Signal* signal);
void initialiseLogFile(Signal* signal);
void initialiseLogPage(Signal* signal);
@@ -2346,7 +2339,6 @@
void releaseActiveCopy(Signal* signal);
void releaseAddfragrec(Signal* signal);
void releaseFragrec();
- void releaseLcpLoc(Signal* signal);
void releaseOprec(Signal* signal);
void releasePageRef(Signal* signal);
void releaseMmPages(Signal* signal);
--- 1.87/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2006-01-17 08:37:32 +01:00
+++ 1.88/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2006-02-02 12:23:19 +01:00
@@ -10923,9 +10923,38 @@
tabptr.i = ref->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
- lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_COMPLETED;
- contChkpNextFragLab(signal);
+ ndbrequire(lcpPtr.p->m_outstanding);
+ lcpPtr.p->m_outstanding--;
+
+ /**
+ * Only BACKUP is allowed to ref LCP_PREPARE
+ */
+ ndbrequire(refToBlock(signal->getSendersBlockRef()) == BACKUP);
+ lcpPtr.p->m_error = ref->errorCode;
+
+ if (lcpPtr.p->m_outstanding == 0)
+ {
+ jam();
+
+ if(lcpPtr.p->firstFragmentFlag)
+ {
+ jam();
+ LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
+ lcpPtr.p->firstFragmentFlag= false;
+ *ord = lcpPtr.p->currentFragment.lcpFragOrd;
+ EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
+ jamEntry();
+
+ /**
+ * First fragment mean that last LCP is complete :-)
+ */
+ EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
+ jamEntry();
+ }
+
+ lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
+ contChkpNextFragLab(signal);
+ }
}
/* --------------------------------------------------------------------------
@@ -10945,72 +10974,86 @@
fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
c_fragment_pool.getPtr(fragptr);
-
- ndbrequire(conf->tableId == fragptr.p->tabRef);
- ndbrequire(conf->fragmentId == fragptr.p->fragId);
-
- lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::WAIT_LCPHOLDOP;
- lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP;
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::HOLDOP_READY;
+ if (refToBlock(signal->getSendersBlockRef()) != PGMAN)
+ {
+ ndbrequire(conf->tableId == fragptr.p->tabRef);
+ ndbrequire(conf->fragmentId == fragptr.p->fragId);
+ }
- /* ----------------------------------------------------------------------
- * UPDATE THE MAX_GCI_IN_LCP AND MAX_GCI_COMPLETED_IN_LCP NOW BEFORE
- * ACTIVATING THE FRAGMENT AGAIN.
- * --------------------------------------------------------------------- */
- ndbrequire(lcpPtr.p->currentFragment.lcpFragOrd.lcpNo < MAX_LCP_STORED);
- fragptr.p->maxGciInLcp = fragptr.p->newestGci;