List:Commits« Previous MessageNext Message »
From:pekka Date:May 3 2006 7:05pm
Subject:bk commit into 5.1 tree (pekka:1.2391)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2391 06/05/03 21:05:21 pekka@stripped +7 -0
  Merge pnousiainen@stripped:/home/bk/mysql-5.0
  into  mysql.com:/space/pekka/ndb/version/my51-tmp

  storage/ndb/src/ndbapi/NdbBlob.cpp
    1.44 06/05/03 21:04:58 pekka@stripped +1 -1
    manual merge

  storage/ndb/tools/delete_all.cpp
    1.20 06/05/03 21:03:12 pekka@stripped +0 -0
    Auto merged

  storage/ndb/test/ndbapi/testBlobs.cpp
    1.32 06/05/03 21:03:12 pekka@stripped +0 -0
    Auto merged

  storage/ndb/include/ndbapi/NdbBlob.hpp
    1.22 06/05/03 21:03:12 pekka@stripped +0 -0
    Auto merged

  storage/ndb/include/kernel/signaldata/TcKeyReq.hpp
    1.8 06/05/03 21:03:12 pekka@stripped +0 -0
    Auto merged

  storage/ndb/tools/delete_all.cpp
    1.16.2.2 06/05/03 21:03:11 pekka@stripped +0 -0
    Merge rename: ndb/tools/delete_all.cpp -> storage/ndb/tools/delete_all.cpp

  storage/ndb/test/ndbapi/testBlobs.cpp
    1.20.12.2 06/05/03 21:03:11 pekka@stripped +0 -0
    Merge rename: ndb/test/ndbapi/testBlobs.cpp -> storage/ndb/test/ndbapi/testBlobs.cpp

  storage/ndb/src/ndbapi/NdbBlob.cpp
    1.22.5.2 06/05/03 21:03:11 pekka@stripped +0 -0
    Merge rename: ndb/src/ndbapi/NdbBlob.cpp -> storage/ndb/src/ndbapi/NdbBlob.cpp

  storage/ndb/include/ndbapi/NdbBlob.hpp
    1.15.2.2 06/05/03 21:03:11 pekka@stripped +0 -0
    Merge rename: ndb/include/ndbapi/NdbBlob.hpp -> storage/ndb/include/ndbapi/NdbBlob.hpp

  storage/ndb/include/kernel/signaldata/TcKeyReq.hpp
    1.5.1.2 06/05/03 21:03:11 pekka@stripped +0 -0
    Merge rename: ndb/include/kernel/signaldata/TcKeyReq.hpp -> storage/ndb/include/kernel/signaldata/TcKeyReq.hpp

  sql/sql_table.cc
    1.327 06/05/03 21:03:11 pekka@stripped +0 -0
    Auto merged

  mysql-test/r/ndb_blob.result
    1.18 06/05/03 21:03:11 pekka@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my51-tmp/RESYNC

--- 1.326/sql/sql_table.cc	2006-05-01 19:41:35 +02:00
+++ 1.327/sql/sql_table.cc	2006-05-03 21:03:11 +02:00
@@ -3545,7 +3545,9 @@
     }
   }
   delete file;
-  if (error)
+  if (error == HA_ERR_WRONG_COMMAND)
+    my_error(ER_NOT_SUPPORTED_YET, MYF(0), "ALTER TABLE");
+  else if (error)
     my_error(ER_ERROR_ON_RENAME, MYF(0), from, to, error);
   DBUG_RETURN(error != 0);
 }

--- 1.5.1.1/ndb/include/kernel/signaldata/TcKeyReq.hpp	2006-05-03 20:01:39 +02:00
+++ 1.8/storage/ndb/include/kernel/signaldata/TcKeyReq.hpp	2006-05-03 21:03:12 +02:00
@@ -128,8 +128,6 @@
   static Uint8 getSimpleFlag(const UintR & requestInfo);
   static Uint8 getDirtyFlag(const UintR & requestInfo);
   static Uint8 getInterpretedFlag(const UintR & requestInfo);
-  static Uint8 getDistributionGroupFlag(const UintR & requestInfo);
-  static Uint8 getDistributionGroupTypeFlag(const UintR & requestInfo);
   static Uint8 getDistributionKeyFlag(const UintR & requestInfo);
   static Uint8 getScanIndFlag(const UintR & requestInfo);
   static Uint8 getOperationType(const UintR & requestInfo);
@@ -138,6 +136,7 @@
   static Uint16 getKeyLength(const UintR & requestInfo);
   static Uint8  getAIInTcKeyReq(const UintR & requestInfo);
   static Uint8  getExecutingTrigger(const UintR & requestInfo);
+  static UintR  getNoDiskFlag(const UintR & requestInfo);
 
   /**
    * Get:ers for scanInfo
@@ -157,8 +156,6 @@
   static void setSimpleFlag(UintR & requestInfo, Uint32 flag);
   static void setDirtyFlag(UintR & requestInfo, Uint32 flag);
   static void setInterpretedFlag(UintR & requestInfo, Uint32 flag);
-  static void setDistributionGroupFlag(UintR & requestInfo, Uint32 flag);
-  static void setDistributionGroupTypeFlag(UintR & requestInfo, Uint32 flag);
   static void setDistributionKeyFlag(UintR & requestInfo, Uint32 flag);
   static void setScanIndFlag(UintR & requestInfo, Uint32 flag);
   static void setExecuteFlag(UintR & requestInfo, Uint32 flag);  
@@ -167,6 +164,7 @@
   static void setKeyLength(UintR & requestInfo, Uint32 len);
   static void setAIInTcKeyReq(UintR & requestInfo, Uint32 len);
   static void setExecutingTrigger(UintR & requestInfo, Uint32 flag);
+  static void setNoDiskFlag(UintR & requestInfo, UintR val);
 
   /**
    * Set:ers for scanInfo
@@ -185,29 +183,27 @@
  d = Dirty Indicator       - 1  Bit 0
  e = Scan Indicator        - 1  Bit 14
  f = Execute fired trigger - 1  Bit 19 
- g = Distribution Group Ind- 1  Bit 1
  i = Interpreted Indicator - 1  Bit 15
  k = Key length            - 12 Bits -> Max 4095 (Bit 20 - 31)
  o = Operation Type        - 3  Bits -> Max 7 (Bit 5-7)
  l = Execute               - 1  Bit 10
  p = Simple Indicator      - 1  Bit 8
  s = Start Indicator       - 1  Bit 11
- t = Distribution GroupType- 1  Bit 3
  y = Commit Type           - 2  Bit 12-13
+ n = No disk flag          - 1  Bit 1
 
            1111111111222222222233
  01234567890123456789012345678901
- dgbtcooop lsyyeiaaafkkkkkkkkkkkk
+ dnb cooop lsyyeiaaafkkkkkkkkkkkk
 */
 
+#define TCKEY_NODISK_SHIFT (1)
 #define COMMIT_SHIFT       (4)
 #define START_SHIFT        (11)
 #define SIMPLE_SHIFT       (8)
 #define DIRTY_SHIFT        (0)
 #define EXECUTE_SHIFT      (10)
 #define INTERPRETED_SHIFT  (15)
-#define DISTR_GROUP_SHIFT  (1)
-#define DISTR_GROUP_TYPE_SHIFT  (3)
 #define DISTR_KEY_SHIFT    (2)
 #define SCAN_SHIFT         (14)
 
@@ -306,18 +302,6 @@
 
 inline
 Uint8
-TcKeyReq::getDistributionGroupFlag(const UintR & requestInfo){
-  return (Uint8)((requestInfo >> DISTR_GROUP_SHIFT) & 1);
-}
-
-inline
-Uint8
-TcKeyReq::getDistributionGroupTypeFlag(const UintR & requestInfo){
-  return (Uint8)((requestInfo >> DISTR_GROUP_TYPE_SHIFT) & 1);
-}
-
-inline
-Uint8
 TcKeyReq::getDistributionKeyFlag(const UintR & requestInfo){
   return (Uint8)((requestInfo >> DISTR_KEY_SHIFT) & 1);
 }
@@ -416,22 +400,6 @@
 
 inline
 void 
-TcKeyReq::setDistributionGroupTypeFlag(UintR & requestInfo, Uint32 flag){
-  ASSERT_BOOL(flag, "TcKeyReq::setDistributionGroupTypeFlag");
-  requestInfo &= ~(1 << DISTR_GROUP_TYPE_SHIFT);
-  requestInfo |= (flag << DISTR_GROUP_TYPE_SHIFT);
-}
-
-inline
-void 
-TcKeyReq::setDistributionGroupFlag(UintR & requestInfo, Uint32 flag){
-  ASSERT_BOOL(flag, "TcKeyReq::setDistributionGroupFlag");
-  requestInfo &= ~(1 << DISTR_GROUP_SHIFT);
-  requestInfo |= (flag << DISTR_GROUP_SHIFT);
-}
-
-inline
-void 
 TcKeyReq::setDistributionKeyFlag(UintR & requestInfo, Uint32 flag){
   ASSERT_BOOL(flag, "TcKeyReq::setDistributionKeyFlag");
   requestInfo &= ~(1 << DISTR_KEY_SHIFT);
@@ -545,5 +513,18 @@
   anAttrLen |= aiLen;
 }
 
+inline
+UintR 
+TcKeyReq::getNoDiskFlag(const UintR & requestInfo){
+  return (requestInfo >> TCKEY_NODISK_SHIFT) & 1;
+}
+
+inline
+void 
+TcKeyReq::setNoDiskFlag(UintR & requestInfo, Uint32 flag){
+  ASSERT_BOOL(flag, "TcKeyReq::setNoDiskFlag");
+  requestInfo &= ~(1 << TCKEY_NODISK_SHIFT);
+  requestInfo |= (flag << TCKEY_NODISK_SHIFT);
+}
 
 #endif

--- 1.20.12.1/ndb/test/ndbapi/testBlobs.cpp	2006-05-03 20:01:40 +02:00
+++ 1.32/storage/ndb/test/ndbapi/testBlobs.cpp	2006-05-03 21:03:12 +02:00
@@ -226,7 +226,7 @@
 {
   NdbDictionary::Table tab(g_opt.m_tname);
   if (g_dic->getTable(g_opt.m_tname) != 0)
-    CHK(g_dic->dropTable(tab) == 0);
+    CHK(g_dic->dropTable(g_opt.m_tname) == 0);
   return 0;
 }
 

--- 1.16.2.1/ndb/tools/delete_all.cpp	2006-05-03 20:04:16 +02:00
+++ 1.20/storage/ndb/tools/delete_all.cpp	2006-05-03 21:03:12 +02:00
@@ -139,7 +139,8 @@
       goto failed;
     }
     
