File size: 6,103 Bytes
03c0888
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import os
import asyncio
import logging
from pathlib import Path
import aiosqlite
from typing import Optional
import xxhash
import aiofiles
import shutil
import time
from datetime import datetime
from .async_logger import AsyncLogger, LogLevel

# Module-level async logger shared by every migration helper in this file.
logger = AsyncLogger(log_level=LogLevel.DEBUG, verbose=True)

# NOTE(review): stdlib-logging setup kept below for reference; AsyncLogger
# is used instead.
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)

class DatabaseMigration:
    """Migrate a legacy crawl4ai SQLite database to file-based content storage.

    Each large text column of ``crawled_data`` (html, cleaned_html, markdown,
    extracted_content, screenshot) is written to a content directory beside
    the database file, named by the xxhash64 digest of its contents; the
    database column is then replaced by that digest.
    """

    def __init__(self, db_path: str):
        # Content directories are created next to the database file itself.
        self.db_path = db_path
        self.content_paths = self._ensure_content_dirs(os.path.dirname(db_path))

    def _ensure_content_dirs(self, base_path: str) -> dict:
        """Create (idempotently) and return the per-type content directories.

        Returns a dict mapping logical content type -> absolute directory path.
        """
        layout = (
            ('html', 'html_content'),
            ('cleaned', 'cleaned_html'),
            ('markdown', 'markdown_content'),
            ('extracted', 'extracted_content'),
            ('screenshots', 'screenshots'),
        )
        paths = {}
        for key, dirname in layout:
            full_path = os.path.join(base_path, dirname)
            os.makedirs(full_path, exist_ok=True)
            paths[key] = full_path
        return paths

    def _generate_content_hash(self, content: str) -> str:
        """Return the xxhash64 hex digest of *content* (UTF-8 encoded)."""
        hasher = xxhash.xxh64(content.encode())
        return hasher.hexdigest()

    async def _store_content(self, content: str, content_type: str) -> str:
        """Persist *content* under its hash in the *content_type* directory.

        Identical content hashes to the same filename, so duplicates are
        written only once. Returns the hash, or "" for empty/None content.
        """
        if not content:
            return ""

        digest = self._generate_content_hash(content)
        target = os.path.join(self.content_paths[content_type], digest)

        # Skip the write when an identical blob was already stored.
        if not os.path.exists(target):
            async with aiofiles.open(target, 'w', encoding='utf-8') as fh:
                await fh.write(content)

        return digest

    async def migrate_database(self):
        """Migrate existing database to file-based storage."""
        logger.info("Starting database migration...", tag="INIT")

        try:
            async with aiosqlite.connect(self.db_path) as db:
                # Fetch every row up front so the subsequent UPDATEs do not
                # interleave with an open read cursor.
                async with db.execute(
                    '''SELECT url, html, cleaned_html, markdown, 
                       extracted_content, screenshot FROM crawled_data'''
                ) as cursor:
                    rows = await cursor.fetchall()

                processed = 0
                for url, html, cleaned_html, markdown, extracted_content, screenshot in rows:
                    # Move each blob out to a content file; keep only its hash.
                    # NOTE(review): assumes screenshot is stored as text
                    # (e.g. base64) — confirm against the writer.
                    column_hashes = (
                        await self._store_content(html, 'html'),
                        await self._store_content(cleaned_html, 'cleaned'),
                        await self._store_content(markdown, 'markdown'),
                        await self._store_content(extracted_content, 'extracted'),
                        await self._store_content(screenshot, 'screenshots'),
                    )

                    await db.execute('''
                        UPDATE crawled_data 
                        SET html = ?, 
                            cleaned_html = ?,
                            markdown = ?,
                            extracted_content = ?,
                            screenshot = ?
                        WHERE url = ?
                    ''', (*column_hashes, url))

                    processed += 1
                    if processed % 100 == 0:
                        logger.info(f"Migrated {processed} records...", tag="INIT")

                await db.commit()
                logger.success(f"Migration completed. {processed} records processed.", tag="COMPLETE")

        except Exception as e:
            logger.error(
                message="Migration failed: {error}",
                tag="ERROR",
                params={"error": str(e)}
            )
            raise e

async def backup_database(db_path: str) -> Optional[str]:
    """Create a timestamped backup copy of the database.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        The backup file path, or None when no database exists at *db_path*.

    Raises:
        Exception: Re-raised (after logging) if the copy fails.
    """
    if not os.path.exists(db_path):
        logger.info("No existing database found. Skipping backup.", tag="INIT")
        return None

    # Timestamp the backup name so repeated runs never clobber a prior copy.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = f"{db_path}.backup_{timestamp}"

    try:
        # Give any in-flight write operations a moment to settle.
        await asyncio.sleep(1)

        # copy2 preserves file metadata alongside the contents.
        shutil.copy2(db_path, backup_path)
        logger.info(f"Database backup created at: {backup_path}", tag="COMPLETE")
        return backup_path
    except Exception as e:
        # Bug fix: previously logged "Migration failed" — a copy-paste from
        # migrate_database's handler. Report the operation that failed.
        logger.error(
            message="Backup failed: {error}",
            tag="ERROR",
            params={"error": str(e)}
        )
        raise e
    
async def run_migration(db_path: Optional[str] = None):
    """Back up and then migrate the crawl4ai database.

    Args:
        db_path: Database location; defaults to ``~/.crawl4ai/crawl4ai.db``.
    """
    if db_path is None:
        db_path = os.path.join(Path.home(), ".crawl4ai", "crawl4ai.db")

    if not os.path.exists(db_path):
        logger.info("No existing database found. Skipping migration.", tag="INIT")
        return

    # A successful backup is a precondition for touching the live database.
    if not await backup_database(db_path):
        return

    await DatabaseMigration(db_path).migrate_database()
    
def main():
    """CLI entry point for migration."""
    import argparse

    parser = argparse.ArgumentParser(
        description='Migrate Crawl4AI database to file-based storage'
    )
    parser.add_argument('--db-path', help='Custom database path')
    options = parser.parse_args()

    asyncio.run(run_migration(options.db_path))


if __name__ == "__main__":
    main()