Commit 427d6f17 authored by Matija Obreza's avatar Matija Obreza

Bulk exec an action on selected accessions

- Increased threadpool queue to 30 items
parent 8e3ee85b
This diff is collapsed.
......@@ -75,7 +75,7 @@ public class ElasticJPAListener {
*/
@AfterReturning(value = "execution(* org.springframework.data.jpa.repository.JpaRepository.save(..))", returning = "result")
public void afterPersist(final JoinPoint joinPoint, final Object result) {
LOG.debug("JPA afterPersist {} {}", joinPoint.toLongString(), joinPoint.getTarget());
LOG.trace("JPA afterPersist {} {}", joinPoint.toLongString(), joinPoint.getTarget());
try {
scheduleReindexing(result);
......
/*
* Copyright 2018 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys2.server.mvc.admin;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManager;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.QAccession;
import org.genesys2.server.persistence.AccessionRepository;
import org.genesys2.server.service.AccessionService;
import org.genesys2.server.service.AccessionService.IAccessionAction;
import org.genesys2.server.service.filter.AccessionFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.support.Querydsl;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.querydsl.core.types.Predicate;
import com.querydsl.core.types.dsl.PathBuilder;
import com.querydsl.core.types.dsl.PathBuilderFactory;
import com.querydsl.jpa.JPQLQuery;
/**
* Executes actions on filtered accessions.
*/
@Component
public class AccessionProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AccessionProcessor.class);
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private AccessionService accessionService;
@Autowired
private AccessionRepository accessionRepository;
@Autowired
private EntityManager em;
/// Size of database batch scan for IDs
private int batchSize = 1000;
/**
* Apply action on accessions matching the provided filter.
*
* @param filter the filter
* @param action the action
*/
@Transactional(readOnly = true, propagation = Propagation.REQUIRES_NEW)
public void apply(AccessionFilter filter, IAccessionAction action) {
Predicate predicate = filter.buildQuery();
long count = accessionRepository.count(predicate);
PathBuilder<Accession> builder = new PathBuilderFactory().create(Accession.class);
Querydsl querydsl = new Querydsl(em, builder);
JPQLQuery<Long> query = querydsl.createQuery(QAccession.accession)
// select id only
.select(QAccession.accession.id)
// order by id
.orderBy(QAccession.accession.id.asc());
// apply filter
query.where(predicate);
int startPosition = 0;
query.offset(startPosition);
query.limit(batchSize);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
List<Long> results;
do {
stopWatch.split();
LOG.debug("Reading Accessions. Stopwatch={}s {}+{} of {}. Processing at {} accessions/s", stopWatch.getSplitTime() / 1000, startPosition, batchSize, count, (double)(startPosition+batchSize)/(stopWatch.getSplitTime() / 1000));
results = query.fetch();
asyncUpdate(results, action);
// Next page
query.offset(startPosition += results.size());
// Clear anything cached in the entity manager
em.clear();
} while (results.size() > 0);
stopWatch.stop();
LOG.info("Processing Accessions for filter {} took {}ms", filter, stopWatch.getTime());
}
private void asyncUpdate(List<Long> accessionIds, IAccessionAction action) {
if (accessionIds.size() == 0) {
return;
}
final ArrayList<Long> copy = new ArrayList<>(accessionIds);
threadPoolTaskExecutor.execute(() -> {
try {
LOG.trace("Executing action on {} Accessions.", copy.size());
accessionService.processAccessions(copy, action);
} catch (Throwable e) {
LOG.warn("Error executing action on batch: {}", e.getMessage(), e);
} finally {
LOG.trace("Done executing action on {} accessions.", copy.size());
}
});
}
}
......@@ -29,8 +29,11 @@ import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.lang.time.StopWatch;
import org.genesys.catalog.service.DatasetService;
import org.genesys2.server.model.genesys.AccessionId;
import org.genesys2.server.model.genesys.PDCI;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.persistence.AccessionRepository;
import org.genesys2.server.persistence.PDCIRepository;
import org.genesys2.server.service.ContentService;
import org.genesys2.server.service.CountryNamesUpdater;
import org.genesys2.server.service.CropService;
......@@ -39,9 +42,11 @@ import org.genesys2.server.service.GeoRegionService;
import org.genesys2.server.service.GeoService;
import org.genesys2.server.service.InstituteService;
import org.genesys2.server.service.TaxonomyService;
import org.genesys2.server.service.filter.AccessionFilter;
import org.genesys2.server.service.worker.ITPGRFAStatusUpdater;
import org.genesys2.server.service.worker.InstituteUpdater;
import org.genesys2.server.service.worker.SGSVUpdate;
import org.genesys2.util.PDCICalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -53,8 +58,11 @@ import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.xml.sax.SAXException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Controller
......@@ -111,6 +119,12 @@ public class AdminController {
@Autowired
private ContentService contentService;
@Autowired
private AccessionProcessor accessionProcessor;
@Autowired
private PDCIRepository pdciRepository;
@RequestMapping("/")
public String root(Model model) {
return "/admin/index";
......@@ -212,7 +226,7 @@ public class AdminController {
return "redirect:/admin/";
}
@RequestMapping(value = "/pdci", method = RequestMethod.POST, params = "updatePdciStats")
@RequestMapping(value = "/pdci", method = RequestMethod.POST, params = "action=institute-pdci")
public String updatePDCI() {
for (FaoInstitute institute: instituteService.listActive(new PageRequest(0, Integer.MAX_VALUE))) {
LOG.info("Updating PDCI for {}", institute.getCode());
......@@ -222,6 +236,37 @@ public class AdminController {
return "redirect:/admin/";
}
@RequestMapping(value = "/pdci", method = RequestMethod.POST, params = "action=filtered-pdci")
public String updateFilteredPDCI(@RequestParam(name = "filter") String filters) throws JsonParseException, JsonMappingException, IOException {
AccessionFilter filter = mapper.readValue(filters, AccessionFilter.class);
LOG.warn("Recalculating PDCI for accessions matching filter: {}", filter);
accessionProcessor.apply(filter, (accession) -> {
// Everything here is executed within a @Transaction(readOnly = false) context
AccessionId accessionId = accession.getAccessionId();
PDCI pdci = accessionId.getPdci();
// create new PDCI if missing
PDCI resultingPdci = PDCICalculator.updatePdci(pdci == null ? new PDCI() : pdci, accession);
// if PDCI was missing, link it with accession
if (pdci == null) {
LOG.trace("Assigning new PDCI for {}", accession);
resultingPdci.setAccession(accessionId);
accessionId.setPdci(resultingPdci);
// also updates accession.accessionId.pdci
return accessionRepository.save(accession);
} else {
LOG.trace("Updating PDCI for {}", accession);
accessionId.setPdci(pdciRepository.save(resultingPdci));
return accession;
}
});
return "redirect:/admin/";
}
@RequestMapping(value = "/admin-action", method = RequestMethod.POST, params = "georegion")
public String updateGeoReg() throws IOException, ParserConfigurationException, SAXException {
geoRegionService.updateGeoRegionData();
......
......@@ -58,4 +58,24 @@ public interface AccessionService {
* @return map with UUIDs and related AccessionIdentifiers
*/
Map<UUID, AccessionIdentifier3> toUUID(List<? extends AccessionIdentifier3> identifiers);
/**
* Loads {@link Accession} by IDs and executes the action on each accession in a
* Spring transaction. Spring security context not supported.
*
* @param accessionIds List of accession IDs
* @param action the action to execute on each accession
* @return the list of processed accessions
*/
List<Accession> processAccessions(List<Long> accessionIds, IAccessionAction action);
public static interface IAccessionAction {
/**
* Run action on Accession
*
* @param a the accession
* @return must return the resulting {@link Accession}
*/
Accession apply(Accession a);
}
}
......@@ -15,11 +15,11 @@
*/
package org.genesys2.server.service.impl;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.genesys.AccessionData;
......@@ -29,12 +29,16 @@ import org.genesys2.server.persistence.AccessionRepository;
import org.genesys2.server.service.AccessionService;
import org.genesys2.server.service.ElasticsearchService;
import org.genesys2.server.service.filter.AccessionFilter;
import org.genesys2.server.service.worker.AccessionCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
......@@ -44,12 +48,18 @@ import org.springframework.transaction.annotation.Transactional;
@Transactional(readOnly = true)
public class AccessionServiceImpl implements AccessionService {
public static final Logger LOG = LoggerFactory.getLogger(AccessionServiceImpl.class);
/** The accession repository. */
@Autowired
private AccessionRepository accessionRepository;
@Autowired
private ElasticsearchService elasticsearchService;
@Autowired
private AccessionCounter accessionCounter;
private <T extends AccessionData> T lazyLoad(T accession) {
if (accession != null) {
......@@ -137,4 +147,14 @@ public class AccessionServiceImpl implements AccessionService {
}
return new PageImpl<>(content, page, total);
}
@Override
@Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
public List<Accession> processAccessions(List<Long> accessionIds, IAccessionAction action) {
List<Accession> accessions = accessionRepository.findAll(accessionIds);
accessions.stream().map(a -> a.getInstitute()).distinct().forEach(institute -> accessionCounter.recountInstitute(institute));
LOG.debug("Processing {} accessions of {} IDs provided", accessions.size(), accessionIds.size());
accessions.forEach(accession -> action.apply(accession));
return accessions;
}
}
......@@ -97,8 +97,7 @@ import com.querydsl.jpa.JPQLQuery;
@Transactional(readOnly = true)
public class ElasticsearchServiceImpl implements ElasticsearchService, InitializingBean {
/** The Constant LOG. */
public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchServiceImpl.class);
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchServiceImpl.class);
private static final String INDEX_READ = "_read";
private static final String INDEX_WRITE = "_write";
......
......@@ -34,7 +34,7 @@ public class SchedulerConfig {
final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(4);
pool.setMaxPoolSize(8);
pool.setQueueCapacity(10);
pool.setQueueCapacity(30);
pool.setThreadNamePrefix("genesys-background-");
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return pool;
......
......@@ -27,7 +27,14 @@ import org.slf4j.LoggerFactory;
*/
public class PDCICalculator {
public static final Logger LOG = LoggerFactory.getLogger(PDCICalculator.class);
/**
* Updates and returns the updated PDCI object for the accession.
*
* @param pdci the PDCI object
* @param accession accession on which PDCI is calculated
* @return the updated PDCI object
*/
public static PDCI updatePdci(PDCI pdci, Accession accession) {
pdci.reset();
......@@ -53,7 +60,7 @@ public class PDCICalculator {
return pdci;
}
private static void independentDescriptors(PDCI pdci, Accession accession) {
AccessionId accessionId = accession.getAccessionId();
......@@ -74,7 +81,7 @@ public class PDCICalculator {
pdci.setSampStat(80);
}
if (StringUtils.isNotBlank(accession.getDonorCode())) {
if (! accession.getDonorCode().equals(accession.getInstituteCode())) {
if (!accession.getDonorCode().equals(accession.getInstituteCode())) {
if (MCPDUtil.isWiewsCode(accession.getDonorCode())) {
pdci.setDonorCode(40);
} else {
......@@ -97,7 +104,7 @@ public class PDCICalculator {
}
if (accessionId.getDuplSite() != null && accessionId.getDuplSite().size() > 0) {
if (accessionId.getDuplSite().stream().filter(code -> ! MCPDUtil.isWiewsCode(code)).count() == 0) {
if (accessionId.getDuplSite().stream().filter(code -> !MCPDUtil.isWiewsCode(code)).count() == 0) {
pdci.setDuplSite(30);
} else {
pdci.setDuplSite(15);
......@@ -277,13 +284,13 @@ public class PDCICalculator {
private static void breedingMaterial(PDCI pdci, Accession accession) {
AccessionId accessionId = accession.getAccessionId();
AccessionCollect coll = accessionId.getColl();
if (accession.getOrigCty() != null) {
pdci.setOrigCty(40);
}
if (accessionId.getBreederCode() != null && accessionId.getBreederCode().size() > 0) {
if (accessionId.getBreederCode().stream().filter(code -> ! MCPDUtil.isWiewsCode(code)).count() == 0) {
if (accessionId.getBreederCode().stream().filter(code -> !MCPDUtil.isWiewsCode(code)).count() == 0) {
pdci.setBredCode(110);
} else {
pdci.setBredCode(55);
......@@ -326,7 +333,7 @@ public class PDCICalculator {
}
if (accessionId.getBreederCode() != null && accessionId.getBreederCode().size() > 0) {
if (accessionId.getBreederCode().stream().filter(code -> ! MCPDUtil.isWiewsCode(code)).count() == 0) {
if (accessionId.getBreederCode().stream().filter(code -> !MCPDUtil.isWiewsCode(code)).count() == 0) {
pdci.setBredCode(80);
} else {
pdci.setBredCode(40);
......@@ -384,7 +391,7 @@ public class PDCICalculator {
}
if (accessionId.getBreederCode() != null && accessionId.getBreederCode().size() > 0) {
if (accessionId.getBreederCode().stream().filter(code -> ! MCPDUtil.isWiewsCode(code)).count() == 0) {
if (accessionId.getBreederCode().stream().filter(code -> !MCPDUtil.isWiewsCode(code)).count() == 0) {
pdci.setBredCode(10);
} else {
pdci.setBredCode(5);
......
......@@ -22,7 +22,9 @@
</form>
<form method="post" action="<c:url value="/admin/pdci" />">
<input type="submit" class="btn btn-default" name="updatePdciStats" value="Update all institute PDCI stats" />
<input type="text" name="filter" placeholder="{}" value="{}" />
<button name="action" class="btn btn-default" value="institute-pdci">Update PDCI stats for all institutes</button>
<button name="action" class="btn btn-default" value="filtered-pdci">Calculate PDCI for filter</button>
<%-- <button name="action" value="missing-pdci" class="btn btn-default">Calculate missing PDCI</button>
<button name="action" value="full-recalc" class="btn btn-default">Clean and recalculate</button> --%>
<!-- CSRF protection -->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment