Skip to content

Task Processing Module

The Task Processing Module is the core coordinator of the DocuSnap-Backend system, responsible for managing the lifecycle and workflow of tasks. This module implements an asynchronous task processing mechanism, enhancing the system's concurrent processing capability through task queues and worker thread pools.

Module Responsibilities

The main responsibilities of the Task Processing Module include:

  1. Task Reception and Validation: Receiving client requests, validating request parameters and formats
  2. Task Queue Management: Adding tasks to the queue, managing task priorities and statuses
  3. Worker Thread Coordination: Allocating and managing worker threads to process tasks in the queue
  4. Processing Strategy Selection: Selecting appropriate processing strategies based on task types
  5. Task Status Tracking: Tracking and updating the processing status and results of tasks
  6. Inter-module Coordination: Coordinating the work between the OCR Processing Module and the LLM Processing Module

Core Components

1. Task Queue

The task queue is a thread-safe queue used to store pending tasks. The system uses the Queue class from the Python standard library to implement the task queue, ensuring safe operations in a multi-threaded environment.

Code Example:

# Initialize task queue
task_queue = Queue()

def add_task_to_queue(task):
    """Add a task to the queue"""
    task_queue.put(task)
    return task['id']

2. Worker Thread Pool

The worker thread pool consists of multiple worker threads responsible for retrieving tasks from the task queue and executing them. The system uses Python's threading module to create and manage worker threads.

Code Example:

# Initialize worker threads
def init_worker_threads(num_threads=4):
    """Initialize worker thread pool"""
    for _ in range(num_threads):
        thread = threading.Thread(target=worker, daemon=True)
        thread.start()

# Worker thread function
def worker():
    """Worker thread function, gets tasks from the queue and processes them"""
    while True:
        task = task_queue.get()
        try:
            # Select processing strategy based on task type
            if task['type'] == 'document':
                process_document_task(task)
            elif task['type'] == 'form':
                process_form_task(task)
            elif task['type'] == 'form_filling':
                process_form_filling_task(task)
        except Exception as e:
            # Error handling
            update_task_status(task['id'], 'error', str(e))
        finally:
            task_queue.task_done()

3. Task Status Management

Task status management is responsible for tracking and updating the processing status and results of tasks, including creating task records, updating task status, and storing processing results.

Code Example:

def create_task(task_type, task_data):
    """Create a new task and record it in the database"""
    task_id = str(uuid.uuid4())

    conn = get_db_connection()
    cursor = conn.cursor()

    cursor.execute(
        "INSERT INTO tasks (id, type, status, created_at) VALUES (?, ?, ?, ?)",
        (task_id, task_type, 'pending', int(time.time()))
    )

    conn.commit()
    conn.close()

    # Build task object
    task = {
        'id': task_id,
        'type': task_type,
        'data': task_data
    }

    return task

def update_task_status(task_id, status, result=None):
    """Update task status and result"""
    conn = get_db_connection()
    cursor = conn.cursor()

    if result:
        cursor.execute(
            "UPDATE tasks SET status = ?, result = ? WHERE id = ?",
            (status, json.dumps(result), task_id)
        )
    else:
        cursor.execute(
            "UPDATE tasks SET status = ? WHERE id = ?",
            (status, task_id)
        )

    conn.commit()
    conn.close()

4. Processing Strategy Selector

The processing strategy selector is responsible for selecting the appropriate processing strategy based on the task type, implementing the strategy pattern design concept.

Code Example:

def process_document_task(task):
    """Process document type tasks"""
    update_task_status(task['id'], 'processing')

    try:
        # Process images
        ocr_results = process_images(task['data']['images'])

        # Build prompt and call LLM
        prompt = build_document_prompt(ocr_results)
        llm_result = call_llm_api(prompt)

        # Parse results and update task status
        parsed_result = parse_document_result(llm_result)
        update_task_status(task['id'], 'completed', parsed_result)
    except Exception as e:
        update_task_status(task['id'], 'error', str(e))
        raise

def process_form_task(task):
    """Process form type tasks"""
    # Similar to process_document_task, but using form processing strategy
    # ...

def process_form_filling_task(task):
    """Process form filling type tasks"""
    # Similar to process_document_task, but using form filling strategy
    # ...

Workflow

The workflow of the Task Processing Module is as follows:

  1. Task Creation:
  2. Client sends a request
  3. System validates request parameters and format
  4. Creates task record and generates task ID
  5. Adds task to the task queue

  6. Task Assignment:

  7. Worker thread retrieves task from queue
  8. Selects processing strategy based on task type
  9. Updates task status to "processing"

  10. Task Execution:

  11. Calls OCR Processing Module to process images
  12. Calls LLM Processing Module to analyze text
  13. Processes and integrates results

  14. Task Completion:

  15. Updates task status to "completed" or "error"
  16. Stores processing results
  17. Prepares response data

  18. Result Query:

  19. Client queries task status
  20. System returns task status and results (if available)

Design Pattern Application

The Task Processing Module applies the following design patterns:

  1. Producer-Consumer Pattern:
  2. API endpoints act as producers, generating tasks and adding them to the queue
  3. Worker threads act as consumers, retrieving tasks from the queue and processing them
  4. The task queue decouples task generation and execution

  5. Strategy Pattern:

  6. Different processing strategies are selected based on task type
  7. Each task type corresponds to a processing function
  8. Facilitates adding new task types and processing strategies

  9. Command Pattern:

  10. Task objects encapsulate all execution information
  11. Worker threads execute tasks without needing to know specific details
  12. Supports asynchronous execution and status tracking of tasks

Concurrency Control

The Task Processing Module implements the following concurrency control mechanisms:

  1. Thread-safe Queue: Uses Python's Queue class to ensure thread safety of the task queue
  2. Worker Thread Pool: Limits the number of concurrent worker threads to avoid excessive resource consumption
  3. Task Status Locks: Uses database transactions to ensure atomicity of task status updates
  4. Semaphore Control: Uses semaphores to control concurrent access to external services

Error Handling

The Task Processing Module implements comprehensive error handling mechanisms:

  1. Exception Catching: Catches and logs exceptions during task processing
  2. Status Updates: Updates task status to "error" and records error information
  3. Error Recovery: Worker threads continue processing other tasks even if a task fails
  4. Error Notification: Reports errors to clients through the task status API

Module Interfaces

The Task Processing Module provides the following main interfaces:

  1. External Interfaces:
  2. process_document: API endpoint for processing document tasks
  3. process_form: API endpoint for processing form tasks
  4. process_form_filling: API endpoint for processing form filling tasks
  5. get_task_status: API endpoint for querying task status

  6. Internal Interfaces:

  7. add_task_to_queue: Adds tasks to the queue
  8. create_task: Creates task records
  9. update_task_status: Updates task status and results
  10. get_task_result: Retrieves task results

Performance Considerations

Performance optimization measures in the Task Processing Module include:

  1. Asynchronous Processing: Uses task queues and worker threads for asynchronous processing, improving response speed
  2. Parallel Execution: Multiple worker threads process tasks in parallel, improving throughput
  3. Resource Control: Limits the number of concurrent worker threads to avoid excessive resource consumption
  4. Cache Utilization: Uses caching to store and retrieve task results, avoiding repeated computation

Extensibility

The extensibility of the Task Processing Module is reflected in:

  1. Adding New Task Types: Can easily add new task types and processing strategies
  2. Adjusting Concurrency: Can adjust the number of worker threads based on requirements
  3. Optimizing Scheduling Strategies: Can implement more complex task scheduling and priority strategies
  4. Distributed Extension: Design supports future extension to a distributed task processing system

Through these designs and implementations, the Task Processing Module serves as the core coordinator of the DocuSnap-Backend system, effectively managing task processing workflows and improving the system's concurrent processing capability and response speed.