Commit 36250557 authored by Matija Obreza's avatar Matija Obreza

Introducing AccessionRefsMatcher component

- Takes proper care of transactions
parent 8d736a7f
......@@ -19,6 +19,7 @@ import java.util.List;
import org.genesys.catalog.model.dataset.Dataset;
import org.genesys.catalog.model.dataset.DatasetAccessionRef;
import org.genesys2.server.persistence.AccessionRefRepository;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;
......@@ -27,35 +28,8 @@ import org.springframework.stereotype.Repository;
* The DatasetAccessionRefRepository.
*/
@Repository
public interface DatasetAccessionRefRepository {
public interface DatasetAccessionRefRepository extends AccessionRefRepository<DatasetAccessionRef> {
/**
* Delete all.
*/
void deleteAll();
/**
* Save D.
*
* @param entities the entities
* @return the iterable
*/
Iterable<DatasetAccessionRef> save(Iterable<DatasetAccessionRef> entities);
/**
* Update records.
*
* @param entities the existing DatasetAccessionRefs to update
*/
void update(Iterable<DatasetAccessionRef> entities);
/**
* Update safely: check if records exist before persisting.
*
* @param entities the entities
*/
void updateSafely(Iterable<DatasetAccessionRef> entities);
/**
* Delete for dataset.
*
......
......@@ -41,6 +41,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import com.google.common.collect.Iterables;
import com.querydsl.core.types.dsl.BooleanExpression;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
......@@ -187,5 +188,9 @@ public class DatasetAccessionRefRepositoryCustomImpl implements DatasetAccession
public long countByDataset(Dataset dataset) {
return jpaQueryFactory.selectFrom(QDatasetAccessionRef.datasetAccessionRef).where(QDatasetAccessionRef.datasetAccessionRef.dataset.eq(dataset)).fetchCount();
}
@Override
public List<DatasetAccessionRef> findAll(BooleanExpression in) {
return jpaQueryFactory.select(QDatasetAccessionRef.datasetAccessionRef).where(in).fetch();
}
}
......@@ -27,20 +27,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.validation.Valid;
import com.querydsl.jpa.JPAExpressions;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.blocks.security.service.CustomAclService;
import org.genesys.catalog.model.Partner;
import org.genesys.catalog.model.dataset.Dataset;
......@@ -74,8 +68,8 @@ import org.genesys2.server.model.UserRole;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionRef;
import org.genesys2.server.model.genesys.QAccession;
import org.genesys2.server.persistence.AccessionRepository;
import org.genesys2.server.service.DownloadService;
import org.genesys2.server.service.worker.AccessionRefMatcher;
import org.genesys2.util.JPAUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -90,22 +84,16 @@ import org.springframework.security.access.prepost.PostAuthorize;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.acls.domain.BasePermission;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.multipart.MultipartFile;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.Predicate;
import com.querydsl.jpa.JPAExpressions;
/**
* The Class DatasetServiceImpl.
......@@ -149,10 +137,6 @@ public class DatasetServiceImpl implements DatasetService {
@Autowired
protected AsAdminInvoker asAdminInvoker;
/** The accessions repository. */
@Autowired
private AccessionRepository accessionRepository;
/** The download service. */
@Autowired
private DownloadService downloadService;
......@@ -169,8 +153,7 @@ public class DatasetServiceImpl implements DatasetService {
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private PlatformTransactionManager transactionManager;
private AccessionRefMatcher accessionRefMatcher;
/**
* {@inheritDoc}
......@@ -629,19 +612,8 @@ public class DatasetServiceImpl implements DatasetService {
LOG.info("Done saving {} accession refs, have {} in dataset", accessionRefs.size(), loadedDataset.getAccessionCount());
datasetRepository.save(loadedDataset);
threadPoolTaskExecutor.execute(() -> {
try {
Thread.sleep(500);
TransactionDefinition transactionDefinition = new DefaultTransactionAttribute(TransactionDefinition.PROPAGATION_REQUIRED);
new TransactionTemplate(transactionManager, transactionDefinition).execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
rematchDatasetAccessions(dArs);
}
});
} catch (InterruptedException e) {
}
});
batchRematchAccessionRefs(dArs);
return lazyLoad(loadedDataset);
}
......@@ -952,72 +924,6 @@ public class DatasetServiceImpl implements DatasetService {
target.getDescriptors().addAll(descriptors.stream().distinct().collect(Collectors.toList()));
}
/**
* Looking for matching Accession and then sets that to AccessionRef
* @param <T>
*
* @param accessionRefs the accessionRefs
*
* @return accessionRefs with matching accessions from Genesys
*/
private Collection<DatasetAccessionRef> lookupMatchingAccessions(final Collection<DatasetAccessionRef> accessionRefs) {
StopWatch stopWatch = StopWatch.createStarted();
accessionRefs.forEach(ref -> ref.trimStringsToNull());
final AtomicInteger batchCounter = new AtomicInteger(0);
Iterables.partition(accessionRefs, 500).forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
List<Accession> foundAccessions = accessionRepository.find(true, batch);
LOG.info("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), batch.size(), stopWatch.getTime());
// instCode / genus / acceNumb map
Map<String, Map<String, Map<String, Accession>>> instMap = new HashMap<>();
Map<String, Accession> doiAccessions = new HashMap<>();
foundAccessions.forEach(found -> {
if (found.getDoi() != null) {
doiAccessions.put(found.getDoi(), found);
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(found.getInstCode());
if (instGenusMap == null) {
instMap.put(found.getInstCode(), instGenusMap = new HashMap<>());
}
Map<String, Accession> genusMap = instGenusMap.get(found.getGenus());
if (genusMap == null) {
instGenusMap.put(found.getGenus(), genusMap = new HashMap<>());
}
genusMap.put(found.getAccessionNumber(), found);
});
batch.forEach(ref -> {
ref.setAccession(null);
if (StringUtils.isNotBlank(ref.getDoi())) {
Accession doiAcce = doiAccessions.get(ref.getDoi());
if (doiAcce != null) {
ref.setAccession(doiAcce);
return;
}
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(ref.getInstCode());
if (instGenusMap != null) {
Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
if (genusMap != null) {
ref.setAccession(genusMap.get(ref.getAcceNumb()));
}
}
if (ref.getAccession() == null) {
LOG.debug("No match for {}", ref);
}
});
});
LOG.info("Matched {} accession refs after {}ms", accessionRefs.size(), stopWatch.getTime());
return accessionRefs;
}
@Override
@Transactional(readOnly = false)
......@@ -1036,61 +942,42 @@ public class DatasetServiceImpl implements DatasetService {
if (dataset == null) {
return dataset;
}
LOG.info("Linking {} accessions with dataset {}", dataset.getAccessionCount(), dataset.getId());
final AtomicInteger batchCounter = new AtomicInteger(0);
// Use parallel streams for lookups
List<DatasetAccessionRef> accessionRefs = accessionRefRepository.findAll(dataset);
Lists.partition(accessionRefs, 1000).parallelStream().forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
lookupMatchingAccessions(batch);
});
// Save in this transaction
accessionRefRepository.update(accessionRefs);
LOG.info("Done relinking {} accession refs.", accessionRefs.size());
dataset.setAccessionCount(accessionRefs.size());
dataset = datasetRepository.save(dataset);
batchRematchAccessionRefs(accessionRefs);
LOG.info("Done scheduling of relinking {} accession refs.", accessionRefs.size());
return lazyLoad(dataset);
}
@Transactional(readOnly = false)
@PreAuthorize("hasRole('ADMINISTRATOR')")
private List<DatasetAccessionRef> rematchDatasetAccessions(List<DatasetAccessionRef> accessionRefs) {
LOG.debug("Re-linking {} accessions refs with accessions", accessionRefs.size());
final AtomicInteger batchCounter = new AtomicInteger(0);
// Use parallel streams for lookups
Lists.partition(accessionRefs, 1000).parallelStream().forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
lookupMatchingAccessions(batch);
/**
* Schedule re-matching of AccessionRefs in batches
* @param accessionRefs
*/
private void batchRematchAccessionRefs(List<DatasetAccessionRef> accessionRefs) {
Lists.partition(accessionRefs, 10000).parallelStream().forEach((batch) -> {
threadPoolTaskExecutor.execute(() -> {
LOG.info("Rematching {} subset refs", batch.size());
accessionRefMatcher.rematchAccessionRefs(batch, accessionRefRepository);
});
});
// Save in this transaction
accessionRefRepository.updateSafely(accessionRefs);
LOG.warn("Done relinking {} accession refs.", accessionRefs.size());
return accessionRefs;
}
@Override
@Transactional(readOnly = false, propagation = Propagation.REQUIRED, isolation = Isolation.READ_UNCOMMITTED)
public int clearAccessionRefs(Collection<Accession> accessions) {
Set<UUID> uuids = accessions.stream().map(a -> a.getUuid()).collect(Collectors.toSet());
final Iterable<Dataset> datasets = datasetRepository.findAll(dataset.accessionRefs.any().accession.in(accessions));
AtomicInteger updates = new AtomicInteger(0);
datasets.forEach(dataset -> {
dataset.getAccessionRefs().stream()
// filter for matching accessions
.filter(ref -> ref.getAccession() != null && uuids.contains(ref.getAccession().getUuid()))
// clear accession reference
.forEach(ref -> {
updates.incrementAndGet();
ref.setAccession(null);
});
List<DatasetAccessionRef> referencedRefs = accessionRefRepository.findAll(QDatasetAccessionRef.datasetAccessionRef.accession.in(accessions));
referencedRefs.forEach((ref) -> {
ref.setAccession(null);
});
datasetRepository.save(datasets);
return updates.get();
accessionRefRepository.save(referencedRefs);
return referencedRefs.size();
}
/**
......
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.persistence;
import java.util.List;
import org.genesys2.server.model.genesys.AccessionRef;
import com.querydsl.core.types.dsl.BooleanExpression;
/**
* The Interface AccessionRefRepository.
*
* @param <T> the generic type
*/
public interface AccessionRefRepository<T extends AccessionRef> {
/**
* Delete all.
*/
void deleteAll();
/**
* Save
*
* @param entities the entities
* @return the iterable
*/
Iterable<T> save(Iterable<T> entities);
/**
* Update records.
*
* @param entities the existing SubsetAccessionRefs to update
*/
void update(Iterable<T> entities);
/**
* Update safely: check if records exist before persisting.
*
* @param entities the entities
*/
void updateSafely(Iterable<T> entities);
/**
* Find matching accessionRefs
*
* @param in
* @return
*/
List<T> findAll(BooleanExpression in);
}
......@@ -29,35 +29,8 @@ import org.springframework.stereotype.Repository;
* The SubsetAccessionRefRepository.
*/
@Repository
public interface SubsetAccessionRefRepository {
public interface SubsetAccessionRefRepository extends AccessionRefRepository<SubsetAccessionRef> {
/**
* Delete all.
*/
void deleteAll();
/**
* Save
*
* @param entities the entities
* @return the iterable
*/
Iterable<SubsetAccessionRef> save(Iterable<SubsetAccessionRef> entities);
/**
* Update records.
*
* @param entities the existing SubsetAccessionRefs to update
*/
void update(Iterable<SubsetAccessionRef> entities);
/**
* Update safely: check if records exist before persisting.
*
* @param entities the entities
*/
void updateSafely(Iterable<SubsetAccessionRef> entities);
/**
* Delete for subset.
*
......@@ -97,5 +70,4 @@ public interface SubsetAccessionRefRepository {
* @return the long
*/
long countBySubset(Subset subset);
}
......@@ -44,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import com.google.common.collect.Iterables;
import com.querydsl.core.types.dsl.BooleanExpression;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
......@@ -198,5 +199,9 @@ public class SubsetAccessionRefRepositoryCustomImpl implements SubsetAccessionRe
public long countBySubset(Subset subset) {
return jpaQueryFactory.selectFrom(QSubsetAccessionRef.subsetAccessionRef).where(QSubsetAccessionRef.subsetAccessionRef.subset.eq(subset)).fetchCount();
}
@Override
public List<SubsetAccessionRef> findAll(BooleanExpression in) {
return jpaQueryFactory.select(QSubsetAccessionRef.subsetAccessionRef).where(in).fetch();
}
}
/*
* Copyright 2018 Global Crop Diversity Trust
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -31,20 +31,21 @@ import org.springframework.stereotype.Repository;
@Repository
public interface SubsetRepository extends JpaRepository<Subset, Long>, QueryDslPredicateExecutor<Subset> {
/**
* Gets the subset by uuid.
*
* @param uuid the uuid
* @return the subset by uuid
*/
Subset getByUuid(UUID uuid);
/**
* Gets the subset by uuid.
*
* @param uuid the uuid
* @return the subset by uuid
*/
Subset getByUuid(UUID uuid);
/**
* Gets the record by uuid and version.
*
* @param uuid the uuid
* @param version the version
* @return the by uuid and version
*/
Subset getByUuidAndVersion(UUID uuid, int version);
/**
* Gets the record by uuid and version.
*
* @param uuid the uuid
* @param version the version
* @return the by uuid and version
*/
Subset getByUuidAndVersion(UUID uuid, int version);
}
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.service.worker;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionRef;
import org.genesys2.server.persistence.AccessionRefRepository;
import org.genesys2.server.persistence.AccessionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.google.common.collect.Lists;
/**
* Handles matching AccessionRefs to Accessions.
*
* Extracted here so that batch actions can be clearly transactional.
* No authentication checks in place.
*
* @author Matija Obreza
*/
@Component
public class AccessionRefMatcher {
private static final Logger LOG = LoggerFactory.getLogger(AccessionRefMatcher.class);
@Autowired
private AccessionRepository accessionRepository;
/**
* Re-match accession refs to accessions in Genesys.
*
* @param <T> the type of AccessonRef
* @param accessionRefs the list of accession references
* @param accessionRefRepository the repository
* @return the list
*/
@Transactional(readOnly = false)
public <T extends AccessionRef> List<T> rematchAccessionRefs(List<T> accessionRefs, AccessionRefRepository<T> accessionRefRepository) {
LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());
final AtomicInteger batchCounter = new AtomicInteger(0);
Lists.partition(accessionRefs, 10000).stream().forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
lookupMatchingAccessions(batch, accessionRefRepository);
// Save in this transaction
accessionRefRepository.save(batch);
});
LOG.info("Done relinking {} accession refs.", accessionRefs.size());
return accessionRefs;
}
/**
* Looking for matching Accession and then sets that to AccessionRef
*
* @param accessionRefs the accessionRefs
*
* @return accessionRefs with matching accessions from Genesys
*/
private <T extends AccessionRef> List<T> lookupMatchingAccessions(List<T> accessionRefs, AccessionRefRepository<T> accessionRefRepository) {
StopWatch stopWatch = StopWatch.createStarted();
accessionRefs.forEach(ref -> ref.trimStringsToNull());
final AtomicInteger batchCounter = new AtomicInteger(0);
Lists.partition(accessionRefs, 100).parallelStream().forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
List<Accession> foundAccessions = accessionRepository.find(true, batch);
LOG.info("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), batch.size(), stopWatch.getTime());
// instCode / genus / acceNumb map
Map<String, Map<String, Map<String, Accession>>> instMap = new HashMap<>();
Map<String, Accession> doiAccessions = new HashMap<>();
foundAccessions.forEach(found -> {
if (found.getDoi() != null) {
doiAccessions.put(found.getDoi(), found);
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(found.getInstCode());
if (instGenusMap == null) {
instMap.put(found.getInstCode(), instGenusMap = new HashMap<>());
}
Map<String, Accession> genusMap = instGenusMap.get(found.getGenus());
if (genusMap == null) {
instGenusMap.put(found.getGenus(), genusMap = new HashMap<>());
}
genusMap.put(found.getAccessionNumber(), found);
});
batch.forEach(ref -> {
ref.setAccession(null);
if (StringUtils.isNotBlank(ref.getDoi())) {
Accession doiAcce = doiAccessions.get(ref.getDoi());
if (doiAcce != null) {
ref.setAccession(doiAcce);
return;
}
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(ref.getInstCode());
if (instGenusMap != null) {
Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
if (genusMap != null) {
ref.setAccession(genusMap.get(ref.getAcceNumb()));
}
}
if (ref.getAccession() == null) {
LOG.debug("No match for {}", ref);
}
});
});
LOG.info("Matched {} accession refs after {}ms", accessionRefs.size(), stopWatch.getTime());
return accessionRefs;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment