List: Internals  « Previous Message | Next Message »
From: Martin Skold  Date: November 7 2005 10:57am
Subject: bk commit into 5.1 tree (mskold:1.1892)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of marty. When marty does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1892 05/11/07 11:57:42 mskold@stripped +4 -0
  Merge mysql.com:/usr/local/home/marty/MySQL/mysql-5.1-opt
  into  mysql.com:/usr/local/home/marty/MySQL/mysql-5.1

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
    1.21 05/11/07 11:57:34 mskold@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
    1.57 05/11/07 11:57:34 mskold@stripped +0 -0
    Auto merged

  sql/ha_ndbcluster.h
    1.97 05/11/07 11:57:34 mskold@stripped +0 -0
    Auto merged

  sql/ha_ndbcluster.cc
    1.212 05/11/07 11:57:33 mskold@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	mskold
# Host:	blowfish.ndb.mysql.com
# Root:	/usr/local/home/marty/MySQL/mysql-5.1/RESYNC

--- 1.56/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2005-09-28 11:05:28 +02:00
+++ 1.57/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2005-11-07 11:57:34 +01:00
@@ -3255,8 +3255,8 @@
   lreq->gci = tablePtr.p->gciTableCreated;
   lreq->requestType = AlterTabReq::AlterTablePrepare;
   
-  sendSignal(rg, GSN_ALTER_TAB_REQ, signal, 
-	     AlterTabReq::SignalLength, JBB);
+  sendFragmentedSignal(rg, GSN_ALTER_TAB_REQ, signal, 
+		       AlterTabReq::SignalLength, JBB);
 }
 
 void Dbdict::alterTableRef(Signal * signal, 
@@ -3740,8 +3740,8 @@
 	lreq->gci = gci;
 	lreq->requestType = AlterTabReq::AlterTableCommit;
 	
-	sendSignal(rg, GSN_ALTER_TAB_REQ, signal, 
-		   AlterTabReq::SignalLength, JBB);
+	sendFragmentedSignal(rg, GSN_ALTER_TAB_REQ, signal, 
+			     AlterTabReq::SignalLength, JBB);
       }
     }
     else {
@@ -4439,6 +4439,44 @@
 
   sendSignal(DBDIH_REF, GSN_DIADDTABREQ, signal, 
 	     DiAddTabReq::SignalLength, JBB);
+
+  /**
+   * Create KeyDescriptor
+   */
+  KeyDescriptor* desc= g_key_descriptor_pool.getPtr(tabPtr.i);
+  new (desc) KeyDescriptor();
+
+  Uint32 key = 0;
+  Uint32 tAttr = tabPtr.p->firstAttribute;
+  while (tAttr != RNIL) 
+  {
+    jam();
+    AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr);
+    if (aRec->tupleKey) 
+    {
+      desc->noOfKeyAttr ++;
+      desc->keyAttr[key].attributeDescriptor = aRec->attributeDescriptor;
+      
+      Uint32 csNumber = (aRec->extPrecision >> 16);
+      if(csNumber)
+      {
+	desc->keyAttr[key].charsetInfo = all_charsets[csNumber];
+	ndbrequire(all_charsets[csNumber]);
+	desc->hasCharAttr = 1;
+      }
+      else
+      {
+	desc->keyAttr[key].charsetInfo = 0;	  
+      }
+      if(AttributeDescriptor::getDKey(aRec->attributeDescriptor))
+      {
+	desc->noOfDistrKeys ++;
+      }
+      key++;
+    }
+    tAttr = aRec->nextAttrInTable;
+  }
+  ndbrequire(key == tabPtr.p->noOfPrimkey);
 }
 
 static
@@ -4541,44 +4579,6 @@
     sendSignal(DBLQH_REF, GSN_LQHFRAGREQ, signal, 
 	       LqhFragReq::SignalLength, JBB);
   }
-
-  /**
-   * Create KeyDescriptor
-   */
-  KeyDescriptor* desc= g_key_descriptor_pool.getPtr(tabPtr.i);
-  new (desc) KeyDescriptor();
-
-  Uint32 key = 0;
-  Uint32 tAttr = tabPtr.p->firstAttribute;
-  while (tAttr != RNIL) 
-  {
-    jam();
-    AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr);
-    if (aRec->tupleKey) 
-    {
-      desc->noOfKeyAttr ++;
-      desc->keyAttr[key].attributeDescriptor = aRec->attributeDescriptor;
-      
-      Uint32 csNumber = (aRec->extPrecision >> 16);
-      if(csNumber)
-      {
-	desc->keyAttr[key].charsetInfo = all_charsets[csNumber];
-	ndbrequire(all_charsets[csNumber]);
-	desc->hasCharAttr = 1;
-      }
-      else
-      {
-	desc->keyAttr[key].charsetInfo = 0;	  
-      }
-      if(AttributeDescriptor::getDKey(aRec->attributeDescriptor))
-      {
-	desc->noOfDistrKeys ++;
-      }
-      key++;
-    }
-    tAttr = aRec->nextAttrInTable;
-  }
-  ndbrequire(key == tabPtr.p->noOfPrimkey);
 }
 
 void
@@ -5440,7 +5440,10 @@
 /* ---------------------------------------------------------------- */
 // Error Handling code needed
 /* ---------------------------------------------------------------- */
-  progError(ref->errorCode, 0);
+  char buf[32];
+  BaseString::snprintf(buf, sizeof(buf), "WAIT_GCP_REF ErrorCode=%d",
+		       ref->errorCode);
+  progError(__LINE__, NDBD_EXIT_NDBREQUIRE, buf);
 }//execWAIT_GCP_REF()
 
 
@@ -12164,7 +12167,8 @@
 */
 
 void
-Dbdict::getTableKeyList(TableRecordPtr tablePtr, AttributeList& list)
+Dbdict::getTableKeyList(TableRecordPtr tablePtr, 
+			Id_array<MAX_ATTRIBUTES_IN_INDEX+1>& list)
 {
   jam();
   list.sz = 0;
@@ -12175,6 +12179,7 @@
       list.id[list.sz++] = aRec->attributeId;
     tAttr = aRec->nextAttrInTable;
   }
+  ndbrequire(list.sz <= MAX_ATTRIBUTES_IN_INDEX + 1);
 }
 
 // XXX should store the primary attribute id

--- 1.20/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2005-09-16 09:32:21 +02:00
+++ 1.21/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2005-11-07 11:57:34 +01:00
@@ -1268,7 +1268,7 @@
     // original request plus buffer for attribute lists
     BuildIndxReq m_request;
     AttributeList m_attrList;
-    AttributeList m_tableKeyList;
+    Id_array<MAX_ATTRIBUTES_IN_INDEX+1> m_tableKeyList;
     // coordinator DICT
     Uint32 m_coordinatorRef;
     bool m_isMaster;
@@ -2053,7 +2053,8 @@
   void alterTrigger_sendSlaveReq(Signal* signal, OpAlterTriggerPtr opPtr);
   void alterTrigger_sendReply(Signal* signal, OpAlterTriggerPtr opPtr, bool);
   // support
-  void getTableKeyList(TableRecordPtr tablePtr, AttributeList& list);
+  void getTableKeyList(TableRecordPtr, 
+		       Id_array<MAX_ATTRIBUTES_IN_INDEX+1>& list);
   void getIndexAttr(TableRecordPtr indexPtr, Uint32 itAttr, Uint32* id);
   void getIndexAttrList(TableRecordPtr indexPtr, AttributeList& list);
   void getIndexAttrMask(TableRecordPtr indexPtr, AttributeMask& mask);

--- 1.211/sql/ha_ndbcluster.cc	2005-09-28 11:05:28 +02:00
+++ 1.212/sql/ha_ndbcluster.cc	2005-11-07 11:57:33 +01:00
@@ -46,13 +46,18 @@
 static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
 
 static const char *ha_ndb_ext=".ndb";
+static const char share_prefix[]= "./";
 
 static int ndbcluster_close_connection(THD *thd);
 static int ndbcluster_commit(THD *thd, bool all);
 static int ndbcluster_rollback(THD *thd, bool all);
 
-static handlerton ndbcluster_hton = {
+handlerton ndbcluster_hton = {
   "ndbcluster",
+  SHOW_OPTION_YES,
+  "Clustered, fault-tolerant, memory-based tables", 
+  DB_TYPE_NDBCLUSTER,
+  ndbcluster_init,
   0, /* slot */
   0, /* savepoint size */
   ndbcluster_close_connection,
@@ -95,12 +100,15 @@
 }
 
 // Typedefs for long names
+typedef NdbDictionary::Object NDBOBJ;
 typedef NdbDictionary::Column NDBCOL;
 typedef NdbDictionary::Table NDBTAB;
 typedef NdbDictionary::Index  NDBINDEX;
 typedef NdbDictionary::Dictionary  NDBDICT;
+typedef NdbDictionary::Event  NDBEVENT;
 
-bool ndbcluster_inited= FALSE;
+static int ndbcluster_inited= 0;
+static int ndbcluster_util_inited= 0;
 
 static Ndb* g_ndb= NULL;
 static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
@@ -113,9 +121,12 @@
 
 static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
                                 my_bool not_used __attribute__((unused)));
-static void ndb_set_fragmentation(NDBTAB & tab, TABLE *table, uint pk_len);
-static NDB_SHARE *get_share(const char *table_name);
-static void free_share(NDB_SHARE *share);
+static NDB_SHARE *get_share(const char *key,
+                            bool create_if_not_exists= TRUE,
+                            bool have_lock= FALSE);
+static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
+static void real_free_share(NDB_SHARE **share);
+static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
 
 static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
 static int unpackfrm(const void **data, uint *len,
@@ -124,11 +135,38 @@
 static int ndb_get_table_statistics(Ndb*, const char *, 
                                     struct Ndb_statistics *);
 
+#ifndef DBUG_OFF
+void print_records(TABLE *table, const char *record)
+{
+  if (_db_on_)
+  {
+    for (uint j= 0; j < table->s->fields; j++)
+    {
+      char buf[40];
+      int pos= 0;
+      Field *field= table->field[j];
+      const byte* field_ptr= field->ptr - table->record[0] + record;
+      int pack_len= field->pack_length();
+      int n= pack_len < 10 ? pack_len : 10;
+      
+      for (int i= 0; i < n && pos < 20; i++)
+      {
+	pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
+      }
+      buf[pos]= 0;
+      DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
+    }
+  }
+}
+#else
+#define print_records(a,b)
+#endif
+
 // Util thread variables
 static pthread_t ndb_util_thread;
 pthread_mutex_t LOCK_ndb_util_thread;
 pthread_cond_t COND_ndb_util_thread;
-extern "C" pthread_handler_decl(ndb_util_thread_func, arg);
+pthread_handler_t ndb_util_thread_func(void *arg);
 ulong ndb_cache_check_time;
 
 /*
@@ -175,65 +213,70 @@
   {NullS, NullS, SHOW_LONG}
 };
 
+/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
+extern Uint64 g_latest_trans_gci;
+
 /*
   Error handling functions
 */
 
-struct err_code_mapping
-{
-  int ndb_err;
-  int my_err;
-  int show_warning;
-};
+/* Note for merge: old mapping table, moved to storage/ndb/ndberror.c */
 
-static const err_code_mapping err_map[]= 
+static int ndb_to_mysql_error(const NdbError *ndberr)
 {
-  { 626, HA_ERR_KEY_NOT_FOUND, 0 },
-  { 630, HA_ERR_FOUND_DUPP_KEY, 0 },
-  { 893, HA_ERR_FOUND_DUPP_KEY, 0 },
-  { 721, HA_ERR_TABLE_EXIST, 1 },
-  { 4244, HA_ERR_TABLE_EXIST, 1 },
-
-  { 709, HA_ERR_NO_SUCH_TABLE, 0 },
-
-  { 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
-  { 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
-  { 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
-  { 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
-  { 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
-
-  { 623, HA_ERR_RECORD_FILE_FULL, 1 },
-  { 624, HA_ERR_RECORD_FILE_FULL, 1 },
-  { 625, HA_ERR_RECORD_FILE_FULL, 1 },
-  { 826, HA_ERR_RECORD_FILE_FULL, 1 },
-  { 827, HA_ERR_RECORD_FILE_FULL, 1 },
-  { 832, HA_ERR_RECORD_FILE_FULL, 1 },
+  /* read the mysql mapped error code */
+  int error= ndberr->mysql_code;
 
-  { 284, HA_ERR_TABLE_DEF_CHANGED, 0 },
-
-  { 0, 1, 0 },
+  switch (error)
+  {
+    /* errors for which we do not add warnings, just return mapped error code
+    */
+  case HA_ERR_NO_SUCH_TABLE:
+  case HA_ERR_KEY_NOT_FOUND:
+  case HA_ERR_FOUND_DUPP_KEY:
+    return error;
 
-  { -1, -1, 1 }
-};
+    /* Mapping missing, go with the ndb error code*/
+  case -1:
+    error= ndberr->code;
+    break;
 
+    /* Mapping exists, go with the mapped code */
+  default:
+    break;
+  }
 
-static int ndb_to_mysql_error(const NdbError *err)
-{
-  uint i;
-  for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++);
-  if (err_map[i].show_warning)
-  {
-    // Push the NDB error message as warning
+  /*
+    Push the NDB error message as warning
+    - Used to be able to use SHOW WARNINGS to get more info on what the error is
+    - Used by replication to see if the error was temporary
+  */
+  if (ndberr->status == NdbError::TemporaryError)
     push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                        err->code, err->message, "NDB");
-  }
-  if (err_map[i].my_err == -1)
-    return err->code;
-  return err_map[i].my_err;
+			ER_GET_TEMPORARY_ERRMSG, ER(ER_GET_TEMPORARY_ERRMSG),
+			ndberr->code, ndberr->message, "NDB");
+  else
+    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+			ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+			ndberr->code, ndberr->message, "NDB");
+  return error;
 }
 
+int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
+{
+  int res= trans->execute(NdbTransaction::NoCommit,
+                          NdbTransaction::AO_IgnoreError,
+                          h->m_force_send);
+  if (res == 0)
+    return 0;
 
+  const NdbError &err= trans->getNdbError();
+  if (err.classification != NdbError::ConstraintViolation &&
+      err.classification != NdbError::NoDataFound)
+    return res;
+
+  return 0;
+}
 
 inline
 int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
@@ -243,9 +286,11 @@
   if (m_batch_execute)
     return 0;
 #endif
-  return trans->execute(NdbTransaction::NoCommit,
-                        NdbTransaction::AbortOnError,
-                        h->m_force_send);
+  return h->m_ignore_no_key ?
+    execute_no_commit_ignore_no_key(h,trans) :
+    trans->execute(NdbTransaction::NoCommit,
+		   NdbTransaction::AbortOnError,
+		   h->m_force_send);
 }
 
 inline
@@ -433,29 +478,38 @@
     #   The mapped error code
 */
 
-void ha_ndbcluster::invalidate_dictionary_cache(bool global)
+void
+ha_ndbcluster::invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
+					   const char *tabname, bool global)
 {
-  NDBDICT *dict= get_ndb()->getDictionary();
+  NDBDICT *dict= ndb->getDictionary();
   DBUG_ENTER("invalidate_dictionary_cache");
-  DBUG_PRINT("info", ("invalidating %s", m_tabname));
+  DBUG_PRINT("info", ("invalidating %s", tabname));
 
   if (global)
   {
-    const NDBTAB *tab= dict->getTable(m_tabname);
+    const NDBTAB *tab= dict->getTable(tabname);
     if (!tab)
       DBUG_VOID_RETURN;
     if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
     {
       // Global cache has already been invalidated
-      dict->removeCachedTable(m_tabname);
+      dict->removeCachedTable(tabname);
       global= FALSE;
     }
     else
-      dict->invalidateTable(m_tabname);
+      dict->invalidateTable(tabname);
   }
   else
-    dict->removeCachedTable(m_tabname);
+    dict->removeCachedTable(tabname);
   table->s->version=0L;			/* Free when thread is ready */
+  DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::invalidate_dictionary_cache(bool global)
+{
+  NDBDICT *dict= get_ndb()->getDictionary();
+  invalidate_dictionary_cache(table, get_ndb(), m_tabname, global);
   /* Invalidate indexes */
   for (uint i= 0; i < table->s->keys; i++)
   {
@@ -487,7 +541,6 @@
       break;
     }
   }
-  DBUG_VOID_RETURN;
 }
 
 int ha_ndbcluster::ndb_err(NdbTransaction *trans)
@@ -595,9 +648,9 @@
   case MYSQL_TYPE_ENUM:
   case MYSQL_TYPE_SET:         
   case MYSQL_TYPE_BIT:
+  case MYSQL_TYPE_GEOMETRY:
     return TRUE;
   case MYSQL_TYPE_NULL:   
-  case MYSQL_TYPE_GEOMETRY:
     break;
   }
   return FALSE;
@@ -644,14 +697,15 @@
 */
 
 int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, 
-                                 uint fieldnr, bool *set_blob_value)
+                                 uint fieldnr, int row_offset,
+                                 bool *set_blob_value)
 {
-  const byte* field_ptr= field->ptr;
-  uint32 pack_len=  field->pack_length();
+  const byte* field_ptr= field->ptr + row_offset;
+  uint32 pack_len= field->pack_length();
   DBUG_ENTER("set_ndb_value");
-  DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s", 
+  DBUG_PRINT("enter", ("%d: %s  type: %u  len=%d  is_null=%s", 
                        fieldnr, field->field_name, field->type(), 
-                       pack_len, field->is_null()?"Y":"N"));
+                       pack_len, field->is_null(row_offset) ? "Y" : "N"));
   DBUG_DUMP("value", (char*) field_ptr, pack_len);
 
   DBUG_ASSERT(ndb_supported_type(field->type()));
@@ -662,7 +716,7 @@
     {
       pack_len= sizeof(empty_field);
       field_ptr= (byte *)&empty_field;
-      if (field->is_null())
+      if (field->is_null(row_offset))
         empty_field= 0;
       else
         empty_field= 1;
@@ -671,12 +725,15 @@
     {
       if (field->type() != MYSQL_TYPE_BIT)
       {
-        if (field->is_null())
+        if (field->is_null(row_offset))
+        {
+          DBUG_PRINT("info", ("field is NULL"));
           // Set value to NULL
           DBUG_RETURN((ndb_op->setValue(fieldnr, 
                                         (char*)NULL, pack_len) != 0));
+	}
         // Common implementation for most field types
-        DBUG_RETURN(ndb_op->setValue(fieldnr, 
+        DBUG_RETURN(ndb_op->setValue(fieldnr,
                                      (char*)field_ptr, pack_len) != 0);
       }
       else // if (field->type() == MYSQL_TYPE_BIT)
@@ -686,7 +743,7 @@
         // Round up bit field length to nearest word boundry
         pack_len= ((pack_len + 3) >> 2) << 2;
         DBUG_ASSERT(pack_len <= 8);
-        if (field->is_null())
+        if (field->is_null(row_offset))
           // Set value to NULL
           DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
         DBUG_PRINT("info", ("bit field"));
@@ -705,7 +762,7 @@
     NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
     if (ndb_blob != NULL)
     {
-      if (field->is_null())
+      if (field->is_null(row_offset))
         DBUG_RETURN(ndb_blob->setNull() != 0);
 
       Field_blob *field_blob= (Field_blob*)field;
@@ -786,8 +843,8 @@
         {
           char *buf= m_blobs_buffer + offset;
           uint32 len= 0xffffffff;  // Max uint32
-          DBUG_PRINT("value", ("read blob ptr=%x len=%u",
-                               (UintPtr)buf, (uint)blob_len));
+          DBUG_PRINT("value", ("read blob ptr=%lx  len=%u",
+                               buf, (uint) blob_len));
           if (ndb_blob->readData(buf, len) != 0)
             DBUG_RETURN(-1);
           DBUG_ASSERT(len == blob_len);
@@ -898,6 +955,19 @@
       of table accessed in NDB
 */
 
+static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
+                   uint pack_length)
+{
+  DBUG_ENTER("cmp_frm");
+  /*
+    Compare FrmData in NDB with frm file from disk.
+  */
+  if ((pack_length != ndbtab->getFrmLength()) || 
+      (memcmp(pack_data, ndbtab->getFrmData(), pack_length)))
+    DBUG_RETURN(1);
+  DBUG_RETURN(0);
+}
+
 int ha_ndbcluster::get_metadata(const char *path)
 {
   Ndb *ndb= get_ndb();
@@ -935,8 +1005,7 @@
       DBUG_RETURN(1);
     }
     
-    if ((pack_length != tab->getFrmLength()) || 
-        (memcmp(pack_data, tab->getFrmData(), pack_length)))
+    if (cmp_frm(tab, pack_data, pack_length))
     {
       if (!invalidating_ndb_table)
       {
@@ -947,9 +1016,9 @@
       else
       {
         DBUG_PRINT("error", 
-                   ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", 
+                   ("metadata, pack_length: %d  getFrmLength: %d  memcmp: %d",
                     pack_length, tab->getFrmLength(),
-                    memcmp(pack_data, tab->getFrmData(), pack_length)));      
+                    memcmp(pack_data, tab->getFrmData(), pack_length)));
         DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
         DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
         error= 3;
@@ -1278,7 +1347,8 @@
   DBUG_ENTER("ha_ndbcluster::index_flags");
   DBUG_PRINT("info", ("idx_no: %d", idx_no));
   DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
-  DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
+  DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)] | 
+              HA_KEY_SCAN_NOT_ROR);
 }
 
 static void shrink_varchar(Field* field, const byte* & ptr, char* buf)
@@ -2014,7 +2084,7 @@
 
   DBUG_ENTER("write_row");
 
-  if (m_ignore_dup_key && table->s->primary_key != MAX_KEY)
+  if (!m_use_write && m_ignore_dup_key && table->s->primary_key != MAX_KEY)
   {
     int peek_res= peek_row(record);
     
@@ -2091,7 +2161,8 @@
   {
     Field *field= table->field[i];
     if (!(field->flags & PRI_KEY_FLAG) &&
-        set_ndb_value(op, field, i, &set_blob_value))
+	(ha_get_bit_in_write_set(i + 1) || !m_use_write) &&
+        set_ndb_value(op, field, i, record-table->record[0], &set_blob_value))
     {
       m_skip_auto_increment= TRUE;
       ERR_RETURN(op->getNdbError());
@@ -2333,7 +2404,7 @@
     Field *field= table->field[i];
     if (ha_get_bit_in_write_set(i+1) &&
         (!(field->flags & PRI_KEY_FLAG)) &&
-        set_ndb_value(op, field, i))
+        set_ndb_value(op, field, i, new_data - table->record[0]))
       ERR_RETURN(op->getNdbError());
   }
 
@@ -2448,51 +2519,73 @@
     set to null.
 */
 
-void ha_ndbcluster::unpack_record(byte* buf)
+static void ndb_unpack_record(TABLE *table, NdbValue *value,
+                              MY_BITMAP *defined, byte *buf)
 {
+  Field **p_field= table->field, *field= *p_field;
   uint row_offset= (uint) (buf - table->record[0]);
-  Field **field, **end;
-  NdbValue *value= m_value;
-  DBUG_ENTER("unpack_record");
+  DBUG_ENTER("ndb_unpack_record");
 
-  end= table->field + table->s->fields;
-  
   // Set null flag(s)
   bzero(buf, table->s->null_bytes);
-  for (field= table->field;
-       field < end;
-       field++, value++)
+  for ( ; field;
+       p_field++, value++, field= *p_field)
   {
     if ((*value).ptr)
     {
-      if (! ((*field)->flags & BLOB_FLAG))
+      if (!(field->flags & BLOB_FLAG))
       {
-        if ((*value).rec->isNULL())
-         (*field)->set_null(row_offset);
-        else if ((*field)->type() == MYSQL_TYPE_BIT)
+        int is_null= (*value).rec->isNULL();
+        if (is_null)
+        {
+          if (is_null > 0)
+          {
+            DBUG_PRINT("info",("[%u] NULL",
+                               (*value).rec->getColumn()->getColumnNo()));
+            field->set_null(row_offset);
+          }
+          else
+          {
+            DBUG_PRINT("info",("[%u] UNDEFINED",
+                               (*value).rec->getColumn()->getColumnNo()));
+            bitmap_clear_bit(defined,
+                             (*value).rec->getColumn()->getColumnNo());
+          }
+        }
+        else if (field->type() == MYSQL_TYPE_BIT)
         {
-          uint pack_len= (*field)->pack_length();
-          if (pack_len < 5)
+          byte *save_field_ptr= field->ptr;
+          field->ptr= save_field_ptr + row_offset;
+          if (field->pack_length() < 5)
           {
             DBUG_PRINT("info", ("bit field H'%.8X", 
                                 (*value).rec->u_32_value()));
-            ((Field_bit *) *field)->store((longlong) 
-                                          (*value).rec->u_32_value(),
-                                          FALSE);
+            ((Field_bit*) field)->store((longlong)
+                                        (*value).rec->u_32_value(), FALSE);
           }
           else
           {
             DBUG_PRINT("info", ("bit field H'%.8X%.8X",
-                                *(Uint32 *)(*value).rec->aRef(),
-                                *((Uint32 *)(*value).rec->aRef()+1)));
-            ((Field_bit *) *field)->store((longlong)
-                                          (*value).rec->u_64_value(), TRUE);
+                                *(Uint32*) (*value).rec->aRef(),
+                                *((Uint32*) (*value).rec->aRef()+1)));
+            ((Field_bit*) field)->store((longlong)
+                                         (*value).rec->u_64_value(),TRUE);
           }
+          field->ptr= save_field_ptr;
+          DBUG_PRINT("info",("[%u] SET",
+                             (*value).rec->getColumn()->getColumnNo()));
+          DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
+        }
+        else
+        {
+          DBUG_PRINT("info",("[%u] SET",
+                             (*value).rec->getColumn()->getColumnNo()));
+          DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
         }
       }
       else
       {
-        NdbBlob* ndb_blob= (*value).blob;
+        NdbBlob *ndb_blob= (*value).blob;
         bool isNull= TRUE;
 #ifndef DBUG_OFF
         int ret= 
@@ -2500,11 +2593,16 @@
           ndb_blob->getNull(isNull);
         DBUG_ASSERT(ret == 0);
         if (isNull)
-          (*field)->set_null(row_offset);
+          field->set_null(row_offset);
       }
     }
   }
-  
+  DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::unpack_record(byte *buf)
+{
+  ndb_unpack_record(table, m_value, 0, buf);
 #ifndef DBUG_OFF
   // Read and print all values that was fetched
   if (table->s->primary_key == MAX_KEY)
@@ -2520,7 +2618,6 @@
   } 
   //print_results();
 #endif
-  DBUG_VOID_RETURN;
 }
 
 /*
@@ -3241,8 +3338,9 @@
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   Ndb *ndb= thd_ndb->ndb;
 
-  DBUG_PRINT("enter", ("thd: %x, thd_ndb: %x, thd_ndb->lock_count: %d",
-                       thd, thd_ndb, thd_ndb->lock_count));
+  DBUG_PRINT("enter", ("this: %x  thd: %lx  thd_ndb: %lx  "
+                       "thd_ndb->lock_count: %d",
+                       this, thd, thd_ndb, thd_ndb->lock_count));
 
   if (lock_type != F_UNLCK)
   {
@@ -3250,7 +3348,7 @@
     if (!thd_ndb->lock_count++)
     {
       PRINT_OPTION_FLAGS(thd);
-      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK))) 
+      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) 
       {
         // Autocommit transaction
         DBUG_ASSERT(!thd_ndb->stmt);
@@ -3422,31 +3520,24 @@
 }
 
 /*
-  When using LOCK TABLE's external_lock is only called when the actual
-  TABLE LOCK is done.
-  Under LOCK TABLES, each used tables will force a call to start_stmt.
-  Ndb doesn't currently support table locks, and will do ordinary
-  startTransaction for each transaction/statement.
+  Start a transaction for running a statement if one is not
+  already running in a transaction. This will be the case in
+  a BEGIN; COMMIT; block
+  When using LOCK TABLES, external_lock will start a transaction
+  since ndb does not currently support table locking
 */
 
-int ha_ndbcluster::start_stmt(THD *thd)
+int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
 {
   int error=0;
   DBUG_ENTER("start_stmt");
   PRINT_OPTION_FLAGS(thd);
 
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  NdbTransaction *trans= thd_ndb->stmt;
+  NdbTransaction *trans= (thd_ndb->stmt)?thd_ndb->stmt:thd_ndb->all;
   if (!trans){
     Ndb *ndb= thd_ndb->ndb;
     DBUG_PRINT("trans",("Starting transaction stmt"));  
-
-#if 0    
-    NdbTransaction *tablock_trans= thd_ndb->all;
-    DBUG_PRINT("info", ("tablock_trans: %x", (UintPtr)tablock_trans));
-    DBUG_ASSERT(tablock_trans);
-//    trans= ndb->hupp(tablock_trans);
-#endif
     trans= ndb->startTransaction();
     if (trans == NULL)
       ERR_RETURN(ndb->getNdbError());
@@ -3502,7 +3593,8 @@
   while ((share= it++))
   {
     pthread_mutex_lock(&share->mutex);
-    DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ", share->table_name, share->commit_count));
+    DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ",
+			share->key, share->commit_count));
     share->commit_count= 0;
     share->commit_count_lock++;
     pthread_mutex_unlock(&share->mutex);
@@ -3743,6 +3835,7 @@
     col.setStripeSize(0);
     break;
   //mysql_type_blob:
+  case MYSQL_TYPE_GEOMETRY:
   case MYSQL_TYPE_BLOB:    
     if ((field->flags & BINARY_FLAG) && cs == &my_charset_bin)
       col.setType(NDBCOL::Blob);
@@ -3808,7 +3901,6 @@
     break;
   }
   case MYSQL_TYPE_NULL:        
-  case MYSQL_TYPE_GEOMETRY:
     goto mysql_type_unsupported;
   mysql_type_unsupported:
   default:
@@ -3831,6 +3923,10 @@
   return 0;
 }
 
+/*
+  Create a table in NDB Cluster
+*/
+
 int ha_ndbcluster::create(const char *name, 
                           TABLE *form, 
                           HA_CREATE_INFO *info)
@@ -3840,7 +3936,8 @@
   uint pack_length, length, i, pk_length= 0;
   const void *data, *pack_data;
   char name2[FN_HEADLEN];
-  bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);
+  bool create_from_engine= test(info->table_options &
+                                HA_OPTION_CREATE_FROM_ENGINE);
    
   DBUG_ENTER("ha_ndbcluster::create");
   DBUG_PRINT("enter", ("name: %s", name));
@@ -3855,7 +3952,8 @@
       caller.
       Do Ndb specific stuff, such as create a .ndb file
     */
-    my_errno= write_ndb_file();
+    if ((my_errno= write_ndb_file()))
+      DBUG_RETURN(my_errno);
     DBUG_RETURN(my_errno);
   }
 
@@ -3869,7 +3967,7 @@
   if (packfrm(data, length, &pack_data, &pack_length))
     DBUG_RETURN(2);
   
-  DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
+  DBUG_PRINT("info", ("setFrm data=%lx  len=%d", pack_data, pack_length));
   tab.setFrm(pack_data, pack_length);      
   my_free((char*)data, MYF(0));
   my_free((char*)pack_data, MYF(0));
@@ -3910,6 +4008,7 @@
      * 5 - from extra words added by tup/dict??
      */
     switch (form->field[i]->real_type()) {
+    case MYSQL_TYPE_GEOMETRY:
     case MYSQL_TYPE_BLOB:    
     case MYSQL_TYPE_MEDIUM_BLOB:   
     case MYSQL_TYPE_LONG_BLOB: 
@@ -4183,14 +4282,22 @@
     if (!(orig_tab= dict->getTable(m_tabname)))
       ERR_RETURN(dict->getNdbError());
   }
+
   m_table= (void *)orig_tab;
   // Change current database to that of target table
   set_dbname(to);
   ndb->setDatabaseName(m_dbname);
-  if (!(result= alter_table_name(new_tabname)))
+
+  if ((result= alter_table_name(new_tabname)))
   {
-    // Rename .ndb file
-    result= handler::rename_table(from, to);
+    DBUG_RETURN(result);
+  }
+  
+  // Rename .ndb file
+  if ((result= handler::rename_table(from, to)))
+  {
+    // ToDo in 4.1 should rollback alter table...
+    DBUG_RETURN(result);
   }
 
   DBUG_RETURN(result);
@@ -4206,7 +4313,7 @@
   Ndb *ndb= get_ndb();
   NDBDICT *dict= ndb->getDictionary();
   const NDBTAB *orig_tab= (const NDBTAB *) m_table;
-  DBUG_ENTER("alter_table_name_table");
+  DBUG_ENTER("alter_table_name");
 
   NdbDictionary::Table new_tab= *orig_tab;
   new_tab.setName(to);
@@ -4225,6 +4332,38 @@
 
  */
 
+/* static version which does not need a handler */
+
+int
+ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
+                            const char *path,
+                            const char *db,
+                            const char *table_name)
+{
+  DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+  NDBDICT *dict= ndb->getDictionary();
+
+  /* Drop the table from NDB */
+  
+  int res;
+  if (h)
+  {
+    res= h->drop_table();
+  }
+  else
+  {
+    ndb->setDatabaseName(db);
+    res= dict->dropTable(table_name);
+  }
+
+  if (res)
+  {
+    DBUG_RETURN(res);
+  }
+
+  DBUG_RETURN(0);
+}
+
 int ha_ndbcluster::delete_table(const char *name)
 {
   DBUG_ENTER("ha_ndbcluster::delete_table");
@@ -4237,9 +4376,8 @@
 
   /* Call ancestor function to delete .ndb file */
   handler::delete_table(name);
-  
-  /* Drop the table from NDB */
-  DBUG_RETURN(drop_table());
+
+  DBUG_RETURN(delete_table(this, get_ndb(),name, m_dbname, m_tabname));
 }
 
 
@@ -4291,7 +4429,12 @@
            --retries &&
            ndb->getNdbError().status == NdbError::TemporaryError);
   if (auto_value == NDB_FAILED_AUTO_INCREMENT)
-    ERR_RETURN(ndb->getNdbError());
+  {
+    const NdbError err= ndb->getNdbError();
+    sql_print_error("Error %lu in ::get_auto_increment(): %s",
+                    (ulong) err.code, err.message);
+    DBUG_RETURN(~(ulonglong) 0);
+  }
   DBUG_RETURN((longlong)auto_value);
 }
 
@@ -4300,6 +4443,15 @@
   Constructor for the NDB Cluster table handler 
  */
 
+#define HA_NDBCLUSTER_TABLE_FLAGS \
+                HA_REC_NOT_IN_SEQ | \
+                HA_NULL_IN_KEY | \
+                HA_AUTO_PART_KEY | \
+                HA_NO_PREFIX_CHAR_KEYS | \
+                HA_NEED_READ_RANGE_BUFFER | \
+                HA_CAN_GEOMETRY | \
+                HA_CAN_BIT_FIELD
+
 ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
   handler(&ndbcluster_hton, table_arg),
   m_active_trans(NULL),
@@ -4307,12 +4459,7 @@
   m_table(NULL),
   m_table_version(-1),
   m_table_info(NULL),
-  m_table_flags(HA_REC_NOT_IN_SEQ |
-                HA_NULL_IN_KEY |
-                HA_AUTO_PART_KEY |
-                HA_NO_PREFIX_CHAR_KEYS |
-                HA_NEED_READ_RANGE_BUFFER |
-                HA_CAN_BIT_FIELD),
+  m_table_flags(HA_NDBCLUSTER_TABLE_FLAGS),
   m_share(0),
   m_part_info(NULL),
   m_use_partition_function(FALSE),
@@ -4320,6 +4467,7 @@
   m_use_write(FALSE),
   m_ignore_dup_key(FALSE),
   m_primary_key_update(FALSE),
+  m_ignore_no_key(FALSE),
   m_rows_to_insert((ha_rows) 1),
   m_rows_inserted((ha_rows) 0),
   m_bulk_insert_rows((ha_rows) 1024),
@@ -4374,7 +4522,9 @@
   DBUG_ENTER("~ha_ndbcluster");
 
   if (m_share)
-    free_share(m_share);
+  {
+    free_share(&m_share);
+  }
   release_metadata();
   my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
   m_blobs_buffer= 0;
@@ -4407,8 +4557,8 @@
   int res;
   KEY *key;
   DBUG_ENTER("open");
-  DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
-                       name, mode, test_if_locked));
+  DBUG_PRINT("enter", ("this: %d name: %s  mode: %d test_if_locked: %d",
+                       this, name, mode, test_if_locked));
   
   // Setup ref_length to make room for the whole 
   // primary key to be written in the ref variable
@@ -4428,7 +4578,8 @@
   set_tabname(name);
   
   if (check_ndb_connection()) {
-    free_share(m_share); m_share= 0;
+    free_share(&m_share);
+    m_share= 0;
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
   
@@ -4457,7 +4608,8 @@
 int ha_ndbcluster::close(void)
 {
   DBUG_ENTER("close");  
-  free_share(m_share); m_share= 0;
+  free_share(&m_share);
+  m_share= 0;
   release_metadata();
   DBUG_RETURN(0);
 }
@@ -4660,21 +4812,24 @@
     ERR_RETURN(dict->getNdbError());
   for (i= 0 ; i < list.count ; i++)
   {
-    NdbDictionary::Dictionary::List::Element& t= list.elements[i];
-    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));     
+    NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
+    DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));     
     
     // Add only tables that belongs to db
-    if (my_strcasecmp(system_charset_info, t.database, dbname))
+    if (my_strcasecmp(system_charset_info, elmt.database, dbname))
       continue;
-    DBUG_PRINT("info", ("%s must be dropped", t.name));     
-    drop_list.push_back(thd->strdup(t.name));
+    DBUG_PRINT("info", ("%s must be dropped", elmt.name));     
+    drop_list.push_back(thd->strdup(elmt.name));
   }
   // Drop any tables belonging to database
+  char full_path[FN_REFLEN];
+  char *tmp= strxnmov(full_path, FN_REFLEN, share_prefix, dbname, "/", NullS);
   ndb->setDatabaseName(dbname);
   List_iterator_fast<char> it(drop_list);
   while ((tabname=it++))
   {
-    if (dict->dropTable(tabname))
+    strxnmov(tmp, FN_REFLEN - (tmp - full_path), tabname, NullS);
+    if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
     {
       const NdbError err= dict->getNdbError();
       if (err.code != 709)
@@ -4687,6 +4842,92 @@
   DBUG_RETURN(ret);      
 }
 
+/* For each user table in NDB that is building or online and has frm data,
+   compare it with the local frm and rediscover it when that is missing or
+   differs.  Returns 0 after the scan; exits early if no Ndb or list fails. */
+static int ndbcluster_find_all_files(THD *thd)
+{
+  DBUG_ENTER("ndbcluster_find_all_files");
+  Ndb* ndb;
+  char key[FN_REFLEN];
+  NdbDictionary::Dictionary::List list;
+
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  NDBDICT *dict= ndb->getDictionary();
+  /* Rescan while some table could not be fetched and retries remain */
+  int unhandled, retries= 5;
+  do
+  {
+    if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+      ERR_RETURN(dict->getNdbError());
+    unhandled= 0;
+    for (uint i= 0 ; i < list.count ; i++)
+    {
+      NDBDICT::List::Element& elmt= list.elements[i];
+      DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
+      if (!(elmt.state == NDBOBJ::StateBuilding ||
+            elmt.state == NDBOBJ::StateOnline))
+      {
+        sql_print_information("NDB: skipping setup table %s.%s, in state %d",
+                              elmt.database, elmt.name, elmt.state);
+        continue;
+      }
+
+      ndb->setDatabaseName(elmt.database);
+      const NDBTAB *ndbtab;
+
+      if (!(ndbtab= dict->getTable(elmt.name)))
+      {
+        sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+                        elmt.database, elmt.name,
+                        dict->getNdbError().code,
+                        dict->getNdbError().message);
+        unhandled++;
+        continue;
+      }
+      /* Skip tables whose stored frm length is zero: nothing to compare */
+      if (ndbtab->getFrmLength() == 0)
+        continue;
+      /* key holds a full path: bound it by the buffer, not by FN_LEN */
+      strxnmov(key, sizeof(key) - 1, mysql_data_home, "/",
+               elmt.database, "/", elmt.name, NullS);
+      const void *data= 0, *pack_data= 0;
+      uint length, pack_length;
+      int discover= 0;
+      if (readfrm(key, &data, &length) ||
+          packfrm(data, length, &pack_data, &pack_length))
+      {
+        discover= 1;
+        sql_print_information("NDB: missing frm for %s.%s, discovering...",
+                              elmt.database, elmt.name);
+      }
+      else if (cmp_frm(ndbtab, pack_data, pack_length))
+      {
+        discover= 1;
+        sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
+                              elmt.database, elmt.name);
+      }
+      my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
+      my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
+
+      if (discover)
+      {
+        /* ToDo 4.1 database needs to be created if missing */
+        pthread_mutex_lock(&LOCK_open);
+        if (ha_create_table_from_engine(thd, elmt.database, elmt.name))
+        {
+          /* ToDo 4.1 handle error */
+        }
+        pthread_mutex_unlock(&LOCK_open);
+      }
+    }
+  }
+  while (unhandled && retries--);
+
+  DBUG_RETURN(0);
+}
 
 int ndbcluster_find_files(THD *thd,const char *db,const char *path,
                           const char *wild, bool dir, List<char> *files)
@@ -4698,7 +4939,7 @@
   Ndb* ndb;
   char name[FN_REFLEN];
   HASH ndb_tables, ok_tables;
-  NdbDictionary::Dictionary::List list;
+  NDBDICT::List list;
 
   if (!(ndb= check_ndb_in_thd(thd)))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -4729,11 +4970,11 @@
 
   for (i= 0 ; i < list.count ; i++)
   {
-    NdbDictionary::Dictionary::List::Element& t= list.elements[i];
-    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));     
+    NDBDICT::List::Element& elmt= list.elements[i];
+    DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
 
     // Add only tables that belongs to db
-    if (my_strcasecmp(system_charset_info, t.database, db))
+    if (my_strcasecmp(system_charset_info, elmt.database, db))
       continue;
 
     // Apply wildcard to list of tables in NDB
@@ -4741,14 +4982,14 @@
     {
       if (lower_case_table_names)
       {
-        if (wild_case_compare(files_charset_info, t.name, wild))
+        if (wild_case_compare(files_charset_info, elmt.name, wild))
           continue;
       }
-      else if (wild_compare(t.name,wild,0))
+      else if (wild_compare(elmt.name,wild,0))
         continue;
     }
-    DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));     
-    my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
+    DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", elmt.name));     
+    my_hash_insert(&ndb_tables, (byte*)thd->strdup(elmt.name));
   }
 
   char *file_name;
@@ -4796,10 +5037,15 @@
     file_name= hash_element(&ndb_tables, i);
     if (!hash_search(&ok_tables, file_name, strlen(file_name)))
     {
-      DBUG_PRINT("info", ("%s must be discovered", file_name));       
-      // File is in list of ndb tables and not in ok_tables
-      // This table need to be created
-      create_list.push_back(thd->strdup(file_name));
+      strxnmov(name, sizeof(name),
+               mysql_data_home, "/", db, "/", file_name, reg_ext, NullS);
+      if (access(name, F_OK))
+      {
+        DBUG_PRINT("info", ("%s must be discovered", file_name));
+        // File is in list of ndb tables and not in ok_tables
+        // This table need to be created
+        create_list.push_back(thd->strdup(file_name));
+      }
     }
   }
 
@@ -4855,14 +5101,18 @@
 static int connect_callback()
 {
   update_status_variables(g_ndb_cluster_connection);
+  pthread_cond_signal(&COND_ndb_util_thread);
   return 0;
 }
 
-handlerton *
-ndbcluster_init()
+bool ndbcluster_init()
 {
   int res;
   DBUG_ENTER("ndbcluster_init");
+
+  if (have_ndbcluster != SHOW_OPTION_YES)
+    goto ndbcluster_init_error;
+
   // Set connectstring if specified
   if (opt_ndbcluster_connectstring != 0)
     DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));     
@@ -4926,6 +5176,7 @@
   (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
                    (hash_get_key) ndbcluster_get_key,0,0);
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+
   pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&COND_ndb_util_thread, NULL);
 
@@ -4943,16 +5194,17 @@
   }
   
   ndbcluster_inited= 1;
-  DBUG_RETURN(&ndbcluster_hton);
+  DBUG_RETURN(FALSE);
 
- ndbcluster_init_error:
+ndbcluster_init_error:
   if (g_ndb)
     delete g_ndb;
   g_ndb= NULL;
   if (g_ndb_cluster_connection)
     delete g_ndb_cluster_connection;
   g_ndb_cluster_connection= NULL;
-  DBUG_RETURN(NULL);
+  have_ndbcluster= SHOW_OPTION_DISABLED;	// If we couldn't use handler
+  DBUG_RETURN(TRUE);
 }
 
 
@@ -4987,6 +5239,7 @@
   pthread_mutex_destroy(&LOCK_ndb_util_thread);
   pthread_cond_destroy(&COND_ndb_util_thread);
   ndbcluster_inited= 0;
+  ndbcluster_util_inited= 0;
   DBUG_RETURN(0);
 }
 
@@ -5259,7 +5512,7 @@
 
   char name[FN_REFLEN];
   NDB_SHARE *share;
-  (void)strxnmov(name, FN_REFLEN, "./",dbname,"/",tabname,NullS);
+  (void)strxnmov(name, FN_REFLEN, share_prefix, dbname, "/", tabname, NullS);
   DBUG_PRINT("enter", ("name: %s", name));
   pthread_mutex_lock(&ndbcluster_mutex);
   if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
@@ -5267,8 +5520,7 @@
                                        strlen(name))))
   {
     pthread_mutex_unlock(&ndbcluster_mutex);
-    DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables",
-                        name));
+    DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", name));
     DBUG_RETURN(1);
   }
   share->use_count++;
@@ -5283,7 +5535,7 @@
       DBUG_PRINT("info", ("Getting commit_count: %llu from share",
                           share->commit_count));
       pthread_mutex_unlock(&share->mutex);
-      free_share(share);
+      free_share(&share);
       DBUG_RETURN(0);
     }
   }
@@ -5298,7 +5550,7 @@
   struct Ndb_statistics stat;
   if (ndb_get_table_statistics(ndb, tabname, &stat))
   {
-    free_share(share);
+    free_share(&share);
     DBUG_RETURN(1);
   }
 
@@ -5315,7 +5567,7 @@
     *commit_count= 0;
   }
   pthread_mutex_unlock(&share->mutex);
-  free_share(share);
+  free_share(&share);
   DBUG_RETURN(0);
 }
 
@@ -5452,6 +5704,60 @@
 }
 
 
+#ifndef DBUG_OFF
+/*
+  Trace a summary of TABLE plus one line per field; (null)-safe for table.
+  Pointers and offsets are cast to match the %lx / %d conversions used.
+*/
+static void dbug_print_table(const char *info, TABLE *table)
+{
+  if (table == 0)
+  {
+    DBUG_PRINT("info",("%s: (null)", info));
+    return;
+  }
+  DBUG_PRINT("info",
+             ("%s: %s.%s s->fields: %d  "
+              "reclength: %d  rec_buff_length: %d  record[0]: %lx  "
+              "record[1]: %lx",
+              info, table->s->db, table->s->table_name,
+              (int) table->s->fields, (int) table->s->reclength,
+              (int) table->s->rec_buff_length,
+              (long) table->record[0],
+              (long) table->record[1]));
+
+  for (unsigned int i= 0; i < table->s->fields; i++)
+  {
+    Field *f= table->field[i];
+    DBUG_PRINT("info",
+               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d  pack_length: %d  "
+                "ptr: 0x%lx[+%d]  null_bit: %u  null_ptr: 0x%lx[+%d]",
+                (int) i, f->field_name, (long) f->flags,
+                (f->flags & PRI_KEY_FLAG)  ? "pri"       : "attr",
+                (f->flags & NOT_NULL_FLAG) ? ""          : ",nullable",
+                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
+                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
+                (f->flags & BLOB_FLAG)     ? ",blob"     : "",
+                (f->flags & BINARY_FLAG)   ? ",binary"   : "",
+                (int) f->real_type(),
+                (int) f->pack_length(),
+                (long) f->ptr, (int) (f->ptr - table->record[0]),
+                (uint) f->null_bit,
+                (long) f->null_ptr,
+                (int) ((byte*) f->null_ptr - table->record[0])));
+    if (f->type() == MYSQL_TYPE_BIT)
+    {
+      Field_bit *g= (Field_bit*) f;
+      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d  bit_ptr: 0x%lx[+%d] "
+                                   "bit_ofs: %u  bit_len: %u",
+                                   (int) g->field_length, (long) g->bit_ptr,
+                                   (int) ((byte*) g->bit_ptr-table->record[0]),
+                                   (uint) g->bit_ofs, (uint) g->bit_len));
+    }
+  }
+}
+#endif
+
 /*
   Handling the shared NDB_SHARE structure that is needed to
   provide table locking.
@@ -5460,68 +5766,195 @@
   data we want to or can share.
  */
 
-static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
+static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
                                 my_bool not_used __attribute__((unused)))
 {
-  *length=share->table_name_length;
-  return (byte*) share->table_name;
+  *length= share->key_length;
+  return (byte*) share->key;
 }
 
-static NDB_SHARE* get_share(const char *table_name)
+#ifndef DBUG_OFF
+/* Trace each share in ndbcluster_open_tables (commit_count is ulonglong) */
+static void dbug_print_open_tables()
+{
+  DBUG_ENTER("dbug_print_open_tables");
+  for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+  {
+    NDB_SHARE *share= (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
+    DBUG_PRINT("share", ("[%d] 0x%lx key: %s  key_length: %d",
+                         i, (long) share, share->key, share->key_length));
+    DBUG_PRINT("share",
+               ("db.tablename: %s.%s  use_count: %d  commit_count: %llu",
+                share->db, share->table_name,
+                share->use_count, share->commit_count));
+  }
+  DBUG_VOID_RETURN;
+}
+#else
+#define dbug_print_open_tables()
+#endif
+
+/*
+  Increase refcount on existing share.
+  Always returns share and cannot fail.
+*/
+static NDB_SHARE *get_share(NDB_SHARE *share)
 {
-  NDB_SHARE *share;
   pthread_mutex_lock(&ndbcluster_mutex);
-  uint length=(uint) strlen(table_name);
-  if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
-                                       (byte*) table_name,
-                                       length)))
+  share->use_count++;
+
+  dbug_print_open_tables();
+
+  DBUG_PRINT("get_share",
+             ("0x%lx key: %s  key_length: %d",
+              share, share->key, share->key_length));
+  DBUG_PRINT("get_share",
+             ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
+              share->db, share->table_name,
+              share->use_count, share->commit_count));
+  pthread_mutex_unlock(&ndbcluster_mutex);
+  return share;
+}
+
+/*
+  Get a share object for key
+
+  Returns share for key, and increases the refcount on the share.
+
+  create_if_not_exists == TRUE:
+    creates share if it does not already exist
+    returns 0 only due to out of memory, and then sets my_error
+
+  create_if_not_exists == FALSE:
+    returns 0 if share does not exist
+
+  have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
+*/
+static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
+                            bool have_lock)
+{
+  NDB_SHARE *share;
+  if (!have_lock)
+    pthread_mutex_lock(&ndbcluster_mutex);
+  uint length= (uint) strlen(key);
+  if (!(share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+                                        (byte*) key,
+                                        length)))
   {
-    if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
+    if (!create_if_not_exists)
+    {
+      DBUG_PRINT("error", ("get_share: %s does not exist", key));
+      if (!have_lock)
+        pthread_mutex_unlock(&ndbcluster_mutex);
+      return 0;
+    }
+    if ((share= (NDB_SHARE*) my_malloc(sizeof(*share),
                                        MYF(MY_WME | MY_ZEROFILL))))
     {
-      share->table_name_length=length;
-      share->table_name=(char*) (share+1);
-      strmov(share->table_name,table_name);
+      MEM_ROOT **root_ptr=
+        my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
+      MEM_ROOT *old_root= *root_ptr;
+      init_sql_alloc(&share->mem_root, 1024, 0);
+      *root_ptr= &share->mem_root; // remember to reset before return
+
+      /* enough space for key, db, and table_name */
+      share->key= alloc_root(*root_ptr, 2 * (length + 1));
+      share->key_length= length;
+      strmov(share->key, key);
       if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
       {
-        pthread_mutex_unlock(&ndbcluster_mutex);
-        my_free((gptr) share,0);
+        free_root(&share->mem_root, MYF(0));
+        my_free((gptr) share, 0);
+        *root_ptr= old_root;
+        if (!have_lock)
+          pthread_mutex_unlock(&ndbcluster_mutex);
         return 0;
       }
       thr_lock_init(&share->lock);
-      pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+      pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
       share->commit_count= 0;
       share->commit_count_lock= 0;
+      share->db= share->key + length + 1;
+      ha_ndbcluster::set_dbname(key, share->db);
+      share->table_name= share->db + strlen(share->db) + 1;
+      ha_ndbcluster::set_tabname(key, share->table_name);
+      *root_ptr= old_root;
     }
     else
     {
-      DBUG_PRINT("error", ("Failed to alloc share"));
-      pthread_mutex_unlock(&ndbcluster_mutex);
+      DBUG_PRINT("error", ("get_share: failed to alloc share"));
+      if (!have_lock)
+        pthread_mutex_unlock(&ndbcluster_mutex);
+      my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*share));
       return 0;
     }
   }
   share->use_count++;
 
-  DBUG_PRINT("share",
-	     ("table_name: %s, length: %d, use_count: %d, commit_count: %d",
-	      share->table_name, share->table_name_length, share->use_count,
-	      share->commit_count));
-  pthread_mutex_unlock(&ndbcluster_mutex);
+  dbug_print_open_tables();
+
+  DBUG_PRINT("get_share",
+             ("0x%lx key: %s  key_length: %d  key: %s",
+              share, share->key, share->key_length, key));
+  DBUG_PRINT("get_share",
+             ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
+              share->db, share->table_name,
+              share->use_count, share->commit_count));
+  if (!have_lock)
+    pthread_mutex_unlock(&ndbcluster_mutex);
   return share;
 }
 
+/* Unhash and destroy *share (lock, mutex, mem_root, struct), then zero the
+   caller's pointer; use_count and ndbcluster_mutex are not touched here. */
+static void real_free_share(NDB_SHARE **share)
+{
+  NDB_SHARE *victim= *share;
+  DBUG_PRINT("real_free_share",
+             ("0x%lx key: %s  key_length: %d",
+              (long) victim, victim->key, victim->key_length));
+  DBUG_PRINT("real_free_share",
+             ("db.tablename: %s.%s  use_count: %d  commit_count: %llu",
+              victim->db, victim->table_name,
+              victim->use_count, victim->commit_count));
+  hash_delete(&ndbcluster_open_tables, (byte*) victim);
+  thr_lock_delete(&victim->lock);
+  pthread_mutex_destroy(&victim->mutex);
+  free_root(&victim->mem_root, MYF(0));
+  my_free((gptr) victim, MYF(0));
+  *share= 0;
+  dbug_print_open_tables();
+}
+
+/*
+  decrease refcount of share
+  calls real_free_share when refcount reaches 0
 
-static void free_share(NDB_SHARE *share)
+  have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
+*/
+static void free_share(NDB_SHARE **share, bool have_lock)
 {
-  pthread_mutex_lock(&ndbcluster_mutex);
-  if (!--share->use_count)
+  if (!have_lock)
+    pthread_mutex_lock(&ndbcluster_mutex);
+  if ((*share)->util_lock == current_thd)
+    (*share)->util_lock= 0;
+  if (!--(*share)->use_count)
   {
-     hash_delete(&ndbcluster_open_tables, (byte*) share);
-    thr_lock_delete(&share->lock);
-    pthread_mutex_destroy(&share->mutex);
-    my_free((gptr) share, MYF(0));
+    real_free_share(share);
   }
-  pthread_mutex_unlock(&ndbcluster_mutex);
+  else
+  {
+    dbug_print_open_tables();
+    DBUG_PRINT("free_share",
+               ("0x%lx key: %s  key_length: %d",
+                *share, (*share)->key, (*share)->key_length));
+    DBUG_PRINT("free_share",
+               ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
+                (*share)->db, (*share)->table_name,
+                (*share)->use_count, (*share)->commit_count));
+  }
+  if (!have_lock)
+    pthread_mutex_unlock(&ndbcluster_mutex);
 }
 
 
@@ -5552,14 +5985,14 @@
   uint blob_len;
   frm_blob_struct* blob;
   DBUG_ENTER("packfrm");
-  DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
+  DBUG_PRINT("enter", ("data: 0x%lx  len: %d", data, len));
   
   error= 1;
   org_len= len;
   if (my_compress((byte*)data, &org_len, &comp_len))
     goto err;
   
-  DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
+  DBUG_PRINT("info", ("org_len: %d  comp_len: %d", org_len, comp_len));
   DBUG_DUMP("compressed", (char*)data, org_len);
   
   error= 2;
@@ -5579,7 +6012,8 @@
   *pack_len= blob_len;
   error= 0;
   
-  DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
+  DBUG_PRINT("exit",
+             ("pack_data: 0x%lx  pack_len: %d", *pack_data, *pack_len));
 err:
   DBUG_RETURN(error);
   
@@ -5593,7 +6027,7 @@
    byte *data;
    ulong complen, orglen, ver;
    DBUG_ENTER("unpackfrm");
-   DBUG_PRINT("enter", ("pack_data: %x", pack_data));
+   DBUG_PRINT("enter", ("pack_data: 0x%lx", pack_data));
 
    complen=     uint4korr((char*)&blob->head.complen);
    orglen=      uint4korr((char*)&blob->head.orglen);
@@ -5618,7 +6052,7 @@
    *unpack_data= data;
    *unpack_len= complen;
 
-   DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
+   DBUG_PRINT("exit", ("frmdata: 0x%lx, len: %d", *unpack_data, *unpack_len));
 
    DBUG_RETURN(0);
 }
@@ -5631,11 +6065,10 @@
   DBUG_ENTER("ndb_get_table_statistics");
   DBUG_PRINT("enter", ("table: %s", table));
   NdbTransaction* pTrans= ndb->startTransaction();
+  if (pTrans == NULL)
+    ERR_RETURN(ndb->getNdbError());
   do 
   {
-    if (pTrans == NULL)
-      break;
-      
     NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
     if (pOp == NULL)
       break;
@@ -6152,13 +6585,12 @@
 
 
 // Utility thread main loop
-extern "C" pthread_handler_decl(ndb_util_thread_func,
-                                arg __attribute__((unused)))
+pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
 {
   THD *thd; /* needs to be first for thread_stack */
   Ndb* ndb;
-  int error= 0;
   struct timespec abstime;
+  List<NDB_SHARE> util_open_tables;
 
   my_thread_init();
   DBUG_ENTER("ndb_util_thread");
@@ -6179,21 +6611,62 @@
     delete ndb;
     DBUG_RETURN(NULL);
   }
+  thd->init_for_queries();
+  thd->version=refresh_version;
+  thd->set_time();
+  thd->main_security_ctx.host_or_ip= "";
+  thd->client_capabilities = 0;
+  my_net_init(&thd->net, 0);
+  thd->main_security_ctx.master_access= ~0;
+  thd->main_security_ctx.priv_user = 0;
+
+  /*
+    wait for mysql server to start
+  */
+  pthread_mutex_lock(&LOCK_server_started);
+  while (!mysqld_server_started)
+    pthread_cond_wait(&COND_server_started, &LOCK_server_started);
+  pthread_mutex_unlock(&LOCK_server_started);
+
+  /*
+    Wait for cluster to start
+  */
+  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  while (!ndb_cluster_node_id)
+  {
+    /* ndb not connected yet */
+    set_timespec(abstime, 1);
+    pthread_cond_timedwait(&COND_ndb_util_thread,
+                           &LOCK_ndb_util_thread,
+                           &abstime);
+    if (abort_loop)
+    {
+      pthread_mutex_unlock(&LOCK_ndb_util_thread);
+      goto ndb_util_thread_end;
+    }
+  }
+  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+  /*
+    Get all table definitions from the storage node
+  */
+  ndbcluster_find_all_files(thd);
+
+  ndbcluster_util_inited= 1;
 
-  List<NDB_SHARE> util_open_tables;
   set_timespec(abstime, 0);
-  for (;;)
+  for (;!abort_loop;)
   {
 
     pthread_mutex_lock(&LOCK_ndb_util_thread);
-    error= pthread_cond_timedwait(&COND_ndb_util_thread,
-                                  &LOCK_ndb_util_thread,
-                                  &abstime);
+    pthread_cond_timedwait(&COND_ndb_util_thread,
+                           &LOCK_ndb_util_thread,
+                           &abstime);
     pthread_mutex_unlock(&LOCK_ndb_util_thread);
-
+#ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
     DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
                                    ndb_cache_check_time));
-
+#endif
     if (abort_loop)
       break; /* Shutting down server */
 
@@ -6224,20 +6697,12 @@
     List_iterator_fast<NDB_SHARE> it(util_open_tables);
     while ((share= it++))
     {
-      /* Split tab- and dbname */
-      char buf[FN_REFLEN];
-      char *tabname, *db;
-      uint length= dirname_length(share->table_name);
-      tabname= share->table_name+length;
-      memcpy(buf, share->table_name, length-1);
-      buf[length-1]= 0;
-      db= buf+dirname_length(buf);
       DBUG_PRINT("ndb_util_thread",
                  ("Fetching commit count for: %s",
-                  share->table_name));
+                  share->key));
 
       /* Contact NDB to get commit count for table */
-      ndb->setDatabaseName(db);
+      ndb->setDatabaseName(share->db);
       struct Ndb_statistics stat;
 
       uint lock;
@@ -6245,17 +6710,17 @@
       lock= share->commit_count_lock;
       pthread_mutex_unlock(&share->mutex);
 
-      if (ndb_get_table_statistics(ndb, tabname, &stat) == 0)
+      if (ndb_get_table_statistics(ndb, share->table_name, &stat) == 0)
       {
         DBUG_PRINT("ndb_util_thread",
                    ("Table: %s, commit_count: %llu, rows: %llu",
-                    share->table_name, stat.commit_count, stat.row_count));
+                    share->key, stat.commit_count, stat.row_count));
       }
       else
       {
         DBUG_PRINT("ndb_util_thread",
                    ("Error: Could not get commit count for table %s",
-                    share->table_name));
+                    share->key));
         stat.commit_count= 0;
       }
 
@@ -6265,7 +6730,7 @@
       pthread_mutex_unlock(&share->mutex);
 
       /* Decrease the use count and possibly free share */
-      free_share(share);
+      free_share(&share);
     }
 
     /* Clear the list of open tables */
@@ -6292,7 +6757,8 @@
       abstime.tv_nsec-= 1000000000;
     }
   }
-
+ndb_util_thread_end:
+  net_end(&thd->net);
   thd->cleanup();
   delete thd;
   delete ndb;
@@ -7629,6 +8095,53 @@
   DBUG_RETURN(0);
 }
 
+/*
+  Handler for SHOW NDB STATUS: sends one row per entry returned by
+  Ndb::get_free_list_usage().  Returns TRUE on error, FALSE otherwise.
+*/
+int
+ndbcluster_show_status(THD* thd)
+{
+  Protocol *proto= thd->protocol;
+  DBUG_ENTER("ndbcluster_show_status");
+
+  if (have_ndbcluster != SHOW_OPTION_YES)
+  {
+    my_message(ER_NOT_SUPPORTED_YET,
+               "Cannot call SHOW NDBCLUSTER STATUS because skip-ndbcluster is defined",
+               MYF(0));
+    DBUG_RETURN(TRUE);
+  }
+
+  List<Item> columns;
+  columns.push_back(new Item_empty_string("free_list", 255));
+  columns.push_back(new Item_return_int("created", 10, MYSQL_TYPE_LONG));
+  columns.push_back(new Item_return_int("free", 10, MYSQL_TYPE_LONG));
+  columns.push_back(new Item_return_int("sizeof", 10, MYSQL_TYPE_LONG));
+  if (proto->send_fields(&columns,
+                         Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
+    DBUG_RETURN(TRUE);
+
+  /* Rows are produced only when this thread already has an Ndb object */
+  Ndb *ndb= get_thd_ndb(thd) ? get_thd_ndb(thd)->ndb : 0;
+  if (ndb)
+  {
+    Ndb::Free_list_usage usage;
+    usage.m_name= 0;
+    while (ndb->get_free_list_usage(&usage))
+    {
+      proto->prepare_for_resend();
+      proto->store(usage.m_name, &my_charset_bin);
+      proto->store((uint) usage.m_created);
+      proto->store((uint) usage.m_free);
+      proto->store((uint) usage.m_sizeof);
+      if (proto->write())
+        DBUG_RETURN(TRUE);
+    }
+  }
+  send_eof(thd);
+  DBUG_RETURN(FALSE);
+}
 
 /*
   Create a table in NDB Cluster

--- 1.96/sql/ha_ndbcluster.h	2005-09-28 11:02:47 +02:00
+++ 1.97/sql/ha_ndbcluster.h	2005-11-07 11:57:34 +01:00
@@ -25,6 +25,7 @@
 #pragma interface                       /* gcc class implementation */
 #endif
 
+#include <ndbapi/NdbApi.hpp>
 #include <ndbapi_limits.h>
 
 class Ndb;             // Forward declaration
@@ -36,10 +37,13 @@
 class NdbIndexScanOperation; 
 class NdbBlob;
 class NdbIndexStat;
+class NdbEventOperation;
 
 // connectstring to cluster if given by mysqld
 extern const char *ndbcluster_connectstring;
 extern ulong ndb_cache_check_time;
+extern ulong ndb_report_thresh_binlog_epoch_slip;
+extern ulong ndb_report_thresh_binlog_mem_usage;
 
 typedef enum ndb_index_type {
   UNDEFINED_INDEX = 0,
@@ -72,13 +76,25 @@
   // Index marked for deletion, for DROP INDEX
 } NDB_INDEX_DATA;
 
+/* One value slot (see ha_ndbcluster::m_value): record attribute, blob or raw */
+typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
+typedef enum {
+  NSS_INITIAL= 0,  /* NOTE(review): presumably a freshly created share */
+  NSS_DROPPED      /* NOTE(review): presumably table dropped - confirm use */
+} NDB_SHARE_STATE;
+
 typedef struct st_ndbcluster_share {
+  MEM_ROOT mem_root;
   THR_LOCK lock;
   pthread_mutex_t mutex;
-  char *table_name;
-  uint table_name_length,use_count;
+  char *key;
+  uint key_length;
+  THD *util_lock;
+  uint use_count;
   uint commit_count_lock;
   ulonglong commit_count;
+  char *db;
+  char *table_name;
 } NDB_SHARE;
 
 typedef enum ndb_item_type {
@@ -519,7 +535,7 @@
   int extra(enum ha_extra_function operation);
   int extra_opt(enum ha_extra_function operation, ulong cache_size);
   int external_lock(THD *thd, int lock_type);
-  int start_stmt(THD *thd);
+  int start_stmt(THD *thd, thr_lock_type lock_type);
   const char * table_type() const;
   const char ** bas_ext() const;
   ulong table_flags(void) const;
@@ -612,9 +628,16 @@
 
   bool check_if_incompatible_data(HA_CREATE_INFO *info,
 				  uint table_changes);
+  static void invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
+					  const char *tabname, bool global);
 
 private:
+  friend int ndbcluster_drop_database(const char *path);
   int alter_table_name(const char *to);
+  static int delete_table(ha_ndbcluster *h, Ndb *ndb,
+			  const char *path,
+			  const char *db,
+			  const char *table_name);
   int drop_table();
   int create_ndb_index(const char *name, KEY *key_info, bool unique);
   int create_ordered_index(const char *name, KEY *key_info);
@@ -666,7 +689,8 @@
                       uint fieldnr, const byte* field_ptr);
   int set_ndb_key(NdbOperation*, Field *field,
                   uint fieldnr, const byte* field_ptr);
-  int set_ndb_value(NdbOperation*, Field *field, uint fieldnr, bool *set_blob_value= 0);
+  int set_ndb_value(NdbOperation*, Field *field, uint fieldnr,
+		    int row_offset= 0, bool *set_blob_value= 0);
   int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, byte*);
   friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
   int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
@@ -711,6 +735,7 @@
                            NdbScanOperation* op);
 
   friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
+  friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
   friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*);
   friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*);
 
@@ -727,7 +752,6 @@
   NDB_SHARE *m_share;
   NDB_INDEX_DATA  m_index[MAX_KEY];
   // NdbRecAttr has no reference to blob
-  typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
   NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
   partition_info *m_part_info;
   byte *m_rec0;
@@ -738,6 +762,7 @@
   bool m_ignore_dup_key;
   bool m_primary_key_update;
   bool m_write_op;
+  bool m_ignore_no_key;
   ha_rows m_rows_to_insert;
   ha_rows m_rows_inserted;
   ha_rows m_bulk_insert_rows;
@@ -769,7 +794,7 @@
 
 extern struct show_var_st ndb_status_variables[];
 
-handlerton *ndbcluster_init(void);
+bool ndbcluster_init(void);
 bool ndbcluster_end(void);
 
 int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
@@ -781,3 +806,6 @@
 int ndbcluster_drop_database(const char* path);
 
 void ndbcluster_print_error(int error, const NdbOperation *error_op);
+
+int ndbcluster_show_status(THD*);
+
Thread
bk commit into 5.1 tree (mskold:1.1892)Martin Skold7 Nov