Commit 64076cb4 authored by Matija Obreza's avatar Matija Obreza
Browse files

Improved uploader

- Automatically log push jobs still in progress
- log & report time for upsert
parent 24c7b8c4
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
<attribute name="maven.pomderived" value="true"/> <attribute name="maven.pomderived" value="true"/>
</attributes> </attributes>
</classpathentry> </classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes> <attributes>
<attribute name="maven.pomderived" value="true"/> <attribute name="maven.pomderived" value="true"/>
</attributes> </attributes>
...@@ -27,5 +27,10 @@ ...@@ -27,5 +27,10 @@
<attribute name="maven.pomderived" value="true"/> <attribute name="maven.pomderived" value="true"/>
</attributes> </attributes>
</classpathentry> </classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/> <classpathentry kind="output" path="target/classes"/>
</classpath> </classpath>
...@@ -15,8 +15,19 @@ ...@@ -15,8 +15,19 @@
<arguments> <arguments>
</arguments> </arguments>
</buildCommand> </buildCommand>
<buildCommand>
<name>org.springframework.ide.eclipse.core.springbuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.springframework.ide.eclipse.boot.validation.springbootbuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec> </buildSpec>
<natures> <natures>
<nature>org.springframework.ide.eclipse.core.springnature</nature>
<nature>org.eclipse.jdt.core.javanature</nature> <nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature> <nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures> </natures>
......
...@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; ...@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
...@@ -88,8 +89,8 @@ public class PushDialog extends Dialog { ...@@ -88,8 +89,8 @@ public class PushDialog extends Dialog {
@Autowired @Autowired
protected DataSourceLoader dataSourceLoader; protected DataSourceLoader dataSourceLoader;
BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(10); BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(36);
private ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy()); private ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
protected Object result; protected Object result;
protected Shell shell; protected Shell shell;
...@@ -132,7 +133,7 @@ public class PushDialog extends Dialog { ...@@ -132,7 +133,7 @@ public class PushDialog extends Dialog {
logAppender = new SwtLogAppender(this.txtJson); logAppender = new SwtLogAppender(this.txtJson);
shell.setTabList(new Control[] { txtJson, toolBar }); shell.setTabList(new Control[] { txtJson, toolBar });
logAppender.setConversionPattern("%d{yyyy-MM-dd HH:mm:ss} %-5p - %m%n"); logAppender.setConversionPattern("%d{yyyy-MM-dd HH:mm:ss} %t %-5p - %m%n");
shell.open(); shell.open();
shell.layout(); shell.layout();
...@@ -166,12 +167,14 @@ public class PushDialog extends Dialog { ...@@ -166,12 +167,14 @@ public class PushDialog extends Dialog {
for (int i = futures.size() - 1; i >= 0; i--) { for (int i = futures.size() - 1; i >= 0; i--) {
Future<?> future = futures.get(i); Future<?> future = futures.get(i);
if (future.isDone()) { if (future.isDone() || future.isCancelled()) {
futures.remove(future); futures.remove(future);
} else {
future.cancel(true);
} }
} }
if (futures.size() > 0) { if (futures.size() > 1) {
_log.warn("Jobs still running: " + futures.size()); _log.warn("Jobs still running: " + futures.size());
} }
...@@ -349,6 +352,7 @@ public class PushDialog extends Dialog { ...@@ -349,6 +352,7 @@ public class PushDialog extends Dialog {
@Override @Override
public void run() { public void run() {
int count = 0; int count = 0;
try { try {
do { do {
...@@ -359,7 +363,7 @@ public class PushDialog extends Dialog { ...@@ -359,7 +363,7 @@ public class PushDialog extends Dialog {
break; break;
} }
_log.info("Batch start at row " + count); _log.info("Reading source data starting at row=" + count);
for (Object[] row : rows) { for (Object[] row : rows) {
count++; count++;
_log.debug(count + ": " + ArrayUtils.toString(row)); _log.debug(count + ": " + ArrayUtils.toString(row));
...@@ -388,7 +392,7 @@ public class PushDialog extends Dialog { ...@@ -388,7 +392,7 @@ public class PushDialog extends Dialog {
} }
Thread.sleep(10); Thread.sleep(5);
} while (true); } while (true);
_log.info("Pushing queued data"); _log.info("Pushing queued data");
...@@ -420,6 +424,30 @@ public class PushDialog extends Dialog { ...@@ -420,6 +424,30 @@ public class PushDialog extends Dialog {
_log.error("Failed to close rowReader: " + e.getMessage(), e); _log.error("Failed to close rowReader: " + e.getMessage(), e);
} }
} }
// Wait for uploads to finish
do {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
for (int i = futures.size() - 1; i >= 0; i--) {
Future<?> future = futures.get(i);
if (future.isDone()) {
futures.remove(future);
}
}
if (futures.size() > 1) {
_log.warn("Waiting for uploads still running: " + futures.size());
}
// We're the last job
} while (futures.size() > 1);
_log.warn("PUSH FINISHED.");
_log.warn("Really.");
} }
}); });
...@@ -459,6 +487,12 @@ public class PushDialog extends Dialog { ...@@ -459,6 +487,12 @@ public class PushDialog extends Dialog {
_log.debug("Nothing to push"); _log.debug("Nothing to push");
return; return;
} }
try {
Thread.sleep(10);
} catch (InterruptedException e1) {
return;
}
Future<?> future = executorService.submit(new Runnable() { Future<?> future = executorService.submit(new Runnable() {
final ArrayList<ObjectNode> accns = new ArrayList<ObjectNode>(instCodeBatch); final ArrayList<ObjectNode> accns = new ArrayList<ObjectNode>(instCodeBatch);
...@@ -467,10 +501,13 @@ public class PushDialog extends Dialog { ...@@ -467,10 +501,13 @@ public class PushDialog extends Dialog {
@Override @Override
public void run() { public void run() {
try { try {
_log.info("Pushing data for instCode=" + instCode + " size=" + accns.size()); StopWatch stopWatch = new StopWatch();
if (_log.isDebugEnabled()) { stopWatch.start();
_log.debug("Pushing data for instCode=" + instCode + " size=" + accns.size());
if (_log.isTraceEnabled()) {
for (ObjectNode o : accns) { for (ObjectNode o : accns) {
_log.debug(o); _log.trace(o);
} }
} }
...@@ -480,8 +517,8 @@ public class PushDialog extends Dialog { ...@@ -480,8 +517,8 @@ public class PushDialog extends Dialog {
else if (op == GenesysOp.DELETE) else if (op == GenesysOp.DELETE)
serverResponse = genesysClient.deleteAccessionsByName(instCode, objectMapper.writeValueAsString(accns)); serverResponse = genesysClient.deleteAccessionsByName(instCode, objectMapper.writeValueAsString(accns));
if (_log.isDebugEnabled()) if (_log.isTraceEnabled())
_log.debug(serverResponse); _log.trace(serverResponse);
try { try {
JsonNode ri = objectMapper.readTree(serverResponse); JsonNode ri = objectMapper.readTree(serverResponse);
...@@ -491,12 +528,12 @@ public class PushDialog extends Dialog { ...@@ -491,12 +528,12 @@ public class PushDialog extends Dialog {
if (t.has("error") && !t.get("error").isNull()) { if (t.has("error") && !t.get("error").isNull()) {
_log.error(t.get("instCode") + " " + t.get("acceNumb") + ":" + t.get("error")); _log.error(t.get("instCode") + " " + t.get("acceNumb") + ":" + t.get("error"));
} else if (t.has("result") && !t.get("result").isNull()) { } else if (t.has("result") && !t.get("result").isNull()) {
_log.debug(t.get("instCode") + " " + t.get("acceNumb") + ":" + t.get("result")); _log.trace(t.get("instCode") + " " + t.get("acceNumb") + ":" + t.get("result"));
} }
} }
}); });
_log.info("Push done."); _log.info("Push done for instCode=" + instCode + " size=" + accns.size() + " in " + stopWatch.getTime() + "ms.");
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment