首页 > 代码库 > .net异步编程の-------异步编程模型(APM)
.net异步编程の-------异步编程模型(APM)
术语解释:
APM 异步编程模型, Asynchronous Programming Model
EAP 基于事件的异步编程模式, Event-based Asynchronous Pattern
TAP 基于任务的异步编程模式, Task-based Asynchronous Pattern
一、异步编程
APM即异步编程模型的简写(Asynchronous Programming Model),大家在写代码的时候或者查看.NET 的类库的时候肯定会经常看到和使用以BeginXXX和EndXXX类似的方法,其实你在使用这些方法的时候,你就再使用异步编程模型来编写程序。异步编写模型是一种模式,该模式允许用更少的线程去做更多的操作,.NET Framework很多类也实现了该模式,同时我们也可以自定义类来实现该模式,(也就是在自定义的类中实现返回类型为IAsyncResult接口的BeginXXX方法和EndXXX方法),另外委托类型也定义了BeginInvoke和EndInvoke方法,并且我们使用WSDL.exe和SvcUtil.exe工具来生成Web服务的代理类型时,也会生成使用了APM的BeginXxx和EndXxx方法。下面就具体就拿FileStream类的BeginRead和EndRead方法来介绍下下异步编程模型的实现。
读取的同步方法:
public
override
int
Read(
byte
[] array,
int
offset,
int
count )
;
读取过程中,执行此方法会阻塞主线程,最明显的就是造成界面卡死。
读取的异步方法:
// 开始异步读操作
// 前面的3个参数和同步方法代表的意思一样,这里就不说了,可以看到这里多出了2个参数
// userCallback代表当异步IO操作完成时,你希望由一个线程池线程执行的方法,该方法必须匹配AsyncCallback委托
// stateObject代表你希望转发给回调方法的一个对象的引用,在回调方法中,可以查询IAsyncResult接口的AsyncState属性来访问该对象
public
override
IAsyncResult BeginRead(
byte
[] array,
int
offset,
int
numBytes, AsyncCallback userCallback, Object stateObject
)
EndXxx方法——结束异步操作介绍
前面介绍完了BeginXxx方法,我们看到所有BeginXxx方法返回的都是实现了IAsyncResult接口的一个对象,并不是对应的同步方法所要得到的结果的。此时我们需要调用对应的EndXxx方法来结束异步操作,并向该方法传递IAsyncResult对象,EndXxx方法的返回类型就是和同步方法
一样的。例如,FileStream的EndRead方法返回一个Int32来代表从文件流中实际读取的字节数。
异步调用的结果:
对于访问异步操作的结果,APM提供了四种方式供开发人员选择:
-
在调用BeginXxx方法的线程上调用EndXxx方法来得到异步操作的结果,但是这种方式会阻塞调用线程,知道操作完成之后调用线程才继续运行
-
查询IAsyncResult的AsyncWaitHandle属性,从而得到WaitHandle,然后再调用它的WaitOne方法来使一个线程阻塞并等待操作完成再调用EndXxx方法来获得操作的结果。
-
循环查询IAsyncResult的IsComplete属性,操作完成后再调用EndXxx方法来获得操作返回的结果。
-
使用 AsyncCallback委托来指定操作完成时要调用的方法,在操作完成后调用的方法中调用EndXxx操作来获得异步操作的结果。(最为常用)
异步编程模型的本质:
异步编程模型这个模式,是微软利用委托和线程池帮助我们实现的一个模式(该模式利用一个线程池线程去执行一个操作,在FileStream类BeginRead方法中就是执行一个读取文件操作,该线程池线程会立即将控制权返回给调用线程,此时线程池线程在后台进行这个异步操作;异步操作完成之后,通过回调函数来获取异步操作返回的结果。此时就是利用委托的机制。所以说异步编程模式时利用委托和线程池线程搞出来的模式,包括后面的基于事件的异步编程和基于任务的异步编程,还有C# 5中的async和await关键字,都是利用这委托和线程池搞出来的。他们的本质都一样,后面方法使异步编程更加简单。)
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using System.Linq; using System.Net; using System.Runtime.Remoting.Messaging; using System.Text; using System.Windows.Forms; //https://msdn.microsoft.com/zh-cn/library/system.net.httpwebrequest.begingetresponse.aspx namespace WindowsFormsApplication1 { public class RequestState { // This class stores the State of the request. const int BUFFER_SIZE = 1024; public StringBuilder requestData; public byte[] BufferRead; public HttpWebRequest request; public HttpWebResponse response; public Stream streamResponse; public Stream filestream; public string savepath; public RequestState() { BufferRead = new byte[BUFFER_SIZE]; requestData = new StringBuilder(""); request = null; streamResponse = null; } } public partial class Form1 : Form { // 定义用来实现异步编程的委托 private delegate string AsyncMethodCaller(string fileurl); public Form1() { InitializeComponent(); tbUrl.Text = "http://download.microsoft.com/download/7/0/3/703455ee-a747-4cc8-bd3e-98a615c3aedb/dotNetFx35setup.exe"; // 允许跨线程调用 // 实际开发中不建议这样做的,违背了.NET 安全规范 CheckForIllegalCrossThreadCalls = false; } private void btnDownLoad_Click(object sender, EventArgs e) { rtbContent.Text = "Download............"; if (tbUrl.Text == string.Empty) { MessageBox.Show("Please input valid download file url"); return; } AsyncMethodCaller methodCaller = new AsyncMethodCaller(DownLoadFileSync); methodCaller.BeginInvoke(tbUrl.Text.Trim(), GetResult, null); } // 同步下载文件的方法 // 该方法会阻塞主线程,使用户无法对界面进行操作 // 在文件下载完成之前,用户甚至都不能关闭运行的程序。 private string DownLoadFileSync(string url) { // Create an instance of the RequestState RequestState requestState = new RequestState(); try { // Initialize an HttpWebRequest object HttpWebRequest myHttpWebRequest = (HttpWebRequest)WebRequest.Create(url); // assign HttpWebRequest instance to its request field. requestState.request = myHttpWebRequest; requestState.response = (HttpWebResponse)myHttpWebRequest.GetResponse(); requestState.streamResponse = requestState.response.GetResponseStream(); int readSize = requestState.streamResponse.Read(requestState.BufferRead, 0, requestState.BufferRead.Length); while (readSize > 0) { requestState.filestream.Write(requestState.BufferRead, 0, readSize); readSize = requestState.streamResponse.Read(requestState.BufferRead, 0, requestState.BufferRead.Length); } // 执行该方法的线程是线程池线程,该线程不是与创建richTextBox控件的线程不是一个线程 // 如果不把 CheckForIllegalCrossThreadCalls 设置为false,该程序会出现“不能跨线程访问控件”的异常 return string.Format("The Length of the File is: {0}", requestState.filestream.Length) + string.Format("\nDownLoad Completely, Download path is: {0}", requestState.savepath); } catch (Exception e) { return string.Format("Exception occurs in DownLoadFileSync method, Error Message is:{0}", e.Message); } finally { //requestState.response.Close(); //requestState.filestream.Close(); } } // 异步操作完成时执行的方法 private void GetResult(IAsyncResult result) { AsyncMethodCaller caller = (AsyncMethodCaller)((AsyncResult)result).AsyncDelegate; // 调用EndInvoke去等待异步调用完成并且获得返回值 // 如果异步调用尚未完成,则 EndInvoke 会一直阻止调用线程,直到异步调用完成 string returnstring = caller.EndInvoke(result); //sc.Post(ShowState,resultvalue); rtbContent.Text = returnstring; } } }
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using System.Linq; using System.Net; using System.Runtime.Remoting.Messaging; using System.Text; using System.Windows.Forms; //https://msdn.microsoft.com/zh-cn/library/system.net.httpwebrequest.begingetresponse.aspx namespace WindowsFormsApplication1 { public class RequestState { // This class stores the State of the request. const int BUFFER_SIZE = 1024; public StringBuilder requestData; public byte[] BufferRead; public HttpWebRequest request; public HttpWebResponse response; public Stream streamResponse; public Stream filestream; public string savepath; public RequestState() { BufferRead = new byte[BUFFER_SIZE]; requestData = new StringBuilder(""); request = null; streamResponse = null; } } public partial class Form1 : Form { // 定义用来实现异步编程的委托 private delegate string AsyncMethodCaller(string fileurl); public Form1() { InitializeComponent(); tbUrl.Text = "http://download.microsoft.com/download/7/0/3/703455ee-a747-4cc8-bd3e-98a615c3aedb/dotNetFx35setup.exe"; // 允许跨线程调用 // 实际开发中不建议这样做的,违背了.NET 安全规范 CheckForIllegalCrossThreadCalls = false; } private void btnDownLoad_Click(object sender, EventArgs e) { rtbContent.Text = "Download............"; if (tbUrl.Text == string.Empty) { MessageBox.Show("Please input valid download file url"); return; } AsyncMethodCaller methodCaller = new AsyncMethodCaller(DownLoadFileSync); methodCaller.BeginInvoke(tbUrl.Text.Trim(), GetResult, null); } // 同步下载文件的方法 // 该方法会阻塞主线程,使用户无法对界面进行操作 // 在文件下载完成之前,用户甚至都不能关闭运行的程序。 private string DownLoadFileSync(string url) { // Create an instance of the RequestState RequestState requestState = new RequestState(); try { // Initialize an HttpWebRequest object HttpWebRequest myHttpWebRequest = (HttpWebRequest)WebRequest.Create(url); // assign HttpWebRequest instance to its request field. requestState.request = myHttpWebRequest; requestState.response = (HttpWebResponse)myHttpWebRequest.GetResponse(); requestState.streamResponse = requestState.response.GetResponseStream(); int readSize = requestState.streamResponse.Read(requestState.BufferRead, 0, requestState.BufferRead.Length); while (readSize > 0) { requestState.filestream.Write(requestState.BufferRead, 0, readSize); readSize = requestState.streamResponse.Read(requestState.BufferRead, 0, requestState.BufferRead.Length); } // 执行该方法的线程是线程池线程,该线程不是与创建richTextBox控件的线程不是一个线程 // 如果不把 CheckForIllegalCrossThreadCalls 设置为false,该程序会出现“不能跨线程访问控件”的异常 return string.Format("The Length of the File is: {0}", requestState.filestream.Length) + string.Format("\nDownLoad Completely, Download path is: {0}", requestState.savepath); } catch (Exception e) { return string.Format("Exception occurs in DownLoadFileSync method, Error Message is:{0}", e.Message); } finally { //requestState.response.Close(); //requestState.filestream.Close(); } } // 异步操作完成时执行的方法 private void GetResult(IAsyncResult result) { AsyncMethodCaller caller = (AsyncMethodCaller)((AsyncResult)result).AsyncDelegate; // 调用EndInvoke去等待异步调用完成并且获得返回值 // 如果异步调用尚未完成,则 EndInvoke 会一直阻止调用线程,直到异步调用完成 string returnstring = caller.EndInvoke(result); //sc.Post(ShowState,resultvalue); rtbContent.Text = returnstring; } } }
当我们调用实现基于事件的异步模式的类的 XxxAsync方法时,即代表开始了一个异步操作,该方法调用完之后会使一个线程池线程去执行耗时的操作,所以当UI线程调用该方法时,当然也就不会堵塞UI线程了。并且基于事件的异步模式是建立了APM的基础之上的(这也是我在上一专题中详解介绍APM的原因),而APM又是建立了在委托之上的
二、基于事件的异步编程
基于事件的C#异步编程模式是比IAsyncResult模式更高级的一种异步编程模式,也被用在更多的场合。该异步模式具有以下优点:
· “在后台”执行耗时任务(例如下载和数据库操作),但不会中断您的应用程序。
· 同时执行多个操作,每个操作完成时都会接到通知(在通知中可以区分是完成了哪个操作)。
· 等待资源变得可用,但不会停止(“挂起”)您的应用程序。
· 使用熟悉的事件和委托模型与挂起的异步操作通信。
对于相对简单的应用程序可以直接用 .Net 2.0 新增的 BackgroundWorker 组件来很方便的实现,对于更复杂的异步应用程序则需要自己实现一个符合基于事件的C#异步编程模式的类。在实现基于事件的异步模式的设计前,需要了解基于事件的异步模式的实现原理是什么。基于事件的异步模式需要以下三个类型的帮助。
AsyncOperation:提供了对异步操作的生存期进行跟踪的功能,包括操作进度通知和操作完成通知,并确保在正确的线程或上下文中调用客户端的事件处理程序。
public void Post(SendOrPostCallback d,Object arg);
public void PostOperationCompleted(SendOrPostCallback d,Object arg);
通过在异步辅助代码中调用Post方法把进度和中间结果报告给用户,如果是取消异步任务或提示异步任务已完成,则通过调用PostOperationCompleted方法结束异步操作的跟踪生命期。在PostOperationCompleted方法调用后,AsyncOperation对象变得不再可用,再次访问将引发异常。在此有个问题:在该异步模式中,通过AsyncOperation的Post函数来通知进度的时候,是如何使SendOrPostCallback委托在UI线程上执行的?针对该问题下文有具体分析。
AsyncOperationManager:为AsyncOperation对象的创建提供了便捷方式,通过CreateOperation方法可以创建多个AsyncOperation实例,实现对多个异步操作进行跟踪。
WindowsFormsSynchronizationContext:该类继承自SynchronizationContext类型,提供 Windows 窗体应用程序模型的同步上下文。该类型是基于事件异步模式通信的核心。之所以说该类型是基于事件异步模式的通信核心,是因为该类型解决了“保证SendOrPostCallback委托在UI线程上执行”的问题。它是如何解决的?请看AsyncOperation类型的Post方法的实现:
/// <summary> /// AsyncOperation类型的Post方法的实现 /// </summary> public void Post(SendOrPostCallback d, object arg) { this.VerifyNotCompleted(); this.VerifyDelegateNotNull(d); this.syncContext.Post(d, arg); }
在AsyncOperation类型的Post方法中,直接调用了SynchronizationContext类型的Post方法,再看该Post方法的实现:
/// <summary> /// WindowsFormsSynchronizationContext类型的Post方法的实现 /// </summary> public override void Post(SendOrPostCallback d, object state) { if (this.controlToSendTo != null) { this.controlToSendTo.BeginInvoke(d, new object[] { state }); //此处保证了SendOrPostCallBack委托在UI线程上执行 } }
例子:
using System; using System.Collections.Generic; using System.Text; using System.ComponentModel; using System.Collections.Specialized; using System.Threading; namespace test { /// <summary> /// 任务1的进度通知代理 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> public delegate void Work1ProgressChangedEventHandler(object sender, Work1ProgressChangedEventArgs e); /// <summary> /// 任务1的进度通知参数 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> public delegate void Work1CompletedEventHandler(object sender, Work1CompletedEventArgs e); public class BasedEventAsyncWorker { private delegate void WorkerEventHandler(int maxNumber, AsyncOperation asyncOp); private HybridDictionary userStateToLifetime = new HybridDictionary(); public BasedEventAsyncWorker() { } #region DoWork1的基于事件的异步调用 public void DoWork1Async(object userState, int maxNumber) { AsyncOperation asyncOp = AsyncOperationManager.CreateOperation(userState); //userStateToLifetime有可能会同时被多线程访问,在此需要lock进行同步处理 lock (userStateToLifetime.SyncRoot) { if (userStateToLifetime.Contains(userState)) { throw new ArgumentException( "userState parameter must be unique", "userState"); } userStateToLifetime[userState] = asyncOp; } //异步开始任务1 WorkerEventHandler workerDelegate = new WorkerEventHandler(DoWork1); workerDelegate.BeginInvoke(maxNumber, asyncOp, null, null); } private void DoWork1(int maxNumber, AsyncOperation asyncOp) { Exception e = null; //判断该userState的任务仍在处理中 if (!TaskCanceled(asyncOp.UserSuppliedState)) { try { int n = 0; int percentage = 0; while (n < maxNumber && !TaskCanceled(asyncOp.UserSuppliedState)) { Thread.Sleep(100); //模拟耗时操作 percentage = (int)((float)n / (float)maxNumber * 100); Work1ProgressChangedEventArgs progressChanageArgs = new Work1ProgressChangedEventArgs(maxNumber, percentage, asyncOp.UserSuppliedState); //任务1的进度通知 asyncOp.Post(new SendOrPostCallback(Work1ReportProgressCB), progressChanageArgs); n++; } } catch (Exception ex) { e = ex; } } this.Work1Complete(e, TaskCanceled(asyncOp.UserSuppliedState), asyncOp); } private void Work1Complete(Exception exception, bool canceled, AsyncOperation asyncOp) { if (!canceled) { lock (userStateToLifetime.SyncRoot) { userStateToLifetime.Remove(asyncOp.UserSuppliedState); } } Work1CompletedEventArgs e = new Work1CompletedEventArgs(exception, canceled, asyncOp.UserSuppliedState); //通知指定的任务已经完成 asyncOp.PostOperationCompleted(new SendOrPostCallback(Work1CompleteCB), e); //调用 PostOperationCompleted 方法来结束异步操作的生存期。 //为某个特定任务调用此方法后,再调用其相应的 AsyncOperation 对象会引发异常。 } private void Work1ReportProgressCB(object state) { Work1ProgressChangedEventArgs e = state as Work1ProgressChangedEventArgs; OnWork1ProgressChanged(e); } private void Work1CompleteCB(object state) { Work1CompletedEventArgs e = state as Work1CompletedEventArgs; OnWork1Completed(e); } #region Work1的进度通知和任务完成的事件 public event Work1ProgressChangedEventHandler Work1ProgressChanged; protected virtual void OnWork1ProgressChanged(Work1ProgressChangedEventArgs e) { Work1ProgressChangedEventHandler temp = this.Work1ProgressChanged; if (temp != null) { temp(this, e); } } public event Work1CompletedEventHandler Work1Completed; protected virtual void OnWork1Completed(Work1CompletedEventArgs e) { Work1CompletedEventHandler temp = this.Work1Completed; if (temp != null) { temp(this, e); } } #endregion #endregion /// <summary> /// 取消指定userState的任务执行 /// </summary> /// <param name="userState"></param> public void CancelAsync(object userState) { AsyncOperation asyncOp = userStateToLifetime[userState] as AsyncOperation; if (asyncOp != null) { lock (userStateToLifetime.SyncRoot) { userStateToLifetime.Remove(userState); } } } /// <summary> /// 判断指定userState的任务是否已经被结束。返回值:true 已经结束; false 还没有结束 /// </summary> /// <param name="userState"></param> /// <returns></returns> private bool TaskCanceled(object userState) { return (userStateToLifetime[userState] == null); } } public class Work1ProgressChangedEventArgs :ProgressChangedEventArgs { private int totalWork = 1; public Work1ProgressChangedEventArgs(int totalWork, int progressPercentage, object userState) : base(progressPercentage, userState) { this.totalWork = totalWork; } /// <summary> /// Work1的总工作量 /// </summary> public int TotalWork { get { return totalWork; } } } public class Work1CompletedEventArgs : AsyncCompletedEventArgs { public Work1CompletedEventArgs(Exception e, bool canceled, object state) : base(e, canceled, state) { } } }
三、基于任务的异步编程
基于任务的异步模式(Task-based Asynchronous Pattern,TAP)之所以被微软所推荐,主要就它使用简单,基于任务的异步模式使用单个方法来表示异步操作的开始和完成,然而异步编程模型(APM)却要求BeginXxx和EndXxx两个方法来分别表示异步操作的开始和完成(这样使用起来就复杂了),然而,基于事件的异步模式(EAP)要求具有Async后缀的方法和一个或多个事件、事件处理程序和事件参数。看到这里,是不是大家都有这样一个疑问的——我们怎样区分.NET类库中的类实现了基于任务的异步模式呢? 这个识别方法很简单,当看到类中存在TaskAsync为后缀的方法时就代表该类实现了TAP, 并且基于任务的异步模式同样也支持异步操作的取消和进度的报告的功能,但是这两个实现都不像EAP中实现的那么复杂,因为如果我们要自己实现EAP的类,我们需要定义多个事件和事件处理程序的委托类型和事件的参数(具体可以查看上一专题中的BackgroundWorker剖析部分),但是在TAP实现中,我们只需要通过向异步方法传入CancellationToken参数,因为在异步方法内部会对这个参数的IsCancellationRequested属性进行监控,当异步方法收到一个取消请求时,异步方法将会退出执行(具体这点可以使用反射工具查看WebClient的DownloadDataTaskAsync方法,同时也可以参考我后面部分自己实现基于任务的异步模式的异步方法。),在TAP中,我们可以通过IProgress<T>接口来实现进度报告的功能,具体实现可以参考我后面的程序部分。
目前我还没有找到在.NET 类库中实现了基于任务的异步模式的哪个类提供进度报告的功能,下面的将为大家演示这个实现,并且也是这个程序的亮点,同时通过自己实现TAP的异步方法来进一步理解基于任务的异步模式。
如何基于任务异步编程
// Download File // CancellationToken 参数赋值获得一个取消请求 // progress参数负责进度报告 private void DownLoadFile(string url, CancellationToken ct, IProgress<int> progress) { HttpWebRequest request = null; HttpWebResponse response = null; Stream responseStream = null; int bufferSize = 2048; byte[] bufferBytes = new byte[bufferSize]; try { request = (HttpWebRequest)WebRequest.Create(url); if (DownloadSize != 0) { request.AddRange(DownloadSize); } response = (HttpWebResponse)request.GetResponse(); responseStream = response.GetResponseStream(); int readSize = 0; while (true) { // 收到取消请求则退出异步操作 if (ct.IsCancellationRequested == true) { MessageBox.Show(String.Format("下载暂停,下载的文件地址为:{0}\n 已经下载的字节数为: {1}字节", downloadPath, DownloadSize)); response.Close(); filestream.Close(); sc.Post((state) => { this.btnStart.Enabled = true; this.btnPause.Enabled = false; }, null); // 退出异步操作 break; } readSize = responseStream.Read(bufferBytes, 0, bufferBytes.Length); if (readSize > 0) { DownloadSize += readSize; int percentComplete = (int)((float)DownloadSize / (float)totalSize * 100); filestream.Write(bufferBytes, 0, readSize); // 报告进度 progress.Report(percentComplete); } else { MessageBox.Show(String.Format("下载已完成,下载的文件地址为:{0},文件的总字节数为: {1}字节", downloadPath, totalSize)); sc.Post((state) => { this.btnStart.Enabled = false; this.btnPause.Enabled = false; }, null); response.Close(); filestream.Close(); break; } } } catch (AggregateException ex) { // 因为调用Cancel方法会抛出OperationCanceledException异常 // 将任何OperationCanceledException对象都视为以处理 ex.Handle(e => e is OperationCanceledException); } }
四、Async和await关键字(C#5.0,.net4.5以后)
”async”和”await”:你用我用,才有用;你用我不用,等于没用。
.net异步编程の-------异步编程模型(APM)