Commit 1c6092dc authored by Matija Obreza's avatar Matija Obreza
Browse files

AccessionProcessor: apply() retries execution 5 times in case of exceptions

- Added transaction timeouts to methods called from thread pool
parent ba22497d
......@@ -372,10 +372,11 @@ public class AdminController {
accessionGeoRepository.save(toSave);
LOG.debug("Regenerated {} tileIndexes", accessions.size());
LOG.debug("Assigned {} tileIndexes", accessions.size());
return accessions;
});
LOG.warn("Done recalculating tileIndexes");
return "redirect:/admin/";
}
......
......@@ -328,7 +328,7 @@ public class AccessionServiceImpl implements AccessionService {
}
@Override
@Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
@Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW, timeout = 10)
public List<Accession> processAccessions(List<Long> accessionIds, IAccessionBatchAction action) {
List<Accession> accessions = accessionRepository.findAll(accessionIds);
LOG.debug("Processing {} accessions of {} IDs provided", accessions.size(), accessionIds.size());
......
......@@ -65,7 +65,7 @@ public class ClimateDataServiceImpl implements ClimateDataService {
* @see org.genesys2.server.service.ClimateDataService#worldclimUpdate(java.lang.String, java.util.HashSet, java.nio.MappedByteBuffer, short, double)
*/
@Override
@Transactional
@Transactional(timeout = 30)
public void worldclimUpdate(String variableName, HashSet<Long> tileIndexes, MappedByteBuffer buffer, Header header, double factor) {
LOG.debug("Updating {} for tileIndexes: {}", variableName, tileIndexes.size());
......@@ -93,12 +93,12 @@ public class ClimateDataServiceImpl implements ClimateDataService {
value = new Long((long) (factor * val));
}
try {
BeanUtils.setProperty(tileClimate, variableName, value);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.error("Cannot set TileClimate#{} to {}. {}", variableName, value, e.getMessage());
}
try {
BeanUtils.setProperty(tileClimate, variableName, value);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.error("Cannot set TileClimate#{} to {}. {}", variableName, value, e.getMessage());
}
}
} catch (IndexOutOfBoundsException e) {
LOG.error("OUT OF BOUND tile={} capacity={} limit={}", tileIndex, buffer.capacity(), buffer.limit());
......
......@@ -205,13 +205,22 @@ public class AccessionProcessor {
final ArrayList<Long> copy = new ArrayList<>(accessionIds);
threadPoolTaskExecutor.execute(() -> {
try {
LOG.trace("Executing action on {} Accessions.", copy.size());
accessionService.processAccessions(copy, action);
} catch (Throwable e) {
LOG.warn("Error executing action on batch: {}", e.getMessage(), e);
} finally {
LOG.trace("Done executing action on {} accessions.", copy.size());
for (int i = 5; i >= 0; i--) {
try {
LOG.trace("Executing action on {} Accessions.", copy.size());
accessionService.processAccessions(copy, action);
LOG.trace("Done executing action on {} accessions.", copy.size());
break; // break try-5-times loop
} catch (Throwable e) {
if (i == 0) {
throw e;
}
LOG.warn("Error executing action on batch: {}. Retrying.", e.getMessage());
try {
Thread.sleep(250);
} catch (InterruptedException e1) {
}
}
}
});
}
......
......@@ -54,7 +54,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonParseException;
......@@ -172,20 +171,9 @@ public class WorldClimUpdater implements InitializingBean {
MappedByteBuffer buffer = ggf.mapDataBuffer();
double factor = 1.0;
if (variableName.matches("(bio)(1|2|5|6|7|8|9|10|11)")) {
// bioclimatic variables derived from the tmean, tmin, tmax (factor = .1f)
factor = .1f;
} else if (variableName.matches("(bio)(3|4)")) {
// (* 100) at http://worldclim.org/bioclim
factor = .01f;
} else if (variableName.matches("(tmean|tmax|tmin)\\d{1,2}")) {
factor = .1f;
}
double factor = WorldClimUtil.getFactor(variableName);
Set<Long> tileIndexSet = getExistingTileIndexes();
if (missingTileIndexes())
tileIndexSet.addAll(generateTileIndexes());
List<Long> tileIndexes = new ArrayList<Long>(tileIndexSet);
int batchSize = 1000;
......@@ -197,10 +185,11 @@ public class WorldClimUpdater implements InitializingBean {
try {
climateDataService.worldclimUpdate(variableName, ids, buffer, header, factor);
break; // break try-5-times loop
} catch (DataIntegrityViolationException e) {
} catch (Throwable e) {
if (i == 0) {
throw e;
}
LOG.warn("Problem updating variable {}. {}. Retrying.", variableName, e.getMessage());
try {
Thread.sleep(250);
} catch (InterruptedException e1) {
......@@ -365,7 +354,15 @@ public class WorldClimUpdater implements InitializingBean {
* Update all WorldClim variables
*/
public void update() {
if (missingTileIndexes()) {
try {
generateTileIndexes();
} catch (IOException e) {
LOG.error("Error generating missing tileIndexes: {}", e.getMessage());
throw new RuntimeException("Error generating missing tileIndexes", e);
}
}
try {
update("alt");
} catch (IOException e) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment