Commit 6f26e70c authored by Matija Obreza
Browse files

Firehose: Implementation updates

- Switch event queues from Set to Map, fixes slow processing when using HZ-backed sets
- Fixed ElasticReindex#equals
- Removed ExtraReindexingAspect component
- Hazelcast event sets implementation
parent ccf15d2e
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.component.aspect;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.genesys2.server.component.elastic.ElasticReindex;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionId;
import org.genesys2.server.model.genesys.AccessionRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Modifications to AccessionId do not trigger ES reindexing. This aspect
 * addresses that: after selected repository calls return it places
 * {@link ElasticReindex} requests for {@link Accession} on the reindex queue.
 *
 * @author Matija Obreza
 */
@Aspect
public class ExtraReindexingAspect {

    private static final Logger LOG = LoggerFactory.getLogger(ExtraReindexingAspect.class);

    /** Queue that receives the reindex requests produced by this aspect. */
    @Resource
    private BlockingQueue<ElasticReindex> elasticReindexQueue;

    /**
     * After AccessionIdRepository#save
     *
     * @param joinPoint the join point
     * @param result the value returned by the save call (single entity or Iterable)
     */
    @AfterReturning(value = "execution(* org.genesys2.server.persistence.AccessionIdRepository.save(..))", returning = "result")
    public void afterPersist(final JoinPoint joinPoint, final Object result) {
        traceJoinPoint("accessionIdRepository#save {} {}", joinPoint);
        scheduleQuietly(result);
    }

    /**
     * After AccessionRefRepository#save
     *
     * @param joinPoint the join point
     * @param result the value returned by the save call (single entity or Iterable)
     */
    @AfterReturning(value = "execution(* org.genesys2.server.persistence.AccessionRefRepository.save(..))", returning = "result")
    public void afterPersistAccessionRef(final JoinPoint joinPoint, final Object result) {
        traceJoinPoint("accessionRefRepository#save {} {}", joinPoint);
        scheduleQuietly(result);
    }

    /**
     * After AccessionRefRepository#delete
     *
     * @param joinPoint the join point; its first argument is what was deleted
     */
    @AfterReturning(value = "execution(* org.genesys2.server.persistence.AccessionRefRepository.delete(..))")
    public void afterDeleteAccessionRef(final JoinPoint joinPoint) {
        traceJoinPoint("accessionRefRepository#delete {} {}", joinPoint);
        final Object[] args = joinPoint.getArgs();
        // The pointcut uses "(..)" and therefore also matches calls without arguments
        if (args == null || args.length == 0) {
            return;
        }
        scheduleQuietly(args[0]);
    }

    /**
     * Trace-log the intercepted call. The guard avoids building the long join
     * point description when trace logging is disabled.
     *
     * @param format log format with two placeholders
     * @param joinPoint the intercepted join point
     */
    private void traceJoinPoint(final String format, final JoinPoint joinPoint) {
        if (LOG.isTraceEnabled()) {
            LOG.trace(format, joinPoint.toLongString(), joinPoint.getTarget());
        }
    }

    /**
     * Schedule reindexing and log, rather than propagate, any failure.
     *
     * @param toReindex entity or Iterable of entities, may be null
     */
    private void scheduleQuietly(final Object toReindex) {
        try {
            scheduleReindexing(toReindex);
        } catch (Throwable e) {
            // Deliberately broad: the advice only logs failures and never
            // propagates them to the advised repository call.
            LOG.error(e.getMessage(), e);
        }
    }

    /**
     * Flatten collections (if Iterable) and check each object class if indexed
     *
     * @param toReindex entity or Iterable of entities, may be null
     */
    private void scheduleReindexing(final Object toReindex) {
        if (toReindex == null) {
            return;
        }
        if (toReindex instanceof Iterable<?>) {
            // Collect first, then hand everything to the queue in one call
            final List<ElasticReindex> list = new ArrayList<>();
            for (final Object item : (Iterable<?>) toReindex) {
                addNotNull(list, forReindexing(item));
            }
            if (!list.isEmpty()) {
                elasticReindexQueue.addAll(list);
            }
        } else {
            addNotNull(elasticReindexQueue, forReindexing(toReindex));
        }
    }

    /**
     * Add the element to the destination unless it is null.
     *
     * @param destination collection to add to
     * @param element candidate element, may be null
     */
    private <T> void addNotNull(final Collection<T> destination, final T element) {
        if (element != null) {
            destination.add(element);
        }
    }

    /**
     * Translate a persisted object into a reindex request.
     *
     * @param toReindex the persisted or deleted object
     * @return a request for {@link Accession}, or null when the object is neither
     *         an AccessionId nor an AccessionRef with an accession
     */
    private ElasticReindex forReindexing(final Object toReindex) {
        if (toReindex instanceof AccessionId) {
            final AccessionId accessionId = (AccessionId) toReindex;
            LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionId.getId());
            return new ElasticReindex(Accession.class, accessionId.getId());
        }
        if (toReindex instanceof AccessionRef) {
            final AccessionRef<?> accessionRef = (AccessionRef<?>) toReindex;
            final var accession = accessionRef.getAccession();
            if (accession != null) {
                LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accession.getId());
                return new ElasticReindex(Accession.class, accession.getId());
            }
        }
        return null;
    }
}
......@@ -17,7 +17,6 @@
package org.genesys2.server.component.elastic;
import org.genesys.blocks.model.EmptyModel;
import org.genesys2.server.component.firehose.FirehoseEvent;
import org.hibernate.proxy.HibernateProxy;
import java.io.Serializable;
......@@ -81,7 +80,7 @@ public class ElasticReindex implements Serializable {
/**
 * Two reindex requests are equal when both their id and their clazz are equal.
 *
 * @param o the object to compare with
 * @return true when o is an ElasticReindex with equal id and clazz
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    // Only the ElasticReindex type check belongs here: an additional
    // "instanceof FirehoseEvent" guard would reject every other ElasticReindex.
    if (!(o instanceof ElasticReindex)) {
        return false;
    }
    ElasticReindex that = (ElasticReindex) o;
    // Short-circuit: no need to compare clazz when the ids already differ
    return Objects.equals(id, that.id) && Objects.equals(clazz, that.clazz);
}
......
......@@ -65,6 +65,9 @@ public class ElasticReindexProcessor {
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning ES update queue size={}", elasticReindexQueue.size());
}
if (elasticReindexQueue.isEmpty()) {
return;
}
List<ElasticReindex> forReindexing = new ArrayList<>(200);
Map<Class<?>, Set<Long>> buckets = Collections.synchronizedMap(new HashMap<Class<?>, Set<Long>>());
......
......@@ -15,9 +15,20 @@
*/
package org.genesys2.server.component.elastic;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import org.genesys.blocks.model.EmptyModel;
import org.genesys2.server.component.firehose.FirehoseDeleteAllEvent;
import org.genesys2.server.component.firehose.FirehoseEvent;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionId;
import org.genesys2.server.model.genesys.AccessionRef;
import org.genesys2.server.service.ElasticsearchService;
import org.genesys2.server.service.impl.SearchException;
import org.hibernate.proxy.HibernateProxy;
......@@ -31,13 +42,6 @@ import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalApplicationListener;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
/**
* The FirehoseReindexListener is a {@link TransactionalApplicationListener} that
* is attached to the {@link TransactionPhase} {@code TransactionPhase.AFTER_COMMIT}
......@@ -67,7 +71,19 @@ public class FirehoseReindexListener implements InitializingBean {
Class<?> clazz = firehoseEvent.getClazz();
Object entity = firehoseEvent.getEntity();
if (isIndexed(clazz) && entity instanceof EmptyModel) {
if (entity instanceof AccessionId) {
AccessionId accessionId = (AccessionId) entity;
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionId.getId());
addNotNull(elasticReindexQueue, new ElasticReindex(Accession.class, accessionId.getId()));
} else if (entity instanceof AccessionRef){
AccessionRef<?> accessionRef = (AccessionRef<?>) entity;
if (accessionRef.getAccession() != null) {
LOG.trace("Scheduling reindexing of {} {}", Accession.class.getName(), accessionRef.getAccession().getId());
addNotNull(elasticReindexQueue, new ElasticReindex(Accession.class, accessionRef.getAccession().getId()));
}
} else if (isIndexed(clazz) && entity instanceof EmptyModel) {
scheduleReindexing(entity);
}
}
......@@ -122,7 +138,7 @@ public class FirehoseReindexListener implements InitializingBean {
}
private <T> void addNotNull(Collection<T> destination, T element) {
if (element != null) {
if (element != null && !destination.contains(element)) {
destination.add(element);
}
}
......
......@@ -15,6 +15,9 @@
*/
package org.genesys2.server.component.firehose;
import org.jetbrains.annotations.NotNull;
import java.io.Serializable;
import java.util.Date;
import java.util.Objects;
......@@ -22,17 +25,21 @@ import java.util.Objects;
*
* @author Artem Hrybeniuk
*/
public class FirehoseEvent {
public class FirehoseEvent implements Comparable<FirehoseEvent>, Serializable {
private static final long serialVersionUID = 5481828485125320364L;
private Class<?> clazz;
private Class<?> clazz; // TODO Maybe just className?
private Long id;
private String key;
private Date timestamp;
private Object entity;
transient private Object entity;
private EventType eventType;
public static enum EventType {
CREATE, UPDATE, DELETE
}
......@@ -40,6 +47,8 @@ public class FirehoseEvent {
public FirehoseEvent(Class<?> clazz, Long id, Date timestamp, EventType eventType) {
this.clazz = clazz;
this.id = id;
this.key = (this.id == null ? "null" : this.id) + ":" + (this.clazz == null ? "null" : this.clazz.getName());
this.timestamp = timestamp;
this.eventType = eventType;
}
......@@ -47,6 +56,8 @@ public class FirehoseEvent {
public FirehoseEvent(Class<?> clazz, Long id, Date timestamp, EventType eventType, Object entity) {
this.clazz = clazz;
this.id = id;
this.key = (this.id == null ? "null" : this.id) + ":" + (this.clazz == null ? "null" : this.clazz.getName());
this.timestamp = timestamp;
this.eventType = eventType;
this.entity = entity;
......@@ -62,6 +73,7 @@ public class FirehoseEvent {
public void setId(Long id) {
    this.id = id;
    // Keep the key in step with the id: "<id>:<class name>", "null" for a missing part
    final String typeName = (this.clazz != null) ? this.clazz.getName() : "null";
    this.key = String.valueOf(id) + ":" + typeName;
}
public Date getTimestamp() {
......@@ -103,25 +115,23 @@ public class FirehoseEvent {
}
/**
*
* Compare by timestamp, then id, then clazz
*
* @param a
* @param b
* @return
*/
public static int compareTo(FirehoseEvent a, FirehoseEvent b) {
if (a.timestamp.equals(b.timestamp)) {
if (b.id == null && a.id == null) {
@Override
public int compareTo(@NotNull FirehoseEvent firehoseEvent) {
if (this.timestamp.equals(firehoseEvent.timestamp)) {
if (firehoseEvent.id == null && this.id == null) {
return 0;
} else if (b.id == null) {
} else if (firehoseEvent.id == null) {
return -1;
} else if (a.id == null) {
} else if (this.id == null) {
return 1;
} else {
return (int) (a.id - b.id);
return (int) (this.id - firehoseEvent.id);
}
} else {
return a.timestamp.compareTo(b.timestamp);
return this.timestamp.compareTo(firehoseEvent.timestamp);
}
}
......@@ -135,6 +145,15 @@ public class FirehoseEvent {
if (b == null) {
return false;
}
return Objects.equals(id, b.id) && Objects.equals(clazz, b.clazz);
return Objects.equals(key, b.key);
}
/**
 * Key identifying the object this event refers to.
 *
 * @return the id and the class name, joined as {@code id:className}
 */
public String getKey() {
    return key;
}
}
......@@ -15,14 +15,18 @@
*/
package org.genesys2.server.component.firehose;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalApplicationListener;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.Set;
/**
*
* The FirehoseEventListener is a {@link TransactionalApplicationListener} that
......@@ -35,14 +39,16 @@ import java.util.Set;
// Instantiated in FirehoseConfig! @Component("firehoseEventListener")
public class FirehoseEventListener {
public static final Logger LOG = LoggerFactory.getLogger(FirehoseEventListener.class);
@Resource(name = "updatedEventSet")
private Set<FirehoseEvent> updatedEvents;
private Map<String, FirehoseEvent> updatedEvents;
@Resource(name = "removedEventSet")
private Set<FirehoseEvent> removedEvents;
private Map<String, FirehoseEvent> removedEvents;
@Resource(name = "createdEventSet")
private Set<FirehoseEvent> createdEvents;
private Map<String, FirehoseEvent> createdEvents;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleEvent(FirehoseEvent firehoseEvent) {
......@@ -50,11 +56,11 @@ public class FirehoseEventListener {
if (FirehoseEvent.EventType.DELETE == eventType) {
// Remove given FirehoseEvent from CreatedEvents and UpdatedEvents
createdEvents.removeIf(firehoseEvent::sameReference);
updatedEvents.removeIf(firehoseEvent::sameReference);
removedEvents.add(firehoseEvent);
createdEvents.remove(firehoseEvent.getKey());
updatedEvents.remove(firehoseEvent.getKey());
removedEvents.put(firehoseEvent.getKey(), firehoseEvent);
} else if (FirehoseEvent.EventType.CREATE.equals(eventType)) {
createdEvents.add(firehoseEvent);
createdEvents.put(firehoseEvent.getKey(), firehoseEvent);
} else if (FirehoseEvent.EventType.UPDATE.equals(eventType)) {
putUpdatedEvent(firehoseEvent);
}
......@@ -64,29 +70,37 @@ public class FirehoseEventListener {
/**
 * Drop pending CREATE and UPDATE events of the type that was deleted in bulk.
 * Pending DELETE events are left untouched.
 *
 * @param firehoseDeleteAllEvent carries the class whose records were all deleted
 */
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleDeleteAllEvent(FirehoseDeleteAllEvent firehoseDeleteAllEvent) {
    // Remove from created and updated only
    var deletedType = firehoseDeleteAllEvent.getClazz();
    removeEventsOfType(createdEvents, deletedType);
    removeEventsOfType(updatedEvents, deletedType);
}

/**
 * Remove every event whose class equals the given type.
 *
 * @param events the event map to clean up
 * @param deletedType the class to remove events for
 */
private static void removeEventsOfType(Map<String, FirehoseEvent> events, Class<?> deletedType) {
    // hazelcastMap.entrySet() is immutable, so entrySet().removeIf(..) is
    // unsupported: iterate over a snapshot of the values and remove by key.
    for (var event : new ArrayList<>(events.values())) {
        if (Objects.equals(event.getClazz(), deletedType)) {
            events.remove(event.getKey());
        }
    }
}
private void putUpdatedEvent(FirehoseEvent firehoseEvent) {
if (LOG.isTraceEnabled()) {
LOG.trace("Replacing {} {}", firehoseEvent.getEventType(), firehoseEvent.getKey());
}
// replace equal FirehoseEvent in CreatedEvents if contains
boolean replaceCreatedEvent = replaceIfContains(createdEvents, firehoseEvent);
if (!replaceCreatedEvent) {
// replace equal FirehoseEvent in UpdatedEvents if contains
boolean replaceUpdatedEvent = replaceIfContains(updatedEvents, firehoseEvent);
if (!replaceUpdatedEvent) {
// add FirehoseEvent to UpdatedEvents
updatedEvents.add(firehoseEvent);
}
if (! replaceCreatedEvent) {
// replace/put FirehoseEvent to UpdatedEvents
updatedEvents.put(firehoseEvent.getKey(), firehoseEvent);
}
}
private boolean replaceIfContains(Set<FirehoseEvent> events, FirehoseEvent firehoseEvent) {
private boolean replaceIfContains(Map<String, FirehoseEvent> events, FirehoseEvent firehoseEvent) {
// replace Entity and ModifiedDate for equal FirehoseEvent in given Collection
if (events.removeIf(firehoseEvent::sameReference)) {
events.add(firehoseEvent);
return true;
}
return false;
return events.replace(firehoseEvent.getKey(), firehoseEvent) != null;
}
}
......@@ -15,18 +15,23 @@
*/
package org.genesys2.server.component.firehose;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.genesys2.server.component.firehose.FirehoseEvent.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.NavigableSet;
/**
*
* Inspects the insert/update/delete queues and issues notifications.
......@@ -42,19 +47,19 @@ public class FirehoseMessageProcessor {
private int firehoseDelay;
@Resource(name = "updatedEventSet")
private NavigableSet<FirehoseEvent> updatedEvents;
private Map<String, FirehoseEvent> updatedEvents;
@Resource(name = "removedEventSet")
private NavigableSet<FirehoseEvent> removedEvents;
private Map<String, FirehoseEvent> removedEvents;
@Resource(name = "createdEventSet")
private NavigableSet<FirehoseEvent> createdEvents;
private Map<String, FirehoseEvent> createdEvents;
@Scheduled(fixedDelay = 1000)
public void processQueues() {
// Date stopDate = new Date(System.currentTimeMillis() - (firehoseDelay * 1000 * 60)); // delay in minutes
Date stopDate = new Date(System.currentTimeMillis() - (firehoseDelay * 1000)); // delay in seconds
LOG.trace("Scanning repository changes objects before: {}", stopDate);
LOG.trace("Scanning for Firehose events before: {}", stopDate);
List<FirehoseEvent> removedEventsForSending = getEventsBeforeStopDate(removedEvents, stopDate);
List<FirehoseEvent> createdEventsForSending = getEventsBeforeStopDate(createdEvents, stopDate);
......@@ -66,34 +71,36 @@ public class FirehoseMessageProcessor {
}
private void processEvents(EventType eventType, List<FirehoseEvent> events) {
if (events == null) {
if (CollectionUtils.isEmpty(events)) {
return;
}
events.forEach(firehoseEvent -> {
if (eventType == FirehoseEvent.EventType.CREATE) {
LOG.info("Sending CREATE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
LOG.debug("Sending CREATE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
} else if (eventType == FirehoseEvent.EventType.UPDATE) {
LOG.info("Sending UPDATE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
LOG.debug("Sending UPDATE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
} else if (eventType == FirehoseEvent.EventType.DELETE) {
LOG.info("Sending DELETE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
LOG.debug("Sending DELETE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
}
});
}
private List<FirehoseEvent> getEventsBeforeStopDate(NavigableSet<FirehoseEvent> events, Date stopDate) {
private List<FirehoseEvent> getEventsBeforeStopDate(Map<String, FirehoseEvent> events, Date stopDate) {
if (events.isEmpty()) {
return null;
}
var eventsBeforeStopDate = events.headSet(new FirehoseEvent(null, null, stopDate, null));
NavigableSet<FirehoseEvent> eventNavigableSet = Collections.synchronizedNavigableSet(new TreeSet<>(events.values()));
var eventsBeforeStopDate = eventNavigableSet.headSet(new FirehoseEvent(null, null, stopDate, null));
if (eventsBeforeStopDate.size() == 0) {
return null;
}
LOG.debug("Matched {} of {} events", eventsBeforeStopDate.size(), events.size());
var eventsForSending = new ArrayList<>(eventsBeforeStopDate);
eventsBeforeStopDate.clear();
LOG.debug("Keeping {} events, remaining {} of {} events", eventsForSending.size(), eventsBeforeStopDate.size(), events.size());
eventsForSending.forEach(event -> events.remove(event.getKey()));
if (LOG.isDebugEnabled()) {
LOG.debug("Matched {} of {} events, keeping {} events", eventsBeforeStopDate.size(), eventNavigableSet.size(), events.size());
}
return eventsForSending;
}
......
......@@ -40,7 +40,6 @@ import org.genesys.catalog.model.traits.DescriptorList;
import org.genesys.catalog.model.vocab.ControlledVocabulary;
import org.genesys.catalog.model.vocab.VocabularyTerm;
import org.genesys.custom.elasticsearch.EmbeddedNodeFactoryBean;
import org.genesys2.server.component.aspect.ExtraReindexingAspect;
import org.genesys2.server.component.elastic.ElasticJacksonAnnotationIntrospector;
import org.genesys2.server.component.elastic.ElasticReindex;
import org.genesys2.server.component.elastic.ElasticReindexProcessor;
......@@ -108,14 +107,7 @@ public class ElasticsearchConfig extends ElasticsearchConfigurationSupport {
@Value("${elasticsearch.embedded.portRange:random}")
private String embeddedPortRange;
/**
* Register aspect to handle extra ES reindexing not covered by other mechanisms.
*/
@Bean