Commit 7837e5b4 authored by Gradl, Tobias

446: Reimplement automatic online and offline crawl capabilities


Task-Url: #446
parent 8e7b59e0
Pipeline #31344 passed with stage in 43 seconds
package eu.dariah.de.search.automation.crawling;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dariah.de.search.automation.schedule.ScheduledRunnable;
import eu.dariah.de.search.crawling.CrawlManager;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean {
@Autowired CrawlManager crawlManager;
@Getter @Setter private boolean active;
@Getter @Setter private String cronExpression;
@Getter @Setter protected boolean debugging;
@Getter @Setter private int timeout;
@Getter private Date nextExecution;
public synchronized void setNextExecution(Date nextExecution) {
this.nextExecution = nextExecution;
}
@Getter private Date lastCompletion;
public synchronized void setLastCompletion(Date lastCompletion) {
this.lastCompletion = lastCompletion;
}
@Override
public void run() {
this.executeCrawling();
this.setLastCompletion(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()));
}
@Override
public void destroy() throws Exception {
try {
/*pipelineExecutor.shutdown();
// Wait until all threads are finished
while (!pipelineExecutor.isTerminated()) {}*/
} catch (final Exception e) {
log.error("Error closing sync executor", e);
}
}
protected abstract void executeCrawling();
}
package eu.dariah.de.search.automation.crawling;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@Data
@EqualsAndHashCode(callSuper=false)
@Slf4j
public class OnlineCrawlRunner extends BaseCrawlRunner {
@Override
protected void executeCrawling() {
}
}
package eu.dariah.de.search.automation.schedule;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Period;
import org.joda.time.Seconds;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dariah.de.search.automation.schedule.NextExecution.CALCULATION_METHODS;
import eu.dariah.de.search.crawling.CrawlManager;
import eu.dariah.de.search.service.CollectionService;
import eu.dariah.de.search.service.CrawlService;
import eu.dariah.de.search.service.DatamodelService;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean {
@Autowired protected CrawlManager crawlManager;
@Autowired protected CrawlService crawlService;
@Autowired protected CollectionService collectionService;
@Autowired protected DatamodelService datamodelService;
@Getter @Setter private boolean active;
@Getter @Setter private String cronExpression;
@Getter @Setter protected boolean debugging;
@Getter @Setter private int timeout;
@Getter private Date nextExecution;
public synchronized void setNextExecution(Date nextExecution) {
this.nextExecution = nextExecution;
}
@Getter private Date lastCompletion;
public synchronized void setLastCompletion(Date lastCompletion) {
this.lastCompletion = lastCompletion;
}
@Override
public void run() {
this.executeCrawling();
this.setLastCompletion(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()));
}
@Override
public void destroy() throws Exception {
try {
/*pipelineExecutor.shutdown();
// Wait until all threads are finished
while (!pipelineExecutor.isTerminated()) {}*/
} catch (final Exception e) {
log.error("Error closing sync executor", e);
}
}
protected static NextExecution calculateNextExecution(String updatePeriod, DateTime lastStart, DateTime lastEnd) {
if (lastStart==null) {
throw new IllegalArgumentException("No lastStart value provided: NULL");
}
if (lastEnd==null) {
throw new IllegalArgumentException("Next execution calculation not possible if still in progress; no lastEnd provided");
}
if (lastStart.isAfter(lastEnd)) {
throw new IllegalArgumentException("LastStart cannot be after lastEnd");
}
int policyDays = getPolicyAsDays(lastEnd, updatePeriod);
double lastDurationMinutes = Seconds.secondsBetween(lastStart, lastEnd).getSeconds() / 60D;
// durationBasedMinAge = round((log_15(lastDurationMinutes + 15))^4)
double durationBasedMinAge = Math.round(Math.pow((Math.log(lastDurationMinutes+15)/Math.log(15)),4));
double durationBasedMaxAge = durationBasedMinAge*5;
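// Worked example (illustrative, not part of the original code): for lastDurationMinutes = 60,
// durationBasedMinAge = round((ln(75)/ln(15))^4) = round(1.594^4) = 6 days and
// durationBasedMaxAge = 6 * 5 = 30 days, matching the 60-minute row of the table below.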
// Helper for exceptionally long durations (longer than 670 days, so purely theoretical here)
if (durationBasedMinAge < lastDurationMinutes/1440*2) {
durationBasedMinAge = lastDurationMinutes/1440*2;
durationBasedMaxAge = lastDurationMinutes/1440*2;
log.debug("Resetting durationBasedMinAge/durationBasedMaxAge for long duration ({} days)", lastDurationMinutes/1440);
}
/*  durM  minAgeD  maxAgeD
 *     0        1        5
 *     5        1        5
 *    10        2       10
 *    60        6       30
 *   120       11       55
 *  1440       52      260
 *  7200      116      580
 */
CALCULATION_METHODS calculationMethod;
double targetAge;
// Use policy if defined and valid
if (policyDays > durationBasedMinAge) {
targetAge = policyDays;
calculationMethod = CALCULATION_METHODS.POLICY_BASED;
log.debug("Setting target age as policyDays => {} days", targetAge);
}
// Use minimum by duration if policy too low, but specified
else if (policyDays > 0) {
targetAge = durationBasedMinAge;
calculationMethod = CALCULATION_METHODS.DURATION_AND_POLICY_BASED;
log.debug("Setting target age as durationBasedMinAge due to low policy days ({} days) => {} days", policyDays, targetAge);
}
// Use duration values if no policy set
else {
targetAge = (durationBasedMinAge+durationBasedMaxAge)/2;
calculationMethod = CALCULATION_METHODS.DURATION_BASED;
log.debug("No update period specified, setting target age to durationBasedMinAge+durationBasedMaxAge)/2 => {} days", targetAge);
}
DateTime nextExecution = lastEnd.plusDays((int)targetAge);
return new NextExecution(calculationMethod, policyDays, lastDurationMinutes, durationBasedMinAge, durationBasedMaxAge, targetAge, nextExecution);
}
protected static int getPolicyAsDays(DateTime lastEnd, String updatePeriod) {
try {
if (updatePeriod!=null && !updatePeriod.isEmpty()) {
Period p = Period.parse(updatePeriod);
return (int)p.toDurationFrom(lastEnd).getStandardDays();
}
} catch (Exception e) {
log.error("Failed to determine update policy in days", e);
}
return -1;
}
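// Illustrative note (not part of the original code): the ISO-8601 updatePeriod is resolved
// relative to lastEnd, so "P14D" always yields 14 days, while "P1M" yields 28 days for a
// lastEnd of 2021-01-31; a null, empty or unparsable period falls back to -1.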
protected abstract void executeCrawling();
}
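The ScheduledRunnable interface and the scheduler wiring are not part of this diff. A minimal sketch, assuming Spring's scheduling support and the runner beans introduced above, of how an active runner could be registered against its cronExpression:

package eu.dariah.de.search.automation.schedule;

import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;

// Sketch only: this configuration class is assumed, not part of the commit.
@Configuration
@EnableScheduling
public class CrawlAutomationSchedulingConfig implements SchedulingConfigurer {
	@Autowired private OnlineCrawlRunner onlineCrawlRunner; // assumed singleton bean

	@Override
	public void configureTasks(ScheduledTaskRegistrar registrar) {
		registrar.setScheduler(Executors.newScheduledThreadPool(1));
		if (onlineCrawlRunner.isActive()) {
			// Run the crawl runner according to its configured cron expression
			registrar.addTriggerTask(onlineCrawlRunner, new CronTrigger(onlineCrawlRunner.getCronExpression()));
		}
	}
}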
package eu.dariah.de.search.automation.schedule;
import org.joda.time.DateTime;
import lombok.Data;
@Data
public class NextExecution {
public enum CALCULATION_METHODS { NONE, DURATION_BASED, POLICY_BASED, DURATION_AND_POLICY_BASED }
private final CALCULATION_METHODS calculationMethod;
private final int policyInDays;
private final double lastDurationInMinutes;
private final double durationBasedMinAge;
private final double durationBasedMaxAge;
private final double calculatedTargetAge;
private final DateTime nextExecutionTimestamp;
public boolean isOverdue() {
return nextExecutionTimestamp!=null && this.getReferenceTimestamp().isAfter(nextExecutionTimestamp);
}
public boolean isCalculated() {
return !Double.isNaN(calculatedTargetAge) && nextExecutionTimestamp!=null;
}
protected DateTime getReferenceTimestamp() {
return DateTime.now();
}
}
\ No newline at end of file
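How these values drive re-crawling is only stubbed in the runners of this commit; a hedged sketch of the intended decision inside a BaseCrawlRunner subclass, with the enqueue call assumed to mirror the existing CrawlManager API:

// Sketch only: deciding whether a dataset is due for another online crawl.
// lastStart/lastEnd would normally come from the most recent online Crawl record.
DateTime lastStart = DateTime.now().minusDays(40);
DateTime lastEnd = lastStart.plusHours(2);
NextExecution ne = calculateNextExecution("P30D", lastStart, lastEnd);
// The 30-day policy exceeds the duration-based minimum (~11 days for a 2-hour crawl),
// and more than 30 days have passed since lastEnd, so ne.isOverdue() is true here.
if (ne.isCalculated() && ne.isOverdue()) {
	// e.g. crawlManager.performOnlineCrawl(collection, endpoint, datamodelContainer); // assumed call
}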
package eu.dariah.de.search.automation.crawling;
package eu.dariah.de.search.automation.schedule;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j;
public class OfflineCrawlRunner extends BaseCrawlRunner {
@Override
protected void executeCrawling() {
log.debug("Checking status of available datamodels");
}
}
package eu.dariah.de.search.automation.schedule;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.Period;
import eu.dariah.de.search.model.Collection;
import eu.dariah.de.search.model.Crawl;
import eu.dariah.de.search.model.Dataset;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.service.CrawlService.CrawlCompleteFlag;
import eu.dariah.de.search.service.CrawlService.CrawlErrorFlag;
import eu.dariah.de.search.service.CrawlService.CrawlOnlineFlag;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@Data
@EqualsAndHashCode(callSuper=false)
@Slf4j
public class OnlineCrawlRunner extends BaseCrawlRunner {
/* To block endpoints from being processed in parallel
* list sizes should always be 1 unless manual crawl is triggered by user */
private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<>();
@Override
protected void executeCrawling() {
log.debug("Checking status of available datasets");
this.enqueueNewAndOutdatedDatasets();
}
public void enqueueNewAndOutdatedDatasets() {
DateTime syncTimestamp = DateTime.now();
for (Collection c : collectionService.getAll()) {
for (Endpoint ep : c.getEndpoints()) {
for (Dataset ds : ep.getDatasets()) {
this.calculateNextExecution(c, ep, ds);
}
}
}
}
private NextExecution calculateNextExecution(Collection c, Endpoint ep, Dataset ds) {
Crawl lastOnlineCrawl = crawlService.findCrawls(ep.getId(), ds.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Both, CrawlErrorFlag.Both, 1).stream().findFirst().orElse(null);
DateTime lastStart = null;
DateTime lastEnd = null;
if (lastOnlineCrawl!=null) {
lastStart = lastOnlineCrawl.getCreated();
lastEnd = lastOnlineCrawl.getModified();
}
log.debug("Determining next execution for {}", c.getName(null));
return this.calculateNextExecution(c.getUpdatePeriod(), lastStart, lastEnd);
}
public void enqueueNewAndOutdatedDatasetsOld() {
DateTime syncTimestamp = DateTime.now();
List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl;
for (Collection collection : collectionService.getAll()) {
boolean modified = false;
boolean onlineUpdates = (collection.getUpdatePeriod()!=null && !collection.getUpdatePeriod().isEmpty());
for (Endpoint endpoint : collection.getEndpoints()) {
for (Dataset dataset : endpoint.getDatasets()) {
try {
if (dataset.isNew()) {
// New dataset -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
continue;
} else if (onlineUpdates && dataset.isOutdated()) {
// Dataset already marked outdated -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
continue;
}
// Find last online crawl
lastOnlineCrawls = crawlService.findCrawls(endpoint.getId(), dataset.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Both, CrawlErrorFlag.Both, 1);
if (lastOnlineCrawls==null || lastOnlineCrawls.size()==0) {
// No online crawl available ~ new dataset -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
} else if (onlineUpdates) {
lastOnlineCrawl = lastOnlineCrawls.get(0);
// Detect outdated online
if (syncTimestamp.isAfter(lastOnlineCrawl.getModified().plus(new Period(collection.getUpdatePeriod())))) {
dataset.setOutdated(true);
modified = true;
// Dataset outdated as per configuration -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
}
}
} catch (Exception e) {
log.error("Failed to process dataset for autocrawling", e);
dataset.setError(true);
modified = true;
}
}
}
if (modified) {
collectionService.saveCollection(collection);
}
}
}
private boolean enqueueOnlineCrawlIfAvailable(Collection collection, Endpoint endpoint, Dataset dataset) {
// Cannot happen as online and offline cannot run in parallel
/*if (outdatedDatamodelIdCrawlIdMap.containsKey(dataset.getId())) {
if (debugging) {
log.debug("Cannot enqueue online crawl due to outdated datamodel [{}] being reprocessed", dataset.getId());
}
return false;
}*/
if (endpointIdCrawlIdMap.containsKey(endpoint.getId())) {
if (debugging) {
log.debug("Cannot enqueue online crawl due to endpoint [{}] already being busy", endpoint.getId());
}
return false;
}
if (debugging) {
log.debug("Enqueued online crawl for endpoint [{}] and dataset [{}]", endpoint.getId(), dataset.getId());
}
crawlManager.performOnlineCrawl(collection, endpoint, datamodelService.findById(dataset.getId()));
return true;
}
}
@@ -5,6 +5,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletResponse;
import org.joda.time.format.DateTimeFormat;
@@ -21,7 +23,7 @@ import de.unibamberg.minf.core.web.controller.DataTableList;
import de.unibamberg.minf.core.web.pojo.MessagePojo;
import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.CollectionSyncService;
import eu.dariah.de.search.automation.crawling.OnlineCrawlRunner;
import eu.dariah.de.search.automation.schedule.OnlineCrawlRunner;
import eu.dariah.de.search.pojo.CollectionPojo;
import eu.dariah.de.search.pojo.DatasetPojo;
import eu.dariah.de.search.pojo.DatasourcePojo;
@@ -39,6 +41,8 @@ public class CollectionController extends BaseController {
@Autowired private CollectionSyncService crSyncService;
@Autowired private OnlineCrawlRunner crawlRunner;
@Autowired private Executor crawlingAutomationTaskExecutor;
public CollectionController() {
super("collections");
}
@@ -92,7 +96,7 @@ public class CollectionController extends BaseController {
@RequestMapping(method=GET, value={"/async/triggerOnline"})
public @ResponseBody ModelActionPojo triggerOnline(Model model, Locale locale) {
crawlRunner.run();
crawlingAutomationTaskExecutor.execute(crawlRunner);
ModelActionPojo result = new ModelActionPojo();
result.setSuccess(true);
......
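The crawlingAutomationTaskExecutor bean injected by the controllers is not defined in this diff; a minimal sketch of a plausible configuration (bean name taken from the autowired field, sizing values assumed):

import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Sketch only: a single-threaded executor so scheduled and manually triggered runs cannot overlap.
@Configuration
public class CrawlAutomationExecutorConfig {
	@Bean
	public Executor crawlingAutomationTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(1);
		executor.setMaxPoolSize(1);
		executor.setQueueCapacity(2); // small queue for manual triggers arriving during a running crawl
		executor.setThreadNamePrefix("crawl-automation-");
		executor.initialize();
		return executor;
	}
}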
@@ -6,6 +6,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletResponse;
@@ -25,7 +26,7 @@ import de.unibamberg.minf.core.web.controller.DataTableList;
import de.unibamberg.minf.core.web.pojo.MessagePojo;
import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.DmeSyncService;
import eu.dariah.de.search.automation.crawling.OfflineCrawlRunner;
import eu.dariah.de.search.automation.schedule.OfflineCrawlRunner;
import eu.dariah.de.search.config.MainConfigProperties;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
@@ -43,6 +44,7 @@ public class DatamodelController extends BaseController {
@Autowired private MappingGenerationService mappingGenerationService;
@Autowired private DmeSyncService dmeSyncService;
@Autowired private OfflineCrawlRunner crawlRunner;
@Autowired private Executor crawlingAutomationTaskExecutor;
@Autowired private MainConfigProperties config;
@@ -123,7 +125,7 @@ public class DatamodelController extends BaseController {
@RequestMapping(method=GET, value={"/async/triggerOffline"})
public @ResponseBody ModelActionPojo triggerOffline(Model model, Locale locale) {
crawlRunner.run();
crawlingAutomationTaskExecutor.execute(crawlRunner);
ModelActionPojo result = new ModelActionPojo();
result.setSuccess(true);
......
@@ -43,13 +43,13 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
/* To block endpoints from being processed in parallel
* list sizes should always be 1 unless manual crawl is triggered by user */
private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<String, List<String>>();
//private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<String, List<String>>();
/* This is to keep track about indexed datamodels e.g. to determine, when outdated recrawl can happen */
private Map<String, List<String>> datamodelIdCrawlIdMap = new HashMap<String, List<String>>();
//private Map<String, List<String>> datamodelIdCrawlIdMap = new HashMap<String, List<String>>();
/* datamodels contained are blocked for further crawling until refresh has completed */
private Map<String, List<String>> outdatedDatamodelIdCrawlIdMap = new HashMap<String, List<String>>();
//private Map<String, List<String>> outdatedDatamodelIdCrawlIdMap = new HashMap<String, List<String>>();
@Getter private boolean cronActive;
@@ -64,7 +64,7 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
@Getter private boolean maintenanceMode;
public void setCronActive(boolean cronActive) {
/*public void setCronActive(boolean cronActive) {
this.cronActive = cronActive;
if (!debugging) {
if (!this.cronActive && this.autocrawlOnline) {
@@ -76,7 +76,7 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
log.warn("crawling.automation.offline is configured as true, but no valid cronjob provided; overriding crawling.automation.offline=false");
}
}
}
}*/
@Override
@@ -86,11 +86,11 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
/*super.afterPropertiesSet();
if (!debugging && this.timeout < 30) {
this.timeout = 30;
log.warn("Crawl manager timeout increased to 30s");
}
}*/
}
@@ -109,7 +109,7 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
if (autocrawlOnline) {
// Handle new or updated datasets
this.enqueueNewAndOutdatedDatasets();
//this.enqueueNewAndOutdatedDatasets();
}
if (debugging) {
@@ -133,7 +133,7 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
}
}
public void enqueueNewAndOutdatedDatasets() {
/*public void enqueueNewAndOutdatedDatasets() {
DateTime syncTimestamp = DateTime.now();
List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl;
@@ -237,7 +237,7 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
this.performOnlineCrawl(collection, endpoint, datamodelService.findById(dataset.getId()));
return true;
}
}*/
private int reindexDatamodel(ExtendedDatamodelContainer datamodel) {
int count = 0;
@@ -332,8 +332,8 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
this.crawlIdServiceIdMap.put(crawl.getId(), pipeline.getUuid());
this.serviceIdServiceMap.put(pipeline.getUuid(), pipeline);
this.addCrawlIdMapping(this.endpointIdCrawlIdMap, crawl.getEndpointId(), crawl.getId());
this.addCrawlIdMapping(this.datamodelIdCrawlIdMap, crawl.getDatamodelId(), crawl.getId());
//this.addCrawlIdMapping(this.endpointIdCrawlIdMap, crawl.getEndpointId(), crawl.getId());
//this.addCrawlIdMapping(this.datamodelIdCrawlIdMap, crawl.getDatamodelId(), crawl.getId());
pipelineExecutor.execute(pipeline);
} catch (Exception e) {
@@ -344,6 +344,8 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
}
}
private void addCrawlIdMapping(Map<String, List<String>> crawlIdMap, String key, String crawlId) {
List<String> crawlIds;
if (crawlIdMap.containsKey(key)) {
@@ -426,8 +428,8 @@ public class OldRunnableCrawlManagerImpl extends CrawlManagerImpl implements Run
try {
statusMapslock.lock();
this.removeCrawlId(this.datamodelIdCrawlIdMap, c.getDatamodelId(), c.getId());;
this.removeCrawlId(this.endpointIdCrawlIdMap, c.getEndpointId(), c.getId());
//this.removeCrawlId(this.datamodelIdCrawlIdMap, c.getDatamodelId(), c.getId());;
//this.removeCrawlId(this.endpointIdCrawlIdMap, c.getEndpointId(), c.getId());
} catch (Exception e) {