#At file:///H:/connector_net/trunk/ based on revid:vvaintroub@strippedhtgawnr
717 Vladislav Vaintroub 2009-08-03
For command timeout, use underlying stream timeouts, instead of starting a timer and issue "cancel command" when timer expires.
The previous solution (cancel command) does not take network latencies/disconnects into account and could hang.
The new solutiion with IO timeout has a drawback that connection needs to be closed after timeout occurs, and running transactions are aborted.
The new implementation is more .NET conform , where SqlCommand.Timeout is documented as accumulated timeout of network reads.
- Named pipes implemention is reworked to support IO timeouts.
Now it does minimum PInvoke, NamedPipeStream just wraps an asynchronous FileStream.
added:
MySql.Data/Provider/Source/TimedStream.cs
MySql.Data/Provider/Source/cf/Stopwatch.cs
modified:
MySql.Data/Provider/MySql.Data.CF.csproj
MySql.Data/Provider/MySql.Data.csproj
MySql.Data/Provider/Source/CompressedStream.cs
MySql.Data/Provider/Source/Driver.cs
MySql.Data/Provider/Source/MySqlPool.cs
MySql.Data/Provider/Source/MySqlStream.cs
MySql.Data/Provider/Source/NativeDriver.cs
MySql.Data/Provider/Source/command.cs
MySql.Data/Provider/Source/common/NamedPipeStream.cs
MySql.Data/Provider/Source/common/NativeMethods.cs
MySql.Data/Provider/Source/common/SharedMemoryStream.cs
MySql.Data/Provider/Source/datareader.cs
MySql.Data/Tests/Source/TimeoutAndCancel.cs
=== modified file 'MySql.Data/Provider/MySql.Data.CF.csproj'
--- a/MySql.Data/Provider/MySql.Data.CF.csproj 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/MySql.Data.CF.csproj 2009-08-03 16:37:40 +0000
@@ -64,6 +64,7 @@ <Project DefaultTargets="Build" xmlns
<Compile Include="Source\base\DbConnectionStringBuilder.cs" />
<Compile Include="Source\base\DbException.cs" />
<Compile Include="Source\cf\BufferedStream.cs" />
+ <Compile Include="Source\cf\Stopwatch.cs" />
<Compile Include="Source\cf\WinCE.cs" />
<Compile Include="Source\CharSetMap.cs" />
<Compile Include="Source\command.cs">
=== modified file 'MySql.Data/Provider/MySql.Data.csproj'
--- a/MySql.Data/Provider/MySql.Data.csproj 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/MySql.Data.csproj 2009-08-03 16:37:40 +0000
@@ -246,6 +246,7 @@ <Project DefaultTargets="Build" xmlns
<Compile Include="Source\MySqlPacket.cs" />
<Compile Include="Source\MySqlScript.cs" />
<Compile Include="Source\ResultSet.cs" />
+ <Compile Include="Source\TimedStream.cs" />
<Compile Include="Source\Types\MySqlGuid.cs" />
</ItemGroup>
<ItemGroup>
=== modified file 'MySql.Data/Provider/Source/CompressedStream.cs'
--- a/MySql.Data/Provider/Source/CompressedStream.cs 2009-07-30 13:13:25 +0000
+++ b/MySql.Data/Provider/Source/CompressedStream.cs 2009-08-03 16:37:40 +0000
@@ -108,6 +108,38 @@ namespace MySql.Data.MySqlClient
}
}
+ public override bool CanTimeout
+ {
+ get
+ {
+ return baseStream.CanTimeout;
+ }
+ }
+
+ public override int ReadTimeout
+ {
+ get
+ {
+ return baseStream.ReadTimeout;
+ }
+ set
+ {
+ baseStream.ReadTimeout = value;
+ }
+ }
+
+ public override int WriteTimeout
+ {
+ get
+ {
+ return baseStream.WriteTimeout;
+ }
+ set
+ {
+ baseStream.WriteTimeout = value;
+ }
+ }
+
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer == null)
=== modified file 'MySql.Data/Provider/Source/Driver.cs'
--- a/MySql.Data/Provider/Source/Driver.cs 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/Driver.cs 2009-08-03 16:37:40 +0000
@@ -372,7 +372,7 @@ namespace MySql.Data.MySqlClient
public abstract bool Ping();
public abstract void CloseStatement(int id);
public abstract void ExecuteDirect(string sql);
-
+ public abstract void ResetTimeout(int timeoutMilliseconds);
#endregion
=== modified file 'MySql.Data/Provider/Source/MySqlPool.cs'
--- a/MySql.Data/Provider/Source/MySqlPool.cs 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/MySqlPool.cs 2009-08-03 16:37:40 +0000
@@ -115,6 +115,20 @@ namespace MySql.Data.MySqlClient
driver = (Driver)idlePool.Dequeue();
}
+ // Obey the connection timeout
+ if (driver != null)
+ {
+ try
+ {
+ driver.ResetTimeout((int)Settings.ConnectionTimeout * 1000);
+ }
+ catch (Exception)
+ {
+ driver.Close();
+ driver = null;
+ }
+ }
+
if (driver != null)
{
// first check to see that the server is still alive
=== modified file 'MySql.Data/Provider/Source/MySqlStream.cs'
--- a/MySql.Data/Provider/Source/MySqlStream.cs 2009-07-29 11:06:15 +0000
+++ b/MySql.Data/Provider/Source/MySqlStream.cs 2009-08-03 16:37:40 +0000
@@ -37,8 +37,7 @@ namespace MySql.Data.MySqlClient
private MemoryStream bufferStream;
private int maxBlockSize;
private ulong maxPacketSize;
- private Stream inStream;
- private Stream outStream;
+ private TimedStream stream;
private byte[] tempBuffer = new byte[4];
MySqlPacket packet = new MySqlPacket();
@@ -61,19 +60,19 @@ namespace MySql.Data.MySqlClient
public MySqlStream(Stream baseStream, Encoding encoding, bool compress)
: this(encoding)
{
-
- inStream = new BufferedStream(baseStream);
- outStream = baseStream;
if (compress)
{
- inStream = new CompressedStream(inStream);
- outStream = new CompressedStream(outStream);
+ stream = new TimedStream(new CompressedStream(baseStream), true, false);
+ }
+ else
+ {
+ stream = new TimedStream(baseStream, true, false);
}
}
public void Close()
{
- inStream.Close();
+ stream.Close();
// no need to close outStream because closing
// inStream closes the underlying network stream
// for us.
@@ -87,6 +86,11 @@ namespace MySql.Data.MySqlClient
set { encoding = value; }
}
+ public void ResetTimeout(int timeout)
+ {
+ stream.ResetTimeout(timeout);
+ }
+
public byte SequenceByte
{
get { return sequenceByte; }
@@ -177,10 +181,10 @@ namespace MySql.Data.MySqlClient
int offset = 0;
while (true)
{
- int b1 = inStream.ReadByte();
- int b2 = inStream.ReadByte();
- int b3 = inStream.ReadByte();
- int seqByte = inStream.ReadByte();
+ int b1 = stream.ReadByte();
+ int b2 = stream.ReadByte();
+ int b3 = stream.ReadByte();
+ int seqByte = stream.ReadByte();
if (b1 == -1 || b2 == -1 || b3 == -1 || seqByte == -1)
throw new MySqlException(
@@ -192,7 +196,7 @@ namespace MySql.Data.MySqlClient
// make roo for the next block
packet.Length += length;
- ReadFully(inStream, packet.Buffer, offset, length);
+ ReadFully(stream, packet.Buffer, offset, length);
offset += length;
// if this block was < maxBlock then it's last one in a multipacket series
@@ -223,8 +227,8 @@ namespace MySql.Data.MySqlClient
buffer[offset+2] = (byte)((lenToSend >> 16) & 0xff);
buffer[offset+3] = sequenceByte++;
- outStream.Write(buffer, offset, lenToSend + 4);
- outStream.Flush();
+ stream.Write(buffer, offset, lenToSend + 4);
+ stream.Flush();
length -= lenToSend;
offset += lenToSend;
}
@@ -250,8 +254,8 @@ namespace MySql.Data.MySqlClient
buffer[1] = (byte)((count >> 8) & 0xff);
buffer[2] = (byte)((count >> 16) & 0xff);
buffer[3] = sequenceByte++;
- outStream.Write(buffer, 0, count + 4);
- outStream.Flush();
+ stream.Write(buffer, 0, count + 4);
+ stream.Flush();
}
/// <summary>
=== modified file 'MySql.Data/Provider/Source/NativeDriver.cs'
--- a/MySql.Data/Provider/Source/NativeDriver.cs 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/NativeDriver.cs 2009-08-03 16:37:40 +0000
@@ -220,6 +220,8 @@ namespace MySql.Data.MySqlClient
int maxSinglePacket = 255*255*255;
stream = new MySqlStream(baseStream, encoding, false);
+ stream.ResetTimeout((int)Settings.ConnectionTimeout*1000);
+
// read off the welcome packet and parse out it's values
packet = stream.ReadPacket();
protocol = packet.ReadByte();
@@ -942,5 +944,16 @@ namespace MySql.Data.MySqlClient
stream.SendPacket(packet);
}
- }
+ /// <summary>
+ /// Execution timeout, in milliseconds. When the accumulated time for network IO exceeds this value
+ /// TimeoutException is thrown. This timeout needs to be reset for every new command
+ /// </summary>
+ ///
+ public override void ResetTimeout(int timeout)
+ {
+ stream.ResetTimeout(timeout);
+ }
+
+ }
+
}
=== added file 'MySql.Data/Provider/Source/TimedStream.cs'
--- a/MySql.Data/Provider/Source/TimedStream.cs 1970-01-01 00:00:00 +0000
+++ b/MySql.Data/Provider/Source/TimedStream.cs 2009-08-03 16:37:40 +0000
@@ -0,0 +1,276 @@
+// Copyright (c) 2009 Sun Microsystems, Inc.
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 2 as published by
+// the Free Software Foundation
+//
+// There are special exceptions to the terms and conditions of the GPL
+// as it is applied to this software. View the full text of the
+// exception in file EXCEPTIONS in the directory of this software
+// distribution.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Diagnostics;
+
+namespace MySql.Data.MySqlClient
+{
+ /// <summary>
+ /// Stream that supports timeout of IO operations.
+ /// This class is used is used to support timeouts for SQL command, where a typical operation involves several network reads/writes.
+ /// Timeout here is defined as the accumulated duration of all IO operations.
+ /// </summary>
+ /// <remarks> Any IO exception or timeout exception closes the stream </remarks>
+ internal class TimedStream : Stream
+ {
+ Stream baseStream;
+ Stream inStream;
+ Stream outStream;
+ int timeout;
+ Stopwatch stopwatch;
+ bool isClosed;
+
+
+ /// <summary>
+ /// Construct a TimedStream
+ /// </summary>
+ /// <param name="baseStream"> Undelying stream</param>
+ /// <param name="bufferedInput">If input buffering should be used</param>
+ /// <param name="bufferedOutput">If output buffering should be used</param>
+ public TimedStream(Stream baseStream, bool bufferedInput, bool bufferedOutput)
+ {
+ this.baseStream = baseStream;
+ if (bufferedInput)
+ inStream = new BufferedStream(baseStream);
+ else
+ inStream = baseStream;
+
+ if (bufferedOutput)
+ outStream = new BufferedStream(baseStream);
+ else
+ outStream = baseStream;
+ timeout = System.Threading.Timeout.Infinite;
+ isClosed = false;
+ stopwatch = new Stopwatch();
+ }
+
+ private void StartTimer()
+ {
+ baseStream.ReadTimeout = baseStream.WriteTimeout =
+ timeout;
+ stopwatch.Start();
+ }
+ private void StopTimer()
+ {
+ stopwatch.Stop();
+
+ if (timeout != System.Threading.Timeout.Infinite)
+ {
+ // Normally, a timeout exception would be thrown by stream itself, since we set the read/write timeout
+ // for the stream. However there is a gap between end of IO operation and stopping the stop watch,
+ // and it makes it possible for timeout to exceed even after IO completed successfully.
+ if (stopwatch.ElapsedMilliseconds > timeout)
+ {
+ throw new TimeoutException();
+ }
+ }
+ }
+ public override bool CanRead
+ {
+ get { return baseStream.CanRead; }
+ }
+
+ public override bool CanSeek
+ {
+ get { return baseStream.CanSeek; }
+ }
+
+ public override bool CanWrite
+ {
+ get { return baseStream.CanWrite; }
+ }
+
+ public override void Flush()
+ {
+ try
+ {
+ StartTimer();
+ outStream.Flush();
+ StopTimer();
+ }
+ catch (Exception e)
+ {
+ HandleTimeout(e);
+ }
+ }
+
+ public override long Length
+ {
+ get { return baseStream.Length; }
+ }
+
+ public override long Position
+ {
+ get
+ {
+ return baseStream.Position;
+ }
+ set
+ {
+ baseStream.Position = value;
+ if (inStream != baseStream)
+ {
+ inStream.Position = value;
+ }
+ if (outStream != baseStream)
+ {
+ outStream.Position = value;
+ }
+ }
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ try
+ {
+ StartTimer();
+ int retval = inStream.Read(buffer, offset, count);
+ StopTimer();
+ return retval;
+ }
+ catch (Exception e)
+ {
+ HandleTimeout(e);
+ throw;
+ }
+ }
+
+ public override int ReadByte()
+ {
+ try
+ {
+ StartTimer();
+ int retval = inStream.ReadByte();
+ StopTimer();
+ return retval;
+ }
+ catch (Exception e)
+ {
+ HandleTimeout(e);
+ throw;
+ }
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ return baseStream.Seek(offset, origin);
+ }
+
+ public override void SetLength(long value)
+ {
+ baseStream.SetLength(value);
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ try
+ {
+ StartTimer();
+ outStream.Write(buffer, offset, count);
+ StopTimer();
+ }
+ catch (Exception e)
+ {
+ HandleTimeout(e);
+ throw;
+ }
+ }
+
+ public override bool CanTimeout
+ {
+ get { return baseStream.CanTimeout; }
+ }
+
+ public override int ReadTimeout
+ {
+ get { return inStream.ReadTimeout; }
+ set { inStream.ReadTimeout = value; }
+ }
+ public override int WriteTimeout
+ {
+ get { return outStream.WriteTimeout; }
+ set { outStream.WriteTimeout = value; }
+ }
+
+ public override void Close()
+ {
+ if (isClosed)
+ return;
+ baseStream.Close();
+ if (inStream != baseStream)
+ {
+ inStream.Close();
+ }
+ if (outStream != baseStream)
+ {
+ outStream.Close();
+ }
+ isClosed = true;
+ }
+
+ public void ResetTimeout(int newTimeout)
+ {
+
+ if (newTimeout == System.Threading.Timeout.Infinite)
+ timeout = newTimeout;
+ else if (newTimeout < 0)
+ throw new ArgumentException("Invalid timeout value");
+ else if (newTimeout == 0)
+ timeout = System.Threading.Timeout.Infinite;
+ else
+ timeout = newTimeout;
+ stopwatch.Reset();
+ }
+
+
+ /// <summary>
+ /// Examine the exception chain for exceptions throw during read/write operations
+ /// If timeout exception is detected in the chain, new TimeoutException is thrown.
+ /// </summary>
+ /// <param name="e">original exception</param>
+ void HandleTimeout(Exception e)
+ {
+ stopwatch.Stop();
+ Close();
+ Exception currentException = e;
+
+ while (currentException != null)
+ {
+ if (currentException is SocketException)
+ {
+ SocketException socketException = (SocketException)currentException;
+#if CF
+ if (socketException.NativeErrorCode == 10060)
+#else
+ if (socketException.SocketErrorCode == SocketError.TimedOut)
+#endif
+ {
+ throw new TimeoutException(e.Message, e);
+ }
+ }
+ currentException = currentException.InnerException;
+ }
+ }
+
+ }
+}
=== added file 'MySql.Data/Provider/Source/cf/Stopwatch.cs'
--- a/MySql.Data/Provider/Source/cf/Stopwatch.cs 1970-01-01 00:00:00 +0000
+++ b/MySql.Data/Provider/Source/cf/Stopwatch.cs 2009-08-03 16:37:40 +0000
@@ -0,0 +1,52 @@
+// Copyright (c) 2009 Sun Microsystems, Inc.
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 2 as published by
+// the Free Software Foundation
+//
+// There are special exceptions to the terms and conditions of the GPL
+// as it is applied to this software. View the full text of the
+// exception in file EXCEPTIONS in the directory of this software
+// distribution.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+using System;
+
+namespace MySql.Data.Common
+{
+ class Stopwatch
+ {
+ long millis;
+ DateTime startTime;
+ public Stopwatch()
+ {
+ millis = 0;
+ }
+ public long ElapsedMilliseconds
+ {
+ get { return millis; }
+ }
+ public void Start()
+ {
+ startTime = DateTime.Now;
+ }
+
+ public void Stop()
+ {
+ millis += (long)DateTime.Now.Subtract(startTime).TotalMilliseconds;
+ }
+
+ public void Reset()
+ {
+ millis = 0;
+ }
+ }
+}
=== modified file 'MySql.Data/Provider/Source/command.cs'
--- a/MySql.Data/Provider/Source/command.cs 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/command.cs 2009-08-03 16:37:40 +0000
@@ -332,28 +332,6 @@ namespace MySql.Data.MySqlClient
return ExecuteReader(CommandBehavior.Default);
}
- private void TimeoutExpired(object commandObject)
- {
- MySqlCommand cmd = (commandObject as MySqlCommand);
- if (cmd == null)
- {
- Logger.LogWarning(Resources.TimeoutExpiredNullObject);
- return;
- }
-
- cmd.timedOut = true;
- try
- {
- cmd.Cancel();
- }
- catch (Exception ex)
- {
- // if something goes wrong, we log it and eat it. There's really nothing
- // else we can do.
- if (connection.Settings.Logging)
- Logger.LogException(ex);
- }
- }
/// <include file='docs/mysqlcommand.xml' path='docs/ExecuteReader1/*'/>
public new MySqlDataReader ExecuteReader(CommandBehavior behavior)
@@ -398,34 +376,28 @@ namespace MySql.Data.MySqlClient
HandleCommandBehaviors(behavior);
updatedRowCount = -1;
-
- Timer timer = null;
try
{
+ connection.driver.ResetTimeout(commandTimeout * 1000);
MySqlDataReader reader = new MySqlDataReader(this, statement, behavior);
-
- // start a threading timer on our command timeout
- timedOut = false;
- canceled = false;
-
// execute the statement
statement.Execute();
-
- // start a timeout timer
- if (connection.driver.Version.isAtLeast(5, 0, 0) &&
- commandTimeout > 0)
- {
- TimerCallback timerDelegate =
- new TimerCallback(TimeoutExpired);
- timer = new Timer(timerDelegate, this, this.CommandTimeout * 1000, Timeout.Infinite);
- }
-
// wait for data to return
reader.NextResult();
-
connection.Reader = reader;
return reader;
}
+ catch (TimeoutException tex)
+ {
+ try
+ {
+ Connection.Abort();
+ }
+ catch (Exception)
+ {
+ }
+ throw new MySqlException(Resources.Timeout,true, tex);
+ }
catch (MySqlException ex)
{
try
@@ -437,8 +409,6 @@ namespace MySql.Data.MySqlClient
// if we caught an exception because of a cancel, then just return null
if (ex.Number == 1317)
{
- if (TimedOut)
- throw new MySqlException(Resources.Timeout);
return null;
}
if (ex.IsFatal)
@@ -447,11 +417,6 @@ namespace MySql.Data.MySqlClient
throw new MySqlException(Resources.FatalErrorDuringExecute, ex);
throw;
}
- finally
- {
- if (timer != null)
- timer.Dispose();
- }
}
/// <include file='docs/mysqlcommand.xml' path='docs/ExecuteScalar/*'/>
=== modified file 'MySql.Data/Provider/Source/common/NamedPipeStream.cs'
--- a/MySql.Data/Provider/Source/common/NamedPipeStream.cs 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/common/NamedPipeStream.cs 2009-08-03 16:37:40 +0000
@@ -1,4 +1,4 @@
-// Copyright (c) 2004-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
+// Copyright (c) 2009 Sun Microsystems, Inc.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 2 as published by
@@ -22,176 +22,174 @@ using System;
using System.IO;
using MySql.Data.MySqlClient;
using MySql.Data.MySqlClient.Properties;
+using Microsoft.Win32.SafeHandles;
+using System.Threading;
+using System.Diagnostics;
+
namespace MySql.Data.Common
{
- /// <summary>
- /// Summary description for API.
- /// </summary>
- internal class NamedPipeStream : Stream
- {
- int pipeHandle;
- FileAccess _mode;
-
- public NamedPipeStream(string host, FileAccess mode)
- {
- Open(host, mode);
- }
-
- public void Open( string host, FileAccess mode )
- {
- _mode = mode;
- uint pipemode = 0;
-
- if ((mode & FileAccess.Read) > 0)
- pipemode |= NativeMethods.GENERIC_READ;
- if ((mode & FileAccess.Write) > 0)
- pipemode |= NativeMethods.GENERIC_WRITE;
-
- pipeHandle = NativeMethods.CreateFile( host, pipemode,
- 0, null, NativeMethods.OPEN_EXISTING, 0, 0 );
- }
-
- public override bool CanRead
- {
- get { return (_mode & FileAccess.Read) > 0; }
- }
-
- public override bool CanWrite
- {
- get { return (_mode & FileAccess.Write) > 0; }
- }
-
- public override bool CanSeek
- {
- get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
- }
-
- public override long Length
- {
- get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
- }
-
- public override long Position
- {
- get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
- set { }
- }
-
- public override void Flush()
- {
- if (pipeHandle != 0)
- NativeMethods.FlushFileBuffers((IntPtr)pipeHandle);
- }
-
- public override int Read(byte[] buffer, int offset, int count)
- {
- if (buffer == null)
- throw new ArgumentNullException("buffer",
- Resources.BufferCannotBeNull);
- if (buffer.Length < (offset + count))
- throw new ArgumentException(
- Resources.BufferNotLargeEnough);
- if (offset < 0)
- throw new ArgumentOutOfRangeException("offset", offset,
- Resources.OffsetCannotBeNegative);
- if (count < 0)
- throw new ArgumentOutOfRangeException("count", count,
- Resources.CountCannotBeNegative);
- if (! CanRead)
- throw new NotSupportedException(Resources.StreamNoRead);
- if (pipeHandle == 0)
- throw new ObjectDisposedException("NamedPipeStream",
- Resources.StreamAlreadyClosed);
-
- // first read the data into an internal buffer since ReadFile cannot read into a buf at
- // a specified offset
- uint read;
- byte[] buf = new Byte[count];
- bool result = NativeMethods.ReadFile((IntPtr)pipeHandle, buf,
- (uint)count, out read, IntPtr.Zero);
-
- if (! result)
- {
- Close();
- throw new MySqlException(Resources.ReadFromStreamFailed, true, null);
- }
-
- Array.Copy(buf, (int)0, buffer, (int)offset, (int)read);
- return (int)read;
- }
-
- public override void Close()
- {
- if (pipeHandle != 0)
- {
- NativeMethods.CloseHandle((IntPtr)pipeHandle);
- pipeHandle = 0;
- }
- }
-
- public override void SetLength(long length)
- {
- throw new NotSupportedException(Resources.NamedPipeNoSetLength);
- }
-
- public override void Write(byte[] buffer, int offset, int count)
- {
- if (buffer == null)
- throw new ArgumentNullException("buffer", Resources.BufferCannotBeNull);
- if (buffer.Length < (offset + count))
- throw new ArgumentException(Resources.BufferNotLargeEnough, "buffer");
- if (offset < 0)
- throw new ArgumentOutOfRangeException("offset", offset,
- Resources.OffsetCannotBeNegative);
- if (count < 0)
- throw new ArgumentOutOfRangeException("count", count,
- Resources.CountCannotBeNegative);
- if (! CanWrite)
- throw new NotSupportedException(Resources.StreamNoWrite);
- if (pipeHandle == 0)
- throw new ObjectDisposedException("NamedPipeStream",
- Resources.StreamAlreadyClosed);
-
- // copy data to internal buffer to allow writing from a specified offset
- uint bytesWritten = 0;
- bool result;
-
- if (offset == 0 && count <= 65535)
- result = NativeMethods.WriteFile((IntPtr)pipeHandle, buffer, (uint)count, out bytesWritten, IntPtr.Zero);
- else
- {
- byte[] localBuf = new byte[65535];
-
- result = true;
- while (count != 0 && result)
- {
- uint thisWritten;
- int cnt = Math.Min(count, 65535);
- Array.Copy( buffer, offset, localBuf, 0, cnt );
- result = NativeMethods.WriteFile((IntPtr)pipeHandle, localBuf, (uint)cnt, out thisWritten, IntPtr.Zero);
- bytesWritten += thisWritten;
- count -= cnt;
- offset += cnt;
- }
- }
-
- if (! result)
- {
- Close();
- throw new MySqlException(Resources.WriteToStreamFailed, true, null);
- }
- if (bytesWritten < count)
- throw new IOException("Unable to write entire buffer to stream");
- }
-
- public override long Seek( long offset, SeekOrigin origin )
- {
- throw new NotSupportedException(Resources.NamedPipeNoSeek);
- }
+ /// <summary>
+ /// Summary description for API.
+ /// </summary>
+ internal class NamedPipeStream : Stream
+ {
+
+ Stream fileStream;
+ int readTimeout = Timeout.Infinite;
+ int writeTimeout = Timeout.Infinite;
+
+
+ public NamedPipeStream(string path, FileAccess mode)
+ {
+ Open(path, mode);
+ }
+
+ public void Open( string path, FileAccess mode )
+ {
+ SafeFileHandle handle = new SafeFileHandle(NativeMethods.CreateFile(path, NativeMethods.GENERIC_READ | NativeMethods.GENERIC_WRITE,
+ 0, null, NativeMethods.OPEN_EXISTING, NativeMethods.FILE_FLAG_OVERLAPPED, 0),true);
+ fileStream = new FileStream(handle, mode,4096 , true);
+ }
+
+ public override bool CanRead
+ {
+ get { return fileStream.CanRead; }
+ }
+
+ public override bool CanWrite
+ {
+ get { return fileStream.CanWrite; }
+ }
+
+ public override bool CanSeek
+ {
+ get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
+ }
+
+ public override long Length
+ {
+ get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
+ }
+
+ public override long Position
+ {
+ get { throw new NotSupportedException(Resources.NamedPipeNoSeek); }
+ set { }
+ }
+
+ public override void Flush()
+ {
+ fileStream.Flush();
+ }
- internal static NamedPipeStream Create(string pipeName, string hostname)
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if(readTimeout == Timeout.Infinite)
+ {
+ return fileStream.Read(buffer, offset, count);
+ }
+ IAsyncResult result = fileStream.BeginRead(buffer, offset, count, null, null);
+ if (result.CompletedSynchronously)
+ return fileStream.EndRead(result);
+ int timeLeft = readTimeout;
+
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.Start();
+
+ while(!result.IsCompleted)
+ {
+ bool signaled = result.AsyncWaitHandle.WaitOne(readTimeout);
+ if (!signaled)
+ throw new TimeoutException();
+ timeLeft -= (int)stopwatch.ElapsedMilliseconds;
+ if (timeLeft < 0)
+ throw new TimeoutException();
+ }
+ int bytesRead = fileStream.EndRead(result);
+
+ return bytesRead;
+ }
+
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ if (readTimeout == Timeout.Infinite)
+ {
+ fileStream.Write(buffer, offset, count);
+ return;
+ }
+ IAsyncResult result = fileStream.BeginWrite(buffer, offset, count, null, null);
+ if (result.CompletedSynchronously)
+ {
+ fileStream.EndWrite(result);
+ }
+ int timeLeft = writeTimeout;
+
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.Start();
+ while (!result.IsCompleted)
+ {
+ bool signaled = result.AsyncWaitHandle.WaitOne(timeLeft);
+ if (!signaled)
+ throw new TimeoutException();
+ timeLeft -= (int)stopwatch.ElapsedMilliseconds;
+ if (timeLeft < 0)
+ throw new TimeoutException();
+ }
+ fileStream.EndWrite(result);
+ }
+
+ public override void Close()
+ {
+ fileStream.Close();
+ }
+
+ public override void SetLength(long length)
+ {
+ throw new NotSupportedException(Resources.NamedPipeNoSetLength);
+ }
+
+
+ public override bool CanTimeout
+ {
+ get
+ {
+ return true;
+ }
+ }
+
+ public override int ReadTimeout
+ {
+ get
+ {
+ return readTimeout;
+ }
+ set
+ {
+ readTimeout = value;
+ }
+ }
+
+ public override int WriteTimeout
+ {
+ get
+ {
+ return writeTimeout;
+ }
+ set
+ {
+ writeTimeout = value;
+ }
+ }
+
+ public override long Seek( long offset, SeekOrigin origin )
+ {
+ throw new NotSupportedException(Resources.NamedPipeNoSeek);
+ }
+
+ internal static Stream Create(string pipeName, string hostname)
{
string pipePath;
if (0 == String.Compare(hostname, "localhost", true))
@@ -200,7 +198,7 @@ namespace MySql.Data.Common
pipePath = String.Format(@"\\{0}\pipe\{1}", hostname, pipeName);
return new NamedPipeStream(pipePath, FileAccess.ReadWrite);
}
- }
+ }
}
=== modified file 'MySql.Data/Provider/Source/common/NativeMethods.cs'
--- a/MySql.Data/Provider/Source/common/NativeMethods.cs 2009-04-21 18:02:13 +0000
+++ b/MySql.Data/Provider/Source/common/NativeMethods.cs 2009-08-03 16:37:40 +0000
@@ -58,7 +58,7 @@ namespace MySql.Data.Common
}
[DllImport("Kernel32", CharSet=CharSet.Unicode)]
- static extern public int CreateFile(
+ static extern public IntPtr CreateFile(
String fileName,
uint desiredAccess,
uint shareMode,
=== modified file 'MySql.Data/Provider/Source/common/SharedMemoryStream.cs'
--- a/MySql.Data/Provider/Source/common/SharedMemoryStream.cs 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/common/SharedMemoryStream.cs 2009-08-03 16:37:40 +0000
@@ -51,6 +51,8 @@ namespace MySql.Data.Common
// private const uint EVENT_ALL_ACCESS = 0x001F0003;
private const uint FILE_MAP_WRITE = 0x2;
private const int BUFFERLENGTH = 16004;
+ private int readTimeout = System.Threading.Timeout.Infinite;
+ private int writeTimeout = System.Threading.Timeout.Infinite;
public SharedMemoryStream(string memName)
{
@@ -74,6 +76,17 @@ namespace MySql.Data.Common
AutoResetEvent connectRequest = new AutoResetEvent(false);
IntPtr handle = NativeMethods.OpenEvent(SYNCHRONIZE | EVENT_MODIFY_STATE, false,
memoryName + "_" + "CONNECT_REQUEST");
+ if (handle == IntPtr.Zero)
+ {
+ // If server runs as service, its shared memory is global
+ // And if connector runs in user session, it needs to prefix
+ // shared memory name with "Global\"
+ string prefixedMemoryName= @"Global\" + memoryName;
+ handle = NativeMethods.OpenEvent(SYNCHRONIZE | EVENT_MODIFY_STATE, false,
+ prefixedMemoryName + "_" + "CONNECT_REQUEST");
+ if (handle != IntPtr.Zero)
+ memoryName = prefixedMemoryName;
+ }
connectRequest.SafeWaitHandle = new SafeWaitHandle(handle, true);
AutoResetEvent connectAnswer = new AutoResetEvent(false);
@@ -181,11 +194,20 @@ namespace MySql.Data.Common
public override int Read(byte[] buffer, int offset, int count)
{
+ Stopwatch stopwatch = new Stopwatch();
+ int timeLeft = readTimeout;
while (bytesLeft == 0)
{
- while (!serverWrote.WaitOne(500, false))
+ if (IsClosed()) return 0;
+ if (!serverWrote.WaitOne(timeLeft, false))
+ {
+ throw new TimeoutException();
+ }
+ if (readTimeout != System.Threading.Timeout.Infinite)
{
- if (IsClosed()) return 0;
+ timeLeft = readTimeout - (int)stopwatch.ElapsedMilliseconds;
+ if (timeLeft < 0)
+ throw new TimeoutException();
}
bytesLeft = Marshal.ReadInt32(dataView);
@@ -216,11 +238,19 @@ namespace MySql.Data.Common
int leftToDo = count;
int buffPos = offset;
+ Stopwatch stopwatch = new Stopwatch();
+ int timeLeft = writeTimeout;
+
while (leftToDo > 0)
{
- if (!serverRead.WaitOne())
- throw new MySqlException("Writing to shared memory failed");
-
+ if (!serverRead.WaitOne(timeLeft))
+ throw new TimeoutException();
+ if (writeTimeout != System.Threading.Timeout.Infinite)
+ {
+ timeLeft = writeTimeout - (int)stopwatch.ElapsedMilliseconds;
+ if (timeLeft < 0)
+ throw new TimeoutException();
+ }
int bytesToDo = Math.Min(leftToDo, BUFFERLENGTH);
long baseMem = dataView.ToInt64() + 4;
@@ -237,6 +267,39 @@ namespace MySql.Data.Common
{
throw new NotSupportedException("SharedMemoryStream does not support seeking");
}
- }
+
+ public override bool CanTimeout
+ {
+ get
+ {
+ return true;
+ }
+ }
+
+ public override int ReadTimeout
+ {
+ get
+ {
+ return readTimeout;
+ }
+ set
+ {
+ readTimeout = value;
+ }
+ }
+
+ public override int WriteTimeout
+ {
+ get
+ {
+ return writeTimeout;
+ }
+ set
+ {
+ writeTimeout = value;
+ }
+ }
+
+ }
#endif
}
=== modified file 'MySql.Data/Provider/Source/datareader.cs'
--- a/MySql.Data/Provider/Source/datareader.cs 2009-07-28 20:40:35 +0000
+++ b/MySql.Data/Provider/Source/datareader.cs 2009-08-03 16:37:40 +0000
@@ -818,13 +818,22 @@ namespace MySql.Data.MySqlClient
// single result means we only return a single resultset. If we have already
// returned one, then we return false;
if (resultSet.ResultsIndex == 0 && (commandBehavior & CommandBehavior.SingleResult) != 0)
+ {
+ // Command is completed, clear the IO timeouts for the stream
+ connection.driver.ResetTimeout(0);
return false;
+ }
// tell our command to continue execution of the SQL batch until it its
// another resultset
try
{
- if (!resultSet.NextResult()) return false;
+ if (!resultSet.NextResult())
+ {
+ // Command is completed, clear the IO timeouts for the stream
+ connection.driver.ResetTimeout(0);
+ return false;
+ }
// issue any requested UA warnings
if (connection.Settings.UseUsageAdvisor)
@@ -841,8 +850,6 @@ namespace MySql.Data.MySqlClient
{
if (ex.IsFatal)
connection.Abort();
- if (command.TimedOut)
- throw new MySqlException(Resources.Timeout);
if (ex.Number == 0)
throw new MySqlException(Resources.FatalErrorReadingResult, ex);
throw;
=== modified file 'MySql.Data/Tests/Source/TimeoutAndCancel.cs'
--- a/MySql.Data/Tests/Source/TimeoutAndCancel.cs 2009-04-23 01:08:24 +0000
+++ b/MySql.Data/Tests/Source/TimeoutAndCancel.cs 2009-08-03 16:37:40 +0000
@@ -109,8 +109,7 @@ namespace MySql.Data.MySqlClient.Tests
stateChangeCount++;
}
- [Test]
- public void TimeoutExpiring()
+ private void TimeoutExpiring(MySqlConnectionProtocol protocol)
{
if (version < new Version(5, 0)) return;
@@ -120,25 +119,49 @@ namespace MySql.Data.MySqlClient.Tests
SELECT SLEEP(duration);
END");
- DateTime start = DateTime.Now;
- try
+ MySqlConnectionStringBuilder builder = new MySqlConnectionStringBuilder(conn.ConnectionString);
+ builder.ConnectionProtocol = protocol;
+ using (MySqlConnection connection = new MySqlConnection(builder.ConnectionString))
{
- MySqlCommand cmd = new MySqlCommand("spTest", conn);
- cmd.Parameters.AddWithValue("duration", 60);
- cmd.CommandType = CommandType.StoredProcedure;
- cmd.CommandTimeout = 5;
- cmd.ExecuteNonQuery();
- Assert.Fail("Should not get to this point");
- }
- catch (MySqlException ex)
- {
- TimeSpan ts = DateTime.Now.Subtract(start);
- Assert.IsTrue(ts.TotalSeconds <= 10);
- Assert.IsTrue(ex.Message.StartsWith("Timeout expired"), "Message is wrong");
+ connection.Open();
+
+ DateTime start = DateTime.Now;
+ try
+ {
+ MySqlCommand cmd = new MySqlCommand("spTest", connection);
+ cmd.Parameters.AddWithValue("duration", 60);
+ cmd.CommandType = CommandType.StoredProcedure;
+ cmd.CommandTimeout = 5;
+ cmd.ExecuteNonQuery();
+ Assert.Fail("Should not get to this point");
+ }
+ catch (MySqlException ex)
+ {
+ TimeSpan ts = DateTime.Now.Subtract(start);
+ Assert.IsTrue(ts.TotalSeconds <= 10);
+ Assert.IsTrue(ex.Message.StartsWith("Timeout expired"), "Message is wrong");
+ }
}
}
[Test]
+ public void TimeoutExpiringSockets()
+ {
+ TimeoutExpiring(MySqlConnectionProtocol.Sockets);
+ }
+
+ [Test]
+ public void TimeoutExpiringSharedMemory()
+ {
+ TimeoutExpiring(MySqlConnectionProtocol.SharedMemory);
+ }
+
+ [Test]
+ public void TimeoutExpiringNamedPipe()
+ {
+ TimeoutExpiring(MySqlConnectionProtocol.NamedPipe);
+ }
+
public void TimeoutNotExpiring()
{
if (Version < new Version(5, 0)) return;
@@ -149,6 +172,7 @@ namespace MySql.Data.MySqlClient.Tests
SELECT SLEEP(duration);
END");
+ conn.Open();
MySqlCommand cmd = new MySqlCommand("spTest", conn);
cmd.Parameters.AddWithValue("duration", 10);
cmd.CommandType = CommandType.StoredProcedure;
@@ -230,16 +254,23 @@ namespace MySql.Data.MySqlClient.Tests
c.Open();
string connStr1 = c.ConnectionString;
- MySqlCommand cmd = new MySqlCommand("SELECT SLEEP(10)", c);
- cmd.CommandTimeout = 5;
+ MySqlCommand cmd = new MySqlCommand("SELECT SLEEP(2)", c);
+ cmd.CommandTimeout = 1;
- using (MySqlDataReader reader = cmd.ExecuteReader())
+ try
+ {
+ using (MySqlDataReader reader = cmd.ExecuteReader())
+ {
+ }
+ }
+ catch (MySqlException ex)
{
- string connStr2 = c.ConnectionString.ToLower(CultureInfo.InvariantCulture);
- Assert.AreEqual(-1, connStr2.IndexOf("pooling=true"));
- Assert.AreEqual(-1, connStr2.IndexOf("pooling=false"));
- reader.Read();
+ Assert.IsTrue(ex.InnerException is TimeoutException);
+ Assert.IsTrue(c.State == ConnectionState.Closed);
}
+ string connStr2 = c.ConnectionString.ToLower(CultureInfo.InvariantCulture);
+ Assert.AreEqual(-1, connStr2.IndexOf("pooling=true"));
+ Assert.AreEqual(-1, connStr2.IndexOf("pooling=false"));
}
}
}
Attachment: [text/bzr-bundle] bzr/vvaintroub@mysql.com-20090803163740-aqgbxvr4ubvf4dne.bundle
| Thread |
|---|
| • bzr commit into connector-net-trunk branch (vvaintroub:717) | Vladislav Vaintroub | 3 Aug |