Commit 56f2b4da authored by Matija Obreza's avatar Matija Obreza
Browse files

Use Spring's ThreadPoolTaskExecutor

parent bf389ee6
......@@ -45,11 +45,6 @@ public interface GeoService {
List<Country> listAll(Locale locale);
/**
* Update alternate/translated names of countries
*/
void updateCountryNames() throws IOException;
Country getCountryByRefnameId(long refnameId);
List<Long> listCountryRefnameIds();
......
......@@ -24,16 +24,12 @@ import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.annotation.PreDestroy;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
......@@ -47,6 +43,7 @@ import org.genesys2.server.service.CountryNamesUpdater;
import org.genesys2.server.service.GeoService;
import org.genesys2.server.service.InstituteService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -67,12 +64,11 @@ public class CountryAlternateNamesUpdater implements CountryNamesUpdater {
@Autowired
private GeoService geoService;
private static final int nThreads = Runtime.getRuntime().availableProcessors();
@Autowired
private TaskExecutor taskExecutor;
private static final int BATCH_SIZE = 100;
private final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
/**
* Update local FaoInstitute with data from WIEWS database
*
......@@ -139,7 +135,7 @@ public class CountryAlternateNamesUpdater implements CountryNamesUpdater {
try {
final List<String[]> batch = new ArrayList<String[]>(BATCH_SIZE);
String prevRefnameId = null;
String[] line = null;
......@@ -148,10 +144,14 @@ public class CountryAlternateNamesUpdater implements CountryNamesUpdater {
LOG.info("Got " + countryRefnameIds.size() + " refnameIds");
int counter = 0;
// Timer
StopWatch stopWatch=new StopWatch();
stopWatch.start();
while ((line = reader.readNext()) != null) {
counter++;
if (counter % 10000 == 0) {
LOG.info("Country alternate names @ line " + counter);
if (LOG.isDebugEnabled() && counter % 10000 == 0) {
LOG.debug("Country alternate names @ line " + counter + " in " + stopWatch.getTime() + "ms");
}
for (int i = 0; i < line.length; i++) {
......@@ -188,7 +188,9 @@ public class CountryAlternateNamesUpdater implements CountryNamesUpdater {
LOG.warn("But no prefRefnameId!!");
}
}
LOG.info("Done importing alternate geonames");
stopWatch.stop();
LOG.info("Done importing alternate geonames in " + stopWatch.getTime() + "ms");
} catch (UnsupportedEncodingException e) {
LOG.error(e.getMessage(), e);
} finally {
......@@ -204,16 +206,7 @@ public class CountryAlternateNamesUpdater implements CountryNamesUpdater {
// Need copy!
final List<String[]> batchCopy = new ArrayList<String[]>(batch);
// while (threadPool.getQueue().size() >= nThreads) {
// LOG.warn("Queue is too large, waiting...");
// try {
// Thread.sleep(100);
// } catch (final InterruptedException e) {
// LOG.warn(e.getMessage());
// }
// }
threadPool.execute(new Runnable() {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
// Fetch FaoInstitutes from DB
......@@ -253,23 +246,4 @@ public class CountryAlternateNamesUpdater implements CountryNamesUpdater {
}
});
}
@PreDestroy
public void shutdownPool() {
threadPool.shutdown();
LOG.info("Waiting for all threads to terminate");
try {
while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
try {
Thread.sleep(200);
} catch (final InterruptedException e) {
LOG.warn(e.getMessage());
}
}
LOG.info("All workers terminated.");
} catch (final InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
}
......@@ -49,9 +49,6 @@ public class GeoServiceImpl implements GeoService {
@Autowired
ContentService contentService;
@Autowired
private CountryAlternateNamesUpdater alternateNamesUpdater;
@Override
public List<Country> listAll() {
return countryRepository.findAll(new Sort("name", "current"));
......@@ -93,6 +90,8 @@ public class GeoServiceImpl implements GeoService {
// update from Davros, it has info on inactive country codes
updateDavrosCountries();
LOG.info("Country data up to date");
}
private void updateDavrosCountries() throws IOException {
......@@ -110,7 +109,7 @@ public class GeoServiceImpl implements GeoService {
final Country country = countryRepository.findByCode3(countryInfo.getIso3());
if (country == null) {
LOG.warn("Country " + countryInfo.getIso3() + " is not registered: " + countryInfo);
LOG.info("Country " + countryInfo.getIso3() + " is not registered: " + countryInfo);
if (countryInfo.isActive()) {
LOG.warn("Country is marked as active. Should not be.");
......@@ -172,7 +171,7 @@ public class GeoServiceImpl implements GeoService {
final Country country = countryRepository.findByCode3(countryInfo.getIso3());
if (country == null) {
LOG.warn("Country " + countryInfo + " is not registered");
LOG.info("Country " + countryInfo + " is not registered");
final Country newCountry = new Country();
newCountry.setCode2(countryInfo.getIso());
......@@ -213,11 +212,6 @@ public class GeoServiceImpl implements GeoService {
}
}
@Override
public void updateCountryNames() throws IOException {
alternateNamesUpdater.updateAlternateNames();
}
@Override
@PreAuthorize("hasRole('ADMINISTRATOR')")
@Transactional(readOnly = false)
......
......@@ -25,14 +25,9 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.annotation.PreDestroy;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
......@@ -47,6 +42,7 @@ import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.service.GeoService;
import org.genesys2.server.service.InstituteService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import au.com.bytecode.opencsv.CSVReader;
......@@ -62,13 +58,12 @@ public class InstituteUpdater {
@Autowired
private GeoService geoService;
private static final int nThreads = Runtime.getRuntime().availableProcessors();
@Autowired
private TaskExecutor taskExecutor;
private static final int BATCH_SIZE = 50;
private final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
/**
* Update local FaoInstitute with data from WIEWS database
*
......@@ -153,19 +148,10 @@ public class InstituteUpdater {
private void workIt(final Map<String, String[]> batch) {
// while (threadPool.getQueue().size() > nThreads) {
// LOG.warn("Queue is too large, waiting...");
// try {
// Thread.sleep(100);
// } catch (final InterruptedException e) {
// LOG.warn(e.getMessage());
// }
// }
// Need copy!
final Map<String, String[]> batchCopy = new HashMap<String, String[]>(batch);
threadPool.execute(new Runnable() {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
LOG.info("Processing next batch");
......@@ -251,7 +237,9 @@ public class InstituteUpdater {
Double alt = parseDoubleIgnore0(altitude, 1);
faoInstitute.setAltitude(alt);
if (faoInstitute.getCountry() == null || faoInstitute.getCountry().getCode3().equals(instCode.substring(0, 3))) {
// Update institute country if null or when not matching the code
// FIXME Some countries have "changed" (e.g. YUG)
if (faoInstitute.getCountry() == null || ! faoInstitute.getCountry().getCode3().equals(instCode.substring(0, 3))) {
faoInstitute.setCountry(geoService.getCountry(instCode.substring(0, 3)));
}
......@@ -260,22 +248,4 @@ public class InstituteUpdater {
});
}
@PreDestroy
public void shutdownPool() {
threadPool.shutdown();
LOG.info("Waiting for all threads to terminate");
try {
while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
try {
Thread.sleep(200);
} catch (final InterruptedException e) {
LOG.warn(e.getMessage());
}
}
LOG.info("All workers terminated.");
} catch (final InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
}
......@@ -27,11 +27,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
......@@ -49,10 +44,12 @@ import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.model.impl.AccessionIdentifier3;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.service.GenesysService;
import org.genesys2.server.service.GeoService;
import org.genesys2.server.service.InstituteService;
import org.genesys2.server.service.TaxonomyService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
......@@ -65,10 +62,12 @@ public class SGSVInsertMissing {
public static final Log LOG = LogFactory.getLog(SGSVInsertMissing.class);
private static final int BATCH_SIZE = 50;
private static final int nThreads = Runtime.getRuntime().availableProcessors();
private final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
@Autowired
private GeoService geoService;
@Autowired
private TaskExecutor taskExecutor;
private static final String[] institutes = { "AUS039", "BDI003", "BDI004", "BDI005", "BRA001", "BRA008", "CAN004", "CHL002", "CIV039", "CRI001", "ECU076",
"GBR072", "GEO028", "GRC035", "IDN179", "ISR003", "ITA411", "KEN015", "KEN023", "KEN045", "KEN053", "KEN055", "KOR043", "MLI002", "MLI003",
"MLI219", "MMR003", "MNG030", "NGA010", "PAK001", "PER002", "PHL129", "PRK013", "SDN034", "THA214", "TJK027", "TWN006", "UGA031", "UKR001",
......@@ -232,16 +231,7 @@ public class SGSVInsertMissing {
final ArrayList<String[]> bulkCopy = new ArrayList<String[]>(bulk);
bulk.clear();
while (threadPool.getQueue().size() > nThreads) {
LOG.trace("Queue is too large, waiting...");
try {
Thread.sleep(100);
} catch (final InterruptedException e) {
LOG.warn(e.getMessage());
}
}
threadPool.execute(new Runnable() {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
List<SGSVEntry> accns = new ArrayList<SGSVEntry>(bulkCopy.size());
......@@ -275,6 +265,7 @@ public class SGSVInsertMissing {
accn.setAccessionName(entry.acceNumb);
accn.setOrigin(entry.origCty);
accn.setCountryOfOrigin(geoService.getCountry(entry.origCty));
accn.setGenus(entry.genus);
accn.setTaxonomy(taxonomyService.ensureTaxonomy(entry.genus, entry.species, entry.fullTaxa));
accn.setInstituteCode(entry.instCode);
......@@ -312,23 +303,6 @@ public class SGSVInsertMissing {
return inst;
}
@PreDestroy
private void shutdownPool() {
threadPool.shutdown();
LOG.info("Waiting for all threads to terminate");
try {
while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
try {
Thread.sleep(200);
} catch (final InterruptedException e) {
LOG.warn(e.getMessage());
}
}
LOG.info("All workers terminated.");
} catch (final InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
private class SGSVEntry implements AccessionIdentifier3 {
String instCode;
......
......@@ -25,11 +25,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
......@@ -50,6 +45,7 @@ import org.genesys2.server.service.GenesysService;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
......@@ -62,9 +58,8 @@ public class SGSVUpdate {
public static final Log LOG = LogFactory.getLog(SGSVUpdate.class);
private static final int BATCH_SIZE = 50;
private static final int nThreads = Runtime.getRuntime().availableProcessors();
private final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
@Autowired
private TaskExecutor taskExecutor;
@Value("${download.files.dir}")
String filesPath;
......@@ -216,16 +211,7 @@ public class SGSVUpdate {
final ArrayList<String[]> bulkCopy = new ArrayList<String[]>(bulk);
bulk.clear();
while (threadPool.getQueue().size() > nThreads) {
LOG.trace("Queue is too large, waiting...");
try {
Thread.sleep(100);
} catch (final InterruptedException e) {
LOG.warn(e.getMessage());
}
}
threadPool.execute(new Runnable() {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
List<SGSVEntry> accns = new ArrayList<SGSVEntry>(bulkCopy.size());
......@@ -303,24 +289,6 @@ public class SGSVUpdate {
});
}
@PreDestroy
private void shutdownPool() {
threadPool.shutdown();
LOG.info("Waiting for all threads to terminate");
try {
while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
try {
Thread.sleep(200);
} catch (final InterruptedException e) {
LOG.warn(e.getMessage());
}
}
LOG.info("All workers terminated.");
} catch (final InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
private class SGSVEntry implements AccessionIdentifier3 {
String instCode;
String acceNumb;
......
......@@ -92,16 +92,6 @@ public class AdminController {
return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/refreshCountryNames")
public String refreshCountryNames() {
try {
geoService.updateCountryNames();
} catch (final IOException e) {
LOG.error(e.getMessage(), e);
}
return "redirect:/admin/";
}
@RequestMapping(method = RequestMethod.POST, value = "/reindexEverything")
public String reindexEverything() {
luceneIndexer.reindexEverything();
......
......@@ -53,4 +53,9 @@
</property>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="16" />
</bean>
</beans>
......@@ -11,39 +11,47 @@
<h1>
<spring:message code="admin.page.title" />
</h1>
<h3>Country data</h3>
<form method="post" action="<c:url value="/admin/refreshCountries" />">
<input type="submit" class="btn btn-default" value="Refresh country data" />
</form>
<form method="post" action="<c:url value="/admin/updateAlternateNames" />">
<input type="submit" class="btn btn-default" value="Update alternate GEO names" />
</form>
<h3>WIEWS</h3>
<form method="post" action="<c:url value="/admin/refreshWiews" />">
<input type="submit" class="btn btn-default" value="Refresh WIEWS data" />
</form>
<h3>Svalbard Global Seed Vault</h3>
<form method="post" action="<c:url value="/admin/updateSGSV" />">
<input type="submit" class="btn btn-default" value="Update SGSV" />
</form>
<form method="post" action="<c:url value="/admin/importSGSV" />">
<input type="submit" class="btn btn-default" value="Import SGSV" />
</form>
<form method="post" action="<c:url value="/admin/refreshCountries" />">
<input type="submit" class="btn btn-default" value="Refresh country data" />
</form>
<form method="post" action="<c:url value="/admin/reindexEverything" />">
<input type="submit" class="btn btn-default" class="btn btn-default" value="Reindex search indexes" />
</form>
<h3>Accession</h3>
<form method="post" action="<c:url value="/admin/updateAccessionCountryRefs" />">
<input type="submit" class="btn btn-default" class="btn btn-default" value="Update accession country info" />
</form>
<form method="post" action="<c:url value="/admin/updateInstituteCountryRefs" />">
<input type="submit" class="btn btn-default" class="btn btn-default" value="Update WIEWS country info" />
</form>
<form method="post" action="<c:url value="/admin/updateAccessionInstituteRefs" />">
<input type="submit" class="btn btn-default" value="Update accession institute info" />
</form>
<h3>Content</h3>
<form method="post" action="<c:url value="/admin/sanitize" />">
<input type="submit" class="btn btn-default" value="Sanitize HTML content" />
</form>
<form method="post" action="<c:url value="/admin/updateAlternateNames" />">
<input type="submit" class="btn btn-default" value="Update alternate GEO names" />
<h3>Full-text Search</h3>
<form method="post" action="<c:url value="/admin/reindexEverything" />">
<input type="submit" class="btn btn-default" class="btn btn-default" value="Reindex search indexes" />
</form>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment