文档章节

【CEP】重构和改进HelloInsightObservable

realsa
 realsa
发布于 2014/12/25 20:40
字数 1474
阅读 109
收藏 0

上一篇 提到的官方源码也可以在我的共享(http://pan.baidu.com/s/1qWqKe5Y)下载。不了解观察者设计模式的读者阅读源码会有意义点困难(比如我),可以参考(.NET设计模式(19):观察者模式(Observer Pattern))入门。

1、官方源码的不足

HelloInsightObservable官方源码利用toToPointStream方法将观察者的实例转化为点事件流,接着在点事件流中使用linq查询e>50的输入,并将其输出

运行结果如下:

其不足之处在于代码有点混乱,而且只有一个观察者。

接下来本文就逐步修改,并且实现多个观察者的情况。

program.cs中定义观察者,将观察者“订阅”到目标对象的语句如下:

var outputObserver = new OutputObserver();

var outputObservable = query.ToObservable();//将事件流转化为可观察的输出               

outputObservable.Subscribe(outputObserver);//提供通知信息到outputObserver

那容易想到的思路是直接在program.cs中添加多个观察者,再使用Subscribe方法订阅多个观察者。但是输出每次都有变动,由于不同观察者输出一样也看不出明显规律,偶尔还会由于枚举观察者的过程中观察者集合变动而产生异常

这是因为InputObservable.cs中模拟输入流的GenerateInput是Timer的回调函数。每一个观察者在运行之后都会将Timer设为停止状态,别的观察者在Timer已经启动的情况下加入不是很恰当。令人奇怪的是官方源码在InputObservable. cs的构造函数中启动了Timer,既然没打算添加多个观察者,那在GenerateInput中遍历观察者集合Observers的语句有什么意义?

2、修改OutputObserver,添加name属性

2.1、新建项目C#控制台应用HelloInsight_edit

添加如下引用:

Microsoft.ComplexEventProcessing;

Microsoft.ComplexEventProcessing.Observable;

System.Reactive;

System.Reactive.Providers;

2.2、实现接口IObserver<int>

namespace HelloInsight_edit
{ 
    public class OutputObserver:IObserver<int>//实现IObserver接口
    {
        private string name;
        public OutputObserver(string name){
            this.name = name;
        }
        public virtual void OnCompleted()
        {
            Console.WriteLine("Stopping query...");
        }
        public virtual void OnError(Exception e)
        {
            Console.WriteLine("Unexpected error occured");
        }
        public virtual void OnNext(int value)
        {
            Console.WriteLine("{0}观察到的value: {1}", this.name,value);
        }        
    }
}

为简单起见,IObserver的抽象类型都使用int型,以后Main方法创建事件流的时候也会相应修改。

3、修改事件源,实现IObservable接口

我们要删掉构造方法中的timer.change(timeSpan,timeSpan),新建了update方法,用来调用这句话。这样可以使得多个observer都添加到observers中之后再启动Timer。

public class EventSource:IObservable<int>
    {

        private List<IObserver<int>> observers = new List<IObserver<int>>();

        private readonly int dataNumber;
        private int generatedNumber;
        private Random random;
        private readonly Timer timer;
        private readonly int timeSpan;
        //add
        private int _randomNumber;
        public EventSource(int dataNumber)
        {
            Console.WriteLine("我是构造方法");
            this.random = new Random();
            this.dataNumber = dataNumber;
            this.generatedNumber = 0;
            this.timer = new Timer(GenerateInput);//callback是一个委托,表示要执行的方法
            this.timeSpan = 100;//每个随机数字产生的时间间隔 1000ms
            //timer.Change(timeSpan, timeSpan);//此语句控制数据
            this._randomNumber = -1;//初始化随机数字
        }
        public int RandomNumber
        {
            get { return _randomNumber; }
            set { this._randomNumber = value; }
        }
        public void Update()
        {
            timer.Change(timeSpan, timeSpan);
        }
        private void GenerateInput(object _)
        {
            foreach (var observer in observers)
            {
                _randomNumber= random.Next(100);
                Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber);
                observer.OnNext(_randomNumber);
                generatedNumber++;
                if (generatedNumber >= dataNumber)
                {
                    observer.OnCompleted();
                    timer.Change(Timeout.Infinite, timeSpan);
                    return;
                }
            }
            timer.Change(timeSpan, timeSpan);
        }
        public void AddObserver(IObserver<int> observer)
        {
            observers.Add(observer);
        }
        public void RemoveObserver(IObserver<int> observer)
        {
            observers.Remove(observer);
        }

        //必须实现的方法
        public IDisposable Subscribe(IObserver<int> observer)
        {

            if (observer != null && !observers.Contains(observer))
            {
                observers.Add(observer);
            }
            Console.WriteLine("我是subscriber");
            return observer as IDisposable;

        }
        
    }


4、修改program.cs

将输入源的实例es转化为点事件流stream,query过滤得到stream中大于50的事件流,query2过滤得到stream大于70的事件流。建立了3个观察者roger、luffy和nami,我们用luffy观察query,用nami观察query2。

修好program.cs之后就可以调试了噢耶……

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
//add
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;

namespace HelloInsight_edit
{
    class Program
    {
        static void Main(string[] args)
        {
            //将EventSource类作为CEP引擎的输入。
            EventSource es = new EventSource(10);
            
            var server = Server.Create("Default");
            var application = server.CreateApplication("Observable Application");
            //注意以下4行,这里与适配器方式的程序不同的是,没有插入CTI事件。
            var stream = es.ToPointStream(application,
               e => PointEvent.CreateInsert(DateTime.Now, e),
                 AdvanceTimeSettings.StrictlyIncreasingStartTime,
                 "Observable Stream");

            var query = from e in stream
                        where e > 50
                        select e;

            OutputObserver roger = new OutputObserver("roger");
            OutputObserver luffy = new OutputObserver("luffy");
            OutputObserver nami = new OutputObserver("nami");
            
            Console.WriteLine("Starting query...");
            //直接对原始流添加观察者            
            //es.AddObserver(roger); es.AddObserver(luffy); es.AddObserver(nami);
            
            //对newStream添加观察者
            var newStream = query.ToObservable();
            newStream.Subscribe(luffy); //newStream.Subscribe(nami);//添加多个订阅者可能会有异常

            //对newStream2添加观察者
            var query2 = from e in stream
                    where e > 70
                    select e;
            var newStream2 = query2.ToObservable();
            newStream2.Subscribe(nami);

            //调用timer.change(定义callback的等待时间和时间间隔)
            es.Update();           
            Console.ReadLine();
        }
    }
}

运行结果:

可以看出,Subscribe两个观察者的操作先执行。

在遍历观察者集合observers的过程中,每组显示2个随机数。luffy和nami依次观察第一个和第二个。

{?个人理解为newStream.Subscribe(luffy);的功能类似于一个绑定了luffy的线程,遍历结束之后全部用户开始依次输出。全局变量generatedNumber负责整体次数}

这不是我们要的功能。

对于流中每个事件,不同观察者都观察到才行。

4.1、重写GenerateInput(object _)

将生成随机数的语句放到遍历操作foreach之前

private void GenerateInput(object _)
        {
            _randomNumber = random.Next(100); 
            if (generatedNumber <= dataNumber)
            {
                Console.WriteLine("Random generated data {0} : {1}", generatedNumber, _randomNumber);
                foreach (var observer in observers) observer.OnNext(_randomNumber);//使用最大程度实现的OnNext   
            }
            else
            {
                observers.ElementAt(0).OnCompleted();
                timer.Change(Timeout.Infinite, timeSpan);
            }          
            generatedNumber++;
            timer.Change(timeSpan, timeSpan);
        }

运行结果:

可以看出,对于流中每个事件,luffy检测到了大于50的事件,nami检测到了大于70的事件,实现了预定的目标。

{!接下来我们要将观察者模式、点事件流检测和WCF(Windows Communication Foundation)相结合,实现事件源和观察者WCF通信,便于接下来部署到网络中}

5、参考资料

[1]IObserver<T>接口

http://msdn.microsoft.com/zh-cn/library/dd783449(v=vs.110).aspx

[2]IDisposable接口

http://msdn.microsoft.com/zh-cn/library/system.idisposable(v=vs.110).aspx

[3]virtual方法

http://www.cnblogs.com/hacker/archive/2004/08/10/31774.html

© 著作权归作者所有

realsa

realsa

粉丝 33
博文 84
码字总数 107087
作品 0
广州
程序员
私信 提问
【CEP】调试StreamInsight官方Demo——HelloInsight

原文链接:http://my.oschina.net/SnifferApache/blog/338550 StreamInsight开发指南(https://technet.microsoft.com/zh-cn/library/ee391564(v=sql.111).aspx)在“输入和输出适配器”后面......

realsa
2014/10/29
1K
2
重构-改善既有代码的设计-概述

注意事项 1.重构时不能添加新功能,只管改进程序的结构。 2.不要过早发布接口,请修改你的代码的所有权,使重构更流畅 3.1%的代码影响了90%的执行效率,不要只靠猜想来判断效率瓶颈,要量化出...

梦想游戏人
2016/05/15
93
0
《重构-改善既有代码的设计》——读后总结

一、什么是重构? 重构:在不改变软件可观察行为的前提下,为提高程序的可读性和可维护性而对程序内部结构做出合理的调整。 程序有两面价值:“今天可以为你做什么”和“明天可以为你做什么”...

hi_jyf
2017/01/22
62
0
Padre 0.94 发布,Perl 语言集成开发环境

Padre 0.94 发布,该版本改进了搜索和替换功能;文件类型过滤以及 MIME 子系统进行了重构;改进了用户界面;改进了聚焦控件;实现了版本控制功能;改进了设置对话框和调试界面等等。 Padre ...

红薯
2012/01/24
1K
0
读《代码重构》一书小计

通过阅读《代码重构》一书,让我了解的最重要的一点是“重构不同于优化”。在这之前,我的观念中,“重构”与“优化”是划等号的。不过通过这本书,我了解到他们做着完全不同的事情,甚至是对...

钟良
2016/07/13
167
1

没有更多内容

加载失败,请刷新页面

加载更多

Netty整合Protobuffer

现在我们都知道,rpc的三要素:IO模型,线程模型,然后就是数据交互模型,即我们说的序列化和反序列化,现在我们来看一下压缩比率最大的二进制序列化方式——Protobuffer,而且该方式是可以跨...

算法之名
11分钟前
7
0
如何用C++实现栈

栈的定义 栈(stack)又名堆栈,它是一种运算受限的线性表。限定仅在表尾进行插入和删除操作的线性表。这一端被称为栈顶,相对地,把另一端称为栈底。向一个栈插入新元素又称作进栈、入栈或压...

BWH_Steven
29分钟前
3
0
编程作业20190210900169

1编写一个程序,提示用户输入名和姓,然后以“名,姓”的格式打印出来。 #include <stdio.h>#include <stdlib.h> int main(){ char firstName[20]; char lastName[20]; print......

1李嘉焘1
41分钟前
6
0
补码的优点及原理分析

只讨论整数 1.计算机内部为什么没有减法器? 减法运算本身其实就是加法,如x - y即x +(-y),所以只需要将负数成功表示出来并可以参加加法运算,那加法器就可同时实现“+”和“-”的运算。这...

清自以敬
57分钟前
73
0
Docker 可视化管理 portainer

官网安装指南: https://portainer.readthedocs.io/en/latest/deployment.html docker-compose.yml 位置,下载地址:https://downloads.portainer.io/docker-compose.yml...

Moks角木
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部