首页 > 代码库 > ES transport client批量导入

ES transport client批量导入

技术分享

从bulk.txt文件中按行读取(每行是一个完整的JSON文档),然后通过bulk批量导入。首先调用client.prepareBulk()实例化一个BulkRequestBuilder对象,再调用该对象的add方法逐条添加数据,最后调用execute()提交。实现代码:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

/**
 * Reads {@code files/bulk.txt} line by line and indexes every non-blank line
 * into the {@code test} index (type {@code article}) through the transport
 * client, sending the documents in bulk requests of {@link #BATCH_SIZE} actions.
 *
 * <p>Each line is passed unchanged to {@code setSource}, so it is expected to
 * be a complete JSON document.
 */
public class ElasticSearchBulkIn {

    /** Number of index actions collected before a bulk request is sent. */
    private static final int BATCH_SIZE = 10;

    /**
     * Connects to the cluster on 127.0.0.1:9300, imports the file and always
     * closes the client afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        Client client = null;
        try {

            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build(); // cluster.name is configured in elasticsearch.yml

            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            importFile(client, new File("files/bulk.txt"));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the client's connections and threads even when the import failed.
            if (client != null) {
                client.close();
            }
        }

    }

    /**
     * Indexes every non-blank line of {@code source} as one document.
     *
     * @param client connected client used to build and send the requests
     * @param source text file holding one JSON document per line
     * @throws IOException if the file cannot be opened or read
     */
    private static void importFile(Client client, File source) throws IOException {
        // try-with-resources closes the reader (and the wrapped FileReader) on every exit path.
        try (BufferedReader reader = new BufferedReader(new FileReader(source))) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // a blank line is not a document
                }
                bulkRequest.add(client.prepareIndex("test", "article").setSource(line));
                if (bulkRequest.numberOfActions() >= BATCH_SIZE) {
                    flush(bulkRequest);
                    // Start a fresh builder: a sent builder still holds its actions,
                    // so reusing it would index the same documents again.
                    bulkRequest = client.prepareBulk();
                }
            }
            // Send whatever is left over from the last, partially filled batch.
            flush(bulkRequest);
        }
    }

    /**
     * Sends the collected actions, if any, and reports when the response
     * signals failures. A builder without actions is skipped because an empty
     * bulk request is rejected by request validation.
     *
     * @param bulkRequest builder holding the pending index actions
     */
    private static void flush(BulkRequestBuilder bulkRequest) {
        if (bulkRequest.numberOfActions() == 0) {
            return;
        }
        if (bulkRequest.execute().actionGet().hasFailures()) {
            System.err.println("Bulk request with " + bulkRequest.numberOfActions()
                    + " actions completed with failures");
        }
    }

}

注意:setSource的参数就是JSON字符串。
见:http://www.cnblogs.com/bonelee/p/6956138.html

// Build the client settings: enable "client.transport.sniff" and set the cluster name.
Settings settings = ImmutableSettings.settingsBuilder()
        .put("client.transport.sniff", true)
        .put("cluster.name", "myelasticsearch")
        .build();
// Set up the transport client connection.
Client client = new TransportClient(settings).addTransportAddress(
        new InetSocketTransportAddress("192.168.1.100", 9300));
// Create the bulk request builder.
BulkRequestBuilder bulkRequest = client.prepareBulk();
while (rs.next()) {
    // Add one index action per row; the document id and the JSON fields come from the row's columns.
    bulkRequest.add(client.prepareIndex("ryxx", "tweet", rs.getString("id"))
            .setSource(jsonBuilder().startObject()
                    .field("name", rs.getString("name"))
                    .field("age", rs.getString("age"))
                    .field("address", rs.getString("address"))
                    .field("phone", rs.getString("phone"))
                    .endObject()));
}
// A bulk request without actions is rejected by request validation,
// so only submit when at least one row was added.
if (bulkRequest.numberOfActions() > 0) {
    // Submit the batch to the server.
    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
    // Report whether any item failed.
    if (bulkResponse.hasFailures()) {
        System.out.println(bulkResponse.buildFailureMessage());
    }
}

 

 

ES transport client批量导入