Commit 352fa577 authored by Matija Obreza's avatar Matija Obreza
Browse files

Fully parallel execution of CountryCLI

parent 532d14b2
......@@ -22,11 +22,10 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -64,12 +63,8 @@ public class CountryCLI {
float longitude = Float.parseFloat(matcher.group(2).replace(",", "."));
float latitude = Float.parseFloat(matcher.group(3).replace(",", "."));
System.out.println(longitude
+ ", "
+ latitude
+ ", "
+ countryOfOriginService
.getCountries(longitude, latitude, origCty, ALLOWED_DISTANCE_MARGIN));
System.out.println(longitude + ", " + latitude + ", " + countryOfOriginService
.getCountries(longitude, latitude, origCty, ALLOWED_DISTANCE_MARGIN));
} else {
System.err.println("Invalid format: " + input);
......@@ -86,15 +81,21 @@ public class CountryCLI {
int columnOrigCty = 5;
CSVWriter writer = new CSVWriter(new OutputStreamWriter(System.out), ',', '"', '\\', "\n");
CSVReader reader = new CSVReader(new BufferedReader(new InputStreamReader(System.in), 10000), ',', '"', '\\',
0, false);
CSVReader reader = new CSVReader(new BufferedReader(new InputStreamReader(System.in), 10000), ',', '"', '\\', 0,
false);
CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl();
countryOfOriginService.afterPropertiesSet();
// Thread pool with nThreads concurrent threads
int nThreads = 7;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
int nThreads = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
System.err
.println("Using " + nThreads + " threads for " + Runtime.getRuntime().availableProcessors() + " CPUs.");
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(nThreads * 2);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 + nThreads / 2, nThreads, 5, TimeUnit.SECONDS,
workQueue, new ThreadPoolExecutor.CallerRunsPolicy());
StopWatch stopWatch = new StopWatch();
......@@ -108,7 +109,6 @@ public class CountryCLI {
for (int i = 0; i < nThreads; i++) {
inputLines[i] = new ArrayList<String[]>();
}
Future<?>[] futures = new Future<?>[nThreads];
do {
submitted = 0;
......@@ -116,7 +116,6 @@ public class CountryCLI {
// Reset
for (int i = 0; i < nThreads; i++) {
inputLines[i].clear();
futures[i] = null;
}
// Submit
......@@ -128,12 +127,11 @@ public class CountryCLI {
stopWatch.split();
long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0)
if ((lineCount % 1000 == 0 && processingTimeSeconds > 0)
|| (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
System.err
.println("FYI, " + lineCount + " entries have been processed in "
+ processingTimeSeconds + " = " + (lineCount / processingTimeSeconds)
+ " lines/second");
System.err.println(
"FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds + " = "
+ (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds;
}
......@@ -153,58 +151,42 @@ public class CountryCLI {
// System.err.println("Submitted " + cpuId);
submitted++;
futures[i] = threadPool.submit(new Callable<String[]>() {
threadPool.submit(new Runnable() {
@Override
public String[] call() throws Exception {
String[] res = new String[inputs.size()];
public void run() {
for (int i = 0; i < inputs.size(); i++) {
String[] nextLine = inputs.get(i);
String result = null;
String[] writeLine = Arrays.copyOf(nextLine, nextLine.length + 1);
try {
String origCty = nextLine[columnOrigCty].trim();
float longitude = Float.parseFloat(nextLine[columnLongitude].trim());
float latitude = Float.parseFloat(nextLine[columnLatitude].trim());
try {
res[i] = countryOfOriginService.getCountries(longitude, latitude, origCty,
result = countryOfOriginService.getCountries(longitude, latitude, origCty,
ALLOWED_DISTANCE_MARGIN);
} catch (Exception e) {
res[i] = e.getMessage();
result = e.getMessage();
}
} catch (Throwable e) {
res[i] = "ERROR: " + e.getMessage();
result = "ERROR: " + e.getMessage();
}
writeLine[writeLine.length - 1] = result;
writer.writeNext(writeLine);
}
// System.err.println("Processed " + cpuId);
return res;
// System.err.println("Done " + cpuId);
}
});
}
// Retrieve results
for (int i = 0; i < nThreads; i++) {
// System.err.println("Getting results " + i);
if (futures[i] == null)
break;
final ArrayList<String[]> inputs = (ArrayList<String[]>) inputLines[i];
String[] result = null;
try {
result = (String[]) futures[i].get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// System.err.println("Results " + i);
for (int j = 0; j < inputs.size(); j++) {
String[] nextLine = inputs.get(j);
String[] writeLine = Arrays.copyOf(nextLine, nextLine.length + 1);
writeLine[writeLine.length - 1] = result[j];
writer.writeNext(writeLine);
}
}
// System.err.println("Submitted " + submitted);
} while (submitted > 0);
System.err.println("Shutting down");
threadPool.shutdown();
stopWatch.stop();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment