001// SPDX-License-Identifier: GPL-3.0-or-later
002
003package es.uvigo.esei.sing.textproc.step;
004
005import java.io.PrintStream;
006import java.util.ArrayList;
007import java.util.Collections;
008import java.util.HashMap;
009import java.util.HashSet;
010import java.util.List;
011import java.util.Map;
012import java.util.Map.Entry;
013import java.util.Set;
014import java.util.function.Consumer;
015import java.util.function.Predicate;
016import java.util.function.Supplier;
017import java.util.logging.Level;
018import java.util.stream.Stream;
019
020import javax.persistence.EntityManager;
021import javax.persistence.EntityTransaction;
022import javax.persistence.PersistenceException;
023import javax.persistence.Query;
024import javax.persistence.criteria.CriteriaDelete;
025
026import es.uvigo.esei.sing.textproc.entity.ProcessedDocument;
027import es.uvigo.esei.sing.textproc.logging.TextProcLogging;
028import es.uvigo.esei.sing.textproc.persistence.TextProcPersistence;
029import es.uvigo.esei.sing.textproc.step.internal.ProcessingStepInterface;
030import es.uvigo.esei.sing.textproc.step.xml.definition.BatchSizeProcessingStepParameter;
031import es.uvigo.esei.sing.textproc.step.xml.definition.PageSizeProcessingStepParameter;
032import es.uvigo.esei.sing.textproc.step.xml.definition.PrimaryKeyColumnProcessingStepParameter;
033import es.uvigo.esei.sing.textproc.step.xml.definition.TextColumnProcessingStepParameter;
034import es.uvigo.esei.sing.textproc.step.xml.definition.TextDocumentTableNameProcessingStepParameter;
035import es.uvigo.esei.sing.textproc.step.xml.definition.TextDocumentWithTitleTableNameProcessingStepParameter;
036import es.uvigo.esei.sing.textproc.step.xml.definition.TitleColumnProcessingStepParameter;
037import lombok.NonNull;
038import me.tongfei.progressbar.DelegatingProgressBarConsumer;
039import me.tongfei.progressbar.ProgressBar;
040import me.tongfei.progressbar.ProgressBarBuilder;
041import me.tongfei.progressbar.ProgressBarStyle;
042
043/**
044 * Contains parameter validation logic common to processing steps, reducing the
045 * effort needed to implement the {@link ProcessingStepInterface} interface and
046 * ensuring all processing steps behave in a consistent manner.
047 *
048 * @author Alejandro González García
049 * @implNote The implementation of this class is not thread safe.
050 */
051public abstract class AbstractProcessingStep implements ProcessingStepInterface {
052        // Common step parameter names
053        protected static final String PAGE_SIZE_STEP_PARAMETER_NAME = new PageSizeProcessingStepParameter().getName();
054        protected static final String BATCH_SIZE_STEP_PARAMETER_NAME = new BatchSizeProcessingStepParameter().getName();
055        protected static final String TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME = new TextDocumentWithTitleTableNameProcessingStepParameter().getName();
056        protected static final String TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME = new TextDocumentTableNameProcessingStepParameter().getName();
057        protected static final String PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME = new PrimaryKeyColumnProcessingStepParameter().getName();
058        protected static final String TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME = new TextColumnProcessingStepParameter().getName();
059        protected static final String TITLE_COLUMN_PROCESSING_STEP_PARAMETER_NAME = new TitleColumnProcessingStepParameter().getName();
060
061        // Common step parameter default values
062        /**
063         * The default page size. Increase for optimal performance until memory usage,
064         * DB commit performance or transaction commit frequency are an issue. Ideally,
065         * the page size should be a multiple of the batch size.
066         */
067        protected static final String DEFAULT_PAGE_SIZE_STEP_PARAMETER = "32768";
068        /**
069         * The default batch size. The documents in a page will be divided in batches
070         * with this many documents, as much as possible. The documents in a batch will
071         * be processed together, in the same thread.
072         */
073        protected static final String DEFAULT_BATCH_SIZE_STEP_PARAMETER = "512";
074        protected static final String DEFAULT_PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER = "id";
075        protected static final String DEFAULT_TEXT_COLUMN_PROCESSING_STEP_PARAMETER = "text";
076        protected static final String DEFAULT_TITLE_COLUMN_PROCESSING_STEP_PARAMETER = "title";
077
078        private final Map<String, Predicate<String>> validationPredicates;
079        private final Set<String> requiredParameters;
080        private Map<String, String> parameters = null;
081        private volatile boolean databaseEntitiesChanged = false;
082
083        // Save System.out value at class initialization time so
084        // we're not affected by steps reassigning System.out
085        private static final PrintStream STDOUT = System.out;
086        private final DelegatingProgressBarConsumer progressBarConsumer = new DelegatingProgressBarConsumer(
087                (final String str) -> {
088                        STDOUT.print('\r');
089                        STDOUT.print(str);
090                }
091        );
092
093        protected static final String DATA_ACCESS_EXCEPTION_MESSAGE = "An exception occurred during a data access operation";
094        /**
095         * The names of all the unprocessed document types.
096         */
097        protected final List<String> unprocessedDocumentTypesNames = List.of(
098                "titled docs",
099                "untitled docs"
100        );
101        /**
102         * The native query suppliers for all the known types of unprocessed documents,
103         * in the same order as {@code unprocessedDocumentTypesNames}.
104         */
105        protected final List<Supplier<? extends Query>> unprocessedDocumentsQuerySuppliers = List.of(
106                () -> TextProcPersistence.get().getEntityManager().createNativeQuery(buildUnprocessedDocumentWithTitleSelectStatement()),
107                () -> TextProcPersistence.get().getEntityManager().createNativeQuery(buildUnprocessedDocumentSelectStatement())
108        );
109        /**
110         * The non primary key attribute names of all unprocessed document types, in the
111         * same order as {@code unprocessedDocumentTypesNames}.
112         */
113        protected final List<String[]> unprocessedDocumentsAttributes = List.of(
114                new String[] { "title", "text" },
115                new String[] { "text" }
116        );
117        /**
118         * Suppliers that count how many unprocessed entities of a type there are. The
119         * list is in the same order as {@code unprocessedDocumentTypesNames}.
120         */
121        protected final List<Supplier<Long>> numberOfUnprocessedEntitiesProviders = List.of(
122                () -> getUnprocessedDocumentsWithTitle(),
123                () -> getUnprocessedDocuments()
124        );
125
126        /**
127         * Constructs a new abstract processing step, with the given parameter
128         * validation predicates and required parameters. Common validation parameters
129         * will be added automatically.
130         *
131         * @param validationPredicates The validation predicates to use to validate the
132         *                             parameters, including optional ones.
133         * @param requiredParameters   A set of parameter names whose presence is
134         *                             required.
135         * @throws IllegalArgumentException If any parameter is {@code null}.
136         */
137        protected AbstractProcessingStep(
138                @NonNull final Map<String, Predicate<String>> validationPredicates, @NonNull final Set<String> requiredParameters
139        ) {
140                final Map<String, Predicate<String>> commonValidationPredicates = Map.of(
141                        PAGE_SIZE_STEP_PARAMETER_NAME, (final String value) -> {
142                                try {
143                                        if (Integer.parseInt(value) < 1) {
144                                                throw new NumberFormatException();
145                                        }
146
147                                        return true;
148                                } catch (final NumberFormatException exc) {
149                                        return false;
150                                }
151                        },
152                        BATCH_SIZE_STEP_PARAMETER_NAME, (final String value) -> {
153                                try {
154                                        final int actualValue = Integer.parseInt(value);
155                                        if (actualValue < 1) {
156                                                throw new NumberFormatException();
157                                        }
158
159                                        return
160                                                actualValue <= Integer.parseInt(
161                                                        getParameters().getOrDefault(PAGE_SIZE_STEP_PARAMETER_NAME, DEFAULT_PAGE_SIZE_STEP_PARAMETER)
162                                                );
163                                } catch (final NumberFormatException exc) {
164                                        return false;
165                                }
166                        },
167                        TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
168                                value != null && !value.isBlank(),
169                        TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
170                                value != null && !value.isBlank(),
171                        PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
172                                value != null && !value.isBlank(),
173                        TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
174                                value != null && !value.isBlank(),
175                        TITLE_COLUMN_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
176                                value != null && !value.isBlank()
177                );
178
179                final Set<String> commonRequiredParameters = Set.of(
180                        TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME,
181                        TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME
182                );
183
184                final Map<String, Predicate<String>> actualValidationPredicates = new HashMap<>(
185                        (int) Math.ceil((commonValidationPredicates.size() + validationPredicates.size()) / 0.75)
186                );
187                final Set<String> actualRequiredParameters = new HashSet<>(
188                        (int) Math.ceil((commonRequiredParameters.size() + requiredParameters.size()) / 0.75)
189                );
190
191                actualValidationPredicates.putAll(commonValidationPredicates);
192                actualValidationPredicates.putAll(validationPredicates);
193                actualRequiredParameters.addAll(commonRequiredParameters);
194                actualRequiredParameters.addAll(requiredParameters);
195
196                this.validationPredicates = Collections.unmodifiableMap(actualValidationPredicates);
197                this.requiredParameters = Collections.unmodifiableSet(actualRequiredParameters);
198        }
199
200        /**
201         * {@inheritDoc}
202         *
203         * @implNote The implementation of this method saves the user provides parameter
204         *           values, executes their validation predicates, and then invokes
205         *           {@link #run()}, making sure a JPA transaction has begun.
206         */
207        @Override
208        public final void execute(@NonNull final Map<String, String> parameters) throws ProcessingException {
209                final EntityTransaction entityTransaction = TextProcPersistence.get().getEntityManager().getTransaction();
210                final String stepName = getClass().getSimpleName();
211                boolean startedTransaction = false;
212                boolean transactionSuccessful = true;
213
214                this.parameters = parameters;
215
216                System.out.println();
217                validateParameters();
218
219                try {
220                        if (!entityTransaction.isActive()) {
221                                entityTransaction.begin();
222                                startedTransaction = true;
223                        }
224
225                        System.out.print("> Executing ");
226                        System.out.print(stepName);
227                        System.out.println("...");
228
229                        run();
230
231                        System.out.println();
232                        System.out.print("> ");
233                        System.out.print(stepName);
234                        System.out.println(" done.");
235                } catch (final ProcessingException exc) {
236                        transactionSuccessful = false;
237                        throw exc;
238                } catch (final PersistenceException exc) {
239                        transactionSuccessful = false;
240                        throw new ProcessingException(DATA_ACCESS_EXCEPTION_MESSAGE, exc);
241                } finally {
242                        if (startedTransaction && !transactionSuccessful) {
243                                entityTransaction.setRollbackOnly();
244                        }
245
246                        // Silently flush changes, just in case it was not done per page
247                        TextProcPersistence.get().flushEntities();
248                }
249        }
250
251        /**
252         * Validates the user-specified parameter map. This method doesn't have any side
253         * effect if validation is successful.
254         *
255         * @throws ProcessingException If the validation is unsuccessful.
256         */
257        private void validateParameters() throws ProcessingException {
258                for (final String requiredParameter : requiredParameters) {
259                        if (!parameters.containsKey(requiredParameter)) {
260                                throw new ProcessingException(
261                                        "Missing required parameter for step " + getClass().getSimpleName() + ": " +
262                                        requiredParameter
263                                );
264                        }
265                }
266
267                for (final Entry<String, String> entry : parameters.entrySet()) {
268                        if (!validationPredicates.get(entry.getKey()).test(entry.getValue())) {
269                                throw new ProcessingException(
270                                        "The value for the parameter " + entry.getKey() + " in the " + getClass().getSimpleName() + " step is not valid"
271                                );
272                        }
273                }
274        }
275
276        /**
277         * Returns the parameters provided by the user for this step.
278         *
279         * @return An unmodifiable, non-null map with the step parameters, where the
280         *         keys are the parameter names.
281         */
282        protected final Map<String, String> getParameters() {
283                return parameters;
284        }
285
286        /**
287         * Executes the given action for each batch of documents retrieved by a native
288         * JPA query. For the purposes of this method, native JPA queries are to be used
289         * when the queried table doesn't have a configured mapping entity. Therefore,
290         * it returns the value of the queried columns as strings, not performing any
291         * relational to object mapping beyond that.
292         * <p>
293         * To maximize performance, the processing action may be executed in any thread,
294         * so thread-safety must be guaranteed in its implementation if shared state is
295         * to be accessed.
296         * </p>
297         * <p>
298         * This method assumes a transaction is already active.
299         * </p>
300         *
301         * @param querySupplier     A query supplier, that must return an appropriate,
302         *                          non-null native query object when invoked. This
303         *                          allows recreating the query object when needed.
304         * @param taskName          The name of the task that will be performed with the
305         *                          documents. It will be shown to the user.
306         * @param numberOfDocuments The total number of documents that will be processed
307         *                          by the action. It must be zero or greater.
308         * @param action            The action to execute for every batch of documents.
309         *                          A batch contains at least one document. The list
310         *                          supplied to the consumer is not modifiable.
311         * @param pageEndAction     The action to execute after a document page is
312         *                          processed, if processing is successful. It might be
313         *                          {@code null}, in which case nothing will be done. In
314         *                          any case, no matter if processing is successful or
315         *                          not, any database transactions made by calling
316         *                          methods of this class are committed or rolled back
317         *                          before invoking this action.
318         * @throws ProcessingException If any parameter is invalid, or an exception
319         *                             occurred during the processing.
320         */
321        protected final void forEachDocumentInNativeQuery(
322                @NonNull final Supplier<? extends Query> querySupplier, @NonNull final String taskName, final long numberOfDocuments,
323                @NonNull final ProcessingConsumer<List<String[]>> action,
324                final Runnable pageEndAction
325        ) throws ProcessingException {
326                long numberOfPages;
327                long numberOfSteps;
328                final int pageSize = Integer.parseInt(
329                        getParameters().getOrDefault(PAGE_SIZE_STEP_PARAMETER_NAME, DEFAULT_PAGE_SIZE_STEP_PARAMETER)
330                );
331                final int batchSize = Integer.parseInt(
332                        getParameters().getOrDefault(BATCH_SIZE_STEP_PARAMETER_NAME, DEFAULT_BATCH_SIZE_STEP_PARAMETER)
333                );
334                final List<String[]> entityAttributesBatch = new ArrayList<>(batchSize);
335
336                numberOfPages = numberOfDocuments / pageSize + (numberOfDocuments % pageSize == 0 ? 0 : 1);
337                numberOfSteps =
338                        // At least one step per page
339                        numberOfPages +
340                        // For each surely complete page (every page except the last), there can be an extra step
341                        // because of documents that didn't complete a batch
342                        (numberOfPages - 1) * (pageSize % batchSize == 0 ? 0 : 1) +
343                        // For the last page, there can be an extra step too (but it might not be a complete page)
344                        ((numberOfDocuments % pageSize) % batchSize == 0 ? 0 : 1);
345
346                try {
347                        for (int page = 0; page < numberOfPages; ++page) {
348                                final Query query = querySupplier.get();
349
350                                if (query == null) {
351                                        throw new ProcessingException("The query supplier returned a null query");
352                                }
353
354                                query.setFirstResult(page * pageSize);
355                                query.setMaxResults(pageSize);
356
357                                // We do not use getResultStream() to increase the chance that the resulting stream has a defined size
358                                final Stream<?> resultStream = query.getResultList().parallelStream();
359
360                                ProgressBar.wrap(
361                                        resultStream,
362                                        new ProgressBarBuilder()
363                                                .setConsumer(progressBarConsumer)
364                                                .setTaskName(taskName + " (" + (page + 1) + "/" + numberOfSteps + ")")
365                                                .setStyle(ProgressBarStyle.ASCII)
366                                                .showSpeed()
367                                ).forEach((final Object result) -> {
368                                        String[] attributes;
369                                        List<String[]> currentBatch = null;
370
371                                        try {
372                                                if (result.getClass().isArray()) {
373                                                        // Several columns in result. Convert them to strings
374                                                        // (casting Object[] to String[] doesn't work)
375                                                        attributes = new String[((Object[]) result).length];
376        
377                                                        for (int i = 0; i < attributes.length; ++i) {
378                                                                attributes[i] = columnAttributeToString(((Object[]) result)[i]);
379                                                        }
380                                                } else {
381                                                        // One column in result. Wrap it in an array
382                                                        attributes = new String[] { columnAttributeToString(result) };
383                                                }
384
385                                                synchronized (entityAttributesBatch) {
386                                                        entityAttributesBatch.add(attributes);
387
388                                                        // Batch completed. Copy it so we exit the critical section ASAP,
389                                                        // and we can add documents to a new batch without the consumer noticing
390                                                        if (entityAttributesBatch.size() >= batchSize) {
391                                                                currentBatch = List.copyOf(entityAttributesBatch);
392                                                                entityAttributesBatch.clear();
393                                                        }
394                                                }
395
396                                                // Accept the batch. This can block until the batch is processed,
397                                                // but as we are outside a critical section it's not a bottleneck
398                                                if (currentBatch != null) {
399                                                        action.accept(currentBatch);
400                                                }
401                                        } catch (final Exception exc) {
402                                                // Exceptions thrown by this method get silently discarded.
403                                                // Handle that by logging them
404                                                TextProcLogging.getLogger().log(
405                                                        Level.WARNING, "An exception occurred while processing a batch of documents. Skipping...", exc
406                                                );
407                                        }
408                                });
409
410                                // Accept any remaining entity that did not make it to a complete batch
411                                try {
412                                        synchronized (entityAttributesBatch) {
413                                                final int lastBatchSize = entityAttributesBatch.size();
414
415                                                if (lastBatchSize > 0) {
416                                                        try (final ProgressBar progressBar =
417                                                                new ProgressBarBuilder()
418                                                                        .setConsumer(progressBarConsumer)
419                                                                        .setTaskName(taskName + " (" + (page + 2) + "/" + numberOfSteps + ")")
420                                                                        .setStyle(ProgressBarStyle.ASCII)
421                                                                        .showSpeed()
422                                                                        .setInitialMax(lastBatchSize)
423                                                                        .build()
424                                                        ) {
425                                                                action.accept(Collections.unmodifiableList(entityAttributesBatch));
426                                                                progressBar.stepTo(lastBatchSize);
427                                                        }
428                                                }
429                                        }
430                                } catch (final Exception exc) {
431                                        TextProcLogging.getLogger().log(
432                                                Level.WARNING, "An exception occurred while processing a batch of documents. Skipping...", exc
433                                        );
434                                }
435
436                                System.out.println();
437                                if (databaseEntitiesChanged) {
438                                        System.out.println("> Committing changes to the database...");
439                                        TextProcPersistence.get().flushEntities();
440                                        System.out.print("> Changes committed.");
441
442                                        databaseEntitiesChanged = false;
443                                }
444
445                                if (pageEndAction != null) {
446                                        pageEndAction.run();
447                                }
448                        }
449                } catch (final PersistenceException exc) {
450                        throw new ProcessingException(DATA_ACCESS_EXCEPTION_MESSAGE, exc);
451                }
452        }
453
454
455        /**
456         * Stores a processed document in the database, from its processed attributes.
457         * This method starts and commits or rollbacks a JPA transaction, if no
458         * transaction is already active.
459         *
460         * @param documentType        The type of document that is being processed, and
461         *                            will be stored. It will be instantiated via
462         *                            reflection, so the module containing the type
463         *                            definition must open its package for deep
464         *                            reflection to the module containing this code.
465         * @param primaryKey          The primary key of the processed document.
466         * @param processedAttributes The processed attributes of the document. Their
467         *                            names (keys) must match the attributes of the
468         *                            concrete document type.
469         * @throws ProcessingException      If some error occurs during the operation.
470         * @throws IllegalArgumentException If any parameter is {@code null}.
471         * @throws PersistenceException     If some data access error occurs.
472         */
473        protected final void saveProcessedDocument(
474                @NonNull final Class<? extends ProcessedDocument> documentType, final int primaryKey, @NonNull final Map<String, String> processedAttributes
475        ) throws ProcessingException {
476                final EntityManager thisThreadEntityManager = TextProcPersistence.get().getEntityManager();
477                final EntityTransaction entityTransaction = thisThreadEntityManager.getTransaction();
478                final ProcessedDocument entityToPersist;
479                boolean startedTransaction = false;
480                boolean transactionSuccessful = true;
481
482                try {
483                        entityToPersist = documentType.getConstructor(Integer.class).newInstance(primaryKey);
484                } catch (final ReflectiveOperationException exc) {
485                        throw new ProcessingException(
486                                "Document class implementation contract violation, or unappropriate permissions for reflective operation", exc
487                        );
488                }
489
490                if (!entityTransaction.isActive()) {
491                        entityTransaction.begin();
492                        startedTransaction = true;
493                }
494
495                try {
496                        thisThreadEntityManager.persist(entityToPersist);
497                        databaseEntitiesChanged = true;
498                } catch (final PersistenceException exc) {
499                        transactionSuccessful = false;
500                        throw exc;
501                } finally {
502                        if (startedTransaction && !transactionSuccessful) {
503                                entityTransaction.setRollbackOnly();
504                        }
505                }
506
507                // Change the entity processed data
508                for (final Entry<String, String> processedAttribute : processedAttributes.entrySet()) {
509                        try {
510                                final String attributeName = processedAttribute.getKey();
511
512                                documentType
513                                        .getMethod("set" + attributeName.substring(0, 1).toUpperCase() + attributeName.substring(1), String.class)
514                                        .invoke(entityToPersist, processedAttribute.getValue());
515                        } catch (final ReflectiveOperationException exc) {
516                                throw new ProcessingException(
517                                        "Couldn't invoke attribute setter", exc
518                                );
519                        }
520                }
521        }
522
523        /**
524         * Constructs the SELECT SQL statement for retrieving unprocessed text documents
525         * with title from a database, using native queries.
526         *
527         * @return The described statement.
528         */
529        protected final String buildUnprocessedDocumentWithTitleSelectStatement() {
530                final Map<String, String> parameters = getParameters();
531                final StringBuilder dmlSentenceBuilder = new StringBuilder(96);
532
533                dmlSentenceBuilder.append("SELECT ");
534                dmlSentenceBuilder.append(
535                        parameters.getOrDefault(PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER)
536                );
537                dmlSentenceBuilder.append(", ");
538                dmlSentenceBuilder.append(
539                        parameters.getOrDefault(TITLE_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_TITLE_COLUMN_PROCESSING_STEP_PARAMETER)
540                );
541                dmlSentenceBuilder.append(", ");
542                dmlSentenceBuilder.append(
543                        parameters.getOrDefault(TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_TEXT_COLUMN_PROCESSING_STEP_PARAMETER)
544                );
545                dmlSentenceBuilder.append(" FROM ");
546                dmlSentenceBuilder.append(
547                        parameters.get(TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
548                );
549
550                return dmlSentenceBuilder.toString();
551        }
552
553        /**
554         * Constructs the SELECT SQL statement for retrieving unprocessed text documents
555         * without title from a database, using native queries.
556         *
557         * @return The described statement.
558         */
559        protected final String buildUnprocessedDocumentSelectStatement() {
560                final Map<String, String> parameters = getParameters();
561                final StringBuilder dmlSentenceBuilder = new StringBuilder(64);
562
563                dmlSentenceBuilder.append("SELECT ");
564                dmlSentenceBuilder.append(
565                        parameters.getOrDefault(PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER)
566                );
567                dmlSentenceBuilder.append(", ");
568                dmlSentenceBuilder.append(
569                        parameters.getOrDefault(TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_TEXT_COLUMN_PROCESSING_STEP_PARAMETER)
570                );
571                dmlSentenceBuilder.append(" FROM ");
572                dmlSentenceBuilder.append(
573                        parameters.get(TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
574                );
575
576                return dmlSentenceBuilder.toString();
577        }
578
579        /**
580         * Returns the number of unprocessed text documents with title in the database.
581         * This method assumes a transaction is already active.
582         *
583         * @return The described number.
584         * @throws PersistenceException If some error occurs while executing SQL
585         *                              statements in the database.
586         */
587        protected final long getUnprocessedDocumentsWithTitle() {
588                return ((Number) TextProcPersistence.get().getEntityManager().createNativeQuery(
589                        "SELECT COUNT(*) FROM " +
590                        getParameters().get(TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
591                ).getSingleResult()).longValue();
592        }
593
594        /**
595         * Returns the number of unprocessed text documents without title in the
596         * database. This method assumes a transaction is already active.
597         *
598         * @return The described number.
599         * @throws PersistenceException If some error occurs while executing SQL
600         *                              statements in the database.
601         */
602        protected final long getUnprocessedDocuments() {
603                return ((Number) TextProcPersistence.get().getEntityManager().createNativeQuery(
604                        "SELECT COUNT(*) FROM " +
605                        getParameters().get(TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
606                ).getSingleResult()).longValue();
607        }
608
609        /**
610         * Deletes all the processed documents of a given type from the database.
611         *
612         * @param <T>          The type of documents to delete.
613         * @param documentType The type of documents to delete.
614         * @throws IllegalArgumentException If {@code documentType} is {@code null}.
615         * @throws PersistenceException     If some error occurs while executing SQL
616         *                                  statements in the database.
617         */
618        protected final <T extends ProcessedDocument> void deleteAllProcessedDocumentsOfType(@NonNull final Class<T> documentType) {
619                final EntityManager entityManager = TextProcPersistence.get().getEntityManager();
620
621                final CriteriaDelete<T> deleteCriteria = entityManager.getCriteriaBuilder().createCriteriaDelete(documentType);
622                deleteCriteria.from(documentType);
623
624                System.out.println("> Deleting " + documentType.getSimpleName() + " entities...");
625                entityManager.createQuery(deleteCriteria).executeUpdate();
626
627                databaseEntitiesChanged = true;
628        }
629
630        /**
631         * Executes the processing step implemented by this object. The processing step
632         * parameters are already validated and available upon request on
633         * {@link #getParameters()}. This method is invoked in the context of a JPA
634         * transaction that is started and committed or rolled back automatically.
635         *
636         * @throws ProcessingException If an exception occurs during execution.
637         */
638        protected abstract void run() throws ProcessingException;
639
640        /**
641         * Converts a column attribute value to a string.
642         *
643         * @param attribute The attribute to convert.
644         * @return The converted attribute value.
645         * @throws ProcessingException If the value could not be converted.
646         */
647        private String columnAttributeToString(final Object attribute) throws ProcessingException {
648                if (attribute != null) {
649                        return attribute.toString();
650                } else {
651                        throw new ProcessingException("Unexpected column type");
652                }
653        }
654
655        /**
656         * A consumer of data to be processed, which can throw a checked
657         * {@link ProcessingException}.
658         *
659         * @author Alejandro González García
660         *
661         * @param <T> The type of data to be processed.
662         * @see Consumer
663         */
664        @FunctionalInterface
665        public static interface ProcessingConsumer<T> {
666                /**
667                 * Performs the processing operation on the given argument.
668                 *
669                 * @param t The input argument.
670                 */
671                public void accept(final T t) throws ProcessingException;
672        }
673
674        /**
675         * A processing consumer that does nothing. It never throws exceptions.
676         *
677         * @author Alejandro González García
678         *
679         * @param <T> The type of objects consumed by this consumer.
680         */
681        public static final class NullProcessingConsumer<T> implements ProcessingConsumer<T> {
682                @Override
683                public void accept(final T t) throws ProcessingException {
684                        // Do nothing
685                }
686        }
687}