List:Commits« Previous MessageNext Message »
From:knielsen Date:May 25 2007 3:07pm
Subject:bk commit into 5.1 tree (knielsen:1.2509)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-05-25 15:07:03+02:00, knielsen@ymer.(none) +14 -0
  NdbRecord interpreted update + conflict resolution.

  sql/ha_ndbcluster.cc@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +194 -30
    NdbRecord interpreted update + conflict resolution.

  sql/ha_ndbcluster.h@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +8 -0
    NdbRecord interpreted update + conflict resolution.

  sql/ha_ndbcluster_binlog.cc@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +7 -2
    NdbRecord interpreted update + conflict resolution.

  sql/mysqld.cc@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +7 -0
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/include/Makefile.am@stripped, 2007-05-25 15:06:57+02:00, knielsen@ymer.(none) +2
-1
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/include/ndbapi/NdbInterpretedCode.hpp@stripped, 2007-05-25 15:06:58+02:00,
knielsen@ymer.(none) +131 -0
    New BitKeeper file ``storage/ndb/include/ndbapi/NdbInterpretedCode.hpp''

  storage/ndb/include/ndbapi/NdbInterpretedCode.hpp@stripped, 2007-05-25 15:06:58+02:00,
knielsen@ymer.(none) +0 -0

  storage/ndb/include/ndbapi/NdbOperation.hpp@stripped, 2007-05-25 15:06:57+02:00,
knielsen@ymer.(none) +3 -0
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/include/ndbapi/NdbTransaction.hpp@stripped, 2007-05-25 15:06:57+02:00,
knielsen@ymer.(none) +9 -2
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2007-05-25 15:06:57+02:00,
knielsen@ymer.(none) +0 -13
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/src/ndbapi/Makefile.am@stripped, 2007-05-25 15:06:58+02:00, knielsen@ymer.(none)
+2 -1
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/src/ndbapi/NdbInterpretedCode.cpp@stripped, 2007-05-25 15:06:58+02:00,
knielsen@ymer.(none) +158 -0
    New BitKeeper file ``storage/ndb/src/ndbapi/NdbInterpretedCode.cpp''

  storage/ndb/src/ndbapi/NdbInterpretedCode.cpp@stripped, 2007-05-25 15:06:58+02:00,
knielsen@ymer.(none) +0 -0

  storage/ndb/src/ndbapi/NdbOperation.cpp@stripped, 2007-05-25 15:06:58+02:00,
knielsen@ymer.(none) +1 -0
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2007-05-25 15:06:58+02:00,
knielsen@ymer.(none) +41 -1
    NdbRecord interpreted update + conflict resolution.

  storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2007-05-25 15:06:58+02:00,
knielsen@ymer.(none) +18 -3
    NdbRecord interpreted update + conflict resolution.

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	knielsen
# Host:	ymer.(none)
# Root:	/usr/local/mysql/mysql-5.1-telco-ndbrecord

--- 1.643/sql/mysqld.cc	2007-05-25 15:07:11 +02:00
+++ 1.644/sql/mysqld.cc	2007-05-25 15:07:11 +02:00
@@ -379,6 +379,7 @@ ulong ndb_extra_logging;
 #ifdef HAVE_NDB_BINLOG
 ulong ndb_report_thresh_binlog_epoch_slip;
 ulong ndb_report_thresh_binlog_mem_usage;
+my_bool opt_ndb_log_update_as_write;
 #endif
 
 extern const char *ndb_distribution_names[];
@@ -4927,6 +4928,7 @@ enum options_mysqld
   OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
   OPT_NDB_REPORT_THRESH_BINLOG_MEM_USAGE,
   OPT_NDB_USE_COPYING_ALTER_TABLE,
+  OPT_NDB_LOG_UPDATE_AS_WRITE,
   OPT_SKIP_SAFEMALLOC,
   OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_COMPLETION_TYPE,
   OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
@@ -5509,6 +5511,11 @@ master-ssl",
    (gptr*) &opt_ndb_shm,
    (gptr*) &opt_ndb_shm,
    0, GET_BOOL, OPT_ARG, OPT_NDB_SHM_DEFAULT, 0, 0, 0, 0, 0},
+  {"ndb-log-update-as-write", OPT_NDB_LOG_UPDATE_AS_WRITE,
+   "",
+   (gptr*) &opt_ndb_log_update_as_write,
+   (gptr*) &opt_ndb_log_update_as_write,
+   0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
   {"ndb-optimized-node-selection", OPT_NDB_OPTIMIZED_NODE_SELECTION,
    "Select nodes for transactions in a more optimal way.",
    (gptr*) &opt_ndb_optimized_node_selection,
--- New file ---
+++ storage/ndb/include/ndbapi/NdbInterpretedCode.hpp	07/05/25 15:06:58
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef NdbInterpretedCode_H
#define NdbInterpretedCode_H

#include <ndb_global.h>
#include <ndb_types.h>
#include "NdbDictionary.hpp"

class NdbTableImpl;
class NdbColumnImpl;


#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/*
  @brief Stand-alone interpreted programs, for use with NdbRecord

  @details This class is used to prepare an NDB interpreted program for use
  in operations created using NdbRecord.

  The caller supplies the memory buffer to be used for the operations. If
  this is too small, an error will be returned when attempting to add
  instructions. The caller must deallocate the buffer if necessary after
  the InterpretedCode object is destroyed.

  Methods have minimal error checks, for efficiency.

  Note that this is currently considered an internal interface, and subject
  to change without notice.

  ToDo: We should add placeholders to this, so that one can use a single
  InterpretedCode object to create many operations from different embedded
  constants. Once we do this, we could add more error checks, as the cost
  would then not be paid for each operation creation.

*/
class NdbInterpretedCode
{
private:
  friend class NdbOperation;

  static const Uint32 MaxReg= 8;
  Uint32 *m_buffer;
  const Uint32 m_buffer_length;               // In words
  const Uint32 m_number_of_labels;
  /* Number of words used for instructions. */
  Uint32 m_instructions_length;
  /*
    Number of words left in buffer.
    This can be smaller than m_buffer_length-m_instructions_length, as
    the end of the buffer is used to store jump and label addresses for
    back-patching jumps.
  */
  Uint32 m_available_length;
  Uint32 m_number_of_jumps;
  enum Flags {
    /*
      We will set m_got_error if an error occurs, so that we can
      refuse to create an operation from InterpretedCode that the user
      forgot to do error checks on.
    */
    GotError= 0x1,
    /* Set if reading disk column. */
    UsesDisk= 0x2
  };
  Uint32 m_flags;

  int error()
  {
    m_flags|= GotError;
    return -1;
  }

  int add1(Uint32 x1)
  {
    if (unlikely(m_available_length < 1))
      return error();
    Uint32 current = m_instructions_length;
    m_buffer[current    ] = x1;
    m_instructions_length = current + 1;
    m_available_length--;
    return 0;
  }

  int add2(Uint32 x1, Uint32 x2)
  {
    if (unlikely(m_available_length < 2))
      return error();
    Uint32 current = m_instructions_length;
    m_buffer[current    ] = x1;
    m_buffer[current + 1] = x2;
    m_instructions_length = current + 2;
    m_available_length -= 2;
    return 0;
  }

  int add_branch(Uint32 type,
                 Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
  int read_attr_impl(const NdbColumnImpl *c, Uint32 RegDest);

public:
  NdbInterpretedCode(Uint32 *buffer, Uint32 buffer_word_size,
                     Uint32 num_labels);
  int load_const_u32(Uint32 RegDest, Uint32 Constant);
  int read_attr(const NdbDictionary::Table *table, Uint32 attrId,
                Uint32 RegDest);
  int read_attr(const NdbDictionary::Column *column, Uint32 RegDest);
  int branch_gt(Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
  int branch_eq(Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
  int branch_le(Uint32 RegLvalue, Uint32 RegRvalue, Uint32 Label);
  int def_label(int tLabelNo);
  int interpret_exit_nok(Uint32 ErrorCode);
  int interpret_exit_ok();
  int backpatch();
};
#endif

#endif

--- New file ---
+++ storage/ndb/src/ndbapi/NdbInterpretedCode.cpp	07/05/25 15:06:58
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "NdbInterpretedCode.hpp"
#include "Interpreter.hpp"
#include "NdbDictionaryImpl.hpp"


int NdbInterpretedCode::add_branch(Uint32 type, Uint32 RegLvalue,
                                   Uint32 RegRvalue, Uint32 Label)
{
  /* NOTE: subroutines not supported yet. */
  /* Store the jump location for backpatching. */
  if (m_available_length < 2 ||
      m_instructions_length > 0xffff ||
      Label > 0xffff)
    return error();
  m_number_of_jumps++;
  m_available_length--;
  Uint32 pos= m_buffer_length - (m_number_of_labels + m_number_of_jumps);
  m_buffer[pos]= m_instructions_length | (Label << 16);
  return add1(Interpreter::Branch(type, RegLvalue, RegRvalue));
}

NdbInterpretedCode::NdbInterpretedCode(Uint32 *buffer, Uint32 buffer_word_size,
                                       Uint32 num_labels) :
  m_buffer(buffer),
  m_buffer_length(buffer_word_size),
  m_number_of_labels(num_labels),
  m_instructions_length(0),
  m_available_length(m_buffer_length),
  m_number_of_jumps(0),
  m_flags(0)
{
  Uint32 i;
  /*
    At the end of the buffer we store the address of all labels as they are
    defined, so that we can back-patch forward jumps later.
    If the supplied buffer is too small, we cannot fail in the constructor,
    but all attempts to add instructions will fail due to full buffer.
  */
  for (i = 0; i < num_labels && i < buffer_word_size; i++)
  {
    buffer[buffer_word_size - (i + 1)] = ~(Uint32)0;
    m_available_length--;
  }
}

int NdbInterpretedCode::load_const_u32(Uint32 RegDest, Uint32 Constant)
{
  return add2(Interpreter::LoadConst32(RegDest % MaxReg), Constant);
}

int NdbInterpretedCode::read_attr_impl(const NdbColumnImpl *c, Uint32 RegDest)
{
  if (c->m_storageType == NDB_STORAGETYPE_DISK)
    m_flags|= UsesDisk;
  return add1(Interpreter::Read(c->m_attrId, RegDest % MaxReg));
}

int NdbInterpretedCode::read_attr(const NdbDictionary::Table *table,
                                  Uint32 attrId, Uint32 RegDest)
{
  const NdbTableImpl *t= & NdbTableImpl::getImpl(*table);
  const NdbColumnImpl *c= t->getColumn(attrId);
  if (unlikely(c == NULL))
    return error();
  return read_attr_impl(c, RegDest);
}

int NdbInterpretedCode::read_attr(const NdbDictionary::Column *column,
                                  Uint32 RegDest)
{
  return read_attr_impl(&NdbColumnImpl::getImpl(*column), RegDest);
}
int NdbInterpretedCode::branch_gt(Uint32 RegLvalue, Uint32 RegRvalue,
                                  Uint32 Label)
{
  return add_branch(Interpreter::BRANCH_GT_REG_REG,
                    RegLvalue, RegRvalue, Label);
}

int NdbInterpretedCode::branch_eq(Uint32 RegLvalue, Uint32 RegRvalue,
                                  Uint32 Label)
{
  return add_branch(Interpreter::BRANCH_EQ_REG_REG,
                    RegLvalue, RegRvalue, Label);
}

int NdbInterpretedCode::branch_le(Uint32 RegLvalue, Uint32 RegRvalue,
                                  Uint32 Label)
{
  return add_branch(Interpreter::BRANCH_LE_REG_REG,
                    RegLvalue, RegRvalue, Label);
}

int NdbInterpretedCode::def_label(int tLabelNo)
{
  if (tLabelNo < 0 ||
      (Uint32)tLabelNo >= m_number_of_labels)
    return error();
  Uint32 pos= m_buffer_length - (tLabelNo + 1);
  /* Check label not already defined. */
  if (m_buffer[pos] != ~(Uint32)0)
    return error();
  m_buffer[pos]= m_instructions_length;
  return 0;
}

int NdbInterpretedCode::interpret_exit_nok(Uint32 ErrorCode)
{
  return add1((ErrorCode << 16) + Interpreter::EXIT_REFUSE);
}

int NdbInterpretedCode::interpret_exit_ok()
{
  return add1(Interpreter::EXIT_OK);
}

int NdbInterpretedCode::backpatch()
{
  Uint32 i;
  Uint32 jumplist_end = m_buffer_length - m_number_of_labels;
  Uint32 jumplist_start = jumplist_end - m_number_of_jumps;

  for (i = jumplist_start; i < jumplist_end; i++)
  {
    Uint32 value = m_buffer[i];
    Uint32 instruction_address = value & 0xffff;
    Uint32 label_idx = value >> 16;
    assert(label_idx < m_number_of_labels);
    assert(instruction_address < m_instructions_length);
    Uint32 label_address = m_buffer[m_buffer_length - (label_idx + 1)];
    /* Check for jump to undefined label. */
    if (label_address == ~(Uint32)0)
      return error();
    /* Check if backwards or forwards jump. */
    if (label_address < instruction_address)
      m_buffer[instruction_address] |=
        (((instruction_address - label_address) << 16) | ((Uint32)1 << 31));
    else
      m_buffer[instruction_address] |=
        ((label_address - instruction_address) << 16);
  }
  return 0;
}


--- 1.65/storage/ndb/include/ndbapi/NdbTransaction.hpp	2007-05-25 15:07:11 +02:00
+++ 1.66/storage/ndb/include/ndbapi/NdbTransaction.hpp	2007-05-25 15:07:11 +02:00
@@ -30,6 +30,7 @@ class NdbIndexOperation;
 class NdbApiSignal;
 class Ndb;
 class NdbBlob;
+class NdbInterpretedCode;
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
 // to be documented later
@@ -590,7 +591,10 @@ public:
                             const unsigned char *mask= 0);
   NdbOperation *updateTuple(const NdbRecord *key_rec, const char *key_row,
                             const NdbRecord *attr_rec, const char *attr_row,
-                            const unsigned char *mask= 0);
+                            const unsigned char *mask= 0,
+                            const Uint32 *setPartitionId = 0,
+                            const void *getSetValue = 0,
+                            const NdbInterpretedCode *interpreted_code = 0);
   NdbOperation *writeTuple(const NdbRecord *key_rec, const char *key_row,
                            const NdbRecord *attr_rec, const char *attr_row,
                            const unsigned char *mask= 0);
@@ -814,7 +818,10 @@ private:						
                               const char *key_row,
                               const NdbRecord *attribute_record,
                               const char *attribute_row,
-                              const unsigned char *mask);
+                              const unsigned char *mask,
+                              const Uint32 *setPartitionId = 0,
+                              const void *getSetValue = 0,
+                              const NdbInterpretedCode *interpreted_code = 0);
 
   void		handleExecuteCompletion();
   

--- 1.52/storage/ndb/include/ndbapi/NdbOperation.hpp	2007-05-25 15:07:11 +02:00
+++ 1.53/storage/ndb/include/ndbapi/NdbOperation.hpp	2007-05-25 15:07:11 +02:00
@@ -32,6 +32,7 @@ class NdbColumnImpl;
 class NdbBlob;
 class TcKeyReq;
 class NdbRecord;
+class NdbInterpretedCode;
 
 /**
  * @class NdbOperation
@@ -1183,6 +1184,8 @@ protected:
     #include <Bitmask.hpp> in application code.
   */
   Uint32 m_read_mask[(NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5];
+  /* Interpreted program for NdbRecord operations. */
+  const NdbInterpretedCode *m_interpreted_code;
 
   Uint32 m_any_value;                           // Valid if m_use_any_value!=0
 

--- 1.154/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-05-25 15:07:11 +02:00
+++ 1.155/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-05-25 15:07:11 +02:00
@@ -5211,19 +5211,6 @@ void Dbtc::execLQHKEYREF(Signal* signal)
 	return;
       }
       
-      /* Only ref in certain situations */
-      {
-	const Uint32 opType = regTcPtr->operation;
-	if (   (opType == ZDELETE && errCode != ZNOT_FOUND)
-	    || (opType == ZINSERT && errCode != ZALREADYEXIST)
-	    || (opType == ZUPDATE && errCode != ZNOT_FOUND)
-	    || (opType == ZWRITE  && errCode != 839 && errCode != 840))
-	{
-	  TCKEY_abort(signal, 49);
-	  return;
-	}
-      }
-
       /* *************** */
       /*    TCKEYREF   < */
       /* *************** */

--- 1.87/storage/ndb/src/ndbapi/NdbTransaction.cpp	2007-05-25 15:07:11 +02:00
+++ 1.88/storage/ndb/src/ndbapi/NdbTransaction.cpp	2007-05-25 15:07:11 +02:00
@@ -2107,7 +2107,10 @@ NdbTransaction::setupRecordOp(NdbOperati
                               const char *key_row,
                               const NdbRecord *attribute_record,
                               const char *attribute_row,
-                              const unsigned char *mask)
+                              const unsigned char *mask,
+                              const Uint32 *setPartitionId,
+                              const void *getSetValue,
+                              const NdbInterpretedCode *interpreted_code)
 {
   NdbOperation *op;
   /*
@@ -2144,6 +2147,14 @@ NdbTransaction::setupRecordOp(NdbOperati
   op->m_attribute_row= attribute_row;
   attribute_record->copyMask(op->m_read_mask, mask);
 
+  assert(getSetValue == NULL);                  // ToDo
+  if (setPartitionId != NULL)
+  {
+    op->theDistributionKey= *setPartitionId;
+    op->theDistrKeyIndicator_= 1;
+  }
+  op->m_interpreted_code= interpreted_code;
+
   if (unlikely(attribute_record->flags & NdbRecord::RecHasBlob))
   {
     if (op->getBlobHandlesNdbRecord(this) == -1)
@@ -2222,7 +2233,10 @@ NdbTransaction::insertTuple(const NdbRec
 NdbOperation *
 NdbTransaction::updateTuple(const NdbRecord *key_rec, const char *key_row,
                             const NdbRecord *attr_rec, const char *attr_row,
-                            const unsigned char *mask)
+                            const unsigned char *mask,
+                            const Uint32 *setPartitionId,
+                            const void *getSetValue,
+                            const NdbInterpretedCode *interpreted_code)
 {
   /* Check that the NdbRecord specifies the full primary key. */
   if (!(key_rec->flags & NdbRecord::RecIsKeyRecord))
@@ -2233,7 +2247,8 @@ NdbTransaction::updateTuple(const NdbRec
 
   NdbOperation *op= setupRecordOp(NdbOperation::UpdateRequest,
                                   NdbOperation::LM_Exclusive, key_rec, key_row,
-                                  attr_rec, attr_row, mask);
+                                  attr_rec, attr_row, mask, setPartitionId,
+                                  getSetValue, interpreted_code);
   if(!op)
     return op;
 

--- 1.24/storage/ndb/src/ndbapi/NdbOperation.cpp	2007-05-25 15:07:11 +02:00
+++ 1.25/storage/ndb/src/ndbapi/NdbOperation.cpp	2007-05-25 15:07:11 +02:00
@@ -164,6 +164,7 @@ NdbOperation::init(const NdbTableImpl* t
   theBlobList = NULL;
   m_abortOption = -1;
   m_no_disk_flag = 1;
+  m_interpreted_code = NULL;
   m_use_any_value = 0;
 
   tSignal = theNdb->getSignal();

--- 1.39/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2007-05-25 15:07:11 +02:00
+++ 1.40/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2007-05-25 15:07:11 +02:00
@@ -21,6 +21,7 @@
 #include <Ndb.hpp>
 #include <NdbRecAttr.hpp>
 #include "NdbUtil.hpp"
+#include "NdbInterpretedCode.hpp"
 
 #include "Interpreter.hpp"
 #include <AttributeHeader.hpp>
@@ -617,6 +618,8 @@ NdbOperation::prepareSendNdbRecord(Uint3
   Uint32 remain;
   int res;
   Uint32 no_disk_flag;
+  Uint32 interpreted_code_end;
+  Uint32 *update_len_addr;
 
   assert(theStatus==UseNdbRecord);
   /* Not yet support for NdbRecord with interpreted operations. */
@@ -706,6 +709,34 @@ NdbOperation::prepareSendNdbRecord(Uint3
 
   no_disk_flag= m_no_disk_flag;
 
+  /* Handle any interpreted program. */
+  const NdbInterpretedCode *code= m_interpreted_code;
+  if (code)
+  {
+    if (code->m_flags & NdbInterpretedCode::UsesDisk)
+      no_disk_flag = 0;
+    Uint32 sizes[5];
+    sizes[0] = 0;               // Initial read.
+    sizes[1] = code->m_instructions_length;
+    sizes[2] = 0;               // Update size, filled later
+    update_len_addr= attrInfoPtr + 2;
+    sizes[3] = 0;               // Final read size, ToDo
+    sizes[4] = 0;               // Subroutine size, ToDo
+    res = insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+                                       (const char *)sizes,
+                                       sizeof(sizes),
+                                       &attrInfoPtr, &remain);
+    if (res)
+      return res;
+    res = insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+                                       (const char *)(code->m_buffer),
+                                       code->m_instructions_length*4,
+                                       &attrInfoPtr, &remain);
+    if (res)
+      return res;
+    interpreted_code_end= theTotalCurrAI_Len;
+  }
+
   OperationType tOpType= theOperationType;
   if ((tOpType == InsertRequest) || (tOpType == WriteRequest) ||
       (tOpType == UpdateRequest))
@@ -854,6 +885,15 @@ NdbOperation::prepareSendNdbRecord(Uint3
       return res;
   }
 
+  if (code)
+  {
+    /* ToDo: This only handles update currently. */
+    Uint32 update_word_length= theTotalCurrAI_Len - interpreted_code_end;
+    *update_len_addr = update_word_length;
+for (Uint32 i= 0; i < 5; i++) {fprintf(stderr, "%2d %p 0x%08x    %u  %p 0x%08x\n", i,
tcKeyReq->attrInfo, tcKeyReq->attrInfo[i], update_word_length, update_len_addr-2,
update_len_addr[(int)i - 2]);}
+assert(update_word_length > 0);
+  }
+
   Uint32 signalLength= hdrSize +
     (theTupKeyLen > TcKeyReq::MaxKeyInfo ?
          TcKeyReq::MaxKeyInfo : theTupKeyLen) +
@@ -906,7 +946,7 @@ NdbOperation::fillTcKeyReqHdr(TcKeyReq *
   TcKeyReq::setSimpleFlag(reqInfo, theSimpleIndicator);
   TcKeyReq::setCommitFlag(reqInfo, theCommitIndicator);
   TcKeyReq::setStartFlag(reqInfo, theStartIndicator);
-  TcKeyReq::setInterpretedFlag(reqInfo, theInterpretIndicator);
+  TcKeyReq::setInterpretedFlag(reqInfo, (m_interpreted_code != NULL));
   /* We will setNoDiskFlag() later when we have checked all columns. */
   TcKeyReq::setDirtyFlag(reqInfo, theDirtyIndicator);
   TcKeyReq::setOperationType(reqInfo, theOperationType);

--- 1.471/sql/ha_ndbcluster.cc	2007-05-25 15:07:11 +02:00
+++ 1.472/sql/ha_ndbcluster.cc	2007-05-25 15:07:11 +02:00
@@ -33,6 +33,7 @@
 #include "ha_ndbcluster_cond.h"
 #include <../util/Bitmask.hpp>
 #include <ndbapi/NdbIndexStat.hpp>
+#include <ndbapi/NdbInterpretedCode.hpp>
 
 #include "ha_ndbcluster_binlog.h"
 #include "ha_ndbcluster_tables.h"
@@ -290,51 +291,72 @@ void ha_ndbcluster::reset_state_at_execu
   thd_ndb->m_unsent_bytes= 0;
 }
 
-int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
+static int
+check_completed_operations(NdbTransaction *trans, const NdbOperation *first)
 {
-  int res= trans->execute(NdbTransaction::NoCommit,
-                          NdbOperation::AO_IgnoreError,
-                          h->m_force_send);
-  h->reset_state_at_execute();
-  if (res == -1)
-    return -1;
-
-  const NdbError &err= trans->getNdbError();
-  if (err.classification != NdbError::NoError &&
-      err.classification != NdbError::ConstraintViolation &&
-      err.classification != NdbError::NoDataFound)
-    return -1;
-
-  return 0;
+  DBUG_ENTER("check_completed_operations");
+  /*
+    Check that all errors are "accepted" errors
+  */
+  while (first)
+  {
+    const NdbError &err= first->getNdbError();
+    if (err.classification != NdbError::NoError &&
+        err.classification != NdbError::ConstraintViolation &&
+        err.classification != NdbError::NoDataFound &&
+        err.code != 9999)
+      DBUG_RETURN(err.code);
+    first= trans->getNextCompletedOperation(first);
+  }
+  DBUG_RETURN(0);
 }
 
 inline
 int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans,
 		      bool force_release)
 {
+  DBUG_ENTER("execute_no_commit");
   h->release_completed_operations(trans, force_release);
-  if (h->m_ignore_no_key)
-    return execute_no_commit_ignore_no_key(h,trans);
-  else
+  const NdbOperation *first= trans->getFirstDefinedOperation();
+  if (trans->execute(NdbTransaction::NoCommit,
+                      NdbOperation::AO_IgnoreError,
+                      h->m_force_send))
   {
-    int res= trans->execute(NdbTransaction::NoCommit,
-                            NdbOperation::AbortOnError,
-                            h->m_force_send);
     h->reset_state_at_execute();
-    return res;
+    DBUG_RETURN(-1);
   }
+  h->reset_state_at_execute();
+  if (!h->m_ignore_no_key || trans->getNdbError().code == 0)
+    DBUG_RETURN(trans->getNdbError().code);
+
+  DBUG_RETURN(check_completed_operations(trans, first));
+}
+
+inline
+int execute_commit(NdbTransaction *trans, int force_send, int ignore_error)
+{
+  DBUG_ENTER("execute_commit");
+  const NdbOperation *first= trans->getFirstDefinedOperation();
+  if (trans->execute(NdbTransaction::Commit,
+                     NdbOperation::AO_IgnoreError,
+                     force_send))
+  {
+    DBUG_RETURN(-1);
+  }
+  if (!ignore_error || trans->getNdbError().code == 0)
+    DBUG_RETURN(trans->getNdbError().code);
+  DBUG_RETURN(check_completed_operations(trans, first));
 }
 
 inline
 int execute_commit(ha_ndbcluster *h, NdbTransaction *trans)
 {
-  int res= trans->execute(NdbTransaction::Commit,
-                          NdbOperation::AbortOnError,
-                          h->m_force_send);
+  int res= execute_commit(trans, h->m_force_send, h->m_ignore_no_key);
   h->reset_state_at_execute();
   return res;
 }
 
+static
 inline
 int execute_commit(THD *thd, NdbTransaction *trans)
 {
@@ -343,21 +365,22 @@ int execute_commit(THD *thd, NdbTransact
     end time, and we will not take further action after this (nor can we
     easily, as we execute outside the context of the ha_ndbcluster object).
   */
-  return trans->execute(NdbTransaction::Commit,
-                        NdbOperation::AbortOnError,
-                        thd->variables.ndb_force_send);
+
+/* CHECK THIS */
+  return execute_commit(trans, thd->variables.ndb_force_send, FALSE);
 }
 
 inline
 int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans,
 			 bool force_release)
 {
+  DBUG_ENTER("execute_no_commit_ie");
   h->release_completed_operations(trans, force_release);
   int res= trans->execute(NdbTransaction::NoCommit,
                           NdbOperation::AO_IgnoreError,
                           h->m_force_send);
   h->reset_state_at_execute();
-  return res;
+  DBUG_RETURN(res);
 }
 
 /*
@@ -794,6 +817,14 @@ ha_ndbcluster::get_row_buffer()
                     table->s->reclength + m_extra_reclength);
 }
 
+/* Return a row buffer, valid until next execute(). */
+char *
+ha_ndbcluster::get_buffer(uint size)
+{
+  Thd_ndb *thd_ndb= get_thd_ndb(table->in_use);
+  return alloc_root(&(thd_ndb->m_batch_mem_root), size);
+}
+
 /*
   When using extra hidden columns, the mysqld column bitmaps do not
   include bits for the extra columns, so we use this method to initialize
@@ -1215,6 +1246,32 @@ int ha_ndbcluster::get_metadata(const ch
   if (error)
     goto err;
 
+#ifdef HAVE_NDB_BINLOG
+  if (m_share->m_resolve_column == 0xFFFFFFFF)
+  {
+    m_share->m_resolve_column= 0;
+    for (int j= 0; j < tab->getNoOfColumns(); j++)
+    {
+      const NDBCOL *c= tab->getColumn(j);
+      if (strcmp("X", c->getName()) == 0)
+      {
+        switch (c->getType())
+        {
+        case  NDBCOL::Unsigned:
+          m_share->m_resolve_column= j+1;
+          DBUG_PRINT("info", ("resolve column %u", m_share->m_resolve_column));
+          break;
+        default:
+          DBUG_PRINT("info", ("resolve column %u has wrong type",
+                              m_share->m_resolve_column));
+          break;
+        }
+        break;
+      }
+    }
+  }
+#endif /* HAVE_NDB_BINLOG */
+
   DBUG_PRINT("info", ("fetched table %s", tab->getName()));
   m_table= tab;
 
@@ -3506,6 +3563,93 @@ int ha_ndbcluster::primary_key_cmp(const
   Update one record in NDB using primary key
 */
 
+#ifdef HAVE_NDB_BINLOG
+int
+ha_ndbcluster::update_row_timestamp_resolve(const byte *old_data, byte *new_data,
NdbInterpretedCode *code)
+{
+  const int do_before_image_check= 0;
+  uint32 resolve_column= m_share->m_resolve_column;
+
+  const uint error_wrong_before_image= 9998;
+  const uint error_lower_timestamp= 9999;
+  const uint label_0= 0, label_1= 1;
+  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
+  int r;
+  Field *field= table->field[resolve_column-1];
+  DBUG_PRINT("info", ("interpreted update post equal"));
+  /*
+   * read new value from record
+   */
+  uint32 new_value;
+  {
+    const byte* field_ptr= field->ptr + (new_data - table->record[0]);
+    memcpy(&new_value, field_ptr, 4);
+  }
+  DBUG_PRINT("info", ("new_value: %u", new_value));
+  /*
+   * Load registers RegNewValue and RegCurrentValue
+   */
+  r= code->load_const_u32(RegNewValue, new_value);
+  DBUG_ASSERT(r == 0);
+  r= code->read_attr(m_table, resolve_column-1, RegCurrentValue);
+  DBUG_ASSERT(r == 0);
+  /*
+   * if RegNewValue > RegCurrentValue goto label_0
+   * else raise error for this row
+   */
+  r= code->branch_gt(RegCurrentValue, RegNewValue, label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_nok(error_lower_timestamp);
+  DBUG_ASSERT(r == 0);
+  r= code->def_label(label_0);
+  DBUG_ASSERT(r == 0);
+  if (do_before_image_check)
+  {
+    const Uint32 RegOldValue= RegNewValue; /* reuse register, new value not needed */
+    /*
+     * Now also check that old values are the same
+     */
+
+    /*
+     * read old value from record
+     */
+    uint32 old_value;
+    {
+      const byte* field_ptr= field->ptr + (old_data - table->record[0]);
+      memcpy(&old_value, field_ptr, 4);
+    }
+    DBUG_PRINT("info", ("old_value: %u", old_value));
+    /*
+     * Load register RegOldValue
+     */
+    r= code->load_const_u32(RegOldValue, old_value);
+    DBUG_ASSERT(r == 0);
+    // RegCurrentValue already stored
+    /*
+     * if RegOldValue == RegCurrentValue goto label label_1
+     * else raise error for this row
+     */
+    r= code->branch_eq(RegOldValue, RegCurrentValue, label_1);
+    DBUG_ASSERT(r == 0);
+    if (error_wrong_before_image)
+      r= code->interpret_exit_nok(error_wrong_before_image);
+    else
+      r= code->interpret_exit_nok(0);
+    DBUG_ASSERT(r == 0);
+    /*
+     * Exit ok and let transaction continue without error
+     */
+    r= code->def_label(label_1);
+    DBUG_ASSERT(r == 1);
+  }
+  r= code->interpret_exit_ok();
+  DBUG_ASSERT(r == 0);
+  r= code->backpatch();
+  DBUG_ASSERT(r == 0);
+  return r;
+}
+#endif
+
 int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
 {
   THD *thd= table->in_use;
@@ -3679,7 +3823,27 @@ int ha_ndbcluster::update_row(const byte
       key_row= (const char *)(&m_ref);
     }
 
-    if (!(op= trans->updateTuple(key_rec, key_row, m_ndb_record, row, mask)))
+    NdbInterpretedCode *code= NULL;
+#ifdef HAVE_NDB_BINLOG
+    if (m_share->m_resolve_column)
+    {
+      DBUG_PRINT("info", ("interpreted update pre equal on column %u",
+                          m_share->m_resolve_column));
+      /* Room for 10 instruction words, two labels, and two jumps. */
+      uint code_size= 14;
+      uint struct_size= sizeof(*code);
+      char *mem= get_buffer(struct_size + code_size*sizeof(Uint32));
+      Uint32* buffer= (Uint32 *)(mem + struct_size);
+      code= new(mem) NdbInterpretedCode(buffer, code_size, 2);
+      if (update_row_timestamp_resolve(old_data, new_data, code))
+      {
+        /* ToDo error handling */
+        abort();
+      }
+    }
+#endif
+    if (!(op= trans->updateTuple(key_rec, key_row, m_ndb_record, row, mask,
+                                 NULL, NULL, code)))
       ERR_RETURN(trans->getNdbError());  
   }
 

--- 1.182/sql/ha_ndbcluster.h	2007-05-25 15:07:11 +02:00
+++ 1.183/sql/ha_ndbcluster.h	2007-05-25 15:07:11 +02:00
@@ -41,6 +41,7 @@ class NdbIndexScanOperation; 
 class NdbBlob;
 class NdbIndexStat;
 class NdbEventOperation;
+class NdbInterpretedCode;
 class ha_ndbcluster_cond;
 
 // connectstring to cluster if given by mysqld
@@ -120,6 +121,7 @@ typedef struct st_ndbcluster_share {
 #ifdef HAVE_NDB_BINLOG
   uint32 connect_count;
   uint32 flags;
+  uint32 m_resolve_column;
   NdbEventOperation *op;
   NdbEventOperation *op_old; // for rename table
   char *old_names; // for rename table
@@ -248,6 +250,10 @@ class ha_ndbcluster: public handler
 
   int write_row(byte *buf);
   int update_row(const byte *old_data, byte *new_data);
+#ifdef HAVE_NDB_BINLOG
+  int update_row_timestamp_resolve(const byte *old_data, byte *new_data,
+                                   NdbInterpretedCode *);
+#endif
   int delete_row(const byte *buf);
   int index_init(uint index, bool sorted);
   int index_end();
@@ -506,6 +512,7 @@ private:
                                  uint op_batch_size, bool & batch_full);
   char *copy_row_to_buffer(Thd_ndb *thd_ndb, const byte *record);
   char *get_row_buffer();
+  char *get_buffer(uint size);
   void clear_extended_column_set(uchar *mask);
   uchar *copy_column_set(MY_BITMAP *bitmap);
 
@@ -547,6 +554,7 @@ private:
 
   void reset_state_at_execute();
   friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
+  friend int execute_commit(NdbTransaction *, int, int);
   friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
   friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
   friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);

--- 1.20/storage/ndb/src/ndbapi/Makefile.am	2007-05-25 15:07:11 +02:00
+++ 1.21/storage/ndb/src/ndbapi/Makefile.am	2007-05-25 15:07:11 +02:00
@@ -56,7 +56,8 @@ libndbapi_la_SOURCES = \
 	NdbBlob.cpp \
 	NdbIndexStat.cpp \
         SignalSender.cpp \
-        ObjectMap.cpp
+        ObjectMap.cpp \
+	NdbInterpretedCode.cpp
 
 INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
 

--- 1.22/storage/ndb/include/Makefile.am	2007-05-25 15:07:11 +02:00
+++ 1.23/storage/ndb/include/Makefile.am	2007-05-25 15:07:11 +02:00
@@ -41,7 +41,8 @@ ndbapi/NdbScanFilter.hpp \
 ndbapi/NdbScanOperation.hpp \
 ndbapi/NdbIndexScanOperation.hpp \
 ndbapi/NdbIndexStat.hpp \
-ndbapi/ndberror.h
+ndbapi/ndberror.h \
+ndbapi/NdbInterpretedCode.hpp
 
 mgmapiinclude_HEADERS = \
 mgmapi/mgmapi.h \

--- 1.120/sql/ha_ndbcluster_binlog.cc	2007-05-25 15:07:11 +02:00
+++ 1.121/sql/ha_ndbcluster_binlog.cc	2007-05-25 15:07:11 +02:00
@@ -33,6 +33,8 @@
 #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__,
__LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort();
::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
 #endif
 
+extern my_bool opt_ndb_log_update_as_write;
+
 /*
   defines for cluster replication table names
 */
@@ -385,6 +387,7 @@ void ndbcluster_binlog_init_share(NDB_SH
   DBUG_ENTER("ndbcluster_binlog_init_share");
 
   share->connect_count= g_ndb_cluster_connection->get_connect_count();
+  share->m_resolve_column= 0xFFFFFFFF;
 
   share->op= 0;
   share->table= 0;
@@ -3499,7 +3502,8 @@ ndb_binlog_thread_handle_data_event(Ndb 
         since we do not have an after image
       */
       int n;
-      if (table->s->primary_key != MAX_KEY)
+      if (table->s->primary_key != MAX_KEY &&
+          opt_ndb_log_update_as_write)
         n= 0; /*
                 use the primary key only as it save time and space and
                 it is the only thing needed to log the delete
@@ -3546,7 +3550,8 @@ ndb_binlog_thread_handle_data_event(Ndb 
       ndb_unpack_record(table, share->ndb_value[0],
                         &b, table->record[0]);
       DBUG_EXECUTE("info", print_records(table, table->record[0]););
-      if (table->s->primary_key != MAX_KEY) 
+      if (table->s->primary_key != MAX_KEY &&
+          opt_ndb_log_update_as_write) 
       {
         /*
           since table has a primary key, we can do a write
Thread
bk commit into 5.1 tree (knielsen:1.2509)knielsen25 May