Commit d37abc28 authored by Gradl, Tobias

446: Reimplement automatic online crawl capabilities (OPENED)

Task-Url: #446
parent d89b2a9d
Pipeline #31545 passed in 36 seconds
@@ -28,7 +28,7 @@ allprojects {
     ext {
         coreVersion = "6.5.4-SNAPSHOT"
         gtfVersion = "2.3.10-SNAPSHOT"
-        processingVersion = "4.3.14-SNAPSHOT"
+        processingVersion = "4.3.15-SNAPSHOT"
         colregModelVersion = "4.5.11-SNAPSHOT"
         dariahSpVersion = "2.1.7-RELEASE"
@@ -32,7 +32,7 @@ public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean {
     @Autowired protected DatamodelService datamodelService;

     @Getter @Setter private boolean active;
+    @Getter @Setter private boolean automationEnabled;
     @Getter @Setter private String cronExpression;
     @Getter @Setter protected boolean debugging;
     @Getter @Setter private int timeout;

@@ -54,7 +54,7 @@ public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean {
     @Override
     public void run() {
-        this.executeCrawling();
+        this.executeAutomation();
         this.setLastCompletion(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()));
     }

@@ -143,5 +143,5 @@ public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean {
     }

     protected abstract void init();
-    protected abstract void executeCrawling();
+    protected abstract void executeAutomation();
 }
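
For orientation, a minimal self-contained sketch (not the project's code; class and member names are illustrative) of the runner contract after the rename: each scheduled tick delegates to the executeAutomation() hook and then records the completion timestamp, as run() does above.

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

// Illustrative sketch of the template-method contract after this commit.
abstract class CrawlRunnerSketch implements Runnable {

    private Date lastCompletion;

    @Override
    public void run() {
        // each scheduled tick delegates to the subclass hook...
        executeAutomation();
        // ...and records completion, using the same java.time-to-Date
        // conversion as the diff above
        lastCompletion = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant());
    }

    // OnlineCrawlRunner and OfflineCrawlRunner decide what an automation tick does
    protected abstract void executeAutomation();

    public Date getLastCompletion() {
        return lastCompletion;
    }
}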
@@ -10,7 +10,6 @@ import org.springframework.scheduling.TriggerContext;
 import org.springframework.scheduling.support.CronExpression;
 import lombok.Getter;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;

 @Slf4j
@@ -19,12 +18,10 @@ public class CronTrigger implements Trigger {
     private final CronExpression cronExpression;
     @Getter private final ScheduledRunnable runnable;
     @Getter private final boolean active;

     public CronTrigger(String triggerName, String cronExpression, ScheduledRunnable runnable) {
         this.triggerName = triggerName;
         this.cronExpression = this.parseCronExpression(cronExpression);
         this.active = this.cronExpression!=null;
         this.runnable = runnable;
         this.setRunnableDates(getNextExecution(), null);
     }
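
The constructor above derives the active flag from the parse result, so an unparsable cron expression deactivates the trigger instead of failing startup. A self-contained sketch of that pattern against Spring's CronExpression API; the body of parseCronExpression is not shown in the diff, so the error handling here is an assumption:

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.support.CronExpression;

// Illustrative sketch: a Trigger backed by Spring's CronExpression that goes
// inactive when the expression cannot be parsed.
public class CronTriggerSketch implements Trigger {

    private final CronExpression cronExpression; // null when parsing failed

    public CronTriggerSketch(String expression) {
        this.cronExpression = parseCronExpression(expression);
    }

    private static CronExpression parseCronExpression(String expression) {
        try {
            return expression == null ? null : CronExpression.parse(expression);
        } catch (IllegalArgumentException e) {
            return null; // assumption: invalid expression leaves the trigger inactive
        }
    }

    public boolean isActive() {
        return cronExpression != null;
    }

    @Override
    public Date nextExecutionTime(TriggerContext triggerContext) {
        if (cronExpression == null) {
            return null; // inactive: the scheduler will not reschedule the task
        }
        LocalDateTime next = cronExpression.next(LocalDateTime.now());
        return next == null ? null : Date.from(next.atZone(ZoneId.systemDefault()).toInstant());
    }
}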
@@ -9,7 +9,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class OfflineCrawlRunner extends BaseCrawlRunner {
     @Override
-    protected void executeCrawling() {
+    protected void executeAutomation() {
         log.debug("Checking status of available datamodels");
     }
 package eu.dariah.de.search.automation.schedule;

-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.joda.time.DateTime;
-import org.joda.time.Period;
 import eu.dariah.de.search.model.Collection;
 import eu.dariah.de.search.model.Crawl;
@@ -16,6 +11,8 @@ import eu.dariah.de.search.service.CrawlService.CrawlErrorFlag;
 import eu.dariah.de.search.service.CrawlService.CrawlOnlineFlag;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;

 @Data
@@ -23,9 +20,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class OnlineCrawlRunner extends BaseCrawlRunner {
-    /* To block endpoints from being processed in parallel
-     * list sizes should always be 1 unless manual crawl is triggered by user */
-    private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<>();
+    @Getter @Setter private boolean collectionsExpire;

     @Override
     public void init() {
@@ -34,9 +29,9 @@ public class OnlineCrawlRunner extends BaseCrawlRunner {
     }

     @Override
-    protected void executeCrawling() {
+    protected void executeAutomation() {
         log.debug("Checking status of available datasets");
-        this.checkAndCrawlOnline(true);
+        this.checkAndCrawlOnline(this.isAutomationEnabled());
     }

     public void checkAndCrawlOnline(boolean enqueueExpired) {
@@ -45,8 +40,8 @@ public class OnlineCrawlRunner extends BaseCrawlRunner {
         for (Endpoint ep : c.getEndpoints()) {
             for (Dataset ds : ep.getDatasets()) {
                 savec = this.updateNextExecutionIfChanged(c, ep, ds) || savec;
-                if (ds.isOutdated()) {
-                    this.enqueue(c, ep, ds);
+                if (enqueueExpired && ds.isOutdated()) {
+                    crawlManager.performOnlineCrawl(c, ep, datamodelService.findById(ds.getId()));
                 }
             }
         }
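
The new enqueueExpired parameter decouples state bookkeeping from crawl scheduling: a scheduled run with automation disabled still updates next-execution state for every dataset but enqueues nothing, while the manual trigger in CollectionController below passes true. A toy illustration of the gate (not project code):

// Toy illustration of the enqueueExpired gate used in checkAndCrawlOnline above.
public class GateDemo {

    public static void main(String[] args) {
        checkAndCrawl(false); // scheduled run, automation disabled: refresh state only
        checkAndCrawl(true);  // automation enabled or manual trigger: also crawls
    }

    static void checkAndCrawl(boolean enqueueExpired) {
        boolean outdated = true; // stand-in for ds.isOutdated()
        System.out.println("update next-execution state"); // always happens
        if (enqueueExpired && outdated) {
            System.out.println("perform online crawl");    // gated on the flag
        }
    }
}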
@@ -72,7 +67,7 @@ public class OnlineCrawlRunner extends BaseCrawlRunner {
             ds.setNew(true);
             updated = true;
         }
-        if (ds.isOutdated()) {
+        if (this.isCollectionsExpire() && ds.isOutdated()) {
             ds.setOutdated(false);
             updated = true;
         }
@@ -81,9 +76,12 @@ public class OnlineCrawlRunner extends BaseCrawlRunner {
             updated = true;
         }
-        if (ne.getNextExecutionTimestamp().isEqualNow() || ne.getNextExecutionTimestamp().isBeforeNow() && !ds.isOutdated() && !ds.isNew()) {
+        if (this.isCollectionsExpire() && (ne.getNextExecutionTimestamp().isEqualNow() || ne.getNextExecutionTimestamp().isBeforeNow() && !ds.isOutdated() && !ds.isNew())) {
             ds.setOutdated(true);
             updated = true;
+        } else if (ne.getNextExecutionTimestamp().isAfterNow() && ds.isOutdated()) {
+            ds.setOutdated(false);
+            updated = true;
         }
         return updated;
     }
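
Note that the retained inner condition relies on Java operator precedence: && binds tighter than ||, so the isEqualNow() alternative is not guarded by !ds.isOutdated() && !ds.isNew(). A standalone check:

public class PrecedenceDemo {
    public static void main(String[] args) {
        boolean isEqualNow = false, isBeforeNow = true, outdated = false, isNew = false;

        // the condition above parses as: isEqualNow || (isBeforeNow && !outdated && !isNew)
        boolean asWritten = isEqualNow || isBeforeNow && !outdated && !isNew;
        boolean parenthesized = isEqualNow || (isBeforeNow && !outdated && !isNew);

        System.out.println(asWritten == parenthesized); // true for all inputs
    }
}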
@@ -94,28 +92,4 @@ public class OnlineCrawlRunner extends BaseCrawlRunner {
         }
         return BaseCrawlRunner.calculateNextExecution(c.getUpdatePeriod(), lastOnlineCrawl.getCreated(), lastOnlineCrawl.getModified());
     }
-
-    private boolean enqueue(Collection collection, Endpoint endpoint, Dataset dataset) {
-        // Cannot happen as online and offline cannot run in parallel
-        /*if (outdatedDatamodelIdCrawlIdMap.containsKey(dataset.getId())) {
-            if (debugging) {
-                log.debug("Cannot enqueue online crawl due to outdated datamodel [{}] being reprocessed", dataset.getId());
-            }
-            return false;
-        }*/
-        if (endpointIdCrawlIdMap.containsKey(endpoint.getId())) {
-            if (debugging) {
-                log.debug("Cannot enqueue online crawl due to endpoint [{}] already being busy", endpoint.getId());
-            }
-            return false;
-        }
-        if (debugging) {
-            log.debug("Enqueued online crawl for endpoint [{}] and dataset [{}]", endpoint.getId(), dataset.getId());
-        }
-        crawlManager.performOnlineCrawl(collection, endpoint, datamodelService.findById(dataset.getId()));
-        return true;
-    }
 }
@@ -6,10 +6,14 @@ import lombok.extern.slf4j.Slf4j;
 @Data
 @Slf4j
 public class CrawlingAutomationConfigProperties {
-    private String cronSchedule;
+    private String cronSchedule = "0 0 4 * * *";
     private String cronScheduleOnline;
     private String cronScheduleOffline;
+    private boolean autocrawlCollections;
+    private boolean autorefreshIndices;
+    private boolean collectionsExpire;

     public void checkSchedules() {
         if (cronSchedule!=null) {
             if (cronScheduleOnline==null) {
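
The hunk is truncated inside checkSchedules(); its visible shape suggests that the legacy cronSchedule value seeds the per-runner schedules when they are not set explicitly. A hedged sketch of that fallback (the cronScheduleOffline branch is an assumption):

// Hedged sketch of the truncated method; field defaults match the diff, the
// offline fallback is assumed symmetric to the online one.
public class AutomationPropertiesSketch {

    private String cronSchedule = "0 0 4 * * *"; // default from the diff: daily at 04:00
    private String cronScheduleOnline;
    private String cronScheduleOffline;

    public void checkSchedules() {
        if (cronSchedule != null) {
            if (cronScheduleOnline == null) {
                cronScheduleOnline = cronSchedule;
            }
            if (cronScheduleOffline == null) {
                cronScheduleOffline = cronSchedule; // assumption
            }
        }
    }
}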
@@ -6,9 +6,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.Executor;
 import java.util.stream.IntStream;
 import javax.servlet.http.HttpServletResponse;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
@@ -29,6 +31,7 @@ import eu.dariah.de.search.pojo.DatasetPojo;
 import eu.dariah.de.search.pojo.DatasourcePojo;
 import eu.dariah.de.search.pojo.EndpointPojo;
 import eu.dariah.de.search.pojo.conversion.CollectionConverter;
+import eu.dariah.de.search.pojo.conversion.base.BaseConverter;
 import eu.dariah.de.search.query.execution.AggregationService;
 import eu.dariah.de.search.service.CollectionService;
@@ -57,10 +60,12 @@ public class CollectionController extends BaseController {
     public String listDatasources(Model model, Locale locale) {
         model.addAttribute("colregUrl", apiConfig.getColreg().getBaseUrl());
         model.addAttribute("autoSyncCr", crSyncService.isAutosync());
-        model.addAttribute("autoOnline", crawlRunner.isActive());
+        model.addAttribute("autoOnline", crawlRunner.isAutomationEnabled());
         model.addAttribute("cronExpression", crawlRunner.getCronExpression());
-        model.addAttribute("nextExecution", crawlRunner.getNextExecution());
-        model.addAttribute("lastCompletion", crawlRunner.getLastCompletion());
+        model.addAttribute("collectionsExpire", crawlRunner.isCollectionsExpire());
+        model.addAttribute("nextExecution", crawlRunner.getNextExecution()==null ? "-" : BaseConverter.pojoFormatDateTime(new DateTime(crawlRunner.getNextExecution()), locale));
+        model.addAttribute("lastCompletion", crawlRunner.getLastCompletion()==null ? "-" : BaseConverter.pojoFormatDateTime(new DateTime(crawlRunner.getLastCompletion()), locale));
         return "collections/list";
     }
@@ -96,7 +101,7 @@ public class CollectionController extends BaseController {
     @RequestMapping(method=GET, value={"/async/triggerOnline"})
     public @ResponseBody ModelActionPojo triggerOnline(Model model, Locale locale) {
-        crawlingAutomationTaskExecutor.execute(crawlRunner);
+        crawlingAutomationTaskExecutor.execute(() -> crawlRunner.checkAndCrawlOnline(true));
         ModelActionPojo result = new ModelActionPojo();
         result.setSuccess(true);
@@ -62,7 +62,7 @@ public class DatamodelController extends BaseController {
     public String listDatamodels(Model model, Locale locale) {
         model.addAttribute("dmeUrl", apiConfig.getDme().getBaseUrl());
         model.addAttribute("autoSyncDme", dmeSyncService.isAutosync());
-        model.addAttribute("autoOffline", crawlRunner.isActive());
+        model.addAttribute("autoOffline", crawlRunner.isAutomationEnabled());
         model.addAttribute("cronExpression", crawlRunner.getCronExpression());
         model.addAttribute("nextExecution", crawlRunner.getNextExecution());
         model.addAttribute("lastCompletion", crawlRunner.getLastCompletion());
@@ -39,8 +39,11 @@ import eu.dariah.de.search.model.Dataset;
 import eu.dariah.de.search.model.Collection;
 import eu.dariah.de.search.model.Endpoint;
 import eu.dariah.de.search.model.ExtendedDatamodelContainer;
+import eu.dariah.de.search.pojo.DatamodelPojo;
+import eu.dariah.de.search.query.execution.AggregationService;
 import eu.dariah.de.search.service.CollectionService;
 import eu.dariah.de.search.service.CrawlService;
+import eu.dariah.de.search.service.DatamodelService;
 import eu.dariah.de.search.service.ResourceIndexingServiceImpl;

 public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware, InitializingBean {
@@ -48,8 +51,10 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware, InitializingBean {
     @Autowired protected CrawlService crawlService;
     @Autowired protected CollectionService collectionService;
+    @Autowired protected DatamodelService datamodelService;
     @Autowired private ObjectMapper objectMapper;
+    @Autowired private AggregationService aggregationService;

     private String baseDownloadPath;
@@ -335,8 +340,13 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware, InitializingBean {
             c.setError(true);
             c.setComplete(false);
         } else if (state==ProcessingServiceStates.COMPLETE) {
-            c.setError(false);
             c.setComplete(true);
+            try {
+                c.setError(aggregationService.getDocumentCount(datamodelService.findById(c.getDatamodelId()).getIndexName(), c.getEndpointId()) == 0);
+            } catch (Exception e) {
+                logger.error("Failed to get document count for endpoint", e);
+                c.setError(true);
+            }
         } else {
             c.setError(false);
             c.setComplete(false);
@@ -16,7 +16,6 @@ import de.unibamberg.minf.processing.exception.ProcessingConfigException;
 import de.unibamberg.minf.processing.listener.ProcessingListener;
 import eu.dariah.de.search.crawling.crawler.Crawler;
-import net.bytebuddy.asm.Advice.This;

 public class CrawlPipelineImpl implements CrawlPipeline {
     protected static final Logger logger = LoggerFactory.getLogger(CrawlPipelineImpl.class);
@@ -239,7 +238,7 @@ public class CrawlPipelineImpl implements CrawlPipeline {
         this.currentRunnableId = null;
         if (!this.isCancellationRequested()) {
-            logger.info("Pipeline completed.");
+            logger.info("Pipeline completed.");
             this.setState(ProcessingServiceStates.COMPLETE);
         } else {
             logger.info("Pipeline cancelled.");
@@ -44,7 +44,7 @@ public class DatasetConverter extends BaseConverter<Dataset, DatasetPojo> {
         // State
         if (endpointId!=null) {
-            List<Crawl> latestCrawls = crawlService.findCrawls(endpointId, dataset.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Complete, CrawlErrorFlag.NoError, 1);
+            List<Crawl> latestCrawls = crawlService.findCrawls(endpointId, dataset.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Both, CrawlErrorFlag.Both, 1);
             if (!latestCrawls.isEmpty()) {
                 CrawlPojo cPojo = crawlConverter.convert(latestCrawls.get(0), locale);
                 pojo.setProcessing(cPojo.isBusy());
@@ -65,7 +65,7 @@ public class DatasetConverter extends BaseConverter<Dataset, DatasetPojo> {
         }
         if (dataset.getNextExecution()!=null) {
-            pojo.setNextCrawlPlanned(pojoFormatDateTime(dataset.getNextExecution(), locale));
+            pojo.setNextCrawlPlanned(pojoFormatDate(dataset.getNextExecution(), locale));
         } else {
             pojo.setNextCrawlPlanned("-");
         }
@@ -15,6 +15,8 @@ import eu.dariah.de.search.config.ApiConfigProperties;
 public abstract class BaseConverter<T, TPojo> implements Converter<T, TPojo> {
     private final static DateTimeFormatter POJO_DATETIME_FORMATTER = DateTimeFormat.forStyle("MM");
+    private final static DateTimeFormatter POJO_DATE_FORMATTER = DateTimeFormat.forStyle("M-");

     @Autowired protected MessageSource messageSource;
     @Autowired protected ApiConfigProperties apiConfig;
@@ -41,7 +43,11 @@ public abstract class BaseConverter<T, TPojo> implements Converter<T, TPojo> {
         return result;
     }

-    protected static String pojoFormatDateTime(DateTime dateTime, Locale locale) {
+    public static String pojoFormatDateTime(DateTime dateTime, Locale locale) {
         return dateTime.toString(POJO_DATETIME_FORMATTER.withLocale(locale));
     }
+
+    public static String pojoFormatDate(DateTime dateTime, Locale locale) {
+        return dateTime.toString(POJO_DATE_FORMATTER.withLocale(locale));
+    }
 }
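
The style strings are Joda-Time format styles: "MM" selects medium date plus medium time, while "M-" selects medium date and omits the time, which is why planned next crawls now render as a date without a time-of-day. A small demonstration (output is locale-dependent):

import java.util.Locale;

import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;

public class JodaStyleDemo {
    public static void main(String[] args) {
        DateTime ts = new DateTime(2021, 9, 27, 4, 0);
        // "MM" = medium date, medium time
        System.out.println(DateTimeFormat.forStyle("MM").withLocale(Locale.GERMANY).print(ts));
        // "M-" = medium date, no time component
        System.out.println(DateTimeFormat.forStyle("M-").withLocale(Locale.GERMANY).print(ts));
    }
}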
@@ -58,7 +58,6 @@ public class CrawlingConfig extends CrawlingConfigProperties implements SchedulingConfigurer {
             this.setAutomation(new CrawlingAutomationConfigProperties());
         }
         this.getAutomation().checkSchedules();
     }

     @Override
@@ -69,17 +68,15 @@ public class CrawlingConfig extends CrawlingConfigProperties implements SchedulingConfigurer {
     private void configureRunnerTask(ScheduledTaskRegistrar taskRegistrar, BaseCrawlRunner runner, String cronExpression) {
         if (cronExpression==null) {
             return;
         }
         CronTrigger runnerTrigger = new CronTrigger(runner.getClass().getSimpleName(), cronExpression, runner);
         taskRegistrar.setScheduler(crawlingAutomationTaskExecutor());
         taskRegistrar.addTriggerTask(runner, runnerTrigger);
         if (runnerTrigger.isActive()) {
             runner.setActive(true);
-            runner.setCronExpression(cronExpression);
         } else {
             runner.setActive(false);
         }
+        runner.setCronExpression(cronExpression);
     }

     @Bean
@@ -91,6 +88,7 @@ public class CrawlingConfig extends CrawlingConfigProperties implements SchedulingConfigurer {
     @Bean
     public OfflineCrawlRunner offlineCrawlRunner() {
         OfflineCrawlRunner runner = new OfflineCrawlRunner();
+        runner.setAutomationEnabled(this.getAutomation().isAutorefreshIndices());
         return runner;
     }
@@ -98,6 +96,8 @@ public class CrawlingConfig extends CrawlingConfigProperties implements SchedulingConfigurer {
     @Bean
     public OnlineCrawlRunner onlineCrawlRunner() {
         OnlineCrawlRunner runner = new OnlineCrawlRunner();
+        runner.setAutomationEnabled(this.getAutomation().isAutocrawlCollections());
+        runner.setCollectionsExpire(this.getAutomation().isCollectionsExpire());
         return runner;
     }
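
CrawlingConfig wires the runners into Spring's scheduling machinery as a SchedulingConfigurer. A minimal self-contained sketch of that registration pattern, using Spring's stock CronTrigger in place of the project's custom CronTrigger class:

import java.util.concurrent.Executors;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;

// Illustrative sketch of the registration pattern, not the project's config.
@Configuration
@EnableScheduling
public class SchedulingSketch implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        // dedicated scheduler thread, analogous to crawlingAutomationTaskExecutor()
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(1));
        // run a task every day at 04:00, matching the default cronSchedule above
        taskRegistrar.addTriggerTask(
                () -> System.out.println("crawl tick"),
                new CronTrigger("0 0 4 * * *"));
    }
}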
-Subproject commit 1089c7baa585779e498bf8d376622859652e7808
+Subproject commit ccb60523653c23dc8eea77159153e0121535df33

-Subproject commit 5ebec7d6dc1bfeab40b3271217c55421225bad77
+Subproject commit de83453c7c200fb3ef07349177915ef93d35f32b

-Subproject commit b6183a95a05b531297838fb58d7ab3355f865729
+Subproject commit b90eeb28277b5a30c92910f0676cf98c7d56ca51