Commit 05ab953c authored by Matija Obreza's avatar Matija Obreza

Resource conscious accession#crop update

parent 197ac55b
...@@ -128,8 +128,9 @@ public interface AccessionRepository extends JpaRepository<Accession, Long> { ...@@ -128,8 +128,9 @@ public interface AccessionRepository extends JpaRepository<Accession, Long> {
public long countByHistoric(boolean historic); public long countByHistoric(boolean historic);
@Query("select a from Accession a where a.cropName != null")
Stream<Accession> streamWithCropname(); @Query("select a.id from Accession a where a.cropName != null")
List<Long> listAccessionIdsWithCropname();
@Query("update Accession a set a.crop = ?2 where a = ?1") @Query("update Accession a set a.crop = ?2 where a = ?1")
@Modifying @Modifying
......
...@@ -268,8 +268,7 @@ public interface GenesysService { ...@@ -268,8 +268,7 @@ public interface GenesysService {
void regenerateAccessionSequentialNumber(); void regenerateAccessionSequentialNumber();
Stream<Accession> streamWithCropname();
List<Accession> saveAccessionCrops(ArrayList<Long> accessionIds);
List<Accession> saveAccessionCrops(ArrayList<Accession> copy);
} }
...@@ -32,7 +32,7 @@ import java.util.List; ...@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Stream; import java.util.stream.Collectors;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
...@@ -775,7 +775,7 @@ public class GenesysServiceImpl implements GenesysService, DatasetService { ...@@ -775,7 +775,7 @@ public class GenesysServiceImpl implements GenesysService, DatasetService {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Updating " + accession); LOG.debug("Updating " + accession);
System.err.println("Saving " + accession + " ST=" + accession.getStorage()); LOG.debug("Saving " + accession + " ST=" + accession.getStorage());
AccessionData res = null; AccessionData res = null;
if (accession instanceof AccessionHistoric) { if (accession instanceof AccessionHistoric) {
res = accessionHistoricRepository.save((AccessionHistoric) accession); res = accessionHistoricRepository.save((AccessionHistoric) accession);
...@@ -1790,17 +1790,14 @@ public class GenesysServiceImpl implements GenesysService, DatasetService { ...@@ -1790,17 +1790,14 @@ public class GenesysServiceImpl implements GenesysService, DatasetService {
} }
@Override
public Stream<Accession> streamWithCropname() {
return accessionRepository.streamWithCropname();
}
@Override @Override
@Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW) @Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
// @PreAuthorize("hasRole('ADMINISTRATOR')") // @PreAuthorize("hasRole('ADMINISTRATOR')")
public List<Accession> saveAccessionCrops(ArrayList<Accession> accessions) { public List<Accession> saveAccessionCrops(ArrayList<Long> accessionIds) {
for (Accession accession : accessions) List<Accession> accessions = new ArrayList<>(accessionRepository.findAll(accessionIds));
for (Accession accession : accessions) {
accessionRepository.updateCrop(accession, cropService.getCrop(accession.getCropName())); accessionRepository.updateCrop(accession, cropService.getCrop(accession.getCropName()));
}
return accessions; return accessions;
} }
} }
...@@ -19,9 +19,8 @@ import org.springframework.stereotype.Component; ...@@ -19,9 +19,8 @@ import org.springframework.stereotype.Component;
import com.hazelcast.core.IQueue; import com.hazelcast.core.IQueue;
/** /**
* ES Processor component uses Spring's @Scheduled annotation to scan queues * ES Processor component uses Spring's @Scheduled annotation to scan queues with 2000ms delay measured from the
* with 2000ms delay measured from the completion time of each preceding * completion time of each preceding invocation.
* invocation.
*/ */
@Component @Component
class ElasticUpdaterProcessor { class ElasticUpdaterProcessor {
...@@ -88,8 +87,7 @@ class ElasticUpdaterProcessor { ...@@ -88,8 +87,7 @@ class ElasticUpdaterProcessor {
} }
if (LOG.isInfoEnabled() && (i % 500 == 0)) { if (LOG.isInfoEnabled() && (i % 500 == 0)) {
LOG.info("Queue size update=" + elasticUpdateQueue.size() + " indexed=" + i + " rate=" + (1000.0 * i / (stopWatch.getTime())) LOG.info("Queue size update=" + elasticUpdateQueue.size() + " indexed=" + i + " rate=" + Math.round(1000.0 * i / (stopWatch.getTime())) + " records/s");
+ " records/s");
} }
} while (toUpdate != null); } while (toUpdate != null);
......
...@@ -32,9 +32,11 @@ import javax.xml.parsers.ParserConfigurationException; ...@@ -32,9 +32,11 @@ import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.genesys2.server.model.genesys.Accession; import org.genesys2.server.model.genesys.Accession;
import org.genesys2.server.persistence.domain.AccessionRepository;
import org.genesys2.server.persistence.domain.GenesysLowlevelRepository; import org.genesys2.server.persistence.domain.GenesysLowlevelRepository;
import org.genesys2.server.service.CountryNamesUpdater; import org.genesys2.server.service.CountryNamesUpdater;
import org.genesys2.server.service.CropService; import org.genesys2.server.service.CropService;
...@@ -118,6 +120,9 @@ public class AdminController { ...@@ -118,6 +120,9 @@ public class AdminController {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
@Autowired
private AccessionRepository accessionRepository;
@RequestMapping("/") @RequestMapping("/")
public String root(Model model) { public String root(Model model) {
return "/admin/index"; return "/admin/index";
...@@ -362,8 +367,7 @@ public class AdminController { ...@@ -362,8 +367,7 @@ public class AdminController {
} }
/** /**
* Scan AccessionData table and convert ACCENUMB to ACCENUMBNUMB (extract * Scan AccessionData table and convert ACCENUMB to ACCENUMBNUMB (extract the number from the ACCNUMB)
* the number from the ACCNUMB)
* *
* @return * @return
*/ */
...@@ -387,26 +391,28 @@ public class AdminController { ...@@ -387,26 +391,28 @@ public class AdminController {
return "redirect:/admin/"; return "redirect:/admin/";
} }
@Transactional(readOnly=true, propagation=Propagation.REQUIRES_NEW)
@RequestMapping(value = "/cropname-crop", method = RequestMethod.POST) @RequestMapping(value = "/cropname-crop", method = RequestMethod.POST)
public String assignCropWithCropname() { public String assignCropWithCropname() {
LOG.info("Assigning crops to accessions with CROPNAME."); LOG.info("Assigning crops to accessions with CROPNAME.");
AtomicLong counter = new AtomicLong(0); AtomicLong counter = new AtomicLong(0);
List<Accession> batch = Collections.synchronizedList(new ArrayList<>()); StopWatch stopWatch = new StopWatch();
genesysService.streamWithCropname().parallel().forEach(accession -> { stopWatch.start();
batch.add(accession); final List<Long> batch = Collections.synchronizedList(new ArrayList<>());
ArrayList<Accession> copy = null; List<Long> list = accessionRepository.listAccessionIdsWithCropname();
LOG.info("The list has " + list.size() + " elements");
list.stream().parallel().forEach(accessionId -> {
batch.add(accessionId);
ArrayList<Long> copy = null;
synchronized (batch) { synchronized (batch) {
if (batch.size() > 100) { if (batch.size() > 100) {
copy = new ArrayList<>(batch); copy = new ArrayList<>(batch);
batch.clear(); batch.clear();
} }
} }
if (copy != null) { if (copy != null)
genesysService.saveAccessionCrops(copy); genesysService.saveAccessionCrops(copy);
}
if (counter.incrementAndGet() % 1000 == 0 && LOG.isInfoEnabled()) { if (counter.incrementAndGet() % 1000 == 0 && LOG.isInfoEnabled()) {
LOG.info("Updated " + counter.get() + " records"); LOG.info("Updated " + counter.get() + " records overall_rate=" + Math.round(1000.0 * counter.get() / (stopWatch.getTime())) + " records/s");
} }
}); });
LOG.info("Done assigning crops to accessions with CROPNAME."); LOG.info("Done assigning crops to accessions with CROPNAME.");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment