Commit b73de79f authored by Matija Obreza's avatar Matija Obreza

ES reindexing using custom batch sizes

- Dataset produces a large JSON and requires a smaller batch processing size
parent f24473b6
/*
* Copyright 2018 Global Crop Diversity Trust
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -44,6 +44,15 @@ public interface ElasticsearchService {
*/
<R extends BasicModel> void indexEntity(Class<R> clazz);
/**
* Index entity.
*
* @param <R> the generic type
* @param clazz the model class
* @param reindexBatchSize custom batch size
*/
<R extends BasicModel> void indexEntity(Class<R> clazz, int reindexBatchSize);
/**
* Reindex.
*
......@@ -241,4 +250,16 @@ public interface ElasticsearchService {
* @throws InterruptedException
*/
void waitForCount(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filter, int mustHaveCount) throws InterruptedException;
/**
* Sets the batch size for reindexing. Some entities generate super large JSON and we want to
* reindex those in much smaller batches.
*
* @param <R> the BasicModel type
* @param model the document model
* @param batchSize the batch size
* @return the integer
*/
<R extends BasicModel> Integer setReindexBatchSize(Class<R> model, Integer batchSize);
}
/*
* Copyright 2018 Global Crop Diversity Trust
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -37,7 +37,6 @@ import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import com.querydsl.core.BooleanBuilder;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
......@@ -95,6 +94,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
......@@ -119,6 +119,7 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import com.fasterxml.jackson.module.jsonSchema.types.ObjectSchema;
import com.google.common.collect.Sets;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.EntityPath;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
......@@ -149,7 +150,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
protected static final SourceFilter DEFAULT_SOURCE_FILTER = new FetchSourceFilter(new String[] { "id", "_class", "title", "code", "description" }, new String[] {});
@Autowired
private TaskScheduler taskScheduler;
private TaskExecutor taskExecutor;
@Autowired
private EntityManager em;
......@@ -166,6 +167,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
private final Set<Class<? extends BasicModel>> indexedEntities = Collections.synchronizedSet(new HashSet<>());
private final Map<String, Class<BasicModel>> namesToClasses = Collections.synchronizedMap(new HashMap<>());
private final Map<Class<? extends BasicModel>, Set<String>> jsonSchemas = new HashMap<>();
private final Map<Class<? extends BasicModel>, Integer> reindexBatchSize = new HashMap<>();
/// Size of database batch scan for IDs
private int batchSize = 1000;
......@@ -288,6 +290,25 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
indexedEntities.add(clazz);
namesToClasses.put(clazz.getSimpleName(), (Class<BasicModel>) clazz);
}
@Override
public <R extends BasicModel> void indexEntity(Class<R> clazz, int reindexBatchSize) {
indexEntity(clazz);
this.reindexBatchSize.put(clazz, reindexBatchSize);
}
/**
* Sets the reindex batch size.
*
* @param <R> the generic type
* @param model the model
* @param batchSize the batch size
* @return the integer
*/
@Override
public <R extends BasicModel> Integer setReindexBatchSize(Class<R> model, Integer batchSize) {
return this.reindexBatchSize.put(model, batchSize);
}
@Override
public List<Class<? extends BasicModel>> getIndexedEntities() {
......@@ -378,18 +399,21 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
query.where(filter.buildPredicate());
}
Integer scanSize = reindexBatchSize.get(clazz);
final int customBatchSize = scanSize == null ? batchSize : scanSize.intValue();
int startPosition = 0;
query.offset(startPosition);
query.limit(batchSize);
query.limit(customBatchSize);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
List<Long> results;
do {
stopWatch.split();
LOG.debug("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, batchSize);
if (startPosition > 10 * batchSize && startPosition / (10 * batchSize) == 0) {
LOG.info("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, batchSize);
LOG.debug("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
if (startPosition > 10 * customBatchSize && startPosition / (10 * customBatchSize) == 0) {
LOG.info("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, customBatchSize);
}
results = query.fetch();
asyncUpdate(clazz, results);
......@@ -422,14 +446,14 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
ensureWriteAlias(clazz);
taskScheduler.schedule(() -> {
taskExecutor.execute(() -> {
LOG.debug("Running scheduled reindex of {} size={}", clazz.getName(), copy.size());
try {
_self.update(clazz, copy);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}, new Date(System.currentTimeMillis() + 5000));
});
}
/**
......
......@@ -145,19 +145,19 @@ public class ElasticsearchConfig {
es.indexEntity(Partner.class);
es.indexEntity(Crop.class);
es.indexEntity(ControlledVocabulary.class);
es.indexEntity(ControlledVocabulary.class, 20); // big JSON, small batch
es.indexEntity(VocabularyTerm.class);
es.indexEntity(Descriptor.class);
es.indexEntity(DescriptorList.class);
es.indexEntity(Dataset.class);
es.indexEntity(Subset.class);
es.indexEntity(DescriptorList.class, 100); // big JSON, small batch
es.indexEntity(Dataset.class, 1); // big JSON, small batch
es.indexEntity(Subset.class, 10); // smaller batch
es.indexEntity(Accession.class);
es.indexEntity(FaoInstitute.class);
es.indexEntity(Country.class);
es.indexEntity(Article.class);
es.indexEntity(ActivityPost.class);
return es;
}
......
......@@ -47,6 +47,7 @@ public class SchedulerConfig implements SchedulingConfigurer, AsyncConfigurer {
pool.setCorePoolSize(4);
pool.setMaxPoolSize(16);
pool.setQueueCapacity(100);
pool.setThreadPriority(Thread.NORM_PRIORITY - 2);
pool.setThreadNamePrefix("genesys-background-");
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return pool;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment