List:Commits« Previous MessageNext Message »
From:knielsen Date:December 13 2006 2:11pm
Subject:bk commit into 5.1 tree (knielsen:1.2345)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2006-12-13 14:11:28+01:00, knielsen@ymer.(none) +16 -0
  WL#2223: NdbRecord.
  
  First draft of working 'real' implementation of NdbRecord, with the
  proper NDB API interface and efficient implementation in NdbTransaction
  and NdbReceiver.

  storage/ndb/include/kernel/signaldata/AttrInfo.hpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +1 -0
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/include/ndbapi/NdbDictionary.hpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +38 -8
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/include/ndbapi/NdbOperation.hpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +59 -3
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/include/ndbapi/NdbReceiver.hpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +15 -1
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/include/ndbapi/NdbTransaction.hpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +102 -10
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbDictionary.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +28 -25
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +159 -0
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +44 -74
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbIndexOperation.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +2 -2
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbOperation.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +4 -3
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +399 -0
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbReceiver.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +119 -9
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +2 -2
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +451 -189
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/src/ndbapi/ndberror.c@stripped, 2006-12-13 14:11:25+01:00, knielsen@ymer.(none)
+4 -0
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

  storage/ndb/test/ndbapi/flexBench.cpp@stripped, 2006-12-13 14:11:25+01:00,
knielsen@ymer.(none) +132 -51
    First draft of working 'real' implementation of NdbRecord, with the
    proper NDB API interface and efficient implementation in NdbTransaction
    and NdbReceiver.

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	knielsen
# Host:	ymer.(none)
# Root:	/usr/local/mysql/mysql-5.1-wl2223-new

--- 1.4/storage/ndb/include/kernel/signaldata/AttrInfo.hpp	2006-12-13 14:11:35 +01:00
+++ 1.5/storage/ndb/include/kernel/signaldata/AttrInfo.hpp	2006-12-13 14:11:35 +01:00
@@ -37,6 +37,7 @@ class AttrInfo {
   friend class Dblqh;
   friend class NdbScanOperation;
   friend class Restore;
+  friend class NdbOperation;
 
   friend bool printATTRINFO(FILE *, const Uint32 *, Uint32, Uint16);
   

--- 1.53/storage/ndb/include/ndbapi/NdbTransaction.hpp	2006-12-13 14:11:35 +01:00
+++ 1.54/storage/ndb/include/ndbapi/NdbTransaction.hpp	2006-12-13 14:11:35 +01:00
@@ -572,15 +572,72 @@ public:
 #endif
 
 
-  NdbOperation *readTuple(const char *tableName, const NdbRecord *rec, char *row, const
Uint32 *mask= 0);
-  NdbOperation *insertTuple(const char *tableName, const NdbRecord *rec, const char *row,
const Uint32 *mask= 0);
-  NdbOperation *updateTuple(const char *tableName, const NdbRecord *rec, const char *row,
const Uint32 *mask= 0);
-  NdbOperation *deleteTuple(const char *tableName, const NdbRecord *rec, const char
*row);
-  NdbOperation *dirtyWriteTuple(const char *tableName, const NdbRecord *rec, const char
*row, const Uint32 *mask= 0);
-  NdbOperation *writeTuple(const char *tableName, const NdbRecord *rec, const char *row,
const Uint32 *mask= 0);
-  NdbOperation *simpleReadTuple(const char *tableName, const NdbRecord *rec, char *row,
const Uint32 *mask= 0);
-  NdbOperation *dirtyReadTuple(const char *tableName, const NdbRecord *rec, char *row,
const Uint32 *mask= 0);
-  NdbOperation *dirtyUpdateTuple(const char *tableName, const NdbRecord *rec, const char
*row, const Uint32 *mask= 0);
+  NdbOperation *readTuple(const NdbDictionary::Table *table,
+                          const NdbRecord *key_rec, const char *key_row,
+                          const NdbRecord *result_rec, char *result_row,
+                          const Uint32 *result_mask= 0);
+  NdbOperation *readTuple(const char *tableName,
+                          const NdbRecord *key_rec, const char *key_row,
+                          const NdbRecord *result_rec, char *result_row,
+                          const Uint32 *result_mask= 0);
+  NdbOperation *insertTuple(const NdbDictionary::Table *table,
+                            const NdbRecord *rec, const char *row,
+                            const Uint32 *mask= 0);
+  NdbOperation *insertTuple(const char *tableName,
+                            const NdbRecord *rec, const char *row,
+                            const Uint32 *mask= 0);
+  NdbOperation *updateTuple(const NdbDictionary::Table *table,
+                            const NdbRecord *pk_rec, const char *pk_row,
+                            const NdbRecord *attr_rec, const char *attr_row,
+                            const Uint32 *mask= 0);
+  NdbOperation *updateTuple(const char *tableName,
+                            const NdbRecord *pk_rec, const char *pk_row,
+                            const NdbRecord *attr_rec, const char *attr_row,
+                            const Uint32 *mask= 0);
+  NdbOperation *deleteTuple(const NdbDictionary::Table *table,
+                            const NdbRecord *rec, const char *row= 0);
+  NdbOperation *deleteTuple(const char *tableName,
+                            const NdbRecord *rec, const char *row= 0);
+  NdbOperation *dirtyWriteTuple(const NdbDictionary::Table *table,
+                                const NdbRecord *pk_rec, const char *pk_row,
+                                const NdbRecord *attr_rec, const char *attr_row,
+                                const Uint32 *mask= 0);
+  NdbOperation *dirtyWriteTuple(const char *tableName,
+                                const NdbRecord *pk_rec, const char *pk_row,
+                                const NdbRecord *attr_rec, const char *attr_row,
+                                const Uint32 *mask= 0);
+  NdbOperation *writeTuple(const NdbDictionary::Table *table,
+                           const NdbRecord *pk_rec, const char *pk_row,
+                           const NdbRecord *attr_rec, const char *attr_row,
+                           const Uint32 *mask= 0);
+  NdbOperation *writeTuple(const char *tableName,
+                           const NdbRecord *pk_rec, const char *pk_row,
+                           const NdbRecord *attr_rec, const char *attr_row,
+                           const Uint32 *mask= 0);
+  NdbOperation *simpleReadTuple(const NdbDictionary::Table *table,
+                                const NdbRecord *key_rec, char *key_row,
+                                const NdbRecord *result_rec, char *result_row,
+                                const Uint32 *result_mask= 0);
+  NdbOperation *simpleReadTuple(const char *tableName,
+                                const NdbRecord *key_rec, char *key_row,
+                                const NdbRecord *result_rec, char *result_row,
+                                const Uint32 *result_mask= 0);
+  NdbOperation *dirtyReadTuple(const NdbDictionary::Table *table,
+                               const NdbRecord *key_rec, char *key_row,
+                               const NdbRecord *result_rec, char *result_row,
+                               const Uint32 *result_mask= 0);
+  NdbOperation *dirtyReadTuple(const char *tableName,
+                               const NdbRecord *key_rec, char *key_row,
+                               const NdbRecord *result_rec, char *result_row,
+                               const Uint32 *result_mask= 0);
+  NdbOperation *dirtyUpdateTuple(const NdbDictionary::Table *table,
+                                 const NdbRecord *pk_rec, const char *pk_row,
+                                 const NdbRecord *attr_rec, const char *attr_row,
+                                 const Uint32 *mask= 0);
+  NdbOperation *dirtyUpdateTuple(const char *tableName,
+                                 const NdbRecord *pk_rec, const char *pk_row,
+                                 const NdbRecord *attr_rec, const char *attr_row,
+                                 const Uint32 *mask= 0);
 
 private:						
   /**
@@ -690,7 +747,42 @@ private:						
 
   int		checkMagicNumber();		       // Verify correct object
   NdbOperation* getNdbOperation(const class NdbTableImpl* aTable,
-                                NdbOperation* aNextOp = 0);
+                                NdbOperation* aNextOp = 0,
+                                bool useRec= false);
+  NdbOperation *readTuple(const NdbTableImpl *table,
+                          const NdbRecord *key_rec, const char *key_row,
+                          const NdbRecord *result_rec, char *result_row,
+                          const Uint32 *result_mask);
+  NdbOperation *insertTuple(const NdbTableImpl *table,
+                            const NdbRecord *rec, const char *row,
+                            const Uint32 *mask);
+  NdbOperation *updateTuple(const NdbTableImpl *table,
+                            const NdbRecord *pk_rec, const char *pk_row,
+                            const NdbRecord *attr_rec, const char *attr_row,
+                            const Uint32 *mask);
+  NdbOperation *deleteTuple(const NdbTableImpl *table,
+                            const NdbRecord *rec, const char *row);
+  NdbOperation *dirtyWriteTuple(const NdbTableImpl *table,
+                                const NdbRecord *pk_rec, const char *pk_row,
+                                const NdbRecord *attr_rec, const char *attr_row,
+                                const Uint32 *mask);
+  NdbOperation *writeTuple(const NdbTableImpl *table,
+                           const NdbRecord *pk_rec, const char *pk_row,
+                           const NdbRecord *attr_rec, const char *attr_row,
+                           const Uint32 *mask);
+  NdbOperation *simpleReadTuple(const NdbTableImpl *table,
+                                const NdbRecord *key_rec, char *key_row,
+                                const NdbRecord *result_rec, char *result_row,
+                                const Uint32 *result_mask);
+  NdbOperation *dirtyReadTuple(const NdbTableImpl *table,
+                               const NdbRecord *key_rec, char *key_row,
+                               const NdbRecord *result_rec, char *result_row,
+                               const Uint32 *result_mask);
+  NdbOperation *dirtyUpdateTuple(const NdbTableImpl *table,
+                                 const NdbRecord *pk_rec, const char *pk_row,
+                                 const NdbRecord *attr_rec, const char *attr_row,
+                                 const Uint32 *mask);
+
   NdbIndexScanOperation* getNdbScanOperation(const class NdbTableImpl* aTable);
   NdbIndexOperation* getNdbIndexOperation(const class NdbIndexImpl* anIndex, 
                                           const class NdbTableImpl* aTable,

--- 1.86/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-12-13 14:11:35 +01:00
+++ 1.87/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-12-13 14:11:35 +01:00
@@ -1414,6 +1414,32 @@ public:
     Event(NdbEventImpl&);
   };
 
+  struct RecordSpecification {
+    enum RecTypes {
+      AttrOffsetNotNULL= 1,
+      AttrOffsetNULL= 2,
+    };
+
+    enum RecTypes type;
+    /*
+      Column is given by NdbDictionary::Column pointer, if not NULL, else by
+      name, if not NULL, else by id.
+    */
+    // ToDo: Hm, a bit clumsy interface that one needs to init colPtr and
+    // colName to NULL to use colNumber...
+    const Column *colPtr;
+    const char *colName;
+    Uint32 colNumber;
+
+    /* Offset of data from start of a row. */
+    Uint32 dataOffset;
+
+    /* Offset from start of row of byte containing NULL bit. */
+    Uint32 bmOffset;
+    /* NULL bit, 0-7. */
+    Uint32 bmBit;
+  };
+
   struct AutoGrowSpecification {
     Uint32 min_free;
     Uint64 max_size;
@@ -1914,18 +1940,22 @@ public:
     int removeTableGlobal(const Table &ndbtab, int invalidate) const;
 #endif
 
-    NdbRecord *getRecord(const Table *table);
-    NdbRecord *getRecord(const char *tableName);
+    NdbRecord *createRecord(const char *tableName,
+                            const RecordSpecification *recSpec,
+                            Uint32 length,
+                            Uint32 elemSize);
+    NdbRecord *createRecord(const Table *table,
+                            const RecordSpecification *recSpec,
+                            Uint32 length,
+                            Uint32 elemSize);
     void releaseRecord(NdbRecord *rec);
 
-    void recAddAttrNotNULL(const char *tableName, NdbRecord *rec, Uint32 attrId, Uint32
offset);
-    void recAddAttrNotNULL(const char *tableName, NdbRecord *rec, const char *colName,
Uint32 offset);
-
-    Uint32 *getRecAttrSet(const char *tableName, NdbRecord *rec);
+    Uint32 *getRecAttrSet(const NdbRecord *rec);
     void releaseRecAttrSet(Uint32 *attrSet);
 
-    void recAttrSetEnable(Uint32 *attrSet, const char *tableName, const NdbRecord *rec,
Uint32 attrId);
-    void recAttrSetEnable(Uint32 *attrSet, const char *tableName, const NdbRecord *rec,
const char *colName);
+    void recAttrSetEnable(Uint32 *attrSet, Uint32 attrId);
+    void recAttrSetEnable(Uint32 *attrSet, const char *tableName,
+                          const char *colName);
 
   };
 };

--- 1.39/storage/ndb/include/ndbapi/NdbOperation.hpp	2006-12-13 14:11:35 +01:00
+++ 1.40/storage/ndb/include/ndbapi/NdbOperation.hpp	2006-12-13 14:11:35 +01:00
@@ -31,6 +31,8 @@ class NdbOperation;
 class NdbTransaction;
 class NdbColumnImpl;
 class NdbBlob;
+class TcKeyReq;
+class NdbRecord;
 
 /**
  * @class NdbOperation
@@ -802,7 +804,7 @@ protected:
 //--------------------------------------------------------------
 // Initialise after allocating operation to a transaction		      
 //--------------------------------------------------------------
-  int init(const class NdbTableImpl*, NdbTransaction* aCon);
+  int init(const class NdbTableImpl*, NdbTransaction* aCon, bool useRec);
   void initInterpreter();
 
   NdbOperation(Ndb* aNdb, Type aType = PrimaryKeyAccess);	
@@ -831,7 +833,13 @@ protected:
     WaitResponse,
     WaitCommitResponse,
     Finished,
-    ReceiveFinished
+    ReceiveFinished,
+    /*
+      NdbRecord: For operations using NdbRecord. Built in a single call (like
+      NdbTransaction::readTuple(), and no state transitions possible before
+      execute().
+    */
+    UseNdbRecord
   };
 
   OperationStatus   Status();	         	// Read the status information
@@ -861,7 +869,37 @@ protected:
   virtual void   setLastFlag(NdbApiSignal* signal, Uint32 lastFlag);
     
   int	 prepareSendInterpreted();            // Help routine to prepare*
-   
+
+  int    prepareSendNdbRecord(Uint32 aTC_ConnectPtr, Uint64 aTransId);
+
+  /* Helper routines for prepareSendNdbRecord(). */
+  Uint32 fillTcKeyReqHdr(TcKeyReq *tcKeyReq,
+                         Uint32 connectPtr,
+                         Uint64 transId,
+                         const NdbRecord *rec);
+  int    allocKeyInfo(Uint32 connectPtr, Uint64 transId,
+                      Uint32 **dstPtr, Uint32 *remain);
+  int    allocAttrInfo(Uint32 connectPtr, Uint64 transId,
+                       Uint32 **dstPtr, Uint32 *remain);
+  int    insertKEYINFO_NdbRecord(Uint32 connectPtr,
+                                 Uint64 transId,
+                                 const char *value,
+                                 Uint32 size,
+                                 Uint32 **dstPtr,
+                                 Uint32 *remain);
+  int    insertATTRINFOHdr_NdbRecord(Uint32 connectPtr,
+                                     Uint64 transId,
+                                     Uint32 attrId,
+                                     Uint32 attrLen,
+                                     Uint32 **dstPtr,
+                                     Uint32 *remain);
+  int    insertATTRINFOData_NdbRecord(Uint32 connectPtr,
+                                      Uint64 transId,
+                                      const char *value,
+                                      Uint32 size,
+                                      Uint32 **dstPtr,
+                                      Uint32 *remain);
+
   int	 receiveTCKEYREF(NdbApiSignal*); 
 
   int	 checkMagicNumber(bool b = true); // Verify correct object
@@ -996,6 +1034,24 @@ protected:
   Uint16 m_tcReqGSN;
   Uint16 m_keyInfoGSN;
   Uint16 m_attrInfoGSN;
+
+  /*
+    Members for NdbRecord operations.
+    ToDo: We might overlap these (with anonymous unions) with members used
+    for NdbRecAttr access (theKEYINFOptr etc), to save a bit of memory. Not
+    sure if it is worth the loss of code clarity though.
+  */
+
+  /* NdbRecord describing the placement of Primary key in row. */
+  const NdbRecord *thePKRec;
+  /* Row containing the primary key to operate on. */
+  const char *thePKRow;
+  /* NdbRecord describing attributes to update. */
+  const NdbRecord *theUpdRec;
+  /* Row containing the update values. */
+  const char *theUpdRow;
+  /* Optional bitmask to disable selected columns. */
+  const Uint32 *theReadMask;
 
   // Blobs in this operation
   NdbBlob* theBlobList;

--- 1.16/storage/ndb/include/ndbapi/NdbReceiver.hpp	2006-12-13 14:11:35 +01:00
+++ 1.17/storage/ndb/include/ndbapi/NdbReceiver.hpp	2006-12-13 14:11:35 +01:00
@@ -22,6 +22,7 @@
 
 class Ndb;
 class NdbTransaction;
+class NdbRecord;
 
 class NdbReceiver
 {
@@ -39,7 +40,7 @@ public:
   };
   
   NdbReceiver(Ndb *aNdb);
-  void init(ReceiverType type, void* owner);
+  void init(ReceiverType type, bool useRec, void* owner);
   void release();
   ~NdbReceiver();
   
@@ -76,6 +77,7 @@ private:
    * At setup
    */
   class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
+  void getValues(const NdbRecord*, char*);
   void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size, Uint32 range);
   void prepareSend();
   void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&);
@@ -84,9 +86,21 @@ private:
   int execTRANSID_AI(const Uint32* ptr, Uint32 len); 
   int execTCOPCONF(Uint32 len);
   int execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows);
+
+  /*
+    We need to keep different state for old NdbRecAttr based operation and for
+    new NdbRecord style operation.
+    ToDo: Could save a little memory by overlapping old and new style state
+    using anonymous unions.
+  */
+  bool usingNdbRecord;
+  const NdbRecord *theNdbRecord;
+  char *theRow;
   class NdbRecAttr* theFirstRecAttr;
   class NdbRecAttr* theCurrentRecAttr;
   class NdbRecAttr** m_rows;
+  /* Index into theNdbRecord of next col to receive in TRANSID_AI. */
+  Uint32 m_RecPos;
   
   Uint32 m_list_index; // When using multiple
   Uint32 m_current_row;

--- 1.69/storage/ndb/src/ndbapi/NdbTransaction.cpp	2006-12-13 14:11:35 +01:00
+++ 1.70/storage/ndb/src/ndbapi/NdbTransaction.cpp	2006-12-13 14:11:35 +01:00
@@ -669,7 +669,11 @@ NdbTransaction::executeAsynchPrepare( Ex
     int tReturnCode;
     NdbOperation* tNextOp = tOp->next();
 
-    tReturnCode = tOp->prepareSend(theTCConPtr, theTransactionId);
+    if (tOp->Status() == NdbOperation::UseNdbRecord)
+      tReturnCode= tOp->prepareSendNdbRecord(theTCConPtr, theTransactionId);
+    else
+      tReturnCode= tOp->prepareSend(theTCConPtr, theTransactionId);
+
     if (tReturnCode == -1) {
       theSendStatus = sendABORTfail;
       DBUG_VOID_RETURN;
@@ -1114,7 +1118,9 @@ Remark:         Get an operation from Nd
                 object, synchronous.
 *****************************************************************************/
 NdbOperation*
-NdbTransaction::getNdbOperation(const NdbTableImpl * tab, NdbOperation* aNextOp)
+NdbTransaction::getNdbOperation(const NdbTableImpl * tab,
+                                NdbOperation* aNextOp,
+                                bool useRec)
 { 
   NdbOperation* tOp;
 
@@ -1148,7 +1154,7 @@ NdbTransaction::getNdbOperation(const Nd
     }
     tOp->next(aNextOp);
   }
-  if (tOp->init(tab, this) != -1) {
+  if (tOp->init(tab, this, useRec) != -1) {
     return tOp;
   } else {
     theNdb->releaseOperation(tOp);
@@ -1168,6 +1174,228 @@ NdbOperation* NdbTransaction::getNdbOper
     return NULL;
 }//NdbTransaction::getNdbOperation()
 
+NdbOperation *
+NdbTransaction::readTuple(const NdbDictionary::Table *table,
+                          const NdbRecord *key_rec, const char *key_row,
+                          const NdbRecord *result_rec, char *result_row,
+                          const Uint32 *result_mask)
+{
+  return readTuple(&NdbTableImpl::getImpl(*table),
+                   key_rec, key_row, result_rec, result_row, result_mask);
+}
+
+NdbOperation *
+NdbTransaction::readTuple(const char *tableName,
+                          const NdbRecord *key_rec, const char *key_row,
+                          const NdbRecord *result_rec, char *result_row,
+                          const Uint32 *result_mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return readTuple(table, key_rec, key_row, result_rec, result_row,
+                     result_mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::insertTuple(const NdbDictionary::Table *table,
+                            const NdbRecord *rec, const char *row,
+                            const Uint32 *mask)
+{
+  return insertTuple(&NdbTableImpl::getImpl(*table),
+                     rec, row, mask);
+}
+
+NdbOperation *
+NdbTransaction::insertTuple(const char *tableName,
+                            const NdbRecord *rec, const char *row,
+                            const Uint32 *mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return insertTuple(table, rec, row, mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::updateTuple(const NdbDictionary::Table *table,
+                            const NdbRecord *pk_rec, const char *pk_row,
+                            const NdbRecord *attr_rec, const char *attr_row,
+                            const Uint32 *mask)
+{
+  return updateTuple(&NdbTableImpl::getImpl(*table),
+                     pk_rec, pk_row, attr_rec, attr_row, mask);
+}
+
+NdbOperation *
+NdbTransaction::updateTuple(const char *tableName,
+                            const NdbRecord *pk_rec, const char *pk_row,
+                            const NdbRecord *attr_rec, const char *attr_row,
+                            const Uint32 *mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return updateTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::deleteTuple(const NdbDictionary::Table *table,
+                            const NdbRecord *rec, const char *row)
+{
+  return deleteTuple(&NdbTableImpl::getImpl(*table), rec, row);
+}
+
+NdbOperation *
+NdbTransaction::deleteTuple(const char *tableName,
+                            const NdbRecord *rec, const char *row)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return deleteTuple(table, rec, row);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::dirtyWriteTuple(const NdbDictionary::Table *table,
+                                const NdbRecord *pk_rec, const char *pk_row,
+                                const NdbRecord *attr_rec, const char *attr_row,
+                                const Uint32 *mask)
+{
+  return dirtyWriteTuple(&NdbTableImpl::getImpl(*table),
+                         pk_rec, pk_row, attr_rec, attr_row, mask);
+}
+
+NdbOperation *
+NdbTransaction::dirtyWriteTuple(const char *tableName,
+                                const NdbRecord *pk_rec, const char *pk_row,
+                                const NdbRecord *attr_rec, const char *attr_row,
+                                const Uint32 *mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return dirtyWriteTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::writeTuple(const NdbDictionary::Table *table,
+                           const NdbRecord *pk_rec, const char *pk_row,
+                           const NdbRecord *attr_rec, const char *attr_row,
+                           const Uint32 *mask)
+{
+  return writeTuple(&NdbTableImpl::getImpl(*table),
+                   pk_rec, pk_row, attr_rec, attr_row, mask);
+}
+
+NdbOperation *
+NdbTransaction::writeTuple(const char *tableName,
+                           const NdbRecord *pk_rec, const char *pk_row,
+                           const NdbRecord *attr_rec, const char *attr_row,
+                           const Uint32 *mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return writeTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::simpleReadTuple(const NdbDictionary::Table *table,
+                                const NdbRecord *key_rec, char *key_row,
+                                const NdbRecord *result_rec, char *result_row,
+                                const Uint32 *result_mask)
+{
+  return simpleReadTuple(&NdbTableImpl::getImpl(*table),
+                         key_rec, key_row, result_rec, result_row, result_mask);
+}
+
+NdbOperation *
+NdbTransaction::simpleReadTuple(const char *tableName,
+                                const NdbRecord *key_rec, char *key_row,
+                                const NdbRecord *result_rec, char *result_row,
+                                const Uint32 *result_mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return simpleReadTuple(table, key_rec, key_row, result_rec, result_row,
+                           result_mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::dirtyReadTuple(const NdbDictionary::Table *table,
+                               const NdbRecord *key_rec, char *key_row,
+                               const NdbRecord *result_rec, char *result_row,
+                               const Uint32 *result_mask)
+{
+  return dirtyReadTuple(&NdbTableImpl::getImpl(*table),
+                        key_rec, key_row, result_rec, result_row, result_mask);
+}
+
+NdbOperation *
+NdbTransaction::dirtyReadTuple(const char *tableName,
+                               const NdbRecord *key_rec, char *key_row,
+                               const NdbRecord *result_rec, char *result_row,
+                               const Uint32 *result_mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return dirtyReadTuple(table, key_rec, key_row, result_rec, result_row,
+                          result_mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+NdbOperation *
+NdbTransaction::dirtyUpdateTuple(const NdbDictionary::Table *table,
+                                 const NdbRecord *pk_rec, const char *pk_row,
+                                 const NdbRecord *attr_rec, const char *attr_row,
+                                 const Uint32 *mask)
+{
+  return dirtyUpdateTuple(&NdbTableImpl::getImpl(*table),
+                          pk_rec, pk_row, attr_rec, attr_row, mask);
+}
+
+NdbOperation *
+NdbTransaction::dirtyUpdateTuple(const char *tableName,
+                                 const NdbRecord *pk_rec, const char *pk_row,
+                                 const NdbRecord *attr_rec, const char *attr_row,
+                                 const Uint32 *mask)
+{
+  const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
+  if (table){
+    return dirtyUpdateTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
+  } else {
+    setErrorCode(theNdb->theDictionary->getNdbError().code);
+    return NULL;
+  }
+}
+
+
 // NdbScanOperation
 /*****************************************************************************
 NdbScanOperation* getNdbScanOperation(const char* aTableName);
@@ -2159,254 +2387,288 @@ NdbTransaction::getNextCompletedOperatio
 }
 
 NdbOperation *
-NdbTransaction::readTuple(const char *tableName, const NdbRecord *rec, char *row, const
Uint32 *mask)
+NdbTransaction::readTuple(const NdbTableImpl *table,
+                          const NdbRecord *key_rec, const char *key_row,
+                          const NdbRecord *result_rec, char *result_row,
+                          const Uint32 *result_mask)
 {
-  NdbOperation *op= getNdbOperation(tableName);
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(key_rec->flags & NdbRecord::RecIsPKRecord))
+  {
+    setOperationErrorCodeAbort(4279);
+    return NULL;
+  }
+
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->readTuple();
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::ReadRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_Read;
+
+  theSimpleState= 0;
+
+  /* Setup the record/row for sending the primary key. */
+  op->thePKRec= key_rec;
+  op->thePKRow= key_row;
+  op->theReadMask= result_mask;
 
-  /* First do equal on all of the primary key attributes. */
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report an error here if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->getValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
-  }
+  /* Setup the record/row for receiving the results. */
+  op->theReceiver.getValues(result_rec, result_row);
+
+  return op;
 }
 
 NdbOperation *
-NdbTransaction::insertTuple(const char *tableName, const NdbRecord *rec, const char *row,
const Uint32 *mask)
+NdbTransaction::insertTuple(const NdbTableImpl *table,
+                            const NdbRecord *rec, const char *row,
+                            const Uint32 *mask)
 {
-  NdbOperation *op= getNdbOperation(tableName);
-  if(!op)
-    return op;
-
-  op->insertTuple();
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(rec->flags & NdbRecord::RecHasAllPKs))
   {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report error if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->setValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
+    setOperationErrorCodeAbort(4279);
+    return NULL;
   }
-}
 
-NdbOperation *
-NdbTransaction::updateTuple(const char *tableName, const NdbRecord *rec, const char *row,
const Uint32 *mask)
-{
-  NdbOperation *op= getNdbOperation(tableName);
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->updateTuple();
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::InsertRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_Exclusive;
+
+  theSimpleState= 0;
+
+  /* Setup the record/row for sending the primary key. */
+  op->thePKRec= rec;
+  op->thePKRow= row;
+  op->theUpdRec= rec;
+  op->theUpdRow= row;
+  op->theReadMask= mask;
 
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report an error here if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->setValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
-  }
+  return op;
 }
 
 NdbOperation *
-NdbTransaction::deleteTuple(const char *tableName, const NdbRecord *rec, const char *row)
+NdbTransaction::updateTuple(const NdbTableImpl *table,
+                            const NdbRecord *pk_rec, const char *pk_row,
+                            const NdbRecord *attr_rec, const char *attr_row,
+                            const Uint32 *mask)
 {
-  NdbOperation *op= getNdbOperation(tableName);
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
+  {
+    setOperationErrorCodeAbort(4279);
+    return NULL;
+  }
+
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->deleteTuple();
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::UpdateRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_Exclusive;
+
+  theSimpleState= 0;
+
+  /* Setup the record/row for sending the primary key. */
+  op->thePKRec= pk_rec;
+  op->thePKRow= pk_row;
+  op->theUpdRec= attr_rec;
+  op->theUpdRow= attr_row;
+  op->theReadMask= mask;
 
-  /* Do equal on all of the primary key attributes. */
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report an error here.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        break;
-      default:
-        assert(0);
-    }
-  }
+  return op;
 }
 
 NdbOperation *
-NdbTransaction::dirtyWriteTuple(const char *tableName, const NdbRecord *rec, const char
*row, const Uint32 *mask)
+NdbTransaction::deleteTuple(const NdbTableImpl *table,
+                            const NdbRecord *rec, const char *row)
 {
-  NdbOperation *op= getNdbOperation(tableName);
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(rec->flags & NdbRecord::RecIsPKRecord))
+  {
+    setOperationErrorCodeAbort(4279);
+    return NULL;
+  }
+
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->dirtyWrite();
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report error if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->setValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
-  }
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::DeleteRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_Exclusive;
+
+  theSimpleState= 0;
+
+  /* Setup the record/row for sending the primary key. */
+  op->thePKRec= rec;
+  op->thePKRow= row;
+
+  return op;
 }
 
 NdbOperation *
-NdbTransaction::writeTuple(const char *tableName, const NdbRecord *rec, const char *row,
const Uint32 *mask)
+NdbTransaction::dirtyWriteTuple(const NdbTableImpl *table,
+                                const NdbRecord *pk_rec, const char *pk_row,
+                                const NdbRecord *attr_rec, const char *attr_row,
+                                const Uint32 *mask)
 {
-  NdbOperation *op= getNdbOperation(tableName);
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
+  {
+    setOperationErrorCodeAbort(4279);
+    return NULL;
+  }
+
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->writeTuple();
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report error if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->setValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
-  }
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::WriteRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_CommittedRead;
+  op->theSimpleIndicator = 1;
+  op->theDirtyIndicator = 1;
+
+  theSimpleState= 0;
+
+  /* Setup the record/row for sending the primary key. */
+  op->thePKRec= pk_rec;
+  op->thePKRow= pk_row;
+  op->theUpdRec= attr_rec;
+  op->theUpdRow= attr_row;
+  op->theReadMask= mask;
+
+  return op;
 }
 
 NdbOperation *
-NdbTransaction::simpleReadTuple(const char *tableName, const NdbRecord *rec, char *row,
const Uint32 *mask)
+NdbTransaction::writeTuple(const NdbTableImpl *table,
+                           const NdbRecord *pk_rec, const char *pk_row,
+                           const NdbRecord *attr_rec, const char *attr_row,
+                           const Uint32 *mask)
 {
-  NdbOperation *op= getNdbOperation(tableName);
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
+  {
+    setOperationErrorCodeAbort(4279);
+    return NULL;
+  }
+
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->simpleRead();
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::WriteRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_Exclusive;
+
+  theSimpleState= 0;
+
+  op->thePKRec= pk_rec;
+  op->thePKRow= pk_row;
+  op->theUpdRec= attr_rec;
+  op->theUpdRow= attr_row;
+  op->theReadMask= mask;
 
-  /* First do equal on all of the primary key attributes. */
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report an error here if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->getValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
-  }
+  return op;
 }
 
 NdbOperation *
-NdbTransaction::dirtyReadTuple(const char *tableName, const NdbRecord *rec, char *row,
const Uint32 *mask)
+NdbTransaction::simpleReadTuple(const NdbTableImpl *table,
+                                const NdbRecord *key_rec, char *key_row,
+                                const NdbRecord *result_rec, char *result_row,
+                                const Uint32 *result_mask)
+{
+  /**
+   * Currently/still disabled
+   */
+  return readTuple(table, key_rec, key_row, result_rec, result_row, result_mask);
+}
+
+NdbOperation *
+NdbTransaction::dirtyReadTuple(const NdbTableImpl *table,
+                               const NdbRecord *key_rec, char *key_row,
+                               const NdbRecord *result_rec, char *result_row,
+                               const Uint32 *result_mask)
 {
-  NdbOperation *op= getNdbOperation(tableName);
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(key_rec->flags & NdbRecord::RecIsPKRecord))
+  {
+    setOperationErrorCodeAbort(4279);
+    return NULL;
+  }
+
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->dirtyRead();
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::ReadRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_CommittedRead;
+  op->theSimpleIndicator = 1;
+  op->theDirtyIndicator = 1;
+
+  theSimpleState= 0;
+
+  /* Setup the record/row for sending the primary key. */
+  op->thePKRec= key_rec;
+  op->thePKRow= key_row;
+  op->theReadMask= result_mask;
 
-  /* First do equal on all of the primary key attributes. */
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report an error here if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->getValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
-  }
+  /* Setup the record/row for receiving the results. */
+  op->theReceiver.getValues(result_rec, result_row);
+
+  return op;
 }
 
 NdbOperation *
-NdbTransaction::dirtyUpdateTuple(const char *tableName, const NdbRecord *rec, const char
*row, const Uint32 *mask)
+NdbTransaction::dirtyUpdateTuple(const NdbTableImpl *table,
+                                 const NdbRecord *pk_rec, const char *pk_row,
+                                 const NdbRecord *attr_rec, const char *attr_row,
+                                 const Uint32 *mask)
 {
-  NdbOperation *op= getNdbOperation(tableName);
+  /* Check that the NdbRecord specifies the full primary key. */
+  if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
+  {
+    setOperationErrorCodeAbort(4279);
+    return NULL;
+  }
+
+  NdbOperation *op= getNdbOperation(table, NULL, true);
   if(!op)
     return op;
 
-  op->dirtyUpdate();
+  op->theStatus= NdbOperation::UseNdbRecord;
+  op->theOperationType= NdbOperation::UpdateRequest;
+  op->theErrorLine++;
+  op->theLockMode= NdbOperation::LM_CommittedRead;
+  op->theSimpleIndicator = 1;
+  op->theDirtyIndicator = 1;
+
+  theSimpleState= 0;
+
+  /* Setup the record/row for sending the primary key. */
+  op->thePKRec= pk_rec;
+  op->thePKRow= pk_row;
+  op->theUpdRec= attr_rec;
+  op->theUpdRow= attr_row;
+  op->theReadMask= mask;
 
-  for(Uint32 i= 0; i<rec->noOfColumns; i++)
-  {
-    switch (rec->columns[i].type)
-    {
-      case NdbRecord::AttrUnused:
-        // ToDo: Report an error here if PK.
-        break;
-      case NdbRecord::AttrNotNULL:
-        if (rec->columns[i].flags & NdbRecord::IsPK)
-          op->equal(i, &(row[rec->columns[i].offset]),
rec->columns[i].maxSize);
-        else if (!mask || (mask[i>>5] & (1<<(i&31))))
-          op->setValue(i, &(row[rec->columns[i].offset]));
-        break;
-      default:
-        assert(0);
-    }
-  }
+  return op;
 }
 
 #ifdef VM_TRACE

--- 1.67/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-12-13 14:11:35 +01:00
+++ 1.68/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-12-13 14:11:35 +01:00
@@ -1481,39 +1481,40 @@ NdbDictionary::Dictionary::removeTableGl
 }
 
 NdbRecord *
-NdbDictionary::Dictionary::getRecord(const Table *table)
-{
-  return m_impl.getRecord(&NdbTableImpl::getImpl(*table));
+NdbDictionary::Dictionary::createRecord(const char *tableName,
+                                        const RecordSpecification *recSpec,
+                                        Uint32 length,
+                                        Uint32 elemSize)
+{
+  const NdbTableImpl *table= m_impl.getTable(tableName);
+  if(table)
+    return createRecord(table, recSpec, length, elemSize);
+  else
+    return NULL;
 }
 
 NdbRecord *
-NdbDictionary::Dictionary::getRecord(const char *tableName)
-{
-  return m_impl.getRecord(tableName);
+NdbDictionary::Dictionary::createRecord(const Table *table,
+                                        const RecordSpecification *recSpec,
+                                        Uint32 length,
+                                        Uint32 elemSize)
+{
+  return m_impl.createRecord(&NdbTableImpl::getImpl(*table),
+                             recSpec,
+                             length,
+                             elemSize);
 }
 
 void 
 NdbDictionary::Dictionary::releaseRecord(NdbRecord *rec)
 {
-  m_impl.releaseRecord(rec);
-}
-
-void
-NdbDictionary::Dictionary::recAddAttrNotNULL(const char *tableName, NdbRecord *rec,
Uint32 attrId, Uint32 offset)
-{
-  m_impl.recAddAttrNotNULL(tableName, rec, attrId, offset);
-}
-
-void
-NdbDictionary::Dictionary::recAddAttrNotNULL(const char *tableName, NdbRecord *rec, const
char *colName, Uint32 offset)
-{
-  m_impl.recAddAttrNotNULL(tableName, rec, colName, offset);
+  m_impl.releaseRecord_impl(rec);
 }
 
 Uint32 *
-NdbDictionary::Dictionary::getRecAttrSet(const char *tableName, NdbRecord *rec)
+NdbDictionary::Dictionary::getRecAttrSet(const NdbRecord *rec)
 {
-  return m_impl.getRecAttrSet(tableName, rec);
+  return m_impl.getRecAttrSet(rec);
 }
 
 void 
@@ -1523,15 +1524,17 @@ NdbDictionary::Dictionary::releaseRecAtt
 }
 
 void 
-NdbDictionary::Dictionary::recAttrSetEnable(Uint32 *attrSet, const char *tableName, const
NdbRecord *rec, Uint32 attrId)
+NdbDictionary::Dictionary::recAttrSetEnable(Uint32 *attrSet, Uint32 attrId)
 {
-  m_impl.recAttrSetEnable(attrSet, tableName, rec, attrId);
+  m_impl.recAttrSetEnable(attrSet, attrId);
 }
 
 void 
-NdbDictionary::Dictionary::recAttrSetEnable(Uint32 *attrSet, const char *tableName, const
NdbRecord *rec, const char *colName)
+NdbDictionary::Dictionary::recAttrSetEnable(Uint32 *attrSet,
+                                            const char *tableName,
+                                            const char *colName)
 {
-  m_impl.recAttrSetEnable(attrSet, tableName, rec, colName);
+  m_impl.recAttrSetEnable(attrSet, tableName, colName);
 }
 
 void NdbDictionary::Dictionary::putTable(const NdbDictionary::Table * table)

--- 1.154/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-12-13 14:11:35 +01:00
+++ 1.155/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-12-13 14:11:35 +01:00
@@ -4508,6 +4508,165 @@ NdbDictionaryImpl::dropLogfileGroup(cons
   return m_receiver.drop_filegroup(fg);
 }
 
+static int
+cmp_ndbrec_attr(const void *a, const void *b)
+{
+  const NdbRecord::Attr *r1= (const NdbRecord::Attr *)a;
+  const NdbRecord::Attr *r2= (const NdbRecord::Attr *)b;
+  if(r1->attrId < r2->attrId)
+    return -1;
+  else if(r1->attrId == r2->attrId)
+    return 0;
+  else
+    return 1;
+}
+
+NdbRecord *
+NdbDictionaryImpl::createRecord(const NdbTableImpl *table,
+                                const NdbDictionary::RecordSpecification *recSpec,
+                                Uint32 length,
+                                Uint32 elemSize)
+{
+  NdbRecord *rec= NULL;
+  Uint32 tableNumPK;
+  Uint32 oldAttrId;
+  Uint32 numPK;
+
+  /*
+    In later versions we can use elemSize to provide backwards
+    compatibility if we extend the RecordSpecification structure.
+  */
+  if (elemSize != sizeof(NdbDictionary::RecordSpecification))
+  {
+    m_error.code= 4276;
+    return NULL;
+  }
+
+  rec= (NdbRecord *)
+    calloc(1, sizeof(NdbRecord) + (length-1)*elemSize);
+  if (!rec)
+  {
+    m_error.code= 4000;
+    return NULL;
+  }
+
+  rec->tableId= table->m_id;
+  rec->tableVersion= table->m_version;
+  rec->flags= 0;
+  rec->totalTableColumns= table->m_columns.size();
+  rec->noOfColumns= length;
+
+  for (Uint32 i= 0; i<length; i++)
+  {
+    const NdbDictionary::RecordSpecification *rs= &recSpec[i];
+    const NdbColumnImpl *col;
+    if (rs->colPtr)
+      col= &NdbColumnImpl::getImpl(*(rs->colPtr));
+    else if (rs->colName)
+      col= table->getColumn(rs->colName);
+    else
+      col= table->getColumn(rs->colNumber);
+    if(!col)
+    {
+      m_error.code= 4277;
+      goto err;
+    }
+
+    NdbRecord::Attr *recCol= &rec->columns[i];
+
+    bool isVarCol= (col->m_arrayType==NDB_ARRAYTYPE_SHORT_VAR ||
+                    col->m_arrayType==NDB_ARRAYTYPE_MEDIUM_VAR);
+
+    recCol->attrId= col->m_attrId;
+    recCol->offset= rs->dataOffset;
+    recCol->maxSize= col->m_attrSize*col->m_arraySize;
+    recCol->flags= 0;
+    if(col->m_pk)
+      recCol->flags|= NdbRecord::IsPK;
+
+    switch(rs->type)
+    {
+      case NdbDictionary::RecordSpecification::AttrOffsetNotNULL:
+        if (!isVarCol)
+        {
+          recCol->type= NdbRecord::AttrNotNULL;
+        }
+        else
+          assert(0);            // ToDo
+        break;
+
+      case NdbDictionary::RecordSpecification::AttrOffsetNULL:
+        assert(0);              // ToDo
+        break;
+
+      default:
+        /* Wrong type supplied by caller. */
+        m_error.code= 4118;
+        goto err;
+    }
+  }
+
+  /* Now we sort the array in attrId order. */
+  qsort(rec->columns,
+        rec->noOfColumns,
+        sizeof(rec->columns[0]),
+        cmp_ndbrec_attr);
+
+  /*
+    Now check for the presense of primary keys, and set flags for whether
+    this NdbRecord can be used for insert and/or for specifying keys for
+    read/update.
+
+    Also test for duplicate columns, easy now that they are sorted.
+  */
+
+  oldAttrId= ~0;
+  numPK= 0;
+  for (Uint32 i= 0; i<rec->noOfColumns; i++)
+  {
+    NdbRecord::Attr *recCol= &rec->columns[i];
+    if (i > 0 && oldAttrId==recCol->attrId)
+    {
+      m_error.code= 4278;
+      goto err;
+    }
+    oldAttrId= recCol->attrId;
+
+    if (recCol->flags & NdbRecord::IsPK)
+      numPK++;
+  }
+
+  /*
+    Since we checked for duplicates, we can check for primary key completeness
+    simply by counting.
+  */
+  tableNumPK= 0;
+  for (Uint32 i= 0; i<table->m_columns.size(); i++)
+  {
+    if (table->m_columns[i]->m_pk)
+      tableNumPK++;
+  }
+  if (numPK >= tableNumPK)
+  {
+    rec->flags|= NdbRecord::RecHasAllPKs;
+    if (numPK == tableNumPK)
+      rec->flags|= NdbRecord::RecIsPKRecord;
+  }
+
+  return rec;
+
+ err:
+  if (rec)
+    free(rec);
+  return NULL;
+}
+
+void NdbDictionaryImpl::releaseRecord_impl(NdbRecord *rec)
+{
+  if (rec)
+    free(rec);
+}
+
 int
 NdbDictInterface::create_file(const NdbFileImpl & file,
 			      const NdbFilegroupImpl & group,

--- 1.71/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-12-13 14:11:35 +01:00
+++ 1.72/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-12-13 14:11:35 +01:00
@@ -574,17 +574,35 @@ public:
 
 class NdbRecord {
 public:
-  Uint32 tableId;
-  Uint32 tableVersion;
+  /* Flag bits for the entire NdbRecord. */
+  enum RecFlags
+  {
+    /*
+      This flag tells whether this NdbRecord is a PK record for the table,
+      ie. that it describes _exactly_ the primary key attributes, no more and
+      no less. This is a requirement for the PK record used in read/update.
+    */
+    RecIsPKRecord= 0x1,
+
+    /*
+      This flag tells whether this NdbRecord includes _at least_ all PK columns
+      (and possibly other columns), which is a requirement for insert.
+    */
+    RecHasAllPKs= 0x2
+  };
 
-  enum Flags
+  /* Flag bits for individual columns in the NdbRecord. */
+  enum ColFlags
   {
-    IsPK= 1
+    /*
+      This flag tells whether the column is part of the primary key, used
+      for insert.
+    */
+    IsPK= 0x1
   };
 
   enum RecAttrTypes
   {
-    AttrUnused= 0,
     AttrNotNULL
   };
 
@@ -595,6 +613,7 @@ public:
       struct are valid.
     */
     enum RecAttrTypes type;
+    Uint32 attrId;
     /* Offset of data from the start of a row. */
     Uint32 offset;
     /*
@@ -603,14 +622,20 @@ public:
     */
     Uint32 maxSize;
 
+    /* Flags, or-ed from enum ColFlags. */
     Uint32 flags;
   };
 
-  /*
-    The size of this array is the same as number of columns in the table.
-  */
+  Uint32 tableId;
+  Uint32 tableVersion;
+  /* Flags, or-ed from enum RecFlags. */
+  Uint32 flags;
+  /* Total number of attributes in table. */
+  Uint32 totalTableColumns;
+
+  /* The real size of the array at the end of this struct. */
   Uint32 noOfColumns;
-  struct Attr *columns;
+  struct Attr columns[1];
 };
 
 
@@ -710,72 +735,17 @@ public:
                               const BaseString& internalName);
 
 
-  /* NdbRecord stuff. */
-  NdbRecord *getRecord(const NdbTableImpl *table)
-  {
-    NdbRecord *rec;
-
-    rec= (NdbRecord *)malloc(sizeof(*rec));
-    if (!rec)
-      return rec;
-    rec->noOfColumns= table->getNoOfColumns();
-    rec->columns= (NdbRecord::Attr *)calloc(rec->noOfColumns,
sizeof(rec->columns[0]));
-    if (!rec->columns)
-    {
-      free(rec);
-      return NULL;
-    }
-
-    rec->tableId= table->getObjectId();
-    rec->tableVersion= table->getObjectVersion();
-
-    return rec;
-  }
-
-  NdbRecord *getRecord(const char *tableName)
-  {
-    NdbTableImpl *table= getTable(tableName);
-    if(table)
-      return getRecord(table);
-    else
-      return NULL;
-  }
-
-  void releaseRecord(NdbRecord *rec)
-  {
-    free(rec->columns);
-    free(rec);
-  }
-
-  /* ToDo: The interface for setting columns in NdbRecord needs updating. */
-  void recAddAttrNotNULL(const char *tableName, NdbRecord *rec, Uint32 attrId, Uint32
offset)
-  {
-    const NdbTableImpl *table= getTable(tableName);
-    rec->columns[attrId].type= NdbRecord::AttrNotNULL;
-    rec->columns[attrId].offset= offset;
-    const NdbDictionary::Column *col= table->getColumn(attrId);
-    rec->columns[attrId].maxSize= col->getSizeInBytes();
-    rec->columns[attrId].flags=
-      (col->getPrimaryKey() ? NdbRecord::IsPK : 0);
-  }
-
-  void recAddAttrNotNULL(const char *tableName, NdbRecord *rec, const char *colName,
Uint32 offset)
-  {
-    const NdbTableImpl *table= getTable(tableName);
-    const NdbDictionary::Column *col= table->getColumn(colName);
-    Uint32 attrId= col->getAttrId();
-    rec->columns[attrId].type= NdbRecord::AttrNotNULL;
-    rec->columns[attrId].offset= offset;
-    rec->columns[attrId].maxSize= col->getSizeInBytes();
-    rec->columns[attrId].flags=
-      (col->getPrimaryKey() ? NdbRecord::IsPK : 0);
-  }
+  NdbRecord *createRecord(const NdbTableImpl *table,
+                          const NdbDictionary::RecordSpecification *recSpec,
+                          Uint32 length,
+                          Uint32 elemSize);
+  void releaseRecord_impl(NdbRecord *rec);
 
-  Uint32 *getRecAttrSet(const char *tableName, NdbRecord *rec)
+  Uint32 *getRecAttrSet(const NdbRecord *rec)
   {
     Uint32 *attrSet;
 
-    attrSet= (Uint32 *)calloc((rec->noOfColumns+31)>>5, sizeof(*attrSet));
+    attrSet= (Uint32 *)calloc((rec->totalTableColumns+31)>>5, sizeof(*attrSet));
     return attrSet;
   }
 
@@ -784,17 +754,17 @@ public:
     free(attrSet);
   }
 
-  void recAttrSetEnable(Uint32 *attrSet, const char *tableName, const NdbRecord *rec,
Uint32 attrId)
+  void recAttrSetEnable(Uint32 *attrSet, Uint32 attrId)
   {
     attrSet[attrId>>5]|= 1<<(attrId&31);
   }
 
-  void recAttrSetEnable(Uint32 *attrSet, const char *tableName, const NdbRecord *rec,
const char *colName)
+  void recAttrSetEnable(Uint32 *attrSet, const char *tableName, const char *colName)
   {
     // ToDo: check table/column not found ...
     const NdbTableImpl *table= getTable(tableName);
     const NdbDictionary::Column *col= table->getColumn(colName);
-    recAttrSetEnable(attrSet, tableName, rec, col->getAttrId());
+    recAttrSetEnable(attrSet, col->getAttrId());
   }
 
 private:

--- 1.28/storage/ndb/src/ndbapi/NdbIndexOperation.cpp	2006-12-13 14:11:35 +01:00
+++ 1.29/storage/ndb/src/ndbapi/NdbIndexOperation.cpp	2006-12-13 14:11:35 +01:00
@@ -36,7 +36,7 @@ NdbIndexOperation::NdbIndexOperation(Ndb
   /**
    * Change receiver type
    */
-  theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this);
+  theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, false, this);
 }
 
 NdbIndexOperation::~NdbIndexOperation()
