List:Commits« Previous MessageNext Message »
From:vvaintroub Date:March 17 2008 6:34pm
Subject:bk commit into 6.0 tree (vvaintroub:1.2595) WL#3763
View as plain text  
Below is the list of changes that have just been committed into a local
6.0 repository of vvaintroub.  When vvaintroub does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-03-17 18:34:36+01:00, vvaintroub@wva. +6 -0
  WL#3763- Falcon supernodes
  Core supernodes are ready(update of supernode array, search, logging)

  storage/falcon/IndexPage.cpp@stripped, 2008-03-17 18:34:32+01:00, vvaintroub@wva. +457 -47
    WL#3763- Falcon supernodes
    Core supernodes are ready(update of supernode array,search, logging)

  storage/falcon/IndexPage.h@stripped, 2008-03-17 18:34:32+01:00, vvaintroub@wva. +10 -1
    WL#3763- Falcon supernodes
    Core supernodes are ready(update of supernode array,search, logging)

  storage/falcon/IndexRootPage.cpp@stripped, 2008-03-17 18:34:33+01:00, vvaintroub@wva. +21 -62
    Supernodes. 
    
    -redoIndexPage adopted to cope with new serial log records 
    (containing supernodes) and older ones (without them)
     
    Index page update code in indexMerge, that was identical to the 
    addNode has been moved to IndexPage into an extra function, 
    that is used from both merge and add.  

  storage/falcon/IndexRootPage.h@stripped, 2008-03-17 18:34:33+01:00, vvaintroub@wva. +1 -1
    redoIndexPage needs additional parameter haveSuperNodes, 
    because since now (srlVersion >=14) supernodes are also logged

  storage/falcon/SRLIndexPage.cpp@stripped, 2008-03-17 18:34:34+01:00, vvaintroub@wva. +5 -1
    Supernodes logging

  storage/falcon/SRLVersion.h@stripped, 2008-03-17 18:34:34+01:00, vvaintroub@wva. +2 -1
    Increase srlCurrentVersion because supernode array logging.

diff -Nrup a/storage/falcon/IndexPage.cpp b/storage/falcon/IndexPage.cpp
--- a/storage/falcon/IndexPage.cpp	2008-03-11 16:16:31 +01:00
+++ b/storage/falcon/IndexPage.cpp	2008-03-17 18:34:32 +01:00
@@ -47,18 +47,9 @@ static const char THIS_FILE[]=__FILE__;
 
 AddNodeResult IndexPage::addNode(Dbb *dbb, IndexKey *indexKey, int32 recordNumber)
 {
-	if (dbb->debug & (DEBUG_KEYS | DEBUG_NODES_ADDED | DEBUG_PAGE_LEVEL))
-		{
-		Log::debug("\n***** addNode(%d) - ", recordNumber);
-		Btn::printKey ("New key", indexKey, 0, false);
-		}
-		
-	if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
-		printPage (this, 0, false);
 
-	// Find the insertion point.  In the process, compute the prior key
 
-	//validate(NULL);
+	// Find the insertion point.  In the process, compute the prior key
 	UCHAR *key = indexKey->key;
 	IndexKey priorKey;
 
@@ -73,47 +64,70 @@ AddNodeResult IndexPage::addNode(Dbb *db
 	// We got the prior key value.  Get key value for insertion point
 
 	IndexKey nextKey (priorKey);
+	return addNode(dbb, indexKey, recordNumber, &node, &priorKey, &nextKey);
+}
+
+// Add node (with already known insertion point)
+AddNodeResult IndexPage::addNode (Dbb *dbb, IndexKey *indexKey, int32 recordNumber, IndexNode *node, IndexKey *priorKey, IndexKey *nextKey)
+{
+
+	UCHAR *key = indexKey->key;
+
+	if (dbb->debug & (DEBUG_KEYS | DEBUG_NODES_ADDED | DEBUG_PAGE_LEVEL))
+		{
+		Log::debug("\n***** addNode(%d) - ", recordNumber);
+		Btn::printKey ("New key", indexKey, 0, false);
+		}
+		
+	if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
+		printPage (this, 0, false);
+	//validate(NULL);
 
 	// Compute delta size of new node
+	int offset1 = computePrefix (priorKey, indexKey);
+
+	// Check whether inserted node shall be supernode. If yes, does not compress the prefix.
+	bool addSuper = checkAddSuperNode(dbb->pageSize, node, indexKey, recordNumber, offset1);
+	if (addSuper)
+		offset1 = 0;
 
-	int offset1 = computePrefix (&priorKey, indexKey);
 	int length1 = indexKey->keyLength - offset1;
 	int delta = IndexNode::nodeLength(offset1, length1, recordNumber);
 	int32 nextNumber;
 	int offset2;
 	
-	if ((UCHAR*) node.node == (UCHAR*) this + length)
+	if ((UCHAR*) node->node == (UCHAR*) this + length)
 		{
 		offset2 = -1;
 		nextNumber = 0;
 		}
 	else
 		{
-		node.expandKey (&nextKey);
-		offset2 = computePrefix(indexKey, &nextKey);
-		int deltaOffset = offset2 - node.offset;
+		node->expandKey (nextKey);
+		offset2 = computePrefix(indexKey, nextKey);
+		int deltaOffset = offset2 - node->offset;
 		
-		if (node.length >= 128 && (node.length - deltaOffset) < 128)
+		if (node->length >= 128 && (node->length - deltaOffset) < 128)
 			--delta;
 
-		if (node.offset < 128 && (node.offset + deltaOffset) >= 128)
+		if (node->offset < 128 && (node->offset + deltaOffset) >= 128)
 			++delta;
 			
 		delta -= deltaOffset;
-		nextNumber = node.getNumber();
+		nextNumber = node->getNumber();
 		}
 
 	// If new node doesn't fit on page, split the page now
 
 	if (length + delta > dbb->pageSize)
 		{
-		if ((char*) node.nextNode < (char*) this + length)
+		if ((char*) node->nextNode < (char*) this + length)
 			return SplitMiddle;
 
-		if (node.getNumber() == END_LEVEL)
+		if (node->getNumber() == END_LEVEL)
 			return SplitEnd;
 
-		if (offset2 == -1 || nextKey.compare(indexKey) >= 0)
+		if (offset2 == -1 || nextKey->compare(indexKey) >= 0)
 			return NextPage;
 
 		return SplitEnd;
@@ -124,34 +138,45 @@ AddNodeResult IndexPage::addNode(Dbb *db
 			index, recordNumber, (CHAR*) node - (CHAR*) page);
 	***/
 
+	// Check whether there was a supernode at the insertion point
+	// If yes, delete it fow now. Supernode will be reinstalled at another position later,
+	// unless it gets prefix-compressed.
+	bool wasSuper = deleteSupernode(node->node);
+
 	// Slide tail of bucket forward to make room
 
 	if (offset2 >= 0)
 		{
-		UCHAR *tail = (UCHAR*) node.nextNode;
+		UCHAR *tail = (UCHAR*) node->nextNode;
 		int tailLength = (int) ((UCHAR*) this + length - tail);
 		ASSERT (tailLength >= 0);
 		
 		if (tailLength > 0)
-			memmove (tail + delta, tail, tailLength);
+			moveMemory(tail + delta, tail);
 		}
 
 	// Insert new node
 
 	IndexNode newNode;
-	newNode.insert (node.node, offset1, length1, key, recordNumber);
+	newNode.insert (node->node, offset1, length1, key, recordNumber);
 
 	// If necessary, rebuild next node
+	if(wasSuper && (offset2 == 0))
+		addSupernode(newNode.nextNode);
 
 	if (offset2 >= 0)
-		newNode.insert (newNode.nextNode, offset2, nextKey.keyLength - offset2, nextKey.key, nextNumber);
+		newNode.insert (newNode.nextNode, offset2, nextKey->keyLength - offset2, nextKey->key, nextNumber);
+
+	if(addSuper)
+		addSupernode(node->node);
+
+
 
 	length += delta;
 	//validate(NULL);
 
 	if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
 		printPage (this, 0, false);
-
 	return NodeAdded;
 }
 
@@ -168,6 +193,8 @@ Btn* IndexPage::findNodeInLeaf(IndexKey 
 	UCHAR *key = indexKey->key;
 	UCHAR *keyEnd = key + indexKey->keyLength;
 	Btn *bucketEnd = getEnd();
+	Btn *super;
+
 
 	if (indexKey->keyLength == 0)
 		{
@@ -177,7 +204,9 @@ Btn* IndexPage::findNodeInLeaf(IndexKey 
 		return nodes;
 		}
 
-	IndexNode node (this);
+	super = findSupernode(level, indexKey, 0, 0);
+
+	IndexNode node(super);
 
 	for (;; node.getNext(bucketEnd))
 		{
@@ -240,6 +269,12 @@ Btn* IndexPage::findNodeInLeaf(IndexKey 
 	if (foundKey)
 		foundKey->keyLength = priorLength;
 
+	if (foundKey && foundKey->keyLength==0 && node.node == super && node.node != nodes)
+		{
+		/* hit on left supernode, find prior key*/
+		findPriorNodeForSupernode(node.node,foundKey);
+		}
+
 	return node.node;
 }
 
@@ -306,11 +341,13 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 		node = prevNode;
 
 	int tailLength = (int) (((UCHAR*) this + length) - (UCHAR*) node.nextNode);
-
+	
+	int delta = (int)(node.nextNode - nodes);
 	if (tailLength < 0)
 		{
 		node.parseNode(nodes);
 		tailLength = (int) (((UCHAR*) this + length) - (UCHAR*) node.nextNode);
+		delta = (int) (node.nextNode - nodes);
 		}
 
 	// If we didn't find a break, use the last one.  This may be the first node
@@ -328,6 +365,7 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 	int32 recordNumber = node.getNumber();
 	IndexNode newNode;
 	newNode.insert(split->nodes, 0, kLength, key, recordNumber);
+	int newNodeSize = IndexNode::nodeLength(0, kLength, recordNumber);
 
 	memcpy(newNode.nextNode, node.nextNode, tailLength);
 
@@ -344,7 +382,27 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 
 	if (level == 0)
 		splitKey->appendRecordNumber(recordNumber);
-		
+	
+
+
+	// Split supernodes
+	memset(split->superNodes,0, sizeof(split->superNodes));
+	int i = 0;
+	for (i = 0; nodes + superNodes[i] < node.node; i++) ;
+
+	int j = 0;
+	for (; i < SUPERNODES && superNodes[i]; i++)
+	{
+		short newVal = superNodes[i] - delta + newNodeSize;
+		ASSERT(newVal >=0);
+		if(newVal == 0)
+			continue;
+		split->superNodes[j++] = newVal;
+		superNodes[i] = 0;
+	}
+	//validate(NULL);
+	//split->validate(NULL);
+
 	if (dbb->debug & (DEBUG_PAGES | DEBUG_SPLIT_PAGE))
 		{
 		printPage (this, 0, false);
@@ -356,6 +414,7 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
 	return splitBdb;
 }
 
+
 Btn* IndexPage::findNodeInBranch(IndexKey *searchKey, int32 searchRecordNumber)
 {
 	ASSERT(level > 0);
@@ -366,11 +425,15 @@ Btn* IndexPage::findNodeInBranch(IndexKe
 		keyEnd -= searchKey->getAppendedRecordNumberLength();
 
 	Btn *bucketEnd = (Btn*) ((UCHAR*) this + length);
-	Btn *prior = nodes;
+
 	UCHAR *key = searchKey->key;
 
-	for (IndexNode node (this); node.node < bucketEnd; prior = node.node, node.getNext(bucketEnd))
+	Btn *super = findSupernode(level, searchKey, searchRecordNumber, 0);
+	Btn *prior = super;
+
+	for (IndexNode node(super) ; node.node < bucketEnd ; prior = node.node, node.getNext(bucketEnd))
 		{
+
 		if (node.offset < matchedOffset)
 			return prior;
 
@@ -418,7 +481,36 @@ void IndexPage::validate(void *before)
 {
 	Btn *bucketEnd = getEnd();
 	UCHAR key [MAX_PHYSICAL_KEY_LENGTH];
-	int keyLength = 0;
+	int priorKeyLength = 0;
+	int keyLength;
+	int recordNumber;
+	int priorRecordNumber=0;
+	int superIdx = 0;
+	int supernodeCount = 0;
+
+
+
+	for (int i=0; i< SUPERNODES; i++)
+	{
+		if(superNodes[i] ==0)
+			{
+			for(;i<SUPERNODES;i++)
+				if(superNodes[i] != 0)
+					FATAL("Index validate: unexpected non-zero supernode index");
+			break;
+			}
+		if (superNodes[i] < 0)
+			FATAL ("Index validate: negative supernode");
+
+		if (i> 0 && superNodes[i-1] >= superNodes[i])
+			FATAL ("Index validate: entries in supernode array not increasing");
+
+		if (nodes + superNodes[i] > bucketEnd)
+			FATAL ("Index validate: superNodes behind the end of page");
+
+		supernodeCount++;
+	}
+
 
 	for (IndexNode node (this);; node.getNext(bucketEnd))
 		{
@@ -435,19 +527,65 @@ void IndexPage::validate(void *before)
 				
 			break;
 			}
-			
-		int l = MIN(keyLength, node.keyLength());
-		int recordNumber = node.getNumber();
-		
-		if (recordNumber == END_BUCKET)
+
+		bool isSuper = false;
+		if(superIdx < SUPERNODES && superNodes[superIdx])
+		{
+			int delta = superNodes[superIdx] - (short) (node.node - nodes);
+			if(delta == 0)
+			{
+				superIdx++;
+				isSuper=true;
+				if(node.offset != 0)
+					FATAL ("Non-zero offset in supernode");
+			}
+			else if( delta < 0)
+				FATAL ("Index validate: next supernode offset < current offset");
+		}
+
+		keyLength = node.keyLength();
+		if(level > 0)
+			keyLength -= node.getAppendedRecordNumberLength();
+
+		if(isSuper && keyLength==0)
+			FATAL("Supernode with 0 key length");
+
+		int l = MIN(priorKeyLength, keyLength);
+		if (level == 0)
+			recordNumber = node.getNumber();
+		else
+			recordNumber = node.getAppendedRecordNumber();
+
+		if (node.number == END_BUCKET)
+			{
 			ASSERT(nextPage != 0);
+			break;
+			}
 
+		if (node.number == END_LEVEL)
+			break;
+
+		if  (node.node != nodes && l>0 && node.length > 0)
+			{
+			if(node.offset == 0 && node.key[0] == key[0] && !isSuper)
+				FATAL("node not correctly prefix-compressed");
+			}
+
+
+		if(node.length == 0 && recordNumber < priorRecordNumber)
+			FATAL ("Index validate:  record numbers out of order");
+
+
+		bool equalKeys = (priorKeyLength == keyLength);
 		for (UCHAR *p = node.key, *q = key + node.offset, *end = key + l; q < end; p++, q++)
 			{
 			if (*p > *q)		// current > previous, good
+				{
+				equalKeys = false;
 				break;
+				}
 			else if (*p == *q)	// current == previous, good so far
-				continue;     
+				continue;
 			else if (*p < *q)	// current < previous, bad
 				{
 				if (before)
@@ -457,9 +595,15 @@ void IndexPage::validate(void *before)
 				FATAL("Mal-formed index page");
 				}
 			}
-
-		keyLength = node.expandKey(key);
+			if( isSuper && equalKeys)
+				FATAL("supernode key equal to previous node");
+			node.expandKey(key);
+			priorKeyLength = keyLength;
+			priorRecordNumber = recordNumber;
 		}
+		
+	if (superIdx != supernodeCount)
+		FATAL("lost supernodes");
 }
 
 void IndexPage::printPage(Bdb * bdb, bool inversion)
@@ -579,6 +723,8 @@ int IndexPage::deleteNode(Dbb * dbb, Ind
 			{
 			IndexNode next (node.nextNode);
 			
+			deleteSupernode(node.node);
+
 			if (next.node >= bucketEnd)
 				length = (int) ((char*) node.node - (char*) this);
 			else
@@ -592,9 +738,20 @@ int IndexPage::deleteNode(Dbb * dbb, Ind
 				Btn *tail = next.nextNode;
 				int tailLength = (int) ((char*) bucketEnd - (char*) tail);
 
+
+
 				// Compute new prefix length; rebuild node
 
 				int prefix = computePrefix(&priorKey, &nextKey);
+				
+				// Check whether next node is a supernode.
+				// If yes, delete it for now. Supernode will be reinstalled later,
+				// if it does not get prefix-compressed.
+
+				Btn *nextSuper = 0;
+				if(deleteSupernode(node.nextNode) && (prefix == 0) && node.node != nodes)
+					nextSuper = node.node;
+
 				int32 num = next.getNumber();
 				node.insert(node.node, prefix, nextKey.keyLength - prefix, nextKey.key, num);
 
@@ -603,7 +760,10 @@ int IndexPage::deleteNode(Dbb * dbb, Ind
 				Btn *newTail = node.nextNode;
 				
 				if (tailLength > 0)
-					memmove(newTail, tail, tailLength);
+					moveMemory(newTail, tail);
+
+				if(nextSuper)
+					addSupernode(nextSuper);
 
 				length = (int) ((char*) newTail + tailLength - (char*) this);
 				//validate(NULL);
@@ -703,7 +863,7 @@ void IndexPage::printPage(IndexPage *pag
 				page->nextPage,
 				page->parentPage);
 
-	int length = (UCHAR*) node.node - (UCHAR*) page;
+	int length = (int)((UCHAR*) node.node - (UCHAR*) page);
 
 	if (length != page->length)
 		Log::debug ("**** bad bucket length -- should be %d ***\n", length);
@@ -972,8 +1132,11 @@ Bdb* IndexPage::splitIndexPageEnd(Dbb *d
 	int nodeOverhead = 3 + (offset >= 128) + (insertKey->keyLength - offset >= 128);
 	char *nextNextNode = (char*) node.nextNode - node.length + insertKey->keyLength - offset + nodeOverhead;
 
+
+	Btn *splitPos;
 	if (nextNextNode >= (char*) this + dbb->pageSize)
 		{
+		splitPos = priorNode;
 		node.parseNode(priorNode);
 		int number = node.getNumber();
 		//node.setNumber(END_BUCKET);
@@ -983,13 +1146,21 @@ Bdb* IndexPage::splitIndexPageEnd(Dbb *d
 		}
 	else
 		{
+		splitPos = node.node;
 		int offset = computePrefix(&tempKey, insertKey);
+		deleteSupernode(node.node);
 		node.insert(node.node, offset, insertKey->keyLength - offset, insertKey->key, END_BUCKET);
 		length = (int) ((char*)(node.nextNode) - (char*) this);
 		}
 
+	for(int i=0; i< SUPERNODES;i++)
+		if(nodes + superNodes[i] >= splitPos)
+			superNodes[i]=0;
+
+	//validate(NULL);
 	split->appendNode(insertKey, recordNumber, dbb->pageSize);
 	split->appendNode(&rolloverKey, rolloverNumber, dbb->pageSize);
+	//split->validate(NULL);
 	//printf ("splitIndexPage: level %d, %d -> %d\n", level, bdb->pageNumber, nextPage);
 	//validate( splitBdb->buffer);
 
@@ -1004,6 +1175,7 @@ Bdb* IndexPage::splitIndexPageEnd(Dbb *d
 	return splitBdb;
 }
 
+
 Btn* IndexPage::appendNode(IndexKey *newKey, int32 recordNumber, int pageSize)
 {
 	IndexKey key(newKey->index);
@@ -1085,21 +1257,37 @@ void IndexPage::logIndexPage(Bdb *bdb, T
 	
 	dbb->serialLog->logControl->indexPage.append(dbb, transId, INDEX_VERSION_1, bdb->pageNumber, indexPage->level, 
 												 indexPage->parentPage, indexPage->priorPage, indexPage->nextPage,  
-												 indexPage->length - OFFSET (IndexPage*, nodes), 
-												 (const UCHAR*) indexPage->nodes);
+												 indexPage->length - OFFSET (IndexPage*, superNodes), 
+												 (const UCHAR*) indexPage->superNodes);
 }
-
-Btn* IndexPage::findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* nodes, Btn* bucketEnd)
+ 
+Btn* IndexPage::findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* from, Btn* bucketEnd)
 {
 	UCHAR *key = indexKey->key;
 	const UCHAR *keyEnd = key + indexKey->keyLength;
-	IndexNode node(nodes);
 	uint matchedOffset = 0;
 	uint priorLength = 0;
 
 	if (level && indexKey->recordNumber)
 		keyEnd -= indexKey->getAppendedRecordNumberLength();
 
+
+	Btn *super = findSupernode(level, indexKey,(level > 0)?indexKey->recordNumber:recordNumber ,from);
+	IndexNode node;
+	if(super > from)
+		node.parseNode(super);
+	else
+		node.parseNode(from);
+	
+
+	if(node.node != nodes)
+	{
+		node.expandKey (expandedKey);
+		priorLength = expandedKey->keyLength;
+		if (level)
+			expandedKey->recordNumber = node.getAppendedRecordNumber();
+	}
+
 	if (node.offset)
 		{
 		while ((matchedOffset < node.offset) && 
@@ -1190,6 +1378,12 @@ Btn* IndexPage::findInsertionPoint(int l
 
 	expandedKey->keyLength = priorLength;
 
+	if (expandedKey && expandedKey->keyLength==0 && node.node == super && node.node != nodes)
+		{
+		/* hit on start supernode, find prior key */
+		findPriorNodeForSupernode(node.node,expandedKey);
+		}
+
 	return node.node;
 }
 
@@ -1197,3 +1391,219 @@ Btn* IndexPage::getEnd(void)
 {
 	return (Btn*) ((UCHAR*) this + length);
 }
+
+/* During node insertion, check whether supernode should be added at insertion point */
+bool IndexPage::checkAddSuperNode(int pageSize, IndexNode* node, IndexKey *indexKey, int recordNumber, int offset)
+{
+	Btn *insertionPoint = node->node;
+
+	if (insertionPoint == nodes) // no supernode at the start of the page
+		return false;
+
+	if (offset >= pageSize/SUPERNODES/2) //compression is too good, no super
+		return false;
+
+	if (superNodes[SUPERNODES-1]) // supernode array is full
+		return false;
+
+
+	int keyLen = indexKey->keyLength;
+
+	if(level)
+		keyLen -= indexKey->getAppendedRecordNumberLength();
+
+	if (keyLen == offset) // no supernode on duplicate entry (yet)
+		return false;
+
+	int offsetToEntry = (int)(insertionPoint - nodes);
+
+
+	int i;
+	// Find superNodes corresponding to insertion point
+	for(i = 0; i< SUPERNODES;i++)
+		{
+		if (!superNodes[i])
+			break;
+		if (nodes + superNodes[i] >= insertionPoint)
+			break;
+		}
+
+	// Avoid 2 superNodes adjacent to each other
+	if (nodes + superNodes[i] == insertionPoint)
+		return false;
+
+	int nodeLen =
+		IndexNode::nodeLength(offset, indexKey->keyLength - offset, recordNumber);
+	
+	/* Ideal distance between supernodes , assume page will get 90% full*/
+	int idealDistance = pageSize/(SUPERNODES+1)*9/10;
+
+	
+	int distLeft, distRight;
+
+	distLeft = offsetToEntry;
+
+	if (i > 0)
+		distLeft -= superNodes[i-1];
+
+	// last node, make supernode, if distance from prior >= idealDistance
+	if(superNodes[i] == 0)
+		{
+		if (distLeft >= idealDistance)
+			return true;
+		return false;
+		}
+
+	// between two supernodes, make new one , if distance 
+	// from both neighbours is at least idealSize*0.5
+
+	distRight = 
+		superNodes[i] + nodeLen - offsetToEntry; // from right super
+	ASSERT(distRight >=0 );
+
+	// Distance between existing supernodes
+	int d= (i > 0)? superNodes[i] - superNodes[i-1] + nodeLen :superNodes[0] + nodeLen;
+
+	if ( d >= idealDistance*3/2 && distLeft >= idealDistance/2 && distRight >= idealDistance/2 )
+		return true;
+
+	return false;
+}
+
+// Helper routine, used with findNodeInLeaf/Branch and findInsertionPoint
+Btn* 	IndexPage::findPriorNodeForSupernode(Btn *where,IndexKey *priorKey)
+{
+	Btn *prior = nodes;
+	bool found = false;
+
+
+	// Scan supernode array for prior super
+	for(int i=0; i < SUPERNODES && superNodes[i]; i++)
+		{
+		Btn* current = nodes + superNodes[i];
+		if(current == where)
+			{
+			found = true;
+			break;
+			}
+		prior = current;
+		}
+
+	ASSERT(found);
+
+	IndexNode priorNode(prior);
+	for (;; priorNode.getNext(where))
+		{
+		ASSERT(priorNode.nextNode <= where);
+		priorNode.expandKey(priorKey);
+		if (priorNode.nextNode == where)
+			return priorNode.node;
+		}
+
+	ASSERT(false);
+}
+
+
+// Move nodes, adjust supernode array
+void IndexPage::moveMemory(void *dst,void *src)
+{
+	UCHAR* end = (UCHAR *)getEnd();
+	size_t size = (dst > src)?(end - (UCHAR *)src) : (end-(UCHAR *)dst);
+
+	memmove(dst, src, size);
+
+	short delta = (short)((UCHAR *)dst - (UCHAR *)src);
+
+	for(int i=0; i< SUPERNODES && superNodes[i]; i++)
+		if(nodes + superNodes[i] >= src)
+			superNodes[i] += delta;
+}
+
+// Add entry to supernode array
+void IndexPage::addSupernode(Btn *where)
+{
+	int i;
+	short offset = (short)((UCHAR *)where - (UCHAR *)nodes);
+	for (i=0; i< SUPERNODES && superNodes[i] ;i++)
+		{
+		ASSERT(superNodes[i] != offset);
+		if(superNodes[i] > offset)
+			{
+			if(i != SUPERNODES -1)
+				memmove(&superNodes[i+1],&superNodes[i], (SUPERNODES-i-1)*sizeof(superNodes[0]));
+			break;
+			}
+		}
+	ASSERT(i < SUPERNODES);
+	superNodes[i] = (short)((UCHAR *)where - (UCHAR*)nodes);
+}
+
+
+// Delete entry from supernode array.
+// Returns true if supernode was present at given position and false otherwise
+bool IndexPage::deleteSupernode(Btn *where)
+{
+	short offset = (short)((UCHAR *)where - (UCHAR *)nodes);
+	ASSERT(where > 0);
+	for (int i=0; i< SUPERNODES && superNodes[i];i++)
+		{
+		if(superNodes[i] == offset)
+			{
+			if (i !=  SUPERNODES -1)
+				memmove(&superNodes[i],&superNodes[i+1], (SUPERNODES-i-1)*sizeof(superNodes[0]));
+			superNodes[SUPERNODES-1] = 0;
+			return true;
+			}
+		}
+	return false;
+}
+
+
+// Given a key, find the maximum supernode smaller or equal to the search key
+// Optional parameter "after" tells to skip supernodes smaller than its value
+
+Btn * IndexPage::findSupernode(int level, IndexKey *searchKey, int32 recordNumber, Btn *after)
+{
+
+	int	keylen = searchKey->keyLength;
+	UCHAR*	key = searchKey->key;
+
+	Btn *retval = nodes;
+
+	for(int i=0; i< SUPERNODES && superNodes[i];i++)
+		{
+		Btn *pos = nodes +superNodes[i];
+
+		if (after && (pos < after))
+			continue;
+
+		IndexNode node(pos);
+		int len = node.keyLength();
+
+		if (level)
+			len =- node.getAppendedRecordNumberLength();
+
+		int result = memcmp(key, node.key, MIN(keylen,len));
+
+		if (result == 0)
+			result = keylen - len;
+
+
+		if (result == 0)
+			{
+			int number= (level == 0)? node.number : node.getAppendedRecordNumber();
+			if ( (level > 0 && recordNumber == 0) || number == END_BUCKET)
+				result = -1;
+			else
+				result = recordNumber - number;
+			}
+
+		if (result < 0)
+			break;
+
+		retval = node.node;
+		}
+	return retval;
+}
+
+
diff -Nrup a/storage/falcon/IndexPage.h b/storage/falcon/IndexPage.h
--- a/storage/falcon/IndexPage.h	2008-03-12 13:15:13 +01:00
+++ b/storage/falcon/IndexPage.h	2008-03-17 18:34:32 +01:00
@@ -40,11 +40,13 @@ class IndexPage : public Page
 {
 public:
 	AddNodeResult addNode (Dbb *dbb, IndexKey *key, int32 recordNumber);
+	AddNodeResult addNode (Dbb *dbb, IndexKey *key, int32 recordNumber,IndexNode *node, IndexKey *priorKey, IndexKey *nextKey);
 	Btn*		appendNode (IndexKey *indexKey, int32 recordNumber, int pageSize);
 	int			deleteNode (Dbb *dbb, IndexKey *key, int32 recordNumber);
 	Btn*		findNodeInBranch (IndexKey *indexKey, int32 recordNumber);
 	Btn*		findNodeInLeaf (IndexKey *key, IndexKey *foundKey);
 	Btn*		findInsertionPoint (IndexKey *indexKey, int32 recordNumber, IndexKey *expandedKey);
+	Btn*		findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* from, Btn* bucketEnd);
 	Bdb*		splitPage (Dbb *dbb, Bdb *bdb, TransId transId);
 	Bdb*		splitIndexPageEnd(Dbb *dbb, Bdb *bdb, TransId transId, IndexKey *insertKey, int recordNumber);
 	Bdb*		splitIndexPageMiddle(Dbb *dbb, Bdb *bdb, IndexKey *splitKey, TransId transId);
@@ -55,6 +57,13 @@ public:
 	void		validateNodes (Dbb *dbb, Validation *validation, Bitmap *children, int32 parentPageNumber);
 	void		validate (Dbb *dbb, Validation *validation, Bitmap *pages, int32 parentPageNumber);
 	void		validate(void *before);
+	bool		checkAddSuperNode(int pageSize, IndexNode* node, IndexKey *indexKey, int recordNumber, int offset);
+	void		moveMemory(void *dst,void *src);
+	void		addSupernode(Btn *where);
+	bool		deleteSupernode(Btn *where);
+	Btn*		findSupernode(int level, IndexKey *searchKey, int32 recordNumber, Btn *after);
+	Btn*		findPriorNodeForSupernode(Btn *where,IndexKey *priorKey);
+
 
 	static int		computePrefix (IndexKey *key1, IndexKey *key2);
 	static Bdb*		findLevel (Dbb *dbb, int32 indexId, Bdb *bdb, int level, IndexKey *indexKey, int32 recordNumber);
@@ -67,7 +76,7 @@ public:
 	static void		printNode(IndexPage *page, int32 pageNumber, Btn *node, bool inversion = false);
 	static int32	getRecordNumber(const UCHAR *ptr);
 	static void		logIndexPage (Bdb *bdb, TransId transId);
-	static Btn*		findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* nodes, Btn* bucketEnd);
+
 
 	int32	parentPage;
 	int32	priorPage;
diff -Nrup a/storage/falcon/IndexRootPage.cpp b/storage/falcon/IndexRootPage.cpp
--- a/storage/falcon/IndexRootPage.cpp	2008-03-12 13:15:13 +01:00
+++ b/storage/falcon/IndexRootPage.cpp	2008-03-17 18:34:33 +01:00
@@ -545,6 +545,7 @@ bool IndexRootPage::splitIndexPage(Dbb *
 		page->level = splitPage->level + 1;
 		page->version = splitPage->version;
 		page->nextPage = 0;
+		memset(page->superNodes, 0, sizeof(page->superNodes));
 		IndexKey dummy(indexKey->index);
 		dummy.keyLength = 0;
 		page->length = OFFSET (IndexPage*, nodes);
@@ -782,7 +783,7 @@ void IndexRootPage::debugBucket(Dbb *dbb
 	bdb->release(REL_HISTORY);
 }
 
-void IndexRootPage::redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPage, int level, int32 priorPage, int32 nextPage, int length, const UCHAR *data)
+void IndexRootPage::redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPage, int level, int32 priorPage, int32 nextPage, int length, const UCHAR *data, bool haveSuperNodes)
 {
 	//Log::debug("redoIndexPage %d -> %d -> %d level %d, parent %d)\n", priorPage, pageNumber, nextPage, level, parentPage);
 	Bdb *bdb = dbb->fakePage(pageNumber, PAGE_any, 0);
@@ -805,8 +806,19 @@ void IndexRootPage::redoIndexPage(Dbb* d
 	indexPage->parentPage = parentPage;
 	indexPage->nextPage = nextPage;
 	indexPage->priorPage = priorPage;
-	indexPage->length = length + (int32) OFFSET (IndexPage*, nodes);
-	memcpy(indexPage->nodes, data, length);
+
+	if(haveSuperNodes)
+		{
+		indexPage->length = length + (int32) OFFSET (IndexPage*, superNodes);
+		memcpy(indexPage->superNodes, data, length);
+		}
+	else
+		{
+		indexPage->length = length + (int32) OFFSET (IndexPage*, nodes);
+		memcpy(indexPage->nodes, data, length);
+		memset(indexPage->superNodes, 0, sizeof(indexPage->superNodes));
+		}
+
 	
 	// If we have a parent page, propogate the first node upward
 
@@ -988,33 +1000,9 @@ void IndexRootPage::indexMerge(Dbb *dbb,
 				++duplicates;
 			else
 				{
-				int offset1 = IndexPage::computePrefix (&priorKey, &key);
-				int length1 = key.keyLength - offset1;
-				int delta = IndexNode::nodeLength(offset1, length1, recordNumber);
-				int32 nextNumber = 0;
-				int offset2;
-				
-				if ((UCHAR*) node.node == (UCHAR*) page + page->length)
-					offset2 = -1;
-				else
-					{
-					node.expandKey(&nextKey);
-					offset2 = IndexPage::computePrefix(&key, &nextKey);
-					int deltaOffset = offset2 - node.offset;
-					
-					if (node.length >= 128 && (node.length - deltaOffset) < 128)
-						--delta;
-
-					if (node.offset < 128 && (node.offset + deltaOffset) >= 128)
-						++delta;
-						
-					delta -= deltaOffset;
-					nextNumber = node.getNumber();
-					}
-
-				// If node doesn't fit, punt and let someone else do it
-				
-				if (page->length + delta > dbb->pageSize)
+				AddNodeResult result = page->addNode(dbb, &key, recordNumber, &node, &priorKey, &nextKey);
+				if (result != NodeAdded)
+					// If node doesn't fit, punt and let someone else do it
 					{
 					bdb->release(REL_HISTORY);
 					bdb = NULL;
@@ -1024,40 +1012,11 @@ void IndexRootPage::indexMerge(Dbb *dbb,
 					if ( (recordNumber = logRecord->nextKey(&key)) == -1)
 						//return;
 						goto exit;
-					
 					break;
 					}
-				
-				// Add insert node into page
-				
-				if (offset2 >= 0)
-					{
-					UCHAR *tail = (UCHAR*) node.nextNode;
-					int tailLength = (int) ((UCHAR*) page + page->length - tail);
-					ASSERT (tailLength >= 0);
-					
-					if (tailLength > 0)
-						memmove (tail + delta, tail, tailLength);
-					}
-
-				// Insert new node
-
-				++insertions;
-				IndexNode newNode;
-				newNode.insert(node.node, offset1, length1, key.key, recordNumber);
-
-				// If necessary, rebuild next node
-
-				if (offset2 >= 0)
-					newNode.insert(newNode.nextNode, offset2, nextKey.keyLength - offset2, nextKey.key, nextNumber);
-
-				page->length += delta;
-				//page->validate(NULL);
-
-				if (dbb->debug & (DEBUG_PAGES | DEBUG_INDEX_MERGE))
-					page->printPage(bdb, false);
+				else /* node inserted */
+					++insertions;
 				}
-				
 			priorKey.setKey(&key);
 			
 			// Get next key
@@ -1074,7 +1033,7 @@ void IndexRootPage::indexMerge(Dbb *dbb,
 			// Find the next insertion point, compute the next key, etc.
 			
 			bucketEnd = (Btn*) ((char*) page + page->length);
-			node.parseNode(IndexPage::findInsertionPoint(0, &key, recordNumber, &priorKey, node.node, bucketEnd));
+			node.parseNode(page->findInsertionPoint(0, &key, recordNumber, &priorKey, node.node, bucketEnd));
 			nextKey.setKey(0, node.offset, priorKey.key);
 			node.expandKey(&nextKey);
 			number = node.getNumber();
diff -Nrup a/storage/falcon/IndexRootPage.h b/storage/falcon/IndexRootPage.h
--- a/storage/falcon/IndexRootPage.h	2007-09-20 17:41:48 +02:00
+++ b/storage/falcon/IndexRootPage.h	2008-03-17 18:34:33 +01:00
@@ -56,7 +56,7 @@ public:
 	static void		analyzeIndex(Dbb* dbb, int indexId, IndexAnalysis *indexAnalysis);
 	static int32	getIndexRoot(Dbb* dbb, int indexId);
 
-	static void		redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPageNumber, int level, int32 prior, int32 next, int length, const UCHAR *data);
+	static void		redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPageNumber, int level, int32 prior, int32 next, int length, const UCHAR *data, bool haveSuperNodes);
 	static void		setIndexRoot(Dbb* dbb, int indexId, int32 pageNumber, TransId transId);
 	static void		redoIndexDelete(Dbb* dbb, int indexId);
 	static void		redoCreateIndex(Dbb* dbb, int indexId);
diff -Nrup a/storage/falcon/SRLIndexPage.cpp b/storage/falcon/SRLIndexPage.cpp
--- a/storage/falcon/SRLIndexPage.cpp	2008-02-13 20:40:21 +01:00
+++ b/storage/falcon/SRLIndexPage.cpp	2008-03-17 18:34:34 +01:00
@@ -110,7 +110,11 @@ void SRLIndexPage::pass2()
 					break;
 				
 				case INDEX_VERSION_1:
-					IndexRootPage::redoIndexPage(log->getDbb(tableSpaceId), pageNumber, parent, level, prior, next, length, data);
+					{
+					bool haveSuperNodes = (control->version >=srlVersion14);
+					IndexRootPage::redoIndexPage(log->getDbb(tableSpaceId), pageNumber, parent, level, prior, next, length, data,
+						haveSuperNodes);
+					}
 					break;
 				
 				default:
diff -Nrup a/storage/falcon/SRLVersion.h b/storage/falcon/SRLVersion.h
--- a/storage/falcon/SRLVersion.h	2008-02-14 22:05:58 +01:00
+++ b/storage/falcon/SRLVersion.h	2008-03-17 18:34:34 +01:00
@@ -40,7 +40,8 @@ static const int srlVersion10		= 10;	// 
 static const int srlVersion11		= 11;	// Added table space type (repository support)	December 4, 2007
 static const int srlVersion12		= 12;	// Added index version number to SRLIndexPage	February 13, 2008
 static const int srlVersion13		= 13;	// Added savepoint id to SRLUpdateRecords		February 14, 2008
-static const int srlCurrentVersion	= srlVersion13;
+static const int srlVersion14		= 14;	// Added supernodes logging	March 7, 2008
+static const int srlCurrentVersion	= srlVersion14;
 
 class SRLVersion : public SerialLogRecord  
 {
Thread
bk commit into 6.0 tree (vvaintroub:1.2595) WL#3763vvaintroub17 Mar