文档章节

如何让Netty管理任意客户端连接而非服务端连接?

强子大叔的码田
 强子大叔的码田
发布于 2016/03/08 08:57
字数 864
阅读 1575
收藏 2

最近在做一个项目,网址见:http://git.oschina.net/qiangzigege/MySQL-Binlog

要解决的一个问题就是:假如我想发起一个连接去连接mysql,成功后,如何让这个连接比较顺畅的纳入到netty的管理范围呢?

---不多说,上代码!

1)NettyQueue.java---存放连接队列

public class NettyQueue {

// poll: 若队列为空,返回null。

// remove:若队列为空,抛出NoSuchElementException异常。

// take:若队列为空,发生阻塞,等待有元素。

 

// put---无空间会等待

// add--- 满时,立即返回,会抛出异常

// offer---满时,立即返回,不抛异常

// private static final Logger logger =

// LoggerFactory.getLogger(MonitorQueue.class);

private static final Logger logger = LogManager.getLogger(NettyQueue.class);

public static BlockingQueue<Connection> objectQueue = new LinkedBlockingQueue<Connection>();

 

public static void addObject(Connection obj) {

objectQueue.offer(obj);

new Thread(new Runnable() {// 启动一个线程去触发...

@Override

public void run() {

// TODO Auto-generated method stub

MyProperties p = MyProperties.getInstance();

if (null != p) {// 连接本机的netty服务器

String ip = "127.0.0.1";

int port = p.getNetty_port();

if (null != ip && port >= 0) {

Socket socket = null;

try {

socket = new Socket();

System.out.println("ip: " + ip + " port:" + port);

socket.connect(new InetSocketAddress(ipport), 6 * 1000);// 还是给一个连接超时6秒

TimeUtils.sleepSeconds(6);// 睡眠6秒再退出,足够的时间给netty处理连接了

catch (Exception e) {

LoggerUtils.error(loggere.toString());

finally {

if (null != socket) {// 主动关闭连接

try {

socket.close();

catch (IOException e) {

}

}

// try...catch...finally结束

// if结束

}

}

}).start();

 

};

 

public static Connection getObject() {

return objectQueue.poll();

}

 

}


2)谁往里面放连接呢?

大体的代码是:

// 4)尝试创建连接conn

Connection conn = null;

int lastIndex = taskPath.lastIndexOf("/");

String target = taskPath.substring(lastIndex + 1);

String[] elements = target.split(":");

String ip = elements[0];

int port = Integer.valueOf(elements[1]);

try {

LoggerUtils.debug(logger"target machine:" + ip + ":" + port);

conn = ConnectionFactory.makeObject(ip, port, taskData, currentPath, binlogPositionPath, fn, fp);

catch (Exception e) {

LoggerUtils.debug(loggere.toString());

conn = null;

}

// conn = null;

if (null != conn) {// 创建成功

LoggerUtils.debug(logger"create socket succeed: " + conn.getSocketChannel());

NettyQueue.addObject(conn);

}


public static Connection makeObject(String ipint port, String data, String runningPath, String binlogPositionPath,

String initialFilenamelong initialPosition) {

Connection myConn = null;

 

if (null != ip && port >= 0) {

try {

// 在这里创建具体的对象,注意这里的用法

SocketAddress sAddress = new InetSocketAddress(ipport);

SocketChannel sChannel = SocketChannel.open(sAddress);

sChannel.configureBlocking(false);// 非阻塞

myConn = new Connection(sChannel,

ConnectionAttributes.parse(data).setIpPort(ipport).setRunningZKPath(runningPath)

.setBinlogPositionZKPath(binlogPositionPath)

.setClientId(Long.parseLong(ip.replaceAll(".""") + port))

.updateBinlogNameAndPosition(initialFilenameinitialPosition));

catch (Exception e) {

LoggerUtils.error(loggere.toString());

}

}

 

// 无论如何,都返回连接,失敗則返回null

return myConn;

}




3)谁负责取出来呢?

注意我们在第一部分是触发了一个连接本地127.0.0.1的一个连接,这个实际是连接本地的Netty服务器。

需要知道Netty如何启动的

EventLoopGroup workerGroup = new NioEventLoopGroup(worker);

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroupworkerGroup).channel(MyNioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 2048).childHandler(new ChildChannelHandler());

// 绑定端口,同步等待成功

ChannelFuture f = b.bind(port).sync();

// 等待服务端监听端口关闭

LoggerUtils.info(logger"netty server start ok.");

f.channel().closeFuture().sync();

重点就在MyNioServerSocketChannel.class

public class MyNioServerSocketChannel extends NioServerSocketChannel {

private static final Logger logger = LogManager.getLogger(MyNioServerSocketChannel.class);

 

// 继承已经有的类,用于干预连接

@Override

protected int doReadMessages(List<Object> bufthrows Exception {

// logger.debug("\ndoReadMessages(...) enter....\n触发了新的连接...开始准备2阶段提取");

// logger.debug("buf :" + buf);

// LoggerUtils.debug(logger, new Exception().toString());

// 原始部分,直接关闭

try {

SocketChannel tempCh = javaChannel().accept();

tempCh.close();

catch (Exception e) {

 

}

 

// 移花接木

Connection connection = NettyQueue.getObject();

try {

if (connection != null) {

MyNioSocketChannel channel = new MyNioSocketChannel(thisconnection.getSocketChannel(),

connection.getAttributes());

buf.add(channel);

LoggerUtils.debug(logger"db conn is as follows: " + connection.getSocketChannel());

// logger.debug("buf :" + buf);

return 1;

}

catch (Throwable t) {

LoggerUtils.info(logger"Failed to create a new channel from an accepted socket." + t);

try {

connection.close();

catch (Throwable t2) {

LoggerUtils.info(logger"Failed to close a socket." + t2);

}

}

 

return 0;

 

}

}

所以,到这里应该就知道了,用了一种诱骗式的方法,

巧妙的绕过了Netty的平常只接收主动连接自己的服务端socket的限制

使得任意客户端连接都可以纳入netty的管理范围

同时对netty框架本身不构成任何入侵性。

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 906
博文 1417
码字总数 1208545
作品 9
南京
架构师
私信 提问
徒手撸一个简单的RPC框架(2)——项目改造

徒手撸一个简单的RPC框架(2)——项目改造 在上一篇的徒手撸一个简单的RPC框架中再最后的服务器和客户端连接的时候只是简单的写了Socket连接,觉得有些不妥。正好最近学习了Netty,在平时工作...

不学无数的程序员
06/18
46
0
netty 连接的问题

我在网上搜索了下使用netty的问题,无一列外都是通过ip,端口号,连上服务端来进行通信,我在想,如果我在android上采用netty客户端模式,netty服务端包含在java web的一管理平台中。 我的目的...

天王盖地虎626
2014/06/24
375
1
netty高性能浅析

最近在看内部的rpc框架,感觉rpc最重要的就是高效的网络传输,所以学习netty的使用和原理极其重要,于是把netty的基础再总结一下。 首先我们先来看一下netty中的概念,熟悉了基本概念后再来进...

KKys
2017/08/30
0
0
如何往Netty服务端可靠实时传递消息

Netty服务端维护着几十万的客户端channel,需要往客户端channel中分发消息,需要分发的消息随机生成,目前处理办法是分发消息的线程作为一个客户端channel,连接Netty服务端,往该客户端cha...

刀塔传奇
2016/08/12
712
3
netty 可以完成如下功能吗?

我打算在android上开发一个客户端,在客户端上采用netty技术实现和管理后台交互,接受管理后台的消息推送。 不过,我的android客户端,本身有用户登录功能,有心跳功能(维持长连接),接受管...

天王盖地虎626
2014/06/27
634
2

没有更多内容

加载失败,请刷新页面

加载更多

只需一步,在Spring Boot中统一Restful API返回值格式与统一处理异常

统一返回值 在前后端分离大行其道的今天,有一个统一的返回值格式不仅能使我们的接口看起来更漂亮,而且还可以使前端可以统一处理很多东西,避免很多问题的产生。 比较通用的返回值格式如下:...

晓月寒丶
今天
58
0
区块链应用到供应链上的好处和实际案例

区块链可以解决供应链中的很多问题,例如记录以及追踪产品。那么使用区块链应用到各产品供应链上到底有什么好处?猎头悬赏平台解优人才网小编给大家做个简单的分享: 使用区块链的最突出的优...

猎头悬赏平台
今天
27
0
全世界到底有多少软件开发人员?

埃文斯数据公司(Evans Data Corporation) 2019 最新的统计数据(原文)显示,2018 年全球共有 2300 万软件开发人员,预计到 2019 年底这个数字将达到 2640万,到 2023 年达到 2770万。 而来自...

红薯
今天
61
0
Go 语言基础—— 通道(channel)

通过通信来共享内存(Java是通过共享内存来通信的) 定义 func service() string {time.Sleep(time.Millisecond * 50)return "Done"}func AsyncService() chan string {retCh := mak......

刘一草
今天
57
0
Apache Flink 零基础入门(一):基础概念解析

Apache Flink 的定义、架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速...

Vincent-Duan
今天
58
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部