List:Commits« Previous MessageNext Message »
From:Vladislav Vaintroub Date:August 3 2009 4:37pm
Subject:bzr commit into connector-net-trunk branch (vvaintroub:717)
View as plain text  
#At file:///H:/connector_net/trunk/ based on revid:vvaintroub@strippedhtgawnr

  717 Vladislav Vaintroub	2009-08-03
      For command timeout, use underlying stream timeouts, instead of starting a timer and issue "cancel command" when timer expires. 
      The previous solution (cancel command) does not take  network latencies/disconnects into account and could hang.  
      The new solutiion with IO timeout has a  drawback that connection needs to be closed after timeout occurs, and running transactions are aborted.
      
      The new implementation is more .NET conform , where SqlCommand.Timeout is documented as accumulated timeout of network reads.
      
      - Named pipes implemention is reworked to support IO timeouts.
      Now it does minimum PInvoke, NamedPipeStream just wraps an asynchronous FileStream.

    added:
      MySql.Data/Provider/Source/TimedStream.cs
      MySql.Data/Provider/Source/cf/Stopwatch.cs
    modified:
      MySql.Data/Provider/MySql.Data.CF.csproj
      MySql.Data/Provider/MySql.Data.csproj
      MySql.Data/Provider/Source/CompressedStream.cs
      MySql.Data/Provider/Source/Driver.cs
      MySql.Data/Provider/Source/MySqlPool.cs
      MySql.Data/Provider/Source/MySqlStream.cs
      MySql.Data/Provider/Source/NativeDriver.cs
      MySql.Data/Provider/Source/command.cs
      MySql.Data/Provider/Source/common/NamedPipeStream.cs
      MySql.Data/Provider/Source/common/NativeMethods.cs
      MySql.Data/Provider/Source/common/SharedMemoryStream.cs
      MySql.Data/Provider/Source/datareader.cs
      MySql.Data/Tests/Source/TimeoutAndCancel.cs
=== modified file 'MySql.Data/Provider/MySql.Data.CF.csproj'
--- a/MySql.Data/Provider/MySql.Data.CF.csproj	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/MySql.Data.CF.csproj	2009-08-03 16:37:40 +0000
@@ -64,6 +64,7 @@ <Project DefaultTargets="Build" xmlns
     <Compile Include="Source\base\DbConnectionStringBuilder.cs" />
     <Compile Include="Source\base\DbException.cs" />
     <Compile Include="Source\cf\BufferedStream.cs" />
+    <Compile Include="Source\cf\Stopwatch.cs" />
     <Compile Include="Source\cf\WinCE.cs" />
     <Compile Include="Source\CharSetMap.cs" />
     <Compile Include="Source\command.cs">

=== modified file 'MySql.Data/Provider/MySql.Data.csproj'
--- a/MySql.Data/Provider/MySql.Data.csproj	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/MySql.Data.csproj	2009-08-03 16:37:40 +0000
@@ -246,6 +246,7 @@ <Project DefaultTargets="Build" xmlns
     <Compile Include="Source\MySqlPacket.cs" />
     <Compile Include="Source\MySqlScript.cs" />
     <Compile Include="Source\ResultSet.cs" />
+    <Compile Include="Source\TimedStream.cs" />
     <Compile Include="Source\Types\MySqlGuid.cs" />
   </ItemGroup>
   <ItemGroup>

=== modified file 'MySql.Data/Provider/Source/CompressedStream.cs'
--- a/MySql.Data/Provider/Source/CompressedStream.cs	2009-07-30 13:13:25 +0000
+++ b/MySql.Data/Provider/Source/CompressedStream.cs	2009-08-03 16:37:40 +0000
@@ -108,6 +108,38 @@ namespace MySql.Data.MySqlClient
             }
         }
 
+        public override bool CanTimeout
+        {
+            get
+            {
+               return baseStream.CanTimeout;
+            }
+        }
+
+        public override int ReadTimeout
+        {
+            get
+            {
+                return baseStream.ReadTimeout;
+            }
+            set
+            {
+                baseStream.ReadTimeout = value;
+            }
+        }
+
+        public override int WriteTimeout
+        {
+            get
+            {
+                return baseStream.WriteTimeout;
+            }
+            set
+            {
+                baseStream.WriteTimeout = value;
+            }
+        }
+
         public override int Read(byte[] buffer, int offset, int count)
         {
             if (buffer == null)

=== modified file 'MySql.Data/Provider/Source/Driver.cs'
--- a/MySql.Data/Provider/Source/Driver.cs	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/Driver.cs	2009-08-03 16:37:40 +0000
@@ -372,7 +372,7 @@ namespace MySql.Data.MySqlClient
         public abstract bool Ping();
         public abstract void CloseStatement(int id);
         public abstract void ExecuteDirect(string sql);
-
+        public abstract void ResetTimeout(int timeoutMilliseconds);
 		#endregion
 
 

=== modified file 'MySql.Data/Provider/Source/MySqlPool.cs'
--- a/MySql.Data/Provider/Source/MySqlPool.cs	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/MySqlPool.cs	2009-08-03 16:37:40 +0000
@@ -115,6 +115,20 @@ namespace MySql.Data.MySqlClient
                     driver = (Driver)idlePool.Dequeue();
             }
 
+            // Obey the connection timeout
+            if (driver != null)
+            {
+                try
+                {
+                    driver.ResetTimeout((int)Settings.ConnectionTimeout * 1000);
+                }
+                catch (Exception)
+                {
+                    driver.Close();
+                    driver = null;
+                }
+            }
+            
             if (driver != null)
             {
                 // first check to see that the server is still alive

=== modified file 'MySql.Data/Provider/Source/MySqlStream.cs'
--- a/MySql.Data/Provider/Source/MySqlStream.cs	2009-07-29 11:06:15 +0000
+++ b/MySql.Data/Provider/Source/MySqlStream.cs	2009-08-03 16:37:40 +0000
@@ -37,8 +37,7 @@ namespace MySql.Data.MySqlClient
 		private MemoryStream bufferStream;
 		private int maxBlockSize;
 		private ulong maxPacketSize;
-		private Stream inStream;
-		private Stream outStream;
+		private TimedStream stream;
 		private byte[] tempBuffer = new byte[4];
         MySqlPacket packet = new MySqlPacket();
 
@@ -61,19 +60,19 @@ namespace MySql.Data.MySqlClient
         public MySqlStream(Stream baseStream, Encoding encoding, bool compress)
             : this(encoding)
         {
-
-            inStream = new BufferedStream(baseStream);
-            outStream = baseStream;
             if (compress)
             {
-                inStream = new CompressedStream(inStream);
-                outStream = new CompressedStream(outStream);
+                stream = new TimedStream(new CompressedStream(baseStream), true, false);
+            }
+            else
+            {
+                stream = new TimedStream(baseStream, true, false);
             }
         }
 
 		public void Close()
 		{
-			inStream.Close();
+			stream.Close();
             // no need to close outStream because closing
             // inStream closes the underlying network stream
             // for us.
@@ -87,6 +86,11 @@ namespace MySql.Data.MySqlClient
 			set { encoding = value; }
 		}
 
+        public void ResetTimeout(int timeout)
+        {
+            stream.ResetTimeout(timeout);
+        }
+
 		public byte SequenceByte
 		{
 			get { return sequenceByte; }
@@ -177,10 +181,10 @@ namespace MySql.Data.MySqlClient
                 int offset = 0;
                 while (true)
                 {
-                    int b1 = inStream.ReadByte();
-                    int b2 = inStream.ReadByte();
-                    int b3 = inStream.ReadByte();
-                    int seqByte = inStream.ReadByte();
+                    int b1 = stream.ReadByte();
+                    int b2 = stream.ReadByte();
+                    int b3 = stream.ReadByte();
+                    int seqByte = stream.ReadByte();
 
                     if (b1 == -1 || b2 == -1 || b3 == -1 || seqByte == -1)
                         throw new MySqlException(
@@ -192,7 +196,7 @@ namespace MySql.Data.MySqlClient
                     // make roo for the next block
                     packet.Length += length;
 
-                    ReadFully(inStream, packet.Buffer, offset, length);
+                    ReadFully(stream, packet.Buffer, offset, length);
                     offset += length;
 
                     // if this block was < maxBlock then it's last one in a multipacket series
@@ -223,8 +227,8 @@ namespace MySql.Data.MySqlClient
                 buffer[offset+2] = (byte)((lenToSend >> 16) & 0xff);
                 buffer[offset+3] = sequenceByte++;
 
-                outStream.Write(buffer, offset, lenToSend + 4);
-                outStream.Flush();
+                stream.Write(buffer, offset, lenToSend + 4);
+                stream.Flush();
                 length -= lenToSend;
                 offset += lenToSend;
             }
@@ -250,8 +254,8 @@ namespace MySql.Data.MySqlClient
             buffer[1] = (byte)((count >> 8) & 0xff);
             buffer[2] = (byte)((count >> 16) & 0xff);
             buffer[3] = sequenceByte++;
-            outStream.Write(buffer, 0, count + 4);
-            outStream.Flush();
+            stream.Write(buffer, 0, count + 4);
+            stream.Flush();
         }
 
         /// <summary>

=== modified file 'MySql.Data/Provider/Source/NativeDriver.cs'
--- a/MySql.Data/Provider/Source/NativeDriver.cs	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/NativeDriver.cs	2009-08-03 16:37:40 +0000
@@ -220,6 +220,8 @@ namespace MySql.Data.MySqlClient
             int maxSinglePacket = 255*255*255;
             stream = new MySqlStream(baseStream, encoding, false);
 
+            stream.ResetTimeout((int)Settings.ConnectionTimeout*1000);
+
             // read off the welcome packet and parse out it's values
             packet = stream.ReadPacket();
             protocol = packet.ReadByte();
@@ -942,5 +944,16 @@ namespace MySql.Data.MySqlClient
             stream.SendPacket(packet);
         }
 
-	}
+        /// <summary>
+        /// Execution timeout, in milliseconds. When the accumulated time for network IO exceeds this value
+        /// TimeoutException is thrown. This timeout needs to be reset for every new command
+        /// </summary>
+        /// 
+        public override void ResetTimeout(int timeout)
+        {
+            stream.ResetTimeout(timeout);
+        }
+
+    }
+ 
 }

=== added file 'MySql.Data/Provider/Source/TimedStream.cs'
--- a/MySql.Data/Provider/Source/TimedStream.cs	1970-01-01 00:00:00 +0000
+++ b/MySql.Data/Provider/Source/TimedStream.cs	2009-08-03 16:37:40 +0000
@@ -0,0 +1,276 @@
+// Copyright (c) 2009 Sun Microsystems, Inc.
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 2 as published by
+// the Free Software Foundation
+//
+// There are special exceptions to the terms and conditions of the GPL 
+// as it is applied to this software. View the full text of the 
+// exception in file EXCEPTIONS in the directory of this software 
+// distribution.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Diagnostics;
+
+namespace MySql.Data.MySqlClient
+{
+    /// <summary>
+    /// Stream that supports timeout of IO operations.
+    /// This class is used is used to support timeouts for SQL command, where a typical operation involves several network reads/writes. 
+    /// Timeout here is defined as the accumulated duration of all IO operations.
+    /// </summary>
+    /// <remarks> Any IO exception or timeout exception closes the stream </remarks>
+    internal class TimedStream : Stream
+    {
+        Stream baseStream;
+        Stream inStream;
+        Stream outStream;
+        int timeout;
+        Stopwatch stopwatch;
+        bool isClosed;
+
+
+        /// <summary>
+        /// Construct a TimedStream
+        /// </summary>
+        /// <param name="baseStream"> Undelying stream</param>
+        /// <param name="bufferedInput">If input buffering should be used</param>
+        /// <param name="bufferedOutput">If output buffering should be used</param>
+        public TimedStream(Stream baseStream, bool bufferedInput, bool bufferedOutput)
+        {
+            this.baseStream = baseStream;
+            if (bufferedInput)
+                inStream = new BufferedStream(baseStream);
+            else
+                inStream = baseStream;
+
+            if (bufferedOutput)
+                outStream = new BufferedStream(baseStream);
+            else
+                outStream = baseStream;
+            timeout = System.Threading.Timeout.Infinite;
+            isClosed = false;
+            stopwatch = new Stopwatch();
+        }
+
+        private void StartTimer()
+        {
+            baseStream.ReadTimeout = baseStream.WriteTimeout =
+                timeout;
+            stopwatch.Start();
+        }
+        private void StopTimer()
+        {
+            stopwatch.Stop();
+
+            if (timeout != System.Threading.Timeout.Infinite)
+            {
+                // Normally, a timeout exception would be thrown  by stream itself, since we set the read/write timeout 
+                // for the stream.  However there is a gap between  end of IO operation and stopping the stop watch, 
+                // and it makes it possible for timeout to exceed even after IO completed successfully.
+                if (stopwatch.ElapsedMilliseconds > timeout)
+                {
+                    throw new TimeoutException();
+                }
+            }
+        }
+        public override bool CanRead
+        {
+            get { return baseStream.CanRead; }
+        }
+
+        public override bool CanSeek
+        {
+            get { return baseStream.CanSeek; }
+        }
+
+        public override bool CanWrite
+        {
+            get { return baseStream.CanWrite; }
+        }
+
+        public override void Flush()
+        {
+            try
+            {
+                StartTimer();
+                outStream.Flush();
+                StopTimer();
+            }
+            catch (Exception e)
+            {
+                HandleTimeout(e);
+            }
+        }
+
+        public override long Length
+        {
+            get { return baseStream.Length; }
+        }
+
+        public override long Position
+        {
+            get
+            {
+                return baseStream.Position;
+            }
+            set
+            {
+                baseStream.Position = value;
+                if (inStream != baseStream)
+                {
+                    inStream.Position = value;
+                }
+                if (outStream != baseStream)
+                {
+                    outStream.Position = value;
+                }
+            }
+        }
+
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            try
+            {
+                StartTimer();
+                int retval = inStream.Read(buffer, offset, count);
+                StopTimer();
+                return retval;
+            }
+            catch (Exception e)
+            {
+                HandleTimeout(e);
+                throw;
+            }
+        }
+
+        public override int ReadByte()
+        {
+            try
+            {
+                StartTimer();
+                int retval = inStream.ReadByte();
+                StopTimer();
+                return retval;
+            }
+            catch (Exception e)
+            {
+                HandleTimeout(e);
+                throw;
+            }
+        }
+
+        public override long Seek(long offset, SeekOrigin origin)
+        {
+            return baseStream.Seek(offset, origin);
+        }
+
+        public override void SetLength(long value)
+        {
+            baseStream.SetLength(value);
+        }
+
+        public override void Write(byte[] buffer, int offset, int count)
+        {
+            try
+            {
+                StartTimer();
+                outStream.Write(buffer, offset, count);
+                StopTimer();
+            }
+            catch (Exception e)
+            {
+                HandleTimeout(e);
+                throw;
+            }
+        }
+
+        public override bool CanTimeout
+        {
+            get { return baseStream.CanTimeout; }
+        }
+
+        public override int ReadTimeout
+        {
+            get { return inStream.ReadTimeout; }
+            set { inStream.ReadTimeout = value; }
+        }
+        public override int WriteTimeout
+        {
+            get { return outStream.WriteTimeout; }
+            set { outStream.WriteTimeout = value; }
+        }
+
+        public override void Close()
+        {
+            if (isClosed)
+                return;
+            baseStream.Close();
+            if (inStream != baseStream)
+            {
+                inStream.Close();
+            }
+            if (outStream != baseStream)
+            {
+                outStream.Close();
+            }
+            isClosed = true;
+        }
+
+        public void ResetTimeout(int newTimeout)
+        {
+
+            if (newTimeout == System.Threading.Timeout.Infinite)
+                timeout = newTimeout;
+            else if (newTimeout < 0)
+                throw new ArgumentException("Invalid timeout value");
+            else if (newTimeout == 0)
+                timeout = System.Threading.Timeout.Infinite;
+            else
+                timeout = newTimeout;
+            stopwatch.Reset();
+        }
+
+
+        /// <summary>
+        /// Examine the exception chain for exceptions throw during read/write operations
+        /// If timeout exception is detected in the chain, new TimeoutException is thrown.
+        /// </summary>
+        /// <param name="e">original exception</param>
+        void HandleTimeout(Exception e)
+        {
+            stopwatch.Stop();
+            Close();
+            Exception currentException = e;
+
+            while (currentException != null)
+            {
+                if (currentException is SocketException)
+                {
+                    SocketException socketException = (SocketException)currentException;
+#if CF
+                    if (socketException.NativeErrorCode == 10060)
+#else
+                    if (socketException.SocketErrorCode == SocketError.TimedOut)
+#endif
+                    {
+                        throw new TimeoutException(e.Message, e);
+                    }
+                }
+                currentException = currentException.InnerException;
+            }
+        }
+
+    }
+}

=== added file 'MySql.Data/Provider/Source/cf/Stopwatch.cs'
--- a/MySql.Data/Provider/Source/cf/Stopwatch.cs	1970-01-01 00:00:00 +0000
+++ b/MySql.Data/Provider/Source/cf/Stopwatch.cs	2009-08-03 16:37:40 +0000
@@ -0,0 +1,52 @@
+// Copyright (c) 2009 Sun Microsystems, Inc.
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 2 as published by
+// the Free Software Foundation
+//
+// There are special exceptions to the terms and conditions of the GPL 
+// as it is applied to this software. View the full text of the 
+// exception in file EXCEPTIONS in the directory of this software 
+// distribution.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
+
+using System;
+
+namespace MySql.Data.Common
+{
+    class Stopwatch
+    {
+        long millis;
+        DateTime startTime;
+        public Stopwatch()
+        {
+            millis = 0;
+        }
+        public long ElapsedMilliseconds
+        {
+            get { return millis; }
+        }
+        public void Start()
+        {
+            startTime = DateTime.Now;
+        }
+
+        public void Stop()
+        {
+            millis += (long)DateTime.Now.Subtract(startTime).TotalMilliseconds;
+        }
+
+        public void Reset()
+        {
+            millis = 0;
+        }
+    }
+}

=== modified file 'MySql.Data/Provider/Source/command.cs'
--- a/MySql.Data/Provider/Source/command.cs	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/command.cs	2009-08-03 16:37:40 +0000
@@ -332,28 +332,6 @@ namespace MySql.Data.MySqlClient
 			return ExecuteReader(CommandBehavior.Default);
 		}
 
-		private void TimeoutExpired(object commandObject)
-		{
-			MySqlCommand cmd = (commandObject as MySqlCommand);
-            if (cmd == null)
-            {
-                Logger.LogWarning(Resources.TimeoutExpiredNullObject);
-                return;
-            }
-
-            cmd.timedOut = true;
-            try
-            {
-                cmd.Cancel();
-            }
-            catch (Exception ex)
-            {
-                // if something goes wrong, we log it and eat it.  There's really nothing
-                // else we can do.
-                if (connection.Settings.Logging)
-                    Logger.LogException(ex);
-            }
-		}
 
 		/// <include file='docs/mysqlcommand.xml' path='docs/ExecuteReader1/*'/>
 		public new MySqlDataReader ExecuteReader(CommandBehavior behavior)
@@ -398,34 +376,28 @@ namespace MySql.Data.MySqlClient
             HandleCommandBehaviors(behavior);
 
 			updatedRowCount = -1;
-
-            Timer timer = null;
             try
             {
+                connection.driver.ResetTimeout(commandTimeout * 1000);
                 MySqlDataReader reader = new MySqlDataReader(this, statement, behavior);
-
-                // start a threading timer on our command timeout 
-                timedOut = false;
-                canceled = false;
-
                 // execute the statement
                 statement.Execute();
-
-                // start a timeout timer
-                if (connection.driver.Version.isAtLeast(5, 0, 0) &&
-                     commandTimeout > 0)
-                {
-                    TimerCallback timerDelegate =
-                         new TimerCallback(TimeoutExpired);
-                    timer = new Timer(timerDelegate, this, this.CommandTimeout * 1000, Timeout.Infinite);
-                }
-
                 // wait for data to return
                 reader.NextResult();
-                
                 connection.Reader = reader;
                 return reader;
             }
+            catch (TimeoutException tex)
+            {
+                try
+                {
+                    Connection.Abort();
+                }
+                catch (Exception)
+                {
+                }
+                throw new MySqlException(Resources.Timeout,true, tex);
+            }
             catch (MySqlException ex)
             {
                 try
@@ -437,8 +409,6 @@ namespace MySql.Data.MySqlClient
                 // if we caught an exception because of a cancel, then just return null
                 if (ex.Number == 1317)
                 {
-                    if (TimedOut)
-                        throw new MySqlException(Resources.Timeout);
                     return null;
                 }
                 if (ex.IsFatal)
@@ -447,11 +417,6 @@ namespace MySql.Data.MySqlClient
                     throw new MySqlException(Resources.FatalErrorDuringExecute, ex);
                 throw;
             }
-            finally
-            {
-                if (timer != null)
-                    timer.Dispose();
-            }
 		}
 
 		/// <include file='docs/mysqlcommand.xml' path='docs/ExecuteScalar/*'/>

=== modified file 'MySql.Data/Provider/Source/common/NamedPipeStream.cs'
--- a/MySql.Data/Provider/Source/common/NamedPipeStream.cs	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/common/NamedPipeStream.cs	2009-08-03 16:37:40 +0000
@@ -1,4 +1,4 @@
-// Copyright (c) 2004-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
+// Copyright (c) 2009 Sun Microsystems, Inc.
 //
 // This program is free software; you can redistribute it and/or modify
 // it under the terms of the GNU General Public License version 2 as published by
@@ -22,176 +22,174 @@ using System;
 using System.IO;
 using MySql.Data.MySqlClient;
 using MySql.Data.MySqlClient.Properties;
+using Microsoft.Win32.SafeHandles;
+using System.Threading;
+using System.Diagnostics;
+
 
 
 namespace MySql.Data.Common
 {
-	/// <summary>
-	/// Summary description for API.
-	/// </summary>
-	internal class NamedPipeStream : Stream
-	{
-		int			pipeHandle;
-		FileAccess	_mode;
-
-		public NamedPipeStream(string host, FileAccess mode)
-		{
-			Open(host, mode);
-		}
-
-		public void Open( string host, FileAccess mode )
-		{
-			_mode = mode;
-			uint pipemode = 0;
-
-			if ((mode & FileAccess.Read) > 0)
-				pipemode |= NativeMethods.GENERIC_READ;
-			if ((mode & FileAccess.Write) > 0)
-				pipemode |= NativeMethods.GENERIC_WRITE;
-
-			pipeHandle = NativeMethods.CreateFile( host, pipemode,
-						0, null, NativeMethods.OPEN_EXISTING, 0, 0 );
-		}
-
-		public override bool CanRead
-		{
-			get { return (_mode & FileAccess.Read) > 0; }
-		}
-
-		public override bool CanWrite
-		{
-			get { return (_mode & FileAccess.Write) > 0; }
-		}
-
-		public override bool CanSeek
-		{
-			get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
-		}
-
-		public override long Length
-		{
-			get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
-		}
-
-		public override long Position 
-		{
-			get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
-			set { }
-		}
-
-		public override void Flush() 
-		{
-			if (pipeHandle != 0)
-				NativeMethods.FlushFileBuffers((IntPtr)pipeHandle);
-		}
-
-		public override int Read(byte[] buffer, int offset, int count)
-		{
-			if (buffer == null) 
-				throw new ArgumentNullException("buffer", 
-					Resources.BufferCannotBeNull);
-			if (buffer.Length < (offset + count))
-				throw new ArgumentException(
-					Resources.BufferNotLargeEnough);
-			if (offset < 0) 
-				throw new ArgumentOutOfRangeException("offset", offset, 
-					Resources.OffsetCannotBeNegative);
-			if (count < 0)
-				throw new ArgumentOutOfRangeException("count", count, 
-					Resources.CountCannotBeNegative);
-			if (! CanRead)
-				throw new NotSupportedException(Resources.StreamNoRead);
-			if (pipeHandle == 0) 
-				throw new ObjectDisposedException("NamedPipeStream", 
-					Resources.StreamAlreadyClosed);
-
-			// first read the data into an internal buffer since ReadFile cannot read into a buf at
-			// a specified offset
-			uint read;
-			byte[] buf = new Byte[count];
-			bool result = NativeMethods.ReadFile((IntPtr)pipeHandle, buf, 
-				(uint)count, out read, IntPtr.Zero); 
-			
-			if (! result)
-			{
-				Close();
-				throw new MySqlException(Resources.ReadFromStreamFailed, true, null);
-			}
-
-			Array.Copy(buf, (int)0, buffer, (int)offset, (int)read);
-			return (int)read;
-		}
-
-		public override void Close()
-		{
-			if (pipeHandle != 0)
-			{
-				NativeMethods.CloseHandle((IntPtr)pipeHandle);
-				pipeHandle = 0;
-			}
-		}
-
-		public override void SetLength(long length)
-		{
-			throw new NotSupportedException(Resources.NamedPipeNoSetLength);
-		}
-
-		public override void Write(byte[] buffer, int offset, int count)
-		{
-			if (buffer == null) 
-				throw new ArgumentNullException("buffer", Resources.BufferCannotBeNull);
-			if (buffer.Length < (offset + count))
-				throw new ArgumentException(Resources.BufferNotLargeEnough, "buffer");
-			if (offset < 0) 
-				throw new ArgumentOutOfRangeException("offset", offset, 
-					Resources.OffsetCannotBeNegative);
-			if (count < 0)
-				throw new ArgumentOutOfRangeException("count", count, 
-					Resources.CountCannotBeNegative);
-			if (! CanWrite)
-				throw new NotSupportedException(Resources.StreamNoWrite);
-			if (pipeHandle == 0)
-				throw new ObjectDisposedException("NamedPipeStream", 
-					Resources.StreamAlreadyClosed);
-			
-			// copy data to internal buffer to allow writing from a specified offset
-			uint bytesWritten = 0;
-			bool result;
-
-			if (offset == 0  && count <= 65535)
-				result = NativeMethods.WriteFile((IntPtr)pipeHandle, buffer, (uint)count, out bytesWritten, IntPtr.Zero);
-			else
-			{
-				byte[] localBuf = new byte[65535];
-
-				result = true;
-				while (count != 0 && result)
-				{
-                    uint thisWritten;
-                    int cnt = Math.Min(count, 65535);
-					Array.Copy( buffer, offset, localBuf, 0, cnt );
-					result = NativeMethods.WriteFile((IntPtr)pipeHandle, localBuf, (uint)cnt, out thisWritten, IntPtr.Zero);
-					bytesWritten += thisWritten;
-					count -= cnt;
-					offset += cnt;
-				}
-			}
-
-			if (! result)
-			{
-				Close();
-				throw new MySqlException(Resources.WriteToStreamFailed, true, null);
-			}
-			if (bytesWritten < count)
-				throw new IOException("Unable to write entire buffer to stream");
-		}
-
-		public override long Seek( long offset, SeekOrigin origin )
-		{
-			throw new NotSupportedException(Resources.NamedPipeNoSeek);
-		}
+    /// <summary>
+    /// Summary description for API.
+    /// </summary>
+    internal class NamedPipeStream : Stream
+    {
+
+        Stream fileStream;
+        int readTimeout = Timeout.Infinite;
+        int writeTimeout = Timeout.Infinite;
+
+
+        public NamedPipeStream(string path, FileAccess mode)
+        {
+            Open(path, mode);
+        }
+
+        public void Open( string path, FileAccess mode )
+        {
+           SafeFileHandle handle = new SafeFileHandle(NativeMethods.CreateFile(path, NativeMethods.GENERIC_READ | NativeMethods.GENERIC_WRITE,
+                        0, null, NativeMethods.OPEN_EXISTING, NativeMethods.FILE_FLAG_OVERLAPPED, 0),true);
+           fileStream = new FileStream(handle, mode,4096 , true);
+        }
+
+        public override bool CanRead
+        {
+            get { return fileStream.CanRead; }
+        }
+
+        public override bool CanWrite
+        {
+            get { return fileStream.CanWrite; }
+        }
+
+        public override bool CanSeek
+        {
+            get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
+        }
+
+        public override long Length
+        {
+            get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
+        }
+
+        public override long Position 
+        {
+            get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
+            set { }
+        }
+
+        public override void Flush() 
+        {
+            fileStream.Flush();
+        }
 

-        internal static NamedPipeStream Create(string pipeName, string hostname)
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            if(readTimeout == Timeout.Infinite)
+            {
+                return fileStream.Read(buffer, offset, count);
+            }
+            IAsyncResult result = fileStream.BeginRead(buffer, offset, count, null, null);
+            if (result.CompletedSynchronously)
+                return fileStream.EndRead(result);
+            int timeLeft = readTimeout;
+
+            Stopwatch stopwatch = new Stopwatch();
+            stopwatch.Start();
+
+            while(!result.IsCompleted)
+            {
+                bool signaled = result.AsyncWaitHandle.WaitOne(readTimeout);
+                if (!signaled)
+                    throw new TimeoutException();
+                timeLeft -= (int)stopwatch.ElapsedMilliseconds;
+                if (timeLeft < 0)
+                    throw new TimeoutException();
+            }
+            int bytesRead = fileStream.EndRead(result);
+
+            return bytesRead;
+        }
+
+
+        public override void Write(byte[] buffer, int offset, int count)
+        {
+            if (readTimeout == Timeout.Infinite)
+            {
+                fileStream.Write(buffer, offset, count);
+                return;
+            }
+            IAsyncResult result = fileStream.BeginWrite(buffer, offset, count, null, null);
+            if (result.CompletedSynchronously)
+            {
+                fileStream.EndWrite(result);
+            }
+            int timeLeft = writeTimeout;
+
+            Stopwatch stopwatch = new Stopwatch();
+            stopwatch.Start();
+            while (!result.IsCompleted)
+            {
+                bool signaled = result.AsyncWaitHandle.WaitOne(timeLeft);
+                if (!signaled)
+                    throw new TimeoutException();
+                timeLeft -= (int)stopwatch.ElapsedMilliseconds;
+                if (timeLeft < 0)
+                    throw new TimeoutException();
+            }
+            fileStream.EndWrite(result);
+        }
+
+        public override void Close()
+        {
+            fileStream.Close();
+        }
+
+        public override void SetLength(long length)
+        {
+            throw new NotSupportedException(Resources.NamedPipeNoSetLength);
+        }
+
+
+        public override bool CanTimeout
+        {
+            get
+            {
+                return true;
+            }
+        }
+
+        public override int ReadTimeout
+        {
+            get
+            {
+                return readTimeout;
+            }
+            set
+            {
+                readTimeout = value;
+            }
+        }
+
+        public override  int WriteTimeout
+        {
+            get
+            {
+                return writeTimeout;
+            }
+            set
+            {
+                writeTimeout = value;
+            }
+        }
+
+        public override long Seek( long offset, SeekOrigin origin )
+        {
+            throw new NotSupportedException(Resources.NamedPipeNoSeek);
+        }
+     
+        internal static Stream Create(string pipeName, string hostname)
         {
             string pipePath;
             if (0 == String.Compare(hostname, "localhost", true))
@@ -200,7 +198,7 @@ namespace MySql.Data.Common
                 pipePath = String.Format(@"\\{0}\pipe\{1}", hostname, pipeName);
             return new NamedPipeStream(pipePath, FileAccess.ReadWrite);
         }
-	}
+    }
 }
 
 

=== modified file 'MySql.Data/Provider/Source/common/NativeMethods.cs'
--- a/MySql.Data/Provider/Source/common/NativeMethods.cs	2009-04-21 18:02:13 +0000
+++ b/MySql.Data/Provider/Source/common/NativeMethods.cs	2009-08-03 16:37:40 +0000
@@ -58,7 +58,7 @@ namespace MySql.Data.Common
 		}
 
 		[DllImport("Kernel32", CharSet=CharSet.Unicode)]
-		static extern public int CreateFile(
+		static extern public IntPtr CreateFile(
             String fileName,
 			uint desiredAccess,
 			uint shareMode, 

=== modified file 'MySql.Data/Provider/Source/common/SharedMemoryStream.cs'
--- a/MySql.Data/Provider/Source/common/SharedMemoryStream.cs	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/common/SharedMemoryStream.cs	2009-08-03 16:37:40 +0000
@@ -51,6 +51,8 @@ namespace MySql.Data.Common
 //		private const uint EVENT_ALL_ACCESS = 0x001F0003;
 		private const uint FILE_MAP_WRITE = 0x2;
 		private const int BUFFERLENGTH = 16004;
+		private int readTimeout = System.Threading.Timeout.Infinite;
+		private int writeTimeout = System.Threading.Timeout.Infinite;
 
 		public SharedMemoryStream(string memName)
 		{
@@ -74,6 +76,17 @@ namespace MySql.Data.Common
 			AutoResetEvent connectRequest = new AutoResetEvent(false);
             IntPtr handle = NativeMethods.OpenEvent(SYNCHRONIZE | EVENT_MODIFY_STATE, false,
 			memoryName + "_" + "CONNECT_REQUEST");
+			if (handle == IntPtr.Zero)
+			{
+				// If server runs as service, its shared memory is global 
+				// And if connector runs in user session, it needs to prefix
+				// shared memory name with "Global\"
+				string prefixedMemoryName= @"Global\" + memoryName;
+				handle = NativeMethods.OpenEvent(SYNCHRONIZE | EVENT_MODIFY_STATE, false,
+					prefixedMemoryName + "_" + "CONNECT_REQUEST");
+				if (handle != IntPtr.Zero)
+					memoryName = prefixedMemoryName;
+			}
 			connectRequest.SafeWaitHandle = new SafeWaitHandle(handle, true);
 
 			AutoResetEvent connectAnswer = new AutoResetEvent(false);
@@ -181,11 +194,20 @@ namespace MySql.Data.Common
 
 		public override int Read(byte[] buffer, int offset, int count)
 		{
+			Stopwatch stopwatch = new Stopwatch();
+			int timeLeft = readTimeout;
 			while (bytesLeft == 0)
 			{
-				while (!serverWrote.WaitOne(500, false))
+				if (IsClosed()) return 0;
+				if (!serverWrote.WaitOne(timeLeft, false))
+				{
+					throw new TimeoutException();
+				}
+				if (readTimeout != System.Threading.Timeout.Infinite)
 				{
-					if (IsClosed()) return 0;
+					timeLeft = readTimeout - (int)stopwatch.ElapsedMilliseconds;
+					if (timeLeft < 0)
+							throw new TimeoutException();
 				}
 
 				bytesLeft = Marshal.ReadInt32(dataView);
@@ -216,11 +238,19 @@ namespace MySql.Data.Common
 			int leftToDo = count;
 			int buffPos = offset;
 
+			Stopwatch stopwatch = new Stopwatch();
+			int timeLeft = writeTimeout;
+
 			while (leftToDo > 0)
 			{
-				if (!serverRead.WaitOne())
-					throw new MySqlException("Writing to shared memory failed");
-
+				if (!serverRead.WaitOne(timeLeft))
+					throw new TimeoutException();
+				if (writeTimeout != System.Threading.Timeout.Infinite)
+				{
+					timeLeft = writeTimeout - (int)stopwatch.ElapsedMilliseconds;
+					if (timeLeft < 0)
+						throw new TimeoutException();
+				}
 				int bytesToDo = Math.Min(leftToDo, BUFFERLENGTH);
 
 				long baseMem = dataView.ToInt64() + 4;
@@ -237,6 +267,39 @@ namespace MySql.Data.Common
 		{
 			throw new NotSupportedException("SharedMemoryStream does not support seeking");
 		}
-	}
+
+        public override bool CanTimeout
+        {
+            get
+            {
+                return true;
+            }
+        }
+
+        public override int ReadTimeout
+        {
+            get
+            {
+                return readTimeout;
+            }
+            set
+            {
+                readTimeout = value;
+            }
+        }
+
+        public override int WriteTimeout
+        {
+            get
+            {
+                return writeTimeout;
+            }
+            set
+            {
+                writeTimeout = value;
+            }
+        }
+
+    }
 #endif
 }

=== modified file 'MySql.Data/Provider/Source/datareader.cs'
--- a/MySql.Data/Provider/Source/datareader.cs	2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/datareader.cs	2009-08-03 16:37:40 +0000
@@ -818,13 +818,22 @@ namespace MySql.Data.MySqlClient
             // single result means we only return a single resultset.  If we have already
             // returned one, then we return false;
             if (resultSet.ResultsIndex == 0 && (commandBehavior & CommandBehavior.SingleResult) != 0)
+            {
+                // Command is completed, clear the IO timeouts for the stream
+                connection.driver.ResetTimeout(0);
                 return false;
+            }
 
 			// tell our command to continue execution of the SQL batch until it its
 			// another resultset
 			try
 			{
-                if (!resultSet.NextResult()) return false;
+                if (!resultSet.NextResult())
+                {
+                    // Command is completed, clear the IO timeouts for the stream
+                    connection.driver.ResetTimeout(0);
+                    return false;
+                }
 
 				// issue any requested UA warnings
 				if (connection.Settings.UseUsageAdvisor)
@@ -841,8 +850,6 @@ namespace MySql.Data.MySqlClient
             {
 				if (ex.IsFatal)
 					connection.Abort();
-                if (command.TimedOut)
-                    throw new MySqlException(Resources.Timeout);
                 if (ex.Number == 0)
                     throw new MySqlException(Resources.FatalErrorReadingResult, ex);
                 throw;

=== modified file 'MySql.Data/Tests/Source/TimeoutAndCancel.cs'
--- a/MySql.Data/Tests/Source/TimeoutAndCancel.cs	2009-04-23 01:08:24 +0000
+++ b/MySql.Data/Tests/Source/TimeoutAndCancel.cs	2009-08-03 16:37:40 +0000
@@ -109,8 +109,7 @@ namespace MySql.Data.MySqlClient.Tests
             stateChangeCount++;
         }
 
-        [Test]
-        public void TimeoutExpiring()
+        private void TimeoutExpiring(MySqlConnectionProtocol protocol)
         {
             if (version < new Version(5, 0)) return;
 
@@ -120,25 +119,49 @@ namespace MySql.Data.MySqlClient.Tests
                     SELECT SLEEP(duration);
                 END");
 
-            DateTime start = DateTime.Now;
-            try
+            MySqlConnectionStringBuilder builder = new MySqlConnectionStringBuilder(conn.ConnectionString);
+            builder.ConnectionProtocol = protocol;
+            using (MySqlConnection connection = new MySqlConnection(builder.ConnectionString))
             {
-                MySqlCommand cmd = new MySqlCommand("spTest", conn);
-                cmd.Parameters.AddWithValue("duration", 60);
-                cmd.CommandType = CommandType.StoredProcedure;
-                cmd.CommandTimeout = 5;
-                cmd.ExecuteNonQuery();
-                Assert.Fail("Should not get to this point");
-            }
-            catch (MySqlException ex)
-            {
-                TimeSpan ts = DateTime.Now.Subtract(start);
-                Assert.IsTrue(ts.TotalSeconds <= 10);
-                Assert.IsTrue(ex.Message.StartsWith("Timeout expired"), "Message is wrong");
+                connection.Open();
+
+                DateTime start = DateTime.Now;
+                try
+                {
+                    MySqlCommand cmd = new MySqlCommand("spTest", connection);
+                    cmd.Parameters.AddWithValue("duration", 60);
+                    cmd.CommandType = CommandType.StoredProcedure;
+                    cmd.CommandTimeout = 5;
+                    cmd.ExecuteNonQuery();
+                    Assert.Fail("Should not get to this point");
+                }
+                catch (MySqlException ex)
+                {
+                    TimeSpan ts = DateTime.Now.Subtract(start);
+                    Assert.IsTrue(ts.TotalSeconds <= 10);
+                    Assert.IsTrue(ex.Message.StartsWith("Timeout expired"), "Message is wrong");
+                }
             }
         }
 
         [Test]
+        public void TimeoutExpiringSockets()
+        {
+            TimeoutExpiring(MySqlConnectionProtocol.Sockets);
+        }
+
+        [Test]
+        public void TimeoutExpiringSharedMemory()
+        {
+            TimeoutExpiring(MySqlConnectionProtocol.SharedMemory);
+        }
+
+        [Test]
+        public void TimeoutExpiringNamedPipe()
+        {
+            TimeoutExpiring(MySqlConnectionProtocol.NamedPipe);
+        }
+
         public void TimeoutNotExpiring()
         {
             if (Version < new Version(5, 0)) return;
@@ -149,6 +172,7 @@ namespace MySql.Data.MySqlClient.Tests
                     SELECT SLEEP(duration);
                 END");
 
+            conn.Open();
             MySqlCommand cmd = new MySqlCommand("spTest", conn);
             cmd.Parameters.AddWithValue("duration", 10);
             cmd.CommandType = CommandType.StoredProcedure;
@@ -230,16 +254,23 @@ namespace MySql.Data.MySqlClient.Tests
                 c.Open();
                 string connStr1 = c.ConnectionString;
 
-                MySqlCommand cmd = new MySqlCommand("SELECT SLEEP(10)", c);
-                cmd.CommandTimeout = 5;
+                MySqlCommand cmd = new MySqlCommand("SELECT SLEEP(2)", c);
+                cmd.CommandTimeout = 1;
 
-                using (MySqlDataReader reader = cmd.ExecuteReader())
+                try
+                {
+                    using (MySqlDataReader reader = cmd.ExecuteReader())
+                    {
+                    }
+                }
+                catch (MySqlException ex)
                 {
-                    string connStr2 = c.ConnectionString.ToLower(CultureInfo.InvariantCulture);
-                    Assert.AreEqual(-1, connStr2.IndexOf("pooling=true"));
-                    Assert.AreEqual(-1, connStr2.IndexOf("pooling=false"));
-                    reader.Read();
+                    Assert.IsTrue(ex.InnerException is TimeoutException);
+                    Assert.IsTrue(c.State == ConnectionState.Closed);
                 }
+                string connStr2 = c.ConnectionString.ToLower(CultureInfo.InvariantCulture);
+                Assert.AreEqual(-1, connStr2.IndexOf("pooling=true"));
+                Assert.AreEqual(-1, connStr2.IndexOf("pooling=false"));
             }
         }
     }

Attachment: [text/bzr-bundle] bzr/vvaintroub@mysql.com-20090803163740-aqgbxvr4ubvf4dne.bundle
Thread
bzr commit into connector-net-trunk branch (vvaintroub:717)Vladislav Vaintroub3 Aug