List: NDB Connectors    « Previous Message | Next Message »
From: Monty Taylor    Date: May 5 2007 7:47am
Subject: Rev 102: Made testasynch.py use threads. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/exceptions
View as plain text  
------------------------------------------------------------
revno: 102
revision-id: mtaylor@stripped
parent: mtaylor@stripped
committer: Monty Taylor <mtaylor@stripped>
branch nick: exceptions
timestamp: Sat 2007-05-05 00:46:55 -0700
message:
  Made testasynch.py use threads.
modified:
  python/testasynch.py           testasynch.py-20070227185119-f8ow9m7i9zfgr5jr-1
  swig/NdbClusterConnection.i    ndb_cluster_connecti-20070228021421-qkr4cbpxymyqdrf3-6
=== modified file 'python/testasynch.py'
--- a/python/testasynch.py	2007-05-05 06:47:00 +0000
+++ b/python/testasynch.py	2007-05-05 07:46:55 +0000
@@ -1,6 +1,8 @@
+from __future__ import with_statement
 import sys,time,random,struct,math
 from mysql.cluster import ndbapi
 import mysql.cluster.ndbapi
+import threading 
 
 class PythonCallback(object):
 
@@ -19,6 +21,7 @@
 
 num_iter=int(sys.argv[1])
 INSERT_NUM=int(sys.argv[2])
+concurrent=10
 BATCH_SIZE=1000
 
 db = MySQLdb.connect(host="localhost",user="root",read_default_group="client")
@@ -138,27 +141,62 @@
 
 before_t = time.time()
 
-
-for f in range(0,int(num_iter)):
-
-    id_num = ids[random.randrange(0,len(ids))]
-    myTrans = myNdb.startTransaction("mytablename","%s"%id_num)
-    if myTrans is None:
-        print myNdb.getNdbError().getMessage()
-
-    myOper = myTrans.getNdbOperation("mytablename")
-    myOper.readTuple(ndbapi.NdbOperation.LM_Read)
-
-    myOper.equal("ATTR1",id_num)
+threadlist=[]
+connsem=threading.BoundedSemaphore(value=1)
+
+def runit():
+  mydata=threading.local()
+  mydata.t=threading.currentThread()
+  with connsem:
+    mydata.ndb=connection.getNdb("test")
+  if (mydata.ndb.init(4) == -1):
+    print mydata.ndb.getNdbError().getMessage()
+    sys.exit(-1)
+  mydata.translist=[]
+
+  for mydata.f in range(0,int(num_iter/concurrent)):
+
+    mydata.id_num = ids[random.randrange(0,len(ids))]
+    #print "starting new trans", mydata.f,mydata.t.getName()
+    mydata.myTrans = mydata.ndb.startTransaction("mytablename","%s"%mydata.id_num)
+    
+    if mydata.myTrans is None:
+        print mydata.ndb.getNdbError().getMessage()
+
+    mydata.myOper = mydata.myTrans.getNdbOperation("mytablename")
+    mydata.myOper.readTuple(ndbapi.NdbOperation.LM_Read)
+
+    mydata.myOper.equal("ATTR1",mydata.id_num)
             
-    myRecAttr= myOper.getValue("ATTR2")
+    mydata.myRecAttr= mydata.myOper.getValue("ATTR2")
 
-    cb =  PythonCallback(myRecAttr)
+    mydata.cb =  PythonCallback(mydata.myRecAttr)
     
-    if myTrans.executeAsynchPrepare( ndbapi.Commit , cb ) == -1:
-        print myTrans.getNdbError().getMessage()
-    myNdb.sendPollNdb(3000,1)
-    myTrans.close()
+    if mydata.myTrans.executeAsynchPrepare( ndbapi.Commit , mydata.cb ) == -1:
+        print mydata.myTrans.getNdbError().getMessage()
+    mydata.translist.append(mydata.myTrans)
+    if mydata.f%2==0:
+      #print "polling"
+      mydata.ndb.sendPollNdb(3000,20)
+      for mydata.x in range(0,len(mydata.translist)):
+        mydata.x=mydata.translist.pop()
+        mydata.x.close()
+        del(mydata.x)
+  time.sleep(0.001)
+  mydata.ndb.sendPollNdb(3000,4)
+
+  for mydata.x in range(0,len(mydata.translist)):
+    mydata.x=mydata.translist.pop()
+    mydata.x.close()
+    del(mydata.x)
+
+for x in range(0,concurrent):
+  thr=threading.Thread(target=runit,name="thread%s"%x)
+  threadlist.append(thr)
+  thr.start()
+
+for x in threadlist:
+  x.join()
 
 after_t = time.time()
 print "NDBAPI time for %s: %s -- %s" % ( num_iter, after_t, before_t)

=== modified file 'swig/NdbClusterConnection.i'
--- a/swig/NdbClusterConnection.i	2007-05-05 06:53:53 +0000
+++ b/swig/NdbClusterConnection.i	2007-05-05 07:46:55 +0000
@@ -93,7 +93,6 @@
       }
   }
   Ndb* getNdb(const char* aCatalogName="", const char* aSchemaName="def") {
-    Ndb* myNdb = new Ndb(self,aCatalogName,aSchemaName);
-    return myNdb; 
+    return new Ndb(self,aCatalogName,aSchemaName);
   }
 }

Thread
Rev 102: Made testasynch.py use threads. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/exceptions — Monty Taylor, 5 May