How to delete a large number of documents in ES

年后归来,进入客户后台查看定时任务执行的spark数据是否正常,点击进入后不堪入目,原本一个接口的图表只有10条数据,
结果涌现了数百条,且这些数据非常的有规律,相识度非常高,初步判断,“相同数据,复制了30+份!”。该报表为月报,在每
月1号3时后台spark从es读出数据,然后写回es,再供前端查询es。查看crontab脚本后,发现误将“每月1号3时”(0 0 3 1 *)
写成“每天3时”(0 3 * * *)执行...ops.开通该业务的公司有20个左右,总的数据冗余量在50000-100000条左右。

2018-3-9更新

昨天写完这篇总结,还特意查看了下6.0新特性等,后面得知es存在按条件(delete-by-query)删除documents! 额,之前一直苦苦没有找到,还费劲巴拉的建工程写rest api…

反思了下:为什么当时没有找到[delete-by-query]?当google时我一直在输入‘elasticsearch 批量删除’而不是‘elasticsearch按条件删除’,搜索信息还是非常重要的,关键词合适与否将决定返回的结果!!

[delete-by-query]曾在2.x中剔除了,若要使用需手动安装插件,然后在5.x又回归了,6.0实践也是可用的。

官方介绍:https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-delete-by-query.html “The simplest usage of _delete_by_query just performs a deletion on every document that match a query. ”_delete_by_query的最简单用法只是对每个匹配查询的文档执行删除操作。也就是能删除每个匹配的文档,即使每个type下的文档被清空后,该type依旧存在!

官方文档给出的格式,这边是直接在index之后,其实可以index/type/_delete_by_query也是没问题的

POST twitter/_delete_by_query
{
  "query": { 
    "match": {
      "message": "some message"
    }
  }
}

我的删除接口对应的es script

POST /spark_201802/spark/_delete_by_query
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "start_time": {
              "value": "2018-01-01T00:00:00+08:00"
            }
          }
        },
        {
          "term": {
            "end_time": {
              "value": "2018-01-31T23:59:59+08:00"
            }
          }
        },
        {
          "term": {
            "type": {
              "value": "dialCount_bridged_rate"
            }
          }
        },
        {
          "term": {
            "enterprise_id": {
              "value": 7000132
            }
          }
        }
      ],
      "filter": {
        "range": {
          "create_time": {
            "gte": "2018-02-08T04:00:07.407+08:00",
            "lte": "2018-02-10T04:05:07.407+08:00"
          }
        }
      }
    }
  }
}

es删除方式

平时开发日常用,一般设计的es操作为create,query或者基于aggs上的query,几乎没有任何DELETE的业务接口;因query能通过range筛选出符合一定日期范围的数据,类似的思维,DELETE命令是否也能圈定日期内的数据,批量删除呢?

并没有!!! 只有

DELETE /index/type/id

数万条数据…WTF???

产品已经上线,es服务器ip访问不了,但spark服务器环境可通过内网ip访问es….so,只能另起一个小项目通过es java api先查询数这些数据的id,然后放入集合,再遍历一条一条执行DELETE,写好测试通过后,将该项目打包成jar包,在spark环境运行,通过浏览器调用暴露的restful接口传递具体的企业Id、时间范围等条件执行该删除接口。

这边并没有使用bulk批量删除….

java获取所有冗余文档的ID遍历调用删除接口

通过spring boot另起项目,很快就搭建好了运行环境,接下来就是开发mvc的restful风格的接口去循环调用删除命令了。

jar依赖

<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>5.1.1</version>
			<exclusions>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>io.searchbox</groupId>
			<artifactId>jest</artifactId> //推荐使用jest,elasticsearch-rest-high-level-client好难用
			<version>2.4.0</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.32</version>
		</dependency>
	</dependencies>

es配置

application.properties配置

spring.elasticsearch.jest.uris=http://10.10.53.187:9200/,http://10.10.52.143:9200/,http://10.10.54.45:9200/
spring.elasticsearch.jest.connection-timeout=1500
spring.elasticsearch.jest.read-timeout=10000

java配置文件

package com.tinet.config;

import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Configuration
public class ElasticsearchConfiguration {
	
	@Value("${spring.elasticsearch.jest.uris}")
	private String serverUri;
	@Value("${spring.elasticsearch.jest.connection-timeout}")
	private Integer connTimeout;
	@Value("${spring.elasticsearch.jest.read-timeout}")
	private Integer readTimeout;
	
	@Bean
	public JestClient getJestClient() {
		String[] serverUris=serverUri.split(","); //集群,多个地址
		List<String> serverUriList=new ArrayList<>();
		if (serverUris.length > 1){
			serverUri=serverUris[0];
			for (int i=1;i<serverUris.length;i++){
				serverUriList.add(serverUris[i]);
			}
		}
		JestClientFactory factory = new JestClientFactory();
		factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUri).addServer(serverUriList)
				.gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'hh:mm:ss").create()).connTimeout(connTimeout)
				.readTimeout(readTimeout).multiThreaded(true).build());
		return factory.getObject();
	}
	
	@Bean
	public SearchSourceBuilder getSearchSourceBuilder() {
		return SearchSourceBuilder.searchSource();
	}
}

需求代码实现

主要为获取query后的document ID.

try {
            searchResult = super.jestClient.execute(search);
        } catch (IOException e) {
            logger.error("查询ES异常", e);
        }

        if (searchResult == null || !searchResult.isSucceeded()) {
            logger.error(searchResult.getErrorMessage());
        }

        List<String> dataList = null;
        if(searchResult.isSucceeded()){
            String jsonString = searchResult.getJsonString();
            logger.debug(jsonString);
            // 得到主通道数据
            JSONObject fastResult = JSON.parseObject(jsonString);
            JSONObject fastHits = fastResult.getJSONObject("hits");
            JSONArray fastArray = fastHits.getJSONArray("hits");


            // 处理主通道数据
            dataList = new ArrayList<>();
            for (int i = 0; i < fastArray.size(); i++) {
                JSONObject one = fastArray.getJSONObject(i);
                JSONObject fastSource = one.getJSONObject("_source");

                dataList.add(one.getString("_id"));
                System.out.println(one.getString("_id"));



                Delete delete = new Delete.Builder(one.getString("_id")).index(generateSparkIndex(date, Const.ELASTICSERACH_ALIAS)).type("spark").build();

                JestResult result = null ;
                try {
                    result = jestClient.execute(delete);
                    logger.info("deleteDocument == " + result.getJsonString());
                    Thread.sleep(50); //适当控制下删除速度
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

获取一个月的索引

该项目部分索引采用一天一个索引的方式,而不是一月一个,因此在查询一个月的时候,需自己加工下索引名字,顺便记录下: 用户传入开始时间和结束时间,格式为yyyy-MM-dd,最后获取单个索引的格式为cdr_yyyyMMdd_alias,一个月的则【cdr_yyyyMMdd_alias,cdr_yyyyMMdd_alias….】

 protected static final String CDR_INDEX_PARSE_PATTERN = "yyyy-MM-dd";
 private static final String CDR_INDEX_FORMAT_PATTERN = "yyyyMMdd";
 private static final String CDR_INDEX_PREFIX = "cdr_";
 
 /**
     * 获取查询使用的ES索引集合
     *
     * @param startTime 起始时间
     * @param endTime   结束时间
     * @return 索引集合,例如:
     * <p>
     * [cdr_20170601_alias, cdr_20170602_alias]
     * </p>
     */
    protected static List<String> generateIndexList(String startTime, String endTime, String esAlias) {


        LocalDate startDate = LocalDate.parse(startTime, DateTimeFormatter.ofPattern(CDR_INDEX_PARSE_PATTERN));
        LocalDate endDate = LocalDate.parse(endTime, DateTimeFormatter.ofPattern(CDR_INDEX_PARSE_PATTERN));

        long start = startDate.toEpochDay();
        long end = endDate.toEpochDay();

        List<String> indexList = new ArrayList<>();
        while (start <= end) {

            String day = startDate.format(DateTimeFormatter.ofPattern(CDR_INDEX_FORMAT_PATTERN));
            indexList.add(CDR_INDEX_PREFIX + day + esAlias);

            startDate = startDate.plus(1, ChronoUnit.DAYS);
            start = startDate.toEpochDay();
        }

        return indexList;
    }

比较jestClien和elasticsearch-rest-high-level-client

两者是我在执行es命令时用过的java客户端,分别在两个产品中使用,前者的es version为5.4,后者的es version 为6.0,此时jestclient还不支持es 6.0,故选择elasticsearch-rest-high-level-client.JestClient是第三方开发的,后者则是elastic自家的,这边想吐槽的是elastic自家的在取出聚合数据时,诶嘛,费时费力,绕来老去,提供的数据格式有限,需自己手动转换,是在不如JestClient实用;而JestClient去aggs中的数据时,可直接转为json,借用fastjson获取数据,非常容易,特别是有过fastjon类似的序列化框架经验的开发人员。

教训

公司的产品还属于研发阶段,正式用户不多,不然也不会拖这么长的时间才发现问题,这次的教训够沉重的了,直接往生产环境es大量删除数据,如履薄冰,胆战心惊,就怕误删数据,这样的操作还是第一次…毕竟,引以为戒。

热情洋溢的程序员欢迎您以任何形式转载!