Queue pipeline example
An asynchronous queue pipeline uses parallel tasks and concurrent queues to process input values in sequential stages. For efficient pipeline processing, an appropriate number of producer and consumer task instances must be allocated to each stage. This keeps any queue from becoming overloaded and turning into a pipeline bottleneck.
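The staging idea can be sketched with plain java.util.concurrent classes, independent of Dari. This is a minimal sketch, and the MiniPipeline class, its DONE sentinel, and the three stages are illustrative assumptions. The calling thread produces lines, one thread splits them into words, and another thread keeps the longest word. Each ArrayBlockingQueue is bounded, so put() blocks a fast stage whenever the next stage falls behind instead of letting the queue grow without limit. This is similar to the 500-item bound that the BookAnalyzer listing places on rawBookWordQueue.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrative three-stage pipeline using only JDK classes (not Dari code). */
public class MiniPipeline {

    // Sentinel ("poison pill") telling the next stage that no more items will arrive.
    private static final String DONE = "\u0000DONE";

    public static String longestWord(List<String> lines) throws InterruptedException {
        // Bounded queues: put() blocks when full, which throttles faster upstream stages.
        BlockingQueue<String> lineQueue = new ArrayBlockingQueue<>(10);
        BlockingQueue<String> wordQueue = new ArrayBlockingQueue<>(100);
        String[] longest = {""};

        // Stage 2: consume lines, produce words.
        Thread splitter = new Thread(() -> {
            try {
                for (String line = lineQueue.take(); !line.equals(DONE); line = lineQueue.take()) {
                    for (String word : line.split("\\s+")) {
                        if (!word.isEmpty()) {
                            wordQueue.put(word);
                        }
                    }
                }
                wordQueue.put(DONE); // pass the shutdown signal downstream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 3: consume words, keep a running tally of the longest one.
        Thread calculator = new Thread(() -> {
            try {
                for (String word = wordQueue.take(); !word.equals(DONE); word = wordQueue.take()) {
                    if (word.length() > longest[0].length()) {
                        longest[0] = word;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Start the consumers before feeding the first queue.
        calculator.start();
        splitter.start();

        // Stage 1: the calling thread produces lines.
        for (String line : lines) {
            lineQueue.put(line);
        }
        lineQueue.put(DONE);

        // join() makes the calculator's writes to longest[0] visible here.
        splitter.join();
        calculator.join();
        return longest[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // prints twists
        System.out.println(longestWord(List.of("sing to me of the man", "Muse the man of twists and turns")));
    }
}
```

A single sentinel only works when each queue has one consumer. With several consumer threads on a queue, the producer would have to enqueue one sentinel per consumer.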
This example shows how to use the Dari asynchronous classes to implement a simple pipeline that processes Book objects. The purpose of the pipeline is to retrieve all Book objects stored in a database and identify the longest word across all of the books.
The diagram shows the pipeline process flow. The pipeline consists of four queues, and five background tasks produce items into them and consume items from them. The queues and tasks are created and started in the BookAnalyzer class.

In the above diagram:
- The Dari-supplied AsyncDatabaseReader implementation retrieves all Book objects from the database and produces the items into bookQueue.
- The BookReader implementation consumes bookQueue and produces to rawBookWordQueue, which contains BookWord objects.
- The BookWordSanitizer implementation consumes items in rawBookWordQueue and produces to cleanedBookWordQueue, which also contains BookWord objects.
- The BookWordFilter implementation consumes from cleanedBookWordQueue and produces to longestWordQueue, which contains string items.
- The LongestWordCalculator implementation consumes from longestWordQueue and writes the longest detected word to a log.
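Without the queues and threads, these stages reduce to a sequence of transformations. A single-threaded version can serve as a reference when you check the pipeline's result on a small text. The sketch below is hypothetical, because SyncBookPipeline is not part of the example. It assumes that the unshown trimPunctuation helper strips leading and trailing punctuation. The regular-expression class \p{Punct} covers the same 32 ASCII characters as the sanitizer's PUNCTUATION_CHARACTERS set.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical synchronous reference for the pipeline stages (not Dari code). */
public class SyncBookPipeline {

    public static Optional<String> longestWord(String text) {
        return Arrays.stream(text.split("\\s+"))                          // BookReader: whitespace tokens
                .map(w -> w.replaceAll("^\\p{Punct}+|\\p{Punct}+$", "")   // BookWordSanitizer: trim ends (assumed)
                        .toLowerCase())
                .map(w -> w.contains("@") || w.startsWith("http") ? "" : w) // blank out email/URL-like text
                .filter(w -> w.length() >= 1)                             // BookWordFilter
                .reduce((a, b) -> b.length() > a.length() ? b : a);       // LongestWordCalculator: first longest wins
    }

    public static void main(String[] args) {
        // prints twists
        System.out.println(longestWord("Sing to me of the man, Muse, the man of twists and turns").orElse(""));
    }
}
```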
The processing sequence shown in the diagram can be tracked in the Task Status tool.

In the above screenshot:
- The “Book Analyzer Queues” section lists each queue along with the tasks that produce into it and consume from it. The “Production” column lists the tasks that produce items into a queue, and the “Consumption” column lists the tasks that consume items from a queue. Tasks that function as both a producer and a consumer are listed in both columns.
- The “Book Analyzer Queues” section reflects the pipeline flow from the bottom up. For example, in the first step, AsyncDatabaseReader reads from the database and produces to bookQueue. In the second step, BookReader, listed in the “Consumption” column, consumes from bookQueue (2a). BookReader then produces to rawBookWordQueue (2b), so it is also listed in the “Production” column.
- The “Successes” and “Failures” columns show the number of items successfully or unsuccessfully produced into or consumed from the queue. The “Wait” column shows the average time it took a task to produce or consume each item in the queue.
You can have multiple instances of a task running if necessary, which would be reflected in the Task Status tool. For example, if you instantiate three BookReader tasks, then three instances would be listed:

Book class
Book represents a book stored in the database. The text of a book is referenced with a URL.
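```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

public class Book extends Content {

    @Required
    @Indexed(unique = true)
    private String name;

    /* e.g. http://classics.mit.edu/Homer/odyssey.mb.txt */
    private String textUrl;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTextUrl() {
        return textUrl;
    }

    public void setTextUrl(String textUrl) {
        this.textUrl = textUrl;
    }

    /* Opens the book's text for reading. BookReader calls this method, which the
       original listing omitted; fetching textUrl with java.net is an assumption. */
    public InputStream getTextStream() throws IOException {
        return URI.create(textUrl).toURL().openStream();
    }
}
```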
BookWord class
BookWord represents a word from a book, consisting of the record UUID of the book with which the word is associated, the word’s sequential position in the book, and the text of the word.
```java
import java.util.UUID;

public class BookWord {

    private UUID bookId;
    private String word;
    private int index;

    public BookWord(UUID bookId, String word, int index) {
        this.bookId = bookId;
        this.word = word;
        this.index = index;
    }

    public UUID getBookId() {
        return bookId;
    }

    public String getWord() {
        return word;
    }

    public int getIndex() {
        return index;
    }
}
```
BookAnalyzer class
The analyze method of BookAnalyzer creates the queues and then creates and starts the producer and consumer tasks. The producer and consumer classes are included as inner classes in BookAnalyzer but are shown in separate listings below.
```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* Dari imports (AsyncQueue, AsyncConsumer, AsyncProcessor, AsyncDatabaseReader,
   Database, Query, Task) omitted. Arrays, HashSet, Scanner, and Set are used by the inner classes. */

public class BookAnalyzer {

    protected static final Logger LOGGER = LoggerFactory.getLogger(BookAnalyzer.class);
    public static final String EXECUTOR = "Book Analyzer";

    public static void analyze() {
        /* Create the queues */
        AsyncQueue<Book> bookQueue = new AsyncQueue<>();
        AsyncQueue<BookWord> rawBookWordQueue = new AsyncQueue<>(new ArrayBlockingQueue<>(500));
        AsyncQueue<BookWord> cleanedBookWordQueue = new AsyncQueue<>();
        AsyncQueue<String> longestWordQueue = new AsyncQueue<>();

        /* Mark the queues to close automatically */
        longestWordQueue.closeAutomatically();
        cleanedBookWordQueue.closeAutomatically();
        rawBookWordQueue.closeAutomatically();
        bookQueue.closeAutomatically();

        /* Create the consumers and producers.
           Only 1 database reader */
        AsyncDatabaseReader<Book> bookFinder = new AsyncDatabaseReader<>(EXECUTOR,
                bookQueue, Database.Static.getDefault(), Query.from(Book.class));

        /* For the following tasks, only 1 instance is created. To create multiple tasks,
           increase the task's count field (e.g. bookReaderCount). */

        /* Extends AsyncConsumer, but also behaves like a Processor. See BookReader listing below. */
        final int bookReaderCount = 1;
        List<AsyncConsumer<Book>> bookReaders = IntStream
                .range(0, bookReaderCount)
                .mapToObj(i -> new BookReader(
                        bookQueue, rawBookWordQueue))
                .collect(Collectors.toList());

        final int wordSanitizersCount = 1;
        List<AsyncProcessor<BookWord, BookWord>> wordSanitizers = IntStream
                .range(0, wordSanitizersCount)
                .mapToObj(i -> new BookWordSanitizer(
                        rawBookWordQueue, cleanedBookWordQueue))
                .collect(Collectors.toList());

        final int wordFiltersCount = 1;
        List<AsyncProcessor<BookWord, String>> wordFilters = IntStream
                .range(0, wordFiltersCount)
                .mapToObj(i -> new BookWordFilter(
                        cleanedBookWordQueue, longestWordQueue))
                .collect(Collectors.toList());

        /* Only 1 longest word calculator (unless you want to share the running tally across threads). */
        final int longestWordCalculatorsCount = 1;
        List<AsyncConsumer<String>> longestWordCalculators = IntStream
                .range(0, longestWordCalculatorsCount)
                .mapToObj(i -> new LongestWordCalculator(longestWordQueue))
                .collect(Collectors.toList());

        /* Start the tasks, beginning with the last stage of the pipeline */
        longestWordCalculators.forEach(Task::submit);
        wordFilters.forEach(Task::submit);
        wordSanitizers.forEach(Task::submit);
        bookReaders.forEach(Task::submit);
        bookFinder.submit(); // runs once and stops

        LOGGER.info("Started Analyzing Books...");
    }

    /* Inner Task classes for queue processing, shown below. */
}
```
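The queues in analyze are marked with closeAutomatically, and the BookReader task registers and removes itself as a producer. The sketch below shows, with plain JDK classes, one way such a queue could decide when it is finished. ProducerTrackingQueue and its methods are hypothetical and do not reproduce Dari's AsyncQueue implementation. In this sketch, the queue counts as closed once its last registered producer is removed, and poll returns null only after the queue is closed and drained.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (not Dari's AsyncQueue): a queue that marks itself closed
 * when its last registered producer is removed, so consumers know when to stop.
 */
public class ProducerTrackingQueue<E> {

    private final LinkedBlockingQueue<E> items = new LinkedBlockingQueue<>();
    private final Set<Object> producers = new HashSet<>();
    private volatile boolean closed;

    public synchronized void addProducer(Object producer) {
        producers.add(producer);
    }

    public synchronized void removeProducer(Object producer) {
        // Close only when the last registered producer leaves.
        if (producers.remove(producer) && producers.isEmpty()) {
            closed = true;
        }
    }

    public void add(E item) {
        if (closed) {
            throw new IllegalStateException("queue is closed");
        }
        items.add(item);
    }

    /** Returns the next item, or null once the queue is closed and fully drained. */
    public E poll() throws InterruptedException {
        while (true) {
            E item = items.poll(10, TimeUnit.MILLISECONDS);
            if (item != null) {
                return item;
            }
            // Re-check emptiness after reading closed, in case an item arrived meanwhile.
            if (closed && items.isEmpty()) {
                return null;
            }
        }
    }

    public boolean isClosed() {
        return closed;
    }
}
```

If a producer never removes itself (for example, because an exception skips its cleanup), the queue never closes, and downstream consumers keep waiting. This is why BookReader calls removeProducer in a finally block.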
BookReader class
As a subclass of AsyncConsumer, BookReader implements the consume method. The method consumes Book objects from bookQueue. It then uses the java.util.Scanner class to split each book's text into whitespace-delimited word tokens and produces a BookWord object for each token to rawBookWordQueue.
Even though BookReader extends AsyncConsumer, its constructor also registers it as a producer of its output queue. The AsyncQueue#addProducer method lets a consumer take on a producer role as well. When the task finishes, its finished method calls AsyncQueue#removeProducer to deregister it.
```java
public static class BookReader extends AsyncConsumer<Book> {

    private AsyncQueue<BookWord> output;

    public BookReader(AsyncQueue<Book> input, AsyncQueue<BookWord> output) {
        super(EXECUTOR, input);
        this.output = output;
        this.output.addProducer(this);
    }

    @Override
    protected void consume(Book book) throws Exception {
        /* try-with-resources closes the Scanner and the underlying text stream */
        try (Scanner sc = new Scanner(book.getTextStream())) {
            int index = 0;
            while (sc.hasNext()) {
                String word = sc.next();
                output.add(new BookWord(book.getId(), word, index));
                index++;
            }
        }
    }

    @Override
    protected void finished() {
        try {
            super.finished();
        } finally {
            this.output.removeProducer(this);
        }
    }
}
```
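The tokenizing loop in consume can be tried on its own. This sketch (TokenizeDemo is a hypothetical name) reads from a String instead of book.getTextStream(). It returns "index:word" strings instead of BookWord objects. Scanner splits on whitespace by default, so tokens keep their punctuation and capitalization. The BookWordSanitizer stage then cleans up those tokens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

/** Hypothetical stand-alone version of BookReader's tokenizing loop. */
public class TokenizeDemo {

    public static List<String> tokens(String text) {
        List<String> words = new ArrayList<>();
        // Default delimiter is whitespace, so "man," keeps its trailing comma.
        try (Scanner sc = new Scanner(text)) {
            int index = 0;
            while (sc.hasNext()) {
                words.add(index + ":" + sc.next());
                index++;
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // prints [0:Sing, 1:to, 2:me, 3:of, 4:the, 5:man,, 6:Muse]
        System.out.println(tokens("Sing to me of the man, Muse"));
    }
}
```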
BookWordSanitizer class
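As a subclass of AsyncProcessor, BookWordSanitizer implements the process method. The method consumes BookWord objects from rawBookWordQueue, trims punctuation from the word text, and converts it to lower case. It also replaces text that looks like an email address or a URL with an empty string. The resulting BookWord objects are produced to cleanedBookWordQueue: a new object for each changed word and the original object for each unchanged word. (Clean-up code is not shown in the code example, nor is the trimPunctuation helper.)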
```java
public static class BookWordSanitizer extends AsyncProcessor<BookWord, BookWord> {

    private static final Set<Integer> PUNCTUATION_CHARACTERS = new HashSet<>(
            Arrays.asList(
                    (int) '~', (int) '&', (int) '{', (int) '\'',
                    (int) '`', (int) '*', (int) '}', (int) '"',
                    (int) '!', (int) '(', (int) '[', (int) '<',
                    (int) '@', (int) ')', (int) ']', (int) '>',
                    (int) '#', (int) '-', (int) '\\', (int) ',',
                    (int) '$', (int) '_', (int) '|', (int) '.',
                    (int) '%', (int) '+', (int) ';', (int) '?',
                    (int) '^', (int) '=', (int) ':', (int) '/'
            )
    );

    public BookWordSanitizer(AsyncQueue<BookWord> input, AsyncQueue<BookWord> output) {
        super(EXECUTOR, input, output);
    }

    @Override
    protected BookWord process(BookWord bookWord) throws Exception {
        String word = bookWord.getWord();

        /* Artificial 1 ms delay per word, which slows this stage down */
        Thread.sleep(1);

        /* Trim punctuation and lower case it */
        String sanitizedWord = trimPunctuation(word, PUNCTUATION_CHARACTERS::contains).toLowerCase();

        /* Ignore email-address-like text and URLs */
        if (sanitizedWord.contains("@") || sanitizedWord.startsWith("http")) {
            sanitizedWord = "";
        }

        /* Create a new BookWord only if the text changed */
        if (!sanitizedWord.equals(word)) {
            return new BookWord(bookWord.getBookId(), sanitizedWord, bookWord.getIndex());
        } else {
            return bookWord;
        }
    }

    /* Clean-up methods not shown. */
}
```
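The listing calls trimPunctuation without showing it. A minimal sketch of such a helper follows. The PunctuationTrimmer class, the IntPredicate signature, and the choice to trim only leading and trailing characters are all assumptions inferred from the call site. A Set<Integer>::contains reference works as an IntPredicate because each char is widened to int and then boxed to Integer.

```java
import java.util.function.IntPredicate;

/** Hypothetical version of the trimPunctuation helper used by BookWordSanitizer. */
public final class PunctuationTrimmer {

    private PunctuationTrimmer() {
    }

    /** Removes leading and trailing characters matching isPunctuation; inner characters are kept. */
    public static String trimPunctuation(String word, IntPredicate isPunctuation) {
        int start = 0;
        int end = word.length();
        while (start < end && isPunctuation.test(word.charAt(start))) {
            start++;
        }
        while (end > start && isPunctuation.test(word.charAt(end - 1))) {
            end--;
        }
        return word.substring(start, end);
    }
}
```

This helper trims only the ends of a token, so text such as someone@example.org keeps its inner @. The sanitizer's email check relies on that character.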
BookWordFilter class
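As a subclass of AsyncProcessor, BookWordFilter implements the process method. The method consumes BookWord objects from cleanedBookWordQueue, verifies that the object’s word text is at least one character long, and produces the word text to longestWordQueue, a String queue. For an empty word, which the sanitizer produces for punctuation-only tokens, email addresses, and URLs, process returns null instead of the word text.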
```java
public static class BookWordFilter extends AsyncProcessor<BookWord, String> {

    private static final int MINIMUM_WORD_LENGTH = 1;

    public BookWordFilter(AsyncQueue<BookWord> input, AsyncQueue<String> output) {
        super(EXECUTOR, input, output);
    }

    @Override
    protected String process(BookWord bookWord) throws Exception {
        String word = bookWord.getWord();

        if (word.length() < MINIMUM_WORD_LENGTH) {
            /* Or you can throw an exception and handle it gracefully. */
            return null;
        }
        return word;
    }

    @Override
    protected void handleError(BookWord item, Exception error) {
        /* Handle errors here... */
    }
}
```
LongestWordCalculator class
As a subclass of AsyncConsumer, LongestWordCalculator implements the consume method. The method consumes each word string from longestWordQueue and compares the length of the current word with that of the longest word found so far. If the current word is longer, the method stores it as the new longest word. When the task finishes, it logs the longest word found across all of the Book objects.
```java
public static class LongestWordCalculator extends AsyncConsumer<String> {

    private String longestWord = null;

    public LongestWordCalculator(AsyncQueue<String> input) {
        super(EXECUTOR, input);
    }

    @Override
    protected void consume(String item) throws Exception {
        if (longestWord == null || longestWord.length() < item.length()) {
            longestWord = item;
        }
    }

    @Override
    protected void finished() {
        super.finished();
        LOGGER.info("Longest word is: " + longestWord);
    }
}
```
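The BookAnalyzer listing notes that only one LongestWordCalculator runs unless the running tally is shared across threads. Below is a minimal sketch of a shareable tally, which is not part of the original example. SharedLongestWord is a hypothetical name. It relies on AtomicReference.accumulateAndGet, which retries the update until it applies atomically, so several calculator threads could offer words to one instance.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical running tally that several threads can update safely. */
public class SharedLongestWord {

    private final AtomicReference<String> longest = new AtomicReference<>("");

    /** Atomically replaces the tally if the candidate is strictly longer. */
    public void offer(String word) {
        // The accumulator may be retried under contention, so it must be side-effect free.
        longest.accumulateAndGet(word, (current, candidate) ->
                candidate.length() > current.length() ? candidate : current);
    }

    public String get() {
        return longest.get();
    }

    public static void main(String[] args) throws InterruptedException {
        SharedLongestWord tally = new SharedLongestWord();
        List<String> words = List.of("sing", "muse", "ingenious", "hero", "wanderings");
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (String w : words) {
            pool.submit(() -> tally.offer(w));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(tally.get()); // prints wanderings
    }
}
```

The single calculator keeps the first of several equally long words. In this version, which of several tied words wins depends on thread timing.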