Commit c1074f2c authored by Matija Obreza's avatar Matija Obreza
Browse files

Multi-threaded execution

parent 952e301c
......@@ -21,6 +21,11 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -85,48 +90,89 @@ public class CountryCLI {
CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl();
countryOfOriginService.afterPropertiesSet();
// Thread pool with nThreads concurrent threads
int nThreads = 4;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
StopWatch stopWatch = new StopWatch();
String[] nextLine;
int lineCount = 0;
int submitted = 0;
long lastLogTime = 0;
stopWatch.start();
while ((nextLine = reader.readNext()) != null) {
lineCount++;
stopWatch.split();
long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0) || (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
System.err.println("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds
+ " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds;
}
String[] writeLine = Arrays.copyOf(nextLine, nextLine.length + 1);
String[][] inputLines = new String[nThreads][];
Future<?>[] futures = new Future<?>[nThreads];
try {
String origCty = nextLine[columnOrigCty].trim();
float longitude = Float.parseFloat(nextLine[columnLongitude].trim());
float latitude = Float.parseFloat(nextLine[columnLatitude].trim());
do {
// Reset
for (int i = nThreads - 1; i >= 0; i--) {
inputLines[i] = null;
futures[i] = null;
submitted = 0;
}
try {
String geoCountry = countryOfOriginService.getCountries(longitude, latitude, origCty,
ALLOWED_DISTANCE_MARGIN);
writeLine[writeLine.length - 1] = geoCountry;
// Submit
for (int i = nThreads - 1; i >= 0; i--) {
String[] nextLine = reader.readNext();
lineCount++;
stopWatch.split();
long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0)
|| (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
System.err.println("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds
+ " = " + (lineCount / processingTimeSeconds) + " lines/second");
countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds;
}
// TODO Use origCty?
if (nextLine == null)
break;
} catch (Exception e) {
writeLine[writeLine.length - 1] = e.getMessage();
inputLines[i] = nextLine;
submitted++;
futures[i] = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
String origCty = nextLine[columnOrigCty].trim();
float longitude = Float.parseFloat(nextLine[columnLongitude].trim());
float latitude = Float.parseFloat(nextLine[columnLatitude].trim());
try {
return countryOfOriginService.getCountries(longitude, latitude, origCty,
ALLOWED_DISTANCE_MARGIN);
} catch (Exception e) {
return e.getMessage();
}
} catch (Throwable e) {
return "ERROR: " + e.getMessage();
}
}
});
}
// Retrieve results
for (int i = nThreads - 1; i >= 0; i--) {
if (futures[i] == null)
break;
String result = null;
try {
result = (String) futures[i].get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
result = e.getMessage();
}
} catch (Throwable e) {
writeLine[writeLine.length - 1] = "ERROR: " + e.getMessage();
String[] writeLine = Arrays.copyOf(inputLines[i], inputLines[i].length + 1);
writeLine[writeLine.length - 1] = result;
writer.writeNext(writeLine);
}
} while (submitted > 0);
writer.writeNext(writeLine);
}
stopWatch.stop();
writer.close();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment