Below is the list of changes that have just been committed into a local
6.0 repository of vvaintroub. When vvaintroub does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2008-03-17 18:34:36+01:00, vvaintroub@wva. +6 -0
WL#3763- Falcon supernodes
Core supernodes are ready(update of supernode array, search, logging)
storage/falcon/IndexPage.cpp@stripped, 2008-03-17 18:34:32+01:00, vvaintroub@wva. +457 -47
WL#3763- Falcon supernodes
Core supernodes are ready(update of supernode array,search, logging)
storage/falcon/IndexPage.h@stripped, 2008-03-17 18:34:32+01:00, vvaintroub@wva. +10 -1
WL#3763- Falcon supernodes
Core supernodes are ready(update of supernode array,search, logging)
storage/falcon/IndexRootPage.cpp@stripped, 2008-03-17 18:34:33+01:00, vvaintroub@wva. +21 -62
Supernodes.
-redoIndexPage adopted to cope with new serial log records
(containing supernodes) and older ones (without them)
Index page update code in indexMerge, that was identical to the
addNode has been moved to IndexPage into an extra function,
that is used from both merge and add.
storage/falcon/IndexRootPage.h@stripped, 2008-03-17 18:34:33+01:00, vvaintroub@wva. +1 -1
redoIndexPage needs additional parameter haveSuperNodes,
because since now (srlVersion >=14) supernodes are also logged
storage/falcon/SRLIndexPage.cpp@stripped, 2008-03-17 18:34:34+01:00, vvaintroub@wva. +5 -1
Supernodes logging
storage/falcon/SRLVersion.h@stripped, 2008-03-17 18:34:34+01:00, vvaintroub@wva. +2 -1
Increase srlCurrentVersion because supernode array logging.
diff -Nrup a/storage/falcon/IndexPage.cpp b/storage/falcon/IndexPage.cpp
--- a/storage/falcon/IndexPage.cpp 2008-03-11 16:16:31 +01:00
+++ b/storage/falcon/IndexPage.cpp 2008-03-17 18:34:32 +01:00
@@ -47,18 +47,9 @@ static const char THIS_FILE[]=__FILE__;
AddNodeResult IndexPage::addNode(Dbb *dbb, IndexKey *indexKey, int32 recordNumber)
{
- if (dbb->debug & (DEBUG_KEYS | DEBUG_NODES_ADDED | DEBUG_PAGE_LEVEL))
- {
- Log::debug("\n***** addNode(%d) - ", recordNumber);
- Btn::printKey ("New key", indexKey, 0, false);
- }
-
- if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
- printPage (this, 0, false);
- // Find the insertion point. In the process, compute the prior key
- //validate(NULL);
+ // Find the insertion point. In the process, compute the prior key
UCHAR *key = indexKey->key;
IndexKey priorKey;
@@ -73,47 +64,70 @@ AddNodeResult IndexPage::addNode(Dbb *db
// We got the prior key value. Get key value for insertion point
IndexKey nextKey (priorKey);
+ return addNode(dbb, indexKey, recordNumber, &node, &priorKey, &nextKey);
+}
+
+// Add node (with already known insertion point)
+AddNodeResult IndexPage::addNode (Dbb *dbb, IndexKey *indexKey, int32 recordNumber, IndexNode *node, IndexKey *priorKey, IndexKey *nextKey)
+{
+
+ UCHAR *key = indexKey->key;
+
+ if (dbb->debug & (DEBUG_KEYS | DEBUG_NODES_ADDED | DEBUG_PAGE_LEVEL))
+ {
+ Log::debug("\n***** addNode(%d) - ", recordNumber);
+ Btn::printKey ("New key", indexKey, 0, false);
+ }
+
+ if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
+ printPage (this, 0, false);
+ //validate(NULL);
// Compute delta size of new node
+ int offset1 = computePrefix (priorKey, indexKey);
+
+ // Check whether inserted node shall be supernode. If yes, does not compress the prefix.
+ bool addSuper = checkAddSuperNode(dbb->pageSize, node, indexKey, recordNumber, offset1);
+ if (addSuper)
+ offset1 = 0;
- int offset1 = computePrefix (&priorKey, indexKey);
int length1 = indexKey->keyLength - offset1;
int delta = IndexNode::nodeLength(offset1, length1, recordNumber);
int32 nextNumber;
int offset2;
- if ((UCHAR*) node.node == (UCHAR*) this + length)
+ if ((UCHAR*) node->node == (UCHAR*) this + length)
{
offset2 = -1;
nextNumber = 0;
}
else
{
- node.expandKey (&nextKey);
- offset2 = computePrefix(indexKey, &nextKey);
- int deltaOffset = offset2 - node.offset;
+ node->expandKey (nextKey);
+ offset2 = computePrefix(indexKey, nextKey);
+ int deltaOffset = offset2 - node->offset;
- if (node.length >= 128 && (node.length - deltaOffset) < 128)
+ if (node->length >= 128 && (node->length - deltaOffset) < 128)
--delta;
- if (node.offset < 128 && (node.offset + deltaOffset) >= 128)
+ if (node->offset < 128 && (node->offset + deltaOffset) >= 128)
++delta;
delta -= deltaOffset;
- nextNumber = node.getNumber();
+ nextNumber = node->getNumber();
}
// If new node doesn't fit on page, split the page now
if (length + delta > dbb->pageSize)
{
- if ((char*) node.nextNode < (char*) this + length)
+ if ((char*) node->nextNode < (char*) this + length)
return SplitMiddle;
- if (node.getNumber() == END_LEVEL)
+ if (node->getNumber() == END_LEVEL)
return SplitEnd;
- if (offset2 == -1 || nextKey.compare(indexKey) >= 0)
+ if (offset2 == -1 || nextKey->compare(indexKey) >= 0)
return NextPage;
return SplitEnd;
@@ -124,34 +138,45 @@ AddNodeResult IndexPage::addNode(Dbb *db
index, recordNumber, (CHAR*) node - (CHAR*) page);
***/
+ // Check whether there was a supernode at the insertion point
+ // If yes, delete it fow now. Supernode will be reinstalled at another position later,
+ // unless it gets prefix-compressed.
+ bool wasSuper = deleteSupernode(node->node);
+
// Slide tail of bucket forward to make room
if (offset2 >= 0)
{
- UCHAR *tail = (UCHAR*) node.nextNode;
+ UCHAR *tail = (UCHAR*) node->nextNode;
int tailLength = (int) ((UCHAR*) this + length - tail);
ASSERT (tailLength >= 0);
if (tailLength > 0)
- memmove (tail + delta, tail, tailLength);
+ moveMemory(tail + delta, tail);
}
// Insert new node
IndexNode newNode;
- newNode.insert (node.node, offset1, length1, key, recordNumber);
+ newNode.insert (node->node, offset1, length1, key, recordNumber);
// If necessary, rebuild next node
+ if(wasSuper && (offset2 == 0))
+ addSupernode(newNode.nextNode);
if (offset2 >= 0)
- newNode.insert (newNode.nextNode, offset2, nextKey.keyLength - offset2, nextKey.key, nextNumber);
+ newNode.insert (newNode.nextNode, offset2, nextKey->keyLength - offset2, nextKey->key, nextNumber);
+
+ if(addSuper)
+ addSupernode(node->node);
+
+
length += delta;
//validate(NULL);
if (dbb->debug & (DEBUG_PAGES | DEBUG_NODES_ADDED))
printPage (this, 0, false);
-
return NodeAdded;
}
@@ -168,6 +193,8 @@ Btn* IndexPage::findNodeInLeaf(IndexKey
UCHAR *key = indexKey->key;
UCHAR *keyEnd = key + indexKey->keyLength;
Btn *bucketEnd = getEnd();
+ Btn *super;
+
if (indexKey->keyLength == 0)
{
@@ -177,7 +204,9 @@ Btn* IndexPage::findNodeInLeaf(IndexKey
return nodes;
}
- IndexNode node (this);
+ super = findSupernode(level, indexKey, 0, 0);
+
+ IndexNode node(super);
for (;; node.getNext(bucketEnd))
{
@@ -240,6 +269,12 @@ Btn* IndexPage::findNodeInLeaf(IndexKey
if (foundKey)
foundKey->keyLength = priorLength;
+ if (foundKey && foundKey->keyLength==0 && node.node == super && node.node != nodes)
+ {
+ /* hit on left supernode, find prior key*/
+ findPriorNodeForSupernode(node.node,foundKey);
+ }
+
return node.node;
}
@@ -306,11 +341,13 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
node = prevNode;
int tailLength = (int) (((UCHAR*) this + length) - (UCHAR*) node.nextNode);
-
+
+ int delta = (int)(node.nextNode - nodes);
if (tailLength < 0)
{
node.parseNode(nodes);
tailLength = (int) (((UCHAR*) this + length) - (UCHAR*) node.nextNode);
+ delta = (int) (node.nextNode - nodes);
}
// If we didn't find a break, use the last one. This may be the first node
@@ -328,6 +365,7 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
int32 recordNumber = node.getNumber();
IndexNode newNode;
newNode.insert(split->nodes, 0, kLength, key, recordNumber);
+ int newNodeSize = IndexNode::nodeLength(0, kLength, recordNumber);
memcpy(newNode.nextNode, node.nextNode, tailLength);
@@ -344,7 +382,27 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
if (level == 0)
splitKey->appendRecordNumber(recordNumber);
-
+
+
+
+ // Split supernodes
+ memset(split->superNodes,0, sizeof(split->superNodes));
+ int i = 0;
+ for (i = 0; nodes + superNodes[i] < node.node; i++) ;
+
+ int j = 0;
+ for (; i < SUPERNODES && superNodes[i]; i++)
+ {
+ short newVal = superNodes[i] - delta + newNodeSize;
+ ASSERT(newVal >=0);
+ if(newVal == 0)
+ continue;
+ split->superNodes[j++] = newVal;
+ superNodes[i] = 0;
+ }
+ //validate(NULL);
+ //split->validate(NULL);
+
if (dbb->debug & (DEBUG_PAGES | DEBUG_SPLIT_PAGE))
{
printPage (this, 0, false);
@@ -356,6 +414,7 @@ Bdb* IndexPage::splitIndexPageMiddle(Dbb
return splitBdb;
}
+
Btn* IndexPage::findNodeInBranch(IndexKey *searchKey, int32 searchRecordNumber)
{
ASSERT(level > 0);
@@ -366,11 +425,15 @@ Btn* IndexPage::findNodeInBranch(IndexKe
keyEnd -= searchKey->getAppendedRecordNumberLength();
Btn *bucketEnd = (Btn*) ((UCHAR*) this + length);
- Btn *prior = nodes;
+
UCHAR *key = searchKey->key;
- for (IndexNode node (this); node.node < bucketEnd; prior = node.node, node.getNext(bucketEnd))
+ Btn *super = findSupernode(level, searchKey, searchRecordNumber, 0);
+ Btn *prior = super;
+
+ for (IndexNode node(super) ; node.node < bucketEnd ; prior = node.node, node.getNext(bucketEnd))
{
+
if (node.offset < matchedOffset)
return prior;
@@ -418,7 +481,36 @@ void IndexPage::validate(void *before)
{
Btn *bucketEnd = getEnd();
UCHAR key [MAX_PHYSICAL_KEY_LENGTH];
- int keyLength = 0;
+ int priorKeyLength = 0;
+ int keyLength;
+ int recordNumber;
+ int priorRecordNumber=0;
+ int superIdx = 0;
+ int supernodeCount = 0;
+
+
+
+ for (int i=0; i< SUPERNODES; i++)
+ {
+ if(superNodes[i] ==0)
+ {
+ for(;i<SUPERNODES;i++)
+ if(superNodes[i] != 0)
+ FATAL("Index validate: unexpected non-zero supernode index");
+ break;
+ }
+ if (superNodes[i] < 0)
+ FATAL ("Index validate: negative supernode");
+
+ if (i> 0 && superNodes[i-1] >= superNodes[i])
+ FATAL ("Index validate: entries in supernode array not increasing");
+
+ if (nodes + superNodes[i] > bucketEnd)
+ FATAL ("Index validate: superNodes behind the end of page");
+
+ supernodeCount++;
+ }
+
for (IndexNode node (this);; node.getNext(bucketEnd))
{
@@ -435,19 +527,65 @@ void IndexPage::validate(void *before)
break;
}
-
- int l = MIN(keyLength, node.keyLength());
- int recordNumber = node.getNumber();
-
- if (recordNumber == END_BUCKET)
+
+ bool isSuper = false;
+ if(superIdx < SUPERNODES && superNodes[superIdx])
+ {
+ int delta = superNodes[superIdx] - (short) (node.node - nodes);
+ if(delta == 0)
+ {
+ superIdx++;
+ isSuper=true;
+ if(node.offset != 0)
+ FATAL ("Non-zero offset in supernode");
+ }
+ else if( delta < 0)
+ FATAL ("Index validate: next supernode offset < current offset");
+ }
+
+ keyLength = node.keyLength();
+ if(level > 0)
+ keyLength -= node.getAppendedRecordNumberLength();
+
+ if(isSuper && keyLength==0)
+ FATAL("Supernode with 0 key length");
+
+ int l = MIN(priorKeyLength, keyLength);
+ if (level == 0)
+ recordNumber = node.getNumber();
+ else
+ recordNumber = node.getAppendedRecordNumber();
+
+ if (node.number == END_BUCKET)
+ {
ASSERT(nextPage != 0);
+ break;
+ }
+ if (node.number == END_LEVEL)
+ break;
+
+ if (node.node != nodes && l>0 && node.length > 0)
+ {
+ if(node.offset == 0 && node.key[0] == key[0] && !isSuper)
+ FATAL("node not correctly prefix-compressed");
+ }
+
+
+ if(node.length == 0 && recordNumber < priorRecordNumber)
+ FATAL ("Index validate: record numbers out of order");
+
+
+ bool equalKeys = (priorKeyLength == keyLength);
for (UCHAR *p = node.key, *q = key + node.offset, *end = key + l; q < end; p++, q++)
{
if (*p > *q) // current > previous, good
+ {
+ equalKeys = false;
break;
+ }
else if (*p == *q) // current == previous, good so far
- continue;
+ continue;
else if (*p < *q) // current < previous, bad
{
if (before)
@@ -457,9 +595,15 @@ void IndexPage::validate(void *before)
FATAL("Mal-formed index page");
}
}
-
- keyLength = node.expandKey(key);
+ if( isSuper && equalKeys)
+ FATAL("supernode key equal to previous node");
+ node.expandKey(key);
+ priorKeyLength = keyLength;
+ priorRecordNumber = recordNumber;
}
+
+ if (superIdx != supernodeCount)
+ FATAL("lost supernodes");
}
void IndexPage::printPage(Bdb * bdb, bool inversion)
@@ -579,6 +723,8 @@ int IndexPage::deleteNode(Dbb * dbb, Ind
{
IndexNode next (node.nextNode);
+ deleteSupernode(node.node);
+
if (next.node >= bucketEnd)
length = (int) ((char*) node.node - (char*) this);
else
@@ -592,9 +738,20 @@ int IndexPage::deleteNode(Dbb * dbb, Ind
Btn *tail = next.nextNode;
int tailLength = (int) ((char*) bucketEnd - (char*) tail);
+
+
// Compute new prefix length; rebuild node
int prefix = computePrefix(&priorKey, &nextKey);
+
+ // Check whether next node is a supernode.
+ // If yes, delete it for now. Supernode will be reinstalled later,
+ // if it does not get prefix-compressed.
+
+ Btn *nextSuper = 0;
+ if(deleteSupernode(node.nextNode) && (prefix == 0) && node.node != nodes)
+ nextSuper = node.node;
+
int32 num = next.getNumber();
node.insert(node.node, prefix, nextKey.keyLength - prefix, nextKey.key, num);
@@ -603,7 +760,10 @@ int IndexPage::deleteNode(Dbb * dbb, Ind
Btn *newTail = node.nextNode;
if (tailLength > 0)
- memmove(newTail, tail, tailLength);
+ moveMemory(newTail, tail);
+
+ if(nextSuper)
+ addSupernode(nextSuper);
length = (int) ((char*) newTail + tailLength - (char*) this);
//validate(NULL);
@@ -703,7 +863,7 @@ void IndexPage::printPage(IndexPage *pag
page->nextPage,
page->parentPage);
- int length = (UCHAR*) node.node - (UCHAR*) page;
+ int length = (int)((UCHAR*) node.node - (UCHAR*) page);
if (length != page->length)
Log::debug ("**** bad bucket length -- should be %d ***\n", length);
@@ -972,8 +1132,11 @@ Bdb* IndexPage::splitIndexPageEnd(Dbb *d
int nodeOverhead = 3 + (offset >= 128) + (insertKey->keyLength - offset >= 128);
char *nextNextNode = (char*) node.nextNode - node.length + insertKey->keyLength - offset + nodeOverhead;
+
+ Btn *splitPos;
if (nextNextNode >= (char*) this + dbb->pageSize)
{
+ splitPos = priorNode;
node.parseNode(priorNode);
int number = node.getNumber();
//node.setNumber(END_BUCKET);
@@ -983,13 +1146,21 @@ Bdb* IndexPage::splitIndexPageEnd(Dbb *d
}
else
{
+ splitPos = node.node;
int offset = computePrefix(&tempKey, insertKey);
+ deleteSupernode(node.node);
node.insert(node.node, offset, insertKey->keyLength - offset, insertKey->key, END_BUCKET);
length = (int) ((char*)(node.nextNode) - (char*) this);
}
+ for(int i=0; i< SUPERNODES;i++)
+ if(nodes + superNodes[i] >= splitPos)
+ superNodes[i]=0;
+
+ //validate(NULL);
split->appendNode(insertKey, recordNumber, dbb->pageSize);
split->appendNode(&rolloverKey, rolloverNumber, dbb->pageSize);
+ //split->validate(NULL);
//printf ("splitIndexPage: level %d, %d -> %d\n", level, bdb->pageNumber, nextPage);
//validate( splitBdb->buffer);
@@ -1004,6 +1175,7 @@ Bdb* IndexPage::splitIndexPageEnd(Dbb *d
return splitBdb;
}
+
Btn* IndexPage::appendNode(IndexKey *newKey, int32 recordNumber, int pageSize)
{
IndexKey key(newKey->index);
@@ -1085,21 +1257,37 @@ void IndexPage::logIndexPage(Bdb *bdb, T
dbb->serialLog->logControl->indexPage.append(dbb, transId, INDEX_VERSION_1, bdb->pageNumber, indexPage->level,
indexPage->parentPage, indexPage->priorPage, indexPage->nextPage,
- indexPage->length - OFFSET (IndexPage*, nodes),
- (const UCHAR*) indexPage->nodes);
+ indexPage->length - OFFSET (IndexPage*, superNodes),
+ (const UCHAR*) indexPage->superNodes);
}
-
-Btn* IndexPage::findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* nodes, Btn* bucketEnd)
+
+Btn* IndexPage::findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* from, Btn* bucketEnd)
{
UCHAR *key = indexKey->key;
const UCHAR *keyEnd = key + indexKey->keyLength;
- IndexNode node(nodes);
uint matchedOffset = 0;
uint priorLength = 0;
if (level && indexKey->recordNumber)
keyEnd -= indexKey->getAppendedRecordNumberLength();
+
+ Btn *super = findSupernode(level, indexKey,(level > 0)?indexKey->recordNumber:recordNumber ,from);
+ IndexNode node;
+ if(super > from)
+ node.parseNode(super);
+ else
+ node.parseNode(from);
+
+
+ if(node.node != nodes)
+ {
+ node.expandKey (expandedKey);
+ priorLength = expandedKey->keyLength;
+ if (level)
+ expandedKey->recordNumber = node.getAppendedRecordNumber();
+ }
+
if (node.offset)
{
while ((matchedOffset < node.offset) &&
@@ -1190,6 +1378,12 @@ Btn* IndexPage::findInsertionPoint(int l
expandedKey->keyLength = priorLength;
+ if (expandedKey && expandedKey->keyLength==0 && node.node == super && node.node != nodes)
+ {
+ /* hit on start supernode, find prior key */
+ findPriorNodeForSupernode(node.node,expandedKey);
+ }
+
return node.node;
}
@@ -1197,3 +1391,219 @@ Btn* IndexPage::getEnd(void)
{
return (Btn*) ((UCHAR*) this + length);
}
+
+/* During node insertion, check whether supernode should be added at insertion point */
+bool IndexPage::checkAddSuperNode(int pageSize, IndexNode* node, IndexKey *indexKey, int recordNumber, int offset)
+{
+ Btn *insertionPoint = node->node;
+
+ if (insertionPoint == nodes) // no supernode at the start of the page
+ return false;
+
+ if (offset >= pageSize/SUPERNODES/2) //compression is too good, no super
+ return false;
+
+ if (superNodes[SUPERNODES-1]) // supernode array is full
+ return false;
+
+
+ int keyLen = indexKey->keyLength;
+
+ if(level)
+ keyLen -= indexKey->getAppendedRecordNumberLength();
+
+ if (keyLen == offset) // no supernode on duplicate entry (yet)
+ return false;
+
+ int offsetToEntry = (int)(insertionPoint - nodes);
+
+
+ int i;
+ // Find superNodes corresponding to insertion point
+ for(i = 0; i< SUPERNODES;i++)
+ {
+ if (!superNodes[i])
+ break;
+ if (nodes + superNodes[i] >= insertionPoint)
+ break;
+ }
+
+ // Avoid 2 superNodes adjacent to each other
+ if (nodes + superNodes[i] == insertionPoint)
+ return false;
+
+ int nodeLen =
+ IndexNode::nodeLength(offset, indexKey->keyLength - offset, recordNumber);
+
+ /* Ideal distance between supernodes , assume page will get 90% full*/
+ int idealDistance = pageSize/(SUPERNODES+1)*9/10;
+
+
+ int distLeft, distRight;
+
+ distLeft = offsetToEntry;
+
+ if (i > 0)
+ distLeft -= superNodes[i-1];
+
+ // last node, make supernode, if distance from prior >= idealDistance
+ if(superNodes[i] == 0)
+ {
+ if (distLeft >= idealDistance)
+ return true;
+ return false;
+ }
+
+ // between two supernodes, make new one , if distance
+ // from both neighbours is at least idealSize*0.5
+
+ distRight =
+ superNodes[i] + nodeLen - offsetToEntry; // from right super
+ ASSERT(distRight >=0 );
+
+ // Distance between existing supernodes
+ int d= (i > 0)? superNodes[i] - superNodes[i-1] + nodeLen :superNodes[0] + nodeLen;
+
+ if ( d >= idealDistance*3/2 && distLeft >= idealDistance/2 && distRight >= idealDistance/2 )
+ return true;
+
+ return false;
+}
+
+// Helper routine, used with findNodeInLeaf/Branch and findInsertionPoint
+Btn* IndexPage::findPriorNodeForSupernode(Btn *where,IndexKey *priorKey)
+{
+ Btn *prior = nodes;
+ bool found = false;
+
+
+ // Scan supernode array for prior super
+ for(int i=0; i < SUPERNODES && superNodes[i]; i++)
+ {
+ Btn* current = nodes + superNodes[i];
+ if(current == where)
+ {
+ found = true;
+ break;
+ }
+ prior = current;
+ }
+
+ ASSERT(found);
+
+ IndexNode priorNode(prior);
+ for (;; priorNode.getNext(where))
+ {
+ ASSERT(priorNode.nextNode <= where);
+ priorNode.expandKey(priorKey);
+ if (priorNode.nextNode == where)
+ return priorNode.node;
+ }
+
+ ASSERT(false);
+}
+
+
+// Move nodes, adjust supernode array
+void IndexPage::moveMemory(void *dst,void *src)
+{
+ UCHAR* end = (UCHAR *)getEnd();
+ size_t size = (dst > src)?(end - (UCHAR *)src) : (end-(UCHAR *)dst);
+
+ memmove(dst, src, size);
+
+ short delta = (short)((UCHAR *)dst - (UCHAR *)src);
+
+ for(int i=0; i< SUPERNODES && superNodes[i]; i++)
+ if(nodes + superNodes[i] >= src)
+ superNodes[i] += delta;
+}
+
+// Add entry to supernode array
+void IndexPage::addSupernode(Btn *where)
+{
+ int i;
+ short offset = (short)((UCHAR *)where - (UCHAR *)nodes);
+ for (i=0; i< SUPERNODES && superNodes[i] ;i++)
+ {
+ ASSERT(superNodes[i] != offset);
+ if(superNodes[i] > offset)
+ {
+ if(i != SUPERNODES -1)
+ memmove(&superNodes[i+1],&superNodes[i], (SUPERNODES-i-1)*sizeof(superNodes[0]));
+ break;
+ }
+ }
+ ASSERT(i < SUPERNODES);
+ superNodes[i] = (short)((UCHAR *)where - (UCHAR*)nodes);
+}
+
+
+// Delete entry from supernode array.
+// Returns true if supernode was present at given position and false otherwise
+bool IndexPage::deleteSupernode(Btn *where)
+{
+ short offset = (short)((UCHAR *)where - (UCHAR *)nodes);
+ ASSERT(where > 0);
+ for (int i=0; i< SUPERNODES && superNodes[i];i++)
+ {
+ if(superNodes[i] == offset)
+ {
+ if (i != SUPERNODES -1)
+ memmove(&superNodes[i],&superNodes[i+1], (SUPERNODES-i-1)*sizeof(superNodes[0]));
+ superNodes[SUPERNODES-1] = 0;
+ return true;
+ }
+ }
+ return false;
+}
+
+
+// Given a key, find the maximum supernode smaller or equal to the search key
+// Optional parameter "after" tells to skip supernodes smaller than its value
+
+Btn * IndexPage::findSupernode(int level, IndexKey *searchKey, int32 recordNumber, Btn *after)
+{
+
+ int keylen = searchKey->keyLength;
+ UCHAR* key = searchKey->key;
+
+ Btn *retval = nodes;
+
+ for(int i=0; i< SUPERNODES && superNodes[i];i++)
+ {
+ Btn *pos = nodes +superNodes[i];
+
+ if (after && (pos < after))
+ continue;
+
+ IndexNode node(pos);
+ int len = node.keyLength();
+
+ if (level)
+ len =- node.getAppendedRecordNumberLength();
+
+ int result = memcmp(key, node.key, MIN(keylen,len));
+
+ if (result == 0)
+ result = keylen - len;
+
+
+ if (result == 0)
+ {
+ int number= (level == 0)? node.number : node.getAppendedRecordNumber();
+ if ( (level > 0 && recordNumber == 0) || number == END_BUCKET)
+ result = -1;
+ else
+ result = recordNumber - number;
+ }
+
+ if (result < 0)
+ break;
+
+ retval = node.node;
+ }
+ return retval;
+}
+
+
diff -Nrup a/storage/falcon/IndexPage.h b/storage/falcon/IndexPage.h
--- a/storage/falcon/IndexPage.h 2008-03-12 13:15:13 +01:00
+++ b/storage/falcon/IndexPage.h 2008-03-17 18:34:32 +01:00
@@ -40,11 +40,13 @@ class IndexPage : public Page
{
public:
AddNodeResult addNode (Dbb *dbb, IndexKey *key, int32 recordNumber);
+ AddNodeResult addNode (Dbb *dbb, IndexKey *key, int32 recordNumber,IndexNode *node, IndexKey *priorKey, IndexKey *nextKey);
Btn* appendNode (IndexKey *indexKey, int32 recordNumber, int pageSize);
int deleteNode (Dbb *dbb, IndexKey *key, int32 recordNumber);
Btn* findNodeInBranch (IndexKey *indexKey, int32 recordNumber);
Btn* findNodeInLeaf (IndexKey *key, IndexKey *foundKey);
Btn* findInsertionPoint (IndexKey *indexKey, int32 recordNumber, IndexKey *expandedKey);
+ Btn* findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* from, Btn* bucketEnd);
Bdb* splitPage (Dbb *dbb, Bdb *bdb, TransId transId);
Bdb* splitIndexPageEnd(Dbb *dbb, Bdb *bdb, TransId transId, IndexKey *insertKey, int recordNumber);
Bdb* splitIndexPageMiddle(Dbb *dbb, Bdb *bdb, IndexKey *splitKey, TransId transId);
@@ -55,6 +57,13 @@ public:
void validateNodes (Dbb *dbb, Validation *validation, Bitmap *children, int32 parentPageNumber);
void validate (Dbb *dbb, Validation *validation, Bitmap *pages, int32 parentPageNumber);
void validate(void *before);
+ bool checkAddSuperNode(int pageSize, IndexNode* node, IndexKey *indexKey, int recordNumber, int offset);
+ void moveMemory(void *dst,void *src);
+ void addSupernode(Btn *where);
+ bool deleteSupernode(Btn *where);
+ Btn* findSupernode(int level, IndexKey *searchKey, int32 recordNumber, Btn *after);
+ Btn* findPriorNodeForSupernode(Btn *where,IndexKey *priorKey);
+
static int computePrefix (IndexKey *key1, IndexKey *key2);
static Bdb* findLevel (Dbb *dbb, int32 indexId, Bdb *bdb, int level, IndexKey *indexKey, int32 recordNumber);
@@ -67,7 +76,7 @@ public:
static void printNode(IndexPage *page, int32 pageNumber, Btn *node, bool inversion = false);
static int32 getRecordNumber(const UCHAR *ptr);
static void logIndexPage (Bdb *bdb, TransId transId);
- static Btn* findInsertionPoint(int level, IndexKey* indexKey, int32 recordNumber, IndexKey* expandedKey, Btn* nodes, Btn* bucketEnd);
+
int32 parentPage;
int32 priorPage;
diff -Nrup a/storage/falcon/IndexRootPage.cpp b/storage/falcon/IndexRootPage.cpp
--- a/storage/falcon/IndexRootPage.cpp 2008-03-12 13:15:13 +01:00
+++ b/storage/falcon/IndexRootPage.cpp 2008-03-17 18:34:33 +01:00
@@ -545,6 +545,7 @@ bool IndexRootPage::splitIndexPage(Dbb *
page->level = splitPage->level + 1;
page->version = splitPage->version;
page->nextPage = 0;
+ memset(page->superNodes, 0, sizeof(page->superNodes));
IndexKey dummy(indexKey->index);
dummy.keyLength = 0;
page->length = OFFSET (IndexPage*, nodes);
@@ -782,7 +783,7 @@ void IndexRootPage::debugBucket(Dbb *dbb
bdb->release(REL_HISTORY);
}
-void IndexRootPage::redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPage, int level, int32 priorPage, int32 nextPage, int length, const UCHAR *data)
+void IndexRootPage::redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPage, int level, int32 priorPage, int32 nextPage, int length, const UCHAR *data, bool haveSuperNodes)
{
//Log::debug("redoIndexPage %d -> %d -> %d level %d, parent %d)\n", priorPage, pageNumber, nextPage, level, parentPage);
Bdb *bdb = dbb->fakePage(pageNumber, PAGE_any, 0);
@@ -805,8 +806,19 @@ void IndexRootPage::redoIndexPage(Dbb* d
indexPage->parentPage = parentPage;
indexPage->nextPage = nextPage;
indexPage->priorPage = priorPage;
- indexPage->length = length + (int32) OFFSET (IndexPage*, nodes);
- memcpy(indexPage->nodes, data, length);
+
+ if(haveSuperNodes)
+ {
+ indexPage->length = length + (int32) OFFSET (IndexPage*, superNodes);
+ memcpy(indexPage->superNodes, data, length);
+ }
+ else
+ {
+ indexPage->length = length + (int32) OFFSET (IndexPage*, nodes);
+ memcpy(indexPage->nodes, data, length);
+ memset(indexPage->superNodes, 0, sizeof(indexPage->superNodes));
+ }
+
// If we have a parent page, propogate the first node upward
@@ -988,33 +1000,9 @@ void IndexRootPage::indexMerge(Dbb *dbb,
++duplicates;
else
{
- int offset1 = IndexPage::computePrefix (&priorKey, &key);
- int length1 = key.keyLength - offset1;
- int delta = IndexNode::nodeLength(offset1, length1, recordNumber);
- int32 nextNumber = 0;
- int offset2;
-
- if ((UCHAR*) node.node == (UCHAR*) page + page->length)
- offset2 = -1;
- else
- {
- node.expandKey(&nextKey);
- offset2 = IndexPage::computePrefix(&key, &nextKey);
- int deltaOffset = offset2 - node.offset;
-
- if (node.length >= 128 && (node.length - deltaOffset) < 128)
- --delta;
-
- if (node.offset < 128 && (node.offset + deltaOffset) >= 128)
- ++delta;
-
- delta -= deltaOffset;
- nextNumber = node.getNumber();
- }
-
- // If node doesn't fit, punt and let someone else do it
-
- if (page->length + delta > dbb->pageSize)
+ AddNodeResult result = page->addNode(dbb, &key, recordNumber, &node, &priorKey, &nextKey);
+ if (result != NodeAdded)
+ // If node doesn't fit, punt and let someone else do it
{
bdb->release(REL_HISTORY);
bdb = NULL;
@@ -1024,40 +1012,11 @@ void IndexRootPage::indexMerge(Dbb *dbb,
if ( (recordNumber = logRecord->nextKey(&key)) == -1)
//return;
goto exit;
-
break;
}
-
- // Add insert node into page
-
- if (offset2 >= 0)
- {
- UCHAR *tail = (UCHAR*) node.nextNode;
- int tailLength = (int) ((UCHAR*) page + page->length - tail);
- ASSERT (tailLength >= 0);
-
- if (tailLength > 0)
- memmove (tail + delta, tail, tailLength);
- }
-
- // Insert new node
-
- ++insertions;
- IndexNode newNode;
- newNode.insert(node.node, offset1, length1, key.key, recordNumber);
-
- // If necessary, rebuild next node
-
- if (offset2 >= 0)
- newNode.insert(newNode.nextNode, offset2, nextKey.keyLength - offset2, nextKey.key, nextNumber);
-
- page->length += delta;
- //page->validate(NULL);
-
- if (dbb->debug & (DEBUG_PAGES | DEBUG_INDEX_MERGE))
- page->printPage(bdb, false);
+ else /* node inserted */
+ ++insertions;
}
-
priorKey.setKey(&key);
// Get next key
@@ -1074,7 +1033,7 @@ void IndexRootPage::indexMerge(Dbb *dbb,
// Find the next insertion point, compute the next key, etc.
bucketEnd = (Btn*) ((char*) page + page->length);
- node.parseNode(IndexPage::findInsertionPoint(0, &key, recordNumber, &priorKey, node.node, bucketEnd));
+ node.parseNode(page->findInsertionPoint(0, &key, recordNumber, &priorKey, node.node, bucketEnd));
nextKey.setKey(0, node.offset, priorKey.key);
node.expandKey(&nextKey);
number = node.getNumber();
diff -Nrup a/storage/falcon/IndexRootPage.h b/storage/falcon/IndexRootPage.h
--- a/storage/falcon/IndexRootPage.h 2007-09-20 17:41:48 +02:00
+++ b/storage/falcon/IndexRootPage.h 2008-03-17 18:34:33 +01:00
@@ -56,7 +56,7 @@ public:
static void analyzeIndex(Dbb* dbb, int indexId, IndexAnalysis *indexAnalysis);
static int32 getIndexRoot(Dbb* dbb, int indexId);
- static void redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPageNumber, int level, int32 prior, int32 next, int length, const UCHAR *data);
+ static void redoIndexPage(Dbb* dbb, int32 pageNumber, int32 parentPageNumber, int level, int32 prior, int32 next, int length, const UCHAR *data, bool haveSuperNodes);
static void setIndexRoot(Dbb* dbb, int indexId, int32 pageNumber, TransId transId);
static void redoIndexDelete(Dbb* dbb, int indexId);
static void redoCreateIndex(Dbb* dbb, int indexId);
diff -Nrup a/storage/falcon/SRLIndexPage.cpp b/storage/falcon/SRLIndexPage.cpp
--- a/storage/falcon/SRLIndexPage.cpp 2008-02-13 20:40:21 +01:00
+++ b/storage/falcon/SRLIndexPage.cpp 2008-03-17 18:34:34 +01:00
@@ -110,7 +110,11 @@ void SRLIndexPage::pass2()
break;
case INDEX_VERSION_1:
- IndexRootPage::redoIndexPage(log->getDbb(tableSpaceId), pageNumber, parent, level, prior, next, length, data);
+ {
+ bool haveSuperNodes = (control->version >=srlVersion14);
+ IndexRootPage::redoIndexPage(log->getDbb(tableSpaceId), pageNumber, parent, level, prior, next, length, data,
+ haveSuperNodes);
+ }
break;
default:
diff -Nrup a/storage/falcon/SRLVersion.h b/storage/falcon/SRLVersion.h
--- a/storage/falcon/SRLVersion.h 2008-02-14 22:05:58 +01:00
+++ b/storage/falcon/SRLVersion.h 2008-03-17 18:34:34 +01:00
@@ -40,7 +40,8 @@ static const int srlVersion10 = 10; //
static const int srlVersion11 = 11; // Added table space type (repository support) December 4, 2007
static const int srlVersion12 = 12; // Added index version number to SRLIndexPage February 13, 2008
static const int srlVersion13 = 13; // Added savepoint id to SRLUpdateRecords February 14, 2008
-static const int srlCurrentVersion = srlVersion13;
+static const int srlVersion14 = 14; // Added supernodes logging March 7, 2008
+static const int srlCurrentVersion = srlVersion14;
class SRLVersion : public SerialLogRecord
{
| Thread |
|---|
| • bk commit into 6.0 tree (vvaintroub:1.2595) WL#3763 | vvaintroub | 17 Mar |