#At file:///H:/connector_net/trunk-asyncio/ based on revid:vvaintroub@stripped
922 Vladislav Vaintroub 2010-08-16
Patch to use asynchronous IO in BeginExecuteReader()
modified:
MySql.Data/Provider/Properties/Resources.Designer.cs
MySql.Data/Provider/Properties/Resources.resx
MySql.Data/Provider/Source/CompressedStream.cs
MySql.Data/Provider/Source/Driver.cs
MySql.Data/Provider/Source/MySqlStream.cs
MySql.Data/Provider/Source/MySqlTrace.cs
MySql.Data/Provider/Source/NativeDriver.cs
MySql.Data/Provider/Source/TimedStream.cs
MySql.Data/Provider/Source/TracingDriver.cs
MySql.Data/Provider/Source/command.cs
MySql.Data/Provider/Source/common/NamedPipeStream.cs
MySql.Data/Provider/Source/common/SharedMemoryStream.cs
MySql.Data/Provider/Source/datareader.cs
=== modified file 'MySql.Data/Provider/Properties/Resources.Designer.cs'
--- a/MySql.Data/Provider/Properties/Resources.Designer.cs 2010-04-27 18:31:24 +0000
+++ b/MySql.Data/Provider/Properties/Resources.Designer.cs 2010-08-16 14:55:33 +0000
@@ -938,6 +938,15 @@ namespace MySql.Data.MySqlClient.Propert
}
/// <summary>
+ /// Looks up a localized string similar to {0}: Asynchronous Resultset open.
+ /// </summary>
+ public static string TraceResultAsyncOpen {
+ get {
+ return ResourceManager.GetString("TraceResultAsyncOpen", resourceCulture);
+ }
+ }
+
+ /// <summary>
/// Looks up a localized string similar to {0}: Resultset Closed. Total rows={1}, skipped rows={2}, size (bytes)={3}.
/// </summary>
public static string TraceResultClosed {
=== modified file 'MySql.Data/Provider/Properties/Resources.resx'
--- a/MySql.Data/Provider/Properties/Resources.resx 2010-03-11 21:04:56 +0000
+++ b/MySql.Data/Provider/Properties/Resources.resx 2010-08-16 14:55:33 +0000
@@ -453,10 +453,14 @@
<data name="UnableToEnableQueryAnalysis" xml:space="preserve">
<value>Unable to enable query analysis. Be sure the MySql.Data.EMTrace assembly is properly located and registered.</value>
</data>
+ <assembly alias="System.Windows.Forms" name="System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" />
<data name="keywords" type="System.Resources.ResXFileRef, System.Windows.Forms">
<value>keywords.txt;System.String, mscorlib, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089;Windows-1252</value>
</data>
<data name="TraceQueryNormalized" xml:space="preserve">
<value>{0}: Query Normalized: {2}</value>
</data>
+ <data name="TraceResultAsyncOpen" xml:space="preserve">
+ <value>{0}: Asynchronous Resultset open</value>
+ </data>
</root>
\ No newline at end of file
=== modified file 'MySql.Data/Provider/Source/CompressedStream.cs'
--- a/MySql.Data/Provider/Source/CompressedStream.cs 2010-08-12 15:25:45 +0000
+++ b/MySql.Data/Provider/Source/CompressedStream.cs 2010-08-16 14:55:33 +0000
@@ -23,6 +23,7 @@ using System.IO;
using zlib;
using MySql.Data.MySqlClient.Properties;
using MySql.Data.Common;
+using System.Diagnostics;
namespace MySql.Data.MySqlClient
{
@@ -140,20 +141,75 @@ namespace MySql.Data.MySqlClient
}
}
- public override int Read(byte[] buffer, int offset, int count)
+ internal class CompressedReadAsyncResult:IAsyncResult
{
- if (buffer == null)
- throw new ArgumentNullException("buffer", Resources.BufferCannotBeNull);
- if (offset < 0 || offset >= buffer.Length)
- throw new ArgumentOutOfRangeException("offset", Resources.OffsetMustBeValid);
- if ((offset + count) > buffer.Length)
- throw new ArgumentException(Resources.BufferNotLargeEnough, "buffer");
+ public byte[] buffer;
+ public int offset;
+ public int count;
+ public IAsyncResult asyncResult;
+
+ #region IAsyncResult Members
+
+ public object AsyncState
+ {
+ get { return asyncResult.AsyncState; }
+ }
+
+ public System.Threading.WaitHandle AsyncWaitHandle
+ {
+ get { return asyncResult.AsyncWaitHandle; }
+ }
+
+ public bool CompletedSynchronously
+ {
+ get { return asyncResult.CompletedSynchronously; }
+ }
+
+ public bool IsCompleted
+ {
+ get { return asyncResult.IsCompleted; }
+ }
+
+ public CompressedReadAsyncResult(byte[] buffer, int offset, int count)
+ {
+ this.buffer = buffer;
+ this.offset = offset;
+ this.count = count;
+ }
+
+ #endregion
+ }
+
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+ {
+ if (inPos == maxInPos)
+ {
+ CompressedReadAsyncResult ar = new CompressedReadAsyncResult(buffer, offset, count);
+ ar.asyncResult = baseStream.BeginRead(lengthBytes, 0, 7, callback, state);
+ return ar;
+ }
+ return base.BeginRead(buffer, offset, count, callback, state);
+ }
+
+ public override int EndRead(IAsyncResult ar)
+ {
+ if (ar is CompressedReadAsyncResult)
+ {
+ CompressedReadAsyncResult myar = (CompressedReadAsyncResult)ar;
+ int bytesRead = baseStream.EndRead(myar.asyncResult);
+ return EndRead(myar.buffer, myar.offset, myar.count, bytesRead);
+ }
+ return base.EndRead(ar);
+ }
+
+ int EndRead(byte[] buffer, int offset, int count, int bytesRead)
+ {
+ int countRead;
if (inPos == maxInPos)
- PrepareNextPacket();
+ PrepareNextPacket(bytesRead);
int countToRead = Math.Min(count, maxInPos - inPos);
- int countRead;
if (zInStream != null)
countRead = zInStream.read(buffer, offset, countToRead);
else
@@ -174,9 +230,26 @@ namespace MySql.Data.MySqlClient
return countRead;
}
- private void PrepareNextPacket()
+
+ public override int Read(byte[] buffer, int offset, int count)
{
- MySqlStream.ReadFully(baseStream, lengthBytes, 0, 7);
+ if (buffer == null)
+ throw new ArgumentNullException("buffer", Resources.BufferCannotBeNull);
+ if (offset < 0 || offset >= buffer.Length)
+ throw new ArgumentOutOfRangeException("offset", Resources.OffsetMustBeValid);
+ if ((offset + count) > buffer.Length)
+ throw new ArgumentException(Resources.BufferNotLargeEnough, "buffer");
+ return EndRead(buffer, offset, count, 0);
+ }
+
+
+ private void PrepareNextPacket(int offset)
+ {
+ Debug.Assert(offset <= 7);
+ if (offset < 7)
+ {
+ MySqlStream.ReadFully(baseStream, lengthBytes, offset, 7-offset);
+ }
int compressedLength = lengthBytes[0] + (lengthBytes[1] << 8) + (lengthBytes[2] << 16);
// lengthBytes[3] is seq
int unCompressedLength = lengthBytes[4] + (lengthBytes[5] << 8) +
=== modified file 'MySql.Data/Provider/Source/Driver.cs'
--- a/MySql.Data/Provider/Source/Driver.cs 2010-06-10 19:27:42 +0000
+++ b/MySql.Data/Provider/Source/Driver.cs 2010-08-16 14:55:33 +0000
@@ -343,8 +343,29 @@ namespace MySql.Data.MySqlClient
return null;
firstResult = false;
- int affectedRows = -1, insertedId = -1, warnings = 0;
- int fieldCount = GetResult(statementId, ref affectedRows, ref insertedId);
+ return InternalEndNextResult(statementId, null);
+ }
+
+ public virtual IAsyncResult BeginNextResult(int statementId, object state, AsyncCallback callback)
+ {
+ if (!firstResult && !HasStatus(ServerStatusFlags.AnotherQuery | ServerStatusFlags.MoreResults))
+ return null;
+ firstResult = false;
+ return BeginGetResult(statementId, state, callback);
+ }
+
+ private ResultSet InternalEndNextResult(int statementId, IAsyncResult asyncResult)
+ {
+ int affectedRows = -1, insertedId = -1;
+ int fieldCount;
+ if (asyncResult == null)
+ {
+ fieldCount = GetResult(statementId, ref affectedRows, ref insertedId);
+ }
+ else
+ {
+ fieldCount = EndGetResult(statementId, ref affectedRows, ref insertedId, asyncResult);
+ }
if (fieldCount == -1)
return null;
if (fieldCount > 0)
@@ -353,11 +374,26 @@ namespace MySql.Data.MySqlClient
return new ResultSet(affectedRows, insertedId);
}
+ public virtual ResultSet EndNextResult(int statementId, IAsyncResult asyncResult)
+ {
+ return InternalEndNextResult(statementId, asyncResult);
+ }
protected virtual int GetResult(int statementId, ref int affectedRows, ref int insertedId)
{
return handler.GetResult(ref affectedRows, ref insertedId);
}
+ protected virtual IAsyncResult BeginGetResult(int statementId, object state, AsyncCallback callback)
+ {
+ return handler.BeginGetResult(state, callback);
+ }
+
+ protected virtual int EndGetResult(int statementId, ref int affectedRows, ref int insertedId,
+ IAsyncResult asyncResult)
+ {
+ return handler.EndGetResult(asyncResult, ref affectedRows, ref insertedId);
+ }
+
public virtual bool FetchDataRow(int statementId, int columns)
{
return handler.FetchDataRow(statementId, columns);
@@ -484,6 +520,8 @@ namespace MySql.Data.MySqlClient
void Close(bool isOpen);
bool Ping();
int GetResult(ref int affectedRows, ref int insertedId);
+ IAsyncResult BeginGetResult(object state, AsyncCallback callback);
+ int EndGetResult(IAsyncResult asyncResult, ref int affectedRows, ref int insertedId);
bool FetchDataRow(int statementId, int columns);
int PrepareStatement(string sql, ref MySqlField[] parameters);
void ExecuteStatement(MySqlPacket packet);
=== modified file 'MySql.Data/Provider/Source/MySqlStream.cs'
--- a/MySql.Data/Provider/Source/MySqlStream.cs 2009-10-07 20:30:34 +0000
+++ b/MySql.Data/Provider/Source/MySqlStream.cs 2010-08-16 14:55:33 +0000
@@ -114,35 +114,39 @@ namespace MySql.Data.MySqlClient
#region Packet methods
- /// <summary>
- /// ReadPacket is called by NativeDriver to start reading the next
- /// packet on the stream.
- /// </summary>
- public MySqlPacket ReadPacket()
- {
+ /// <summary>
+ /// ReadPacket is called by NativeDriver to start reading the next
+ /// packet on the stream.
+ /// </summary>
+ public MySqlPacket ReadPacket()
+ {
//Debug.Assert(packet.Position == packet.Length);
// make sure we have read all the data from the previous packet
- //Debug.Assert(HasMoreData == false, "HasMoreData is true in OpenPacket");
+ //Debug.Assert(HasMoreData == false, "HasMoreData is true in OpenPacket");
- LoadPacket();
+ LoadPacket();
+ CheckPacketError();
+ return packet;
+ }
+ void CheckPacketError()
+ {
// now we check if this packet is a server error
- if (packet.Buffer[0] == 0xff)
- {
- packet.ReadByte(); // read off the 0xff
+ if (packet.Buffer[0] == 0xff)
+ {
+ packet.ReadByte(); // read off the 0xff
- int code = packet.ReadInteger(2);
- string msg = packet.ReadString();
+ int code = packet.ReadInteger(2);
+ string msg = packet.ReadString();
if (msg.StartsWith("#"))
{
msg.Substring(1, 5); /* state code */
msg = msg.Substring(6);
}
- throw new MySqlException(msg, code);
- }
- return packet;
- }
+ throw new MySqlException(msg, code);
+ }
+ }
/// <summary>
/// Reads the specified number of bytes from the stream and stores them at given
@@ -169,18 +173,29 @@ namespace MySql.Data.MySqlClient
}
}
- /// <summary>
- /// LoadPacket loads up and decodes the header of the incoming packet.
- /// </summary>
- public void LoadPacket()
- {
- try
- {
+
+ /// <summary>
+ /// LoadPacket loads up and decodes the header of the incoming packet.
+ /// </summary>
+ public void LoadPacket()
+ {
+ LoadPacket(0);
+ }
+
+ private void LoadPacket(int off)
+ {
+ Debug.Assert(off <= 4);
+ try
+ {
packet.Length = 0;
int offset = 0;
while (true)
{
- ReadFully(inStream, packetHeader, 0, 4);
+ if (off < 4)
+ {
+ ReadFully(inStream, packetHeader, off, 4 - off);
+ }
+ off = 0;
sequenceByte = (byte)(packetHeader[3]+1);
int length = (int)(packetHeader[0] + (packetHeader[1] << 8) +
(packetHeader[2] << 16));
@@ -202,6 +217,21 @@ namespace MySql.Data.MySqlClient
}
}
+ public IAsyncResult BeginReadPacket(object state, AsyncCallback callback)
+ {
+ packet.Length = 0;
+ return inStream.BeginRead(packetHeader, 0, 4, callback, state);
+ }
+
+ public MySqlPacket EndReadPacket(IAsyncResult asyncResult)
+ {
+ int bytesRead = inStream.EndRead(asyncResult);
+ Debug.Assert(bytesRead <= 4);
+ LoadPacket(bytesRead);
+ CheckPacketError();
+ return packet;
+ }
+
public void SendPacket(MySqlPacket packet)
{
byte[] buffer = packet.Buffer;
=== modified file 'MySql.Data/Provider/Source/MySqlTrace.cs'
--- a/MySql.Data/Provider/Source/MySqlTrace.cs 2010-03-08 21:23:04 +0000
+++ b/MySql.Data/Provider/Source/MySqlTrace.cs 2010-08-16 14:55:33 +0000
@@ -141,7 +141,8 @@ namespace MySql.Data.MySqlClient
UsageAdvisorWarning,
Warning,
Error,
- QueryNormalized
+ QueryNormalized,
+ ResultAsyncOpened
}
public enum UsageAdvisorWarningFlags
=== modified file 'MySql.Data/Provider/Source/NativeDriver.cs'
--- a/MySql.Data/Provider/Source/NativeDriver.cs 2010-07-01 22:51:18 +0000
+++ b/MySql.Data/Provider/Source/NativeDriver.cs 2010-08-16 14:55:33 +0000
@@ -31,7 +31,6 @@ using System.Text;
using System.Net.Security;
using System.Security.Authentication;
using System.Globalization;
-using System.Text;
#endif
namespace MySql.Data.MySqlClient
@@ -547,11 +546,27 @@ namespace MySql.Data.MySqlClient
}
}
+ public IAsyncResult BeginGetResult(object state, AsyncCallback callback)
+ {
+ return stream.BeginReadPacket(state, callback);
+ }
+
+ public int EndGetResult(IAsyncResult asyncResult, ref int affectedRow, ref int insertedId)
+ {
+ return GetResultInternal(ref affectedRow, ref insertedId, asyncResult);
+ }
+
public int GetResult(ref int affectedRow, ref int insertedId)
{
+ return GetResultInternal(ref affectedRow, ref insertedId, null);
+ }
+
+ private int GetResultInternal(ref int affectedRow, ref int insertedId,
+ IAsyncResult asyncResult)
+ {
try
{
- packet = stream.ReadPacket();
+ packet = (asyncResult == null)? stream.ReadPacket() : stream.EndReadPacket(asyncResult);
}
catch (TimeoutException)
{
=== modified file 'MySql.Data/Provider/Source/TimedStream.cs'
--- a/MySql.Data/Provider/Source/TimedStream.cs 2010-01-15 19:09:26 +0000
+++ b/MySql.Data/Provider/Source/TimedStream.cs 2010-08-16 14:55:33 +0000
@@ -42,6 +42,7 @@ namespace MySql.Data.MySqlClient
int lastWriteTimeout;
LowResolutionStopwatch stopwatch;
bool isClosed;
+ bool hasPendingIO;
enum IOKind
@@ -66,6 +67,13 @@ namespace MySql.Data.MySqlClient
stopwatch = new LowResolutionStopwatch();
}
+ private void CheckAsyncIO()
+ {
+ if(hasPendingIO)
+ {
+ throw new InvalidOperationException("Asyncronous IO in progress");
+ }
+ }
/// <summary>
/// Figure out whether it is necessary to reset timeout on stream.
@@ -193,6 +201,7 @@ namespace MySql.Data.MySqlClient
public override int Read(byte[] buffer, int offset, int count)
{
+ CheckAsyncIO();
try
{
StartTimer(IOKind.Read);
@@ -209,6 +218,7 @@ namespace MySql.Data.MySqlClient
public override int ReadByte()
{
+ CheckAsyncIO();
try
{
StartTimer(IOKind.Read);
@@ -235,6 +245,7 @@ namespace MySql.Data.MySqlClient
public override void Write(byte[] buffer, int offset, int count)
{
+ CheckAsyncIO();
try
{
StartTimer(IOKind.Write);
@@ -272,6 +283,22 @@ namespace MySql.Data.MySqlClient
baseStream.Close();
}
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
+ AsyncCallback callback, object state)
+ {
+ CheckAsyncIO();
+
+ IAsyncResult result = baseStream.BeginRead(buffer, offset, count, callback, state);
+ hasPendingIO = true;
+ return result;
+ }
+
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ hasPendingIO = false;
+ return baseStream.EndRead(asyncResult);
+ }
+
public void ResetTimeout(int newTimeout)
{
if (newTimeout == System.Threading.Timeout.Infinite || newTimeout == 0)
@@ -281,7 +308,6 @@ namespace MySql.Data.MySqlClient
stopwatch.Reset();
}
-
/// <summary>
/// Common handler for IO exceptions.
/// Resets timeout to infinity if timeout exception is
=== modified file 'MySql.Data/Provider/Source/TracingDriver.cs'
--- a/MySql.Data/Provider/Source/TracingDriver.cs 2010-03-05 20:19:45 +0000
+++ b/MySql.Data/Provider/Source/TracingDriver.cs 2010-08-16 14:55:33 +0000
@@ -97,6 +97,43 @@ namespace MySql.Data.MySqlClient
}
}
+ protected override IAsyncResult BeginGetResult(int statementId, object state, AsyncCallback callback)
+ {
+ try
+ {
+ IAsyncResult result = base.BeginGetResult(statementId, state, callback);
+ MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.ResultAsyncOpened,
+ Resources.TraceResultAsyncOpen, driverId);
+ return result;
+ }
+ catch (MySqlException ex)
+ {
+ // we got an error so we report it
+ MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.Error,
+ Resources.TraceOpenResultError, driverId, ex.Number, ex.Message);
+ throw ex;
+ }
+ }
+
+ protected override int EndGetResult(int statementId, ref int affectedRows, ref int insertedId, IAsyncResult asyncResult)
+ {
+ try
+ {
+ int fieldCount = base.EndGetResult(statementId, ref affectedRows, ref insertedId, asyncResult);
+ MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.ResultOpened,
+ Resources.TraceResult, driverId, fieldCount, affectedRows, insertedId);
+
+ return fieldCount;
+ }
+ catch (MySqlException ex)
+ {
+ // we got an error so we report it
+ MySqlTrace.TraceEvent(TraceEventType.Information, MySqlTraceEventType.Error,
+ Resources.TraceOpenResultError, driverId, ex.Number, ex.Message);
+ throw ex;
+ }
+ }
+
public override ResultSet NextResult(int statementId)
{
// first let's see if we already have a resultset on this statementId
=== modified file 'MySql.Data/Provider/Source/command.cs'
--- a/MySql.Data/Provider/Source/command.cs 2010-07-30 22:20:57 +0000
+++ b/MySql.Data/Provider/Source/command.cs 2010-08-16 14:55:33 +0000
@@ -51,7 +51,6 @@ namespace MySql.Data.MySqlClient
long updatedRowCount;
UpdateRowSource updatedRowSource;
MySqlParameterCollection parameters;
- private IAsyncResult asyncResult;
private bool designTimeVisible;
internal Int64 lastInsertedId;
private PreparableStatement statement;
@@ -360,6 +359,12 @@ namespace MySql.Data.MySqlClient
/// <include file='docs/mysqlcommand.xml' path='docs/ExecuteReader1/*'/>
public new MySqlDataReader ExecuteReader (CommandBehavior behavior)
{
+ return (MySqlDataReader)InternalExecuteReader(behavior, null, null, false);
+ }
+
+
+ private object InternalExecuteReader(CommandBehavior behavior, object state, AsyncCallback callback, bool async)
+ {
bool success = false;
CheckState();
Driver driver = connection.driver;
@@ -431,10 +436,19 @@ namespace MySql.Data.MySqlClient
connection.Reader = reader;
// execute the statement
statement.Execute();
- // wait for data to return
- reader.NextResult();
- success = true;
- return reader;
+ if (!async)
+ {
+ // wait for data to return
+ reader.NextResult();
+ success = true;
+ return reader;
+ }
+ else
+ {
+ IAsyncResult asyncResult = reader.BeginNextResult(state, callback);
+ success = true;
+ return asyncResult;
+ }
}
catch (TimeoutException tex)
{
@@ -566,8 +580,6 @@ namespace MySql.Data.MySqlClient
#region Async Methods
- internal delegate object AsyncDelegate(int type, CommandBehavior behavior);
- internal AsyncDelegate caller = null;
internal Exception thrownException;
private static string TrimSemicolons(string sql)
@@ -625,12 +637,7 @@ namespace MySql.Data.MySqlClient
/// the returned rows. </returns>
public IAsyncResult BeginExecuteReader(CommandBehavior behavior)
{
- if (caller != null)
- throw new MySqlException(Resources.UnableToStartSecondAsyncOp);
-
- caller = new AsyncDelegate(AsyncExecuteWrapper);
- asyncResult = caller.BeginInvoke(1, behavior, null, null);
- return asyncResult;
+ return (IAsyncResult)InternalExecuteReader(behavior, null, null, true);
}
/// <summary>
@@ -642,12 +649,13 @@ namespace MySql.Data.MySqlClient
/// <returns>A <b>MySqlDataReader</b> object that can be used to retrieve the requested rows. </returns>
public MySqlDataReader EndExecuteReader(IAsyncResult result)
{
- result.AsyncWaitHandle.WaitOne();
- AsyncDelegate c = caller;
- caller = null;
- if (thrownException != null)
- throw thrownException;
- return (MySqlDataReader)c.EndInvoke(result);
+ MySqlDataReader r = connection.Reader;
+ if (r != null)
+ {
+ r.EndNextResult(result);
+ return r;
+ }
+ return null;
}
/// <summary>
@@ -666,13 +674,7 @@ namespace MySql.Data.MySqlClient
/// which returns the number of affected rows. </returns>
public IAsyncResult BeginExecuteNonQuery(AsyncCallback callback, object stateObject)
{
- if (caller != null)
- throw new MySqlException(Resources.UnableToStartSecondAsyncOp);
-
- caller = new AsyncDelegate(AsyncExecuteWrapper);
- asyncResult = caller.BeginInvoke(2, CommandBehavior.Default,
- callback, stateObject);
- return asyncResult;
+ return (IAsyncResult)InternalExecuteReader(CommandBehavior.Default, stateObject, null, true);
}
/// <summary>
@@ -684,12 +686,7 @@ namespace MySql.Data.MySqlClient
/// which returns the number of affected rows. </returns>
public IAsyncResult BeginExecuteNonQuery()
{
- if (caller != null)
- throw new MySqlException(Resources.UnableToStartSecondAsyncOp);
-
- caller = new AsyncDelegate(AsyncExecuteWrapper);
- asyncResult = caller.BeginInvoke(2, CommandBehavior.Default, null, null);
- return asyncResult;
+ return BeginExecuteNonQuery(null, null);
}
/// <summary>
@@ -700,12 +697,14 @@ namespace MySql.Data.MySqlClient
/// <returns></returns>
public int EndExecuteNonQuery(IAsyncResult asyncResult)
{
- asyncResult.AsyncWaitHandle.WaitOne();
- AsyncDelegate c = caller;
- caller = null;
- if (thrownException != null)
- throw thrownException;
- return (int)c.EndInvoke(asyncResult);
+ if (connection.Reader != null)
+ {
+ MySqlDataReader r = connection.Reader;
+ r.EndNextResult(asyncResult);
+ r.Close();
+ return (int)r.affectedRows;
+ }
+ return -1;
}
#endregion
=== modified file 'MySql.Data/Provider/Source/common/NamedPipeStream.cs'
--- a/MySql.Data/Provider/Source/common/NamedPipeStream.cs 2009-10-13 03:07:05 +0000
+++ b/MySql.Data/Provider/Source/common/NamedPipeStream.cs 2010-08-16 14:55:33 +0000
@@ -141,7 +141,15 @@ namespace MySql.Data.Common
return fileStream.EndRead(result);
}
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+ {
+ return fileStream.BeginRead(buffer, offset, count, callback, state);
+ }
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ return fileStream.EndRead(asyncResult);
+ }
public override void Write(byte[] buffer, int offset, int count)
{
if (writeTimeout == Timeout.Infinite)
=== modified file 'MySql.Data/Provider/Source/common/SharedMemoryStream.cs'
--- a/MySql.Data/Provider/Source/common/SharedMemoryStream.cs 2009-10-07 20:30:34 +0000
+++ b/MySql.Data/Provider/Source/common/SharedMemoryStream.cs 2010-08-16 14:55:33 +0000
@@ -94,10 +94,14 @@ namespace MySql.Data.Common
private EventWaitHandle clientRead;
private EventWaitHandle clientWrote;
private EventWaitHandle connectionClosed;
+ private EventWaitHandle asyncReadCompleted;
private SharedMemory data;
private int bytesLeft;
private int position;
private int connectNumber;
+ private bool closed;
+ RegisteredWaitHandle closeWaitHandle;
+
private const int BUFFERLENGTH = 16004;
@@ -121,27 +125,37 @@ namespace MySql.Data.Common
public override void Close()
{
+
if (connectionClosed != null)
{
- bool isClosed = connectionClosed.WaitOne(0);
- if (!isClosed)
- {
- connectionClosed.Set();
- connectionClosed.Close();
- }
- connectionClosed = null;
- EventWaitHandle[] handles =
- {serverRead, serverWrote, clientRead, clientWrote};
-
- for(int i=0; i< handles.Length; i++)
- {
- if(handles[i] != null)
- handles[i].Close();
- }
- if (data != null)
+ lock (connectionClosed)
{
- data.Dispose();
- data = null;
+ if (closeWaitHandle != null)
+ {
+ closeWaitHandle.Unregister(connectionClosed);
+ closeWaitHandle = null;
+ }
+ bool isClosed = connectionClosed.WaitOne(0);
+ if (!isClosed)
+ {
+ connectionClosed.Set();
+ connectionClosed.Close();
+ }
+ connectionClosed = null;
+ EventWaitHandle[] handles = { serverRead, serverWrote,
+ clientRead, clientWrote,
+ asyncReadCompleted };
+
+ for (int i = 0; i < handles.Length; i++)
+ {
+ if (handles[i] != null)
+ handles[i].Close();
+ }
+ if (data != null)
+ {
+ data.Dispose();
+ data = null;
+ }
}
}
}
@@ -179,7 +193,25 @@ namespace MySql.Data.Common
}
}
+ private void ConnectionClosedCallback(object state, bool timedOut)
+ {
+ closed = true;
+ // synchronization required, since we do not want
+ // this callback which is executed in thread pool thread
+ // to run in parallel with Close()
+ if (connectionClosed != null)
+ {
+ lock (connectionClosed)
+ {
+ // wakeup Read(), if it is waiting
+ serverWrote.Set();
+
+ // wakeup Write(), if it is waiting
+ serverRead.Set();
+ }
+ }
+ }
private void SetupEvents()
{
string prefix = memoryName + "_" + connectNumber;
@@ -189,6 +221,8 @@ namespace MySql.Data.Common
clientWrote = EventWaitHandle.OpenExisting(prefix + "_CLIENT_WROTE");
clientRead = EventWaitHandle.OpenExisting(prefix + "_CLIENT_READ");
connectionClosed = EventWaitHandle.OpenExisting(prefix + "_CONNECTION_CLOSED");
+ ThreadPool.RegisterWaitForSingleObject(connectionClosed,
+ ConnectionClosedCallback, null, Timeout.Infinite, true);
// tell the server we are ready
serverRead.Set();
@@ -232,17 +266,16 @@ namespace MySql.Data.Common
public override int Read(byte[] buffer, int offset, int count)
{
int timeLeft = readTimeout;
- WaitHandle[] waitHandles = { serverWrote, connectionClosed };
LowResolutionStopwatch stopwatch = new LowResolutionStopwatch();
while (bytesLeft == 0)
{
stopwatch.Start();
- int index = WaitHandle.WaitAny(waitHandles, timeLeft);
+ bool ok = serverWrote.WaitOne(timeLeft);
stopwatch.Stop();
- if (index == WaitHandle.WaitTimeout)
+ if (!ok)
throw new TimeoutException("Timeout when reading from shared memory");
- if (waitHandles[index] == connectionClosed)
+ if (closed)
throw new MySqlException("Connection to server lost",true, null);
if (readTimeout != System.Threading.Timeout.Infinite)
@@ -269,6 +302,100 @@ namespace MySql.Data.Common
return len;
}
+
+ internal class AsyncSharedMemoryIO:IAsyncResult
+ {
+ public object state;
+ public byte[] buffer;
+ public int count;
+ public AsyncCallback callback;
+ public WaitHandle handle;
+ public bool completed;
+ public int bytesRead;
+ public RegisteredWaitHandle registeredWaitHandle;
+
+ internal AsyncSharedMemoryIO(
+ object state, byte[] buffer, int count, AsyncCallback callback,
+ WaitHandle handle)
+ {
+ this.state = state;
+ this.buffer = buffer;
+ this.count = count;
+ this.callback = callback;
+ this.handle = handle;
+ }
+
+ #region IAsyncResult Members
+
+ public object AsyncState
+ {
+ get { return state; }
+ }
+
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return handle; }
+ }
+
+ public bool CompletedSynchronously
+ {
+ get { return false; }
+ }
+
+ public bool IsCompleted
+ {
+ get { return completed; }
+ }
+
+ #endregion
+ }
+
+ private void ServerWroteCallback(object req, bool timedOut)
+ {
+ AsyncSharedMemoryIO request = (AsyncSharedMemoryIO)req;
+ request.registeredWaitHandle.Unregister(serverWrote);
+
+ // signal handle, it was reset (since it is autoreset handle)
+ serverWrote.Set();
+ // this read is non-blocking (serverWrote is set)
+ request.bytesRead = Read(request.buffer, 0, request.count);
+
+ if (request.callback != null)
+ {
+ request.callback(request);
+ }
+
+ asyncReadCompleted.Set();
+ request.completed = true;
+ }
+
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
+ AsyncCallback callback, object state)
+ {
+ if (asyncReadCompleted == null)
+ {
+ asyncReadCompleted = new EventWaitHandle(false, EventResetMode.ManualReset);
+ }
+ asyncReadCompleted.Reset();
+
+ AsyncSharedMemoryIO req = new AsyncSharedMemoryIO(state, buffer, count,
+ callback, serverWrote);
+ req.registeredWaitHandle =
+ ThreadPool.RegisterWaitForSingleObject(serverWrote, ServerWroteCallback,
+ state, Timeout.Infinite, true);
+ return req;
+ }
+
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ AsyncSharedMemoryIO io = (AsyncSharedMemoryIO)asyncResult;
+ if (!io.completed)
+ {
+ asyncReadCompleted.WaitOne();
+ }
+ return io.bytesRead;
+ }
+
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException("SharedMemoryStream does not support seeking");
@@ -278,20 +405,20 @@ namespace MySql.Data.Common
{
int leftToDo = count;
int buffPos = offset;
- WaitHandle[] waitHandles = { serverRead, connectionClosed };
+
LowResolutionStopwatch stopwatch = new LowResolutionStopwatch();
int timeLeft = writeTimeout;
while (leftToDo > 0)
{
stopwatch.Start();
- int index = WaitHandle.WaitAny(waitHandles, timeLeft);
+ bool ok = serverRead.WaitOne(timeLeft);
stopwatch.Stop();
- if (waitHandles[index] == connectionClosed)
+ if (closed)
throw new MySqlException("Connection to server lost",true, null);
- if (index == WaitHandle.WaitTimeout)
+ if (!ok)
throw new TimeoutException("Timeout when reading from shared memory");
if (writeTimeout != System.Threading.Timeout.Infinite)
=== modified file 'MySql.Data/Provider/Source/datareader.cs'
--- a/MySql.Data/Provider/Source/datareader.cs 2010-07-30 22:20:57 +0000
+++ b/MySql.Data/Provider/Source/datareader.cs 2010-08-16 14:55:33 +0000
@@ -864,13 +864,40 @@ namespace MySql.Data.MySqlClient
return DBNull.Value == GetValue(i);
}
- /// <summary>
+ /// <summary>
/// Advances the data reader to the next result, when reading the results of batch SQL statements.
/// </summary>
/// <returns></returns>
public override bool NextResult()
{
- if (!isOpen)
+ return InternalNextResult(null);
+ }
+
+ internal IAsyncResult BeginNextResult(object state, AsyncCallback callback)
+ {
+ if (!isOpen)
+ throw new MySqlException(Resources.NextResultIsClosed);
+
+ // this will clear out any unread data
+ if (resultSet != null)
+ resultSet.Close();
+
+ // single result means we only return a single resultset. If we have already
+ // returned one, then we return false;
+ if (resultSet != null && (commandBehavior & CommandBehavior.SingleResult) != 0)
+ return null;
+
+ return driver.BeginNextResult(Statement.StatementId, state, callback);
+ }
+
+ internal void EndNextResult(IAsyncResult asyncResult)
+ {
+ InternalNextResult(asyncResult);
+ }
+
+ private bool InternalNextResult(IAsyncResult asyncResult)
+ {
+ if (!isOpen)
throw new MySqlException(Resources.NextResultIsClosed);
// this will clear out any unread data
@@ -888,7 +915,16 @@ namespace MySql.Data.MySqlClient
do
{
resultSet = null;
- resultSet = driver.NextResult(Statement.StatementId);
+ if (asyncResult == null)
+ {
+ resultSet = driver.NextResult(Statement.StatementId);
+ }
+ else
+ {
+ resultSet = driver.EndNextResult(Statement.StatementId,
+ asyncResult);
+ asyncResult = null;
+ }
if (resultSet == null) return false;
if (resultSet.IsOutputParameters) return false;
Attachment: [text/bzr-bundle] bzr/vvaintroub@mysql.com-20100816145533-y8urpn11xwnd45fo.bundle
| Thread |
|---|
| • bzr commit into connector-net-trunk branch (vvaintroub:922) | Vladislav Vaintroub | 16 Aug |