Exploring Thrift's Different Server Types
Posted by 王孟君 (wangmengjun) 4 months ago

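Thrift is a software framework for developing scalable, cross-language services. It combines a software stack with a code generation engine to build services that work efficiently and seamlessly across C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml, and other languages.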

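Thrift was originally developed at Facebook, was open-sourced in April 2007, and entered the Apache Incubator in May 2008. Thrift lets you define data types and service interfaces in a simple definition file. Taking that file as input, the compiler generates the code needed to build RPC clients and servers that communicate seamlessly across programming languages.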

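Goal

This post surveys Thrift's different server types and illustrates each one with code examples.

It covers the following parts:

1. Preparing the example code

2. An introduction to the different server types, with an example for each

3. An asynchronous client example

4. Using the Nifty library, with server and client code examples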

Example

Overview

In this example, the user service interface defines three methods: save a user, find users by name, and delete a user.

For example:

    /**
     * Saves a user.
     * 
     * @param user
     */
    public boolean save(com.xxx.tutorial.thrift.entity.User user) throws org.apache.thrift.TException;

    /**
     * Finds users by name.
     * 
     * @param name
     */
    public java.util.List<com.xxx.tutorial.thrift.entity.User> findUsersByName(java.lang.String name) throws org.apache.thrift.TException;

    /**
     * Deletes a user.
     * 
     * @param userId
     */
    public void deleteByUserId(int userId) throws com.xxx.tutorial.thrift.exception.UserNotFoundException, org.apache.thrift.TException;

Writing the Thrift files

  • user.thrift defines the user class
namespace java com.xxx.tutorial.thrift.entity

/**
 * User class
 */
struct  User {
  1:i32 userId,
  2:string name
}
  • exception.thrift defines a custom exception
namespace java com.xxx.tutorial.thrift.exception

exception UserNotFoundException {
	1: string code;
	2: string message;
}
  • userService.thrift defines the user service interface
include "user.thrift"
include "exception.thrift"

namespace java com.xxx.tutorial.thrift.service

/**
 * User service
 */
service  UserService {

  /** Save a user */
  bool save(1:user.User user),

  /** Find users by name */
  list<user.User> findUsersByName(1:string name),

  /** Delete a user */
  void deleteByUserId(1:i32 userId) throws (1: exception.UserNotFoundException e)
}

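Generating the code

Generating Java code from the Thrift files is not described again here; see the earlier post "一步一步完成thrift Java示例" (a step-by-step Thrift Java example).

Service interface code

Put the generated Java code into the thrift-demo-interface module.

Service implementation code

Add an implementation class for UserService to the thrift-demo-service module.

UserServiceImpl.java looks like this: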

package com.xxx.tutorial.thrift.service.impl;

import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

import org.apache.thrift.TException;

import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;

/**
 * @author wangmengjun
 *
 */
public class UserServiceImpl implements UserService.Iface {

	private static final Logger logger = Logger.getLogger(UserServiceImpl.class.getName());

	public boolean save(User user) throws TException {
		logger.info("方法save的参数user的内容==>" + user.toString());
		return true;
	}

	public List<User> findUsersByName(String name) throws TException {
		logger.info("方法findUsersByName的参数name的内容==>" + name);
		return Arrays.asList(new User(1, "Wang"), new User(2, "Mengjun"));
	}

	public void deleteByUserId(int userId) throws UserNotFoundException, TException {
		/**
		 * Always throws the exception to simulate the failure case, for testing
		 */
		logger.info("方法deleteByUserId的参数userId的内容==>" + userId);
		throw new UserNotFoundException("1001", String.format("userId=%d的用户不存在", userId));
	}
}

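With these steps done, the user service interface and its implementation class are complete.
Next, we can create the Thrift servers.

Thrift's different server types

Server types

Looking at Thrift's TServer class hierarchy, we can see that Thrift provides the following server types.

They are described as follows:

  • TSimpleServer: a single-threaded server using standard blocking I/O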
/**
 * Simple singlethreaded server for testing.
 *
 */
public class TSimpleServer extends TServer {
... ...
}
  • TThreadPoolServer: a multi-threaded server using standard blocking I/O

/**
 * Server which uses Java's built in ThreadPool management to spawn off
 * a worker pool that
 *
 */
public class TThreadPoolServer extends TServer {

... ...

}
  • TNonblockingServer: a single-threaded server using non-blocking I/O

/**
 * A nonblocking TServer implementation. This allows for fairness amongst all
 * connected clients in terms of invocations.
 *
 * This server is inherently single-threaded. If you want a limited thread pool
 * coupled with invocation-fairness, see THsHaServer.
 *
 * To use this server, you MUST use a TFramedTransport at the outermost
 * transport, otherwise this server will be unable to determine when a whole
 * method call has been read off the wire. Clients must also use TFramedTransport.
 */
public class TNonblockingServer extends AbstractNonblockingServer {

... ...

}
  • THsHaServer: an extension of TNonblockingServer; a multi-threaded server using non-blocking I/O
/**
 * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
 * Like TNonblockingServer, it relies on the use of TFramedTransport.
 */
public class THsHaServer extends TNonblockingServer {

... ... 
}
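  • TThreadedSelectorServer: a multi-threaded server using non-blocking I/O
TThreadedSelectorServer builds on the nonblocking servers above: it separates the accept thread from the read/write selector threads
 and adds a pool of worker threads. It is also a half-sync/half-async server model.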
/**
 * A Half-Sync/Half-Async server with a separate pool of threads to handle
 * non-blocking I/O. Accepts are handled on a single thread, and a configurable
 * number of nonblocking selector threads manage reading and writing of client
 * connections. A synchronous worker thread pool handles processing of requests.
 * 
 * Performs better than TNonblockingServer/THsHaServer in multi-core
 * environments when the the bottleneck is CPU on the single selector thread
 * handling I/O. In addition, because the accept handling is decoupled from
 * reads/writes and invocation, the server has better ability to handle back-
 * pressure from new connections (e.g. stop accepting when busy).
 * 
 * Like TNonblockingServer, it relies on the use of TFramedTransport.
 */
public class TThreadedSelectorServer extends AbstractNonblockingServer {
... ...
}
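
The difference between the single-threaded and thread-pool models in the list above can be illustrated without Thrift at all. The following is a minimal sketch that uses only java.util.concurrent: the class ThreadModelSketch and the method handleRequests are hypothetical names chosen for illustration, and each "request" is simulated by a task that records which thread served it. It is not how Thrift's servers are implemented internally.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadModelSketch {

	/**
	 * Submits the given number of simulated requests to the executor and
	 * returns the names of the threads that handled them.
	 */
	static Set<String> handleRequests(ExecutorService executor, int requests) {
		Set<String> handlerThreads = ConcurrentHashMap.newKeySet();
		CountDownLatch done = new CountDownLatch(requests);
		for (int i = 0; i < requests; i++) {
			executor.execute(() -> {
				// Record which thread served this simulated request.
				handlerThreads.add(Thread.currentThread().getName());
				done.countDown();
			});
		}
		try {
			done.await(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} finally {
			executor.shutdown();
		}
		return handlerThreads;
	}

	public static void main(String[] args) {
		// One thread serves every request in turn (TSimpleServer-style model).
		Set<String> single = handleRequests(Executors.newSingleThreadExecutor(), 8);
		System.out.println("single-threaded handler threads: " + single.size());

		// A pool of workers shares the requests (TThreadPoolServer-style model).
		Set<String> pooled = handleRequests(Executors.newFixedThreadPool(4), 8);
		System.out.println("pooled handler threads (at most 4): " + pooled.size());
	}
}
```

With a single worker, a slow request delays every request queued behind it; with a pool, up to four requests are in progress at once, at the cost of one thread per in-flight request.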

In addition, Facebook has open-sourced Nifty, a Netty-based implementation of Thrift servers and clients.

With this package you can quickly publish efficient Netty-based server and client code.

https://github.com/facebook/nifty

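Steps for creating a server

(1)  Create a transport object

(2)  Create the input/output protocol for the transport object

(3)  Create a processor based on the input/output protocol

(4)  Wait for connection requests and hand them to the processor

For example: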

try {

	/**
	 * 1. Create the Transport
	 */
	TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
	TServer.Args tArgs = new TServer.Args(serverTransport);

	/**
	 * 2. Create the Protocol for the Transport
	 */
	tArgs.protocolFactory(new TBinaryProtocol.Factory());
	// tArgs.protocolFactory(new TCompactProtocol.Factory());
	// tArgs.protocolFactory(new TJSONProtocol.Factory());

	/**
	 * 3. Create the Processor for the Protocol
	 */
	TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
	tArgs.processor(tprocessor);

	/**
	 * 4. Create and start the Server
	 * 
	 * org.apache.thrift.server.TSimpleServer - a simple single-threaded server model, generally used for testing
	 */
	TServer server = new TSimpleServer(tArgs);
	logger.info("UserService TSimpleServer start ....");
	server.serve();

} catch (Exception e) {
	logger.severe("Server start error!!!" + e.getLocalizedMessage());
	e.printStackTrace();
}

Next, let's work through server code for each server type, along with the client calls.

TSimpleServer server type

Server

package com.xxx.tutorial.thrift.server;

import java.util.logging.Logger;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;

import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;

/**
 * @author wangmengjun
 *
 */
public class TSimpleServerExample {

	private static final Logger logger = Logger.getLogger(TSimpleServerExample.class.getName());

	private static final int SERVER_PORT = 9123;

	public static void main(String[] args) {

		try {
			
			/**
			 * 1. Create the Transport
			 */
			TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
			TServer.Args tArgs = new TServer.Args(serverTransport);
			
			/**
			 * 2. Create the Protocol for the Transport
			 */
			tArgs.protocolFactory(new TBinaryProtocol.Factory());
			// tArgs.protocolFactory(new TCompactProtocol.Factory());
			// tArgs.protocolFactory(new TJSONProtocol.Factory());
			
			/**
			 * 3. Create the Processor for the Protocol
			 */
			TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
			tArgs.processor(tprocessor);


			/**
			 * 4. Create and start the Server
			 * 
			 * org.apache.thrift.server.TSimpleServer - a simple single-threaded server model, generally used for testing
			 */
			TServer server = new TSimpleServer(tArgs);
			logger.info("UserService TSimpleServer start ....");
			server.serve();
			

		} catch (Exception e) {
			logger.severe("Server start error!!!" + e.getLocalizedMessage());
			e.printStackTrace();
		}
	}
}

Client

package com.xxx.tutorial.thrift.client;

import java.util.List;
import java.util.logging.Logger;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;

/**
 * 
 * @author wangmengjun
 *
 */
public class UserClient {

	private static final Logger logger = Logger.getLogger(UserClient.class.getName());

	public static void main(String[] args) {

		try {

			TTransport transport = new TSocket("127.0.0.1", 9123);
			TProtocol protocol = new TBinaryProtocol(transport);

			UserService.Client client = new UserService.Client(protocol);
			transport.open();

			/**
			 * Query the list of users
			 */
			List<User> users = client.findUsersByName("wang");
			logger.info("client.findUsersByName()方法結果 == >" + users);

			/**
			 * Save a user
			 */
			boolean isUserSaved = client.save(new User(101, "WMJ"));
			logger.info("user saved result == > " + isUserSaved);

			/**
			 * Delete a user
			 */
			client.deleteByUserId(1002);

			transport.close();

		} catch (TTransportException e) {
			logger.severe("TTransportException==>" + e.getLocalizedMessage());
		} catch (UserNotFoundException e) {
			logger.severe("UserNotFoundException==>" + e.getMessage());
		} catch (TException e) {
			logger.severe("TException==>" + e.getLocalizedMessage());
		}
	}
}

Test

  • Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:29:14 下午 com.xxx.tutorial.thrift.server.TSimpleServerExample main
信息: UserService TSimpleServer start ....
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002
  • Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: user saved result == > true
Received 3
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main
严重: UserNotFoundException==>userId=1002的用户不存在

TThreadPoolServer server type

Server

package com.xxx.tutorial.thrift.server;

import java.util.logging.Logger;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;

import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;

/**
 * 
 * @author wangmengjun
 *
 */
public class TThreadPoolServerExample {

	private static final Logger logger = Logger.getLogger(TThreadPoolServerExample.class.getName());

	private static final int SERVER_PORT = 9123;

	public static void main(String[] args) {

		try {
			
			/**
			 * 1. Create the Transport
			 */
			TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
			TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport);
			
			/**
			 * 2. Create the Protocol for the Transport
			 */
			tArgs.protocolFactory(new TBinaryProtocol.Factory());
			// tArgs.protocolFactory(new TCompactProtocol.Factory());
			// tArgs.protocolFactory(new TJSONProtocol.Factory());
			
			/**
			 * 3. Create the Processor for the Protocol
			 */
			TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
			tArgs.processor(tprocessor);


			/**
			 * 4. Create and start the Server
			 * 
			 * org.apache.thrift.server.TThreadPoolServer
			 */
			TServer server = new TThreadPoolServer(tArgs);
			logger.info("UserService TThreadPoolServer start ....");
			server.serve();
			

		} catch (Exception e) {
			logger.severe("Server start error!!!" + e.getLocalizedMessage());
			e.printStackTrace();
		}
	}
}

Client

The client can be the same UserClient used in the TSimpleServer section.


Test

  • Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:31:44 下午 com.xxx.tutorial.thrift.server.TThreadPoolServerExample main
信息: UserService TThreadPoolServer start ....
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002

  • Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: user saved result == > true
Received 3
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main
严重: UserNotFoundException==>userId=1002的用户不存在

TNonblockingServer server type

Server

package com.xxx.tutorial.thrift.server;

import java.util.logging.Logger;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;

import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;

/**
 * 
 * @author wangmengjun
 *
 */
public class TNonblockingServerExample {

	private static final Logger logger = Logger.getLogger(TNonblockingServerExample.class.getName());

	private static final int SERVER_PORT = 9123;

	public static void main(String[] args) {

		try {

			/**
			 * 1. Create the Transport
			 */
			TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(SERVER_PORT);
			TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverTransport);

			/**
			 * 2. Create the Protocol for the Transport
			 */
			tArgs.transportFactory(new TFramedTransport.Factory());
			tArgs.protocolFactory(new TBinaryProtocol.Factory());
			// tArgs.protocolFactory(new TCompactProtocol.Factory());
			// tArgs.protocolFactory(new TJSONProtocol.Factory());

			/**
			 * 3. Create the Processor for the Protocol
			 */
			TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
			tArgs.processor(tprocessor);

			/**
			 * 4. Create and start the Server
			 */
			TServer server = new TNonblockingServer(tArgs);
			logger.info("UserService TNonblockingServer start ....");
			server.serve();

		} catch (Exception e) {
			logger.severe("Server start error!!!" + e.getLocalizedMessage());
			e.printStackTrace();
		}
	}
}
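
The server above wraps its transport in TFramedTransport, which prefixes every message with its length so that a nonblocking server can tell when a whole method call has been read off the wire. A minimal sketch of that idea follows, using only java.nio. The class FramingSketch and its methods are hypothetical names for illustration, not Thrift's API; the 4-byte big-endian length prefix mirrors the framing that TFramedTransport uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class FramingSketch {

	/** Wraps a payload in a frame: a 4-byte big-endian length, then the payload bytes. */
	static byte[] frame(byte[] payload) {
		// ByteBuffer writes multi-byte values in big-endian order by default.
		ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
		buf.putInt(payload.length);
		buf.put(payload);
		return buf.array();
	}

	/**
	 * Returns the payload if the received bytes hold a complete frame,
	 * or null if more bytes must be read first.
	 */
	static byte[] tryUnframe(byte[] received) {
		if (received.length < 4) {
			return null; // the length prefix itself is not complete yet
		}
		ByteBuffer buf = ByteBuffer.wrap(received);
		int length = buf.getInt();
		if (length < 0 || buf.remaining() < length) {
			return null; // invalid length, or the frame body is not complete yet
		}
		byte[] payload = new byte[length];
		buf.get(payload);
		return payload;
	}

	public static void main(String[] args) {
		byte[] framed = frame("findUsersByName".getBytes(StandardCharsets.UTF_8));

		// Simulate a partial read: only the first 6 bytes have arrived so far.
		byte[] partial = Arrays.copyOf(framed, 6);
		System.out.println("partial read is a complete frame? " + (tryUnframe(partial) != null));

		// Once all bytes have arrived, the payload can be handed to the processor.
		System.out.println("payload: " + new String(tryUnframe(framed), StandardCharsets.UTF_8));
	}
}
```

Because the length arrives first, a selector thread can keep buffering bytes for a connection and dispatch the call only once the frame is complete, which is why both the server and the client must agree on using the framed transport.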

Client

package com.xxx.tutorial.thrift.client;

import java.util.List;
import java.util.logging.Logger;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;

public class UserClient2 {

	private static final Logger logger = Logger.getLogger(UserClient2.class.getName());

	public static void main(String[] args) {

		try {

			TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9123, 3000));
			TProtocol protocol = new TBinaryProtocol(transport);

			UserService.Client client = new UserService.Client(protocol);
			transport.open();

			/**
			 * Query the list of users
			 */
			List<User> users = client.findUsersByName("wang");
			logger.info("client.findUsersByName()方法結果 == >" + users);

			/**
			 * Save a user
			 */
			boolean isUserSaved = client.save(new User(101, "WMJ"));
			logger.info("user saved result == > " + isUserSaved);

			/**
			 * Delete a user
			 */
			client.deleteByUserId(1002);

			transport.close();

		} catch (TTransportException e) {
			logger.severe("TTransportException==>" + e.getLocalizedMessage());
		} catch (UserNotFoundException e) {
			logger.severe("UserNotFoundException==>" + e.getLocalizedMessage());
		} catch (TException e) {
			logger.severe("TException==>" + e.getLocalizedMessage());
		}
	}
}

Test

  • Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:34:38 下午 com.xxx.tutorial.thrift.server.TNonblockingServerExample main
信息: UserService TNonblockingServer start ....
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002
  • Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: user saved result == > true
Received 3
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main
严重: UserNotFoundException==>userId=1002的用户不存在

THsHaServer server type

Server

package com.xxx.tutorial.thrift.server;

import java.util.logging.Logger;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;

/**
 * @author wangmengjun
 *
 */
public class THsHaServerExample {

	private static final Logger logger = Logger.getLogger(THsHaServerExample.class.getName());

	private static final int SERVER_PORT = 9123;

	public static void main(String[] args) {

		try {
			
			/**
			 * 1. Create the Transport
			 */
			//TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
			TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(SERVER_PORT);
			THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport);
			
			/**
			 * 2. Create the Protocol for the Transport
			 */
			tArgs.transportFactory(new TFramedTransport.Factory());
			tArgs.protocolFactory(new TBinaryProtocol.Factory());
			// tArgs.protocolFactory(new TCompactProtocol.Factory());
			// tArgs.protocolFactory(new TJSONProtocol.Factory());
			
			/**
			 * 3. Create the Processor for the Protocol
			 */
			TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
			tArgs.processor(tprocessor);


			/**
			 * 4. Create and start the Server
			 * 
			 */
			// Half-sync/half-async server model
			TServer server = new THsHaServer(tArgs);
			logger.info("UserService TSimpleServer start ....");
			server.serve();
			

		} catch (Exception e) {
			logger.severe("Server start error!!!" + e.getLocalizedMessage());
			e.printStackTrace();
		}
	}
}

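Client

The client is the same UserClient2 used in the TNonblockingServer section, because THsHaServer also relies on TFramedTransport.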


Test

  • Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:57:26 下午 com.xxx.tutorial.thrift.server.THsHaServerExample main
信息: UserService TSimpleServer start ....
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002
  • Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: user saved result == > true
Received 3
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main
严重: UserNotFoundException==>userId=1002的用户不存在

Nifty server type

Nifty, open-sourced by Facebook, is a Netty-based implementation of Thrift servers and clients.

To use Nifty, we only need to add the Nifty jar as a dependency:


		<dependency>
			<groupId>com.facebook.nifty</groupId>
			<artifactId>nifty-core</artifactId>
			<version>0.10.0</version>
		</dependency>

Server

package com.xxx.tutorial.thrift.server;

import org.apache.thrift.TProcessor;

import com.facebook.nifty.core.NettyServerTransport;
import com.facebook.nifty.core.ThriftServerDef;
import com.facebook.nifty.core.ThriftServerDefBuilder;
import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;

public class NiftyServer {

	public static void main(String[] args) {
		
		// Create the handler
		UserService.Iface userServiceImpl = new UserServiceImpl();

		// Create the processor
		TProcessor processor = new UserService.Processor<>(userServiceImpl);

		// Build the server definition
		ThriftServerDef serverDef = new ThriftServerDefBuilder().withProcessor(processor).listen(9123).build();

		// Create the server transport
		final NettyServerTransport server = new NettyServerTransport(serverDef);

		// Create netty boss and executor thread pools
/*		ExecutorService bossExecutor = Executors.newCachedThreadPool();
		ExecutorService workerExecutor = Executors.newCachedThreadPool();

		// Start the server
		server.start(bossExecutor, workerExecutor);*/
		System.out.println("启动~~~");
		server.start();

		// Arrange to stop the server at shutdown
		Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				try {
					server.stop();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		});
	}
}

Client

The following example uses NiftyClient to make the call. NiftyClient requires an additional dependency:

		
		<dependency>
			<groupId>com.facebook.nifty</groupId>
			<artifactId>nifty-client</artifactId>
			<version>0.10.0</version>
		</dependency>

package com.xxx.tutorial.thrift.client;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.logging.Logger;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.facebook.nifty.client.NiftyClient;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.service.UserService;

public class ThriftNiftyClient {

	private static final Logger logger = Logger.getLogger(ThriftNiftyClient.class.getName());

	@SuppressWarnings({ "resource" })
	public static void main(String[] args) {

		NiftyClient niftyClient = new NiftyClient();
		InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9123);
		try {
			TTransport transport = niftyClient.connectSync(address);
			TBinaryProtocol tp = new TBinaryProtocol(transport);
			UserService.Client userService = new UserService.Client(tp);
			logger.info("userService.findUsersByName 方法调用~");
			List<User> users = userService.findUsersByName("wang");
			logger.info("userService.findUsersByName 调用结果~");
			logger.info("users ==> " + users);

		} catch (TTransportException e) {
			logger.severe("TTransportException ==> " + e.getMessage());
		} catch (InterruptedException e) {
			logger.severe("InterruptedException ==> " + e.getMessage());
		} catch (TException e) {
			logger.severe("TException ==> " + e.getMessage());
		}
	}
}

Test

  • Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
启动~~~
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
  • Client console output
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main
信息: userService.findUsersByName 方法调用~
Received 1
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main
信息: userService.findUsersByName 调用结果~
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main
信息: users ==> [User(userId:1, name:Wang), User(userId:2, name:Mengjun)]

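NiftyClient supports both synchronous and asynchronous connections; see the Nifty project for details.

That completes the different Thrift server types. However, the clients above all make their calls synchronously.

Here is an example of an asynchronous client.

Asynchronous client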

package com.xxx.tutorial.thrift.client;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;

import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.service.UserService;

public class UserAsynClient {

	private static final Logger logger = Logger.getLogger(UserAsynClient.class.getName());

	public static void main(String[] args) {
		try {
			TAsyncClientManager clientManager = new TAsyncClientManager();
			TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9123, 3000);

			TProtocolFactory tprotocol = new TBinaryProtocol.Factory();

			UserService.AsyncClient asyncClient = new UserService.AsyncClient(tprotocol, clientManager, transport);

			System.out.println("Client start .....");

			CountDownLatch latch = new CountDownLatch(1);
			/*AsynCallback_FindUsersByName callBack = new AsynCallback_FindUsersByName(latch);
			asyncClient.findUsersByName("wang", callBack);*/
			
			AsynCallback_SaveUser saveUserCallBack = new AsynCallback_SaveUser(latch);
			asyncClient.save(new User(10001,"A name"), saveUserCallBack);

			System.out.println("call method findUsersByName_call .... end");

			boolean wait = latch.await(30, TimeUnit.SECONDS);

			System.out.println("latch.await =:" + wait);

		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("startClient end.");
	}

	public static class AsynCallback_FindUsersByName implements AsyncMethodCallback<List<User>> {

		private CountDownLatch latch;

		public AsynCallback_FindUsersByName(CountDownLatch latch) {
			this.latch = latch;
		}

		public void onComplete(List<User> response) {

			logger.info("onComplete ==> findUsersByName_call");
			try {
				logger.info("findUsersByName_call response ==> " + response.toString());
			} finally {
				latch.countDown();
			}

		}

		public void onError(Exception exception) {
			logger.severe("onError ==> " + exception.getMessage());
			latch.countDown();
		}
	}

	public static class AsynCallback_SaveUser implements AsyncMethodCallback<Boolean> {

		private CountDownLatch latch;

		public AsynCallback_SaveUser(CountDownLatch latch) {
			this.latch = latch;
		}

		public void onComplete(Boolean response) {
			logger.info("onComplete ==> save_call");
			try {
				logger.info("save_call response ==> " + response.toString());
			} finally {
				latch.countDown();
			}
		}
		
		public void onError(Exception exception) {
			logger.severe("onError ==> " + exception.getMessage());
			latch.countDown();
		}

	}
}

The example above makes its call through AsyncClient. When using AsyncClient, you need to write a callback class that implements AsyncMethodCallback.

UserService.AsyncClient asyncClient = new UserService.AsyncClient(tprotocol, clientManager, transport);
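
A callback of this shape can also be bridged to a CompletableFuture, so the caller waits on a future instead of managing a CountDownLatch by hand. The sketch below uses only the JDK: the Callback interface is a hypothetical stand-in that mirrors the onComplete/onError shape of the callbacks above, and FutureCallback and fakeAsyncSave are assumed helper names for illustration, not part of Thrift.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class FutureCallbackSketch {

	/** Hypothetical stand-in with the same shape as the callbacks used above. */
	interface Callback<T> {
		void onComplete(T response);

		void onError(Exception exception);
	}

	/** Completes a CompletableFuture when the callback fires. */
	static class FutureCallback<T> implements Callback<T> {

		private final CompletableFuture<T> future = new CompletableFuture<>();

		public void onComplete(T response) {
			future.complete(response);
		}

		public void onError(Exception exception) {
			future.completeExceptionally(exception);
		}

		CompletableFuture<T> future() {
			return future;
		}
	}

	/** Simulates an asynchronous call that reports its result on another thread. */
	static void fakeAsyncSave(String name, Callback<Boolean> callback) {
		new Thread(() -> callback.onComplete(!name.isEmpty())).start();
	}

	public static void main(String[] args) throws Exception {
		FutureCallback<Boolean> callback = new FutureCallback<>();
		fakeAsyncSave("A name", callback);

		// Wait for the result with a timeout, much like latch.await(30, TimeUnit.SECONDS).
		Boolean saved = callback.future().get(30, TimeUnit.SECONDS);
		System.out.println("save result: " + saved);
	}
}
```

The future carries both the result and any error, so the waiting code no longer needs a separate latch and shared field to learn what the callback received.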

Start the service with the NiftyServer class given above, then run the UserAsynClient class to complete the asynchronous client example.

Output:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Client start .....
call method findUsersByName_call .... end
Received 0
六月 12, 2017 10:52:44 上午 com.xxx.tutorial.thrift.client.UserAsynClient$AsynCallback_SaveUser onComplete
信息: onComplete ==> save_call
六月 12, 2017 10:52:44 上午 com.xxx.tutorial.thrift.client.UserAsynClient$AsynCallback_SaveUser onComplete
信息: save_call response ==> true
latch.await =:true
startClient end.

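That completes the goals of this post.

This post surveyed Thrift's different server types and illustrated each one with code examples. It covered the following parts:

1. Preparing the example code

2. An introduction to the different server types, with an example for each

3. An asynchronous client example

4. Using the Nifty library, with server and client code examples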

Code download ==> [Code download]

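Comments (2)
kerneler
The least appealing part is that its own .thrift format is just like Google's serialization tool
爱吃大肉包
One question: what is the biggest advantage of netty (or Nifty) over TThreadedSelectorServer? Is it performance, or something else?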