001// SPDX-License-Identifier: GPL-3.0-or-later
002
003package es.uvigo.esei.sing.textproc.abstracttppstep;
004
005import static es.uvigo.esei.sing.textproc.abstracttppstep.TppHelper.processAttributes;
006
007import java.util.ArrayList;
008import java.util.Collections;
009import java.util.Formatter;
010import java.util.HashMap;
011import java.util.HashSet;
012import java.util.Iterator;
013import java.util.List;
014import java.util.Map;
015import java.util.Map.Entry;
016import java.util.Set;
017import java.util.function.Predicate;
018
019import javax.json.JsonObject;
020import javax.json.stream.JsonGenerator;
021import javax.persistence.PersistenceException;
022import javax.ws.rs.client.Client;
023import javax.ws.rs.client.ClientBuilder;
024import javax.ws.rs.client.WebTarget;
025
026import es.uvigo.esei.sing.textproc.entity.ProcessedDocument;
027import es.uvigo.esei.sing.textproc.abstracttppstep.xml.definition.EndpointProcessingStepParameter;
028import es.uvigo.esei.sing.textproc.step.AbstractProcessingStep;
029import es.uvigo.esei.sing.textproc.step.ProcessingException;
030import lombok.NonNull;
031
032/**
033 * Handles common processing logic to all processing steps that call the Text
034 * Processing Python web service for processing documents.
035 *
036 * @author Alejandro González García
037 */
038public abstract class AbstractTppProcessingStep extends AbstractProcessingStep {
039        private static final String ENDPOINT_PROCESSING_STEP_PARAMETER_NAME = new EndpointProcessingStepParameter().getName();
040
041        private final List<Class<? extends ProcessedDocument>> processedDocumentTypes;
042        private final String stepDescriptionFormatString;
043        private final JsonResponseAttributeType jsonResponseAttributeType;
044        private final ProcessingConsumer<? super JsonGenerator> requestParametersAction;
045        private final ProcessingBiConsumer<? super Entry<String, JsonObject>, ? super Map<String, String>> storeProcessedAttributeAction;
046
047        /**
048         * Creates a new processing step that processes documents via calls to the Text
049         * Processing Python web service.
050         *
051         * @param validationPredicates          A map with additional parameter
052         *                                      validation predicates, specific to the
053         *                                      concrete processing step.
054         * @param requiredParameters            A set with additional mandatory
055         *                                      parameters, specific to this processing
056         *                                      step.
057         * @param processedDocumentTypes        The processed entity types generated by
058         *                                      the particular processing step. At the
059         *                                      moment, this must be a list of two
060         *                                      processed document types, where the
061         *                                      first one is interpreted as the
062         *                                      resulting type for titled documents, and
063         *                                      the second one is the resulting type for
064         *                                      untitled documents. The format of this
065         *                                      list may be changed in the future if
066         *                                      more unprocessed document types are
067         *                                      added.
068         * @param stepDescriptionFormatString   A format string (as defined by
069         *                                      {@link Formatter}) that will be used for
070         *                                      generating a user-friendly step activity
071         *                                      description string. It will receive a
072         *                                      single string parameter with the name of
073         *                                      the unprocessed entity that is being
074         *                                      processed.
075         * @param jsonResponseAttributeType     The expected JSON type of processed
076         *                                      document attributes in the web service
077         *                                      response.
078         * @param requestParametersAction       The action to execute to populate the
079         *                                      request object with parameters, after
080         *                                      the documents object.
081         * @param storeProcessedAttributeAction An action that receives the JSON
082         *                                      document object of the response, and is
083         *                                      expected to put its processed form in
084         *                                      the provided map.
085         * @throws IllegalArgumentException If any parameter is {@code null} or invalid.
086         */
087        protected AbstractTppProcessingStep(
088                final Map<String, Predicate<String>> validationPredicates, final Set<String> requiredParameters,
089                @NonNull final List<Class<? extends ProcessedDocument>> processedDocumentTypes,
090                @NonNull final String stepDescriptionFormatString, @NonNull final JsonResponseAttributeType jsonResponseAttributeType,
091                @NonNull final ProcessingConsumer<? super JsonGenerator> requestParametersAction,
092                @NonNull final ProcessingBiConsumer<? super Entry<String, JsonObject>, ? super Map<String, String>> storeProcessedAttributeAction 
093        ) {
094                super(
095                        getActualValidationPredicates(validationPredicates),
096                        getActualRequiredParameters(requiredParameters)
097                );
098
099                if (processedDocumentTypes.size() != 2) {
100                        throw new IllegalArgumentException(
101                                "There must be only one result entity type for each type of unprocessed document"
102                        );
103                }
104
105                try {
106                        if (processedDocumentTypes.contains(null)) {
107                                throw new IllegalArgumentException("A processed document type can't be null");
108                        }
109                } catch (final NullPointerException ignored) {}
110
111                this.processedDocumentTypes = Collections.unmodifiableList(new ArrayList<>(processedDocumentTypes));
112                this.stepDescriptionFormatString = stepDescriptionFormatString;
113                this.jsonResponseAttributeType = jsonResponseAttributeType;
114                this.requestParametersAction = requestParametersAction;
115                this.storeProcessedAttributeAction = storeProcessedAttributeAction;
116        }
117
118        @Override
119        protected final void run() throws ProcessingException {
120                final Client wsClient = ClientBuilder.newClient();
121
122                try {
123                        final WebTarget target = wsClient.target(getParameters().get(ENDPOINT_PROCESSING_STEP_PARAMETER_NAME));
124
125                        // Delete previous results
126                        for (final Class<? extends ProcessedDocument> processedDocumentType : processedDocumentTypes) {
127                                deleteAllProcessedDocumentsOfType(processedDocumentType);
128                        }
129
130                        // Do the actual processing
131                        for (int i = 0; i < processedDocumentTypes.size(); ++i) {
132                                final String[] unprocessedAttributeNames = unprocessedDocumentsAttributes.get(i);
133                                final Class<? extends ProcessedDocument> processedDocumentType = processedDocumentTypes.get(i);
134
135                                forEachDocumentInNativeQuery(
136                                        unprocessedDocumentsQuerySuppliers.get(i),
137                                        String.format(stepDescriptionFormatString, unprocessedDocumentTypesNames.get(i)),
138                                        numberOfUnprocessedEntitiesProviders.get(i).get(),
139                                        (final List<String[]> batchAttributes) -> {
140                                                // Process entity attributes
141                                                final List<Map<String, String>> processedDocs = processAttributes(
142                                                        batchAttributes, 1, unprocessedAttributeNames, target,
143                                                        requestParametersAction, jsonResponseAttributeType,
144                                                        storeProcessedAttributeAction
145                                                );
146                                                // No benefit in iterating with several threads as order matters
147                                                final Iterator<Map<String, String>> processedDocsIter = processedDocs.iterator();
148
149                                                // Persist the resulting processed documents
150                                                for (final String[] completeAttributes : batchAttributes) {
151                                                        saveProcessedDocument(
152                                                                processedDocumentType, Integer.parseInt(completeAttributes[0]),
153                                                                processedDocsIter.next() // Same size than attributes
154                                                        );
155                                                }
156                                        },
157                                        null
158                                );
159                        }
160                } catch (final IllegalArgumentException | PersistenceException exc) {
161                        throw new ProcessingException(DATA_ACCESS_EXCEPTION_MESSAGE, exc);
162                } finally {
163                        wsClient.close();
164                }
165        }
166
167        /**
168         * Returns a map with the provided validation predicates, plus any validation
169         * predicates common to all Text Processing Python processing steps.
170         *
171         * @param validationPredicates The validation predicates of the specific step.
172         * @return The described map.
173         * @throws IllegalArgumentException If {@code validationPredicates} is {@code null}.
174         */
175        private static Map<String, Predicate<String>> getActualValidationPredicates(@NonNull final Map<String, Predicate<String>> validationPredicates) {
176                final Map<String, Predicate<String>> commonValidationPredicates = Map.of(
177                        ENDPOINT_PROCESSING_STEP_PARAMETER_NAME, (final String value) -> value != null && !value.isBlank()
178                );
179
180                final Map<String, Predicate<String>> actualValidationPredicates = new HashMap<>(
181                        (int) Math.ceil((commonValidationPredicates.size() + validationPredicates.size()) / 0.75)
182                );
183
184                actualValidationPredicates.putAll(commonValidationPredicates);
185                actualValidationPredicates.putAll(validationPredicates);
186
187                return actualValidationPredicates;
188        }
189
190        /**
191         * Returns a set with the provided required parameters, plus any required
192         * parameter common to all Text Processing Python processing steps.
193         *
194         * @param requiredParameters The required parameters for the specific step.
195         * @return The described set.
196         * @throws IllegalArgumentException If {@code requiredParameters} is
197         *                                  {@code null}.
198         */
199        private static Set<String> getActualRequiredParameters(@NonNull final Set<String> requiredParameters) {
200                final Set<String> commonRequiredParameters = Set.of(
201                        ENDPOINT_PROCESSING_STEP_PARAMETER_NAME
202                );
203
204                final Set<String> actualRequiredParameters = new HashSet<>(
205                        (int) Math.ceil((commonRequiredParameters.size() + requiredParameters.size()) / 0.75)
206                );
207
208                actualRequiredParameters.addAll(commonRequiredParameters);
209                actualRequiredParameters.addAll(requiredParameters);
210
211                return actualRequiredParameters;
212        }
213}