Commit 87d2423c authored by Gradl, Tobias

447: Implement configurable scheduler (OPENED)

Task-Url: #447
parent 8bed7ef9
Pipeline #31221 passed with stage in 44 seconds
......@@ -7,4 +7,5 @@ public class CrawlingAutomationConfigProperties {
private boolean online = false;
private boolean offline = false;
private int syncInterval = 300; // every 5 minutes
private String cronSchedule;
}
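Note: the new cronSchedule property is not validated in this hunk. As a minimal sketch (not part of this commit; class name and property prefix are assumptions), the value could be checked at startup with Spring's CronExpression, presumably bound from crawling.automation.cron-schedule:

package eu.dariah.de.search.config;

import javax.annotation.PostConstruct;
import org.springframework.scheduling.support.CronExpression;

// Hypothetical sketch, not part of this commit: fail fast on an unparsable cron
// expression instead of failing later, when the trigger is first evaluated.
public class CronScheduleValidationSketch {
	private String cronSchedule; // presumably bound from crawling.automation.cron-schedule

	@PostConstruct
	public void validateCronSchedule() {
		if (cronSchedule != null && !CronExpression.isValidExpression(cronSchedule)) {
			throw new IllegalArgumentException("Invalid cron schedule: " + cronSchedule);
		}
	}
}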
......@@ -11,9 +11,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Minutes;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
......@@ -29,13 +29,13 @@ import eu.dariah.de.search.model.Dataset;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.service.DatamodelService;
import lombok.extern.slf4j.Slf4j;
import eu.dariah.de.search.service.CrawlService.CrawlCompleteFlag;
import eu.dariah.de.search.service.CrawlService.CrawlErrorFlag;
import eu.dariah.de.search.service.CrawlService.CrawlOnlineFlag;
@Slf4j
public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCrawlManager {
protected final Logger logger = LoggerFactory.getLogger(TimedCrawlManagerImpl.class);
@Autowired private DatamodelService datamodelService;
@Autowired private AdminService adminService;
......@@ -47,8 +47,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
private DateTime lastSyncTimestamp;
private boolean maintenanceMode;
/* To block endpoints from being processed in parallel
* list sizes should always be 1 unless a manual crawl is triggered by a user */
private Map<String, List<String>> endpointIdCrawlIdMap = new HashMap<String, List<String>>();
......@@ -93,18 +92,18 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
if (!this.autocrawlOffline && !this.autocrawlOnline) {
logger.info("CrawlManager initialized without reocurring synchronization due to configuration");
log.info("CrawlManager initialized without reocurring synchronization due to configuration");
return;
}
if (this.autocrawlOnline || this.autocrawlOffline) {
if (!debugging && this.syncInterval < 60) {
this.syncInterval = 60;
logger.warn("Sync interval increased to 60s");
log.warn("Sync interval increased to 60s");
}
if (!debugging && this.timeout < 30) {
this.timeout = 30;
logger.warn("Crawl timeout increased to 30s (which is probably still too short)");
log.warn("Crawl timeout increased to 30s (which is probably still too short)");
}
this.syncTimer = new Timer();
this.setupSyncTimer();
......@@ -116,7 +115,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
Stopwatch sw = new Stopwatch().start();
if (debugging) {
logger.debug("Executing synchronization");
log.debug("Executing synchronization");
}
try {
statusMapslock.lock();
......@@ -131,10 +130,10 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
if (debugging) {
logger.debug("Synchronization completed in {}ms", sw.getElapsedTime());
log.debug("Synchronization completed in {}ms", sw.getElapsedTime());
}
} catch (Exception e) {
logger.error("Failed to execute crawl synchronization", e);
log.error("Failed to execute crawl synchronization", e);
} finally {
statusMapslock.unlock();
}
......@@ -156,6 +155,41 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl;
// TODO Also check last online crawl success and duration
for (Collection collection : collectionService.getAll()) {
boolean modified = false;
boolean hasUpdatePolicy = (collection.getUpdatePeriod()!=null && !collection.getUpdatePeriod().isEmpty());
for (Endpoint endpoint : collection.getEndpoints()) {
for (Dataset dataset : endpoint.getDatasets()) {
// Find last online crawl
lastOnlineCrawls = crawlService.findCrawls(endpoint.getId(), dataset.getId(), CrawlOnlineFlag.Online, CrawlCompleteFlag.Both, CrawlErrorFlag.Both, 1);
if (lastOnlineCrawls!=null && !lastOnlineCrawls.isEmpty()) {
lastOnlineCrawl = lastOnlineCrawls.get(0);
} else {
lastOnlineCrawl = null;
}
// Guard against datasets that have never been crawled online (no matching crawl found above)
Days d = lastOnlineCrawl!=null ? Days.daysBetween(lastOnlineCrawl.getModified(), syncTimestamp) : null;
Minutes m = lastOnlineCrawl!=null ? Minutes.minutesBetween(lastOnlineCrawl.getCreated(), lastOnlineCrawl.getModified()) : null;
log.debug("{}: hasUpdatePolicy: {}, lastOnlineSucceeded: {}, duration: {}, age: {}", collection.getName(null), hasUpdatePolicy, lastOnlineCrawl!=null && lastOnlineCrawl.isComplete() && !lastOnlineCrawl.isError(), m!=null ? m.getMinutes() : null, d!=null ? d.getDays() : null);
}
}
}
}
public void enqueueNewAndOutdatedDatasetsOld() {
DateTime syncTimestamp = DateTime.now();
List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl;
for (Collection collection : collectionService.getAll()) {
boolean modified = false;
boolean onlineUpdates = (collection.getUpdatePeriod()!=null && !collection.getUpdatePeriod().isEmpty());
......@@ -188,7 +222,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
}
} catch (Exception e) {
logger.error("Failed to process dataset for autocrawling", e);
log.error("Failed to process dataset for autocrawling", e);
dataset.setError(true);
modified = true;
}
......@@ -203,19 +237,19 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
private boolean enqueueOnlineCrawlIfAvailable(Collection collection, Endpoint endpoint, Dataset dataset) {
if (outdatedDatamodelIdCrawlIdMap.containsKey(dataset.getId())) {
if (debugging) {
logger.debug("Cannot enqueue online crawl due to outdated datamodel [{}] being reprocessed", dataset.getId());
log.debug("Cannot enqueue online crawl due to outdated datamodel [{}] being reprocessed", dataset.getId());
}
return false;
}
if (endpointIdCrawlIdMap.containsKey(endpoint.getId())) {
if (debugging) {
logger.debug("Cannot enqueue online crawl due to endpoint [{}] already being busy", endpoint.getId());
log.debug("Cannot enqueue online crawl due to endpoint [{}] already being busy", endpoint.getId());
}
return false;
}
if (debugging) {
logger.debug("Enqueued online crawl for endpoint [{}] and dataset [{}]", endpoint.getId(), dataset.getId());
log.debug("Enqueued online crawl for endpoint [{}] and dataset [{}]", endpoint.getId(), dataset.getId());
}
this.performOnlineCrawl(collection, endpoint, datamodelService.findById(dataset.getId()));
......@@ -233,7 +267,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
if (baseCrawlId!=null) {
this.performOfflineCrawl(collection, endpoint, datamodel, baseCrawlId);
if (this.debugging) {
logger.debug("");
log.debug("");
}
count++;
}
......@@ -241,7 +275,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
}
}
}
logger.debug("Reindexing {} datasets of datamodel {}", count, datamodel.getId());
log.debug("Reindexing {} datasets of datamodel {}", count, datamodel.getId());
return count;
}
......@@ -261,7 +295,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
return true;
} catch (Exception e) {
logger.error(String.format("Failed to recreate index and mapping for model [%s]", datamodel.getId()));
log.error(String.format("Failed to recreate index and mapping for model [%s]", datamodel.getId()));
return false;
}
}
......@@ -307,7 +341,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
@Override
protected void enqueue(CrawlPipeline pipeline, Crawl crawl) {
if (pipeline==null || crawl==null) {
logger.warn("Nothing to enqueue - either pipeline or crawl are not set");
log.warn("Nothing to enqueue - either pipeline or crawl are not set");
return;
}
try {
......@@ -320,7 +354,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
pipelineExecutor.execute(pipeline);
} catch (Exception e) {
logger.error("Failed to setup processing pipeline", e);
log.error("Failed to setup processing pipeline", e);
this.error(pipeline.getUuid());
} finally {
statusMapslock.unlock();
......@@ -345,14 +379,14 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
pipelineExecutor = Executors.newFixedThreadPool(getMaxPoolSize());
}
logger.info(String.format("Running sychronization with client [%s], interrupt timeout set to %s", syncClient.getClass().getSimpleName(), this.getTimeout()));
log.info(String.format("Running sychronization with client [%s], interrupt timeout set to %s", syncClient.getClass().getSimpleName(), this.getTimeout()));
// For interrupting the crawl thread if necessary
Timer timeoutTimer = new Timer();
timeoutTimerTask = new TimeoutTimerTask(pipelineExecutor.submit(syncClient), pipelineExecutor);
timeoutTimer.schedule(timeoutTimerTask, this.timeout * 1000);
} catch (Exception e) {
logger.error("An error occurred while executing synchronization", e);
log.error("An error occurred while executing synchronization", e);
}
}
......@@ -364,7 +398,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
// Wait until all threads are finished
while (!pipelineExecutor.isTerminated()) {}
} catch (final Exception e) {
logger.error("Error closing sync executor", e);
log.error("Error closing sync executor", e);
}
}
......@@ -398,12 +432,12 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
private void cleanupCompleted(UUID serviceId) {
CrawlPipeline pipeline = this.serviceIdServiceMap.get(serviceId);
if (pipeline==null || pipeline.getCrawlId()==null) {
logger.error("Failed to resolve pipeline for given service ID. Hashed id maps are probably inconsistent.");
log.error("Failed to resolve pipeline for given service ID. Hashed id maps are probably inconsistent.");
return;
}
Crawl c = crawlService.findById(pipeline.getCrawlId());
if (c==null) {
logger.error("Failed to resolve crawl for given service ID. Hashed id maps are probably inconsistent.");
log.error("Failed to resolve crawl for given service ID. Hashed id maps are probably inconsistent.");
return;
}
......@@ -412,7 +446,7 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
this.removeCrawlId(this.datamodelIdCrawlIdMap, c.getDatamodelId(), c.getId());
this.removeCrawlId(this.endpointIdCrawlIdMap, c.getEndpointId(), c.getId());
} catch (Exception e) {
logger.error("Failed to process completed service. Hashed id maps are probably inconsistent.");
log.error("Failed to process completed service. Hashed id maps are probably inconsistent.");
} finally {
statusMapslock.unlock();
}
......@@ -441,11 +475,11 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
synchronize();
} catch (Exception e) {
logger.error("Failed to execute sync task", e);
log.error("Failed to execute sync task", e);
}
}
};
syncTimer.scheduleAtFixedRate(syncTimerTask, this.getSyncInterval()*1000, this.getSyncInterval()*1000);
logger.info(String.format("Scheduled synchronization every %s seconds", this.getSyncInterval()));
log.info(String.format("Scheduled synchronization every %s seconds", this.getSyncInterval()));
}
}
......@@ -15,7 +15,7 @@ public class BaseNamedEntityImpl extends BaseEntityImpl implements BaseNamedEnti
return names.get(locale);
}
// TODO: Better choose some default
if (!names.isEmpty()) {
if (names!=null && !names.isEmpty()) {
return names.get(names.keySet().iterator().next());
}
return null;
......
package eu.dariah.de.search.config;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import org.joda.time.DateTime;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronExpression;
import de.unibamberg.minf.gtf.GtfElementProcessor;
import de.unibamberg.minf.gtf.GtfMappingProcessor;
......@@ -33,16 +49,53 @@ import eu.dariah.de.search.crawling.files.FileUnpacker;
import eu.dariah.de.search.crawling.files.XmlChunker;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@EqualsAndHashCode(callSuper=true)
@EnableScheduling
@Configuration
@ConfigurationProperties(prefix = "crawling")
public class CrawlingConfig extends CrawlingConfigProperties {
public class CrawlingConfig extends CrawlingConfigProperties implements SchedulingConfigurer {
private int maxThreadsPerServiceType = 6;
private int apiAccessPolitenessSpanMs = 200;
@Bean
public Executor crawlingAutomationTaskExecutor() {
return Executors.newSingleThreadScheduledExecutor();
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
CronExpression cronExpression = CronExpression.parse(this.getAutomation().getCronSchedule());
taskRegistrar.setScheduler(crawlingAutomationTaskExecutor());
taskRegistrar.addTriggerTask(new Runnable() {
@Override
public void run() {
log.debug("tick");
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext context) {
Optional<Date> lastCompletionTime = Optional.ofNullable(context.lastCompletionTime());
//Instant nextExecutionTime = lastCompletionTime.orElseGet(Date::new).toInstant().plusMillis(tickService.getDelay());
LocalDateTime nextExecutionTime = cronExpression.next(LocalDateTime.now());
log.debug("nextExecutionTime: {}", nextExecutionTime.toString());
log.debug("nextExecutionDate: {}", Date.from(nextExecutionTime.atZone(ZoneId.systemDefault()).toInstant()));
return Date.from(nextExecutionTime.atZone(ZoneId.systemDefault()).toInstant());
}
});
}
@PostConstruct
public void completeConfiguration() {
if (this.getAutomation()==null) {
......
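For comparison only (not what this commit does): the anonymous Trigger in configureTasks above could be replaced by Spring's built-in CronTrigger, which evaluates the cron expression itself and avoids the manual LocalDateTime-to-Date conversion. A minimal sketch under that assumption, requiring import org.springframework.scheduling.support.CronTrigger:

// Hypothetical alternative to the anonymous Trigger shown above.
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
	CronTrigger trigger = new CronTrigger(this.getAutomation().getCronSchedule());
	taskRegistrar.setScheduler(crawlingAutomationTaskExecutor());
	// CronTrigger computes the next execution time from the expression after each completion
	taskRegistrar.addTriggerTask(() -> log.debug("tick"), trigger);
}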
package eu.dariah.de.search.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@Configuration
@EnableScheduling
@ConfigurationProperties(prefix = "scheduling")
public class SchedulingConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
log.debug("Initializing scheduling config");
}
}
......@@ -43,5 +43,10 @@
<logger name="org.pac4j">
<level value="info" />
</logger>
<!-- Just to avoid numerous warnings while switching to elasticsearch v7.16, can be reset to warn in the future -->
<logger name="org.elasticsearch">
<level value="error" />
</logger>
</configuration>
\ No newline at end of file