Commit 16a8e647 authored by Matija Obreza

Merge branch 'es-indexing' into 'master'

Elasticsearch indexing updated

See merge request genesys-pgr/genesys-server!171
parents 50f6dfa7 795944f1
......@@ -147,7 +147,7 @@ public class ApiExceptionHandler {
/**
 * Handles "element not found" exceptions raised while serving an API request.
 *
 * <p>The failed request is logged exactly once at WARN level with its HTTP
 * method and URL.
 *
 * @param e the exception that was thrown
 * @param request the request that was being served
 * @return the exception wrapped in an {@link ApiError}, written to the response body
 */
@ExceptionHandler(value = { NotFoundElement.class, org.genesys2.server.exception.NotFoundElement.class })
@ResponseBody
public ApiError<Exception> handleNotFound(final Exception e, final HttpServletRequest request) {
	// Single log entry per failure. The stack trace is not logged here because
	// the exception itself is handed back to the caller inside the ApiError.
	LOG.warn("Element not found {} {}", request.getMethod(), request.getRequestURL());
	return new ApiError<>(e);
}
......@@ -161,7 +161,7 @@ public class ApiExceptionHandler {
/**
 * Handles requests made with an HTTP method the target URL does not support;
 * the response status is 405 Method Not Allowed.
 *
 * <p>The rejected method and the request URL are logged exactly once at WARN
 * level.
 *
 * @param req the request that was being served
 * @param e the exception describing the unsupported method
 * @return the exception wrapped in an {@link ApiError}
 */
@ResponseStatus(HttpStatus.METHOD_NOT_ALLOWED)
@ExceptionHandler(value = { HttpRequestMethodNotSupportedException.class })
public ApiError<Exception> handleRequestMethodFail(final HttpServletRequest req, final HttpRequestMethodNotSupportedException e) {
	// Single log entry per failure. The stack trace is not logged here because
	// the exception itself is handed back to the caller inside the ApiError.
	LOG.warn("Request method {} not supported for URL {}", e.getMethod(), req.getRequestURL());
	return new ApiError<>(e);
}
......
......@@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Resource;
......@@ -128,7 +127,8 @@ public class ElasticSearchController {
* @throws IOException
*/
@RequestMapping(method = RequestMethod.POST, value = "/action", params = { "reindex=accn", "filter" })
public String reindexElasticFiltered(@RequestParam(value = "filter", required = true) AccessionFilter accessionFilter) throws IOException {
public String reindexElasticFiltered(@RequestParam(value = "filter", required = true) String jsonfilt) throws IOException {
AccessionFilter accessionFilter = new ObjectMapper().readValue(jsonfilt, AccessionFilter.class);
taskExecutor.execute(() -> {
try {
elasticsearchService.reindex(Accession.class, accessionFilter);
......
......@@ -85,7 +85,7 @@ public interface ElasticsearchService {
* @param searchQuery the search query
* @return the map
*/
<T extends BasicModel> Map<Class<T>, List<T>> search(QueryBuilder shouldMatch, Set<Class<?>> clazzes, String searchQuery);
<T extends BasicModel> Map<Class<BasicModel>, List<T>> search(QueryBuilder shouldMatch, Set<Class<?>> clazzes, String searchQuery);
/**
* Term statistics auto.
......@@ -172,4 +172,6 @@ public interface ElasticsearchService {
List<Class<? extends BasicModel>> getIndexedEntities();
long count(Class<? extends BasicModel> clazz, BasicModelFilter filter);
}
......@@ -36,6 +36,7 @@ import javax.persistence.criteria.Root;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.time.StopWatch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
......@@ -61,7 +62,6 @@ import org.genesys.blocks.model.filters.BasicModelFilter;
import org.genesys.catalog.custom.elasticsearch.CustomMapping;
import org.genesys2.server.component.elastic.ElasticQueryBuilder;
import org.genesys2.server.service.ElasticsearchService;
import org.hibernate.jpa.QueryHints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
......@@ -76,12 +76,20 @@ import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.SourceFilter;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.data.querydsl.SimpleEntityPathResolver;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.querydsl.core.types.EntityPath;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.jpa.JPQLQuery;
/**
* Manage Elasticsearch indexing, indices.
......@@ -89,7 +97,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* @author Matija Obreza
*/
// Not @Service
@Transactional(readOnly = true)
@Transactional(readOnly = true, propagation = Propagation.SUPPORTS, isolation = Isolation.READ_COMMITTED)
public class ElasticsearchServiceImpl implements ElasticsearchService, InitializingBean {
/** The Constant LOG. */
......@@ -121,6 +129,9 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
private final Set<Class<? extends BasicModel>> indexedEntities = Collections.synchronizedSet(new HashSet<>());
private final Map<String, Class<BasicModel>> namesToClasses = Collections.synchronizedMap(new HashMap<>());
/// Size of database batch scan for IDs
private int batchSize = 1000;
@Override
public void afterPropertiesSet() throws Exception {
if (withElasticsearch()) {
......@@ -139,6 +150,16 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
}
/**
 * Sets how many entity IDs are loaded from the database in a single batch
 * when reindexing an entity (see {@link #batchSize}).
 *
 * @param batchSize number of IDs to read per batch; must be greater than zero
 * @throws IllegalArgumentException if {@code batchSize} is zero or negative
 */
public void setBatchSize(int batchSize) {
	if (batchSize <= 0) {
		// The ID scan pages with limit(batchSize) and stops at the first empty
		// page, so a zero-size batch would end the scan without indexing anything.
		throw new IllegalArgumentException("batchSize must be greater than zero, was " + batchSize);
	}
	this.batchSize = batchSize;
}
private boolean withElasticsearch() {
try {
elasticsearchTemplate.getClient().admin().cluster().prepareClusterStats().get();
......@@ -170,7 +191,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
indexedEntities.add(clazz);
namesToClasses.put(clazz.getSimpleName(), (Class<BasicModel>) clazz);
}
@Override
public List<Class<? extends BasicModel>> getIndexedEntities() {
return ListUtils.unmodifiableList(new ArrayList<>(this.indexedEntities));
......@@ -201,7 +222,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
final String indexName = createIndexFor(clazz);
// Scan
scanDatabase(clazz);
scanDatabase(clazz, null);
// Move _READ alias
realias(indexRoot + INDEX_READ, indexName);
......@@ -215,10 +236,10 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
elasticsearchTemplate.deleteIndex(oldIndexName);
}
}
/**
 * Reindexes the entities of the given type that match the filter.
 *
 * <p>Delegates to {@code scanDatabase}, which reads the IDs of matching
 * entities in batches and passes each batch on for asynchronous update.
 *
 * @param clazz the entity type to reindex
 * @param filter restricts which entities are scanned; when {@code null} no
 *        restriction is applied
 */
@Override
public <T extends BasicModel> void reindex(Class<T> clazz, BasicModelFilter filter) {
	scanDatabase(clazz, filter);
}
/**
......@@ -243,30 +264,39 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
return indexName;
}
private <R> void scanDatabase(Class<R> clazz) {
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<Long> cq = cb.createQuery(Long.class);
Root<R> root = cq.from(clazz);
cq.select(root.get("id"));
cq.orderBy(cb.asc(root.get("id")));
private <R> void scanDatabase(Class<R> clazz, BasicModelFilter filter) {
PathBuilder<R> builder = new PathBuilderFactory().create(clazz);
Querydsl querydsl = new Querydsl(em, builder);
EntityPath<R> entityPath = SimpleEntityPathResolver.INSTANCE.createPath(clazz);
PathBuilder<R> pathBuilder = new PathBuilder<R>(clazz, entityPath.getMetadata().getName());
JPQLQuery<Long> query = querydsl.createQuery(entityPath)
// select id only
.select(pathBuilder.getNumber("id", Long.class));
TypedQuery<Long> query = em.createQuery(cq);
if (filter != null) {
// apply filter
query.where(filter.buildQuery());
}
int startPosition = 0;
query.setHint(QueryHints.HINT_FETCH_SIZE, 100);
query.setFirstResult(startPosition);
query.setMaxResults(100);
query.offset(startPosition);
query.limit(batchSize);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
List<Long> results;
do {
stopWatch.split();
LOG.debug("Reading IDs stopwatch={}ms {} {} - {}", stopWatch.getSplitTime(), clazz.getName(), query.getFirstResult(), query.getFirstResult() + query.getMaxResults());
results = query.getResultList();
LOG.debug("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, batchSize);
if (startPosition > 10 * batchSize && startPosition / (10 * batchSize) == 0) {
LOG.info("Reading IDs stopwatch={}s {} {}+{}", stopWatch.getSplitTime() / 1000, clazz.getName(), startPosition, batchSize);
}
results = query.fetch();
asyncUpdate(clazz, results);
// Next page
query.setFirstResult(startPosition += results.size());
query.offset(startPosition += results.size());
} while (results.size() > 0);
stopWatch.stop();
LOG.warn("Reindexing {} took {}ms", clazz.getName(), stopWatch.getTime());
......@@ -296,7 +326,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
LOG.debug("Reindexing {} size={}", clazz.getName(), copy.size());
// LOG.debug("Reindexing {} size={}", clazz.getName(), copy.size());
try {
_self.update(clazz, copy);
} catch (Throwable e) {
......@@ -337,7 +367,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
// ObjectWriter pp = om.writerWithDefaultPrettyPrinter();
for (R x : results) {
LOG.debug("Indexing {} {}", clazz.getName(), x);
LOG.trace("Indexing {} {}", clazz.getName(), x);
BasicModel bm = (BasicModel) x;
// try {
......@@ -359,12 +389,25 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
if (!queries.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());
LOG.debug("Indexing {} count={} of provided objects count={}", clazz.getName(), queries.size(), ids.size());
try {
elasticsearchTemplate.bulkIndex(queries);
} catch (org.springframework.data.elasticsearch.ElasticsearchException e) {
LOG.error(e.getMessage());
Map<String, String> failedDocs = e.getFailedDocuments();
for (String key : failedDocs.keySet()) {
LOG.error("{} {}", key, failedDocs.get(key));
}
} catch (ElasticsearchException e) {
LOG.error("Could not index document", e);
}
elasticsearchTemplate.bulkIndex(queries);
}
for (R x : results) {
// detach from EM
em.detach(x);
}
for (Long id : notFoundIds) {
LOG.debug("Removing from index {} {}", clazz.getName(), id);
elasticsearchTemplate.delete(indexName, COMMON_TYPE_NAME, String.valueOf(id));
......@@ -481,13 +524,13 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
Pageable pageable = new PageRequest(0, 20);
Map<Class<T>, List<T>> hits = new HashMap<>();
Map<Class<BasicModel>, List<T>> hits = new HashMap<>();
searchIndex(indexName, shouldMatch, searchQuery, pageable, hits);
return hits.get(clazz);
}
@Override
public <T extends BasicModel> Map<Class<T>, List<T>> search(QueryBuilder shouldMatch, Set<Class<?>> clazzes, String searchQuery) {
public <T extends BasicModel> Map<Class<BasicModel>, List<T>> search(QueryBuilder shouldMatch, Set<Class<?>> clazzes, String searchQuery) {
final Set<String> indexNames = clazzes.stream().filter(clazz -> indexedEntities.contains(clazz)).map(clazz -> toIndexName(clazz) + INDEX_READ).collect(Collectors.toSet());
if (indexNames.size() == 0) {
......@@ -496,7 +539,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
LOG.debug("Searching {} in indices {}", clazzes, indexNames);
Map<Class<T>, List<T>> hits = new HashMap<>();
Map<Class<BasicModel>, List<T>> hits = new HashMap<>();
Pageable pageable = new PageRequest(0, 20);
......@@ -507,7 +550,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
return hits;
}
private <T extends BasicModel> void searchIndex(String indexName, QueryBuilder shouldMatch, String searchQuery, Pageable pageable, Map<Class<T>, List<T>> hits) {
private <T extends BasicModel> void searchIndex(String indexName, QueryBuilder shouldMatch, String searchQuery, Pageable pageable, Map<Class<BasicModel>, List<T>> hits) {
BoolQueryBuilder theQuery = boolQuery()
/*@formatter:off*/
.should(multiMatchQuery(searchQuery, "_all")
......@@ -556,7 +599,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
String _class = searchHit.getFields().get("_class").getValue();
LOG.debug("Hit score={} id={} _class={} _source={}", searchHit.getScore(), searchHit.getId(), _class, searchHit.getSourceAsString());
Class<T> clazz = (Class<T>) namesToClasses.get(_class);
Class<BasicModel> clazz = namesToClasses.get(_class);
if (clazz != null) {
if (!hits.containsKey(clazz)) {
......@@ -587,7 +630,8 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
});
}
private <T extends BasicModel> T loadEntity(Class<T> clazz, Long id) {
@SuppressWarnings("unchecked")
private <T extends BasicModel> T loadEntity(Class<BasicModel> clazz, Long id) {
return (T) em.find(clazz, id);
}
......@@ -671,4 +715,11 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
/**
 * Counts the indexed documents of the given entity type that match the filter.
 *
 * <p>The search runs against the read alias of the entity's index and asks
 * for zero documents, so only the total hit count is used.
 *
 * @param clazz the entity type whose index is queried
 * @param filter the filter converted to an Elasticsearch query
 * @return total number of matching documents as reported by the search
 */
@Override
public long count(Class<? extends BasicModel> clazz, BasicModelFilter filter) {
	final String readAlias = toIndexName(clazz) + INDEX_READ;

	final SearchResponse response = client.prepareSearch(readAlias)
		.setTypes(COMMON_TYPE_NAME)
		.setQuery(toEsQuery(clazz, filter))
		// size 0: request no documents, only the total
		.setSize(0)
		.get();

	return response.getHits().getTotalHits();
}
}
......@@ -12,12 +12,12 @@
<div>Update queue size: <c:out value="${updateQueueSize}" /></div>
<h3>Elastic reindex</h3>
<%-- <form method="post" action="<c:url value="/admin/elastic/action" />">
<form method="post" action="<c:url value="/admin/elastic/action" />">
<input type="hidden" name="${_csrf.parameterName}" value="${_csrf.token}" />
<input type="text" name="filter" placeholder="Genesys filter {}" value='{"historic":[true,false]}' />
<input type="text" name="filter" placeholder="Genesys filter {}" value='{"holder":{"code":["NGA039"]}}' />
<button type="submit" class="btn btn-default" value="accn" name="reindex">Reindex</button>
<button type="submit" class="btn btn-default" value="accn" name="regenerate">Regenerate</button>
</form> --%>
</form>
<form method="post" action="<c:url value="/admin/elastic/action" />">
<input type="hidden" name="${_csrf.parameterName}" value="${_csrf.token}" />
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment