// SPDX-License-Identifier: GPL-3.0-or-later

package es.uvigo.esei.sing.textproc.step;

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Stream;

import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.PersistenceException;
import javax.persistence.Query;
import javax.persistence.criteria.CriteriaDelete;

import es.uvigo.esei.sing.textproc.entity.ProcessedDocument;
import es.uvigo.esei.sing.textproc.logging.TextProcLogging;
import es.uvigo.esei.sing.textproc.persistence.TextProcPersistence;
import es.uvigo.esei.sing.textproc.step.internal.ProcessingStepInterface;
import es.uvigo.esei.sing.textproc.step.xml.definition.BatchSizeProcessingStepParameter;
import es.uvigo.esei.sing.textproc.step.xml.definition.PageSizeProcessingStepParameter;
import es.uvigo.esei.sing.textproc.step.xml.definition.PrimaryKeyColumnProcessingStepParameter;
import es.uvigo.esei.sing.textproc.step.xml.definition.TextColumnProcessingStepParameter;
import es.uvigo.esei.sing.textproc.step.xml.definition.TextDocumentTableNameProcessingStepParameter;
import es.uvigo.esei.sing.textproc.step.xml.definition.TextDocumentWithTitleTableNameProcessingStepParameter;
import es.uvigo.esei.sing.textproc.step.xml.definition.TitleColumnProcessingStepParameter;
import lombok.NonNull;
import me.tongfei.progressbar.DelegatingProgressBarConsumer;
import me.tongfei.progressbar.ProgressBar;
import me.tongfei.progressbar.ProgressBarBuilder;
import me.tongfei.progressbar.ProgressBarStyle;

/**
 * Contains parameter validation logic common to processing steps, reducing the
 * effort needed to implement the {@link ProcessingStepInterface} interface and
 * ensuring all processing steps behave in a consistent manner.
 *
 * @author Alejandro González García
 * @implNote The implementation of this class is not thread safe.
 */
public abstract class AbstractProcessingStep implements ProcessingStepInterface {
    // Common step parameter names
    protected static final String PAGE_SIZE_STEP_PARAMETER_NAME = new PageSizeProcessingStepParameter().getName();
    protected static final String BATCH_SIZE_STEP_PARAMETER_NAME = new BatchSizeProcessingStepParameter().getName();
    protected static final String TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME = new TextDocumentWithTitleTableNameProcessingStepParameter().getName();
    protected static final String TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME = new TextDocumentTableNameProcessingStepParameter().getName();
    protected static final String PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME = new PrimaryKeyColumnProcessingStepParameter().getName();
    protected static final String TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME = new TextColumnProcessingStepParameter().getName();
    protected static final String TITLE_COLUMN_PROCESSING_STEP_PARAMETER_NAME = new TitleColumnProcessingStepParameter().getName();

    // Common step parameter default values
    /**
     * The default page size. Increase for optimal performance until memory usage,
     * DB commit performance or transaction commit frequency are an issue. Ideally,
     * the page size should be a multiple of the batch size.
     */
    protected static final String DEFAULT_PAGE_SIZE_STEP_PARAMETER = "32768";
    /**
     * The default batch size. The documents in a page will be divided in batches
     * with this many documents, as much as possible. The documents in a batch will
     * be processed together, in the same thread.
     */
    protected static final String DEFAULT_BATCH_SIZE_STEP_PARAMETER = "512";
    protected static final String DEFAULT_PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER = "id";
    protected static final String DEFAULT_TEXT_COLUMN_PROCESSING_STEP_PARAMETER = "text";
    protected static final String DEFAULT_TITLE_COLUMN_PROCESSING_STEP_PARAMETER = "title";

    private final Map<String, Predicate<String>> validationPredicates;
    private final Set<String> requiredParameters;
    private Map<String, String> parameters = null;
    private volatile boolean databaseEntitiesChanged = false;

    // Save System.out value at class initialization time so
    // we're not affected by steps reassigning System.out
    private static final PrintStream STDOUT = System.out;
    private final DelegatingProgressBarConsumer progressBarConsumer = new DelegatingProgressBarConsumer(
        (final String str) -> {
            STDOUT.print('\r');
            STDOUT.print(str);
        }
    );

