
Thrift Study Notes


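I. Introduction to Thrift

  Thrift is an RPC framework developed at Facebook for data communication between different systems; it later became an Apache open-source project. Thrift supports many programming languages, including Java, Python, Ruby, JavaScript, Node.js, Go, C, C++, C#, Erlang, Delphi, Perl, PHP, Smalltalk, OCaml, Haxe, Haskell, and D. Thrift uses an IDL (Interface Definition Language) to define data structures and service interfaces. The Thrift model is shown in the figure below:

                                                  Figure: Thrift model diagram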

 

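II. Thrift Data Transfer Protocols

TBinaryProtocol                 Binary protocol
TCompactProtocol                Compact binary protocol that compresses integers with a VLQ-style variable-length encoding
TJSONProtocol                   JSON protocol
TSimpleJSONProtocol             Simplified JSON protocol
TDebugProtocol                  Human-readable text protocol for debugging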
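
To make the variable-length encoding mentioned for TCompactProtocol concrete, here is a minimal standalone sketch of how a 32-bit integer can be ZigZag-mapped and then written 7 bits per byte. The class and method names (VarintSketch, zigzag, writeVarint32, encodeI32) are hypothetical and written for illustration; this is not the Thrift library's own code.

```java
import java.io.ByteArrayOutputStream;

public class VarintSketch {
    // ZigZag maps signed ints to unsigned ones so that small magnitudes
    // stay small: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    public static int zigzag(int n) {
        return (n << 1) ^ (n >> 31);
    }

    // Variable-length encoding: 7 payload bits per byte, low-order group
    // first; the high bit of a byte is set when more bytes follow.
    public static byte[] writeVarint32(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Illustrative helper: ZigZag first, then varint.
    public static byte[] encodeI32(int n) {
        return writeVarint32(zigzag(n));
    }

    public static void main(String[] args) {
        for (int n : new int[] {0, -1, 1, 150, Integer.MAX_VALUE}) {
            System.out.println(n + " -> " + encodeI32(n).length + " byte(s)");
        }
    }
}
```

Small values such as IDs and list sizes therefore take one or two bytes instead of a fixed four, which is where most of the size savings over a fixed-width binary encoding come from.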

 

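III. Thrift Transport Layer

TFramedTransport               Sends data in frames, each prefixed with its size
TFileTransport                 File-based transport
TMemoryTransport               In-memory I/O transport
TZlibTransport                 Transport that applies zlib compression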
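
As an illustration of what "frames prefixed with their size" means, the sketch below wraps a payload with a 4-byte big-endian length and unwraps it again. FrameSketch and its methods are hypothetical names used only here; the real TFramedTransport does this internally around whatever protocol bytes are written.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameSketch {
    // Prefix the payload with its length as a 4-byte big-endian integer.
    public static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Read the length prefix, then exactly that many payload bytes.
    public static byte[] unframe(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        // 4-byte prefix plus 5 payload bytes
        System.out.println(framed.length);
        System.out.println(new String(unframe(framed), StandardCharsets.UTF_8));
    }
}
```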

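IV. Thrift Servers

TSimpleServer                  Simple single-threaded blocking server
TThreadPoolServer              Multi-threaded blocking server
TNonblockingServer             Non-blocking server (in the Java library, one selector thread handles I/O and invokes the handler)
THsHaServer                    Half-sync/half-async server (non-blocking I/O plus a worker thread pool)
In essence, transport I/O is either blocking or non-blocking; each server type combines one of these with a threading model (single-threaded or multi-threaded).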

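V. Thrift Clients

TServiceClient             Simple single-threaded blocking client (base class of the generated Client)
TAsyncClient               Asynchronous client (base class of the generated AsyncClient; results are delivered through a callback)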

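VI. Thrift Development Steps

1. Server side

Implement the service handler interface (impl)
Create a TProcessor
Create a TServerTransport                         blocking (TServerSocket) or non-blocking (TNonblockingServerSocket)
Create a TProtocol factory                        data transfer protocol
Create a TServer                                  server type: simple (single-threaded), thread pool (multi-threaded), or half-sync/half-async (multi-threaded)
Start the server

2. Client side

Create a TTransport (TSocket)                     blocking; the synchronous client only uses blocking I/O, while the asynchronous client uses TNonblockingSocket
Create a TProtocol                                data transfer protocol
Create a Client from the TTransport and TProtocol
Call the Client's methods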

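VII. Thrift Data Types

1. Base types

bool: boolean value, true or false

byte: 8-bit signed integer

i16: 16-bit signed integer

i32: 32-bit signed integer

i64: 64-bit signed integer

double: 64-bit floating-point number

string: UTF-8 encoded string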
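
For readers working in Java, the base types above line up with Java types as sketched below. BaseTypeSketch is a hypothetical helper written for these notes, not generated code; the mapping itself matches what the generated classes later in these notes use (for example, i32 becomes int and string becomes String).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BaseTypeSketch {
    // Thrift base type -> Java type it is represented by in generated code.
    public static Map<String, Class<?>> javaTypes() {
        Map<String, Class<?>> m = new LinkedHashMap<>();
        m.put("bool", boolean.class);
        m.put("byte", byte.class);
        m.put("i16", short.class);
        m.put("i32", int.class);
        m.put("i64", long.class);
        m.put("double", double.class);
        m.put("string", String.class);
        return m;
    }

    public static void main(String[] args) {
        javaTypes().forEach((thriftType, javaType) ->
                System.out.println(thriftType + " -> " + javaType.getSimpleName()));
        // All integer types are signed, so i16 covers the same range as short.
        System.out.println("i16 range: " + Short.MIN_VALUE + " .. " + Short.MAX_VALUE);
    }
}
```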

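2. Struct types

struct: defines a shared object

enum: enumeration type

3. Container types

list: corresponds to Java's ArrayList

set: corresponds to Java's HashSet

map: corresponds to Java's HashMap

4. Exception types

exception: corresponds to Java's Exception

5. Service types

service: corresponds to a service class and declares the interface it provides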

VIII. Thrift Examples

struct type

struct Student{
    1: required i32 id
    2: required string username
    3: required string password
    4: required string number
    5: optional double age
}

struct type with container fields

struct School{
    1: required i32 id
    2: required string name
    3: required set<Student> students
    4: required list<Student> rank
    5: required map<string, string> number_name
}

service type

service ThriftMysqlService{
    void addUser(1:Student user)
    list<Student> queryAllUser()
    Student queryOneUser(1:i32 id)
    map<string, string> queryOneArticle(1:i32 id)
}

 

Complete code

thrift.thrift: the file that defines the data types and the service interface

namespace java org.seava.thrift_example.thrift

struct User{
    1: required i32 userId
    2: required string username
    3: required string password
}

service ThriftService{
    void addUser(1:User user)
    User queryUser(1:i32 id)
    list<User> queryUserList()
    map<string, string> queryUserNamePass()
    map<i32, User> queryUserMap()
}

Download the thrift.exe compiler from the Apache website (http://thrift.apache.org/), then run the following command in a cmd window to generate the Java code:

thrift  -gen java xxx.thrift

Interface implementation: ThriftServiceImpl.java

package org.seava.thrift_example.thrift;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Handler implementing the Iface interface generated from thrift.thrift.
public class ThriftServiceImpl implements ThriftService.Iface {

    public void addUser(User user) throws org.apache.thrift.TException {
        System.out.println(user.userId + "  " + user.username + "  " + user.password);
    }

    public User queryUser(int id) throws org.apache.thrift.TException {
        System.out.println(id);
        // Returns a fixed sample user regardless of the id
        User user = new User();
        user.userId = 100;
        user.username = "FFF";
        user.password = "NNN";
        return user;
    }

    public List<User> queryUserList() throws org.apache.thrift.TException {
        User user = new User();
        user.userId = 100;
        user.username = "FFF";
        user.password = "NNN";
        User user2 = new User();
        user2.userId = 102;
        user2.username = "FFF2";
        user2.password = "NNN2";
        List<User> list = new ArrayList<User>();
        list.add(user2);
        list.add(user);
        return list;
    }

    public Map<String, String> queryUserNamePass() throws org.apache.thrift.TException {
        User user = new User();
        user.userId = 100;
        user.username = "FFF";
        user.password = "NNN";
        Map<String, String> map = new HashMap<String, String>();
        map.put("password", user.password);
        map.put("username", user.username);
        return map;
    }

    public Map<Integer, User> queryUserMap() throws org.apache.thrift.TException {
        User user = new User();
        user.userId = 100;
        user.username = "FFF";
        user.password = "NNN";
        User user2 = new User();
        user2.userId = 102;
        user2.username = "FFF2";
        user2.password = "NNN2";
        Map<Integer, User> map = new HashMap<Integer, User>();
        map.put(user.userId, user);
        map.put(user2.userId, user2);
        return map;
    }

}

Server: Server.java

package org.seava.thrift_example.thrift;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;


public class Server {

    public static int port = 8090;

    /**
     * Simple server: blocking, single-threaded.
     * Steps:
     * create the TProcessor
     * create the TServerTransport
     * create the TProtocol factory
     * create the TServer
     * start the server
     */
    public static void startSimpleServer(){
        // Create the processor
        TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
        try {
            // Create the transport (blocking)
            TServerSocket serverTransport = new TServerSocket(port);
            // Create the protocol factory
            TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
            // Pass the processor, transport and protocol to the server arguments
            TServer.Args args = new TServer.Args(serverTransport);
            args.processor(tprocessor);
            args.protocolFactory(protocol);
            // Choose the server type
            TServer server = new TSimpleServer(args);
            // Start serving (blocks the calling thread)
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

    /**
     * Thread-pool server: blocking, multi-threaded.
     */
    public static void startThreadPoolServer(){
        // Create the processor
        TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
        try{
            // Create the transport (blocking)
            TServerSocket serverTransport = new TServerSocket(port);
            // Create the protocol factory (data transfer protocol)
            TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
            TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
            args.processor(tprocessor);
            args.protocolFactory(protocol);
            // Choose the server type: multi-threaded
            TServer server = new TThreadPoolServer(args);
            // Start serving
            server.serve();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Non-blocking I/O server.
     */
    public static void startTNonblockingServer(){
        // Create the processor
        TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
        try{
            // Create the transport (non-blocking)
            TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
            // Create the protocol factory (data transfer protocol)
            TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
            // Non-blocking servers require framed transport
            TFramedTransport.Factory transport = new TFramedTransport.Factory();
            TNonblockingServer.Args args = new TNonblockingServer.Args(serverTransport);
            args.processor(tprocessor);
            args.transportFactory(transport);
            args.protocolFactory(protocol);
            // Choose the server type: non-blocking
            TServer server = new TNonblockingServer(args);
            // Start serving
            server.serve();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Half-sync/half-async server on non-blocking I/O.
     */
    public static void startTHsHaServer(){
        // Create the processor
        TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
        try{
            // Create the transport (non-blocking)
            TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
            // Framed transport, required for non-blocking servers
            TFramedTransport.Factory transport = new TFramedTransport.Factory();
            // Data transfer protocol
            TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
            // Arguments for the half-sync/half-async server
            THsHaServer.Args args = new THsHaServer.Args(serverTransport);
            args.processor(tprocessor);
            args.transportFactory(transport);
            args.protocolFactory(protocol);
            // Choose the server type
            TServer server = new THsHaServer(args);
            // Start serving
            server.serve();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public static void main(String args[]){
        // Start the simple server
//        Server.startSimpleServer();
        // Start the thread-pool server
//        Server.startThreadPoolServer();
//        Server.startTNonblockingServer();
//        Server.startTHsHaServer();
        Server.startTNonblockingServer();
    }
}

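Server.java implements four server types: simple (blocking, single-threaded), thread pool (blocking, multi-threaded), non-blocking, and half-sync/half-async (non-blocking).

Note: with the non-blocking servers, the transport layer must use TFramedTransport, on the client side as well.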
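
One way to see why framing matters for non-blocking servers: a non-blocking read may return only part of a message, so the server has to buffer bytes until a whole message is available, and a length prefix tells it when that is. The sketch below illustrates the idea with a hypothetical FrameAccumulator class, assuming a 4-byte big-endian length prefix; it is a simplified model and not the Thrift library's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FrameAccumulator {
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Accept whatever bytes arrived; return every frame that is now complete.
    public List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int size = buf.getInt();
            if (buf.remaining() < size) {
                buf.reset(); // frame incomplete: wait for more bytes
                break;
            }
            byte[] frame = new byte[size];
            buf.get(frame);
            frames.add(frame);
        }
        // Keep only the unconsumed tail for the next call.
        byte[] rest = new byte[buf.remaining()];
        buf.get(rest);
        pending.reset();
        pending.write(rest, 0, rest.length);
        return frames;
    }

    public static void main(String[] args) {
        FrameAccumulator acc = new FrameAccumulator();
        // One 3-byte frame delivered in two separate reads.
        System.out.println(acc.feed(new byte[] {0, 0, 0, 3, 'a'}).size());
        System.out.println(acc.feed(new byte[] {'b', 'c'}).size());
    }
}
```

Only once a frame is complete does it make sense to hand the request to the processor, so no thread has to sit blocked on a half-received message.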

Client: Client.java

package org.seava.thrift_example.thrift;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;


public class Client implements Runnable {

    public static String ip = "localhost";
    public static int port = 8090;
    public static int time_out = 30000;

    /**
     * Client for the blocking servers.
     * Steps:
     * create the TTransport
     * create the TProtocol
     * create the Client from the TTransport and TProtocol
     * call the Client's methods
     */
    public static void startSimpleClient(){
        TTransport transport = null;
        try{
            // Create the transport
            transport = new TSocket(ip, port, time_out);
            // Create the protocol
            TProtocol protocol = new TBinaryProtocol(transport);
            // Create the client from the transport and protocol
            ThriftService.Client client = new ThriftService.Client(protocol);
            transport.open();
            // Call the client methods
            List<User> list = client.queryUserList();
            for(User user : list){
                System.out.println(user.userId + " " + user.username + " " + user.password);
            }
            Map<String, String> map = client.queryUserNamePass();
            System.out.println(map);
            User user = client.queryUser(10);
            System.out.println(user.userId + " " + user.username + " " + user.password);
            Map<Integer, User> map_u = client.queryUserMap();
            System.out.println(map_u);
            User uu = new User();
            uu.userId = 1111;
            uu.username = "mmbbmmbb";
            uu.password = "ppbbppbb";
            client.addUser(uu);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // Release the connection
            if(transport != null){
                transport.close();
            }
        }
    }

    /**
     * Client for the non-blocking servers: a blocking socket wrapped in
     * TFramedTransport, with TCompactProtocol to match the server.
     */
    public static void startNonblockingClient(){
        TTransport transport = null;
        try{
            transport = new TFramedTransport(new TSocket(ip, port));
            TCompactProtocol protocol = new TCompactProtocol(transport);
            ThriftService.Client client = new ThriftService.Client(protocol);
            transport.open();
            // Call the client methods
            List<User> list = client.queryUserList();
            for(User user : list){
                System.out.println(user.userId + " " + user.username + " " + user.password);
            }
            Map<String, String> map = client.queryUserNamePass();
            System.out.println(map);
            User user = client.queryUser(10);
            System.out.println(user.userId + " " + user.username + " " + user.password);
            Map<Integer, User> map_u = client.queryUserMap();
            System.out.println(map_u);
            User uu = new User();
            uu.userId = 1111;
            uu.username = "mmbbmmbb";
            uu.password = "ppbbppbb";
            client.addUser(uu);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // Release the connection
            if(transport != null){
                transport.close();
            }
        }
    }

    /**
     * Asynchronous client. AsynCallback is a class from the example project
     * (not listed here); it receives the result and counts down the latch.
     */
    public static void startAsynClient(){
        try{
            TAsyncClientManager clientManager = new TAsyncClientManager();
            TNonblockingTransport transport = new TNonblockingSocket(ip, port, time_out);
            TProtocolFactory tprotocol = new TCompactProtocol.Factory();
            ThriftService.AsyncClient asyncClient = new ThriftService.AsyncClient(tprotocol, clientManager, transport);
            System.out.println("Client start ...");
            CountDownLatch latch = new CountDownLatch(1);
            AsynCallback callBack = new AsynCallback(latch);
            System.out.println("call method queryUser start ...");
            asyncClient.queryUser(100, callBack);
            System.out.println("call method queryUser end");
            // Wait up to 30 seconds for the callback to fire
            boolean wait = latch.await(30, TimeUnit.SECONDS);
            System.out.println("latch.await =:" + wait);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public void run(){
        Client.startSimpleClient();
    }

    public static void main(String args[]){
        // Call the simple server
//        Client.startSimpleClient();
        /*Client c1 = new Client();
        Client c2 = new Client();

        new Thread(c1).start();
        new Thread(c2).start();*/

//        Client.startNonblockingClient();
//        Client.startNonblockingClient();
        Client.startAsynClient();
    }
}

The client implements both a blocking single-threaded client and an asynchronous client.
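
The asynchronous client relies on a callback plus a CountDownLatch so that the calling thread can wait for the result. Since the callback class itself is not listed in these notes, here is a standalone sketch of that pattern using only the JDK. The Callback interface and the queryUserAsync method are hypothetical stand-ins, not Thrift's API and not the project's AsynCallback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchCallbackSketch {
    // Hypothetical callback interface standing in for the one an async client expects.
    interface Callback<T> {
        void onComplete(T result);
        void onError(Exception e);
    }

    // Hypothetical async call: does the work on another thread, then invokes the callback.
    static void queryUserAsync(int id, Callback<String> cb) {
        new Thread(() -> cb.onComplete("user-" + id)).start();
    }

    // Start the async call, then block until the callback fires or 30 seconds pass.
    public static String callAndWait(int id) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        queryUserAsync(id, new Callback<String>() {
            public void onComplete(String r) { result.set(r); latch.countDown(); }
            public void onError(Exception e) { latch.countDown(); }
        });
        try {
            boolean finished = latch.await(30, TimeUnit.SECONDS);
            return finished ? result.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(100));
    }
}
```

Counting down in both onComplete and onError keeps the caller from waiting out the full timeout when the call fails.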

The complete code is on GitHub: https://github.com/WaterHsu/thrift-example.git
