------------------------------------------------------------
revno: 99
revision-id: mtaylor@stripped
parent: mtaylor@stripped
committer: Monty Taylor <mtaylor@stripped>
branch nick: exceptions
timestamp: Fri 2007-05-04 23:47:00 -0700
message:
Fixed Python Async code.
Removed two memory leaks (thanks python debug mode)
modified:
python/testasynch.py testasynch.py-20070227185119-f8ow9m7i9zfgr5jr-1
swig/NdbClusterConnection.i ndb_cluster_connecti-20070228021421-qkr4cbpxymyqdrf3-6
swig/NdbTransaction.i ndbtransaction.i-20070227184716-ecjyhh3jgvmye4de-7
=== modified file 'python/testasynch.py'
--- a/python/testasynch.py 2007-04-03 04:06:47 +0000
+++ b/python/testasynch.py 2007-05-05 06:47:00 +0000
@@ -1,26 +1,65 @@
+import sys,time,random,struct,math
from mysql.cluster import ndbapi
-import sys,time,random,struct,math
+import mysql.cluster.ndbapi
+
+class PythonCallback(object):
+
+ def __init__(self, recAttr):
+ self.recAttr=recAttr
+
+ def __call__(self, ret, myTrans):
+ #print "value = ", self.recAttr.get_value()
+ x=self.recAttr.get_value()
+
+import MySQLdb
+
+if len(sys.argv) != 3:
+ print "Usage:\n\ttest.py NUM_OF_ITERATIONS NUM_OF_ROWS "
+ sys.exit(1)
+
+num_iter=int(sys.argv[1])
+INSERT_NUM=int(sys.argv[2])
+BATCH_SIZE=1000
+
+db = MySQLdb.connect(host="localhost",user="root",read_default_group="client")
+
+cur=db.cursor()
+
+print "Dropping and recreating schema\n"
+
+cur.execute("CREATE DATABASE if not exists test")
+cur.execute("USE test")
+
+cur.execute("drop table if exists mytablename")
+
+cur.execute("""
+CREATE TABLE if not exists
+ mytablename
+ (ATTR1 INT UNSIGNED not null auto_increment,
+ ATTR2 INT UNSIGNED NOT NULL,
+ PRIMARY KEY USING HASH (ATTR1))
+ENGINE=NDBCLUSTER
+""")
+
### Connect to cluster
print "connecting to cluster\n"
-
-connection = ndbapi.Ndb_cluster_connection()
-
try:
+ connection = ndbapi.NdbFactory.createNdbClusterConnection();
+
connection.connect(1,1,1)
-except RuntimeError, e:
+except mysql.cluster.ndbapi.NdbApiException,e:
print e
sys.exit(-1)
-
-
+
if (connection.wait_until_ready(30,30)):
print "Cluster was not ready within 30 secs."
sys.exit(-1);
-myNdb = ndbapi.Ndb(connection,"test" )
+myNdb = connection.getNdb("test")
if (myNdb.init(4) == -1):
print myNdb.getNdbError().getMessage()
@@ -31,68 +70,117 @@
### Fill db
-def testasync(*args):
- #for f in args:
- # print "arg: %s" % f
- print "hello - I have no state, since I'm just a function!!"
-
-class testaclass(object):
-
- def __init__(self, recAttr):
- self.recAttr=recAttr
-
- def __call__(self, ret, myTrans):
- print "hello from testaclass"
- print "value = ", self.recAttr.get_value()
- #myNdb.closeTransaction(myTrans)
-
-
-### Fetch ids using ndb
-
-
-myTrans = myNdb.startTransaction()
-if myTrans is None:
- print myNdb.getNdbError().getMessage()
-
-myOper = myTrans.getNdbOperation("mytablename")
-myOper.readTuple(ndbapi.NdbOperation.LM_Read)
-
-myOper.equal("ATTR1", 245755 )
-
-myRecAttr= myOper.getValue("ATTR2")
-
-a = testaclass(myRecAttr)
-
-if myTrans.executeAsynchPrepare( ndbapi.Commit, a ) == -1:
- print myTrans.getNdbError().getMessage()
- #foo=myRecAttr.u_32_value()
-
-### Fetch ids using ndb
-
-
-myTrans = myNdb.startTransaction()
-if myTrans is None:
- print myNdb.getNdbError().getMessage()
-
-myOper = myTrans.getNdbOperation("mytablename")
-myOper.readTuple(ndbapi.NdbOperation.LM_Read)
-
-myOper.equal("ATTR1", 84 )
-
-myRecAttr= myOper.getValue("ATTR2")
-
-callbacks=[]
-
-for x in range(0,30):
- a = testaclass(myRecAttr)
- callbacks.append(a)
-
- if myTrans.executeAsynchPrepare( ndbapi.Commit, a ) == -1:
- print myTrans.getNdbError().getMessage()
- #foo=myRecAttr.u_32_value()
-
-
-myNdb.sendPollNdb(3000,1)
-
-
-
+
+before_t = time.time()
+
+for t in range(0,int(math.ceil((int(INSERT_NUM)*1.0)/(1.0*BATCH_SIZE)))):
+
+ myTransaction= myNdb.startTransaction()
+ val = ((t+1)*BATCH_SIZE)-INSERT_NUM
+ offset = 0
+ if ( val > 0 ):
+ offset = val
+
+ for i in range(0,BATCH_SIZE-offset):
+
+ myOperation=myTransaction.getNdbOperation("mytablename")
+ myOperation.insertTuple()
+ auto_id = myNdb.getAutoIncrementValue("mytablename",BATCH_SIZE)
+
+ myOperation.equal("ATTR1",auto_id);
+ myOperation.setValue("ATTR2", i);
+
+ ret = myTransaction.execute( ndbapi.Commit )
+ if ret == -1:
+ print myTransaction.getNdbError().getMessage()
+
+ #myTransaction.close()
+
+after_t = time.time()
+print "Insert time for %s: %s -- %s" % ( INSERT_NUM, after_t, before_t)
+print "\t %s" % (after_t - before_t)
+
+
+### Get list of ids
+myTransaction = myNdb.startTransaction()
+
+myScanOperation=myTransaction.getNdbScanOperation("mytablename");
+
+
+if myScanOperation is None:
+ print myTransaction.getNdbError().getMessage()
+
+
+if myScanOperation.readTuples(ndbapi.NdbOperation.LM_CommittedRead):
+ print myScanOperation.getNdbError().getMessage()
+
+
+
+myRecAttr=myScanOperation.getValue("ATTR1");
+
+
+myTransaction.execute(ndbapi.NoCommit);
+
+ids=[]
+while 1:
+
+ if (myScanOperation.nextResult(True) != 0) :
+ break
+
+ random_id = myRecAttr.u_32_value()
+ ids.append(random_id)
+
+
+#myTransaction.close()
+
+
+### Fetch ids using ndb
+
+before_t = time.time()
+
+
+for f in range(0,int(num_iter)):
+
+ id_num = ids[random.randrange(0,len(ids))]
+ myTrans = myNdb.startTransaction("mytablename","%s"%id_num)
+ if myTrans is None:
+ print myNdb.getNdbError().getMessage()
+
+ myOper = myTrans.getNdbOperation("mytablename")
+ myOper.readTuple(ndbapi.NdbOperation.LM_Read)
+
+ myOper.equal("ATTR1",id_num)
+
+ myRecAttr= myOper.getValue("ATTR2")
+
+ cb = PythonCallback(myRecAttr)
+
+ if myTrans.executeAsynchPrepare( ndbapi.Commit , cb ) == -1:
+ print myTrans.getNdbError().getMessage()
+ myNdb.sendPollNdb(3000,1)
+ myTrans.close()
+
+after_t = time.time()
+print "NDBAPI time for %s: %s -- %s" % ( num_iter, after_t, before_t)
+print "\t %s" % (after_t - before_t)
+
+
+
+cur=db.cursor()
+
+before_t = time.time()
+
+for f in range(0,int(num_iter)):
+
+ id_num = ids[random.randrange(0,len(ids))]
+
+ cur.execute("select ATTR2 from mytablename where ATTR1=%s" % (id_num))
+ res=cur.fetchall()
+
+after_t = time.time()
+
+after_t = time.time()
+print "MySQL time for %s: %s -- %s" % ( num_iter, after_t, before_t)
+print "\t %s" % (after_t - before_t)
+
+#ndbapi.ndb_end()
=== modified file 'swig/NdbClusterConnection.i'
--- a/swig/NdbClusterConnection.i 2007-05-04 20:45:38 +0000
+++ b/swig/NdbClusterConnection.i 2007-05-05 06:47:00 +0000
@@ -23,10 +23,12 @@
// NdbFactory.getNdbClusterConnection should be used instead
Ndb_cluster_connection(const char * connectstring = 0);
+
+public:
+
+
~Ndb_cluster_connection();
-public:
-
void set_name(const char* name);
#if defined(SWIGJAVA)
@@ -38,25 +40,25 @@
if (result) {
const char * msg = "Connect to management server failed";
NDB_exception(NdbApiException,msg);
- return -1;
+ goto fail;
}
}
%typemap(check) int no_retries {
if ($1 < 0) {
NDB_exception(NdbClusterConnectionPermanentException,"Retries must be greater than
or equal to zero.");
- return -1;
+ goto fail;
}
}
%typemap(check) int retry_delay_in_seconds {
if ($1 < 0) {
NDB_exception(NdbClusterConnectionPermanentException,"Delay must be greater than or
equal to zero.");
- return -1;
+ goto fail;
}
}
%typemap(check) int verbose {
if ($1 < 0 || $1 > 1) {
NDB_exception(NdbClusterConnectionPermanentException,"Verbose must be either zero
or one.");
- return -1;
+ goto fail;
}
}
int connect(int no_retries=0, int retry_delay_in_seconds=1, int verbose=0);
=== modified file 'swig/NdbTransaction.i'
--- a/swig/NdbTransaction.i 2007-05-04 20:45:38 +0000
+++ b/swig/NdbTransaction.i 2007-05-05 06:47:00 +0000
@@ -18,7 +18,7 @@
*/
%delobject NdbTransaction::close;
-%newobject NdbTransaction::getNdbOperation;
+//%newobject NdbTransaction::getNdbOperation;
class NdbTransaction {
~NdbTransaction();
| Thread |
|---|
| • Rev 99: Fixed Python Async code. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/exceptions | Monty Taylor | 5 May |