Commit 127ef78e authored by Aleksandr Sharaban's avatar Aleksandr Sharaban

Feature #31361. Full-text search - refactored.

parent 1c7cf7fc
package org.genesys2.server.service;
import org.genesys2.server.service.impl.FilterHandler.AppliedFilters;
import org.springframework.data.elasticsearch.ElasticsearchException;
public interface ElasticSearchManagementService {
/**
* Move alias to a different index
*
* @param aliasName
* @param indexName
*/
void realias(String aliasName, String indexName);
/**
* Delete an ES alias
*
* @param aliasName
*/
void deleteAlias(String aliasName);
/**
* Checks if alias exists
*
* @param aliasName
*/
boolean aliasExists(String aliasName);
/**
* Refreshes specified index
*
* @param className
*/
void refreshIndex(String className);
/**
* Create new indexes and fill them with data
*/
void regenerateIndexes(String indexName) throws ElasticsearchException;
/**
* Reindex part of accession database
*
* @param filters
*/
void reindex(AppliedFilters filters);
/**
* Reindex part of accession database
*
* @param type
*/
void reindex(String type);
/**
* Delete an unused index (no aliases)
*
* @param indexName
*/
void deleteIndex(String indexName);
}
......@@ -34,8 +34,6 @@ public interface ElasticService {
void updateAll(String className, Collection<Long> bucket);
void refreshIndex(String className);
Page<AccessionDetails> filter(AppliedFilters appliedFilters, Pageable pageable) throws SearchException;
TermResult termStatistics(AppliedFilters appliedFilters, String term, int size) throws SearchException;
......@@ -45,39 +43,4 @@ public interface ElasticService {
List<String> autocompleteSearch(String query) throws SearchException;
void regenerateAccessionSequentialNumber();
/**
* Reindex part of accession database
*
* @param filters
*/
void reindex(AppliedFilters filters);
/**
* Create new indexes and fill them with data
*/
void regenerateIndexes();
/**
* Move alias to a different index
*
* @param aliasName
* @param indexName
*/
void realias(String aliasName, String indexName);
/**
* Delete an ES alias
*
* @param aliasName
*/
void deleteAlias(String aliasName);
/**
* Delete an unused index (no aliases)
*
* @param indexName
*/
void deleteIndex(String indexName);
}
......@@ -22,7 +22,6 @@ import org.genesys2.server.model.elastic.FullTextDocument;
import org.genesys2.server.service.impl.SearchException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.ElasticsearchException;
public interface FullTextSearchService {
......@@ -34,15 +33,5 @@ public interface FullTextSearchService {
void updateAll(String className, Collection<Long> bucket);
void realias(String aliasName, String indexName);
void deleteAlias(String aliasName);
void regenerateIndexes() throws ElasticsearchException;
void reindex(String type);
void deleteIndex(String indexName);
Page<?> search(String query, Pageable pageable, Class<?> type) throws SearchException;
}
package org.genesys2.server.service;
public interface IndexAliasConstants {
String PASSPORT_TYPE = "mcpd";
String INDEX_FULLTEXT = "fulltext";
String INDEX_PASSPORT = "passport";
String INDEX_GENESYS = "genesys";
String INDEX_GENESYS_ARCHIVE = "genesysarchive";
String INDEXALIAS_FULLTEXT_READ = "fulltextRead";
String INDEXALIAS_FULLTEXT_WRITE = "fulltextWrite";
String INDEXALIAS_PASSPORT_READ = "passport";
String INDEXALIAS_PASSPORT_WRITE = "passportWrite";
}
package org.genesys2.server.service.impl;
import com.hazelcast.core.ILock;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.genesys2.server.model.BusinessModel;
import org.genesys2.server.model.elastic.AccessionDetails;
import org.genesys2.server.model.elastic.FullTextDocument;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.impl.ActivityPost;
import org.genesys2.server.model.impl.Article;
import org.genesys2.server.model.impl.Country;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.persistence.domain.*;
import org.genesys2.server.service.ElasticSearchManagementService;
import org.genesys2.server.service.IndexAliasConstants;
import org.genesys2.server.service.impl.FilterHandler.AppliedFilters;
import org.genesys2.server.service.worker.ElasticUpdater;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.AliasQuery;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
public class ElasticSearchManagementServiceImpl implements ElasticSearchManagementService {
private static final Log LOG = LogFactory.getLog(ElasticSearchManagementServiceImpl.class);
private static final String REINDEX_TYPE_ALL = "All";
@Autowired
private Client client;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Autowired
private ElasticUpdater elasticUpdater;
@Autowired
@Qualifier("genesysLowlevelRepositoryCustomImpl")
private GenesysLowlevelRepository genesysLowlevelRepository;
@Resource
private ILock elasticsearchAdminLock;
private final Map<String, Class<?>> clazzMap;
public ElasticSearchManagementServiceImpl() {
clazzMap = new HashMap<String, Class<?>>();
clazzMap.put(Article.class.getName(), FullTextDocument.class);
clazzMap.put(ActivityPost.class.getName(), FullTextDocument.class);
clazzMap.put(Country.class.getName(), FullTextDocument.class);
clazzMap.put(FaoInstitute.class.getName(), FullTextDocument.class);
}
/**
* Make the alias point exclusively to the specified index
*
* @param aliasName
* The alias name
* @param indexName
* The index the alias points to
*/
@Override
public void realias(final String aliasName, final String indexName) {
if (LOG.isDebugEnabled())
LOG.debug("Loading alias definition for " + aliasName);
deleteAlias(aliasName);
final AliasQuery query = new AliasQuery();
query.setAliasName(aliasName);
query.setIndexName(indexName);
LOG.info("Adding alias " + aliasName + " to index " + indexName);
this.elasticsearchTemplate.addAlias(query);
}
@Override
public void deleteAlias(final String aliasName) {
final ImmutableOpenMap<String, AliasMetaData> x = this.client.admin().cluster().prepareState()
.execute().actionGet().getState().getMetaData()
.getAliases().get(aliasName);
if (x != null) {
final Set<String> allIndices = new HashSet<>();
x.keysIt().forEachRemaining(allIndices::add);
for (final String aliasIndex : allIndices) {
final AliasQuery query = new AliasQuery();
query.setAliasName(aliasName);
query.setIndexName(aliasIndex);
LOG.info("Removing alias " + aliasName + " from index " + aliasIndex);
this.elasticsearchTemplate.removeAlias(query);
}
}
}
@Override
public void refreshIndex(String className) {
Class<?> clazz2 = clazzMap.get(className);
if (clazz2 == null) {
return;
}
LOG.info("Refreshing index " + clazz2);
elasticsearchTemplate.refresh(clazz2, true);
}
/**
* Create a new data index based on current timestamp, reindex it,
* change alias to point to index
*/
@Override
public void regenerateIndexes(final String indexName) throws ElasticsearchException {
Class clazz = null;
String readAlias = null;
String writeAlias = null;
if (IndexAliasConstants.INDEX_FULLTEXT.equals(indexName)) {
clazz = FullTextDocument.class;
readAlias = IndexAliasConstants.INDEXALIAS_FULLTEXT_READ;
writeAlias = IndexAliasConstants.INDEXALIAS_FULLTEXT_WRITE;
} else if (IndexAliasConstants.INDEX_PASSPORT.equals(indexName)) {
clazz = AccessionDetails.class;
readAlias = IndexAliasConstants.INDEXALIAS_PASSPORT_READ;
writeAlias = IndexAliasConstants.INDEXALIAS_PASSPORT_WRITE;
}
if (clazz != null) {
try {
if (this.elasticsearchAdminLock.tryLock(10, TimeUnit.SECONDS)) {
final long time = System.currentTimeMillis();
final String fullTextIndexName = indexName + time;
final Map<?, ?> indexMapping = this.elasticsearchTemplate.getMapping(clazz);
final Map<?, ?> settings = this.elasticsearchTemplate.getSetting(clazz);
if (this.elasticsearchTemplate.indexExists(fullTextIndexName)) {
throw new ElasticsearchException("Index already exists with name " + fullTextIndexName);
}
createIndex(fullTextIndexName, indexMapping, settings);
realias(writeAlias, fullTextIndexName);
if (IndexAliasConstants.INDEX_FULLTEXT.equals(indexName)) {
reindex(REINDEX_TYPE_ALL);
} else if (IndexAliasConstants.INDEX_PASSPORT.equals(indexName)) {
reindex(new AppliedFilters());
}
realias(readAlias, fullTextIndexName);
} else {
throw new ElasticsearchException("Could not acquire elasticsearchAdminLock lock");
}
} catch (final InterruptedException e) {
} finally {
this.elasticsearchAdminLock.unlock();
}
}
}
@Override
public void reindex(AppliedFilters filters) {
final List<Long> ids = new ArrayList<>(100);
genesysLowlevelRepository.listAccessionIds(filters, null, new RowCallbackHandler() {
private long count = 0;
@Override
public void processRow(ResultSet rs) throws SQLException {
count++;
add(rs.getLong(1));
}
private void add(long accessionId) {
if (ids.size() >= 100) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling reindex of batch " + (count / 100) + " size=" + ids.size());
}
elasticUpdater.updateAll(Accession.class, ids.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
ids.clear();
}
ids.add(accessionId);
}
});
if (ids.size() >= 100) {
// Final kick
elasticUpdater.updateAll(Accession.class, ids.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
}
LOG.info("Done.");
}
@Override
public void reindex(String type) {
final List<Long> ids = new ArrayList<>(100);
final List<String> typesToReindex = new ArrayList<>();
if (type.equals(REINDEX_TYPE_ALL)) {
typesToReindex.addAll(this.clazzMap.keySet());
} else {
typesToReindex.add(type);
}
for (final String className : typesToReindex) {
Class clazz;
try {
clazz = Class.forName(className);
} catch (final ClassNotFoundException e) {
LOG.error("Class is not found", e);
return;
}
this.genesysLowlevelRepository.listFullTextSearchEntitiesIds(clazz.getSimpleName(),
new CustomRowCallbackHandler(clazz, ids));
this.elasticUpdater.updateAll(clazz, ids.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
}
LOG.info("Done.");
}
/**
* Delete an index
*/
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void deleteIndex(final String indexName) {
LOG.warn("Deleting Elasticsearch index " + indexName);
this.elasticsearchTemplate.deleteIndex(indexName);
}
/**
* Check if ES alias exists and points to some index
*
* @param aliasName
* @return true if alias exists
*/
public boolean aliasExists(final String aliasName) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking for ES alias " + aliasName + " on client=" + this.client);
}
final ImmutableOpenMap<String, AliasMetaData> x = this.client.admin().cluster().prepareState()
.execute().actionGet().getState().getMetaData().getAliases()
.get(aliasName);
if (x == null) {
LOG.debug("Alias does not exist");
return false;
} else {
LOG.debug("Got something back " + x);
return x.keysIt().hasNext();
}
}
private void createIndex(String indexName, Map<?, ?> indexMapping, Map<?, ?> settings) {
LOG.info("Creating index " + indexName);
elasticsearchTemplate.createIndex(indexName, settings);
LOG.info("Copying mapping to genesysarchive");
elasticsearchTemplate.putMapping(indexName, IndexAliasConstants.PASSPORT_TYPE, indexMapping);
}
class CustomRowCallbackHandler implements RowCallbackHandler {
private long count = 0;
private final Class<? extends BusinessModel> clazz;
private final List<Long> ids;
public CustomRowCallbackHandler(final Class<? extends BusinessModel> clazz, final List<Long> ids) {
this.clazz = clazz;
this.ids = ids;
}
@Override
public void processRow(final ResultSet rs) throws SQLException {
this.count++;
add(rs.getLong(1));
}
private void add(final long entityId) {
if (this.ids.size() >= 100) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling reindex of batch " + (this.count / 100) + " size=" + this.ids.size());
}
ElasticSearchManagementServiceImpl.this.elasticUpdater
.updateAll(this.clazz, this.ids.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
this.ids.clear();
}
this.ids.add(entityId);
}
}
}
......@@ -29,13 +29,12 @@ import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.impl.ActivityPost;
import org.genesys2.server.model.impl.Article;
import org.genesys2.server.model.impl.Country;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.service.ElasticService;
import org.genesys2.server.service.FullTextSearchService;
import org.genesys2.server.service.ElasticSearchManagementService;
import org.genesys2.server.service.IndexAliasConstants;
import org.genesys2.server.service.impl.FilterHandler;
import org.genesys2.server.service.worker.ElasticUpdater;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -58,10 +57,7 @@ public class ElasticSearchController {
private ElasticUpdater elasticUpdater;
@Autowired
private ElasticService elasticService;
@Autowired
private FullTextSearchService fullTextSearchService;
private ElasticSearchManagementService elasticSearchManagementService;
@Autowired
private Client client;
......@@ -103,7 +99,7 @@ public class ElasticSearchController {
*/
@RequestMapping(method = RequestMethod.POST, value = "/action", params = { "regenerate=accn" })
public String regenerateElastic() {
elasticService.regenerateIndexes();
elasticSearchManagementService.regenerateIndexes(IndexAliasConstants.INDEX_PASSPORT);
return "redirect:/admin/elastic/";
}
......@@ -114,7 +110,7 @@ public class ElasticSearchController {
*/
@RequestMapping(method = RequestMethod.POST, value = "/action", params = { "regenerate=content" })
public String regenerateElasticContent() {
fullTextSearchService.regenerateIndexes();
elasticSearchManagementService.regenerateIndexes(IndexAliasConstants.INDEX_FULLTEXT);
return "redirect:/admin/elastic/";
}
......@@ -130,7 +126,7 @@ public class ElasticSearchController {
FilterHandler.AppliedFilters filters = mapper.readValue(jsonFilter, FilterHandler.AppliedFilters.class);
elasticService.reindex(filters);
elasticSearchManagementService.reindex(filters);
return "redirect:/admin/elastic/";
}
......@@ -139,16 +135,16 @@ public class ElasticSearchController {
* This method refreshes data in the currently active index. It is very
* handy when having to refresh part of ES after direct database update.
*
* @param jsonFilter
* @param type
* @throws IOException
*/
@RequestMapping(method = RequestMethod.POST, value = "/action", params = { "reindex=content", "type" })
public String reindexElasticContent(@RequestParam(value = "type", required = true) String type) throws IOException {
if (type.equals("All")) {
fullTextSearchService.regenerateIndexes();
elasticSearchManagementService.regenerateIndexes(IndexAliasConstants.INDEX_FULLTEXT);
} else {
fullTextSearchService.reindex(type);
elasticSearchManagementService.reindex(type);
}
return "redirect:/admin/elastic/";
......@@ -167,29 +163,25 @@ public class ElasticSearchController {
@RequestMapping(method = RequestMethod.POST, value = "/action", params = { "action=realias" })
public String moveAlias(@RequestParam(name = "aliasName") String aliasName, @RequestParam(name = "indexName") String indexName) {
elasticService.realias(aliasName, indexName);
fullTextSearchService.realias(aliasName, indexName);
elasticSearchManagementService.realias(aliasName, indexName);
return "redirect:/admin/elastic/";
}
@RequestMapping(method = RequestMethod.POST, value = "/action", params = { "action=delete-alias" })
public String deleteAlias(@RequestParam(name = "aliasName") String aliasName) {
elasticService.deleteAlias(aliasName);
fullTextSearchService.deleteAlias(aliasName);
elasticSearchManagementService.deleteAlias(aliasName);
return "redirect:/admin/elastic/";
}
@RequestMapping(method = RequestMethod.POST, value = "/action", params = { "action=delete-index", "indexName" })
public String deleteIndex(@RequestParam(name = "indexName") String indexName) {
elasticService.deleteIndex(indexName);
fullTextSearchService.deleteIndex(indexName);
elasticSearchManagementService.deleteIndex(indexName);
return "redirect:/admin/elastic/";
}
private Map<String, String> createReindexTypesMap() {
Map<String, String> reindexTypesMap = new HashMap<>();
reindexTypesMap.put(Accession.class.getSimpleName(), Accession.class.getName());
reindexTypesMap.put(Article.class.getSimpleName(), Article.class.getName());
reindexTypesMap.put(ActivityPost.class.getSimpleName(), ActivityPost.class.getName());
reindexTypesMap.put(Country.class.getSimpleName(), Country.class.getName());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment