MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:rburnett Date:August 8 2006 6:41am
Subject:Connector/NET commit: r299 - in trunk: TestSuite mysqlclient mysqlclient/common
View as plain text  
Added:
   trunk/mysqlclient/common/SocketStream.cs
Modified:
   trunk/TestSuite/AsyncTests.cs
   trunk/mysqlclient/command.cs
Log:
Finished the implemenation of Async queries.
Added ExecuteNonQuery that takes a commandbehavior and
added ExecuteReader methods.

Also, added SocketStream back in.

Modified: trunk/TestSuite/AsyncTests.cs
===================================================================
--- trunk/TestSuite/AsyncTests.cs	2006-08-07 03:06:43 UTC (rev 298)
+++ trunk/TestSuite/AsyncTests.cs	2006-08-08 06:41:36 UTC (rev 299)
@@ -43,17 +43,81 @@
         [Test]
         public void ExecuteNonQuery()
         {
+            execSQL("DROP TABLE IF EXISTS test");
+            execSQL("CREATE TABLE test (id int)");
+            execSQL("CREATE PROCEDURE spTest() BEGIN SET @x=0; REPEAT INSERT INTO test VALUES(@x); " +
+                "SET @x=@x+1; UNTIL @x = 300 END REPEAT; END");
 
+            try
+            {
+                MySqlCommand proc = new MySqlCommand("spTest", conn);
+                proc.CommandType = CommandType.StoredProcedure;
+                IAsyncResult iar = proc.BeginExecuteNonQuery();
+                int count = 0;
+                while (!iar.IsCompleted)
+                {
+                    count++;
+                    System.Threading.Thread.Sleep(20);
+                }
+                int updated = proc.EndExecuteNonQuery(iar);
+                Assert.IsTrue(count > 0);
+
+                proc.CommandType = CommandType.Text;
+                proc.CommandText = "SELECT COUNT(*) FROM test";
+                object cnt = proc.ExecuteScalar();
+                Assert.AreEqual(300, cnt);
+            }
+            catch (Exception ex)
+            {
+                Assert.Fail(ex.Message);
+            }
+            finally
+            {
+            }
         }
 
         [Test]
         public void ExecuteReader()
         {
-        }
+            execSQL("DROP TABLE IF EXISTS test");
+            execSQL("CREATE TABLE test (id int)");
+            execSQL("CREATE PROCEDURE spTest() BEGIN SET @x=0; REPEAT INSERT INTO test VALUES(@x); " +
+                "SET @x=@x+1; UNTIL @x = 300 END REPEAT; SELECT 'done'; END");
 
-        [Test]
-        public void ExecuteScalar()
-        {
+            MySqlDataReader reader = null;
+            try
+            {
+                MySqlCommand proc = new MySqlCommand("spTest", conn);
+                proc.CommandType = CommandType.StoredProcedure;
+                IAsyncResult iar = proc.BeginExecuteReader();
+                int count = 0;
+                while (!iar.IsCompleted)
+                {
+                    count++;
+                    System.Threading.Thread.Sleep(20);
+                }
+
+                reader = proc.EndExecuteReader(iar);
+                Assert.IsNotNull(reader);
+                Assert.IsTrue(count > 0);
+                Assert.IsTrue(reader.Read());
+                Assert.AreEqual("done", reader.GetString(0));
+                reader.Close();
+
+                proc.CommandType = CommandType.Text;
+                proc.CommandText = "SELECT COUNT(*) FROM test";
+                object cnt = proc.ExecuteScalar();
+                Assert.AreEqual(300, cnt);
+            }
+            catch (Exception ex)
+            {
+                Assert.Fail(ex.Message);
+            }
+            finally
+            {
+                if (reader != null)
+                    reader.Close();
+            }
         }
 	}
 }

Modified: trunk/mysqlclient/command.cs
===================================================================
--- trunk/mysqlclient/command.cs	2006-08-07 03:06:43 UTC (rev 298)
+++ trunk/mysqlclient/command.cs	2006-08-08 06:41:36 UTC (rev 299)
@@ -350,77 +350,6 @@
 				throw new MySqlException( "Stored procedures are not supported on this version of MySQL" );
 		}
 
-/*        internal bool ExecuteInternal()
-        {
-            if (statements.Count == 0)
-                return false;
-
-            Statement st = statements[0];
-            st.Execute(parameters);
-            statements.Remove(st);*/
-
-/*            if (preparedStatement != null)
-            {
-                if (!preparedStatement.ExecutionCount > 0)
-                    return false;
-                preparedStatement.Execute(parameters, cursorPageSize);
-            }
-            else 
-            {
-                if (sqlBuffers.Count == 0) return false;
-				MemoryStream sqlStream = (MemoryStream)sqlBuffers[0];
-
-				using (sqlStream) 
-				{
-					connection.driver.SendQuery(sqlStream.GetBuffer());
-                    sqlBuffers.RemoveAt(0);
-                }
-            }*/
-//            return true;
-  //      }
-
-/*        internal Statement GetNextResultset()
-        {
-            // execute the statement
-//            ulong affectedRows;
-  //          long fieldCount;
-
-            if (Driver.HasMoreResults(StatementId))
-                return true;
-
-            while (statements.Count > 0)
-            {
-                Statement statement = (Statement)statements[0];
-                // if this is a prepared statement and we have already executed it,
-                // then break out of the loop
-                if (statement is PreparedStatement &&
-                    (statement as PreparedStatement).ExecutionCount > 0) break;
-                statement.Execute(parameters);
-                if (statement.HasRows)
-                    return statement;
-                RecordsAffected += statement.RecordsAffected;
-            }
-            return null;*/
-/*
-            while (true)
-            {
-                bool hasResults = connection.driver.ReadResult(ref fieldCount, ref affectedRows, ref lastInsertedId);
-                if (hasResults)
-                {
-                    if (fieldCount > 0)
-                        return fieldCount;
-                    if (updatedRowCount == -1)
-                        updatedRowCount = 0;
-                    updatedRowCount += affectedRows;
-                }
-                else
-                {
-                    if (!ExecuteInternal())
-                        return 0;
-                }
-            }*/
-//        }
-
 		/// <include file='docs/mysqlcommand.xml' path='docs/ExecuteNonQuery/*'/>
 		public override int ExecuteNonQuery()
 		{
@@ -428,29 +357,8 @@
             MySqlDataReader reader = ExecuteReader();
             reader.Close();
             lastInsertedId = reader.InsertedId;
+            this.updatedRowCount = reader.RecordsAffected;
             return reader.RecordsAffected;
-/*			CheckState();
-
-//			updatedRowsCount = -1;
-
-//			if (preparedStatement == null)
-//				sqlBuffers = PrepareSqlBuffers(CommandText);
-
-            try
-            {
-//                ExecuteInternal();
-                MySqlDataReader reader = new MySqlDataReader(this, statement, CommandBehavior.Default);
-                reader.NextResult();
-                reader.Close();
-            }
-            catch (MySqlException ex)
-            {
-                //TODO: fix this
-                //connection.Abort();
-                throw;
-            }
-
-            return (int)this.updatedRowCount;*/
         }
 
         internal void Close()
@@ -546,7 +454,8 @@
 
 		#region Async Methods
 
-		internal delegate void AsyncExecuteNonQueryDelegate();
+		internal delegate int AsyncExecuteNonQueryDelegate();
+        internal delegate MySqlDataReader AsyncExecuteReaderDelegate(CommandBehavior behavior);
 
 		private string TrimSemicolons(string sql)
 		{
@@ -560,23 +469,44 @@
 				end--;
 			return sb.ToString(start, end-start+1);
 		}
-		private void AsyncExecuteNonQuery() 
-		{
-			ExecuteNonQuery();
-		}
 
+        public IAsyncResult BeginExecuteReader()
+        {
+            return BeginExecuteReader(CommandBehavior.Default);
+        }
+
+        public IAsyncResult BeginExecuteReader(CommandBehavior behavior)
+        {
+            AsyncExecuteReaderDelegate del = new AsyncExecuteReaderDelegate(ExecuteReader);
+            asyncResult = del.BeginInvoke(behavior, null, null);
+            return asyncResult;
+        }
+
+        public MySqlDataReader EndExecuteReader(IAsyncResult result)
+        {
+            result.AsyncWaitHandle.WaitOne();
+            return connection.Reader;
+        }
+
+        public IAsyncResult BeginExecuteNonQuery(AsyncCallback callback, object stateObject)
+        {
+            AsyncExecuteNonQueryDelegate del =
+                new AsyncExecuteNonQueryDelegate(ExecuteNonQuery);
+            asyncResult = del.BeginInvoke(callback, stateObject);
+            return asyncResult;
+        }
+
 		public IAsyncResult BeginExecuteNonQuery() 
 		{
 			AsyncExecuteNonQueryDelegate del = 
-				new AsyncExecuteNonQueryDelegate(AsyncExecuteNonQuery);
+				new AsyncExecuteNonQueryDelegate(ExecuteNonQuery);
 			asyncResult = del.BeginInvoke(null, null);
 			return asyncResult;
 		}
 
 		public int EndExecuteNonQuery(IAsyncResult result)
 		{
-			while (! result.IsCompleted)
-				System.Threading.Thread.Sleep(100);
+            result.AsyncWaitHandle.WaitOne();
 			return (int)updatedRowCount;
 		}
 

Added: trunk/mysqlclient/common/SocketStream.cs
===================================================================
--- trunk/mysqlclient/common/SocketStream.cs	2006-08-07 03:06:43 UTC (rev 298)
+++ trunk/mysqlclient/common/SocketStream.cs	2006-08-08 06:41:36 UTC (rev 299)
@@ -0,0 +1,118 @@
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Collections;
+
+namespace MySql.Data.Common
+{
+	/// <summary>
+	/// Summary description for MySqlSocket.
+	/// </summary>
+	internal sealed class SocketStream : Stream
+	{
+		private Socket	socket;
+
+		public SocketStream(AddressFamily addressFamily, SocketType socketType, ProtocolType protocol)
+			: base()
+		{
+			socket = new Socket(addressFamily, socketType, protocol);
+		}
+
+		#region Properties
+
+		public Socket Socket 
+		{
+			get { return socket; }
+		}
+
+		public override bool CanRead
+		{
+			get	{ return true;	}
+		}
+
+		public override bool CanSeek
+		{
+			get	{ return false;	}
+		}
+
+		public override bool CanWrite
+		{
+			get	{ return true; }
+		}
+
+		public override long Length
+		{
+			get	{ return 0;	}
+		}
+
+		public override long Position
+		{
+			get	{ return 0;	}
+			set	{ throw new NotSupportedException("SocketStream does not support seek"); }
+		}
+
+		#endregion
+
+		#region Stream Implementation
+
+		public override void Flush()
+		{
+		}
+
+		public override int Read(byte[] buffer, int offset, int count)
+		{
+			return socket.Receive(buffer, offset, count, SocketFlags.None);
+		}
+
+		public override long Seek(long offset, SeekOrigin origin)
+		{
+			throw new NotSupportedException("SocketStream does not support seek");
+		}
+
+		public override void SetLength(long value)
+		{
+		}
+
+		public override void Write(byte[] buffer, int offset, int count)
+		{
+			socket.Send(buffer, offset, count, SocketFlags.None);
+		}
+
+
+		#endregion
+
+		public bool Connect(EndPoint remoteEP, int timeout)
+		{
+            // set the socket to non blocking
+            socket.Blocking = false;
+
+			// then we star the connect
+			SocketAddress addr = remoteEP.Serialize();
+			byte[] buff = new byte[addr.Size];
+			for (int i=0; i<addr.Size; i++)
+				buff[i] = addr[i];
+
+			int result = NativeMethods.connect(socket.Handle, buff, addr.Size);
+			int wsaerror = NativeMethods.WSAGetLastError();
+			if (wsaerror != 10035)
+				throw new Exception("Error creating MySQLSocket");
+
+			// next we wait for our connect timeout or until the socket is connected
+			ArrayList write = new ArrayList();
+			write.Add(socket);
+			ArrayList error = new ArrayList();
+			error.Add(socket);
+
+			Socket.Select(null, write, error, timeout*1000*1000);
+
+			if (write.Count == 0) return false;
+
+			// set socket back to blocking mode
+            socket.Blocking = true;
+			return true;
+		}
+
+
+	}
+}

Thread
Connector/NET commit: r299 - in trunk: TestSuite mysqlclient mysqlclient/commonrburnett8 Aug