文档章节

open-messaging使用实例

go4it
 go4it
发布于 2018/07/26 09:53
字数 1471
阅读 64
收藏 0

本文主要展示一下open-messaging使用实例

consumer

PullConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java

public class PullConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
        messagingAccessPoint.startup();

        //Fetch a ResourceManager to create Queue resource.
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();
        resourceManager.createQueue( "NS://HELLO_QUEUE", OMS.newKeyValue());

        //Start a PullConsumer to receive messages from the specific queue.
        final PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer();
        pullConsumer.attachQueue("NS://HELLO_QUEUE");
        pullConsumer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                pullConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        //Receive one message from queue.
        Message message = pullConsumer.receive();

        //Acknowledge the consumed message
        pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.RECEIPT_HANDLE));
    }
}
  • 首先创建messagingAccessPoint,然后启动是调用start,在shutdownHook里头调用shutdown
  • 然后通过resourceManager创建queue,和pullConsumer,并将其绑定
  • 之后调用pullConsumer的startup方法启动,然后关闭时shutdown方法
  • pullConsumer调用receive方法来拉取消息,这里改名为pull方法可能更合适些
  • pullConsumer可以对消息进行ack

PushConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java

public class PushConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east");
        messagingAccessPoint.startup();

        //Fetch a ResourceManager to create Queue resource.
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();
        final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
        consumer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        //Consume messages from a simple queue.
        String simpleQueue = "NS://HELLO_QUEUE";
        resourceManager.createQueue( simpleQueue, OMS.newKeyValue());

        //This queue doesn't has a source queue, so only the message delivered to the queue directly can
        //be consumed by this consumer.
        consumer.attachQueue(simpleQueue, new MessageListener() {
            @Override
            public void onReceived(Message message, Context context) {
                System.out.println("Received one message: " + message);
                context.ack();
            }

        });
    }
}
  • 也是先创建messagingAccessPoint,然后创建PushConsumer
  • 也是通过resourceManager创建queue,然后跟PushConsumer绑定
  • PushConsumer通过注册MessageListener来处理回调逻辑

StreamingConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java

public class StreamingConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
        messagingAccessPoint.startup();

        //Fetch a ResourceManager to create Queue resource.
        String targetQueue = "NS://HELLO_QUEUE";
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();
        resourceManager.createQueue(targetQueue, OMS.newKeyValue());

        //Fetch the streams of the target queue.
        List<String> streams = resourceManager.listStreams(targetQueue);

        //Start a StreamingConsumer to iterate messages from the specific stream.
        final StreamingConsumer streamingConsumer = messagingAccessPoint.createStreamingConsumer();
        streamingConsumer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streamingConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        assert streams.size() != 0;
        StreamingIterator streamingIterator = streamingConsumer.seekToBeginning(streams.get(0));

        while (streamingIterator.hasNext()) {
            Message message = streamingIterator.next();
            System.out.println("Received one message: " + message);
        }

        //All the messages in the stream has been consumed.
        //Now consume the messages in reverse order
        while (streamingIterator.hasPrevious()) {
            Message message = streamingIterator.previous();
            System.out.println("Received one message again: " + message);
        }
    }
}
  • stream的这种方式跟kafka的使用方式有点类似
  • 通过StreamingConsumer获取StreamingIterator,然后遍历获取消息

producer

Producer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

public class ProducerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        producer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        //Sends a message to the specified destination synchronously.
        {
            SendResult sendResult = producer.send(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
        }

        //Sends a message to the specified destination asynchronously.
        //And get the result through Future
        {
            final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            final SendResult sendResult = result.get(3000L);
            System.out.println("Send async message OK, message id is: " + sendResult.messageId());
        }

        //Sends a message to the specified destination asynchronously.
        //And retrieve the result through FutureListener
        {
            final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            result.addListener(new FutureListener<SendResult>() {

                @Override
                public void operationComplete(Future<SendResult> future) {
                    if (future.isDone() && null == future.getThrowable()) {
                        System.out.println("Send async message OK, message id is: " + future.get().messageId());
                    } else {
                        System.out.println("Send async message Failed, cause is: " + future.getThrowable().getMessage());
                    }
                }
            });
        }

        //Sends a message to the specific queue in OneWay manner.
        {
            //There is no {@code Future} related or {@code RuntimeException} thrown. The calling thread doesn't
            //care about the send result and also have no context to get the result.
            producer.sendOneway(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
        }
    }
}
  • 通过messagingAccessPoint创建producer
  • producer可以send、sendAsync以及sendOneway
  • send是同步,sendAsync是异步,可以通过listener回调处理,sendOneway就是不关系发送结果

TransactionProducer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java

public class TransactionProducerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        producer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        Message message = producer.createBytesMessage(
            "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));

        //Sends a transaction message to the specified destination synchronously.
        SendResult sendResult = producer.send(message, new LocalTransactionExecutor() {
            @Override
            public void execute(final Message message, final ExecutionContext context) {
                //Do some local transaction
                //Then commit this transaction and the message will be delivered.
                context.commit();
            }

            @Override
            public void check(final Message message, final CheckContext context) {
                //The server may lookup the transaction status forwardly associated the specified message
                context.commit();
            }
        }, OMS.newKeyValue());

        System.out.println("Send transaction message OK, message id is: " + sendResult.messageId());
    }
}
  • 使用的还是Producer,只是send方法使用的是有LocalTransactionExecutor参数的方法,来发送事务消息
  • LocalTransactionExecutor定义了execute和check方法
  • execute方法用来做本地事务相关的操作;check方法用于检查本地事务的状态

routing

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java

public class RoutingApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
        messagingAccessPoint.startup();

        String destinationQueue = "NS://DESTINATION_QUEUE";
        String sourceQueue = "NS://SOURCE_QUEUE";
        //Fetch a ResourceManager to create source Queue, destination Queue, and the Routing instance.
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();

        //Create the destination queue.
        resourceManager.createQueue(destinationQueue, OMS.newKeyValue());
        //Create the source queue.
        resourceManager.createQueue(sourceQueue, OMS.newKeyValue());

        KeyValue routingAttr = OMS.newKeyValue();
        routingAttr.put(OMSBuiltinKeys.ROUTING_SOURCE, sourceQueue)
            .put(OMSBuiltinKeys.ROUTING_DESTINATION, destinationQueue)
            .put(OMSBuiltinKeys.ROUTING_EXPRESSION, "color = 'red'");

        resourceManager.createRouting("NS://HELLO_ROUTING", routingAttr);

        //Send messages to the source queue ahead of the routing
        final Producer producer = messagingAccessPoint.createProducer();
        producer.startup();

        producer.send(producer.createBytesMessage(sourceQueue, "RED_COLOR".getBytes())
            .putUserHeaders("color", "red"));

        producer.send(producer.createBytesMessage(sourceQueue, "GREEN_COLOR".getBytes())
            .putUserHeaders("color", "green"));

        //Consume messages from the queue behind the routing.
        final PushConsumer pushConsumer = messagingAccessPoint.createPushConsumer();
        pushConsumer.startup();

        pushConsumer.attachQueue(destinationQueue, new MessageListener() {
            @Override
            public void onReceived(Message message, Context context) {
                //The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule
                //In this case, the push consumer will only receive the message with red color.
                System.out.println("Received a red message: " + message);
                context.ack();
            }

        });

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                pushConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
    }
}
  • routing用来做路由,可以通过表达式来从源队列过滤消息到目标队列,起到消息过滤的作用

小结

  • open messaging没有定义kafka的topic相关的概念,也没有consumer group的概念
  • amqp通过Exchange屏蔽了queue和topic的细节,不像JMS那样,需要producer去选择是要发到topic,还是发到queue
  • 这里open messaging虽然没有定义exchange,但是由于没有topic概念,发送都是发送到queue
  • open messaging的routing概念,跟amqp的outingKey有点类似,不过这个routing仅仅是作用于消息过滤,对消费者起作用

doc

© 著作权归作者所有

go4it
粉丝 90
博文 1126
码字总数 1057866
作品 0
深圳
私信 提问
消息队列的简单使用

/// <summary> /// 创建消息队列 /// </summary> /// <param name="name">消息队列名称</param> /// <returns></returns> public void CreateNewQueue(string name) { if (!System.Messagin......

可达鸭眉头一皱
2018/03/02
20
0
[Erlang 0079] RabbitMQ 初探

最近在项目中实践RabbitMQ,比较幸运现在除了官方网站,还有一本非常棒的书可以读:RabbitMQ in Action;这本书目前还没有中文版或者影印版,但是从网上很容易找到PDF版本和epub mobi的版本.Rabb...

唐玄奘
2017/12/04
0
0
JXcore Beta 2 发布,LLVM 的 JavaScript 前端

JXcore 是 Node.js 的一个衍生项目,用于在同一个进程内的多个隔离实例。任何单线程应用都会受益于多线程核心,而且对代码无需任何改动。JXCore 同时包含一个支持集群的消息 API,支持 andr...

oschina
2014/03/29
2.7K
1
鸟人的Android揭秘(7)——搭建Android SDK开发环境(三)

前文已经讲解了Android源代码开发环境的搭建,以及如何使用模拟器加载源代码编译结果,但在开发过程中,无论是为了更地理解代码的运行机制,还是为了找出代码的Bug,都会涉及对代码的调试问题...

鸟人部落
2016/12/21
28
0
PHPMQ

PHPMQ is an Open Source serverless messaging toolkit for PHP, giving the PHP developer ability to perform JMS operations such as sending and receiving messages on queues and top......

匿名
2008/09/11
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

SIPC的保全存证变现应用才是先见之明

Facebook发起的Libra在接连退群后终于在联盟链上线前官宣成立联盟,同时Telegram公链TON在主网上线前被美国SEC要求退回私募非法所得。两个都拥有活跃用户数上亿的社交平台,一个以1000万美元...

SimpleChain
10分钟前
1
0
Node_初步了解(3)回调,作用域,上下文

本文转载于:专业的前端网站➧Node_初步了解(3)回调,作用域,上下文 1. 1 //回调:回调是异步编程最基本的方法,node.js需要按顺序执行异步逻辑的时候,一般采用后续传递的方式,将后续逻辑...

前端老手
10分钟前
1
0
好程序员Java教程分享Java的五大特点

好程序员Java教程为大家分享Java的五大特点希望对初学者有所帮助。 一、Java的(五大)特点: 1.简单性 相对于c语言来说 c语言的核心 指针(保存地址)*p Java中没有指针的概念(使用的是引用概念...

好程序员官网
12分钟前
1
0
移动端rem适配各种屏幕字体

在页面中引入这个js文件,可以实现各个屏幕的字体自适应: (function (doc, win) { var docEl = doc.documentElement, resizeEvt = 'orientationchange' in window ? 'orientationchange' : ......

流年那么伤
14分钟前
2
0
2019我最喜爱的绿色应用活动投票开始,谁能突出重围?

在去年第一届软件绿色联盟开发者大会上,共有36个绿色应用荣获“2018年度我最喜爱的绿色应用”奖项。活动得到了消费者、开发者与应用厂商的一致好评,刚过完十一假期就有小伙伴们后台留言,问...

软件绿色联盟
25分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部