corba事件服务中的push和pull模型

原创
2013/06/10 11:13
阅读数 4K

首先说一下Corba中相对比较简单的服务模型,事件服务。

对于事件服务的话,有push和pull两种模型。下面就分别说一下这两种模型具体实现:

首先,push和pull模型都是基于事件通道EventChannel的,两种模型的通信最终都必须通过事件通道push或pull对象的引用,那就简要的说一下事件通道的概念。

事件信道(event channel)是一个既是事件提供者又是事件消费者的插入对象,它允许多个事件提供者和多个事件消费者异步地通信而不需要相互了解。事件信道又是一个标准的CORBA对象,驻留在对象请求中介上,可以断开提供者和消费者的通信。

事件信道利用代理(proxy)对象撤消时间的提供者和消费者。提供者和消费者不是直接交互作用,而是从事件信道那里获得代理对象,让代理对象在将来的事件交换中代表自己。提供者获得一个消费者代理,而消费者获得一个提供者代理。事件信道通过这些代理对象代理事件的交换。

push模型而言:

1)、下面说一下事件通道的注册绑定过程,具体实现代码如下:

    Properties properties = new Properties();

    //properties用来设置初始化orb所需要的参数信息

    properties.put("org.omg.PortableInterceptor.ORBInitializerClass.bidir_init","org        .jac    orb.orb.giop.BiDirConnectionInitializer");
    
    properties.put("org.omg.CORBA.ORBClass", "org.jacorb.orb.ORB");

    properties.put("org.omg.CORBA.ORBSingletonClass",

            "org.jacorb.orb.ORBSingleton");

    //初始化orb

    org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, properties);

    try

    {

      //序列化poa

      org.omg.PortableServer.POA poa =

          org.omg.PortableServer.POAHelper.narrow(

              orb.resolve_initial_references("RootPOA"));

     

      NamingContextExt nc =

          NamingContextExtHelper.narrow(

              orb.resolve_initial_references("NameService"));

      //使用orb和poa来创建事件对象

      EventChannelImpl channel = new EventChannelImpl(orb,poa);

 

      poa.the_POAManager().activate();       //激活poa的manager

 

      org.omg.CORBA.Object o = poa.servant_to_reference(channel);

      //将事件通道绑定到"eventchannel.example"名字上

      nc.bind(nc.to_name("eventchannel.example"), o);

 

      orb.run();

    }

    catch( Exception e)

    {

      e.printStackTrace();

    }

2)、下面说一下PushSupplier端的具体实现代码如下:

class PushSupplierDemo extends PushSupplierPOA {

 

    public PushSupplierDemo(String[] args) {

        Properties properties = new Properties();

        properties.put(

                "org.omg.PortableInterceptor.ORBInitializerClass.bidir_init",

                "org.jacorb.orb.giop.BiDirConnectionInitializer");

        properties.put("org.omg.CORBA.ORBClass", "org.jacorb.orb.ORB");

        properties.put("org.omg.CORBA.ORBSingletonClass",

                "org.jacorb.orb.ORBSingleton");

        org.omg.CosEventChannelAdmin.EventChannel e = null;

        org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, properties);

        org.omg.PortableServer.POA poa = null;

        org.omg.CORBA.Object refObj = null;

 

        try {

            poa = org.omg.PortableServer.POAHelper.narrow(orb

                    .resolve_initial_references("RootPOA"));

            poa.the_POAManager().activate();

 

            NamingContextExt nc = NamingContextExtHelper.narrow(orb

                    .resolve_initial_references("NameService"));

            //

            e = EventChannelHelper.narrow(nc.resolve(nc

                    .to_name("eventchannel.example")));

        } catch (Exception ex) {

            ex.printStackTrace();

        }

 

        SupplierAdmin supplierAdmin = e.for_suppliers();

        ProxyPushConsumer proxyPushConsumer = supplierAdmin

                .obtain_push_consumer();

 

        try {

            proxyPushConsumer.connect_push_supplier(_this(orb));

        } catch (org.omg.CosEventChannelAdmin.AlreadyConnected ex) {

            ex.printStackTrace();

        }

 

        for (int i = 0; i < 10; i++) {

            try {

                Any any = orb.create_any();

                try {

                    System.out.println("supplier sleep");

                    Thread.sleep(1000);

                } catch (InterruptedException e1) {

                    e1.printStackTrace();

                }

                // 实例化一个对象

                Version_IOperationsImpl version_IOperationsImpl = new Version_IOperationsImpl();

                version_IOperationsImpl.setVersion("version0000");

                try {

                    // 从servant获得一个对象引用

                    refObj = poa.servant_to_reference(version_IOperationsImpl); //必须传送的是对象的引用,而不是对象本身

                } catch (ServantNotActive e3) {

                    e3.printStackTrace();

                } catch (WrongPolicy e3) {

                    e3.printStackTrace();

                }

                try {

                    any.insert_Object(refObj);  //将对象的引用,加入到any对象之中

                } catch (Exception e2) {

                    e2.printStackTrace();

                }

                proxyPushConsumer.push(any);

                System.out.println("Pushing event ### " + (i));

 

            } catch (Disconnected d) {

                d.printStackTrace();

            }

        }

        proxyPushConsumer.disconnect_push_consumer();

    }

 

    public void disconnect_push_supplier() {

        System.out.println("Supplier disconnected");

    }

 

    public static void main(String[] args) {

        PushSupplierDemo demo = new PushSupplierDemo(args);

}

3)、下面说一下PushConsumer端的具体实现代码如下:

static public void main(String[] args) {

        Properties properties = new Properties();

        properties.put(

                "org.omg.PortableInterceptor.ORBInitializerClass.bidir_init",

                "org.jacorb.orb.giop.BiDirConnectionInitializer");

        properties.put("org.omg.CORBA.ORBClass", "org.jacorb.orb.ORB");

        properties.put("org.omg.CORBA.ORBSingletonClass",

                "org.jacorb.orb.ORBSingleton");

 

        EventChannel ecs = null;

        ConsumerAdmin ca = null;

        PushConsumer pushConsumer = null;

        ProxyPushSupplier pps = null;

 

        try {

            orb = org.omg.CORBA.ORB.init(args, properties);

            NamingContextExt nc = NamingContextExtHelper.narrow(orb

                    .resolve_initial_references("NameService"));

 

            ecs = EventChannelHelper.narrow(nc.resolve(nc

                    .to_name("eventchannel.example")));

        } catch (Exception e) {

            e.printStackTrace();

        }

 

        ca = ecs.for_consumers();

        pps = ca.obtain_push_supplier();

 

        try {

            org.omg.PortableServer.POA poa = org.omg.PortableServer.POAHelper

                    .narrow(orb.resolve_initial_references("RootPOA"));

 

            poa.the_POAManager().activate();

 

            PushConsumerPOATie pt = new PushConsumerPOATie(

                    new PushConsumerDemo(pps));

            pt._this_object(orb);

            pushConsumer = PushConsumerHelper.narrow(poa

                    .servant_to_reference(pt));

            pps.connect_push_consumer(pushConsumer);

            System.out.println("PushConsumerImpl registered.");

            orb.run();

        } catch (Exception e) {

            e.printStackTrace();

        }

        System.out.println("Quit.");

    }

 

    public synchronized void push(org.omg.CORBA.Any data)

            throws org.omg.CosEventComm.Disconnected {

        org.omg.CORBA.Object object = data.extract_Object();

        try {

            Thread.sleep(1000);

            System.out.println("consumer sleep");

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        Version_I meImpl =  Version_IHelper.narrow(object); //必须用接口来接受序列化

        System.out.println("Server Version: " + meImpl.getVersion());

        count++;

        System.out.println("event@@@ " + count + " : " + data.extract_Object());

       

        if (count >= limit) {

            System.out.println("unregister");

            myPps.disconnect_push_supplier();

            orb.shutdown(false);

     }

}

展开阅读全文
打赏
1
15 收藏
分享
加载中
更多评论
打赏
0 评论
15 收藏
1
分享
返回顶部
顶部