List:Commits« Previous MessageNext Message »
From:sanja Date:March 17 2006 11:17am
Subject:bk commit into 5.1 tree (bell:1.2198)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of bell. When bell does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2198 06/03/17 13:17:10 bell@stripped +8 -0
  Merge book.local:/Users/bell/mysql/bk/mysql-5.0
  into  book.local:/Users/bell/mysql/bk/work-qc-5.1

  sql/sql_insert.cc
    1.191 06/03/17 13:17:02 bell@stripped +6 -10
    merge

  storage/ndb/src/kernel/blocks/backup/Backup.cpp
    1.40 06/03/17 13:07:57 bell@stripped +0 -0
    Auto merged

  sql/sql_yacc.yy
    1.483 06/03/17 13:07:56 bell@stripped +0 -0
    Auto merged

  sql/sql_parse.cc
    1.531 06/03/17 13:07:56 bell@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/backup/Backup.cpp
    1.15.16.2 06/03/17 13:07:55 bell@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/backup/Backup.cpp -> storage/ndb/src/kernel/blocks/backup/Backup.cpp

  scripts/make_binary_distribution.sh
    1.114 06/03/17 13:07:55 bell@stripped +0 -0
    Auto merged

  mysql-test/t/query_cache_notembedded.test
    1.5 06/03/17 13:07:55 bell@stripped +0 -0
    Auto merged

  mysql-test/r/query_cache_notembedded.result
    1.5 06/03/17 13:07:55 bell@stripped +0 -0
    Auto merged

  mysql-test/lib/mtr_timer.pl
    1.8 06/03/17 13:07:55 bell@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	bell
# Host:	book.local
# Root:	/Users/bell/mysql/bk/work-qc-5.1/RESYNC

--- 1.190/sql/sql_insert.cc	2006-03-13 17:06:23 +02:00
+++ 1.191/sql/sql_insert.cc	2006-03-17 13:17:02 +02:00
@@ -272,6 +272,7 @@
   bool log_on= (thd->options & OPTION_BIN_LOG) ||
     (!(thd->security_ctx->master_access & SUPER_ACL));
   bool transactional_table, joins_freed= FALSE;
+  bool changed;
   uint value_count;
   ulong counter = 1;
   ulonglong id;
@@ -558,35 +559,33 @@
     else if (table->next_number_field && info.copied)
       id=table->next_number_field->val_int();	// Return auto_increment value
 
-    /*
-      Invalidate the table in the query cache if something changed.
-      For the transactional algorithm to work the invalidation must be
-      before binlog writing and ha_autocommit_or_rollback
-    */
-    if (info.copied || info.deleted || info.updated)
-    {
-      query_cache_invalidate3(thd, table_list, 1);
-    }
-
     transactional_table= table->file->has_transactions();
 
-    if ((info.copied || info.deleted || info.updated) &&
-	(error <= 0 || !transactional_table))
+    if ((changed= (info.copied || info.deleted || info.updated)))
     {
-      if (mysql_bin_log.is_open())
+      /*
+        Invalidate the table in the query cache if something changed.
+        For the transactional algorithm to work the invalidation must be
+        before binlog writing and ha_autocommit_or_rollback
+      */
+      query_cache_invalidate3(thd, table_list, 1);
+      if (error <= 0 || !transactional_table)
       {
-        if (error <= 0)
-          thd->clear_error();
-	if (thd->binlog_query(THD::ROW_QUERY_TYPE,
-                              thd->query, thd->query_length,
-                              transactional_table, FALSE) &&
-            transactional_table)
+        if (mysql_bin_log.is_open())
         {
-          error=1;
+          if (error <= 0)
+            thd->clear_error();
+          if (thd->binlog_query(THD::ROW_QUERY_TYPE,
+                                thd->query, thd->query_length,
+                                transactional_table, FALSE) &&
+              transactional_table)
+          {
+            error=1;
+          }
         }
+        if (!transactional_table)
+          thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
       }
-      if (!transactional_table)
-	thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
     }
     if (transactional_table)
       error=ha_autocommit_or_rollback(thd,error);
@@ -594,6 +593,16 @@
     if (thd->lock)
     {
       mysql_unlock_tables(thd, thd->lock);
+      /*
+        Invalidate the table in the query cache if something changed
+        after unlocking when changes become fisible.
+        TODO: this is workaround. right way will be move invalidating in
+        the unlock procedure.
+      */
+      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
+      {
+        query_cache_invalidate3(thd, table_list, 1);
+      }
       thd->lock=0;
     }
   }

--- 1.530/sql/sql_parse.cc	2006-03-16 14:20:31 +02:00
+++ 1.531/sql/sql_parse.cc	2006-03-17 13:07:56 +02:00
@@ -3339,6 +3339,19 @@
         select_lex->context.table_list= 
           select_lex->context.first_name_resolution_table= second_table;
 	res= handle_select(thd, lex, result, OPTION_SETUP_TABLES_DONE);
+        /*
+          Invalidate the table in the query cache if something changed
+          after unlocking when changes become visible.
+          TODO: this is workaround. right way will be move invalidating in
+          the unlock procedure.
+        */
+        if (first_table->lock_type ==  TL_WRITE_CONCURRENT_INSERT &&
+            thd->lock)
+        {
+          mysql_unlock_tables(thd, thd->lock);
+          query_cache_invalidate3(thd, first_table, 1);
+          thd->lock=0;
+        }
         delete result;
       }
       /* revert changes for SP */

--- 1.482/sql/sql_yacc.yy	2006-03-16 14:20:31 +02:00
+++ 1.483/sql/sql_yacc.yy	2006-03-17 13:07:56 +02:00
@@ -7809,7 +7809,19 @@
 	;
 
 insert_lock_option:
-	/* empty */	{ $$= TL_WRITE_CONCURRENT_INSERT; }
+	/* empty */
+          {
+#ifdef HAVE_QUERY_CACHE
+            /*
+              If it is SP we do not allow insert optimisation whan result of
+              insert visible only after the table unlocking but everyone can
+              read table.
+            */
+            $$= (Lex->sphead ? TL_WRITE :TL_WRITE_CONCURRENT_INSERT);
+#else
+            $$= TL_WRITE_CONCURRENT_INSERT;
+#endif
+          }
 	| LOW_PRIORITY	{ $$= TL_WRITE_LOW_PRIORITY; }
 	| DELAYED_SYM	{ $$= TL_WRITE_DELAYED; }
 	| HIGH_PRIORITY { $$= TL_WRITE; }
@@ -8728,7 +8740,16 @@
 
 load_data_lock:
 	/* empty */	{ $$= YYTHD->update_lock_default; }
-	| CONCURRENT	{ $$= TL_WRITE_CONCURRENT_INSERT ; }
+	| CONCURRENT
+          {
+#ifdef HAVE_QUERY_CACHE
+            /*
+              Ignore this option in SP to avoid problem with query cache
+            */
+            if (Lex->sphead != 0)
+#endif
+              $$= TL_WRITE_CONCURRENT_INSERT;
+          }
 	| LOW_PRIORITY	{ $$= TL_WRITE_LOW_PRIORITY; };
 
 

--- 1.4/mysql-test/r/query_cache_notembedded.result	2006-02-25 20:35:06 +02:00
+++ 1.5/mysql-test/r/query_cache_notembedded.result	2006-03-17 13:07:55 +02:00
@@ -314,4 +314,34 @@
 drop procedure f3;
 drop procedure f4;
 drop table t1;
+reset query cache;
+drop function if exists f1;
+create table t1 (id int);
+create function f1 ()
+returns int
+begin
+declare i_var int;
+set i_var = sleep(3);
+insert into t1 values(3);
+set i_var = sleep(3);
+return 0;
+end;|
+ select f1();
+select sleep(4);
+sleep(4)
+0
+select * from t1;
+id
+3
+f1()
+0
+select * from t1;
+id
+3
+reset query cache;
+select * from t1;
+id
+3
+drop table t1;
+drop function f1;
 set GLOBAL query_cache_size=0;

--- 1.4/mysql-test/t/query_cache_notembedded.test	2006-02-25 20:35:07 +02:00
+++ 1.5/mysql-test/t/query_cache_notembedded.test	2006-03-17 13:07:55 +02:00
@@ -180,5 +180,45 @@
 drop procedure f4;
 drop table t1;
 
+#
+# bug#14767: INSERT in SF + concurrent SELECT with query cache
+#
+reset query cache;
+--disable_warnings
+drop function if exists f1;
+--enable_warnings
+create table t1 (id int);
+delimiter |;
+create function f1 ()
+  returns int
+begin
+  declare i_var int;
+  set i_var = sleep(3);
+  insert into t1 values(3);
+  set i_var = sleep(3);
+  return 0;
+end;|
+delimiter ;|
+
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+
+connection con1;
+send select f1();
+connection con2;
+select sleep(4);
+select * from t1;
+connection con1;
+reap;
+connection con2;
+# This gives wrong result i.e. 't' table seems to be empty
+select * from t1;
+reset query cache;
+select * from t1;
+drop table t1;
+drop function f1;
+disconnect con1;
+disconnect con2;
+connection default;
 
 set GLOBAL query_cache_size=0;

--- 1.15.16.1/ndb/src/kernel/blocks/backup/Backup.cpp	2006-03-16 12:23:05 +02:00
+++ 1.40/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2006-03-17 13:07:57 +02:00
@@ -14,6 +14,7 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
+#include <my_config.h>
 #include "Backup.hpp"
 
 #include <ndb_version.h>
@@ -24,6 +25,7 @@
 #include <signaldata/NodeFailRep.hpp>
 #include <signaldata/ReadNodesConf.hpp>
 
+#include <signaldata/DihFragCount.hpp>
 #include <signaldata/ScanFrag.hpp>
 
 #include <signaldata/GetTabInfo.hpp>
@@ -52,6 +54,7 @@
 #include <AttributeHeader.hpp>
 
 #include <signaldata/WaitGCP.hpp>
+#include <signaldata/LCP.hpp>
 
 #include <NdbTick.h>
 
@@ -66,112 +69,13 @@
 #endif
 
 //#define DEBUG_ABORT
+//#define dbg globalSignalLoggers.log
 
 static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
 
 #define SEND_BACKUP_STARTED_FLAG(A) (((A) & 0x3) > 0)
 #define SEND_BACKUP_COMPLETED_FLAG(A) (((A) & 0x3) > 1)
 
