Queue processing
Dari includes the following classes for implementing queue processing background tasks that run asynchronously.
AsyncQueue represents an asynchronous queue.
AsyncProducer creates data and puts it (produces it) into an output queue. A subclass must implement the produce method.
AsyncConsumer reads from an input queue and processes (consumes) the data. For example, a consumer can transform the data and save it to a database or put it in another queue. A subclass must implement the consume method.
AsyncProcessor does both: it consumes all items from an input queue, processes them, and produces new items into an output queue. A subclass must implement the process method.
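The following minimal sketch models the three roles with plain JDK threads and java.util.concurrent.BlockingQueue, to show how produce, consume, and process fit together. It is not Dari's implementation: the class names (RoleSketch, ProducerRole, ConsumerRole, ProcessorRole), the use of Optional.empty() as an end-of-stream marker, and the rule that a null process result is dropped are all assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the three roles with plain JDK threads; NOT the Dari API.
// Queues carry Optional items, and Optional.empty() marks the end of the stream.
final class RoleSketch {

    // Producer role: subclasses implement produce(), returning null when exhausted.
    abstract static class ProducerRole<O> implements Runnable {
        private final BlockingQueue<Optional<O>> output;
        ProducerRole(BlockingQueue<Optional<O>> output) { this.output = output; }
        protected abstract O produce();

        @Override public void run() {
            try {
                for (O item = produce(); item != null; item = produce()) {
                    output.put(Optional.of(item));
                }
                output.put(Optional.empty()); // tell the next stage that input has ended
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Consumer role: subclasses implement consume(), called once per item.
    abstract static class ConsumerRole<I> implements Runnable {
        private final BlockingQueue<Optional<I>> input;
        ConsumerRole(BlockingQueue<Optional<I>> input) { this.input = input; }
        protected abstract void consume(I item) throws InterruptedException;

        @Override public void run() {
            try {
                for (Optional<I> next = input.take(); next.isPresent(); next = input.take()) {
                    consume(next.get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Processor role: consumes each item, transforms it with process(), and
    // produces the result. Dropping null results is an assumption of this sketch.
    abstract static class ProcessorRole<I, O> extends ConsumerRole<I> {
        private final BlockingQueue<Optional<O>> output;
        ProcessorRole(BlockingQueue<Optional<I>> input, BlockingQueue<Optional<O>> output) {
            super(input);
            this.output = output;
        }
        protected abstract O process(I item);

        @Override protected void consume(I item) throws InterruptedException {
            O result = process(item);
            if (result != null) {
                output.put(Optional.of(result));
            }
        }

        @Override public void run() {
            super.run(); // returns once the input stream has ended
            try {
                output.put(Optional.empty()); // pass the end marker downstream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Wires producer -> processor -> consumer and returns what the consumer received.
    static List<String> demo(List<String> words) {
        BlockingQueue<Optional<String>> raw = new LinkedBlockingQueue<>();
        BlockingQueue<Optional<String>> upper = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>(); // written only by the consumer thread
        Iterator<String> source = words.iterator();
        Thread producer = new Thread(new ProducerRole<String>(raw) {
            @Override protected String produce() { return source.hasNext() ? source.next() : null; }
        });
        Thread processor = new Thread(new ProcessorRole<String, String>(raw, upper) {
            @Override protected String process(String word) { return word.toUpperCase(Locale.ROOT); }
        });
        Thread consumer = new Thread(new ConsumerRole<String>(upper) {
            @Override protected void consume(String word) { received.add(word); }
        });
        consumer.start();
        processor.start();
        producer.start();
        try {
            producer.join();
            processor.join();
            consumer.join(); // join() also makes `received` safely visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pipeline", e);
        }
        return received;
    }
}
```

For example, RoleSketch.demo(List.of("a", "bb", "ccc")) passes each word through an upper-casing processor and returns [A, BB, CCC].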

Dari uses these classes to implement its own database queue processing. You can also extend them to build your own queue pipelines.
Queue pipeline example
An asynchronous queue pipeline uses parallel tasks and concurrent queues to process input values in a sequence of stages. For efficient processing, allocate enough producer and consumer task instances at each stage to keep any one queue from being overloaded; an overloaded queue becomes a bottleneck for the whole pipeline.
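One common way to keep a fast stage from overloading a slow one is to bound the queue between them, which BookAnalyzer does for rawBookWordQueue by backing it with an ArrayBlockingQueue of capacity 500. The sketch below shows only the plain JDK behavior of java.util.concurrent.ArrayBlockingQueue: offer returns false when the queue is full, and put blocks the producer until a consumer frees a slot. How Dari's AsyncQueue reacts to a full backing queue is not covered here, and BackpressureSketch is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Plain JDK bounded-queue behavior; independent of Dari's AsyncQueue.
final class BackpressureSketch {

    // With no consumer, counts how many of `count` items a queue of the given
    // capacity accepts. offer() returns false instead of blocking when full.
    static int acceptedWithoutConsumer(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // The producer uses put(), which blocks while the queue is full, so it can
    // never get more than `capacity` items ahead of the consumer thread.
    // Returns how many items the consumer received.
    static int transferWithConsumer(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger received = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.take();
                    received.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < count; i++) {
                queue.put(i);
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }
}
```

With a capacity of 500, acceptedWithoutConsumer(500, 600) accepts exactly 500 items, while transferWithConsumer(2, 1000) moves all 1000 items through a two-slot queue because the producer simply waits whenever the consumer falls behind.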
This example shows how to use the Dari asynchronous classes to implement a simple pipeline that processes Book objects. The pipeline retrieves all Book objects stored in a database and identifies the longest word across all of the books.
The diagram shows the pipeline's process flow. The pipeline consists of four queues, which five background tasks produce into and consume from. The BookAnalyzer class creates the queues and creates and starts the tasks.

In the diagram above:
The Dari-supplied AsyncDatabaseReader implementation retrieves all Book objects from the database and produces them into bookQueue.
The BookReader implementation consumes bookQueue and produces to rawBookWordQueue, which contains BookWord objects.
The BookWordSanitizer implementation consumes items in rawBookWordQueue and produces to cleanedBookWordQueue, which also contains BookWord objects.
The BookWordFilter implementation consumes from cleanedBookWordQueue and produces to longestWordQueue, which contains String items.
The LongestWordCalculator implementation consumes from longestWordQueue and writes the longest detected word to the log.
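To make the data flow concrete, the following sketch collapses the five stages into one synchronous java.util.stream pipeline over in-memory strings. It is only a model: the real example runs each stage as a separate Dari task connected by AsyncQueue instances, reads Book text from URLs rather than strings, and uses its own trimPunctuation helper. Here the regular-expression class \p{Punct} stands in for that helper; it covers the same 32 ASCII punctuation characters as the sanitizer's PUNCTUATION_CHARACTERS set. PipelineFlowSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

// Synchronous model of the five-stage data flow; the real example runs each
// stage as its own Dari task connected by AsyncQueue instances.
final class PipelineFlowSketch {

    static Optional<String> longestWord(List<String> bookTexts) {
        // AsyncDatabaseReader: here, the "database" is just a list of book texts
        return bookTexts.stream()
                // BookReader: split each text into whitespace-delimited tokens
                .flatMap(text -> Arrays.stream(text.trim().split("\\s+")))
                // BookWordSanitizer: trim leading/trailing punctuation, then lowercase
                .map(word -> word.replaceAll("^\\p{Punct}+|\\p{Punct}+$", "").toLowerCase(Locale.ROOT))
                // BookWordSanitizer: blank out email-address-like text and URLs
                .map(word -> word.contains("@") || word.startsWith("http") ? "" : word)
                // BookWordFilter: keep words at least one character long
                .filter(word -> word.length() >= 1)
                // LongestWordCalculator: keep the first word of maximal length
                .reduce((longest, word) -> word.length() > longest.length() ? word : longest);
    }
}
```

Given the texts "Sing to me of the man, Muse," and "of twists and turns. See http://classics.mit.edu/Homer/odyssey.mb.txt", the sketch returns Optional[twists]: the URL would be the longest token, but the sanitizer stage blanks it and the filter stage drops it.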
The processing sequence shown in the diagram can be tracked in the Task Status tool.

In the screenshot above:
The "Book Analyzer Queues" section lists the queue processors grouped by the queues they produce into and consume from. The "Production" column lists the tasks that produce items into a queue; the "Consumption" column lists the tasks that consume items from a queue. Tasks that act as both a producer and a consumer appear in both columns.
The "Book Analyzer Queues" section reflects the pipeline flow from the bottom up. For example, in the first step, AsyncDatabaseReader reads from the database and produces to bookQueue. In the second step, BookReader, listed in the "Consumption" column, consumes from bookQueue (2a). BookReader then produces to rawBookWordQueue (2b), so it is listed in the "Production" column as well.
The "Successes" and "Failures" columns show the number of items successfully or unsuccessfully produced into or consumed from the queue. The "Wait" column shows the average time a task took to produce or consume each item in the queue.
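How Dari and the Task Status tool actually gather these numbers is not described here. As a hypothetical sketch of the bookkeeping such columns imply, the StageStats class below (an illustrative name) counts successes and failures and averages the elapsed time over all recorded items.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-queue counters resembling the Successes, Failures, and Wait
// columns; not how Dari or the Task Status tool actually records them.
final class StageStats {
    private final AtomicLong successes = new AtomicLong();
    private final AtomicLong failures = new AtomicLong();
    private final AtomicLong totalWaitNanos = new AtomicLong();

    // Records one produce or consume attempt and the time it took.
    void record(boolean success, long waitNanos) {
        (success ? successes : failures).incrementAndGet();
        totalWaitNanos.addAndGet(waitNanos);
    }

    // Runs an action, timing it and counting it as a failure if it throws.
    void timed(Runnable action) {
        long start = System.nanoTime();
        boolean ok = false;
        try {
            action.run();
            ok = true;
        } finally {
            record(ok, System.nanoTime() - start);
        }
    }

    long successes() { return successes.get(); }

    long failures() { return failures.get(); }

    // Average time per item in milliseconds, over successes and failures alike.
    double averageWaitMillis() {
        long items = successes.get() + failures.get();
        return items == 0 ? 0.0 : totalWaitNanos.get() / 1_000_000.0 / items;
    }
}
```

The three counters are updated independently, so a reader running concurrently with record may see a briefly inconsistent snapshot; for a monitoring display that is usually an acceptable trade for lock-free updates.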
You can run multiple instances of a task if necessary, and the Task Status tool reflects each one. For example, if you instantiate three BookReader tasks, the tool lists three instances:

Book class
Book represents a book stored in the database. The text of a book is referenced by a URL.
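import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

public class Book extends Content {

    @Required
    @Indexed(unique = true)
    private String name;

    /* e.g. http://classics.mit.edu/Homer/odyssey.mb.txt */
    private String textUrl;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTextUrl() {
        return textUrl;
    }

    public void setTextUrl(String textUrl) {
        this.textUrl = textUrl;
    }

    /* Opens the text at textUrl; BookReader reads each book through this method. */
    public InputStream getTextStream() throws IOException {
        return URI.create(textUrl).toURL().openStream();
    }
}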
BookWord class
BookWord represents a word from a book. It consists of the record UUID of the book that contains the word, the word's sequential position in the book, and the word's text.
import java.util.UUID;

public class BookWord {

    private final UUID bookId;
    private final String word;
    private final int index;

    public BookWord(UUID bookId, String word, int index) {
        this.bookId = bookId;
        this.word = word;
        this.index = index;
    }

    public UUID getBookId() {
        return bookId;
    }

    public String getWord() {
        return word;
    }

    public int getIndex() {
        return index;
    }
}
BookAnalyzer class
The analyze method of BookAnalyzer creates the queues, then creates and starts the consumer and producer tasks. The producer and consumer classes are static nested classes of BookAnalyzer, but are shown in separate listings below.
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookAnalyzer {

    protected static final Logger LOGGER = LoggerFactory.getLogger(BookAnalyzer.class);

    public static final String EXECUTOR = "Book Analyzer";

    public static void analyze() {

        /* Create the queues. rawBookWordQueue is bounded to 500 items. */
        AsyncQueue<Book> bookQueue = new AsyncQueue<>();
        AsyncQueue<BookWord> rawBookWordQueue = new AsyncQueue<>(new ArrayBlockingQueue<>(500));
        AsyncQueue<BookWord> cleanedBookWordQueue = new AsyncQueue<>();
        AsyncQueue<String> longestWordQueue = new AsyncQueue<>();

        /* Mark the queues to close automatically. */
        longestWordQueue.closeAutomatically();
        cleanedBookWordQueue.closeAutomatically();
        rawBookWordQueue.closeAutomatically();
        bookQueue.closeAutomatically();

        /* Create the consumers and producers. */

        /* Only 1 database reader. */
        AsyncDatabaseReader<Book> bookFinder = new AsyncDatabaseReader<>(
                EXECUTOR, bookQueue, Database.Static.getDefault(), Query.from(Book.class));

        /* For the following tasks, only 1 instance is created. To create
           multiple tasks, increase the task's count field (e.g. bookReaderCount). */

        /* Extends AsyncConsumer, but also behaves like a processor.
           See the BookReader listing below. */
        final int bookReaderCount = 1;
        List<AsyncConsumer<Book>> bookReaders = IntStream
                .range(0, bookReaderCount)
                .mapToObj(i -> new BookReader(bookQueue, rawBookWordQueue))
                .collect(Collectors.toList());

        final int wordSanitizersCount = 1;
        List<AsyncProcessor<BookWord, BookWord>> wordSanitizers = IntStream
                .range(0, wordSanitizersCount)
                .mapToObj(i -> new BookWordSanitizer(rawBookWordQueue, cleanedBookWordQueue))
                .collect(Collectors.toList());

        final int wordFiltersCount = 1;
        List<AsyncProcessor<BookWord, String>> wordFilters = IntStream
                .range(0, wordFiltersCount)
                .mapToObj(i -> new BookWordFilter(cleanedBookWordQueue, longestWordQueue))
                .collect(Collectors.toList());

        /* Only 1 longest word calculator (unless you want to share the
           running tally across threads). */
        final int longestWordCalculatorsCount = 1;
        List<AsyncConsumer<String>> longestWordCalculators = IntStream
                .range(0, longestWordCalculatorsCount)
                .mapToObj(i -> new LongestWordCalculator(longestWordQueue))
                .collect(Collectors.toList());

        /* Start the consumers and producers, most downstream first. */
        longestWordCalculators.forEach(Task::submit);
        wordFilters.forEach(Task::submit);
        wordSanitizers.forEach(Task::submit);
        bookReaders.forEach(Task::submit);
        bookFinder.submit(); // runs once and stops

        LOGGER.info("Started Analyzing Books...");
    }

    /* Nested task classes for queue processing, shown below. */
}
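The count fields work because every instance of a stage is given the same input queue, so each item is presumably taken by only one of them (the competing-consumers pattern). The following plain JDK sketch mimics the IntStream.range(...).mapToObj(...) pattern above with an ExecutorService and a pre-filled ConcurrentLinkedQueue. It does not use Dari's Task API, CompetingConsumersSketch is a hypothetical name, and it leaves out the producers that, in the real pipeline, keep adding items while the consumers run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Competing consumers with plain JDK classes; mirrors the bookReaderCount
// pattern but does not use Dari's Task API.
final class CompetingConsumersSketch {

    // Drains a pre-filled queue with `workerCount` workers and returns how
    // many items each worker handled.
    static List<Integer> drain(int itemCount, int workerCount) {
        ConcurrentLinkedQueue<Integer> queue = IntStream.range(0, itemCount)
                .boxed()
                .collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        try {
            // One task instance per worker, like IntStream.range(0, bookReaderCount)
            List<Future<Integer>> futures = IntStream.range(0, workerCount)
                    .mapToObj(i -> pool.submit(() -> {
                        int handled = 0;
                        // poll() removes the head atomically, so no two workers get the same item
                        while (queue.poll() != null) {
                            handled++;
                        }
                        return handled;
                    }))
                    .collect(Collectors.toList());
            List<Integer> counts = new ArrayList<>();
            for (Future<Integer> future : futures) {
                counts.add(future.get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The per-worker counts vary from run to run, but they always sum to the number of items. Adding instances helps only at the stage that limits throughput; in this example, the 1 ms Thread.sleep in BookWordSanitizer suggests that stage would be the first candidate for a higher count.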
BookReader class
As a subclass of AsyncConsumer, BookReader implements the consume method. The method receives each Book object consumed from bookQueue, uses the java.util.Scanner class to split the book's text into whitespace-delimited word tokens, and produces a BookWord object for each token to rawBookWordQueue.
Even though BookReader extends AsyncConsumer, its constructor makes BookReader a producer as well: the AsyncQueue#addProducer method lets a consumer also take on a producer role. When the task finishes, the overridden finished method unregisters BookReader with AsyncQueue#removeProducer.
public static class BookReader extends AsyncConsumer<Book> {

    private final AsyncQueue<BookWord> output;

    public BookReader(AsyncQueue<Book> input, AsyncQueue<BookWord> output) {
        super(EXECUTOR, input);
        this.output = output;
        /* Register this consumer as a producer of the output queue. */
        this.output.addProducer(this);
    }

    @Override
    protected void consume(Book book) throws Exception {
        /* try-with-resources closes the Scanner and the book's text stream. */
        try (Scanner sc = new Scanner(book.getTextStream())) {
            int index = 0;
            while (sc.hasNext()) {
                String word = sc.next();
                output.add(new BookWord(book.getId(), word, index));
                index++;
            }
        }
    }

    @Override
    protected void finished() {
        try {
            super.finished();
        } finally {
            /* Unregister as a producer even if super.finished() throws. */
            this.output.removeProducer(this);
        }
    }
}
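The addProducer and removeProducer calls, together with closeAutomatically in BookAnalyzer, suggest a queue that closes once its last registered producer is removed, so that downstream consumers can drain it and finish. This section does not spell out Dari's exact rules, so the following is a hypothetical model built from plain JDK classes: ProducerTrackingQueue and its methods are illustrative names, not the AsyncQueue API, and the consumer polls with a short timeout for simplicity instead of being notified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a queue that closes when its last producer leaves.
// Names and behavior are assumptions, not Dari's AsyncQueue implementation.
final class ProducerTrackingQueue<T> {
    private final LinkedBlockingQueue<T> items = new LinkedBlockingQueue<>();
    private int producers;   // guarded by this
    private boolean closed;  // guarded by this

    synchronized void addProducer() {
        if (closed) {
            throw new IllegalStateException("queue already closed");
        }
        producers++;
    }

    // Unregisters a producer and closes the queue when none remain.
    synchronized void removeProducer() {
        if (producers == 0) {
            throw new IllegalStateException("no registered producers");
        }
        if (--producers == 0) {
            closed = true;
        }
    }

    void add(T item) {
        items.add(item);
    }

    // Returns the next item, or null once the queue is closed and fully drained.
    T poll() throws InterruptedException {
        while (true) {
            T item = items.poll(10, TimeUnit.MILLISECONDS);
            if (item != null) {
                return item;
            }
            synchronized (this) {
                if (closed && items.isEmpty()) {
                    return null;
                }
            }
        }
    }

    // Two producers register up front (as BookReader does in its constructor)
    // and unregister when done (as BookReader does in finished()).
    static List<Integer> demo() {
        ProducerTrackingQueue<Integer> queue = new ProducerTrackingQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < 2; p++) {
            int base = p * 100;
            queue.addProducer();
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        queue.add(base + i);
                    }
                } finally {
                    queue.removeProducer();
                }
            }));
        }
        threads.forEach(Thread::start);

        List<Integer> consumed = new ArrayList<>();
        try {
            Integer item;
            while ((item = queue.poll()) != null) {
                consumed.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }
}
```

Registering every producer before any of them starts matters in this model: if the first producer finished and unregistered before the second one registered, the queue would close early.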
BookWordSanitizer class
As a subclass of AsyncProcessor, BookWordSanitizer implements the process method. The method receives each BookWord object consumed from rawBookWordQueue, trims punctuation from the word text and converts it to lowercase, and replaces text that looks like an email address or a URL with an empty string. The resulting BookWord objects, whether transformed or unchanged, are produced to cleanedBookWordQueue. (Clean-up code is not shown in the code example.)
public static class BookWordSanitizer extends AsyncProcessor<BookWord, BookWord> {

    private static final Set<Integer> PUNCTUATION_CHARACTERS = new HashSet<>(Arrays.asList(
            (int) '~', (int) '&', (int) '{', (int) '\'', (int) '`', (int) '*', (int) '}', (int) '"',
            (int) '!', (int) '(', (int) '[', (int) '<', (int) '@', (int) ')', (int) ']', (int) '>',
            (int) '#', (int) '-', (int) '\\', (int) ',', (int) '$', (int) '_', (int) '|', (int) '.',
            (int) '%', (int) '+', (int) ';', (int) '?', (int) '^', (int) '=', (int) ':', (int) '/'));

    public BookWordSanitizer(AsyncQueue<BookWord> input, AsyncQueue<BookWord> output) {
        super(EXECUTOR, input, output);
    }

    @Override
    protected BookWord process(BookWord bookWord) throws Exception {
        String word = bookWord.getWord();

        /* Artificial 1 ms delay per word, which slows down this stage. */
        Thread.sleep(1);

        /* Trim punctuation and convert to lowercase. */
        String sanitizedWord = trimPunctuation(word, PUNCTUATION_CHARACTERS::contains).toLowerCase();

        /* Ignore email-address-like text and URLs. */
        if (sanitizedWord.contains("@") || sanitizedWord.startsWith("http")) {
            sanitizedWord = "";
        }

        /* Create a new BookWord only if the text changed. */
        if (!sanitizedWord.equals(word)) {
            return new BookWord(bookWord.getBookId(), sanitizedWord, bookWord.getIndex());
        } else {
            return bookWord;
        }
    }

    /* Clean-up methods (including trimPunctuation) not shown. */
}
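The listing calls trimPunctuation without showing it. The following is one possible implementation, assuming the helper strips matching characters from both ends of the word only; interior characters survive, which the later contains("@") check depends on for email addresses. The class name TrimSketch and the IntPredicate parameter type are assumptions; PUNCTUATION_CHARACTERS::contains fits an IntPredicate because the int argument is boxed to an Integer.

```java
import java.util.function.IntPredicate;

// Hypothetical implementation of the trimPunctuation helper that the listing omits.
final class TrimSketch {

    // Removes leading and trailing code points for which isPunctuation returns true;
    // characters in the middle of the word are left untouched.
    static String trimPunctuation(String word, IntPredicate isPunctuation) {
        int start = 0;
        int end = word.length();
        while (start < end) {
            int cp = word.codePointAt(start);
            if (!isPunctuation.test(cp)) {
                break;
            }
            start += Character.charCount(cp);
        }
        while (end > start) {
            int cp = word.codePointBefore(end);
            if (!isPunctuation.test(cp)) {
                break;
            }
            end -= Character.charCount(cp);
        }
        return word.substring(start, end);
    }
}
```

With this helper, "\"Muse," becomes "Muse" (which the sanitizer then lowercases), "well-known." keeps its hyphen, and a word made only of punctuation, such as "--", becomes the empty string that BookWordFilter later rejects.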
BookWordFilter class
As a subclass of AsyncProcessor, BookWordFilter implements the process method. The method receives each BookWord object consumed from cleanedBookWordQueue, checks that the word text is at least MINIMUM_WORD_LENGTH (one) character long, and produces the word text to longestWordQueue, a String queue. For shorter words, process returns null.
public static class BookWordFilter extends AsyncProcessor<BookWord, String> {

    private static final int MINIMUM_WORD_LENGTH = 1;

    public BookWordFilter(AsyncQueue<BookWord> input, AsyncQueue<String> output) {
        super(EXECUTOR, input, output);
    }

    @Override
    protected String process(BookWord bookWord) throws Exception {
        String word = bookWord.getWord();
        if (word.length() < MINIMUM_WORD_LENGTH) {
            /* Or you can throw an exception and handle it gracefully. */
            return null;
        }
        return word;
    }

    @Override
    protected void handleError(BookWord item, Exception error) {
        /* Handle errors here... */
    }
}
LongestWordCalculator class
As a subclass of AsyncConsumer, LongestWordCalculator implements the consume method. The method consumes each word string from longestWordQueue, compares the word's length to that of the longest word found so far, and keeps the current word if it is longer. When the task finishes, it writes the longest word found across all of the Book objects to the log.
public static class LongestWordCalculator extends AsyncConsumer<String> {

    private String longestWord = null;

    public LongestWordCalculator(AsyncQueue<String> input) {
        super(EXECUTOR, input);
    }

    @Override
    protected void consume(String item) throws Exception {
        /* Strict comparison: on a tie, the word seen first is kept. */
        if (longestWord == null || longestWord.length() < item.length()) {
            longestWord = item;
        }
    }

    @Override
    protected void finished() {
        super.finished();
        LOGGER.info("Longest word is: {}", longestWord);
    }
}
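The BookAnalyzer listing runs only one LongestWordCalculator "unless you want to share the running tally across threads". If you did run several, their tally would have to be shared and updated atomically. The following sketch does this with the JDK's AtomicReference.accumulateAndGet; SharedLongestWord is a hypothetical class, not part of the example. Because thread scheduling decides which of two equally long words arrives first, the sketch breaks ties alphabetically instead of keeping the first word seen, so its result is deterministic.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical shared tally for several LongestWordCalculator-like consumers;
// not part of the Dari example.
final class SharedLongestWord {
    private final AtomicReference<String> longest = new AtomicReference<>();

    // Keeps the longer word; ties go to the alphabetically smaller word so the
    // result does not depend on which thread gets there first.
    private static String longer(String current, String candidate) {
        if (current == null) {
            return candidate;
        }
        int byLength = Integer.compare(candidate.length(), current.length());
        if (byLength != 0) {
            return byLength > 0 ? candidate : current;
        }
        return candidate.compareTo(current) < 0 ? candidate : current;
    }

    void consume(String word) {
        // accumulateAndGet retries the update if another thread changed the value first
        longest.accumulateAndGet(word, SharedLongestWord::longer);
    }

    String get() {
        return longest.get();
    }

    // Splits the words round-robin across several threads that share one tally.
    static String longestInParallel(List<String> words, int threads) {
        SharedLongestWord tally = new SharedLongestWord();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            int offset = t;
            workers[t] = new Thread(() -> {
                for (int i = offset; i < words.size(); i += threads) {
                    tally.consume(words.get(i));
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return tally.get();
    }
}
```

Because the tie-breaking rule is both commutative and associative, the final value is the same no matter how the updates from different threads interleave.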