Commit 4cbfcae0 authored by Matija Obreza's avatar Matija Obreza

Reindex ES by filter

parent c0042d13
......@@ -27,8 +27,6 @@ import org.springframework.data.elasticsearch.core.facet.result.TermResult;
public interface ElasticService {
void reindexByFilter(AppliedFilters filters, boolean slow);
Page<AccessionDetails> search(String query, Pageable pageable) throws SearchException;
void remove(String className, long id);
......
......@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.FacetedPage;
......@@ -47,7 +46,6 @@ import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -345,79 +343,6 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
elasticsearchTemplate.refresh(clazz2, true);
}
@Override
public void reindexByFilter(AppliedFilters filters, boolean slow) {
LOG.info("Creating index");
elasticsearchTemplate.createIndex(AccessionDetails.class);
LOG.info("Putting mapping");
elasticsearchTemplate.putMapping(AccessionDetails.class);
LOG.info("Refreshing");
elasticsearchTemplate.refresh(AccessionDetails.class, true);
StopWatch stopWatch = new StopWatch();
stopWatch.setKeepTaskList(false);
int page = 0, size = 100;
List<IndexQuery> queries = new ArrayList<IndexQuery>();
do {
LOG.info("Reindexing progress " + (page * size));
queries.clear();
stopWatch.start("Get data by filter");
Page<Accession> accessions = filterService.listAccessions(filters, new PageRequest(page, size));
stopWatch.stop();
LOG.info(stopWatch.getLastTaskName() + " took " + stopWatch.getLastTaskTimeMillis());
if (!accessions.hasContent()) {
LOG.info("No more content");
break;
}
page++;
stopWatch.start("Query generation");
for (Accession accn : accessions) {
queries.add(reindexAccession(accn.getId()));
}
stopWatch.stop();
LOG.info(stopWatch.getLastTaskName() + " took " + stopWatch.getLastTaskTimeMillis());
stopWatch.start("Bulk index");
elasticsearchTemplate.bulkIndex(queries);
stopWatch.stop();
LOG.info(stopWatch.getLastTaskName() + " took " + stopWatch.getLastTaskTimeMillis());
if (page % 10 == 0) {
stopWatch.start("Refresh");
elasticsearchTemplate.refresh(AccessionDetails.class, true);
stopWatch.stop();
LOG.info(stopWatch.getLastTaskName() + " took " + stopWatch.getLastTaskTimeMillis());
}
if (slow) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
break;
}
}
} while (true);
elasticsearchTemplate.refresh(AccessionDetails.class, true);
LOG.info("Done.");
}
private IndexQuery reindexAccession(long accessionId) {
AccessionDetails ad = genesysService.getAccessionDetails(accessionId);
IndexQuery iq = new IndexQuery();
iq.setId(String.valueOf(ad.getId()));
iq.setObject(ad);
return iq;
}
@Override
public void afterPropertiesSet() throws Exception {
LOG.info("Initializing index");
......
......@@ -28,13 +28,16 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.persistence.domain.GenesysLowlevelRepository;
import org.genesys2.server.service.CountryNamesUpdater;
import org.genesys2.server.service.GenesysFilterService;
import org.genesys2.server.service.GenesysService;
import org.genesys2.server.service.GeoService;
import org.genesys2.server.service.InstituteService;
import org.genesys2.server.service.MappingService;
import org.genesys2.server.service.impl.ContentSanitizer;
import org.genesys2.server.service.impl.FilterHandler.AppliedFilters;
import org.genesys2.server.service.worker.ElasticUpdater;
import org.genesys2.server.service.worker.ITPGRFAStatusUpdater;
import org.genesys2.server.service.worker.InstituteUpdater;
......@@ -43,6 +46,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
......@@ -50,6 +56,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.IMap;
@Controller
......@@ -96,6 +103,9 @@ public class AdminController {
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private GenesysFilterService filterService;
@RequestMapping("/")
public String root(Model model) {
return "/admin/index";
......@@ -122,18 +132,61 @@ public class AdminController {
}
@RequestMapping(method = RequestMethod.POST, value = "/reindex-elastic")
public String reindexElastic(@RequestParam(value="startAt", required=false) Long startAt, @RequestParam(value="slow", required=true, defaultValue="true") boolean slow) {
// LOG.info("Json filter: " + jsonFilter);
public String reindexElastic(@RequestParam(value = "startAt", required = false) Long startAt,
@RequestParam(value = "slow", required = true, defaultValue = "true") boolean slow) {
// LOG.info("Json filter: " + jsonFilter);
elasticUpdater.fullReindex();
return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/reindex-elastic", params = { "filter" })
public String reindexElasticFiltered(@RequestParam(value = "filter", required = true) String jsonFilter,
@RequestParam(value = "slow", required = false, defaultValue = "true") boolean slow) throws IOException {
ObjectMapper mapper = new ObjectMapper();
AppliedFilters filters = mapper.readValue(jsonFilter, AppliedFilters.class);
int page = 0, size = 100;
List<IndexQuery> queries = new ArrayList<IndexQuery>();
do {
LOG.info("Reindexing progress " + (page * size));
queries.clear();
Page<Accession> accessions = filterService.listAccessions(filters, new PageRequest(page, size));
if (!accessions.hasContent()) {
LOG.info("No more content");
break;
}
page++;
for (Accession a : accessions) {
elasticUpdater.update(Accession.class, a.getId());
}
if (slow) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
break;
}
}
} while (true);
LOG.info("Done.");
return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/clear-queues")
public String clearElasticQueues() {
elasticUpdater.clearQueues();
return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/updateAccessionCountryRefs")
public String updateAccessionCountryRefs() {
genesysService.updateAccessionCountryRefs();
......
......@@ -21,6 +21,7 @@
<form method="post" action="<c:url value="/admin/reindex-elastic" />">
<input type="text" name="startAt" disabled="true" />
<input type="text" name="filter" />
<label>
<input type="checkbox" name="slow" value="false" /> No sleep
</label>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment