Commit 8f28e9c9 authored by Matija Obreza's avatar Matija Obreza
Browse files

Defer relinking of AccessionRefs for Datasets and Subsets

parent d028e392
......@@ -15,6 +15,8 @@
*/
package org.genesys.catalog.persistence.dataset;
import java.util.List;
import org.genesys.catalog.model.dataset.Dataset;
import org.genesys.catalog.model.dataset.DatasetAccessionRef;
import org.springframework.data.domain.Page;
......@@ -40,6 +42,13 @@ public interface DatasetAccessionRefRepository {
*/
Iterable<DatasetAccessionRef> save(Iterable<DatasetAccessionRef> entities);
/**
 * Update records.
 *
 * <p>Intended for references that are already stored. NOTE(review): the
 * implementation shown alongside this interface merges every entity as-is and,
 * unlike {@code save}, does not first query for existing rows -- confirm that
 * callers only pass persisted references.
 *
 * @param entities the existing DatasetAccessionRefs to update
 */
void update(Iterable<DatasetAccessionRef> entities);
/**
* Delete for dataset.
*
......@@ -47,6 +56,12 @@ public interface DatasetAccessionRefRepository {
* @return the long
*/
long deleteForDataset(Dataset dataset);
/**
 * Find all accession references that belong to a dataset.
 *
 * @param dataset the dataset whose accession references are requested
 * @return the list of DatasetAccessionRefs whose dataset equals the given one
 */
List<DatasetAccessionRef> findAll(Dataset dataset);
/**
* Find all.
......
......@@ -15,14 +15,24 @@
*/
package org.genesys.catalog.persistence.dataset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Join;
import javax.persistence.criteria.Path;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.genesys.catalog.model.dataset.Dataset;
import org.genesys.catalog.model.dataset.DatasetAccessionRef;
import org.genesys.catalog.model.dataset.QDatasetAccessionRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
......@@ -32,7 +42,6 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import com.google.common.collect.Iterables;
import com.querydsl.core.types.dsl.BooleanExpression;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
......@@ -41,30 +50,66 @@ import com.querydsl.jpa.impl.JPAQueryFactory;
*/
@Repository
public class DatasetAccessionRefRepositoryCustomImpl implements DatasetAccessionRefRepository {
/** Logger; categorised under the repository interface name. */
private static final Logger LOG = LoggerFactory.getLogger(DatasetAccessionRefRepository.class);

/** Zero-length array passed to {@code List.toArray} when building OR-restrictions. */
private static final Predicate[] EMPTY_PREDICATE_ARRAY = new Predicate[0];

/** Querydsl factory used for the type-safe queries in this repository. */
@Autowired
private JPAQueryFactory jpaQueryFactory;

/** JPA entity manager used for criteria queries and merge operations. */
@PersistenceContext
private EntityManager em;
/**
 * Load every accession reference whose dataset equals the given dataset.
 *
 * @param dataset the dataset to filter by
 * @return all matching DatasetAccessionRef records
 */
@Override
@Transactional(readOnly = true)
public List<DatasetAccessionRef> findAll(Dataset dataset) {
    final QDatasetAccessionRef accessionRef = QDatasetAccessionRef.datasetAccessionRef;
    return jpaQueryFactory
        .selectFrom(accessionRef)
        .where(accessionRef.dataset.eq(dataset))
        .fetch();
}
/**
 * Merge each of the given references through the entity manager.
 *
 * @param entities the existing DatasetAccessionRefs to update
 */
@Override
@Transactional
public void update(Iterable<DatasetAccessionRef> entities) {
    // The merged instance returned by the entity manager is intentionally discarded.
    entities.forEach(em::merge);
}
/* (non-Javadoc)
* @see org.genesys.catalog.persistence.dataset.DatasetAccessionRefRepository#save(java.lang.Iterable)
*/
@Override
@Transactional
public Iterable<DatasetAccessionRef> save(Iterable<DatasetAccessionRef> entities) {
Iterables.partition(entities, 50).forEach(batch -> {
System.err.println("Batch size " + batch.size());
JPAQuery<DatasetAccessionRef> q = jpaQueryFactory.selectFrom(QDatasetAccessionRef.datasetAccessionRef);
for (DatasetAccessionRef ref : entities) {
BooleanExpression p = QDatasetAccessionRef.datasetAccessionRef.dataset.eq(ref.getDataset()).and(
QDatasetAccessionRef.datasetAccessionRef.instCode.eq(ref.getInstCode())).and(
QDatasetAccessionRef.datasetAccessionRef.acceNumb.eq(ref.getAcceNumb())).and(
QDatasetAccessionRef.datasetAccessionRef.genus.eq(ref.getGenus()));
q.where(p);
final CriteriaBuilder criteriaBuilder = em.getCriteriaBuilder();
final AtomicInteger batchCounter = new AtomicInteger(0);
Iterables.partition(entities, 1000).forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
CriteriaQuery<DatasetAccessionRef> cq = criteriaBuilder.createQuery(DatasetAccessionRef.class);
Root<DatasetAccessionRef> root = cq.from(DatasetAccessionRef.class);
Join<Object, Object> dataset = root.join("dataset");
cq.distinct(true);
// cq.select(root);
// Path<Object> theDoi = root.get("doi");
Path<Object> theDatasetId = dataset.get("id");
Path<Object> theInstCode = root.get("instCode");
Path<Object> theAcceNumb = root.get("acceNumb");
Path<Object> theGenus = root.get("genus");
List<Predicate> restrictions = new ArrayList<Predicate>();
// A lot of .. (instCode=? and acceNumb=? and genus=?)
for (DatasetAccessionRef ref : batch) {
restrictions.add(criteriaBuilder.and(criteriaBuilder.equal(theDatasetId, ref.getDataset().getId()), criteriaBuilder.equal(theInstCode, ref
.getInstCode()), criteriaBuilder.equal(theAcceNumb, ref
.getAcceNumb()), criteriaBuilder.equal(theGenus, ref.getGenus())));
}
List<DatasetAccessionRef> existing = q.fetch();
for (DatasetAccessionRef entity : entities) {
cq.where(criteriaBuilder.or(restrictions.toArray(EMPTY_PREDICATE_ARRAY)));
List<DatasetAccessionRef> existing = em.createQuery(cq).getResultList();
LOG.debug("Got {} existing records", existing.size());
for (DatasetAccessionRef entity : batch) {
if (existing.contains(entity)) {
em.merge(entity);
} else {
......@@ -72,6 +117,7 @@ public class DatasetAccessionRefRepositoryCustomImpl implements DatasetAccession
}
}
});
LOG.info("Done persisting {} AccessionRef batches", batchCounter.get());
return entities;
}
......
......@@ -81,16 +81,22 @@ import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PostAuthorize;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.acls.domain.BasePermission;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.multipart.MultipartFile;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.Predicate;
......@@ -153,6 +159,13 @@ public class DatasetServiceImpl implements DatasetService {
@Value("${file.repository.datasets.folder}")
public String datasetRepositoryPath;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private PlatformTransactionManager transactionManager;
/**
* {@inheritDoc}
*/
......@@ -590,15 +603,41 @@ public class DatasetServiceImpl implements DatasetService {
LOG.warn("Matching " + accessionRefs.size() + " with Accessions");
List<DatasetAccessionRef> dArs = Lists.newArrayList();
lookupMatchingAccessions(accessionRefs).forEach(ref -> {
accessionRefs.forEach(ref -> {
DatasetAccessionRef dAr = new DatasetAccessionRef(ref);
dAr.setDataset(loadedDataset);
dArs.add(dAr);
});
LOG.warn("Done matching " + accessionRefs.size() + " with " + dArs.size() + " Accessions");
accessionRefRepository.save(dArs);
loadedDataset.setAccessionCount((int) accessionRefRepository.countByDataset(loadedDataset));
System.err.println("accessionCount=" + loadedDataset.getAccessionCount());
LOG.warn("Done saving {} accession refs, have {} in dataset", accessionRefs.size(), loadedDataset.getAccessionCount());
threadPoolTaskExecutor.execute(() -> {
try {
Thread.sleep(2000);
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// explicitly setting the transaction name is something that can only be done
// programmatically
def.setName("SomeTxName");
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = transactionManager.getTransaction(def);
try {
// execute your business logic here
rematchDatasetAccessions(loadedDataset);
} catch (Throwable ex) {
LOG.error("Rolling back rematch. Exception: {}", ex.getMessage(), ex);
transactionManager.rollback(status);
throw ex;
}
transactionManager.commit(status);
LOG.warn("Done relinking records");;
} catch (InterruptedException e) {
LOG.warn("Interrupted rematcher");
}
});
return lazyLoad(datasetRepository.save(loadedDataset));
}
......@@ -911,54 +950,59 @@ public class DatasetServiceImpl implements DatasetService {
*
* @return accessionRefs with matching accessions from Genesys
*/
private <T extends Collection<? extends AccessionRef>> T lookupMatchingAccessions(final T accessionRefs) {
private Collection<DatasetAccessionRef> lookupMatchingAccessions(final Collection<DatasetAccessionRef> accessionRefs) throws InterruptedException {
StopWatch stopWatch = StopWatch.createStarted();
accessionRefs.forEach(ref -> ref.trimStringsToNull());
List<AccessionRef> list = new ArrayList<AccessionRef>(accessionRefs);
List<Accession> foundAccessions = accessionRepository.find(list);
LOG.warn("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), accessionRefs.size(), stopWatch.getTime());
// instCode / genus / acceNumb map
Map<String, Map<String, Map<String, Accession>>> instMap = new HashMap<>();
Map<String, Accession> doiAccessions = new HashMap<>();
foundAccessions.forEach(found -> {
if (found.getDoi() != null) {
doiAccessions.put(found.getDoi(), found);
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(found.getInstCode());
if (instGenusMap == null) {
instMap.put(found.getInstCode(), instGenusMap = new HashMap<>());
}
Map<String, Accession> genusMap = instGenusMap.get(found.getGenus());
if (genusMap == null) {
instGenusMap.put(found.getGenus(), genusMap = new HashMap<>());
}
genusMap.put(found.getAccessionNumber(), found);
});
accessionRefs.forEach(ref -> {
ref.setAccession(null);
if (StringUtils.isNotBlank(ref.getDoi())) {
Accession doiAcce = doiAccessions.get(ref.getDoi());
if (doiAcce != null) {
ref.setAccession(doiAcce);
return;
final AtomicInteger batchCounter = new AtomicInteger(0);
Iterables.partition(accessionRefs, 1000).forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
List<Accession> foundAccessions = accessionRepository.find(batch);
LOG.warn("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), batch.size(), stopWatch.getTime());
// instCode / genus / acceNumb map
Map<String, Map<String, Map<String, Accession>>> instMap = new HashMap<>();
Map<String, Accession> doiAccessions = new HashMap<>();
foundAccessions.forEach(found -> {
if (found.getDoi() != null) {
doiAccessions.put(found.getDoi(), found);
}
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(ref.getInstCode());
if (instGenusMap != null) {
Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
if (genusMap != null) {
ref.setAccession(genusMap.get(ref.getAcceNumb()));
Map<String, Map<String, Accession>> instGenusMap = instMap.get(found.getInstCode());
if (instGenusMap == null) {
instMap.put(found.getInstCode(), instGenusMap = new HashMap<>());
}
}
Map<String, Accession> genusMap = instGenusMap.get(found.getGenus());
if (genusMap == null) {
instGenusMap.put(found.getGenus(), genusMap = new HashMap<>());
}
genusMap.put(found.getAccessionNumber(), found);
});
if (ref.getAccession() == null) {
LOG.debug("No match for {}", ref);
}
batch.forEach(ref -> {
ref.setAccession(null);
if (StringUtils.isNotBlank(ref.getDoi())) {
Accession doiAcce = doiAccessions.get(ref.getDoi());
if (doiAcce != null) {
ref.setAccession(doiAcce);
return;
}
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(ref.getInstCode());
if (instGenusMap != null) {
Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
if (genusMap != null) {
ref.setAccession(genusMap.get(ref.getAcceNumb()));
}
}
if (ref.getAccession() == null) {
LOG.debug("No match for {}", ref);
}
});
});
LOG.warn("Matched accessions after {}ms", stopWatch.getTime());
......@@ -980,8 +1024,12 @@ public class DatasetServiceImpl implements DatasetService {
/**
 * Re-link the stored accession references of a dataset with matching accessions.
 *
 * <p>The dataset is reloaded by its UUID, its accession references are fetched
 * from the accession-ref repository, passed through
 * {@code lookupMatchingAccessions} and written back with {@code update}.
 *
 * @param dataset the dataset whose accession references should be re-matched
 * @return the dataset as reloaded from the repository
 */
public Dataset rematchDatasetAccessions(Dataset dataset) {
    dataset = datasetRepository.findByUuid(dataset.getUuid());
    LOG.warn("Linking {} accessions with dataset {}", dataset.getAccessionCount(), dataset.getId());
    try {
        accessionRefRepository.update(lookupMatchingAccessions(accessionRefRepository.findAll(dataset)));
    } catch (InterruptedException e) {
        // Restore the interrupt flag: catching InterruptedException clears it, and the
        // thread that runs this task still needs to see that it was interrupted.
        Thread.currentThread().interrupt();
        LOG.warn("AccessionRef matching interrupted.");
    }
    return dataset;
}
@Override
......
......@@ -16,6 +16,7 @@
package org.genesys2.server.persistence;
import java.util.Collection;
import java.util.List;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.impl.Subset;
......@@ -43,6 +44,13 @@ public interface SubsetAccessionRefRepository {
*/
Iterable<SubsetAccessionRef> save(Iterable<SubsetAccessionRef> entities);
/**
 * Update records.
 *
 * <p>Intended for references that are already stored. NOTE(review): the
 * implementation shown alongside this interface merges every entity as-is and,
 * unlike {@code save}, does not first query for existing rows -- confirm that
 * callers only pass persisted references.
 *
 * @param entities the existing SubsetAccessionRefs to update
 */
void update(Iterable<SubsetAccessionRef> entities);
/**
* Delete for subset.
*
......@@ -50,6 +58,12 @@ public interface SubsetAccessionRefRepository {
* @return the long
*/
long deleteForSubset(Subset subset);
/**
 * Find all accession references that belong to a subset.
 *
 * @param subset the subset
 * @return the list of SubsetAccessionRefs whose subset equals the given one
 */
List<SubsetAccessionRef> findAll(Subset subset);
/**
* Delete.
......
......@@ -15,16 +15,27 @@
*/
package org.genesys2.server.persistence;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Join;
import javax.persistence.criteria.Path;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.genesys.catalog.persistence.dataset.DatasetAccessionRefRepository;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.impl.QSubsetAccessionRef;
import org.genesys2.server.model.impl.Subset;
import org.genesys2.server.model.impl.SubsetAccessionRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
......@@ -34,7 +45,6 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import com.google.common.collect.Iterables;
import com.querydsl.core.types.dsl.BooleanExpression;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
......@@ -43,29 +53,65 @@ import com.querydsl.jpa.impl.JPAQueryFactory;
*/
@Repository
public class SubsetAccessionRefRepositoryCustomImpl implements SubsetAccessionRefRepository {
/**
 * Logger for this repository. Categorised under SubsetAccessionRefRepository so
 * that Subset log output is not attributed to the Dataset repository.
 */
private static final Logger LOG = LoggerFactory.getLogger(SubsetAccessionRefRepository.class);

/** Zero-length array passed to {@code List.toArray} when building OR-restrictions. */
private static final Predicate[] EMPTY_PREDICATE_ARRAY = new Predicate[] {};

/** Querydsl factory used for the type-safe queries in this repository. */
@Autowired
private JPAQueryFactory jpaQueryFactory;

/** JPA entity manager used for criteria queries and merge operations. */
@PersistenceContext
private EntityManager em;
/**
 * Load every accession reference whose subset equals the given subset.
 *
 * @param subset the subset to filter by
 * @return all matching SubsetAccessionRef records
 */
@Override
@Transactional(readOnly = true)
public List<SubsetAccessionRef> findAll(final Subset subset) {
    final QSubsetAccessionRef accessionRef = QSubsetAccessionRef.subsetAccessionRef;
    return jpaQueryFactory
        .selectFrom(accessionRef)
        .where(accessionRef.subset.eq(subset))
        .fetch();
}
/**
 * Merge each of the given references through the entity manager.
 *
 * @param entities the existing SubsetAccessionRefs to update
 */
@Override
@Transactional
public void update(Iterable<SubsetAccessionRef> entities) {
    // The merged instance returned by the entity manager is intentionally discarded.
    entities.forEach(em::merge);
}
/* (non-Javadoc)
* @see org.genesys2.server.persistence.SubsetAccessionRefRepository#save(java.lang.Iterable)
*/
@Override
public Iterable<SubsetAccessionRef> save(Iterable<SubsetAccessionRef> entities) {
Iterables.partition(entities, 50).forEach(batch -> {
JPAQuery<SubsetAccessionRef> q = jpaQueryFactory.selectFrom(QSubsetAccessionRef.subsetAccessionRef);
for (SubsetAccessionRef ref : entities) {
BooleanExpression p = QSubsetAccessionRef.subsetAccessionRef.subset.eq(ref.getSubset()).and(
QSubsetAccessionRef.subsetAccessionRef.instCode.eq(ref.getInstCode())).and(
QSubsetAccessionRef.subsetAccessionRef.acceNumb.eq(ref.getAcceNumb())).and(
QSubsetAccessionRef.subsetAccessionRef.genus.eq(ref.getGenus()));
q.where(p);
final CriteriaBuilder criteriaBuilder = em.getCriteriaBuilder();
final AtomicInteger batchCounter = new AtomicInteger(0);
Iterables.partition(entities, 1000).forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
CriteriaQuery<SubsetAccessionRef> cq = criteriaBuilder.createQuery(SubsetAccessionRef.class);
Root<SubsetAccessionRef> root = cq.from(SubsetAccessionRef.class);
Join<Object, Object> subset = root.join("subset");
cq.distinct(true);
// cq.select(root);
// Path<Object> theDoi = root.get("doi");
Path<Object> theSubsetId = subset.get("id");
Path<Object> theInstCode = root.get("instCode");
Path<Object> theAcceNumb = root.get("acceNumb");
Path<Object> theGenus = root.get("genus");
List<Predicate> restrictions = new ArrayList<Predicate>();
// A lot of .. (instCode=? and acceNumb=? and genus=?)
for (SubsetAccessionRef ref : batch) {
restrictions.add(criteriaBuilder.and(criteriaBuilder.equal(theSubsetId, ref.getSubset().getId()), criteriaBuilder.equal(theInstCode, ref
.getInstCode()), criteriaBuilder.equal(theAcceNumb, ref
.getAcceNumb()), criteriaBuilder.equal(theGenus, ref.getGenus())));
}
List<SubsetAccessionRef> existing = q.fetch();
for (SubsetAccessionRef entity : entities) {
cq.where(criteriaBuilder.or(restrictions.toArray(EMPTY_PREDICATE_ARRAY)));
List<SubsetAccessionRef> existing = em.createQuery(cq).getResultList();
LOG.debug("Got {} existing records", existing.size());
for (SubsetAccessionRef entity : batch) {
if (existing.contains(entity)) {
em.merge(entity);
} else {
......@@ -73,6 +119,7 @@ public class SubsetAccessionRefRepositoryCustomImpl implements SubsetAccessionRe
}
}
});
LOG.info("Done persisting {} AccessionRef batches", batchCounter.get());
return entities;
}
......
......@@ -18,7 +18,6 @@ package org.genesys2.server.service.impl;
import static org.genesys2.server.model.impl.QSubsetCreator.subsetCreator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
......@@ -33,7 +32,6 @@ import javax.validation.Valid;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.catalog.persistence.dataset.DatasetAccessionRefRepository;
import org.genesys2.server.exception.InvalidApiUsageException;
import org.genesys2.server.exception.NotFoundElement;
import org.genesys2.server.model.PublishState;
......@@ -63,15 +61,21 @@ import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PostAuthorize;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.acls.domain.BasePermission;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.validation.annotation.Validated;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.Predicate;
......@@ -107,6 +111,11 @@ public class SubsetServiceImpl implements SubsetService {
@Autowired
private SubsetAccessionRefRepository accessionRefRepository;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private PlatformTransactionManager transactionManager;
/**
* {@inheritDoc}
......@@ -323,14 +332,41 @@ public class SubsetServiceImpl implements SubsetService {
}
List<SubsetAccessionRef> sArs = Lists.newArrayList();
lookupMatchingAccessions(accessionRefs).forEach(ref -> {
accessionRefs.forEach(ref -> {
SubsetAccessionRef sAr = new SubsetAccessionRef(ref);
sAr.setSubset(subset);
sArs.add(sAr);
});
LOG.warn("Done matching " + accessionRefs.size() + " with " + sArs.size() + " Accessions");
accessionRefRepository.save(sArs);
subset.setAccessionCount((int) accessionRefRepository.countBySubset(subset));
LOG.warn("Done saving {} accession refs, have {} in subset", accessionRefs.size(), subset.getAccessionCount());
threadPoolTaskExecutor.execute(() -> {
try {
Thread.sleep(2000);