Commit 330c3079 authored by Matija Obreza

AccessionDetails JSON updated, reindexing code moved to ElasticUpdater

parent 2535d6a6
......@@ -24,11 +24,11 @@ import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldIndex;
import org.springframework.data.elasticsearch.annotations.FieldType;
@Document(indexName = "genesys", type = "mcpd", refreshInterval = "60")
@Document(indexName = "genesys", type = "mcpd", refreshInterval = "60s")
public class AccessionDetails {
private static final Logger LOG = Logger.getLogger(AccessionDetails.class);
private static Pattern mcpdSplit = Pattern.compile("\\w*;\\w*");
static Pattern mcpdSplit = Pattern.compile("\\w*;\\w*");
@Version
private Long version;
......
......@@ -2,6 +2,7 @@ package org.genesys2.server.model.elastic;
import org.apache.commons.lang.StringUtils;
import org.genesys2.server.model.genesys.AccessionAlias;
import org.genesys2.server.model.genesys.AccessionAlias.AliasType;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldIndex;
import org.springframework.data.elasticsearch.annotations.FieldType;
......@@ -12,12 +13,14 @@ public class Alias {
@Field(index = FieldIndex.not_analyzed, type = FieldType.String)
private String lang;
private String usedBy;
private AliasType type;
/**
 * Creates an {@link Alias} populated from the given {@link AccessionAlias}.
 *
 * Name and alias type are copied as-is; blank {@code lang} and {@code usedBy}
 * values are stored as {@code null}.
 *
 * @param aa the source alias to copy from
 * @return a new {@link Alias} carrying the copied values
 */
public static Alias from(AccessionAlias aa) {
    final Alias result = new Alias();
    result.name = aa.getName();

    final String language = aa.getLang();
    result.lang = StringUtils.isBlank(language) ? null : language;

    final String user = aa.getUsedBy();
    result.usedBy = StringUtils.isBlank(user) ? null : user;

    result.type = aa.getAliasType();
    return result;
}
......@@ -45,4 +48,11 @@ public class Alias {
this.usedBy = usedBy;
}
/**
 * @return the alias type held by this alias
 */
public AliasType getType() {
    return this.type;
}
/**
 * Replaces the alias type held by this alias.
 *
 * @param type the new alias type
 */
public void setType(final AliasType type) {
    this.type = type;
}
}
\ No newline at end of file
package org.genesys2.server.model.elastic;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.genesys2.server.model.genesys.AccessionCollect;
......@@ -9,7 +12,7 @@ import org.springframework.data.elasticsearch.annotations.FieldType;
public class Collect {
public String collCode;
public Set<String> collCode;
@Field(searchAnalyzer = "keyword", type = FieldType.String)
public String collDate;
public String collInstAddr;
......@@ -26,7 +29,8 @@ public class Collect {
return null;
Collect c = new Collect();
c.collCode = StringUtils.defaultIfBlank(collect.getCollCode(), null);
if (StringUtils.isNotBlank(collect.getCollCode()))
c.collCode = new HashSet<String>(Arrays.asList(AccessionDetails.mcpdSplit.split(collect.getCollCode())));
c.collDate = StringUtils.defaultIfBlank(collect.getCollDate(), null);
c.collInstAddr = StringUtils.defaultIfBlank(collect.getCollInstAddress(), null);
c.collMissId = StringUtils.defaultIfBlank(collect.getCollMissId(), null);
......
......@@ -19,7 +19,8 @@ public class Geo {
Geo g = new Geo();
g.datum = StringUtils.defaultIfBlank(ag.getDatum(), null);
g.elevation = ag.getElevation();
g.coordinates = new Double[] { ag.getLongitude(), ag.getLatitude() };
if (ag.getLongitude() != null & ag.getLatitude() != null)
g.coordinates = new Double[] { ag.getLongitude(), ag.getLatitude() };
g.longitude = ag.getLongitude();
g.latitude = ag.getLatitude();
g.uncertainty = ag.getUncertainty();
......@@ -83,5 +84,4 @@ public class Geo {
this.datum = datum;
}
}
\ No newline at end of file
......@@ -25,8 +25,6 @@ import org.springframework.data.domain.Pageable;
public interface ElasticService {
void reindexEverything();
void reindexByFilter(String jsonFilter, boolean slow);
Page<AccessionDetails> search(String query, Pageable pageable) throws SearchException;
......
......@@ -74,63 +74,6 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
}
}
/**
 * Indexes all accessions in pages of 100.
 *
 * Calls createIndex and putMapping for {@link AccessionDetails}, logs the index
 * settings and mapping, then bulk-indexes one page of accession ids at a time
 * until an empty page is returned. The index is refreshed after every tenth
 * page and once more at the end.
 */
@Override
public void reindexEverything() {
    // LOG.info("Deleting index");
    // elasticsearchTemplate.deleteIndex(AccessionDetails.class);

    LOG.info("Creating index");
    elasticsearchTemplate.createIndex(AccessionDetails.class);

    LOG.info("Putting mapping");
    elasticsearchTemplate.putMapping(AccessionDetails.class);

    LOG.info("Refreshing");
    elasticsearchTemplate.refresh(AccessionDetails.class, true);

    final Map<?, ?> indexSettings = elasticsearchTemplate.getSetting(AccessionDetails.class);
    for (Map.Entry<?, ?> entry : indexSettings.entrySet()) {
        LOG.info("es setting " + entry.getKey() + " = " + entry.getValue());
    }

    final Map<?, ?> indexMapping = elasticsearchTemplate.getMapping(AccessionDetails.class);
    for (Map.Entry<?, ?> entry : indexMapping.entrySet()) {
        LOG.info("es mapping " + entry.getKey() + " = " + entry.getValue());
    }

    final int pageSize = 100;
    final List<IndexQuery> batch = new ArrayList<IndexQuery>();
    int pageNo = 0;

    while (true) {
        LOG.info("Reindexing progress " + (pageNo * pageSize));
        batch.clear();

        final List<Long> ids = genesysService.listAccessionsIds(new PageRequest(pageNo, pageSize));
        if (ids.isEmpty()) {
            LOG.info("No more content");
            break;
        }
        pageNo++;

        for (Long id : ids) {
            batch.add(reindexAccession(id));
        }
        elasticsearchTemplate.bulkIndex(batch);

        // Refresh after every tenth page
        if (pageNo % 10 == 0) {
            elasticsearchTemplate.refresh(AccessionDetails.class, true);
        }
    }

    elasticsearchTemplate.refresh(AccessionDetails.class, true);
    LOG.info("Done.");
}
/**
 * Wraps the details of one accession in an {@link IndexQuery}.
 *
 * @param accessionId id passed to {@code genesysService.getAccessionDetails}
 * @return an index query whose id is the string form of the details' id and
 *         whose object is the loaded {@link AccessionDetails}
 */
private IndexQuery reindexAccession(long accessionId) {
    final AccessionDetails details = genesysService.getAccessionDetails(accessionId);
    final String documentId = String.valueOf(details.getId());

    final IndexQuery query = new IndexQuery();
    query.setId(documentId);
    query.setObject(details);
    return query;
}
@Override
public void update(String className, long id) {
if (!clazzMap.containsKey(className)) {
......@@ -174,7 +117,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
queries.add(iq);
}
if (LOG.isInfoEnabled() && ! queries.isEmpty()) {
if (LOG.isInfoEnabled() && !queries.isEmpty()) {
LOG.info("Indexing " + className + " count=" + queries.size());
elasticsearchTemplate.bulkIndex(queries);
}
......@@ -226,7 +169,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
if (StringUtils.isNotBlank(jsonFilter))
jsonTree = (ObjectNode) objectMapper.readTree(jsonFilter);
else {
reindexEverything();
LOG.warn("No filter provided. Quitting.");
return;
}
} catch (IOException e) {
......@@ -288,6 +231,15 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
LOG.info("Done.");
}
/**
 * Loads the {@link AccessionDetails} for the given accession and packages
 * them as an {@link IndexQuery}.
 *
 * @param accessionId id passed to {@code genesysService.getAccessionDetails}
 * @return an index query carrying the details, with its id set to the string
 *         form of the details' id
 */
private IndexQuery reindexAccession(long accessionId) {
    final AccessionDetails loaded = genesysService.getAccessionDetails(accessionId);
    final String indexId = String.valueOf(loaded.getId());

    final IndexQuery indexQuery = new IndexQuery();
    indexQuery.setId(indexId);
    indexQuery.setObject(loaded);
    return indexQuery;
}
@Override
public void afterPropertiesSet() throws Exception {
LOG.info("Initializing index");
......
......@@ -17,16 +17,24 @@
package org.genesys2.server.service.worker;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.model.elastic.AccessionDetails;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.service.GenesysService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.stereotype.Component;
import com.hazelcast.core.IQueue;
......@@ -126,6 +134,70 @@ public class ElasticUpdater {
return set;
}
/**
 * Submits every accession for (re)indexing.
 *
 * <p>
 * Clears both queues, calls createIndex and putMapping for
 * {@link AccessionDetails}, logs the index settings and mapping, and then
 * pages through accession ids (100 per page), handing each page to
 * {@code updateAll(Accession.class, ...)}. Every 100 pages the progress is
 * logged and the method pauses briefly. The index is refreshed once all pages
 * have been handed over.
 * </p>
 *
 * <p>
 * If the calling thread is interrupted during one of the pauses, the
 * interrupt status is restored and the method returns early instead of
 * silently carrying on.
 * </p>
 */
public void fullReindex() {
    clearQueues();

    // Pause to allow ElasticUpdaterProcessor to finish
    if (!pauseForProcessor()) {
        return;
    }

    // Delete index manually if you have to with:
    // $ curl -XDELETE http://localhost:9200/genesys
    // Deleting it from here fails if an update happens in the meantime:
    // elasticsearchTemplate.deleteIndex(AccessionDetails.class);

    LOG.info("Creating index");
    elasticsearchTemplate.createIndex(AccessionDetails.class);

    LOG.info("Putting mapping");
    elasticsearchTemplate.putMapping(AccessionDetails.class);

    LOG.info("Refreshing");
    elasticsearchTemplate.refresh(AccessionDetails.class, true);

    Map<?, ?> setting = elasticsearchTemplate.getSetting(AccessionDetails.class);
    for (Object k : setting.keySet()) {
        LOG.info("es setting " + k + " = " + setting.get(k));
    }

    Map<?, ?> mapping = elasticsearchTemplate.getMapping(AccessionDetails.class);
    for (Object k : mapping.keySet()) {
        LOG.info("es mapping " + k + " = " + mapping.get(k));
    }

    final int size = 100;
    int page = 0;
    while (true) {
        // True once every 100 pages (10000 ids), including the very first page
        if ((page * size) % 10000 == 0) {
            LOG.info("Full reindex enqueue progress " + (page * size));
            // Pause to allow ElasticUpdaterProcessor to finish
            if (!pauseForProcessor()) {
                return;
            }
        }

        List<Long> accessions = genesysService.listAccessionsIds(new PageRequest(page, size));
        if (accessions.isEmpty()) {
            LOG.info("No more content");
            break;
        }
        page++;

        updateAll(Accession.class, accessions.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
    }

    elasticsearchTemplate.refresh(AccessionDetails.class, true);
    LOG.info("Done.");
}

/**
 * Sleeps for 500 ms.
 *
 * @return {@code true} if the pause completed, {@code false} if the thread
 *         was interrupted (the interrupt status is restored before returning)
 */
private boolean pauseForProcessor() {
    try {
        Thread.sleep(500);
        return true;
    } catch (InterruptedException e) {
        // Keep the interrupt visible to callers instead of swallowing it
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while pausing, aborting full reindex");
        return false;
    }
}
/**
 * Empties the remove queue and then the update queue.
 */
public void clearQueues() {
    this.elasticRemoveQueue.clear();
    this.elasticUpdateQueue.clear();
}
public static class ElasticNode implements Serializable {
private final String className;
private final long id;
......
......@@ -75,7 +75,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
}
if (i % 100 == 0) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size());
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size() + " deleted=" + i);
// if (!updatedIndices.isEmpty()) {
// updatedIndices.clear();
......@@ -99,7 +99,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
}
if (i % 100 == 0) {
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size());
LOG.info("Queue size remove=" + elasticRemoveQueue.size() + " update=" + elasticUpdateQueue.size() + " indexed=" + i);
// if (!updatedIndices.isEmpty()) {
// updatedIndices.clear();
......@@ -136,6 +136,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
} catch (InterruptedException e) {
}
}
LOG.info("Finished");
}
private void updateNode(ElasticNode toUpdate) {
......@@ -170,6 +171,7 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
/**
 * Logs the shutdown, clears the {@code running} flag and interrupts the
 * worker thread.
 */
@Override
public void destroy() throws Exception {
    LOG.info("Stopping worker");
    // Clear the flag before interrupting the worker
    running = false;
    worker.interrupt();
}
......
......@@ -30,13 +30,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.persistence.domain.GenesysLowlevelRepository;
import org.genesys2.server.service.CountryNamesUpdater;
import org.genesys2.server.service.ElasticService;
import org.genesys2.server.service.GenesysService;
import org.genesys2.server.service.GeoService;
import org.genesys2.server.service.InstituteService;
import org.genesys2.server.service.MappingService;
import org.genesys2.server.service.impl.ContentSanitizer;
import org.genesys2.server.service.worker.AccessionStorageScanner;
import org.genesys2.server.service.worker.ElasticUpdater;
import org.genesys2.server.service.worker.ITPGRFAStatusUpdater;
import org.genesys2.server.service.worker.InstituteUpdater;
import org.genesys2.server.service.worker.SGSVUpdate;
......@@ -65,7 +65,7 @@ public class AdminController {
CountryNamesUpdater alternateNamesUpdater;
@Autowired
ElasticService elasticService;
ElasticUpdater elasticUpdater;
@Autowired
GenesysService genesysService;
......@@ -126,12 +126,18 @@ public class AdminController {
}
@RequestMapping(method = RequestMethod.POST, value = "/reindex-elastic")
public String reindexElastic(@RequestParam(value="jsonFilter", required=false) String jsonFilter, @RequestParam(value="slow", required=true, defaultValue="true") boolean slow) {
LOG.info("Json filter: " + jsonFilter);
elasticService.reindexByFilter(jsonFilter, slow);
public String reindexElastic(@RequestParam(value="startAt", required=false) Long startAt, @RequestParam(value="slow", required=true, defaultValue="true") boolean slow) {
// LOG.info("Json filter: " + jsonFilter);
elasticUpdater.fullReindex();
return "redirect:/admin/";
}
/**
 * Handles POST {@code /clear-queues}: clears the queues held by the
 * {@code ElasticUpdater} and redirects back to the admin page.
 *
 * @return redirect to {@code /admin/}
 */
@RequestMapping(value = "/clear-queues", method = RequestMethod.POST)
public String clearElasticQueues() {
    this.elasticUpdater.clearQueues();
    return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/updateAccessionCountryRefs")
public String updateAccessionCountryRefs() {
genesysService.updateAccessionCountryRefs();
......
......@@ -20,7 +20,7 @@
</form>
<form method="post" action="<c:url value="/admin/reindex-elastic" />">
<input type="text" name="jsonFilter" />
<input type="text" name="startAt" disabled="true" />
<label>
<input type="checkbox" name="slow" value="false" /> No sleep
</label>
......@@ -29,6 +29,12 @@
<input type="hidden" name="${_csrf.parameterName}" value="${_csrf.token}"/>
</form>
<form method="post" action="<c:url value="/admin/clear-queues" />">
<input type="submit" class="btn btn-default" value="Clear ES update queues" />
<!-- CSRF protection -->
<input type="hidden" name="${_csrf.parameterName}" value="${_csrf.token}"/>
</form>
<h3>Country data</h3>
<form method="post" action="<c:url value="/admin/refreshCountries" />">
<input type="submit" class="btn btn-default" value="Refresh country data" />
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment