Commit d89b2a9d authored by Gradl, Tobias's avatar Gradl, Tobias
Browse files

446: Reimplement automatic online and offline crawl capabilities

(OPENED)

Task-Url: #446
parent 7837e5b4
Pipeline #31420 passed with stage
in 37 seconds
......@@ -9,6 +9,7 @@ import org.joda.time.Days;
import org.joda.time.Period;
import org.joda.time.Seconds;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dariah.de.search.automation.schedule.NextExecution.CALCULATION_METHODS;
......@@ -23,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean {
public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean, InitializingBean {
@Autowired protected CrawlManager crawlManager;
@Autowired protected CrawlService crawlService;
......@@ -46,6 +47,10 @@ public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBe
this.lastCompletion = lastCompletion;
}
@Override
public void afterPropertiesSet() throws Exception {
    // Spring InitializingBean lifecycle hook: runs once after dependency
    // injection and delegates one-time setup to the subclass-specific init().
    init();
}
@Override
public void run() {
......@@ -137,5 +142,6 @@ public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBe
return -1;
}
protected abstract void init();
protected abstract void executeCrawling();
}
......@@ -2,11 +2,13 @@ package eu.dariah.de.search.automation.schedule;
import org.joda.time.DateTime;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class NextExecution {
public enum CALCULATION_METHODS { NONE, DURATION_BASED, POLICY_BASED, DURATION_AND_POLICY_BASED }
public enum CALCULATION_METHODS { STATIC, DURATION_BASED, POLICY_BASED, DURATION_AND_POLICY_BASED }
private final CALCULATION_METHODS calculationMethod;
private final int policyInDays;
......@@ -16,6 +18,18 @@ public class NextExecution {
private final double calculatedTargetAge;
private final DateTime nextExecutionTimestamp;
/**
 * Creates a statically scheduled execution: the next execution timestamp is
 * supplied directly rather than derived from a crawl-duration/policy
 * calculation, so all calculation-related metrics are marked as undefined.
 *
 * @param nextExecutionTimestamp the fixed point in time of the next execution
 */
public NextExecution(DateTime nextExecutionTimestamp) {
    this.calculationMethod = CALCULATION_METHODS.STATIC;
    this.nextExecutionTimestamp = nextExecutionTimestamp;
    // No update policy applies to a static schedule.
    this.policyInDays = -1;
    // Duration-derived metrics are meaningless here; use NaN as "not computed".
    this.lastDurationInMinutes = Double.NaN;
    this.durationBasedMinAge = Double.NaN;
    this.durationBasedMaxAge = Double.NaN;
    this.calculatedTargetAge = Double.NaN;
}
/**
 * Indicates whether the planned execution time has already passed.
 *
 * @return {@code true} when a next execution is scheduled and the reference
 *         timestamp lies after it; {@code false} when nothing is scheduled
 */
public boolean isOverdue() {
    // With no scheduled timestamp there is nothing to be overdue against.
    if (nextExecutionTimestamp == null) {
        return false;
    }
    return this.getReferenceTimestamp().isAfter(nextExecutionTimestamp);
}
......
......@@ -10,7 +10,11 @@ import lombok.extern.slf4j.Slf4j;
// Runner for automatic offline (re-)crawling of datamodels.
public class OfflineCrawlRunner extends BaseCrawlRunner {
@Override
protected void executeCrawling() {
// Currently only reports activity; no offline crawl is triggered here yet.
// NOTE(review): the next line appears twice — likely a whitespace-diff
// artifact from the commit view; confirm against the repository.
log.debug("Checking status of available datamodels");
log.debug("Checking status of available datamodels");
}
@Override
public void init() {
// One-time setup, invoked via BaseCrawlRunner.afterPropertiesSet().
log.debug("Initializing automatic offline crawling capabilities");
}
}
......@@ -27,93 +27,75 @@ public class OnlineCrawlRunner extends BaseCrawlRunner {
* list sizes should always be 1 unless manual crawl is triggered by user */
private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<>();
@Override
public void init() {
    // Startup pass over all datasets. The flag is false here — presumably this
    // suppresses enqueueing of merely expired datasets on boot, in contrast to
    // the periodic executeCrawling() run — TODO confirm against checkAndCrawlOnline.
    log.debug("Initializing automatic online crawling capabilities");
    checkAndCrawlOnline(false);
}
@Override
protected void executeCrawling() {
log.debug("Checking status of available datasets");
this.enqueueNewAndOutdatedDatasets();
this.checkAndCrawlOnline(true);
}
public void enqueueNewAndOutdatedDatasets() {
DateTime syncTimestamp = DateTime.now();
public void checkAndCrawlOnline(boolean enqueueExpired) {
for (Collection c : collectionService.getAll()) {
boolean savec = false;
for (Endpoint ep : c.getEndpoints()) {
for (Dataset ds : ep.getDatasets()) {
this.calculateNextExecution(c, ep, ds);
savec = this.updateNextExecutionIfChanged(c, ep, ds) || savec;
if (ds.isOutdated()) {
this.enqueue(c, ep, ds);
}
}
}
if(savec) {
collectionService.saveCollection(c);
}
}
}
private NextExecution calculateNextExecution(Collection c, Endpoint ep, Dataset ds) {
private boolean updateNextExecutionIfChanged(Collection c, Endpoint ep, Dataset ds) {
Crawl lastOnlineCrawl = crawlService.findCrawls(ep.getId(), ds.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Both, CrawlErrorFlag.Both, 1).stream().findFirst().orElse(null);
DateTime lastStart = null;
DateTime lastEnd = null;
if (lastOnlineCrawl!=null) {
lastStart = lastOnlineCrawl.getCreated();
lastEnd = lastOnlineCrawl.getModified();
NextExecution ne = this.calculateNextExecution(c, ep, ds, lastOnlineCrawl);
boolean updated = false;
if (ds.getNextExecution()==null || !ds.getNextExecution().isEqual(ne.getNextExecutionTimestamp())) {
ds.setNextExecution(ne.getNextExecutionTimestamp());
log.debug("Updating next execution for {} [{}] => {}", c.getName(null), ds.getId(), ds.getNextExecution().toString());
updated = true;
}
log.debug("Determining next execution for {}", c.getName(null));
return this.calculateNextExecution(c.getUpdatePeriod(), lastStart, lastEnd);
}
public void enqueueNewAndOutdatedDatasetsOld() {
DateTime syncTimestamp = DateTime.now();
List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl;
for (Collection collection : collectionService.getAll()) {
boolean modified = false;
boolean onlineUpdates = (collection.getUpdatePeriod()!=null && !collection.getUpdatePeriod().isEmpty());
for (Endpoint endpoint : collection.getEndpoints()) {
for (Dataset dataset : endpoint.getDatasets()) {
try {
if (dataset.isNew()) {
// New dataset -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
continue;
} else if (onlineUpdates && dataset.isOutdated()) {
// Dataset already marked outdated -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
continue;
}
// Find last online crawl
lastOnlineCrawls = crawlService.findCrawls(endpoint.getId(), dataset.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Both, CrawlErrorFlag.Both, 1);
if (lastOnlineCrawls==null || lastOnlineCrawls.size()==0) {
// No online crawl available ~ new dataset -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
} else if (onlineUpdates) {
lastOnlineCrawl = lastOnlineCrawls.get(0);
// Detect outdated online
if (syncTimestamp.isAfter(lastOnlineCrawl.getModified().plus(new Period(collection.getUpdatePeriod())))) {
dataset.setOutdated(true);
modified = true;
// Dataset outdates as per configuration -> crawl online
this.enqueueOnlineCrawlIfAvailable(collection, endpoint, dataset);
}
}
} catch (Exception e) {
log.error("Failed to process dataset for autocrawling", e);
dataset.setError(true);
modified = true;
}
}
if (lastOnlineCrawl==null) {
if (!ds.isNew()) {
ds.setNew(true);
updated = true;
}
if (modified) {
collectionService.saveCollection(collection);
if (ds.isOutdated()) {
ds.setOutdated(false);
updated = true;
}
} else if (ds.isNew()) {
ds.setNew(false);
updated = true;
}
if (ne.getNextExecutionTimestamp().isEqualNow() || ne.getNextExecutionTimestamp().isBeforeNow() && !ds.isOutdated() && !ds.isNew()) {
ds.setOutdated(true);
updated = true;
}
return updated;
}
/**
 * Determines the next execution for a dataset based on its last online crawl.
 *
 * @param c  the collection the dataset belongs to (supplies the update period)
 * @param ep the endpoint of the dataset (currently unused here)
 * @param ds the dataset itself (currently unused here)
 * @param lastOnlineCrawl the most recent online crawl, or {@code null}
 * @return a statically scheduled "now" execution when no usable crawl history
 *         exists, otherwise the duration/policy-based calculation
 */
private NextExecution calculateNextExecution(Collection c, Endpoint ep, Dataset ds, Crawl lastOnlineCrawl) {
    // A crawl without both timestamps gives no duration to calculate from.
    boolean noUsableHistory = lastOnlineCrawl == null
            || lastOnlineCrawl.getCreated() == null
            || lastOnlineCrawl.getModified() == null;
    if (noUsableHistory) {
        // Never crawled (or timestamps missing): schedule immediately.
        return new NextExecution(DateTime.now());
    }
    // Derive the schedule from the configured update period and the span of
    // the last crawl (created -> modified).
    return BaseCrawlRunner.calculateNextExecution(c.getUpdatePeriod(), lastOnlineCrawl.getCreated(), lastOnlineCrawl.getModified());
}
private boolean enqueueOnlineCrawlIfAvailable(Collection collection, Endpoint endpoint, Dataset dataset) {
private boolean enqueue(Collection collection, Endpoint endpoint, Dataset dataset) {
// Cannot happen as online and offline cannot run in parallel
/*if (outdatedDatamodelIdCrawlIdMap.containsKey(dataset.getId())) {
......@@ -135,6 +117,5 @@ public class OnlineCrawlRunner extends BaseCrawlRunner {
crawlManager.performOnlineCrawl(collection, endpoint, datamodelService.findById(dataset.getId()));
return true;
}
}
}
package eu.dariah.de.search.model;
import org.joda.time.DateTime;
import de.unibamberg.minf.dme.model.base.BaseIdentifiable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
* The ID of the dataset matches the ID of the bound datamodel
* @author tobias
*
*/
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper=false)
public class Dataset extends BaseIdentifiable {
private static final long serialVersionUID = -3289136791255603713L;
......@@ -15,23 +23,8 @@ public class Dataset extends BaseIdentifiable {
private boolean error;
private boolean deleted;
private boolean outdated;
private DateTime nextExecution;
public String getRemoteAlias() { return remoteAlias; }
public void setRemoteAlias(String remoteAlias) { this.remoteAlias = remoteAlias; }
public boolean isNew() { return isNew; }
public void setNew(boolean isNew) { this.isNew = isNew; }
public boolean isDeleted() { return deleted; }
public void setDeleted(boolean deleted) { this.deleted = deleted; }
public boolean isOutdated() { return outdated; }
public void setOutdated(boolean outdated) { this.outdated = outdated; }
public boolean isError() { return error; }
public void setError(boolean error) { this.error = error; }
public Dataset() {}
public Dataset(String datamodelId) {
this.setId(datamodelId);
......
......@@ -21,6 +21,7 @@ public class DatasetPojo extends BaseIdentifiable {
private boolean errorLock;
private boolean processing;
private boolean waiting;
private boolean outdated;
public String getDatamodelName() {
if (this.getDatamodelPojo()==null || this.getDatamodelPojo().getName()==null) {
......
......@@ -40,6 +40,7 @@ public class DatasetConverter extends BaseConverter<Dataset, DatasetPojo> {
pojo.setNew(dataset.isNew());
pojo.setRemoteAlias(dataset.getRemoteAlias());
pojo.setErrorLock(dataset.isError());
pojo.setOutdated(dataset.isOutdated());
// State
if (endpointId!=null) {
......@@ -62,6 +63,13 @@ public class DatasetConverter extends BaseConverter<Dataset, DatasetPojo> {
pojo.setLastCrawlCompleted("-");
}
}
if (dataset.getNextExecution()!=null) {
pojo.setNextCrawlPlanned(pojoFormatDateTime(dataset.getNextExecution(), locale));
} else {
pojo.setNextCrawlPlanned("-");
}
ExtendedDatamodelContainer edm = datamodelService.findById(dataset.getId());
if (edm!=null) {
pojo.setDatamodelPojo(datamodelConverter.convert(edm, locale));
......
......@@ -77,6 +77,33 @@ class BaseCrawlRunnerTest {
assertNextExecutionData(ne, dtRef, false, true, 1462, 1462, new DateTime(dtEnd).plusDays(1462));
}
// Verifies next-execution calculation when the configured policy period ("P1D")
// is very short relative to the crawl duration: the duration-based minimum
// should win over the policy. Expected values follow the helper's signature
// assertNextExecutionData(ne, reference, ..., minAge, maxAge, expectedTimestamp)
// — exact flag/age semantics per the helper defined earlier in this test class
// (out of view here); TODO confirm parameter meaning against that helper.
@Test
void testCalculateShortConfiguredPeriodExecution() {
DateTime dtEnd = new DateTime(2021, 1, 1, 0, 0);
DateTime dtRef = new DateTime(2022, 2, 1, 0, 0);
// Always use minimum calculated duration if policy is same or lower
DateTime ne = BaseCrawlRunner.calculateNextExecution("P1D", dtEnd.minusSeconds(3), dtEnd);
assertNextExecutionData(ne, dtRef, true, true, 1, 5, new DateTime(dtEnd).plusDays(1));
// Longer crawl durations push the calculated next execution further out.
ne = BaseCrawlRunner.calculateNextExecution("P1D", dtEnd.minusMinutes(10), dtEnd);
assertNextExecutionData(ne, dtRef, true, true, 2, 10, new DateTime(dtEnd).plusDays(2));
ne = BaseCrawlRunner.calculateNextExecution("P1D", dtEnd.minusHours(1), dtEnd);
assertNextExecutionData(ne, dtRef, true, true, 6, 30, new DateTime(dtEnd).plusDays(6));
ne = BaseCrawlRunner.calculateNextExecution("P1D", dtEnd.minusHours(2), dtEnd);
assertNextExecutionData(ne, dtRef, true, true, 11, 55, new DateTime(dtEnd).plusDays(11));
ne = BaseCrawlRunner.calculateNextExecution("P1D", dtEnd.minusDays(2), dtEnd);
assertNextExecutionData(ne, dtRef, true, true, 75, 375, new DateTime(dtEnd).plusDays(75));
ne = BaseCrawlRunner.calculateNextExecution("P1D", dtEnd.minusMonths(2), dtEnd);
assertNextExecutionData(ne, dtRef, true, true, 312, 1560, new DateTime(dtEnd).plusDays(312));
// Extremely long duration: min and max ages converge (1462 == 1462).
ne = BaseCrawlRunner.calculateNextExecution("P1D", dtEnd.minusYears(2), dtEnd);
assertNextExecutionData(ne, dtRef, false, true, 1462, 1462, new DateTime(dtEnd).plusDays(1462));
}
@Test
void testCalculateWithoutPeriodExecution() {
......
Subproject commit a4ee920a187c6c244eede903a5120fdcf1804f5c
Subproject commit 1089c7baa585779e498bf8d376622859652e7808
Subproject commit 2f8376d53c4c62e867e0ee5f188ef2bb205f5422
Subproject commit 5ebec7d6dc1bfeab40b3271217c55421225bad77
Subproject commit 86e86c582fbcdfd87982a322a1dd2a9d6a97a915
Subproject commit b6183a95a05b531297838fb58d7ab3355f865729
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment