Commit 84694442 authored by Matija Obreza's avatar Matija Obreza

Merge branch '429-massive-subsets-datasets' into 'master'

Resolve "Massive subsets/datasets"

Closes #429

See merge request genesys-pgr/genesys-server!375
parents 8d736a7f 62c2646b
......@@ -19,6 +19,7 @@ import java.util.List;
import org.genesys.catalog.model.dataset.Dataset;
import org.genesys.catalog.model.dataset.DatasetAccessionRef;
import org.genesys2.server.persistence.AccessionRefRepository;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;
......@@ -27,35 +28,8 @@ import org.springframework.stereotype.Repository;
* The DatasetAccessionRefRepository.
*/
@Repository
public interface DatasetAccessionRefRepository {
public interface DatasetAccessionRefRepository extends AccessionRefRepository<DatasetAccessionRef> {
/**
* Delete all.
*/
void deleteAll();
/**
* Save D.
*
* @param entities the entities
* @return the iterable
*/
Iterable<DatasetAccessionRef> save(Iterable<DatasetAccessionRef> entities);
/**
* Update records.
*
* @param entities the existing DatasetAccessionRefs to update
*/
void update(Iterable<DatasetAccessionRef> entities);
/**
* Update safely: check if records exist before persisting.
*
* @param entities the entities
*/
void updateSafely(Iterable<DatasetAccessionRef> entities);
/**
* Delete for dataset.
*
......
......@@ -41,6 +41,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import com.google.common.collect.Iterables;
import com.querydsl.core.types.dsl.BooleanExpression;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
......@@ -187,5 +188,9 @@ public class DatasetAccessionRefRepositoryCustomImpl implements DatasetAccession
public long countByDataset(Dataset dataset) {
return jpaQueryFactory.selectFrom(QDatasetAccessionRef.datasetAccessionRef).where(QDatasetAccessionRef.datasetAccessionRef.dataset.eq(dataset)).fetchCount();
}
@Override
public List<DatasetAccessionRef> findAll(BooleanExpression in) {
return jpaQueryFactory.selectFrom(QDatasetAccessionRef.datasetAccessionRef).where(in).fetch();
}
}
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.persistence;
import java.util.List;
import org.genesys2.server.model.genesys.AccessionRef;
import com.querydsl.core.types.dsl.BooleanExpression;
/**
* The Interface AccessionRefRepository.
*
* @param <T> the generic type
*/
public interface AccessionRefRepository<T extends AccessionRef> {
/**
* Delete all.
*/
void deleteAll();
/**
* Save
*
* @param entities the entities
* @return the iterable
*/
Iterable<T> save(Iterable<T> entities);
/**
* Update records.
*
* @param entities the existing SubsetAccessionRefs to update
*/
void update(Iterable<T> entities);
/**
* Update safely: check if records exist before persisting.
*
* @param entities the entities
*/
void updateSafely(Iterable<T> entities);
/**
* Find matching accessionRefs
*
* @param in
* @return
*/
List<T> findAll(BooleanExpression in);
}
......@@ -29,35 +29,8 @@ import org.springframework.stereotype.Repository;
* The SubsetAccessionRefRepository.
*/
@Repository
public interface SubsetAccessionRefRepository {
public interface SubsetAccessionRefRepository extends AccessionRefRepository<SubsetAccessionRef> {
/**
* Delete all.
*/
void deleteAll();
/**
* Save
*
* @param entities the entities
* @return the iterable
*/
Iterable<SubsetAccessionRef> save(Iterable<SubsetAccessionRef> entities);
/**
* Update records.
*
* @param entities the existing SubsetAccessionRefs to update
*/
void update(Iterable<SubsetAccessionRef> entities);
/**
* Update safely: check if records exist before persisting.
*
* @param entities the entities
*/
void updateSafely(Iterable<SubsetAccessionRef> entities);
/**
* Delete for subset.
*
......@@ -97,5 +70,4 @@ public interface SubsetAccessionRefRepository {
* @return the long
*/
long countBySubset(Subset subset);
}
......@@ -44,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import com.google.common.collect.Iterables;
import com.querydsl.core.types.dsl.BooleanExpression;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
......@@ -198,5 +199,9 @@ public class SubsetAccessionRefRepositoryCustomImpl implements SubsetAccessionRe
public long countBySubset(Subset subset) {
return jpaQueryFactory.selectFrom(QSubsetAccessionRef.subsetAccessionRef).where(QSubsetAccessionRef.subsetAccessionRef.subset.eq(subset)).fetchCount();
}
@Override
public List<SubsetAccessionRef> findAll(BooleanExpression in) {
return jpaQueryFactory.selectFrom(QSubsetAccessionRef.subsetAccessionRef).where(in).fetch();
}
}
/*
* Copyright 2018 Global Crop Diversity Trust
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -31,20 +31,21 @@ import org.springframework.stereotype.Repository;
@Repository
public interface SubsetRepository extends JpaRepository<Subset, Long>, QueryDslPredicateExecutor<Subset> {
/**
* Gets the subset by uuid.
*
* @param uuid the uuid
* @return the subset by uuid
*/
Subset getByUuid(UUID uuid);
/**
* Gets the subset by uuid.
*
* @param uuid the uuid
* @return the subset by uuid
*/
Subset getByUuid(UUID uuid);
/**
* Gets the record by uuid and version.
*
* @param uuid the uuid
* @param version the version
* @return the by uuid and version
*/
Subset getByUuidAndVersion(UUID uuid, int version);
/**
* Gets the record by uuid and version.
*
* @param uuid the uuid
* @param version the version
* @return the by uuid and version
*/
Subset getByUuidAndVersion(UUID uuid, int version);
}
......@@ -22,6 +22,8 @@ import java.io.OutputStreamWriter;
import java.util.List;
import com.querydsl.core.types.Predicate;
import com.querydsl.jpa.JPQLQuery;
import org.genesys.catalog.model.dataset.Dataset;
import org.genesys.catalog.model.traits.Descriptor;
import org.genesys2.server.model.genesys.Metadata;
......@@ -38,7 +40,9 @@ public interface DownloadService {
void writeXlsxMCPD(AccessionFilter accessionFilter, OutputStream outputStream) throws IOException;
void writeXlsxMCPD(Predicate predicate, OutputStream outputStream) throws IOException;
void writeXlsxMCPD(Predicate predicate, OutputStream outputStream, String filters, String dataSource) throws IOException;
void writeXlsxMCPD(JPQLQuery<Long> queryAccessionId, OutputStream outputStream, String filters, String dataSource) throws IOException;
void writeXlsxPDCI(AccessionFilter accessionFilter, OutputStream outputStream) throws IOException;
......@@ -52,4 +56,6 @@ public interface DownloadService {
void writeXlsxGenesys1Descriptors(Crop crop, OutputStream outputStream) throws IOException;
}
......@@ -33,7 +33,14 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.jpa.JPQLQuery;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.POIXMLProperties;
import org.apache.poi.common.usermodel.HyperlinkType;
......@@ -69,6 +76,7 @@ import org.genesys2.server.model.genesys.ExperimentAccessionTrait;
import org.genesys2.server.model.genesys.Metadata;
import org.genesys2.server.model.genesys.PDCI;
import org.genesys2.server.model.genesys.Parameter;
import org.genesys2.server.model.genesys.QAccession;
import org.genesys2.server.model.genesys.Taxonomy2;
import org.genesys2.server.model.genesys.TraitCode;
import org.genesys2.server.model.impl.Country;
......@@ -90,6 +98,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
......@@ -167,6 +176,9 @@ public class DownloadServiceImpl implements DownloadService {
private String genesysCatalogUrl;
private HashMap<String, Method> pdciMethods;
@PersistenceContext
private EntityManager entityManager;
private static class WorkbookStyles {
CellStyle dateStyle;
......@@ -189,11 +201,23 @@ public class DownloadServiceImpl implements DownloadService {
}
@Override
public void writeXlsxMCPD(Predicate predicate, OutputStream outputStream) throws IOException {
writeXlsxMCPD(predicate, outputStream, "", "");
public void writeXlsxMCPD(Predicate predicate, OutputStream outputStream, String filters, String dataSource) throws IOException {
PathBuilder<Accession> builder = new PathBuilderFactory().create(Accession.class);
Querydsl querydsl = new Querydsl(entityManager, builder);
JPQLQuery<Long> query = querydsl.createQuery(QAccession.accession)
// select id only
.select(QAccession.accession.id)
// order by id
.orderBy(QAccession.accession.id.asc());
// apply filter
query.where(predicate);
writeXlsxMCPD(query, outputStream, filters, dataSource);
}
private void writeXlsxMCPD(Predicate predicate, OutputStream outputStream, String filters, String dataSource) throws IOException {
@Override
public void writeXlsxMCPD(JPQLQuery<Long> queryAccessionId, OutputStream outputStream, String filters, String dataSource) throws IOException {
XSSFWorkbook template = new XSSFWorkbook(getClass().getResourceAsStream("/template/download/MCPD.xlsx"));
POIXMLProperties props = template.getProperties();
......@@ -239,12 +263,12 @@ public class DownloadServiceImpl implements DownloadService {
// Write accession information
final Writer writer = new Writer();
try {
accessionProcessor.process(predicate, (accessions) -> {
accessionProcessor.process(queryAccessionId, (accessions) -> {
for (Accession accession: accessions) {
writer.processMCPD(sheet, wbStyles, accession);
}
return accessions;
});
}, null);
LOG.warn("Done streaming MCPD rows");
} catch (Exception e) {
LOG.warn("Error generating: {}", e.getMessage());
......
......@@ -74,27 +74,35 @@ public class AccessionProcessor {
process(filter.buildPredicate(), action, maxSize);
}
@Transactional(readOnly = true, propagation = Propagation.REQUIRES_NEW)
public void process(Predicate predicate, IAccessionBatchAction action) throws Exception {
process(predicate, action, null);
}
@Transactional(readOnly = true, propagation = Propagation.REQUIRES_NEW)
public void process(Predicate predicate, IAccessionBatchAction action, Long maxSize) throws Exception {
long count = accessionRepository.count(predicate);
PathBuilder<Accession> builder = new PathBuilderFactory().create(Accession.class);
Querydsl querydsl = new Querydsl(em, builder);
JPQLQuery<Long> query = querydsl.createQuery(QAccession.accession)
// select id only
.select(QAccession.accession.id)
// order by id
.orderBy(QAccession.accession.id.asc());
// select id only
.select(QAccession.accession.id)
// order by id
.orderBy(QAccession.accession.id.asc());
// apply filter
query.where(predicate);
process(query, action, maxSize);
}
/**
* Advanced usage.
*
* @param accessionIdQuery query statement that returns ordered accessionIds
* (for pagination)
* @param action
* @param maxSize
* @throws Exception
*/
@Transactional(readOnly = true, propagation = Propagation.REQUIRES_NEW)
public void process(JPQLQuery<Long> accessionIdQuery, IAccessionBatchAction action, Long maxSize) throws Exception {
long count = accessionIdQuery.fetchCount();
int startPosition = 0;
StopWatch stopWatch = new StopWatch();
......@@ -103,19 +111,20 @@ public class AccessionProcessor {
List<Long> results;
do {
query.offset(startPosition);
accessionIdQuery.offset(startPosition);
// Respect maxSize
if (maxSize != null && startPosition + batchSize > maxSize) {
// we would be over allowed number of records
query.limit(maxSize - startPosition);
accessionIdQuery.limit(maxSize - startPosition);
} else {
query.limit(batchSize);
accessionIdQuery.limit(batchSize);
}
stopWatch.split();
LOG.debug("Reading Accessions. Stopwatch={}s {}+{} of {}. Processing at {} accessions/s", stopWatch.getSplitTime() / 1000, startPosition, batchSize, count, (double)(startPosition+batchSize)/(stopWatch.getSplitTime() / 1000));
results = query.fetch();
LOG.debug("Reading Accessions. Stopwatch={}s {}+{} of {}. Processing at {} accessions/s", stopWatch.getSplitTime() / 1000, startPosition, batchSize, count,
(double) (startPosition + batchSize) / (stopWatch.getSplitTime() / 1000));
results = accessionIdQuery.fetch();
loadAndProcess(results, action);
// Start position is updated above.
......@@ -129,10 +138,12 @@ public class AccessionProcessor {
LOG.info("Processing Accessions took {}ms", stopWatch.getTime());
}
private void loadAndProcess(List<Long> accessionIds, IAccessionBatchAction action) throws Exception {
List<Accession> accessions = accessionRepository.findAll(accessionIds);
action.apply(accessions);
}
/**
* Apply action on accessions matching the provided filter.
*
......@@ -144,7 +155,7 @@ public class AccessionProcessor {
apply(filter.buildPredicate(), action);
}
/**
* Apply action on accessions matching the provided filter.
*
......@@ -179,13 +190,14 @@ public class AccessionProcessor {
do {
stopWatch.split();
LOG.debug("Reading Accessions. Stopwatch={}s {}+{} of {}. Processing at {} accessions/s", stopWatch.getSplitTime() / 1000, startPosition, batchSize, count, (double)(startPosition+batchSize)/(stopWatch.getSplitTime() / 1000));
LOG.debug("Reading Accessions. Stopwatch={}s {}+{} of {}. Processing at {} accessions/s", stopWatch.getSplitTime() / 1000, startPosition, batchSize, count,
(double) (startPosition + batchSize) / (stopWatch.getSplitTime() / 1000));
results = query.fetch();
asyncUpdate(results, action);
// Next page
query.offset(startPosition += results.size());
// Clear anything cached in the entity manager
em.clear();
} while (results.size() > 0);
......
/*
* Copyright 2019 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.service.worker;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionRef;
import org.genesys2.server.persistence.AccessionRefRepository;
import org.genesys2.server.persistence.AccessionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.google.common.collect.Lists;
/**
* Handles matching AccessionRefs to Accessions.
*
* Extracted here so that batch actions can be clearly transactional.
* No authentication checks in place.
*
* @author Matija Obreza
*/
@Component
public class AccessionRefMatcher {
private static final Logger LOG = LoggerFactory.getLogger(AccessionRefMatcher.class);
@Autowired
private AccessionRepository accessionRepository;
/**
* Re-match accession refs to accessions in Genesys.
*
* @param <T> the type of AccessonRef
* @param accessionRefs the list of accession references
* @param accessionRefRepository the repository
* @return the list
*/
@Transactional(readOnly = false)
public <T extends AccessionRef> List<T> rematchAccessionRefs(List<T> accessionRefs, AccessionRefRepository<T> accessionRefRepository) {
LOG.debug("Linking {} accession refs with accessions", accessionRefs.size());
final AtomicInteger batchCounter = new AtomicInteger(0);
Lists.partition(accessionRefs, 10000).stream().forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
lookupMatchingAccessions(batch, accessionRefRepository);
// Save in this transaction
accessionRefRepository.save(batch);
});
LOG.info("Done relinking {} accession refs.", accessionRefs.size());
return accessionRefs;
}
/**
* Looking for matching Accession and then sets that to AccessionRef
*
* @param accessionRefs the accessionRefs
*
* @return accessionRefs with matching accessions from Genesys
*/
private <T extends AccessionRef> List<T> lookupMatchingAccessions(List<T> accessionRefs, AccessionRefRepository<T> accessionRefRepository) {
StopWatch stopWatch = StopWatch.createStarted();
accessionRefs.forEach(ref -> ref.trimStringsToNull());
final AtomicInteger batchCounter = new AtomicInteger(0);
Lists.partition(accessionRefs, 100).parallelStream().forEach(batch -> {
LOG.info("Batch {} with size {}", batchCounter.incrementAndGet(), batch.size());
List<Accession> foundAccessions = accessionRepository.find(true, batch);
LOG.info("Found {} matches for {} identifiers after {}ms", foundAccessions.size(), batch.size(), stopWatch.getTime());
// instCode / genus / acceNumb map
Map<String, Map<String, Map<String, Accession>>> instMap = new HashMap<>();
Map<String, Accession> doiAccessions = new HashMap<>();
foundAccessions.forEach(found -> {
if (found.getDoi() != null) {
doiAccessions.put(found.getDoi(), found);
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(found.getInstCode());
if (instGenusMap == null) {
instMap.put(found.getInstCode(), instGenusMap = new HashMap<>());
}
Map<String, Accession> genusMap = instGenusMap.get(found.getGenus());
if (genusMap == null) {
instGenusMap.put(found.getGenus(), genusMap = new HashMap<>());
}
genusMap.put(found.getAccessionNumber(), found);
});
batch.forEach(ref -> {
ref.setAccession(null);
if (StringUtils.isNotBlank(ref.getDoi())) {
Accession doiAcce = doiAccessions.get(ref.getDoi());
if (doiAcce != null) {
ref.setAccession(doiAcce);
return;
}
}
Map<String, Map<String, Accession>> instGenusMap = instMap.get(ref.getInstCode());
if (instGenusMap != null) {
Map<String, Accession> genusMap = instGenusMap.get(ref.getGenus());
if (genusMap != null) {
ref.setAccession(genusMap.get(ref.getAcceNumb()));
}
}
if (ref.getAccession() == null) {
LOG.debug("No match for {}", ref);
}
});
});
LOG.info("Matched {} accession refs after {}ms", accessionRefs.size(), stopWatch.getTime());
return accessionRefs;
}
}
......@@ -24,11 +24,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.validation.ConstraintViolationException;
import org.genesys.blocks.model.filters.StringFilter;
import org.genesys.catalog.model.dataset.Dataset;
import org.genesys.catalog.model.dataset.DatasetCreator;
......@@ -61,8 +62,6 @@ import org.springframework.security.test.context.support.WithMockUser;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import javax.validation.ConstraintViolationException;
@WithMockUser(username = "admin", password = "admin", roles = "ADMINISTRATOR")
public class DatasetServiceTest extends AbstractDatasetServiceTest {
......@@ -817,7 +816,7 @@ public class DatasetServiceTest extends AbstractDatasetServiceTest {
}
@Test
public void testLateAccessionRegistration() {
public void testLateAccessionRegistration() throws InterruptedException {
final Set<AccessionRef> ids1 = new HashSet<>();
ids1.add(makeAccessionIdentifier(TEST_INSTCODE, "A1", "Musa", null));
ids1.add(makeAccessionIdentifier(TEST_INSTCODE, "A2", "Musa", null));
......@@ -849,6 +848,8 @@ public class DatasetServiceTest extends AbstractDatasetServiceTest {
final Accession accession2 = upsertAccession(TEST_INSTCODE, "A8", "Musa");
datasetService.rematchDatasetAccessions(dataset1);
Thread.sleep(5000);
datasets1 = datasetService.listByAccession(accession2);
assertThat(datasets1.size(), is(1));
assertThat(datasets1.get(0).getUuid(), is(dataset1.getUuid()));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment