Commit 05f17f10 authored by Matija Obreza
Browse files

Merge branch '613-firehose' into 'main'

Resolve "Firehose"

Closes #613

See merge request genesys-pgr/genesys-server!666
parents b05e8a99 3964722b
......@@ -119,7 +119,7 @@ public class DatasetAndSubsetCountAspect {
taskExecutor.execute(() -> toReindex.forEach(accessionId -> {
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionId.getId());
elasticReindexQueue.add(new ElasticReindex(Accession.class.getName(), accessionId.getId()));
elasticReindexQueue.add(new ElasticReindex(Accession.class, accessionId.getId()));
}));
}
......
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.component.aspect;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.genesys2.server.component.elastic.ElasticReindex;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionId;
import org.genesys2.server.model.genesys.AccessionRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Modifications to {@link AccessionId} and {@link AccessionRef} do not trigger
 * ES reindexing of the owning {@link Accession}. This aspect addresses that by
 * queueing an {@link ElasticReindex} request for the accession after the
 * relevant repository calls return.
 *
 * @author Matija Obreza
 */
@Aspect
public class ExtraReindexingAspect {

	private static final Logger LOG = LoggerFactory.getLogger(ExtraReindexingAspect.class);

	@Resource
	private BlockingQueue<ElasticReindex> elasticReindexQueue;

	/**
	 * After AccessionIdRepository#save
	 *
	 * @param joinPoint the join point
	 * @param result the value returned by the repository call
	 */
	@AfterReturning(value = "execution(* org.genesys2.server.persistence.AccessionIdRepository.save(..))", returning = "result")
	public void afterPersist(final JoinPoint joinPoint, final Object result) {
		LOG.trace("accessionIdRepository#save {} {}", joinPoint.toLongString(), joinPoint.getTarget());
		scheduleQuietly(joinPoint, result);
	}

	/**
	 * After AccessionRefRepository#save
	 *
	 * @param joinPoint the join point
	 * @param result the value returned by the repository call
	 */
	@AfterReturning(value = "execution(* org.genesys2.server.persistence.AccessionRefRepository.save(..))", returning = "result")
	public void afterPersistAccessionRef(final JoinPoint joinPoint, final Object result) {
		LOG.trace("accessionRefRepository#save {} {}", joinPoint.toLongString(), joinPoint.getTarget());
		scheduleQuietly(joinPoint, result);
	}

	/**
	 * After AccessionRefRepository#delete
	 *
	 * @param joinPoint the join point; its first argument is what gets inspected
	 */
	@AfterReturning(value = "execution(* org.genesys2.server.persistence.AccessionRefRepository.delete(..))")
	public void afterDeleteAccessionRef(final JoinPoint joinPoint) {
		LOG.trace("accessionRefRepository#delete {} {}", joinPoint.toLongString(), joinPoint.getTarget());
		final Object[] args = joinPoint.getArgs();
		// The pointcut matches delete(..) with any argument list, so guard the
		// no-argument case explicitly instead of relying on a caught exception.
		final Object removed = (args == null || args.length == 0) ? null : args[0];
		scheduleQuietly(joinPoint, removed);
	}

	/**
	 * Schedule reindexing without letting a failure escape into the advised
	 * repository call. Only {@link Exception}s are contained; JVM
	 * {@link Error}s are allowed to propagate.
	 *
	 * @param joinPoint the join point, used for log context only
	 * @param candidate object (or Iterable of objects) to inspect, may be null
	 */
	private void scheduleQuietly(final JoinPoint joinPoint, final Object candidate) {
		try {
			scheduleReindexing(candidate);
		} catch (Exception e) {
			LOG.error("Failed to schedule reindexing after {}: {}", joinPoint.toShortString(), e.getMessage(), e);
		}
	}

	/**
	 * Flatten the argument (if Iterable) and queue a reindex request for every
	 * element that {@link #forReindexing(Object)} maps to one.
	 *
	 * @param toReindex a single object or an Iterable of objects; null is ignored
	 */
	private void scheduleReindexing(Object toReindex) {
		if (toReindex == null) {
			return;
		}

		if (!(toReindex instanceof Iterable<?>)) {
			addNotNull(elasticReindexQueue, forReindexing(toReindex));
			return;
		}

		// Collect first, then hand everything to the queue in one call
		final List<ElasticReindex> requests = new ArrayList<>();
		for (final Object item : (Iterable<?>) toReindex) {
			addNotNull(requests, forReindexing(item));
		}
		if (!requests.isEmpty()) {
			elasticReindexQueue.addAll(requests);
		}
	}

