Commit ad5d1748 authored by Matija Obreza's avatar Matija Obreza
Browse files

Reduced scope, multi-threaded loading

parent c757d1c7
......@@ -30,6 +30,6 @@ public interface GeonamesService {
* @param list list of geonames to be updated in the db
* @throws Exception when list of geonames not saved
*/
void update(List<Geoname> list) throws Exception;
void upsert(List<Geoname> list) throws Exception;
}
......@@ -15,6 +15,9 @@
*/
package org.genesys.catalog.service.impl;
import java.util.List;
import java.util.stream.Collectors;
import org.genesys.catalog.service.GeonamesService;
import org.genesys.common.model.Geoname;
import org.genesys.common.persistence.GeonameRepository;
......@@ -24,8 +27,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* The Class GeonamesServiceImpl.
*
......@@ -42,7 +43,21 @@ public class GeonamesServiceImpl implements GeonamesService {
@Override
@Transactional
public void update(final List<Geoname> list) throws Exception {
geonameRepository.bulkSave(list);
public void upsert(final List<Geoname> list) throws Exception {
if (list.isEmpty()) {
return;
}
List<Geoname> existing = geonameRepository.findAll(list.stream().map(geoname -> geoname.getId()).collect(Collectors.toSet()));
for (Geoname geoname : list) {
geoname.setVersion(
// find matching id
existing.stream().filter(e -> e.getId().equals(geoname.getId()))
// get its version
.map(e -> e.getVersion())
// or null
.findFirst().orElse(null));
}
geonameRepository.save(list);
}
}
......@@ -24,13 +24,12 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
......@@ -38,10 +37,10 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.genesys.catalog.service.GeonamesService;
import org.genesys.common.model.Geoname;
import org.genesys.common.persistence.GeonameRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
......@@ -51,14 +50,12 @@ import org.springframework.stereotype.Component;
@Component
public class GeonamesUpdater {
private static final int BATCH_SIZE = 500;
private static final int BATCH_SIZE = 100;
private static final Logger LOG = LoggerFactory.getLogger(GeonamesUpdater.class);
public static final String DUMP_FILE_NAME = "allCountries.txt";
// public static final String GEONAMES_DUMP_URL = "http://download.geonames.org/export/dump/UA.zip";
public static final String GEONAMES_DUMP_URL = "http://download.geonames.org/export/dump/allCountries.zip";
private static final String TEMP_DIR = "temp";
private static boolean RUNNING = false;
public final static int GEONAME_ID = 0;
......@@ -81,13 +78,15 @@ public class GeonamesUpdater {
public final static int TIMEZONE = 17;
public final static int MODIFICATION_DATE = 18;
@Autowired
private GeonameRepository geonameRepository;
// http://www.geonames.org/export/codes.html
private static final Character[] IMPORTED_FEATURE_CLASSES = { 'A', 'H', 'L', 'P', 'T', 'V' };
private static final String[] IMPORTED_FEATURE_CODES = { "PCL", "PCLD", "PCLI", "PCLIX", "PCLS", "PCLH", "ADM1", "ADM2", "ADM3", "OCN", "SEA", "LK", "PPL" };
@Autowired
private GeonamesService geonamesService;
private ExecutorService executor = Executors.newFixedThreadPool(1);
@Autowired
private TaskExecutor executor;
/**
* Update local Geonames with data from geonames.org
......@@ -95,7 +94,7 @@ public class GeonamesUpdater {
*/
@PreAuthorize("hasRole('ADMINISTRATOR')")
public void updateGeonames() {
executor.submit(() -> {
executor.execute(() -> {
try {
if (!isRunning()) {
downloadUnpackAndImportGeonames();
......@@ -106,15 +105,40 @@ public class GeonamesUpdater {
});
}
public static synchronized boolean isRunning() {
return RUNNING;
}
private void downloadUnpackAndImportGeonames() throws IOException {
LOG.warn("Downloading geonames data from {}", GEONAMES_DUMP_URL);
LOG.warn("Importing geonames data");
RUNNING = true;
File dumpFile = new File("data", DUMP_FILE_NAME);
try {
if (!dumpFile.exists()) {
dumpFile = downloadGeonames();
}
if (dumpFile != null) {
importGeonames(dumpFile);
} else {
LOG.warn("Expected file {} was not found.", DUMP_FILE_NAME);
}
} catch (Throwable e) {
// FileUtils.deleteQuietly(dumpFile);
}
}
private File downloadGeonames() throws IOException {
LOG.warn("Downloading geonames data from {}", GEONAMES_DUMP_URL);
final CloseableHttpClient httpclient = HttpClientBuilder.create().build();
final HttpGet httpget = new HttpGet(GEONAMES_DUMP_URL);
final HttpResponse response;
final HttpEntity entity;
InputStream instream = null;
File dumbFile = null;
try {
response = httpclient.execute(httpget);
......@@ -128,13 +152,16 @@ public class GeonamesUpdater {
while (ze != null) {
final String fileName = ze.getName();
final File newFile = new File(TEMP_DIR + File.separator + fileName);
LOG.warn("Unpacking {} file to {}", fileName, newFile.getAbsolutePath());
if (fileName.equals(DUMP_FILE_NAME)) {
dumbFile = newFile;
if (!fileName.equals(DUMP_FILE_NAME)) {
LOG.info("Skipping {}", fileName);
continue;
}
//update directories for sub directories in zip
final File newFile = File.createTempFile("geonames", fileName);
LOG.warn("Unpacking {} file to {}", fileName, newFile.getAbsolutePath());
// update directories for sub directories in zip
new File(newFile.getParent()).mkdirs();
final FileOutputStream fos = new FileOutputStream(newFile);
int len;
......@@ -146,14 +173,10 @@ public class GeonamesUpdater {
fos.close();
zis.closeEntry();
ze = zis.getNextEntry();
}
if (dumbFile != null) {
importGeonames(dumbFile);
} else {
LOG.warn("Expected file {} was not found.", DUMP_FILE_NAME);
throw new IOException("Missing file " + DUMP_FILE_NAME);
return newFile;
}
} catch (final Throwable e) {
LOG.error("Geonames download and unpack failed to complete.", e);
throw new IOException(e);
......@@ -161,8 +184,9 @@ public class GeonamesUpdater {
RUNNING = false;
IOUtils.closeQuietly(httpclient);
IOUtils.closeQuietly(instream);
FileUtils.deleteQuietly(new File(TEMP_DIR));
}
throw new IOException("Could not find " + DUMP_FILE_NAME + " in archive");
}
private void importGeonames(final File unpackedFile) throws IOException {
......@@ -174,26 +198,24 @@ public class GeonamesUpdater {
try {
inputStream = new FileInputStream(unpackedFile.getAbsolutePath());
sc = new Scanner(inputStream, "UTF-8");
List<Geoname> listToSave = new ArrayList<>(BATCH_SIZE);
List<String[]> linesToImport = new ArrayList<>(BATCH_SIZE);
long k = 0;
while (sc.hasNextLine()) {
final String line = sc.nextLine();
final String[] values = line.split("\t");
if (listToSave.size() == BATCH_SIZE) {
processData(listToSave);
} else {
Geoname geoname = geonameRepository.findOne(Long.valueOf(values[GEONAME_ID].trim()));
if (geoname == null) {
geoname = new Geoname();
extractParsedLineIntoGeoname(geoname, values);
} else {
extractParsedLineIntoGeoname(geoname, values);
}
listToSave.add(geoname);
if (linesToImport.size() == BATCH_SIZE) {
processData(linesToImport);
}
linesToImport.add(values);
k++;
if (k % 10000 == 1) {
LOG.info("Read {} lines", k);
}
processData(listToSave);
}
processData(linesToImport);
LOG.info("Done importing geonames database");
IOUtils.closeQuietly(sc);
......@@ -202,49 +224,82 @@ public class GeonamesUpdater {
}
}
private void processData(final List<String[]> linesToImport) {
if (linesToImport.isEmpty()) {
return;
}
final List<String[]> copy = new ArrayList<>(linesToImport);
linesToImport.clear();
executor.execute(() -> {
List<Geoname> toSave = new ArrayList<>();
for (String[] values : copy) {
Geoname parsed = extractParsedLineIntoGeoname(new Geoname(), values);
if (ArrayUtils.contains(IMPORTED_FEATURE_CLASSES, parsed.getFeatureClass())
//
&& ArrayUtils.contains(IMPORTED_FEATURE_CODES, parsed.getFeatureCode())) {
if (parsed.getAdmin3Code() == null) {
toSave.add(parsed);
}
}
}
if (toSave.size() == 0) {
return;
}
try {
LOG.info("Upserting {} geonames", toSave.size());
geonamesService.upsert(toSave);
} catch (final Throwable e) {
LOG.error("Some data bulk was not saved");
for (int i = 0; i < toSave.size(); i++) {
try {
geonamesService.upsert(toSave.subList(i, i + 1));
} catch (final Throwable e1) {
Geoname g = toSave.get(i);
LOG.warn("Data could not be saved geoname_id={}: {}", g.getId(), g.getName(), e1);
}
}
}
});
}
/**
* Extract data from line into Geoname instance
*
* @return
*/
private void extractParsedLineIntoGeoname(final Geoname geoname, final String[] values) {
private Geoname extractParsedLineIntoGeoname(final Geoname geoname, final String[] values) {
geoname.setId(Long.valueOf(values[GEONAME_ID].trim()));
geoname.setName(values[NAME]);
geoname.setAsciiname(values[ASCII_NAME]);
geoname.setAsciiName(values[ASCII_NAME]);
// String alternateNames = values[ALTERNATE_NAMES];
// // Remove all 4-byte utf8 characters
// alternateNames = alternateNames.replaceAll("[^\\u0000-\\uFFFF]", "");
// // geoname.setAlternateNames(StringUtils.trimToNull(alternateNames));
//TODO error when saving string with ASCII characters
// SQLException: Incorrect string value: '\xF0\x90\x8C\xB0\xF0\x90...' for column 'alternatenames'
geoname.setAlternatenames(values[ALTERNATE_NAMES].replaceAll("[^\\p{ASCII}]", ""));
geoname.setLatitude(!values[LATITUDE].isEmpty() ? Double.valueOf(values[LATITUDE]) : null);
geoname.setLongitude(!values[LONGITUDE].isEmpty() ? Double.valueOf(values[LONGITUDE]) : null);
geoname.setFeatureClass(values[FEATURE_CLASS]);
geoname.setFeatureCode(values[FEATURE_CODE]);
geoname.setCountryCode(values[COUNTRY_CODE]);
geoname.setCc2(values[CC2]);
geoname.setAdmin1Code(values[ADMIN1_CODE]);
geoname.setAdmin2Code(values[ADMIN2_CODE]);
geoname.setAdmin3Code(values[ADMIN3_CODE]);
geoname.setAdmin4Code(values[ADMIN4_CODE]);
geoname.setPopulation(!values[POPULATION].isEmpty() ? Long.valueOf(values[POPULATION]) : null);
geoname.setElevation(!values[ELEVATION].isEmpty() ? Integer.valueOf(values[ELEVATION]) : null);
geoname.setDem(!values[DEM].isEmpty() ? Integer.valueOf(values[DEM]) : null);
geoname.setTimezone(values[TIMEZONE]);
geoname.setFeatureClass(values[FEATURE_CLASS].length() == 0 ? null : values[FEATURE_CLASS].charAt(0));
geoname.setFeatureCode(StringUtils.trimToNull(values[FEATURE_CODE]));
geoname.setCountryCode(StringUtils.trimToNull(values[COUNTRY_CODE]));
geoname.setCc2(StringUtils.trimToNull(values[CC2]));
geoname.setAdmin1Code(StringUtils.trimToNull(values[ADMIN1_CODE]));
geoname.setAdmin2Code(StringUtils.trimToNull(values[ADMIN2_CODE]));
geoname.setAdmin3Code(StringUtils.trimToNull(values[ADMIN3_CODE]));
geoname.setAdmin4Code(StringUtils.trimToNull(values[ADMIN4_CODE]));
// geoname.setPopulation(!values[POPULATION].isEmpty() ?
// Long.valueOf(values[POPULATION]) : null);
// geoname.setElevation(!values[ELEVATION].isEmpty() ?
// Integer.valueOf(values[ELEVATION]) : null);
// geoname.setDem(!values[DEM].isEmpty() ? Integer.valueOf(values[DEM]) : null);
// geoname.setTimezone(StringUtils.trimToNull(values[TIMEZONE]));
try {
geoname.setModificationDate(!values[MODIFICATION_DATE].isEmpty() ? new SimpleDateFormat("yy-mm-dd").parse(values[MODIFICATION_DATE]) : null);
} catch (Exception ex) {
geoname.setModificationDate(null);
}
}
private void processData(final List<Geoname> bulk) {
final List<Geoname> copy = new ArrayList<>(bulk);
bulk.clear();
try {
geonamesService.update(copy);
} catch (final Exception e) {
LOG.error("Some data bulk was not saved, read logs!", e);
}
}
public static synchronized boolean isRunning() {
return RUNNING;
return geoname;
}
}
......@@ -15,15 +15,17 @@
*/
package org.genesys.common.model;
import org.genesys.blocks.model.AuditedVersionedModelWithoutId;
import java.io.Serializable;
import java.util.Date;
import javax.persistence.Cacheable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.Table;
import java.util.Date;
import javax.persistence.Version;
import org.genesys.blocks.model.EntityId;
/**
* @author Maxym Borodenko
......@@ -31,15 +33,20 @@ import java.util.Date;
@Entity
@Cacheable
@Table(name = "geoname")
public class Geoname extends AuditedVersionedModelWithoutId {
public class Geoname implements EntityId, Serializable {
private static final long serialVersionUID = -292115847908509420L;
/**
* the Id value that geonames.org provides
*/
@Id
@Column(name = "geoname_id", unique = true, nullable = false)
@Column(name = "geonameId", unique = true, nullable = false)
private Long id;
@Version
private Integer version;
/**
* name of geographical point
*/
......@@ -50,14 +57,7 @@ public class Geoname extends AuditedVersionedModelWithoutId {
* name of geographical point in plain ascii characters
*/
@Column(length = 200)
private String asciiname;
/**
* alternate names
*/
@Lob
@Column
private String alternatenames;
private String asciiName;
/**
* latitude in decimal degrees
......@@ -74,79 +74,79 @@ public class Geoname extends AuditedVersionedModelWithoutId {
/**
* feature class
*/
@Column(name = "feature_class", length = 1)
private String featureClass;
@Column(length = 1)
private Character featureClass;
/**
* feature code
*/
@Column(name = "feature_code", length = 10)
@Column(length = 10)
private String featureCode;
/**
* ISO-3166 2-letter country code
*/
@Column(name = "country_code", length = 2)
@Column(length = 2)
private String countryCode;
/**
* alternate country codes, comma separated, ISO-3166 2-letter country code
*/
@Column(name = "cc2", length = 200)
@Column(length = 200)
private String cc2;
/**
* fipscode (subject to change to iso code)
*/
@Column(name = "admin1_code", length = 20)
@Column(length = 20)
private String admin1Code;
/**
* code for the second administrative division
*/
@Column(name = "admin2_code", length = 80)
@Column(length = 80)
private String admin2Code;
/**
* code for third level administrative division
*/
@Column(name = "admin3_code", length = 20)
@Column(length = 20)
private String admin3Code;
/**
* code for fourth level administrative division
*/
@Column(name = "admin4_code", length = 20)
@Column(length = 20)
private String admin4Code;
/**
* the population
*/
@Column
private Long population;
// /**
// * the population
// */
// @Column
// private Long population;
/**
* the elevation in meters
*/
@Column
private Integer elevation;
// /**
// * the elevation in meters
// */
// @Column
// private Integer elevation;
/**
* digital elevation model
*/
@Column
private Integer dem;
// /**
// * digital elevation model
// */
// @Column
// private Integer dem;
/**
* the timezone
*/
@Column(length = 40)
private String timezone;
// /**
// * the timezone
// */
// @Column(length = 40)
// private String timezone;
/**
* the modification date
*/
@Column(name = "modification_date")
@Column
private Date modificationDate;
/**
......@@ -167,6 +167,24 @@ public class Geoname extends AuditedVersionedModelWithoutId {
this.id = id;
}
/**
* Gets the version.
*
* @return the version
*/
public Integer getVersion() {
return version;
}
/**
* Sets the version.
*
* @param version the new version
*/
public void setVersion(Integer version) {
this.version = version;
}
/**
* Gets the name.
*
......@@ -190,8 +208,8 @@ public class Geoname extends AuditedVersionedModelWithoutId {
*
* @return the ascii name
*/
public String getAsciiname() {
return asciiname;
public String getAsciiName() {
return asciiName;
}
/**
......@@ -199,26 +217,8 @@ public class Geoname extends AuditedVersionedModelWithoutId {
*
* @param asciiname the ascii name to set
*/
public void setAsciiname(final String asciiname) {
this.asciiname = asciiname;
}
/**