Commit 7137c510 authored by Matija Obreza's avatar Matija Obreza
Browse files

Update ES data using aspects

parent 7f50de75
......@@ -31,7 +31,7 @@ import org.genesys2.server.model.BusinessModel;
*/
@Entity
@Table(name = "accessionalias")
public class AccessionAlias extends BusinessModel {
public class AccessionAlias extends BusinessModel implements AccessionRelated {
/**
*
......@@ -99,6 +99,7 @@ public class AccessionAlias extends BusinessModel {
this.version = version;
}
@Override
public Accession getAccession() {
return accession;
}
......
......@@ -33,7 +33,7 @@ import org.hibernate.annotations.Type;
*/
@Entity
@Table(name = "accessionbreeding")
public class AccessionBreeding extends BusinessModel {
public class AccessionBreeding extends BusinessModel implements AccessionRelated {
/**
*
......@@ -66,6 +66,7 @@ public class AccessionBreeding extends BusinessModel {
this.version = version;
}
@Override
public Accession getAccession() {
return accession;
}
......
......@@ -33,7 +33,7 @@ import org.hibernate.annotations.Type;
*/
@Entity
@Table(name = "accessioncollect")
public class AccessionCollect extends BusinessModel {
public class AccessionCollect extends BusinessModel implements AccessionRelated {
/**
*
......@@ -85,6 +85,7 @@ public class AccessionCollect extends BusinessModel {
this.version = version;
}
@Override
public Accession getAccession() {
return accession;
}
......
......@@ -31,7 +31,7 @@ import org.genesys2.server.model.BusinessModel;
*/
@Entity
@Table(name = "accessionexchange")
public class AccessionExchange extends BusinessModel {
public class AccessionExchange extends BusinessModel implements AccessionRelated {
/**
*
......@@ -65,6 +65,7 @@ public class AccessionExchange extends BusinessModel {
this.version = version;
}
@Override
public Accession getAccession() {
return accession;
}
......
......@@ -29,7 +29,7 @@ import org.genesys2.server.model.impl.GeoReferencedEntity;
@Entity
@Table(name = "accessiongeo")
public class AccessionGeo extends BusinessModel implements GeoReferencedEntity {
public class AccessionGeo extends BusinessModel implements GeoReferencedEntity, AccessionRelated {
/**
*
......@@ -62,6 +62,7 @@ public class AccessionGeo extends BusinessModel implements GeoReferencedEntity {
this.version = version;
}
@Override
public Accession getAccession() {
return accession;
}
......
package org.genesys2.server.model.genesys;
public interface AccessionRelated {
Accession getAccession();
}
......@@ -32,7 +32,7 @@ import org.genesys2.server.model.BusinessModel;
*/
@Entity
@Table(name = "accessionremark")
public class AccessionRemark extends BusinessModel {
public class AccessionRemark extends BusinessModel implements AccessionRelated {
@Version
private long version = 0;
......@@ -59,6 +59,7 @@ public class AccessionRemark extends BusinessModel {
this.version = version;
}
@Override
public Accession getAccession() {
return accession;
}
......
......@@ -29,4 +29,10 @@ public interface ElasticService {
Page<AccessionDetails> search(String query, Pageable pageable) throws SearchException;
void remove(Class<?> clazz, long id);
void update(Class<?> clazz, long id);
void refreshIndex(Class<?> clazz);
}
......@@ -123,7 +123,7 @@ public interface GenesysService {
void updateAccessionInstitueRefs();
void saveAccessions(FaoInstitute institute, List<Accession> matching);
List<Accession> saveAccessions(FaoInstitute institute, List<Accession> matching);
void saveSvalbards(List<SvalbardData> svalbards);
......@@ -131,7 +131,7 @@ public interface GenesysService {
Set<Long> filterAvailable(Set<Long> accessionIds);
void saveAccession(Accession... accession);
List<Accession> saveAccession(Accession... accession);
void updateAccessionCount(FaoInstitute institute);
......@@ -161,7 +161,7 @@ public interface GenesysService {
List<AccessionGeo> listAccessionsGeo(Set<Long> copy);
void removeAccessions(FaoInstitute institute, List<Accession> toDelete);
List<Accession> removeAccessions(FaoInstitute institute, List<Accession> toDelete);
void setInSvalbard(List<Accession> matching);
......
......@@ -2,6 +2,7 @@ package org.genesys2.server.service.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -51,6 +52,13 @@ public class ElasticsearchSearchServiceImpl implements ElasticService {
@Autowired
private CropService cropService;
private final Map<Class<?>, Class<?>> clazzMap;
public ElasticsearchSearchServiceImpl() {
clazzMap = new HashMap<Class<?>, Class<?>>();
clazzMap.put(Accession.class, AccessionDetails.class);
}
@Override
public Page<AccessionDetails> search(String query, Pageable pageable) throws SearchException {
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(org.elasticsearch.index.query.QueryBuilders.queryString(query))
......@@ -121,6 +129,55 @@ public class ElasticsearchSearchServiceImpl implements ElasticService {
return iq;
}
@Override
public void update(Class<?> clazz, long id) {
if (!clazzMap.containsKey(clazz)) {
return;
}
Object eo = toElasticObject(clazz, id);
if (eo == null)
return;
IndexQuery iq = new IndexQuery();
iq.setId(String.valueOf(id));
iq.setObject(eo);
LOG.info("Indexing " + clazz + " id=" + id);
elasticsearchTemplate.index(iq);
}
@Override
public void remove(Class<?> clazz, long id) {
Class<?> clazz2 = clazzMap.get(clazz);
if (clazz2 == null) {
return;
}
LOG.info("Removing from index " + clazz2 + " id=" + id);
elasticsearchTemplate.delete(clazz2, String.valueOf(id));
}
private Object toElasticObject(Class<?> clazz, long id) {
if (clazz == Accession.class) {
return genesysService.getAccessionDetails(id);
}
LOG.warn("Unsupported class " + clazz);
return null;
}
@Override
public void refreshIndex(Class<?> clazz) {
Class<?> clazz2 = clazzMap.get(clazz);
if (clazz2 == null) {
return;
}
LOG.info("Refresing index " + clazz2);
elasticsearchTemplate.refresh(clazz2, true);
}
@Override
public void reindexByFilter(String jsonFilter, boolean slow) {
......
......@@ -47,7 +47,6 @@ import org.genesys2.server.service.GenesysFilterService.GenesysFilter.DataType;
import org.genesys2.server.service.GenesysFilterService.GenesysFilter.FilterType;
import org.genesys2.server.service.GeoService;
import org.genesys2.server.service.InstituteService;
import org.genesys2.server.service.SearchService;
import org.genesys2.server.service.TaxonomyService;
import org.genesys2.server.service.TraitService;
import org.genesys2.server.service.impl.DirectMysqlQuery.MethodResolver;
......@@ -97,9 +96,6 @@ public class GenesysFilterServiceImpl implements GenesysFilterService {
@Autowired
private InstituteService instituteService;
@Autowired
private SearchService searchService;
@Autowired
private TaxonomyService taxonomyService;
......
......@@ -298,7 +298,7 @@ public class GenesysServiceImpl implements GenesysService, TraitService, Dataset
}
return accession;
}
@Override
public AccessionDetails getAccessionDetails(long accessionId) {
Accession accession = getAccession(accessionId);
......@@ -311,7 +311,8 @@ public class GenesysServiceImpl implements GenesysService, TraitService, Dataset
ad.geo(listAccessionGeo(accession));
ad.svalbard(getSvalbardData(accession));
ad.remarks(listAccessionRemarks(accession));
// ad.traits(listMethods(accession), getAccessionTraitValues(accession));
// ad.traits(listMethods(accession),
// getAccessionTraitValues(accession));
ad.crops(cropService.getCrops(accession.getTaxonomy()));
return ad;
}
......@@ -587,28 +588,32 @@ public class GenesysServiceImpl implements GenesysService, TraitService, Dataset
@Override
@Transactional(readOnly = false)
public void saveAccessions(FaoInstitute institute, List<Accession> accessions) {
public List<Accession> saveAccessions(FaoInstitute institute, List<Accession> accessions) {
if (LOG.isDebugEnabled()) {
LOG.debug("Saving " + accessions.size() + " accessions");
}
accessionRepository.save(accessions);
updateAccessionCount(institute);
return accessions;
}
@Override
@Transactional(readOnly = false)
public void saveAccession(Accession... accession) {
public List<Accession> saveAccession(Accession... accession) {
List<Accession> list = new ArrayList<Accession>();
for (final Accession a : accession) {
if (LOG.isDebugEnabled())
LOG.debug("Updating " + a);
accessionRepository.save(a);
list.add(a);
}
return list;
}
@Override
@Transactional(readOnly = false)
@PreAuthorize("hasRole('ADMINISTRATOR') or hasPermission(#institute, 'DELETE') or hasPermission(#institute, 'MANAGE')")
public void removeAccessions(FaoInstitute institute, List<Accession> toDelete) {
public List<Accession> removeAccessions(FaoInstitute institute, List<Accession> toDelete) {
if (toDelete.size() > 0) {
final Set<Long> accessionIds = new HashSet<Long>();
for (final Accession accn : toDelete) {
......@@ -628,6 +633,8 @@ public class GenesysServiceImpl implements GenesysService, TraitService, Dataset
updateAccessionCount(institute);
}
return toDelete;
}
@Override
......
/**
* Copyright 2014 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.genesys2.server.service.worker;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.genesys2.server.service.GenesysService;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.stereotype.Component;
/**
* Component that receives updated or deleted accession IDs and uses a
* background thread to refresh ES
*
* @author matijaobreza
*/
@Component
public class ElasticUpdater implements DisposableBean, InitializingBean {
public static final Log LOG = LogFactory.getLog(ElasticUpdater.class);
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Autowired
private GenesysService genesysService;
@Autowired
private ElasticUpdaterProcessor processor;
private Thread workerThread;
private ConcurrentLinkedQueue<ElasticNode> toRemove;
private ConcurrentLinkedQueue<ElasticNode> toUpdate;
private Object lock = new Object();
private Object semaphore = new Object();
public ElasticUpdater() {
toRemove = new ConcurrentLinkedQueue<ElasticNode>();
toUpdate = new ConcurrentLinkedQueue<ElasticNode>();
}
/**
* Schedule objects for removal
*
* @param clazz
* @param ids
*/
public void remove(Class<?> clazz, Long id) {
LOG.info("Removing " + clazz + " " + id);
ElasticNode node = new ElasticNode(clazz, id);
synchronized (lock) {
toUpdate.remove(node);
toRemove.add(node);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
}
/**
* Schedule objects for removal
*
* @param clazz
* @param ids
*/
public void removeAll(Class<?> clazz, Long... ids) {
Set<ElasticNode> nodes = makeNodes(clazz, ids);
if (nodes.isEmpty())
return;
synchronized (lock) {
toUpdate.removeAll(nodes);
toRemove.addAll(nodes);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
}
/**
* Schedule objects for update
*
* @param clazz
* @param ids
*/
public void update(Class<?> clazz, Long id) {
LOG.info("Updating " + clazz + " " + id);
ElasticNode node = new ElasticNode(clazz, id);
synchronized (lock) {
toRemove.remove(node);
// reinsert all to end of queue
toUpdate.remove(node);
toUpdate.add(node);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
}
/**
* Schedule objects for update
*
* @param clazz
* @param ids
*/
public void updateAll(Class<?> clazz, Long... ids) {
Set<ElasticNode> nodes = makeNodes(clazz, ids);
if (nodes.isEmpty())
return;
synchronized (lock) {
toRemove.removeAll(nodes);
// reinsert all to end of queue
toUpdate.removeAll(nodes);
toUpdate.addAll(nodes);
synchronized (semaphore) {
semaphore.notifyAll();
}
}
}
/**
* Make a Set of ElasticNodes
*
* @param clazz
* @param ids
* @return
*/
private Set<ElasticNode> makeNodes(Class<?> clazz, Long[] ids) {
HashSet<ElasticNode> set = new HashSet<ElasticNode>();
for (Long id : ids) {
set.add(new ElasticNode(clazz, id));
}
return set;
}
public static class ElasticNode {
private final Class<?> clazz;
private final long id;
public ElasticNode(final Class<?> clazz, final Long id) {
this.clazz = clazz;
this.id = id;
}
public long getId() {
return id;
}
public Class<?> getClazz() {
return clazz;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((clazz == null) ? 0 : clazz.hashCode());
result = prime * result + (int) (id ^ (id >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ElasticNode other = (ElasticNode) obj;
if (clazz == null) {
if (other.clazz != null)
return false;
} else if (!clazz.equals(other.clazz))
return false;
if (id != other.id)
return false;
return true;
}
}
@Override
public void destroy() throws Exception {
LOG.info("Shutting down processor");
processor.shutdown();
workerThread.join(2000);
}
@Override
public void afterPropertiesSet() throws Exception {
this.processor.init(lock, semaphore, toRemove, toUpdate);
LOG.info("Starting thread!");
workerThread = new Thread(processor, "genesys-ES-processor");
workerThread.setPriority(3);
workerThread.start();
}
}
/**
* Copyright 2014 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.genesys2.server.service.worker;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionRelated;
import org.genesys2.server.model.genesys.SvalbardData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Aspects that trigger updates of ES
*/
@Aspect
@Component
public class ElasticUpdaterAspect {
public static final Log LOG = LogFactory.getLog(ElasticUpdaterAspect.class);
@Autowired
private ElasticUpdater elasticUpdater;
@AfterReturning(value = "execution(* org.genesys2.server.service.GenesysService.save*(..))", returning = "result")
public Object afterSave(final JoinPoint joinPoint, final Object result) throws Throwable {
LOG.info("Returning from " + joinPoint.toLongString());
if (result != null) {
LOG.info("Result " + result.getClass());
if (result instanceof Collection<?>) {
LOG.info("Scanning collection for ES data...");
Collection<?> list = (Collection<?>) result;
if (list.isEmpty())