Async Database

Dari provides asynchronous database support for efficient batch processing in high-volume environments. AsyncDatabaseReader reads objects from a database into a queue, and AsyncDatabaseWriter writes objects from a queue to a database.

AsyncDatabaseReader and AsyncDatabaseWriter are implemented as background tasks, extending AsyncProducer and AsyncConsumer respectively. AsyncProducer produces items into an output queue; a subclass of AsyncProducer implements how data is produced into the queue. AsyncConsumer consumes all items from an input queue; a subclass of AsyncConsumer implements how the queue data is consumed. You can create your own queue processing background tasks by subclassing AsyncProducer or AsyncConsumer.
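
The producer and consumer roles described above can be illustrated with a minimal sketch that uses only java.util.concurrent. It is a stand-in for the general pattern and is not Dari's implementation: the QueueSketch class, its method names, and the END marker are all hypothetical, and the real AsyncProducer and AsyncConsumer base classes may handle completion differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative stand-in for a producer/consumer pair; not Dari's implementation.
class QueueSketch {

    // Hypothetical end-of-stream marker; real items must never equal it.
    static final String END = "__END__";

    // Producer: puts every item on the queue, then the end marker.
    static Thread startProducer(List<String> items, BlockingQueue<String> queue) {
        Thread producer = new Thread(() -> {
            try {
                for (String item : items) {
                    queue.put(item);
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        return producer;
    }

    // Consumer: takes items until the end marker arrives.
    static List<String> consumeAll(BlockingQueue<String> queue) {
        List<String> consumed = new ArrayList<>();
        try {
            while (true) {
                String item = queue.take();
                if (END.equals(item)) {
                    return consumed;
                }
                consumed.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming", e);
        }
    }

    // Wires one producer to one consumer through a shared queue.
    static List<String> run(List<String> items) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        startProducer(items, queue);
        return consumeAll(queue);
    }
}
```

Calling QueueSketch.run with a list of strings returns the same strings in the same order, because a single producer and a single consumer share one first-in, first-out queue.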

Note that when a background task is created, it appears in the Task Status tool.

AsyncDatabaseReader

AsyncDatabaseReader enables a queue to be produced from a database query. In the following example, a queue is created and populated with Article objects retrieved from the database.

AsyncQueue<Article> articleQueue = new AsyncQueue<Article>();
Query<Article> articleQuery = Query.from(Article.class);
new AsyncDatabaseReader<Article>(
    "Article Reader",
    articleQueue,
    Database.Static.getDefault(),
    articleQuery)
    .submit();

In the previous snippet:

  • Line 1 creates an AsyncQueue object, to be used by the AsyncDatabaseReader object.
  • Line 2 creates a Query for all Article objects in the database.
  • Lines 3–7 create an AsyncDatabaseReader object. The constructor takes the following arguments:
    • the first argument is the executor, identified as “Article Reader”. If it is null, the default executor is used.
    • the second argument identifies the queue where the Article objects are written. If null, AsyncDatabaseReader creates a new queue.
    • the third argument is the database from which the objects are retrieved. The value can’t be null.
    • the fourth argument is the database query to execute. The value can’t be null.
  • Line 8 submits the AsyncDatabaseReader task to run.

Typically, a queue populated by an AsyncDatabaseReader task is processed by another background task, one that reads objects from an input queue, processes them, and writes the processed objects to an output queue. The following code creates such a task, which reads from the “articleQueue” populated by “Article Reader”, processes the articles, then writes them to an output queue called “processedQueue”. This task could, for example, change the lead image that every article references.

// Create a processing task that reads from articleQueue, processes the articles,
// and writes them to processedQueue.
AsyncQueue<Article> processedQueue = new AsyncQueue<Article>(new ArrayBlockingQueue<Article>(10000));
new ProcessorTask(
    "Article Processor",
    articleQueue,
    processedQueue)
    .submit();
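
The source does not show how ProcessorTask is written, so the following is only a sketch of what such a middle stage does, using plain java.util.concurrent classes rather than Dari's task classes. ProcessorSketch, the END marker, and the string-based "processing" step are hypothetical. The bounded ArrayBlockingQueue mirrors the capacity argument in the snippet above: when the output queue is full, the processor blocks until a downstream reader makes room.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Illustrative stand-in for a processing stage; not Dari's implementation.
class ProcessorSketch {

    // Hypothetical end-of-stream marker; real items must never equal it.
    static final String END = "__END__";

    // Moves items from input to output, applying step to each one,
    // and forwards the end marker so the next stage can stop too.
    static Thread startProcessor(
            BlockingQueue<String> input,
            BlockingQueue<String> output,
            UnaryOperator<String> step) {
        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    String item = input.take();
                    if (END.equals(item)) {
                        output.put(END);
                        return;
                    }
                    output.put(step.apply(item)); // blocks while output is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        processor.start();
        return processor;
    }

    // Feeds items through the processor using two bounded queues.
    static List<String> run(List<String> items, int capacity) {
        BlockingQueue<String> input = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<String> output = new ArrayBlockingQueue<>(capacity);

        // A separate feeder thread avoids blocking the caller on a full queue.
        Thread feeder = new Thread(() -> {
            try {
                for (String item : items) {
                    input.put(item);
                }
                input.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        feeder.start();
        startProcessor(input, output, item -> item + " (processed)");

        // Drain the output queue on the calling thread.
        List<String> processed = new ArrayList<>();
        try {
            while (true) {
                String item = output.take();
                if (END.equals(item)) {
                    return processed;
                }
                processed.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while draining output", e);
        }
    }
}
```

Because each stage runs on its own thread and the queues are bounded, a slow stage throttles the stages feeding it instead of letting unprocessed items accumulate in memory.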

AsyncDatabaseWriter

AsyncDatabaseWriter commits many objects to a database in batches. Continuing with the above scenario, the following example creates an AsyncDatabaseWriter task, which commits the articles in “processedQueue” to the database.

new AsyncDatabaseWriter<Article>(
    "Article Writer",
    processedQueue,
    Database.Static.getDefault(),
    WriteOperation.SAVE,
    100,
    true)
    .submit();

In the previous snippet:

  • Lines 1–7 create an AsyncDatabaseWriter object. The constructor takes the following arguments:
    • the first argument is the executor, identified as “Article Writer”. If it is null, the default executor is used.
    • the second argument identifies the input queue where the Article objects are stored in memory. The value can’t be null.
    • the third argument is the database to which the objects are written. The value can’t be null.
    • the fourth argument is the WriteOperation to use. The value can’t be null.
    • the fifth argument is the commit size, the number of items to save in a single commit to the database.
    • the sixth argument is the commitEventually flag. If true, the Database commitWritesEventually method is used instead of the commitWrites method.
  • Line 8 submits the AsyncDatabaseWriter task to run.
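
To make the commit size argument concrete, here is a minimal sketch of batched writing in plain Java. It is an assumption about the general technique, not Dari's code: BatchCommitSketch and writeInBatches are hypothetical names, the commit callback stands in for the database commit, and the sketch drains a queue that is already filled rather than waiting for new items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Illustrative stand-in for batched commits; not Dari's implementation.
class BatchCommitSketch {

    // Drains the queue and invokes commit once per full batch,
    // plus once more for any leftover items. Returns the number of commits.
    static <T> int writeInBatches(
            BlockingQueue<T> queue,
            int commitSize,
            Consumer<List<T>> commit) {
        if (commitSize < 1) {
            throw new IllegalArgumentException("commitSize must be at least 1");
        }
        int commits = 0;
        List<T> batch = new ArrayList<>(commitSize);
        T item;
        // poll() returns null once the queue is empty, which ends the loop.
        while ((item = queue.poll()) != null) {
            batch.add(item);
            if (batch.size() == commitSize) {
                commit.accept(new ArrayList<>(batch)); // hand over a copy
                batch.clear();
                commits++;
            }
        }
        if (!batch.isEmpty()) {
            commit.accept(new ArrayList<>(batch)); // final partial batch
            commits++;
        }
        return commits;
    }
}
```

With 250 queued items and a commit size of 100, this sketch performs three commits of 100, 100, and 50 items. A larger commit size means fewer round trips to the database at the cost of more objects held in memory per commit.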