Commit 36b9d3d2 authored by Matija Obreza's avatar Matija Obreza
Browse files

Some code found here

parent 352fa577
Pipeline #36 skipped
......@@ -16,6 +16,7 @@
package org.genesys.geotools;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
......@@ -63,8 +64,12 @@ public class CountryCLI {
float longitude = Float.parseFloat(matcher.group(2).replace(",", "."));
float latitude = Float.parseFloat(matcher.group(3).replace(",", "."));
System.out.println(longitude + ", " + latitude + ", " + countryOfOriginService
.getCountries(longitude, latitude, origCty, ALLOWED_DISTANCE_MARGIN));
System.out.println(longitude
+ ", "
+ latitude
+ ", "
+ countryOfOriginService
.getCountries(longitude, latitude, origCty, ALLOWED_DISTANCE_MARGIN));
} else {
System.err.println("Invalid format: " + input);
......@@ -80,17 +85,17 @@ public class CountryCLI {
int columnLatitude = 4;
int columnOrigCty = 5;
CSVWriter writer = new CSVWriter(new OutputStreamWriter(System.out), ',', '"', '\\', "\n");
CSVReader reader = new CSVReader(new BufferedReader(new InputStreamReader(System.in), 10000), ',', '"', '\\', 0,
false);
CSVWriter writer = new CSVWriter(new OutputStreamWriter(new BufferedOutputStream(System.out, 10000)), ',', '"', '\\', "\n");
CSVReader reader = new CSVReader(new BufferedReader(new InputStreamReader(System.in), 10000), ',', '"', '\\',
0, false);
CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl();
countryOfOriginService.afterPropertiesSet();
// Thread pool with nThreads concurrent threads
int nThreads = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
System.err
.println("Using " + nThreads + " threads for " + Runtime.getRuntime().availableProcessors() + " CPUs.");
System.err.println("Using " + nThreads + " threads for " + Runtime.getRuntime().availableProcessors()
+ " CPUs.");
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(nThreads * 2);
......@@ -100,97 +105,101 @@ public class CountryCLI {
StopWatch stopWatch = new StopWatch();
int lineCount = 0;
int submitted = 0;
long lastLogTime = 0;
stopWatch.start();
ArrayList<?>[] inputLines = new ArrayList[nThreads];
for (int i = 0; i < nThreads; i++) {
inputLines[i] = new ArrayList<String[]>();
}
do {
submitted = 0;
// Reset
for (int i = 0; i < nThreads; i++) {
inputLines[i].clear();
}
// Submit
for (int i = 0; i < nThreads; i++) {
final ArrayList<String[]> inputs = (ArrayList<String[]>) inputLines[i];
for (int j = 0; j < 100; j++) {
String[] nextLine = reader.readNext();
lineCount++;
stopWatch.split();
long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0 && processingTimeSeconds > 0)
|| (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
System.err.println(
"FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds + " = "
+ (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds;
}
if (nextLine == null) {
System.err.println("No input");
break;
}
inputs.add(nextLine);
final ArrayList<String[]> inputs = new ArrayList<String[]>();
for (int j = 0; j < 100; j++) {
String[] nextLine = reader.readNext();
lineCount++;
stopWatch.split();
long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0 && processingTimeSeconds > 0)
|| (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
System.err.println("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds
+ " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds;
}
if (inputs.size() == 0)
if (nextLine == null) {
System.err.println("No input");
break;
}
if ("101346".equals(nextLine[0])) {
System.err.println("Got what we're looking for");
}
inputs.add(nextLine);
}
// final int cpuId = i;
// System.err.println("Submitted " + cpuId);
if (inputs.size() == 0)
break;
submitted++;
// final int cpuId = i;
// System.err.println("Submitted " + cpuId);
threadPool.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < inputs.size(); i++) {
String[] nextLine = inputs.get(i);
String result = null;
String[] writeLine = Arrays.copyOf(nextLine, nextLine.length + 1);
threadPool.submit(new Runnable() {
@Override
public void run() {
for (String[] nextLine : inputs) {
String result = null;
String[] writeLine = Arrays.copyOf(nextLine, nextLine.length + 1);
if ("101346".equals(nextLine[0])) {
System.err.println("!!! Got what we're looking for");
}
try {
String origCty = nextLine[columnOrigCty].trim();
float longitude = Float.parseFloat(nextLine[columnLongitude].trim());
float latitude = Float.parseFloat(nextLine[columnLatitude].trim());
try {
String origCty = nextLine[columnOrigCty].trim();
float longitude = Float.parseFloat(nextLine[columnLongitude].trim());
float latitude = Float.parseFloat(nextLine[columnLatitude].trim());
try {
result = countryOfOriginService.getCountries(longitude, latitude, origCty,
ALLOWED_DISTANCE_MARGIN);
} catch (Exception e) {
result = e.getMessage();
}
} catch (Throwable e) {
result = "ERROR: " + e.getMessage();
}
result = countryOfOriginService.getCountries(longitude, latitude, origCty,
ALLOWED_DISTANCE_MARGIN);
writeLine[writeLine.length - 1] = result;
writer.writeNext(writeLine);
} catch (Exception e) {
result = e.getMessage();
}
} catch (Throwable e) {
result = "ERROR: " + e.getMessage();
}
writeLine[writeLine.length - 1] = result;
writer.writeNext(writeLine);
if ("101346".equals(nextLine[0])) {
System.err.println("Written what we're looking for");
}
// System.err.println("Done " + cpuId);
}
});
}
// System.err.println("Done " + cpuId);
}
});
} while (submitted > 0);
} while (true);
System.err.println("Shutting down");
threadPool.shutdown();
try {
if (! threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
System.err.println("Tasks in queue did not finish in specified time. Increase timeout!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.shutdownNow();
System.err.println("Shut down.");
writer.flush();
stopWatch.stop();
long processingTimeSeconds = stopWatch.getTime() / 1000;
System.err.println("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds
+ " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
writer.close();
reader.close();
}
......
......@@ -75,7 +75,7 @@ public class CountryOfOriginServiceImpl implements CountryOfOriginService {
sourceAdmin0 = dataStoreAdm0.getFeatureSource(dataStoreAdm0.getTypeNames()[0]);
sourceAdmin0X = dataStoreAdm0X.getFeatureSource(dataStoreAdm0X.getTypeNames()[0]);
countryCache = CacheBuilder.newBuilder().maximumSize(1000).recordStats().expireAfterWrite(20, TimeUnit.SECONDS)
countryCache = CacheBuilder.newBuilder().maximumSize(5000).recordStats().expireAfterWrite(20, TimeUnit.SECONDS)
.build(new CacheLoader<LonLatCacheKey, String>() {
@Override
public String load(LonLatCacheKey key) throws Exception {
......@@ -100,8 +100,8 @@ public class CountryOfOriginServiceImpl implements CountryOfOriginService {
// 111319.488m for 1 degree, can trim by 1000 for 100m precision at
// equator
longitude = (long) (longitude * 1000) / 1000f;
latitude = (long) (latitude * 1000) / 1000f;
// longitude = (long) (longitude * 1000) / 1000f;
// latitude = (long) (latitude * 1000) / 1000f;
try {
return countryCache.get(new LonLatCacheKey(longitude, latitude, origCty, allowedDistanceMargin));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment