Commit 94bdc9a7 authored by Artem Hrybeniuk's avatar Artem Hrybeniuk Committed by Matija Obreza
Browse files

Firehose: Firehose and Firehose ES processing

parent b05e8a99
......@@ -119,7 +119,7 @@ public class DatasetAndSubsetCountAspect {
taskExecutor.execute(() -> toReindex.forEach(accessionId -> {
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionId.getId());
elasticReindexQueue.add(new ElasticReindex(Accession.class.getName(), accessionId.getId()));
elasticReindexQueue.add(new ElasticReindex(Accession.class, accessionId.getId()));
}));
}
......
......@@ -129,12 +129,12 @@ public class ExtraReindexingAspect {
if (toReindex instanceof AccessionId){
AccessionId accessionId = (AccessionId) toReindex;
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionId.getId());
return new ElasticReindex(Accession.class.getName(), accessionId.getId());
return new ElasticReindex(Accession.class, accessionId.getId());
} else if (toReindex instanceof AccessionRef){
AccessionRef accessionRef = (AccessionRef) toReindex;
if (accessionRef.getAccession() != null) {
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionRef.getAccession().getId());
return new ElasticReindex(Accession.class.getName(), accessionRef.getAccession().getId());
return new ElasticReindex(Accession.class, accessionRef.getAccession().getId());
}
}
return null;
......
......@@ -16,7 +16,12 @@
package org.genesys2.server.component.elastic;
import org.genesys.blocks.model.EmptyModel;
import org.genesys2.server.component.firehose.FirehoseEvent;
import org.hibernate.proxy.HibernateProxy;
import java.io.Serializable;
import java.util.Objects;
/**
* The Class ElasticReindex.
......@@ -24,7 +29,7 @@ import java.io.Serializable;
public class ElasticReindex implements Serializable {
private static final long serialVersionUID = -7779348224175021304L;
private String clazz;
private Class<?> clazz;
private Long id;
/**
......@@ -33,17 +38,29 @@ public class ElasticReindex implements Serializable {
* @param clazz the clazz
* @param id the id
*/
public ElasticReindex(String clazz, Long id) {
public ElasticReindex(Class<?> clazz, Long id) {
this.clazz = clazz;
if (HibernateProxy.class.isAssignableFrom(this.clazz)) {
System.err.println("You're trying to work with a HibernateProxy! " + this.clazz);
}
this.id = id;
}
/**
 * Creates a reindex reference from an entity instance.
 *
 * When the instance is a {@link HibernateProxy}, the persistent class reported
 * by its lazy initializer is recorded instead of the proxy's runtime class.
 *
 * @param toReindex the entity to reference; its class and id are captured
 */
public ElasticReindex(EmptyModel toReindex) {
    if (toReindex instanceof HibernateProxy) {
        // Unwrap the proxy so the reference carries the real entity type
        this.clazz = ((HibernateProxy) toReindex).getHibernateLazyInitializer().getPersistentClass();
    } else {
        this.clazz = toReindex.getClass();
    }
    this.id = toReindex.getId();
}
/**
* Gets the clazz.
*
* @return the clazz
*/
public String getClazz() {
public Class<?> getClazz() {
return clazz;
}
......@@ -58,32 +75,19 @@ public class ElasticReindex implements Serializable {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((clazz == null) ? 0 : clazz.hashCode());
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
return Objects.hash(clazz, id);
}
/**
 * Two references are equal when they point at the same class and the same id.
 *
 * @param o the object to compare with
 * @return {@code true} if {@code o} is an {@code ElasticReindex} with equal
 *         {@code clazz} and {@code id}
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    // The guard must test for ElasticReindex itself: checking against an
    // unrelated type makes every distinct instance unequal and breaks the
    // equals/hashCode contract (hashCode is derived from clazz and id).
    if (!(o instanceof ElasticReindex)) {
        return false;
    }
    ElasticReindex that = (ElasticReindex) o;
    return Objects.equals(id, that.id) && Objects.equals(clazz, that.clazz);
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ElasticReindex other = (ElasticReindex) obj;
if (clazz == null) {
if (other.clazz != null)
return false;
} else if (!clazz.equals(other.clazz))
return false;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
return true;
public String toString() {
return "ESRef " + clazz + " id=" + id;
}
}
......@@ -27,7 +27,6 @@ import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.genesys.blocks.model.BasicModel;
import org.genesys2.server.service.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -68,7 +67,7 @@ public class ElasticReindexProcessor {
}
List<ElasticReindex> forReindexing = new ArrayList<>(200);
Map<String, Set<Long>> buckets = Collections.synchronizedMap(new HashMap<String, Set<Long>>());
Map<Class<?>, Set<Long>> buckets = Collections.synchronizedMap(new HashMap<Class<?>, Set<Long>>());
while (elasticReindexQueue.drainTo(forReindexing, 200) > 0) {
LOG.debug("Remaining for reindex={} handling={}", elasticReindexQueue.size(), forReindexing.size());
......@@ -77,40 +76,29 @@ public class ElasticReindexProcessor {
}
if (!buckets.isEmpty()) {
for (var entry : buckets.entrySet()) {
try {
@SuppressWarnings("unchecked")
Class<? extends BasicModel> clazz = (Class<? extends BasicModel>) Class.forName(entry.getKey());
Set<Long> bucket = entry.getValue();
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage(), e);
}
for (Map.Entry<Class<?>, Set<Long>> entry : buckets.entrySet()) {
Set<Long> bucket = entry.getValue();
elasticsearch.asyncUpdate(entry.getKey(), bucket);
bucket.clear();
}
buckets.clear();
}
}
private void bucketize(final Map<String, Set<Long>> buckets, final ElasticReindex toReindex) {
private void bucketize(final Map<Class<?>, Set<Long>> buckets, final ElasticReindex toReindex) {
if (toReindex == null)
return;
Set<Long> bucket = buckets.get(toReindex.getClazz());
Class<?> clazz = toReindex.getClazz();
Set<Long> bucket = buckets.get(clazz);
if (bucket == null) {
buckets.put(toReindex.getClazz(), bucket = Collections.synchronizedSet(new HashSet<Long>()));
buckets.put(clazz, bucket = Collections.synchronizedSet(new HashSet<Long>()));
}
bucket.add(toReindex.getId());
if (bucket.size() >= BATCH_SIZE) {
try {
@SuppressWarnings("unchecked")
Class<? extends BasicModel> clazz = (Class<? extends BasicModel>) Class.forName(toReindex.getClazz());
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage(), e);
}
elasticsearch.asyncUpdate(clazz, bucket);
bucket.clear();
}
}
......
/*
* Copyright 2020 Global Crop Diversity Trust
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,42 +13,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.component.elastic;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.genesys.blocks.model.EmptyModel;
import org.genesys2.server.component.firehose.FirehoseDeleteAllEvent;
import org.genesys2.server.component.firehose.FirehoseEvent;
import org.genesys2.server.service.ElasticsearchService;
import org.genesys2.server.service.impl.SearchException;
import org.hibernate.proxy.HibernateProxy;
import org.hibernate.proxy.LazyInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalApplicationListener;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
/**
* AspectJ powered listener on repository save and delete operations adds
* indexed entities to re-index queue, handled by
* {@link ElasticReindexProcessor}.
* The FirehoseReindexListener is a {@link TransactionalApplicationListener} that
* is attached to the {@link TransactionPhase} {@code TransactionPhase.AFTER_COMMIT}
* This event listener updates the Elasticsearch update queues immediately.
*
* @author Matija Obreza
*/
@Aspect
@Component("elasticJpaListener")
public class ElasticJPAListener implements InitializingBean {
private final static Logger LOG = LoggerFactory.getLogger(ElasticJPAListener.class);
@Component
public class FirehoseReindexListener implements InitializingBean {
private final static Logger LOG = LoggerFactory.getLogger(FirehoseReindexListener.class);
private List<Class<?>> includedClasses;
......@@ -58,101 +58,47 @@ public class ElasticJPAListener implements InitializingBean {
@Autowired
private ElasticsearchService elasticsearchService;
/**
* Instantiates a new elastic JPA listener.
*/
public ElasticJPAListener() {
System.err.println("Making ElasticJPAListener");
}
@Override
public void afterPropertiesSet() throws Exception {
includedClasses = elasticsearchService.getIndexedEntities();
}
/**
* After persist.
*
* @param joinPoint the join point
* @param result the result
*/
@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.save(..)) || execution(* org.springframework.data.repository.*.saveAll(..)) || execution(* org.springframework.data.repository.*.saveAndFlush(..))", returning = "result")
public void afterPersist(final JoinPoint joinPoint, final Object result) {
LOG.trace("JPA afterPersist {} {}", joinPoint.toLongString(), joinPoint.getTarget());
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleEvent(FirehoseEvent firehoseEvent) {
Class<?> clazz = firehoseEvent.getClazz();
Object entity = firehoseEvent.getEntity();
try {
scheduleReindexing(result);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
if (isIndexed(clazz) && entity instanceof EmptyModel) {
scheduleReindexing(entity);
}
}
/**
* Before remove.
*
* @param joinPoint the join point
*/
@After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.delete(..)) || execution(* org.springframework.data.repository.*.deleteAll(Iterable)) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteInBatch(Iterable))")
public void afterRemove(final JoinPoint joinPoint) {
final Object[] args = joinPoint.getArgs();
try {
final Object removed = args[0];
LOG.debug("JPA afterRemove: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
scheduleReindexing(removed);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
@EventListener(classes = { FirehoseDeleteAllEvent.class })
// @TransactionalEventListener doesn't work in unit tests
// @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleDeleteAllEvent(FirehoseDeleteAllEvent firehoseDeleteAllEvent) {
if (!isIndexed(firehoseDeleteAllEvent.getClazz())) {
return;
}
}
/**
* Delete all
*
* @param joinPoint the join point
*/
@After(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAll()) || execution(* org.springframework.data.jpa.repository.JpaRepository.deleteAllInBatch())")
public void afterDeleteAll(final JoinPoint joinPoint) {
// Remove from created and updated only
var clazz = firehoseDeleteAllEvent.getClazz();
LOG.debug("Removing all queued items of {}", clazz);
elasticReindexQueue.removeIf(reference -> Objects.equals(reference.getClazz(), clazz));
try {
LOG.debug("JPA afterDeleteAll: {} {}", joinPoint.toLongString(), joinPoint.getTarget());
Object proxy = joinPoint.getTarget();
if (proxy instanceof Advised) {
Advised x = (Advised) proxy;
for (Class<?> foo : x.getProxiedInterfaces()) {
for (Type generic : foo.getGenericInterfaces()) {
if (generic instanceof ParameterizedType) {
Class<?> paramClass = (Class<?>) ((ParameterizedType) generic).getActualTypeArguments()[0];
if (isIndexed(paramClass)) {
LOG.debug("Deleting all documents for {}", paramClass);
elasticsearchService.removeAll(paramClass);
}
}
}
}
}
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
elasticsearchService.removeAll(clazz);
} catch (SearchException e) {
LOG.warn("Error reindexing {}: {}", clazz, e.getMessage()); //, e);
}
}
/**
* Flatten collections (if Iterable) and check each object class if indexed
*
* @param toReindex
*/
public void scheduleReindexing(Object toReindex) {
if (toReindex == null) {
return;
}
HashSet<Object> visited = new HashSet<Object>();
if (toReindex instanceof Iterable<?>) {
// Iterate and test
Iterable<?> it = (Iterable<?>) toReindex;
it.forEach((item) -> {
maybeReindex(item, visited);
});
} else {
maybeReindex(toReindex, visited);
}
maybeReindex(toReindex, visited);
}
private void maybeReindex(Object toReindex, HashSet<Object> visited) {
......@@ -190,14 +136,14 @@ public class ElasticJPAListener implements InitializingBean {
clazz = lazyInitializer.getPersistentClass();
if (isIndexed(clazz)) {
LOG.trace("Scheduling reindexing of {} {}", clazz, toReindex);
return new ElasticReindex(clazz.getName(), (Long) lazyInitializer.getIdentifier());
return new ElasticReindex(clazz, (Long) lazyInitializer.getIdentifier());
} else {
return null;
}
} else if (isIndexed(clazz)) {
if (toReindex instanceof EmptyModel) {
LOG.trace("Scheduling reindexing of {} {}", clazz, toReindex);
return new ElasticReindex(clazz.getName(), ((EmptyModel) toReindex).getId());
return new ElasticReindex(clazz, ((EmptyModel) toReindex).getId());
} else {
LOG.warn("Don't know how to index {}. Not an EmptyModel.", clazz.getName());
return null;
......@@ -210,7 +156,6 @@ public class ElasticJPAListener implements InitializingBean {
if (includedClasses.contains(clazz)) {
return true;
}
return false;
}
}
/*
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.component.firehose;
import java.util.Date;
import java.util.Objects;
/**
*
* @author Matija Obreza
*/
public class FirehoseDeleteAllEvent {
private Class<?> clazz;
private Date timestamp;
public FirehoseDeleteAllEvent(Class<?> clazz, Date timestamp) {
this.clazz = clazz;
this.timestamp = timestamp;
}
public Class<?> getClazz() {
return clazz;
}
public Date getTimestamp() {
return timestamp;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FirehoseDeleteAllEvent)) return false;
FirehoseDeleteAllEvent that = (FirehoseDeleteAllEvent) o;
return Objects.equals(timestamp, that.timestamp) && Objects.equals(clazz, that.clazz);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, clazz);
}
}
/*
* Copyright 2021 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.component.firehose;
import java.util.Date;
import java.util.Objects;
/**
*
* @author Artem Hrybeniuk
*/
public class FirehoseEvent {
private Class<?> clazz;
private Long id;
private Date timestamp;
private Object entity;
private EventType eventType;
public static enum EventType {
CREATE, UPDATE, DELETE
}
public FirehoseEvent(Class<?> clazz, Long id, Date timestamp, EventType eventType) {
this.clazz = clazz;
this.id = id;
this.timestamp = timestamp;
this.eventType = eventType;
}
public FirehoseEvent(Class<?> clazz, Long id, Date timestamp, EventType eventType, Object entity) {
this.clazz = clazz;
this.id = id;
this.timestamp = timestamp;
this.eventType = eventType;
this.entity = entity;
}
public Class<?> getClazz() {
return clazz;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Date getTimestamp() {
return timestamp;
}
public void updateTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public Object getEntity() {
return entity;
}
public void setEntity(Object entity) {
this.entity = entity;
}
public EventType getEventType() {
return eventType;
}
@Override
public String toString() {
return "Firehose." + this.eventType + " for type " + this.clazz + " id=" + this.id + " @" + this.timestamp;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FirehoseEvent)) return false;
FirehoseEvent that = (FirehoseEvent) o;
return Objects.equals(timestamp, that.timestamp) && Objects.equals(id, that.id) && Objects.equals(clazz, that.clazz);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, clazz, id);
}
/**
* Compare by timestamp, then id, then clazz
*
* @param a
* @param b
* @return
*/
public static int compareTo(FirehoseEvent a, FirehoseEvent b) {