List:NDB Connectors« Previous MessageNext Message »
From:Monty Taylor Date:May 5 2007 6:47am
Subject:Rev 99: Fixed Python Async code. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/exceptions
View as plain text  
------------------------------------------------------------
revno: 99
revision-id: mtaylor@stripped
parent: mtaylor@stripped
committer: Monty Taylor <mtaylor@stripped>
branch nick: exceptions
timestamp: Fri 2007-05-04 23:47:00 -0700
message:
  Fixed Python Async code.
  Removed two memory leaks (thanks python debug mode)
modified:
  python/testasynch.py           testasynch.py-20070227185119-f8ow9m7i9zfgr5jr-1
  swig/NdbClusterConnection.i    ndb_cluster_connecti-20070228021421-qkr4cbpxymyqdrf3-6
  swig/NdbTransaction.i          ndbtransaction.i-20070227184716-ecjyhh3jgvmye4de-7
=== modified file 'python/testasynch.py'
--- a/python/testasynch.py	2007-04-03 04:06:47 +0000
+++ b/python/testasynch.py	2007-05-05 06:47:00 +0000
@@ -1,26 +1,65 @@
+import sys,time,random,struct,math
 from mysql.cluster import ndbapi
-import sys,time,random,struct,math
+import mysql.cluster.ndbapi
+
+class PythonCallback(object):
+
+  def __init__(self, recAttr):
+    self.recAttr=recAttr
+
+  def __call__(self, ret, myTrans):
+    #print "value = ", self.recAttr.get_value()
+    x=self.recAttr.get_value()
+
+import  MySQLdb
+
+if len(sys.argv) != 3:
+  print "Usage:\n\ttest.py NUM_OF_ITERATIONS NUM_OF_ROWS "
+  sys.exit(1)
+
+num_iter=int(sys.argv[1])
+INSERT_NUM=int(sys.argv[2])
+BATCH_SIZE=1000
+
+db = MySQLdb.connect(host="localhost",user="root",read_default_group="client")
+
+cur=db.cursor()
+
+print "Dropping and recreating schema\n"
+
+cur.execute("CREATE DATABASE if not exists test")
+cur.execute("USE test")
+
+cur.execute("drop table if exists mytablename")
+
+cur.execute("""
+CREATE TABLE if not exists
+  mytablename
+   (ATTR1 INT UNSIGNED not null auto_increment,
+    ATTR2 INT UNSIGNED NOT NULL,
+    PRIMARY KEY USING HASH (ATTR1))
+ENGINE=NDBCLUSTER 
+""")
+
 
 ### Connect to cluster
 
 print "connecting to cluster\n"
-
    
-connection = ndbapi.Ndb_cluster_connection()
-
 try:
+  connection = ndbapi.NdbFactory.createNdbClusterConnection();
+
   connection.connect(1,1,1)
-except RuntimeError, e:
+except mysql.cluster.ndbapi.NdbApiException,e:
   print e
   sys.exit(-1)
-    
-    
+
 if (connection.wait_until_ready(30,30)):
     print "Cluster was not ready within 30 secs."
     sys.exit(-1);
 
 
-myNdb = ndbapi.Ndb(connection,"test" )
+myNdb = connection.getNdb("test")
 
 if (myNdb.init(4) == -1):
     print myNdb.getNdbError().getMessage()
@@ -31,68 +70,117 @@
 
 ### Fill db
 
-def testasync(*args):
-  #for f in args:
-  #  print "arg: %s" % f
-  print "hello - I have no state, since I'm just a function!!"
-
-class testaclass(object):
-
-  def __init__(self, recAttr):
-    self.recAttr=recAttr
-
-  def __call__(self, ret, myTrans):
-    print "hello from testaclass"
-    print "value = ", self.recAttr.get_value()
-    #myNdb.closeTransaction(myTrans)
-
-
-### Fetch ids using ndb
-
-      
-myTrans = myNdb.startTransaction()
-if myTrans is None:
-  print myNdb.getNdbError().getMessage()
-
-myOper = myTrans.getNdbOperation("mytablename")
-myOper.readTuple(ndbapi.NdbOperation.LM_Read)
-
-myOper.equal("ATTR1", 245755 )
-
-myRecAttr= myOper.getValue("ATTR2")
-
-a = testaclass(myRecAttr)
-
-if myTrans.executeAsynchPrepare( ndbapi.Commit, a ) == -1:
-  print myTrans.getNdbError().getMessage()
-  #foo=myRecAttr.u_32_value()
-
-### Fetch ids using ndb
-
-      
-myTrans = myNdb.startTransaction()
-if myTrans is None:
-  print myNdb.getNdbError().getMessage()
-
-myOper = myTrans.getNdbOperation("mytablename")
-myOper.readTuple(ndbapi.NdbOperation.LM_Read)
-
-myOper.equal("ATTR1", 84 )
-
-myRecAttr= myOper.getValue("ATTR2")
-
-callbacks=[]
-
-for x in range(0,30):
-  a = testaclass(myRecAttr)
-  callbacks.append(a)
-
-  if myTrans.executeAsynchPrepare( ndbapi.Commit, a ) == -1:
-    print myTrans.getNdbError().getMessage()
-    #foo=myRecAttr.u_32_value()
-
-      
-myNdb.sendPollNdb(3000,1)
-      
-
-
+
+before_t = time.time()
+
+for t in range(0,int(math.ceil((int(INSERT_NUM)*1.0)/(1.0*BATCH_SIZE)))):
+  
+  myTransaction= myNdb.startTransaction()
+  val = ((t+1)*BATCH_SIZE)-INSERT_NUM
+  offset = 0
+  if ( val > 0 ):
+    offset = val
+
+  for i in range(0,BATCH_SIZE-offset):
+    
+    myOperation=myTransaction.getNdbOperation("mytablename")
+    myOperation.insertTuple()
+    auto_id = myNdb.getAutoIncrementValue("mytablename",BATCH_SIZE)
+   
+    myOperation.equal("ATTR1",auto_id);
+    myOperation.setValue("ATTR2", i);
+      
+  ret = myTransaction.execute( ndbapi.Commit )
+  if ret == -1:
+    print myTransaction.getNdbError().getMessage()
+  
+  #myTransaction.close()
+
+after_t = time.time()
+print "Insert time for %s: %s -- %s" % ( INSERT_NUM, after_t, before_t)
+print "\t %s" % (after_t - before_t)
+
+
+### Get list of ids
+myTransaction = myNdb.startTransaction()
+
+myScanOperation=myTransaction.getNdbScanOperation("mytablename");
+
+
+if myScanOperation is None:
+    print myTransaction.getNdbError().getMessage()
+
+
+if myScanOperation.readTuples(ndbapi.NdbOperation.LM_CommittedRead):
+    print myScanOperation.getNdbError().getMessage()
+
+
+    
+myRecAttr=myScanOperation.getValue("ATTR1");
+    
+    
+myTransaction.execute(ndbapi.NoCommit);
+
+ids=[]
+while 1:
+      
+    if (myScanOperation.nextResult(True) != 0) :
+        break
+      
+    random_id = myRecAttr.u_32_value()
+    ids.append(random_id)
+      
+    
+#myTransaction.close()
+
+
+### Fetch ids using ndb
+
+before_t = time.time()
+
+
+for f in range(0,int(num_iter)):
+
+    id_num = ids[random.randrange(0,len(ids))]
+    myTrans = myNdb.startTransaction("mytablename","%s"%id_num)
+    if myTrans is None:
+        print myNdb.getNdbError().getMessage()
+
+    myOper = myTrans.getNdbOperation("mytablename")
+    myOper.readTuple(ndbapi.NdbOperation.LM_Read)
+
+    myOper.equal("ATTR1",id_num)
+            
+    myRecAttr= myOper.getValue("ATTR2")
+
+    cb =  PythonCallback(myRecAttr)
+    
+    if myTrans.executeAsynchPrepare( ndbapi.Commit , cb ) == -1:
+        print myTrans.getNdbError().getMessage()
+    myNdb.sendPollNdb(3000,1)
+    myTrans.close()
+
+after_t = time.time()
+print "NDBAPI time for %s: %s -- %s" % ( num_iter, after_t, before_t)
+print "\t %s" % (after_t - before_t)
+
+
+
+cur=db.cursor()
+
+before_t = time.time()
+
+for f in range(0,int(num_iter)):
+
+    id_num = ids[random.randrange(0,len(ids))]
+
+    cur.execute("select ATTR2 from mytablename where ATTR1=%s" % (id_num))
+    res=cur.fetchall()
+
+after_t = time.time()
+
+after_t = time.time()
+print "MySQL time for %s: %s -- %s" % ( num_iter, after_t, before_t)
+print "\t %s" % (after_t - before_t)
+
+#ndbapi.ndb_end()

=== modified file 'swig/NdbClusterConnection.i'
--- a/swig/NdbClusterConnection.i	2007-05-04 20:45:38 +0000
+++ b/swig/NdbClusterConnection.i	2007-05-05 06:47:00 +0000
@@ -23,10 +23,12 @@
 
   // NdbFactory.getNdbClusterConnection should be used instead
   Ndb_cluster_connection(const char * connectstring = 0);
+
+public:
+
+
   ~Ndb_cluster_connection();
 
-public:
-
   void set_name(const char* name); 
 
 #if defined(SWIGJAVA)
@@ -38,25 +40,25 @@
       if (result) { 
 	const char * msg = "Connect to management server failed"; 
 	NDB_exception(NdbApiException,msg);
-        return -1;
+        goto fail;
       }
   }
   %typemap(check) int no_retries {
     if ($1 < 0) { 
       NDB_exception(NdbClusterConnectionPermanentException,"Retries must be greater than
or equal to zero.");
-      return -1;
+      goto fail;
     }
   }
   %typemap(check) int retry_delay_in_seconds {
     if ($1 < 0) { 
       NDB_exception(NdbClusterConnectionPermanentException,"Delay must be greater than or
equal to zero.");
-      return -1;
+      goto fail;
     }
   }
   %typemap(check) int verbose {
     if ($1 < 0 || $1 > 1) { 
       NDB_exception(NdbClusterConnectionPermanentException,"Verbose must be either zero
or one.");
-      return -1;
+      goto fail;
     }
   }
   int connect(int no_retries=0, int retry_delay_in_seconds=1, int verbose=0);

=== modified file 'swig/NdbTransaction.i'
--- a/swig/NdbTransaction.i	2007-05-04 20:45:38 +0000
+++ b/swig/NdbTransaction.i	2007-05-05 06:47:00 +0000
@@ -18,7 +18,7 @@
 */
 
 %delobject NdbTransaction::close;
-%newobject NdbTransaction::getNdbOperation;
+//%newobject NdbTransaction::getNdbOperation;
 
 class NdbTransaction {
   ~NdbTransaction();

Thread
Rev 99: Fixed Python Async code. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/exceptionsMonty Taylor5 May