-    if( pOp->readTuples(NdbOperation::LM_Exclusive,par) ) {
+    if( pOp->readTuples(NdbOperation::LM_Exclusive, 
+			NdbScanOperation::SF_TupScan, par) ) {
       goto failed;
     }
     

--- 1.17/mysql-test/r/ndb_blob.result	2006-02-09 11:34:34 +01:00
+++ 1.18/mysql-test/r/ndb_blob.result	2006-05-03 21:03:11 +02:00
@@ -481,14 +481,22 @@
 insert into t1 (msg) values(
 'Tries to validate (8 byte length + inline bytes) as UTF8 :(
 Fast fix: removed validation for Text.  It is not yet indexable
-so bad data will not crash kernel.
-Proper fix: Set inline bytes to multiple of mbmaxlen and
-validate it (after the 8 byte length).');
+so bad data will not crash kernel.');
 select * from t1;
 id	msg
 1	Tries to validate (8 byte length + inline bytes) as UTF8 :(
 Fast fix: removed validation for Text.  It is not yet indexable
 so bad data will not crash kernel.
-Proper fix: Set inline bytes to multiple of mbmaxlen and
-validate it (after the 8 byte length).
+drop table t1;
+create table t1 (
+a int primary key not null auto_increment,
+b text
+) engine=ndbcluster;
+select count(*) from t1;
+count(*)
+500
+truncate t1;
+select count(*) from t1;
+count(*)
+0
 drop table t1;

--- 1.15.2.1/ndb/include/ndbapi/NdbBlob.hpp	2006-05-03 20:01:39 +02:00
+++ 1.22/storage/ndb/include/ndbapi/NdbBlob.hpp	2006-05-03 21:03:12 +02:00
@@ -28,6 +28,7 @@
 class NdbRecAttr;
 class NdbTableImpl;
 class NdbColumnImpl;
+class NdbEventOperationImpl;
 
 /**
  * @class NdbBlob
@@ -71,8 +72,12 @@
  * writes.  It avoids execute penalty if nothing is pending.  It is not
  * needed after execute (obviously) or after next scan result.
  *
- * NdbBlob methods return -1 on error and 0 on success, and use output
- * parameters when necessary.
+ * NdbBlob also supports reading post or pre blob data from events.  The
+ * handle can be read after next event on main table has been retrieved.
+ * The data is available immediately.  See NdbEventOperation.
+ *
+ * Non-void NdbBlob methods return -1 on error and 0 on success.  Output
+ * parameters are used when necessary.
  *
  * Operation types:
  * - insertTuple must use setValue if blob column is non-nullable
@@ -112,6 +117,11 @@
    */
   State getState();
   /**
+   * Returns -1 for normal statement based blob and 0/1 for event
+   * operation post/pre data blob.  Always succeeds.
+   */
+  void getVersion(int& version);
+  /**
    * Inline blob header.
    */
   struct Head {
@@ -145,10 +155,15 @@
    * then the callback is invoked.
    */
   int setActiveHook(ActiveHook* activeHook, void* arg);
+#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
+  int getDefined(int& isNull);
+  int getNull(bool& isNull);
+#endif
   /**
-   * Check if blob is null.
+   * Return -1, 0, 1 if blob is undefined, non-null, or null.  For
+   * non-event blob, undefined causes a state error.
    */
-  int getNull(bool& isNull);
+  int getNull(int& isNull);
   /**
    * Set blob to NULL.
    */
@@ -192,8 +207,13 @@
    */
   static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName);
   /**
-   * Return error object.  The error may be blob specific (below) or may
-   * be copied from a failed implicit operation.
+   * Get blob event name.  The blob event is created if the main event
+   * monitors the blob column.  The name includes main event name.
+   */
+  static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName);
+  /**
+   * Return error object.  The error may be blob specific or may be
+   * copied from a failed implicit operation.
    */
   const NdbError& getNdbError() const;
   /**
@@ -217,17 +237,29 @@
   friend class NdbScanOperation;
   friend class NdbDictionaryImpl;
   friend class NdbResultSet; // atNextResult
+  friend class NdbEventBuffer;
+  friend class NdbEventOperationImpl;
 #endif
   // state
   State theState;
   void setState(State newState);
+  // quick and dirty support for events (consider subclassing)
+  int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event
   // define blob table
   static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c);
   static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c);
+  static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c);
+  static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c);
   // ndb api stuff
   Ndb* theNdb;
   NdbTransaction* theNdbCon;
   NdbOperation* theNdbOp;
+  NdbEventOperationImpl* theEventOp;
+  NdbEventOperationImpl* theBlobEventOp;
+  NdbRecAttr* theBlobEventPkRecAttr;
+  NdbRecAttr* theBlobEventDistRecAttr;
+  NdbRecAttr* theBlobEventPartRecAttr;
+  NdbRecAttr* theBlobEventDataRecAttr;
   const NdbTableImpl* theTable;
   const NdbTableImpl* theAccessTable;
   const NdbTableImpl* theBlobTable;
@@ -256,13 +288,17 @@
     Buf();
     ~Buf();
     void alloc(unsigned n);
+    void zerorest();
     void copyfrom(const Buf& src);
   };
   Buf theKeyBuf;
   Buf theAccessKeyBuf;
+  Buf thePackKeyBuf;
   Buf theHeadInlineBuf;
   Buf theHeadInlineCopyBuf;     // for writeTuple
   Buf thePartBuf;
+  Buf theBlobEventDataBuf;
+  Uint32 thePartNumber;         // for event
   Head* theHead;
   char* theInlineData;
   NdbRecAttr* theHeadInlineRecAttr;
@@ -295,6 +331,9 @@
   Uint32 getPartNumber(Uint64 pos);
   Uint32 getPartCount();
   Uint32 getDistKey(Uint32 part);
+  // pack / unpack
+  int packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf);
+  int unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf);
   // getters and setters
   int getTableKeyValue(NdbOperation* anOp);
   int setTableKeyValue(NdbOperation* anOp);
@@ -307,6 +346,8 @@
   int readDataPrivate(char* buf, Uint32& bytes);
   int writeDataPrivate(const char* buf, Uint32 bytes);
   int readParts(char* buf, Uint32 part, Uint32 count);
+  int readTableParts(char* buf, Uint32 part, Uint32 count);
+  int readEventParts(char* buf, Uint32 part, Uint32 count);
   int insertParts(const char* buf, Uint32 part, Uint32 count);
   int updateParts(const char* buf, Uint32 part, Uint32 count);
   int deleteParts(Uint32 part, Uint32 count);
@@ -318,19 +359,23 @@
   int invokeActiveHook();
   // blob handle maintenance
   int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn);
+  int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version);
+  int prepareColumn();
   int preExecute(NdbTransaction::ExecType anExecType, bool& batch);
   int postExecute(NdbTransaction::ExecType anExecType);
   int preCommit();
   int atNextResult();
+  int atNextEvent();
   // errors
   void setErrorCode(int anErrorCode, bool invalidFlag = true);
   void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
   void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true);
+  void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true);
 #ifdef VM_TRACE
   int getOperationType() const;
   friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
 #endif
-
+  // list stuff
   void next(NdbBlob* obj) { theNext= obj;}
   NdbBlob* next() { return theNext;}
   friend struct Ndb_free_list_t<NdbBlob>;

--- 1.22.5.1/ndb/src/ndbapi/NdbBlob.cpp	2006-05-03 20:01:39 +02:00
+++ 1.44/storage/ndb/src/ndbapi/NdbBlob.cpp	2006-05-03 21:04:58 +02:00
@@ -23,6 +23,7 @@
 #include <NdbBlob.hpp>
 #include "NdbBlobImpl.hpp"
 #include <NdbScanOperation.hpp>
+#include <NdbEventOperationImpl.hpp>
 #include <signaldata/TcKeyReq.hpp>
 
 /*
@@ -31,7 +32,21 @@
  */
 static const bool g_ndb_blob_ok_to_read_index_table = false;
 
-// state (inline)
+// get state
+
+NdbBlob::State
+NdbBlob::getState()
+{
+  return theState;
+}
+
+void
+NdbBlob::getVersion(int& version)
+{
+  version = theEventBlobVersion;
+}
+
+// set state (inline)
 
 inline void
 NdbBlob::setState(State newState)
@@ -47,32 +62,72 @@
 int
 NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
 {
+  DBUG_ENTER("NdbBlob::getBlobTableName");
   NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
   if (t == NULL)
-    return -1;
+    DBUG_RETURN(-1);
   NdbColumnImpl* c = t->getColumn(columnName);
   if (c == NULL)
-    return -1;
+    DBUG_RETURN(-1);
   getBlobTableName(btname, t, c);
-  return 0;
+  DBUG_RETURN(0);
 }
 
 void
 NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
 {
-  assert(t != 0 && c != 0 && c->getBlobType());
+  DBUG_ENTER("NdbBlob::getBlobTableName");
+  assert(t != 0 && c != 0 && c->getBlobType() && c->getPartSize() != 0);
   memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
-  sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_tableId, (int)c->m_attrId);
+  sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_id, (int)c->m_column_no);
+  DBUG_PRINT("info", ("blob table name: %s", btname));
+  DBUG_VOID_RETURN;
 }
 
 void
 NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c)
 {
+  DBUG_ENTER("NdbBlob::getBlobTable");
   char btname[NdbBlobImpl::BlobTableNameSize];
   getBlobTableName(btname, t, c);
   bt.setName(btname);
   bt.setLogging(t->getLogging());
-  bt.setFragmentType(t->getFragmentType());
+  /*
+    BLOB tables use the same fragmentation as the original table
+    but may change the fragment type if it is UserDefined since it
+    must be hash based so that the kernel can handle it on its own.
+    It also uses the same tablespaces and it never uses any range or
+    list arrays.
+  */
+  bt.m_primaryTableId = t->m_id;
+  bt.m_fd.clear();
+  bt.m_ts.clear();
+  bt.m_range.clear();
+  bt.setFragmentCount(t->getFragmentCount());
+  bt.m_tablespace_id = t->m_tablespace_id;
+  bt.m_tablespace_version = t->m_tablespace_version;
+  switch (t->getFragmentType())
+  {
+    case NdbDictionary::Object::FragAllSmall:
+    case NdbDictionary::Object::FragAllMedium:
+    case NdbDictionary::Object::FragAllLarge:
+    case NdbDictionary::Object::FragSingle:
+      bt.setFragmentType(t->getFragmentType());
+      break;
+    case NdbDictionary::Object::DistrKeyLin:
+    case NdbDictionary::Object::DistrKeyHash:
+      bt.setFragmentType(t->getFragmentType());
+      break;
+    case NdbDictionary::Object::UserDefined:
+      bt.setFragmentType(NdbDictionary::Object::DistrKeyHash);
+      break;
+    default:
+      DBUG_ASSERT(0);
+      break;
+  }
+  DBUG_PRINT("info",
+  ("Create BLOB table with primary table = %u and Fragment Type = %u",
+    bt.m_primaryTableId, (uint)bt.getFragmentType()));
   { NdbDictionary::Column bc("PK");
     bc.setType(NdbDictionary::Column::Unsigned);
     assert(t->m_keyLenInWords != 0);
@@ -106,8 +161,67 @@
       break;
     }
     bc.setLength(c->getPartSize());
+    bc.setStorageType(c->getStorageType());
     bt.addColumn(bc);
   }
+  DBUG_VOID_RETURN;
+}
+
+int
+NdbBlob::getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName)
+{
+  NdbEventImpl* e = anNdb->theDictionary->m_impl.getEvent(eventName);
+  if (e == NULL)
+    return -1;
+  NdbColumnImpl* c = e->m_tableImpl->getColumn(columnName);
+  if (c == NULL)
+    return -1;
+  getBlobEventName(bename, e, c);
+  delete e; // it is from new NdbEventImpl
+  return 0;
+}
+
+void
+NdbBlob::getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c)
+{
+  // XXX events should have object id
+  snprintf(bename, MAX_TAB_NAME_SIZE, "NDB$BLOBEVENT_%s_%d", e->m_name.c_str(), (int)c->m_column_no);
+}
+
+void
+NdbBlob::getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c)
+{
+  DBUG_ENTER("NdbBlob::getBlobEvent");
+  // blob table
+  assert(c->m_blobTable != NULL);
+  const NdbTableImpl& bt = *c->m_blobTable;
+  // blob event name
+  char bename[MAX_TAB_NAME_SIZE+1];
+  getBlobEventName(bename, e, c);
+  bename[sizeof(bename)-1]= 0;
+  be.setName(bename);
+  be.setTable(bt);
+  // simple assigments
+  be.mi_type = e->mi_type;
+  be.m_dur = e->m_dur;
+  be.m_mergeEvents = e->m_mergeEvents;
+  // report unchanged data
+  // not really needed now since UPD is DEL o INS and we subscribe to all
+  be.setReport(NdbDictionary::Event::ER_ALL);
+  // columns PK - DIST - PART - DATA
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)0);
+    be.addColumn(*bc);
+  }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)1);
+    be.addColumn(*bc);
+  }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)2);
+    be.addColumn(*bc);
+  }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)3);
+    be.addColumn(*bc);
+  }
+  DBUG_VOID_RETURN;
 }
 
 // initialization
@@ -121,9 +235,16 @@
 NdbBlob::init()
 {
   theState = Idle;
+  theEventBlobVersion = -1;
   theNdb = NULL;
   theNdbCon = NULL;
   theNdbOp = NULL;
+  theEventOp = NULL;
+  theBlobEventOp = NULL;
+  theBlobEventPkRecAttr = NULL;
+  theBlobEventDistRecAttr = NULL;
+  theBlobEventPartRecAttr = NULL;
+  theBlobEventDataRecAttr = NULL;
   theTable = NULL;
   theAccessTable = NULL;
   theBlobTable = NULL;
@@ -189,9 +310,16 @@
 }
 
 void
+NdbBlob::Buf::zerorest()
+{
+  assert(size <= maxsize);
+  memset(data + size, 0, maxsize - size);
+}
+
+void
 NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
 {
-  assert(size == src.size);
+  size = src.size;
   memcpy(data, src.data, size);
 }
 
@@ -296,6 +424,76 @@
   return (part / theStripeSize) % theStripeSize;
 }
 
+// pack/unpack table/index key  XXX support routines, shortcuts
+
+int
+NdbBlob::packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf)
+{
+  DBUG_ENTER("NdbBlob::packKeyValue");
+  const Uint32* data = (const Uint32*)srcBuf.data;
+  unsigned pos = 0;
+  Uint32* pack_data = (Uint32*)thePackKeyBuf.data;
+  unsigned pack_pos = 0;
+  for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
+    NdbColumnImpl* c = aTable->m_columns[i];
+    assert(c != NULL);
+    if (c->m_pk) {
+      unsigned len = c->m_attrSize * c->m_arraySize;
+      Uint32 pack_len;
+      bool ok = c->get_var_length(&data[pos], pack_len);
+      if (! ok) {
+        setErrorCode(NdbBlobImpl::ErrCorruptPK);
+        DBUG_RETURN(-1);
+      }
+      memcpy(&pack_data[pack_pos], &data[pos], pack_len);
+      while (pack_len % 4 != 0) {
+        char* p = (char*)&pack_data[pack_pos] + pack_len++;
+        *p = 0;
+      }
+      pos += (len + 3) / 4;
+      pack_pos += pack_len / 4;
+    }
+  }
+  assert(4 * pos == srcBuf.size);
+  assert(4 * pack_pos <= thePackKeyBuf.maxsize);
+  thePackKeyBuf.size = 4 * pack_pos;
+  thePackKeyBuf.zerorest();
+  DBUG_RETURN(0);
+}
+
+int
+NdbBlob::unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf)
+{
+  DBUG_ENTER("NdbBlob::unpackKeyValue");
+  Uint32* data = (Uint32*)dstBuf.data;
+  unsigned pos = 0;
+  const Uint32* pack_data = (const Uint32*)thePackKeyBuf.data;
+  unsigned pack_pos = 0;
+  for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
+    NdbColumnImpl* c = aTable->m_columns[i];
+    assert(c != NULL);
+    if (c->m_pk) {
+      unsigned len = c->m_attrSize * c->m_arraySize;
+      Uint32 pack_len;
+      bool ok = c->get_var_length(&pack_data[pack_pos], pack_len);
+      if (! ok) {
+        setErrorCode(NdbBlobImpl::ErrCorruptPK);
+        DBUG_RETURN(-1);
+      }
+      memcpy(&data[pos], &pack_data[pack_pos], pack_len);
+      while (pack_len % 4 != 0) {
+        char* p = (char*)&data[pos] + pack_len++;
+        *p = 0;
+      }
+      pos += (len + 3) / 4;
+      pack_pos += pack_len / 4;
+    }
+  }
+  assert(4 * pos == dstBuf.size);
+  assert(4 * pack_pos == thePackKeyBuf.size);
+  DBUG_RETURN(0);
+}
+
 // getters and setters
 
 int