    protected static final String DATA_ACCESS_EXCEPTION_MESSAGE = "An exception occurred during a data access operation";
    /**
     * The names of all the unprocessed document types.
     */
    protected final List<String> unprocessedDocumentTypesNames = List.of(
        "titled docs",
        "untitled docs"
    );
    /**
     * The native query suppliers for all the known types of unprocessed documents,
     * in the same order as {@code unprocessedDocumentTypesNames}.
     */
    protected final List<Supplier<? extends Query>> unprocessedDocumentsQuerySuppliers = List.of(
        () -> TextProcPersistence.get().getEntityManager().createNativeQuery(buildUnprocessedDocumentWithTitleSelectStatement()),
        () -> TextProcPersistence.get().getEntityManager().createNativeQuery(buildUnprocessedDocumentSelectStatement())
    );
    /**
     * The non primary key attribute names of all unprocessed document types, in the
     * same order as {@code unprocessedDocumentTypesNames}.
     */
    protected final List<String[]> unprocessedDocumentsAttributes = List.of(
        new String[] { "title", "text" },
        new String[] { "text" }
    );
    /**
     * Suppliers that count how many unprocessed entities of a type there are. The
     * list is in the same order as {@code unprocessedDocumentTypesNames}.
     */
    protected final List<Supplier<Long>> numberOfUnprocessedEntitiesProviders = List.of(
        () -> getUnprocessedDocumentsWithTitle(),
        () -> getUnprocessedDocuments()
    );

    /**
     * Constructs a new abstract processing step, with the given parameter
     * validation predicates and required parameters. Common validation parameters
     * will be added automatically.
     *
     * @param validationPredicates The validation predicates to use to validate the
     *                             parameters, including optional ones.
     * @param requiredParameters   A set of parameter names whose presence is
     *                             required.
     * @throws IllegalArgumentException If any parameter is {@code null}.
     */
    protected AbstractProcessingStep(
        @NonNull final Map<String, Predicate<String>> validationPredicates, @NonNull final Set<String> requiredParameters
    ) {
        final Map<String, Predicate<String>> commonValidationPredicates = Map.of(
            PAGE_SIZE_STEP_PARAMETER_NAME, (final String value) -> {
                try {
                    if (Integer.parseInt(value) < 1) {
                        throw new NumberFormatException();
                    }

                    return true;
                } catch (final NumberFormatException exc) {
                    return false;
                }
            },
            BATCH_SIZE_STEP_PARAMETER_NAME, (final String value) -> {
                try {
                    final int actualValue = Integer.parseInt(value);
                    if (actualValue < 1) {
                        throw new NumberFormatException();
                    }

                    // A batch can't be bigger than the page it is taken from.
                    // The parameters map is read lazily, when the predicate is tested
                    return
                        actualValue <= Integer.parseInt(
                            getParameters().getOrDefault(PAGE_SIZE_STEP_PARAMETER_NAME, DEFAULT_PAGE_SIZE_STEP_PARAMETER)
                        );
                } catch (final NumberFormatException exc) {
                    return false;
                }
            },
            TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
                value != null && !value.isBlank(),
            TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
                value != null && !value.isBlank(),
            PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
                value != null && !value.isBlank(),
            TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
                value != null && !value.isBlank(),
            TITLE_COLUMN_PROCESSING_STEP_PARAMETER_NAME, (final String value) ->
                value != null && !value.isBlank()
        );

        final Set<String> commonRequiredParameters = Set.of(
            TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME,
            TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME
        );

        // Presize the collections so they don't need to be rehashed (default load factor is 0.75)
        final Map<String, Predicate<String>> actualValidationPredicates = new HashMap<>(
            (int) Math.ceil((commonValidationPredicates.size() + validationPredicates.size()) / 0.75)
        );
        final Set<String> actualRequiredParameters = new HashSet<>(
            (int) Math.ceil((commonRequiredParameters.size() + requiredParameters.size()) / 0.75)
        );

        // Step-specific entries are added last, so they override the common ones on name clash
        actualValidationPredicates.putAll(commonValidationPredicates);
        actualValidationPredicates.putAll(validationPredicates);
        actualRequiredParameters.addAll(commonRequiredParameters);
        actualRequiredParameters.addAll(requiredParameters);

        this.validationPredicates = Collections.unmodifiableMap(actualValidationPredicates);
        this.requiredParameters = Collections.unmodifiableSet(actualRequiredParameters);
    }

    /**
     * {@inheritDoc}
     *
     * @implNote The implementation of this method saves the user-provided parameter
     *           values, executes their validation predicates, and then invokes
     *           {@link #run()}, making sure a JPA transaction has begun.
     */
    @Override
    public final void execute(@NonNull final Map<String, String> parameters) throws ProcessingException {
        final EntityTransaction entityTransaction = TextProcPersistence.get().getEntityManager().getTransaction();
        final String stepName = getClass().getSimpleName();
        boolean startedTransaction = false;
        boolean transactionSuccessful = true;

        this.parameters = parameters;

        System.out.println();
        validateParameters();

        try {
            if (!entityTransaction.isActive()) {
                entityTransaction.begin();
                startedTransaction = true;
            }

            System.out.print("> Executing ");
            System.out.print(stepName);
            System.out.println("...");

            run();

            System.out.println();
            System.out.print("> ");
            System.out.print(stepName);
            System.out.println(" done.");
        } catch (final ProcessingException exc) {
            transactionSuccessful = false;
            throw exc;
        } catch (final PersistenceException exc) {
            transactionSuccessful = false;
            throw new ProcessingException(DATA_ACCESS_EXCEPTION_MESSAGE, exc);
        } finally {
            // Only mark for rollback a transaction this method is responsible for
            if (startedTransaction && !transactionSuccessful) {
                entityTransaction.setRollbackOnly();
            }

            // Silently flush changes, just in case it was not done per page
            TextProcPersistence.get().flushEntities();
        }
    }

    /**
     * Validates the user-specified parameter map. This method doesn't have any side
     * effect if validation is successful.
     *
     * @throws ProcessingException If a required parameter is missing, a parameter
     *                             has no registered validation predicate (i.e. it
     *                             is unknown to this step), or a parameter value is
     *                             rejected by its validation predicate.
     */
    private void validateParameters() throws ProcessingException {
        for (final String requiredParameter : requiredParameters) {
            if (!parameters.containsKey(requiredParameter)) {
                throw new ProcessingException(
                    "Missing required parameter for step " + getClass().getSimpleName() + ": " +
                    requiredParameter
                );
            }
        }

        for (final Entry<String, String> entry : parameters.entrySet()) {
            final Predicate<String> validationPredicate = validationPredicates.get(entry.getKey());

            // A parameter without a predicate is not known to this step. Report it
            // explicitly instead of failing with a NullPointerException
            if (validationPredicate == null) {
                throw new ProcessingException(
                    "Unknown parameter " + entry.getKey() + " in the " + getClass().getSimpleName() + " step"
                );
            }

            if (!validationPredicate.test(entry.getValue())) {
                throw new ProcessingException(
                    "The value for the parameter " + entry.getKey() + " in the " + getClass().getSimpleName() + " step is not valid"
                );
            }
        }
    }

    /**
     * Returns the parameters provided by the user for this step.
     *
     * @return The map with the step parameters that was passed to
     *         {@link #execute(Map)}, where the keys are the parameter names. The
     *         map is not copied nor wrapped, so it should be treated as read-only.
     *         It is {@code null} if {@link #execute(Map)} was not invoked yet.
     */
    protected final Map<String, String> getParameters() {
        return parameters;
    }

