From: Date: May 5 2007 8:47am Subject: Rev 99: Fixed Python Async code. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/exceptions List-Archive: http://lists.mysql.com/commits/26163 Message-Id: ------------------------------------------------------------ revno: 99 revision-id: mtaylor@stripped parent: mtaylor@stripped committer: Monty Taylor branch nick: exceptions timestamp: Fri 2007-05-04 23:47:00 -0700 message: Fixed Python Async code. Removed two memory leaks (thanks python debug mode) modified: python/testasynch.py testasynch.py-20070227185119-f8ow9m7i9zfgr5jr-1 swig/NdbClusterConnection.i ndb_cluster_connecti-20070228021421-qkr4cbpxymyqdrf3-6 swig/NdbTransaction.i ndbtransaction.i-20070227184716-ecjyhh3jgvmye4de-7 === modified file 'python/testasynch.py' --- a/python/testasynch.py 2007-04-03 04:06:47 +0000 +++ b/python/testasynch.py 2007-05-05 06:47:00 +0000 @@ -1,26 +1,65 @@ +import sys,time,random,struct,math from mysql.cluster import ndbapi -import sys,time,random,struct,math +import mysql.cluster.ndbapi + +class PythonCallback(object): + + def __init__(self, recAttr): + self.recAttr=recAttr + + def __call__(self, ret, myTrans): + #print "value = ", self.recAttr.get_value() + x=self.recAttr.get_value() + +import MySQLdb + +if len(sys.argv) != 3: + print "Usage:\n\ttest.py NUM_OF_ITERATIONS NUM_OF_ROWS " + sys.exit(1) + +num_iter=int(sys.argv[1]) +INSERT_NUM=int(sys.argv[2]) +BATCH_SIZE=1000 + +db = MySQLdb.connect(host="localhost",user="root",read_default_group="client") + +cur=db.cursor() + +print "Dropping and recreating schema\n" + +cur.execute("CREATE DATABASE if not exists test") +cur.execute("USE test") + +cur.execute("drop table if exists mytablename") + +cur.execute(""" +CREATE TABLE if not exists + mytablename + (ATTR1 INT UNSIGNED not null auto_increment, + ATTR2 INT UNSIGNED NOT NULL, + PRIMARY KEY USING HASH (ATTR1)) +ENGINE=NDBCLUSTER +""") + ### Connect to cluster print "connecting to cluster\n" - -connection = ndbapi.Ndb_cluster_connection() - try: + connection = ndbapi.NdbFactory.createNdbClusterConnection(); + connection.connect(1,1,1) -except RuntimeError, e: +except mysql.cluster.ndbapi.NdbApiException,e: print e sys.exit(-1) - - + if (connection.wait_until_ready(30,30)): print "Cluster was not ready within 30 secs." sys.exit(-1); -myNdb = ndbapi.Ndb(connection,"test" ) +myNdb = connection.getNdb("test") if (myNdb.init(4) == -1): print myNdb.getNdbError().getMessage() @@ -31,68 +70,117 @@ ### Fill db -def testasync(*args): - #for f in args: - # print "arg: %s" % f - print "hello - I have no state, since I'm just a function!!" - -class testaclass(object): - - def __init__(self, recAttr): - self.recAttr=recAttr - - def __call__(self, ret, myTrans): - print "hello from testaclass" - print "value = ", self.recAttr.get_value() - #myNdb.closeTransaction(myTrans) - - -### Fetch ids using ndb - - -myTrans = myNdb.startTransaction() -if myTrans is None: - print myNdb.getNdbError().getMessage() - -myOper = myTrans.getNdbOperation("mytablename") -myOper.readTuple(ndbapi.NdbOperation.LM_Read) - -myOper.equal("ATTR1", 245755 ) - -myRecAttr= myOper.getValue("ATTR2") - -a = testaclass(myRecAttr) - -if myTrans.executeAsynchPrepare( ndbapi.Commit, a ) == -1: - print myTrans.getNdbError().getMessage() - #foo=myRecAttr.u_32_value() - -### Fetch ids using ndb - - -myTrans = myNdb.startTransaction() -if myTrans is None: - print myNdb.getNdbError().getMessage() - -myOper = myTrans.getNdbOperation("mytablename") -myOper.readTuple(ndbapi.NdbOperation.LM_Read) - -myOper.equal("ATTR1", 84 ) - -myRecAttr= myOper.getValue("ATTR2") - -callbacks=[] - -for x in range(0,30): - a = testaclass(myRecAttr) - callbacks.append(a) - - if myTrans.executeAsynchPrepare( ndbapi.Commit, a ) == -1: - print myTrans.getNdbError().getMessage() - #foo=myRecAttr.u_32_value() - - -myNdb.sendPollNdb(3000,1) - - - + +before_t = time.time() + +for t in range(0,int(math.ceil((int(INSERT_NUM)*1.0)/(1.0*BATCH_SIZE)))): + + myTransaction= myNdb.startTransaction() + val = ((t+1)*BATCH_SIZE)-INSERT_NUM + offset = 0 + if ( val > 0 ): + offset = val + + for i in range(0,BATCH_SIZE-offset): + + myOperation=myTransaction.getNdbOperation("mytablename") + myOperation.insertTuple() + auto_id = myNdb.getAutoIncrementValue("mytablename",BATCH_SIZE) + + myOperation.equal("ATTR1",auto_id); + myOperation.setValue("ATTR2", i); + + ret = myTransaction.execute( ndbapi.Commit ) + if ret == -1: + print myTransaction.getNdbError().getMessage() + + #myTransaction.close() + +after_t = time.time() +print "Insert time for %s: %s -- %s" % ( INSERT_NUM, after_t, before_t) +print "\t %s" % (after_t - before_t) + + +### Get list of ids +myTransaction = myNdb.startTransaction() + +myScanOperation=myTransaction.getNdbScanOperation("mytablename"); + + +if myScanOperation is None: + print myTransaction.getNdbError().getMessage() + + +if myScanOperation.readTuples(ndbapi.NdbOperation.LM_CommittedRead): + print myScanOperation.getNdbError().getMessage() + + + +myRecAttr=myScanOperation.getValue("ATTR1"); + + +myTransaction.execute(ndbapi.NoCommit); + +ids=[] +while 1: + + if (myScanOperation.nextResult(True) != 0) : + break + + random_id = myRecAttr.u_32_value() + ids.append(random_id) + + +#myTransaction.close() + + +### Fetch ids using ndb + +before_t = time.time() + + +for f in range(0,int(num_iter)): + + id_num = ids[random.randrange(0,len(ids))] + myTrans = myNdb.startTransaction("mytablename","%s"%id_num) + if myTrans is None: + print myNdb.getNdbError().getMessage() + + myOper = myTrans.getNdbOperation("mytablename") + myOper.readTuple(ndbapi.NdbOperation.LM_Read) + + myOper.equal("ATTR1",id_num) + + myRecAttr= myOper.getValue("ATTR2") + + cb = PythonCallback(myRecAttr) + + if myTrans.executeAsynchPrepare( ndbapi.Commit , cb ) == -1: + print myTrans.getNdbError().getMessage() + myNdb.sendPollNdb(3000,1) + myTrans.close() + +after_t = time.time() +print "NDBAPI time for %s: %s -- %s" % ( num_iter, after_t, before_t) +print "\t %s" % (after_t - before_t) + + + +cur=db.cursor() + +before_t = time.time() + +for f in range(0,int(num_iter)): + + id_num = ids[random.randrange(0,len(ids))] + + cur.execute("select ATTR2 from mytablename where ATTR1=%s" % (id_num)) + res=cur.fetchall() + +after_t = time.time() + +after_t = time.time() +print "MySQL time for %s: %s -- %s" % ( num_iter, after_t, before_t) +print "\t %s" % (after_t - before_t) + +#ndbapi.ndb_end() === modified file 'swig/NdbClusterConnection.i' --- a/swig/NdbClusterConnection.i 2007-05-04 20:45:38 +0000 +++ b/swig/NdbClusterConnection.i 2007-05-05 06:47:00 +0000 @@ -23,10 +23,12 @@ // NdbFactory.getNdbClusterConnection should be used instead Ndb_cluster_connection(const char * connectstring = 0); + +public: + + ~Ndb_cluster_connection(); -public: - void set_name(const char* name); #if defined(SWIGJAVA) @@ -38,25 +40,25 @@ if (result) { const char * msg = "Connect to management server failed"; NDB_exception(NdbApiException,msg); - return -1; + goto fail; } } %typemap(check) int no_retries { if ($1 < 0) { NDB_exception(NdbClusterConnectionPermanentException,"Retries must be greater than or equal to zero."); - return -1; + goto fail; } } %typemap(check) int retry_delay_in_seconds { if ($1 < 0) { NDB_exception(NdbClusterConnectionPermanentException,"Delay must be greater than or equal to zero."); - return -1; + goto fail; } } %typemap(check) int verbose { if ($1 < 0 || $1 > 1) { NDB_exception(NdbClusterConnectionPermanentException,"Verbose must be either zero or one."); - return -1; + goto fail; } } int connect(int no_retries=0, int retry_delay_in_seconds=1, int verbose=0); === modified file 'swig/NdbTransaction.i' --- a/swig/NdbTransaction.i 2007-05-04 20:45:38 +0000 +++ b/swig/NdbTransaction.i 2007-05-05 06:47:00 +0000 @@ -18,7 +18,7 @@ */ %delobject NdbTransaction::close; -%newobject NdbTransaction::getNdbOperation; +//%newobject NdbTransaction::getNdbOperation; class NdbTransaction { ~NdbTransaction();