RabbitMQ(5)-Usage scenarios and a simple Hello World! implementation
你我他有个梦, posted 2 years ago

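Summary: This post walks through six RabbitMQ usage scenarios, implemented in Java. Because RabbitMQ carries message bodies as raw bytes, files and images can be sent as well, but you have to implement the packing yourself; there is no ready-made API for it.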
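Since the body of a message is just a byte array, sending a file comes down to choosing a byte layout that the receiver can unpack. The following is a minimal sketch of one possible layout, not something the RabbitMQ client provides: the class name FilePayload and its encode/decode methods are made up for illustration, and only the JDK is used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper: packs a file name and its raw bytes into one message body.
final class FilePayload {
    final String fileName;
    final byte[] content;

    FilePayload(String fileName, byte[] content) {
        this.fileName = fileName;
        this.content = content;
    }

    // Layout: name written with writeUTF, 4-byte content length, then the content bytes.
    byte[] encode() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(fileName);
            out.writeInt(content.length);
            out.write(content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reverses encode(): reads the name, the length, then exactly that many bytes.
    static FilePayload decode(byte[] body) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(body))) {
            String name = in.readUTF();
            byte[] content = new byte[in.readInt()];
            in.readFully(content);
            return new FilePayload(name, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this sketch, a producer would pass payload.encode() as the body argument of channel.basicPublish, and a consumer would call FilePayload.decode(delivery.getBody()). Large files would need chunking, which is left out here.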

I.Single sender, single receiver

1.Use case:

Plain sending and receiving, with no special handling.

2.producer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class OneProducer {
    
  private final static String QUEUE_NAME = "queue_one";

  public static void main(String[] argv) throws Exception {
                
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("liuzhaoqiang129");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    /**
     * Parameters:
     *  queue the name of the queue
     *  durable true if we are declaring a durable queue (the queue will survive a server restart)
     *  exclusive true if we are declaring an exclusive queue (restricted to this connection)
     *  autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     *  arguments other properties (construction arguments) for the queue
     */
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    String message = "Hello World!";
    /**
     * Parameters:
     *  exchange the exchange to publish the message to
     *  routingKey the routing key
     *  props other properties for the message - routing headers etc
     *  body the message body
     */
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");
    
    channel.close();
    connection.close();
  }
}

3.consumer:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.util.HashMap;
import java.util.Map;

public class OneConsumer {

  private final static String QUEUE_NAME = "queue_one";

  public static void main(String[] argv) throws Exception {
    // Connect to the server
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("liuzhaoqiang128");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    // Open the channel between the consumer and the server
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    // Note: QueueingConsumer is deprecated in the 4.x Java client and removed in 5.x
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Set the consumer priority: a larger number means this consumer is preferred.
    // Without priorities, the broker hands messages to the queue's consumers in round-robin order.
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-priority", 10);
    /**
     * Parameters:
     *  queue the name of the queue
     *  autoAck true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
     *  arguments a set of arguments for the consume
     *  callback an interface to the consumer object
     */
    channel.basicConsume(QUEUE_NAME, true, args, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody(), "UTF-8");
      System.out.println(" [x] Received '" + message + "'");
    }
  }
}

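II.Single sender, multiple receivers

1.Use case:

One sender and several receivers, for example distributed task dispatch. To keep messages from being lost, they are made persistent. To guard against a receiver going down while it is processing a message, the receiver sends the ack only after processing has finished.

2.producer: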

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class Producer {
  
  private static final String QUEUE_NAME = "queue_one";

  public static void main(String[] argv) throws Exception {
    // Connect to the server
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("liuzhaoqiang129");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    // Create the channel between the producer and the server
    Channel channel = connection.createChannel();
    // Declare the queue
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    // Put the channel into publisher-confirm mode (once per channel is enough)
    channel.confirmSelect();
    for (int i = 0; i < 10; i++) {
       String message = getMessage(argv) + i;
       // Mark the message as persistent
       channel.basicPublish("", QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
       System.out.println(" [x] Sent '" + message + "'");
    }
    // Block until the broker has confirmed every message published above
    channel.waitForConfirmsOrDie(5000);
    System.out.println(" [x] All messages confirmed by the broker");
    channel.close();
    connection.close();
  }
    
  private static String getMessage(String[] strings){
    if (strings.length < 1)
      return "Hello World!";
    return joinStrings(strings, " ");
  }  
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
      words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
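Differences from scenario 1 on the sending side:

1. The queue is declared with durable set to true, so the queue itself is durable.
2. Messages are published with MessageProperties.PERSISTENT_TEXT_PLAIN, which makes each message persistent.

3.consumer: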

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

   private static final String TASK_QUEUE_NAME = "queue_one";

   public static void main(String[] argv) throws Exception {

      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("liuzhaoqiang129");
      factory.setUsername("admin");
      factory.setPassword("admin");
      factory.setPort(5672);
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

      channel.basicQos(1);

      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

      while (true) {
         QueueingConsumer.Delivery delivery = consumer.nextDelivery();
         String message = new String(delivery.getBody());

         System.out.println(" [x] Received '" + message + "'");
         doWork(message);
         System.out.println(" [x] Done");

         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      }
   }

   private static void doWork(String task) throws InterruptedException {
      for (char ch : task.toCharArray()) {
         if (ch == '.')
            Thread.sleep(1000);
      }
   }
}
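Differences from scenario 1 on the receiving side:
1. The queue is declared under TASK_QUEUE_NAME and is durable.
2. channel.basicConsume is called with autoAck set to false, so acks are not sent automatically; channel.basicAck() sends the ack once the message has been processed.
3. channel.basicQos(1) ensures the receiver is not handed another message while one is still unprocessed; it gets the next message only after it has sent the ack. In the meantime the broker tries to deliver the message to the next receiver that is not busy.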
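To see why the prefetch limit of 1 matters, the two dispatch strategies can be compared without a broker. The sketch below is a plain-Java simulation under simplifying assumptions (all messages are already queued, and each message takes a known number of seconds); the class DispatchSimulation and its methods are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.Arrays;

// Plain-Java simulation (no broker involved) of two dispatch strategies.
final class DispatchSimulation {

    // Round-robin: message i always goes to worker i % workers, busy or not.
    // Returns the time at which each worker finishes all of its messages.
    static int[] roundRobin(int[] taskSeconds, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < taskSeconds.length; i++) {
            busyUntil[i % workers] += taskSeconds[i];
        }
        return busyUntil;
    }

    // Prefetch of 1: the next message goes to whichever worker becomes free first.
    static int[] prefetchOne(int[] taskSeconds, int workers) {
        int[] busyUntil = new int[workers];
        for (int seconds : taskSeconds) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[next]) {
                    next = w;
                }
            }
            busyUntil[next] += seconds;
        }
        return busyUntil;
    }

    public static void main(String[] args) {
        int[] tasks = {5, 1, 5, 1, 5, 1}; // seconds of work per message
        System.out.println(Arrays.toString(roundRobin(tasks, 2)));  // prints [15, 3]
        System.out.println(Arrays.toString(prefetchOne(tasks, 2))); // prints [11, 7]
    }
}
```

With alternating slow and fast messages, round-robin leaves one worker busy for 15 seconds while the other is idle after 3; handing each message to the first free worker finishes everything in 11 seconds.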

III.publish/subscribe

1.Use case:

Publish/subscribe: the sender broadcasts each message and multiple receivers get it.

2.producer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class ProducerPublish {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("liuzhaoqiang128");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getMessage(String[] strings){
    if (strings.length < 1)
            return "info: Hello World!";
    return joinStrings(strings, " ");
  }
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}

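The message is published to an exchange named "logs" of type "fanout", that is, as a broadcast. The sender does not declare or name a queue and does not need to care who receives the message.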

3.consumer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ConsumerSubcribe {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("liuzhaoqiang128");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
  }
}
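1. Declare the exchange named "logs" with type "fanout", the same as on the sending side.
2. channel.queueDeclare().getQueue() returns a queue with a random, server-generated name; the queue is non-durable, exclusive and auto-delete. It is bound to the exchange above to receive messages.
3. Note that when binding the queue, the third argument of channel.queueBind(), the routing key, is empty, meaning all messages are received. If the value is not empty, it is ignored anyway when the exchange type is "fanout".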
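The fanout behaviour described above can be pictured with a small in-memory model. This is only a sketch of the routing rule, assuming nothing beyond the JDK; FanoutSketch and its methods are invented for illustration and are not the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a fanout exchange; not the RabbitMQ client API.
final class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a queue; the binding key is accepted but never consulted.
    void bind(String queueName, String ignoredBindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Every bound queue receives its own copy, whatever the routing key is.
    void publish(String ignoredRoutingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }
}
```

Binding two queues with different keys and publishing once leaves one copy of the message in each queue, which is the broadcast behaviour the "logs" exchange relies on.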

IV.Routing

1.Use case:

The sender publishes each message with a routing key, and different receivers receive the messages for different routing keys.

2.producer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class ProducerRouting {

  private static final String EXCHANGE_NAME = "my-mq-exchange";
  private static final String ROUTING_KEY = "queue_one_key";
  public static void main(String[] argv) throws Exception {
    // Connect to the server
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("liuzhaoqiang129");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    // Create the channel between the producer and the server
    Channel channel = connection.createChannel();
    // Declare the exchange and its type
    channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);

    String message = "Hello World!";
    // Publish the message with the given routing key
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
    System.out.println(" [x] Sent '" + ROUTING_KEY + "':'" + message + "'");

    channel.close();
    connection.close();
  }
}

Differences from scenario 3 on the sending side:
1. The exchange type is direct.
2. A routing key is supplied when the message is published.

3.consumer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ConsumerRouting {

  private static final String EXCHANGE_NAME = "my-mq-exchange";
  private static final String ROUTING_KEY = "queue_one_key";
  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("liuzhaoqiang128");
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("queueName....."+queueName);
    channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      String routingKey = delivery.getEnvelope().getRoutingKey();

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
    }
  }
}
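Differences from scenario 3 on the receiving side:
The queue is bound to the exchange with a routing key, so it receives from that exchange only the messages published with that routing key.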
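The exact-match rule of a direct exchange can be modelled in a few lines. The sketch below is an in-memory illustration only; DirectSketch, the queue names and the keys in the usage note are assumptions made up for the example, not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory stand-in for a direct exchange; not the RabbitMQ client API.
final class DirectSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    // A queue may be bound several times with different keys.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Delivers to every queue whose binding key equals the routing key exactly.
    // Returns how many queues received the message; 0 means no binding matched.
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, new LinkedHashSet<>());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }
}
```

For instance, if a queue "errors" is bound with the key "error" and a queue "all" is bound with both "error" and "info", a message published with "info" reaches only "all", while one published with "error" reaches both.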

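V.Topics

1.Use case:

Receivers are not limited to one fixed routing key: each binds its queue with a pattern, and the exchange delivers every message whose routing key matches that pattern. The sender still publishes each message with a concrete routing key made of dot-separated words.

2.producer: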

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class ProducerTopic {

  private static final String EXCHANGE_NAME = "myexchange1";
  // Publish with a concrete key; wildcards only have meaning in a receiver's binding key
  private static final String ROUTING_KEY = "foo.queue";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("liuzhaoqiang129");
      factory.setUsername("admin");
      factory.setPassword("admin");
      factory.setPort(5672);
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
      String message = getMessage(argv);

      channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
      System.out.println(" [x] Sent '" + ROUTING_KEY + "':'" + message + "'");

    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
  
  private static String getRouting(String[] strings){
    if (strings.length < 1)
            return "anonymous.info";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}

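Differences from scenario 4 on the sending side:
1. The exchange type is topic.
2. The routing key is a list of words separated by dots rather than a single fixed word. Queues are bound with patterns such as "*.lu.#", where * matches exactly one word and # matches zero or more words.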
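The two wildcards are easier to reason about with a small matcher to experiment with. The following is a plain-Java sketch of the matching rule as described above (* is exactly one word, # is zero or more words); TopicMatcher is a hypothetical name and this is not how the broker implements it internally.

```java
// Plain-Java sketch of topic pattern matching; not RabbitMQ's own implementation.
final class TopicMatcher {

    // bindingKey is the pattern used in queueBind; routingKey is the key a message is published with.
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // match only if the routing key is used up too
        }
        if (pattern[p].equals("#")) {
            // '#' either matches zero words, or consumes one word and stays in play
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // the pattern still needs a word but none is left
        }
        boolean sameWord = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return sameWord && matches(pattern, p + 1, words, w + 1);
    }
}
```

With this sketch, TopicMatcher.matches("foo.*", "foo.queue") is true, while "foo.*" rejects both "foo" and "foo.queue.one"; "*.lu.#" accepts "a.lu" and "a.lu.b.c" but not "lu.b".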

3.consumer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ConsumerTopic {

   private static final String EXCHANGE_NAME = "myexchange1";
   private static final String QUEUE_NAME = "queue_one";
   // Binding pattern: matches "foo.queue" and any other two-word key that starts with "foo"
   private static final String BINDING_KEY = "foo.*";

   public static void main(String[] argv) {
      Connection connection = null;
      Channel channel = null;
      try {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("liuzhaoqiang128");
         factory.setUsername("admin");
         factory.setPassword("admin");
         factory.setPort(5672);

         connection = factory.newConnection();
         channel = connection.createChannel();
         channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
         // Make sure the queue exists before binding it (same settings as in the earlier scenarios)
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);

         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY);
         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

         QueueingConsumer consumer = new QueueingConsumer(channel);
         channel.basicConsume(QUEUE_NAME, true, consumer);
         while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
         }
      }
      catch  (Exception e) {
         e.printStackTrace();
      }
      finally {
         if (connection != null) {
            try {
               connection.close();
            }
            catch (Exception ignore) {}
         }
      }
   }
}
Differences from scenario 4 on the receiving side:
1. The exchange type is topic.
2. The key used to bind the queue is a matching pattern rather than a fixed word.

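VI.RPC

1.Use case:

Use RPC when you need to run something on a remote machine and get the result back.

2.Execution flow:

1. The client publishes a request carrying the reply_to and correlation_id properties to the RPC queue.

2. The server processes the request and publishes the result to a callback queue whose name is the one given in reply_to.

3. The client reads the result from the callback queue and matches it against the correlation_id it set earlier. A reply with a different correlation_id is ignored rather than raising an exception.

3.Common properties:

  delivery_mode marks a message as persistent or transient; it was already mentioned in the earlier "Work Queue" scenario.

  content_type describes the MIME type, for example application/json for a JSON body.

  reply_to names the callback queue.

  correlation_id associates an RPC response with its request.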
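The role of correlation_id is easiest to see in isolation. The sketch below shows one way a client could pair replies with outstanding requests and ignore unknown ids, using only the JDK; PendingCalls and its methods are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java sketch of how an RPC client can pair replies with requests.
final class PendingCalls {
    // correlation_id -> future that completes when the matching reply arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called when a request is sent; returns the correlation_id to attach to it.
    String register(CompletableFuture<String> reply) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, reply);
        return correlationId;
    }

    // Called for each message that arrives on the callback queue.
    // Returns false when the correlation_id is unknown; such a reply is ignored.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply =
                correlationId == null ? null : pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(body);
        return true;
    }
}
```

In a real client the id returned by register would go into the request's correlation_id property, and onReply would be fed the correlation_id and body of each delivery from the callback queue. A stale or duplicate reply simply returns false instead of throwing.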

4.Server:

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Server
{
    private Connection connection;
    private Channel channel;
    private QueueingConsumer consumer;

    public Server init()
            throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("liuzhaoqiang128");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        connection = factory.newConnection();
        channel = connection.createChannel();

        channel.exchangeDeclare("rpc", "direct");
        channel.queueDeclare("ping", false, false, false, null);
        channel.queueBind("ping", "rpc", "ping");

        consumer = new QueueingConsumer(channel);
        channel.basicConsume("ping", false, "ping", consumer);

        System.out.println(
                "Waiting for RPC calls..."
        );

        return this;
    }

    public void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            }
            catch (Exception ignore) {}
        }
    }

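    public void serveRequests() {
        while (true) {
            try {

                Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();

                System.out.println(
                        "Received API call...replying..."
                );

                // Echo the request's correlation_id so the client can match the reply
                BasicProperties replyProps = new BasicProperties
                        .Builder()
                        .correlationId(props.getCorrelationId())
                        .build();

                channel.basicPublish(
                        "",
                        props.getReplyTo(),
                        replyProps,
                        getResponse(delivery).getBytes("UTF-8")
                );
                // Acknowledge the request only after the reply has been published
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

            } catch (Exception e){
                System.out.println(e.toString());
            }
        }
    }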

    private String getResponse(Delivery delivery) {
        String response = null;
        try {
            String message = new String(delivery.getBody(), "UTF-8");
            response = "Pong!" +  JSON.parseObject(message).getString("time");
        }
        catch (Exception e){
            System.out.println(e.toString());
            response = "";
        }
        return response;
    }

    public static void main(String[] args) {
        Server server = null;
        try {
            server = new Server();
            server.init().serveRequests();
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            if(server != null) {
                server.closeConnection();
            }
        }
    }
}

5.Client:

import com.rabbitmq.client.*;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;

public class Client {

    private Connection connection;
    private Channel channel;
    private String replyQueueName;
    private QueueingConsumer consumer;

    public Client init()
            throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("liuzhaoqiang128");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        connection = factory.newConnection();
        channel = connection.createChannel();
        return this;
    }

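    public Client setupConsumer()
            throws Exception {
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        // Replies are never acked explicitly, so let the broker auto-ack them
        channel.basicConsume(replyQueueName, true, consumer);
        return this;
    }

    public String call(String message) throws Exception {
        String response = null;
        // A fresh correlation_id ties the reply to this particular request
        String correlationId = java.util.UUID.randomUUID().toString();
        channel.basicPublish(
                "rpc",
                "ping",
                getRequestProperties(correlationId),
                message.getBytes("UTF-8")
        );

        System.out.println("Sent 'ping' RPC call. Waiting for reply...");

        while (true) {
            Delivery delivery = consumer.nextDelivery();
            String replyId = delivery.getProperties().getCorrelationId();
            // Skip replies that carry a different correlation_id; a reply with
            // no correlation_id at all (server does not echo it) is accepted.
            if (replyId == null || correlationId.equals(replyId)) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    private BasicProperties
    getRequestProperties(String correlationId) {
        return new BasicProperties
                .Builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();
    }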

    public static String createRequest()
            throws JSONException {
        // Whole seconds since the epoch; a float cannot hold this value exactly
        long epoch = System.currentTimeMillis() / 1000;
        JSONObject object = new JSONObject();
        object.put("client_name","RPC Client 1.0");
        object.put("time",Long.toString(epoch));
        return JSON.toJSONString(object);
    }

    public static void main(String[] args) {
        Client client = null;
        String response = null;

        try {
            client = new Client();
            client.init().setupConsumer();
            response = client.call(Client.createRequest());
            System.out.println("RPC Reply --- " + response);
        }
        catch  (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (client!= null) {
                try {
                    client.close();
                }
                catch (Exception ignore) {}
            }
        }
    }
}


