Commit 9d0edef9 authored by Matija Obreza's avatar Matija Obreza
Browse files

Using aliases for ES indexes

parent 90f566ea
......@@ -28,6 +28,9 @@ import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldIndex;
import org.springframework.data.elasticsearch.annotations.FieldType;
/**
* <code>AccessionDetails</code> is used in Elasticsearch mapping
*/
@Document(indexName = "genesys", type = "mcpd", refreshInterval = "60s")
public class AccessionDetails {
private static final Logger LOG = Logger.getLogger(AccessionDetails.class);
......
......@@ -46,4 +46,16 @@ public interface ElasticService {
void regenerateAccessionSequentialNumber();
/**
* Reindex part of accession database
*
* @param filters
*/
void reindex(AppliedFilters filters);
/**
* Create new indexes and fill them with data
*/
void regenerateIndexes();
}
......@@ -9,10 +9,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.query.AndFilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.OrFilterBuilder;
......@@ -27,9 +33,7 @@ import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.persistence.domain.GenesysLowlevelRepository;
import org.genesys2.server.service.ElasticService;
import org.genesys2.server.service.FilterConstants;
import org.genesys2.server.service.GenesysFilterService;
import org.genesys2.server.service.GenesysService;
import org.genesys2.server.service.OrganizationService;
import org.genesys2.server.service.impl.FilterHandler.AppliedFilter;
import org.genesys2.server.service.impl.FilterHandler.AppliedFilters;
import org.genesys2.server.service.impl.FilterHandler.FilterValue;
......@@ -38,6 +42,7 @@ import org.genesys2.server.service.impl.FilterHandler.MaxValueFilter;
import org.genesys2.server.service.impl.FilterHandler.MinValueFilter;
import org.genesys2.server.service.impl.FilterHandler.StartsWithFilter;
import org.genesys2.server.service.impl.FilterHandler.ValueRangeFilter;
import org.genesys2.server.service.worker.ElasticUpdater;
import org.genesys2.util.NumberUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -45,11 +50,13 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.FacetedPage;
import org.springframework.data.elasticsearch.core.facet.FacetRequest;
import org.springframework.data.elasticsearch.core.facet.request.TermFacetRequestBuilder;
import org.springframework.data.elasticsearch.core.facet.result.TermResult;
import org.springframework.data.elasticsearch.core.query.AliasQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
......@@ -58,29 +65,36 @@ import org.springframework.data.elasticsearch.core.query.UpdateQueryBuilder;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.ILock;
@Service
public class ElasticsearchSearchServiceImpl implements ElasticService, InitializingBean {
private static final Log LOG = LogFactory.getLog(ElasticsearchSearchServiceImpl.class);
private static final String INDEXALIAS_PASSPORT_READ = "passport";
private static final String INDEXALIAS_PASSPORT_WRITE = "passportWrite";
private static final String INDEXALIAS_ARCHIVE_READ = "archive";
private static final String INDEXALIAS_ARCHIVE_WRITE = "archiveWrite";
@Autowired
private Client client;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Autowired
private GenesysService genesysService;
// @Autowired
// private OrganizationService organizationService;
// @Autowired
// private GenesysFilterService filterService;
@Autowired
private FilterHandler filterHandler;
// @Autowired
// private ObjectMapper objectMapper;
@Autowired
private ElasticUpdater elasticUpdater;
@Resource
private ILock elasticsearchAdminLock;
private final Map<String, Class<?>> clazzMap;
......@@ -95,7 +109,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
@Override
public Page<AccessionDetails> search(String query, Pageable pageable) throws SearchException {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ).withTypes("mcpd")
.withQuery(org.elasticsearch.index.query.QueryBuilders.queryString(query).defaultOperator(Operator.AND)).withPageable(pageable).build();
try {
......@@ -108,7 +122,7 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
@Override
public List<String> autocompleteSearch(String query) throws SearchException {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ).withTypes("mcpd")
.withQuery(org.elasticsearch.index.query.QueryBuilders.queryString("acceNumb:(" + query + "*)").defaultOperator(Operator.AND))
.withSort(SortBuilders.fieldSort(FilterConstants.ACCENUMB).order(SortOrder.ASC)).withPageable(new PageRequest(0, 10)).build();
......@@ -130,7 +144,8 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
AndFilterBuilder filterBuilder = getFilterBuilder(appliedFilters);
SortBuilder sortBuilder = SortBuilders.fieldSort(FilterConstants.ACCENUMB).order(SortOrder.ASC);
SearchQuery searchQuery = new NativeSearchQueryBuilder().withFilter(filterBuilder).withSort(sortBuilder).withPageable(pageable).build();
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ).withTypes("mcpd")
.withFilter(filterBuilder).withSort(sortBuilder).withPageable(pageable).build();
try {
Page<AccessionDetails> sampleEntities = elasticsearchTemplate.queryForPage(searchQuery, AccessionDetails.class);
......@@ -153,7 +168,8 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
} else {
termFacetRequest = new TermFacetRequestBuilder("f").applyQueryFilter().fields(term).size(size).build();
}
SearchQuery searchQuery = new NativeSearchQueryBuilder().withFilter(filterBuilder).withFacet(termFacetRequest).build();
SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ).withTypes("mcpd")
.withFilter(filterBuilder).withFacet(termFacetRequest).build();
try {
FacetedPage<AccessionDetails> page = elasticsearchTemplate.queryForPage(searchQuery, AccessionDetails.class);
......@@ -315,11 +331,6 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
return;
}
// TODO Isn't that handled above?
if (!Accession.class.getName().equals(className)) {
LOG.warn("Unsupported class " + className);
return;
}
if (ids.isEmpty()) {
LOG.info("Skipping empty updateAll.");
return;
......@@ -331,14 +342,14 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
List<IndexQuery> queries = new ArrayList<IndexQuery>();
List<AccessionDetails> ads = genesysService.getAccessionDetails(ids);
// If Accession, update PDCI
genesysService.updatePDCI(ads);
for (AccessionDetails ad : ads) {
if (ad == null)
continue; // Skip null id
IndexQuery iq = new IndexQuery();
iq.setIndexName(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE);
iq.setType("mcpd");
iq.setId(String.valueOf(ad.getId()));
iq.setObject(ad);
......@@ -359,7 +370,13 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
return;
}
LOG.info("Removing from index " + clazz2 + " id=" + id);
elasticsearchTemplate.delete(clazz2, String.valueOf(id));
if (clazz2.equals(AccessionDetails.class)) {
elasticsearchTemplate.delete(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE, "mcpd", String.valueOf(id));
} else {
// Default
elasticsearchTemplate.delete(clazz2, String.valueOf(id));
}
}
@Override
......@@ -382,7 +399,6 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
elasticsearchTemplate.putMapping(AccessionDetails.class);
} catch (Throwable e) {
LOG.error("Mapping mismatch. Need to reindex.");
reindexExistingData(AccessionDetails.class);
}
LOG.info("Refreshing");
elasticsearchTemplate.refresh(AccessionDetails.class, true);
......@@ -396,21 +412,129 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
LOG.info("Copying mapping to genesysarchive");
elasticsearchTemplate.putMapping("genesysarchive", "mcpd", indexMapping);
}
if (!aliasExists(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ)) {
realias(INDEXALIAS_PASSPORT_READ, "genesys");
}
if (!aliasExists(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE)) {
realias(INDEXALIAS_PASSPORT_WRITE, "genesys");
}
if (!aliasExists(INDEXALIAS_ARCHIVE_READ)) {
realias(INDEXALIAS_ARCHIVE_READ, "genesysarchive");
}
if (!aliasExists(INDEXALIAS_ARCHIVE_WRITE)) {
realias(INDEXALIAS_ARCHIVE_WRITE, "genesysarchive");
}
}
/**
* Check if ES alias exists and points to some index
*
* @param aliasName
* @return true if alias exists
*/
private boolean aliasExists(String aliasName) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking for ES alias " + aliasName + " on client=" + client);
}
ImmutableOpenMap<String, AliasMetaData> x = client.admin().cluster().prepareState().execute().actionGet().getState().getMetaData().getAliases()
.get(aliasName);
if (x == null) {
LOG.debug("Alias does not exist");
return false;
} else {
LOG.debug("Got something back " + x);
return x.keysIt().hasNext();
}
}
/**
* Create a new index based on current timestamp, reindex it, change alias
* to point to index
*/
@Override
public void regenerateIndexes() throws ElasticsearchException {
try {
if (elasticsearchAdminLock.tryLock(10, TimeUnit.SECONDS)) {
long time = System.currentTimeMillis();
String passportIndexName = ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ + time;
// String archiveIndexName = "archive" + time;
Map<?, ?> indexMapping = elasticsearchTemplate.getMapping(AccessionDetails.class);
Map<?, ?> settings = elasticsearchTemplate.getSetting(AccessionDetails.class);
if (elasticsearchTemplate.indexExists(passportIndexName)) {
throw new ElasticsearchException("Index already exists with name " + passportIndexName);
}
// if (elasticsearchTemplate.indexExists(archiveIndexName)) {
// throw new ElasticsearchException("Index already exists with
// name " + passportIndexName);
// }
createIndex(passportIndexName, indexMapping, settings);
realias(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE, passportIndexName);
reindex(new AppliedFilters());
realias(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_READ, passportIndexName);
// createIndex(archiveIndexName, indexMapping, settings);
// realias(ElasticsearchSearchServiceImpl.INDEXALIAS_ARCHIVE_WRITE,
// passportIndexName);
// reindexArchive();
// realias(INDEXALIAS_ARCHIVE_READ, archiveIndexName);
} else {
throw new ElasticsearchException("Could not acquire elasticsearchAdminLock lock");
}
} catch (InterruptedException e) {
} finally {
elasticsearchAdminLock.unlock();
}
}
// TODO FIXME Don't delete index; use aliases!
private void reindexExistingData(Class<?> clazz) {
LOG.warn("Deleting index for class=" + clazz);
elasticsearchTemplate.deleteIndex(clazz);
// Make new index
LOG.warn("Creating new index for class=" + clazz);
elasticsearchTemplate.createIndex(clazz);
elasticsearchTemplate.putMapping(clazz);
/**
* Make the alias point exclusively to the specified index
*
* @param aliasName
* The alias name
* @param indexName
* The index the alias points to
*/
private void realias(String aliasName, String indexName) {
if (LOG.isDebugEnabled())
LOG.debug("Loading alias definition for " + aliasName);
ImmutableOpenMap<String, AliasMetaData> x = client.admin().cluster().prepareState().execute().actionGet().getState().getMetaData().getAliases()
.get(aliasName);
if (x != null) {
final Set<String> allIndices = new HashSet<>();
x.keysIt().forEachRemaining(allIndices::add);
// TODO Scan & Scroll
// TODO Re-alias
for (String aliasIndex : allIndices) {
AliasQuery query = new AliasQuery();
query.setAliasName(aliasName);
query.setIndexName(aliasIndex);
// But... For now, don't do anything
LOG.info("Removing alias " + aliasName + " from index " + aliasIndex);
elasticsearchTemplate.removeAlias(query);
}
}
AliasQuery query = new AliasQuery();
query.setAliasName(aliasName);
query.setIndexName(indexName);
LOG.info("Adding alias " + aliasName + " to index " + indexName);
elasticsearchTemplate.addAlias(query);
}
private void createIndex(String indexName, Map<?, ?> indexMapping, Map<?, ?> settings) {
LOG.info("Creating index " + indexName);
elasticsearchTemplate.createIndex(indexName, settings);
LOG.info("Copying mapping to genesysarchive");
elasticsearchTemplate.putMapping(indexName, "mcpd", indexMapping);
}
@Override
......@@ -458,9 +582,11 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
try {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE).type("mcpd");
indexRequest.source("seqNo", o[2]);
UpdateQuery updateQuery = new UpdateQueryBuilder().withClass(AccessionDetails.class).withId(o[0].toString()).withIndexRequest(indexRequest)
.build();
UpdateQuery updateQuery = new UpdateQueryBuilder().withClass(AccessionDetails.class)
.withIndexName(ElasticsearchSearchServiceImpl.INDEXALIAS_PASSPORT_WRITE).withType("mcpd").withId(o[0].toString())
.withIndexRequest(indexRequest).build();
queries.add(updateQuery);
// LOG.debug("ES added seqNo to " + o[0].toString());
......@@ -482,4 +608,20 @@ public class ElasticsearchSearchServiceImpl implements ElasticService, Initializ
}
}
@Override
public void reindex(AppliedFilters filters) {
genesysLowlevelRepository.listAccessionIds(filters, null, new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
long accessionId = rs.getLong(1);
elasticUpdater.update(Accession.class, accessionId);
}
});
LOG.info("Done.");
}
}
......@@ -25,14 +25,10 @@ import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.model.elastic.AccessionDetails;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.service.GenesysService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
......@@ -53,9 +49,6 @@ public class ElasticUpdater {
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Autowired
private GenesysService genesysService;
@Resource
private IQueue<ElasticNode> elasticRemoveQueue;
@Resource
......@@ -137,63 +130,6 @@ public class ElasticUpdater {
return set;
}
public void fullReindex() {
clearQueues();
try {
// Sleep to allow {@link ElasticUpdaterProcessor} to finish
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// Delete index manually if you have to with: $ curl -XDELETE
// http://localhost:9200/genesys
// LOG.info("Deleting index");
// // This fails if an update happens in the meantime
// elasticsearchTemplate.deleteIndex(AccessionDetails.class);
LOG.info("Creating index");
elasticsearchTemplate.createIndex(AccessionDetails.class);
LOG.info("Putting mapping");
elasticsearchTemplate.putMapping(AccessionDetails.class);
LOG.info("Refreshing");
elasticsearchTemplate.refresh(AccessionDetails.class, true);
Map<?, ?> setting = elasticsearchTemplate.getSetting(AccessionDetails.class);
for (Object k : setting.keySet()) {
LOG.info("es setting " + k + " = " + setting.get(k));
}
Map<?, ?> mapping = elasticsearchTemplate.getMapping(AccessionDetails.class);
for (Object k : mapping.keySet()) {
LOG.info("es mapping " + k + " = " + mapping.get(k));
}
int page = 0, size = 100;
do {
if ((page * size) % 10000 == 0) {
LOG.info("Full reindex enqueue progress " + (page * size));
try {
// Sleep to allow {@link ElasticUpdaterProcessor} to finish
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
List<Long> accessions = genesysService.listAccessionsIds(new PageRequest(page, size));
if (accessions.isEmpty()) {
LOG.info("No more content");
break;
}
page++;
updateAll(Accession.class, accessions.toArray(ArrayUtils.EMPTY_LONG_OBJECT_ARRAY));
} while (true);
elasticsearchTemplate.refresh(AccessionDetails.class, true);
LOG.info("Done.");
}
public void clearQueues() {
elasticRemoveQueue.clear();
elasticUpdateQueue.clear();
......
......@@ -9,7 +9,6 @@ import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.service.ElasticService;
import org.genesys2.server.service.GenesysService;
import org.genesys2.server.service.worker.ElasticUpdater.ElasticNode;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
......@@ -42,21 +41,8 @@ class ElasticUpdaterProcessor implements Runnable, InitializingBean, DisposableB
@Resource
private IQueue<ElasticNode> elasticUpdateQueue;
@Autowired
private GenesysService genesysService;
private long indexDelay = 5000;
private HashMap<String, Set<Long>> buckets = new HashMap<String, Set<Long>>();
public void setIndexDelay(long indexDelay) {
this.indexDelay = indexDelay;
}
public long getIndexDelay() {
return indexDelay;
}
@Override
public void run() {
LOG.info("Started.");
......
......@@ -30,11 +30,9 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.persistence.domain.GenesysLowlevelRepository;
import org.genesys2.server.service.CountryNamesUpdater;
import org.genesys2.server.service.ElasticService;
import org.genesys2.server.service.GenesysFilterService;
import org.genesys2.server.service.GenesysService;
import org.genesys2.server.service.GeoRegionService;
import org.genesys2.server.service.GeoService;
......@@ -47,7 +45,6 @@ import org.genesys2.server.service.worker.InstituteUpdater;
import org.genesys2.server.service.worker.SGSVUpdate;
import org.genesys2.server.service.worker.WorldClimUpdater;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Controller;
......@@ -72,6 +69,7 @@ public class AdminController {
@Autowired
ElasticUpdater elasticUpdater;
@Autowired
ElasticService elasticService;
......@@ -96,18 +94,14 @@ public class AdminController {
@Autowired
private GenesysLowlevelRepository genesysLowlevelRepository;
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private GenesysFilterService filterService;
@Autowired
private WorldClimUpdater worldClimUpdater;
@Autowired
private GeoRegionService geoRegionService;
ObjectMapper mapper = new ObjectMapper();
@RequestMapping("/")
public String root(Model model) {
return "/admin/index";
......@@ -133,37 +127,38 @@ public class AdminController {
return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/reindex-elastic")
public String reindexElastic(@RequestParam(value = "startAt", required = false) Long startAt,
@RequestParam(value = "slow", required = true, defaultValue = "true") boolean slow) {
// LOG.info("Json filter: " + jsonFilter);
elasticUpdater.fullReindex();
/**
* Completely recreate Elasticsearch indexes: create, index, re-alias.
*
* @return
*/
@RequestMapping(method = RequestMethod.POST, value = "/elastic", params = { "regenerate" })
public String regenerateElastic() {
elasticService.regenerateIndexes();
return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/reindex-elastic", params = { "filter" })
public String reindexElasticFiltered(@RequestParam(value = "filter", required = true) String jsonFilter,
@RequestParam(value = "slow", required = false, defaultValue = "true") boolean slow) throws IOException {
ObjectMapper mapper = new ObjectMapper();
/**
* This method refreshes data in the currently active index. It is very
* handy when having to refresh part of ES after direct database update.
*
* @param jsonFilter
* @throws IOException
*/