Commit 4ee8ce1e authored by Matija Obreza's avatar Matija Obreza
Browse files

Elasticsearch: total and missing terms

- Fix: Use single ElasticJPAListener and ElasticReindexProcessor
parent 4543c36b
......@@ -45,7 +45,7 @@ import org.springframework.stereotype.Component;
* {@link ElasticReindexProcessor}.
*/
@Aspect
@Component
@Component("elasticJpaListener")
public class ElasticJPAListener {
private final static Logger LOG = LoggerFactory.getLogger(ElasticJPAListener.class);
......@@ -149,7 +149,7 @@ public class ElasticJPAListener {
} else {
Class<?> clazz = toReindex.getClass();
if (isIndexed(clazz)) {
LOG.trace("Reindexing {} {}", clazz.getName(), toReindex);
LOG.trace("Scheduling reindexing of {} {}", clazz.getName(), toReindex);
if (toReindex instanceof BasicModel) {
BasicModel entity = (BasicModel) toReindex;
elasticReindexQueue.add(new ElasticReindex(clazz.getName(), entity.getId()));
......
......@@ -40,7 +40,7 @@ import org.springframework.stereotype.Component;
* with 2000ms delay measured from the completion time of each preceding
* invocation.
*/
@Component
@Component("elasticReindexProcessor")
public class ElasticReindexProcessor {
/** The Constant LOG. */
......@@ -54,6 +54,10 @@ public class ElasticReindexProcessor {
@Autowired
private ElasticsearchService elasticsearch;
public ElasticReindexProcessor() {
System.err.println("Making ElasticReindexProcessor");
}
/**
* Process queues.
*/
......
......@@ -94,11 +94,11 @@ public interface ElasticsearchService {
* @return the term result
* @throws SearchException the search exception
*/
TermResult termStatisticsAuto(Class<?> clazz, BasicModelFilter filters, int size, String term) throws SearchException;
Map<String, TermResult> termStatisticsAuto(Class<?> clazz, BasicModelFilter filters, int size, String... terms) throws SearchException;
TermResult termStatisticsAuto(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String term) throws SearchException;
Map<String, TermResult> termStatisticsAuto(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String... terms) throws SearchException;
TermResult termStatistics(Class<?> clazz, BasicModelFilter filters, int size, String term) throws SearchException;
Map<String, TermResult> termStatistics(Class<?> clazz, BasicModelFilter filters, int size, String... terms) throws SearchException;
TermResult termStatistics(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String term) throws SearchException;
Map<String, TermResult> termStatistics(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String... terms) throws SearchException;
void realias(String aliasName, String indexName);
......@@ -110,11 +110,11 @@ public interface ElasticsearchService {
void reindexAll();
<T extends BasicModel> void reindex(Class<T> clazz, BasicModelFilter filter);
<T extends BasicModel> void reindex(Class<T> clazz, BasicModelFilter<?, ?> filter);
List<Class<? extends BasicModel>> getIndexedEntities();
long count(Class<? extends BasicModel> clazz, BasicModelFilter filter);
long count(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filter);
/**
* Aggregate by date
......@@ -123,10 +123,10 @@ public interface ElasticsearchService {
* @param indexClass the index class
* @param aggregatedDateField the name of date field to aggregate
* @param groupingByField the name of field to grouping
* @param filter the BasicModelFilter filter
* @param filter the BasicModelFilter<?, ?> filter
* @return the result list
*/
List<Object[]> aggregateDate(int size, Class<? extends BasicModel> targetClass, Class<? extends BasicModel> indexClass, String aggregatedDateField, String groupingByField, BasicModelFilter filter);
List<Object[]> aggregateDate(int size, Class<? extends BasicModel> targetClass, Class<? extends BasicModel> indexClass, String aggregatedDateField, String groupingByField, BasicModelFilter<?, ?> filter);
public static class TermResult {
......@@ -134,17 +134,18 @@ public interface ElasticsearchService {
private Long total;
private long other;
/**
* @deprecated This number is useless here and needs to be calculated based matches - total - other
*/
private Long missing;
public TermResult(String name, List<Term> terms, long other, Long missing) {
public TermResult(String name, Long total, List<Term> terms, long other) {
this.terms = terms;
this.total = terms.stream().map(Term::getCount).mapToLong(Long::longValue).sum();
this.total = total;
this.other = other;
this.missing = missing;
this.missing = total - terms.stream().map(Term::getCount).mapToLong(Long::longValue).sum() - other;
if (missing <= 0) {
// We have some terms (storage) where the total is not the total number of all
// storage options, but count of accessions
missing = null;
}
}
public List<Term> getTerms() {
......@@ -188,5 +189,5 @@ public interface ElasticsearchService {
}
}
<T extends BasicModel> List<T> find(Class<T> clazz, BasicModelFilter filter);
<T extends BasicModel> List<T> find(Class<T> clazz, BasicModelFilter<?, ?> filter);
}
......@@ -249,7 +249,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
@Override
public <T extends BasicModel> void reindex(Class<T> clazz, BasicModelFilter filter) {
public <T extends BasicModel> void reindex(Class<T> clazz, BasicModelFilter<?, ?> filter) {
scanDatabase(clazz, filter);
}
......@@ -275,7 +275,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
return indexName;
}
private <R> void scanDatabase(Class<R> clazz, BasicModelFilter filter) {
private <R> void scanDatabase(Class<R> clazz, BasicModelFilter<?, ?> filter) {
PathBuilder<R> builder = new PathBuilderFactory().create(clazz);
Querydsl querydsl = new Querydsl(em, builder);
......@@ -646,17 +646,18 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
@Override
public TermResult termStatistics(Class<?> clazz, BasicModelFilter filters, int size, String term) throws SearchException {
public TermResult termStatistics(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String term) throws SearchException {
return termStatistics(clazz, filters, size, new String[] { term }).get(term);
}
@Override
public Map<String, TermResult> termStatistics(Class<?> clazz, BasicModelFilter filters, int size, String... terms) throws SearchException {
public Map<String, TermResult> termStatistics(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
if (!indexedEntities.contains(clazz)) {
throw new RuntimeException("Class is not indexed " + clazz);
}
Long total = count(clazz, filters);
String indexName = toIndexName(clazz) + INDEX_READ;
SearchRequestBuilder esQuery = client.prepareSearch(indexName).setTypes(COMMON_TYPE_NAME).setQuery(toEsQuery(clazz, filters));
for (String term : terms) {
......@@ -681,8 +682,8 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
InternalTerms<?, ?> topCounts = (InternalTerms<?, ?>) agg;
List<Bucket> buckets = topCounts.getBuckets();
TermResult tr = new TermResult(term, buckets.stream().map(bucket -> new Term(bucket.getKeyAsString(), bucket.getDocCount())).collect(Collectors.toList()),
topCounts.getSumOfOtherDocCounts(), topCounts.getDocCountError());
TermResult tr = new TermResult(term, total, buckets.stream().map(bucket -> new Term(bucket.getKeyAsString(), bucket.getDocCount())).collect(Collectors.toList()),
topCounts.getSumOfOtherDocCounts());
termResults.put(term, tr);
} else if (agg instanceof UnmappedTerms) {
......@@ -697,7 +698,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
return termResults;
}
private QueryBuilder toEsQuery(Class<?> clazz, BasicModelFilter filters) {
private QueryBuilder toEsQuery(Class<?> clazz, BasicModelFilter<?, ?> filters) {
ElasticQueryBuilder esQb = new ElasticQueryBuilder();
if (filters != null) {
filters.buildQuery().accept(esQb, null);
......@@ -716,12 +717,12 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
* than 10%
*/
@Override
public TermResult termStatisticsAuto(Class<?> clazz, BasicModelFilter filters, int size, String term) throws SearchException {
public TermResult termStatisticsAuto(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String term) throws SearchException {
return termStatisticsAuto(clazz, filters, size, new String[] { term }).get(term);
}
@Override
public Map<String, TermResult> termStatisticsAuto(Class<?> clazz, BasicModelFilter filters, int size, String... terms) throws SearchException {
public Map<String, TermResult> termStatisticsAuto(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filters, int size, String... terms) throws SearchException {
Map<String, TermResult> termResult = termStatistics(clazz, filters, size, terms);
......@@ -731,7 +732,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
@Override
public long count(Class<? extends BasicModel> clazz, BasicModelFilter filter) {
public long count(Class<? extends BasicModel> clazz, BasicModelFilter<?, ?> filter) {
String indexName = toIndexName(clazz) + INDEX_READ;
SearchResponse hits = client.prepareSearch(indexName).setTypes(COMMON_TYPE_NAME).setQuery(toEsQuery(clazz, filter)).setSize(0).get();
return hits.getHits().getTotalHits();
......@@ -740,7 +741,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
@Override
@Cacheable(value = "statistics", unless = "#result == null", key = "'stats.' + #root.methodName + '-' + #size + '-' + #targetClass.name + '-' + #indexClass.name")
public List<Object[]> aggregateDate(final int size, final Class<? extends BasicModel> targetClass, final Class<? extends BasicModel> indexClass,
final String aggregatedDateField, final String groupingByField, final BasicModelFilter filter) {
final String aggregatedDateField, final String groupingByField, final BasicModelFilter<?, ?> filter) {
final String DATE_HISTOGRAM_AGG = "updates_over_time";
final String TERMS_AGG = "institute_terms";
final String TOP_HITS_AGG = "top_tag_hits";
......@@ -791,7 +792,7 @@ public class ElasticsearchServiceImpl implements ElasticsearchService, Initializ
}
@Override
public <T extends BasicModel> List<T> find(Class<T> clazz, BasicModelFilter filter) {
public <T extends BasicModel> List<T> find(Class<T> clazz, BasicModelFilter<?, ?> filter) {
String indexName = toIndexName(clazz) + INDEX_READ;
List<T> results = new ArrayList<>();
client.prepareSearch(indexName).setTypes(COMMON_TYPE_NAME).setQuery(toEsQuery(clazz, filter)).get()
......
......@@ -179,8 +179,8 @@ public class ElasticsearchConfig {
*
* @return the elastic reindex processor
*/
@Bean
public ElasticReindexProcessor elasticReindexer() {
@Bean()
public ElasticReindexProcessor elasticReindexProcessor() {
return new ElasticReindexProcessor();
}
......
......@@ -187,7 +187,7 @@ public class ElasticQueryBuilderTest extends AbstractElasticServiceTest {
terms = elasticService.termStatisticsAuto(Accession.class, af, 10, "storage");
LOG.debug("Storage {}", objectMapper.writeValueAsString(terms));
assertThat(terms.getTotal(), is(3l));
assertThat(terms.getTotal(), is(2l));
terms.getTerms().forEach(t -> {
if (t.getTerm().equals(Integer.toString(MCPD_STORAGE_FIELD))) {
assertThat(t.getCount(), is(2l));
......@@ -212,7 +212,7 @@ public class ElasticQueryBuilderTest extends AbstractElasticServiceTest {
assertThat(terms.getTerms().get(0).getTerm(), is("INS001"));
assertThat(terms.getTerms().get(0).getCount(), is(2l));
terms = stats.get("storage");
assertThat(terms.getTotal(), is(3l));
assertThat(terms.getTotal(), is(2l));
terms.getTerms().forEach(t -> {
if (t.getTerm().equals(Integer.toString(MCPD_STORAGE_FIELD))) {
assertThat(t.getCount(), is(1l));
......@@ -226,7 +226,7 @@ public class ElasticQueryBuilderTest extends AbstractElasticServiceTest {
af.storage = newHashSet(MCPD_STORAGE_FIELD);
stats = elasticService.termStatistics(Accession.class, af, 10, "institute.code", "storage");
terms = stats.get("storage");
assertThat(terms.getTotal(), is(5l));
assertThat(terms.getTotal(), is(3l));
terms.getTerms().forEach(t -> {
if (t.getTerm().equals(Integer.toString(MCPD_STORAGE_FIELD))) {
assertThat(t.getCount(), is(3l));
......@@ -239,7 +239,7 @@ public class ElasticQueryBuilderTest extends AbstractElasticServiceTest {
af.storage = newHashSet(MCPD_STORAGE_INVITRO, MCPD_STORAGE_CRYO);
terms = elasticService.termStatistics(Accession.class, af, 10, "storage");
assertThat(terms.getTotal(), is(5l));
assertThat(terms.getTotal(), is(3l));
terms.getTerms().forEach(t -> {
if (t.getTerm().equals(Integer.toString(MCPD_STORAGE_FIELD))) {
assertThat(t.getCount(), is(2l));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment