List:Commits« Previous MessageNext Message »
From:Vladislav Vaintroub Date:April 14 2009 5:25pm
Subject:bzr commit into mysql-6.0-falcon-team branch (vvaintroub:3122)
Bug#37056
View as plain text  
#At file:///G:/bzr/mysql-6.0-falcon-team/ based on revid:harrison@hotspur-20090414143638-twnbdsfrnswum1c0

 3122 Vladislav Vaintroub	2009-04-14 [merge]
      Bug#37056: 
      here is the patch that causes crashes . Provided for documentation purposes

    modified:
      storage/falcon/IndexPage.cpp
      storage/falcon/IndexPage.h
      storage/falcon/IndexRootPage.cpp
      storage/falcon/IndexRootPage.h
=== modified file 'storage/falcon/IndexPage.cpp'
--- a/storage/falcon/IndexPage.cpp	2009-04-02 22:36:19 +0000
+++ b/storage/falcon/IndexPage.cpp	2009-04-14 17:24:52 +0000
@@ -314,14 +314,15 @@ int IndexPage::computePrefix(IndexKey *k
 	return n;
 }
 
-Bdb* IndexPage::splitIndexPageMiddle(Dbb * dbb, Bdb *bdb, IndexKey *splitKey, TransId transId)
+Bdb* IndexPage::splitIndexPageMiddle(Dbb * dbb, Bdb *bdb, IndexKey *insertKey,
+	int recordNumber, TransId transId)
 {
 	if (dbb->debug & (DEBUG_PAGES | DEBUG_SPLIT_PAGE))
 		printPage (this, 0, false);
 
 	// Fudge page end to handle the all-duplicates problem
 
-	UCHAR *key = splitKey->key;
+	UCHAR key[MAX_PHYSICAL_KEY_LENGTH];
 	Btn *pageEnd = (Btn*) ((char*) this + dbb->pageSize - sizeof(Btn) - 256);
 	Btn *midpoint = (Btn*) ((UCHAR*) nodes + 
 					  (length - OFFSET (IndexPage*, nodes)) / 2);
@@ -366,9 +367,9 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 
 	// If we didn't find a break, use the last one.  This may be the first node
 
-	int kLength = splitKey->keyLength = node.offset + node.length;
+	int kLength = node.offset + node.length;
 
-	// Allocate and format new page.  Link forward and backward to siblings.
+	// Allocate and format new page.
 
 	Bdb *splitBdb = splitPage(dbb, bdb, transId);
 	IndexPage *split = (IndexPage*) splitBdb->buffer;
@@ -376,9 +377,9 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 	/* Copy midpoint node to new page.  Compute length, then
 	   copy tail of page to new page */
 
-	int32 recordNumber = node.getNumber();
+	int32 splitRecordNumber = node.getNumber();
 	IndexNode newNode;
-	newNode.insert(split->nodes, 0, kLength, key, recordNumber);
+	newNode.insert(split->nodes, 0, kLength, key, splitRecordNumber);
 	int newNodeSize = IndexNode::nodeLength(0, kLength, recordNumber);
 
 	memcpy(newNode.nextNode, node.nextNode, tailLength);
@@ -394,9 +395,6 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 	split->length = (int) (((UCHAR*) newNode.nextNode + tailLength) - (UCHAR*) split);
 	//split->validate(this);
 
-	if (level == 0)
-		splitKey->appendRecordNumber(recordNumber);
-
 	// Split supernodes
 	memset(split->superNodes,0, sizeof(split->superNodes));
 	int i = 0;
@@ -424,6 +422,22 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 		printPage (splitBdb, false);
 		}
 
+	
+
+	IndexKey splitKey;
+	IndexNode splitNode(split);
+	splitKey.setKey(splitNode.keyLength(), splitNode.key);
+	if (split->level == 0)
+		{
+		int splitRecordNumber = splitNode.getNumber();
+		splitKey.appendRecordNumber(splitRecordNumber);
+		}
+
+	
+	AddNodeResult res = addNode(dbb, insertKey, recordNumber);
+	if (res == NextPage)
+		res = split->addNode(dbb, insertKey, recordNumber);
+	ASSERT(res == NodeAdded || res == Duplicate);
 	return splitBdb;
 }
 
@@ -1065,7 +1079,8 @@ bool IndexPage::isLastNode(Btn *indexNod
 	return (char*) node.nextNode == (char*) this + length;
 }
 
-Bdb* IndexPage::splitIndexPageEnd(Dbb *dbb, Bdb *bdb, TransId transId, IndexKey *insertKey, int recordNumber)
+Bdb* IndexPage::splitIndexPageEnd(Dbb *dbb, Bdb *bdb, IndexKey *insertKey,
+	int recordNumber, TransId transId)
 {
 	if (dbb->debug & (DEBUG_KEYS | DEBUG_SPLIT_PAGE))
 		Btn::printKey("insert key", insertKey, 0, false);

=== modified file 'storage/falcon/IndexPage.h'
--- a/storage/falcon/IndexPage.h	2009-04-02 22:36:19 +0000
+++ b/storage/falcon/IndexPage.h	2009-04-14 17:24:52 +0000
@@ -48,8 +48,8 @@ public:
 	Btn*		findInsertionPoint (IndexKey *indexKey, int32 recordNumber, IndexKey *expandedKey);
 	Btn*		findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* from, Btn* bucketEnd);
 	Bdb*		splitPage (Dbb *dbb, Bdb *bdb, TransId transId);
-	Bdb*		splitIndexPageEnd(Dbb *dbb, Bdb *bdb, TransId transId, IndexKey *insertKey, int recordNumber);
-	Bdb*		splitIndexPageMiddle(Dbb *dbb, Bdb *bdb, IndexKey *splitKey, TransId transId);
+	Bdb*		splitIndexPageEnd(Dbb *dbb, Bdb *bdb, IndexKey *insertKey, int recordNumber, TransId transId);
+	Bdb*		splitIndexPageMiddle(Dbb *dbb, Bdb *bdb, IndexKey *insertKey, int recordNumber, TransId transId);
 	bool		isLastNode (Btn *node);
 
 	void		analyze (int pageNumber);

=== modified file 'storage/falcon/IndexRootPage.cpp'
--- a/storage/falcon/IndexRootPage.cpp	2009-04-09 07:06:45 +0000
+++ b/storage/falcon/IndexRootPage.cpp	2009-04-14 17:24:52 +0000
@@ -120,99 +120,65 @@ bool IndexRootPage::addIndexEntry(Dbb * 
 		
 	// Multiple threads may attempt to update the same index. If necessary, make several attempts.
 	
-	for (int n = 0; n < 10; ++n)
-		{
-		/* Find insert page and position on page */
-
-		bool isRoot;
-		Bdb *bdb = findInsertionLeaf(dbb, indexId, &searchKey, recordNumber, transId,&isRoot);
-
-		if (!bdb)
-			return false;
-
-		IndexPage *page;
-
-		for (;;)
-			{
-			page = (IndexPage*) bdb->buffer;
-			Btn *node = page->findNodeInLeaf(key, NULL);
-			Btn *bucketEnd = (Btn*) ((char*) page + page->length);
 
-			if (node < bucketEnd || page->nextPage == 0)
-				break;
+	/* Find insert page and position on page */
 
-			ASSERT (bdb->pageNumber != page->nextPage);
-			bdb = dbb->handoffPage (bdb, page->nextPage, PAGE_btree, Exclusive);
-			isRoot = false;
-			BDB_HISTORY(bdb);
-			}
+	bool isRoot;
+	Bdb *bdb = findInsertionLeaf(dbb, indexId, &searchKey, recordNumber, transId,&isRoot);
 
-		bdb->mark(transId);
-
-		/* If the node fits on page, we're done */
-
-		AddNodeResult result;
+	if (!bdb)
+		return false;
 
-		for (;;)
-			{
-			result = page->addNode(dbb, key, recordNumber);
+	IndexPage *page;
 
-			if (result != NextPage)
-				break;
+	for (;;)
+		{
+		page = (IndexPage*) bdb->buffer;
+		Btn *node = page->findNodeInLeaf(key, NULL);
+		Btn *bucketEnd = (Btn*) ((char*) page + page->length);
 
-			bdb = dbb->handoffPage(bdb, page->nextPage, PAGE_btree, Exclusive);
-			isRoot = false;
-			BDB_HISTORY(bdb);
-			bdb->mark(transId);
-			page = (IndexPage*) bdb->buffer;
-			}
+		if (node < bucketEnd || page->nextPage == 0)
+			break;
 
-		if (result == NodeAdded || result == Duplicate)
-			{
-			if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
-				page->printPage (bdb, false);
-				
-			//page->validateInsertion (length, key, recordNumber);
-			bdb->release(REL_HISTORY);
+		ASSERT (bdb->pageNumber != page->nextPage);
+		bdb = dbb->handoffPage (bdb, page->nextPage, PAGE_btree, Exclusive);
+		isRoot = false;
+		BDB_HISTORY(bdb);
+		}
 
-			return true;
-			}
+	bdb->mark(transId);
 
-		/* Node didn't fit.  Split the page and propagate the
-		   split upward.  Sooner or later we'll go back and re-try
-		   the original insertion */
+	/* If the node fits on page, we're done */
 
-		if (splitIndexPage (dbb, indexId, bdb, transId, result, key, recordNumber, isRoot))
-			return true;
+	AddNodeResult result;
 
-#ifdef _DEBUG
-		if (n)
-			{
-			++dbb->debug;
-			Btn::printKey ("Key", key, 0, false);
-			Btn::printKey ("SearchKey", &searchKey, 0, false);
-			Bdb *bdb = findInsertionLeaf (dbb, indexId, &searchKey, recordNumber, transId);
-			IndexPage *page = (IndexPage*) bdb->buffer;
+	for (;;)
+		{
+		result = page->addNode(dbb, key, recordNumber);
 
-			while (page->nextPage)
-				{
-				bdb = dbb->handoffPage (bdb, page->nextPage, PAGE_btree, Exclusive);
-				BDB_HISTORY(bdb);
-				page = (IndexPage*) bdb->buffer;
-				page->printPage(bdb, false);
-				}
+		if (result != NextPage)
+			break;
 
-			bdb->release(REL_HISTORY);
-			--dbb->debug;
-			}
-#endif
+		bdb = dbb->handoffPage(bdb, page->nextPage, PAGE_btree, Exclusive);
+		isRoot = false;
+		BDB_HISTORY(bdb);
+		bdb->mark(transId);
+		page = (IndexPage*) bdb->buffer;
 		}
 
+	if (result == NodeAdded || result == Duplicate)
+		{
+		if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
+			page->printPage (bdb, false);
+			
+		//page->validateInsertion (length, key, recordNumber);
+		bdb->release(REL_HISTORY);
+		return true;
+		}
 
-	ASSERT (false);
-	FATAL ("index split failed");
-
-	return false;
+	// Node didn't fit. Split the page and propagate the split upwards.
+	splitIndexPage (dbb, indexId, bdb, transId, result, key, recordNumber, isRoot);
+	return true;
 }
 
 Bdb* IndexRootPage::findLeaf(Dbb *dbb, int32 indexId, int32 rootPage, IndexKey *indexKey, LockType lockType, TransId transId)
@@ -482,7 +448,7 @@ void IndexRootPage::scanIndex  (Dbb *dbb
 		}
 }
 
