Skip to content

Document Processing Workflow

Document processing is one of the core functions of the DocuSnap-Backend system, used to convert document images into structured text and extract key information. This page details the complete document processing workflow.

Workflow Overview

The document processing workflow converts document images into structured text data, including document title, type, content summary, key information, etc. The entire process includes image processing, OCR recognition, LLM analysis, and result generation.

graph TD
    A["Receive Document Image"] --> B["Request Validation & Decryption"]
    B --> C["Check Cache"]
    C -->|"Cache Hit"| D["Return Cached Result"]
    C -->|"Cache Miss"| E["Create Task"]
    E --> F["OCR Processing"]
    F --> G["LLM Document Analysis"]
    G --> H["Result Storage"]
    H --> I["Return Processing Result"]

Detailed Process

1. Request Reception and Validation

The document processing workflow begins with receiving a client request:

  1. Receive Request:
  2. Client sends request through the /api/process_document endpoint
  3. Request contains encrypted document images (Base64 format)
  4. Request uses RSA and AES hybrid encryption

  5. Request Decryption:

  6. Use RSA private key to decrypt AES key
  7. Use AES key to decrypt request data
  8. Verify request signature to ensure data integrity

  9. Parameter Validation:

  10. Validate if request parameters are complete
  11. Validate if image format is correct
  12. Validate if image count is within allowed range

Code Example:

@app.route('/api/process_document', methods=['POST'])
def process_document():
    try:
        # Decrypt request
        if not request.is_json:
            return jsonify({"error": "Request must be in JSON format"}), 400

        data = request.get_json()

        # Validate required parameters
        required_fields = ['encrypted_data', 'encrypted_key', 'signature']
        for field in required_fields:
            if field not in data:
                return jsonify({"error": f"Missing required parameter: {field}"}), 400

        # Decrypt request data
        try:
            decrypted_data, aes_key = decrypt_request(
                data['encrypted_data'],
                data['encrypted_key'],
                data['signature']
            )
        except Exception as e:
            return jsonify({"error": f"Request decryption failed: {str(e)}"}), 400

        # Validate decrypted data
        if 'images' not in decrypted_data or not isinstance(decrypted_data['images'], list):
            return jsonify({"error": "Missing images parameter or incorrect format"}), 400

        if len(decrypted_data['images']) == 0:
            return jsonify({"error": "At least one image must be provided"}), 400

        if len(decrypted_data['images']) > MAX_IMAGES:
            return jsonify({"error": f"Image count exceeds limit (maximum {MAX_IMAGES})"}), 400

        # Process request...

2. Cache Check and Task Creation

Before processing the request, the system checks if there are cached results to avoid duplicate computation:

  1. Cache Check:
  2. Generate cache key based on request parameters
  3. Query database for matching cached results
  4. If cache hit, directly return cached result

  5. Task Creation:

  6. If cache miss, generate a unique task ID
  7. Create task record, set status to "pending"
  8. Add task to processing queue

  9. Return Task ID:

  10. Return task ID to client
  11. Client can use task ID to query processing status and results

Code Example:

# Check cache
cache_key = sha256_hash(json.dumps(decrypted_data, sort_keys=True))
cached_result = check_cache(cache_key)

if cached_result:
    # Return cached result
    response = encrypt_response(cached_result, aes_key)
    return jsonify(response), 200

# Create task
task_id = str(uuid.uuid4())
create_task_record(task_id, 'document')

# Build task object
task = {
    'id': task_id,
    'type': 'document',
    'data': decrypted_data,
    'cache_key': cache_key,
    'aes_key': aes_key
}

# Add task to queue
add_task_to_queue(task)

# Return task ID
response = encrypt_response({'task_id': task_id}, aes_key)
return jsonify(response), 202  # 202 Accepted

3. OCR Processing

After a task enters the queue, worker threads perform OCR processing:

  1. Image Preprocessing:
  2. Decode Base64 images
  3. Perform necessary image optimization (such as resizing, contrast enhancement, etc.)

  4. OCR Service Call:

  5. Call CnOCR service for text recognition
  6. Process multiple images in parallel to improve efficiency
  7. Use semaphores to control concurrency

  8. OCR Result Processing:

  9. Collect OCR results for all images
  10. Merge into complete text content
  11. Prepare input for LLM analysis

Code Example:

def process_images(images):
    """Process multiple images in parallel"""
    # Create semaphore to control concurrent access to OCR service
    ocr_semaphore = threading.Semaphore(OCR_MAX_CONCURRENT)

    # Create thread pool
    with ThreadPoolExecutor(max_workers=OCR_MAX_CONCURRENT) as executor:
        # Submit all image processing tasks
        future_to_image = {
            executor.submit(process_single_image, image, ocr_semaphore): i 
            for i, image in enumerate(images)
        }

        # Collect results
        results = []
        for future in as_completed(future_to_image):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                logging.error(f"Image processing failed: {str(e)}")

        # Sort results by original order
        results.sort(key=lambda x: x['index'])
        return [r['text'] for r in results]

4. LLM Processing

After OCR processing is complete, the system uses LLM to analyze the text content:

  1. Prompt Construction:
  2. Use document processing prompt template
  3. Insert OCR text into prompt template
  4. Guide LLM to analyze document structure and content

  5. LLM API Call:

  6. Call Zhipu AI's LLM API
  7. Send constructed prompt
  8. Receive LLM response

  9. Response Parsing:

  10. Parse text returned by LLM
  11. Extract structured data in JSON format
  12. Handle potential format issues

Code Example:

def process_document_task(task):
    """Process document type tasks"""
    update_task_status(task['id'], 'processing')

    try:
        # Process images
        ocr_results = process_images(task['data']['images'])

        # Build prompt and call LLM
        prompt = build_document_prompt(ocr_results)
        llm_result = call_llm_api(prompt)

        # Parse results
        parsed_result = parse_document_result(llm_result)

        # Format results
        final_result = format_document_result(parsed_result)

        # Update task status and results
        update_task_status(task['id'], 'completed', final_result)

        # Store cache
        store_cache(task['cache_key'], final_result)

    except Exception as e:
        update_task_status(task['id'], 'error', {"error": str(e)})
        raise

5. Result Processing

After LLM processing is complete, the system processes and stores the results:

  1. Result Formatting:
  2. Convert parsed LLM results to standard format
  3. Add metadata (such as processing time, source, etc.)
  4. Ensure output format consistency

  5. Result Storage:

  6. Update task status to "completed"
  7. Store processing results
  8. Add results to cache

  9. Error Handling:

  10. If errors occur during processing, record error information
  11. Update task status to "error"
  12. Store error details

Code Example:

def format_document_result(parsed_result):
    """Format document processing results"""
    # Ensure result contains necessary fields
    result = {
        "title": parsed_result.get("title", "Unknown Title"),
        "document_type": parsed_result.get("document_type", "Unknown Type"),
        "summary": parsed_result.get("summary", ""),
        "key_dates": parsed_result.get("key_dates", []),
        "key_figures": parsed_result.get("key_figures", []),
        "key_entities": parsed_result.get("key_entities", []),
        "metadata": {
            "processed_at": int(time.time()),
            "source": "document_processing"
        }
    }

    return result

6. Status Query and Result Return

Clients can use the task ID to query processing status and results:

  1. Status Query:
  2. Client queries task status through the /api/task_status endpoint
  3. Provides task ID as parameter
  4. System returns current task status (pending, processing, completed, error)

  5. Result Query:

  6. If task is completed, system returns processing results
  7. If task is not completed, system returns current status
  8. If task has error, system returns error information

  9. Response Encryption:

  10. Encrypt response data using previous AES key
  11. Calculate signature for response data
  12. Return encrypted response and signature

Code Example:

@app.route('/api/task_status', methods=['POST'])
def get_task_status():
    try:
        # Decrypt request
        if not request.is_json:
            return jsonify({"error": "Request must be in JSON format"}), 400

        data = request.get_json()

        # Validate required parameters
        required_fields = ['encrypted_data', 'encrypted_key', 'signature']
        for field in required_fields:
            if field not in data:
                return jsonify({"error": f"Missing required parameter: {field}"}), 400

        # Decrypt request data
        try:
            decrypted_data, aes_key = decrypt_request(
                data['encrypted_data'],
                data['encrypted_key'],
                data['signature']
            )
        except Exception as e:
            return jsonify({"error": f"Request decryption failed: {str(e)}"}), 400

        # Validate task ID
        if 'task_id' not in decrypted_data:
            return jsonify({"error": "Missing task_id parameter"}), 400

        task_id = decrypted_data['task_id']

        # Query task status
        task_result = get_task_result(task_id)

        if not task_result:
            return jsonify({"error": "Task does not exist"}), 404

        # Encrypt response
        response = encrypt_response(task_result, aes_key)
        return jsonify(response), 200

    except Exception as e:
        return jsonify({"error": f"Failed to process request: {str(e)}"}), 500

Document Processing Prompt

The document processing workflow uses a specially designed prompt template to guide LLM in analyzing document content:

You are a professional document analysis assistant. Please analyze the following document content, extract key information, and output in the specified format.

Document content:
{document_text}

Please provide the following information:
1. Document title
2. Document type (e.g., report, contract, notice, etc.)
3. Main content summary
4. Key dates and figures
5. Important names and organizations

Please output the result in JSON format, including the following fields:
- title: Document title
- document_type: Document type
- summary: Content summary (100-200 words)
- key_dates: List of key dates, each item includes date (date) and context (context)
- key_figures: List of key figures, each item includes figure (figure) and context (context)
- key_entities: List of important names and organizations, each item includes name (name) and role (role)

This prompt template guides LLM to extract key information from documents and output results in structured JSON format.

Processing Result Example

The output of the document processing workflow is structured JSON data containing key information about the document:

{
  "title": "Report on 2023 Budget Execution",
  "document_type": "Financial Report",
  "summary": "This report provides a detailed analysis of the 2023 budget execution, including income and expenditure overview, major project execution progress, and existing issues. Overall execution is good, with income achievement rate of 98%, expenditure controlled within budget, and major projects progressing as planned.",
  "key_dates": [
    {
      "date": "2023-01-15",
      "context": "Budget plan officially approved"
    },
    {
      "date": "2023-06-30",
      "context": "First half budget execution review"
    },
    {
      "date": "2023-12-31",
      "context": "Annual budget execution deadline"
    }
  ],
  "key_figures": [
    {
      "figure": "1,500,000",
      "context": "Annual total budget (yuan)"
    },
    {
      "figure": "1,470,000",
      "context": "Actual total income (yuan)"
    },
    {
      "figure": "1,420,000",
      "context": "Actual total expenditure (yuan)"
    },
    {
      "figure": "98%",
      "context": "Income achievement rate"
    }
  ],
  "key_entities": [
    {
      "name": "Zhang Ming",
      "role": "Financial Director"
    },
    {
      "name": "Finance Committee",
      "role": "Budget approval organization"
    },
    {
      "name": "Li Hua",
      "role": "Budget execution manager"
    }
  ],
  "metadata": {
    "processed_at": 1679012345,
    "source": "document_processing"
  }
}

Error Handling

The document processing workflow implements comprehensive error handling mechanisms:

  1. Request Errors:
  2. Missing parameters or incorrect format
  3. Decryption failure or signature verification failure
  4. Image count exceeds limit

  5. OCR Errors:

  6. Image decoding failure
  7. OCR service call failure
  8. OCR result format error

  9. LLM Errors:

  10. LLM API call failure
  11. Response parsing failure
  12. Result format does not meet expectations

  13. System Errors:

  14. Database operation failure
  15. Insufficient resources
  16. Service unavailable

Each type of error has a corresponding handling strategy, ensuring the system can gracefully handle exceptions and provide clear error feedback.

Performance Optimization

The document processing workflow implements various performance optimization measures:

  1. Parallel Processing:
  2. Use thread pools to process multiple images in parallel
  3. Improve processing efficiency, reduce total processing time

  4. Caching Mechanism:

  5. Cache processing results to avoid duplicate computation
  6. Improve response speed for identical requests

  7. Asynchronous Processing:

  8. Use task queues and worker threads for asynchronous processing
  9. Improve system responsiveness and concurrent processing capability

  10. Resource Control:

  11. Use semaphores to control concurrency
  12. Avoid excessive resource consumption and service overload

Through these designs and implementations, the document processing workflow can efficiently and reliably convert document images into structured text data, providing valuable document analysis services to users.