首页 > 代码库 > elasticsearch持有者类

elasticsearch持有者类

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * <p></p>
 *
 * @author
 * @version V1.0
 * @modificationHistory=========================逻辑或功能性重大变更记录
 * @modify by user: $author$ $date$
 * @modify by reason: {方法名}:{原因}
 */
public class ESHolder implements Serializable,Closeable{
    private static final Logger LOG = LoggerFactory.getLogger(ESHolder.class);

    private String esClusterName = null;
    private String esClusterAddress = null;
    // ES客户端
    private Client ESClient = null;

    public ESHolder(String esClusterName, String esClusterAddress) {
        this.esClusterName = esClusterName;
        this.esClusterAddress = esClusterAddress;
    }


    public Client getESClient() {
        if (ESClient == null) {
            initESClient(esClusterName, esClusterAddress);
        }
        return ESClient;
    }

    /**
     * 批量建立ES索引
     *
     * @param list
     * @return
     * @author
     */
    public boolean addIndex(String indexName, String typeName, List<Map<String, Object>> list) {
        long t = System.currentTimeMillis();
        try {
            ObjectMapper mapper = new ObjectMapper();
            BulkRequestBuilder bulkRequest = getESClient().prepareBulk();
            for(Map<String, Object> data : list){
                byte[] json = mapper.writeValueAsBytes(data);
                bulkRequest.add(new IndexRequest(indexName, typeName).source(json));
            }

            BulkResponse response = bulkRequest.execute().actionGet();
            if(response.hasFailures()){
                BulkItemResponse[] itemResponses = response.getItems();
                for(BulkItemResponse itemResponse : itemResponses){
                    // TODO Must do something to handle failures.
                    LOG.error("Add ES Index failed! DOC_ID: {}, Reason: {}", itemResponse.getId(), itemResponse.getFailureMessage());
                }
            }
        } catch (JsonProcessingException e) {
            LOG.error("Build index fail.", e);
            return false;
        }
        LOG.debug("build index complete,num:{}, cost:{}", list.size(), System.currentTimeMillis() - t);
        return true;
    }

    /**
     * 批量删除ES索引
     *
     * @param docIds
     *
     *
     */
    public void deleteIndex(String indexName, String typeName, List<String> docIds){
        BulkRequestBuilder bulkRequest = getESClient().prepareBulk();
        for(String docId : docIds){
            bulkRequest.add(new DeleteRequest(indexName, typeName, docId));
        }
        BulkResponse response = bulkRequest.execute().actionGet();
        if(response.hasFailures()){
            BulkItemResponse[] itemResponses = response.getItems();
            for(BulkItemResponse itemResponse : itemResponses){
                // TODO Must do something to handle failures.
                LOG.error("ES Index delete failed! DOC_ID: {}, Reason: {}", itemResponse.getId(), itemResponse.getFailureMessage());
            }
        }
    }

    /**
     * 删除ES索引
     *
     * @param indexName
     * @param typeName
     * @param data
     * @return
     */
    public boolean deleteIndex(String indexName, String typeName, Map<String, Object> data){
        DeleteRequestBuilder requestBuilder = getESClient().prepareDelete(indexName, typeName,
                (String) data.get(“rowkey”));
        DeleteResponse response = requestBuilder.execute().actionGet();
        if(!response.isFound()){
            LOG.error("ES Index not found! DOC_ID: {}", response.getId());
            return false;
        }
        return true;
    }

    /**
     * 从ES查询数据
     *
     * @param query
     * @return
     *
     */
    public SearchHits queryWithES(SearchRequestBuilder query){
        SearchHits response = query.execute().actionGet().getHits();
        return response;
    }

    /**
     * 构造查询对象
     *
     * @param index
     * @param type
     * @param queryBuilder
     * @param retField
     * @param sortField
     * @param start
     * @param rows
     * @return
     */
    public SearchRequestBuilder buildSearch(String index, String type, QueryBuilder queryBuilder, String retField, String sortField, SortOrder sortOrder, int start, int rows){

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder).from(start).size(rows);

        if(StringUtils.isNotEmpty(retField)){
            searchSourceBuilder.field(retField);
        }

        if(StringUtils.isNotEmpty(sortField)){
            searchSourceBuilder.sort(sortField, sortOrder);
        }

        LOG.debug("ES Query string: " + searchSourceBuilder.toString());

        return getESClient().prepareSearch().setIndices(index).setTypes(type)
                .setExtraSource(searchSourceBuilder.buildAsBytes(Requests.CONTENT_TYPE));
    }

    /**
     * 统计数据量
     *
     * @return 符合条件的数据量
     */
    public long countWithQuery(String indexName, String typeName, QueryBuilder queryBuilder){
        SearchRequestBuilder builder = getESClient().prepareSearch(indexName).setTypes(typeName)
                .setQuery(queryBuilder).setFrom(0).setSize(0);
        return countWithQuery(builder);
    }

    /**
     * 统计数据量
     *
     * @param query
     * @return
     *
     */
    public long countWithQuery(SearchRequestBuilder query){
        return query.execute().actionGet().getHits().getTotalHits();
    }

    /**
     * 初始化ES客户端
     *
     * @return
     */
    private void initESClient(String esClusterName, String esClusterAddress) {
        int esClientTimeout = 180000;
        LOG.info("init ES Client...");
        try {
            String[] hostPair = esClusterAddress.split(“,”);
            TransportAddress[] addrs = new TransportAddress[hostPair.length];

            int i = 0;
            String[] keyValuePair;
            for (String t : hostPair) {
                keyValuePair = t.split(":");
                if (2 != keyValuePair.length) {
                    throw new IOException("ES‘s host is not correct:" + Arrays.toString(keyValuePair));
                }
                addrs[i] = new InetSocketTransportAddress(InetAddress.getByName(keyValuePair[0]), Integer.valueOf(keyValuePair[1]));
                i++;
            }

            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", esClusterName)
                    .put("client.transport.sniff", true)
                    .put("client.transport.ping_timeout", esClientTimeout + "s").build();

            ESClient = TransportClient.builder().settings(settings).build().addTransportAddresses(addrs);
        } catch (Exception e) {
            LOG.error("Address error!", e);
        }
    }


    @Override
    public void close() throws IOException {
        if(this.ESClient != null){
            LOG.info("closing esclient....");
            this.ESClient.close();
            this.ESClient = null;
        }
    }
}

 

elasticsearch持有者类