Commit e59edf9a authored by Matija Obreza's avatar Matija Obreza
Browse files

Improved CLI tools (fixed #1)

parent dacd560c
......@@ -39,15 +39,14 @@
<jdk.source>1.8</jdk.source>
<!-- use the latest snapshot -->
<geotools.version>15-SNAPSHOT</geotools.version>
<jar.mainclass>org.genesys.geotools.LandOrSeaCLI</jar.mainclass>
<jar.mainclass>org.genesys.geotools.CLI</jar.mainclass>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
......@@ -79,7 +78,75 @@
<artifactId>opencsv</artifactId>
<version>3.6</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>normal</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
<profile>
<id>release</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.3</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
......@@ -99,7 +166,7 @@
<source>${jdk.source}</source>
<target>${jdk.target}</target>
<optimize>true</optimize>
<showDeprecation>${show.deprecations}</showDeprecation>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
......@@ -109,7 +176,7 @@
<version>2.19.1</version>
<configuration>
<forkMode>once</forkMode>
<argLine>-Xms512m -Xmx1024m -Djava.awt.headless=true</argLine>
<argLine>-Djava.awt.headless=true</argLine>
<testFailureIgnore>false</testFailureIgnore>
</configuration>
</plugin>
......
/*
* Copyright 2016 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.geotools;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the default entry point for the -jar execution
*/
public class CLI {
private final static Logger LOG = LoggerFactory.getLogger(CLI.class);
private static void printHelp() {
System.out.println("Usage: java -jar geo-tools.jar country|landorsea [options] <inputFile> <outputFile>");
System.out.println("\nOptions:");
System.out.println("<inputFile> File name or - to read CSV from STDIN");
System.out.println("<outputFile> File name or - to write CSV to STDOUT");
System.out.println("-v Increase log level.");
System.out.println("-v Increase log level.");
System.out.println("-csv ',\"\\' Set input CSV separator, quote and escape chars");
System.out.println("\nThe program writes log messages to STDERR.");
}
public static void main(String[] args) {
String sourceName = null;
String outputName = null;
InputStream inputStream = null;
OutputStream outputStream = null;
if (args.length < 1) {
printHelp();
System.exit(-1);
}
GeoTool tool = null;
if ("country".equals(args[0])) {
tool = new CountryCLI();
} else if ("landorsea".equals(args[0])) {
tool = new LandOrSeaCLI();
} else {
printHelp();
System.exit(-1);
}
char separatorChar = ',';
char quoteChar = 0;
char escapeChar = 0;
for (int i = 1; i < args.length; i++) {
String arg = args[i];
if ("-v".equals(arg)) {
// Increase logging level
org.apache.log4j.Logger logger = LogManager.getLogger("org.genesys");
logger.setLevel(nextLevel(logger.getLevel()));
LOG.info("Increasing loglevel to {}", logger.getLevel());
} else if ("-csv".equals(arg)) {
char[] csv = args[++i].toCharArray();
LOG.info("Setting CSV chars to {}", csv);
for (int j = 0; j < csv.length; j++) {
if (j == 0)
separatorChar = csv[0];
if (j == 1)
quoteChar = csv[1];
if (j == 2)
escapeChar = csv[2];
}
} else {
// Files
if (sourceName == null) {
sourceName = arg;
} else if (outputName == null) {
outputName = arg;
} else {
// Too many arguments.
LOG.error("Too many arguments.");
System.exit(-1);
}
}
}
if (sourceName == null || outputName == null) {
LOG.error("Too few arguments.\n");
printHelp();
System.exit(-1);
}
try {
if ("-".equals(sourceName)) {
inputStream = System.in;
} else {
inputStream = new FileInputStream(sourceName);
}
if ("-".equals(outputName)) {
outputStream = System.out;
} else {
File outputFile = new File(outputName);
if (outputFile.exists()) {
LOG.error("Output file exists. Refusing to overwrite.");
System.exit(-1);
}
outputStream = new FileOutputStream(outputFile);
}
if (inputStream == null || outputStream == null) {
LOG.error("Too few arguments.\n");
printHelp();
System.exit(-1);
}
tool.execute(inputStream, outputStream, separatorChar, quoteChar, escapeChar);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
private static Level nextLevel(Level level) {
if (level.equals(Level.FATAL)) {
return Level.ERROR;
} else if (level.equals(Level.ERROR)) {
return Level.WARN;
} else if (level.equals(Level.WARN)) {
return Level.INFO;
} else if (level.equals(Level.INFO)) {
return Level.DEBUG;
} else if (level.equals(Level.DEBUG)) {
return Level.TRACE;
} else
return Level.ALL;
}
}
......@@ -16,13 +16,17 @@
package org.genesys.geotools;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -30,15 +34,27 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.geotools.service.HeaderUtils;
import org.genesys.geotools.service.impl.CountryOfOriginServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
public class CountryCLI {
public class CountryCLI implements GeoTool {
private final static Logger LOG = LoggerFactory.getLogger(CountryCLI.class);
// 2000 meters
private static final int ALLOWED_DISTANCE_MARGIN = 2000;
private static final String HEADER_LONGITUDE = "LONGITUDE";
private static final String HEADER_LATITUDE = "LATITUDE";
private static final String HEADER_ORIGCTY = "ORIGCTY";
private static final String HEADER_ORIGCTY_CHECK = "ORIGCTY_check";
public static void main(String arg[]) throws Exception {
if (arg.length == 0) {
CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl();
......@@ -48,8 +64,8 @@ public class CountryCLI {
Pattern pattern = Pattern.compile("^(\\w{3})[\\s,]+(\\-?\\d*\\.?\\d*)[\\s,]+(\\-?\\d*\\.?\\d*)$");
System.err.println("Expects input rows in format: Longitude\tLatitude");
System.err.println("Enter 'q' to quit.");
LOG.info("Expects input rows in format: Longitude\tLatitude");
LOG.info("Enter 'q' to quit.");
System.out.println("OrigCty\tLongitude\tLatitude\tResult");
String input = null;
do {
......@@ -64,143 +80,152 @@ public class CountryCLI {
float longitude = Float.parseFloat(matcher.group(2).replace(",", "."));
float latitude = Float.parseFloat(matcher.group(3).replace(",", "."));
System.out.println(longitude
+ ", "
+ latitude
+ ", "
+ countryOfOriginService
.getCountries(longitude, latitude, origCty, ALLOWED_DISTANCE_MARGIN));
System.out.println(longitude + ", " + latitude + ", " + countryOfOriginService.getCountries(longitude, latitude, origCty, ALLOWED_DISTANCE_MARGIN));
} else {
System.err.println("Invalid format: " + input);
LOG.info("Invalid format: " + input);
}
} while (input != null && !"q".equals(input));
} else {
doCSV(arg);
new CountryCLI().execute(System.in, System.out, ',', '"', (char) 0);
}
}
private static void doCSV(String[] arg) throws IOException {
int columnLongitude = 3;
int columnLatitude = 4;
int columnOrigCty = 5;
public void execute(InputStream input, OutputStream output, char separatorChar, char quoteChar, char escapeChar) throws IOException {
CSVWriter writer = new CSVWriter(new OutputStreamWriter(new BufferedOutputStream(System.out, 10000)), ',', '"', '\\', "\n");
CSVReader reader = new CSVReader(new BufferedReader(new InputStreamReader(System.in), 10000), ',', '"', '\\',
0, false);
try (CSVReader reader = new CSVReader(new BufferedReader(new InputStreamReader(input), 10000), separatorChar, quoteChar, escapeChar, 0, false)) {
CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl();
countryOfOriginService.afterPropertiesSet();
// Scan for headers
String[] headers = reader.readNext();
LOG.debug("Input CSV headers: {}", Arrays.toString(headers));
// Thread pool with nThreads concurrent threads
int nThreads = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
System.err.println("Using " + nThreads + " threads for " + Runtime.getRuntime().availableProcessors()
+ " CPUs.");
HeaderUtils.throwIfHeaderFound(headers, new String[] { HEADER_ORIGCTY_CHECK });
HeaderUtils.throwIfHeaderNotFound(headers, new String[] { HEADER_LONGITUDE, HEADER_LATITUDE });
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(nThreads * 2);
List<String> outputHeaders = new ArrayList<>(Arrays.asList(headers));
List<Integer> outputMapping = HeaderUtils.mapHeaderPositions(headers);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 + nThreads / 2, nThreads, 5, TimeUnit.SECONDS,
workQueue, new ThreadPoolExecutor.CallerRunsPolicy());
LOG.debug("Source mapping indexes: {}", outputMapping);
LOG.debug("Initial output headers: {}", outputHeaders);
StopWatch stopWatch = new StopWatch();
Map<String, Integer> sourceMapping = HeaderUtils.makeSourceMapping(headers, new String[] { HEADER_LONGITUDE, HEADER_LATITUDE, HEADER_ORIGCTY });
outputHeaders.add(Math.max(outputHeaders.indexOf(HEADER_ORIGCTY), outputHeaders.indexOf(HEADER_LATITUDE)) + 1, HEADER_ORIGCTY_CHECK);
int lineCount = 0;
LOG.info("Output headers: {}", outputHeaders);
LOG.info("Source mapping indexes: {}", outputMapping);
long lastLogTime = 0;
stopWatch.start();
try (CSVWriter writer = new CSVWriter(new BufferedWriter(new OutputStreamWriter(output)), ',', '"', '\\', "\r\n")) {
do {
final ArrayList<String[]> inputs = new ArrayList<String[]>();
writer.writeNext(outputHeaders.toArray(ArrayUtils.EMPTY_STRING_ARRAY));
for (int j = 0; j < 100; j++) {
String[] nextLine = reader.readNext();
lineCount++;
stopWatch.split();
long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0 && processingTimeSeconds > 0)
|| (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
System.err.println("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds
+ " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds;
}
CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl();
countryOfOriginService.afterPropertiesSet();
if (nextLine == null) {
System.err.println("No input");
break;
}
if ("101346".equals(nextLine[0])) {
System.err.println("Got what we're looking for");
}
inputs.add(nextLine);
}
// Thread pool with nThreads concurrent threads
int nThreads = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
LOG.info("Using " + nThreads + " threads for " + Runtime.getRuntime().availableProcessors() + " CPUs.");
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(nThreads * 2);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 + nThreads / 2, nThreads, 5, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.CallerRunsPolicy());
StopWatch stopWatch = new StopWatch();
int lineCount = 0;
long lastLogTime = 0;
stopWatch.start();
if (inputs.size() == 0)
break;
do {
final ArrayList<String[]> inputs = new ArrayList<String[]>();
// final int cpuId = i;
// System.err.println("Submitted " + cpuId);
for (int j = 0; j < 100; j++) {
String[] nextLine = reader.readNext();
lineCount++;
threadPool.submit(new Runnable() {
@Override
public void run() {
for (String[] nextLine : inputs) {
String result = null;
String[] writeLine = Arrays.copyOf(nextLine, nextLine.length + 1);
if ("101346".equals(nextLine[0])) {
System.err.println("!!! Got what we're looking for");
stopWatch.split();
long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0 && processingTimeSeconds > 0) || (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
LOG.info("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds + " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds;
}
try {
String origCty = nextLine[columnOrigCty].trim();
float longitude = Float.parseFloat(nextLine[columnLongitude].trim());
float latitude = Float.parseFloat(nextLine[columnLatitude].trim());
try {
result = countryOfOriginService.getCountries(longitude, latitude, origCty,
ALLOWED_DISTANCE_MARGIN);
if (nextLine == null) {
LOG.info("No input");
break;
}
// if ("101346".equals(nextLine[0])) {
// LOG.info("Got what we're looking for");
// }
inputs.add(nextLine);
}
if (inputs.size() == 0)
break;
// final int cpuId = i;
// LOG.info("Submitted " + cpuId);
threadPool.submit(new Runnable() {
@Override
public void run() {
for (String[] nextLine : inputs) {
String[] outputLine = HeaderUtils.toOutputLine(nextLine, outputHeaders, outputMapping);
String result = null;
// if ("101346".equals(nextLine[0])) {
// LOG.info("!!! Got what we're looking for");
// }
try {
String origCty = nextLine[sourceMapping.get(HEADER_ORIGCTY)].trim();
float longitude = Float.parseFloat(nextLine[sourceMapping.get(HEADER_LONGITUDE)].trim());
float latitude = Float.parseFloat(nextLine[sourceMapping.get(HEADER_LATITUDE)].trim());
try {
result = countryOfOriginService.getCountries(longitude, latitude, origCty, ALLOWED_DISTANCE_MARGIN);
} catch (Exception e) {
result = e.getMessage();
}
} catch (Throwable e) {
result = "ERROR: " + e.getMessage();
}
outputLine[outputHeaders.indexOf(HEADER_ORIGCTY_CHECK)] = result;
writer.writeNext(outputLine);
// if ("101346".equals(nextLine[0])) {
// LOG.info("Written what we're looking for");
// }
} catch (Exception e) {
result = e.getMessage();
}
} catch (Throwable e) {
result = "ERROR: " + e.getMessage();
// LOG.info("Done " + cpuId);
}
});
writeLine[writeLine.length - 1] = result;
writer.writeNext(writeLine);
if ("101346".equals(nextLine[0])) {
System.err.println("Written what we're looking for");
}
} while (true);
LOG.info("Shutting down");
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
LOG.info("Tasks in queue did not finish in specified time. Increase timeout!");
}
// System.err.println("Done " + cpuId);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPool.shutdownNow();
LOG.info("Shut down.");
} while (true);
System.err.println("Shutting down");
stopWatch.stop();
long processingTimeSeconds = stopWatch.getTime() / 1000;
LOG.info("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds + " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
threadPool.shutdown();
try {
if (! threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
System.err.println("Tasks in queue did not finish in specified time. Increase timeout!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.shutdownNow();
System.err.println("Shut down.");
writer.flush();
stopWatch.stop();
long processingTimeSeconds = stopWatch.getTime() / 1000;
System.err.println("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds
+ " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
writer.close();
reader.close();
}
}
/*
* Copyright 2016 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language gov