Commit 729f50ed authored by Matija Obreza's avatar Matija Obreza

No INSTCODE filters

parent da352684
......@@ -17,6 +17,7 @@ package org.genesys2.anno.gui;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
......@@ -77,6 +78,8 @@ public class PushDialog extends Dialog {
private static final Logger _log = Logger.getLogger(PushDialog.class);
protected static final int BATCH_SIZE = 50;
protected static final int MAX_UPLOAD_THREADS = 4;
protected static final int MAX_POOL_SIZE = 30;
public static enum GenesysOp {
UPSERT, DELETE
......@@ -95,7 +98,7 @@ public class PushDialog extends Dialog {
protected DataSourceLoader dataSourceLoader;
BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(MAX_UPLOAD_THREADS * 3);
private ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, MAX_UPLOAD_THREADS + 2, 30, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
private ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, MAX_UPLOAD_THREADS + 2, MAX_POOL_SIZE, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
protected Object result;
protected Shell shell;
......@@ -347,12 +350,11 @@ public class PushDialog extends Dialog {
Future<?> future = executorService.submit(() -> {
try {
final RowReader rowReader1 = dataSourceLoader.createRowReader(dataSourceSheet, dataSource);
rowReader1.setSkipRows(dataSourceSheet.getHeaderRowIndex() + 1);
List<String> instCodesOrder = new ArrayList<>();
{
try (final RowReader rowReader1 = dataSourceLoader.createRowReader(dataSourceSheet, dataSource)) {
rowReader1.setSkipRows(dataSourceSheet.getHeaderRowIndex() + 1);
Set<String> instCodes = new HashSet<>();
// Read instCodes
List<Object[]> rows = null;
......@@ -360,8 +362,8 @@ public class PushDialog extends Dialog {
do {
rows = rowReader1.readRows(1000);
for (Object[] row : rows) {
count ++;
count++;
Map<String, ?> accnMap = RowConverter.toMap(dataSourceSheet, row, columnDefs);
final String accessionInstCode = (String) accnMap.get(RdfMCPD.INSTCODE);
if (instCodes.add(accessionInstCode)) {
......@@ -371,38 +373,31 @@ public class PushDialog extends Dialog {
}
} while (rows != null && rows.size() > 0);
}
_log.warn("Have instCodes: " + instCodesOrder.stream().sorted().collect(Collectors.toList()));
_log.warn("Have instCodes: " + instCodesOrder.stream().sorted().collect(Collectors.toList()));
// Ping the server
genesysClient.me();
for (String instCode : instCodesOrder) {
_log.warn("Queueing upload for " + instCode);
_log.warn("Queueing upload for " + instCodesOrder.toString());
Future<?> instFuture = executorService.submit(new Runnable() {
// Filter only for specific INSTCODES
final Set<String> instCodesFilter = new HashSet<>(); // (instCodesOrder);
@Override
public void run() {
_log.warn("Running upload for " + instCode);
try (RowReader rowReader = dataSourceLoader.createRowReader(dataSourceSheet, dataSource)) {
rowReader.setSkipRows(dataSourceSheet.getHeaderRowIndex() + 1);
Future<?> instFuture = executorService.submit(new Runnable() {
final Map<String, List<ObjectNode>> instCodeMap = new HashMap<String, List<ObjectNode>>();
uploadInsitute(instCode, rowReader);
@Override
public void run() {
_log.warn("Running upload for " + instCodesFilter.toString());
try (RowReader rowReader = dataSourceLoader.createRowReader(dataSourceSheet, dataSource)) {
rowReader.setSkipRows(dataSourceSheet.getHeaderRowIndex() + 1);
} catch (IOException | UnsupportedDataFormatException e) {
_log.error(e.getMessage(), e);
}
}
private void uploadInsitute(String instCode, RowReader rowReader) {
int rowCount = 0;
int accessionCount = 0;
List<ObjectNode> instCodeBatch = new ArrayList<>(BATCH_SIZE);
_log.warn("Uploading data for " + instCode);
try {
do {
List<Object[]> rows = rowReader.readRows(100);
......@@ -413,15 +408,15 @@ public class PushDialog extends Dialog {
}
// _log.info("Reading source data for " + instCode + " starting at row=" +
// count);
for (Object[] row : rows) {
rowCount++;
_log.trace(rowCount + ": " + ArrayUtils.toString(row));
Map<String, ?> accnMap = RowConverter.toMap(dataSourceSheet, row, columnDefs);
final String accessionInstCode = (String) accnMap.get(RdfMCPD.INSTCODE);
if (!accessionInstCode.equals(instCode)) {
// If filtering, check INSTCODE is listed
if (instCodesFilter.size() > 0 && ! instCodesFilter.contains(accessionInstCode)) {
continue;
}
......@@ -429,27 +424,41 @@ public class PushDialog extends Dialog {
ObjectNode accnJson = RowConverter.toJson(accnMap);
accessionCount++;
instCodeBatch.add(accnJson);
if (instCodeBatch.size() >= BATCH_SIZE) {
doAsyncPush(operation, genesysClient, accessionInstCode, instCodeBatch);
instCodeBatch.clear();
List<ObjectNode> instCodeBatch = null;
synchronized (instCodeMap) {
instCodeBatch = instCodeMap.get(accessionInstCode);
if (instCodeBatch == null) {
instCodeMap.put(accessionInstCode, instCodeBatch = new ArrayList<ObjectNode>());
}
}
synchronized (instCodeBatch) {
instCodeBatch.add(accnJson);
if (instCodeBatch.size() >= BATCH_SIZE) {
doAsyncPush(operation, genesysClient, accessionInstCode, instCodeBatch);
}
}
} catch (GenesysJSONIncompleteException e) {
_log.info("Ignoring incomplete accession " + ArrayUtils.toString(row));
}
Thread.sleep(1);
}
Thread.sleep(1);
} while (true);
_log.info("Pushing remaining data for " + instCode);
if (instCodeBatch.size() > 0) {
doAsyncPush(operation, genesysClient, instCode, instCodeBatch);
_log.info("Pushing queued data");
for (final String instCode : instCodeMap.keySet()) {
List<ObjectNode> accns = instCodeMap.get(instCode);
if (accns.size() > 0) {
doAsyncPush(operation, genesysClient, instCode, accns);
}
}
_log.warn("Done processing upload jobs for " + accessionCount + " accessions for " + instCode);
_log.warn("Done processing upload jobs for " + accessionCount + " accessions");
} catch (GenesysJSONException e) {
_log.error("Genesys JSON conversion failed in batch" + rowCount);
......@@ -461,20 +470,20 @@ public class PushDialog extends Dialog {
e.printStackTrace();
} finally {
_log.info("Reader finished.");
// try {
// rowReader.close();
// } catch (IOException e) {
// _log.error("Failed to close rowReader: " + e.getMessage(), e);
// }
// try {
// rowReader.close();
// } catch (IOException e) {
// _log.error("Failed to close rowReader: " + e.getMessage(), e);
// }
}
}
});
futures.add(instFuture);
} catch (IOException | UnsupportedDataFormatException e) {
_log.error(e.getMessage(), e);
}
}
});
Thread.sleep(200);
}
futures.add(instFuture);
// Wait for uploads to finish
do {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment