Commit 1d6580ea authored by Matija Obreza's avatar Matija Obreza
Browse files

Improved AccessionCounter delays

- Count for large collections takes a while and blocks upserts
parent 04bbeffc
......@@ -33,8 +33,8 @@ import org.springframework.stereotype.Component;
/**
* Updating {@link FaoInstitute} accession count during upsert and delete is not
* efficient. This component keeps requests for updates in a delayed queue, keeping
* the most recent request for update.
* efficient. This component keeps requests for updates in a delayed queue,
* keeping the most recent request for update.
*
* It processes expired recounts every X seconds.
*
......@@ -42,14 +42,20 @@ import org.springframework.stereotype.Component;
*/
@Component
public class AccessionCounter implements InitializingBean, DisposableBean {
private static final Logger LOG = LoggerFactory.getLogger(AccessionCounter.class);
// Interval time (in ms) for scanning the queue for expired updates
private static final int DELAY_BETWEEN_QUEUESCANS = 2000;
// If a request comes before this delay (in ms), reschedule
private static final long MINTIME_BETWEEN_UPDATES = 10000;
private DelayQueue<DelayedObject<String>> instituteQueue;
@Autowired
InstituteService instituteService;
@Autowired(required = false)
private InstituteService instituteService;
@Autowired
@Autowired(required = false)
private GenesysService genesysService;
@Override
......@@ -72,18 +78,18 @@ public class AccessionCounter implements InitializingBean, DisposableBean {
return;
}
DelayedObject<String> delay = new DelayedObject<String>(institute.getCode(), 2000);
DelayedObject<String> delay = new DelayedObject<String>(institute.getCode(), MINTIME_BETWEEN_UPDATES);
synchronized (instituteQueue) {
LOG.trace("Queue size {}, contains {} = {}", instituteQueue.size(), delay.getObj(), instituteQueue.contains(delay));
if (! instituteQueue.remove(delay)) {
if (!instituteQueue.remove(delay)) {
LOG.debug("Element was not removed {}", delay.getObj());
}
instituteQueue.put(delay);
}
LOG.trace("Scheduled re-count for {}", institute.getCode());
LOG.trace("Rescheduled count for {}", institute.getCode());
}
@Scheduled(fixedDelay = 2000)
@Scheduled(fixedDelay = DELAY_BETWEEN_QUEUESCANS)
public void processQueues() {
DelayedObject<String> forProcessing = null;
do {
......@@ -99,6 +105,11 @@ public class AccessionCounter implements InitializingBean, DisposableBean {
return;
}
if (instituteService == null || genesysService == null) {
LOG.warn("Cannot update count for {}, no services.", forProcessing.getObj());
return;
}
FaoInstitute institute = instituteService.findInstitute(forProcessing.getObj());
if (institute != null) {
LOG.info("Updating count for {}", institute.getCode());
......@@ -134,7 +145,9 @@ public class AccessionCounter implements InitializingBean, DisposableBean {
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiryTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
long delay = unit.convert(expiryTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
LOG.trace("{} scheduled in {}ms", this.obj, delay);
return delay;
}
@Override
......
package org.genesys2.server.service.worker;
import org.apache.commons.lang3.RandomUtils;
import org.genesys2.server.model.impl.FaoInstitute;
import org.genesys2.server.test.PropertyPlacholderInitializer;
import org.genesys2.spring.config.SchedulerConfig;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@Ignore
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = AccessionCounterTest.Config.class, initializers = PropertyPlacholderInitializer.class)
public class AccessionCounterTest {
// private static final Logger LOG = LoggerFactory.getLogger(AccessionCounterTest.class);
@Import(SchedulerConfig.class)
public static class Config {
@Bean
public AccessionCounter accessionCounter() {
return new AccessionCounter();
}
}
@Autowired
private AccessionCounter accessionCounter;
@Test
public void test1() throws InterruptedException {
FaoInstitute inst = new FaoInstitute();
inst.setCode("bbb001");
FaoInstitute inst2 = new FaoInstitute();
inst2.setCode("AAA001");
for (int i=0; i<10; i++) {
if (RandomUtils.nextBoolean()) {
accessionCounter.recountInstitute(inst);
} else {
accessionCounter.recountInstitute(inst2);
}
Thread.sleep(RandomUtils.nextLong(2000, 5000));
}
Thread.sleep(11000);
}
}
......@@ -30,6 +30,7 @@ log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.rootLogger=warn, stdout
log4j.category.org.genesys2=info
#log4j.category.org.genesys2.server.service.worker.AccessionCounter=trace
#log4j.category.org.hibernate.cache=debug
#log4j.category.org.hibernate.jdbc=debug
log4j.category.org.hibernate.jdbc.AbstractBatcher=debug
\ No newline at end of file
log4j.category.org.hibernate.jdbc.AbstractBatcher=debug
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment