Commit 38eabe65 authored by Matija Obreza
Browse files

Milko build

parent 64076cb4
......@@ -19,14 +19,17 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.time.StopWatch;
......@@ -55,6 +58,7 @@ import org.genesys2.anno.converter.RowConverter;
import org.genesys2.anno.model.OAuthSettings;
import org.genesys2.anno.model.Settings;
import org.genesys2.anno.parser.RowReader;
import org.genesys2.anno.predefined.RdfMCPD;
import org.genesys2.client.oauth.GenesysApiException;
import org.genesys2.client.oauth.GenesysClient;
import org.genesys2.client.oauth.OAuthAuthenticationException;
......@@ -71,7 +75,8 @@ import swing2swt.layout.BorderLayout;
public class PushDialog extends Dialog {
private static final Logger _log = Logger.getLogger(PushDialog.class);
// Number of accession JSON objects accumulated for one INSTCODE before the
// batch is handed to doAsyncPush (compared against instCodeBatch.size()).
// NOTE(review): BATCH_SIZE is declared twice (100, then 50). This looks like
// the removed and added sides of a diff hunk whose +/- markers were lost;
// confirm which value is current before relying on it.
protected static final int BATCH_SIZE = 100;
protected static final int BATCH_SIZE = 50;
// Sizing knob for the upload executor: the work queue capacity is
// MAX_UPLOAD_THREADS * 3 and the maximum pool size is MAX_UPLOAD_THREADS + 1.
protected static final int MAX_UPLOAD_THREADS = 2;
public static enum GenesysOp {
UPSERT, DELETE
......@@ -89,8 +94,8 @@ public class PushDialog extends Dialog {
@Autowired
protected DataSourceLoader dataSourceLoader;
// Bounded work queue and thread pool used for the reader and upload jobs.
// The ThreadPoolExecutor arguments are (corePoolSize, maximumPoolSize,
// keepAliveTime, unit, workQueue, rejection handler). CallerRunsPolicy makes
// the submitting thread run a task itself when the pool and queue are full.
// NOTE(review): both fields are declared twice. The first pair (capacity 36,
// pool 4..8) and the second pair (sized from MAX_UPLOAD_THREADS) look like the
// before/after sides of a diff whose +/- markers were lost; confirm.
BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(36);
private ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(MAX_UPLOAD_THREADS * 3);
private ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, MAX_UPLOAD_THREADS + 1, 30, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
protected Object result;
protected Shell shell;
......@@ -339,135 +344,179 @@ public class PushDialog extends Dialog {
OAuthSettings oauthSettings = settings.getOauthSettings();
_log.info("Starting push to " + oauthSettings.getServerUrl());
final GenesysClient genesysClient = SpringConfig.createGenesysClient(oauthSettings);
try {
// Ping the server
genesysClient.me();
final RowReader rowReader = dataSourceLoader.createRowReader(dataSourceSheet, dataSource);
rowReader.setSkipRows(dataSourceSheet.getHeaderRowIndex() + 1);
Future<?> future = executorService.submit(() -> {
try {
final RowReader rowReader1 = dataSourceLoader.createRowReader(dataSourceSheet, dataSource);
rowReader1.setSkipRows(dataSourceSheet.getHeaderRowIndex() + 1);
Future<?> future = executorService.submit(new Runnable() {
final Map<String, List<ObjectNode>> instCodeMap = new HashMap<String, List<ObjectNode>>();
List<String> instCodesOrder = new ArrayList<>();
@Override
public void run() {
{
Set<String> instCodes = new HashSet<>();
// Read instCodes
List<Object[]> rows = null;
int count = 0;
try {
do {
rows = rowReader1.readRows(1000);
for (Object[] row : rows) {
count ++;
Map<String, ?> accnMap = RowConverter.toMap(dataSourceSheet, row, columnDefs);
final String accessionInstCode = (String) accnMap.get(RdfMCPD.INSTCODE);
if (instCodes.add(accessionInstCode)) {
_log.info("Detected INSTCODE: " + accessionInstCode + " in line " + count);
instCodesOrder.add(accessionInstCode);
}
}
} while (rows != null && rows.size() > 0);
}
_log.warn("Have instCodes: " + instCodesOrder.stream().sorted().collect(Collectors.toList()));
do {
List<Object[]> rows = rowReader.readRows(100);
if (rows.size() == 0) {
_log.info("Exhausted data source.");
break;
// Ping the server
genesysClient.me();
for (String instCode : instCodesOrder) {
Future<?> instFuture = executorService.submit(new Runnable() {
final Map<String, List<ObjectNode>> instCodeMap = new HashMap<String, List<ObjectNode>>();
@Override
public void run() {
try (RowReader rowReader = dataSourceLoader.createRowReader(dataSourceSheet, dataSource)) {
rowReader.setSkipRows(dataSourceSheet.getHeaderRowIndex() + 1);
uploadInsitute(instCode, rowReader);
} catch (IOException | UnsupportedDataFormatException e) {
_log.error(e.getMessage(), e);
}
}
_log.info("Reading source data starting at row=" + count);
for (Object[] row : rows) {
count++;
_log.debug(count + ": " + ArrayUtils.toString(row));
Map<String, ?> accnMap = RowConverter.toMap(dataSourceSheet, row, columnDefs);
try {
ObjectNode accnJson = RowConverter.toJson(accnMap);
final String instCode = accnJson.get(Api1Constants.Accession.INSTCODE).textValue();
private void uploadInsitute(String instCode, RowReader rowReader) {
int rowCount = 0;
int accessionCount = 0;
List<ObjectNode> instCodeBatch = new ArrayList<>(BATCH_SIZE);
_log.warn("Uploading data for " + instCode);
List<ObjectNode> instCodeBatch = null;
synchronized (instCodeMap) {
instCodeBatch = instCodeMap.get(instCode);
if (instCodeBatch == null) {
instCodeMap.put(instCode, instCodeBatch = new ArrayList<ObjectNode>());
}
try {
do {
List<Object[]> rows = rowReader.readRows(100);
if (rows.size() == 0) {
_log.info("Exhausted data source.");
break;
}
synchronized (instCodeBatch) {
instCodeBatch.add(accnJson);
if (instCodeBatch.size() >= BATCH_SIZE) {
doAsyncPush(operation, genesysClient, instCode, instCodeBatch);
// _log.info("Reading source data for " + instCode + " starting at row=" +
// count);
for (Object[] row : rows) {
rowCount++;
_log.trace(rowCount + ": " + ArrayUtils.toString(row));
Map<String, ?> accnMap = RowConverter.toMap(dataSourceSheet, row, columnDefs);
final String accessionInstCode = (String) accnMap.get(RdfMCPD.INSTCODE);
if (!accessionInstCode.equals(instCode)) {
continue;
}
try {
ObjectNode accnJson = RowConverter.toJson(accnMap);
accessionCount++;
instCodeBatch.add(accnJson);
if (instCodeBatch.size() >= BATCH_SIZE) {
doAsyncPush(operation, genesysClient, accessionInstCode, instCodeBatch);
instCodeBatch.clear();
}
} catch (GenesysJSONIncompleteException e) {
_log.info("Ignoring incomplete accession " + ArrayUtils.toString(row));
}
}
} catch (GenesysJSONIncompleteException e) {
_log.info("Ignoring incomplete accession " + ArrayUtils.toString(row));
}
}
Thread.sleep(1);
} while (true);
Thread.sleep(5);
} while (true);
_log.info("Pushing remaining data for " + instCode);
if (instCodeBatch.size() > 0) {
doAsyncPush(operation, genesysClient, instCode, instCodeBatch);
}
_log.info("Pushing queued data");
for (final String instCode : instCodeMap.keySet()) {
List<ObjectNode> accns = instCodeMap.get(instCode);
if (accns.size() > 0) {
doAsyncPush(operation, genesysClient, instCode, accns);
_log.warn("Done processing upload jobs for " + accessionCount + " accessions for " + instCode);
} catch (GenesysJSONException e) {
_log.error("Genesys JSON conversion failed in batch" + rowCount);
_log.error(e.getMessage(), e);
} catch (InterruptedException e) {
_log.info("Execution was interrupted");
} catch (Throwable e) {
_log.error(e.getMessage(), e);
e.printStackTrace();
} finally {
_log.info("Reader finished.");
// try {
// rowReader.close();
// } catch (IOException e) {
// _log.error("Failed to close rowReader: " + e.getMessage(), e);
// }
}
}
});
_log.info("Done queuing upload jobs!");
futures.add(instFuture);
} catch (IOException e) {
_log.error("Error reading row " + rowReader.getRowCount());
_log.error(e.getMessage(), e);
} catch (GenesysJSONException e) {
_log.error("Genesys JSON conversion failed in batch" + count);
_log.error(e.getMessage(), e);
Thread.sleep(200);
}
// Wait for uploads to finish
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
_log.info("Execution was interrupted");
} catch (Throwable e) {
_log.error(e.getMessage(), e);
e.printStackTrace();
} finally {
_log.info("Reader finished.");
try {
rowReader.close();
} catch (IOException e) {
_log.error("Failed to close rowReader: " + e.getMessage(), e);
}
break;
}
// Wait for uploads to finish
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
for (int i = futures.size() - 1; i >= 0; i--) {
Future<?> future = futures.get(i);
if (future.isDone()) {
futures.remove(future);
}
}
if (futures.size() > 1) {
_log.warn("Waiting for uploads still running: " + futures.size());
for (int i = futures.size() - 1; i >= 0; i--) {
Future<?> f = futures.get(i);
if (f.isDone()) {
futures.remove(f);
}
// We're the last job
} while (futures.size() > 1);
_log.warn("PUSH FINISHED.");
_log.warn("Really.");
}
});
}
futures.add(future);
if (futures.size() > 1) {
_log.warn("Waiting for uploads still running: " + futures.size());
}
} catch (OAuthAuthenticationException e) {
_log.error(e.getMessage(), e);
} catch (PleaseRetryException e) {
_log.error(e.getMessage(), e);
} catch (GenesysApiException e) {
_log.error(e.getMessage(), e);
} catch (IOException e) {
_log.error(e.getMessage(), e);
} catch (UnsupportedDataFormatException e) {
_log.error(e.getMessage(), e);
} catch (Throwable e) {
_log.error(e.getMessage(), e);
} finally {
_log.info("Done.");
}
// We're the last job
} while (futures.size() > 1);
_log.warn("PUSH FINISHED.");
_log.warn("Really.");
} catch (OAuthAuthenticationException e) {
_log.error(e.getMessage(), e);
} catch (PleaseRetryException e) {
_log.error(e.getMessage(), e);
} catch (GenesysApiException e) {
_log.error(e.getMessage(), e);
} catch (IOException e) {
_log.error(e.getMessage(), e);
} catch (UnsupportedDataFormatException e) {
_log.error(e.getMessage(), e);
} catch (Throwable e) {
_log.error(e.getMessage(), e);
} finally {
_log.info("Done.");
}
});
futures.add(future);
}
public void setDataSourceSheet(IDataSourceSheet dss) {
......@@ -488,6 +537,9 @@ public class PushDialog extends Dialog {
return;
}
final ArrayList<ObjectNode> accns = new ArrayList<ObjectNode>(instCodeBatch);
instCodeBatch.clear();
try {
Thread.sleep(10);
} catch (InterruptedException e1) {
......@@ -495,7 +547,6 @@ public class PushDialog extends Dialog {
}
Future<?> future = executorService.submit(new Runnable() {
final ArrayList<ObjectNode> accns = new ArrayList<ObjectNode>(instCodeBatch);
final GenesysOp op = operation;
@Override
......@@ -503,7 +554,7 @@ public class PushDialog extends Dialog {
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
_log.debug("Pushing data for instCode=" + instCode + " size=" + accns.size());
if (_log.isTraceEnabled()) {
for (ObjectNode o : accns) {
......@@ -553,9 +604,6 @@ public class PushDialog extends Dialog {
});
futures.add(future);
instCodeBatch.clear();
}
public void setDataSource(IDataSource dataSource) {
......
......@@ -24,8 +24,8 @@ import org.apache.log4j.Logger;
public class GeoUtils {
private static final Logger _log = Logger.getLogger(GeoUtils.class);
// Coordinate patterns: three capture groups for degrees, minutes and seconds,
// followed by a hemisphere letter. Degrees are 2 characters for latitude and
// 3 for longitude; minutes and seconds are 2 characters each. Every position
// may be a digit, '-' or a space.
// NOTE(review): each pattern is declared twice, first case-sensitive and then
// with Pattern.CASE_INSENSITIVE (which also accepts lowercase n/s/e/w). This
// looks like the before/after sides of a diff whose +/- markers were lost;
// confirm which declaration is current.
private static final Pattern LATITUDE = Pattern.compile("([\\d\\- ]{2})([\\d\\- ]{2})([\\d\\- ]{2})(N|S)");
private static final Pattern LONGITUDE = Pattern.compile("([\\d\\- ]{3})([\\d\\- ]{2})([\\d\\- ]{2})(E|W)");
private static final Pattern LATITUDE = Pattern.compile("([\\d\\- ]{2})([\\d\\- ]{2})([\\d\\- ]{2})(N|S)", Pattern.CASE_INSENSITIVE);
private static final Pattern LONGITUDE = Pattern.compile("([\\d\\- ]{3})([\\d\\- ]{2})([\\d\\- ]{2})(E|W)", Pattern.CASE_INSENSITIVE);
/**
* Convert DDMMSS[NS] to decimal latitude
......@@ -46,7 +46,7 @@ public class GeoUtils {
Matcher m = LATITUDE.matcher(str.replace('-', '0').replace(' ', '0'));
if (m.matches()) {
_log.trace("Latitude " + str + " matches format DDMMSS[EW]");
_log.trace("Latitude " + str + " matches format DDMMSS[NS]");
String deg = m.group(1);
String min = m.group(2);
String sec = m.group(3);
......@@ -55,12 +55,12 @@ public class GeoUtils {
double latitude = Double.parseDouble(deg);
latitude += Double.parseDouble(min) / 60.0;
latitude += Double.parseDouble(sec) / 60.0 / 60.0;
latitude *= compass.equals("N") ? 1.0 : -1.0;
latitude *= compass.toUpperCase().equals("N") ? 1.0 : -1.0;
latitude = Math.round(latitude * 100000.0) / 100000.0;
return latitude;
} else {
_log.error("Latitude " + str + " does not match format DDMMSS[EW]");
throw new CoordinateConversionException("Latitude " + str + " does not match format DDMMSS[EW]");
throw new CoordinateConversionException("Latitude " + str + " does not match format DDMMSS[NS]");
}
}
......@@ -93,7 +93,7 @@ public class GeoUtils {
double longitude = Double.parseDouble(deg);
longitude += Double.parseDouble(min) / 60.0;
longitude += Double.parseDouble(sec) / 60.0 / 60.0;
longitude *= compass.equals("E") ? 1.0 : -1.0;
longitude *= compass.toUpperCase().equals("E") ? 1.0 : -1.0;
longitude = Math.round(longitude * 100000.0) / 100000.0;
return longitude;
} else {
......
......@@ -15,6 +15,7 @@
*/
package org.genesys2.anno.util;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import org.junit.Test;
......@@ -29,7 +30,7 @@ public class GeoUtilsTest {
assertTrue(GeoUtils.toDecimalLatitude("000000N") == 0.0);
assertTrue(GeoUtils.toDecimalLatitude("000000S") == -0.0);
assertTrue(GeoUtils.toDecimalLatitude("100000N") == 10.0);
assertThat(GeoUtils.toDecimalLatitude("100000N"), is(10.0));
assertTrue(GeoUtils.toDecimalLatitude("100---N") == 10.0);
assertTrue(GeoUtils.toDecimalLatitude("100000S") == -10.0);
assertTrue(GeoUtils.toDecimalLatitude("000001N") == 0.00028);
......@@ -37,7 +38,7 @@ public class GeoUtilsTest {
assertTrue(GeoUtils.toDecimalLatitude("000010N") == 0.00278);
assertTrue(GeoUtils.toDecimalLatitude("000010S") == -0.00278);
}
@Test
public void testLongitude() throws CoordinateConversionException {
assertTrue(GeoUtils.toDecimalLongitude(null) == null);
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment