Commit 6eeb39f9 authored by Gradl, Tobias

1327: Separate IndexingService from ResourceEnrichmentService

Task-Url: https://pm.winseda.de/issues/1327
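
In short: the Elasticsearch-facing work (attaching metadata to sources, bulk indexing, remap-and-retry on strict mapping errors) stays in IndexingService/IndexingServiceImpl, while resource consumption, batching and enrichment move to the new ResourceIndexingService/ResourceIndexingServiceImpl. A minimal sketch of the resulting flow from the crawler's perspective, assuming a Spring context and using illustrative variable names that are not part of the commit:

// Sketch only: summarizes the collaboration introduced by this commit.
ResourceIndexingServiceImpl indexer = appContext.getBean(ResourceIndexingServiceImpl.class);
indexer.setDatasourceId(collectionId);  // illustrative id values
indexer.setIndex(indexName);
indexer.init(datamodelId);   // wires the IndexingService delegate and the enrichment service
indexer.consume(resource);   // batched; commits automatically every BULK_CHUNK_SIZE resources
indexer.commit();            // enrich -> appendMetadata -> indexResources -> reset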
parent 28435b2c
@@ -28,7 +28,6 @@ import eu.dariah.de.search.Constants.FileTypes;
import eu.dariah.de.search.crawling.crawler.Crawler;
import eu.dariah.de.search.crawling.crawler.Processor;
import eu.dariah.de.search.crawling.gtf.CrawlingExecutionContext;
import eu.dariah.de.search.es.service.IndexingServiceImpl;
import eu.dariah.de.search.model.Crawl;
import eu.dariah.de.search.model.Dataset;
import eu.dariah.de.search.model.Collection;
@@ -37,6 +36,7 @@ import eu.dariah.de.search.model.ExtendedDatamodelContainer;
import eu.dariah.de.search.query.execution.DocumentService;
import eu.dariah.de.search.service.CollectionService;
import eu.dariah.de.search.service.CrawlService;
import eu.dariah.de.search.service.ResourceIndexingServiceImpl;
public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware, InitializingBean {
protected static final Logger logger = LoggerFactory.getLogger(CrawlManagerImpl.class);
@@ -230,10 +230,10 @@ public class CrawlManagerImpl implements CrawlManager, ApplicationContextAware,
CrawlingExecutionContext ctx = new CrawlingExecutionContext(this.baseDownloadPath, c);
Crawler[] crawlers = this.getCrawlers(m, c.getBaseCrawlId()==null);
IndexingServiceImpl indexer;
ResourceIndexingServiceImpl indexer;
for (Crawler crawler : crawlers) {
if (crawler instanceof Processor) {
indexer = appContext.getBean(IndexingServiceImpl.class);
indexer = appContext.getBean(ResourceIndexingServiceImpl.class);
indexer.setDatasourceId(c.getCollectionId());
indexer.setIndex(sc.getIndexName());
......
package eu.dariah.de.search.es.service;
import de.unibamberg.minf.processing.consumption.ResourceConsumptionService;
import eu.dariah.de.search.es.client.base.BaseEsClient;
import java.util.List;
public interface IndexingService extends BaseEsClient, ResourceConsumptionService {
import eu.dariah.de.search.indexing.model.ResourceContainer;
public interface IndexingService {
public void init(String index, String type, String datasourceId, String endpointId);
public void appendMetadata(List<ResourceContainer> resourceContainers);
public int indexResources(List<ResourceContainer> resourceContainers);
}
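
The reshaped IndexingService interface no longer extends the consumption contract; callers hand it an already enriched batch explicitly. A minimal caller-side sketch, with illustrative variable names:

// Hypothetical caller: IndexingService now operates on explicit batches.
indexingService.init(indexName, type, datasourceId, endpointId);
indexingService.appendMetadata(batch);                // attach _meta (collection id, endpoint id, errors)
int indexed = indexingService.indexResources(batch);  // bulk index; remaps and retries on strict mapping failures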
@@ -6,8 +6,9 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.Map.Entry;
import org.apache.commons.codec.digest.DigestUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -22,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.MissingNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.unibamberg.minf.core.util.json.JsonNodeHelper;
import de.unibamberg.minf.processing.exception.ProcessingConfigException;
import de.unibamberg.minf.processing.model.SerializableResource;
import de.unibamberg.minf.processing.model.SerializableRootResource;
import de.unibamberg.minf.processing.model.base.Resource;
@@ -35,16 +36,11 @@ import eu.dariah.de.search.es.client.IndexingClient;
import eu.dariah.de.search.es.service.base.BaseEsServiceImpl;
import eu.dariah.de.search.indexing.model.ResourceContainer;
import eu.dariah.de.search.mapping.MappingGenerationService;
import eu.dariah.de.search.transformation.ResourceEnrichmentService;
@Component
@Scope("prototype")
public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingService {
public static final String GSMODEL_IDENTIFIER = "Indexing.Identifier";
public static final String GSMODEL_IMAGE = "Indexing.Image";
public static final int BULK_CHUNK_SIZE = 10;
public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingService {
@Autowired private IndexingClient indexingClient;
@Autowired private MappingGenerationService mappingGenerationService;
@Autowired private AdminService adminService;
@@ -54,79 +50,26 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
@Autowired private ObjectMapper objectMapper;
@Value("${datamodels.integration}")
private String integrationModelEntityId;
@Value("${datamodels.presentation}")
private String presentationModelEntityId;
@Value("${debugging.indexing.disable_downloads:#{false}}")
private boolean disableDownloads;
@Value("${indexing.resources.index_errors:#{true}}")
private boolean indexErrors;
private String index;
private String type;
private String datasourceId;
private String endpointId;
private boolean initialized = false;
public String getIndex() { return index; }
public void setIndex(String index) { this.index = index; }
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getDatasourceId() { return datasourceId; }
public void setDatasourceId(String datasourceId) { this.datasourceId = datasourceId; }
public String getEndpointId() { return endpointId; }
public void setEndpointId(String endpointId) { this.endpointId = endpointId; }
public boolean isInitialized() { return initialized; }
@Autowired private ResourceEnrichmentService enrichmentService;
@Override
public void init(String datamodelId) {
if (!initialized) {
try {
logger.debug("Initializing new indexing client...");
enrichmentService.init(datamodelId);
} catch (ProcessingConfigException e) {
logger.error("Failed to initialize indexing client", e);
}
}
}
@Override
public void consume(Resource resource) {
this.enrichmentService.consume(resource);
// Process incoming data in chunks
if (this.enrichmentService.getResourceBatch().size()>=BULK_CHUNK_SIZE) {
this.commit();
}
public void init(String index, String type, String datasourceId, String endpointId) {
this.index = index;
this.type = type;
this.datasourceId = datasourceId;
this.endpointId = endpointId;
}
@Override
public int commit() {
this.enrichmentService.commit();
this.appendMetadata();
this.indexResources();
this.enrichmentService.reset();
return 0;
}
private void appendMetadata() {
public void appendMetadata(List<ResourceContainer> resourceContainers) {
// Metadata rendered with the resource
for (ResourceContainer rc : this.enrichmentService.getResourceBatch()) {
for (ResourceContainer rc : resourceContainers) {
Resource rMetaParent = new SerializableResource(Constants.ELEMENT_KEY_META_RESOURCE, null);
Resource rMetaParentMerge;
if (SerializableRootResource.class.isAssignableFrom(rc.getContent().getClass())) {
@@ -147,8 +90,8 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
ObjectNode metaNode = objectMapper.createObjectNode();
jsonNodeHelper.replaceRecursive(metaNode, new String[] {Constants.ELEMENT_KEY_META, Constants.ELEMENT_KEY_META_RESOURCE}, n);
jsonNodeHelper.replaceRecursive(metaNode, Constants.ELEMENT_KEY_COLLECTION_ID.split("\\."), this.getDatasourceId());
jsonNodeHelper.replaceRecursive(metaNode, Constants.ELEMENT_KEY_ENDPOINT_ID.split("\\."), this.getEndpointId());
jsonNodeHelper.replaceRecursive(metaNode, Constants.ELEMENT_KEY_COLLECTION_ID.split("\\."), datasourceId);
jsonNodeHelper.replaceRecursive(metaNode, Constants.ELEMENT_KEY_ENDPOINT_ID.split("\\."), endpointId);
JsonNode errNode = this.getErrorNode(rc);
if (!errNode.isMissingNode()) {
@@ -177,6 +120,88 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
}
}
@Override
public int indexResources(List<ResourceContainer> resourceContainers) {
if (resourceContainers.isEmpty()) {
return 0;
}
// TODO: Refactor this...
Map<String, Map<String, Object>> sources = new HashMap<>();
for (ResourceContainer rc : resourceContainers) {
sources.put(rc.getId(), rc.toSource());
}
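// First attempt: bulk-index the complete batch in a single request.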
BulkResponse bulkResponse = indexingClient.bulkIndexSources(this.index, sources);
int docCount = bulkResponse.getItems().length;
Map<String, Map<String, Object>> succSources = new HashMap<>();
BulkItemResponse resp;
if (bulkResponse.hasFailures()) {
logger.debug(String.format("Completed bulk indexing %s records WITH errors", docCount));
Map<String, Map<String, Object>> errSources = new HashMap<>();
for (int i=0; i<bulkResponse.getItems().length; i++) {
resp = bulkResponse.getItems()[i];
if (resp.isFailed()) {
if (StrictDynamicMappingException.class.isAssignableFrom(resp.getFailure().getCause().getClass()) ||
resp.getFailureMessage().contains("strict_dynamic_mapping_exception")
) {
errSources.put(resp.getId(), sources.get(resp.getId()));
} else {
logger.warn(resp.getFailureMessage());
}
} else {
succSources.put(resp.getId(), sources.get(resp.getId()));
}
}
if (errSources.size()>0) {
logger.info("Resources failed for insufficient elasticsearch mapping. Trying to extend mapping...");
if (this.tryRemapping(errSources.values())) {
logger.info("Mapping extended...retrying failed resources");
bulkResponse = indexingClient.bulkIndexSources(this.index, errSources);
docCount = bulkResponse.getItems().length;
logger.debug(String.format("Retry...completed bulk indexing %s records", docCount));
if (bulkResponse.hasFailures()) {
logger.warn("Retry...bulk indexing resulted in errors: " + bulkResponse.buildFailureMessage());
for (int i=0; i<bulkResponse.getItems().length; i++) {
resp = bulkResponse.getItems()[i];
if (!resp.isFailed()) {
succSources.put(resp.getId(), errSources.get(resp.getId()));
}
}
} else {
for (int i=0; i<bulkResponse.getItems().length; i++) {
succSources.put(bulkResponse.getItems()[i].getId(), errSources.get(bulkResponse.getItems()[i].getId()));
}
}
}
}
} else {
for (int i=0; i<bulkResponse.getItems().length; i++) {
succSources.put(bulkResponse.getItems()[i].getId(), sources.get(bulkResponse.getItems()[i].getId()));
}
}
logger.debug("Bulk indexed {} documents", succSources.size());
for (Entry<String, Map<String, Object>> succSource : succSources.entrySet()) {
if (succSource.getValue().containsKey(RootElementKeys.PRESENTATION.toString())) {
this.handleDownloads(succSource.getKey(), (Resource)succSource.getValue().get(RootElementKeys.PRESENTATION.toString()));
}
}
return docCount;
}
private JsonNode resourceToSimpleNode(Resource r) {
if (r.getChildResources()==null && r.getValue()!=null) {
return objectMapper.valueToTree(r.getValue());
@@ -264,91 +289,12 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
}
}
if (!urls.isEmpty()) {
cachedImageService.cacheImage(this.getDatasourceId(), this.getEndpointId(), this.getType(), resourceIndex, urls);
cachedImageService.cacheImage(datasourceId, endpointId, type, resourceIndex, urls);
}
}
}
private int indexResources() {
if (this.enrichmentService.getResourceBatch().isEmpty()) {
return 0;
}
// TODO: Refactor this...
Map<String, Map<String, Object>> sources = new HashMap<>();
for (ResourceContainer rc : this.enrichmentService.getResourceBatch()) {
sources.put(rc.getId(), rc.toSource());
}
BulkResponse bulkResponse = indexingClient.bulkIndexSources(this.index, sources);
int docCount = bulkResponse.getItems().length;
Map<String, Map<String, Object>> succSources = new HashMap<>();
BulkItemResponse resp;
if (bulkResponse.hasFailures()) {
logger.debug(String.format("Completed bulk indexing %s records WITH errors", docCount));
Map<String, Map<String, Object>> errSources = new HashMap<>();
for (int i=0; i<bulkResponse.getItems().length; i++) {
resp = bulkResponse.getItems()[i];
if (resp.isFailed()) {
if (StrictDynamicMappingException.class.isAssignableFrom(resp.getFailure().getCause().getClass()) ||
resp.getFailureMessage().contains("strict_dynamic_mapping_exception")
) {
errSources.put(resp.getId(), sources.get(resp.getId()));
} else {
logger.warn(resp.getFailureMessage());
}
} else {
succSources.put(resp.getId(), sources.get(resp.getId()));
}
}
if (errSources.size()>0) {
logger.info("Resources failed for insufficient elasticsearch mapping. Trying to extend mapping...");
if (this.tryRemapping(errSources.values())) {
logger.info("Mapping extended...retrying failed resources");
bulkResponse = indexingClient.bulkIndexSources(this.index, errSources);
docCount = bulkResponse.getItems().length;
logger.debug(String.format("Retry...completed bulk indexing %s records", docCount));
if (bulkResponse.hasFailures()) {
logger.warn("Retry...bulk indexing resulted in errors: " + bulkResponse.buildFailureMessage());
for (int i=0; i<bulkResponse.getItems().length; i++) {
resp = bulkResponse.getItems()[i];
if (!resp.isFailed()) {
succSources.put(resp.getId(), errSources.get(resp.getId()));
}
}
} else {
for (int i=0; i<bulkResponse.getItems().length; i++) {
succSources.put(bulkResponse.getItems()[i].getId(), errSources.get(bulkResponse.getItems()[i].getId()));
}
}
}
}
} else {
for (int i=0; i<bulkResponse.getItems().length; i++) {
succSources.put(bulkResponse.getItems()[i].getId(), sources.get(bulkResponse.getItems()[i].getId()));
}
}
logger.debug("Bulk indexed {} documents", succSources.size());
for (Entry<String, Map<String, Object>> succSource : succSources.entrySet()) {
if (succSource.getValue().containsKey(RootElementKeys.PRESENTATION.toString())) {
this.handleDownloads(succSource.getKey(), (Resource)succSource.getValue().get(RootElementKeys.PRESENTATION.toString()));
}
}
return docCount;
}
private boolean tryRemapping(Collection<Map<String, Object>> remapSources) {
List<Resource> resources = new ArrayList<Resource>(remapSources.size());
@@ -364,4 +310,5 @@ public class IndexingServiceImpl extends BaseEsServiceImpl implements IndexingSe
}
return true;
}
}
package eu.dariah.de.search.service;
import de.unibamberg.minf.processing.consumption.ResourceConsumptionService;
import eu.dariah.de.search.es.client.base.BaseEsClient;
public interface ResourceIndexingService extends BaseEsClient, ResourceConsumptionService {
}
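
ResourceIndexingService keeps the public surface the old implementation exposed: via ResourceConsumptionService it remains a drop-in consumer for the processing pipeline. A usage sketch under that assumption, with an illustrative harvestedResources collection:

// Hypothetical pipeline-side usage; the consumption contract is unchanged.
ResourceIndexingService indexer = appContext.getBean(ResourceIndexingServiceImpl.class);
indexer.init(datamodelId);      // initializes enrichment and the indexing delegate
for (Resource r : harvestedResources) {
    indexer.consume(r);         // flushes automatically every BULK_CHUNK_SIZE resources
}
indexer.commit();               // flush the final partial batch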
package eu.dariah.de.search.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import de.unibamberg.minf.processing.exception.ProcessingConfigException;
import de.unibamberg.minf.processing.model.base.Resource;
import eu.dariah.de.search.es.service.IndexingService;
import eu.dariah.de.search.transformation.ResourceEnrichmentService;
@Component
@Scope("prototype")
public class ResourceIndexingServiceImpl implements ResourceIndexingService {
protected final Logger logger = LoggerFactory.getLogger(ResourceIndexingServiceImpl.class);
public static final String GSMODEL_IDENTIFIER = "Indexing.Identifier";
public static final String GSMODEL_IMAGE = "Indexing.Image";
public static final int BULK_CHUNK_SIZE = 10;
@Autowired private ResourceEnrichmentService enrichmentService;
@Autowired private IndexingService indexingService;
@Value("${datamodels.integration}")
private String integrationModelEntityId;
@Value("${datamodels.presentation}")
private String presentationModelEntityId;
@Value("${debugging.indexing.disable_downloads:#{false}}")
private boolean disableDownloads;
@Value("${indexing.resources.index_errors:#{true}}")
private boolean indexErrors;
private String index;
private String type;
private String datasourceId;
private String endpointId;
private boolean initialized = false;
public String getIndex() { return index; }
public void setIndex(String index) { this.index = index; }
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getDatasourceId() { return datasourceId; }
public void setDatasourceId(String datasourceId) { this.datasourceId = datasourceId; }
public String getEndpointId() { return endpointId; }
public void setEndpointId(String endpointId) { this.endpointId = endpointId; }
public boolean isInitialized() { return initialized; }
@Override
public void init(String datamodelId) {
if (!initialized) {
try {
logger.debug("Initializing new indexing client...");
// Forward the configured index coordinates to the Elasticsearch-facing delegate
indexingService.init(index, type, datasourceId, endpointId);
enrichmentService.init(datamodelId);
initialized = true;
} catch (ProcessingConfigException e) {
logger.error("Failed to initialize indexing client", e);
}
}
}
@Override
public void consume(Resource resource) {
this.enrichmentService.consume(resource);
// Process incoming data in chunks
if (this.enrichmentService.getResourceBatch().size()>=BULK_CHUNK_SIZE) {
this.commit();
}
}
@Override
public int commit() {
this.enrichmentService.commit();
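// Hand the enriched batch to the Elasticsearch layer: metadata first, then bulk indexing.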
indexingService.appendMetadata(this.enrichmentService.getResourceBatch());
indexingService.indexResources(this.enrichmentService.getResourceBatch());
this.enrichmentService.reset();
return 0;
}
}