Commit 78e45a77 authored by Alexander Prendetskiy's avatar Alexander Prendetskiy Committed by Matija Obreza

update Elasticsearch, migrate services and components from Catalog

parent fbf47ce6
......@@ -412,17 +412,10 @@
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<version>1.3.2.RELEASE</version>
<version>2.1.12.RELEASE</version>
</dependency>
<dependency>
......
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.catalog.api;
import org.elasticsearch.index.query.QueryBuilder;
import org.genesys.catalog.model.filters.DatasetFilter;
import org.genesys.catalog.service.ElasticsearchFilter;
/**
* Wrapper for {@link DatasetFilter} to produce ES queries.
*
* @author Matija Obreza
*/
public class EsDatasetFilter extends DatasetFilter implements ElasticsearchFilter {
/*
* (non-Javadoc)
* @see org.genesys.catalog.server.service.ElasticsearchFilter#elasticQuery()
*/
@Override
public QueryBuilder elasticQuery() {
return null;
}
}
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.catalog.api.v0;
import com.fasterxml.jackson.annotation.JsonView;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.genesys.blocks.model.BasicModel;
import org.genesys.blocks.model.JsonViews;
import org.genesys.catalog.api.EsDatasetFilter;
import org.genesys.catalog.exceptions.InvalidApiUsageException;
import org.genesys.catalog.model.Partner;
import org.genesys.catalog.model.dataset.AccessionIdentifier;
import org.genesys.catalog.model.dataset.Dataset;
import org.genesys.catalog.model.traits.Descriptor;
import org.genesys.catalog.service.ElasticsearchService;
import org.genesys2.server.model.impl.Crop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Sets.newHashSet;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
/**
* API to search the Catalog.
*
* @author Matija Obreza
*/
@RestController("catalogSearchApi0")
@RequestMapping(value = { "/api/v0/search" })
public class SearchController {
private static final Logger LOG = LoggerFactory.getLogger(SearchController.class);
@Autowired(required = false)
private ElasticsearchService elasticsearch;
/**
* Get suggestions for dataset filters .
*
* @param filters the filters
* @param searchQuery the search query
* @return the map
*/
@JsonView({ JsonViews.Minimal.class })
@PostMapping(value = "/dataset/suggest", produces = MediaType.APPLICATION_JSON_VALUE)
public Map<String, SearchResults> datasets(@RequestBody(required = false) final EsDatasetFilter filters, @RequestParam(value = "q", required = true) String searchQuery) {
LOG.trace("Incoming {}", searchQuery);
searchQuery = sanitizeQuery(searchQuery);
LOG.info("Suggestions for datasets for: {}", searchQuery);
if (StringUtils.isBlank(searchQuery)) {
throw new InvalidApiUsageException("No search query provided");
}
QueryBuilder extraFilters = filtersFromDataset(filters);
Map<Class<?>, List<BasicModel>> hitsByEntity = elasticsearch.search(extraFilters, searchQuery, newHashSet(Crop.class, Partner.class,
AccessionIdentifier.class, Descriptor.class));
Map<String, SearchResults> suggestions = new HashMap<>();
suggestions.put("search.group.crop", SearchResults.from("code", Arrays.asList("crop"), hitsByEntity.get(Crop.class)));
suggestions.put("search.group.partner", SearchResults.from("uuid", Arrays.asList("owner.uuid"), hitsByEntity.get(Partner.class)));
suggestions.put("search.group.accession", SearchResults.from("doi", Arrays.asList("accessionIdentifier.doi"), hitsByEntity.get(AccessionIdentifier.class)));
suggestions.put("search.group.descriptor", SearchResults.from("uuid", Arrays.asList("descriptor.uuid"), hitsByEntity.get(Descriptor.class)));
// Search datasets
suggestions.put("search.matches", SearchResults.from("uuid", Arrays.asList("uuid"), elasticsearch.search(extraFilters, searchQuery, Dataset.class)));
return suggestions;
}
/// Try to enhance the search by adding known Dataset filters
private QueryBuilder filtersFromDataset(EsDatasetFilter filters) {
if (filters == null) {
return null;
}
BoolQueryBuilder q = boolQuery();
if (filters.crop != null && !filters.crop.isEmpty()) {
q.must(termsQuery("crop", filters.crop));
}
q.boost(3.0f);
return q.hasClauses() ? q : null;
}
/**
* Sanitize incoming search query
*
* @param searchQuery incoming
* @return sanitized search query
*/
private String sanitizeQuery(String searchQuery) {
if (StringUtils.isBlank(searchQuery)) {
return null;
}
return searchQuery.replaceAll("[^\\w\\d\\s]+", "").replaceAll("\\s\\s+", " ").trim();
}
}
/**
* Wrapper for search results
*/
class SearchResults {
public List<String> filters;
public String key = "uuid";
public List<BasicModel> hits;
public static SearchResults from(String key, List<String> filters, List<BasicModel> list) {
if (list == null || list.isEmpty())
return null;
SearchResults sr = new SearchResults();
sr.filters = filters;
sr.key = key;
sr.hits = list;
return sr;
}
}
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.catalog.component;
import com.hazelcast.core.IQueue;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.genesys.blocks.model.BasicModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* AspectJ powered listener on repository save and delete operations
* adds indexed entities to re-index queue, handled by {@link ElasticReindexProcessor}.
*/
@Aspect
@Component
public class ElasticJPAListener {
private final static Logger LOG = LoggerFactory.getLogger(ElasticJPAListener.class);
private Set<Object> includedClasses;
private Set<Object> ignoredClasses;
@Resource
private IQueue<ElasticReindex> elasticReindexQueue;
/**
* Instantiates a new elastic JPA listener.
*/
public ElasticJPAListener() {
System.err.println("Making ElasticJPAListener");
ignoredClasses = Collections.synchronizedSet(new HashSet<>());
includedClasses = Collections.synchronizedSet(new HashSet<>());
}
/**
* After persist.
*
* @param joinPoint the join point
* @param result the result
*/
@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.save(..))", returning = "result")
public void afterPersist(final JoinPoint joinPoint, final Object result) {
LOG.debug("JPA afterPersist {} {}", joinPoint.toLongString(), joinPoint.getTarget());
try {
scheduleReindexing(result);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
/**
* Before remove.
*
* @param joinPoint the join point
*/
@Before(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(..))")
public void beforeRemove(final JoinPoint joinPoint) {
final Object[] args = joinPoint.getArgs();
try {
final Object removed = args[0];
LOG.debug("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
scheduleReindexing(removed);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
/**
* Flatten collections (if Iterable) and check each object class if indexed
*
* @param toReindex
*/
private void scheduleReindexing(Object toReindex) {
if (toReindex == null) {
return;
}
if (toReindex instanceof Iterable<?>) {
// Iterate and test
Iterable<?> it = (Iterable<?>) toReindex;
for (Object obj : it) {
scheduleReindexing(obj);
}
} else {
Class<?> clazz = toReindex.getClass();
if (isIndexed(clazz)) {
LOG.warn("Reindexing {} {}", clazz.getName(), toReindex);
if (toReindex instanceof BasicModel) {
BasicModel entity = (BasicModel) toReindex;
elasticReindexQueue.add(new ElasticReindex(clazz.getName(), entity.getId()));
} else {
LOG.warn("Don't know how to index {}. Not a BasicModel.", clazz.getName());
}
}
}
}
private boolean isIndexed(Class<?> clazz) {
if (ignoredClasses.contains(clazz)) {
return false;
}
if (includedClasses.contains(clazz)) {
return true;
}
Document esDocumentAnnotation = clazz.getAnnotation(Document.class);
if (esDocumentAnnotation == null) {
ignoredClasses.add(clazz);
return false;
} else {
includedClasses.add(clazz);
return true;
}
}
}
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.catalog.component;
import java.io.Serializable;
/**
* The Class ElasticReindex.
*/
public class ElasticReindex implements Serializable {
private static final long serialVersionUID = -7779348224175021304L;
private String clazz;
private Long id;
/**
* Instantiates a new elastic reindex.
*
* @param clazz the clazz
* @param id the id
*/
public ElasticReindex(String clazz, Long id) {
this.clazz = clazz;
this.id = id;
}
/**
* Gets the clazz.
*
* @return the clazz
*/
public String getClazz() {
return clazz;
}
/**
* Gets the id.
*
* @return the id
*/
public Long getId() {
return id;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((clazz == null) ? 0 : clazz.hashCode());
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ElasticReindex other = (ElasticReindex) obj;
if (clazz == null) {
if (other.clazz != null)
return false;
} else if (!clazz.equals(other.clazz))
return false;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
return true;
}
}
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.catalog.component;
import com.hazelcast.core.IQueue;
import org.genesys.blocks.model.BasicModel;
import org.genesys.catalog.service.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
/**
* ES Processor component uses Spring's @Scheduled annotation to scan queues
* with 2000ms delay measured from the completion time of each preceding
* invocation.
*/
@Component
public class ElasticReindexProcessor {
/** The Constant LOG. */
public static final Logger LOG = LoggerFactory.getLogger(ElasticReindexProcessor.class);
private static final int BATCH_SIZE = 100;
@Resource
private IQueue<ElasticReindex> elasticReindexQueue;
@Autowired
private ElasticsearchService elasticsearch;
/**
* Process queues.
*/
@Scheduled(fixedDelay = 2000)
public void processQueues() {
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning ES remove queue");
}
List<ElasticReindex> forReindexing = new ArrayList<>();
Map<String, Set<Long>> buckets = Collections.synchronizedMap(new HashMap<String, Set<Long>>());
while (elasticReindexQueue.drainTo(forReindexing, 100) > 0) {
LOG.info("Remaining for reindex={}", elasticReindexQueue.size());
forReindexing.stream().forEach(er -> bucketize(buckets, er));
forReindexing.clear();
}
if (!buckets.isEmpty()) {
for (String className : buckets.keySet()) {
try {
@SuppressWarnings("unchecked")
Class<? extends BasicModel> clazz = (Class<? extends BasicModel>) Class.forName(className);
Set<Long> bucket = buckets.get(className);
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage(), e);
}
}
buckets.clear();
}
}
private void bucketize(final Map<String, Set<Long>> buckets, final ElasticReindex toReindex) {
if (toReindex == null)
return;
Set<Long> bucket = buckets.get(toReindex.getClazz());
if (bucket == null) {
buckets.put(toReindex.getClazz(), bucket = Collections.synchronizedSet(new HashSet<Long>()));
}
bucket.add(toReindex.getId());
if (bucket.size() >= BATCH_SIZE) {
try {
@SuppressWarnings("unchecked")
Class<? extends BasicModel> clazz = (Class<? extends BasicModel>) Class.forName(toReindex.getClazz());
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage(), e);
}
}
}
}
\ No newline at end of file
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.catalog.custom.elasticsearch;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.ElasticsearchException;
import java.io.IOException;
import static org.genesys.catalog.custom.elasticsearch.CustomMappingBuilder.buildMapping;
/**
* Used to access the package-protected buildMapping functions.
*/
public class CustomMapping {
/** The Constant LOG. */
public static final Logger LOG = LoggerFactory.getLogger(CustomMapping.class);
/**
* Spring data mapping.
*
* @param <T> the generic type
* @param clazz the clazz
* @param indexType the index type
* @return the object
*/
public static <T> Object springDataMapping(Class<T> clazz, String indexType) {
XContentBuilder xContentBuilder = null;
try {
xContentBuilder = buildMapping(clazz, indexType, "id", null);
} catch (Exception e) {
throw new ElasticsearchException("Failed to build mapping for " + clazz.getSimpleName(), e);
}
try {
LOG.info("Mapping for class={}/{}: {}", clazz, indexType, xContentBuilder.string());
} catch (IOException e) {
}
return xContentBuilder;
}
}