------------------------------------------------------------
revno: 102
revision-id: mtaylor@stripped
parent: mtaylor@stripped
committer: Monty Taylor <mtaylor@stripped>
branch nick: exceptions
timestamp: Sat 2007-05-05 00:46:55 -0700
message:
Made testasynch.py use threads.
modified:
python/testasynch.py testasynch.py-20070227185119-f8ow9m7i9zfgr5jr-1
swig/NdbClusterConnection.i ndb_cluster_connecti-20070228021421-qkr4cbpxymyqdrf3-6
=== modified file 'python/testasynch.py'
--- a/python/testasynch.py 2007-05-05 06:47:00 +0000
+++ b/python/testasynch.py 2007-05-05 07:46:55 +0000
@@ -1,6 +1,8 @@
+from __future__ import with_statement
import sys,time,random,struct,math
from mysql.cluster import ndbapi
import mysql.cluster.ndbapi
+import threading
class PythonCallback(object):
@@ -19,6 +21,7 @@
num_iter=int(sys.argv[1])
INSERT_NUM=int(sys.argv[2])
+concurrent=10  # number of worker threads started below; also divides num_iter to get the per-thread iteration count
BATCH_SIZE=1000
db = MySQLdb.connect(host="localhost",user="root",read_default_group="client")
@@ -138,27 +141,62 @@
before_t = time.time()
-
-for f in range(0,int(num_iter)):
-
- id_num = ids[random.randrange(0,len(ids))]
- myTrans = myNdb.startTransaction("mytablename","%s"%id_num)
- if myTrans is None:
- print myNdb.getNdbError().getMessage()
-
- myOper = myTrans.getNdbOperation("mytablename")
- myOper.readTuple(ndbapi.NdbOperation.LM_Read)
-
- myOper.equal("ATTR1",id_num)
+threadlist=[]  # every started Thread is appended here so it can be joined later
+connsem=threading.BoundedSemaphore(value=1)  # value=1: at most one thread at a time can hold it
+
+def runit():  # thread target: runs int(num_iter/concurrent) asynchronous reads against "mytablename" on its own Ndb object
+ mydata=threading.local()  # NOTE(review): created inside the call, so each invocation gets a fresh object; plain local variables would appear to behave the same - confirm
+ mydata.t=threading.currentThread()  # in the visible code this is only referenced by the commented-out debug print below
+ with connsem:  # acquire the shared semaphore around Ndb object creation
+ mydata.ndb=connection.getNdb("test")
+ if (mydata.ndb.init(4) == -1):
+ print mydata.ndb.getNdbError().getMessage()
+ sys.exit(-1)  # NOTE(review): called from a worker thread, this raises SystemExit in that thread only; the process is presumably not terminated - confirm intent
+ mydata.translist=[]  # transactions that have been prepared but not yet closed
+
+ for mydata.f in range(0,int(num_iter/concurrent)):
+
+ mydata.id_num = ids[random.randrange(0,len(ids))]  # pick a random element of ids
+ #print "starting new trans", mydata.f,mydata.t.getName()
+ mydata.myTrans = mydata.ndb.startTransaction("mytablename","%s"%mydata.id_num)
+
+ if mydata.myTrans is None:
+ print mydata.ndb.getNdbError().getMessage()  # NOTE(review): no continue/return is visible after this print, so the following getNdbOperation call would be made on None - confirm
+
+ mydata.myOper = mydata.myTrans.getNdbOperation("mytablename")
+ mydata.myOper.readTuple(ndbapi.NdbOperation.LM_Read)
+
+ mydata.myOper.equal("ATTR1",mydata.id_num)
- myRecAttr= myOper.getValue("ATTR2")
+ mydata.myRecAttr= mydata.myOper.getValue("ATTR2")
- cb = PythonCallback(myRecAttr)
+ mydata.cb = PythonCallback(mydata.myRecAttr)  # callback object passed to executeAsynchPrepare below
- if myTrans.executeAsynchPrepare( ndbapi.Commit , cb ) == -1:
- print myTrans.getNdbError().getMessage()
- myNdb.sendPollNdb(3000,1)
- myTrans.close()
+ if mydata.myTrans.executeAsynchPrepare( ndbapi.Commit , mydata.cb ) == -1:
+ print mydata.myTrans.getNdbError().getMessage()  # a -1 result is only printed; no other handling is visible
+ mydata.translist.append(mydata.myTrans)  # keep the transaction so it can be closed after a poll
+ if mydata.f%2==0:  # true on even values of the loop counter
+ #print "polling"
+ mydata.ndb.sendPollNdb(3000,20)
+ for mydata.x in range(0,len(mydata.translist)):  # range() is evaluated once up front, so this runs once per queued transaction
+ mydata.x=mydata.translist.pop()  # the loop target is immediately rebound to the popped transaction
+ mydata.x.close()
+ del(mydata.x)
+ time.sleep(0.001)
+ mydata.ndb.sendPollNdb(3000,4)
+
+ for mydata.x in range(0,len(mydata.translist)):  # same pop/close/del sequence as above, applied to whatever is still queued
+ mydata.x=mydata.translist.pop()
+ mydata.x.close()
+ del(mydata.x)
+
+for x in range(0,concurrent):  # create and start `concurrent` threads, each running runit()
+ thr=threading.Thread(target=runit,name="thread%s"%x)  # threads are named thread0, thread1, ...
+ threadlist.append(thr)
+ thr.start()
+
+for x in threadlist:  # wait for every started thread before after_t is taken
+ x.join()
after_t = time.time()
print "NDBAPI time for %s: %s -- %s" % ( num_iter, after_t, before_t)
=== modified file 'swig/NdbClusterConnection.i'
--- a/swig/NdbClusterConnection.i 2007-05-05 06:53:53 +0000
+++ b/swig/NdbClusterConnection.i 2007-05-05 07:46:55 +0000
@@ -93,7 +93,6 @@
}
}
 Ndb* getNdb(const char* aCatalogName="", const char* aSchemaName="def") {
- Ndb* myNdb = new Ndb(self,aCatalogName,aSchemaName);
- return myNdb;
+ return new Ndb(self,aCatalogName,aSchemaName); // returns the newly allocated Ndb directly, without the temporary variable
 }
 }
}
| Thread |
|---|
| • Rev 102: Made testasynch.py use threads. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/exceptions | Monty Taylor | 5 May |