Chapter 6 The Task Queue System

The Task Queue System maintains a list of outstanding analysis tasks in the database and is responsible for assigning tasks to analysis processes.

The database implementation is designed so that it can be utilised by either audioBlastAnalyse or in the future by an analysis package in another language (e.g. a Python analysis suite).

6.1 Database Implementation

The system is implemented as two tables in the database, tasks and tasks-progress. Tasks are assigned using the stored procedure get-tasks() for local files, or get-tasks-by-file() for web files. The get-tasks() procedure will assign n random tasks to the analysis process, whereas get-tasks-by-file() will return all outstanding tasks for a given file. The latter procedure removes the need for repeated downloads of the same file.

6.1.1 tasks table

The table tasks is a list of outstanding analysis tasks. It is populated automatically by triggers on the recordings and recordings-calcuated table. Analysis tasks are grouped into sections (recordings_calculated, sounscapes_minute and soundscpaes_second). The recordings-calculaed table has a TINYINT column for each of these, that is set to 1 once the task is completed.

6.1.2 tasks-progress table

The table tasks-progress is populated by the stored procedure get-tasks() when an analysis process requests outstanding tasks. It is used to prevent the same task being analysed to multiple analysis processes. Tasks from an unresponsive process (e.g. a process which has crashed) can be reassigned after an hour of inactivity by that process.

6.1.3 get-tasks() stored procedure

The stored procedure get-tasks(process_id, n, source) is used to assign n tasks to the analysis process process_id. Initially a call to _quickMaintain() is performed to deallocate time-expired tasks. It inserts n random unassigned tasks from the tasks table into the tasks-progress table along with the process_id and the current timestamp. It returns via a SELECT statement these tasks.

6.1.4 get-tasks-by-file() stored procedure

Initially a call to _quickMaintain() is performed to deallocate time-expired tasks. The stored procedure get-tasks(process_id, source) then assigns all outstanding tasks for a randomly chosen file to the analysis process process_id. It inserts these tasks from the tasks table into the tasks-progress table along with the process_id and the current timestamp. It returns via a SELECT statement these tasks.

6.1.5 delete-task() stored proecure

The stored procedure delete-task(process_id, source, id, task) marks a task as complete by deleting it from the tasks table. A trigger in the tasks table will delete the matching row from tasks-progress. The started time of any remaining tasks assigned to the same process_id in the tasks-progess table will be updated to the current time. A call is made to the stored procedure _quickMaintain() which will perform routine maintenance tasks, including removing any expired tasks in the tasks-progress table.

6.2 audioBlastAnalyse Implementation

The analysis suite audioBlastAnalyse has several functions that implement this task queue in the R language.

All these functions require a database connector (db) as a parameter, and a unique process_id for the analysis process. In the majority of cases these functions will be called automatically from within a call to the main analyse() function, where the database connector will already be configured and a process identifier is automatically generated.

6.2.1 Legacy mode

On some older Linux operating systems the RMariaDB package and libraries it builds against have issues dealing with stored procedures. For the time being a legacy mode is provided that implements the stored procedures as a sequence of queries. Setting legacy=TRUE as a parameter to either of the fetch queries will activate this mode.

6.2.2 Getting tasks

The get-tasks() stored procedure is accessed by using fetchDownloadableRecordings(), and get-tasks-by-file() by using fetchDownloadableRecordings().

fetchDownloadableRecordings(db, source, process_id, legacy=FALSE)
fetchUnanalysedRecordings(db, source, process_id, legacy=FALSE)

6.2.3 Removing completed tasks

deleteToDo(db, source, id, task, process_id)