Commit 1bec9544 authored by Matija Obreza

SGSV runner
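
Adds a runner that imports Svalbard Global Seed Vault (SGSV) deposit data:
it downloads the tab-separated template dump from nordgen.org (or reads a
local copy), maps the columns to known descriptors and stores the rows as
a sparse DataSet, writing in parallel batches of 1000.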

parent 940a67aa
package org.crophub.rest.runner;

import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.web.WebAppConfiguration;
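
/**
 * Base class for runner-style integration tests: brings up the full Spring
 * context (common, cache, database, security) under the "test" profile.
 */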
@WebAppConfiguration
@ContextConfiguration(locations = { "classpath:spring/spring-common.xml",
"classpath:spring/spring-cache.xml",
"classpath:spring/spring-db.xml",
"classpath:spring/spring-security-acl.xml",
"classpath:spring/spring-security.xml" })
@ActiveProfiles("test")
public abstract class RunnerTest {
}
/**
* Copyright 2013 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.crophub.rest.runner;

import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.crophub.rest.common.model.impl.ColumnGroups;
import org.crophub.rest.common.model.impl.DataMapping;
import org.crophub.rest.common.model.impl.DataSet;
import org.crophub.rest.common.model.impl.Descriptor;
import org.crophub.rest.common.persistence.domain.DescriptorRepository;
import org.crophub.rest.common.service.DataService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import au.com.bytecode.opencsv.CSVReader;
import com.fasterxml.jackson.databind.ObjectMapper;
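
/**
 * Imports Svalbard Global Seed Vault (SGSV) deposit data: reads the
 * tab-separated template dump (downloaded or local), maps its columns to
 * known descriptors and stores the rows as a sparse DataSet in parallel
 * batches of 1000.
 */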
@RunWith(SpringJUnit4ClassRunner.class)
public class SGSVUpdate extends RunnerTest {
private static final String SGSV_DOWNLOAD_URL = "http://www.nordgen.org/sgsv/download.php?file=/scope/sgsv/files/sgsv_templates.tab";
public static final Log LOG = LogFactory.getLog(SGSVUpdate.class);
private static final ObjectMapper mapper = new ObjectMapper();
private static final int nThreads = Runtime.getRuntime().availableProcessors();
private final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
@Value("${download.files.dir}")
String filesPath;
@Autowired
DescriptorRepository descriptorRepository;
@Autowired
private DataService dataService;
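
/** Downloads the current SGSV template dump from nordgen.org and imports it. */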
@Test
public void updateSGSV() {
final HttpClient httpclient = new DefaultHttpClient();
final HttpGet httpget = new HttpGet(SGSV_DOWNLOAD_URL);
HttpResponse response = null;
try {
response = httpclient.execute(httpget);
} catch (final ClientProtocolException e) {
LOG.error(e.getMessage(), e);
return;
} catch (final IOException e) {
LOG.error(e.getMessage(), e);
return;
}
LOG.debug(response.getStatusLine());
// Get hold of the response entity
final HttpEntity entity = response.getEntity();
for (final Header header : response.getAllHeaders()) {
LOG.debug(header);
}
LOG.debug(entity.getContentType() + " " + entity.getContentLength());
try {
importSGSVStream(entity.getContent(), SGSV_DOWNLOAD_URL);
} catch (final Throwable e) {
LOG.error(e.getMessage(), e);
httpget.abort();
}
}
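
/** Imports a previously downloaded SGSV dump from the local files directory. */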
@Test
public void updateSGSVFile() {
final File file = new File(filesPath, "sgsv_templates_20130502.tab");
FileInputStream fis = null;
try {
fis = new FileInputStream(file);
importSGSVStream(fis, file.getAbsolutePath());
} catch (final FileNotFoundException e) {
LOG.error(e.getMessage(), e);
} catch (final IOException e) {
LOG.error(e.getMessage(), e);
} finally {
IOUtils.closeQuietly(fis);
}
}
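
/**
 * Parses a tab-separated SGSV stream and stores it as a sparse DataSet,
 * handing full batches of 1000 rows to the thread pool for writing.
 */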
private void importSGSVStream(final InputStream str, final String source) throws IOException {
int counter = 0;
CSVReader reader = null;
try {
reader = new CSVReader(new BufferedReader(new InputStreamReader(str)), '\t', '"', false);
final String[] headers = reader.readNext();
final Descriptor[] descriptors = new Descriptor[headers.length];
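// Map tab columns to descriptors by position; columns left null here
// have no matching descriptor and are skipped during import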
descriptors[0] = descriptorRepository.findByCode("SGSV_ID");
descriptors[1] = descriptorRepository.findByCode("INSTCODE");
descriptors[2] = descriptorRepository.findByCode("SGSV_BOXNO");
descriptors[3] = descriptorRepository.findByCode("COLLNAME");
descriptors[4] = descriptorRepository.findByCode("ACCENUMB");
descriptors[5] = descriptorRepository.findByCode("FULL_SCINAME");
descriptors[7] = descriptorRepository.findByCode("QTY");
descriptors[8] = descriptorRepository.findByCode("regeneration_month_and_year");
descriptors[9] = descriptorRepository.findByCode("OTHERNUMB");
descriptors[10] = descriptorRepository.findByCode("provider_institute_code");
descriptors[12] = descriptorRepository.findByCode("ORIGCTY");
descriptors[16] = descriptorRepository.findByCode("GENUS");
descriptors[17] = descriptorRepository.findByCode("species_epithet");
// NOTE: the original assigned index 17 twice; SPECIES presumably belongs in column 18
descriptors[18] = descriptorRepository.findByCode("SPECIES");
descriptors[19] = descriptorRepository.findByCode("TAXON_NAME");
descriptors[20] = descriptorRepository.findByCode("SGSV_DEPOSIT_DATE");
descriptors[23] = descriptorRepository.findByCode("SGSV_BOXID");
descriptors[24] = descriptorRepository.findByCode("SGSV_TAXONID");
descriptors[25] = descriptorRepository.findByCode("taxon_authority");
descriptors[26] = descriptorRepository.findByCode("infraspesific_epithet");
descriptors[27] = descriptorRepository.findByCode("VERNACULAR_NAME");
descriptors[29] = descriptorRepository.findByCode("SGSV_GENUSID");
LOG.debug("Headers: " + headers.length);
assertTrue(headers.length == 30);
final ColumnGroups grouping = new ColumnGroups();
final Map<Integer, Long> traitMap = new HashMap<Integer, Long>();
for (int i = 0; i < headers.length; i++) {
LOG.debug(i + ": " + headers[i]);
if (descriptors[i] == null) {
LOG.warn("No trait for " + headers[i]);
} else {
LOG.debug("Using trait " + descriptors[i]);
grouping.addColumn(i);
traitMap.put(i, descriptors[i].getId());
}
}
final DataSet dataset = new DataSet();
dataset.setName("SGSV update");
dataset.setHeaders(mapper.writeValueAsString(headers));
final DataMapping dataMapping = new DataMapping();
dataMapping.setGrouping(grouping);
dataMapping.setDescriptorMap(traitMap);
dataset.setMapping(mapper.writeValueAsString(dataMapping));
dataset.setSource(source);
dataset.setUploadDate(new Date());
dataService.save(dataset);
final List<String[]> bulk = new ArrayList<String[]>(1000);
String[] line = null;
while ((line = reader.readNext()) != null) {
if (counter % 1000 == 0) {
LOG.debug(counter + ": " + ArrayUtils.toString(line));
}
// Clean up: treat "null", "<null>" and blank values as missing
for (int i = 0; i < line.length; i++) {
line[i] = line[i].trim();
if (line[i].equals("null") || line[i].equals("<null>") || StringUtils.isBlank(line[i])) {
line[i] = null;
}
}
bulk.add(line);
counter++;
if (counter % 1000 == 0) {
workIt(dataset, descriptors, bulk);
bulk.clear();
}
}
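// Flush the remaining rows (fewer than 1000) synchronously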
dataService.writeEntries(dataset, descriptors, bulk);
bulk.clear();
} catch (final Throwable e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
} finally {
IOUtils.closeQuietly(reader);
}
// Wait for pool
shutdownPool();
LOG.info("Done importing SGSV data. Imported: " + counter);
}
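
/**
 * Queues one batch for background writing, with simple back-pressure:
 * wait while the pool queue is longer than the number of workers.
 */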
private void workIt(final DataSet dataset, final Descriptor[] descriptors, final List<String[]> bulk) {
while (threadPool.getQueue().size() > nThreads) {
LOG.trace("Queue is too large, waiting...");
try {
Thread.sleep(100);
} catch (final InterruptedException e) {
LOG.warn(e.getMessage());
}
}
threadPool.execute(new SparseDataUpdater(dataService, dataset, descriptors, new ArrayList<String[]>(bulk)));
}
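
/** Shuts the pool down and waits for all pending batches to be written. */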
private void shutdownPool() {
threadPool.shutdown();
LOG.info("Waiting for all threads to terminate");
try {
while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
// awaitTermination already blocks for the timeout; just keep polling
}
LOG.info("All workers terminated.");
} catch (final InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
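
/** Worker that writes one batch of rows through the DataService. */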
private static class SparseDataUpdater implements Runnable {
private final DataSet dataset;
private final Descriptor[] descriptors;
private final ArrayList<String[]> bulk;
private final DataService dataService;
public SparseDataUpdater(final DataService dataService, final DataSet dataset, final Descriptor[] descriptors, final ArrayList<String[]> bulk) {
this.dataService = dataService;
this.dataset = dataset;
this.descriptors = descriptors;
this.bulk = bulk;
}
@Override
public void run() {
dataService.writeEntries(dataset, descriptors, bulk);
}
}
}