Commit 24dd0bb3 authored by Gradl, Tobias
Browse files

451: Reimplement automatic offline crawl capabilities (OPENED)

Task-Url: #451
parent 49b141da
Pipeline #31616 passed with stage
in 34 seconds
......@@ -5,7 +5,6 @@ import java.time.ZoneId;
import java.util.Date;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Period;
import org.joda.time.Seconds;
import org.springframework.beans.factory.DisposableBean;
......@@ -13,9 +12,6 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dariah.de.search.automation.schedule.NextExecution.CALCULATION_METHODS;
import eu.dariah.de.search.crawling.CrawlManager;
import eu.dariah.de.search.service.CollectionService;
import eu.dariah.de.search.service.CrawlService;
import eu.dariah.de.search.service.DatamodelService;
import lombok.Data;
import lombok.Getter;
......@@ -26,12 +22,8 @@ import lombok.extern.slf4j.Slf4j;
@Data
public abstract class BaseCrawlRunner implements ScheduledRunnable, DisposableBean, InitializingBean {
@Autowired protected CrawlManager crawlManager;
@Autowired protected CrawlService crawlService;
@Autowired protected CollectionService collectionService;
@Autowired protected DatamodelService datamodelService;
@Getter @Setter private boolean automationEnabled;
@Getter @Setter private String cronExpression;
@Getter @Setter protected boolean debugging;
......
package eu.dariah.de.search.automation.schedule;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dariah.de.search.es.service.AdminService;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
......@@ -8,13 +14,31 @@ import lombok.extern.slf4j.Slf4j;
@EqualsAndHashCode(callSuper=false)
@Slf4j
public class OfflineCrawlRunner extends BaseCrawlRunner {
@Override
protected void executeAutomation() {
log.debug("Checking status of available datamodels");
}
@Autowired private AdminService adminService;
@Autowired private MappingGenerationService mappingGenerationService;
/**
 * Logs that offline crawling is being initialized and runs one index check
 * with recreation switched off.
 */
@Override
public void init() {
    log.debug("Initializing automatic offline crawling capabilities");
    // Initialization never asks for recreation; the flag is always false here.
    final boolean recreateOnInit = false;
    checkAndRecreateIndices(recreateOnInit);
}
/**
 * Scheduled entry point: logs the check and runs the index check, passing
 * the current {@code automationEnabled} setting as the recreate flag.
 */
@Override
protected void executeAutomation() {
    log.debug("Checking status of available datamodels");
    final boolean recreateAllowed = isAutomationEnabled();
    checkAndRecreateIndices(recreateAllowed);
}
/**
 * Iterates over every datamodel container returned by
 * {@code datamodelService.findAll()}.
 *
 * The loop body is empty in this revision, so nothing is checked or
 * recreated yet and {@code recreate} is not read.
 *
 * @param recreate flag supplied by callers; currently unused
 */
public void checkAndRecreateIndices(boolean recreate) {
for (ExtendedDatamodelContainer dmc : datamodelService.findAll()) {
// NOTE(review): intentionally empty? Presumably the per-datamodel check is still to be implemented - confirm.
}
}
}
package eu.dariah.de.search.automation.schedule;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dariah.de.search.crawling.CrawlManager;
import eu.dariah.de.search.model.Collection;
import eu.dariah.de.search.model.Crawl;
import eu.dariah.de.search.model.Dataset;
import eu.dariah.de.search.model.Endpoint;
import eu.dariah.de.search.service.CollectionService;
import eu.dariah.de.search.service.CrawlService;
import eu.dariah.de.search.service.DatamodelService;
import eu.dariah.de.search.service.CrawlService.CrawlCompleteFlag;
import eu.dariah.de.search.service.CrawlService.CrawlErrorFlag;
import eu.dariah.de.search.service.CrawlService.CrawlOnlineFlag;
......@@ -20,6 +25,11 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OnlineCrawlRunner extends BaseCrawlRunner {
@Autowired protected CrawlManager crawlManager;
@Autowired protected CrawlService crawlService;
@Autowired protected CollectionService collectionService;
@Getter @Setter private boolean collectionsExpire;
@Override
......
......@@ -125,7 +125,7 @@ public class DatamodelController extends BaseController {
@RequestMapping(method=GET, value={"/async/triggerOffline"})
public @ResponseBody ModelActionPojo triggerOffline(Model model, Locale locale) {
crawlingAutomationTaskExecutor.execute(crawlRunner);
crawlingAutomationTaskExecutor.execute(() -> crawlRunner.checkAndRecreateIndices(true));
ModelActionPojo result = new ModelActionPojo();
result.setSuccess(true);
......
......@@ -2,14 +2,12 @@ package eu.dariah.de.search.es.service;
import com.fasterxml.jackson.databind.JsonNode;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
public interface AdminService {
public boolean getIndexExists(String indexName);
public boolean dropIndex(String indexName);
public boolean createIndexIfNotExists(String indexName);
public boolean getMappingExists(String indexName);
public boolean putMapping(String indexName, JsonNode mapping);
public boolean getIsOutdated(ExtendedDatamodelContainer sc, JsonNode mapping);
public boolean dropIndex(String indexName);
boolean getIndexExists(String indexName);
boolean getIsOutdated(String indexName, JsonNode mapping);
public String getMapping(String indexName);
}
package eu.dariah.de.search.es.service;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.skyscreamer.jsonassert.JSONCompare;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dariah.de.search.es.client.IndexClient;
import eu.dariah.de.search.es.client.MappingClient;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class AdminServiceImpl implements AdminService, InitializingBean {
public class AdminServiceImpl implements AdminService {
@Autowired private IndexClient indexClient;
@Autowired private MappingClient mappingClient;
@Autowired private ObjectMapper objMapper;
@Value(value="${paths.currentMappings:${paths.main}/currentMappings}")
private String mappingsPath;
@Override
public void afterPropertiesSet() throws Exception {
File mappingsDir = new File(mappingsPath);
if (!mappingsDir.exists()) {
FileUtils.forceMkdir(mappingsDir);
}
}
@Override
public boolean createIndexIfNotExists(String indexName) {
if (!this.getIndexExists(indexName)) {
......@@ -75,47 +56,18 @@ public class AdminServiceImpl implements AdminService, InitializingBean {
if (!this.getIndexExists(indexName)) {
indexClient.createIndex(indexName);
}
return mappingClient.putMapping(indexName, mapping.toString());
}
@Override
public boolean getIsOutdated(ExtendedDatamodelContainer sc, JsonNode mapping) {
return this.getIsOutdated(sc.getIndexName(), mapping);
}
@Override
public boolean getIsOutdated(String indexName, JsonNode mapping) {
Assert.notNull(indexName);
String temporaryIndexName = "tmp" + indexName;
File mappingsFile = new File(mappingsPath + File.separator + indexName);
try {
JsonNode nExist;
File mappingsFile = new File(mappingsPath + File.separator + indexName);
if (mappingsFile.exists()) {
nExist = objMapper.readValue(mappingsFile, JsonNode.class);
} else if (indexClient.indexExists(indexName) && mappingClient.mappingExists(indexName)) {
nExist = objMapper.readValue(mappingClient.getMapping(indexName), JsonNode.class);
objMapper.writeValue(mappingsFile, nExist);
} else {
return true;
}
if (this.getIndexExists(temporaryIndexName)) {
indexClient.deleteIndex(temporaryIndexName);
}
if (indexClient.createIndex(temporaryIndexName)) {
mappingClient.putMapping(temporaryIndexName, mapping.toString());
JsonNode nTemporary = objMapper.readValue(mappingClient.getMapping(temporaryIndexName), JsonNode.class);
indexClient.deleteIndex(temporaryIndexName);
return !JSONCompare.compareJSON(nExist.toString(), nTemporary.toString(), JSONCompareMode.NON_EXTENSIBLE).passed();
}
return true;
objMapper.writeValue(mappingsFile, mapping);
} catch (Exception e) {
log.error("Failed to compare elasticsearch metdata", e);
return true;
} finally {
if (this.getIndexExists(temporaryIndexName)) {
indexClient.deleteIndex(temporaryIndexName);
}
log.error("Failed to write mapping to filesystem", e);
}
return mappingClient.putMapping(indexName, mapping.toString());
}
/**
 * Fetches the mapping of the given index by delegating to the mapping client.
 *
 * @param indexName name of the index whose mapping is requested
 * @return whatever {@code mappingClient.getMapping(indexName)} returns
 */
@Override
public String getMapping(String indexName) {
    final String currentMapping = mappingClient.getMapping(indexName);
    return currentMapping;
}
}
\ No newline at end of file
......@@ -304,7 +304,7 @@ public class IndexingServiceImpl implements IndexingService {
private boolean tryRemapping(Collection<Map<String, Object>> remapSources) {
List<Resource> resources = new ArrayList<Resource>(remapSources.size());
List<Resource> resources = new ArrayList<>(remapSources.size());
for (Map<String, Object> source : remapSources) {
resources.add((Resource)source.get(RootElementKeys.CONTENT.toString()));
......@@ -312,10 +312,10 @@ public class IndexingServiceImpl implements IndexingService {
JsonNode mapping = mappingGenerationService.generateMappingForModelAndResources(this.type, resources);
if (adminService.getIsOutdated(index, mapping)) {
//if (adminService.getIsOutdated(index, mapping)) {
return adminService.putMapping(index, mapping);
}
return true;
/*}
return true;*/
}
}
package eu.dariah.de.search.mapping;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import de.unibamberg.minf.processing.model.SerializableResource;
import de.unibamberg.minf.processing.model.base.Resource;
/**
 * Jackson deserializer that turns a mapping JSON tree into a list of
 * {@link Resource} objects.
 *
 * Every field found below a {@code "properties"} node becomes one resource
 * named after the field (with a {@code null} value); its child resources are
 * derived recursively from the field's own {@code "properties"} node.
 * All other keys of the mapping are ignored.
 */
public class MappingToResourceDeserializer extends StdDeserializer<List<Resource>> {
    private static final long serialVersionUID = -6693343829785311631L;

    /** Key under which a mapping node lists its fields. */
    private static final String PROPERTIES_FIELD = "properties";

    public MappingToResourceDeserializer() {
        // NOTE(review): the handled type is reported as Resource although the
        // deserializer produces List<Resource> - confirm this is intended.
        super(Resource.class);
    }

    /**
     * Reads the complete JSON tree from the parser and converts it.
     *
     * @param jp parser positioned at the mapping JSON
     * @param ctxt deserialization context (not used)
     * @return resources for the fields below the root's "properties" node;
     *         empty when the root has no such node
     * @throws IOException if the tree cannot be read
     */
    @Override
    public List<Resource> deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
        JsonNode root = jp.getCodec().readTree(jp);
        return resourcesBelowProperties(root);
    }

    /** Converts the "properties" child of the given node, if it has one. */
    private List<Resource> resourcesBelowProperties(JsonNode node) {
        List<Resource> collected = new ArrayList<>();
        // Direct lookup instead of scanning all field names: only the
        // "properties" entry contributes to the result.
        JsonNode properties = node.get(PROPERTIES_FIELD);
        if (properties != null) {
            collected.addAll(resourcesForFields(properties));
        }
        return collected;
    }

    /** Creates one resource per field of the given "properties" node. */
    private List<Resource> resourcesForFields(JsonNode propertiesNode) {
        List<Resource> fields = new ArrayList<>();
        for (Iterator<String> names = propertiesNode.fieldNames(); names.hasNext();) {
            String name = names.next();
            JsonNode definition = propertiesNode.get(name);
            Resource field = new SerializableResource(name, null);
            field.setChildResources(resourcesBelowProperties(definition));
            fields.add(field);
        }
        return fields;
    }
}
......@@ -47,4 +47,8 @@ public interface DatamodelService {
//public Map<String, Long> getEndpointDocumentCount(String indexName, List<String> collectionIds);
public String getIndexName(String modelId);
public boolean getIsOutdated(ExtendedDatamodelContainer sc, JsonNode mapping);
public boolean getIsOutdated(String datamodelId, String indexName, JsonNode mapping);
}
package eu.dariah.de.search.service;
import java.io.File;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.skyscreamer.jsonassert.JSONCompare;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.type.CollectionType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import de.unibamberg.minf.dme.model.datamodel.base.Datamodel;
import de.unibamberg.minf.processing.model.base.Resource;
import eu.dariah.de.search.dao.fs.DatamodelConfigDao;
import eu.dariah.de.search.es.service.AdminService;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.mapping.MappingToResourceDeserializer;
import eu.dariah.de.search.model.ExtendedDatamodelContainer;
@Component
public class DatamodelServiceImpl implements DatamodelService, InitializingBean {
protected final Logger logger = LoggerFactory.getLogger(DatamodelServiceImpl.class);
private static CollectionType resourceListTypeReference = TypeFactory.defaultInstance().constructCollectionType(List.class, Resource.class);
private static ObjectMapper mappingToResourceMapper = getMappingToResourceMapper();
protected final Logger logger = LoggerFactory.getLogger(DatamodelServiceImpl.class);
@Autowired private DatamodelConfigDao datamodelConfigDao;
@Autowired private AdminService adminService;
@Autowired private ObjectMapper objMapper;
@Autowired private MappingGenerationService mappingGenerationService;
@Value(value="${paths.currentMappings:${paths.main}/currentMappings}")
private String mappingsPath;
// Hashtable is synchronized
private Hashtable<String, ExtendedDatamodelContainer> schemaMap;
......@@ -29,12 +49,12 @@ public class DatamodelServiceImpl implements DatamodelService, InitializingBean
@Override
public void afterPropertiesSet() throws Exception {
/*for (ExtendedDatamodelContainer sc : datamodelConfigDao.findAll()) {
adminService.createIndexIfNotExists(sc.getIndexName());
}*/
File mappingsDir = new File(mappingsPath);
if (!mappingsDir.exists()) {
FileUtils.forceMkdir(mappingsDir);
}
this.findAll(true);
}
@Override
public List<Datamodel> findAllModels() {
......@@ -155,14 +175,12 @@ public class DatamodelServiceImpl implements DatamodelService, InitializingBean
@Override
public boolean clearIndex(ExtendedDatamodelContainer sc, JsonNode mapping) {
if (adminService.getIndexExists(sc.getIndexName())) {
if (!adminService.dropIndex(sc.getIndexName())) {
logger.warn("Could not drop existing index [{}]", sc.getIndexName());
return false;
}
if (adminService.getIndexExists(sc.getIndexName()) && !adminService.dropIndex(sc.getIndexName())) {
logger.warn("Could not drop existing index [{}]", sc.getIndexName());
return false;
}
adminService.putMapping(sc.getIndexName(), mapping);
boolean outdated = adminService.getIsOutdated(sc.getIndexName(), mapping);
boolean outdated = this.getIsOutdated(sc, mapping);
if (sc.isOutdated()!=outdated) {
sc.setOutdated(outdated);
this.saveOrUpdate(sc);
......@@ -175,4 +193,48 @@ public class DatamodelServiceImpl implements DatamodelService, InitializingBean
public String getIndexName(String modelId) {
return datamodelConfigDao.getIndexName(modelId);
}
/**
 * Convenience overload that takes the datamodel id and index name from the
 * given container and delegates to the three-argument variant.
 *
 * @param sc container providing id and index name
 * @param mapping passed through unchanged
 * @return result of the delegated check
 */
@Override
public boolean getIsOutdated(ExtendedDatamodelContainer sc, JsonNode mapping) {
    final String datamodelId = sc.getId();
    final String indexName = sc.getIndexName();
    return getIsOutdated(datamodelId, indexName, mapping);
}
/**
 * Decides whether the mapping known for an index differs from a freshly
 * generated one.
 *
 * The reference mapping is read from the file {@code mappingsPath/indexName}.
 * If that file does not exist, the mapping is fetched through the admin
 * service (provided index and mapping exist) and written to the file first.
 * The file content is then converted to resources, a new mapping is generated
 * from them, and both mappings are compared in NON_EXTENSIBLE mode.
 *
 * @param datamodelId not read by this method - NOTE(review): confirm whether
 *        mapping generation should receive this id instead of the index name
 * @param indexName index name; also the name of the mapping file
 * @param mapping not read by this method
 * @return {@code true} if no reference mapping is available, the comparison
 *         does not pass, or any exception occurs (which is logged)
 */
@Override
public boolean getIsOutdated(String datamodelId, String indexName, JsonNode mapping) {
    try {
        final File storedMappingFile = new File(mappingsPath + File.separator + indexName);

        // Reference mapping: from the filesystem, which mirrors the index.
        final JsonNode storedMapping;
        if (storedMappingFile.exists()) {
            storedMapping = objMapper.readValue(storedMappingFile, JsonNode.class);
        } else {
            if (!adminService.getIndexExists(indexName) || !adminService.getMappingExists(indexName)) {
                // Nothing to compare against.
                return true;
            }
            storedMapping = objMapper.readValue(adminService.getMapping(indexName), JsonNode.class);
            objMapper.writeValue(storedMappingFile, storedMapping);
        }

        // Turn the stored mapping into resources and regenerate a mapping from them.
        final List<Resource> mappedResources = mappingToResourceMapper.readValue(storedMappingFile, resourceListTypeReference);
        final JsonNode regenerated = mappingGenerationService.generateMappingForModelAndResources(indexName, mappedResources);

        final boolean identical = JSONCompare
                .compareJSON(storedMapping.toString(), regenerated.toString(), JSONCompareMode.NON_EXTENSIBLE)
                .passed();
        return !identical;
    } catch (Exception e) {
        logger.error("Failed to compare elasticsearch metdata", e);
        return true;
    }
}
/**
 * Builds a dedicated {@link ObjectMapper} whose {@code List} deserialization
 * is handled by {@link MappingToResourceDeserializer}.
 *
 * @return a new mapper with that single module registered
 */
private static ObjectMapper getMappingToResourceMapper() {
    SimpleModule mappingModule = new SimpleModule();
    mappingModule.addDeserializer(List.class, new MappingToResourceDeserializer());

    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(mappingModule);
    return mapper;
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment