Commit d5cc8be7 authored by Matija Obreza's avatar Matija Obreza
Browse files

DS2 code organization

parent 3da7cb03
......@@ -3,6 +3,7 @@ package org.genesys2.server.model.dataset;
import java.util.List;
import java.util.UUID;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.OneToMany;
......@@ -23,11 +24,11 @@ public class DS extends BasicModel {
private UUID uuid;
@OrderBy("index")
@OneToMany(mappedBy = "dataset")
@OneToMany(mappedBy = "dataset", cascade = { CascadeType.ALL })
private List<DSQualifier> qualifiers;
@OrderBy("index")
@OneToMany(mappedBy = "dataset")
@OneToMany(mappedBy = "dataset", cascade = { CascadeType.ALL })
private List<DSColumn> columns;
/**
......
package org.genesys2.server.model.dataset;
import java.nio.ByteBuffer;
import java.util.List;
import javax.persistence.CascadeType;
......@@ -12,9 +13,12 @@ import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.OneToMany;
import javax.persistence.PrePersist;
import javax.persistence.PreUpdate;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import org.apache.commons.codec.digest.DigestUtils;
import org.genesys.blocks.model.EntityId;
/**
......@@ -31,14 +35,14 @@ import org.genesys.blocks.model.EntityId;
@Table(name = "ds2row", uniqueConstraints = { @UniqueConstraint(columnNames = { "md5", "sha1" }) })
public class DSRow implements EntityId {
@ManyToOne(fetch=FetchType.LAZY, optional = false)
@JoinColumn(name="ds")
@ManyToOne(fetch = FetchType.LAZY, optional = false)
@JoinColumn(name = "ds")
private DS dataset;
@OneToMany(fetch = FetchType.LAZY, mappedBy = "row", cascade = { CascadeType.REMOVE })
@OneToMany(fetch = FetchType.LAZY, mappedBy = "row", cascade = { CascadeType.ALL })
private List<DSRowQualifier<?>> rowQualifiers;
@OneToMany(fetch = FetchType.LAZY, mappedBy = "row")
@OneToMany(fetch = FetchType.LAZY, mappedBy = "row", cascade = { CascadeType.ALL })
private List<DSValue<?>> values;
@Id
......@@ -51,6 +55,19 @@ public class DSRow implements EntityId {
@Column(columnDefinition = "binary(20)", updatable = false)
private byte[] sha1;
@PrePersist
@PreUpdate
private void prePersist() {
ByteBuffer keyBuffer = ByteBuffer.allocate(500);
for (DSRowQualifier<?> dsq : this.getRowQualifiers()) {
dsq.putKey(keyBuffer);
}
byte[] array = keyBuffer.array();
this.md5 = DigestUtils.md5(array);
this.sha1 = DigestUtils.sha1(array);
}
@Override
public Long getId() {
return this.id;
......@@ -63,11 +80,11 @@ public class DSRow implements EntityId {
public DS getDataset() {
return dataset;
}
public void setDataset(DS dataset) {
this.dataset = dataset;
}
public List<DSRowQualifier<?>> getRowQualifiers() {
return rowQualifiers;
}
......
package org.genesys2.server.model.dataset;
import java.nio.ByteBuffer;
import javax.persistence.DiscriminatorColumn;
import javax.persistence.DiscriminatorType;
import javax.persistence.Entity;
......@@ -80,4 +82,5 @@ public abstract class DSRowQualifier<T> {
return null;
}
public abstract void putKey(ByteBuffer keyBuffer);
}
package org.genesys2.server.model.dataset;
import java.nio.ByteBuffer;
import javax.persistence.Column;
import javax.persistence.DiscriminatorValue;
import javax.persistence.Entity;
......@@ -20,5 +22,9 @@ public class DSRowQualifierLong extends DSRowQualifier<Long> {
public void setValue(Long value) {
this.value = value;
}
@Override
public void putKey(ByteBuffer keyBuffer) {
keyBuffer.putLong(this.value);
}
}
......@@ -21,12 +21,15 @@ import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.JoinColumn;
import javax.persistence.OneToOne;
import javax.persistence.PrePersist;
import javax.persistence.PreUpdate;
import javax.persistence.Table;
import javax.persistence.Version;
import org.apache.commons.lang.StringUtils;
import org.genesys.blocks.auditlog.annotations.Audited;
import org.genesys.blocks.model.BasicModel;
import org.genesys.worldclim.WorldClimUtil;
import org.genesys2.server.model.impl.GeoReferencedEntity;
@Entity
......@@ -57,6 +60,15 @@ public class AccessionGeo extends BasicModel implements GeoReferencedEntity, Acc
private Long tileIndex;
/**
* Recalculate {@link #tileIndex} on insert and update
*/
@PrePersist
@PreUpdate
private void prePersist() {
tileIndex = WorldClimUtil.getTileIndex(5, this.longitude, this.latitude);
}
public long getVersion() {
return version;
}
......
......@@ -43,4 +43,7 @@ public interface AccessionGeoRepository extends JpaRepository<AccessionGeo, Long
@Query("select distinct(ag.tileIndex) from AccessionGeo ag where ag.tileIndex is not null")
Collection<Long> getTileIndexes();
@Query("select distinct(ag.accession.id) from AccessionGeo ag where ag.longitude between -180 and 180 and ag.latitude between -90 and 90 and ag.tileIndex is null")
Set<Long> withMissingTileIndex();
}
......@@ -16,7 +16,7 @@ import org.genesys2.server.model.json.WorldclimJson;
public interface DSService {
void saveDataset(DS ds);
DS saveDataset(DS ds);
DSQualifier addQualifier(DS ds, DSDescriptor d1);
DSColumn addDescriptor(DS ds, DSDescriptor dSDescriptor);
......
......@@ -24,7 +24,7 @@ public interface DescriptorService {
List<DSDescriptor> list();
void saveDescriptor(DSDescriptor dSDescriptor);
DSDescriptor saveDescriptor(DSDescriptor dSDescriptor);
DSDescriptor getDescriptor(String variableName);
}
......@@ -2,9 +2,7 @@ package org.genesys2.server.service.impl;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
......@@ -18,7 +16,6 @@ import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.streaming.SXSSFSheet;
......@@ -26,11 +23,11 @@ import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.genesys2.server.model.dataset.DS;
import org.genesys2.server.model.dataset.DSColumn;
import org.genesys2.server.model.dataset.DSDescriptor;
import org.genesys2.server.model.dataset.DSQualifier;
import org.genesys2.server.model.dataset.DSRow;
import org.genesys2.server.model.dataset.DSRowQualifier;
import org.genesys2.server.model.dataset.DSValue;
import org.genesys2.server.model.dataset.DSDescriptor;
import org.genesys2.server.model.genesys.AccessionGeo;
import org.genesys2.server.model.json.WorldclimJson;
import org.genesys2.server.persistence.domain.AccessionGeoRepository;
......@@ -40,7 +37,6 @@ import org.genesys2.server.persistence.domain.dataset.DSRepository;
import org.genesys2.server.persistence.domain.dataset.DSRowQualifierRepository;
import org.genesys2.server.persistence.domain.dataset.DSRowRepository;
import org.genesys2.server.persistence.domain.dataset.DSValueRepository;
import org.genesys2.server.persistence.domain.dataset.DSDescriptorRepository;
import org.genesys2.server.service.DSService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -53,12 +49,10 @@ import org.springframework.transaction.annotation.Transactional;
public class DSServiceImpl implements DSService {
public static final Logger LOG = LoggerFactory.getLogger(DSServiceImpl.class);
private static final Charset CHARSET_UTF8 = Charset.forName("UTF8");
@Autowired
private DSRepository dsRepo;
@Autowired
private DSColumnRepository dsDescRepo;
private DSColumnRepository dsColRepo;
@Autowired
private DSQualifierRepository dsQualRepo;
@Autowired
......@@ -67,16 +61,14 @@ public class DSServiceImpl implements DSService {
private DSRowQualifierRepository dsRowQualiRepo;
@Autowired
private DSValueRepository dsValueRepo;
@Autowired
private DSDescriptorRepository descriptorRepo;
@Autowired
private AccessionGeoRepository accessionGeoRepository;
@Transactional
@Override
public void saveDataset(DS ds) {
dsRepo.save(ds);
public DS saveDataset(DS ds) {
return dsRepo.save(ds);
}
@Transactional
......@@ -98,7 +90,7 @@ public class DSServiceImpl implements DSService {
dsd.setDescriptor(d1);
dsd.setIndex(ds.getColumns().size() + 1);
ds.getColumns().add(dsd);
return dsDescRepo.save(dsd);
return dsColRepo.save(dsd);
}
@Override
......@@ -222,12 +214,12 @@ public class DSServiceImpl implements DSService {
}
private void saveRows(Collection<DSRow> dsrs) {
// ArrayList<DSRowQualifier<?>> quali = new ArrayList<DSRowQualifier<?>>();
// for (DSRow dsr : dsrs) {
// quali.addAll(dsr.getRowQualifiers());
// }
// dsRowQualiRepo.save(quali);
dsRowRepo.save(dsrs);
ArrayList<DSRowQualifier<?>> quali = new ArrayList<DSRowQualifier<?>>();
for (DSRow dsr : dsrs) {
quali.addAll(dsr.getRowQualifiers());
}
dsRowQualiRepo.save(quali);
}
private DSRow findOrMakeRow(DS ds, Object[] qualifiers) {
......@@ -288,32 +280,17 @@ public class DSServiceImpl implements DSService {
private DSRow makeRow(DS ds, Object... qualifiers) {
DSRow dsr = new DSRow();
ByteBuffer keyBuffer = ByteBuffer.allocate(500);
dsr.setDataset(ds);
List<DSRowQualifier<?>> rowQualifiers = new ArrayList<DSRowQualifier<?>>();
int i = 0;
for (DSQualifier dsq : ds.getQualifiers()) {
DSRowQualifier<?> dsrq = DSRowQualifier.make(qualifiers[i]);
if (qualifiers[i] instanceof Long) {
keyBuffer.putLong(((Long) qualifiers[i]).longValue());
} else if (qualifiers[i] instanceof String) {
keyBuffer.put(((String) qualifiers[i]).getBytes(CHARSET_UTF8));
} else if (qualifiers[i] instanceof Double) {
keyBuffer.putDouble(((Double) qualifiers[i]).doubleValue());
}
dsrq.setDatasetQualifier(dsq);
dsrq.setRow(dsr);
rowQualifiers.add(dsrq);
i++;
}
// LOG.info("MD4 length: " + DigestUtils.md5(keyBuffer.array()).length);
// LOG.info("SHA1 length: " +
// DigestUtils.sha1(keyBuffer.array()).length);
dsr.setMd5(DigestUtils.md5(keyBuffer.array()));
dsr.setSha1(DigestUtils.sha1(keyBuffer.array()));
dsr.setDataset(ds);
dsr.setRowQualifiers(rowQualifiers);
return dsr;
}
......@@ -335,15 +312,13 @@ public class DSServiceImpl implements DSService {
*/
@Override
@Transactional
public void worldclimUpdate(DS dataset, DSColumn dsd, Set<Long> tileIndexes, MappedByteBuffer buffer, short nullValue, double factor) {
if (LOG.isInfoEnabled()) {
LOG.info("Updating {} for tileIndexes: {}", dsd.getDescriptor().getCode(), tileIndexes.size());
}
public void worldclimUpdate(DS dataset, DSColumn dsc, Set<Long> tileIndexes, MappedByteBuffer buffer, short nullValue, double factor) {
LOG.debug("Updating {} for tileIndexes: {}", dsc.getDescriptor().getCode(), tileIndexes.size());
Map<Object, DSRow> rowMap = findOrMakeRows(dataset, tileIndexes);
// Load all existing values for rows
Map<Long, DSValue<?>> values = dsRowRepo.rowValueMap(rowMap.values(), dsd);
Map<Long, DSValue<?>> values = dsRowRepo.rowValueMap(rowMap.values(), dsc);
List<DSValue<?>> toSave = new ArrayList<DSValue<?>>();
for (final Long tileIndex : tileIndexes) {
......@@ -353,9 +328,7 @@ public class DSServiceImpl implements DSService {
try {
short val = buffer.getShort((int) (tileIndex * 2));
if (val != nullValue) {
if (LOG.isDebugEnabled()) {
LOG.debug("tile={} val={}", tileIndex, val);
}
LOG.trace("tile={} val={}", tileIndex, val);
Object value = null;
if (factor < 1.0) {
......@@ -372,9 +345,9 @@ public class DSServiceImpl implements DSService {
dsrv = values.get(dsr.getId());
if (dsrv == null) {
LOG.debug("Making new value for row {} and {}", dsr, dsd);
LOG.debug("Making new value for row {} and {}", dsr, dsc);
dsrv = DSValue.make(value);
dsrv.setDatasetColumn(dsd);
dsrv.setDatasetColumn(dsc);
dsrv.setRow(dsr);
}
dsrv.setValue2(value);
......@@ -398,7 +371,7 @@ public class DSServiceImpl implements DSService {
// Save values
dsValueRepo.save(toSave);
LOG.info("Done saving.");
LOG.debug("Done saving.");
}
@Override
......@@ -553,6 +526,6 @@ public class DSServiceImpl implements DSService {
LOG.info("Clearing DSValues for {}", dsd.getDescriptor().getCode());
int count = dsValueRepo.deleteFor(dsd);
LOG.info("Deleted {} DSValue cells", count);
dsDescRepo.delete(dsd);
dsColRepo.delete(dsd);
}
}
......@@ -33,21 +33,21 @@ public class DescriptorServiceImpl implements DescriptorService {
public static final Logger LOG = LoggerFactory.getLogger(DescriptorServiceImpl.class);
@Autowired
private DSDescriptorRepository dSDescriptorRepository;
private DSDescriptorRepository descriptorRepository;
@Override
public List<DSDescriptor> list() {
return dSDescriptorRepository.findAll();
return descriptorRepository.findAll();
}
@Transactional
@Override
public void saveDescriptor(DSDescriptor descriptor) {
dSDescriptorRepository.save(descriptor);
public DSDescriptor saveDescriptor(DSDescriptor descriptor) {
return descriptorRepository.save(descriptor);
}
@Override
public DSDescriptor getDescriptor(String variableName) {
return dSDescriptorRepository.findByCode(variableName);
return descriptorRepository.findByCode(variableName);
}
}
......@@ -25,18 +25,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
......@@ -51,23 +45,25 @@ import org.genesys.worldclim.grid.generic.Header;
import org.genesys2.server.aspect.AsAdmin;
import org.genesys2.server.model.dataset.DS;
import org.genesys2.server.model.dataset.DSColumn;
import org.genesys2.server.model.dataset.DSQualifier;
import org.genesys2.server.model.dataset.DSDescriptor;
import org.genesys2.server.model.dataset.DSQualifier;
import org.genesys2.server.model.genesys.AccessionGeo;
import org.genesys2.server.persistence.domain.AccessionGeoRepository;
import org.genesys2.server.persistence.domain.GenesysLowlevelRepository;
import org.genesys2.server.service.DSService;
import org.genesys2.server.service.DescriptorService;
import org.genesys2.server.service.impl.FilterHandler.AppliedFilters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
/**
* Download data from WorldClim.org and update local cache.
* {@link WorldClimUpdaterImpl#WORLDCLIM_ORG}.
......@@ -93,6 +89,9 @@ public class WorldClimUpdater implements InitializingBean {
@Value("${worldclim.dir}")
private String worldclimDir;
@Autowired
private TaskExecutor taskExecutor;
private File worldClimDir;
@Autowired
......@@ -102,7 +101,7 @@ public class WorldClimUpdater implements InitializingBean {
private DescriptorService descriptorService;
@Autowired
@Qualifier("genesysLowlevelRepositoryCustomImpl")
@Qualifier("genesysLowlevelRepositoryCustomImpl")
private GenesysLowlevelRepository genesysLowlevel;
@Autowired
......@@ -116,16 +115,48 @@ public class WorldClimUpdater implements InitializingBean {
@AsAdmin
public void downloadAll() {
try {
downloadAndExtract(worldClimDir, "alt_2-5m_bil.zip");
downloadAndExtract(worldClimDir, "prec_2-5m_bil.zip");
downloadAndExtract(worldClimDir, "tmin_2-5m_bil.zip");
downloadAndExtract(worldClimDir, "tmax_2-5m_bil.zip");
downloadAndExtract(worldClimDir, "tmean_2-5m_bil.zip");
downloadAndExtract(worldClimDir, "bio_2-5m_bil.zip");
} catch (IOException e) {
LOG.error("Failed to download and import", e);
}
taskExecutor.execute(() -> {
try {
downloadAndExtract(worldClimDir, "alt_2-5m_bil.zip");
} catch (IOException e) {
LOG.error("Failed to download and import", e);
}
});
taskExecutor.execute(() -> {
try {
downloadAndExtract(worldClimDir, "prec_2-5m_bil.zip");
} catch (IOException e) {
LOG.error("Failed to download and import", e);
}
});
taskExecutor.execute(() -> {
try {
downloadAndExtract(worldClimDir, "tmin_2-5m_bil.zip");
} catch (IOException e) {
LOG.error("Failed to download and import", e);
}
});
taskExecutor.execute(() -> {
try {
downloadAndExtract(worldClimDir, "tmax_2-5m_bil.zip");
} catch (IOException e) {
LOG.error("Failed to download and import", e);
}
});
taskExecutor.execute(() -> {
try {
downloadAndExtract(worldClimDir, "tmean_2-5m_bil.zip");
} catch (IOException e) {
LOG.error("Failed to download and import", e);
}
});
taskExecutor.execute(() -> {
try {
downloadAndExtract(worldClimDir, "bio_2-5m_bil.zip");
} catch (IOException e) {
LOG.error("Failed to download and import", e);
}
});
}
/**
......@@ -141,23 +172,13 @@ public class WorldClimUpdater implements InitializingBean {
downloadAndExtract(worldClimDir, getFileName(variableName));
}
DS dataset = dsService.loadDatasetByUuid(WORLDCLIM_DATASET);
if (dataset == null) {
// Ensure dataset
dataset = createWorldClimDataset();
DSDescriptor foo = descriptorService.getDescriptor("tileIndex");
if (foo == null) {
foo = createDescriptor(dataset, "tileIndex");
}
dsService.addQualifier(dataset, foo);
} else {
}
DS dataset = ensureWorldClimDataset();
DSDescriptor dSDescriptor = descriptorService.getDescriptor(variableName);
DSDescriptor descriptor = descriptorService.getDescriptor(variableName);
DSColumn worldclimDescriptor = null;
if (dSDescriptor == null) {
dSDescriptor = createDescriptor(dataset, variableName);
worldclimDescriptor = dsService.addDescriptor(dataset, dSDescriptor);
if (descriptor == null) {
descriptor = createDescriptor(variableName);
worldclimDescriptor = dsService.addDescriptor(dataset, descriptor);
LOG.info("Created worldclimDescriptor {}", variableName);
} else {
for (DSColumn dsd : dataset.getColumns()) {
......@@ -168,10 +189,10 @@ public class WorldClimUpdater implements InitializingBean {
}
}
if (worldclimDescriptor == null)
worldclimDescriptor = dsService.addDescriptor(dataset, dSDescriptor);
worldclimDescriptor = dsService.addDescriptor(dataset, descriptor);
}
LOG.info("Using worldClim {}", worldclimDescriptor);
LOG.debug("Using worldClim {}", worldclimDescriptor);
GenericGridFile ggf = new GenericGridFile(worldClimDir, variableName);
Header header = ggf.readHeader();
......@@ -189,60 +210,70 @@ public class WorldClimUpdater implements InitializingBean {
}
Set<Long> tileIndexSet = getExistingTileIndexes();
if (tileIndexSet.isEmpty())
if (missingTileIndexes())
tileIndexSet = generateTileIndexes();
List<Long> tileIndexes = new ArrayList<Long>(tileIndexSet);
int batchSize = 1000;
for (int fromIndex = 0; fromIndex < tileIndexes.size(); fromIndex += batchSize) {
HashSet<Long> ids = new HashSet<Long>(tileIndexes.subList(fromIndex, Math.min(fromIndex + batchSize, tileIndexes.size())));
LOG.info("Processing tileIndexes: {} of {}", fromIndex, tileIndexes.size());
LOG.info("Processing variable {} for tileIndexes: {}-{} of {}", variableName, fromIndex, fromIndex + ids.size(), tileIndexes.size());
dsService.worldclimUpdate(dataset, worldclimDescriptor, ids, buffer, nullValue, factor);
}