List:Commits« Previous MessageNext Message »
From:Martin Skold Date:June 8 2006 2:12pm
Subject:bk commit into 4.1 tree (mskold:1.2490) BUG#18184
View as plain text  
Below is the list of changes that have just been committed into a local
4.1 repository of marty. When marty does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2490 06/06/08 16:12:38 mskold@stripped +10 -0
  Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  sql/ha_ndbcluster.h
    1.59 06/06/08 16:12:09 mskold@stripped +2 -0
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  sql/ha_ndbcluster.cc
    1.183 06/06/08 16:12:08 mskold@stripped +70 -11
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  ndb/src/ndbapi/ndberror.c
    1.30 06/06/08 16:12:08 mskold@stripped +1 -1
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  ndb/src/ndbapi/NdbScanOperation.cpp
    1.51 06/06/08 16:12:08 mskold@stripped +21 -9
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  ndb/src/ndbapi/NdbResultSet.cpp
    1.9 06/06/08 16:12:08 mskold@stripped +11 -0
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  ndb/include/ndbapi/NdbScanOperation.hpp
    1.21 06/06/08 16:12:08 mskold@stripped +4 -2
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  ndb/include/ndbapi/NdbResultSet.hpp
    1.6 06/06/08 16:12:08 mskold@stripped +21 -0
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  ndb/include/ndbapi/NdbIndexScanOperation.hpp
    1.9 06/06/08 16:12:08 mskold@stripped +4 -3
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  mysql-test/t/ndb_lock.test
    1.8 06/06/08 16:12:08 mskold@stripped +76 -0
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

  mysql-test/r/ndb_lock.result
    1.6 06/06/08 16:12:07 mskold@stripped +59 -0
    Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	mskold
# Host:	linux.site
# Root:	/home/marty/MySQL/mysql-4.1

--- 1.8/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2005-09-16 11:26:31 +02:00
+++ 1.9/ndb/include/ndbapi/NdbIndexScanOperation.hpp	2006-06-08 16:12:08 +02:00
@@ -45,14 +45,15 @@ public:
   NdbResultSet* readTuples(LockMode = LM_Read,
 			   Uint32 batch = 0, 
 			   Uint32 parallel = 0,
-			   bool order_by = false);
+			   bool order_by = false,
+			   bool keyinfo = false);
   
   inline NdbResultSet* readTuples(int parallell){
-    return readTuples(LM_Read, 0, parallell, false);
+    return readTuples(LM_Read, 0, parallell);
   }
   
   inline NdbResultSet* readTuplesExclusive(int parallell = 0){
-    return readTuples(LM_Exclusive, 0, parallell, false);
+    return readTuples(LM_Exclusive, 0, parallell);
   }
 
   /**

--- 1.5/ndb/include/ndbapi/NdbResultSet.hpp	2004-11-22 14:41:43 +01:00
+++ 1.6/ndb/include/ndbapi/NdbResultSet.hpp	2006-06-08 16:12:08 +02:00
@@ -102,6 +102,27 @@ public:
   int restart(bool forceSend = false);
   
   /**
+   * Lock current row by transfering scan operation to a locking transaction. 
+   * Use this function 
+   * when a scan has found a record that you want to lock. 
+   * 1. Start a new transaction.
+   * 2. Call the function takeOverForUpdate using your new transaction 
+   *    as parameter, all the properties of the found record will be copied 
+   *    to the new transaction.
+   * 3. When you execute the new transaction, the lock held by the scan will 
+   *    be transferred to the new transaction(it's taken over).
+   *
+   * @note You must have started the scan with openScanExclusive
+   *       or explictly have requested keyinfo to be able to lock 
+   *       the found tuple.
+   *
+   * @param lockingTrans the locking transaction connection.
+   * @return an NdbOperation or NULL.
+   */
+  NdbOperation* lockTuple();
+  NdbOperation*	lockTuple(NdbConnection* lockingTrans);
+
+  /**
    * Transfer scan operation to an updating transaction. Use this function 
    * when a scan has found a record that you want to update. 
    * 1. Start a new transaction.

--- 1.20/ndb/include/ndbapi/NdbScanOperation.hpp	2005-04-05 13:00:33 +02:00
+++ 1.21/ndb/include/ndbapi/NdbScanOperation.hpp	2006-06-08 16:12:08 +02:00
@@ -64,14 +64,16 @@ public:
    * Tuples are not stored in NdbResultSet until execute(NoCommit) 
    * has been executed and nextResult has been called.
    * 
+   * @param keyinfo   Return primary key, needed to be able to call lockTuple
    * @param parallel  Scan parallelism
    * @param batch No of rows to fetch from each fragment at a time
    * @param LockMode  Scan lock handling   
    * @returns NdbResultSet.
-   * @note specifying 0 for batch and parallall means max performance
+   * @note specifying 0 for batch and parallell means max performance
    */ 
   NdbResultSet* readTuples(LockMode = LM_Read, 
-			   Uint32 batch = 0, Uint32 parallel = 0);
+			   Uint32 batch = 0, Uint32 parallel = 0, 
+			   bool keyinfo = false);
   
   inline NdbResultSet* readTuples(int parallell){
     return readTuples(LM_Read, 0, parallell);

--- 1.8/ndb/src/ndbapi/NdbResultSet.cpp	2005-04-05 13:00:33 +02:00
+++ 1.9/ndb/src/ndbapi/NdbResultSet.cpp	2006-06-08 16:12:08 +02:00
@@ -73,6 +73,17 @@ void NdbResultSet::close(bool forceSend)
 }
 
 NdbOperation* 
+NdbResultSet::lockTuple(){
+  return lockTuple(m_operation->m_transConnection);
+}
+
+NdbOperation* 
+NdbResultSet::lockTuple(NdbConnection* takeOverTrans){
+  return m_operation->takeOverScanOp(NdbOperation::ReadRequest, 
+				     takeOverTrans);
+}
+
+NdbOperation* 
 NdbResultSet::updateTuple(){
   return updateTuple(m_operation->m_transConnection);
 }

--- 1.50/ndb/src/ndbapi/NdbScanOperation.cpp	2005-04-05 13:00:34 +02:00
+++ 1.51/ndb/src/ndbapi/NdbScanOperation.cpp	2006-06-08 16:12:08 +02:00
@@ -126,7 +126,8 @@ NdbScanOperation::init(const NdbTableImp
 
 NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
 					   Uint32 batch, 
-					   Uint32 parallel)
+					   Uint32 parallel,
+					   bool keyinfo)
 {
   m_ordered = 0;
 
@@ -170,7 +171,7 @@ NdbResultSet* NdbScanOperation::readTupl
     return 0;
   }
 
-  m_keyInfo = lockExcl ? 1 : 0;
+  m_keyInfo = (keyinfo || lockExcl) ? 1 : 0;
 
   bool range = false;
   if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
@@ -956,18 +957,28 @@ NdbScanOperation::takeOverScanOp(Operati
     if (newOp == NULL){
       return NULL;
     }
+    if (!m_keyInfo)
+    {
+      // Cannot take over lock if no keyinfo was requested
+      setErrorCodeAbort(4604);
+      return NULL;
+    }
     pTrans->theSimpleState = 0;
     
     const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
 
     newOp->theTupKeyLen = len;
     newOp->theOperationType = opType;
-    if (opType == DeleteRequest) {
-      newOp->theStatus = GetValue;  
-    } else {
-      newOp->theStatus = SetValue;  
+    switch (opType) {
+    case (ReadRequest):
+      newOp->theLockMode = theLockMode;
+      // Fall through
+    case (DeleteRequest):
+      newOp->theStatus = GetValue;
+      break;
+    default:
+      newOp->theStatus = SetValue;
     }
-    
     const Uint32 * src = (Uint32*)tRecAttr->aRef();
     const Uint32 tScanInfo = src[len] & 0x3FFFF;
     const Uint32 tTakeOverNode = src[len] >> 20;
@@ -1241,8 +1252,9 @@ NdbResultSet*
 NdbIndexScanOperation::readTuples(LockMode lm,
 				  Uint32 batch,
 				  Uint32 parallel,
-				  bool order_by){
-  NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
+				  bool order_by,
+				  bool keyinfo){
+  NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0, keyinfo);
   if(rs && order_by){
     m_ordered = 1;
     Uint32 cnt = m_accessTable->getNoOfColumns() - 1;

--- 1.182/sql/ha_ndbcluster.cc	2006-06-02 07:25:57 +02:00
+++ 1.183/sql/ha_ndbcluster.cc	2006-06-08 16:12:08 +02:00
@@ -1,4 +1,4 @@
- /* Copyright (C) 2000-2003 MySQL AB
+  /* Copyright (C) 2000-2003 MySQL AB
 
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -1043,12 +1043,23 @@ void ha_ndbcluster::release_metadata()
 
 int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
 {
+  DBUG_ENTER("ha_ndbcluster::get_ndb_lock_type");
   if (type >= TL_WRITE_ALLOW_WRITE)
-    return NdbOperation::LM_Exclusive;
-  else if (uses_blob_value(m_retrieve_all_fields))
-    return NdbOperation::LM_Read;
+  {
+    DBUG_PRINT("info", ("Using exclusive lock"));
+    DBUG_RETURN(NdbOperation::LM_Exclusive);
+  }
+  else if (type ==  TL_READ_WITH_SHARED_LOCKS || 
+	   uses_blob_value(m_retrieve_all_fields))
+  {
+    DBUG_PRINT("info", ("Using read lock"));
+    DBUG_RETURN(NdbOperation::LM_Read);
+  }
   else
-    return NdbOperation::LM_CommittedRead;
+  {
+    DBUG_PRINT("info", ("Using committed read"));
+    DBUG_RETURN(NdbOperation::LM_CommittedRead);
+  }
 }
 
 static const ulong index_type_flags[]=
@@ -1384,12 +1395,35 @@ inline int ha_ndbcluster::next_result(by
 
   if (!cursor)
     DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+  if (m_lock_tuple)
+  {
+    /*
+      Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
+      (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
+      LOCK WITH SHARE MODE) and row was not explictly unlocked 
+      with unlock_row() call
+    */
+      NdbConnection *trans= m_active_trans;
+      NdbOperation *op;
+      // Lock row
+      DBUG_PRINT("info", ("Keeping lock on scanned row"));
+      
+      if (!(op= m_active_cursor->lockTuple()))
+      {
+	m_lock_tuple= false;
+	ERR_RETURN(trans->getNdbError());
+      }
+      m_ops_pending++;
+  }
+  m_lock_tuple= false;
     
   /* 
      If this an update or delete, call nextResult with false
      to process any records already cached in NdbApi
   */
-  bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
+  bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
+                    m_lock.type != TL_READ_WITH_SHARED_LOCKS;
   do {
     DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
     /*
@@ -1407,9 +1441,16 @@ inline int ha_ndbcluster::next_result(by
     {
       // One more record found
       DBUG_PRINT("info", ("One more record found"));    
-
+      
       unpack_record(buf);
       table->status= 0;
+      /*
+	Explicitly lock tuple if "select for update" or
+	"select lock in share mode"
+      */
+      m_lock_tuple= (m_lock.type == TL_WRITE_ALLOW_WRITE
+		     || 
+		     m_lock.type == TL_READ_WITH_SHARED_LOCKS);
       DBUG_RETURN(0);
     } 
     else if (check == 1 || check == 2)
@@ -1444,7 +1485,6 @@ inline int ha_ndbcluster::next_result(by
       contact_ndb= (check == 2);
     }
   } while (check == 2);
-    
   table->status= STATUS_NOT_FOUND;
   if (check == -1)
     DBUG_RETURN(ndb_err(trans));
@@ -1679,10 +1719,11 @@ int ha_ndbcluster::ordered_index_scan(co
     restart= false;
     NdbOperation::LockMode lm=
       (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+    bool need_pk = (lm == NdbOperation::LM_Read);
     if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
 					      m_index[active_index].index, 
 					      (const NDBTAB *) m_table)) ||
-	!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
+	!(cursor= op->readTuples(lm, 0, parallelism, sorted, need_pk)))
       ERR_RETURN(trans->getNdbError());
     m_active_cursor= cursor;
   } else {
@@ -1817,8 +1858,9 @@ int ha_ndbcluster::full_table_scan(byte 
 
   NdbOperation::LockMode lm=
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+  bool need_pk = (lm == NdbOperation::LM_Read);
   if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
-      !(cursor= op->readTuples(lm, 0, parallelism)))
+      !(cursor= op->readTuples(lm, 0, parallelism, need_pk)))
     ERR_RETURN(trans->getNdbError());
   m_active_cursor= cursor;
   DBUG_RETURN(define_read_attrs(buf, op));
@@ -2082,6 +2124,7 @@ int ha_ndbcluster::update_row(const byte
     DBUG_PRINT("info", ("Calling updateTuple on cursor"));
     if (!(op= cursor->updateTuple()))
       ERR_RETURN(trans->getNdbError());
+    m_lock_tuple= false;
     m_ops_pending++;
     if (uses_blob_value(FALSE))
       m_blobs_pending= TRUE;
@@ -2157,6 +2200,7 @@ int ha_ndbcluster::delete_row(const byte
     DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
     if (cursor->deleteTuple() != 0)
       ERR_RETURN(trans->getNdbError());     
+    m_lock_tuple= false;
     m_ops_pending++;
 
     no_uncommitted_rows_update(-1);
@@ -2439,6 +2483,12 @@ int ha_ndbcluster::index_init(uint index
 {
   DBUG_ENTER("index_init");
   DBUG_PRINT("enter", ("index: %u", index));
+  /*
+    Locks are are explicitly released in scan
+    unless m_lock.type == TL_READ_HIGH_PRIORITY
+    and no sub-sequent call to unlock_row()
+   */
+  m_lock_tuple= false;
   DBUG_RETURN(handler::index_init(index));
 }
 
@@ -2683,7 +2733,7 @@ int ha_ndbcluster::close_scan()
   if (!cursor)
     DBUG_RETURN(1);
 
-  
+  m_lock_tuple= false;
   if (m_ops_pending)
   {
     /*
@@ -3382,6 +3432,15 @@ int ha_ndbcluster::external_lock(THD *th
   When using LOCK TABLE's external_lock will start a transaction
   since ndb does not currently does not support table locking
 */
+
+void ha_ndbcluster::unlock_row() 
+{
+  DBUG_ENTER("unlock_row");
+
+  DBUG_PRINT("info", ("Unlocking row"));
+  m_lock_tuple= false;
+  DBUG_VOID_RETURN;
+}
 
 int ha_ndbcluster::start_stmt(THD *thd)
 {

--- 1.58/sql/ha_ndbcluster.h	2006-02-10 17:40:18 +01:00
+++ 1.59/sql/ha_ndbcluster.h	2006-06-08 16:12:09 +02:00
@@ -120,6 +120,7 @@ class ha_ndbcluster: public handler
   int extra_opt(enum ha_extra_function operation, ulong cache_size);
   int reset();
   int external_lock(THD *thd, int lock_type);
+  void unlock_row();
   int start_stmt(THD *thd);
   const char * table_type() const;
   const char ** bas_ext() const;
@@ -223,6 +224,7 @@ class ha_ndbcluster: public handler
   char m_tabname[FN_HEADLEN];
   ulong m_table_flags;
   THR_LOCK_DATA m_lock;
+  bool m_lock_tuple;
   NDB_SHARE *m_share;
   NDB_INDEX_DATA  m_index[MAX_KEY];
   // NdbRecAttr has no reference to blob

--- 1.5/mysql-test/r/ndb_lock.result	2004-10-13 10:07:50 +02:00
+++ 1.6/mysql-test/r/ndb_lock.result	2006-06-08 16:12:07 +02:00
@@ -63,3 +63,62 @@ pk	u	o
 5	5	5
 insert into t1 values (1,1,1);
 drop table t1;
+create table t1 (x integer not null primary key, y varchar(32)) engine = ndb;
+insert into t1 values (1,'one'), (2,'two'),(3,"three");
+begin;
+select * from t1 where x = 1 for update;
+x	y
+1	one
+begin;
+select * from t1 where x = 2 for update;
+x	y
+2	two
+select * from t1 where x = 1 for update;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+rollback;
+commit;
+begin;
+select * from t1 where y = 'one' or y = 'three' for update;
+x	y
+3	three
+1	one
+begin;
+select * from t1 where x = 2 for update;
+x	y
+2	two
+select * from t1 where x = 1 for update;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+rollback;
+commit;
+begin;
+select * from t1 where x = 1 lock in share mode;
+x	y
+1	one
+begin;
+select * from t1 where x = 1 lock in share mode;
+x	y
+1	one
+select * from t1 where x = 2 for update;
+x	y
+2	two
+select * from t1 where x = 1 for update;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+rollback;
+commit;
+begin;
+select * from t1 where y = 'one' or y = 'three' lock in share mode;
+x	y
+3	three
+1	one
+begin;
+select * from t1 where y = 'one' lock in share mode;
+x	y
+1	one
+select * from t1 where x = 2 for update;
+x	y
+2	two
+select * from t1 where x = 1 for update;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+rollback;
+commit;
+drop table t1;

--- 1.7/mysql-test/t/ndb_lock.test	2005-07-28 02:21:45 +02:00
+++ 1.8/mysql-test/t/ndb_lock.test	2006-06-08 16:12:08 +02:00
@@ -69,4 +69,80 @@ insert into t1 values (1,1,1);
 
 drop table t1;
 
+# Lock for update
+
+create table t1 (x integer not null primary key, y varchar(32)) engine = ndb;
+
+insert into t1 values (1,'one'), (2,'two'),(3,"three"); 
+
+# PK access
+connection con1;
+begin;
+select * from t1 where x = 1 for update;
+
+connection con2;
+begin;
+select * from t1 where x = 2 for update;
+--error 1205
+select * from t1 where x = 1 for update;
+rollback;
+
+connection con1;
+commit;
+
+# scan
+connection con1;
+begin;
+select * from t1 where y = 'one' or y = 'three' for update;
+
+connection con2;
+begin;
+# Have to check with pk access here since scans take locks on
+# all rows and then release them in chunks
+select * from t1 where x = 2 for update;
+--error 1205
+select * from t1 where x = 1 for update;
+rollback;
+
+connection con1;
+commit;
+
+# share locking
+
+# PK access
+connection con1;
+begin;
+select * from t1 where x = 1 lock in share mode;
+
+connection con2;
+begin;
+select * from t1 where x = 1 lock in share mode;
+select * from t1 where x = 2 for update;
+--error 1205
+select * from t1 where x = 1 for update;
+rollback;
+
+connection con1;
+commit;
+
+# scan
+connection con1;
+begin;
+select * from t1 where y = 'one' or y = 'three' lock in share mode;
+
+connection con2;
+begin;
+select * from t1 where y = 'one' lock in share mode;
+# Have to check with pk access here since scans take locks on
+# all rows and then release them in chunks
+select * from t1 where x = 2 for update;
+--error 1205
+select * from t1 where x = 1 for update;
+rollback;
+
+connection con1;
+commit;
+
+drop table t1;
+
 # End of 4.1 tests

--- 1.29/ndb/src/ndbapi/ndberror.c	2005-08-31 13:09:13 +02:00
+++ 1.30/ndb/src/ndbapi/ndberror.c	2006-06-08 16:12:08 +02:00
@@ -281,7 +281,7 @@ ErrorBundle ErrorCodes[] = {
   { 4601, AE, "Transaction is not started"},
   { 4602, AE, "You must call getNdbOperation before executeScan" },
   { 4603, AE, "There can only be ONE operation in a scan transaction" },
-  { 4604, AE, "takeOverScanOp, opType must be UpdateRequest or DeleteRequest" },
+  { 4604, AE, "takeOverScanOp, to take over a scanned row one must explicitly request keyinfo in readTuples call" },
   { 4605, AE, "You may only call openScanRead or openScanExclusive once for each operation"},
   { 4607, AE, "There may only be one operation in a scan transaction"},
   { 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"},
Thread
bk commit into 4.1 tree (mskold:1.2490) BUG#18184Martin Skold8 Jun