List:Commits« Previous MessageNext Message »
From:Vladislav Vaintroub Date:August 16 2010 2:55pm
Subject:bzr commit into connector-net-trunk branch (vvaintroub:922)
View as plain text  
#At file:///H:/connector_net/trunk-asyncio/ based on revid:vvaintroub@stripped

  922 Vladislav Vaintroub	2010-08-16
      Patch to use asynchronous IO in BeginExecuteReader()

    modified:
      MySql.Data/Provider/Properties/Resources.Designer.cs
      MySql.Data/Provider/Properties/Resources.resx
      MySql.Data/Provider/Source/CompressedStream.cs
      MySql.Data/Provider/Source/Driver.cs
      MySql.Data/Provider/Source/MySqlStream.cs
      MySql.Data/Provider/Source/MySqlTrace.cs
      MySql.Data/Provider/Source/NativeDriver.cs
      MySql.Data/Provider/Source/TimedStream.cs
      MySql.Data/Provider/Source/TracingDriver.cs
      MySql.Data/Provider/Source/command.cs
      MySql.Data/Provider/Source/common/NamedPipeStream.cs
      MySql.Data/Provider/Source/common/SharedMemoryStream.cs
      MySql.Data/Provider/Source/datareader.cs
=== modified file 'MySql.Data/Provider/Properties/Resources.Designer.cs'
--- a/MySql.Data/Provider/Properties/Resources.Designer.cs	2010-04-27 18:31:24 +0000
+++ b/MySql.Data/Provider/Properties/Resources.Designer.cs	2010-08-16 14:55:33 +0000
@@ -938,6 +938,15 @@ namespace MySql.Data.MySqlClient.Propert
         }
         
         /// <summary>
+        ///   Looks up a localized string similar to {0}: Asynchronous Resultset open.
+        /// </summary>
+        public static string TraceResultAsyncOpen {
+            get {
+                return ResourceManager.GetString("TraceResultAsyncOpen", resourceCulture);
+            }
+        }
+        
+        /// <summary>
         ///   Looks up a localized string similar to {0}: Resultset Closed. Total rows={1}, skipped rows={2}, size (bytes)={3}.
         /// </summary>
         public static string TraceResultClosed {

=== modified file 'MySql.Data/Provider/Properties/Resources.resx'
--- a/MySql.Data/Provider/Properties/Resources.resx	2010-03-11 21:04:56 +0000
+++ b/MySql.Data/Provider/Properties/Resources.resx	2010-08-16 14:55:33 +0000
@@ -453,10 +453,14 @@
   <data name="UnableToEnableQueryAnalysis" xml:space="preserve">
     <value>Unable to enable query analysis.  Be sure the MySql.Data.EMTrace assembly is properly located and registered.</value>
   </data>
+  <assembly alias="System.Windows.Forms" name="System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" />
   <data name="keywords" type="System.Resources.ResXFileRef, System.Windows.Forms">
     <value>keywords.txt;System.String, mscorlib, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089;Windows-1252</value>
   </data>
   <data name="TraceQueryNormalized" xml:space="preserve">
     <value>{0}: Query Normalized: {2}</value>
   </data>
+  <data name="TraceResultAsyncOpen" xml:space="preserve">
+    <value>{0}: Asynchronous Resultset open</value>
+  </data>
 </root>
\ No newline at end of file

=== modified file 'MySql.Data/Provider/Source/CompressedStream.cs'
--- a/MySql.Data/Provider/Source/CompressedStream.cs	2010-08-12 15:25:45 +0000
+++ b/MySql.Data/Provider/Source/CompressedStream.cs	2010-08-16 14:55:33 +0000
@@ -23,6 +23,7 @@ using System.IO;
 using zlib;
 using MySql.Data.MySqlClient.Properties;
 using MySql.Data.Common;
+using System.Diagnostics;
 
 namespace MySql.Data.MySqlClient
 {
@@ -140,20 +141,75 @@ namespace MySql.Data.MySqlClient
             }
         }
 
-        public override int Read(byte[] buffer, int offset, int count)
+        internal class CompressedReadAsyncResult:IAsyncResult
         {
-            if (buffer == null)
-                throw new ArgumentNullException("buffer", Resources.BufferCannotBeNull);
-            if (offset < 0 || offset >= buffer.Length)
-                throw new ArgumentOutOfRangeException("offset", Resources.OffsetMustBeValid);
-            if ((offset + count) > buffer.Length)
-                throw new ArgumentException(Resources.BufferNotLargeEnough, "buffer");
+            public byte[] buffer;
+            public int offset;
+            public int count;
+            public IAsyncResult asyncResult;
+
+            #region IAsyncResult Members
+
+            public object AsyncState
+            {
+                get { return asyncResult.AsyncState; }
+            }
+
+            public System.Threading.WaitHandle AsyncWaitHandle
+            {
+                get { return asyncResult.AsyncWaitHandle;  }
+            }
+
+            public bool CompletedSynchronously
+            {
+                get { return asyncResult.CompletedSynchronously; }
+            }
+
+            public bool IsCompleted
+            {
+                get { return asyncResult.IsCompleted; }
+            }
+
+            public CompressedReadAsyncResult(byte[] buffer, int offset, int count)
+            {
+                this.buffer = buffer;
+                this.offset = offset;
+                this.count = count;
+            }
+
+            #endregion
+        }
+
+        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+        {
+            if (inPos == maxInPos)
+            {
+                CompressedReadAsyncResult ar = new CompressedReadAsyncResult(buffer, offset, count);
+                ar.asyncResult = baseStream.BeginRead(lengthBytes, 0, 7, callback, state);
+                return ar;
+            }
+            return base.BeginRead(buffer, offset, count, callback, state);
+        }
+
+        public override int EndRead(IAsyncResult ar)
+        {
+            if (ar is CompressedReadAsyncResult)
+            {
+                CompressedReadAsyncResult myar = (CompressedReadAsyncResult)ar;
+                int bytesRead = baseStream.EndRead(myar.asyncResult);
+                return EndRead(myar.buffer, myar.offset, myar.count, bytesRead);
+            }
+            return base.EndRead(ar);
+        }
+
+        int EndRead(byte[] buffer, int offset, int count, int bytesRead)
+        {
+            int countRead;
 
             if (inPos == maxInPos)
-                PrepareNextPacket();
+                PrepareNextPacket(bytesRead);
 
             int countToRead = Math.Min(count, maxInPos - inPos);
-            int countRead;
             if (zInStream != null)
                 countRead = zInStream.read(buffer, offset, countToRead);
             else
@@ -174,9 +230,26 @@ namespace MySql.Data.MySqlClient
             return countRead;
         }
 
-        private void PrepareNextPacket()
+
+        public override int Read(byte[] buffer, int offset, int count)
         {
-            MySqlStream.ReadFully(baseStream, lengthBytes, 0, 7);
+            if (buffer == null)
+                throw new ArgumentNullException("buffer", Resources.BufferCannotBeNull);
+            if (offset < 0 || offset >= buffer.Length)
+                throw new ArgumentOutOfRangeException("offset", Resources.OffsetMustBeValid);
+            if ((offset + count) > buffer.Length)
+                throw new ArgumentException(Resources.BufferNotLargeEnough, "buffer");
+            return EndRead(buffer, offset, count, 0);
+        }
+
+
+        private void PrepareNextPacket(int offset)
+        {
+            Debug.Assert(offset <= 7);
+            if (offset < 7)
+            {
+                MySqlStream.ReadFully(baseStream, lengthBytes, offset, 7-offset);
+            }
             int compressedLength = lengthBytes[0] + (lengthBytes[1] << 8) + (lengthBytes[2] << 16);
             // lengthBytes[3] is seq
             int unCompressedLength = lengthBytes[4] + (lengthBytes[5] << 8) +

=== modified file 'MySql.Data/Provider/Source/Driver.cs'
--- a/MySql.Data/Provider/Source/Driver.cs	2010-06-10 19:27:42 +0000
+++ b/MySql.Data/Provider/Source/Driver.cs	2010-08-16 14:55:33 +0000
@@ -343,8 +343,29 @@ namespace MySql.Data.MySqlClient
                 return null;
             firstResult = false;
 
-            int affectedRows = -1, insertedId = -1, warnings = 0;
-            int fieldCount = GetResult(statementId, ref affectedRows, ref insertedId);
+            return InternalEndNextResult(statementId, null);
+        }
+
+        public virtual IAsyncResult BeginNextResult(int statementId, object state, AsyncCallback callback)
+        {
+            if (!firstResult && !HasStatus(ServerStatusFlags.AnotherQuery | ServerStatusFlags.MoreResults))
+                return null;
+            firstResult = false;
+            return BeginGetResult(statementId, state, callback);
+        }
+
+        private ResultSet InternalEndNextResult(int statementId, IAsyncResult asyncResult)
+        {
+            int affectedRows = -1, insertedId = -1;
+            int fieldCount;
+            if (asyncResult == null)
+            {
+                fieldCount = GetResult(statementId, ref affectedRows, ref insertedId);
+            }
+            else
+            {
+                fieldCount = EndGetResult(statementId, ref affectedRows, ref insertedId, asyncResult);
+            }
             if (fieldCount == -1)
                 return null;
             if (fieldCount > 0)
@@ -353,11 +374,26 @@ namespace MySql.Data.MySqlClient
                 return new ResultSet(affectedRows, insertedId);
         }
 