	/**
	 * Add the element to the destination unless it is null.
	 *
	 * @param destination the collection to add to
	 * @param element the element, may be null
	 */
	private <T> void addNotNull(Collection<T> destination, T element) {
		if (element != null) {
			destination.add(element);
		}
	}

	/**
	 * Map an object to the reindex request of its accession.
	 *
	 * @param toReindex the object to inspect
	 * @return a request for {@link Accession} when the object is an
	 *         {@link AccessionId} or an {@link AccessionRef} with a non-null
	 *         accession; null otherwise
	 */
	private ElasticReindex forReindexing(Object toReindex) {
		if (toReindex instanceof AccessionId) {
			return accessionReindex(((AccessionId) toReindex).getId());
		}
		if (toReindex instanceof AccessionRef) {
			AccessionRef accessionRef = (AccessionRef) toReindex;
			if (accessionRef.getAccession() != null) {
				return accessionReindex(accessionRef.getAccession().getId());
			}
		}
		return null;
	}

	/**
	 * Build (and trace-log) the reindex request for one accession.
	 *
	 * @param accessionId identifier passed on to the request
	 * @return the reindex request for {@link Accession}
	 */
	private ElasticReindex accessionReindex(Long accessionId) {
		LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionId);
		return new ElasticReindex(Accession.class.getName(), accessionId);
	}
}
......@@ -16,7 +16,11 @@
package org.genesys2.server.component.elastic;
import org.genesys.blocks.model.EmptyModel;
import org.hibernate.proxy.HibernateProxy;
import java.io.Serializable;
import java.util.Objects;
/**
* The Class ElasticReindex.
......@@ -24,7 +28,7 @@ import java.io.Serializable;
public class ElasticReindex implements Serializable {
private static final long serialVersionUID = -7779348224175021304L;
private String clazz;
private Class<?> clazz;
private Long id;
/**
......@@ -33,17 +37,29 @@ public class ElasticReindex implements Serializable {
* @param clazz the clazz
* @param id the id
*/
public ElasticReindex(String clazz, Long id) {
public ElasticReindex(Class<?> clazz, Long id) {
this.clazz = clazz;
if (HibernateProxy.class.isAssignableFrom(this.clazz)) {
System.err.println("You're trying to work with a HibernateProxy! " + this.clazz);
}
this.id = id;
}
public ElasticReindex(EmptyModel toReindex) {
this.clazz = toReindex.getClass();
if (toReindex instanceof HibernateProxy) {
HibernateProxy hib = (HibernateProxy) toReindex;
this.clazz = hib.getHibernateLazyInitializer().getPersistentClass();
}
this.id = toReindex.getId();
}
/**
* Gets the clazz.
*
* @return the clazz
*/
public String getClazz() {
public Class<?> getClazz() {
return clazz;
}
......@@ -58,32 +74,19 @@ public class ElasticReindex implements Serializable {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((clazz == null) ? 0 : clazz.hashCode());
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
return Objects.hash(clazz, id);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ElasticReindex)) return false;
ElasticReindex that = (ElasticReindex) o;
return Objects.equals(id, that.id) & Objects.equals(clazz, that.clazz);
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ElasticReindex other = (ElasticReindex) obj;
if (clazz == null) {
if (other.clazz != null)
return false;
} else if (!clazz.equals(other.clazz))
return false;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
return true;
public String toString() {
return "ESRef " + clazz + " id=" + id;
}
}
......@@ -27,7 +27,6 @@ import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.genesys.blocks.model.BasicModel;
import org.genesys2.server.service.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -66,9 +65,12 @@ public class ElasticReindexProcessor {
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning ES update queue size={}", elasticReindexQueue.size());
}
if (elasticReindexQueue.isEmpty()) {
return;
}
List<ElasticReindex> forReindexing = new ArrayList<>(200);
Map<String, Set<Long>> buckets = Collections.synchronizedMap(new HashMap<String, Set<Long>>());
Map<Class<?>, Set<Long>> buckets = Collections.synchronizedMap(new HashMap<Class<?>, Set<Long>>());
while (elasticReindexQueue.drainTo(forReindexing, 200) > 0) {
LOG.debug("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
......@@ -77,40 +79,29 @@ public class ElasticReindexProcessor {
}
if (!buckets.isEmpty()) {
for (var entry : buckets.entrySet()) {
try {
@SuppressWarnings("unchecked")
Class<? extends BasicModel> clazz = (Class<? extends BasicModel>) Class.forName(entry.getKey());
Set<Long> bucket = entry.getValue();
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage(), e);
}
for (Map.Entry<Class<?>, Set<Long>> entry : buckets.entrySet()) {
Set<Long> bucket = entry.getValue();
elasticsearch.asyncUpdate(entry.getKey(), bucket);
bucket.clear();
}
buckets.clear();
}
}
private void bucketize(final Map<String, Set<Long>> buckets, final ElasticReindex toReindex) {
private void bucketize(final Map<Class<?>, Set<Long>> buckets, final ElasticReindex toReindex) {
if (toReindex == null)
return;
Set<Long> bucket = buckets.get(toReindex.getClazz());
Class<?> clazz = toReindex.getClazz();
Set<Long> bucket = buckets.get(clazz);
if (bucket == null) {
buckets.put(toReindex.getClazz(), bucket = Collections.synchronizedSet(new HashSet<Long>()));
buckets.put(clazz, bucket = Collections.synchronizedSet(new HashSet<Long>()));
}
bucket.add(toReindex.getId());
if (bucket.size() >= BATCH_SIZE) {
try {
@SuppressWarnings("unchecked")
Class<? extends BasicModel> clazz = (Class<? extends BasicModel>) Class.forName(toReindex.getClazz());
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage(), e);
}
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
}
}
......
/*
* Copyright 2020 Global Crop Diversity Trust
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,42 +13,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.component.elastic;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.genesys.blocks.model.EmptyModel;
import org.genesys2.server.component.firehose.FirehoseDeleteAllEvent;
import org.genesys2.server.component.firehose.FirehoseEvent;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionId;
import org.genesys2.server.model.genesys.AccessionRef;
import org.genesys2.server.service.ElasticsearchService;
import org.genesys2.server.service.impl.SearchException;
import org.hibernate.proxy.HibernateProxy;
import org.hibernate.proxy.LazyInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.context.event.EventListener;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalApplicationListener;
import org.springframework.transaction.event.TransactionalEventListener;
/**
* AspectJ powered listener on repository save and delete operations adds
* indexed entities to re-index queue, handled by
* {@link ElasticReindexProcessor}.
* The FirehoseReindexListener is a {@link TransactionalApplicationListener} that
* is attached to the {@link TransactionPhase} {@code TransactionPhase.AFTER_COMMIT}
* This event listener updates the Elasticsearch update queues immediately.
*
* @author Matija Obreza
*/
@Aspect
@Component("elasticJpaListener")
public class ElasticJPAListener implements InitializingBean {
private final static Logger LOG = LoggerFactory.getLogger(ElasticJPAListener.class);
// Instantiated in ElasticsearchConfig! @Component
public class FirehoseReindexListener implements InitializingBean {
private final static Logger LOG = LoggerFactory.getLogger(FirehoseReindexListener.class);
private List<Class<?>> includedClasses;
......@@ -58,101 +61,59 @@ public class ElasticJPAListener implements InitializingBean {
@Autowired
private ElasticsearchService elasticsearchService;
/**
* Instantiates a new elastic JPA listener.
*/
public ElasticJPAListener() {
System.err.println("Making ElasticJPAListener");
}
@Override
public void afterPropertiesSet() throws Exception {
includedClasses = elasticsearchService.getIndexedEntities();
}
/**
* After persist.
*
* @param joinPoint the join point
* @param result the result
*/
@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.save(..)) || execution(* org.springframework.data.repository.*.saveAll(..)) || execution(* org.springframework.data.repository.*.saveAndFlush(..))", returning = "result")
public void afterPersist(final JoinPoint joinPoint, final Object result) {
LOG.trace("JPA afterPersist {} {}", joinPoint.toLongString(), joinPoint.getTarget());
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleEvent(FirehoseEvent firehoseEvent) {
Class<?> clazz = firehoseEvent.getClazz();
Object entity = firehoseEvent.getEntity();
if (entity instanceof AccessionId) {
AccessionId accessionId = (AccessionId) entity;
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionId.getId());
addNotNull(elasticReindexQueue, new ElasticReindex(Accession.class, accessionId.getId()));
} else if (entity instanceof AccessionRef){
AccessionRef<?> accessionRef = (AccessionRef<?>) entity;
if (accessionRef.getAccession() != null) {
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionRef.getAccession().getId());
addNotNull(elasticReindexQueue, new ElasticReindex(Accession.class, accessionRef.getAccession().getId()));
}
try {
scheduleReindexing(result);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
} else if (isIndexed(clazz) && entity instanceof EmptyModel) {
scheduleReindexing(entity);
}
}
/**
* Before remove.
*
* @param joinPoint the join point
*/
@After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.repository.*.deleteAll(Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(Iterable))")
public void afterRemove(final JoinPoint joinPoint) {
final Object[] args = joinPoint.getArgs();
try {
final Object removed = args[0];
LOG.debug("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
scheduleReindexing(removed);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
@EventListener(classes = { FirehoseDeleteAllEvent.class })
// @TransactionalEventListener doesn't work in unit tests
// @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleDeleteAllEvent(FirehoseDeleteAllEvent firehoseDeleteAllEvent) {
if (!isIndexed(firehoseDeleteAllEvent.getClazz())) {
return;
}
}
/**
* Delete all
*
* @param joinPoint the join point
*/
@After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll()) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAllInBatch())")
public void afterDeleteAll(final JoinPoint joinPoint) {
// Remove from created and updated only
var clazz = firehoseDeleteAllEvent.getClazz();
LOG.debug("Removing all queued items of {}", clazz);
elasticReindexQueue.removeIf(reference -> Objects.equals(reference.getClazz(), clazz));
try {
LOG.debug("JPA afterDeleteAll: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
Object proxy = joinPoint.getTarget();
if (proxy instanceof Advised) {
Advised x = (Advised) proxy;
for (Class<?> foo : x.getProxiedInterfaces()) {
for (Type generic : foo.getGenericInterfaces()) {
if (generic instanceof ParameterizedType) {
Class<?> paramClass = (Class<?>) ((ParameterizedType) generic).getActualTypeArguments()[0];
if (isIndexed(paramClass)) {
LOG.debug("Deleting all documents for {}", paramClass);
elasticsearchService.removeAll(paramClass);
}
}
}
}
}
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
elasticsearchService.removeAll(clazz);
} catch (SearchException e) {
LOG.warn("Error reindexing {}: {}", clazz, e.getMessage()); //, e);
}
}
/**
* Flatten collections (if Iterable) and check each object class if indexed
*
* @param toReindex
*/
public void scheduleReindexing(Object toReindex) {
if (toReindex == null) {
return;
}
HashSet<Object> visited = new HashSet<Object>();
if (toReindex instanceof Iterable<?>) {
// Iterate and test
Iterable<?> it = (Iterable<?>) toReindex;
it.forEach((item) -> {
maybeReindex(item, visited);
});
} else {
maybeReindex(toReindex, visited);
}
maybeReindex(toReindex, visited);
}
private void maybeReindex(Object toReindex, HashSet<Object> visited) {
......@@ -177,7 +138,7 @@ public class ElasticJPAListener implements InitializingBean {
}
private <T> void addNotNull(Collection<T> destination, T element) {
if (element != null) {
if (element != null && !destination.contains(element)) {
destination.add(element);
}
}
......@@ -190,14 +151,14 @@ public class ElasticJPAListener implements InitializingBean {
clazz = lazyInitializer.getPersistentClass();
if (isIndexed(clazz)) {
LOG.trace("Scheduling reindexing of {} {}", clazz, toReindex);
return new ElasticReindex(clazz.getName(), (Long) lazyInitializer.getIdentifier());
return new ElasticReindex(clazz, (Long) lazyInitializer.getIdentifier());
} else {
return null;
}
} else if (isIndexed(clazz)) {
if (toReindex instanceof EmptyModel) {
LOG.trace("Scheduling reindexing of {} {}", clazz, toReindex);
return new ElasticReindex(clazz.getName(), ((EmptyModel) toReindex).getId());
return new ElasticReindex(clazz, ((EmptyModel) toReindex).getId());
} else {
LOG.warn("Don't know how to index {}. Not an EmptyModel.", clazz.getName());
return null;
......@@ -210,7 +171,6 @@ public class ElasticJPAListener implements InitializingBean {
if (includedClasses.contains(clazz)) {
return true;
}
return false;
}
}
/*
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.component.firehose;