Commit 770e394e authored by Gradl, Tobias's avatar Gradl, Tobias
Browse files

446: Reimplement automatic online and offline crawl capabilities

(OPENED)

Task-Url: #446
parent cf3dfc99
Pipeline #31077 passed with stage
in 2 minutes and 14 seconds
...@@ -18,9 +18,11 @@ import org.springframework.web.bind.annotation.ResponseBody; ...@@ -18,9 +18,11 @@ import org.springframework.web.bind.annotation.ResponseBody;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import de.unibamberg.minf.core.web.controller.DataTableList; import de.unibamberg.minf.core.web.controller.DataTableList;
import de.unibamberg.minf.core.web.pojo.MessagePojo;
import de.unibamberg.minf.core.web.pojo.ModelActionPojo; import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.CollectionSyncService; import eu.dariah.de.search.automation.CollectionSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties; import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.pojo.CollectionPojo; import eu.dariah.de.search.pojo.CollectionPojo;
import eu.dariah.de.search.pojo.DatasetPojo; import eu.dariah.de.search.pojo.DatasetPojo;
import eu.dariah.de.search.pojo.DatasourcePojo; import eu.dariah.de.search.pojo.DatasourcePojo;
...@@ -36,6 +38,7 @@ public class CollectionController extends BaseController { ...@@ -36,6 +38,7 @@ public class CollectionController extends BaseController {
@Autowired private CollectionConverter collectionConverter; @Autowired private CollectionConverter collectionConverter;
@Autowired private CollectionSyncService crSyncService; @Autowired private CollectionSyncService crSyncService;
@Autowired private DatamodelService datamodelService; @Autowired private DatamodelService datamodelService;
@Autowired private TimedCrawlManagerImpl crawlManager;
@Autowired private CrawlingConfigProperties crawlingConfig; @Autowired private CrawlingConfigProperties crawlingConfig;
...@@ -88,6 +91,17 @@ public class CollectionController extends BaseController { ...@@ -88,6 +91,17 @@ public class CollectionController extends BaseController {
return result; return result;
} }
// Manually triggers the "online" crawl cycle: asks the timed crawl manager to
// enqueue crawls for new and outdated datasets, then reports back to the UI.
// NOTE(review): a GET endpoint with side effects — consider POST; confirm intent.
@RequestMapping(method=GET, value={"/async/triggerOnline"})
public @ResponseBody ModelActionPojo triggerOnline(Model model, Locale locale) {
// Kick off the crawl enqueueing before assembling the response object.
crawlManager.enqueueNewAndOutdatedDatasets();
ModelActionPojo response = new ModelActionPojo();
response.setSuccess(true);
// NOTE(review): success=true but the message uses the "error" level and ~error.* keys —
// looks like a placeholder; confirm the intended message codes.
response.setMessage(new MessagePojo("error", "~error.head", "~error.body"));
return response;
}
@RequestMapping(method=GET, value={"/getColregStatus"}) @RequestMapping(method=GET, value={"/getColregStatus"})
public @ResponseBody ModelActionPojo getColregStatus(Model model, Locale locale) { public @ResponseBody ModelActionPojo getColregStatus(Model model, Locale locale) {
ModelActionPojo result = new ModelActionPojo(true); ModelActionPojo result = new ModelActionPojo(true);
......
...@@ -27,6 +27,7 @@ import de.unibamberg.minf.core.web.pojo.ModelActionPojo; ...@@ -27,6 +27,7 @@ import de.unibamberg.minf.core.web.pojo.ModelActionPojo;
import eu.dariah.de.search.automation.DmeSyncService; import eu.dariah.de.search.automation.DmeSyncService;
import eu.dariah.de.search.config.CrawlingConfigProperties; import eu.dariah.de.search.config.CrawlingConfigProperties;
import eu.dariah.de.search.config.MainConfigProperties; import eu.dariah.de.search.config.MainConfigProperties;
import eu.dariah.de.search.crawling.TimedCrawlManagerImpl;
import eu.dariah.de.search.mapping.MappingGenerationService; import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.model.ExtendedDatamodelContainer; import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.pojo.DatamodelPojo; import eu.dariah.de.search.pojo.DatamodelPojo;
...@@ -40,6 +41,7 @@ public class DatamodelController extends BaseController { ...@@ -40,6 +41,7 @@ public class DatamodelController extends BaseController {
@Autowired private DatamodelConverter datamodelConverter; @Autowired private DatamodelConverter datamodelConverter;
@Autowired private MappingGenerationService mappingGenerationService; @Autowired private MappingGenerationService mappingGenerationService;
@Autowired private DmeSyncService dmeSyncService; @Autowired private DmeSyncService dmeSyncService;
@Autowired private TimedCrawlManagerImpl crawlManager;
@Autowired private MainConfigProperties config; @Autowired private MainConfigProperties config;
@Autowired private CrawlingConfigProperties crawlingConfig; @Autowired private CrawlingConfigProperties crawlingConfig;
...@@ -117,6 +119,16 @@ public class DatamodelController extends BaseController { ...@@ -117,6 +119,16 @@ public class DatamodelController extends BaseController {
return result; return result;
} }
// Manually triggers the "offline" crawl cycle: asks the timed crawl manager to
// reindex all data belonging to outdated datamodels, then reports back to the UI.
// NOTE(review): a GET endpoint with side effects — consider POST; confirm intent.
@RequestMapping(method=GET, value={"/async/triggerOffline"})
public @ResponseBody ModelActionPojo triggerOffline(Model model, Locale locale) {
// Run the reindexing before assembling the response object.
crawlManager.reindexOutdatedData();
ModelActionPojo response = new ModelActionPojo();
response.setSuccess(true);
// NOTE(review): success=true but the message uses the "error" level and ~error.* keys —
// looks like a placeholder; confirm the intended message codes.
response.setMessage(new MessagePojo("error", "~error.head", "~error.body"));
return response;
}
@RequestMapping(method=GET, value={"/getDmeStatus"}) @RequestMapping(method=GET, value={"/getDmeStatus"})
public @ResponseBody ModelActionPojo getDmeStatus(Model model, Locale locale) { public @ResponseBody ModelActionPojo getDmeStatus(Model model, Locale locale) {
ModelActionPojo result = new ModelActionPojo(true); ModelActionPojo result = new ModelActionPojo(true);
......
...@@ -120,18 +120,10 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw ...@@ -120,18 +120,10 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
} }
try { try {
statusMapslock.lock(); statusMapslock.lock();
// TODO Reimplement once a new reindexing strategy is in place if (autocrawlOffline) {
/*if (autocrawlOffline) { this.reindexOutdatedData();
// Handle outdated models if any and conditions are met }
List<ExtendedDatamodelContainer> refreshDatamodels = this.determineRefreshableDatamodels();
for (ExtendedDatamodelContainer datamodel : refreshDatamodels) {
// Drop all indexed data and recreate index with new mapping
if (this.recreateIndex(datamodel)) {
this.reindexDatamodel(datamodel);
}
}
}*/
if (autocrawlOnline) { if (autocrawlOnline) {
// Handle new or updated datasets // Handle new or updated datasets
...@@ -148,8 +140,18 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw ...@@ -148,8 +140,18 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
} }
} }
// Rebuilds the index for every datamodel whose mapping is out of date:
// the old index is dropped and recreated with the new mapping, and only on
// successful recreation is the datamodel's data reindexed.
public void reindexOutdatedData() {
for (ExtendedDatamodelContainer outdated : this.determineRefreshableDatamodels()) {
// Skip reindexing if the index could not be recreated with the new mapping.
if (this.recreateIndex(outdated)) {
this.reindexDatamodel(outdated);
}
}
}
private void enqueueNewAndOutdatedDatasets() { public void enqueueNewAndOutdatedDatasets() {
DateTime syncTimestamp = DateTime.now(); DateTime syncTimestamp = DateTime.now();
List<Crawl> lastOnlineCrawls; List<Crawl> lastOnlineCrawls;
Crawl lastOnlineCrawl; Crawl lastOnlineCrawl;
...@@ -274,13 +276,16 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw ...@@ -274,13 +276,16 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
} }
/*private List<ExtendedDatamodelContainer> determineRefreshableDatamodels() { private List<ExtendedDatamodelContainer> determineRefreshableDatamodels() {
List<ExtendedDatamodelContainer> refreshDatamodels = new ArrayList<ExtendedDatamodelContainer>(); List<ExtendedDatamodelContainer> refreshDatamodels = new ArrayList<ExtendedDatamodelContainer>();
// Start of outdated reprocessing only between 1 and 3 o'clock // Start of outdated reprocessing only between 1 and 3 o'clock
if (DateTime.now().getHourOfDay()>0 && DateTime.now().getHourOfDay()<=2) { if (DateTime.now().getHourOfDay()>0 && DateTime.now().getHourOfDay()<=2) {
for (ExtendedDatamodelContainer datamodel : datamodelService.findAll()) { for (ExtendedDatamodelContainer datamodel : datamodelService.findAll()) {
if (mappingGenerationService.getIsOutdated(datamodel)) {
// TODO: Reimplement
/*if (mappingGenerationService.getIsOutdated(datamodel)) {
if (!this.outdatedDatamodelIdCrawlIdMap.containsKey(datamodel.getId())) { if (!this.outdatedDatamodelIdCrawlIdMap.containsKey(datamodel.getId())) {
// Start blocking of outdated datamodel // Start blocking of outdated datamodel
this.outdatedDatamodelIdCrawlIdMap.put(datamodel.getId(), null); this.outdatedDatamodelIdCrawlIdMap.put(datamodel.getId(), null);
...@@ -291,11 +296,11 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw ...@@ -291,11 +296,11 @@ public class TimedCrawlManagerImpl extends CrawlManagerImpl implements TimedCraw
refreshDatamodels.add(datamodel); refreshDatamodels.add(datamodel);
} }
} }
} }*/
} }
} }
return refreshDatamodels; return refreshDatamodels;
}*/ }
......
package eu.dariah.de.search.es.client; package eu.dariah.de.search.es.client;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
public interface IndexingClient { public interface IndexingClient {
public BulkResponse bulkIndexSources(String index, Map<String, Map<String, Object>> idSourceMap); public BulkResponse bulkIndexSources(String index, Map<String, Map<String, Object>> idSourceMap) throws IOException;
public long indexSources(String indexName, Map<String, Map<String, Object>> idSourceMap); public long indexSources(String indexName, Map<String, Map<String, Object>> idSourceMap);
public void indexSource(String string, String resourceId, String source); public void indexSource(String string, String resourceId, String source);
} }
...@@ -85,28 +85,24 @@ public class IndexingClientImpl extends BaseEsClientImpl implements IndexingClie ...@@ -85,28 +85,24 @@ public class IndexingClientImpl extends BaseEsClientImpl implements IndexingClie
} }
@Override @Override
public BulkResponse bulkIndexSources(String index, Map<String, Map<String, Object>> idSourceMap) { public BulkResponse bulkIndexSources(String index, Map<String, Map<String, Object>> idSourceMap) throws IOException {
BulkRequest bulkRequest = new BulkRequest(); BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest; IndexRequest indexRequest;
String strSource; String strSource;
try {
for (Entry<String, Map<String, Object>> source : idSourceMap.entrySet()) { for (Entry<String, Map<String, Object>> source : idSourceMap.entrySet()) {
try { try {
strSource = indexingObjectMapper.writeValueAsString(source.getValue()); strSource = indexingObjectMapper.writeValueAsString(source.getValue());
if (logResources && logger.isDebugEnabled()) { if (logResources && logger.isDebugEnabled()) {
logger.debug(strSource); logger.debug(strSource);
}
indexRequest = new IndexRequest(index).id(source.getKey()).source(strSource, XContentType.JSON);
bulkRequest.add(indexRequest);
} catch (JsonProcessingException e) {
logger.warn("Failed to index resource", e);
} }
indexRequest = new IndexRequest(index).id(source.getKey()).source(strSource, XContentType.JSON);
bulkRequest.add(indexRequest);
} catch (JsonProcessingException e) {
logger.warn("Failed to index resource", e);
} }
return client.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
logger.error("Error while bulk indexing resources", e);
} }
return null; return client.bulk(bulkRequest, RequestOptions.DEFAULT);
} }
} }
...@@ -66,7 +66,7 @@ public class SearchClientImpl extends BaseEsClientImpl implements SearchClient { ...@@ -66,7 +66,7 @@ public class SearchClientImpl extends BaseEsClientImpl implements SearchClient {
} }
return response.getCount(); return response.getCount();
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to execute count: " + e.getMessage(), e); //logger.error("Failed to execute count: " + e.getMessage(), e);
} }
return 0; return 0;
} }
......
...@@ -133,73 +133,79 @@ public class IndexingServiceImpl implements IndexingService { ...@@ -133,73 +133,79 @@ public class IndexingServiceImpl implements IndexingService {
sources.put(rc.getId(), rc.toSource()); sources.put(rc.getId(), rc.toSource());
} }
BulkResponse bulkResponse = indexingClient.bulkIndexSources(this.index, sources); try {
int docCount = bulkResponse.getItems().length; BulkResponse bulkResponse = indexingClient.bulkIndexSources(this.index, sources);
int docCount = bulkResponse.getItems().length;
Map<String, Map<String, Object>> succSources = new HashMap<>();
BulkItemResponse resp;
if (bulkResponse.hasFailures()) {
log.debug(String.format("Completed bulk indexing %s records WITH errors", docCount));
Map<String, Map<String, Object>> errSources = new HashMap<> ();
Map<String, Map<String, Object>> succSources = new HashMap<>();
for (int i=0; i<bulkResponse.getItems().length; i++) { BulkItemResponse resp;
resp = bulkResponse.getItems()[i]; if (bulkResponse.hasFailures()) {
if (resp.isFailed()) {
if (StrictDynamicMappingException.class.isAssignableFrom(resp.getFailure().getCause().getClass()) ||
resp.getFailureMessage().contains("strict_dynamic_mapping_exception") log.debug(String.format("Completed bulk indexing %s records WITH errors", docCount));
) {
errSources.put(resp.getId(), sources.get(resp.getId())); Map<String, Map<String, Object>> errSources = new HashMap<> ();
for (int i=0; i<bulkResponse.getItems().length; i++) {
resp = bulkResponse.getItems()[i];
if (resp.isFailed()) {
if (StrictDynamicMappingException.class.isAssignableFrom(resp.getFailure().getCause().getClass()) ||
resp.getFailureMessage().contains("strict_dynamic_mapping_exception")
) {
errSources.put(resp.getId(), sources.get(resp.getId()));
} else {
log.warn(resp.getFailureMessage());
}
} else { } else {
log.warn(resp.getFailureMessage()); succSources.put(resp.getId(), sources.get(resp.getId()));
} }
} else {
succSources.put(resp.getId(), sources.get(resp.getId()));
} }
} if (errSources.size()>0) {
if (errSources.size()>0) { log.info("Resources failed for insufficient elasticsearch mapping. Trying to extend mapping...");
log.info("Resources failed for insufficient elasticsearch mapping. Trying to extend mapping..."); if (this.tryRemapping(errSources.values())) {
if (this.tryRemapping(errSources.values())) { log.info("Mapping extended...retrying failed resources");
log.info("Mapping extended...retrying failed resources");
bulkResponse = indexingClient.bulkIndexSources(this.index, errSources);
docCount = bulkResponse.getItems().length;
log.debug(String.format("Retry...completed bulk indexing %s records", docCount));
if (bulkResponse.hasFailures()) { bulkResponse = indexingClient.bulkIndexSources(this.index, errSources);
log.warn("Retry...bulk indexing resulted in errors: " + bulkResponse.buildFailureMessage()); docCount = bulkResponse.getItems().length;
for (int i=0; i<bulkResponse.getItems().length; i++) {
resp = bulkResponse.getItems()[i]; log.debug(String.format("Retry...completed bulk indexing %s records", docCount));
if (!resp.isFailed()) {
succSources.put(resp.getId(), errSources.get(resp.getId())); if (bulkResponse.hasFailures()) {
log.warn("Retry...bulk indexing resulted in errors: " + bulkResponse.buildFailureMessage());
for (int i=0; i<bulkResponse.getItems().length; i++) {
resp = bulkResponse.getItems()[i];
if (!resp.isFailed()) {
succSources.put(resp.getId(), errSources.get(resp.getId()));
}
} }
} } else {
} else { for (int i=0; i<bulkResponse.getItems().length; i++) {
for (int i=0; i<bulkResponse.getItems().length; i++) { succSources.put(bulkResponse.getItems()[i].getId(), errSources.get(bulkResponse.getItems()[i].getId()));
succSources.put(bulkResponse.getItems()[i].getId(), errSources.get(bulkResponse.getItems()[i].getId())); }
} }
} }
}
} else {
for (int i=0; i<bulkResponse.getItems().length; i++) {
succSources.put(bulkResponse.getItems()[i].getId(), sources.get(bulkResponse.getItems()[i].getId()));
} }
} }
} else {
for (int i=0; i<bulkResponse.getItems().length; i++) { log.debug("Bulk indexed {} documents", succSources.size());
succSources.put(bulkResponse.getItems()[i].getId(), sources.get(bulkResponse.getItems()[i].getId()));
} for (Entry<String, Map<String, Object>> succSource : succSources.entrySet()) {
} if (succSource.getValue().containsKey(RootElementKeys.PRESENTATION.toString())) {
this.handleDownloads(succSource.getKey(), (Resource)succSource.getValue().get(RootElementKeys.PRESENTATION.toString()));
log.debug("Bulk indexed {} documents", succSources.size()); }
}
return docCount;
for (Entry<String, Map<String, Object>> succSource : succSources.entrySet()) { } catch (Exception e) {
if (succSource.getValue().containsKey(RootElementKeys.PRESENTATION.toString())) { log.error("Failed to execute bulk index request", e);
this.handleDownloads(succSource.getKey(), (Resource)succSource.getValue().get(RootElementKeys.PRESENTATION.toString())); return 0;
} }
}
return docCount;
} }
......
Subproject commit 60db673cb20da87f563e04fc8aa05b09e72b89b4 Subproject commit 0201f7065c7d782a49ab7faa01876f01aabf3c31
Subproject commit df67695877aa86e7ce77434952e886a616b7303e Subproject commit bc9348fa600b80863e88dfe43a7d570ec571865e
Subproject commit b951057e62d916791f2509a861e58f8c317c2e00 Subproject commit 25eda28254d770a11f0a831c20d6af715f362382
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment