List: Commits « Previous Message | Next Message »
From: pekka  Date: January 25 2006 9:23pm
Subject:bk commit into 5.1 tree (pekka:1.2077)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2077 06/01/25 22:22:50 pekka@stripped +9 -0
  ndb - wl#2972 rbr blobs: write blob data to binlog

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
    1.43 06/01/25 22:00:28 pekka@stripped +3 -0
    rbr blobs: write data + dict cache workarounds

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
    1.99 06/01/25 22:00:27 pekka@stripped +41 -5
    rbr blobs: write data + dict cache workarounds

  storage/ndb/src/ndbapi/NdbDictionary.cpp
    1.47 06/01/25 22:00:27 pekka@stripped +6 -0
    rbr blobs: write data + dict cache workarounds

  storage/ndb/src/ndbapi/NdbBlob.cpp
    1.33 06/01/25 22:00:27 pekka@stripped +3 -3
    rbr blobs: write data + dict cache workarounds

  storage/ndb/include/ndbapi/NdbDictionary.hpp
    1.64 06/01/25 22:00:27 pekka@stripped +2 -0
    rbr blobs: write data + dict cache workarounds

  sql/ha_ndbcluster_binlog.cc
    1.4 06/01/25 22:00:26 pekka@stripped +113 -29
    rbr blobs: write data + dict cache workarounds

  sql/ha_ndbcluster.h
    1.111 06/01/25 22:00:26 pekka@stripped +8 -0
    rbr blobs: write data + dict cache workarounds

  sql/ha_ndbcluster.cc
    1.244 06/01/25 22:00:25 pekka@stripped +67 -36
    rbr blobs: write data + dict cache workarounds

  mysql-test/t/disabled.def
    1.49 06/01/25 22:00:25 pekka@stripped +4 -0
    rbr blobs: write data + dict cache workarounds

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my51-rbr

--- 1.48/mysql-test/t/disabled.def	2006-01-19 19:32:52 +01:00
+++ 1.49/mysql-test/t/disabled.def	2006-01-25 22:00:25 +01:00
@@ -28,3 +28,7 @@
 ndb_autodiscover2 : Needs to be fixed w.r.t binlog
 system_mysql_db : Needs fixing
 system_mysql_db_fix : Needs fixing
+#ndb_alter_table_row : sometimes wrong error 1015!=1046
+ndb_gis : garbled msgs from corrupt THD* + partitioning problem
+
+# vim: set filetype=conf:

--- 1.63/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-01-19 14:00:03 +01:00
+++ 1.64/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-01-25 22:00:27 +01:00
@@ -883,6 +883,7 @@
 
   private:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    friend class NdbDictionaryImpl;
     friend class NdbTableImpl;
 #endif
     class NdbTableImpl & m_impl;
@@ -1764,6 +1765,7 @@
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     const Table * getTable(const char * name, void **data) const;
     void set_local_table_data_size(unsigned sz);
+    void fix_blob_events(const Table* table, const char* ev_name);
 #endif
   };
 };

--- 1.46/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-01-19 14:00:03 +01:00
+++ 1.47/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-01-25 22:00:27 +01:00
@@ -1478,6 +1478,12 @@
   return m_impl.getNdbError();
 }
 
+void
+NdbDictionary::Dictionary::fix_blob_events(const Table* table, const char* ev_name)
+{
+  m_impl.fix_blob_events(table, ev_name);
+}
+
 // printers
 
 NdbOut&

--- 1.98/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-01-19 14:00:03 +01:00
+++ 1.99/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-01-25 22:00:27 +01:00
@@ -3398,12 +3398,14 @@
     if (ev->m_tableId      == info->m_table_impl->m_id &&
 	ev->m_tableVersion == info->m_table_impl->m_version)
       break;
+    DBUG_PRINT("error",("%s: retry=%d: "
+                        "table version mismatch, event: [%u,%u] table: [%u,%u]",
+                        ev->getTableName(), retry,
+                        ev->m_tableId, ev->m_tableVersion,
+                        info->m_table_impl->m_id, info->m_table_impl->m_version));
     if (retry)
     {
       m_error.code= 241;
-      DBUG_PRINT("error",("%s: table version mismatch, event: [%u,%u] table: [%u,%u]",
-			  ev->getTableName(), ev->m_tableId, ev->m_tableVersion,
-			  info->m_table_impl->m_id, info->m_table_impl->m_version));
       delete ev;
       DBUG_RETURN(NULL);
     }
@@ -3607,7 +3609,7 @@
     if (m_error.code != 723 && // no such table
         m_error.code != 241)   // invalid table
       DBUG_RETURN(-1);
-    DBUG_PRINT("info", ("no table, drop by name alone"));
+    DBUG_PRINT("info", ("no table err=%d, drop by name alone", m_error.code));
     evnt = new NdbEventImpl();
     evnt->setName(eventName);
   }
@@ -3644,7 +3646,17 @@
       (void)dropEvent(bename);
     }
   } else {
-    // could loop over MAX_ATTRIBUTES_IN_TABLE ...
+    // loop over MAX_ATTRIBUTES_IN_TABLE ...
+    Uint32 i;
+    for (i = 0; i < MAX_ATTRIBUTES_IN_TABLE; i++) {
+      char bename[MAX_TAB_NAME_SIZE];
+      // XXX should get name from NdbBlob
+      sprintf(bename, "NDB$BLOBEVENT_%s_%u", evnt.getName(), i);
+      NdbEventImpl* bevnt = new NdbEventImpl();
+      bevnt->setName(bename);
+      (void)m_receiver.dropEvent(*bevnt);
+      delete bevnt;
+    }
   }
   DBUG_RETURN(0);
 }
@@ -4629,6 +4641,30 @@
   dst.m_filegroup_version= f.FilegroupVersion;
   dst.m_free=  f.FileFreeExtents;
   return 0;
+}
+
+// XXX temp
+void
+NdbDictionaryImpl::fix_blob_events(const NdbDictionary::Table* table, const char* ev_name)
+{
+  const NdbTableImpl& t = table->m_impl;
+  const NdbEventImpl* ev = getEvent(ev_name);
+  assert(ev != NULL && ev->m_tableImpl == &t);
+  Uint32 i;
+  for (i = 0; i < t.m_columns.size(); i++) {
+    assert(t.m_columns[i] != NULL);
+    const NdbColumnImpl& c = *t.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    char bename[200];
+    NdbBlob::getBlobEventName(bename, ev, &c);
+    // following fixes dict cache blob table
+    NdbEventImpl* bev = getEvent(bename);
+    if (c.m_blobTable != bev->m_tableImpl) {
+      // XXX const violation
+      ((NdbColumnImpl*)&c)->m_blobTable = bev->m_tableImpl;
+    }
+  }
 }
 
 template class Vector<int>;

--- 1.42/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-01-19 14:00:03 +01:00
+++ 1.43/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-01-25 22:00:28 +01:00
@@ -592,6 +592,9 @@
 
   NdbDictInterface m_receiver;
   Ndb & m_ndb;
+
+  // XXX temp
+  void fix_blob_events(const NdbDictionary::Table* table, const char* ev_name);
 private:
   NdbIndexImpl * getIndexImpl(const char * name,
                               const BaseString& internalName);

--- 1.243/sql/ha_ndbcluster.cc	2006-01-18 12:43:23 +01:00
+++ 1.244/sql/ha_ndbcluster.cc	2006-01-25 22:00:25 +01:00
@@ -35,6 +35,11 @@
 
 #include "ha_ndbcluster_binlog.h"
 
+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
 // options from from mysqld.cc
 extern my_bool opt_ndb_optimized_node_selection;
 extern const char *opt_ndbcluster_connectstring;
@@ -791,10 +796,20 @@
   if (ndb_blob->blobsNextBlob() != NULL)
     DBUG_RETURN(0);
   ha_ndbcluster *ha= (ha_ndbcluster *)arg;
-  DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
+  int ret= get_ndb_blobs_value(ha->table, ha->m_value,
+                               ha->m_blobs_buffer, ha->m_blobs_buffer_size,
+                               0);
+  DBUG_RETURN(ret);
 }
 
-int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
+/*
+  This routine is shared by injector.  There is no common blobs buffer
+  so the buffer and length are passed by reference.  Injector also
+  passes a record pointer diff.
+ */
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+                        byte*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff)
 {
   DBUG_ENTER("get_ndb_blobs_value");
 
@@ -803,44 +818,51 @@
   for (int loop= 0; loop <= 1; loop++)
   {
     uint32 offset= 0;
-    for (uint i= 0; i < table_share->fields; i++)
+    for (uint i= 0; i < table->s->fields; i++)
     {
       Field *field= table->field[i];
-      NdbValue value= m_value[i];
+      NdbValue value= value_array[i];
       if (value.ptr != NULL && (field->flags & BLOB_FLAG))
       {
         Field_blob *field_blob= (Field_blob *)field;
         NdbBlob *ndb_blob= value.blob;
-        Uint64 blob_len= 0;
-        if (ndb_blob->getLength(blob_len) != 0)
-          DBUG_RETURN(-1);
-        // Align to Uint64
-        uint32 blob_size= blob_len;
-        if (blob_size % 8 != 0)
-          blob_size+= 8 - blob_size % 8;
-        if (loop == 1)
-        {
-          char *buf= m_blobs_buffer + offset;
-          uint32 len= 0xffffffff;  // Max uint32
-          DBUG_PRINT("value", ("read blob ptr=%lx  len=%u",
-                               buf, (uint) blob_len));
-          if (ndb_blob->readData(buf, len) != 0)
+        int isNull;
+        ndb_blob->getDefined(isNull);
+        if (isNull == 0) { // XXX -1 should be allowed only for events
+          Uint64 blob_len= 0;
+          if (ndb_blob->getLength(blob_len) != 0)
             DBUG_RETURN(-1);
-          DBUG_ASSERT(len == blob_len);
-          field_blob->set_ptr(len, buf);
+          // Align to Uint64
+          uint32 blob_size= blob_len;
+          if (blob_size % 8 != 0)
+            blob_size+= 8 - blob_size % 8;
+          if (loop == 1)
+          {
+            char *buf= buffer + offset;
+            uint32 len= 0xffffffff;  // Max uint32
+            DBUG_PRINT("info", ("read blob ptr=%p len=%u",
+                                buf, (uint) blob_len));
+            if (ndb_blob->readData(buf, len) != 0)
+              DBUG_RETURN(-1);
+            DBUG_ASSERT(len == blob_len);
+            // Ugly hack assumes only ptr needs to be changed
+            field_blob->ptr += ptrdiff;
+            field_blob->set_ptr(len, buf);
+            field_blob->ptr -= ptrdiff;
+          }
+          offset+= blob_size;
         }
-        offset+= blob_size;
       }
     }
-    if (loop == 0 && offset > m_blobs_buffer_size)
+    if (loop == 0 && offset > buffer_size)
     {
-      my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
-      m_blobs_buffer_size= 0;
-      DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
-      m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
-      if (m_blobs_buffer == NULL)
+      my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
+      buffer_size= 0;
+      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
+      buffer= my_malloc(offset, MYF(MY_WME));
+      if (buffer == NULL)
         DBUG_RETURN(-1);
-      m_blobs_buffer_size= offset;
+      buffer_size= offset;
     }
   }
   DBUG_RETURN(0);
@@ -2713,14 +2735,22 @@
       else
       {
         NdbBlob *ndb_blob= (*value).blob;
-        bool isNull= TRUE;
-#ifndef DBUG_OFF
-        int ret= 
-#endif
-          ndb_blob->getNull(isNull);
-        DBUG_ASSERT(ret == 0);
-        if (isNull)
-          field->set_null(row_offset);
+        int isNull;
+        ndb_blob->getDefined(isNull);
+        if (isNull != 0)
+        {
+          uint col_no = ndb_blob->getColumn()->getColumnNo();
+          if (isNull == 1)
+          {
+            DBUG_PRINT("info",("[%u] NULL", col_no))
+            field->set_null(row_offset);
+          }
+          else
+          {
+            DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
+            bitmap_clear_bit(defined, col_no);
+          }
+        }
       }
     }
   }
@@ -4713,6 +4743,7 @@
   NDBDICT *dict= ndb->getDictionary();
   const NDBTAB *orig_tab= (const NDBTAB *) m_table;
   DBUG_ENTER("alter_table_name");
+  DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to));
 
   NdbDictionary::Table new_tab= *orig_tab;
   new_tab.setName(to);

--- 1.110/sql/ha_ndbcluster.h	2006-01-17 14:39:59 +01:00
+++ 1.111/sql/ha_ndbcluster.h	2006-01-25 22:00:26 +01:00
@@ -25,6 +25,9 @@
 #pragma interface                       /* gcc class implementation */
 #endif
 
+/* Blob tables and events are internal to NDB and must never be accessed */
+#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB")
+
 #include <NdbApi.hpp>
 #include <ndbapi_limits.h>
 
@@ -78,6 +81,10 @@
 
 typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
 
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+                        byte*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff);
+
 typedef enum {
   NSS_INITIAL= 0,
   NSS_DROPPED,
@@ -114,6 +121,7 @@
 #ifdef HAVE_NDB_BINLOG
 /* NDB_SHARE.flags */
 #define NSF_HIDDEN_PK 1 /* table has hidden primary key */
+#define NSF_BLOB_FLAG 2 /* table has blob attributes */
 #define NSF_NO_BINLOG 4 /* table should not be binlogged */
 #endif
 

--- 1.32/storage/ndb/src/ndbapi/NdbBlob.cpp	2006-01-22 13:27:23 +01:00
+++ 1.33/storage/ndb/src/ndbapi/NdbBlob.cpp	2006-01-25 22:00:27 +01:00
@@ -1327,10 +1327,10 @@
   assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
   assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
   if (thePartSize > 0) {
-    const NdbDictionary::Table* bt = NULL;
-    const NdbDictionary::Column* bc = NULL;
+    const NdbTableImpl* bt = NULL;
+    const NdbColumnImpl* bc = NULL;
     if (theStripeSize == 0 ||
-        (bt = theColumn->getBlobTable()) == NULL ||
+        (bt = theColumn->m_blobTable) == NULL ||
         (bc = bt->getColumn("DATA")) == NULL ||
         bc->getType() != partType ||
         bc->getLength() != (int)thePartSize) {

--- 1.3/sql/ha_ndbcluster_binlog.cc	2006-01-17 15:36:50 +01:00
+++ 1.4/sql/ha_ndbcluster_binlog.cc	2006-01-25 22:00:26 +01:00
@@ -23,6 +23,11 @@
 #include "slave.h"
 #include "ha_ndbcluster_binlog.h"
 
+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
 /*
   defines for cluster replication table names
 */
@@ -237,6 +242,8 @@
     DBUG_ASSERT(_table != 0);
     if (_table->s->primary_key == MAX_KEY)
       share->flags|= NSF_HIDDEN_PK;
+    if (_table->s->blob_fields != 0)
+      share->flags|= NSF_BLOB_FLAG;
     return;
   }
   while (1) 
@@ -316,6 +323,8 @@
     }
     if (table->s->primary_key == MAX_KEY)
       share->flags|= NSF_HIDDEN_PK;
+    if (table->s->blob_fields != 0)
+      share->flags|= NSF_BLOB_FLAG;
     break;
   }
 }
@@ -1622,6 +1631,7 @@
                                    NDB_SHARE *share)
 {
   DBUG_ENTER("ndbcluster_create_binlog_setup");
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
 
   pthread_mutex_lock(&ndbcluster_mutex);
 
@@ -1713,6 +1723,10 @@
                         const char *event_name, NDB_SHARE *share)
 {
   DBUG_ENTER("ndbcluster_create_event");
+  DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
+                      ndbtab->getName(), ndbtab->getObjectVersion(),
+                      event_name, share ? share->key : "(nil)"));
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
   if (!share)
   {
     DBUG_PRINT("info", ("share == NULL"));
@@ -1730,7 +1744,14 @@
   my_event.addTableEvent(NDBEVENT::TE_ALL);
   if (share->flags & NSF_HIDDEN_PK)
   {
-    /* No primary key, susbscribe for all attributes */
+    if (share->flags & NSF_BLOB_FLAG)
+    {
+      sql_print_error("NDB Binlog: logging of table %s "
+                      "with no PK and blob attributes is not supported",
+                      share->key);
+      DBUG_RETURN(-1);
+    }
+    /* No primary key, subscribe for all attributes */
     my_event.setReport(NDBEVENT::ER_ALL);
     DBUG_PRINT("info", ("subscription all"));
   }
@@ -1749,6 +1770,8 @@
       DBUG_PRINT("info", ("subscription all and subscribe"));
     }
   }
+  if (share->flags & NSF_BLOB_FLAG)
+    my_event.mergeEvents(true);
 
   /* add all columns to the event */
   int n_cols= ndbtab->getNoOfColumns();
@@ -1837,6 +1860,7 @@
   */
 
   DBUG_ENTER("ndbcluster_create_event_ops");
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
 
   DBUG_ASSERT(share != 0);
 
@@ -1857,22 +1881,6 @@
   }
 
   TABLE *table= share->table;
-  if (table)
-  {
-    /*
-      Logging of blob tables is not yet implemented, it would require:
-      1. setup of events also on the blob attribute tables
-      2. collect the pieces of the blob into one from an epoch to
-         provide a full blob to binlog
-    */
-    if (table->s->blob_fields)
-    {
-      sql_print_error("NDB Binlog: logging of blob table %s "
-                      "is not supported", share->key);
-      share->flags|= NSF_NO_BINLOG;
-      DBUG_RETURN(0);
-    }
-  }
 
   int do_schema_share= 0, do_apply_status_share= 0;
   int retries= 100;
@@ -1910,37 +1918,64 @@
       DBUG_RETURN(-1);
     }
 
+    if (share->flags & NSF_BLOB_FLAG)
+      op->mergeEvents(true); // currently not inherited from event
+
+    if (share->flags & NSF_BLOB_FLAG)
+    {
+      /*
+       * Given servers S1 S2, following results in out-of-date
+       * event->m_tableImpl and column->m_blobTable.
+       *
+       * S1: create table t1(a int primary key);
+       * S2: drop table t1;
+       * S1: create table t2(a int primary key, b blob);
+       * S1: alter table t2 add x int;
+       * S1: alter table t2 drop x;
+       *
+       * TODO fix at right place before we get here
+       */
+      ndb->getDictionary()->fix_blob_events(ndbtab, event_name);
+    }
+
     int n_columns= ndbtab->getNoOfColumns();
-    int n_fields= table ? table->s->fields : 0;
+    int n_fields= table ? table->s->fields : 0; // XXX ???
     for (int j= 0; j < n_columns; j++)
     {
       const char *col_name= ndbtab->getColumn(j)->getName();
-      NdbRecAttr *attr0, *attr1;
+      NdbValue attr0, attr1;
       if (j < n_fields)
       {
         Field *f= share->table->field[j];
         if (is_ndb_compatible_type(f))
         {
           DBUG_PRINT("info", ("%s compatible", col_name));
-          attr0= op->getValue(col_name, f->ptr);
-          attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
+          attr0.rec= op->getValue(col_name, f->ptr);
+          attr1.rec= op->getPreValue(col_name,
+                                 (f->ptr - share->table->record[0]) +
                                  share->table->record[1]);
         }
-        else
+        else if (! (f->flags & BLOB_FLAG))
         {
           DBUG_PRINT("info", ("%s non compatible", col_name));
-          attr0= op->getValue(col_name);
-          attr1= op->getPreValue(col_name);
+          attr0.rec= op->getValue(col_name);
+          attr1.rec= op->getPreValue(col_name);
+        }
+        else
+        {
+          DBUG_PRINT("info", ("%s blob", col_name));
+          attr0.blob= op->getBlobHandle(col_name);
+          attr1.blob= op->getPreBlobHandle(col_name);
         }
       }
       else
       {
         DBUG_PRINT("info", ("%s hidden key", col_name));
-        attr0= op->getValue(col_name);
-        attr1= op->getPreValue(col_name);
+        attr0.rec= op->getValue(col_name);
+        attr1.rec= op->getPreValue(col_name);
       }
-      share->ndb_value[0][j].rec= attr0;
-      share->ndb_value[1][j].rec= attr1;
+      share->ndb_value[0][j].ptr= attr0.ptr;
+      share->ndb_value[1][j].ptr= attr1.ptr;
     }
     op->setCustomData((void *) share); // set before execute
     share->op= op; // assign op in NDB_SHARE
@@ -2229,12 +2264,27 @@
    (saves moving data about many times)
   */
 
+  /*
+    for now malloc/free blobs buffer each time
+    TODO if possible share single permanent buffer with handlers
+   */
+  byte* blobs_buffer[2] = { 0, 0 };
+  uint blobs_buffer_size[2] = { 0, 0 };
+
   switch(pOp->getEventType())
   {
   case NDBEVENT::TE_INSERT:
     row.n_inserts++;
     DBUG_PRINT("info", ("INSERT INTO %s", share->key));
     {
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= 0;
+        int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+                                     blobs_buffer[0], blobs_buffer_size[0],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
       trans.write_row(::server_id, injector::transaction::table(table, true),
                       &b, n_fields, table->record[0]);
@@ -2261,6 +2311,14 @@
                 key
 	      */
 
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
+        int ret= get_ndb_blobs_value(table, share->ndb_value[n],
+                                     blobs_buffer[n], blobs_buffer_size[n],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
       print_records(table, table->record[n]);
       trans.delete_row(::server_id, injector::transaction::table(table, true),
@@ -2271,13 +2329,21 @@
     row.n_updates++;
     DBUG_PRINT("info", ("UPDATE %s", share->key));
     {
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= 0;
+        int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+                                     blobs_buffer[0], blobs_buffer_size[0],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[0],
                         &b, table->record[0]);
       print_records(table, table->record[0]);
       if (table->s->primary_key != MAX_KEY) 
       {
         /*
-          since table has a primary key, we can to a write
+          since table has a primary key, we can do a write
           using only after values
 	*/
         trans.write_row(::server_id, injector::transaction::table(table, true),
@@ -2289,6 +2355,14 @@
           mysql server cannot handle the ndb hidden key and
           therefore needs the before image as well
 	*/
+        if (share->flags & NSF_BLOB_FLAG)
+        {
+          my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
+          int ret= get_ndb_blobs_value(table, share->ndb_value[1],
+                                       blobs_buffer[1], blobs_buffer_size[1],
+                                       ptrdiff);
+          DBUG_ASSERT(ret == 0);
+        }
         ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
         print_records(table, table->record[1]);
         trans.update_row(::server_id,
@@ -2305,6 +2379,12 @@
     break;
   }
 
+  if (share->flags & NSF_BLOB_FLAG)
+  {
+    my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
+    my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
+  }
+
   return 0;
 }
 
@@ -2544,6 +2624,9 @@
       Binlog_index_row row;
       while (pOp != NULL)
       {
+        // sometimes get TE_ALTER with invalid table
+        DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
+                    ! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName()));
         ndb->
           setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
         ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
@@ -2684,6 +2767,7 @@
     DBUG_PRINT("info",("removing all event operations"));
     while ((op= ndb->getEventOperation()))
     {
+      DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName()));
       DBUG_PRINT("info",("removing event operation on %s",
                          op->getEvent()->getName()));
       NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
Thread
bk commit into 5.1 tree (pekka:1.2077) — pekka, 25 Jan