
 
 | 
| 技术资料  > .Net专区 > C#语言 : C#消息队列应用程序 -1 |  
C#消息队列应用程序 -1 March 25,2004 |  
简介  
 
  Microsoft近期推出一种用于生成集成应用程序的新平台——Microsoft  
.NET框架。.NET 框架允许开发人员使用任何编程语言迅速生成和部署Web  
服务和应用程序。Microsoft Intermediate Language (MSIL)和实时  
(JIT )编译器使这种不依赖语言的框架得以实现。  
 
  与.NET框架同时面世的还有一种新的编程语言C#(读“C sharp”)。  
C#是一种简单、新颖、面向对象和类型安全的编程语言。利用 .NET 框架  
和 C# (除 Microsoft? Visual Basic ?和 Managed C++之外),用户  
可以编写功能强大的 Microsoft Windows?和 Web应用程序及服务。本文  
提供了这样的一个解决方案,它的重点是 .NET 框架和 C# 而不是编程语  
言。C#语言的介绍可以在“ C# 简介和概述(英文)”找到。  
 
  近期的文章“MSMQ:可伸缩、高可用性的负载平衡解决方案(英文)”  
介绍了一种解决方案,用于高可用性消息队列(MSMQ)的可伸缩负载平衡  
解决方案体系结构。此解决方案中涉及了一种将 Windows服务用作智能消  
息路由器的开发方案。这样的解决方案以前只有 Microsoft Visual C++  
程序员才能实现,而 .NET 框架的出现改变了这种情况。从下面的解决方  
案中,您可以看到这一点。  
 
.NET 框架应用程序  
 
  这里介绍的解决方案是一种用来处理若干消息队列的 Windows服务;  
其中每个队列都是由多个线程进行处理(接收和处理消息)。处理程序使  
用循环法技术或应用程序特定值(消息 AppSpecific属性)从目的队列列  
表中路由消息,并使用消息属性来调用组件方法。(示例进程也属于这种  
情况。)在后一种情况下,组件的要求是它能够实现给定的接口IWeb  
Message要处理错误,应用程序需要将不能处理的消息发送到错误队列中。  
 
  消息应用程序的结构与以前的活动模板库(ATL )应用程序相似,它  
们之间的主要不同在于用于管理服务的代码的封装和 .NET 框架组件的使  
用。要创建Windows服务,.NET框架用户仅仅需要创建一个从 ServiceBase  
(来自System.ServiceControl程序集)继承的类。这毫不奇怪,因为.NET  
框架是面向对象的。  
 
应用程序结构  
 
  应用程序中主要的类是 ServiceControl ,它是从 ServiceBase继承  
的。因而,它必须实现 OnStart和 OnStop 方法,以及可选的 OnPause和  
OnContinue方法。事实上,类是在静态方法 Main 内构造的:  
 
using System;  
using System.ServiceProcess;  
 
public class ServiceControl: ServiceBase  
{  
  // 创建服务对象的主入口点  
  public static void Main()  
  {  
   ServiceBase.Run(new ServiceControl());  
  }  
 
  // 定义服务参数的构造对象  
  public ServiceControl()  
  {  
   CanPauseAndContinue = true;  
   ServiceName = "MSDNMessageService";  
   AutoLog = false;  
  }  
 
  protected override void OnStart(string[] args) {...}  
  protected override void OnStop() {...}  
  protected override void OnPause() {...}  
  protected override void OnContinue() {...}  
}  
 
  ServiceControl类创建一系列 CWorker对象,即,为需要处理的每个  
消息队列创建 CWorker类的一个实例。根据定义中处理队列所需的线程数  
目,CWorker 类依次创建了一系列的 CWorkerThread对象。CWorkerThread  
类创建的一个处理线程将执行实际的服务工作。  
 
  使用 CWorker和 CWorkerThread类的主要目的是确认服务控件 Start、  
Stop、Pause 和 Continue 命令。因为这些进程必须是无阻塞的,命令操  
作最终将在后台处理线程上执行。  
 
  CWorkerThread 是一个抽象类,被 CWorkerThreadAppSpecific 、  
CWorkerThreadRoundRobin 和 CWorkerThreadAssembly继承。这些类以不  
同的方式处理消息。前两个类通过给另一队列发送消息来处理消息(其不  
同之处在于确定接收队列路径的方式),最后一个类则使用消息属性来调  
用组件方法。  
 
  .NET 框架内部的错误处理是以基类 Exception为基础的。当系统引  
发或捕获错误时,这些错误必须是从 Exception中导出的类。CWorker  
ThreadException 类就是这样一种实现,它通过附加额外属性(用于定义  
服务是否应继续运行)来扩展基类。  
 
  最后,应用程序包含两种结构。这些值类型定义了辅助进程或线程的  
运行时参数,以简化 CWorker和 CWorkerThread对象的结构。使用值类型  
结构(而不是引用类型类)能够确保这些运行时参数维护的是数值(而不  
是引用)。  
 
IWebMessage 接口  
 
  CWorkerThread 的实现之一是一个调用组件方法的类。这个名为  
CWorkerThreadAssembly 的类使用 IWebMessage接口来定义服务和组件之  
间的约定。  
 
  与当前版本的 Microsoft Visual Studio?不同,C#接口可以在任何  
语言中显式定义,而不需要创建和编译 IDL文件。C# IWebMessage接口的  
定义如下:  
public interface IWebMessage  
{  
  WebMessageReturn Process(string sMessageLabel, string sMessage  
  Body, int iAppSpecific);  
  void Release();  
}  
 
ATL 代码中的 Process 方法是为处理消息而指定的。Process 方法的返  
回代码定义为枚举类型 WebMessageReturn:  
 
public enum WebMessageReturn  
{  
  ReturnGood,  
  ReturnBad,  
  ReturnAbort  
}  
 
  枚举的定义如下:Good表示继续处理,Bad 表示将消息写入错误队列,  
Abort 表示终止处理。Release 方法为服务提供了轻松清除类实例的途径。  
因为仅在垃圾回收的过程中才调用类实例的析构函数,所以确保所有占用  
昂贵资源(例如数据库连接)的类都有一个能够在析构之前被调用的方法,  
用来释放这些资源,这是一种非常好的构思。  
 
名称空间  
 
  在这里先简单介绍一下名称空间。名称空间允许在内部和外部表示中  
将应用程序组织成为逻辑元素。服务内的所有代码都包含在 MSDNMessage  
Service.Service 名称空间内。尽管服务代码包含在若干文件中,但是由  
于它们包含在同一名称空间中,因此用户不需要引用其他文件。  
 
  由于 IWebMessage接口包含在 MSDNMessageService.Interface 名称  
空间中,因此使用此接口的线程类具有一个接口名称空间。  
 
服务类  
 
  应用程序的目的是监视和处理消息队列,每一队列在收到消息时都执  
行不同的进程。应用程序是作为 Windows服务来实现的。  
 
ServiceBase 类  
 
  如前所述,服务的基本结构是从 ServiceBase继承的类。重要的方法  
包括 OnStart、OnStop、OnPause 和 OnContinue ,每一个替代方法都与  
一个服务控制操作直接对应。OnStart 方法的目的是创建 CWorker对象,  
而 CWorker类又创建 CWorkerThread对象,然后在该对象中创建执行服务  
工作的线程。  
 
  服务的运行时配置(以及 CWorker和 CWorkerThread对象的属性)是  
在基于 XML的配置文件中维护的。它的名称与创建的 .exe 文件相同,但  
带有一个 .cfg 后缀。配置示例如下:  
〈?xml version="1.0"?〉  
〈configuration〉  
〈ProcessList〉  
 〈ProcessDefinition  
    ProcessName="Worker1"  
    ProcessDesc="Message Worker with 2 Threads"  
    ProcessType="AppSpecific"  
    ProcessThreads="2"  
    InputQueue=".private$test_load1"  
    ErrorQueue=".private$test_error"〉  
  〈OutputList〉  
   〈OutputDefinition OutputName=".private$test_out11" /〉  
   〈OutputDefinition OutputName=".private$test_out12" /〉  
  〈/OutputList〉  
 〈/ProcessDefinition〉  
 〈ProcessDefinition  
    ProcessName="Worker2"  
    ProcessDesc="Assembly Worker with 1 Thread"  
    ProcessType="Assembly"  
    ProcessThreads="1"  
    InputQueue=".private$test_load2"  
    ErrorQueue=".private$test_error"〉  
  〈OutputList〉  
   〈OutputDefinition OutputName="C:MSDNMessageServiceMessage  
   Example.dll" /〉  
   〈OutputDefinition OutputName="MSDNMessageService.Message  
   Sample.ExampleClass"/〉  
  〈/OutputList〉  
 〈/ProcessDefinition〉  
〈/ProcessList〉  
〈/configuration〉  
 
  对此信息的访问通过来自 System.Configuration 程序集的 Config  
Manager 类来管理。静态 Get方法返回信息的集合,这些集合将被枚举以  
获得单个属性。这些属性集的设置决定了辅助对象的运行时特征。除了这  
一配置文件,您还应该创建定义 XML文件结构的图元文件,并在其中引用  
位于服务器 machine.cfg配置文件中的图元文件:  
 