@@ -338,13 +536,15 @@
     assert(c != NULL);
     if (c->m_pk) {
       unsigned len = c->m_attrSize * c->m_arraySize;
-      if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) {
+      if (anOp->equal_impl(c, (const char*)&data[pos]) == -1) {
         setErrorCode(anOp);
         DBUG_RETURN(-1);
       }
       pos += (len + 3) / 4;
     }
   }
+  if (theNdbOp->theDistrKeyIndicator_)
+    anOp->setPartitionId(theNdbOp->getPartitionId());
   assert(pos == theKeyBuf.size / 4);
   DBUG_RETURN(0);
 }
@@ -362,7 +562,7 @@
     assert(c != NULL);
     if (c->m_pk) {
       unsigned len = c->m_attrSize * c->m_arraySize;
-      if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) {
+      if (anOp->equal_impl(c, (const char*)&data[pos]) == -1) {
         setErrorCode(anOp);
         DBUG_RETURN(-1);
       }
@@ -377,12 +577,10 @@
 NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
 {
   DBUG_ENTER("NdbBlob::setPartKeyValue");
-  DBUG_PRINT("info", ("dist=%u part=%u key=", getDistKey(part), part));
-  DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords);
-  Uint32* data = (Uint32*)theKeyBuf.data;
-  unsigned size = theTable->m_keyLenInWords;
+  DBUG_PRINT("info", ("dist=%u part=%u packkey=", getDistKey(part), part));
+  DBUG_DUMP("info", thePackKeyBuf.data, 4 * thePackKeyBuf.size);
   // TODO use attr ids after compatibility with 4.1.7 not needed
-  if (anOp->equal("PK", theKeyBuf.data) == -1 ||
+  if (anOp->equal("PK", thePackKeyBuf.data) == -1 ||
       anOp->equal("DIST", getDistKey(part)) == -1 ||
       anOp->equal("PART", part) == -1) {
     setErrorCode(anOp);
@@ -409,8 +607,10 @@
   DBUG_ENTER("NdbBlob::getHeadFromRecAttr");
   assert(theHeadInlineRecAttr != NULL);
   theNullFlag = theHeadInlineRecAttr->isNULL();
-  assert(theNullFlag != -1);
+  assert(theEventBlobVersion >= 0 || theNullFlag != -1);
   theLength = ! theNullFlag ? theHead->length : 0;
+  DBUG_PRINT("info", ("theNullFlag=%d theLength=%llu",
+                      theNullFlag, theLength));
   DBUG_VOID_RETURN;
 }
 
@@ -423,7 +623,7 @@
     memset(theInlineData + theLength, 0, theInlineSize - theLength);
   assert(theNullFlag != -1);
   const char* aValue = theNullFlag ? 0 : theHeadInlineBuf.data;
-  if (anOp->setValue(theColumn, aValue, theHeadInlineBuf.size) == -1) {
+  if (anOp->setValue(theColumn, aValue) == -1) {
     setErrorCode(anOp);
     DBUG_RETURN(-1);
   }
@@ -514,7 +714,19 @@
 // misc operations
 
 int
-NdbBlob::getNull(bool& isNull)
+NdbBlob::getDefined(int& isNull) // deprecated
+{
+  DBUG_ENTER("NdbBlob::getDefined");
+  if (theState == Prepared && theSetFlag) {
+    isNull = (theSetBuf == NULL);
+    DBUG_RETURN(0);
+  }
+  isNull = theNullFlag;
+  DBUG_RETURN(0);
+}
+
+int
+NdbBlob::getNull(bool& isNull) // deprecated
 {
   DBUG_ENTER("NdbBlob::getNull");
   if (theState == Prepared && theSetFlag) {
@@ -530,6 +742,23 @@
 }
 
 int
+NdbBlob::getNull(int& isNull)
+{
+  DBUG_ENTER("NdbBlob::getNull");
+  if (theState == Prepared && theSetFlag) {
+    isNull = (theSetBuf == NULL);
+    DBUG_RETURN(0);
+  }
+  isNull = theNullFlag;
+  if (isNull == -1 && theEventBlobVersion == -1) {
+    setErrorCode(NdbBlobImpl::ErrState);
+    DBUG_RETURN(-1);
+  }
+  DBUG_PRINT("info", ("isNull=%d", isNull));
+  DBUG_RETURN(0);
+}
+
+int
 NdbBlob::setNull()
 {
   DBUG_ENTER("NdbBlob::setNull");
@@ -857,6 +1086,18 @@
 {
   DBUG_ENTER("NdbBlob::readParts");
   DBUG_PRINT("info", ("part=%u count=%u", part, count));
+  int ret;
+  if (theEventBlobVersion == -1)
+    ret = readTableParts(buf, part, count);
+  else
+    ret = readEventParts(buf, part, count);
+  DBUG_RETURN(ret);
+}
+
+int
+NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count)
+{
+  DBUG_ENTER("NdbBlob::readTableParts");
   Uint32 n = 0;
   while (n < count) {
     NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -877,6 +1118,18 @@
 }
 
 int
+NdbBlob::readEventParts(char* buf, Uint32 part, Uint32 count)
+{
+  DBUG_ENTER("NdbBlob::readEventParts");
+  int ret = theEventOp->readBlobParts(buf, this, part, count);
+  if (ret != 0) {
+    setErrorCode(theEventOp);
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
 NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
 {
   DBUG_ENTER("NdbBlob::insertParts");
@@ -1066,68 +1319,40 @@
   theTable = anOp->m_currentTable;
   theAccessTable = anOp->m_accessTable;
   theColumn = aColumn;
-  NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
-  switch (theColumn->getType()) {
-  case NdbDictionary::Column::Blob:
-    partType = NdbDictionary::Column::Binary;
-    theFillChar = 0x0;
-    break;
-  case NdbDictionary::Column::Text:
-    partType = NdbDictionary::Column::Char;
-    theFillChar = 0x20;
-    break;
-  default:
-    setErrorCode(NdbBlobImpl::ErrUsage);
+  // prepare blob column and table
+  if (prepareColumn() == -1)
     DBUG_RETURN(-1);
-  }
-  // sizes
-  theInlineSize = theColumn->getInlineSize();
-  thePartSize = theColumn->getPartSize();
-  theStripeSize = theColumn->getStripeSize();
-  // sanity check
-  assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
-  assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
-  if (thePartSize > 0) {
-    const NdbDictionary::Table* bt = NULL;
-    const NdbDictionary::Column* bc = NULL;
-    if (theStripeSize == 0 ||
-        (bt = theColumn->getBlobTable()) == NULL ||
-        (bc = bt->getColumn("DATA")) == NULL ||
-        bc->getType() != partType ||
-        bc->getLength() != (int)thePartSize) {
-      setErrorCode(NdbBlobImpl::ErrTable);
-      DBUG_RETURN(-1);
-    }
-    theBlobTable = &NdbTableImpl::getImpl(*bt);
-  }
-  // buffers
-  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+  // extra buffers
   theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
-  theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
   theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
-  thePartBuf.alloc(thePartSize);
-  theHead = (Head*)theHeadInlineBuf.data;
-  theInlineData = theHeadInlineBuf.data + sizeof(Head);
   // handle different operation types
   bool supportedOp = false;
   if (isKeyOp()) {
     if (isTableOp()) {
       // get table key
-      Uint32* data = (Uint32*)theKeyBuf.data;
-      unsigned size = theTable->m_keyLenInWords;
+      Uint32* data = (Uint32*)thePackKeyBuf.data;
+      Uint32 size = theTable->m_keyLenInWords; // in-out
       if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
         setErrorCode(NdbBlobImpl::ErrUsage);
         DBUG_RETURN(-1);
       }
+      thePackKeyBuf.size = 4 * size;
+      thePackKeyBuf.zerorest();
+      if (unpackKeyValue(theTable, theKeyBuf) == -1)
+        DBUG_RETURN(-1);
     }
     if (isIndexOp()) {
       // get index key
-      Uint32* data = (Uint32*)theAccessKeyBuf.data;
-      unsigned size = theAccessTable->m_keyLenInWords;
+      Uint32* data = (Uint32*)thePackKeyBuf.data;
+      Uint32 size = theAccessTable->m_keyLenInWords; // in-out
       if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
         setErrorCode(NdbBlobImpl::ErrUsage);
         DBUG_RETURN(-1);
       }
+      thePackKeyBuf.size = 4 * size;
+      thePackKeyBuf.zerorest();
+      if (unpackKeyValue(theAccessTable, theAccessKeyBuf) == -1)
+        DBUG_RETURN(-1);
     }
     if (isReadOp()) {
       // add read of head+inline in this op
@@ -1161,6 +1386,105 @@
   DBUG_RETURN(0);
 }
 
+int
+NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version)
+{
+  DBUG_ENTER("NdbBlob::atPrepare [event]");
+  DBUG_PRINT("info", ("this=%p op=%p", this, anOp));
+  assert(theState == Idle);
+  assert(version == 0 || version == 1);
+  theEventBlobVersion = version;
+  // ndb api stuff
+  theNdb = anOp->m_ndb;
+  theEventOp = anOp;
+  theBlobEventOp = aBlobOp;
+  theTable = anOp->m_eventImpl->m_tableImpl;
+  theAccessTable = theTable;
+  theColumn = aColumn;
+  // prepare blob column and table
+  if (prepareColumn() == -1)
+    DBUG_RETURN(-1);
+  // tinyblob sanity
+  assert((theBlobEventOp == NULL) == (theBlobTable == NULL));
+  // extra buffers
+  theBlobEventDataBuf.alloc(thePartSize);
+  // prepare receive of head+inline
+  theHeadInlineRecAttr = theEventOp->getValue(aColumn, theHeadInlineBuf.data, version);
+  if (theHeadInlineRecAttr == NULL) {
+    setErrorCode(theEventOp);
+    DBUG_RETURN(-1);
+  }
+  // prepare receive of blob part
+  if (theBlobEventOp != NULL) {
+    if ((theBlobEventPkRecAttr =
+           theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
+                                    thePackKeyBuf.data, version)) == NULL ||
+        (theBlobEventDistRecAttr =
+           theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
+                                    (char*)0, version)) == NULL ||
+        (theBlobEventPartRecAttr =
+           theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)2),
+                                    (char*)&thePartNumber, version)) == NULL ||
+        (theBlobEventDataRecAttr =
+           theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)3),
+                                    theBlobEventDataBuf.data, version)) == NULL) {
+      setErrorCode(theBlobEventOp);
+      DBUG_RETURN(-1);
+    }
+  }
+  setState(Prepared);
+  DBUG_RETURN(0);
+}
+
+int
+NdbBlob::prepareColumn()
+{
+  DBUG_ENTER("prepareColumn");
+  NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
+  switch (theColumn->getType()) {
+  case NdbDictionary::Column::Blob:
+    partType = NdbDictionary::Column::Binary;
+    theFillChar = 0x0;
+    break;
+  case NdbDictionary::Column::Text:
+    partType = NdbDictionary::Column::Char;
+    theFillChar = 0x20;
+    break;
+  default:
+    setErrorCode(NdbBlobImpl::ErrUsage);
+    DBUG_RETURN(-1);
+  }
+  // sizes
+  theInlineSize = theColumn->getInlineSize();
+  thePartSize = theColumn->getPartSize();
+  theStripeSize = theColumn->getStripeSize();
+  // sanity check
+  assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
+  assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
+  if (thePartSize > 0) {
+    const NdbTableImpl* bt = NULL;
+    const NdbColumnImpl* bc = NULL;
+    if (theStripeSize == 0 ||
+        (bt = theColumn->m_blobTable) == NULL ||
+        (bc = bt->getColumn("DATA")) == NULL ||
+        bc->getType() != partType ||
+        bc->getLength() != (int)thePartSize) {
+      setErrorCode(NdbBlobImpl::ErrTable);
+      DBUG_RETURN(-1);
+    }
+    // blob table
+    theBlobTable = &NdbTableImpl::getImpl(*bt);
+  }
+  // these buffers are always used
+  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+  thePackKeyBuf.alloc(max(theTable->m_keyLenInWords, theAccessTable->m_keyLenInWords) << 2);
+  theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
+  theHead = (Head*)theHeadInlineBuf.data;
+  theInlineData = theHeadInlineBuf.data + sizeof(Head);
+  thePartBuf.alloc(thePartSize);
+  DBUG_RETURN(0);
+}
+
 /*
  * Before execute of prepared operation.  May add new operations before
  * this one.  May ask that this operation and all before it (a "batch")
@@ -1252,7 +1576,7 @@
         if (tOp == NULL ||
             tOp->readTuple() == -1 ||
             setAccessKeyValue(tOp) == -1 ||
-            tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) {
+            tOp->getValue(pkAttrId, thePackKeyBuf.data) == NULL) {
           setErrorCode(tOp);
           DBUG_RETURN(-1);
         }
@@ -1341,10 +1665,13 @@
   assert(isKeyOp());
   if (isIndexOp()) {
     NdbBlob* tFirstBlob = theNdbOp->theBlobList;
-    if (this != tFirstBlob) {
+    if (this == tFirstBlob) {
+      packKeyValue(theTable, theKeyBuf);
+    } else {
       // copy key from first blob
-      assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size);
-      memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size);
+      theKeyBuf.copyfrom(tFirstBlob->theKeyBuf);
+      thePackKeyBuf.copyfrom(tFirstBlob->thePackKeyBuf);
+      thePackKeyBuf.zerorest();
     }
   }
   if (isReadOp()) {
@@ -1498,12 +1825,17 @@
     DBUG_RETURN(-1);
   assert(isScanOp());
   // get primary key
-  { Uint32* data = (Uint32*)theKeyBuf.data;
-    unsigned size = theTable->m_keyLenInWords;
-    if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
+  { NdbScanOperation* tScanOp = (NdbScanOperation*)theNdbOp;
+    Uint32* data = (Uint32*)thePackKeyBuf.data;
+    unsigned size = theTable->m_keyLenInWords; // in-out
+    if (tScanOp->getKeyFromKEYINFO20(data, size) == -1) {
       setErrorCode(NdbBlobImpl::ErrUsage);
       DBUG_RETURN(-1);
     }
+    thePackKeyBuf.size = 4 * size;
+    thePackKeyBuf.zerorest();
+    if (unpackKeyValue(theTable, theKeyBuf) == -1)
+      DBUG_RETURN(-1);
   }
   getHeadFromRecAttr();
   if (setPos(0) == -1)
@@ -1523,6 +1855,29 @@
   DBUG_RETURN(0);
 }
 
+/*
+ * After next event on main table.
+ */
+int
+NdbBlob::atNextEvent()
+{
+  DBUG_ENTER("NdbBlob::atNextEvent");
+  Uint32 optype = theEventOp->m_data_item->sdata->operation;
+  DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d optype=%u", this, theEventOp, theBlobEventOp, theEventBlobVersion, optype));
+  if (theState == Invalid)
+    DBUG_RETURN(-1);
+  assert(theEventBlobVersion >= 0);
+  if (optype >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT)
+    DBUG_RETURN(0);
+  getHeadFromRecAttr();
+  if (theNullFlag == -1) // value not defined
+    DBUG_RETURN(0);
+  if (setPos(0) == -1)
+    DBUG_RETURN(-1);
+  setState(Active);
+  DBUG_RETURN(0);
+}
+
 // misc
 
 const NdbDictionary::Column*
@@ -1569,6 +1924,17 @@
   if (theNdbCon != NULL && (code = theNdbCon->theError.code) != 0)
     ;
   else if ((code = theNdb->theError.code) != 0)
+    ;
+  else
+    code = NdbBlobImpl::ErrUnknown;
+  setErrorCode(code, invalidFlag);
+}
+
+void
+NdbBlob::setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag)
+{
+  int code = 0;
+  if ((code = anOp->m_error.code) != 0)
     ;
   else
     code = NdbBlobImpl::ErrUnknown;
Thread
bk commit into 5.1 tree (pekka:1.2391)pekka3 May