-bool IndexRootPage::splitIndexPage(Dbb * dbb, int32 indexId, Bdb * bdb, TransId transId,
+void IndexRootPage::splitIndexPage(Dbb * dbb, int32 indexId, Bdb * bdb, TransId transId,
 								   AddNodeResult addResult, IndexKey *indexKey, int recordNumber, bool isRoot)
 {
 
@@ -491,9 +457,10 @@ bool IndexRootPage::splitIndexPage(Dbb *
 	Bdb *splitBdb;
 	
 	if (addResult == SplitEnd)
-		splitBdb = page->splitIndexPageEnd (dbb, bdb, transId, indexKey, recordNumber);
+		splitBdb = page->splitIndexPageEnd (dbb, bdb,indexKey, recordNumber, transId);
 	else
-		splitBdb = page->splitIndexPageMiddle (dbb, bdb, &splitKey, transId);
+		splitBdb = page->splitIndexPageMiddle (dbb, bdb, indexKey, recordNumber, transId);
+		
 
 	IndexPage *splitPage = (IndexPage*) splitBdb->buffer;
 	IndexNode splitNode(splitPage);
@@ -523,8 +490,7 @@ bool IndexRootPage::splitIndexPage(Dbb *
 		if (leftPage->level == 0)
 			leftKey.appendRecordNumber(0);
 			
-		// Reinitialize the parent page
-		
+		// Reinitialize the parent page	
 		page->level = splitPage->level + 1;
 		page->version = splitPage->version;
 		page->nextPage = 0;
@@ -555,8 +521,7 @@ bool IndexRootPage::splitIndexPage(Dbb *
 		splitBdb->release(REL_HISTORY);
 		leftBdb->release(REL_HISTORY);
 		bdb->release(REL_HISTORY);
-		
-		return false;
+		return;
 		}
 
 	
@@ -571,40 +536,34 @@ bool IndexRootPage::splitIndexPage(Dbb *
 
 	// We need to insert the first key of the newly created parent page
 	// into the parent page.
-	for (;;)
-		{
-		Bdb *rootBdb = findRoot(dbb, indexId, 0, Exclusive, transId);
-		int rootPageNumber = rootBdb->pageNumber;
-		BDB_HISTORY(rootBdb);
-
-		// Find parent page
-		Bdb *parentBdb = 
-			IndexPage::findLevel(dbb, indexId, rootBdb, splitPageLevel + 1, &splitKey, splitRecordNumber);
-		BDB_HISTORY(parentBdb);
-		parentBdb->mark(transId);
-		IndexPage *parentPage = (IndexPage*) parentBdb->buffer;
-		AddNodeResult result = parentPage->addNode(dbb, &splitKey, splitPageNumber);
-
-		if (result == NodeAdded || result == Duplicate)
-			{
-			// Node added to parent page
-			// Log parent page.
-			if (result == NodeAdded)
-				{
-				IndexPage::logIndexPage(parentBdb,transId, indexKey->index);
-				}
-
-			parentBdb->release(REL_HISTORY);
-			return false;
-			}
 
-		// Parent page needs to be split.Recurse
-		ASSERT(result == SplitMiddle || result == SplitEnd || result == NextPage);
+	Bdb *rootBdb = findRoot(dbb, indexId, 0, Exclusive, transId);
+	int rootPageNumber = rootBdb->pageNumber;
+	BDB_HISTORY(rootBdb);
+
+	Bdb *parentBdb = 
+		IndexPage::findLevel(dbb, indexId, rootBdb, splitPageLevel + 1, &splitKey, splitRecordNumber);
+	BDB_HISTORY(parentBdb);
+	parentBdb->mark(transId);
+	IndexPage *parentPage = (IndexPage*) parentBdb->buffer;
+	AddNodeResult result = parentPage->addNode(dbb, &splitKey, splitPageNumber);
+
+	if (result == NodeAdded || result == Duplicate)
+		{
+		// Node added to parent page
+		// Log parent page.
+		if (result == NodeAdded)
+			IndexPage::logIndexPage(parentBdb,transId, indexKey->index);
 
-		if (splitIndexPage (dbb, indexId, parentBdb, transId, result, &splitKey,
-				splitPageNumber, (parentBdb->pageNumber == rootPageNumber)))
-			return true;
+		parentBdb->release(REL_HISTORY);
+		return;
 		}
+
+	// Parent page needs to be split.Recurse
+	ASSERT(result == SplitMiddle || result == SplitEnd || result == NextPage);
+
+	splitIndexPage (dbb, indexId, parentBdb, transId, result, &splitKey,
+			splitPageNumber, (parentBdb->pageNumber == rootPageNumber));
 }
 
 bool IndexRootPage::deleteIndexEntry(Dbb * dbb, int32 indexId, IndexKey *key, int32 recordNumber, TransId transId)

=== modified file 'storage/falcon/IndexRootPage.h'
--- a/storage/falcon/IndexRootPage.h	2009-03-02 18:36:32 +0000
+++ b/storage/falcon/IndexRootPage.h	2009-03-07 09:45:59 +0000
@@ -43,7 +43,7 @@ public:
 	static void		debugBucket (Dbb *dbb, int indexId, int recordNumber, TransId transactionId);
 	static void		deleteIndex (Dbb *dbb, int32 indexId, TransId transId);
 	static bool		deleteIndexEntry (Dbb *dbb, int32 indexId, IndexKey *key, int32 recordNumber, TransId transId);
-	static bool		splitIndexPage (Dbb *dbb, int32 indexId, Bdb *bdb, TransId transId,
+	static void		splitIndexPage (Dbb *dbb, int32 indexId, Bdb *bdb, TransId transId,
 									AddNodeResult addResult, IndexKey *indexKey, int recordNumber, bool isRootPage);
 	static void		scanIndex (Dbb *dbb, int32 indexId, int32 rootPage, IndexKey *low, IndexKey *high, int searchFlags, TransId transId, Bitmap *bitmap);
 	static void		positionIndex(Dbb* dbb, int indexId, int32 rootPage, WalkIndex* walkIndex);


Attachment: [text/bzr-bundle] bzr/vvaintroub@mysql.com-20090414172452-32nabzwe1tsogn8h.bundle
Thread
bzr commit into mysql-6.0-falcon-team branch (vvaintroub:3122)Bug#37056Vladislav Vaintroub14 Apr