〈?xml version ="1.0"?〉  
〈MetaData xmlns="x-schema:CatMeta.xms"〉  
  〈DatabaseMeta InternalName="MessageService"〉  
  〈ServerWiring Interceptor="Core_XMLInterceptor"/〉  
  〈Collection  
     InternalName="Process" PublicName="ProcessList"  
     PublicRowName="ProcessDefinition"  
     SchemaGeneratorFlags="EMITXMLSCHEMA"〉  
   〈Property InternalName="ProcessName" Type="String" Meta  
   Flags="PRIMARYKEY" /〉  
   〈Property InternalName="ProcessDesc" Type="String" /〉  
   〈Property InternalName="ProcessType" Type="Int32" Default  
   Value="RoundRobin" 〉  
     〈Enum InternalName="RoundRobin" Value="0"/〉  
     〈Enum InternalName="AppSpecific" Value="1"/〉  
     〈Enum InternalName="Assembly" Value="2"/〉  
   〈/Property〉  
   〈Property InternalName="ProcessThreads" Type="Int32"  
   DefaultValue="1" /〉  
   〈Property InternalName="InputQueue" Type="String" /〉  
   〈Property InternalName="ErrorQueue" Type="String" /〉  
   〈Property InternalName="OutputName" Type="String" /〉  
   〈QueryMeta InternalName="All" MetaFlags="ALL" /〉  
   〈QueryMeta InternalName="QueryByFile" CellName="__FILE"  
   Operator="EQUAL" /〉  
  〈/Collection〉  
  〈Collection  
     InternalName="Output" PublicName="OutputList"  
     PublicRowName="OutputDefinition"  
     SchemaGeneratorFlags="EMITXMLSCHEMA"〉  
   〈Property InternalName="ProcessName" Type="String" Meta  
   Flags="PRIMARYKEY" /〉  
   〈Property InternalName="OutputName" Type="String" Meta  
   Flags="PRIMARYKEY" /〉  
   〈QueryMeta InternalName="All" MetaFlags="ALL" /〉  
   〈QueryMeta InternalName="QueryByFile" CellName="__FILE"  
   Operator="EQUAL" /〉  
  〈/Collection〉  
  〈/DatabaseMeta〉  
  〈RelationMeta    
   PrimaryTable="Process" PrimaryColumns="ProcessName"  
   ForeignTable="Output" ForeignColumns="ProcessName"  
   MetaFlags="USECONTAINMENT"/〉  
〈/MetaData〉  
 
  由于 Service类必须维护一个已创建辅助对象的列表,因此使用了  
Hashtable 集合,用于保持类型对象的名称/ 数值对列表。Hashtable 不  
仅支持枚举,还允许通过关键字来查询值。在应用程序中,XML 进程名称  
是唯一的关键字:  
private Hashtable htWorkers = new Hashtable();  
IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new  
AppDomainSelector());  
foreach (IConfigItem ciWorker in cWorkers)  
{  
  WorkerFormatter sfWorker = new WorkerFormatter();  
  sfWorker.ProcessName = (string)ciWorker["ProcessName"];  
  sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"];  
  sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"];  
  sfWorker.InputQueue = (string)ciWorker["InputQueue"];  
  sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"];  
  // 计算并定义进程类型  
  switch ((int)ciWorker["ProcessType"])  
  {  
   case 0:  
     sfWorker.ProcessType = WorkerFormatter.SFProcessType.  
     ProcessRoundRobin;  
     break;  
   case 1:  
     sfWorker.ProcessType = WorkerFormatter.SFProcessType.  
     ProcessAppSpecific;  
     break;  
   case 2:  
     sfWorker.ProcessType = WorkerFormatter.SFProcessType.  
     ProcessAssembly;  
     break;  
   default:  
     throw new Exception("Unknown Processing Type");  
  }  
  // 执行更多的工作以读取输出信息  
  string sProcessName = (string)ciWorker["ProcessName"];  
  if (htWorkers.ContainsKey(sProcessName))  
   throw new ArgumentException("Process Name Must be Unique: "  
   + sProcessName);  
  htWorkers.Add(sProcessName, new CWorker(sfWorker));  
}  
 
  在这段代码中没有包含的主要信息是输出数据的获取。每一个进程定  
义中都有一组相应的输出定义项。该信息是通过如下的简单查询读取的:  
 
string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" +  
  sfWorker.ProcessName + " AND Selector=appdomain://";  
ConfigQuery qQuery = new ConfigQuery(sQuery);  
IConfigCollection cOutputs = ConfigManager.Get("OutputList",  
qQuery);  
int iSize = cOutputs.Count, iLoop = 0;  
sfWorker.OutputName = new string[iSize];  
foreach (IConfigItem ciOutput in cOutputs)  
  sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"];  
 
  CWorkerThread 和 Cworker类都有相应的服务控制方法,根据服务控  
制操作进行调用。由于 Hashtable中引用了每一个 CWorker对象,因此需  
要枚举 Hashtable的内容,以调用适当的服务控制方法:  
foreach (CWorker cWorker in htWorkers.Values)  
  cWorker.Start();  
 
  类似地,实现的 OnPause、OnContinue和 OnStop 方法是通过调用  
CWorker 对象上的相应方法来执行操作的。  
 
CWorker 类  
 
  CWorker 类的主要功能是创建和管理 CWorkerThread对象。Start 、  
Stop、Pause 和 Continue 方法调用相应的 CWorkerThread方法。实际的  
CWorkerThread 对象是在Start 方法中创建的。与使用 Hashtable管理辅  
助对象引用的 Service类相似,CWorker 使用 ArrayList(简单的动态数  
组)来维护线程对象的列表。  
         
         |  
 
 | 
  
Copyright © 2001-2008 Shenzhen Hiblue Software Team All rights reserved