    /**
     * Executes the given action for each batch of documents retrieved by a native
     * JPA query. For the purposes of this method, native JPA queries are to be used
     * when the queried table doesn't have a configured mapping entity. Therefore,
     * it returns the value of the queried columns as strings, not performing any
     * relational to object mapping beyond that.
     * <p>
     * To maximize performance, the processing action may be executed in any thread,
     * so thread-safety must be guaranteed in its implementation if shared state is
     * to be accessed.
     * </p>
     * <p>
     * This method assumes a transaction is already active.
     * </p>
     * <p>
     * Exceptions thrown while converting a result row or while executing the action
     * are logged and the affected documents are skipped; they do not abort the
     * processing of the remaining documents.
     * </p>
     *
     * @param querySupplier     A query supplier, that must return an appropriate,
     *                          non-null native query object when invoked. This
     *                          allows recreating the query object when needed.
     * @param taskName          The name of the task that will be performed with the
     *                          documents. It will be shown to the user.
     * @param numberOfDocuments The total number of documents that will be processed
     *                          by the action. It must be zero or greater.
     * @param action            The action to execute for every batch of documents.
     *                          A batch contains at least one document. The list
     *                          supplied to the consumer is not modifiable.
     * @param pageEndAction     The action to execute after a document page is
     *                          processed, if processing is successful. It might be
     *                          {@code null}, in which case nothing will be done. In
     *                          any case, no matter if processing is successful or
     *                          not, any database transactions made by calling
     *                          methods of this class are committed or rolled back
     *                          before invoking this action.
     * @throws ProcessingException If any parameter is invalid, or an exception
     *                             occurred during the processing.
     */
    protected final void forEachDocumentInNativeQuery(
        @NonNull final Supplier<? extends Query> querySupplier, @NonNull final String taskName, final long numberOfDocuments,
        @NonNull final ProcessingConsumer<List<String[]>> action,
        final Runnable pageEndAction
    ) throws ProcessingException {
        long numberOfPages;
        long numberOfSteps;
        final int pageSize = Integer.parseInt(
            getParameters().getOrDefault(PAGE_SIZE_STEP_PARAMETER_NAME, DEFAULT_PAGE_SIZE_STEP_PARAMETER)
        );
        final int batchSize = Integer.parseInt(
            getParameters().getOrDefault(BATCH_SIZE_STEP_PARAMETER_NAME, DEFAULT_BATCH_SIZE_STEP_PARAMETER)
        );
        // Shared accumulator for the batch being built. Guarded by its own monitor
        final List<String[]> entityAttributesBatch = new ArrayList<>(batchSize);

        // Ceiling division of the number of documents by the page size
        numberOfPages = numberOfDocuments / pageSize + (numberOfDocuments % pageSize == 0 ? 0 : 1);
        numberOfSteps =
            // At least one step per page
            numberOfPages +
            // For each surely complete page (every page except the last), there can be an extra step
            // because of documents that didn't complete a batch
            (numberOfPages - 1) * (pageSize % batchSize == 0 ? 0 : 1) +
            // For the last page, there can be an extra step too (but it might not be a complete page)
            ((numberOfDocuments % pageSize) % batchSize == 0 ? 0 : 1);

        try {
            for (int page = 0; page < numberOfPages; ++page) {
                final Query query = querySupplier.get();

                if (query == null) {
                    throw new ProcessingException("The query supplier returned a null query");
                }

                query.setFirstResult(page * pageSize);
                query.setMaxResults(pageSize);

                // We do not use getResultStream() to increase the chance that the resulting stream has a defined size
                final Stream<?> resultStream = query.getResultList().parallelStream();

                ProgressBar.wrap(
                    resultStream,
                    new ProgressBarBuilder()
                        .setConsumer(progressBarConsumer)
                        .setTaskName(taskName + " (" + (page + 1) + "/" + numberOfSteps + ")")
                        .setStyle(ProgressBarStyle.ASCII)
                        .showSpeed()
                ).forEach((final Object result) -> {
                    String[] attributes;
                    List<String[]> currentBatch = null;

                    try {
                        if (result.getClass().isArray()) {
                            // Several columns in result. Convert them to strings
                            // (casting Object[] to String[] doesn't work)
                            attributes = new String[((Object[]) result).length];

                            for (int i = 0; i < attributes.length; ++i) {
                                attributes[i] = columnAttributeToString(((Object[]) result)[i]);
                            }
                        } else {
                            // One column in result. Wrap it in an array
                            attributes = new String[] { columnAttributeToString(result) };
                        }

                        synchronized (entityAttributesBatch) {
                            entityAttributesBatch.add(attributes);

                            // Batch completed. Copy it so we exit the critical section ASAP,
                            // and we can add documents to a new batch without the consumer noticing
                            if (entityAttributesBatch.size() >= batchSize) {
                                currentBatch = List.copyOf(entityAttributesBatch);
                                entityAttributesBatch.clear();
                            }
                        }

                        // Accept the batch. This can block until the batch is processed,
                        // but as we are outside a critical section it's not a bottleneck
                        if (currentBatch != null) {
                            action.accept(currentBatch);
                        }
                    } catch (final Exception exc) {
                        // Exceptions thrown by this method get silently discarded.
                        // Handle that by logging them
                        TextProcLogging.getLogger().log(
                            Level.WARNING, "An exception occurred while processing a batch of documents. Skipping...", exc
                        );
                    }
                });

                // Accept any remaining entity that did not make it to a complete batch
                try {
                    final List<String[]> lastBatch;

                    // Take a snapshot and empty the shared accumulator. Otherwise, the
                    // leftover documents would still be in the accumulator when the next
                    // page is read, and they would be handed to the action a second time.
                    // The snapshot also means the action never sees the mutable list
                    synchronized (entityAttributesBatch) {
                        lastBatch = List.copyOf(entityAttributesBatch);
                        entityAttributesBatch.clear();
                    }

                    final int lastBatchSize = lastBatch.size();

                    if (lastBatchSize > 0) {
                        try (final ProgressBar progressBar =
                            new ProgressBarBuilder()
                                .setConsumer(progressBarConsumer)
                                .setTaskName(taskName + " (" + (page + 2) + "/" + numberOfSteps + ")")
                                .setStyle(ProgressBarStyle.ASCII)
                                .showSpeed()
                                .setInitialMax(lastBatchSize)
                                .build()
                        ) {
                            action.accept(lastBatch);
                            progressBar.stepTo(lastBatchSize);
                        }
                    }
                } catch (final Exception exc) {
                    TextProcLogging.getLogger().log(
                        Level.WARNING, "An exception occurred while processing a batch of documents. Skipping...", exc
                    );
                }

                System.out.println();
                if (databaseEntitiesChanged) {
                    System.out.println("> Committing changes to the database...");
                    TextProcPersistence.get().flushEntities();
                    System.out.print("> Changes committed.");

                    databaseEntitiesChanged = false;
                }

                if (pageEndAction != null) {
                    pageEndAction.run();
                }
            }
        } catch (final PersistenceException exc) {
            throw new ProcessingException(DATA_ACCESS_EXCEPTION_MESSAGE, exc);
        }
    }


    /**
     * Stores a processed document in the database, from its processed attributes.
     * This method begins a JPA transaction if no transaction is already active, and
     * marks a transaction it began for rollback if persisting the entity fails.
     *
     * @param documentType        The type of document that is being processed, and
     *                            will be stored. It will be instantiated via
     *                            reflection, so the module containing the type
     *                            definition must open its package for deep
     *                            reflection to the module containing this code.
     * @param primaryKey          The primary key of the processed document.
     * @param processedAttributes The processed attributes of the document. Their
     *                            names (keys) must match the attributes of the
     *                            concrete document type, and can't be {@code null}
     *                            or empty.
     * @throws ProcessingException      If some error occurs during the operation.
     * @throws IllegalArgumentException If any parameter is {@code null}.
     * @throws PersistenceException     If some data access error occurs.
     */
    protected final void saveProcessedDocument(
        @NonNull final Class<? extends ProcessedDocument> documentType, final int primaryKey, @NonNull final Map<String, String> processedAttributes
    ) throws ProcessingException {
        final EntityManager thisThreadEntityManager = TextProcPersistence.get().getEntityManager();
        final EntityTransaction entityTransaction = thisThreadEntityManager.getTransaction();
        final ProcessedDocument entityToPersist;
        boolean startedTransaction = false;
        boolean transactionSuccessful = true;

        try {
            entityToPersist = documentType.getConstructor(Integer.class).newInstance(primaryKey);
        } catch (final ReflectiveOperationException exc) {
            throw new ProcessingException(
                "Document class implementation contract violation, or unappropriate permissions for reflective operation", exc
            );
        }

        if (!entityTransaction.isActive()) {
            entityTransaction.begin();
            startedTransaction = true;
        }

        try {
            thisThreadEntityManager.persist(entityToPersist);
            databaseEntitiesChanged = true;
        } catch (final PersistenceException exc) {
            transactionSuccessful = false;
            throw exc;
        } finally {
            if (startedTransaction && !transactionSuccessful) {
                entityTransaction.setRollbackOnly();
            }
        }

        // Change the entity processed data
        for (final Entry<String, String> processedAttribute : processedAttributes.entrySet()) {
            final String attributeName = processedAttribute.getKey();

            // A setter name can't be derived from a missing name. Fail with the
            // documented exception type instead of an unchecked one
            if (attributeName == null || attributeName.isEmpty()) {
                throw new ProcessingException(
                    "Couldn't invoke attribute setter: the attribute name is null or empty"
                );
            }

            try {
                documentType
                    // Capitalize with a fixed locale, so the setter name doesn't depend on the
                    // default locale of the JVM (e.g. "id" would become "İd" in Turkish)
                    .getMethod("set" + attributeName.substring(0, 1).toUpperCase(Locale.ROOT) + attributeName.substring(1), String.class)
                    .invoke(entityToPersist, processedAttribute.getValue());
            } catch (final ReflectiveOperationException exc) {
                throw new ProcessingException(
                    "Couldn't invoke attribute setter", exc
                );
            }
        }
    }

    /**
     * Constructs the SELECT SQL statement for retrieving unprocessed text documents
     * with title from a database, using native queries.
     * <p>
     * The column and table names are taken from the step parameters and are
     * concatenated verbatim in the statement, so they must come from a trusted
     * source.
     * </p>
     *
     * @return The described statement.
     */
    protected final String buildUnprocessedDocumentWithTitleSelectStatement() {
        final Map<String, String> parameters = getParameters();
        final StringBuilder dmlSentenceBuilder = new StringBuilder(96);

        dmlSentenceBuilder.append("SELECT ");
        dmlSentenceBuilder.append(
            parameters.getOrDefault(PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER)
        );
        dmlSentenceBuilder.append(", ");
        dmlSentenceBuilder.append(
            parameters.getOrDefault(TITLE_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_TITLE_COLUMN_PROCESSING_STEP_PARAMETER)
        );
        dmlSentenceBuilder.append(", ");
        dmlSentenceBuilder.append(
            parameters.getOrDefault(TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_TEXT_COLUMN_PROCESSING_STEP_PARAMETER)
        );
        dmlSentenceBuilder.append(" FROM ");
        dmlSentenceBuilder.append(
            parameters.get(TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
        );

        return dmlSentenceBuilder.toString();
    }

    /**
     * Constructs the SELECT SQL statement for retrieving unprocessed text documents
     * without title from a database, using native queries.
     * <p>
     * The column and table names are taken from the step parameters and are
     * concatenated verbatim in the statement, so they must come from a trusted
     * source.
     * </p>
     *
     * @return The described statement.
     */
    protected final String buildUnprocessedDocumentSelectStatement() {
        final Map<String, String> parameters = getParameters();
        final StringBuilder dmlSentenceBuilder = new StringBuilder(64);

        dmlSentenceBuilder.append("SELECT ");
        dmlSentenceBuilder.append(
            parameters.getOrDefault(PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_PRIMARY_KEY_COLUMN_PROCESSING_STEP_PARAMETER)
        );
        dmlSentenceBuilder.append(", ");
        dmlSentenceBuilder.append(
            parameters.getOrDefault(TEXT_COLUMN_PROCESSING_STEP_PARAMETER_NAME, DEFAULT_TEXT_COLUMN_PROCESSING_STEP_PARAMETER)
        );
        dmlSentenceBuilder.append(" FROM ");
        dmlSentenceBuilder.append(
            parameters.get(TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
        );

        return dmlSentenceBuilder.toString();
    }

    /**
     * Returns the number of unprocessed text documents with title in the database.
     * This method assumes a transaction is already active.
     *
     * @return The described number.
     * @throws PersistenceException If some error occurs while executing SQL
     *                              statements in the database.
     */
    protected final long getUnprocessedDocumentsWithTitle() {
        return ((Number) TextProcPersistence.get().getEntityManager().createNativeQuery(
            "SELECT COUNT(*) FROM " +
            getParameters().get(TEXT_DOCUMENT_WITH_TITLE_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
        ).getSingleResult()).longValue();
    }

    /**
     * Returns the number of unprocessed text documents without title in the
     * database. This method assumes a transaction is already active.
     *
     * @return The described number.
     * @throws PersistenceException If some error occurs while executing SQL
     *                              statements in the database.
     */
    protected final long getUnprocessedDocuments() {
        return ((Number) TextProcPersistence.get().getEntityManager().createNativeQuery(
            "SELECT COUNT(*) FROM " +
            getParameters().get(TEXT_DOCUMENT_TABLE_NAME_PROCESSING_STEP_PARAMETER_NAME)
        ).getSingleResult()).longValue();
    }

    /**
     * Deletes all the processed documents of a given type from the database.
     *
     * @param <T>          The type of documents to delete.
     * @param documentType The type of documents to delete.
     * @throws IllegalArgumentException If {@code documentType} is {@code null}.
     * @throws PersistenceException     If some error occurs while executing SQL
     *                                  statements in the database.
     */
    protected final <T extends ProcessedDocument> void deleteAllProcessedDocumentsOfType(@NonNull final Class<T> documentType) {
        final EntityManager entityManager = TextProcPersistence.get().getEntityManager();

        final CriteriaDelete<T> deleteCriteria = entityManager.getCriteriaBuilder().createCriteriaDelete(documentType);
        deleteCriteria.from(documentType);

        System.out.println("> Deleting " + documentType.getSimpleName() + " entities...");
        entityManager.createQuery(deleteCriteria).executeUpdate();

        databaseEntitiesChanged = true;
    }

    /**
     * Executes the processing step implemented by this object. The processing step
     * parameters are already validated and available upon request on
     * {@link #getParameters()}. This method is invoked in the context of a JPA
     * transaction that is started and committed or rolled back automatically.
     *
     * @throws ProcessingException If an exception occurs during execution.
     */
    protected abstract void run() throws ProcessingException;

    /**
     * Converts a column attribute value to a string.
     *
     * @param attribute The attribute to convert.
     * @return The converted attribute value.
     * @throws ProcessingException If the value could not be converted, which
     *                             happens when it is {@code null}.
     */
    private String columnAttributeToString(final Object attribute) throws ProcessingException {
        if (attribute != null) {
            return attribute.toString();
        } else {
            throw new ProcessingException("Unexpected column type");
        }
    }

    /**
     * A consumer of data to be processed, which can throw a checked
     * {@link ProcessingException}.
     *
     * @author Alejandro González García
     *
     * @param <T> The type of data to be processed.
     * @see Consumer
     */
    @FunctionalInterface
    public static interface ProcessingConsumer<T> {
        /**
         * Performs the processing operation on the given argument.
         *
         * @param t The input argument.
         * @throws ProcessingException If an error occurs during the processing.
         */
        public void accept(final T t) throws ProcessingException;
    }

    /**
     * A processing consumer that does nothing. It never throws exceptions.
     *
     * @author Alejandro González García
     *
     * @param <T> The type of objects consumed by this consumer.
     */
    public static final class NullProcessingConsumer<T> implements ProcessingConsumer<T> {
        @Override
        public void accept(final T t) throws ProcessingException {
            // Do nothing
        }
    }
}