Commit 2b17d953 authored by Matija Obreza's avatar Matija Obreza
Browse files

ES startup fixed and organized

parent b1c13105
......@@ -22,9 +22,7 @@ public interface IndexAliasConstants {
String INDEX_FULLTEXT = "fulltext";
String INDEX_PASSPORT = "passport";
String INDEX_GENESYS = "genesys";
String INDEX_PASSPORT = "genesys";
String INDEX_GENESYS_ARCHIVE = "genesysarchive";
......
......@@ -16,7 +16,18 @@
package org.genesys2.server.service.impl;
import com.hazelcast.core.ILock;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -31,7 +42,7 @@ import org.genesys2.server.model.impl.ActivityPost;
import org.genesys2.server.model.impl.Article;
import org.genesys2.server.model.impl.Country;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.persistence.domain.*;
import org.genesys2.server.persistence.domain.GenesysLowlevelRepository;
import org.genesys2.server.service.ElasticSearchManagementService;
import org.genesys2.server.service.IndexAliasConstants;
import org.genesys2.server.service.impl.FilterHandler.AppliedFilters;
......@@ -45,11 +56,7 @@ import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import com.hazelcast.core.ILock;
@Service
public class ElasticSearchManagementServiceImpl implements ElasticSearchManagementService {
......
......@@ -135,8 +135,8 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
AndFilterBuilder filterBuilder = getFilterBuilder(appliedFilters);
SortBuilder sortBuilder = SortBuilders.fieldSort(FilterConstants.ACCENUMB).order(SortOrder.ASC);
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(IndexAliasConstants.INDEXALIAS_PASSPORT_READ).withTypes(IndexAliasConstants.PASSPORT_TYPE)
.withFilter(filterBuilder).withSort(sortBuilder).withPageable(pageable).build();
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(IndexAliasConstants.INDEXALIAS_PASSPORT_READ).withTypes(IndexAliasConstants.PASSPORT_TYPE).withFilter(filterBuilder)
.withSort(sortBuilder).withPageable(pageable).build();
try {
Page<AccessionDetails> sampleEntities = elasticsearchTemplate.queryForPage(searchQuery, AccessionDetails.class);
......@@ -159,8 +159,8 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
} else {
termFacetRequest = new TermFacetRequestBuilder("f").applyQueryFilter().fields(term).size(size).build();
}
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(IndexAliasConstants.INDEXALIAS_PASSPORT_READ).withTypes(IndexAliasConstants.PASSPORT_TYPE)
.withFilter(filterBuilder).withFacet(termFacetRequest).build();
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(IndexAliasConstants.INDEXALIAS_PASSPORT_READ).withTypes(IndexAliasConstants.PASSPORT_TYPE).withFilter(filterBuilder)
.withFacet(termFacetRequest).build();
try {
FacetedPage<AccessionDetails> page = elasticsearchTemplate.queryForPage(searchQuery, AccessionDetails.class);
......@@ -172,8 +172,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
}
/**
* Runs TermFacet, but will automatically increase size if #otherCount is
* more than 10%
* Runs TermFacet, but will automatically increase size if #otherCount is more than 10%
*/
@Override
public TermResult termStatisticsAuto(AppliedFilters appliedFilters, String term, int size) throws SearchException {
......@@ -295,8 +294,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
LOG.debug("startsWith " + startsWith);
if (genesysFilter.isAnalyzed()) {
if (FilterConstants.ALIAS.equals(key)) {
orFilter.add(FilterBuilders.nestedFilter("aliases",
QueryBuilders.queryStringQuery("aliases.name" + ":" + startsWith.getStartsWith() + "*")));
orFilter.add(FilterBuilders.nestedFilter("aliases", QueryBuilders.queryStringQuery("aliases.name" + ":" + startsWith.getStartsWith() + "*")));
} else {
orFilter.add(FilterBuilders.queryFilter(QueryBuilders.queryStringQuery(key + ":" + startsWith.getStartsWith() + "*")));
}
......@@ -365,8 +363,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
LOG.info("Removing from index " + clazz2 + " id=" + id);
if (clazz2.equals(AccessionDetails.class)) {
elasticsearchTemplate.delete(IndexAliasConstants.INDEXALIAS_PASSPORT_WRITE,
IndexAliasConstants.PASSPORT_TYPE, String.valueOf(id));
elasticsearchTemplate.delete(IndexAliasConstants.INDEXALIAS_PASSPORT_WRITE, IndexAliasConstants.PASSPORT_TYPE, String.valueOf(id));
} else {
// Default
elasticsearchTemplate.delete(clazz2, String.valueOf(id));
......@@ -376,12 +373,16 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
@Override
public void afterPropertiesSet() throws Exception {
LOG.info("Initializing index");
elasticsearchTemplate.createIndex(AccessionDetails.class);
LOG.info("Putting mapping");
try {
elasticsearchTemplate.putMapping(AccessionDetails.class);
} catch (Throwable e) {
LOG.error("Mapping mismatch. Need to reindex.", e);
for (int i = 0; i < 5; i++) {
try {
elasticsearchTemplate.createIndex(AccessionDetails.class);
LOG.info("Putting mapping");
elasticsearchTemplate.putMapping(AccessionDetails.class);
break;
} catch (Throwable e) {
LOG.error("Mapping mismatch. Deleting template index " + IndexAliasConstants.INDEX_PASSPORT, e);
elasticsearchTemplate.deleteIndex(IndexAliasConstants.INDEX_PASSPORT);
}
}
LOG.info("Refreshing");
elasticsearchTemplate.refresh(AccessionDetails.class, true);
......@@ -396,7 +397,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
elasticsearchTemplate.putMapping(IndexAliasConstants.INDEX_GENESYS_ARCHIVE, IndexAliasConstants.PASSPORT_TYPE, indexMapping);
}
String newIndex = IndexAliasConstants.INDEX_GENESYS + System.currentTimeMillis();
String newIndex = IndexAliasConstants.INDEX_PASSPORT + System.currentTimeMillis();
if (!elasticSearchManagementService.aliasExists(IndexAliasConstants.INDEXALIAS_PASSPORT_READ)) {
elasticsearchTemplate.createIndex(newIndex);
elasticsearchTemplate.putMapping(newIndex, IndexAliasConstants.PASSPORT_TYPE, indexMapping);
......@@ -458,10 +459,8 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(IndexAliasConstants.INDEXALIAS_PASSPORT_WRITE).type(IndexAliasConstants.PASSPORT_TYPE);
indexRequest.source("seqNo", o[2]);
UpdateQuery updateQuery = new UpdateQueryBuilder().withClass(AccessionDetails.class)
.withIndexName(IndexAliasConstants.INDEXALIAS_PASSPORT_WRITE).withType(IndexAliasConstants.PASSPORT_TYPE)
.withId(o[0].toString())
.withIndexRequest(indexRequest).build();
UpdateQuery updateQuery = new UpdateQueryBuilder().withClass(AccessionDetails.class).withIndexName(IndexAliasConstants.INDEXALIAS_PASSPORT_WRITE)
.withType(IndexAliasConstants.PASSPORT_TYPE).withId(o[0].toString()).withIndexRequest(indexRequest).build();
queries.add(updateQuery);
// LOG.debug("ES added seqNo to " + o[0].toString());
......
......@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
......@@ -35,9 +34,13 @@ import org.aspectj.lang.annotation.Before;
import org.genesys2.server.model.elastic.AccessionDetails;
import org.genesys2.server.model.elastic.FullTextDocument;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionId;
import org.genesys2.server.model.genesys.AccessionRelated;
import org.genesys2.server.model.genesys.SvalbardData;
import org.genesys2.server.model.impl.*;
import org.genesys2.server.model.impl.ActivityPost;
import org.genesys2.server.model.impl.Article;
import org.genesys2.server.model.impl.Country;
import org.genesys2.server.model.impl.CropTaxonomy;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.service.GenesysService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -66,12 +69,12 @@ public class ElasticUpdaterAspect {
*/
@AfterReturning(value = "execution(* org.genesys2.server.service.GenesysService.save*(..))", returning = "result")
public Object afterSave(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString());
}
if (result != null) {
if (LOG.isDebugEnabled()) {
LOG.info("Result " + result.getClass());
if (LOG.isTraceEnabled()) {
LOG.trace("Result " + result.getClass());
}
if (result instanceof Collection<?>) {
Collection<?> list = (Collection<?>) result;
......@@ -79,8 +82,8 @@ public class ElasticUpdaterAspect {
return result;
for (Object o : list) {
if (LOG.isDebugEnabled()) {
LOG.debug("Indexing object " + o);
if (LOG.isTraceEnabled()) {
LOG.trace("Indexing object " + o);
}
if (o == null)
continue;
......@@ -88,10 +91,9 @@ public class ElasticUpdaterAspect {
elasticUpdater.update(Accession.class, ((Accession) o).getId());
}
if (o instanceof AccessionRelated) {
elasticUpdater.update(Accession.class, ((AccessionRelated) o).getAccession().getId());
}
if (o instanceof SvalbardData) {
elasticUpdater.update(Accession.class, ((SvalbardData) o).getId());
AccessionId acceId = ((AccessionRelated) o).getAccession();
if (acceId != null)
elasticUpdater.update(Accession.class, acceId.getId());
}
}
......@@ -110,9 +112,8 @@ public class ElasticUpdaterAspect {
*/
@AfterReturning(value = "execution(* org.genesys2.server.service.GenesysService.remove*(..))", returning = "result")
public Object afterRemove(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
LOG.debug("Result " + result);
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString() + " result=" + result);
}
if (result instanceof Collection<?>) {
Collection<?> list = (Collection<?>) result;
......@@ -127,10 +128,9 @@ public class ElasticUpdaterAspect {
// o).getId());
// }
if (o instanceof AccessionRelated) {
elasticUpdater.update(Accession.class, ((AccessionRelated) o).getAccession().getId());
}
if (o instanceof SvalbardData) {
elasticUpdater.update(Accession.class, ((SvalbardData) o).getId());
AccessionId acceId = ((AccessionRelated) o).getAccession();
if (acceId != null)
elasticUpdater.update(Accession.class, acceId.getId());
}
}
......@@ -148,10 +148,9 @@ public class ElasticUpdaterAspect {
* @throws Throwable
*/
@Around(value = "execution(* org.genesys2.server.service.GenesysService.removeAccessions(..)) && args(institute, accessions)")
public Object aroundRemoveAccession(final ProceedingJoinPoint joinPoint, final FaoInstitute institute, final Collection<Accession> accessions)
throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
public Object aroundRemoveAccession(final ProceedingJoinPoint joinPoint, final FaoInstitute institute, final Collection<Accession> accessions) throws Throwable {
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString());
}
Collection<Long> accessionIds = new ArrayList<Long>(accessions.size());
......@@ -186,9 +185,8 @@ public class ElasticUpdaterAspect {
*/
@AfterReturning(value = "execution(* org.genesys2.server.persistence.domain.CropTaxonomyRepository.save(..))", returning = "result")
public Object afterCropTaxonomySave(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
LOG.debug("Result " + result);
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString() + " result=" + result);
}
// Find all accessions with taxonomies in the list
......@@ -216,8 +214,8 @@ public class ElasticUpdaterAspect {
*/
@Around(value = "execution(* org.genesys2.server.persistence.domain.CropTaxonomyRepository.delete(..)) && args(cropTaxonomies)")
public Object aroundCropTaxonomyDelete(final ProceedingJoinPoint joinPoint, final List<CropTaxonomy> cropTaxonomies) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Around " + joinPoint.toLongString());
if (LOG.isTraceEnabled()) {
LOG.trace("Around " + joinPoint.toLongString());
}
Set<Long> accessionsWithCT = new HashSet<Long>();
......@@ -253,15 +251,13 @@ public class ElasticUpdaterAspect {
*/
@AfterReturning(value = "execution(* org.genesys2.server.persistence.domain.ArticleRepository.save*(..))", returning = "result")
public Object afterSaveArticle(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString());
}
if (result != null) {
if (LOG.isDebugEnabled()) {
LOG.info("Result " + result.getClass());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Indexing object " + result);
if (LOG.isTraceEnabled()) {
LOG.trace("Result " + result.getClass());
LOG.trace("Indexing object " + result);
}
elasticUpdater.update(Article.class, ((Article) result).getId());
}
......@@ -278,15 +274,13 @@ public class ElasticUpdaterAspect {
*/
@AfterReturning(value = "execution(* org.genesys2.server.persistence.domain.ActivityPostRepository.save*(..))", returning = "result")
public Object afterSavePost(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString());
}
if (result != null) {
if (LOG.isDebugEnabled()) {
LOG.info("Result " + result.getClass());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Indexing object " + result);
if (LOG.isTraceEnabled()) {
LOG.trace("Result " + result.getClass());
LOG.trace("Indexing object " + result);
}
elasticUpdater.update(ActivityPost.class, ((ActivityPost) result).getId());
}
......@@ -303,15 +297,13 @@ public class ElasticUpdaterAspect {
*/
@AfterReturning(value = "execution(* org.genesys2.server.persistence.domain.CountryRepository.save*(..))", returning = "result")
public Object afterSaveCountry(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString());
}
if (result != null) {
if (LOG.isDebugEnabled()) {
LOG.info("Result " + result.getClass());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Indexing object " + result);
if (LOG.isTraceEnabled()) {
LOG.trace("Result " + result.getClass());
LOG.trace("Indexing object " + result);
}
elasticUpdater.update(Country.class, ((Country) result).getId());
}
......@@ -328,16 +320,16 @@ public class ElasticUpdaterAspect {
*/
@AfterReturning(value = "execution(* org.genesys2.server.persistence.domain.FaoInstituteRepository.save*(..))", returning = "result")
public Object afterSaveInstitute(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString());
}
if (result != null) {
if (LOG.isDebugEnabled()) {
LOG.info("Result " + result.getClass());
LOG.debug("Indexing object " + result);
if (LOG.isTraceEnabled()) {
LOG.trace("Result " + result.getClass());
LOG.trace("Indexing object " + result);
}
if (result instanceof Collection<?>) {
((Collection<FaoInstitute>)result).stream().forEach(inst -> elasticUpdater.update(FaoInstitute.class, inst.getId()));
((Collection<FaoInstitute>) result).stream().forEach(inst -> elasticUpdater.update(FaoInstitute.class, inst.getId()));
} else if (result instanceof FaoInstitute) {
elasticUpdater.update(FaoInstitute.class, ((FaoInstitute) result).getId());
}
......@@ -354,9 +346,8 @@ public class ElasticUpdaterAspect {
*/
@Before(value = "execution(* org.genesys2.server.persistence.domain.ArticleRepository.delete*(..))")
public void beforeRemoveArticle(final JoinPoint joinPoint) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
LOG.debug("Id=" + (long) joinPoint.getArgs()[0]);
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString() + " id=" + (long) joinPoint.getArgs()[0]);
}
elasticUpdater.remove(Article.class, (Long) joinPoint.getArgs()[0]);
}
......@@ -370,9 +361,8 @@ public class ElasticUpdaterAspect {
*/
@Before(value = "execution(* org.genesys2.server.persistence.domain.ActivityPostRepository.delete*(..))")
public void beforeRemoveActivityPost(final JoinPoint joinPoint) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
LOG.debug("Id=" + (long) joinPoint.getArgs()[0]);
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString() + " id=" + (long) joinPoint.getArgs()[0]);
}
elasticUpdater.remove(FullTextDocument.class, (Long) joinPoint.getArgs()[0]);
}
......@@ -386,9 +376,8 @@ public class ElasticUpdaterAspect {
*/
@Before(value = "execution(* org.genesys2.server.persistence.domain.CountryRepository.delete*(..))")
public void beforeRemoveCountry(final JoinPoint joinPoint) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
LOG.debug("Id=" + (long) joinPoint.getArgs()[0]);
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString() + " id=" + (long) joinPoint.getArgs()[0]);
}
elasticUpdater.remove(Country.class, (Long) joinPoint.getArgs()[0]);
}
......@@ -402,9 +391,8 @@ public class ElasticUpdaterAspect {
*/
@Before(value = "execution(* org.genesys2.server.persistence.domain.FaoInstituteRepository.delete*(..))")
public void beforeRemoveFaoInstitute(final JoinPoint joinPoint) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning from " + joinPoint.toLongString());
LOG.debug("Id=" + (long) joinPoint.getArgs()[0]);
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from " + joinPoint.toLongString() + " id=" + (long) joinPoint.getArgs()[0]);
}
elasticUpdater.remove(FaoInstitute.class, (Long) joinPoint.getArgs()[0]);
}
......
......@@ -22,6 +22,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
......@@ -37,6 +39,7 @@ import org.genesys2.server.service.ElasticSearchManagementService;
import org.genesys2.server.service.IndexAliasConstants;
import org.genesys2.server.service.impl.FilterHandler;
import org.genesys2.server.service.worker.ElasticUpdater;
import org.genesys2.server.service.worker.ElasticUpdater.ElasticNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Controller;
......@@ -46,6 +49,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.IQueue;
@Controller
@RequestMapping("/admin/elastic")
......@@ -59,6 +63,12 @@ public class ElasticSearchController {
@Autowired
private ElasticSearchManagementService elasticSearchManagementService;
@Resource
private IQueue<ElasticNode> elasticRemoveQueue;
@Resource
private IQueue<ElasticNode> elasticUpdateQueue;
@Autowired
private Client client;
......@@ -89,6 +99,9 @@ public class ElasticSearchController {
model.addAttribute("indexes", indexMap);
model.addAttribute("reindexTypes", reindexTypes);
model.addAttribute("removeQueueSize", elasticRemoveQueue.size());
model.addAttribute("updateQueueSize", elasticUpdateQueue.size());
return "/admin/elastic/index";
}
......
......@@ -8,6 +8,10 @@
</head>
<body>
<%@ include file="/WEB-INF/jsp/admin/menu.jsp" %>
<h3>Queues</h3>
<div>Remove queue size: <c:out value="${removeQueueSize}" /></div>
<div>Update queue size: <c:out value="${updateQueueSize}" /></div>
<h3>Accession data</h3>
<form method="post" action="<c:url value="/admin/elastic/action" />">
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment