Commit 97d90d1f authored by Matija Obreza's avatar Matija Obreza
Browse files

Fixed concurrency issues in Geo processors

parent efcdeaf6
......@@ -60,6 +60,8 @@ public interface ProcessService {
symbols.setGroupingSeparator(decimalMark == ',' ? '.' : ',');
decimalFormat.setDecimalFormatSymbols(symbols);
decimalFormat.setMinimumFractionDigits(8);
decimalFormat.setNegativePrefix("-");
decimalFormat.setNegativeSuffix("");
return decimalFormat;
}
}
......@@ -6,6 +6,7 @@ import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
......@@ -16,6 +17,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.genesys.geotools.service.CountryOfOriginService;
import org.genesys.geotools.service.HeaderUtils;
......@@ -27,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
import com.opencsv.CSVReader;
/**
......@@ -48,9 +51,7 @@ public class CountryProcessServiceImpl implements ProcessService {
private List<String[]> execute(Reader readerr, char separatorChar, char quoteChar, char escapeChar, Character decimalMark) throws IOException {
final DecimalFormat decimalFormat = createDecimalFormat(decimalMark);
final List<String[]> lines = new LinkedList<>();
final List<String[]> lines = Collections.synchronizedList(new LinkedList<>());
try (CSVReader reader = new CSVReader(readerr, separatorChar, quoteChar, escapeChar, 0, false)) {
......@@ -135,11 +136,18 @@ public class CountryProcessServiceImpl implements ProcessService {
break;
threadPool.submit(new Runnable() {
final DecimalFormat decimalFormat = createDecimalFormat(decimalMark);
@Override
public void run() {
for (String[] nextLine : inputs) {
String[] outputLine = HeaderUtils.toOutputLine(nextLine, outputHeaders, outputMapping);
if (outputLine == null) {
LOG.warn("Couldn't make outputline from {}", Arrays.toString(nextLine));
continue;
}
String result = null;
// if ("101346".equals(nextLine[0])) {
......@@ -154,8 +162,6 @@ public class CountryProcessServiceImpl implements ProcessService {
float longitude = decimalFormat.parse(declongitude).floatValue();
float latitude = decimalFormat.parse(declatitude).floatValue();
LOG.debug("Parsed DECLATITUDE/DECLONGITUDE ({}, {}) --> ({}, {})", declongitude, declatitude, longitude, latitude);
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLATITUDE_CHECK)] = decimalFormat.format(latitude);
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLONGITUDE_CHECK)] = decimalFormat.format(longitude);
try {
result = countryOfOriginService.getCountries(longitude, latitude, origCty, ApplicationUtils.ALLOWED_DISTANCE_MARGIN);
......@@ -163,9 +169,17 @@ public class CountryProcessServiceImpl implements ProcessService {
} catch (Exception e) {
result = e.getMessage();
}
try {
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLATITUDE_CHECK)] = decimalFormat.format(latitude);
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLONGITUDE_CHECK)] = decimalFormat.format(longitude);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
} catch (NumberFormatException e) {
result = "Unparseable lat/lon";
LOG.error("Unparsable lat/lon from ({}, {})", declongitude, declatitude, e);
result = "Unparseable lat/lon: " + e.getMessage();
} catch (Throwable e) {
LOG.error("Error parsing lat/lon: {}", e.getMessage());
result = "ERROR: " + e.getMessage();
}
}
......@@ -200,6 +214,9 @@ public class CountryProcessServiceImpl implements ProcessService {
long processingTimeSeconds = Math.max(1, stopWatch.getTime() / 1000);
LOG.info("FYI, {} entries have been processed in {}s = {}lines/ms = {}ms/line", lineCount, processingTimeSeconds, ((float) lineCount / stopWatch.getTime()), ((float) stopWatch.getTime() / lineCount));
}
// for (String[] l : lines) {
// System.err.println(Arrays.toString(l));
// }
return lines;
}
}
......@@ -6,6 +6,7 @@ import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
......@@ -52,9 +53,7 @@ public class LandOrSeaProcessServiceImpl implements ProcessService {
*/
public List<String[]> execute(Reader readerr, char separatorChar, char quoteChar, char escapeChar, char decimalMark) throws IOException {
final DecimalFormat decimalFormat = createDecimalFormat(decimalMark);
final List<String[]> lines = new LinkedList<>();
final List<String[]> lines = Collections.synchronizedList(new LinkedList<>());
try (CSVReader reader = new CSVReader(readerr, separatorChar, quoteChar, escapeChar, 0, false)) {
......@@ -136,12 +135,19 @@ public class LandOrSeaProcessServiceImpl implements ProcessService {
break;
threadPool.submit(new Runnable() {
final DecimalFormat decimalFormat = createDecimalFormat(decimalMark);
@Override
public void run() {
for (String[] nextLine : inputs) {
String[] outputLine = HeaderUtils.toOutputLine(nextLine, outputHeaders, outputMapping);
if (outputLine == null) {
LOG.warn("Couldn't make outputline from {}", Arrays.toString(nextLine));
continue;
}
String declongitude = StringUtils.trimToNull(nextLine[sourceMapping.get(ApplicationUtils.HEADER_LONGITUDE)]);
String declatitude = StringUtils.trimToNull(nextLine[sourceMapping.get(ApplicationUtils.HEADER_LATITUDE)]);
......@@ -152,12 +158,19 @@ public class LandOrSeaProcessServiceImpl implements ProcessService {
float longitude = decimalFormat.parse(declongitude).floatValue();
float latitude = decimalFormat.parse(declatitude).floatValue();
LOG.debug("Parsed DECLATITUDE/DECLONGITUDE ({}, {}) --> ({}, {})", declongitude, declatitude, longitude, latitude);
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLATITUDE_CHECK)] = decimalFormat.format(latitude);
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLONGITUDE_CHECK)] = decimalFormat.format(longitude);
result = landOrSeaService.classifyLocation(longitude, latitude, ApplicationUtils.ALLOWED_DISTANCE_MARGIN);
} catch (Exception e) {
LOG.debug("Error parsing lat/lon", e);
try {
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLATITUDE_CHECK)] = decimalFormat.format(latitude);
outputLine[outputHeaders.indexOf(ApplicationUtils.HEADER_DECLONGITUDE_CHECK)] = decimalFormat.format(longitude);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
} catch (NumberFormatException e) {
LOG.error("Unparsable lat/lon from ({}, {})", declongitude, declatitude);
result = "Unparseable lat/lon: " + e.getMessage();
} catch (Throwable e) {
LOG.error("Error parsing lat/lon: {}", e.getMessage());
if (e.getCause() != null)
result = "ERROR: " + e.getCause().getMessage();
else
......@@ -190,7 +203,9 @@ public class LandOrSeaProcessServiceImpl implements ProcessService {
long processingTimeSeconds = Math.max(1, stopWatch.getTime() / 1000);
LOG.info("FYI, {} entries have been processed in {}s = {}lines/ms = {}ms/line", lineCount, processingTimeSeconds, ((float) lineCount / stopWatch.getTime()), ((float) stopWatch.getTime() / lineCount));
}
for (String[] l : lines) {
System.err.println(Arrays.toString(l));
}
return lines;
}
}
......@@ -25,6 +25,7 @@ import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
......@@ -126,7 +127,7 @@ public class TaxonomyProcessServiceImpl implements ProcessService {
private List<String[]> run(final Reader reader, final Character separator, final Character quoteChar, final Character escapeChar, final Boolean toCurrentTaxa,
final Character decimalMark) throws UnsupportedEncodingException, FileNotFoundException, IOException {
final List<String[]> lines = new LinkedList<>();
final List<String[]> lines = Collections.synchronizedList(new LinkedList<>());
taxonomyChecker.setTaxonomyDatabase(taxonomyDatabase);
LOG.info("Running the validation against {}", taxonomyDatabase);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment