首页 > 代码库 > lume NG 学习笔记(九)Flune Client 开发

lume NG 学习笔记(九)Flune Client 开发

文章内容还是来自官网http://flume.apache.org/FlumeDeveloperGuide.html

由于在实际工作中,数据的生产方式极具多样性,Flume 虽然包含了一些内置的机制来采集数据,但是更多的时候用户更希望能将应用程序和flume直接相通。所以这边运行用户开发应用程序,通过IPC或者RPC连接flume并往flume发送数据。

一、RPC client interface

Flume的RpcClient实现了Flume的RPC机制。用户的应用程序可以很简单的调用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法发送数据,不用担心底层信息交换的细节。用户可以提供所需的event通过直接实现Event接口,例如可以使用简单的方便的实现SimpleEvent类或者使用EventBuilder的writeBody()静态辅助方法。

自Flume 1.4.0起,Avro是默认的RPC协议。NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口。实现中我们需要知道我们将要连接的目标flume agent的host和port用于创建client实例,然后使用RpcClient发送数据到flume agent。

官网给了一个Avro RPCclients的例子,这边直接拿来做实际测试例子。

这里我们把client.init("host.example.org",41414);

改成 client.init("192.168.233.128",50000);  与我们的主机对接


[java] view plain copy

  1. import org.apache.flume.Event;  

  2. import org.apache.flume.EventDeliveryException;  

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6. import java.nio.charset.Charset;  

  7.    

  8. public class MyApp {  

  9.   public static voidmain(String[] args) {  

  10.    MyRpcClientFacade client = new MyRpcClientFacade();  

  11.    // Initializeclient with the remote Flume agent‘s host and port  

  12. //client.init("host.example.org",41414);  

  13. client.init("192.168.233.128",50000);  

  14.    

  15.    // Send 10events to the remote Flume agent. That agent should be  

  16.    // configured tolisten with an AvroSource.  

  17.    String sampleData = "Hello Flume!";  

  18.    for (int i =0; i < 10; i++) {  

  19.      client.sendDataToFlume(sampleData);  

  20.    }  

  21.    

  22.    client.cleanUp();  

  23.   }  

  24. }  

  25.    

  26. class MyRpcClientFacade {  

  27.   private RpcClient client;  

  28.   private String hostname;  

  29.   private int port;  

  30.    

  31.   public void init(String hostname, int port) {  

  32.    // Setup the RPCconnection  

  33.    this.hostname = hostname;  

  34.    this.port = port;  

  35.    this.client = RpcClientFactory.getDefaultInstance(hostname, port);  

  36.    // Use thefollowing method to create a thrift client (instead of the above line):  

  37.     // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  38.   }  

  39.    

  40.   public void sendDataToFlume(String data) {  

  41.    // Create aFlume Event object that encapsulates the sample data  

  42.    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  43.    

  44.    // Send theevent  

  45.    try {  

  46.      client.append(event);  

  47.    } catch (EventDeliveryException e) {  

  48.      // clean up andrecreate the client  

  49.      client.close();  

  50.      client = null;  

  51.      client = RpcClientFactory.getDefaultInstance(hostname, port);  

  52.      // Use thefollowing method to create a thrift client (instead of the above line):  

  53.      // this.client =RpcClientFactory.getThriftInstance(hostname, port);  

  54.    }  

  55.   }  

  56.    

  57.   public void cleanUp() {  

  58.    // Close the RPCconnection  

  59.    client.close();  

  60.   }  

  61.    

  62. }  

这边代码不解释了,主要是将HelloFlume 发送10遍给flume,同时记得将flume 安装主目录下的lib 文件都添加进项目,才能正常运行程序。


 

下面是代理配置:


[html] view plain copy

  1. #配置文件:avro_client_case20.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type = avro  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.channel = c1  

  15. a1.sinks.k1.type = logger  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  




这里要注意下,之前说了,在接收端需要AvroSource或者Thrift Source来监听接口。所以配置代理的时候要把a1.sources.r1.type 写成avro或者thrift

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

 

在eclipse 里运行Java程序,当然也可以打包后在服务器上运行JAVA程序。

 

#在启动源发送的代理终端查看console输出

技术分享


可以看到10条数据正常发送。

这里要说明下,开发代码中client.append(event)不仅仅可以发送一条数据,也可以发送一个List(string) 的数据信息,也就是批量发送。这边就不做演示了。

二、Failover Client

这个类包封装了Avro RPCclient的类默认提供故障处理能力。hosts采用空格分开host:port所代表的flume agent,构成一个故障处理组。这Failover RPC Client目前不支持thrift。如果当前选择的host agent有问题,这个failover client会自动负载到组中下一个host中。

下面是官网开发例子:

[java] view plain copy

  1. // Setup properties for the failover  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_failover");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h1 h2 h3");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h1", host1);  

  13. props.put("hosts.h2", host2);  

  14. props.put("hosts.h3", host3);  

  15.   

  16. // create the client with failover properties  

  17. RpcClient client = RpcClientFactory.getInstance(props);  

下面是测试的开发例子

[java] view plain copy

  1. import org.apache.flume.Event;  

  2. import org.apache.flume.EventDeliveryException;  

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6.   

  7. import java.nio.charset.Charset;  

  8. import java.util.Properties;  

  9.   

  10. public class Failover_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade2 client = new MyRpcClientFacade2();  

  13.         // Initialize client with the remote Flume agent‘s host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Hello Flume!";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade2 {  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.         // Setup the RPC connection  

  34.         // Use the following method to create a thrift client (instead of the above line):  

  35.         // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  36.      // Setup properties for the failover  

  37.         Properties props = new Properties();  

  38.         props.put("client.type""default_failover");  

  39.   

  40.         // List of hosts (space-separated list of user-chosen host aliases)  

  41.         props.put("hosts""h1 h2 h3");  

  42.   

  43.         // host/port pair for each host alias  

  44.         String host1 = "192.168.233.128:50000";  

  45.         String host2 = "192.168.233.128:50001";  

  46.         String host3 = "192.168.233.128:50002";  

  47.         props.put("hosts.h1", host1);  

  48.         props.put("hosts.h2", host2);  

  49.         props.put("hosts.h3", host3);  

  50.   

  51.         // create the client with failover properties  

  52.         client = RpcClientFactory.getInstance(props);  

  53.       }  

  54.   

  55.       public void sendDataToFlume(String data) {  

  56.         // Create a Flume Event object that encapsulates the sample data  

  57.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  58.   

  59.         // Send the event  

  60.         try {  

  61.           client.append(event);  

  62.         } catch (EventDeliveryException e) {  

  63.           // clean up and recreate the client  

  64.           client.close();  

  65.           client = null;  

  66.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  67.           // Use the following method to create a thrift client (instead of the above line):  

  68.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  69.         }  

  70.       }  

  71.   

  72.       public void cleanUp() {  

  73.         // Close the RPC connection  

  74.         client.close();  

  75.       }  

  76. }  

这边代码设三个host用于故障转移,这里偷懒,用同一个主机的3个端口模拟。代码还是将Hello Flume 发送10遍给第一个flume代理,当第一个代理故障的时候,则发送给第二个代理,以顺序进行故障转移。

下面是代理配置沿用之前的那个,并对配置文件进行拷贝,

cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf

分别修改avro_client_case21.conf与avro_client_case22.conf中的

a1.sources.r1.port= 50001 与a1.sources.r1.port = 50002

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

 

启动成功后

 

在eclipse 里运行JAVA程序Failover_Client.java,当然也可以打包后在服务器上运行JAVA程序。

#在启动源发送的3个代理终端查看console输出

我们可以看到第一个代理终端收到了,数据而其他2个终端没有数据。

技术分享

然后我们把第一个终端的进程关掉,再运行一遍client程序,然后会发现这个时候是发生到第二个终端中。当第二个终端也关闭的时候,再发送数据,则是发送到最后一个终端。这里我们可以看到,故障转移的代理主机转移是采用顺序序列的。

 

三、LoadBalancing RPC client

Flume Client SDK也支持在多个host之间使用负载均衡的Rpc Client。这种类型的client带有一个通过空格分隔的host:port主机列表并构成了一个负载均衡组。这个client可以指定一个负载均衡的策略,既可以随机的选择一个配置的host,也可以循环选择一个host。当然你也可以自己编写一个类实现LoadBalancingRpcClient$HostSelector接口以至于用户可以使用自己编写的选择顺序。在这种情况下,用户自定义的类需要被指定为host-selector属性的值。LoadBalancing RPC Client当前不支持thrift。

如果开启了backoff,那么client失败将被放入黑名单中,只有过了被指定的超时之间之后这个被选择的失败的主机才会从黑名单中被排除。当超时到了,如果主机还是没有反应,那么这被认为是一个连续的失败并且超时时间会成倍的增长,以避免可能陷入对反应迟钝主机的长时间等待中。

这backoff的最大超时时间可以通过maxBackoff属性来配置,单位是毫秒。在默认情况下maxBackoff的值是30秒(在orderSelector类里面指定)。

下面是官网例子

[java] view plain copy

  1. // Setup properties for the load balancing  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_loadbalance");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h1 h2 h3");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h1", host1);  

  13. props.put("hosts.h2", host2);  

  14. props.put("hosts.h3", host3);  

  15.   

  16. props.put("host-selector""random"); // For random host selection  

  17. // props.put("host-selector", "round_robin"); // For round-robin host  

  18. //                                            // selection  

  19. props.put("backoff""true"); // Disabled by default.  

  20.   

  21. props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  22.                                   // becomes 30000 ms  

  23.   

  24. // Create the client with load balancing properties  

  25. RpcClient client = RpcClientFactory.getInstance(props);  

下面是测试的开发例子

[java] view plain copy

  1. import java.nio.charset.Charset;  

  2.   

  3. import org.apache.flume.Event;  

  4. import org.apache.flume.EventDeliveryException;  

  5. import org.apache.flume.api.RpcClient;  

  6. import org.apache.flume.api.RpcClientFactory;  

  7. import org.apache.flume.event.EventBuilder;  

  8. import java.util.Properties;  

  9.   

  10. public class Load_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade3 client = new MyRpcClientFacade3();  

  13.         // Initialize client with the remote Flume agent‘s host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Flume Load_Client";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade3{  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.           Properties props = new Properties();  

  34.           props.put("client.type""default_loadbalance");  

  35.   

  36.           // List of hosts (space-separated list of user-chosen host aliases)  

  37.           props.put("hosts""h1 h2 h3");  

  38.   

  39.           // host/port pair for each host alias  

  40.           String host1 = "192.168.233.128:50000";  

  41.           String host2 = "192.168.233.128:50001";  

  42.           String host3 = "192.168.233.128:50002";  

  43.           props.put("hosts.h1", host1);  

  44.           props.put("hosts.h2", host2);  

  45.           props.put("hosts.h3", host3);  

  46.   

  47.           props.put("host-selector""random"); // For random host selection  

  48.           // props.put("host-selector", "round_robin"); // For round-robin host  

  49. //                                                    // selection  

  50.           props.put("backoff""true"); // Disabled by default.  

  51.   

  52.           props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  53.                                             // becomes 30000 ms  

  54.   

  55.           // Create the client with load balancing properties  

  56.           client = RpcClientFactory.getInstance(props);  

  57.       }  

  58.   

  59.       public void sendDataToFlume(String data) {  

  60.         // Create a Flume Event object that encapsulates the sample data  

  61.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  62.   

  63.         // Send the event  

  64.         try {  

  65.           client.append(event);  

  66.         } catch (EventDeliveryException e) {  

  67.           // clean up and recreate the client  

  68.           client.close();  

  69.           client = null;  

  70.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  71.           // Use the following method to create a thrift client (instead of the above line):  

  72.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  73.         }  

  74.       }  

  75.   

  76.       public void cleanUp() {  

  77.         // Close the RPC connection  

  78.         client.close();  

  79.       }  

  80. }  

这里采用随机的负载均衡props.put("host-selector","random") 。测试的时候沿用之前的3个接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,并将他们起起来。

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

 

启动成功后

 

在eclipse 里运行JAVA程序Failover_Client.java,当然也可以打包后在服务器上运行JAVA程序。

#在启动源发送的3个代理终端查看console输出

下面是Host1,收到了2条数据

技术分享

下面是Host2,收到了2条数据

技术分享

下面是Host3,收到了6条数据。

技术分享


可以看到我们开发例子中,host-selector选择的是随机,因此程序也是随机发送数据。下面我们测试轮询round_robin选项。

程序里我们修改这句

//props.put("host-selector","random"); // For random host selection

props.put("host-selector", "round_robin");// Forround-robin host

再运行Java 程序

下面是Host1,收到了4条数据

技术分享


下面是Host2,收到了3条数据

技术分享


同样Host3,收到了3条数据,这边就不放图了。轮询就是按照顺序放图。


lume NG 学习笔记(九)Flune Client 开发