Commit ff99ef90 authored by Matija Obreza's avatar Matija Obreza
Browse files

Buffered input reader and larger batch size

parent c1074f2c
...@@ -20,6 +20,7 @@ import java.io.BufferedReader; ...@@ -20,6 +20,7 @@ import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
...@@ -85,13 +86,14 @@ public class CountryCLI { ...@@ -85,13 +86,14 @@ public class CountryCLI {
int columnOrigCty = 5; int columnOrigCty = 5;
CSVWriter writer = new CSVWriter(new OutputStreamWriter(System.out), ',', '"', '\\', "\n"); CSVWriter writer = new CSVWriter(new OutputStreamWriter(System.out), ',', '"', '\\', "\n");
CSVReader reader = new CSVReader(new InputStreamReader(System.in), ',', '"', '\\', 0, false); CSVReader reader = new CSVReader(new BufferedReader(new InputStreamReader(System.in), 10000), ',', '"', '\\',
0, false);
CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl(); CountryOfOriginServiceImpl countryOfOriginService = new CountryOfOriginServiceImpl();
countryOfOriginService.afterPropertiesSet(); countryOfOriginService.afterPropertiesSet();
// Thread pool with nThreads concurrent threads // Thread pool with nThreads concurrent threads
int nThreads = 4; int nThreads = 7;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads); ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
...@@ -102,19 +104,25 @@ public class CountryCLI { ...@@ -102,19 +104,25 @@ public class CountryCLI {
long lastLogTime = 0; long lastLogTime = 0;
stopWatch.start(); stopWatch.start();
String[][] inputLines = new String[nThreads][]; ArrayList<?>[] inputLines = new ArrayList[nThreads];
for (int i = 0; i < nThreads; i++) {
inputLines[i] = new ArrayList<String[]>();
}
Future<?>[] futures = new Future<?>[nThreads]; Future<?>[] futures = new Future<?>[nThreads];
do { do {
submitted = 0;
// Reset // Reset
for (int i = nThreads - 1; i >= 0; i--) { for (int i = 0; i < nThreads; i++) {
inputLines[i] = null; inputLines[i].clear();
futures[i] = null; futures[i] = null;
submitted = 0;
} }
// Submit // Submit
for (int i = nThreads - 1; i >= 0; i--) { for (int i = 0; i < nThreads; i++) {
final ArrayList<String[]> inputs = (ArrayList<String[]>) inputLines[i];
for (int j = 0; j < 100; j++) {
String[] nextLine = reader.readNext(); String[] nextLine = reader.readNext();
lineCount++; lineCount++;
...@@ -122,57 +130,83 @@ public class CountryCLI { ...@@ -122,57 +130,83 @@ public class CountryCLI {
long processingTimeSeconds = stopWatch.getSplitTime() / 1000; long processingTimeSeconds = stopWatch.getSplitTime() / 1000;
if ((lineCount % 1000 == 0) if ((lineCount % 1000 == 0)
|| (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) { || (lastLogTime != processingTimeSeconds && processingTimeSeconds % 10 == 0)) {
System.err.println("FYI, " + lineCount + " entries have been processed in " + processingTimeSeconds System.err
+ " = " + (lineCount / processingTimeSeconds) + " lines/second"); .println("FYI, " + lineCount + " entries have been processed in "
+ processingTimeSeconds + " = " + (lineCount / processingTimeSeconds)
+ " lines/second");
countryOfOriginService.printCache(); countryOfOriginService.printCache();
lastLogTime = processingTimeSeconds; lastLogTime = processingTimeSeconds;
} }
if (nextLine == null) if (nextLine == null) {
System.err.println("No input");
break; break;
}
inputLines[i] = nextLine; inputs.add(nextLine);
submitted++; }
futures[i] = threadPool.submit(new Callable<String>() { if (inputs.size() == 0)
break;
// final int cpuId = i;
// System.err.println("Submitted " + cpuId);
submitted++;
futures[i] = threadPool.submit(new Callable<String[]>() {
@Override @Override
public String call() throws Exception { public String[] call() throws Exception {
String[] res = new String[inputs.size()];
for (int i = 0; i < inputs.size(); i++) {
String[] nextLine = inputs.get(i);
try { try {
String origCty = nextLine[columnOrigCty].trim(); String origCty = nextLine[columnOrigCty].trim();
float longitude = Float.parseFloat(nextLine[columnLongitude].trim()); float longitude = Float.parseFloat(nextLine[columnLongitude].trim());
float latitude = Float.parseFloat(nextLine[columnLatitude].trim()); float latitude = Float.parseFloat(nextLine[columnLatitude].trim());
try { try {
return countryOfOriginService.getCountries(longitude, latitude, origCty, res[i] = countryOfOriginService.getCountries(longitude, latitude, origCty,
ALLOWED_DISTANCE_MARGIN); ALLOWED_DISTANCE_MARGIN);
} catch (Exception e) { } catch (Exception e) {
return e.getMessage(); res[i] = e.getMessage();
} }
} catch (Throwable e) { } catch (Throwable e) {
return "ERROR: " + e.getMessage(); res[i] = "ERROR: " + e.getMessage();
}
} }
// System.err.println("Processed " + cpuId);
return res;
} }
}); });
} }
// Retrieve results // Retrieve results
for (int i = nThreads - 1; i >= 0; i--) { for (int i = 0; i < nThreads; i++) {
// System.err.println("Getting results " + i);
if (futures[i] == null) if (futures[i] == null)
break; break;
String result = null; final ArrayList<String[]> inputs = (ArrayList<String[]>) inputLines[i];
String[] result = null;
try { try {
result = (String) futures[i].get(); result = (String[]) futures[i].get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
result = e.getMessage();
} }
String[] writeLine = Arrays.copyOf(inputLines[i], inputLines[i].length + 1); // System.err.println("Results " + i);
writeLine[writeLine.length - 1] = result; for (int j = 0; j < inputs.size(); j++) {
String[] nextLine = inputs.get(j);
String[] writeLine = Arrays.copyOf(nextLine, nextLine.length + 1);
writeLine[writeLine.length - 1] = result[j];
writer.writeNext(writeLine); writer.writeNext(writeLine);
} }
}
// System.err.println("Submitted " + submitted);
} while (submitted > 0); } while (submitted > 0);
threadPool.shutdown();
stopWatch.stop(); stopWatch.stop();
writer.close(); writer.close();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment