Queue Processing

Dari includes the following classes for implementing queue processing background tasks that run asynchronously.

  • AsyncQueue represents an asynchronous queue.
  • AsyncProducer creates data and puts (produces) it into an output queue. A subclass must implement the produce method.
  • AsyncConsumer reads from an input queue and processes (consumes) the data. For example, a consumer can transform the data and save it to a database or put it in another queue. A subclass must implement the consume method.
  • AsyncProcessor does both: it consumes all items from an input queue, processes them, and produces new items into an output queue. A subclass must implement the process method.
../../_images/queue-task-types.png

Dari's database support for queue processing is built on the classes listed above. You can also subclass these classes to create your own queue pipelines.
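The three task roles can be illustrated without Dari. The following is a minimal sketch that uses only java.util.concurrent; TaskRolesSketch, its methods, and the END marker are hypothetical names chosen for illustration and are not part of the Dari API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK stand-in for the three task roles. None of these names are Dari APIs.
final class TaskRolesSketch {

    // Hypothetical end-of-data marker, compared by identity so it cannot collide with real items.
    private static final String END = new String("END");

    // Producer role: creates items and puts them into an output queue.
    static void produce(List<String> source, BlockingQueue<String> output) throws InterruptedException {
        for (String item : source) {
            output.put(item);
        }
        output.put(END);
    }

    // Processor role: consumes each item, transforms it, and produces the result.
    static void process(BlockingQueue<String> input, BlockingQueue<String> output) throws InterruptedException {
        for (String item = input.take(); item != END; item = input.take()) {
            output.put(item.toUpperCase(Locale.ROOT));
        }
        output.put(END);
    }

    // Consumer role: reads items from an input queue and collects them.
    static List<String> consume(BlockingQueue<String> input) throws InterruptedException {
        List<String> results = new ArrayList<>();
        for (String item = input.take(); item != END; item = input.take()) {
            results.add(item);
        }
        return results;
    }

    // Wires producer -> processor -> consumer; the first two run on their own threads.
    static List<String> run(List<String> source) {
        BlockingQueue<String> raw = new LinkedBlockingQueue<>();
        BlockingQueue<String> processed = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                produce(source, raw);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread processor = new Thread(() -> {
            try {
                process(raw, processed);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        processor.start();
        try {
            List<String> results = consume(processed);
            producer.join();
            processor.join();
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while draining the pipeline", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("tell", "me", "muse"))); // prints [TELL, ME, MUSE]
    }
}
```

In Dari, the equivalent wiring is handled by the task classes themselves, so a subclass only supplies the produce, consume, or process method.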

Queue Pipeline Example

An asynchronous queue pipeline uses parallel tasks and concurrent queues to process input values sequentially in stages. For efficient processing, allocate enough producer and consumer task instances at each stage that no queue becomes overloaded; an overloaded queue creates a pipeline bottleneck.
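As a rough illustration of balancing a stage, the sketch below pairs one producer with several consumers on a bounded queue. It uses only the JDK; BoundedStageSketch, its parameters, and the negative stop marker are assumptions made for this example, not Dari features.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single pipeline stage: one producer, several consumers, one bounded queue.
final class BoundedStageSketch {

    // Produces itemCount items and returns how many the consumers processed.
    static int run(int itemCount, int capacity, int consumerCount) {
        // The capacity limit makes put() block when consumers fall behind,
        // so the producer slows down instead of flooding the queue.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumerCount);

        for (int c = 0; c < consumerCount; c++) {
            pool.execute(() -> {
                try {
                    // A negative value is the stop marker for one consumer.
                    while (queue.take() >= 0) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            for (int i = 0; i < itemCount; i++) {
                queue.put(i); // blocks while the queue is full
            }
            for (int c = 0; c < consumerCount; c++) {
                queue.put(-1); // one stop marker per consumer
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100, 5, 3)); // prints 100
    }
}
```

Raising consumerCount is the plain-Java analogue of creating more instances of a slow task; the bounded capacity plays the same role as the ArrayBlockingQueue passed to rawBookWordQueue in the BookAnalyzer listing.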

This example shows how to use the Dari asynchronous classes to implement a simple pipeline that processes Book objects. The purpose of the pipeline is to retrieve all Book objects stored in a database and identify the longest word across all of the books.

The diagram shows the pipeline process flow. The pipeline consists of four queues that are produced and consumed by five background tasks. The queues and tasks are created and started in the BookAnalyzer class.

../../_images/queue-pipeline.png

In the above diagram:

  1. The Dari-supplied AsyncDatabaseReader implementation retrieves all Book objects from the database and produces the items into bookQueue.
  2. The BookReader implementation consumes bookQueue and produces to rawBookWordQueue, which contains BookWord objects.
  3. The BookWordSanitizer implementation consumes items in rawBookWordQueue and produces to cleanedBookWordQueue, which also contains BookWord objects.
  4. The BookWordFilter implementation consumes from cleanedBookWordQueue and produces to longestWordQueue, which contains string items.
  5. The LongestWordCalculator implementation consumes from longestWordQueue and writes the longest detected word to a log.

The processing sequence shown in the diagram can be tracked in the Task Status tool.

../../_images/queue-status.png

In the above screenshot:

  • The “Book Analyzer Queues” section lists the queue processors in the context of producing and consuming queues. The “Production” column lists the tasks that produce items into a queue; the “Consumption” column lists the tasks that consume items in a queue. Tasks that function as both a producer and a consumer are listed in both columns.
  • The “Book Analyzer Queues” section reflects the pipeline flow in a bottom-up fashion. For example, in the first step, AsyncDatabaseReader reads from the database and produces to bookQueue. In the second step, BookReader, listed in the “Consumption” column, consumes from bookQueue (2a). BookReader then produces to rawBookWordQueue (2b), and is thus listed in the “Production” column as well.
  • The “Successes” and “Failures” columns show the number of items successfully or unsuccessfully produced into or consumed from the queue. The “Wait” column shows the average time it took a task to produce or consume each item in the queue.

You can have multiple instances of a task running if necessary, which would be reflected in the Task Status tool. For example, if you instantiate three BookReader tasks, then three instances would be listed:

../../_images/multiple-tasks.png

Book Class

Book represents a book stored in the database. The text of a book is referenced with a URL.

public class Book extends Content {

   @Required
   @Indexed(unique = true)
   private String name;

   // e.g. http://classics.mit.edu/Homer/odyssey.mb.txt
   private String textUrl;

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   public String getTextUrl() {
      return textUrl;
   }

   public void setTextUrl(String textUrl) {
      this.textUrl = textUrl;
   }
}

BookWord Class

BookWord represents a word from a book, consisting of the record UUID of the book with which the word is associated, the word’s sequential position in the book, and the text of the word.

public class BookWord {

   private UUID bookId;
   private String word;
   private int index;

   public BookWord(UUID bookId, String word, int index) {
      this.bookId = bookId;
      this.word = word;
      this.index = index;
   }

   public UUID getBookId() {
      return bookId;
   }

   public String getWord() {
      return word;
   }

   public int getIndex() {
      return index;
   }
}

BookAnalyzer Class

The analyze method of BookAnalyzer creates the queues and creates and starts the consumer and producer tasks. The producer and consumer classes are included as inner classes in BookAnalyzer, but are shown in separate listings below.

public class BookAnalyzer {

   protected static final Logger LOGGER = LoggerFactory.getLogger(BookAnalyzer.class);
   public static final String EXECUTOR = "Book Analyzer";

   public static void analyze() {

      // Create the queues
      AsyncQueue<Book> bookQueue = new AsyncQueue<>();
      AsyncQueue<BookWord> rawBookWordQueue = new AsyncQueue<>(new ArrayBlockingQueue<>(500));
      AsyncQueue<BookWord> cleanedBookWordQueue = new AsyncQueue<>();
      AsyncQueue<String> longestWordQueue = new AsyncQueue<>();

      // Mark the queues to close automatically
      longestWordQueue.closeAutomatically();
      cleanedBookWordQueue.closeAutomatically();
      rawBookWordQueue.closeAutomatically();
      bookQueue.closeAutomatically();

      // Create the consumers and producers
      // Only 1 database reader
      AsyncDatabaseReader<Book> bookFinder = new AsyncDatabaseReader<>(EXECUTOR,
             bookQueue, Database.Static.getDefault(), Query.from(Book.class));

      /* For the following tasks, only 1 instance is created. To create multiple instances,
         increase the task's count variable (e.g. bookReaderCount). */

      // Extends AsyncConsumer, but also behaves like a processor. See the BookReader listing below.
      final int bookReaderCount = 1;
      List<AsyncConsumer<Book>> bookReaders = IntStream
             .range(0, bookReaderCount)
             .mapToObj(i -> new BookReader(
                     bookQueue, rawBookWordQueue))
             .collect(Collectors.toList());

      final int wordSanitizersCount = 1;
      List<AsyncProcessor<BookWord, BookWord>> wordSanitizers = IntStream
             .range(0, wordSanitizersCount)
             .mapToObj(i -> new BookWordSanitizer(
                     rawBookWordQueue, cleanedBookWordQueue))
             .collect(Collectors.toList());

      final int wordFiltersCount = 1;
      List<AsyncProcessor<BookWord, String>> wordFilters = IntStream
             .range(0, wordFiltersCount)
             .mapToObj(i -> new BookWordFilter(
                     cleanedBookWordQueue, longestWordQueue))
             .collect(Collectors.toList());

      // Only 1 longest word calculator (unless you want to share the running tally across threads).
      final int longestWordCalculatorsCount = 1;
      List<AsyncConsumer<String>> longestWordCalculators = IntStream
             .range(0, longestWordCalculatorsCount)
             .mapToObj(i -> new LongestWordCalculator(longestWordQueue))
             .collect(Collectors.toList());

      // Starts the Consumers and Producers
      longestWordCalculators.forEach(Task::submit);
      wordFilters.forEach(Task::submit);
      wordSanitizers.forEach(Task::submit);
      bookReaders.forEach(Task::submit);
      bookFinder.submit(); // runs once and stops

      LOGGER.info("Started Analyzing Books...");
   }
... // Inner Task classes for queue processing, shown below.
}
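The analyze method marks each queue with closeAutomatically, and the BookReader listing registers and removes itself as a producer of its output queue. The sketch below shows one plausible reading of how those two pieces fit together: a queue that tracks its producers and closes when the last one is removed. AutoClosingQueueSketch is a hypothetical JDK-only class, and the behavior it models is an assumption about AsyncQueue that this page does not confirm.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in, not the Dari AsyncQueue: tracks producers and
// closes itself when the last registered producer is removed.
final class AutoClosingQueueSketch {

    private final Set<Object> producers = new HashSet<>();
    private boolean closeAutomatically;
    private boolean closed;

    // Opt in to closing once no producers remain.
    synchronized void closeAutomatically() {
        closeAutomatically = true;
    }

    synchronized void addProducer(Object producer) {
        producers.add(producer);
    }

    synchronized void removeProducer(Object producer) {
        producers.remove(producer);
        if (closeAutomatically && producers.isEmpty()) {
            closed = true;
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) {
        AutoClosingQueueSketch queue = new AutoClosingQueueSketch();
        queue.closeAutomatically();

        Object readerA = new Object();
        Object readerB = new Object();
        queue.addProducer(readerA);
        queue.addProducer(readerB);

        queue.removeProducer(readerA);
        System.out.println(queue.isClosed()); // prints false: readerB is still producing

        queue.removeProducer(readerB);
        System.out.println(queue.isClosed()); // prints true: no producers remain
    }
}
```

Under this reading, closing would cascade down the pipeline: when the only producer of one queue finishes, its consumers can finish too, which in turn releases the next queue.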

BookReader Class

As a subclass of AsyncConsumer, BookReader implements the consume method. Using the java.util.Scanner class, the method consumes Book objects from bookQueue, parses each book into word tokens, and produces BookWord objects to rawBookWordQueue.

Note that even though BookReader extends AsyncConsumer, its constructor also registers it as a producer. The AsyncQueue#addProducer method enables a consumer to take on a producer role as well.

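public static class BookReader extends AsyncConsumer<Book> {

   private final AsyncQueue<BookWord> output;

   public BookReader(AsyncQueue<Book> input, AsyncQueue<BookWord> output) {
      super(EXECUTOR, input);
      this.output = output;
      // Register this consumer as a producer of the output queue.
      this.output.addProducer(this);
   }

   @Override
   protected void consume(Book book) throws Exception {
      // The Book listing exposes only getTextUrl(), so the text is read from that URL (java.net.URL).
      // try-with-resources closes the scanner and the underlying stream.
      try (Scanner sc = new Scanner(new URL(book.getTextUrl()).openStream())) {
         int index = 0;
         while (sc.hasNext()) {
            String word = sc.next();
            output.add(new BookWord(book.getId(), word, index));
            index++;
         }
      }
   }

   @Override
   protected void finished() {
      try {
         super.finished();
      } finally {
         // Deregister this task as a producer of the output queue.
         this.output.removeProducer(this);
      }
   }
}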
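To see what the Scanner loop produces, the following standalone sketch tokenizes an in-memory string in the same way and pairs each token with its index. TokenizeSketch and indexedWords are hypothetical names; the sketch reads from a string rather than a book's text source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

// Hypothetical helper showing how Scanner splits text into indexed word tokens.
final class TokenizeSketch {

    // Splits on whitespace (Scanner's default delimiter) and records each token's position.
    static List<String> indexedWords(String text) {
        List<String> words = new ArrayList<>();
        try (Scanner sc = new Scanner(text)) {
            int index = 0;
            while (sc.hasNext()) {
                words.add(index + ":" + sc.next());
                index++;
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // prints [0:Tell, 1:me,, 2:O, 3:Muse]
        System.out.println(indexedWords("Tell me, O Muse"));
    }
}
```

Note that the second token keeps its trailing comma. Whitespace tokenizing leaves punctuation attached to words, which is why the next stage sanitizes them.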

BookWordSanitizer Class

As a subclass of AsyncProcessor, BookWordSanitizer implements the process method. The method consumes BookWord objects from rawBookWordQueue, trims punctuation from the word text, converts it to lowercase, and blanks out text that looks like an email address or a URL. The resulting BookWord objects are produced to cleanedBookWordQueue. (The cleanup methods are not shown in the listing.)

public static class BookWordSanitizer extends AsyncProcessor<BookWord, BookWord> {

   // Characters (as int values) treated as punctuation when trimming a word.
   private static final Set<Integer> PUNCTUATION_CHARACTERS = new HashSet<>(
             Arrays.asList(
                     (int) '~',   (int) '&',   (int) '{',   (int) '\'',
                     (int) '`',   (int) '*',   (int) '}',   (int) '"',
                     (int) '!',   (int) '(',   (int) '[',   (int) '<',
                     (int) '@',   (int) ')',   (int) ']',   (int) '>',
                     (int) '#',   (int) '-',   (int) '\\',  (int) ',',
                     (int) '$',   (int) '_',   (int) '|',   (int) '.',
                     (int) '%',   (int) '+',   (int) ';',   (int) '?',
                     (int) '^',   (int) '=',   (int) ':',   (int) '/'
             ));

   public BookWordSanitizer(AsyncQueue<BookWord> input, AsyncQueue<BookWord> output) {
      super(EXECUTOR, input, output);
   }

   @Override
   protected BookWord process(BookWord bookWord) throws Exception {
      String word = bookWord.getWord();

      Thread.sleep(1);

      // Trim punctuation and convert to lowercase
      String sanitizedWord = trimPunctuation(word, PUNCTUATION_CHARACTERS::contains).toLowerCase();

      // Ignore text that looks like an email address or a URL
      if (sanitizedWord.contains("@") || sanitizedWord.startsWith("http")) {
         sanitizedWord = "";
      }

      // Create a new BookWord only if sanitizing changed the text
      if (!sanitizedWord.equals(word)) {
         return new BookWord(bookWord.getBookId(), sanitizedWord, bookWord.getIndex());
      } else {
         return bookWord;
      }
   }

   ... // Clean up methods not shown.
}
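The listing calls a trimPunctuation helper that is not shown. The sketch below is one possible shape for such a helper, assuming it strips matching characters from both ends of the word and leaves interior characters alone. The IntPredicate parameter type and the class name TrimPunctuationSketch are guesses made for this example; the original helper may differ.

```java
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical reconstruction of a punctuation-trimming helper; not the original method.
final class TrimPunctuationSketch {

    // Removes leading and trailing characters that match the predicate.
    // Characters inside the word (such as the apostrophe in "don't") are kept.
    static String trimPunctuation(String word, IntPredicate isPunctuation) {
        int start = 0;
        int end = word.length();
        while (start < end && isPunctuation.test(word.charAt(start))) {
            start++;
        }
        while (end > start && isPunctuation.test(word.charAt(end - 1))) {
            end--;
        }
        return word.substring(start, end);
    }

    public static void main(String[] args) {
        Set<Integer> punctuation = Set.of((int) ',', (int) '.', (int) '"', (int) '\'');
        System.out.println(trimPunctuation("\"Muse,\"", punctuation::contains)); // prints Muse
        System.out.println(trimPunctuation("don't.", punctuation::contains));    // prints don't
    }
}
```

A word made up entirely of punctuation trims to an empty string, which the BookWordFilter stage then rejects because it is shorter than the minimum word length.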

BookWordFilter Class

As a subclass of AsyncProcessor, BookWordFilter implements the process method. The method consumes BookWord objects from cleanedBookWordQueue, verifies that the object’s word text is at least one character in length, and produces the word text to longestWordQueue, a String queue.

public static class BookWordFilter extends AsyncProcessor<BookWord, String> {

   private static final int MINIMUM_WORD_LENGTH = 1;

   public BookWordFilter(AsyncQueue<BookWord> input, AsyncQueue<String> output) {
      super(EXECUTOR, input, output);
   }

   @Override
   protected String process(BookWord bookWord) throws Exception {
      String word = bookWord.getWord();

      if (word.length() < MINIMUM_WORD_LENGTH) {
         // Or you can throw an exception and handle it gracefully.
         return null;
      }
      return word;
   }

   @Override
   protected void handleError(BookWord item, Exception error) {
      // Handle errors here...
   }
}

LongestWordCalculator Class

As a subclass of AsyncConsumer, LongestWordCalculator implements the consume method. The method consumes each word string from longestWordQueue, compares its length to that of the longest word found so far, and stores the current word if it is longer. When the task finishes, it logs the longest word found across all of the Book objects.

public static class LongestWordCalculator extends AsyncConsumer<String> {

   private String longestWord = null;

   public LongestWordCalculator(AsyncQueue<String> input) {
      super(EXECUTOR, input);
   }

   @Override
   protected void consume(String item) throws Exception {
      if (longestWord == null || longestWord.length() < item.length()) {
         longestWord = item;
      }
   }

   @Override
   protected void finished() {
      super.finished();
      LOGGER.info("Longest word is: " + longestWord);
   }
}
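Because longestWord is an instance field, each LongestWordCalculator keeps its own tally, which is why the BookAnalyzer listing creates only one. If several consumers had to share a tally, one option is an atomic reference, as in the JDK-only sketch below. SharedLongestWordSketch is a hypothetical class, it starts from an empty string rather than null, and the parallel stream merely stands in for multiple consumer threads.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical thread-safe tally that several consumers could share.
final class SharedLongestWordSketch {

    private final AtomicReference<String> longest = new AtomicReference<>("");

    // Atomically keeps whichever of the current and offered words is longer.
    void offer(String word) {
        longest.accumulateAndGet(word, (current, candidate) ->
                candidate.length() > current.length() ? candidate : current);
    }

    String get() {
        return longest.get();
    }

    // Feeds the words from multiple threads to exercise the shared tally.
    static String longestOf(List<String> words) {
        SharedLongestWordSketch tally = new SharedLongestWordSketch();
        words.parallelStream().forEach(tally::offer);
        return tally.get();
    }

    public static void main(String[] args) {
        System.out.println(longestOf(List.of("tell", "me", "of", "odysseus"))); // prints odysseus
    }
}
```

When two words tie for longest, the one that arrives first wins, so the result can vary between runs with multiple threads.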