文档章节

[RabbitMQ]12_RabbitMQ学习之ConntectionFactory与Conntection的认知

morpheusWB
 morpheusWB
发布于 2017/09/05 21:53
字数 985
阅读 4
收藏 0

在发送和接收消息重要的类有:ConnectionFactory, Connection,Channel和 QueueingConsumer。

ConntectionFactory类是方便创建与AMQP代理相关联的Connection;下面来看看ConntectionFactory是如何创建一个Contention.首先通过new ConnectionFactory()创建一个ConnectionFactory;并设置此连接工厂的主机设置为broke IP。通过ConnectionFactory的newConnection()方法 创建一个Connection; newConnection方法通过得到当前连接的地址及端口号来获得一个Address,通过createFrameHandler的方法 来得到FrameHandler;再通过new AMQConnection(this, frameHandler)来得到Connection并启动。
    代码清单1创建Connection的源码(ConnectionFactory.Java中)

 

[java] view plain copy

 print?

  1. protected FrameHandler createFrameHandler(Address addr)  
  2.         throws IOException {  
  3.   
  4.         String hostName = addr.getHost();//获取主机IP  
  5.         int portNumber = portOrDefault(addr.getPort());//获取端口  
  6.         Socket socket = null;  
  7.         try {  
  8.             socket = factory.createSocket();  
  9.             configureSocket(socket);  
  10.             socket.connect(new InetSocketAddress(hostName, portNumber),  
  11.                     connectionTimeout);  
  12.             return createFrameHandler(socket);  
  13.         } catch (IOException ioe) {  
  14.             quietTrySocketClose(socket);  
  15.             throw ioe;  
  16.         }  
  17.     }  
  18.   
  19.     private static void quietTrySocketClose(Socket socket) {  
  20.         if (socket != null)  
  21.             try { socket.close(); } catch (Exception _) {/*ignore exceptions*/}  
  22.     }  
  23.   
  24.     protected FrameHandler createFrameHandler(Socket sock)  
  25.         throws IOException  
  26.     {  
  27.         return new SocketFrameHandler(sock);  
  28.     }  
  29.   
  30.     /** 
  31.      *  Provides a hook to insert custom configuration of the sockets 
  32.      *  used to connect to an AMQP server before they connect. 
  33.      * 
  34.      *  The default behaviour of this method is to disable Nagle's 
  35.      *  algorithm to get more consistently low latency.  However it 
  36.      *  may be overridden freely and there is no requirement to retain 
  37.      *  this behaviour. 
  38.      * 
  39.      *  @param socket The socket that is to be used for the Connection 
  40.      */  
  41.     protected void configureSocket(Socket socket) throws IOException{  
  42.         // disable Nagle's algorithm, for more consistently low latency  
  43.         socket.setTcpNoDelay(true);  
  44.     }  
  45.   
  46.     /** 
  47.      * Create a new broker connection 
  48.      * @param addrs an array of known broker addresses (hostname/port pairs) to try in order 
  49.      * @return an interface to the connection 
  50.      * @throws IOException if it encounters a problem 
  51.      */  
  52.     public Connection newConnection(Address[] addrs) throws IOException {  
  53.         return newConnection(null, addrs);  
  54.     }  
  55.   
  56.     /** 
  57.      * Create a new broker connection 
  58.      * @param executor thread execution service for consumers on the connection 
  59.      * @param addrs an array of known broker addresses (hostname/port pairs) to try in order 
  60.      * @return an interface to the connection 
  61.      * @throws IOException if it encounters a problem 
  62.      */  
  63.     public Connection newConnection(ExecutorService executor, Address[] addrs)  
  64.         throws IOException  
  65.     {  
  66.         IOException lastException = null;  
  67.         for (Address addr : addrs) {  
  68.             try {  
  69.                 FrameHandler frameHandler = createFrameHandler(addr);  
  70.                 AMQConnection conn =  
  71.                     new AMQConnection(username,  
  72.                                       password,  
  73.                                       frameHandler,  
  74.                                       executor,  
  75.                                       virtualHost,  
  76.                                       getClientProperties(),  
  77.                                       requestedFrameMax,  
  78.                                       requestedChannelMax,  
  79.                                       requestedHeartbeat,  
  80.                                       saslConfig);  
  81.                 conn.start();  
  82.                 return conn;  
  83.             } catch (IOException e) {  
  84.                 lastException = e;  
  85.             }  
  86.         }  
  87.   
  88.         throw (lastException != null) ? lastException  
  89.                                       : new IOException("failed to connect");  
  90.     }  
  91.   
  92.     /** 
  93.      * Create a new broker connection 
  94.      * @return an interface to the connection 
  95.      * @throws IOException if it encounters a problem 
  96.      */  
  97.     public Connection newConnection() throws IOException {  
  98.         return newConnection(null,  
  99.                              new Address[] {new Address(getHost(), getPort())}  
  100.                             );  
  101.     }  
  102.   
  103.     /** 
  104.      * Create a new broker connection 
  105.      * @param executor thread execution service for consumers on the connection 
  106.      * @return an interface to the connection 
  107.      * @throws IOException if it encounters a problem 
  108.      */  
  109.     public Connection newConnection(ExecutorService executor) throws IOException {  
  110.         return newConnection(executor,  
  111.                              new Address[] {new Address(getHost(), getPort())}  
  112.                             );  
  113.     }  

  代码清单2 连接启动

 

 

[java] view plain copy

 print?

  1. /** 
  2.      * Start up the connection, including the MainLoop thread. 
  3.      * Sends the protocol 
  4.      * version negotiation header, and runs through 
  5.      * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then 
  6.      * calls Connection.Open and waits for the OpenOk. Sets heart-beat 
  7.      * and frame max values after tuning has taken place. 
  8.      * @throws IOException if an error is encountered 
  9.      * either before, or during, protocol negotiation; 
  10.      * sub-classes {@link ProtocolVersionMismatchException} and 
  11.      * {@link PossibleAuthenticationFailureException} will be thrown in the 
  12.      * corresponding circumstances. If an exception is thrown, connection 
  13.      * resources allocated can all be garbage collected when the connection 
  14.      * object is no longer referenced. 
  15.      */  
  16.     public void start()  
  17.         throws IOException  
  18.     {  
  19.         this._running = true;  
  20.         // Make sure that the first thing we do is to send the header,  
  21.         // which should cause any socket errors to show up for us, rather  
  22.         // than risking them pop out in the MainLoop  
  23.         AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =  
  24.             new AMQChannel.SimpleBlockingRpcContinuation();  
  25.         // We enqueue an RPC continuation here without sending an RPC  
  26.         // request, since the protocol specifies that after sending  
  27.         // the version negotiation header, the client (connection  
  28.         // initiator) is to wait for a connection.start method to  
  29.         // arrive.  
  30.         _channel0.enqueueRpc(connStartBlocker);  
  31.         try {  
  32.             // The following two lines are akin to AMQChannel's  
  33.             // transmit() method for this pseudo-RPC.  
  34.             _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);  
  35.             _frameHandler.sendHeader();  
  36.         } catch (IOException ioe) {  
  37.             _frameHandler.close();  
  38.             throw ioe;  
  39.         }  
  40.   
  41.         // start the main loop going  
  42.         new MainLoop("AMQP Connection " + getHostAddress() + ":" + getPort()).start();  
  43.         // after this point clear-up of MainLoop is triggered by closing the frameHandler.  
  44.   
  45.         AMQP.Connection.Start connStart = null;  
  46.         AMQP.Connection.Tune connTune = null;  
  47.         try {  
  48.             connStart =  
  49.                 (AMQP.Connection.Start) connStartBlocker.getReply().getMethod();  
  50.   
  51.             _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());  
  52.   
  53.             Version serverVersion =  
  54.                 new Version(connStart.getVersionMajor(),  
  55.                             connStart.getVersionMinor());  
  56.   
  57.             if (!Version.checkVersion(clientVersion, serverVersion)) {  
  58.                 throw new ProtocolVersionMismatchException(clientVersion,  
  59.                                                            serverVersion);  
  60.             }  
  61.   
  62.             String[] mechanisms = connStart.getMechanisms().toString().split(" ");  
  63.             SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);  
  64.             if (sm == null) {  
  65.                 throw new IOException("No compatible authentication mechanism found - " +  
  66.                         "server offered [" + connStart.getMechanisms() + "]");  
  67.             }  
  68.   
  69.             LongString challenge = null;  
  70.             LongString response = sm.handleChallenge(null, this.username, this.password);  
  71.   
  72.             do {  
  73.                 Method method = (challenge == null)  
  74.                     ? new AMQP.Connection.StartOk.Builder()  
  75.                                     .clientProperties(_clientProperties)  
  76.                                     .mechanism(sm.getName())  
  77.                                     .response(response)  
  78.                           .build()  
  79.                     : new AMQP.Connection.SecureOk.Builder().response(response).build();  
  80.   
  81.                 try {  
  82.                     Method serverResponse = _channel0.rpc(method).getMethod();  
  83.                     if (serverResponse instanceof AMQP.Connection.Tune) {  
  84.                         connTune = (AMQP.Connection.Tune) serverResponse;  
  85.                     } else {  
  86.                         challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();  
  87.                         response = sm.handleChallenge(challenge, this.username, this.password);  
  88.                     }  
  89.                 } catch (ShutdownSignalException e) {  
  90.                     throw new PossibleAuthenticationFailureException(e);  
  91.                 }  
  92.             } while (connTune == null);  
  93.         } catch (ShutdownSignalException sse) {  
  94.             _frameHandler.close();  
  95.             throw AMQChannel.wrap(sse);  
  96.         } catch(IOException ioe) {  
  97.             _frameHandler.close();  
  98.             throw ioe;  
  99.         }  
  100.   
  101.         try {  
  102.             int channelMax =  
  103.                 negotiatedMaxValue(this.requestedChannelMax,  
  104.                                    connTune.getChannelMax());  
  105.             _channelManager = new ChannelManager(this._workService, channelMax);  
  106.   
  107.             int frameMax =  
  108.                 negotiatedMaxValue(this.requestedFrameMax,  
  109.                                    connTune.getFrameMax());  
  110.             this._frameMax = frameMax;  
  111.   
  112.             int heartbeat =  
  113.                 negotiatedMaxValue(this.requestedHeartbeat,  
  114.                                    connTune.getHeartbeat());  
  115.   
  116.             setHeartbeat(heartbeat);  
  117.   
  118.             _channel0.transmit(new AMQP.Connection.TuneOk.Builder()  
  119.                                 .channelMax(channelMax)  
  120.                                 .frameMax(frameMax)  
  121.                                 .heartbeat(heartbeat)  
  122.                               .build());  
  123.             _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()  
  124.                                       .virtualHost(_virtualHost)  
  125.                                     .build());  
  126.         } catch (IOException ioe) {  
  127.             _heartbeatSender.shutdown();  
  128.             _frameHandler.close();  
  129.             throw ioe;  
  130.         } catch (ShutdownSignalException sse) {  
  131.             _heartbeatSender.shutdown();  
  132.             _frameHandler.close();  
  133.             throw AMQChannel.wrap(sse);  
  134.         }  
  135.   
  136.         // We can now respond to errors having finished tailoring the connection  
  137.         this._inConnectionNegotiation = false;  
  138.   
  139.         return;  
  140.     }  


转载:http://wubin850219.iteye.com/blog/1007984

本文转载自:http://blog.csdn.net/zhu_tianwei/article/details/40889101

morpheusWB
粉丝 29
博文 93
码字总数 14941
作品 0
西安
程序员
私信 提问
RabbitMQ管理插件的安装

先安装rabbitmq-server这里就不写了,之前有篇文章里有相关步骤:RabbitMQ的安装与配置 如果/etc/rabbitmq不存在会报如下错误: Error: {cannotwriteenabledpluginsfile,”/etc/rabbitmq/ena...

openthings
2015/05/21
3.5K
0
RabbitMQ 3.6.12 RC2 发布,AMQP 消息服务器

RabbitMQ 3.6.12 RC2 发布了。RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这...

淡漠悠然
2017/09/01
622
4
我的RabbitMQ的学习成果

背景 在研发分布式事务的最终一致性事务模式时,使用了RabbitMQ。 在这之前也接触过RabbitMQ,但没有特别深入的去了解它的特性与原理。这次决定系统的学习一次,所以业余时间阅读大神们的书籍...

XuePeng77
04/15
264
0
RabbitMQ 3.7.9 发布,Erlang 的 AMQP 开源实现

RabbitMQ 3.7.9 已发布,这是一个维护版本,主要聚焦于修复 bug 和可用性改进。 兼容性说明 此版本与早期的 3.7.x 没有已知的不兼容性。 升级至 Erlang 21.0 升级该版本会同时将 Erlang 更新...

淡漠悠然
2018/11/16
1K
1
在CentOS上安装rabbitmq

转自:http://flyingdutchman.iteye.com/blog/1887283 这文章写得很好,除了安装软件编译时间比较长之外,安装这个几乎没出现什么错误。现在去配置下rabbitmq,马上就可以使用了。 在本节中我...

mac_zhao
2014/09/28
207
0

没有更多内容

加载失败,请刷新页面

加载更多

Android面试常客之Handler全解

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/fnhfire_7030/article/details/79518819 前言:又到了一年...

shzwork
8分钟前
0
0
position sticky 定位

本文转载于:专业的前端网站➫position sticky 定位 1、兼容性 https://caniuse.com/#search=sticky chrome、ios和firefox兼容性良好。 2、使用场景 sticky:粘性。粘性布局。 在屏幕范围内时...

前端老手
14分钟前
1
0
CentOS 7 yum 安装 PHP7.3 教程

参考:https://www.mf8.biz/centos-rhel-install-php7-3/ 1、首先安装 EPEL 源: yum install epel-release 安装 REMI 源: yum install http://rpms.remirepo.net/enterprise/remi-release......

dragon_tech
29分钟前
1
0
Linux物理网卡聚合及桥接

Linux内部实现的bridge可以把一台机器上的多张网卡桥接起来,从而把自己作为一台交换机。同时,LInux bridge还支持虚拟端口,即桥接的不一定都是物理网卡接口,还可以是虚拟接口。目前主要表...

xiangyunyan
30分钟前
1
0
一起来学Java8(一)——函数式编程

在这篇文章中,我们将了解到在Java8下如何进行函数式编程。 函数式编程 所谓的函数式编程就是把函数名字当做值进行传递,然后接收方拿到这个函数名进行调用。 首先来看下JavaScript如何进行函...

猿敲月下码
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部