001// SPDX-License-Identifier: GPL-3.0-or-later 002 003package es.uvigo.esei.sing.textproc.abstracttppstep; 004 005import static es.uvigo.esei.sing.textproc.abstracttppstep.TppHelper.processAttributes; 006 007import java.util.ArrayList; 008import java.util.Collections; 009import java.util.Formatter; 010import java.util.HashMap; 011import java.util.HashSet; 012import java.util.Iterator; 013import java.util.List; 014import java.util.Map; 015import java.util.Map.Entry; 016import java.util.Set; 017import java.util.function.Predicate; 018 019import javax.json.JsonObject; 020import javax.json.stream.JsonGenerator; 021import javax.persistence.PersistenceException; 022import javax.ws.rs.client.Client; 023import javax.ws.rs.client.ClientBuilder; 024import javax.ws.rs.client.WebTarget; 025 026import es.uvigo.esei.sing.textproc.entity.ProcessedDocument; 027import es.uvigo.esei.sing.textproc.abstracttppstep.xml.definition.EndpointProcessingStepParameter; 028import es.uvigo.esei.sing.textproc.step.AbstractProcessingStep; 029import es.uvigo.esei.sing.textproc.step.ProcessingException; 030import lombok.NonNull; 031 032/** 033 * Handles common processing logic to all processing steps that call the Text 034 * Processing Python web service for processing documents. 035 * 036 * @author Alejandro González García 037 */ 038public abstract class AbstractTppProcessingStep extends AbstractProcessingStep { 039 private static final String ENDPOINT_PROCESSING_STEP_PARAMETER_NAME = new EndpointProcessingStepParameter().getName(); 040 041 private final List<Class<? extends ProcessedDocument>> processedDocumentTypes; 042 private final String stepDescriptionFormatString; 043 private final JsonResponseAttributeType jsonResponseAttributeType; 044 private final ProcessingConsumer<? super JsonGenerator> requestParametersAction; 045 private final ProcessingBiConsumer<? super Entry<String, JsonObject>, ? super Map<String, String>> storeProcessedAttributeAction; 046 047 /** 048 * Creates a new processing step that processes documents via calls to the Text 049 * Processing Python web service. 050 * 051 * @param validationPredicates A map with additional parameter 052 * validation predicates, specific to the 053 * concrete processing step. 054 * @param requiredParameters A set with additional mandatory 055 * parameters, specific to this processing 056 * step. 057 * @param processedDocumentTypes The processed entity types generated by 058 * the particular processing step. At the 059 * moment, this must be a list of two 060 * processed document types, where the 061 * first one is interpreted as the 062 * resulting type for titled documents, and 063 * the second one is the resulting type for 064 * untitled documents. The format of this 065 * list may be changed in the future if 066 * more unprocessed document types are 067 * added. 068 * @param stepDescriptionFormatString A format string (as defined by 069 * {@link Formatter}) that will be used for 070 * generating a user-friendly step activity 071 * description string. It will receive a 072 * single string parameter with the name of 073 * the unprocessed entity that is being 074 * processed. 075 * @param jsonResponseAttributeType The expected JSON type of processed 076 * document attributes in the web service 077 * response. 078 * @param requestParametersAction The action to execute to populate the 079 * request object with parameters, after 080 * the documents object. 081 * @param storeProcessedAttributeAction An action that receives the JSON 082 * document object of the response, and is 083 * expected to put its processed form in 084 * the provided map. 085 * @throws IllegalArgumentException If any parameter is {@code null} or invalid. 086 */ 087 protected AbstractTppProcessingStep( 088 final Map<String, Predicate<String>> validationPredicates, final Set<String> requiredParameters, 089 @NonNull final List<Class<? extends ProcessedDocument>> processedDocumentTypes, 090 @NonNull final String stepDescriptionFormatString, @NonNull final JsonResponseAttributeType jsonResponseAttributeType, 091 @NonNull final ProcessingConsumer<? super JsonGenerator> requestParametersAction, 092 @NonNull final ProcessingBiConsumer<? super Entry<String, JsonObject>, ? super Map<String, String>> storeProcessedAttributeAction 093 ) { 094 super( 095 getActualValidationPredicates(validationPredicates), 096 getActualRequiredParameters(requiredParameters) 097 ); 098 099 if (processedDocumentTypes.size() != 2) { 100 throw new IllegalArgumentException( 101 "There must be only one result entity type for each type of unprocessed document" 102 ); 103 } 104 105 try { 106 if (processedDocumentTypes.contains(null)) { 107 throw new IllegalArgumentException("A processed document type can't be null"); 108 } 109 } catch (final NullPointerException ignored) {} 110 111 this.processedDocumentTypes = Collections.unmodifiableList(new ArrayList<>(processedDocumentTypes)); 112 this.stepDescriptionFormatString = stepDescriptionFormatString; 113 this.jsonResponseAttributeType = jsonResponseAttributeType; 114 this.requestParametersAction = requestParametersAction; 115 this.storeProcessedAttributeAction = storeProcessedAttributeAction; 116 } 117 118 @Override 119 protected final void run() throws ProcessingException { 120 final Client wsClient = ClientBuilder.newClient(); 121 122 try { 123 final WebTarget target = wsClient.target(getParameters().get(ENDPOINT_PROCESSING_STEP_PARAMETER_NAME)); 124 125 // Delete previous results 126 for (final Class<? extends ProcessedDocument> processedDocumentType : processedDocumentTypes) { 127 deleteAllProcessedDocumentsOfType(processedDocumentType); 128 } 129 130 // Do the actual processing 131 for (int i = 0; i < processedDocumentTypes.size(); ++i) { 132 final String[] unprocessedAttributeNames = unprocessedDocumentsAttributes.get(i); 133 final Class<? extends ProcessedDocument> processedDocumentType = processedDocumentTypes.get(i); 134 135 forEachDocumentInNativeQuery( 136 unprocessedDocumentsQuerySuppliers.get(i), 137 String.format(stepDescriptionFormatString, unprocessedDocumentTypesNames.get(i)), 138 numberOfUnprocessedEntitiesProviders.get(i).get(), 139 (final List<String[]> batchAttributes) -> { 140 // Process entity attributes 141 final List<Map<String, String>> processedDocs = processAttributes( 142 batchAttributes, 1, unprocessedAttributeNames, target, 143 requestParametersAction, jsonResponseAttributeType, 144 storeProcessedAttributeAction 145 ); 146 // No benefit in iterating with several threads as order matters 147 final Iterator<Map<String, String>> processedDocsIter = processedDocs.iterator(); 148 149 // Persist the resulting processed documents 150 for (final String[] completeAttributes : batchAttributes) { 151 saveProcessedDocument( 152 processedDocumentType, Integer.parseInt(completeAttributes[0]), 153 processedDocsIter.next() // Same size than attributes 154 ); 155 } 156 }, 157 null 158 ); 159 } 160 } catch (final IllegalArgumentException | PersistenceException exc) { 161 throw new ProcessingException(DATA_ACCESS_EXCEPTION_MESSAGE, exc); 162 } finally { 163 wsClient.close(); 164 } 165 } 166 167 /** 168 * Returns a map with the provided validation predicates, plus any validation 169 * predicates common to all Text Processing Python processing steps. 170 * 171 * @param validationPredicates The validation predicates of the specific step. 172 * @return The described map. 173 * @throws IllegalArgumentException If {@code validationPredicates} is {@code null}. 174 */ 175 private static Map<String, Predicate<String>> getActualValidationPredicates(@NonNull final Map<String, Predicate<String>> validationPredicates) { 176 final Map<String, Predicate<String>> commonValidationPredicates = Map.of( 177 ENDPOINT_PROCESSING_STEP_PARAMETER_NAME, (final String value) -> value != null && !value.isBlank() 178 ); 179 180 final Map<String, Predicate<String>> actualValidationPredicates = new HashMap<>( 181 (int) Math.ceil((commonValidationPredicates.size() + validationPredicates.size()) / 0.75) 182 ); 183 184 actualValidationPredicates.putAll(commonValidationPredicates); 185 actualValidationPredicates.putAll(validationPredicates); 186 187 return actualValidationPredicates; 188 } 189 190 /** 191 * Returns a set with the provided required parameters, plus any required 192 * parameter common to all Text Processing Python processing steps. 193 * 194 * @param requiredParameters The required parameters for the specific step. 195 * @return The described set. 196 * @throws IllegalArgumentException If {@code requiredParameters} is 197 * {@code null}. 198 */ 199 private static Set<String> getActualRequiredParameters(@NonNull final Set<String> requiredParameters) { 200 final Set<String> commonRequiredParameters = Set.of( 201 ENDPOINT_PROCESSING_STEP_PARAMETER_NAME 202 ); 203 204 final Set<String> actualRequiredParameters = new HashSet<>( 205 (int) Math.ceil((commonRequiredParameters.size() + requiredParameters.size()) / 0.75) 206 ); 207 208 actualRequiredParameters.addAll(commonRequiredParameters); 209 actualRequiredParameters.addAll(requiredParameters); 210 211 return actualRequiredParameters; 212 } 213}