Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-05-25 15:07:03+02:00, knielsen@ymer.(none) +14 -0
NdbRecord interpreted update + conflict resolution.
sql/ha_ndbcluster.cc@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +194 -30
NdbRecord interpreted update + conflict resolution.
sql/ha_ndbcluster.h@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +8 -0
NdbRecord interpreted update + conflict resolution.
sql/ha_ndbcluster_binlog.cc@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +7 -2
NdbRecord interpreted update + conflict resolution.
sql/mysqld.cc@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +7 -0
NdbRecord interpreted update + conflict resolution.
storage/ndb/include/Makefile.am@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +2 -1
NdbRecord interpreted update + conflict resolution.
storage/ndb/include/ndbapi/NdbInterpretedCode.hpp@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +131 -0
New BitKeeper file ``storage/ndb/include/ndbapi/NdbInterpretedCode.hpp''
storage/ndb/include/ndbapi/NdbInterpretedCode.hpp@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/include/ndbapi/NdbOperation.hpp@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +3 -0
NdbRecord interpreted update + conflict resolution.
storage/ndb/include/ndbapi/NdbTransaction.hpp@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +9 -2
NdbRecord interpreted update + conflict resolution.
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +0 -13
NdbRecord interpreted update + conflict resolution.
storage/ndb/src/ndbapi/Makefile.am@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +2 -1
NdbRecord interpreted update + conflict resolution.
storage/ndb/src/ndbapi/NdbInterpretedCode.cpp@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +158 -0
New BitKeeper file ``storage/ndb/src/ndbapi/NdbInterpretedCode.cpp''
storage/ndb/src/ndbapi/NdbInterpretedCode.cpp@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +0 -0
storage/ndb/src/ndbapi/NdbOperation.cpp@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +1 -0
NdbRecord interpreted update + conflict resolution.
storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +41 -1
NdbRecord interpreted update + conflict resolution.
storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none) +18 -3
NdbRecord interpreted update + conflict resolution.
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: knielsen
# Host: ymer.(none)
# Root: /usr/local/mysql/mysql-5.1-telco-ndbrecord
--- 1.643/sql/mysqld.cc 2007-05-25 15:07:11 +02:00
+++ 1.644/sql/mysqld.cc 2007-05-25 15:07:11 +02:00
@@ -379,6 +379,7 @@ ulong ndb_extra_logging;
#ifdef HAVE_NDB_BINLOG
ulong ndb_report_thresh_binlog_epoch_slip;
ulong ndb_report_thresh_binlog_mem_usage;
+my_bool opt_ndb_log_update_as_write;
#endif
extern const char *ndb_distribution_names[];
@@ -4927,6 +4928,7 @@ enum options_mysqld
OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
OPT_NDB_REPORT_THRESH_BINLOG_MEM_USAGE,
OPT_NDB_USE_COPYING_ALTER_TABLE,
+ OPT_NDB_LOG_UPDATE_AS_WRITE,
OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_COMPLETION_TYPE,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
@@ -5509,6 +5511,11 @@ master-ssl",
(gptr*) &opt_ndb_shm,
(gptr*) &opt_ndb_shm,
0, GET_BOOL, OPT_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0},
+ {"ndb-log-update-as-write", OPT_NDB_LOG_UPDATE_AS_WRITE,
+ "",
+ (gptr*) &opt_ndb_log_update_as_write,
+ (gptr*) &opt_ndb_log_update_as_write,
+ 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,
"Select nodes for transactions in a more optimal way.",
(gptr*) &opt_ndb_optimized_node_selection,
--- New file ---
+++ storage/ndb/include/ndbapi/NdbInterpretedCode.hpp 07/05/25 15:06:58
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef NdbInterpretedCode_H
#define NdbInterpretedCode_H
#include <ndb_global.h>
#include <ndb_types.h>
#include "NdbDictionary.hpp"
class NdbTableImpl;
class NdbColumnImpl;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/*
@brief Stand-alone interpreted programs, for use with NdbRecord
@details This class is used to prepare an NDB interpreted program for use
in operations created using NdbRecord.
The caller supplies the memory buffer to be used for the operations. If
this is too small, an error will be returned when attempting to add
instructions. The caller must deallocate the buffer if necessary after
the InterpretedCode object is destroyed.
Methods have minimal error checks, for efficiency.
Note that this is currently considered an internal interface, and subject
to change without notice.
ToDo: We should add placeholders to this, so that one can use a single
InterpretedCode object to create many operations from different embedded
constants. Once we do this, we could add more error checks, as the cost
would then not be paid for each operation creation.
*/
class NdbInterpretedCode
{
private:
friend class NdbOperation;
static const Uint32 MaxReg= 8;
Uint32 *m_buffer;
const Uint32 m_buffer_length; // In words
const Uint32 m_number_of_labels;
/* Number of words used for instructions. */
Uint32 m_instructions_length;
/*
Number of words left in buffer.
This can be smaller than m_buffer_length-m_instructions_length, as
the end of the buffer is used to store jump and label addresses for
back-patching jumps.
*/
Uint32 m_available_length;
Uint32 m_number_of_jumps;
enum Flags {
/*
We will set m_got_error if an error occurs, so that we can
refuse to create an operation from InterpretedCode that the user
forgot to do error checks on.
*/
GotError= 0x1,
/* Set if reading disk column. */
UsesDisk= 0x2
};
Uint32 m_flags;
int error()
{
m_flags|= GotError;
return -1;
}
int add1(Uint32 x1)
{
if (unlikely(m_available_length < 1))
return error();
Uint32 current = m_instructions_length;
m_buffer[current ] = x1;
m_instructions_length = current + 1;
m_available_length--;
return 0;
}
int add2(Uint32 x1, Uint32 x2)
{
if (unlikely(m_available_length < 2))
return error();
Uint32 current = m_instructions_length;
m_buffer[current ] = x1;
m_buffer[current + 1] = x2;
m_instructions_length = current + 2;
m_available_length -= 2;
return 0;
}
int add_branch(Uint32 type,
Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
int read_attr_impl(const NdbColumnImpl *c, Uint32 RegDest);
public:
NdbInterpretedCode(Uint32 *buffer, Uint32 buffer_word_size,
Uint32 num_labels);
int load_const_u32(Uint32 RegDest, Uint32 Constant);
int read_attr(const NdbDictionary::Table *table, Uint32 attrId,
Uint32 RegDest);
int read_attr(const NdbDictionary::Column *column, Uint32 RegDest);
int branch_gt(Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
int branch_eq(Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
int branch_le(Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
int def_label(int tLabelNo);
int interpret_exit_nok(Uint32 ErrorCode);
int interpret_exit_ok();
int backpatch();
};
#endif
#endif
--- New file ---
+++ storage/ndb/src/ndbapi/NdbInterpretedCode.cpp 07/05/25 15:06:58
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "NdbInterpretedCode.hpp"
#include "Interpreter.hpp"
#include "NdbDictionaryImpl.hpp"
int NdbInterpretedCode::add_branch(Uint32 type, Uint32 RegLvalue,
Uint32 RegRvalue, Uint32 Label)
{
/* NOTE: subroutines not supported yet. */
/* Store the jump location for backpatching. */
if (m_available_length < 2 ||
m_instructions_length > 0xffff ||
Label > 0xffff)
return error();
m_number_of_jumps++;
m_available_length--;
Uint32 pos= m_buffer_length - (m_number_of_labels + m_number_of_jumps);
m_buffer[pos]= m_instructions_length | (Label << 16);
return add1(Interpreter::Branch(type, RegLvalue, RegRvalue));
}
NdbInterpretedCode::NdbInterpretedCode(Uint32 *buffer, Uint32 buffer_word_size,
Uint32 num_labels) :
m_buffer(buffer),
m_buffer_length(buffer_word_size),
m_number_of_labels(num_labels),
m_instructions_length(0),
m_available_length(m_buffer_length),
m_number_of_jumps(0),
m_flags(0)
{
Uint32 i;
/*
At the end of the buffer we store the address of all labels as they are
defined, so that we can back-patch forward jumps later.
If the supplied buffer is too small, we cannot fail in the constructor,
but all attempts to add instructions will fail due to full buffer.
*/
for (i = 0; i < num_labels && i < buffer_word_size; i++)
{
buffer[buffer_word_size - (i + 1)] = ~(Uint32)0;
m_available_length--;
}
}
int NdbInterpretedCode::load_const_u32(Uint32 RegDest, Uint32 Constant)
{
return add2(Interpreter::LoadConst32(RegDest % MaxReg), Constant);
}
int NdbInterpretedCode::read_attr_impl(const NdbColumnImpl *c, Uint32 RegDest)
{
if (c->m_storageType == NDB_STORAGETYPE_DISK)
m_flags|= UsesDisk;
return add1(Interpreter::Read(c->m_attrId, RegDest % MaxReg));
}
int NdbInterpretedCode::read_attr(const NdbDictionary::Table *table,
Uint32 attrId, Uint32 RegDest)
{
const NdbTableImpl *t= & NdbTableImpl::getImpl(*table);
const NdbColumnImpl *c= t->getColumn(attrId);
if (unlikely(c == NULL))
return error();
return read_attr_impl(c, RegDest);
}
int NdbInterpretedCode::read_attr(const NdbDictionary::Column *column,
Uint32 RegDest)
{
return read_attr_impl(&NdbColumnImpl::getImpl(*column), RegDest);
}
int NdbInterpretedCode::branch_gt(Uint32 RegLvalue, Uint32 RegRvalue,
Uint32 Label)
{
return add_branch(Interpreter::BRANCH_GT_REG_REG,
RegLvalue, RegRvalue, Label);
}
int NdbInterpretedCode::branch_eq(Uint32 RegLvalue, Uint32 RegRvalue,
Uint32 Label)
{
return add_branch(Interpreter::BRANCH_EQ_REG_REG,
RegLvalue, RegRvalue, Label);
}
int NdbInterpretedCode::branch_le(Uint32 RegLvalue, Uint32 RegRvalue,
Uint32 Label)
{
return add_branch(Interpreter::BRANCH_LE_REG_REG,
RegLvalue, RegRvalue, Label);
}
int NdbInterpretedCode::def_label(int tLabelNo)
{
if (tLabelNo < 0 ||
(Uint32)tLabelNo >= m_number_of_labels)
return error();
Uint32 pos= m_buffer_length - (tLabelNo + 1);
/* Check label not already defined. */
if (m_buffer[pos] != ~(Uint32)0)
return error();
m_buffer[pos]= m_instructions_length;
return 0;
}
int NdbInterpretedCode::interpret_exit_nok(Uint32 ErrorCode)
{
return add1((ErrorCode << 16) + Interpreter::EXIT_REFUSE);
}
int NdbInterpretedCode::interpret_exit_ok()
{
return add1(Interpreter::EXIT_OK);
}
int NdbInterpretedCode::backpatch()
{
Uint32 i;
Uint32 jumplist_end = m_buffer_length - m_number_of_labels;
Uint32 jumplist_start = jumplist_end - m_number_of_jumps;
for (i = jumplist_start; i < jumplist_end; i++)
{
Uint32 value = m_buffer[i];
Uint32 instruction_address = value & 0xffff;
Uint32 label_idx = value >> 16;
assert(label_idx < m_number_of_labels);
assert(instruction_address < m_instructions_length);
Uint32 label_address = m_buffer[m_buffer_length - (label_idx + 1)];
/* Check for jump to undefined label. */
if (label_address == ~(Uint32)0)
return error();
/* Check if backwards or forwards jump. */
if (label_address < instruction_address)
m_buffer[instruction_address] |=
(((instruction_address - label_address) << 16) | ((Uint32)1 << 31));
else
m_buffer[instruction_address] |=
((label_address - instruction_address) << 16);
}
return 0;
}
--- 1.65/storage/ndb/include/ndbapi/NdbTransaction.hpp 2007-05-25 15:07:11 +02:00
+++ 1.66/storage/ndb/include/ndbapi/NdbTransaction.hpp 2007-05-25 15:07:11 +02:00
@@ -30,6 +30,7 @@ class NdbIndexOperation;
class NdbApiSignal;
class Ndb;
class NdbBlob;
+class NdbInterpretedCode;
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
// to be documented later
@@ -590,7 +591,10 @@ public:
const unsigned char *mask= 0);
NdbOperation *updateTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *attr_rec, const char *attr_row,
- const unsigned char *mask= 0);
+ const unsigned char *mask= 0,
+ const Uint32 *setPartitionId = 0,
+ const void *getSetValue = 0,
+ const NdbInterpretedCode *interpreted_code = 0);
NdbOperation *writeTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *attr_rec, const char *attr_row,
const unsigned char *mask= 0);
@@ -814,7 +818,10 @@ private:
const char *key_row,
const NdbRecord *attribute_record,
const char *attribute_row,
- const unsigned char *mask);
+ const unsigned char *mask,
+ const Uint32 *setPartitionId = 0,
+ const void *getSetValue = 0,
+ const NdbInterpretedCode *interpreted_code = 0);
void handleExecuteCompletion();
--- 1.52/storage/ndb/include/ndbapi/NdbOperation.hpp 2007-05-25 15:07:11 +02:00
+++ 1.53/storage/ndb/include/ndbapi/NdbOperation.hpp 2007-05-25 15:07:11 +02:00
@@ -32,6 +32,7 @@ class NdbColumnImpl;
class NdbBlob;
class TcKeyReq;
class NdbRecord;
+class NdbInterpretedCode;
/**
* @class NdbOperation
@@ -1183,6 +1184,8 @@ protected:
#include <Bitmask.hpp> in application code.
*/
Uint32 m_read_mask[(NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5];
+ /* Interpreted program for NdbRecord operations. */
+ const NdbInterpretedCode *m_interpreted_code;
Uint32 m_any_value; // Valid if m_use_any_value!=0
--- 1.154/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2007-05-25 15:07:11 +02:00
+++ 1.155/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2007-05-25 15:07:11 +02:00
@@ -5211,19 +5211,6 @@ void Dbtc::execLQHKEYREF(Signal* signal)
return;
}
- /* Only ref in certain situations */
- {
- const Uint32 opType = regTcPtr->operation;
- if ( (opType == ZDELETE && errCode != ZNOT_FOUND)
- || (opType == ZINSERT && errCode != ZALREADYEXIST)
- || (opType == ZUPDATE && errCode != ZNOT_FOUND)
- || (opType == ZWRITE && errCode != 839 && errCode != 840))
- {
- TCKEY_abort(signal, 49);
- return;
- }
- }
-
/* *************** */
/* TCKEYREF < */
/* *************** */
--- 1.87/storage/ndb/src/ndbapi/NdbTransaction.cpp 2007-05-25 15:07:11 +02:00
+++ 1.88/storage/ndb/src/ndbapi/NdbTransaction.cpp 2007-05-25 15:07:11 +02:00
@@ -2107,7 +2107,10 @@ NdbTransaction::setupRecordOp(NdbOperati
const char *key_row,
const NdbRecord *attribute_record,
const char *attribute_row,
- const unsigned char *mask)
+ const unsigned char *mask,
+ const Uint32 *setPartitionId,
+ const void *getSetValue,
+ const NdbInterpretedCode *interpreted_code)
{
NdbOperation *op;
/*
@@ -2144,6 +2147,14 @@ NdbTransaction::setupRecordOp(NdbOperati
op->m_attribute_row= attribute_row;
attribute_record->copyMask(op->m_read_mask, mask);
+ assert(getSetValue == NULL); // ToDo
+ if (setPartitionId != NULL)
+ {
+ op->theDistributionKey= *setPartitionId;
+ op->theDistrKeyIndicator_= 1;
+ }
+ op->m_interpreted_code= interpreted_code;
+
if (unlikely(attribute_record->flags & NdbRecord::RecHasBlob))
{
if (op->getBlobHandlesNdbRecord(this) == -1)
@@ -2222,7 +2233,10 @@ NdbTransaction::insertTuple(const NdbRec
NdbOperation *
NdbTransaction::updateTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *attr_rec, const char *attr_row,
- const unsigned char *mask)
+ const unsigned char *mask,
+ const Uint32 *setPartitionId,
+ const void *getSetValue,
+ const NdbInterpretedCode *interpreted_code)
{
/* Check that the NdbRecord specifies the full primary key. */
if (!(key_rec->flags & NdbRecord::RecIsKeyRecord))
@@ -2233,7 +2247,8 @@ NdbTransaction::updateTuple(const NdbRec
NdbOperation *op= setupRecordOp(NdbOperation::UpdateRequest,
NdbOperation::LM_Exclusive, key_rec, key_row,
- attr_rec, attr_row, mask);
+ attr_rec, attr_row, mask, setPartitionId,
+ getSetValue, interpreted_code);
if(!op)
return op;
--- 1.24/storage/ndb/src/ndbapi/NdbOperation.cpp 2007-05-25 15:07:11 +02:00
+++ 1.25/storage/ndb/src/ndbapi/NdbOperation.cpp 2007-05-25 15:07:11 +02:00
@@ -164,6 +164,7 @@ NdbOperation::init(const NdbTableImpl* t
theBlobList = NULL;
m_abortOption = -1;
m_no_disk_flag = 1;
+ m_interpreted_code = NULL;
m_use_any_value = 0;
tSignal = theNdb->getSignal();
--- 1.39/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2007-05-25 15:07:11 +02:00
+++ 1.40/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2007-05-25 15:07:11 +02:00
@@ -21,6 +21,7 @@
#include <Ndb.hpp>
#include <NdbRecAttr.hpp>
#include "NdbUtil.hpp"
+#include "NdbInterpretedCode.hpp"
#include "Interpreter.hpp"
#include <AttributeHeader.hpp>
@@ -617,6 +618,8 @@ NdbOperation::prepareSendNdbRecord(Uint3
Uint32 remain;
int res;
Uint32 no_disk_flag;
+ Uint32 interpreted_code_end;
+ Uint32 *update_len_addr;
assert(theStatus==UseNdbRecord);
/* Not yet support for NdbRecord with interpreted operations. */
@@ -706,6 +709,34 @@ NdbOperation::prepareSendNdbRecord(Uint3
no_disk_flag= m_no_disk_flag;
+ /* Handle any interpreted program. */
+ const NdbInterpretedCode *code= m_interpreted_code;
+ if (code)
+ {
+ if (code->m_flags & NdbInterpretedCode::UsesDisk)
+ no_disk_flag = 0;
+ Uint32 sizes[5];
+ sizes[0] = 0; // Initial read.
+ sizes[1] = code->m_instructions_length;
+ sizes[2] = 0; // Update size, filled later
+ update_len_addr= attrInfoPtr + 2;
+ sizes[3] = 0; // Final read size, ToDo
+ sizes[4] = 0; // Subroutine size, ToDo
+ res = insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+ (const char *)sizes,
+ sizeof(sizes),
+ &attrInfoPtr, &remain);
+ if (res)
+ return res;
+ res = insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+ (const char *)(code->m_buffer),
+ code->m_instructions_length*4,
+ &attrInfoPtr, &remain);
+ if (res)
+ return res;
+ interpreted_code_end= theTotalCurrAI_Len;
+ }
+
OperationType tOpType= theOperationType;
if ((tOpType == InsertRequest) || (tOpType == WriteRequest) ||
(tOpType == UpdateRequest))
@@ -854,6 +885,15 @@ NdbOperation::prepareSendNdbRecord(Uint3
return res;
}
+ if (code)
+ {
+ /* ToDo: This only handles update currently. */
+ Uint32 update_word_length= theTotalCurrAI_Len - interpreted_code_end;
+ *update_len_addr = update_word_length;
+for (Uint32 i= 0; i < 5; i++) {fprintf(stderr, "%2d %p 0x%08x %u %p 0x%08x\n", i, tcKeyReq->attrInfo, tcKeyReq->attrInfo[i], update_word_length, update_len_addr-2, update_len_addr[(int)i - 2]);}
+assert(update_word_length > 0);
+ }
+
Uint32 signalLength= hdrSize +
(theTupKeyLen > TcKeyReq::MaxKeyInfo ?
TcKeyReq::MaxKeyInfo : theTupKeyLen) +
@@ -906,7 +946,7 @@ NdbOperation::fillTcKeyReqHdr(TcKeyReq *
TcKeyReq::setSimpleFlag(reqInfo, theSimpleIndicator);
TcKeyReq::setCommitFlag(reqInfo, theCommitIndicator);
TcKeyReq::setStartFlag(reqInfo, theStartIndicator);
- TcKeyReq::setInterpretedFlag(reqInfo, theInterpretIndicator);
+ TcKeyReq::setInterpretedFlag(reqInfo, (m_interpreted_code != NULL));
/* We will setNoDiskFlag() later when we have checked all columns. */
TcKeyReq::setDirtyFlag(reqInfo, theDirtyIndicator);
TcKeyReq::setOperationType(reqInfo, theOperationType);
--- 1.471/sql/ha_ndbcluster.cc 2007-05-25 15:07:11 +02:00
+++ 1.472/sql/ha_ndbcluster.cc 2007-05-25 15:07:11 +02:00
@@ -33,6 +33,7 @@
#include "ha_ndbcluster_cond.h"
#include <../util/Bitmask.hpp>
#include <ndbapi/NdbIndexStat.hpp>
+#include <ndbapi/NdbInterpretedCode.hpp>
#include "ha_ndbcluster_binlog.h"
#include "ha_ndbcluster_tables.h"
@@ -290,51 +291,72 @@ void ha_ndbcluster::reset_state_at_execu
thd_ndb->m_unsent_bytes= 0;
}
-int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
+static int
+check_completed_operations(NdbTransaction *trans, const NdbOperation *first)
{
- int res= trans->execute(NdbTransaction::NoCommit,
- NdbOperation::AO_IgnoreError,
- h->m_force_send);
- h->reset_state_at_execute();
- if (res == -1)
- return -1;
-
- const NdbError &err= trans->getNdbError();
- if (err.classification != NdbError::NoError &&
- err.classification != NdbError::ConstraintViolation &&
- err.classification != NdbError::NoDataFound)
- return -1;
-
- return 0;
+ DBUG_ENTER("check_completed_operations");
+ /*
+ Check that all errors are "accepted" errors
+ */
+ while (first)
+ {
+ const NdbError &err= first->getNdbError();
+ if (err.classification != NdbError::NoError &&
+ err.classification != NdbError::ConstraintViolation &&
+ err.classification != NdbError::NoDataFound &&
+ err.code != 9999)
+ DBUG_RETURN(err.code);
+ first= trans->getNextCompletedOperation(first);
+ }
+ DBUG_RETURN(0);
}
inline
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{
+ DBUG_ENTER("execute_no_commit");
h->release_completed_operations(trans, force_release);
- if (h->m_ignore_no_key)
- return execute_no_commit_ignore_no_key(h,trans);
- else
+ const NdbOperation *first= trans->getFirstDefinedOperation();
+ if (trans->execute(NdbTransaction::NoCommit,
+ NdbOperation::AO_IgnoreError,
+ h->m_force_send))
{
- int res= trans->execute(NdbTransaction::NoCommit,
- NdbOperation::AbortOnError,
- h->m_force_send);
h->reset_state_at_execute();
- return res;
+ DBUG_RETURN(-1);
}
+ h->reset_state_at_execute();
+ if (!h->m_ignore_no_key || trans->getNdbError().code == 0)
+ DBUG_RETURN(trans->getNdbError().code);
+
+ DBUG_RETURN(check_completed_operations(trans, first));
+}
+
+inline
+int execute_commit(NdbTransaction *trans, int force_send, int ignore_error)
+{
+ DBUG_ENTER("execute_commit");
+ const NdbOperation *first= trans->getFirstDefinedOperation();
+ if (trans->execute(NdbTransaction::Commit,
+ NdbOperation::AO_IgnoreError,
+ force_send))
+ {
+ DBUG_RETURN(-1);
+ }
+ if (!ignore_error || trans->getNdbError().code == 0)
+ DBUG_RETURN(trans->getNdbError().code);
+ DBUG_RETURN(check_completed_operations(trans, first));
}
inline
int execute_commit(ha_ndbcluster *h, NdbTransaction *trans)
{
- int res= trans->execute(NdbTransaction::Commit,
- NdbOperation::AbortOnError,
- h->m_force_send);
+ int res= execute_commit(trans, h->m_force_send, h->m_ignore_no_key);
h->reset_state_at_execute();
return res;
}
+static
inline
int execute_commit(THD *thd, NdbTransaction *trans)
{
@@ -343,21 +365,22 @@ int execute_commit(THD *thd, NdbTransact
end time, and we will not take further action after this (nor can we
easily, as we execute outside the context of the ha_ndbcluster object).
*/
- return trans->execute(NdbTransaction::Commit,
- NdbOperation::AbortOnError,
- thd->variables.ndb_force_send);
+
+/* CHECK THIS */
+ return execute_commit(trans, thd->variables.ndb_force_send, FALSE);
}
inline
int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans,
bool force_release)
{
+ DBUG_ENTER("execute_no_commit_ie");
h->release_completed_operations(trans, force_release);
int res= trans->execute(NdbTransaction::NoCommit,
NdbOperation::AO_IgnoreError,
h->m_force_send);
h->reset_state_at_execute();
- return res;
+ DBUG_RETURN(res);
}
/*
@@ -794,6 +817,14 @@ ha_ndbcluster::get_row_buffer()
table->s->reclength + m_extra_reclength);
}
+/* Return a row buffer, valid until next execute(). */
+char *
+ha_ndbcluster::get_buffer(uint size)
+{
+ Thd_ndb *thd_ndb= get_thd_ndb(table->in_use);
+ return alloc_root(&(thd_ndb->m_batch_mem_root), size);
+}
+
/*
When using extra hidden columns, the mysqld column bitmaps do not
include bits for the extra columns, so we use this method to initialize
@@ -1215,6 +1246,32 @@ int ha_ndbcluster::get_metadata(const ch
if (error)
goto err;
+#ifdef HAVE_NDB_BINLOG
+ if (m_share->m_resolve_column == 0xFFFFFFFF)
+ {
+ m_share->m_resolve_column= 0;
+ for (int j= 0; j < tab->getNoOfColumns(); j++)
+ {
+ const NDBCOL *c= tab->getColumn(j);
+ if (strcmp("X", c->getName()) == 0)
+ {
+ switch (c->getType())
+ {
+ case NDBCOL::Unsigned:
+ m_share->m_resolve_column= j+1;
+ DBUG_PRINT("info", ("resolve column %u", m_share->m_resolve_column));
+ break;
+ default:
+ DBUG_PRINT("info", ("resolve column %u has wrong type",
+ m_share->m_resolve_column));
+ break;
+ }
+ break;
+ }
+ }
+ }
+#endif /* HAVE_NDB_BINLOG */
+
DBUG_PRINT("info", ("fetched table %s", tab->getName()));
m_table= tab;
@@ -3506,6 +3563,93 @@ int ha_ndbcluster::primary_key_cmp(const
Update one record in NDB using primary key
*/
+#ifdef HAVE_NDB_BINLOG
+int
+ha_ndbcluster::update_row_timestamp_resolve(const byte *old_data, byte *new_data, NdbInterpretedCode *code)
+{
+ const int do_before_image_check= 0;
+ uint32 resolve_column= m_share->m_resolve_column;
+
+ const uint error_wrong_before_image= 9998;
+ const uint error_lower_timestamp= 9999;
+ const uint label_0= 0, label_1= 1;
+ const Uint32 RegNewValue= 1, RegCurrentValue= 2;
+ int r;
+ Field *field= table->field[resolve_column-1];
+ DBUG_PRINT("info", ("interpreted update post equal"));
+ /*
+ * read new value from record
+ */
+ uint32 new_value;
+ {
+ const byte* field_ptr= field->ptr + (new_data - table->record[0]);
+ memcpy(&new_value, field_ptr, 4);
+ }
+ DBUG_PRINT("info", ("new_value: %u", new_value));
+ /*
+ * Load registers RegNewValue and RegCurrentValue
+ */
+ r= code->load_const_u32(RegNewValue, new_value);
+ DBUG_ASSERT(r == 0);
+ r= code->read_attr(m_table, resolve_column-1, RegCurrentValue);
+ DBUG_ASSERT(r == 0);
+ /*
+ * if RegNewValue > RegCurrentValue goto label_0
+ * else raise error for this row
+ */
+ r= code->branch_gt(RegCurrentValue, RegNewValue, label_0);
+ DBUG_ASSERT(r == 0);
+ r= code->interpret_exit_nok(error_lower_timestamp);
+ DBUG_ASSERT(r == 0);
+ r= code->def_label(label_0);
+ DBUG_ASSERT(r == 0);
+ if (do_before_image_check)
+ {
+ const Uint32 RegOldValue= RegNewValue; /* reuse register, new value not needed */
+ /*
+ * Now also check that old values are the same
+ */
+
+ /*
+ * read old value from record
+ */
+ uint32 old_value;
+ {
+ const byte* field_ptr= field->ptr + (old_data - table->record[0]);
+ memcpy(&old_value, field_ptr, 4);
+ }
+ DBUG_PRINT("info", ("old_value: %u", old_value));
+ /*
+ * Load register RegOldValue
+ */
+ r= code->load_const_u32(RegOldValue, old_value);
+ DBUG_ASSERT(r == 0);
+ // RegCurrentValue already stored
+ /*
+ * if RegOldValue == RegCurrentValue goto label label_1
+ * else raise error for this row
+ */
+ r= code->branch_eq(RegOldValue, RegCurrentValue, label_1);
+ DBUG_ASSERT(r == 0);
+ if (error_wrong_before_image)
+ r= code->interpret_exit_nok(error_wrong_before_image);
+ else
+ r= code->interpret_exit_nok(0);
+ DBUG_ASSERT(r == 0);
+ /*
+ * Exit ok and let transaction continue without error
+ */
+ r= code->def_label(label_1);
+ DBUG_ASSERT(r == 1);
+ }
+ r= code->interpret_exit_ok();
+ DBUG_ASSERT(r == 0);
+ r= code->backpatch();
+ DBUG_ASSERT(r == 0);
+ return r;
+}
+#endif
+
int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{
THD *thd= table->in_use;
@@ -3679,7 +3823,27 @@ int ha_ndbcluster::update_row(const byte
key_row= (const char *)(&m_ref);
}
- if (!(op= trans->updateTuple(key_rec, key_row, m_ndb_record, row, mask)))
+ NdbInterpretedCode *code= NULL;
+#ifdef HAVE_NDB_BINLOG
+ if (m_share->m_resolve_column)
+ {
+ DBUG_PRINT("info", ("interpreted update pre equal on column %u",
+ m_share->m_resolve_column));
+ /* Room for 10 instruction words, two labels, and two jumps. */
+ uint code_size= 14;
+ uint struct_size= sizeof(*code);
+ char *mem= get_buffer(struct_size + code_size*sizeof(Uint32));
+ Uint32* buffer= (Uint32 *)(mem + struct_size);
+ code= new(mem) NdbInterpretedCode(buffer, code_size, 2);
+ if (update_row_timestamp_resolve(old_data, new_data, code))
+ {
+ /* ToDo error handling */
+ abort();
+ }
+ }
+#endif
+ if (!(op= trans->updateTuple(key_rec, key_row, m_ndb_record, row, mask,
+ NULL, NULL, code)))
ERR_RETURN(trans->getNdbError());
}
--- 1.182/sql/ha_ndbcluster.h 2007-05-25 15:07:11 +02:00
+++ 1.183/sql/ha_ndbcluster.h 2007-05-25 15:07:11 +02:00
@@ -41,6 +41,7 @@ class NdbIndexScanOperation;
class NdbBlob;
class NdbIndexStat;
class NdbEventOperation;
+class NdbInterpretedCode;
class ha_ndbcluster_cond;
// connectstring to cluster if given by mysqld
@@ -120,6 +121,7 @@ typedef struct st_ndbcluster_share {
#ifdef HAVE_NDB_BINLOG
uint32 connect_count;
uint32 flags;
+ uint32 m_resolve_column;
NdbEventOperation *op;
NdbEventOperation *op_old; // for rename table
char *old_names; // for rename table
@@ -248,6 +250,10 @@ class ha_ndbcluster: public handler
int write_row(byte *buf);
int update_row(const byte *old_data, byte *new_data);
+#ifdef HAVE_NDB_BINLOG
+ int update_row_timestamp_resolve(const byte *old_data, byte *new_data,
+ NdbInterpretedCode *);
+#endif
int delete_row(const byte *buf);
int index_init(uint index, bool sorted);
int index_end();
@@ -506,6 +512,7 @@ private:
uint op_batch_size, bool & batch_full);
char *copy_row_to_buffer(Thd_ndb *thd_ndb, const byte *record);
char *get_row_buffer();
+ char *get_buffer(uint size);
void clear_extended_column_set(uchar *mask);
uchar *copy_column_set(MY_BITMAP *bitmap);
@@ -547,6 +554,7 @@ private:
void reset_state_at_execute();
friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
+ friend int execute_commit(NdbTransaction *, int, int);
friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);
--- 1.20/storage/ndb/src/ndbapi/Makefile.am 2007-05-25 15:07:11 +02:00
+++ 1.21/storage/ndb/src/ndbapi/Makefile.am 2007-05-25 15:07:11 +02:00
@@ -56,7 +56,8 @@ libndbapi_la_SOURCES = \
NdbBlob.cpp \
NdbIndexStat.cpp \
SignalSender.cpp \
- ObjectMap.cpp
+ ObjectMap.cpp \
+ NdbInterpretedCode.cpp
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
--- 1.22/storage/ndb/include/Makefile.am 2007-05-25 15:07:11 +02:00
+++ 1.23/storage/ndb/include/Makefile.am 2007-05-25 15:07:11 +02:00
@@ -41,7 +41,8 @@ ndbapi/NdbScanFilter.hpp \
ndbapi/NdbScanOperation.hpp \
ndbapi/NdbIndexScanOperation.hpp \
ndbapi/NdbIndexStat.hpp \
-ndbapi/ndberror.h
+ndbapi/ndberror.h \
+ndbapi/NdbInterpretedCode.hpp
mgmapiinclude_HEADERS = \
mgmapi/mgmapi.h \
--- 1.120/sql/ha_ndbcluster_binlog.cc 2007-05-25 15:07:11 +02:00
+++ 1.121/sql/ha_ndbcluster_binlog.cc 2007-05-25 15:07:11 +02:00
@@ -33,6 +33,8 @@
#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
#endif
+extern my_bool opt_ndb_log_update_as_write;
+
/*
defines for cluster replication table names
*/
@@ -385,6 +387,7 @@ void ndbcluster_binlog_init_share(NDB_SH
DBUG_ENTER("ndbcluster_binlog_init_share");
share->connect_count= g_ndb_cluster_connection->get_connect_count();
+ share->m_resolve_column= 0xFFFFFFFF;
share->op= 0;
share->table= 0;
@@ -3499,7 +3502,8 @@ ndb_binlog_thread_handle_data_event(Ndb
since we do not have an after image
*/
int n;
- if (table->s->primary_key != MAX_KEY)
+ if (table->s->primary_key != MAX_KEY &&
+ opt_ndb_log_update_as_write)
n= 0; /*
use the primary key only as it save time and space and
it is the only thing needed to log the delete
@@ -3546,7 +3550,8 @@ ndb_binlog_thread_handle_data_event(Ndb
ndb_unpack_record(table, share->ndb_value[0],
&b, table->record[0]);
DBUG_EXECUTE("info", print_records(table, table->record[0]););
- if (table->s->primary_key != MAX_KEY)
+ if (table->s->primary_key != MAX_KEY &&
+ opt_ndb_log_update_as_write)
{
/*
since table has a primary key, we can do a write
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2509) | knielsen | 25 May |