-void 
-Backup::execREAD_CONFIG_REQ(Signal* signal)
-{
-  jamEntry();
-
-  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
-
-  Uint32 ref = req->senderRef;
-  Uint32 senderData = req->senderData;
-
-  const ndb_mgm_configuration_iterator * p = 
-    theConfiguration.getOwnConfigIterator();
-  ndbrequire(p != 0);
-
-  c_nodePool.setSize(MAX_NDB_NODES);
-
-  Uint32 noBackups = 0, noTables = 0, noAttribs = 0;
-  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &m_diskless));
-  ndb_mgm_get_int_parameter(p, CFG_DB_PARALLEL_BACKUPS, &noBackups);
-  //  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, &noTables));
-  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &noTables));
-  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES, &noAttribs));
-
-  noAttribs++; //RT 527 bug fix
-
-  c_backupPool.setSize(noBackups);
-  c_backupFilePool.setSize(3 * noBackups);
-  c_tablePool.setSize(noBackups * noTables);
-  c_attributePool.setSize(noBackups * noAttribs);
-  c_triggerPool.setSize(noBackups * 3 * noTables);
-
-  // 2 = no of replicas
-  c_fragmentPool.setSize(noBackups * 2 * NO_OF_FRAG_PER_NODE * noTables);
-  
-  Uint32 szMem = 0;
-  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MEM, &szMem);
-  Uint32 noPages = (szMem + sizeof(Page32) - 1) / sizeof(Page32);
-  // We need to allocate an additional of 2 pages. 1 page because of a bug in
-  // ArrayPool and another one for DICTTAINFO.
-  c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2); 
-
-  Uint32 szDataBuf = (2 * 1024 * 1024);
-  Uint32 szLogBuf = (2 * 1024 * 1024);
-  Uint32 szWrite = 32768;
-  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf);
-  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
-  ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
-  
-  c_defaults.m_logBufferSize = szLogBuf;
-  c_defaults.m_dataBufferSize = szDataBuf;
-  c_defaults.m_minWriteSize = szWrite;
-  c_defaults.m_maxWriteSize = szWrite;
-  
-  { // Init all tables
-    ArrayList<Table> tables(c_tablePool);
-    TablePtr ptr;
-    while(tables.seize(ptr)){
-      new (ptr.p) Table(c_attributePool, c_fragmentPool);
-    }
-    tables.release();
-  }
-
-  {
-    ArrayList<BackupFile> ops(c_backupFilePool);
-    BackupFilePtr ptr;
-    while(ops.seize(ptr)){
-      new (ptr.p) BackupFile(* this, c_pagePool);
-    }
-    ops.release();
-  }
-  
-  {
-    ArrayList<BackupRecord> recs(c_backupPool);
-    BackupRecordPtr ptr;
-    while(recs.seize(ptr)){
-      new (ptr.p) BackupRecord(* this, c_pagePool, c_tablePool, 
-			       c_backupFilePool, c_triggerPool);
-    }
-    recs.release();
-  }
-
-  // Initialize BAT for interface to file system
-  {
-    Page32Ptr p;
-    ndbrequire(c_pagePool.seizeId(p, 0));
-    c_startOfPages = (Uint32 *)p.p;
-    c_pagePool.release(p);
-    
-    NewVARIABLE* bat = allocateBat(1);
-    bat[0].WA = c_startOfPages;
-    bat[0].nrr = c_pagePool.getSize()*sizeof(Page32)/sizeof(Uint32);
-  }
-  
-  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
-  conf->senderRef = reference();
-  conf->senderData = senderData;
-  sendSignal(ref, GSN_READ_CONFIG_CONF, signal, 
-	     ReadConfigConf::SignalLength, JBB);
-}
-
 void
 Backup::execSTTOR(Signal* signal) 
 {
@@ -426,13 +330,26 @@
 
     if(signal->getLength() == 2 && signal->theData[1] == 2424)
     {
-      ndbrequire(c_tablePool.getSize() == c_tablePool.getNoOfFree());
-      ndbrequire(c_attributePool.getSize() == c_attributePool.getNoOfFree());
-      ndbrequire(c_backupPool.getSize() == c_backupPool.getNoOfFree());
-      ndbrequire(c_backupFilePool.getSize() == c_backupFilePool.getNoOfFree());
-      ndbrequire(c_pagePool.getSize() == c_pagePool.getNoOfFree());
-      ndbrequire(c_fragmentPool.getSize() == c_fragmentPool.getNoOfFree());
-      ndbrequire(c_triggerPool.getSize() == c_triggerPool.getNoOfFree());
+      /**
+       * Handle LCP
+       */
+      BackupRecordPtr lcp;
+      ndbrequire(c_backups.first(lcp));
+      
+      ndbrequire(c_backupPool.getSize() == c_backupPool.getNoOfFree() + 1);
+      if(lcp.p->tables.isEmpty())
+      {
+	ndbrequire(c_tablePool.getSize() == c_tablePool.getNoOfFree());
+	ndbrequire(c_attributePool.getSize() == c_attributePool.getNoOfFree());
+	ndbrequire(c_fragmentPool.getSize() == c_fragmentPool.getNoOfFree());
+	ndbrequire(c_triggerPool.getSize() == c_triggerPool.getNoOfFree());
+      }
+      ndbrequire(c_backupFilePool.getSize() == c_backupFilePool.getNoOfFree() + 1);
+      BackupFilePtr lcp_file;
+      c_backupFilePool.getPtr(lcp_file, lcp.p->dataFilePtr);
+      ndbrequire(c_pagePool.getSize() == 
+		 c_pagePool.getNoOfFree() + 
+		 lcp_file.p->pages.getSize());
     }
   }
 }
@@ -583,12 +500,6 @@
   TriggerEvent::TE_DELETE
 };
 
-const char* triggerNameFormat[] = {
-  "NDB$BACKUP_%d_%d_INSERT",
-  "NDB$BACKUP_%d_%d_UPDATE",
-  "NDB$BACKUP_%d_%d_DELETE"
-};
-
 const Backup::State 
 Backup::validSlaveTransitions[] = {
   INITIAL,  DEFINING,
@@ -858,7 +769,6 @@
       ref->backupPtr = ptr.i;
       ref->backupId = ptr.p->backupId;
       ref->errorCode = AbortBackupOrd::BackupFailureDueToNodeFail;
-      ref->signalNo = ptr.p->masterData.startBackup.signalNo;
       gsn= GSN_START_BACKUP_REF;
       len= StartBackupRef::SignalLength;
       pos= &ref->nodeId - signal->getDataPtr();
@@ -995,7 +905,6 @@
     return;
   }//if
 
-  ndbrequire(ptr.p->pages.empty());
   ndbrequire(ptr.p->tables.isEmpty());
   
   ptr.p->m_gsn = 0;
@@ -1010,9 +919,7 @@
   ptr.p->backupKey[1] = 0;
   ptr.p->backupDataLen = 0;
   ptr.p->masterData.errorCode = 0;
-  ptr.p->masterData.dropTrig.tableId = RNIL;
-  ptr.p->masterData.alterTrig.tableId = RNIL;
-  
+
   UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
     
   ptr.p->masterData.gsn = GSN_UTIL_SEQUENCE_REQ;
@@ -1323,13 +1230,18 @@
   signal->theData[2] = ptr.p->backupId;
   ptr.p->nodes.copyto(NdbNodeBitmask::Size, signal->theData+3);
   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3+NdbNodeBitmask::Size, JBB);
-  
+
   /**
-   * Prepare Trig
+   * We've received GSN_DEFINE_BACKUP_CONF from all participants.
+   *
+   * Our next step is to send START_BACKUP_REQ to all participants,
+   * who will then send CREATE_TRIG_REQ for all tables to their local
+   * DBTUP.
    */
   TablePtr tabPtr;
-  ndbrequire(ptr.p->tables.first(tabPtr));
-  sendCreateTrig(signal, ptr, tabPtr);
+  ptr.p->tables.first(tabPtr);
+
+  sendStartBackup(signal, ptr, tabPtr);
 }
 
 /*****************************************************************************
@@ -1343,11 +1255,12 @@
 {
   mask.clear();
   Table & table = * tabPtr.p;
-  for(Uint32 i = 0; i<table.noOfAttributes; i++) {
+  Ptr<Attribute> attrPtr;
+  table.attributes.first(attrPtr);
+  for(; !attrPtr.isNull(); table.attributes.next(attrPtr))
+  {
     jam();
-    AttributePtr attr;
-    table.attributes.getPtr(attr, i);
-    mask.set(i);
+    mask.set(attrPtr.p->data.attrId);
   }
 }
 
@@ -1356,42 +1269,72 @@
 			   BackupRecordPtr ptr, TablePtr tabPtr)
 {
   CreateTrigReq * req =(CreateTrigReq *)signal->getDataPtrSend();
-  
-  ptr.p->masterData.gsn = GSN_CREATE_TRIG_REQ;
-  ptr.p->masterData.sendCounter = 3;
-  ptr.p->masterData.createTrig.tableId = tabPtr.p->tableId;
+
+  /*
+   * First, setup the structures
+   */
+  for(Uint32 j=0; j<3; j++) {
+    jam();
+
+    TriggerPtr trigPtr;
+    if(!ptr.p->triggers.seize(trigPtr)) {
+      jam();
+      ptr.p->m_gsn = GSN_START_BACKUP_REF;
+      StartBackupRef* ref = (StartBackupRef*)signal->getDataPtrSend();
+      ref->backupPtr = ptr.i;
+      ref->backupId = ptr.p->backupId;
+      ref->errorCode = StartBackupRef::FailedToAllocateTriggerRecord;
+      ref->nodeId = getOwnNodeId();
+      sendSignal(ptr.p->masterRef, GSN_START_BACKUP_REF, signal,
+		 StartBackupRef::SignalLength, JBB);
+      return;
+    } // if
+
+    const Uint32 triggerId= trigPtr.i;
+    tabPtr.p->triggerIds[j] = triggerId;
+    tabPtr.p->triggerAllocated[j] = true;
+    trigPtr.p->backupPtr = ptr.i;
+    trigPtr.p->tableId = tabPtr.p->tableId;
+    trigPtr.p->tab_ptr_i = tabPtr.i;
+    trigPtr.p->logEntry = 0;
+    trigPtr.p->event = j;
+    trigPtr.p->maxRecordSize = 4096;
+    trigPtr.p->operation =
+      &ptr.p->files.getPtr(ptr.p->logFilePtr)->operation;
+    trigPtr.p->operation->noOfBytes = 0;
+    trigPtr.p->operation->noOfRecords = 0;
+    trigPtr.p->errorCode = 0;
+  } // for
+
+  /*
+   * now ask DBTUP to create
+   */
+  ptr.p->slaveData.gsn = GSN_CREATE_TRIG_REQ;
+  ptr.p->slaveData.trigSendCounter = 3;
+  ptr.p->slaveData.createTrig.tableId = tabPtr.p->tableId;
 
   req->setUserRef(reference());
+  req->setReceiverRef(reference());
   req->setConnectionPtr(ptr.i);
   req->setRequestType(CreateTrigReq::RT_USER);
-  
+
   Bitmask<MAXNROFATTRIBUTESINWORDS> attrMask;
   createAttributeMask(tabPtr, attrMask);
   req->setAttributeMask(attrMask);
   req->setTableId(tabPtr.p->tableId);
   req->setIndexId(RNIL);        // not used
-  req->setTriggerId(RNIL);      // to be created
   req->setTriggerType(TriggerType::SUBSCRIPTION);
   req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
   req->setMonitorReplicas(true);
   req->setMonitorAllAttributes(false);
-  req->setOnline(false);        // leave trigger offline
+  req->setOnline(true);
 
-  char triggerName[MAX_TAB_NAME_SIZE];
-  Uint32 nameBuffer[2 + ((MAX_TAB_NAME_SIZE + 3) >> 2)];  // SP string
-  LinearWriter w(nameBuffer, sizeof(nameBuffer) >> 2);
-  LinearSectionPtr lsPtr[3];
-  
   for (int i=0; i < 3; i++) {
+    req->setTriggerId(tabPtr.p->triggerIds[i]);
     req->setTriggerEvent(triggerEventValues[i]);
-    BaseString::snprintf(triggerName, sizeof(triggerName), triggerNameFormat[i],
-	     ptr.p->backupId, tabPtr.p->tableId);
-    w.reset();
-    w.add(CreateTrigReq::TriggerNameKey, triggerName);
-    lsPtr[0].p = nameBuffer;
-    lsPtr[0].sz = w.getWordsUsed();
-    sendSignal(DBDICT_REF, GSN_CREATE_TRIG_REQ, 
-	       signal, CreateTrigReq::SignalLength, JBB, lsPtr, 1);
+
+    sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
+	       signal, CreateTrigReq::SignalLength, JBB);
   }
 }
 
@@ -1411,25 +1354,25 @@
 
   /**
    * Verify that I'm waiting for this conf
-   */
-  ndbrequire(ptr.p->masterRef == reference());
-  ndbrequire(ptr.p->masterData.gsn == GSN_CREATE_TRIG_REQ);
-  ndbrequire(ptr.p->masterData.sendCounter.done() == false);
-  ndbrequire(ptr.p->masterData.createTrig.tableId == tableId);
-  
+   *
+   * ptr.p->masterRef != reference()
+   * as slaves and masters have triggers now.
+   */
+  ndbrequire(ptr.p->slaveData.gsn == GSN_CREATE_TRIG_REQ);
+  ndbrequire(ptr.p->slaveData.trigSendCounter.done() == false);
+  ndbrequire(ptr.p->slaveData.createTrig.tableId == tableId);
+
   TablePtr tabPtr;
   ndbrequire(findTable(ptr, tabPtr, tableId));
   ndbrequire(type < 3); // if some decides to change the enums
 
-  ndbrequire(tabPtr.p->triggerIds[type] == ILLEGAL_TRIGGER_ID);
-  tabPtr.p->triggerIds[type] = triggerId;
-  
   createTrigReply(signal, ptr);
 }
 
 void
 Backup::execCREATE_TRIG_REF(Signal* signal)
 {
+  jamEntry();
   CreateTrigRef* ref = (CreateTrigRef*)signal->getDataPtr();
 
   const Uint32 ptrI = ref->getConnectionPtr();
@@ -1440,14 +1383,16 @@
 
   /**
    * Verify that I'm waiting for this ref
-   */
-  ndbrequire(ptr.p->masterRef == reference());
-  ndbrequire(ptr.p->masterData.gsn == GSN_CREATE_TRIG_REQ);
-  ndbrequire(ptr.p->masterData.sendCounter.done() == false);
-  ndbrequire(ptr.p->masterData.createTrig.tableId == tableId);
+   *
+   * ptr.p->masterRef != reference()
+   * as slaves and masters have triggers now
+   */
+  ndbrequire(ptr.p->slaveData.gsn == GSN_CREATE_TRIG_REQ);
+  ndbrequire(ptr.p->slaveData.trigSendCounter.done() == false);
+  ndbrequire(ptr.p->slaveData.createTrig.tableId == tableId);
 
   ptr.p->setErrorCode(ref->getErrorCode());
-  
+
   createTrigReply(signal, ptr);
 }
 
@@ -1459,26 +1404,33 @@
   /**
    * Check finished with table
    */
-  ptr.p->masterData.sendCounter--;
-  if(ptr.p->masterData.sendCounter.done() == false){
+  ptr.p->slaveData.trigSendCounter--;
+  if(ptr.p->slaveData.trigSendCounter.done() == false){
     jam();
     return;
   }//if
 
-  if (ERROR_INSERTED(10025)) 
+  if (ERROR_INSERTED(10025))
   {
     ptr.p->errorCode = 325;
   }
 
   if(ptr.p->checkError()) {
     jam();
-    masterAbort(signal, ptr);
+    ptr.p->m_gsn = GSN_START_BACKUP_REF;
+    StartBackupRef* ref = (StartBackupRef*)signal->getDataPtrSend();
+    ref->backupPtr = ptr.i;
+    ref->backupId = ptr.p->backupId;
+    ref->errorCode = ptr.p->errorCode;
+    ref->nodeId = getOwnNodeId();
+    sendSignal(ptr.p->masterRef, GSN_START_BACKUP_REF, signal,
+               StartBackupRef::SignalLength, JBB);
     return;
   }//if
 
   TablePtr tabPtr;
-  ndbrequire(findTable(ptr, tabPtr, ptr.p->masterData.createTrig.tableId));
-  
+  ndbrequire(findTable(ptr, tabPtr, ptr.p->slaveData.createTrig.tableId));
+
   /**
    * Next table
    */
@@ -1490,14 +1442,16 @@
   }//if
 
   /**
-   * Finished with all tables, send StartBackupReq
+   * We've finished creating triggers.
+   *
+   * send conf and wait
    */
-  ptr.p->tables.first(tabPtr);
-  ptr.p->masterData.startBackup.signalNo = 0;
-  ptr.p->masterData.startBackup.noOfSignals = 
-    (ptr.p->tables.noOfElements() + StartBackupReq::MaxTableTriggers - 1) / 
-    StartBackupReq::MaxTableTriggers;
-  sendStartBackup(signal, ptr, tabPtr);
+  ptr.p->m_gsn = GSN_START_BACKUP_CONF;
+  StartBackupConf* conf = (StartBackupConf*)signal->getDataPtrSend();
+  conf->backupPtr = ptr.i;
+  conf->backupId = ptr.p->backupId;
+  sendSignal(ptr.p->masterRef, GSN_START_BACKUP_CONF, signal,
+	     StartBackupConf::SignalLength, JBB);
 }
 
 /*****************************************************************************
@@ -1510,33 +1464,23 @@
 {
 
   ptr.p->masterData.startBackup.tablePtr = tabPtr.i;
-  
+
   StartBackupReq* req = (StartBackupReq*)signal->getDataPtrSend();
   req->backupId = ptr.p->backupId;
   req->backupPtr = ptr.i;
-  req->signalNo = ptr.p->masterData.startBackup.signalNo;
-  req->noOfSignals = ptr.p->masterData.startBackup.noOfSignals;
-  Uint32 i;
-  for(i = 0; i<StartBackupReq::MaxTableTriggers; i++) {
-    jam();
-    req->tableTriggers[i].tableId = tabPtr.p->tableId;
-    req->tableTriggers[i].triggerIds[0] = tabPtr.p->triggerIds[0];
-    req->tableTriggers[i].triggerIds[1] = tabPtr.p->triggerIds[1];
-    req->tableTriggers[i].triggerIds[2] = tabPtr.p->triggerIds[2];
-    if(!ptr.p->tables.next(tabPtr)){
-      jam();
-      i++;
-      break;
-    }//if
-  }//for
-  req->noOfTableTriggers = i;
 
+  /**
+   * We use trigger Ids that are unique to BACKUP.
+   * These don't interfere with other triggers (e.g. from DBDICT)
+   * as there is a special case in DBTUP.
+   *
+   * Consequently, backups during online upgrade won't work
+   */
   ptr.p->masterData.gsn = GSN_START_BACKUP_REQ;
   ptr.p->masterData.sendCounter = ptr.p->nodes;
   NodeReceiverGroup rg(BACKUP, ptr.p->nodes);
-  sendSignal(rg, GSN_START_BACKUP_REQ, signal, 
-	     StartBackupReq::HeaderLength + 
-	     (i * StartBackupReq::TableTriggerLength), JBB);
+  sendSignal(rg, GSN_START_BACKUP_REQ, signal,
+	     StartBackupReq::SignalLength, JBB);
 }
 
 void
@@ -1547,14 +1491,13 @@
   StartBackupRef* ref = (StartBackupRef*)signal->getDataPtr();
   const Uint32 ptrI = ref->backupPtr;
   //const Uint32 backupId = ref->backupId;
-  const Uint32 signalNo = ref->signalNo;
   const Uint32 nodeId = ref->nodeId;
 
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, ptrI);
 
   ptr.p->setErrorCode(ref->errorCode);
-  startBackupReply(signal, ptr, nodeId, signalNo);
+  startBackupReply(signal, ptr, nodeId);
 }
 
 void
@@ -1565,23 +1508,20 @@
   StartBackupConf* conf = (StartBackupConf*)signal->getDataPtr();
   const Uint32 ptrI = conf->backupPtr;
   //const Uint32 backupId = conf->backupId;
-  const Uint32 signalNo = conf->signalNo;
   const Uint32 nodeId = refToNode(signal->senderBlockRef());
   
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, ptrI);
 
-  startBackupReply(signal, ptr, nodeId, signalNo);
+  startBackupReply(signal, ptr, nodeId);
 }
 
 void
-Backup::startBackupReply(Signal* signal, BackupRecordPtr ptr, 
-			 Uint32 nodeId, Uint32 signalNo)
+Backup::startBackupReply(Signal* signal, BackupRecordPtr ptr, Uint32 nodeId)
 {
 
   CRASH_INSERTION((10004));
 
-  ndbrequire(ptr.p->masterData.startBackup.signalNo == signalNo);
   if (!haveAllSignals(ptr, GSN_START_BACKUP_REQ, nodeId)) {
     jam();
     return;
@@ -1598,149 +1538,21 @@
     return;
   }
 
-  TablePtr tabPtr;
-  c_tablePool.getPtr(tabPtr, ptr.p->masterData.startBackup.tablePtr);
-  for(Uint32 i = 0; i<StartBackupReq::MaxTableTriggers; i++) {
-    jam();
-    if(!ptr.p->tables.next(tabPtr)) {
-      jam();
-      break;
-    }//if
-  }//for
-  
-  if(tabPtr.i != RNIL) {
-    jam();
-    ptr.p->masterData.startBackup.signalNo++;
-    sendStartBackup(signal, ptr, tabPtr);
-    return;
-  }
-
-  sendAlterTrig(signal, ptr);
-}
-
-/*****************************************************************************
- * 
- * Master functionallity - Activate triggers
- *
- *****************************************************************************/
-void
-Backup::sendAlterTrig(Signal* signal, BackupRecordPtr ptr)
-{
-  AlterTrigReq * req =(AlterTrigReq *)signal->getDataPtrSend();
-  
-  ptr.p->masterData.gsn = GSN_ALTER_TRIG_REQ;
-  ptr.p->masterData.sendCounter = 0;
-  
-  req->setUserRef(reference());
-  req->setConnectionPtr(ptr.i);
-  req->setRequestType(AlterTrigReq::RT_USER);
-  req->setTriggerInfo(0);       // not used on ALTER via DICT
-  req->setOnline(true);
-  req->setReceiverRef(reference());
-
-  TablePtr tabPtr;
-
-  if (ptr.p->masterData.alterTrig.tableId == RNIL) {
-    jam();
-    ptr.p->tables.first(tabPtr);
-  } else {
-    jam();
-    ndbrequire(findTable(ptr, tabPtr, ptr.p->masterData.alterTrig.tableId));
-    ptr.p->tables.next(tabPtr);
-  }//if
-  if (tabPtr.i != RNIL) {
-    jam();
-    ptr.p->masterData.alterTrig.tableId = tabPtr.p->tableId;
-    req->setTableId(tabPtr.p->tableId);
-
-    req->setTriggerId(tabPtr.p->triggerIds[0]);
-    sendSignal(DBDICT_REF, GSN_ALTER_TRIG_REQ, 
-	       signal, AlterTrigReq::SignalLength, JBB);
-    
-    req->setTriggerId(tabPtr.p->triggerIds[1]);
-    sendSignal(DBDICT_REF, GSN_ALTER_TRIG_REQ, 
-	       signal, AlterTrigReq::SignalLength, JBB);
-
-    req->setTriggerId(tabPtr.p->triggerIds[2]);
-    sendSignal(DBDICT_REF, GSN_ALTER_TRIG_REQ, 
-	       signal, AlterTrigReq::SignalLength, JBB);
-
-    ptr.p->masterData.sendCounter += 3;
-    return;
-  }//if
-  ptr.p->masterData.alterTrig.tableId = RNIL;
-
   /**
-   * Finished with all tables
+   * Wait for GCP
    */
   ptr.p->masterData.gsn = GSN_WAIT_GCP_REQ;
   ptr.p->masterData.waitGCP.startBackup = true;
-  
+
   WaitGCPReq * waitGCPReq = (WaitGCPReq*)signal->getDataPtrSend();
   waitGCPReq->senderRef = reference();
   waitGCPReq->senderData = ptr.i;
   waitGCPReq->requestType = WaitGCPReq::CompleteForceStart;
-  sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal, 
+  sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
 	     WaitGCPReq::SignalLength,JBB);
 }
 
 void
-Backup::execALTER_TRIG_CONF(Signal* signal)
-{
-  jamEntry();
-
-  AlterTrigConf* conf = (AlterTrigConf*)signal->getDataPtr();
-  const Uint32 ptrI = conf->getConnectionPtr();
-  
-  BackupRecordPtr ptr;
-  c_backupPool.getPtr(ptr, ptrI);
-  
-  alterTrigReply(signal, ptr);
-}
-
-void
-Backup::execALTER_TRIG_REF(Signal* signal)
-{
-  jamEntry();
-
-  AlterTrigRef* ref = (AlterTrigRef*)signal->getDataPtr();
-  const Uint32 ptrI = ref->getConnectionPtr();
-  
-  BackupRecordPtr ptr;
-  c_backupPool.getPtr(ptr, ptrI);
-
-  ptr.p->setErrorCode(ref->getErrorCode());
-  
-  alterTrigReply(signal, ptr);
-}
-
-void
-Backup::alterTrigReply(Signal* signal, BackupRecordPtr ptr)
-{
-
-  CRASH_INSERTION((10005));
-
-  ndbrequire(ptr.p->masterRef == reference());
-  ndbrequire(ptr.p->masterData.gsn == GSN_ALTER_TRIG_REQ);
-  ndbrequire(ptr.p->masterData.sendCounter.done() == false);
-
-  ptr.p->masterData.sendCounter--;
-
-  if(ptr.p->masterData.sendCounter.done() == false){
-    jam();
-    return;
-  }//if
-
-  if(ptr.p->checkError()){
-    jam();
-    masterAbort(signal, ptr);
-    return;
-  }//if
-
-  sendAlterTrig(signal, ptr);
-}
-
-void
 Backup::execWAIT_GCP_REF(Signal* signal)
 {
   jamEntry();
@@ -1800,7 +1612,12 @@
     {
       CRASH_INSERTION((10009));
       ptr.p->stopGCP = gcp;
-      sendDropTrig(signal, ptr); // regular dropping of triggers
+      /**
+       * Backup is complete - begin cleanup
+       * STOP_BACKUP_REQ is sent to participants.
+       * They then drop the local triggers
+       */
+      sendStopBackup(signal, ptr);
       return;
     }//if
     
@@ -2007,8 +1824,8 @@
 }
 
 /*****************************************************************************
- * 
- * Master functionallity - Drop triggers
+ *
+ * Slave functionallity - Drop triggers
  *
  *****************************************************************************/
 
@@ -2016,23 +1833,63 @@
 Backup::sendDropTrig(Signal* signal, BackupRecordPtr ptr)
 {
   TablePtr tabPtr;
-  if (ptr.p->masterData.dropTrig.tableId == RNIL) {
+  ptr.p->slaveData.gsn = GSN_DROP_TRIG_REQ;
+
+  if (ptr.p->slaveData.dropTrig.tableId == RNIL) {
     jam();
     ptr.p->tables.first(tabPtr);
   } else {
     jam();
-    ndbrequire(findTable(ptr, tabPtr, ptr.p->masterData.dropTrig.tableId));
+    ndbrequire(findTable(ptr, tabPtr, ptr.p->slaveData.dropTrig.tableId));
     ptr.p->tables.next(tabPtr);
   }//if
   if (tabPtr.i != RNIL) {
     jam();
     sendDropTrig(signal, ptr, tabPtr);
   } else {
-    jam();
-    ptr.p->masterData.dropTrig.tableId = RNIL;
+    /**
+     * Insert footers
+     */
+    {
+      BackupFilePtr filePtr;
+      ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
+      Uint32 * dst;
+      ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, 1));
+      * dst = 0;
+      filePtr.p->operation.dataBuffer.updateWritePtr(1);
+    }
 
-    sendStopBackup(signal, ptr);
-  }//if
+    {
+      BackupFilePtr filePtr;
+      ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+
+      const Uint32 gcpSz = sizeof(BackupFormat::CtlFile::GCPEntry) >> 2;
+
+      Uint32 * dst;
+      ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, gcpSz));
+
+      BackupFormat::CtlFile::GCPEntry * gcp = 
+	(BackupFormat::CtlFile::GCPEntry*)dst;
+
+      gcp->SectionType   = htonl(BackupFormat::GCP_ENTRY);
+      gcp->SectionLength = htonl(gcpSz);
+      gcp->StartGCP      = htonl(ptr.p->startGCP);
+      gcp->StopGCP       = htonl(ptr.p->stopGCP - 1);
+      filePtr.p->operation.dataBuffer.updateWritePtr(gcpSz);
+    }
+
+    { // UNLOCK while dropping trigger for better timeslicing
+      TablePtr tabPtr;
+      for(ptr.p->tables.first(tabPtr); tabPtr.i != RNIL;
+	  ptr.p->tables.next(tabPtr))
+      {
+	signal->theData[0] = tabPtr.p->tableId;
+	signal->theData[1] = 0; // unlock
+	EXECUTE_DIRECT(DBDICT, GSN_BACKUP_FRAGMENT_REQ, signal, 2);
+      }
+    }
+    closeFiles(signal, ptr);
+  }
 }
 
 void
@@ -2041,40 +1898,26 @@
   jam();
   DropTrigReq * req = (DropTrigReq *)signal->getDataPtrSend();
 
-  ptr.p->masterData.gsn = GSN_DROP_TRIG_REQ;
-  ptr.p->masterData.sendCounter = 0;
-    
+  ptr.p->slaveData.gsn = GSN_DROP_TRIG_REQ;
+  ptr.p->slaveData.trigSendCounter = 0;
   req->setConnectionPtr(ptr.i);
   req->setUserRef(reference()); // Sending to myself
   req->setRequestType(DropTrigReq::RT_USER);
   req->setIndexId(RNIL);
-  req->setTriggerInfo(0);       // not used on DROP via DICT
+  req->setTriggerInfo(0);       // not used on DROP
+  req->setTriggerType(TriggerType::SUBSCRIPTION);
+  req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
 
-  char triggerName[MAX_TAB_NAME_SIZE];
-  Uint32 nameBuffer[2 + ((MAX_TAB_NAME_SIZE + 3) >> 2)];  // SP string
-  LinearWriter w(nameBuffer, sizeof(nameBuffer) >> 2);
-  LinearSectionPtr lsPtr[3];
-  
-  ptr.p->masterData.dropTrig.tableId = tabPtr.p->tableId;
+  ptr.p->slaveData.dropTrig.tableId = tabPtr.p->tableId;
   req->setTableId(tabPtr.p->tableId);
 
   for (int i = 0; i < 3; i++) {
     Uint32 id = tabPtr.p->triggerIds[i];
     req->setTriggerId(id);
-    if (id != ILLEGAL_TRIGGER_ID) {
-      sendSignal(DBDICT_REF, GSN_DROP_TRIG_REQ, 
-		 signal, DropTrigReq::SignalLength, JBB);
-    } else {
-      BaseString::snprintf(triggerName, sizeof(triggerName), triggerNameFormat[i],
-	       ptr.p->backupId, tabPtr.p->tableId);
-      w.reset();
-      w.add(CreateTrigReq::TriggerNameKey, triggerName);
-      lsPtr[0].p = nameBuffer;
-      lsPtr[0].sz = w.getWordsUsed();
-      sendSignal(DBDICT_REF, GSN_DROP_TRIG_REQ, 
-		 signal, DropTrigReq::SignalLength, JBB, lsPtr, 1);
-    }
-    ptr.p->masterData.sendCounter ++;
+    req->setTriggerEvent(triggerEventValues[i]);
+    sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
+	       signal, DropTrigReq::SignalLength, JBB);
+    ptr.p->slaveData.trigSendCounter ++;
   }
 }
 
@@ -2085,11 +1928,13 @@
 
   DropTrigRef* ref = (DropTrigRef*)signal->getDataPtr();
   const Uint32 ptrI = ref->getConnectionPtr();
-  
+
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, ptrI);
- 
-  //ndbrequire(ref->getErrorCode() == DropTrigRef::NoSuchTrigger);
+
+  ndbout << "ERROR DROPPING TRIGGER: " << ref->getConf()->getTriggerId();
+  ndbout << " Err: " << (Uint32)ref->getErrorCode() << endl << endl;
+
   dropTrigReply(signal, ptr);
 }
 
@@ -2100,29 +1945,29 @@
   
   DropTrigConf* conf = (DropTrigConf*)signal->getDataPtr();
   const Uint32 ptrI = conf->getConnectionPtr();
-  
+  const Uint32 triggerId= conf->getTriggerId();
+
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, ptrI);
-  
+
   dropTrigReply(signal, ptr);
 }
 
 void
 Backup::dropTrigReply(Signal* signal, BackupRecordPtr ptr)
 {
-
   CRASH_INSERTION((10012));
 
-  ndbrequire(ptr.p->masterRef == reference());
-  ndbrequire(ptr.p->masterData.gsn == GSN_DROP_TRIG_REQ);
-  ndbrequire(ptr.p->masterData.sendCounter.done() == false);
-  
-  ptr.p->masterData.sendCounter--;
-  if(ptr.p->masterData.sendCounter.done() == false){
+  ndbrequire(ptr.p->slaveData.gsn == GSN_DROP_TRIG_REQ);
+  ndbrequire(ptr.p->slaveData.trigSendCounter.done() == false);
+
+  // move from .masterData to .slaveData
+  ptr.p->slaveData.trigSendCounter--;
+  if(ptr.p->slaveData.trigSendCounter.done() == false){
     jam();
     return;
   }//if
-  
+
   sendDropTrig(signal, ptr); // recursive next
 }
 
@@ -2245,6 +2090,9 @@
 #ifdef DEBUG_ABORT
   ndbout_c("************ masterAbort");
 #endif
+
+  ndbassert(ptr.p->masterRef == reference());
+
   if(ptr.p->masterData.errorCode != 0)
   {
     jam();
@@ -2288,13 +2136,13 @@
   case GSN_BACKUP_FRAGMENT_REQ:
     jam();
     ptr.p->stopGCP= ptr.p->startGCP + 1;
-    sendDropTrig(signal, ptr); // dropping due to error
+    sendStopBackup(signal, ptr); // dropping due to error
     return;
   case GSN_UTIL_SEQUENCE_REQ:
   case GSN_UTIL_LOCK_REQ:
-  case GSN_DROP_TRIG_REQ:
     ndbrequire(false);
     return;
+  case GSN_DROP_TRIG_REQ:
   case GSN_STOP_BACKUP_REQ:
     return;
   }
@@ -2341,6 +2189,27 @@
 void
 Backup::defineBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errCode)
 {
+  if(ptr.p->is_lcp()) 
+  {
+    jam();
+    TablePtr tabPtr;
+    FragmentPtr fragPtr;
+    
+    ptr.p->setErrorCode(errCode);
+    ndbrequire(ptr.p->tables.first(tabPtr));
+    tabPtr.p->fragments.getPtr(fragPtr, 0);
+    
+    LcpPrepareRef* ref= (LcpPrepareRef*)signal->getDataPtrSend();
+    ref->senderData = ptr.p->clientData;
+    ref->senderRef = reference();
+    ref->tableId = tabPtr.p->tableId;
+    ref->fragmentId = fragPtr.p->fragmentId;
+    ref->errorCode = errCode;
+    sendSignal(ptr.p->masterRef, GSN_LCP_PREPARE_REF, 
+	       signal, LcpPrepareRef::SignalLength, JBB);
+    return;
+  }
+
   ptr.p->m_gsn = GSN_DEFINE_BACKUP_REF;
   ptr.p->setErrorCode(errCode);
   ndbrequire(ptr.p->errorCode != 0);
@@ -2388,6 +2257,7 @@
   ptr.p->m_gsn = GSN_DEFINE_BACKUP_REQ;
   ptr.p->slaveState.forceState(INITIAL);
   ptr.p->slaveState.setState(DEFINING);
+  ptr.p->slaveData.dropTrig.tableId = RNIL;
   ptr.p->errorCode = 0;
   ptr.p->clientRef = req->clientRef;
   ptr.p->clientData = req->clientData;
@@ -2404,15 +2274,15 @@
   ptr.p->backupKey[0] = req->backupKey[0];
   ptr.p->backupKey[1] = req->backupKey[1];
   ptr.p->backupDataLen = req->backupDataLen;
-  ptr.p->masterData.dropTrig.tableId = RNIL;
-  ptr.p->masterData.alterTrig.tableId = RNIL;
   ptr.p->masterData.errorCode = 0;
   ptr.p->noOfBytes = 0;
   ptr.p->noOfRecords = 0;
   ptr.p->noOfLogBytes = 0;
   ptr.p->noOfLogRecords = 0;
   ptr.p->currGCP = 0;
-  
+  ptr.p->startGCP = 0;
+  ptr.p->stopGCP = 0;
+
   /**
    * Allocate files
    */
@@ -2446,9 +2316,22 @@
   maxWrite[2] = c_defaults.m_maxWriteSize;
   noOfPages[2] = (c_defaults.m_dataBufferSize + sizeof(Page32) - 1) / 
     sizeof(Page32);
+
+  if (ptr.p->is_lcp())
+  {
+    noOfPages[2] = (c_defaults.m_lcp_buffer_size + sizeof(Page32) - 1) / 
+      sizeof(Page32);
+  }
   
+  ptr.p->ctlFilePtr = ptr.p->logFilePtr = ptr.p->dataFilePtr = RNIL;
+
   for(Uint32 i = 0; i<3; i++) {
     jam();
+    if(ptr.p->is_lcp() && i != 2)
+    {
+      files[i].i = RNIL;
+      continue;
+    }
     if(!ptr.p->files.seize(files[i])) {
       jam();
       defineBackupRef(signal, ptr, 
@@ -2486,15 +2369,22 @@
       defineBackupRef(signal, ptr, DefineBackupRef::FailedToSetupFsBuffers);
       return;
     }//if
+
+    switch(i){
+    case 0:
+      files[i].p->fileType = BackupFormat::CTL_FILE;
+      ptr.p->ctlFilePtr = files[i].i;
+      break;
+    case 1:
+      files[i].p->fileType = BackupFormat::LOG_FILE;
+      ptr.p->logFilePtr = files[i].i;
+      break;
+    case 2:
+      files[i].p->fileType = BackupFormat::DATA_FILE;
+      ptr.p->dataFilePtr = files[i].i;
+    }
   }//for
-  files[0].p->fileType = BackupFormat::CTL_FILE;
-  files[1].p->fileType = BackupFormat::LOG_FILE;
-  files[2].p->fileType = BackupFormat::DATA_FILE;
-  
-  ptr.p->ctlFilePtr = files[0].i;
-  ptr.p->logFilePtr = files[1].i;
-  ptr.p->dataFilePtr = files[2].i;
-  
+    
   if (!verifyNodesAlive(ptr, ptr.p->nodes)) {
     jam();
     defineBackupRef(signal, ptr, DefineBackupRef::Undefined);
@@ -2512,6 +2402,13 @@
     return;
   }//if
   
+  if(ptr.p->is_lcp())
+  {
+    jam();
+    getFragmentInfoDone(signal, ptr);
+    return;
+  }
+  
   /**
    * Not implemented
    */
@@ -2548,15 +2445,22 @@
     Uint32 tableId = ListTablesConf::getTableId(conf->tableData[i]);
     Uint32 tableType = ListTablesConf::getTableType(conf->tableData[i]);
     Uint32 state= ListTablesConf::getTableState(conf->tableData[i]);
-    if (!DictTabInfo::isTable(tableType) && !DictTabInfo::isIndex(tableType)){
+
+    if (! (DictTabInfo::isTable(tableType) ||
+	   DictTabInfo::isIndex(tableType) ||
+	   DictTabInfo::isFilegroup(tableType) ||
+	   DictTabInfo::isFile(tableType)))
+    {
       jam();
       continue;
-    }//if
+    }
+    
     if (state != DictTabInfo::StateOnline)
     {
       jam();
       continue;
-    }//if
+    }
+    
     TablePtr tabPtr;
     ptr.p->tables.seize(tabPtr);
     if(tabPtr.i == RNIL) {
@@ -2595,8 +2499,7 @@
     FsOpenReq::OM_WRITEONLY | 
     FsOpenReq::OM_TRUNCATE |
     FsOpenReq::OM_CREATE | 
-    FsOpenReq::OM_APPEND |
-    FsOpenReq::OM_SYNC;
+    FsOpenReq::OM_APPEND;
   FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF);
   
   /**
@@ -2716,30 +2619,44 @@
     return;
   }//if
 
-  /**
-   * Insert file headers
-   */
-  ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
-  if(!insertFileHeader(BackupFormat::CTL_FILE, ptr.p, filePtr.p)) {
-    jam();
-    defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
-    return;
-  }//if
-
-  ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
-  if(!insertFileHeader(BackupFormat::LOG_FILE, ptr.p, filePtr.p)) {
-    jam();
-    defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
-    return;
-  }//if
-
-  ptr.p->files.getPtr(filePtr, ptr.p->dataFilePtr);
-  if(!insertFileHeader(BackupFormat::DATA_FILE, ptr.p, filePtr.p)) {
-    jam();
-    defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
-    return;
-  }//if
-
+  if(!ptr.p->is_lcp())
+  {
+    /**
+     * Insert file headers
+     */
+    ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
+    if(!insertFileHeader(BackupFormat::CTL_FILE, ptr.p, filePtr.p)) {
+      jam();
+      defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+      return;
+    }//if
+    
+    ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
+    if(!insertFileHeader(BackupFormat::LOG_FILE, ptr.p, filePtr.p)) {
+      jam();
+      defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+      return;
+    }//if
+    
+    ptr.p->files.getPtr(filePtr, ptr.p->dataFilePtr);
+    if(!insertFileHeader(BackupFormat::DATA_FILE, ptr.p, filePtr.p)) {
+      jam();
+      defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+      return;
+    }//if
+  }
+  else
+  {
+    ptr.p->files.getPtr(filePtr, ptr.p->dataFilePtr);
+    if(!insertFileHeader(BackupFormat::LCP_FILE, ptr.p, filePtr.p)) {
+      jam();
+      defineBackupRef(signal, ptr, DefineBackupRef::FailedInsertFileHeader);
+      return;
+    }//if
+    
+    ptr.p->ctlFilePtr = ptr.p->dataFilePtr;
+  }
+  
   /**
    * Start CTL file thread
    */
@@ -2747,17 +2664,17 @@
   filePtr.p->fileRunning = 1;
   
   signal->theData[0] = BackupContinueB::START_FILE_THREAD;
-  signal->theData[1] = ptr.p->ctlFilePtr;
+  signal->theData[1] = filePtr.i;
   sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
 
   /**
    * Insert table list in ctl file
    */
   FsBuffer & buf = filePtr.p->operation.dataBuffer;
-
+  
   const Uint32 sz = 
     (sizeof(BackupFormat::CtlFile::TableList) >> 2) +
-    ptr.p->tables.noOfElements() - 1;
+    ptr.p->tables.count() - 1;
   
   Uint32 * dst;
   ndbrequire(sz < buf.getMaxWrite());
@@ -2788,7 +2705,7 @@
    * Start getting table definition data
    */
   ndbrequire(ptr.p->tables.first(tabPtr));
-
+  
   signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
   signal->theData[1] = ptr.i;
   signal->theData[2] = tabPtr.i;
@@ -2853,6 +2770,8 @@
   //const Uint32 senderRef = info->senderRef;
   const Uint32 len = conf->totalLen;
   const Uint32 senderData = conf->senderData;
+  const Uint32 tableType = conf->tableType;
+  const Uint32 tableId = conf->tableId;
 
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, senderData);
@@ -2861,28 +2780,15 @@
   signal->getSection(dictTabInfoPtr, GetTabInfoConf::DICT_TAB_INFO);
   ndbrequire(dictTabInfoPtr.sz == len);
 
-  /**
-   * No of pages needed
-   */
-  const Uint32 noPages = (len + sizeof(Page32) - 1) / sizeof(Page32);
-  if(ptr.p->pages.getSize() < noPages) {
-    jam();
-    ptr.p->pages.release();
-    if(ptr.p->pages.seize(noPages) == false) {
-      jam();
-      ptr.p->setErrorCode(DefineBackupRef::FailedAllocateTableMem);
-      ndbrequire(false);
-      releaseSections(signal);
-      defineBackupRef(signal, ptr);
-      return;
-    }//if
-  }//if
-  
+  TablePtr tabPtr ;
+  ndbrequire(findTable(ptr, tabPtr, tableId));
+
   BackupFilePtr filePtr;
   ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
   FsBuffer & buf = filePtr.p->operation.dataBuffer;
+  Uint32* dst = 0;
   { // Write into ctl file
-    Uint32* dst, dstLen = len + 2;
+    Uint32 dstLen = len + 3;
     if(!buf.getWritePtr(&dst, dstLen)) {
       jam();
       ndbrequire(false);
@@ -2897,61 +2803,77 @@
       BackupFormat::CtlFile::TableDescription * desc = 
         (BackupFormat::CtlFile::TableDescription*)dst;
       desc->SectionType = htonl(BackupFormat::TABLE_DESCRIPTION);
-      desc->SectionLength = htonl(len + 2);
-      dst += 2;
-
+      desc->SectionLength = htonl(len + 3);
+      desc->TableType = htonl(tableType);
+      dst += 3;
+      
       copy(dst, dictTabInfoPtr);
       buf.updateWritePtr(dstLen);
     }//if
   }
-  
-  ndbrequire(ptr.p->pages.getSize() >= noPages);
-  Page32Ptr pagePtr;
-  ptr.p->pages.getPtr(pagePtr, 0);
-  copy(&pagePtr.p->data[0], dictTabInfoPtr);
+
   releaseSections(signal);
-  
+
   if(ptr.p->checkError()) {
     jam();
     defineBackupRef(signal, ptr);
     return;
   }//if
 
-  TablePtr tabPtr = parseTableDescription(signal, ptr, len);
-  if(tabPtr.i == RNIL) {
+  if (!DictTabInfo::isTable(tabPtr.p->tableType))
+  {
     jam();
-    defineBackupRef(signal, ptr);
-    return;
-  }//if
 
-  TablePtr tmp = tabPtr;
-  ptr.p->tables.next(tabPtr);
-  if(DictTabInfo::isIndex(tmp.p->tableType))
+    TablePtr tmp = tabPtr;
+    ptr.p->tables.next(tabPtr);
+    ptr.p->tables.release(tmp);
+    goto next;
+  }
+  
+  if (!parseTableDescription(signal, ptr, tabPtr, dst, len))
   {
     jam();
-    ptr.p->tables.release(tmp);
+    defineBackupRef(signal, ptr);
+    return;
   }
-  else
+  
+  if(!ptr.p->is_lcp())
   {
     jam();
-    signal->theData[0] = tmp.p->tableId;
+    signal->theData[0] = tabPtr.p->tableId;
     signal->theData[1] = 1; // lock
     EXECUTE_DIRECT(DBDICT, GSN_BACKUP_FRAGMENT_REQ, signal, 2);
   }
-
-  if(tabPtr.i == RNIL) {
+  
+  ptr.p->tables.next(tabPtr);
+  
+next:
+  if(tabPtr.i == RNIL) 
+  {
+    /**
+     * Done with all tables...
+     */
     jam();
     
-    ptr.p->pages.release();
+    if(ptr.p->is_lcp())
+    {
+      lcp_open_file_done(signal, ptr);
+      return;
+    }
     
     ndbrequire(ptr.p->tables.first(tabPtr));
-    signal->theData[0] = RNIL;
-    signal->theData[1] = tabPtr.p->tableId;
-    signal->theData[2] = ptr.i;
-    sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
+    DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+    req->m_connectionData = RNIL;
+    req->m_tableRef = tabPtr.p->tableId;
+    req->m_senderData = ptr.i;
+    sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 
+               DihFragCountReq::SignalLength, JBB);
     return;
   }//if
 
+  /**
+   * Fetch next table...
+   */
   signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
   signal->theData[1] = ptr.i;
   signal->theData[2] = tabPtr.i;
@@ -2959,14 +2881,14 @@
   return;
 }
 
-Backup::TablePtr
-Backup::parseTableDescription(Signal* signal, BackupRecordPtr ptr, Uint32 len)
+bool
+Backup::parseTableDescription(Signal* signal, 
+			      BackupRecordPtr ptr, 
+			      TablePtr tabPtr, 
+			      const Uint32 * tabdescptr,
+			      Uint32 len)
 {
-
-  Page32Ptr pagePtr;
-  ptr.p->pages.getPtr(pagePtr, 0);
-  
-  SimplePropertiesLinearReader it(&pagePtr.p->data[0], len);
+  SimplePropertiesLinearReader it(tabdescptr, len);
   
   it.first();
   
@@ -2977,13 +2899,15 @@
 				  DictTabInfo::TableMappingSize, 
 				  true, true);
   ndbrequire(stat == SimpleProperties::Break);
+
+  bool lcp = ptr.p->is_lcp();
+
+  ndbrequire(tabPtr.p->tableId == tmpTab.TableId);
+  ndbrequire(lcp || (tabPtr.p->tableType == tmpTab.TableType));
   
-  TablePtr tabPtr;
-  ndbrequire(findTable(ptr, tabPtr, tmpTab.TableId));
-  if(DictTabInfo::isIndex(tabPtr.p->tableType)){
-    jam();
-    return tabPtr;
-  }
+  /**
+   * LCP should not save disk attributes but only mem attributes
+   */
   
   /**
    * Initialize table object
@@ -3000,13 +2924,7 @@
   tabPtr.p->triggerAllocated[1] = false;
   tabPtr.p->triggerAllocated[2] = false;
 
-  if(tabPtr.p->attributes.seize(tabPtr.p->noOfAttributes) == false) {
-    jam();
-    ptr.p->setErrorCode(DefineBackupRef::FailedToAllocateAttributeRecord);
-    tabPtr.i = RNIL;
-    return tabPtr;
-  }//if
-  
+  Uint32 disk = 0;
   const Uint32 count = tabPtr.p->noOfAttributes;
   for(Uint32 i = 0; i<count; i++) {
     jam();
@@ -3017,59 +2935,113 @@
 				    true, true);
     
     ndbrequire(stat == SimpleProperties::Break);
+    it.next(); // Move Past EndOfAttribute
 
     const Uint32 arr = tmp.AttributeArraySize;
     const Uint32 sz = 1 << tmp.AttributeSize;
     const Uint32 sz32 = (sz * arr + 31) >> 5;
 
+    if(lcp && tmp.AttributeStorageType == NDB_STORAGETYPE_DISK)
+    {
+      disk++;
+      continue;
+    }
+
     AttributePtr attrPtr;
-    tabPtr.p->attributes.getPtr(attrPtr, tmp.AttributeId);
+    if(!tabPtr.p->attributes.seize(attrPtr))
+    {
+      jam();
+      ptr.p->setErrorCode(DefineBackupRef::FailedToAllocateAttributeRecord);
+      return false;
+    }
     
-    attrPtr.p->data.nullable = tmp.AttributeNullableFlag;
-    attrPtr.p->data.fixed = (tmp.AttributeArraySize != 0);
-    attrPtr.p->data.sz32 = sz32;
+    attrPtr.p->data.m_flags = 0;
+    attrPtr.p->data.attrId = tmp.AttributeId;
 
+    attrPtr.p->data.m_flags |= 
+      (tmp.AttributeNullableFlag ? Attribute::COL_NULLABLE : 0);
+    attrPtr.p->data.m_flags |= (tmp.AttributeArrayType == NDB_ARRAYTYPE_FIXED)?
+      Attribute::COL_FIXED : 0;
+    attrPtr.p->data.sz32 = sz32;
+    
     /**
-     * Either
-     * 1) Fixed
-     * 2) Nullable
-     * 3) Variable
+     * 1) Fixed non-nullable
+     * 2) Other
      */
-    if(attrPtr.p->data.fixed == true && attrPtr.p->data.nullable == false) {
+    if(attrPtr.p->data.m_flags & Attribute::COL_FIXED && 
+       !(attrPtr.p->data.m_flags & Attribute::COL_NULLABLE)) {
       jam();
       attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
       tabPtr.p->sz_FixedAttributes += sz32;
-    }//if
-    
-    if(attrPtr.p->data.fixed == true && attrPtr.p->data.nullable == true) {
-      jam();
-      attrPtr.p->data.offset = 0;
-      
-      attrPtr.p->data.offsetNull = tabPtr.p->noOfNull;
-      tabPtr.p->noOfNull++;
-      tabPtr.p->noOfVariable++;
-    }//if
-    
-    if(attrPtr.p->data.fixed == false) {
-      jam();
+    } else {
+      attrPtr.p->data.offset = ~0;
       tabPtr.p->noOfVariable++;
-      ndbrequire(0);
-    }//if
-    
-    it.next(); // Move Past EndOfAttribute
+    }
   }//for
-  return tabPtr;
+
+
+  if(lcp)
+  {
+    if (disk)
+    {
+      /**
+       * Remove all disk attributes, but add DISK_REF (8 bytes)
+       */
+      tabPtr.p->noOfAttributes -= (disk - 1);
+      
+      AttributePtr attrPtr;
+      ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+      
+      Uint32 sz32 = 2;
+      attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
+      attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+      attrPtr.p->data.sz32 = 2;
+      
+      attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+      tabPtr.p->sz_FixedAttributes += sz32;
+    }
+   
+    {
+      AttributePtr attrPtr;
+      ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+      
+      Uint32 sz32 = 2;
+      attrPtr.p->data.attrId = AttributeHeader::ROWID;
+      attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+      attrPtr.p->data.sz32 = 2;
+      
+      attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+      tabPtr.p->sz_FixedAttributes += sz32;
+      tabPtr.p->noOfAttributes ++;
+    }
+
+    if (tmpTab.RowGCIFlag)
+    {
+      AttributePtr attrPtr;
+      ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+      
+      Uint32 sz32 = 2;
+      attrPtr.p->data.attrId = AttributeHeader::ROW_GCI;
+      attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+      attrPtr.p->data.sz32 = 2;
+      
+      attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+      tabPtr.p->sz_FixedAttributes += sz32;
+      tabPtr.p->noOfAttributes ++;
+    }
+  }
+  return true;
 }
 
 void
 Backup::execDI_FCOUNTCONF(Signal* signal)
 {
   jamEntry();
-  
-  const Uint32 userPtr = signal->theData[0];
-  const Uint32 fragCount = signal->theData[1];
-  const Uint32 tableId = signal->theData[2];
-  const Uint32 senderData = signal->theData[3];
+  DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+  const Uint32 userPtr = conf->m_connectionData;
+  const Uint32 fragCount = conf->m_fragmentCount;
+  const Uint32 tableId = conf->m_tableRef;
+  const Uint32 senderData = conf->m_senderData;
 
   ndbrequire(userPtr == RNIL && signal->length() == 5);
   
@@ -3087,6 +3059,7 @@
     fragPtr.p->scanned = 0;
     fragPtr.p->scanning = 0;
     fragPtr.p->tableId = tableId;
+    fragPtr.p->fragmentId = i;
     fragPtr.p->node = RNIL;
   }//for
   
@@ -3095,10 +3068,12 @@
    */
   if(ptr.p->tables.next(tabPtr)) {
     jam();
-    signal->theData[0] = RNIL;
-    signal->theData[1] = tabPtr.p->tableId;
-    signal->theData[2] = ptr.i;
-    sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);    
+    DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+    req->m_connectionData = RNIL;
+    req->m_tableRef = tabPtr.p->tableId;
+    req->m_senderData = ptr.i;
+    sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 
+               DihFragCountReq::SignalLength, JBB);
     return;
   }//if
   
@@ -3158,7 +3133,7 @@
 
   FragmentPtr fragPtr;
   tabPtr.p->fragments.getPtr(fragPtr, fragNo);
-
+  
   fragPtr.p->node = signal->theData[2];
 
   getFragmentInfo(signal, ptr, tabPtr, fragNo + 1);
@@ -3188,63 +3163,22 @@
   jamEntry();
 
   CRASH_INSERTION((10015));
-  
+
   StartBackupReq* req = (StartBackupReq*)signal->getDataPtr();
   const Uint32 ptrI = req->backupPtr;
-  //const Uint32 backupId = req->backupId;
-  const Uint32 signalNo = req->signalNo;
-  
+
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, ptrI);
-  
+
   ptr.p->slaveState.setState(STARTED);
   ptr.p->m_gsn = GSN_START_BACKUP_REQ;
 
-  for(Uint32 i = 0; i<req->noOfTableTriggers; i++) {
-    jam();
-    TablePtr tabPtr;
-    ndbrequire(findTable(ptr, tabPtr, req->tableTriggers[i].tableId));
-    for(Uint32 j = 0; j<3; j++) {
-      jam();
-      const Uint32 triggerId = req->tableTriggers[i].triggerIds[j];
-      tabPtr.p->triggerIds[j] = triggerId;
-      
-      TriggerPtr trigPtr;
-      if(!ptr.p->triggers.seizeId(trigPtr, triggerId)) {
-        jam();
-	ptr.p->m_gsn = GSN_START_BACKUP_REF;
-	StartBackupRef* ref = (StartBackupRef*)signal->getDataPtrSend();
-	ref->backupPtr = ptr.i;
-	ref->backupId = ptr.p->backupId;
-	ref->signalNo = signalNo;
-	ref->errorCode = StartBackupRef::FailedToAllocateTriggerRecord;
-	ref->nodeId = getOwnNodeId();
-	sendSignal(ptr.p->masterRef, GSN_START_BACKUP_REF, signal,
-		   StartBackupRef::SignalLength, JBB);
-	return;
-      }//if
-
-      tabPtr.p->triggerAllocated[j] = true;
-      trigPtr.p->backupPtr = ptr.i;
-      trigPtr.p->tableId = tabPtr.p->tableId;
-      trigPtr.p->tab_ptr_i = tabPtr.i;
-      trigPtr.p->logEntry = 0;
-      trigPtr.p->event = j;
-      trigPtr.p->maxRecordSize = 4096;
-      trigPtr.p->operation = 
-	&ptr.p->files.getPtr(ptr.p->logFilePtr)->operation;
-      trigPtr.p->operation->noOfBytes = 0;
-      trigPtr.p->operation->noOfRecords = 0;
-      trigPtr.p->errorCode = 0;
-    }//for
-  }//for
-  
   /**
    * Start file threads...
    */
   BackupFilePtr filePtr;
-  for(ptr.p->files.first(filePtr); 
-      filePtr.i!=RNIL; 
+  for(ptr.p->files.first(filePtr);
+      filePtr.i!=RNIL;
       ptr.p->files.next(filePtr)){
     jam();
     if(filePtr.p->fileRunning == 0) {
@@ -3255,14 +3189,13 @@
       sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
     }//if
   }//for
-  
-  ptr.p->m_gsn = GSN_START_BACKUP_CONF;
-  StartBackupConf* conf = (StartBackupConf*)signal->getDataPtrSend();
-  conf->backupPtr = ptr.i;
-  conf->backupId = ptr.p->backupId;
-  conf->signalNo = signalNo;
-  sendSignal(ptr.p->masterRef, GSN_START_BACKUP_CONF, signal,
-	     StartBackupConf::SignalLength, JBB);
+
+  /**
+   * Tell DBTUP to create triggers
+   */
+  TablePtr tabPtr;
+  ndbrequire(ptr.p->tables.first(tabPtr));
+  sendCreateTrig(signal, ptr, tabPtr);
 }
 
 /*****************************************************************************
@@ -3283,7 +3216,7 @@
   const Uint32 tableId = req->tableId;
   const Uint32 fragNo = req->fragmentNo;
   const Uint32 count = req->count;
-  
+
   /**
    * Get backup record
    */
@@ -3320,7 +3253,7 @@
   ndbrequire(fragPtr.p->scanned == 0);
   ndbrequire(fragPtr.p->scanning == 0 || 
 	     refToNode(ptr.p->masterRef) == getOwnNodeId());
-  
+
   /**
    * Init operation
    */
@@ -3333,7 +3266,7 @@
   /**
    * Check for space in buffer
    */
-  if(!filePtr.p->operation.newFragment(tableId, fragNo)) {
+  if(!filePtr.p->operation.newFragment(tableId, fragPtr.p->fragmentId)) {
     jam();
     req->count = count + 1;
     sendSignalWithDelay(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal, 50,
@@ -3346,8 +3279,8 @@
    * Mark things as "in use"
    */
   fragPtr.p->scanning = 1;
-  filePtr.p->fragmentNo = fragNo;
-
+  filePtr.p->fragmentNo = fragPtr.p->fragmentId;
+  
   /**
    * Start scan
    */
@@ -3362,7 +3295,7 @@
     req->senderData = filePtr.i;
     req->resultRef = reference();
     req->schemaVersion = table.schemaVersion;
-    req->fragmentNoKeyLen = fragNo;
+    req->fragmentNoKeyLen = fragPtr.p->fragmentId;
     req->requestInfo = 0;
     req->savePointId = 0;
     req->tableId = table.tableId;
@@ -3371,6 +3304,12 @@
     ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
     ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
     ScanFragReq::setAttrLen(req->requestInfo,attrLen); 
+    if (ptr.p->is_lcp())
+    {
+      ScanFragReq::setScanPrio(req->requestInfo, 1);
+      ScanFragReq::setTupScanFlag(req->requestInfo, 1);
+      ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
+    }
     req->transId1 = 0;
     req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
     req->clientOpPtr= filePtr.i;
@@ -3391,13 +3330,20 @@
     signal->theData[7] = 0;
     
     Uint32 dataPos = 8;
-    Uint32 i;
-    for(i = 0; i<table.noOfAttributes; i++) {
+    Ptr<Attribute> attrPtr;
+    table.attributes.first(attrPtr);
+    for(; !attrPtr.isNull(); table.attributes.next(attrPtr))
+    {
       jam();
-      AttributePtr attr;
-      table.attributes.getPtr(attr, i);
       
-      AttributeHeader::init(&signal->theData[dataPos], i, 0);
+      /**
+       * LCP should not save disk attributes
+       */
+      ndbrequire(! (ptr.p->is_lcp() && 
+		    attrPtr.p->data.m_flags & Attribute::COL_DISK));
+      
+      AttributeHeader::init(&signal->theData[dataPos], 
+			    attrPtr.p->data.attrId, 0);
       dataPos++;
       if(dataPos == 25) {
         jam();
@@ -3444,65 +3390,57 @@
   op.attrSzTotal += dataLen;
 
   Uint32 srcSz = dataLen;
+  Uint32 usedSz = 0;
   const Uint32 * src = &signal->theData[3];
 
-  Uint32 * dst = op.dst;
-  Uint32 dstSz = op.attrSzLeft;
+  Ptr<Attribute> attrPtr;
+  table.attributes.first(attrPtr);
+  Uint32 columnNo = 0;
   
-  while(srcSz > 0) {
+  while (usedSz < srcSz) 
+  {
     jam();
-
-    if(dstSz == 0) {
-      jam();
-
-      /**
-       * Finished with one attribute now find next
-       */
-      const AttributeHeader attrHead(* src);
-      const Uint32 attrId = attrHead.getAttributeId();
-      const bool null = attrHead.isNULL();
-      const Attribute::Data attr = table.attributes.getPtr(attrId)->data;
-      
-      srcSz -= attrHead.getHeaderSize();
-      src   += attrHead.getHeaderSize();
-      
-      if(null) {
-	jam();
-	ndbrequire(attr.nullable);
-	op.nullAttribute(attr.offsetNull);
-	dstSz = 0;
-	continue;
-      }//if
+    
+    /**
+     * Finished with one attribute now find next
+     */
+    const AttributeHeader attrHead(* src);
+    const Uint32 attrId = attrHead.getAttributeId();
+    const bool null = attrHead.isNULL();
+    const Attribute::Data attr = attrPtr.p->data;
+    ndbrequire(attrId == attr.attrId);
+    
+    usedSz += attrHead.getHeaderSize();
+    src    += attrHead.getHeaderSize();
       
-      dstSz = attrHead.getDataSize();
-      ndbrequire(dstSz == attr.sz32);
-      if(attr.fixed && ! attr.nullable) {
-	jam();
-	dst = op.newAttrib(attr.offset, dstSz);
-      } else if (attr.fixed && attr.nullable) {
-	jam();
-	dst = op.newNullable(attrId, dstSz);
+    if (null) {
+      jam();
+      ndbrequire(attr.m_flags & Attribute::COL_NULLABLE);
+      op.nullVariable();
+    } else {
+      Uint32* dst;
+      Uint32 dstSz = attrHead.getDataSize();
+      if (attr.m_flags & Attribute::COL_FIXED && 
+         ! (attr.m_flags & Attribute::COL_NULLABLE)) {
+        jam();
+        dst = op.newAttrib(attr.offset, dstSz);
+        ndbrequire(dstSz == attr.sz32);
       } else {
-	ndbrequire(false);
-	//dst = op.newVariable(attrId, attrSize);
-      }//if
-    }//if
-    
-    const Uint32 szCopy = (dstSz > srcSz) ? srcSz : dstSz;
-    memcpy(dst, src, (szCopy << 2));
-
-    srcSz -= szCopy;
-    dstSz -= szCopy;
-    src   += szCopy;
-    dst   += szCopy;
-  }//while
-  op.dst        = dst;
-  op.attrSzLeft = dstSz;
-  
-  if(op.finished()){
-    jam();
-    op.newRecord(op.dst);
+        dst = op.newVariable(columnNo, attrHead.getByteSize());
+        ndbrequire(dstSz <= attr.sz32);
+      }
+      
+      memcpy(dst, src, (dstSz << 2));
+      src    += dstSz;
+      usedSz += dstSz;
+    }
+    table.attributes.next(attrPtr);
+    columnNo++;
   }
+  
+  ndbrequire(usedSz == srcSz);
+  ndbrequire(op.finished());
+  op.newRecord(op.dst);
 }
 
 void 
@@ -3697,6 +3635,7 @@
   conf->fragmentNo = filePtr.p->fragmentNo;
   conf->noOfRecords = op.noOfRecords;
   conf->noOfBytes = op.noOfBytes;
+
   sendSignal(ptr.p->masterRef, GSN_BACKUP_FRAGMENT_CONF, signal,
 	     BackupFragmentConf::SignalLength, JBB);
   
@@ -3808,7 +3747,7 @@
 Backup::execFSAPPENDCONF(Signal* signal)
 {
   jamEntry();
-  
+
   CRASH_INSERTION((10018));
 
   //FsConf * conf = (FsConf*)signal->getDataPtr();
@@ -3911,10 +3850,13 @@
   Uint32 result;
 
   jamEntry();
+
   c_triggerPool.getPtr(trigPtr, trigger_id);
+
   c_tablePool.getPtr(tabPtr, trigPtr.p->tab_ptr_i);
   tabPtr.p->fragments.getPtr(fragPtr, frag_id);
   if (fragPtr.p->node != getOwnNodeId()) {
+
     jam();
     result = ZFALSE;
   } else {
@@ -3935,12 +3877,12 @@
   TriggerPtr trigPtr;
   c_triggerPool.getPtr(trigPtr, trg->getTriggerId());
   ndbrequire(trigPtr.p->event != ILLEGAL_TRIGGER_ID); // Online...
-  
+
   if(trigPtr.p->errorCode != 0) {
     jam();
     return;
   }//if
-  
+
   if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES) {
     jam();
     /**
@@ -3977,18 +3919,29 @@
       memcpy(signal->getDataPtrSend(), save, 4*TrigAttrInfo::StaticLength);
       return;
     }//if
-        
+
     logEntry = (BackupFormat::LogFile::LogEntry *)dst;
     trigPtr.p->logEntry = logEntry;
     logEntry->Length       = 0;
     logEntry->TableId      = htonl(trigPtr.p->tableId);
-    logEntry->TriggerEvent = htonl(trigPtr.p->event);
+
+
+    if(trigPtr.p->event==0)
+      logEntry->TriggerEvent= htonl(TriggerEvent::TE_INSERT);
+    else if(trigPtr.p->event==1)
+      logEntry->TriggerEvent= htonl(TriggerEvent::TE_UPDATE);
+    else if(trigPtr.p->event==2)
+      logEntry->TriggerEvent= htonl(TriggerEvent::TE_DELETE);
+    else {
+      ndbout << "Bad Event: " << trigPtr.p->event << endl;
+      ndbrequire(false);
+    }
   } else {
     ndbrequire(logEntry->TableId == htonl(trigPtr.p->tableId));
-    ndbrequire(logEntry->TriggerEvent == htonl(trigPtr.p->event));
+//    ndbrequire(logEntry->TriggerEvent == htonl(trigPtr.p->event));
   }//if
-  
-  const Uint32 pos = logEntry->Length; 
+
+  const Uint32 pos = logEntry->Length;
   const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
   memcpy(&logEntry->Data[pos], trg->getData(), dataLen << 2);
 
@@ -4003,6 +3956,7 @@
 
   const Uint32 gci = trg->getGCI();
   const Uint32 trI = trg->getTriggerId();
+  const Uint32 fragId = trg->fragId;
 
   TriggerPtr trigPtr;
   c_triggerPool.getPtr(trigPtr, trI);
@@ -4016,19 +3970,19 @@
 
   ndbrequire(trigPtr.p->logEntry != 0);
   Uint32 len = trigPtr.p->logEntry->Length;
+  trigPtr.p->logEntry->FragId = htonl(fragId);
 
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, trigPtr.p->backupPtr);
-  if(gci != ptr.p->currGCP) 
+  if(gci != ptr.p->currGCP)
   {
     jam();
-    
-    trigPtr.p->logEntry->TriggerEvent = htonl(trigPtr.p->event | 0x10000);
+    trigPtr.p->logEntry->TriggerEvent|= htonl(0x10000);
     trigPtr.p->logEntry->Data[len] = htonl(gci);
-    len ++;
+    len++;
     ptr.p->currGCP = gci;
-  }//if
-  
+  }
+
   len += (sizeof(BackupFormat::LogFile::LogEntry) >> 2) - 2;
   trigPtr.p->logEntry->Length = htonl(len);
 
@@ -4084,7 +4038,7 @@
    * At least one GCP must have passed
    */
   ndbrequire(stopGCP > startGCP);
-  
+
   /**
    * Get backup record
    */
@@ -4093,50 +4047,13 @@
 
   ptr.p->slaveState.setState(STOPPING);
   ptr.p->m_gsn = GSN_STOP_BACKUP_REQ;
+  ptr.p->startGCP= startGCP;
+  ptr.p->stopGCP= stopGCP;
 
   /**
-   * Insert footers
+   * Destroy the triggers in local DBTUP we created
    */
-  {
-    BackupFilePtr filePtr;
-    ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
-    Uint32 * dst;
-    ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, 1));
-    * dst = 0;
-    filePtr.p->operation.dataBuffer.updateWritePtr(1);
-  }
-
-  {
-    BackupFilePtr filePtr;
-    ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
-    
-    const Uint32 gcpSz = sizeof(BackupFormat::CtlFile::GCPEntry) >> 2;
-    
-    Uint32 * dst;
-    ndbrequire(filePtr.p->operation.dataBuffer.getWritePtr(&dst, gcpSz));
-    
-    BackupFormat::CtlFile::GCPEntry * gcp = 
-      (BackupFormat::CtlFile::GCPEntry*)dst;
-    
-    gcp->SectionType   = htonl(BackupFormat::GCP_ENTRY);
-    gcp->SectionLength = htonl(gcpSz);
-    gcp->StartGCP      = htonl(startGCP);
-    gcp->StopGCP       = htonl(stopGCP - 1);
-    filePtr.p->operation.dataBuffer.updateWritePtr(gcpSz);
-  }
-
-  {
-    TablePtr tabPtr;
-    for(ptr.p->tables.first(tabPtr); tabPtr.i != RNIL;
-	ptr.p->tables.next(tabPtr))
-    {
-      signal->theData[0] = tabPtr.p->tableId;
-      signal->theData[1] = 0; // unlock
-      EXECUTE_DIRECT(DBDICT, GSN_BACKUP_FRAGMENT_REQ, signal, 2);
-    }
-  }
-  
-  closeFiles(signal, ptr);
+  sendDropTrig(signal, ptr);
 }
 
 void
@@ -4234,6 +4151,7 @@
   ndbrequire(filePtr.p->scanRunning == 0);	     
   
   filePtr.p->fileOpened = 0;
+  filePtr.p->operation.dataBuffer.reset();
   
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
@@ -4255,6 +4173,12 @@
 Backup::closeFilesDone(Signal* signal, BackupRecordPtr ptr)
 {
   jam();
+
+  if(ptr.p->is_lcp())
+  {
+    lcp_close_file_conf(signal, ptr);
+    return;
+  }
   
   jam();
   BackupFilePtr filePtr;
@@ -4395,19 +4319,10 @@
   ptr.p->masterRef = reference();
   ptr.p->nodes.clear();
   ptr.p->nodes.set(getOwnNodeId());
-  
-  if(ref == reference())
-  {
-    ptr.p->stopGCP= ptr.p->startGCP + 1;
-    sendDropTrig(signal, ptr);
-  }
-  else
-  {
-    ptr.p->masterData.gsn = GSN_STOP_BACKUP_REQ;
-    ptr.p->masterData.sendCounter.clearWaitingFor();
-    ptr.p->masterData.sendCounter.setWaitingFor(getOwnNodeId());
-    closeFiles(signal, ptr);
-  }
+
+
+  ptr.p->stopGCP= ptr.p->startGCP + 1;
+  sendStopBackup(signal, ptr);
 }
 
 
@@ -4499,7 +4414,6 @@
   ptr.p->files.release();
   ptr.p->tables.release();
   ptr.p->triggers.release();
-  ptr.p->pages.release();
   ptr.p->backupId = ~0;
   
   if(ptr.p->checkError())
@@ -4554,3 +4468,203 @@
   c_backups.release(ptr);
 }
 
+/**
+ * LCP
+ */
+void
+Backup::execLCP_PREPARE_REQ(Signal* signal)
+{
+  jamEntry();
+  LcpPrepareReq req = *(LcpPrepareReq*)signal->getDataPtr();
+
+  BackupRecordPtr ptr;
+  c_backupPool.getPtr(ptr, req.backupPtr);
+
+  bool first= true;
+  TablePtr tabPtr;
+  if(ptr.p->tables.first(tabPtr) && tabPtr.p->tableId != req.tableId)
+  {
+    jam();
+    first= false;
+    tabPtr.p->attributes.release();
+    tabPtr.p->fragments.release();
+    ptr.p->tables.release();
+    ptr.p->errorCode = 0;
+  } 
+
+  if(ptr.p->tables.first(tabPtr) && ptr.p->errorCode == 0)
+  {
+    jam();
+    FragmentPtr fragPtr;
+    tabPtr.p->fragments.getPtr(fragPtr, 0);
+    fragPtr.p->fragmentId = req.fragmentId;
+    
+    lcp_open_file_done(signal, ptr);
+    return;
+  } 
+  else if(ptr.p->errorCode == 0)
+  {
+    jam();
+    FragmentPtr fragPtr;
+    if(!ptr.p->tables.seize(tabPtr) || !tabPtr.p->fragments.seize(1))
+    {
+      if(!tabPtr.isNull())
+	ptr.p->tables.release();
+      ndbrequire(false); // TODO
+    }
+    tabPtr.p->tableId = req.tableId;
+    tabPtr.p->fragments.getPtr(fragPtr, 0);
+    tabPtr.p->tableType = DictTabInfo::UserTable;
+    fragPtr.p->fragmentId = req.fragmentId;
+    fragPtr.p->lcp_no = req.lcpNo;
+    fragPtr.p->scanned = 0;
+    fragPtr.p->scanning = 0;
+    fragPtr.p->tableId = req.tableId;
+  } 
+  else
+  {
+    jam();
+    FragmentPtr fragPtr;
+    tabPtr.p->fragments.getPtr(fragPtr, 0);
+    fragPtr.p->fragmentId = req.fragmentId;
+    defineBackupRef(signal, ptr, ptr.p->errorCode);
+    return;
+  }
+  
+  if(first)
+  {
+    jam();
+    // start file thread
+    ptr.p->backupId= req.backupId;
+    lcp_open_file(signal, ptr);
+    return;
+  }
+  else
+  {
+    jam();
+    ndbrequire(ptr.p->backupId == req.backupId);
+  }
+  
+  /**
+   * Close previous file
+   */
+  jam();
+  BackupFilePtr filePtr;
+  c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+  filePtr.p->fileClosing = 1;
+  filePtr.p->operation.dataBuffer.eof();
+}
+
+void
+Backup::lcp_close_file_conf(Signal* signal, BackupRecordPtr ptr)
+{
+  if(!ptr.p->tables.isEmpty())
+  {
+    jam();
+    lcp_open_file(signal, ptr);
+    return;
+  }
+  
+  lcp_send_end_lcp_conf(signal, ptr);
+}
+
+void
+Backup::lcp_open_file(Signal* signal, BackupRecordPtr ptr)
+{
+  FsOpenReq * req = (FsOpenReq *)signal->getDataPtrSend();
+  req->userReference = reference();
+  req->fileFlags = 
+    FsOpenReq::OM_WRITEONLY | 
+    FsOpenReq::OM_TRUNCATE |
+    FsOpenReq::OM_CREATE | 
+    FsOpenReq::OM_APPEND;
+  FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF);
+  
+  TablePtr tabPtr;
+  FragmentPtr fragPtr;
+  
+  ndbrequire(ptr.p->tables.first(tabPtr));
+  tabPtr.p->fragments.getPtr(fragPtr, 0);
+
+  /**
+   * Lcp file
+   */
+  BackupFilePtr filePtr;
+  c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+  ndbrequire(filePtr.p->fileRunning == 0);
+  filePtr.p->fileClosing = 0;
+  filePtr.p->fileRunning = 1;
+  
+  req->userPointer = filePtr.i;
+  FsOpenReq::setVersion(req->fileNumber, 5);
+  FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
+  FsOpenReq::v5_setLcpNo(req->fileNumber, fragPtr.p->lcp_no);
+  FsOpenReq::v5_setTableId(req->fileNumber, tabPtr.p->tableId);
+  sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
+}
+
+void
+Backup::lcp_open_file_done(Signal* signal, BackupRecordPtr ptr)
+{
+  TablePtr tabPtr;
+  FragmentPtr fragPtr;
+
+  ndbrequire(ptr.p->tables.first(tabPtr));
+  tabPtr.p->fragments.getPtr(fragPtr, 0);
+  
+  ptr.p->slaveState.setState(STARTED);
+  
+  LcpPrepareConf* conf= (LcpPrepareConf*)signal->getDataPtrSend();
+  conf->senderData = ptr.p->clientData;
+  conf->senderRef = reference();
+  conf->tableId = tabPtr.p->tableId;
+  conf->fragmentId = fragPtr.p->fragmentId;
+  sendSignal(ptr.p->masterRef, GSN_LCP_PREPARE_CONF, 
+	     signal, LcpPrepareConf::SignalLength, JBB);
+}
+
+void
+Backup::execEND_LCPREQ(Signal* signal)
+{
+  EndLcpReq* req= (EndLcpReq*)signal->getDataPtr();
+
+  BackupRecordPtr ptr;
+  c_backupPool.getPtr(ptr, req->backupPtr);
+  ndbrequire(ptr.p->backupId == req->backupId);
+
+  ptr.p->slaveState.setState(STOPPING);
+
+  TablePtr tabPtr;
+  if(ptr.p->tables.first(tabPtr))
+  {
+    tabPtr.p->attributes.release();
+    tabPtr.p->fragments.release();
+    ptr.p->tables.release();
+  
+    BackupFilePtr filePtr;
+    c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
+    filePtr.p->fileClosing = 1;
+    filePtr.p->operation.dataBuffer.eof();
+    return;
+  } 
+  
+  lcp_send_end_lcp_conf(signal, ptr);
+}
+
+void
+Backup::lcp_send_end_lcp_conf(Signal* signal, BackupRecordPtr ptr)
+{
+  EndLcpConf* conf= (EndLcpConf*)signal->getDataPtr();
+
+  conf->senderData = ptr.p->clientData;
+  conf->senderRef = reference();
+  
+  ptr.p->errorCode = 0;
+  ptr.p->slaveState.setState(CLEANING);
+  ptr.p->slaveState.setState(INITIAL);
+  ptr.p->slaveState.setState(DEFINING);
+  ptr.p->slaveState.setState(DEFINED);
+
+  sendSignal(ptr.p->masterRef, GSN_END_LCPCONF,
+	     signal, EndLcpConf::SignalLength, JBB);
+}
Thread
bk commit into 5.1 tree (bell:1.2198)sanja17 Mar