+        public virtual ResultSet EndNextResult(int statementId, IAsyncResult asyncResult)
+        {
+            return InternalEndNextResult(statementId, asyncResult);
+        }
         protected virtual int GetResult(int statementId, ref int affectedRows, ref int insertedId)
         {
             return handler.GetResult(ref affectedRows, ref insertedId);
         }
 
+        protected virtual IAsyncResult BeginGetResult(int statementId, object state, AsyncCallback callback)
+        {
+            return handler.BeginGetResult(state, callback);
+        }
+
+        protected virtual int EndGetResult(int statementId, ref int affectedRows, ref int insertedId,
+            IAsyncResult asyncResult)
+        {
+            return handler.EndGetResult(asyncResult, ref affectedRows, ref insertedId);
+        }
+
         public virtual bool FetchDataRow(int statementId, int columns)
         {
             return handler.FetchDataRow(statementId, columns);
@@ -484,6 +520,8 @@ namespace MySql.Data.MySqlClient
         void Close(bool isOpen);
         bool Ping();
         int GetResult(ref int affectedRows, ref int insertedId);
+        IAsyncResult  BeginGetResult(object state, AsyncCallback callback);
+        int EndGetResult(IAsyncResult asyncResult, ref int affectedRows, ref int insertedId);
         bool FetchDataRow(int statementId, int columns);
         int PrepareStatement(string sql, ref MySqlField[] parameters);
         void ExecuteStatement(MySqlPacket packet);

=== modified file 'MySql.Data/Provider/Source/MySqlStream.cs'
--- a/MySql.Data/Provider/Source/MySqlStream.cs	2009-10-07 20:30:34 +0000
+++ b/MySql.Data/Provider/Source/MySqlStream.cs	2010-08-16 14:55:33 +0000
@@ -114,35 +114,39 @@ namespace MySql.Data.MySqlClient
 
 		#region Packet methods
 
-		/// <summary>
-		/// ReadPacket is called by NativeDriver to start reading the next
-		/// packet on the stream.
-		/// </summary>
-		public MySqlPacket ReadPacket()
-		{
+        /// <summary>
+        /// ReadPacket is called by NativeDriver to start reading the next
+        /// packet on the stream.
+        /// </summary>
+        public MySqlPacket ReadPacket()
+        {
             //Debug.Assert(packet.Position == packet.Length);
 
             // make sure we have read all the data from the previous packet
-			//Debug.Assert(HasMoreData == false, "HasMoreData is true in OpenPacket");
+            //Debug.Assert(HasMoreData == false, "HasMoreData is true in OpenPacket");
 
-			LoadPacket();
+            LoadPacket();
+            CheckPacketError();
+            return packet;
+        }
 
+        void CheckPacketError()
+        {
             // now we check if this packet is a server error
-			if (packet.Buffer[0] == 0xff)
-			{
-				packet.ReadByte();  // read off the 0xff
+            if (packet.Buffer[0] == 0xff)
+            {
+                packet.ReadByte();  // read off the 0xff
 
-				int code = packet.ReadInteger(2);
-				string msg = packet.ReadString();
+                int code = packet.ReadInteger(2);
+                string msg = packet.ReadString();
                 if (msg.StartsWith("#"))
                 {
                     msg.Substring(1, 5);  /* state code */
                     msg = msg.Substring(6);
                 }
-				throw new MySqlException(msg, code);
-			}
-            return packet;
-		}
+                throw new MySqlException(msg, code);
+            }
+        }
 
          /// <summary>
          /// Reads the specified number of bytes from the stream and stores them at given 
@@ -169,18 +173,29 @@ namespace MySql.Data.MySqlClient
              }
          }
 
-		/// <summary>
-		/// LoadPacket loads up and decodes the header of the incoming packet.
-		/// </summary>
-		public void LoadPacket()
-		{
-			try
-			{
+
+         /// <summary>
+         /// LoadPacket loads up and decodes the header of the incoming packet.
+         /// </summary>
+         public void LoadPacket()
+         {
+             LoadPacket(0);
+         }
+
+         private void LoadPacket(int off)
+         {
+             Debug.Assert(off <= 4);
+             try
+             {
                 packet.Length = 0;
                 int offset = 0;
                 while (true)
                 {
-                    ReadFully(inStream, packetHeader, 0, 4);
+                    if (off < 4)
+                    {
+                        ReadFully(inStream, packetHeader, off, 4 - off);
+                    }
+                    off = 0;
                     sequenceByte = (byte)(packetHeader[3]+1);
                     int length = (int)(packetHeader[0] + (packetHeader[1] << 8) + 
                         (packetHeader[2] << 16));
@@ -202,6 +217,21 @@ namespace MySql.Data.MySqlClient
 			}
 		}
 
+        public IAsyncResult BeginReadPacket(object state, AsyncCallback callback)
+        {
+            packet.Length = 0;
+            return inStream.BeginRead(packetHeader, 0, 4, callback, state);
+        }
+
+        public MySqlPacket EndReadPacket(IAsyncResult asyncResult)
+        {
+            int bytesRead = inStream.EndRead(asyncResult);
+            Debug.Assert(bytesRead <= 4);
+            LoadPacket(bytesRead);
+            CheckPacketError();
+            return packet;
+        }
+
         public void SendPacket(MySqlPacket packet)
         {
             byte[] buffer = packet.Buffer;

=== modified file 'MySql.Data/Provider/Source/MySqlTrace.cs'
--- a/MySql.Data/Provider/Source/MySqlTrace.cs	2010-03-08 21:23:04 +0000
+++ b/MySql.Data/Provider/Source/MySqlTrace.cs	2010-08-16 14:55:33 +0000
@@ -141,7 +141,8 @@ namespace MySql.Data.MySqlClient
         UsageAdvisorWarning,
         Warning,
         Error,
-        QueryNormalized
+        QueryNormalized,
+        ResultAsyncOpened
     }
 
     public enum UsageAdvisorWarningFlags

=== modified file 'MySql.Data/Provider/Source/NativeDriver.cs'
--- a/MySql.Data/Provider/Source/NativeDriver.cs	2010-07-01 22:51:18 +0000
+++ b/MySql.Data/Provider/Source/NativeDriver.cs	2010-08-16 14:55:33 +0000
@@ -31,7 +31,6 @@ using System.Text;
 using System.Net.Security;
 using System.Security.Authentication;
 using System.Globalization;
-using System.Text;
 #endif
 
 namespace MySql.Data.MySqlClient
@@ -547,11 +546,27 @@ namespace MySql.Data.MySqlClient
             }
         }
 
+        public IAsyncResult BeginGetResult(object state, AsyncCallback callback)
+        {
+            return stream.BeginReadPacket(state, callback);
+        }
+
+        public int EndGetResult(IAsyncResult asyncResult, ref int affectedRow, ref int insertedId)
+        {
+            return GetResultInternal(ref affectedRow, ref insertedId, asyncResult);
+        }
+
         public int GetResult(ref int affectedRow, ref int insertedId)
         {
+           return GetResultInternal(ref affectedRow, ref insertedId, null);
+        }
+
+        private int GetResultInternal(ref int affectedRow, ref int insertedId, 
+            IAsyncResult asyncResult)
+        {
             try
             {
-                packet = stream.ReadPacket();
+                packet = (asyncResult == null)?  stream.ReadPacket() : stream.EndReadPacket(asyncResult);
             }
             catch (TimeoutException)
             {

=== modified file 'MySql.Data/Provider/Source/TimedStream.cs'
--- a/MySql.Data/Provider/Source/TimedStream.cs	2010-01-15 19:09:26 +0000
+++ b/MySql.Data/Provider/Source/TimedStream.cs	2010-08-16 14:55:33 +0000
@@ -42,6 +42,7 @@ namespace MySql.Data.MySqlClient
         int lastWriteTimeout;
         LowResolutionStopwatch stopwatch;
         bool isClosed;
+        bool hasPendingIO;
 
 
         enum IOKind
@@ -66,6 +67,13 @@ namespace MySql.Data.MySqlClient
             stopwatch = new LowResolutionStopwatch();
         }
 
+        private void CheckAsyncIO()
+        {
+            if(hasPendingIO)
+            {
+                throw new InvalidOperationException("Asyncronous IO in progress");
+            }
+        }
 
         /// <summary>
         /// Figure out whether it is necessary to reset timeout on stream.
@@ -193,6 +201,7 @@ namespace MySql.Data.MySqlClient
 
         public override int Read(byte[] buffer, int offset, int count)
         {
+            CheckAsyncIO();
             try
             {
                 StartTimer(IOKind.Read);
@@ -209,6 +218,7 @@ namespace MySql.Data.MySqlClient
 
         public override int ReadByte()
         {
+            CheckAsyncIO();
             try
             {
                 StartTimer(IOKind.Read);
@@ -235,6 +245,7 @@ namespace MySql.Data.MySqlClient
 
         public override void Write(byte[] buffer, int offset, int count)
         {
+            CheckAsyncIO();
             try
             {
                 StartTimer(IOKind.Write);
@@ -272,6 +283,22 @@ namespace MySql.Data.MySqlClient
             baseStream.Close();
         }
 
+        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, 
+            AsyncCallback callback, object state)
+        {
+            CheckAsyncIO();
+         
+            IAsyncResult result =  baseStream.BeginRead(buffer, offset, count, callback, state);
+            hasPendingIO = true;
+            return result;
+        }
+
+        public override int EndRead(IAsyncResult asyncResult)
+        {
+            hasPendingIO = false;
+            return baseStream.EndRead(asyncResult);
+        }
+
         public void ResetTimeout(int newTimeout)
         {
             if (newTimeout == System.Threading.Timeout.Infinite || newTimeout == 0)
@@ -281,7 +308,6 @@ namespace MySql.Data.MySqlClient
             stopwatch.Reset();
         }
 
-
         /// <summary>
         /// Common handler for IO exceptions.
         /// Resets timeout to infinity if timeout exception is 

=== modified file 'MySql.Data/Provider/Source/TracingDriver.cs'
--- a/MySql.Data/Provider/Source/TracingDriver.cs	2010-03-05 20:19:45 +0000
+++ b/MySql.Data/Provider/Source/TracingDriver.cs	2010-08-16 14:55:33 +0000
@@ -97,6 +97,43 @@ namespace MySql.Data.MySqlClient
             }
         }
 
+        protected override IAsyncResult BeginGetResult(int statementId, object state, AsyncCallback callback)
+        {
+            try
+            {
+                IAsyncResult result = base.BeginGetResult(statementId, state, callback);
+                MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.ResultAsyncOpened,
+                    Resources.TraceResultAsyncOpen, driverId);
+                return result;
+            }
+            catch (MySqlException ex)
+            {
+                // we got an error so we report it
+                MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.Error,
+                    Resources.TraceOpenResultError, driverId, ex.Number, ex.Message);
+                throw ex;
+            }
+        }
+
+        protected override int EndGetResult(int statementId, ref int affectedRows, ref int insertedId, IAsyncResult asyncResult)
+        {
+            try
+            {
+                int fieldCount = base.EndGetResult(statementId, ref affectedRows, ref insertedId, asyncResult);
+                MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.ResultOpened,
+                    Resources.TraceResult, driverId, fieldCount, affectedRows, insertedId);
+
+                return fieldCount;
+            }
+            catch (MySqlException ex)
+            {
+                // we got an error so we report it
+                MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.Error,
+                    Resources.TraceOpenResultError, driverId, ex.Number, ex.Message);
+                throw ex;
+            }
+        }
+
         public override ResultSet NextResult(int statementId)
         {
             // first let's see if we already have a resultset on this statementId

=== modified file 'MySql.Data/Provider/Source/command.cs'
--- a/MySql.Data/Provider/Source/command.cs	2010-07-30 22:20:57 +0000
+++ b/MySql.Data/Provider/Source/command.cs	2010-08-16 14:55:33 +0000
@@ -51,7 +51,6 @@ namespace MySql.Data.MySqlClient
 		long updatedRowCount;
 		UpdateRowSource updatedRowSource;
 		MySqlParameterCollection parameters;
-		private IAsyncResult asyncResult;
 		private bool designTimeVisible;
 		internal Int64 lastInsertedId;
 		private PreparableStatement statement;
@@ -360,6 +359,12 @@ namespace MySql.Data.MySqlClient
         /// <include file='docs/mysqlcommand.xml' path='docs/ExecuteReader1/*'/>
         public new MySqlDataReader ExecuteReader (CommandBehavior behavior)
         {
+            return (MySqlDataReader)InternalExecuteReader(behavior, null, null, false);
+        }
+
+
+        private object InternalExecuteReader(CommandBehavior behavior, object state, AsyncCallback callback, bool async)
+        {
             bool success = false;
             CheckState();
             Driver driver = connection.driver;
@@ -431,10 +436,19 @@ namespace MySql.Data.MySqlClient
                 connection.Reader = reader;
                 // execute the statement
                 statement.Execute();
-                // wait for data to return
-                reader.NextResult();
-                success = true;
-                return reader;
+                if (!async)
+                {
+                    // wait for data to return
+                    reader.NextResult();
+                    success = true;
+                    return reader;
+                }
+                else
+                {
+                    IAsyncResult asyncResult = reader.BeginNextResult(state, callback);
+                    success = true;
+                    return asyncResult;
+                }
             }
             catch (TimeoutException tex)
             {
@@ -566,8 +580,6 @@ namespace MySql.Data.MySqlClient
 
 		#region Async Methods
 
-		internal delegate object AsyncDelegate(int type, CommandBehavior behavior);
-        internal AsyncDelegate caller = null;
 		internal Exception thrownException;
 
 		private static string TrimSemicolons(string sql)
@@ -625,12 +637,7 @@ namespace MySql.Data.MySqlClient
 		/// the returned rows. </returns>
 		public IAsyncResult BeginExecuteReader(CommandBehavior behavior)
 		{
-            if (caller != null)
-                throw new MySqlException(Resources.UnableToStartSecondAsyncOp);
-
-			caller = new AsyncDelegate(AsyncExecuteWrapper);
-			asyncResult = caller.BeginInvoke(1, behavior, null, null);
-			return asyncResult;
+            return (IAsyncResult)InternalExecuteReader(behavior, null, null, true);
 		}
 
 		/// <summary>
@@ -642,12 +649,13 @@ namespace MySql.Data.MySqlClient
 		/// <returns>A <b>MySqlDataReader</b> object that can be used to retrieve the requested rows. </returns>
 		public MySqlDataReader EndExecuteReader(IAsyncResult result)
 		{
-			result.AsyncWaitHandle.WaitOne();
-            AsyncDelegate c = caller;
-            caller = null;
-            if (thrownException != null)
-                throw thrownException;
-            return (MySqlDataReader)c.EndInvoke(result);
+            MySqlDataReader r = connection.Reader;
+            if (r != null)
+            {
+                r.EndNextResult(result);
+                return r;
+            }
+            return null;
 		}
 
 		/// <summary>
@@ -666,13 +674,7 @@ namespace MySql.Data.MySqlClient
 		/// which returns the number of affected rows. </returns>
 		public IAsyncResult BeginExecuteNonQuery(AsyncCallback callback, object stateObject)
 		{
-            if (caller != null)
-                throw new MySqlException(Resources.UnableToStartSecondAsyncOp);
-
-            caller = new AsyncDelegate(AsyncExecuteWrapper);
-			asyncResult = caller.BeginInvoke(2, CommandBehavior.Default, 
-				callback, stateObject);
-			return asyncResult;
+            return (IAsyncResult)InternalExecuteReader(CommandBehavior.Default, stateObject, null, true);
 		}
 
 		/// <summary>
@@ -684,12 +686,7 @@ namespace MySql.Data.MySqlClient
 		/// which returns the number of affected rows. </returns>
 		public IAsyncResult BeginExecuteNonQuery()
 		{
-            if (caller != null)
-                throw new MySqlException(Resources.UnableToStartSecondAsyncOp);
-
-            caller = new AsyncDelegate(AsyncExecuteWrapper);
-			asyncResult = caller.BeginInvoke(2, CommandBehavior.Default, null, null);
-			return asyncResult;
+            return BeginExecuteNonQuery(null, null);
 		}
 
 		/// <summary>
@@ -700,12 +697,14 @@ namespace MySql.Data.MySqlClient
 		/// <returns></returns>
 		public int EndExecuteNonQuery(IAsyncResult asyncResult)
 		{
-			asyncResult.AsyncWaitHandle.WaitOne();
-            AsyncDelegate c = caller;
-            caller = null;
-            if (thrownException != null)
-                throw thrownException;
-            return (int)c.EndInvoke(asyncResult);
+            if (connection.Reader != null)
+            {
+                MySqlDataReader r = connection.Reader;
+                r.EndNextResult(asyncResult);
+                r.Close();
+                return (int)r.affectedRows;
+            }
+            return -1;
 		}
 
 		#endregion

=== modified file 'MySql.Data/Provider/Source/common/NamedPipeStream.cs'
--- a/MySql.Data/Provider/Source/common/NamedPipeStream.cs	2009-10-13 03:07:05 +0000
+++ b/MySql.Data/Provider/Source/common/NamedPipeStream.cs	2010-08-16 14:55:33 +0000
@@ -141,7 +141,15 @@ namespace MySql.Data.Common
             return fileStream.EndRead(result);
         }
 
+        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+        {
+            return fileStream.BeginRead(buffer, offset, count, callback, state);
+        }
 
+        public override int EndRead(IAsyncResult asyncResult)
+        {
+            return fileStream.EndRead(asyncResult);
+        }
         public override void Write(byte[] buffer, int offset, int count)
         {
             if (writeTimeout == Timeout.Infinite)

=== modified file 'MySql.Data/Provider/Source/common/SharedMemoryStream.cs'
--- a/MySql.Data/Provider/Source/common/SharedMemoryStream.cs	2009-10-07 20:30:34 +0000
+++ b/MySql.Data/Provider/Source/common/SharedMemoryStream.cs	2010-08-16 14:55:33 +0000
@@ -94,10 +94,14 @@ namespace MySql.Data.Common
         private EventWaitHandle clientRead;
         private EventWaitHandle clientWrote;
         private EventWaitHandle connectionClosed;
+        private EventWaitHandle asyncReadCompleted;
         private SharedMemory data;
         private int bytesLeft;
         private int position;
         private int connectNumber;
+        private bool closed;
+        RegisteredWaitHandle closeWaitHandle;
+
 
         private const int BUFFERLENGTH = 16004;
 
@@ -121,27 +125,37 @@ namespace MySql.Data.Common
 
         public override void Close()
         {
+
             if (connectionClosed != null)
             {
-                bool isClosed = connectionClosed.WaitOne(0);
-                if (!isClosed)
-                {
-                    connectionClosed.Set();
-                    connectionClosed.Close();
-                }
-                connectionClosed = null;
-                EventWaitHandle[] handles = 
-                {serverRead, serverWrote, clientRead, clientWrote};
-
-                for(int i=0; i< handles.Length; i++)
-                {
-                    if(handles[i] != null)
-                        handles[i].Close();
-                }
-                if (data != null)
+                lock (connectionClosed)
                 {
-                    data.Dispose();
-                    data = null;
+                    if (closeWaitHandle != null)
+                    {
+                        closeWaitHandle.Unregister(connectionClosed);
+                        closeWaitHandle = null;
+                    }
+                    bool isClosed = connectionClosed.WaitOne(0);
+                    if (!isClosed)
+                    {
+                        connectionClosed.Set();
+                        connectionClosed.Close();
+                    }
+                    connectionClosed = null;
+                    EventWaitHandle[] handles = { serverRead, serverWrote, 
+                                                    clientRead, clientWrote, 
+                                                    asyncReadCompleted };
+
+                    for (int i = 0; i < handles.Length; i++)
+                    {
+                        if (handles[i] != null)
+                            handles[i].Close();
+                    }
+                    if (data != null)
+                    {
+                        data.Dispose();
+                        data = null;
+                    }
                 }
             }
         }
@@ -179,7 +193,25 @@ namespace MySql.Data.Common
             }
         }
 
+        private void ConnectionClosedCallback(object state, bool timedOut)
+        {
+            closed = true;
+            // synchronization required, since we do not want
+            // this callback which is executed in thread pool thread 
+            // to run in parallel with Close()
+            if (connectionClosed != null)
+            {
+                lock (connectionClosed)
+                {
+                    // wakeup Read(), if it is waiting
+                    serverWrote.Set();
+
+                    // wakeup Write(), if it is waiting
+                    serverRead.Set();
+                }
+            }
 
+        }
         private void SetupEvents()
         {
             string prefix = memoryName + "_" + connectNumber;
@@ -189,6 +221,8 @@ namespace MySql.Data.Common
             clientWrote = EventWaitHandle.OpenExisting(prefix + "_CLIENT_WROTE");
             clientRead  = EventWaitHandle.OpenExisting(prefix + "_CLIENT_READ");
             connectionClosed = EventWaitHandle.OpenExisting(prefix + "_CONNECTION_CLOSED");
+            ThreadPool.RegisterWaitForSingleObject(connectionClosed,
+                ConnectionClosedCallback, null, Timeout.Infinite, true);
 
             // tell the server we are ready
             serverRead.Set();
@@ -232,17 +266,16 @@ namespace MySql.Data.Common
         public override int Read(byte[] buffer, int offset, int count)
         {
             int timeLeft = readTimeout;
-            WaitHandle[] waitHandles = { serverWrote, connectionClosed };
             LowResolutionStopwatch stopwatch = new LowResolutionStopwatch();
             while (bytesLeft == 0)
             {
                 stopwatch.Start();
-                int index = WaitHandle.WaitAny(waitHandles, timeLeft);
+                bool ok = serverWrote.WaitOne(timeLeft);
                 stopwatch.Stop();
-                if (index == WaitHandle.WaitTimeout)
+                if (!ok)
                     throw new TimeoutException("Timeout when reading from shared memory");
 
-                if (waitHandles[index] == connectionClosed)
+                if (closed)
                     throw new MySqlException("Connection to server lost",true, null);
 
                 if (readTimeout != System.Threading.Timeout.Infinite)
@@ -269,6 +302,100 @@ namespace MySql.Data.Common
             return len;
         }
 
+
+        internal class AsyncSharedMemoryIO:IAsyncResult
+        {
+            public object state;
+            public byte[] buffer;
+            public int count;
+            public AsyncCallback callback;
+            public WaitHandle handle;
+            public bool completed;
+            public int bytesRead;
+            public RegisteredWaitHandle registeredWaitHandle;
+
+            internal AsyncSharedMemoryIO(
+                object state, byte[] buffer, int count, AsyncCallback callback,
+                WaitHandle handle)
+            {
+                this.state = state;
+                this.buffer = buffer;
+                this.count = count;
+                this.callback = callback;
+                this.handle = handle;
+            }
+
+            #region IAsyncResult Members
+
+            public object AsyncState
+            {
+                get { return state; }
+            }
+
+            public WaitHandle AsyncWaitHandle
+            {
+                get { return handle; }
+            }
+
+            public bool CompletedSynchronously
+            {
+                get { return false;  }
+            }
+
+            public bool IsCompleted
+            {
+                get { return completed; }
+            }
+
+            #endregion
+        }
+
+        private void ServerWroteCallback(object req, bool timedOut)
+        {
+            AsyncSharedMemoryIO request = (AsyncSharedMemoryIO)req;
+            request.registeredWaitHandle.Unregister(serverWrote);
+
+            // signal handle, it was reset (since it is autoreset handle)
+            serverWrote.Set();
+            // this read is non-blocking (serverWrote is set)
+            request.bytesRead = Read(request.buffer, 0, request.count);
+
+            if (request.callback != null)
+            {
+                request.callback(request);
+            }
+
+            asyncReadCompleted.Set();
+            request.completed = true;
+        }
+
+        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
+            AsyncCallback callback, object state)
+        {
+            if (asyncReadCompleted == null)
+            {
+                asyncReadCompleted = new EventWaitHandle(false, EventResetMode.ManualReset);
+            }
+            asyncReadCompleted.Reset();
+
+            AsyncSharedMemoryIO req = new AsyncSharedMemoryIO(state, buffer, count, 
+                callback, serverWrote);
+            req.registeredWaitHandle = 
+                ThreadPool.RegisterWaitForSingleObject(serverWrote, ServerWroteCallback, 
+                state, Timeout.Infinite, true);
+            return req;
+        }
+
+        public override int EndRead(IAsyncResult asyncResult)
+        {
+            AsyncSharedMemoryIO io = (AsyncSharedMemoryIO)asyncResult;
+            if (!io.completed)
+            {
+                asyncReadCompleted.WaitOne();
+            }
+            return io.bytesRead;
+        }
+
         public override long Seek(long offset, SeekOrigin origin)
         {
             throw new NotSupportedException("SharedMemoryStream does not support seeking");
@@ -278,20 +405,20 @@ namespace MySql.Data.Common
         {
             int leftToDo = count;
             int buffPos = offset;
-            WaitHandle[] waitHandles = { serverRead, connectionClosed };
+            
             LowResolutionStopwatch stopwatch = new LowResolutionStopwatch();
             int timeLeft = writeTimeout;
 
             while (leftToDo > 0)
             {
                 stopwatch.Start();
-                int index = WaitHandle.WaitAny(waitHandles, timeLeft);
+                bool ok = serverRead.WaitOne(timeLeft);
                 stopwatch.Stop();
 
-                if (waitHandles[index] == connectionClosed)
+                if (closed)
                     throw new MySqlException("Connection to server lost",true, null);
 
-                if (index == WaitHandle.WaitTimeout)
+                if (!ok)
                     throw new TimeoutException("Timeout when reading from shared memory");
 
                 if (writeTimeout != System.Threading.Timeout.Infinite)

=== modified file 'MySql.Data/Provider/Source/datareader.cs'
--- a/MySql.Data/Provider/Source/datareader.cs	2010-07-30 22:20:57 +0000
+++ b/MySql.Data/Provider/Source/datareader.cs	2010-08-16 14:55:33 +0000
@@ -864,13 +864,40 @@ namespace MySql.Data.MySqlClient
 			return DBNull.Value == GetValue(i);
 		}
 
-		/// <summary>
+         /// <summary>
 		/// Advances the data reader to the next result, when reading the results of batch SQL statements.
 		/// </summary>
 		/// <returns></returns>
 		public override bool NextResult()
 		{
-			if (!isOpen)
+            return InternalNextResult(null);
+        }
+
+        internal IAsyncResult BeginNextResult(object state, AsyncCallback callback)
+        {
+            if (!isOpen)
+                throw new MySqlException(Resources.NextResultIsClosed);
+
+            // this will clear out any unread data
+            if (resultSet != null)
+                resultSet.Close();
+
+            // single result means we only return a single resultset.  If we have already
+            // returned one, then we return false;
+            if (resultSet != null && (commandBehavior & CommandBehavior.SingleResult) != 0)
+                return null;
+
+            return driver.BeginNextResult(Statement.StatementId, state, callback);
+        }
+
+        internal void EndNextResult(IAsyncResult asyncResult)
+        {
+            InternalNextResult(asyncResult);
+        }
+
+        private bool InternalNextResult(IAsyncResult asyncResult)
+        {
+            if (!isOpen)
                 throw new MySqlException(Resources.NextResultIsClosed);
 
             // this will clear out any unread data
@@ -888,7 +915,16 @@ namespace MySql.Data.MySqlClient
                 do
                 {
                     resultSet = null;
-                    resultSet = driver.NextResult(Statement.StatementId);
+                    if (asyncResult == null)
+                    {
+                        resultSet = driver.NextResult(Statement.StatementId);
+                    }
+                    else
+                    {
+                        resultSet = driver.EndNextResult(Statement.StatementId,
+                            asyncResult);
+                        asyncResult = null;
+                    }
                     if (resultSet == null) return false;
                     if (resultSet.IsOutputParameters) return false;
 


Attachment: [text/bzr-bundle] bzr/vvaintroub@mysql.com-20100816145533-y8urpn11xwnd45fo.bundle
Thread
bzr commit into connector-net-trunk branch (vvaintroub:922) Vladislav Vaintroub16 Aug