Python Crazy Practice 60 Days: Day 14


Today's practice topic: data serialization and storage

Today we'll work through Python's data serialization formats and storage techniques, including JSON, XML, Pickle, CSV, and database operations.

Exercise 1: JSON Serialization

import json
import datetime
from decimal import Decimal
from enum import Enum

class Status(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    PENDING = "pending"

class CustomEncoder(json.JSONEncoder):
    """自定义JSON编码器"""
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        elif isinstance(obj, datetime.date):
            return obj.isoformat()
        elif isinstance(obj, Decimal):
            return float(obj)
        elif isinstance(obj, Enum):
            return obj.value
        elif isinstance(obj, set):
            return list(obj)
        return super().default(obj)

def json_examples():
    """JSON序列化示例"""
    print("=== JSON序列化 ===")
    
    # 基本数据类型
    data = {
        "string": "Hello World",
        "number": 42,
        "float": 3.14,
        "boolean": True,
        "null": None,
        "list": [1, 2, 3],
        "dict": {"key": "value"}
    }
    
    # Serialize to a string
    json_str = json.dumps(data, indent=2, ensure_ascii=False)
    print("JSON string:")
    print(json_str)
    
    # Deserialize
    parsed_data = json.loads(json_str)
    print("\nDeserialized data:")
    print(parsed_data)
    
    # File I/O
    with open('data.json', 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    
    with open('data.json', 'r', encoding='utf-8') as f:
        loaded_data = json.load(f)
    
    print("\n从文件加载的数据:")
    print(loaded_data)
    
    # Serializing complex objects
    complex_data = {
        "timestamp": datetime.datetime.now(),
        "date": datetime.date.today(),
        "price": Decimal("19.99"),
        "status": Status.ACTIVE,
        "tags": {"python", "json", "serialization"}
    }
    
    print("\n复杂对象序列化:")
    complex_json = json.dumps(complex_data, cls=CustomEncoder, indent=2)
    print(complex_json)
    
    # JSONPath-style lookup (simple implementation)
    def json_path(data, path):
        """Simple dotted-path lookup"""
        keys = path.split('.')
        current = data
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current
    
    sample_data = {
        "user": {
            "name": "Alice",
            "age": 25,
            "address": {
                "city": "Beijing",
                "zipcode": "100000"
            }
        }
    }
    
    print(f"\nJSONPath查询: user.address.city = {json_path(sample_data, 'user.address.city')}")

json_examples()
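
The CustomEncoder above only covers the encoding direction. Going the other way, json.loads() accepts an object_hook callback that is invoked for every decoded object, which is one common way to restore rich types. A minimal sketch, assuming a "__type__" tag convention that is this sketch's own and not something the exercise above uses:

import json
import datetime

def tagged_decoder(obj):
    """Restore datetime values stored as {"__type__": "datetime", "value": ...}"""
    if obj.get("__type__") == "datetime":
        return datetime.datetime.fromisoformat(obj["value"])
    return obj

payload = '{"created": {"__type__": "datetime", "value": "2024-01-01T12:00:00"}}'
print(json.loads(payload, object_hook=tagged_decoder))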

Exercise 2: XML Processing

import xml.etree.ElementTree as ET
from xml.dom import minidom

def xml_examples():
    """XML处理示例"""
    print("\n=== XML处理 ===")
    
    # 创建XML文档
    root = ET.Element("catalog")
    
    # 添加子元素
    for i in range(3):
        book = ET.SubElement(root, "book", id=str(i+1))
        title = ET.SubElement(book, "title")
        title.text = f"Python Book {i+1}"
        author = ET.SubElement(book, "author")
        author.text = f"Author {i+1}"
        price = ET.SubElement(book, "price")
        price.text = str(29.99 + i)
    
    # Convert to a string and pretty-print it
    rough_string = ET.tostring(root, 'utf-8')
    reparsed = minidom.parseString(rough_string)
    pretty_xml = reparsed.toprettyxml(indent="  ")
    
    print("Generated XML:")
    print(pretty_xml)
    
    # Save to a file
    with open('catalog.xml', 'w', encoding='utf-8') as f:
        f.write(pretty_xml)
    
    # Parse the XML file
    print("\nParsing the XML file:")
    tree = ET.parse('catalog.xml')
    root = tree.getroot()
    
    # Iterate over the XML elements
    for book in root.findall('book'):
        book_id = book.get('id')
        title = book.find('title').text
        author = book.find('author').text
        price = book.find('price').text
        print(f"Book ID: {book_id}, title: {title}, author: {author}, price: {price}")
    
    # Price filter: ElementTree's XPath subset has no numeric predicates
    # (".//book[price>30]" raises a SyntaxError), so filter in Python instead
    print("\nBooks over 30:")
    for book in root.findall('book'):
        title = book.find('title').text
        price = float(book.find('price').text)
        if price > 30:
            print(f"Expensive book: {title}, price: {price}")

xml_examples()
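
ElementTree is not just for reading and generating documents; the same tree API can edit an existing file in place. A short hedged sketch that assumes the catalog.xml written above is still on disk:

import xml.etree.ElementTree as ET

tree = ET.parse('catalog.xml')
root = tree.getroot()

for book in root.findall('book'):
    price = book.find('price')
    # Apply a 10% discount and record it as an attribute
    price.text = f"{float(price.text) * 0.9:.2f}"
    book.set('discounted', 'true')

# remove() must be called on the direct parent of the element
root.remove(root.findall('book')[0])

tree.write('catalog_updated.xml', encoding='utf-8', xml_declaration=True)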

Exercise 3: Pickle Serialization

import pickle
import datetime

class User:
    """用户类"""
    def __init__(self, name, email, created_at=None):
        self.name = name
        self.email = email
        self.created_at = created_at or datetime.datetime.now()
        self._password = None  # 注意:敏感数据不应序列化
    
    def set_password(self, password):
        self._password = password
    
    def __repr__(self):
        return f"User(name='{self.name}', email='{self.email}', created_at='{self.created_at}')"

def pickle_examples():
    """Pickle序列化示例"""
    print("\n=== Pickle序列化 ===")
    
    # 创建复杂对象
    users = [
        User("Alice", "alice@example.com"),
        User("Bob", "bob@example.com"),
        User("Charlie", "charlie@example.com")
    ]
    
    # 设置密码(注意安全风险)
    users[0].set_password("secret123")
    
    # 序列化到字节
    print("序列化对象:")
    pickle_data = pickle.dumps(users)
    print(f"Pickle数据长度: {len(pickle_data)} 字节")
    
    # 反序列化
    loaded_users = pickle.loads(pickle_data)
    print("\n反序列化对象:")
    for user in loaded_users:
        print(user)
    
    # 文件操作
    with open('users.pkl', 'wb') as f:  # 注意是二进制模式
        pickle.dump(users, f)
    
    with open('users.pkl', 'rb') as f:
        file_users = pickle.load(f)
    
    print("\n从文件加载的用户:")
    for user in file_users:
        print(user)
    
    # 安全警告
    print("\n安全提示: Pickle可以执行任意代码,不要反序列化不受信任的数据!")

pickle_examples()
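
The warning printed above deserves a concrete counterpart. The pickle documentation describes a "restricting globals" pattern: subclass pickle.Unpickler and override find_class so that only an explicit whitelist of classes can be loaded. A hedged sketch (the whitelist here is this sketch's own choice, and it reduces rather than eliminates the risk):

import io
import pickle

ALLOWED = {('__main__', 'User'), ('datetime', 'datetime')}

class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        if (module, name) in ALLOWED:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"forbidden class: {module}.{name}")

def restricted_loads(data):
    """Drop-in replacement for pickle.loads() that enforces the whitelist"""
    return RestrictedUnpickler(io.BytesIO(data)).load()

# Usage: restricted_loads(pickle_data) instead of pickle.loads(pickle_data)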

Exercise 4: Working with CSV Files

import csv
import json

def csv_examples():
    """CSV文件处理示例"""
    print("\n=== CSV文件处理 ===")
    
    # 准备数据
    users = [
        {"name": "Alice", "age": 25, "city": "Beijing", "email": "alice@example.com"},
        {"name": "Bob", "age": 30, "city": "Shanghai", "email": "bob@example.com"},
        {"name": "Charlie", "age": 35, "city": "Guangzhou", "email": "charlie@example.com"}
    ]
    
    # Write a CSV file
    with open('users.csv', 'w', encoding='utf-8', newline='') as f:
        fieldnames = ['name', 'age', 'city', 'email']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        
        writer.writeheader()
        writer.writerows(users)
    
    print("CSV文件写入完成")
    
    # 读取CSV文件
    print("\n读取CSV文件:")
    with open('users.csv', 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader, 1):
            print(f"行 {i}: {dict(row)}")
    
    # 使用不同的分隔符
    print("\n使用分号分隔符:")
    with open('users_semicolon.csv', 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f, delimiter=';')
        writer.writerow(['name', 'age', 'city'])
        writer.writerow(['Alice', 25, 'Beijing'])
        writer.writerow(['Bob', 30, 'Shanghai'])
    
    # Read the semicolon-delimited file
    with open('users_semicolon.csv', 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter=';')
        for row in reader:
            print(row)
    
    # CSV/JSON conversion
    print("\nCSV to JSON:")
    csv_to_json('users.csv', 'users.json')

def csv_to_json(csv_file, json_file):
    """Convert a CSV file to a JSON file"""
    data = []
    
    with open(csv_file, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append(row)
    
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    
    print(f"已转换 {len(data)} 行数据到 {json_file}")

csv_examples()
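
When you receive a CSV file and don't know the delimiter up front, csv.Sniffer can guess the dialect from a sample of the file. A small sketch, reusing the semicolon-delimited file written above:

import csv

with open('users_semicolon.csv', 'r', encoding='utf-8', newline='') as f:
    sample = f.read(1024)
    dialect = csv.Sniffer().sniff(sample)
    has_header = csv.Sniffer().has_header(sample)
    f.seek(0)
    
    print(f"Detected delimiter: {dialect.delimiter!r}, header present: {has_header}")
    for row in csv.reader(f, dialect):
        print(row)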

Exercise 5: SQLite Databases

import sqlite3
import json
from datetime import datetime

def sqlite_examples():
    """SQLite数据库示例"""
    print("\n=== SQLite数据库 ===")
    
    # 连接数据库(如果不存在会创建)
    conn = sqlite3.connect('example.db')
    
    # 创建游标
    cursor = conn.cursor()
    
    # 创建表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        age INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''')
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        product_name TEXT NOT NULL,
        price REAL NOT NULL,
        quantity INTEGER DEFAULT 1,
        order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users (id)
    )
    ''')
    
    # Insert user data
    users_data = [
        ('Alice', 'alice@example.com', 25),
        ('Bob', 'bob@example.com', 30),
        ('Charlie', 'charlie@example.com', 35)
    ]
    
    cursor.executemany(
        'INSERT OR IGNORE INTO users (name, email, age) VALUES (?, ?, ?)',
        users_data
    )
    
    # Insert order data
    orders_data = [
        (1, 'Python Book', 29.99, 2),
        (1, 'Mouse', 15.50, 1),
        (2, 'Keyboard', 45.00, 1),
        (3, 'Monitor', 199.99, 1)
    ]
    
    cursor.executemany(
        'INSERT INTO orders (user_id, product_name, price, quantity) VALUES (?, ?, ?, ?)',
        orders_data
    )
    
    # Commit the transaction
    conn.commit()
    
    # Query the data
    print("All users:")
    cursor.execute('SELECT * FROM users')
    for row in cursor.fetchall():
        print(row)
    
    print("\nUsers and their orders:")
    cursor.execute('''
    SELECT u.name, u.email, o.product_name, o.price, o.quantity
    FROM users u
    JOIN orders o ON u.id = o.user_id
    ORDER BY u.name
    ''')
    
    for row in cursor.fetchall():
        print(row)
    
    # Parameterized query
    print("\nUsers older than 30:")
    cursor.execute('SELECT * FROM users WHERE age > ?', (30,))
    for row in cursor.fetchall():
        print(row)
    
    # Using a dict-like row factory
    print("\nUsing a dict-like row factory:")
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    
    cursor.execute('SELECT * FROM users')
    for row in cursor.fetchall():
        print(dict(row))
    
    # Export the database to JSON
    export_to_json(conn, 'users_export.json')
    
    # Close the connection
    conn.close()

def export_to_json(conn, filename):
    """Export the database tables to JSON"""
    cursor = conn.cursor()
    
    # Get all table names
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = [row[0] for row in cursor.fetchall()]
    
    data = {}
    
    for table in tables:
        cursor.execute(f'SELECT * FROM {table}')
        rows = cursor.fetchall()
        
        # Get the column names
        col_names = [description[0] for description in cursor.description]
        
        # Convert to a list of dicts
        table_data = []
        for row in rows:
            table_data.append(dict(zip(col_names, row)))
        
        data[table] = table_data
    
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False, default=str)
    
    print(f"\n数据库已导出到 {filename}")

sqlite_examples()
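
One detail worth adding to the exercise above: an sqlite3 connection can be used as a context manager, in which case the with block commits on success and rolls back if an exception escapes (it does not close the connection). A minimal sketch against the example.db created above:

import sqlite3

conn = sqlite3.connect('example.db')
try:
    with conn:  # transaction scope: commit on success, rollback on error
        conn.execute(
            'INSERT OR IGNORE INTO users (name, email, age) VALUES (?, ?, ?)',
            ('Dave', 'dave@example.com', 28)
        )
finally:
    conn.close()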

Exercise 6: SQLAlchemy ORM

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
# Since SQLAlchemy 1.4, declarative_base lives in sqlalchemy.orm
# (the older sqlalchemy.ext.declarative import is deprecated)
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime

# Create the declarative base class
Base = declarative_base()

class User(Base):
    """用户模型"""
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)
    
    # Relationships
    orders = relationship("Order", back_populates="user")
    
    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}')>"

class Order(Base):
    """订单模型"""
    __tablename__ = 'orders'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    product_name = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    quantity = Column(Integer, default=1)
    order_date = Column(DateTime, default=datetime.now)
    
    # Relationships
    user = relationship("User", back_populates="orders")
    
    def __repr__(self):
        return f"<Order(product='{self.product_name}', price={self.price})>"

def sqlalchemy_examples():
    """SQLAlchemy ORM示例"""
    print("\n=== SQLAlchemy ORM ===")
    
    # 创建引擎
    engine = create_engine('sqlite:///orm_example.db', echo=False)
    
    # 创建表
    Base.metadata.create_all(engine)
    
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()
    
    # 创建数据
    users = [
        User(name="Alice", email="alice@example.com", age=25),
        User(name="Bob", email="bob@example.com", age=30),
        User(name="Charlie", email="charlie@example.com", age=35)
    ]
    
    # Add them to the session
    session.add_all(users)
    session.commit()
    
    # Create orders
    orders = [
        Order(user=users[0], product_name="Python Book", price=29.99, quantity=2),
        Order(user=users[0], product_name="Mouse", price=15.50),
        Order(user=users[1], product_name="Keyboard", price=45.00),
        Order(user=users[2], product_name="Monitor", price=199.99)
    ]
    
    session.add_all(orders)
    session.commit()
    
    # Query data
    print("All users:")
    all_users = session.query(User).all()
    for user in all_users:
        print(user)
    
    print("\nUsers older than 28:")
    older_users = session.query(User).filter(User.age > 28).all()
    for user in older_users:
        print(user)
    
    print("\nUsers and their orders:")
    users_with_orders = session.query(User).join(Order).all()
    for user in users_with_orders:
        print(f"Orders for {user.name}:")
        for order in user.orders:
            print(f"  - {order.product_name}: ${order.price} x {order.quantity}")
    
    # Update data
    print("\nUpdating a user's age:")
    user = session.query(User).filter_by(name="Alice").first()
    if user:
        user.age = 26
        session.commit()
        print(f"After update: {user}")
    
    # Delete data
    print("\nDeleting a user:")
    user_to_delete = session.query(User).filter_by(name="Charlie").first()
    if user_to_delete:
        session.delete(user_to_delete)
        session.commit()
        print("User deleted")
    
    # A more complex query
    print("\nTotal amount spent per user:")
    from sqlalchemy import func
    
    result = session.query(
        User.name,
        func.sum(Order.price * Order.quantity).label('total_spent')
    ).join(Order).group_by(User.id).all()
    
    for name, total in result:
        print(f"{name}: ${total:.2f}")
    
    # Close the session
    session.close()

sqlalchemy_examples()
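
If you are on SQLAlchemy 1.4 or newer, the same queries can also be written in the newer 2.0 style built around select(). A hedged sketch that reuses the User model and database from the exercise above:

from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

engine = create_engine('sqlite:///orm_example.db')

with Session(engine) as session:
    stmt = select(User).where(User.age > 28).order_by(User.name)
    for user in session.execute(stmt).scalars():
        print(user)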

Exercise 7: Data Compression

import gzip
import bz2
import lzma
import json

def compression_examples():
    """数据压缩示例"""
    print("\n=== 数据压缩 ===")
    
    # 准备测试数据
    data = {
        "users": [
            {"id": i, "name": f"User_{i}", "email": f"user_{i}@example.com"}
            for i in range(1000)
        ]
    }
    
    json_data = json.dumps(data)
    original_size = len(json_data.encode('utf-8'))
    print(f"原始数据大小: {original_size} 字节")
    
    # Gzip压缩
    gzip_data = gzip.compress(json_data.encode('utf-8'))
    gzip_size = len(gzip_data)
    gzip_ratio = (1 - gzip_size / original_size) * 100
    print(f"Gzip压缩后: {gzip_size} 字节, 压缩率: {gzip_ratio:.1f}%")
    
    # 保存gzip文件
    with gzip.open('data.json.gz', 'wb') as f:
        f.write(json_data.encode('utf-8'))
    
    # 读取gzip文件
    with gzip.open('data.json.gz', 'rb') as f:
        decompressed_data = f.read().decode('utf-8')
    print(f"Gzip解压缩验证: {len(decompressed_data) == original_size}")
    
    # Bzip2压缩
    bzip2_data = bz2.compress(json_data.encode('utf-8'))
    bzip2_size = len(bzip2_data)
    bzip2_ratio = (1 - bzip2_size / original_size) * 100
    print(f"Bzip2压缩后: {bzip2_size} 字节, 压缩率: {bzip2_ratio:.1f}%")
    
    # LZMA压缩 (xz)
    lzma_data = lzma.compress(json_data.encode('utf-8'))
    lzma_size = len(lzma_data)
    lzma_ratio = (1 - lzma_size / original_size) * 100
    print(f"LZMA压缩后: {lzma_size} 字节, 压缩率: {lzma_ratio:.1f}%")
    
    # 比较压缩效果
    print(f"\n压缩效果比较:")
    print(f"Gzip:  {gzip_size:6d} 字节 ({gzip_ratio:5.1f}% 压缩率)")
    print(f"Bzip2: {bzip2_size:6d} 字节 ({bzip2_ratio:5.1f}% 压缩率)")
    print(f"LZMA:  {lzma_size:6d} 字节 ({lzma_ratio:5.1f}% 压缩率)")

compression_examples()
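
The ratios above are only half of the picture: higher compression usually costs more CPU time. A rough sketch comparing gzip levels on a similar JSON payload (the exact numbers will vary by machine and data):

import gzip
import json
import time

payload = json.dumps(
    {"users": [{"id": i, "name": f"User_{i}"} for i in range(1000)]}
).encode('utf-8')

for level in (1, 6, 9):
    start = time.perf_counter()
    compressed = gzip.compress(payload, compresslevel=level)
    elapsed = (time.perf_counter() - start) * 1000
    print(f"gzip level {level}: {len(compressed)} bytes in {elapsed:.2f} ms")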

Exercise 8: Handling Configuration Files

import configparser
import json
import toml   # third-party: pip install toml
import yaml   # third-party: pip install pyyaml

def config_examples():
    """配置文件处理示例"""
    print("\n=== 配置文件处理 ===")
    
    # INI格式 (configparser)
    print("INI格式配置:")
    config = configparser.ConfigParser()
    
    config['Database'] = {
        'host': 'localhost',
        'port': '5432',
        'username': 'admin',
        'password': 'secret',
        'database': 'mydb'
    }
    
    config['App'] = {
        'debug': 'true',
        'log_level': 'INFO',
        'max_workers': '4'
    }
    
    with open('config.ini', 'w', encoding='utf-8') as f:
        config.write(f)
    
    # Read the INI configuration back
    config_read = configparser.ConfigParser()
    config_read.read('config.ini', encoding='utf-8')
    
    db_host = config_read['Database']['host']
    app_debug = config_read.getboolean('App', 'debug')
    print(f"Database host: {db_host}, debug mode: {app_debug}")
    
    # TOML format
    print("\nTOML configuration:")
    toml_config = {
        'database': {
            'host': 'localhost',
            'port': 5432,
            'credentials': {
                'username': 'admin',
                'password': 'secret'
            }
        },
        'app': {
            'debug': True,
            'log_level': 'INFO',
            'features': ['auth', 'api', 'logging']
        }
    }
    
    with open('config.toml', 'w', encoding='utf-8') as f:
        toml.dump(toml_config, f)
    
    # Read the TOML configuration back
    with open('config.toml', 'r', encoding='utf-8') as f:
        loaded_toml = toml.load(f)
    
    print(f"TOML config: {loaded_toml['database']['host']}")
    
    # YAML format
    print("\nYAML configuration:")
    yaml_config = {
        'server': {
            'host': '0.0.0.0',
            'port': 8000,
            'ssl': {
                'enabled': True,
                'certfile': '/path/to/cert.pem'
            }
        },
        'database': {
            'url': 'postgresql://user:pass@localhost/db',
            'pool_size': 5,
            'echo': False
        },
        'features': ['rest', 'graphql', 'websocket']
    }
    
    with open('config.yaml', 'w', encoding='utf-8') as f:
        yaml.dump(yaml_config, f, default_flow_style=False, allow_unicode=True)
    
    # Read the YAML configuration back
    with open('config.yaml', 'r', encoding='utf-8') as f:
        loaded_yaml = yaml.safe_load(f)
    
    print(f"YAML config: {loaded_yaml['server']['port']}")
    
    # Configuration conversion
    print("\nConverting configuration formats:")
    convert_config_to_json('config.ini', 'config_from_ini.json')
    convert_config_to_json('config.toml', 'config_from_toml.json')
    convert_config_to_json('config.yaml', 'config_from_yaml.json')

def convert_config_to_json(source_file, target_file):
    """将各种配置格式转换为JSON"""
    if source_file.endswith('.ini'):
        config = configparser.ConfigParser()
        config.read(source_file, encoding='utf-8')
        data = {s: dict(config.items(s)) for s in config.sections()}
    
    elif source_file.endswith('.toml'):
        with open(source_file, 'r', encoding='utf-8') as f:
            data = toml.load(f)
    
    elif source_file.endswith(('.yaml', '.yml')):
        with open(source_file, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    
    else:
        return
    
    with open(target_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    
    print(f"已转换 {source_file} -> {target_file}")

config_examples()
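
A note on TOML: since Python 3.11 the standard library can read (but not write) TOML via tomllib, which requires the file to be opened in binary mode. A minimal sketch, assuming Python 3.11+ and the config.toml written above:

import tomllib

with open('config.toml', 'rb') as f:
    cfg = tomllib.load(f)

print(cfg['database']['host'])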

Today's Challenge:

Build a complete data import/export tool that supports converting and storing data in multiple formats.

# Challenge exercise: a data import/export tool
import pandas as pd  # third-party: pip install pandas (plus openpyxl for Excel files)
import sqlite3
import json
import xml.etree.ElementTree as ET
from pathlib import Path

class DataTransformer:
    """Data conversion tool"""
    
    def __init__(self):
        # Map file extensions to the formats this tool supports
        self.extension_map = {
            'json': 'json', 'csv': 'csv', 'xml': 'xml',
            'db': 'sqlite', 'sqlite': 'sqlite',
            'xlsx': 'excel', 'xls': 'excel'
        }
    
    def detect_format(self, file_path):
        """Detect the data format from the file extension"""
        ext = Path(file_path).suffix.lower()[1:]  # strip the leading dot
        return self.extension_map.get(ext)
    
    def read_data(self, file_path, format_type=None):
        """读取数据"""
        if format_type is None:
            format_type = self.detect_format(file_path)
        
        if format_type == 'json':
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        
        elif format_type == 'csv':
            return pd.read_csv(file_path).to_dict('records')
        
        elif format_type == 'xml':
            return self._read_xml(file_path)
        
        elif format_type == 'sqlite':
            return self._read_sqlite(file_path)
        
        elif format_type == 'excel':
            return pd.read_excel(file_path).to_dict('records')
        
        else:
            raise ValueError(f"Unsupported format: {format_type}")
    
    def write_data(self, data, file_path, format_type):
        """写入数据"""
        if format_type == 'json':
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        
        elif format_type == 'csv':
            df = pd.DataFrame(data)
            df.to_csv(file_path, index=False, encoding='utf-8')
        
        elif format_type == 'xml':
            self._write_xml(data, file_path)
        
        elif format_type == 'sqlite':
            self._write_sqlite(data, file_path)
        
        elif format_type == 'excel':
            df = pd.DataFrame(data)
            df.to_excel(file_path, index=False)
        
        else:
            raise ValueError(f"不支持的格式: {format_type}")
    
    def convert(self, source_path, target_path, target_format=None):
        """转换数据格式"""
        if target_format is None:
            target_format = self.detect_format(target_path)
        
        source_format = self.detect_format(source_path)
        
        print(f"转换: {source_path} ({source_format}) -> {target_path} ({target_format})")
        
        # 读取源数据
        data = self.read_data(source_path, source_format)
        
        # 写入目标格式
        self.write_data(data, target_path, target_format)
        
        print(f"转换完成! 共处理 {len(data)} 条记录")
    
    def _read_xml(self, file_path):
        """读取XML文件"""
        tree = ET.parse(file_path)
        root = tree.getroot()
        
        data = []
        for item in root:
            item_data = {}
            for child in item:
                item_data[child.tag] = child.text
            data.append(item_data)
        
        return data
    
    def _write_xml(self, data, file_path):
        """写入XML文件"""
        root = ET.Element('data')
        
        for item in data:
            item_elem = ET.SubElement(root, 'item')
            for key, value in item.items():
                child = ET.SubElement(item_elem, key)
                child.text = str(value)
        
        tree = ET.ElementTree(root)
        tree.write(file_path, encoding='utf-8', xml_declaration=True)
    
    def _read_sqlite(self, file_path, table_name='data'):
        """读取SQLite数据库"""
        conn = sqlite3.connect(file_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        
        # 获取所有表名
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        
        if table_name not in tables:
            return []
        
        cursor.execute(f'SELECT * FROM {table_name}')
        rows = cursor.fetchall()
        
        data = [dict(row) for row in rows]
        conn.close()
        
        return data
    
    def _write_sqlite(self, data, file_path, table_name='data'):
        """写入SQLite数据库"""
        conn = sqlite3.connect(file_path)
        cursor = conn.cursor()
        
        # 删除已存在的表
        cursor.execute(f'DROP TABLE IF EXISTS {table_name}')
        
        # 创建新表
        if data:
            # 从第一条数据获取列名
            columns = list(data[0].keys())
            col_defs = ', '.join([f'{col} TEXT' for col in columns])
            cursor.execute(f'CREATE TABLE {table_name} (id INTEGER PRIMARY KEY, {col_defs})')
            
            # 插入数据
            for item in data:
                cols = ', '.join(item.keys())
                placeholders = ', '.join(['?' for _ in item])
                values = [str(v) for v in item.values()]
                
                cursor.execute(
                    f'INSERT INTO {table_name} ({cols}) VALUES ({placeholders})',
                    values
                )
        
        conn.commit()
        conn.close()

def demo_data_transformer():
    """演示数据转换工具"""
    print("=== 数据转换工具演示 ===")
    
    transformer = DataTransformer()
    
    # 创建测试数据
    test_data = [
        {'name': 'Alice', 'age': 25, 'city': 'Beijing', 'email': 'alice@example.com'},
        {'name': 'Bob', 'age': 30, 'city': 'Shanghai', 'email': 'bob@example.com'},
        {'name': 'Charlie', 'age': 35, 'city': 'Guangzhou', 'email': 'charlie@example.com'}
    ]
    
    # Write it out as JSON first
    with open('test_data.json', 'w', encoding='utf-8') as f:
        json.dump(test_data, f, indent=2, ensure_ascii=False)
    
    print("Test data created")
    
    # Run the various conversions
    conversions = [
        ('test_data.json', 'test_data.csv', 'csv'),
        ('test_data.json', 'test_data.xml', 'xml'),
        ('test_data.json', 'test_data.db', 'sqlite'),
        ('test_data.json', 'test_data.xlsx', 'excel'),
        ('test_data.csv', 'converted_from_csv.json', 'json'),
        ('test_data.xml', 'converted_from_xml.json', 'json')
    ]
    
    for source, target, target_format in conversions:
        try:
            transformer.convert(source, target, target_format)
        except Exception as e:
            print(f"转换失败 {source} -> {target}: {e}")
    
    # 验证转换结果
    print("\n验证转换结果:")
    for file in ['test_data.csv', 'test_data.xml', 'converted_from_csv.json']:
        if Path(file).exists():
            data = transformer.read_data(file)
            print(f"{file}: {len(data)} 条记录")
    
    # 清理文件
    print("\n清理测试文件...")
    for file in [
        'test_data.json', 'test_data.csv', 'test_data.xml', 
        'test_data.db', 'test_data.xlsx', 'converted_from_csv.json',
        'converted_from_xml.json', 'config.ini', 'config.toml', 
        'config.yaml', 'data.json', 'catalog.xml', 'users.pkl',
        'users.csv', 'users_semicolon.csv', 'users.json'
    ]:
        if Path(file).exists():
            Path(file).unlink()
    
    print("演示完成!")

# 运行演示
demo_data_transformer()
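
Outside of the demo, a natural way to drive DataTransformer is a small command-line wrapper. A hedged sketch (the argument names below are this sketch's own, not part of the exercise):

import argparse

def main():
    parser = argparse.ArgumentParser(description='Convert data between formats')
    parser.add_argument('source', help='input file (format detected from the extension)')
    parser.add_argument('target', help='output file')
    parser.add_argument('--format', dest='target_format', default=None,
                        help='target format: json, csv, xml, sqlite or excel')
    args = parser.parse_args()
    
    DataTransformer().convert(args.source, args.target, args.target_format)

if __name__ == '__main__':
    main()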

Learning tips:

  1. JSON: the most common data interchange format, well suited to web applications
  2. XML: a structured markup language, suited to complex data structures
  3. Pickle: Python-specific binary serialization, but it carries security risks
  4. CSV: a simple tabular format, convenient for Excel integration
  5. SQLite: a lightweight file-based database, good for local storage
  6. SQLAlchemy: a Python ORM that simplifies database work
  7. Data compression: pick the compression algorithm that fits your needs
  8. Configuration files: choose INI, TOML, or YAML based on your team's conventions

Tomorrow we'll move on to the basics of web development! Keep practicing, and your data-handling skills will keep getting stronger.