Commit f2f17819 authored by Matija Obreza's avatar Matija Obreza

Fixed reindexing and institute#accessionCount updating

- Using delayed queue to update accession numbers
- Update Collections properly
parent efccf89d
......@@ -648,7 +648,7 @@
<httpConnector>
<port>8080</port>
</httpConnector>
<stopPort>8888</stopPort>
<stopPort>8998</stopPort>
<stopKey>stop</stopKey>
<jvmArgs>-Dspring.profiles.active=dev -Xmx2048M -Xms1024M -XX:MaxPermSize=128M -Djava.awt.headless=true -server -Dorg.eclipse.jetty.server.Request.maxFormContentSize=5000000</jvmArgs>
</configuration>
......
......@@ -37,6 +37,7 @@ import javax.persistence.OrderBy;
import javax.persistence.PrePersist;
import javax.persistence.Table;
import org.genesys.blocks.auditlog.annotations.Audited;
import org.genesys.blocks.model.AuditedVersionedModel;
import org.genesys.blocks.model.IdUUID;
import org.genesys.blocks.model.JsonViews;
......@@ -53,6 +54,7 @@ import com.fasterxml.jackson.annotation.JsonView;
@Entity
@Inheritance(strategy = InheritanceType.JOINED)
@Table(name = "acce")
@Audited
public class AccessionId extends AuditedVersionedModel implements IdUUID {
/**
......
......@@ -57,14 +57,14 @@ public interface FaoInstituteRepository extends JpaRepository<FaoInstitute, Long
@Query("from FaoInstitute fi where fi.id in ( ?1 )")
List<FaoInstitute> findByIds(List<Long> oids, Sort sort);
@Modifying
@Query("update FaoInstitute fi set accessionCount=(select count(a) from Accession a where a.institute = :institute ) where fi=:institute")
void updateInstituteAccessionCount(@Param("institute") FaoInstitute institute);
@Query("select distinct fi from FaoInstitute fi where lower(fi.code) like lower(?1) or lower(fi.fullName) like lower(?1) or lower(fi.acronym) like lower(?1) order by fi.accessionCount desc")
List<FaoInstitute> autocomplete(String string, Pageable pageable);
@Query("select fi from FaoInstitute fi where fi.maintainsCollection=true and fi.current=true order by fi.code")
Page<FaoInstitute> listPGRInstitutes(Pageable pageable);
@Modifying
@Query("update FaoInstitute fi set accessionCount=:accessionCount where fi=:institute")
void updateAccessionCount(@Param("institute") FaoInstitute institute, @Param("accessionCount") long accessionCount);
}
......@@ -142,7 +142,7 @@ public interface GenesysService {
List<Accession> saveAccessions(Iterable<Accession> accession);
FaoInstitute updateAccessionCount(FaoInstitute institute);
void updateAccessionCount(FaoInstitute institute);
List<SvalbardDeposit> getSvalbardData(AccessionId accession);
......
......@@ -708,13 +708,11 @@ public class GenesysServiceImpl implements GenesysService, DatasetService {
@Override
@Transactional
// @CacheEvict(value = "statistics", allEntries = true)
public FaoInstitute updateAccessionCount(FaoInstitute institute) {
if (institute == null)
return institute;
long accessionCount = accessionRepository.countByInstitute(institute);
institute = instituteRepository.getOne(institute.getId());
institute.setAccessionCount(accessionCount);
return instituteRepository.save(institute);
public void updateAccessionCount(FaoInstitute institute) {
if (institute != null) {
long accessionCount = accessionRepository.countByInstitute(institute);
instituteRepository.updateAccessionCount(institute, accessionCount);
}
}
/**
......
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.service.worker;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.service.GenesysService;
import org.genesys2.server.service.InstituteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* Updating {@link FaoInstitute} accession count during upsert and delete is not
* efficient. This component keeps requests for updates in a delayed queue, keeping
* the most recent request for update.
*
* It processes expired recounts every X seconds.
*
* @author Matija Obreza
*/
@Component
public class AccessionCounter implements InitializingBean, DisposableBean {
private static final Logger LOG = LoggerFactory.getLogger(AccessionCounter.class);
private DelayQueue<DelayedObject<String>> instituteQueue;
@Autowired
InstituteService instituteService;
@Autowired
private GenesysService genesysService;
@Override
public void afterPropertiesSet() throws Exception {
this.instituteQueue = new DelayQueue<>();
}
@Override
public void destroy() throws Exception {
LOG.info("Running {} re-counts before shutdown", this.instituteQueue.size());
synchronized (instituteQueue) {
for (DelayedObject<String> forProcessing : this.instituteQueue) {
process(forProcessing);
}
}
}
public void recountInstitute(FaoInstitute institute) {
if (institute == null) {
return;
}
DelayedObject<String> delay = new DelayedObject<String>(institute.getCode(), 2000);
synchronized (instituteQueue) {
LOG.trace("Queue size {}, contains {} = {}", instituteQueue.size(), delay.getObj(), instituteQueue.contains(delay));
if (! instituteQueue.remove(delay)) {
LOG.debug("Element was not removed {}", delay.getObj());
}
instituteQueue.put(delay);
}
LOG.trace("Scheduled re-count for {}", institute.getCode());
}
@Scheduled(fixedDelay = 2000)
public void processQueues() {
DelayedObject<String> forProcessing = null;
do {
synchronized (instituteQueue) {
forProcessing = instituteQueue.poll();
}
process(forProcessing);
} while (forProcessing != null);
}
private void process(DelayedObject<String> forProcessing) {
if (forProcessing == null) {
return;
}
FaoInstitute institute = instituteService.findInstitute(forProcessing.getObj());
if (institute != null) {
LOG.info("Updating count for {}", institute.getCode());
genesysService.updateAccessionCount(institute);
}
}
public static class DelayedObject<T> implements Delayed {
private T obj;
private long expiryTime;
/**
*
* @param delay delay in milliseconds
*/
DelayedObject(T obj, long delay) {
this.obj = obj;
this.expiryTime = System.currentTimeMillis() + delay;
}
public T getObj() {
return this.obj;
}
@Override
public int compareTo(Delayed other) {
if (this == other) {
return 0;
}
return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiryTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((obj == null) ? 0 : obj.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
DelayedObject<?> other = (DelayedObject<?>) obj;
if (this.obj == null) {
if (other.obj != null)
return false;
} else if (!this.obj.equals(other.obj))
return false;
return true;
}
}
}
......@@ -18,6 +18,7 @@ package org.genesys2.server.service.worker;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
......@@ -80,6 +81,9 @@ public class AccessionUploader implements InitializingBean {
@Autowired
private FaoInstituteRepository faoInstituteRepository;
@Autowired
private AccessionCounter accessionCounter;
@Autowired
private GeoService geoService;
......@@ -203,7 +207,7 @@ public class AccessionUploader implements InitializingBean {
toSave = accessionRepository.save(toSave);
LOG.trace("Saved: {}", toSave);
accessionCounter.recountInstitute(institute);
return responses;
}
......@@ -391,6 +395,7 @@ public class AccessionUploader implements InitializingBean {
}
}
@SuppressWarnings("rawtypes")
private <T> void copy(Class<T> clazz, T source, T target, Iterator<String> fieldNames) throws IllegalArgumentException, IllegalAccessException {
String fieldName = null;
while (fieldNames.hasNext() && (fieldName = fieldNames.next()) != null) {
......@@ -399,7 +404,15 @@ public class AccessionUploader implements InitializingBean {
if (field != null) {
ReflectionUtils.makeAccessible(field);
final Object srcValue = field.get(source);
field.set(target, srcValue);
final Object dest = field.get(source);
// handle collections better
if (srcValue instanceof Collection && dest != null) {
Collection collection = (Collection) dest;
collection.clear();
collection.addAll((Collection) srcValue);
} else {
field.set(target, srcValue);
}
LOG.trace("Set {} to {}", fieldName, srcValue);
} else {
LOG.warn("No such field {}.{}", clazz.getName(), fieldName);
......@@ -446,6 +459,7 @@ public class AccessionUploader implements InitializingBean {
accessionRepository.delete(toRemove);
LOG.trace("Deleted: {}", toRemove);
}
accessionCounter.recountInstitute(institute);
return responses;
}
......
......@@ -33,8 +33,6 @@ import org.aspectj.lang.annotation.Pointcut;
import org.genesys2.server.model.elastic.AccessionDetails;
import org.genesys2.server.model.elastic.FullTextDocument;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionId;
import org.genesys2.server.model.genesys.AccessionRelated;
import org.genesys2.server.model.impl.ActivityPost;
import org.genesys2.server.model.impl.Article;
import org.genesys2.server.model.impl.Country;
......@@ -60,7 +58,7 @@ public class ElasticUpdaterAspect {
@Autowired
private GenesysService genesysService;
@Pointcut(value = "execution(* org.genesys2.server.service.GenesysService.save*(..)) || execution(* org.genesys2.server.service.GenesysService.update*(..))")
@Pointcut(value = "execution(* org.genesys2.server.persistence.domain.AccessionRepository.save*(..))")
public void accessionUpdates() {
}
......@@ -106,47 +104,13 @@ public class ElasticUpdaterAspect {
if (o instanceof Accession) {
elasticUpdater.update(Accession.class, ((Accession) o).getId());
}
if (o instanceof AccessionRelated) {
elasticUpdater.update(Accession.class, ((AccessionRelated) o).getAccession().getId());
}
}
/**
* Handle removal of accession related objects
*
* @param joinPoint
* @param result
* @return
* @throws Throwable
*/
@AfterReturning(value = "execution(* org.genesys2.server.service.GenesysService.remove*(..))", returning = "result")
public Object afterRemove(final JoinPoint joinPoint, final Object result) throws Throwable {
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from {} result={}", joinPoint.toLongString(), result);
}
if (result instanceof Collection<?>) {
Collection<?> list = (Collection<?>) result;
if (list.isEmpty())
return result;
for (Object o : list) {
if (o == null)
continue;
// if (o instanceof Accession) {
// elasticUpdater.remove(Accession.class, ((Accession)
// o).getId());
// }
if (o instanceof AccessionRelated) {
AccessionId acceId = ((AccessionRelated) o).getAccession();
if (acceId != null)
elasticUpdater.update(Accession.class, acceId.getId());
}
}
}
return result;
// if (o instanceof AccessionRelated) {
// elasticUpdater.update(Accession.class, ((AccessionRelated)
// o).getAccession().getId());
// }
}
/**
* Handle accession removal
*
......@@ -156,17 +120,27 @@ public class ElasticUpdaterAspect {
* @return
* @throws Throwable
*/
@Around(value = "execution(* org.genesys2.server.service.GenesysService.removeAccessions(..)) && args(institute, accessions)")
public Object aroundRemoveAccession(final ProceedingJoinPoint joinPoint, final FaoInstitute institute, final Collection<Accession> accessions) throws Throwable {
@Around(value = "execution(* org.genesys2.server.persistence.domain.AccessionRepository.delete*(..)) && args(obj)")
public Object aroundRemoveAccession(final ProceedingJoinPoint joinPoint, final Object obj) throws Throwable {
if (LOG.isTraceEnabled()) {
LOG.trace("Returning from {}", joinPoint.toLongString());
}
Collection<Long> accessionIds = new ArrayList<Long>(accessions.size());
for (Accession a : accessions) {
if (a != null && a.getId() != null)
Collection<Long> accessionIds = new ArrayList<Long>();
if (obj instanceof Long) {
accessionIds.add((Long)obj);
} else if (obj instanceof Accession) {
Accession a = (Accession)obj;
if (a.isPersisted())
accessionIds.add(a.getId());
} else if (obj instanceof Iterable<?>) {
for (Accession a : (Iterable<Accession>) obj) {
if (a.isPersisted())
accessionIds.add(a.getId());
}
}
// Need to load AccessionDetails before deleting
List<AccessionDetails> deletedAccessions = genesysService.getAccessionDetails(accessionIds);
......
......@@ -291,9 +291,6 @@ public class AccessionController extends RestController {
LOG.info("Retrying delete one by one due to {}", e.getMessage());
response = deleteAccessions1by1(institute, batch);
}
// Force update institute#accessionCount (outside previous
// transaction)
genesysService.updateAccessionCount(institute);
return response;
} catch (CannotAcquireLockException e) {
......
......@@ -125,6 +125,7 @@ import org.genesys2.server.service.impl.TaxonomyServiceImpl;
import org.genesys2.server.service.impl.TeamServiceImpl;
import org.genesys2.server.service.impl.TokenVerificationServiceImpl;
import org.genesys2.server.service.impl.UserServiceImpl;
import org.genesys2.server.service.worker.AccessionCounter;
import org.genesys2.server.service.worker.AccessionUploader;
import org.genesys2.server.service.worker.ElasticUpdater;
import org.genesys2.server.service.worker.GeoRegionDataCLDR;
......@@ -462,6 +463,11 @@ public abstract class AbstractRestTest extends BaseSpringTest {
public AccessionUploader uploader() {
return new AccessionUploader();
}
@Bean
public AccessionCounter accessionCounter() {
return new AccessionCounter();
}
@Bean
public AccessionController accessionController() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment