| 
| 技术资料  > .Net专区 > C#语言 : C#消息队列应用程序 -2 |  
C#消息队列应用程序 -2 March 25,2004 |  
  在这个数组内部,CWorker 类创建了 CWorkerThread类的一个实现版  
本。CWorkerThread 类(将在下面讨论)是一个必须继承的抽象类。导出  
类定义了消息的处理方式:  
aThreads = new ArrayList();  
for (int idx=0; idx〈sfWorker.NumberThreads; idx++)  
{  
  WorkerThreadFormatter wfThread = new WorkerThreadFormatter();  
  wfThread.ProcessName = sfWorker.ProcessName;  
  wfThread.ProcessDesc = sfWorker.ProcessDesc;  
  wfThread.ThreadNumber = idx;  
  wfThread.InputQueue = sfWorker.InputQueue;  
  wfThread.ErrorQueue = sfWorker.ErrorQueue;  
  wfThread.OutputName = sfWorker.OutputName;  
  // 定义辅助类型,并将其插入辅助线程结构  
  CWorkerThread wtBase;  
  switch (sfWorker.ProcessType)  
  {  
   case WorkerFormatter.SFProcessType.ProcessRoundRobin:  
     wtBase = new CWorkerThreadRoundRobin(this, wfThread);  
     break;  
   case WorkerFormatter.SFProcessType.ProcessAppSpecific:  
     wtBase = new CWorkerThreadAppSpecific(this, wfThread);  
     break;  
   case WorkerFormatter.SFProcessType.ProcessAssembly:  
     wtBase = new CWorkerThreadAssembly(this, wfThread);  
     break;  
   default:  
     throw new Exception("Unknown Processing Type");  
  }  
  // 添加对数组的调用  
  aThreads.Insert(idx, wtBase);  
}  
 
  一旦所有的对象都已创建,就可以通过调用每个线程对象的 Start方  
法来启动它们:  
foreach(CWorkerThread cThread in aThreads)  
  cThread.Start();  
 
  Stop、Pause 和 Continue 方法在 foreach循环里执行的操作类似。  
Stop方法具有如下的垃圾收集操作:  
GC.SuppressFinalize(this);  
 
  在类析构函数中将调用 Stop 方法,这样,在没有显式调用 Stop 方  
法的情况下也可以正确地终止对象。如果调用了 Stop 方法,将不需要析  
构函数。SuppressFinalize方法能够防止调用对象的 Finalize 方法(析  
构函数的实际实现)。  
 
CWorkerThread 抽象类  
 
  CWorkerThread 是一个由 CWorkerThreadAppSpecifc、CWorkerThread  
RoundRobin 和 CWorkerThreadAssembly继承的抽象类。无论如何处理消  
息,队列的大部分处理是相同的,所以 CWorkerThread类提供了这一功能。  
这个类提供了抽象方法(必须被实际方法替代)以管理资源和处理消息。  
 
  类的工作再一次通过 Start、Stop、Pause 和 Continue 方法来实现。  
在 Start方法中引用了输入和错误队列。在 .NET 框架中,消息由 System.  
Messaging 名称空间处理:  
// 尝试打开队列,并设置默认的读写属性  
MessageQueue mqInput = new MessageQueue(sInputQueue);  
mqInput.MessageReadPropertyFilter.Body = true;  
mqInput.MessageReadPropertyFilter.AppSpecific = true;  
MessageQueue mqError = new MessageQueue(sErrorQueue);  
// 如果使用 MSMQ COM,则将格式化程序设置为 ActiveX  
mqInput.Formatter = new ActiveXMessageFormatter();  
mqError.Formatter = new ActiveXMessageFormatter();  
 
  一旦定义了消息队列引用,即会创建一个线程用于实际的处理函数  
(称为 ProcessMessages)。在 .NET 框架中,使用 System.Threading  
名称空间很容易实现线程处理:  
procMessage = new Thread(new ThreadStart(ProcessMessages));  
procMessage.Start();  
 
  ProcessMessages 函数是基于 Boolean值的处理循环。当数值设为  
False,处理循环将终止。因此,线程对象的 Stop 方法只设置这一Boolean  
值,然后关闭打开的消息队列,并加入带有主线程的线程:  
// 加入服务线程和处理线程  
bRun = false;  
procMessage.Join();  
// 关闭打开的消息队列  
mqInput.Close();  
mqError.Close();  
 
Pause 方法只设置一个 Boolean 值,使处理线程休眠半秒钟:  
 
if (bPause)  
  Thread.Sleep(500);  
 
  最后,每一个 Start、Stop、Pause 和 Continue 方法将调用抽象的  
OnStart 、OnStop、OnPause 和 OnContinue 方法。这些抽象方法为实现  
的类提供了挂钩,以捕获和释放所需的资源。  
 
  ProcessMessages 循环具有如下基本结构:  
●接收Message。  
●如果Message具有成功的Receive,则调用抽象ProcessMessage方法。  
●如果Receive或ProcessMessage失败,将Message发送至错误队列中。  
 
Message mInput;  
try  
{  
  // 从队列中读取,并等候 1 秒  
  mInput = mqInput.Receive(new TimeSpan(0,0,0,1));  
}  
catch (MessageQueueException mqe)  
{  
  // 将消息设置为 null  
  mInput = null;  
  // 查看错误代码,了解是否超时  
  if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B  
  {  
   // 如果未超时,发出一个错误并记录错误号  
   LogError("Error: " + mqe.Message);  
   throw mqe;  
  }  
}  
if (mInput != null)  
{  
  // 得到一个要处理的消息,调用处理消息抽象方法  
  try  
  {  
   ProcessMessage(mInput);  
  }  
  // 捕获已知异常状态的错误  
  catch (CWorkerThreadException ex)  
  {  
   ProcessError(mInput, ex.Terminate);  
  }  
  // 捕获未知异常,并调用 Terminate  
  catch  
  {  
   ProcessError(mInput, true);  
  }  
}  
 
  ProcessError方法将错误的消息发送至错误队列。另外,它也可能引  
发异常来终止线程。如果ProcessMessage方法引发了终止错误或 CWorker  
ThreadException类型,它将执行此操作。  
 
CworkerThread 导出类  
 
  任何从 CWorkerThread中继承的类都必须提供 OnStart、OnStop、On  
Pause、OnContinue和 ProcessMessage 方法。OnStart 和 OnStop方法获  
取并释放处理资源。OnPause 和 OnContinue 方法允许临时释放和重新获  
取这些资源。ProcessMessage方法应该处理消息,并在出现失败事件时引  
发 CWorkerThreadException 异常。  
 
  由于 CWorkerThread构造函数定义运行时参数,导出类必须调用基类  
构造函数:  
public CWorkerThreadDerived(CWorker v_cParent, WorkerThread  
Formatter v_wfThread)  
  : base (v_cParent, v_wfThread) {}  
 
  导出类提供了两种类型的处理:将消息发送至另一队列,或者调用组  
件方法。接收和发送消息的两种实现使用了循环技术或应用程序偏移(保  
留在消息 AppSpecific属性中),作为使用哪一队列的决定因素。此方案  
中的配置文件应该包括队列路径的列表。实现的 OnStart和 OnStop 方法  
应该打开和关闭对这些队列的引用:  
iQueues = wfThread.OutputName.Length;  
mqOutput = new MessageQueue[iQueues];  
for (int idx=0; idx〈iQueues; idx++)  
{  
  mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]);  
  mqOutput[idx].Formatter = new ActiveXMessageFormatter();  
}  
 
  在这些方案中,消息的处理很简单:将消息发送必要的输出队列。在  
循环情况下,这个进程为:  
try  
{  
  mqOutput[iNextQueue].Send(v_mInput);  
}  
catch (Exception ex)  
{  
  // 如果错误强制终止异常  
  throw new CWorkerThreadException(ex.Message, true);  
}  
// 计算下一个队列号  
iNextQueue++;  
iNextQueue %= iQueues;  
 
  后一种调用带消息参数的组件的实现方法比较有趣。ProcessMessage  
方法使用 IWebMessage接口调入一个 .NET 组件。OnStart 和 OnStop 方  
法获取和释放此组件的引用。  
 
  此方案中的配置文件应该包含两个项目:完整的类名和类所在文件的  
位置。按照 IWebMessage接口中的定义,在组件上调用 Process方法。  
 
  要获取对象引用,需要使用 Activator.CreateInstance 方法。此函  
数需要一个程序集类型。在这里,它是从程序集文件路径和类名中导出的。  
一旦获取对象引用,它将被放入合适的接口:  
private IWebMessage iwmSample;  
private string sFilePath, sTypeName;  
// 保存程序集路径和类型名称  
sFilePath = wfThread.OutputName[0];  
sTypeName = wfThread.OutputName[1];  
// 获取对必要对象的引用  
Assembly asmSample = Assembly.LoadFrom(sFilePath);  
Type typSample = asmSample.GetType(sTypeName);  
object objSample = Activator.CreateInstance(typSample);  
// 定义给对象的必要接口  
iwmSample = (IWebMessage)objSample;  
 
  获取对象引用后,ProcessMessage方法将在 IWebMessage接口上调用  
Process 方法:  
WebMessageReturn wbrSample;  
try  
{  
  // 定义方法调用的参数  
  string sLabel = v_mInput.Label;  
  string sBody = (string)v_mInput.Body;  
  int iAppSpecific = v_mInput.AppSpecific;  
  // 调用方法并捕捉返回代码  
  wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific);  
}  
catch (InvalidCastException ex)  
{  
  // 如果在消息内容中发生错误,则强制发出一个非终止异常  
  throw new CWorkerThreadException(ex.Message, false);  
}  
catch (Exception ex)  
{  
  // 如果错误调用程序集,则强制发出终止异常  
  throw new CWorkerThreadException(ex.Message, true);  
}  
// 如果没有错误,则检查对象调用的返回状态  
switch (wbrSample)  
{  
  case WebMessageReturn.ReturnBad:  
   throw new CWorkerThreadException  
     ("Unable to process message: Message marked bad", false);  
  case WebMessageReturn.ReturnAbort:  
   throw new CWorkerThreadException  
     ("Unable to process message: Process terminating", true);  
  default:  
   break;  
}  
 
  提供的示例组件将消息正文写入数据库表。如果捕获到严重数据库错  
误,您可能希望终止处理过程,但是在这里,仅仅将消息标记为错误的消  
息。  
 
  由于此示例中创建的类实例可能会获取并保留昂贵的数据库资源,所  
以用 OnPause和 OnContinue 方法释放和重新获取对象引用。  
 
检测设备  
 
  就象在所有优秀的应用程序中一样,检测设备用于监测应用程序的状  
态。。NET 框架大大简化了将事件日志、性能计数器和 Windows管理检测  
设备(WMI )纳入应用程序的过程。消息应用程序使用时间日志和性能计  
数器,二者都是来自 System.Diagnostics 程序集。  
 
  在 ServiceBase类中,您可以自动启用事件日志。另外,ServiceBase  
EventLog成员支持写入应用程序事件日志:  
EventLog.WriteEntry(sMyMessage, EventLogEntryType.Information);  
 
  对于写入事件日志而不是应用程序日志的应用程序,它能够很容易地  
创建和获取 EventLog 资源的引用(正如在 CWorker类中所做的一样),  
并能够使用 WriteEntry 方法记录日志项:  
private EventLog cLog;  
string sSource = ServiceControl.ServiceControlName;  
string sLog = "Application";  
// 查看源是否存在,如果不存在,则创建源  
if (!EventLog.SourceExists(sSource))  
  EventLog.CreateEventSource(sSource, sLog);  
// 创建日志对象,并引用现在定义的源  
cLog = new EventLog();  
cLog.Source = sSource;  
// 在日志中写入条目,表明创建成功  
cLog.WriteEntry("已成功创建", EventLogEntryType.Information);  
 
  .NET 框架大大简化了性能计数器。对于每一个处理线程、线程导出  
的用户和整个应用程序,这一消息应用程序都能提供计数器,用于跟踪消  
息数量和每秒钟处理消息的数量。要提供此功能,您需要定义性能计数器  
的类别,然后增加相应的计数器实例。  
 
  性能计数器的类别在服务 OnStart方法中定义。这些类别代表两种计  
数器——消息总数和每秒钟处理的消息数:  
CounterCreationData[] cdMessage = new CounterCreationData[2];  
cdMessage[0] = new CounterCreationData("Messages/Total", "Total  
Messages Processed",  
PerformanceCounterType.NumberOfItems64);  
cdMessage[1] = new CounterCreationData("Messages/Second",  
"Messages Processed a Second",  
PerformanceCounterType.RateOfChangePerSecond32);  
PerformanceCounterCategory.Create("MSDN Message Service", "MSDN  
Message Service Counters", cdMessage);  
 
  一旦定义了性能计数器类别,将创建 PerformanceCounter 对象以访  
问计数器实例功能。PerformanceCounter对象需要类别、计数器名称和一  
个可选的实例名称。对于辅助进程,将使用来自 XML文件的进程名称,代  
码如下:  
pcMsgTotWorker = new PerformanceCounter("MSDN Message Service",  
"Messages/Total", sProcessName);  
pcMsgSecWorker = new PerformanceCounter("MSDN Message Service",  
"Messages/Second", sProcessName);  
pcMsgTotWorker.RawValue = 0;  
pcMsgSecWorker.RawValue = 0;  
 
要增加计数器的值,仅仅需要调用适当的方法:  
 
pcMsgTotWorker.IncrementBy(1);  
pcMsgSecWorker.IncrementBy(1);  
 
最后说明一点,服务终止时,安装的性能计数器类别应该从系统中删除:  
 
PerformanceCounterCategory.Delete("MSDN Message Service");  
 
  由于性能计数器在 .NET 框架中工作,因此需要运行一项特殊的服务。  
此服务(PerfCounterService)提供了共享内存。计数器信息将写入共享  
内存,并被性能计数器系统读取。  
 
安装  
 
  在结束以前,我们来简要介绍一下安装以及称为 installutil.exe的  
安装工具。由于此应用程序是 Windows服务,它必须使用installutil.exe  
来安装。因此,需要使用一个从 System.Configuration.Install 程序集  
中继承的 Installer类:  
public class ServiceRegister: Installer  
{  
  private ServiceInstaller serviceInstaller;  
  private ServiceProcessInstaller processInstaller;  
  public ServiceRegister()  
  {  
   // 创建服务安装程序  
   serviceInstaller = new ServiceInstaller();  
   serviceInstaller.StartType = ServiceStart.Manual;  
   serviceInstaller.ServiceName = ServiceControl.ServiceControl  
   Name;  
   serviceInstaller.DisplayName = ServiceControl.ServiceControl  
   Desc;  
   Installers.Add(serviceInstaller);  
   // 创建进程安装程序  
   processInstaller = new ServiceProcessInstaller();  
   processInstaller.RunUnderSystemAccount = true;  
   Installers.Add(processInstaller);  
  }  
}  
 
  如此示例类所示,对于一个 Windows服务,服务和服务进程各需要一  
个安装程序,以定义运行服务的帐户。其他安装程序允许注册事件日志和  
性能计数器等资源。  
 
总结  
 
  从这个 .NET 框架应用程序示例中可以看出,以前只有 Visual C++  
程序员能够编写的应用程序,现在使用简单的面向对象程序即可实现。尽  
管我们的重点是 C# ,但本文所述的内容也同样适用于 Visual Basic 和  
Managed C++.新的 .NET 框架使开发人员能够使用任何编程语言来创建功  
能强大、可伸缩的 Windows应用程序和服务。  
 
  新的 .NET 框架不仅简化和扩展了编程的种种可能,还能够轻松地将  
人们经常遗忘的应用程序检测设备(例如性能监测计数器和事件日志通知)  
合并到应用程序中。尽管这里的应用程序没有使用 Windows管理检测设备  
(WMI ),但 .NET 框架同样也可以应用它。  
         
         |  
 
 |