@@ -55,7 +55,7 @@ NdbIndexOperation::indxInit(const NdbInd
 			    const NdbTableImpl * aTable, 
 			    NdbTransaction* myConnection)
 {
-  NdbOperation::init(aTable, myConnection);
+  NdbOperation::init(aTable, myConnection, false);
 
   switch (anIndex->m_type) {
   case(NdbDictionary::Index::UniqueHashIndex):

--- 1.19/storage/ndb/src/ndbapi/NdbOperation.cpp	2006-12-13 14:11:35 +01:00
+++ 1.20/storage/ndb/src/ndbapi/NdbOperation.cpp	2006-12-13 14:11:35 +01:00
@@ -79,7 +79,7 @@ NdbOperation::NdbOperation(Ndb* aNdb, Nd
   theBlobList(NULL),
   m_abortOption(-1)
 {
-  theReceiver.init(NdbReceiver::NDB_OPERATION, this);
+  theReceiver.init(NdbReceiver::NDB_OPERATION, false, this);
   theError.code = 0;
 }
 /*****************************************************************************
@@ -129,7 +129,8 @@ NdbOperation::setErrorCodeAbort(int anEr
  *****************************************************************************/
 
 int
-NdbOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection){
+NdbOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection,
+                   bool useRec){
   NdbApiSignal* tSignal;
   theStatus		= Init;
   theError.code		= 0;
@@ -178,7 +179,7 @@ NdbOperation::init(const NdbTableImpl* t
   tcKeyReq->scanInfo = 0;
   theKEYINFOptr = &tcKeyReq->keyInfo[0];
   theATTRINFOptr = &tcKeyReq->attrInfo[0];
-  theReceiver.init(NdbReceiver::NDB_OPERATION, this);
+  theReceiver.init(NdbReceiver::NDB_OPERATION, useRec, this);
   return 0;
 }
 

--- 1.24/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2006-12-13 14:11:35 +01:00
+++ 1.25/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2006-12-13 14:11:35 +01:00
@@ -505,6 +505,405 @@ NdbOperation::prepareSendInterpreted()
   return 0;
 }//NdbOperation::prepareSendInterpreted()
 
+
+/*
+  Prepares TCKEYREQ and (if needed) KEYINFO and ATTRINFO signals, for
+  operations using NdbRecord.
+*/
+int
+NdbOperation::prepareSendNdbRecord(Uint32 aTC_ConnectPtr, Uint64 aTransId)
+{
+  Uint32 *keyInfoPtr, *attrInfoPtr;
+  Uint32 remain;
+  int res;
+
+  assert(theStatus==UseNdbRecord);
+  /* Not yet support for NdbRecord with interpreted operations. */
+  assert(!theInterpretIndicator);
+
+  const NdbRecord *key_rec= thePKRec;
+  const char *key_row= thePKRow;
+  const NdbRecord *result_rec, *upd_rec;
+  const char *updRow;
+  const Uint32 *result_mask= theReadMask;
+
+  TcKeyReq *tcKeyReq= CAST_PTR(TcKeyReq, theTCREQ->getDataPtrSend());
+  Uint32 hdrSize= fillTcKeyReqHdr(tcKeyReq, aTC_ConnectPtr, aTransId, key_rec);
+  keyInfoPtr= theTCREQ->getDataPtrSend() + hdrSize;
+
+  // Fill in keyinfo (in TCKEYREQ signal, spilling into KEYINFO signals)
+  remain= TcKeyReq::MaxKeyInfo;
+  theTotalNrOfKeyWordInSignal= 0;
+  for (Uint32 i= 0; i<key_rec->noOfColumns; i++)
+  {
+    const NdbRecord::Attr *col;
+
+    col= &key_rec->columns[i];
+    /* 
+       This is a special case for insert, which allows extra columns in the key
+       NdbRecord, since it uses only a single record.
+    */
+    if(!(col->flags&NdbRecord::IsPK))
+      continue;
+
+    switch (col->type)
+    {
+      case NdbRecord::AttrNotNULL:
+        res= insertKEYINFO_NdbRecord(aTC_ConnectPtr, aTransId,
+                                     &key_row[col->offset],
+                                     col->maxSize, &keyInfoPtr, &remain);
+        if(res)
+          return res;
+        break;
+
+      default:
+        assert(false);
+    }
+  }
+
+  /* 
+     Now the total keyinfo size has been computed, inside
+     insertKEYINFO_NdbRecord().
+  */
+  TcKeyReq::setKeyLength(tcKeyReq->requestInfo, theTupKeyLen);
+
+  // Fill in attrinfo (in TCKEYREQ signal, spilling into ATTRINFO signals)
+  remain= TcKeyReq::MaxAttrInfo;
+  attrInfoPtr= theTCREQ->getDataPtrSend() + hdrSize +
+    (theTupKeyLen > TcKeyReq::MaxKeyInfo ? TcKeyReq::MaxKeyInfo : theTupKeyLen);
+
+  OperationType tOpType= theOperationType;
+  if ((tOpType == InsertRequest) || (tOpType == WriteRequest) ||
+      (tOpType == UpdateRequest))
+  {
+    upd_rec= theUpdRec;
+    updRow= theUpdRow;
+    for (Uint32 i= 0; i<upd_rec->noOfColumns; i++)
+    {
+      const NdbRecord::Attr *col;
+
+      col= &upd_rec->columns[i];
+      Uint32 attrId= col->attrId;
+
+      if (result_mask)
+      {
+        if (!(result_mask[attrId>>5] & (1<<(attrId&31))))
+          continue;
+      }
+
+      switch (col->type)
+      {
+        case NdbRecord::AttrNotNULL:
+          res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
+                                           attrId, col->maxSize,
+                                           &attrInfoPtr, &remain);
+          if(res)
+            return res;
+          res= insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+                                            &updRow[col->offset], col->maxSize,
+                                            &attrInfoPtr, &remain);
+          if(res)
+            return res;
+          break;
+
+        default:
+          assert(false);
+      }
+    }
+  }
+  else if (tOpType == ReadRequest)
+  {
+    result_rec= theReceiver.theNdbRecord;
+    for (Uint32 i= 0; i<result_rec->noOfColumns; i++)
+    {
+      const NdbRecord::Attr *col;
+
+      col= &result_rec->columns[i];
+      Uint32 attrId= col->attrId;
+
+      if (result_mask)
+      {
+        if (!(result_mask[attrId>>5] & (1<<(attrId&31))))
+          continue;
+      }
+
+      res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
+                                       attrId, 0,
+                                       &attrInfoPtr, &remain);
+      if(res)
+        return res;
+    }
+  }
+  Uint32 signalLength= hdrSize +
+    (theTupKeyLen > TcKeyReq::MaxKeyInfo ?
+         TcKeyReq::MaxKeyInfo : theTupKeyLen) +
+    (theTotalCurrAI_Len > TcKeyReq::MaxAttrInfo ?
+         TcKeyReq::MaxAttrInfo : theTotalCurrAI_Len);
+  theTCREQ->setLength(signalLength);
+
+
+  /* Check if too much attrinfo have been defined. */
+  if (theTotalCurrAI_Len > TcKeyReq::MaxTotalAttrInfo){
+    setErrorCodeAbort(4257);
+    return -1;
+  }
+  TcKeyReq::setAttrinfoLen(tcKeyReq->attrLen, theTotalCurrAI_Len);
+  TcKeyReq::setAIInTcKeyReq(tcKeyReq->requestInfo, 
+                            theTotalCurrAI_Len < TcKeyReq::MaxAttrInfo ?
+                                theTotalCurrAI_Len : TcKeyReq::MaxAttrInfo);
+
+  theStatus= WaitResponse;
+  theReceiver.prepareSend();
+
+  return 0;
+}
+
+/*
+  Set up the header of the TCKEYREQ signal (except a few length fields,
+  which are computed later in prepareSendNdbRecord()).
+  Returns the length of the header, used to find the correct placement of
+  keyinfo and attrinfo stored within TCKEYREQ.
+*/
+Uint32
+NdbOperation::fillTcKeyReqHdr(TcKeyReq *tcKeyReq,
+                              Uint32 connectPtr,
+                              Uint64 transId,
+                              const NdbRecord *rec)
+{
+  Uint32 hdrLen;
+  UintR *hdrPtr;
+
+  tcKeyReq->apiConnectPtr= connectPtr;
+  tcKeyReq->apiOperationPtr= ptr2int();
+
+  UintR attrLen= 0;
+  TcKeyReq::setAPIVersion(attrLen, NDB_VERSION);
+  /* We will setAttrinfoLen() later when AttrInfo has been written. */
+  tcKeyReq->attrLen= attrLen;
+
+  tcKeyReq->tableId= rec->tableId;
+
+  UintR reqInfo= 0;
+  TcKeyReq::setSimpleFlag(reqInfo, theSimpleIndicator);
+  TcKeyReq::setCommitFlag(reqInfo, theCommitIndicator);
+  TcKeyReq::setStartFlag(reqInfo, theStartIndicator);
+  TcKeyReq::setInterpretedFlag(reqInfo, theInterpretIndicator);
+  TcKeyReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
+  TcKeyReq::setDirtyFlag(reqInfo, theDirtyIndicator);
+  TcKeyReq::setOperationType(reqInfo, theOperationType);
+  Uint8 abortOption=
+    m_abortOption != -1 ? m_abortOption : theNdbCon->m_abortOption;
+  TcKeyReq::setAbortOption
+    (reqInfo, theSimpleIndicator ? (Uint8)AO_IgnoreError : abortOption);
+  TcKeyReq::setDistributionKeyFlag(reqInfo, theDistrKeyIndicator_);
+  TcKeyReq::setScanIndFlag(reqInfo, theScanInfo & 1);
+  /* We will setAIInTcKeyReq() and setKeyLength() later. */
+  tcKeyReq->requestInfo= reqInfo;
+
+  tcKeyReq->tableSchemaVersion= rec->tableVersion;
+  tcKeyReq->transId1= (Uint32)transId;
+  tcKeyReq->transId2= (Uint32)(transId>>32);
+
+  /*
+    The next four words are optional, and included or not based on the flags
+    passed earlier. At most two of them are possible here.
+  */
+  hdrLen= 8;
+  hdrPtr= &(tcKeyReq->scanInfo);
+  if (theScanInfo & 1)
+  {
+    *hdrPtr++= theScanInfo;
+    hdrLen++;
+  }
+  if (theDistrKeyIndicator_)
+  {
+    *hdrPtr++= theDistributionKey;
+    hdrLen++;
+  }
+
+  return hdrLen;
+}
+
+/*
+  Link a new KEYINFO signal into the operation.
+  Return 0 on success, -1 on error.
+*/
+int
+NdbOperation::allocKeyInfo(Uint32 connectPtr, Uint64 transId,
+                           Uint32 **dstPtr, Uint32 *remain)
+{
+  NdbApiSignal *tSignal;
+  KeyInfo *keyInfo;
+
+  tSignal= theNdb->getSignal();
+  if (tSignal == NULL)
+  {
+    setErrorCodeAbort(4000);
+    return -1;
+  }
+  keyInfo= (struct KeyInfo *)(tSignal->getDataPtrSend());
+  if (tSignal->setSignal(m_keyInfoGSN) == -1)
+  {
+    setErrorCodeAbort(4001);
+    return -1;
+  }
+  tSignal->next(NULL);
+  keyInfo->connectPtr= connectPtr;
+  keyInfo->transId[0]= (Uint32)transId;
+  keyInfo->transId[1]= (Uint32)(transId>>32);
+  if (theTCREQ->next() != NULL)
+  {
+    theLastKEYINFO->setLength(KeyInfo::MaxSignalLength);
+    theLastKEYINFO->next(tSignal);
+  }
+  else
+  {
+    theTCREQ->next(tSignal);
+  }
+  theLastKEYINFO= tSignal;
+  *remain= KeyInfo::DataLength;
+  *dstPtr= &(keyInfo->keyData[0]);
+  return 0;
+}
+
+/*
+  Link a new ATTRINFO signal into the operation.
+  Return 0 on success, -1 on error.
+*/
+int
+NdbOperation::allocAttrInfo(Uint32 connectPtr, Uint64 transId,
+                            Uint32 **dstPtr, Uint32 *remain)
+{
+  NdbApiSignal *tSignal;
+  AttrInfo *attrInfo;
+
+  tSignal= theNdb->getSignal();
+  if (tSignal == NULL)
+  {
+    setErrorCodeAbort(4000);
+    return -1;
+  }
+  attrInfo= (struct AttrInfo *)(tSignal->getDataPtrSend());
+  if (tSignal->setSignal(m_attrInfoGSN) == -1)
+  {
+    setErrorCodeAbort(4001);
+    return -1;
+  }
+  tSignal->next(NULL);
+  attrInfo->connectPtr= connectPtr;
+  attrInfo->transId[0]= (Uint32)transId;
+  attrInfo->transId[1]= (Uint32)(transId>>32);
+  if (theFirstATTRINFO != NULL)
+  {
+    theCurrentATTRINFO->setLength(AttrInfo::MaxSignalLength);
+    theCurrentATTRINFO->next(tSignal);
+  }
+  else
+  {
+    theFirstATTRINFO= tSignal;
+  }
+  theCurrentATTRINFO= tSignal;
+  *remain= AttrInfo::DataLength;
+  *dstPtr= &(attrInfo->attrData[0]);
+
+  return 0;
+}
+
+int
+NdbOperation::insertKEYINFO_NdbRecord(Uint32 connectPtr,
+                                      Uint64 transId,
+                                      const char *value,
+                                      Uint32 size,
+                                      Uint32 **dstPtr,
+                                      Uint32 *remain)
+{
+  theTupKeyLen+= (size+3)/4;
+
+  while (size > *remain*4)
+  {
+    if (*remain)
+    {
+      memcpy(*dstPtr, value, *remain*4);
+      value+= *remain*4;
+      size-= *remain*4;
+    }
+    int res= allocKeyInfo(connectPtr, transId, dstPtr, remain);
+    if(res)
+      return res;
+  }
+
+  memcpy(*dstPtr, value, size);
+  if((size%4) != 0)
+    memset(((char *)*dstPtr)+size, 0, 3-(size%4));
+  Uint32 sizeInWords= (size+3)/4;
+  *dstPtr+= sizeInWords;
+  *remain-= sizeInWords;
+  if (theTCREQ->next() != NULL)
+    theLastKEYINFO->setLength(KeyInfo::MaxSignalLength - *remain);
+
+  return 0;
+}
+
+int
+NdbOperation::insertATTRINFOHdr_NdbRecord(Uint32 connectPtr,
+                                          Uint64 transId,
+                                          Uint32 attrId,
+                                          Uint32 attrLen,
+                                          Uint32 **dstPtr,
+                                          Uint32 *remain)
+{
+  theTotalCurrAI_Len++;
+  if (! *remain)
+  {
+    int res= allocAttrInfo(connectPtr, transId, dstPtr, remain);
+    if (res)
+      return res;
+  }
+  Uint32 ah;
+  AttributeHeader::init(&ah, attrId, attrLen);
+  *(*dstPtr)++= ah;
+  (*remain)--;
+  if (theFirstATTRINFO != NULL)
+    theCurrentATTRINFO->setLength(AttrInfo::MaxSignalLength - *remain);
+
+  return 0;
+}
+
+int
+NdbOperation::insertATTRINFOData_NdbRecord(Uint32 connectPtr,
+                                           Uint64 transId,
+                                           const char *value,
+                                           Uint32 size,
+                                           Uint32 **dstPtr,
+                                           Uint32 *remain)
+{
+  theTotalCurrAI_Len+= (size+3)/4;
+
+  while (size > *remain*4)
+  {
+    if (*remain)
+    {
+      memcpy(*dstPtr, value, *remain*4);
+      value+= *remain*4;
+      size-= *remain*4;
+    }
+    int res= allocAttrInfo(connectPtr, transId, dstPtr, remain);
+    if (res)
+      return res;
+  }
+
+  memcpy(*dstPtr, value, size);
+  if((size%4) != 0)
+    memset(((char *)*dstPtr)+size, 0, 3-(size%4));
+  Uint32 sizeInWords= (size+3)/4;
+  *dstPtr+= sizeInWords;
+  *remain-= sizeInWords;
+  if (theFirstATTRINFO != NULL)
+    theCurrentATTRINFO->setLength(AttrInfo::MaxSignalLength - *remain);
+
+  return 0;
+}
+
 int
 NdbOperation::checkState_TransId(NdbApiSignal* aSignal)
 {

--- 1.19/storage/ndb/src/ndbapi/NdbReceiver.cpp	2006-12-13 14:11:35 +01:00
+++ 1.20/storage/ndb/src/ndbapi/NdbReceiver.cpp	2006-12-13 14:11:35 +01:00
@@ -29,7 +29,8 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
   m_ndb(aNdb),
   m_id(NdbObjectIdMap::InvalidId),
   m_type(NDB_UNINITIALIZED),
-  m_owner(0)
+  m_owner(0),
+  usingNdbRecord(false)
 {
   theCurrentRecAttr = theFirstRecAttr = 0;
   m_defined_rows = 0;
@@ -47,10 +48,11 @@ NdbReceiver::~NdbReceiver()
 }
 
 void
-NdbReceiver::init(ReceiverType type, void* owner)
+NdbReceiver::init(ReceiverType type, bool useRec, void* owner)
 {
   theMagicNumber = 0x11223344;
   m_type = type;
+  usingNdbRecord= useRec;
   m_owner = owner;
   if (m_id == NdbObjectIdMap::InvalidId) {
     if (m_ndb)
@@ -63,19 +65,24 @@ NdbReceiver::init(ReceiverType type, voi
 
 void
 NdbReceiver::release(){
-  NdbRecAttr* tRecAttr = theFirstRecAttr;
-  while (tRecAttr != NULL)
+  if (!usingNdbRecord)
   {
-    NdbRecAttr* tSaveRecAttr = tRecAttr;
-    tRecAttr = tRecAttr->next();
-    m_ndb->releaseRecAttr(tSaveRecAttr);
+    NdbRecAttr* tRecAttr = theFirstRecAttr;
+    while (tRecAttr != NULL)
+    {
+      NdbRecAttr* tSaveRecAttr = tRecAttr;
+      tRecAttr = tRecAttr->next();
+      m_ndb->releaseRecAttr(tSaveRecAttr);
+    }
+    theFirstRecAttr = NULL;
+    theCurrentRecAttr = NULL;
   }
-  theFirstRecAttr = NULL;
-  theCurrentRecAttr = NULL;
 }
   
 NdbRecAttr *
 NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
+  assert(!usingNdbRecord);
+
   NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
   if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
     if (theFirstRecAttr == NULL)
@@ -92,6 +99,16 @@ NdbReceiver::getValue(const NdbColumnImp
   return 0;
 }
 
+void
+NdbReceiver::getValues(const NdbRecord* rec, char *row_ptr)
+{
+  assert(usingNdbRecord);
+
+  theNdbRecord= rec;
+  m_RecPos= 0;
+  theRow= row_ptr;
+}
+
 #define KEY_ATTR_ID (~(Uint32)0)
 
 void
@@ -228,9 +245,96 @@ NdbReceiver::copyout(NdbReceiver & dstRe
   return start;
 }
 
+static void assignToRec(const NdbRecord::Attr *col,
+                        char *row,
+                        const Uint32 *src,
+                        Uint32 byteSize)
+{
+  switch (col->type)
+  {
+    case NdbRecord::AttrNotNULL:
+      memcpy(&row[col->offset], src, byteSize);
+      break;
+
+    default:
+      assert(false);
+  }
+}
+
+static void setRecToNULL(const NdbRecord::Attr *col,
+                         char *row)
+{
+  switch (col->type)
+  {
+    case NdbRecord::AttrNotNULL:
+      assert(false);
+      break;
+
+    default:
+      assert(false);
+  }
+}
+
 int
 NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
 {
+  if (usingNdbRecord)
+  {
+    Uint32 exp= m_expected_result_length;
+    Uint32 tmp= m_received_result_length + aLength;
+    const NdbRecord *rec= theNdbRecord;
+
+    while (aLength > 0)
+    {
+      AttributeHeader ah(* aDataPtr++);
+      const Uint32 attrId= ah.getAttributeId();
+      aLength--;
+
+      /*
+        Set all not returned columns to NULL.
+        The rows should be returned in the same order as we requested them
+        (which is in any case in attribute ID order).
+      */
+      while (m_RecPos < rec->noOfColumns &&
+             rec->columns[m_RecPos].attrId < attrId)
+      {
+        setRecToNULL(&rec->columns[m_RecPos], theRow);
+        m_RecPos++;
+      }
+
+      /* We should never get back an attribute not originally requested. */
+      assert(m_RecPos < rec->noOfColumns &&
+             rec->columns[m_RecPos].attrId == attrId);
+
+      Uint32 attrSize= ah.getByteSize();
+      if (attrSize == 0)
+      {
+        setRecToNULL(&rec->columns[m_RecPos], theRow);
+      }
+      else
+      {
+        assert(attrSize <= rec->columns[m_RecPos].maxSize);
+        Uint32 sizeInWords= (attrSize+3)>>2;
+        /* Not sure how to deal with this, shouldn't happen. */
+        if (unlikely(sizeInWords > aLength))
+        {
+          sizeInWords= aLength;
+          attrSize= 4*aLength;
+        }
+
+        assignToRec(&rec->columns[m_RecPos], theRow, aDataPtr, attrSize);
+        aDataPtr+= sizeInWords;
+        aLength-= sizeInWords;
+      }
+      m_RecPos++;
+    }
+
+    m_received_result_length = tmp;
+
+    return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0);
+  }
+
+  /* The old way, using getValue() and NdbRecAttr. */
   NdbRecAttr* currRecAttr = theCurrentRecAttr;
   
   for (Uint32 used = 0; used < aLength ; used++){
@@ -251,6 +355,12 @@ NdbReceiver::execTRANSID_AI(const Uint32
       aDataPtr += add;
       currRecAttr = currRecAttr->next();
     } else {
+      /*
+        This should not happen: we got back an attribute for which we have no
+        stored NdbRecAttr recording that we requested said attribute (or we got
+        back attributes in the wrong order).
+        So dump some info for debugging, and abort.
+      */
       ndbout_c("%p: tAttrId: %d currRecAttr: %p tAttrSize: %d %d", this,
 	       tAttrId, currRecAttr, 
 	       tAttrSize, currRecAttr->get_size_in_bytes());

--- 1.103/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2006-12-13 14:11:35 +01:00
+++ 1.104/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2006-12-13 14:11:35 +01:00
@@ -100,7 +100,7 @@ NdbScanOperation::init(const NdbTableImp
   }
 
   // NOTE! The hupped trans becomes the owner of the operation
-  if(NdbOperation::init(tab, aScanConnection) != 0){
+  if(NdbOperation::init(tab, aScanConnection, false) != 0){
     theNdb->theRemainingStartTransactions--;
     return -1;
   }
@@ -279,7 +279,7 @@ NdbScanOperation::fix_receivers(Uint32 p
 	return -1;
       }//if
       m_receivers[i] = tScanRec;
-      tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
+      tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, false, this);
     }
     m_allocated_receivers = parallel;
   }

--- 1.15/storage/ndb/test/ndbapi/flexBench.cpp	2006-12-13 14:11:35 +01:00
+++ 1.16/storage/ndb/test/ndbapi/flexBench.cpp	2006-12-13 14:11:35 +01:00
@@ -613,10 +613,11 @@ static void* flexBenchThread(void* pArg)
   Uint32**          pAttrSet= NULL;
   int               nRefOpOffset= 0;
   NdbDictionary::Dictionary *dict= NULL;
+  NdbDictionary::RecordSpecification recSpec[MAXATTR+MAXNOLONGKEY];
 
   threadNo = pThreadData->threadNo ;
 
-  /* Additional space in rows for long primaru keys. */
+  /* Additional space in rows for long primary keys. */
   if (useLongKeys)
     nReadBuffSize+= tNoOfTables*sizeof(unsigned)*tSizeOfLongPK*tNoOfLongPK;
 
@@ -624,7 +625,7 @@ static void* flexBenchThread(void* pArg)
   attrRefValue = (int*)malloc(nRefBuffSize) ;
   pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
   pNdb = new Ndb(g_cluster_connection, "TEST_DB" );
-  pRec= (NdbRecord **)calloc(tNoOfTables, sizeof(*pRec));
+  pRec= (NdbRecord **)calloc(tNoOfTables*3, sizeof(*pRec));
   pAttrSet= (Uint32 **)calloc(tNoOfTables, sizeof(*pAttrSet));
   
   if (!attrValue || !attrRefValue || !pOps || !pNdb || !pRec || !pAttrSet)
@@ -648,32 +649,93 @@ static void* flexBenchThread(void* pArg)
   dict= pNdb->getDictionary();
   for (int tab= 0; tab<tNoOfTables; tab++)
   {
-    int firstCol= 0;
-    if(useLongKeys)
-      firstCol+= tNoOfLongPK;
+    int numPKs= (useLongKeys ? tNoOfLongPK : 1);
 
-    pRec[tab]= dict->getRecord(tableName[tab]);
-    if (pRec[tab]==NULL) {
-      // This is a fatal error, abort program
-      ndbout << "Failed to allocate NdbRecord in thread" << threadNo;
-      ndbout << endl;
-      tResult = 13;
-      goto end;
+    /* First create NdbRecord for just the primary key(s). */
+    if (!useLongKeys)
+    {
+      recSpec[0].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
+      recSpec[0].colPtr= NULL;
+      recSpec[0].colName= NULL;
+      recSpec[0].colNumber= 0;
+      recSpec[0].dataOffset= 0;
+      pRec[tab]= dict->createRecord(tableName[tab],
+                                    recSpec,
+                                    1,
+                                    sizeof(recSpec[0]));
+    }
+    else
+    {
+      for (Uint32 i= 0; i<tNoOfLongPK; i++)
+      {
+        recSpec[i].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
+        recSpec[i].colPtr= NULL;
+        recSpec[i].colName= longKeyAttrName[i];
+        recSpec[i].dataOffset= sizeof(unsigned)*tSizeOfLongPK*i;
+      }
+      pRec[tab]= dict->createRecord(tableName[tab],
+                                    recSpec,
+                                    tNoOfLongPK,
+                                    sizeof(recSpec[0]));
+    }
+
+    /* Next NdbRecord for just the non-pk attributes. */
+    Uint32 count= 0;
+    for (Uint32 i= 1; i<tNoOfAttributes; i++)
+    {
+      recSpec[count].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
+      recSpec[count].colPtr= NULL;
+      recSpec[count].colName= NULL;
+      recSpec[count].colNumber= i+numPKs-1;
+      recSpec[count].dataOffset= sizeof(int)*tAttributeSize*i;
+      count++;
+    }
+    pRec[tab+tNoOfTables]= dict->createRecord(tableName[tab],
+                                              recSpec,
+                                              count,
+                                              sizeof(recSpec[0]));
+
+    /* And finally NdbRecord for all attributes (for insert). */
+    /* Also test here specifying NdbRecord columns out-of-order. */
+    count= 0;
+    for (Uint32 i= (useLongKeys?1:0); i<tNoOfAttributes; i++)
+    {
+      recSpec[count].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
+      recSpec[count].colPtr= NULL;
+      recSpec[count].colName= NULL;
+      recSpec[count].colNumber= i-1+numPKs;
+      recSpec[count].dataOffset= sizeof(int)*tAttributeSize*i;
+      count++;
     }
-    for (Uint32 col= 1; col<tNoOfAttributes; col++)
-      dict->recAddAttrNotNULL(tableName[tab], pRec[tab], col-1+firstCol,
sizeof(int)*tAttributeSize*col);
     if (useLongKeys)
     {
-      for (Uint32 col= 0; col<tNoOfLongPK; col++)
+      for (Uint32 i= 0; i<tNoOfLongPK; i++)
       {
-        dict->recAddAttrNotNULL(tableName[tab], pRec[tab], longKeyAttrName[col],
-                                sizeof(int)*tAttributeSize*tNoOfAttributes +
-                                sizeof(unsigned)*tSizeOfLongPK*col);
+        recSpec[count].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
+        recSpec[count].colPtr= NULL;
+        recSpec[count].colName= longKeyAttrName[i];
+        recSpec[count].dataOffset= sizeof(int)*tAttributeSize*tNoOfAttributes +
+                                   sizeof(unsigned)*tSizeOfLongPK*i;
+        count++;
       }
     }
+    pRec[tab+2*tNoOfTables]= dict->createRecord(tableName[tab],
+                                                recSpec,
+                                                count,
+                                                sizeof(recSpec[0]));
+
+    if (pRec[tab]==NULL ||
+        pRec[tab+tNoOfTables]==NULL ||
+        pRec[tab+2*tNoOfTables]==NULL) {
+      // This is a fatal error, abort program
+      ndbout << "Failed to allocate NdbRecord in thread" << threadNo;
+      ndbout << endl;
+      tResult = 13;
+      goto end;
+    }
 
     /* Attribute set for reading just one attribute, when verifying delete. */
-    pAttrSet[tab]= dict->getRecAttrSet(tableName[tab], pRec[tab]);
+    pAttrSet[tab]= dict->getRecAttrSet(pRec[tab]);
     if (pAttrSet[tab]==NULL) {
       // This is a fatal error, abort program
       ndbout << "Failed to allocate NdbRecAttrSet in thread" << threadNo;
@@ -682,9 +744,9 @@ static void* flexBenchThread(void* pArg)
       goto end;
     }
     if (useLongKeys)
-      dict->recAttrSetEnable(pAttrSet[tab], tableName[tab], pRec[tab],
longKeyAttrName[0]);
+      dict->recAttrSetEnable(pAttrSet[tab], tableName[tab], longKeyAttrName[0]);
     else
-      dict->recAttrSetEnable(pAttrSet[tab], tableName[tab], pRec[tab], (Uint32)0);
+      dict->recAttrSetEnable(pAttrSet[tab], (Uint32)0);
   }
 
   if(useLongKeys){
@@ -784,64 +846,83 @@ static void* flexBenchThread(void* pArg)
 	   countTables++) {
         int nTableOffset= tAttributeSize*tNoOfAttributes*countTables;
         int *pRow= &attrValue[nTableOffset];
+        char *pRowAttr= (char *)(&attrRefValue[nRefLocalOpOffset]);
+        char *pRowPK= (useLongKeys ?
+                             (char *)longKeyAttrValue[count-1] :
+                             (char *)(&attrRefValue[nRefLocalOpOffset]));
 
-        if ((tType==stInsert || tType==stUpdate) && tNoOfAttributes>1)
+        /* For insert, we need a single row with both pk and non-pk attrs. */
+        if (tType==stInsert && theWriteFlag!=1)
         {
           /* Copy the non-PK columns to send to the server. */
-          memcpy(&pRow[tAttributeSize],
-                 &attrRefValue[nRefLocalOpOffset+tAttributeSize],
-                 (tNoOfAttributes-1)*tAttributeSize*sizeof(int));
-        }
-        /* Copy the primary key(s). */
-        if (useLongKeys)
-        {
-          memcpy(pRow+tAttributeSize*tNoOfAttributes,
-                 longKeyAttrValue[count-1],
-                 tNoOfLongPK*tSizeOfLongPK*sizeof(unsigned));
-        }
-        else
-        {
-          pRow[0]= attrRefValue[nRefLocalOpOffset];
+          if (tNoOfAttributes>1)
+            memcpy(&pRow[tAttributeSize],
+                   &attrRefValue[nRefLocalOpOffset+tAttributeSize],
+                   (tNoOfAttributes-1)*tAttributeSize*sizeof(int));
+          /* Copy the primary key(s). */
+          if (useLongKeys)
+          {
+            memcpy(pRow+tAttributeSize*tNoOfAttributes,
+                   longKeyAttrValue[count-1],
+                   tNoOfLongPK*tSizeOfLongPK*sizeof(unsigned));
+          }
+          else
+          {
+            pRow[0]= attrRefValue[nRefLocalOpOffset];
+          }
         }
 
-        NdbRecord *record= pRec[countTables];
+        const NdbRecord *pk_record= pRec[countTables];
+        const NdbRecord *attr_record= pRec[countTables+tNoOfTables];
+        const NdbRecord *all_record= pRec[countTables+2*tNoOfTables];
         const char *tabName= tableName[countTables];
 
 	switch (tType) {
 	case stInsert:          // Insert case
 	  if (theWriteFlag == 1 && theDirtyFlag == 1)
-	    pOps[countTables]= pTrans->dirtyWriteTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->dirtyWriteTuple(tabName, pk_record, pRowPK,
+                                                       attr_record, pRowAttr);
 	  else if (theWriteFlag == 1)
-	    pOps[countTables]= pTrans->writeTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->writeTuple(tabName, pk_record, pRowPK,
+                                                  attr_record, pRowAttr);
 	  else
-	    pOps[countTables]= pTrans->insertTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->insertTuple(tabName, all_record, (char *)pRow);
 	  break;
 	case stRead:            // Read Case
 	  if (theSimpleFlag == 1)
-	    pOps[countTables]= pTrans->simpleReadTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->simpleReadTuple(tabName, pk_record, pRowPK,
+                                                       attr_record, (char *)pRow);
 	  else if (theDirtyFlag == 1)
-	    pOps[countTables]= pTrans->dirtyReadTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->dirtyReadTuple(tabName, pk_record, pRowPK,
+                                                      attr_record, (char *)pRow);
 	  else
-	    pOps[countTables]= pTrans->readTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->readTuple(tabName, pk_record, pRowPK,
+                                                 attr_record, (char *)pRow);
 	  break;
 	case stUpdate:          // Update Case
 	  if (theWriteFlag == 1 && theDirtyFlag == 1)
-	    pOps[countTables]= pTrans->dirtyWriteTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->dirtyWriteTuple(tabName, pk_record, pRowPK,
+                                                       attr_record, pRowAttr);
 	  else if (theWriteFlag == 1)
-	    pOps[countTables]= pTrans->writeTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->writeTuple(tabName, pk_record, pRowPK,
+                                                  attr_record, pRowAttr);
 	  else if (theDirtyFlag == 1)
-	    pOps[countTables]= pTrans->dirtyUpdateTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->dirtyUpdateTuple(tabName, pk_record, pRowPK,
+                                                       attr_record, pRowAttr);
 	  else
-	    pOps[countTables]= pTrans->updateTuple(tabName, record, (char *)pRow);
+	    pOps[countTables]= pTrans->updateTuple(tabName, pk_record, pRowPK,
+                                                       attr_record, pRowAttr);
 	  break;
 	case stDelete:          // Delete Case
-	  pOps[countTables]= pTrans->deleteTuple(tabName, record, (char *)pRow);
+	  pOps[countTables]= pTrans->deleteTuple(tabName, pk_record, pRowPK);
 	  break;
 	case stVerify:
-	  pOps[countTables]= pTrans->readTuple(tabName, record, (char *)pRow);
+	  pOps[countTables]= pTrans->readTuple(tabName, pk_record, pRowPK,
+                                               attr_record, (char *)pRow);
 	  break;
 	case stVerifyDelete:
-	  pOps[countTables]= pTrans->readTuple(tabName, record, (char *)pRow,
+	  pOps[countTables]= pTrans->readTuple(tabName, pk_record, pRowPK,
+                                               pk_record, (char *)pRow,
                                                pAttrSet[countTables]);
 	  break;
 	default:
@@ -988,7 +1069,7 @@ static void* flexBenchThread(void* pArg)
   }
   if(pRec)
   {
-    for (Uint32 i= 0; i<tNoOfTables; i++)
+    for (Uint32 i= 0; i<tNoOfTables*3; i++)
       if (pRec[i])
         dict->releaseRecord(pRec[i]);
     free(pRec);

--- 1.78/storage/ndb/src/ndbapi/ndberror.c	2006-12-13 14:11:36 +01:00
+++ 1.79/storage/ndb/src/ndbapi/ndberror.c	2006-12-13 14:11:36 +01:00
@@ -616,6 +616,10 @@ ErrorBundle ErrorCodes[] = {
   { 4273, DMEC, IE, "No blob table in dict cache" },
   { 4274, DMEC, IE, "Corrupted main table PK in blob operation" },
   { 4275, DMEC, AE, "The blob method is incompatible with operation type or lock mode" },
+  { 4276, DMEC, AE, "API version mismatch or wrong
sizeof(NdbDictionary::RecordSpecification)" },
+  { 4277, DMEC, AE, "Missing column specification in NdbDictionary::RecordSpecification"
},
+  { 4278, DMEC, AE, "Duplicate column specification in
NdbDictionary::RecordSpecification" },
+  { 4279, DMEC, AE, "NdbRecord for tuple access is not a primary key NdbRecord" },
 };
 
 static
Thread
bk commit into 5.1 tree (knielsen:1.2345)knielsen13 Dec