-
Python疯狂练习60天——第十四天_疯狂python讲义豆瓣评分
- 网站名称:Python疯狂练习60天——第十四天_疯狂python讲义豆瓣评分
- 网站分类:技术文章
- 收录时间:2025-10-01 20:03
- 网站地址:
“Python疯狂练习60天——第十四天_疯狂python讲义豆瓣评分” 网站介绍
今日练习主题:数据序列化与存储
今天我们将学习Python中的数据序列化格式和存储技术,包括JSON、XML、Pickle、CSV、数据库操作等。
练习1:JSON序列化
import json
import datetime
from decimal import Decimal
from enum import Enum
class Status(Enum):
    """Closed set of record states used by the JSON serialization examples."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    PENDING = "pending"
class CustomEncoder(json.JSONEncoder):
    """JSON encoder with extra type support.

    Handles datetime/date (ISO-8601 strings), Decimal (float), Enum
    (its ``value``) and set (list); everything else falls back to the
    base class, which raises TypeError for unsupported types.
    """

    def default(self, obj):
        # datetime must be tested before date: datetime IS a date subclass.
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            # NOTE: float conversion may lose precision for money values.
            return float(obj)
        if isinstance(obj, Enum):
            return obj.value
        if isinstance(obj, set):
            # Set iteration order is unspecified; output order may vary.
            return list(obj)
        return super().default(obj)
def json_examples():
    """Demonstrate JSON serialization.

    Covers: native-type round-trips (string and file), serializing
    non-native types via ``CustomEncoder``, and a minimal dotted-path
    lookup helper. Writes ``data.json`` to the working directory.
    """
    print("=== JSON序列化 ===")
    # All of these types map directly onto JSON.
    data = {
        "string": "Hello World",
        "number": 42,
        "float": 3.14,
        "boolean": True,
        "null": None,
        "list": [1, 2, 3],
        "dict": {"key": "value"}
    }
    # ensure_ascii=False keeps non-ASCII characters readable in the output.
    json_str = json.dumps(data, indent=2, ensure_ascii=False)
    print("JSON字符串:")
    print(json_str)
    # Parse the string back into Python objects.
    parsed_data = json.loads(json_str)
    print("\n反序列化数据:")
    print(parsed_data)
    # Same round trip, but through a file.
    with open('data.json', 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    with open('data.json', 'r', encoding='utf-8') as f:
        loaded_data = json.load(f)
    print("\n从文件加载的数据:")
    print(loaded_data)
    # These types are NOT JSON-native; CustomEncoder converts them.
    complex_data = {
        "timestamp": datetime.datetime.now(),
        "date": datetime.date.today(),
        "price": Decimal("19.99"),
        "status": Status.ACTIVE,
        "tags": {"python", "json", "serialization"}
    }
    print("\n复杂对象序列化:")
    complex_json = json.dumps(complex_data, cls=CustomEncoder, indent=2)
    print(complex_json)

    # Minimal JSONPath-like lookup: dot-separated keys, dicts only.
    def json_path(data, path):
        """Walk *data* along dot-separated keys; return None on any miss."""
        current = data
        for key in path.split('.'):
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current

    sample_data = {
        "user": {
            "name": "Alice",
            "age": 25,
            "address": {
                "city": "Beijing",
                "zipcode": "100000"
            }
        }
    }
    print(f"\nJSONPath查询: user.address.city = {json_path(sample_data, 'user.address.city')}")
json_examples()
练习2:XML处理
import xml.etree.ElementTree as ET
from xml.dom import minidom
def xml_examples():
    """Demonstrate XML handling with ElementTree.

    Builds a small book catalog, pretty-prints it via minidom, saves it
    to ``catalog.xml``, parses it back, and filters books by price.
    """
    print("\n=== XML处理 ===")
    root = ET.Element("catalog")
    # Three <book> children, each with title/author/price.
    for i in range(3):
        book = ET.SubElement(root, "book", id=str(i + 1))
        title = ET.SubElement(book, "title")
        title.text = f"Python Book {i+1}"
        author = ET.SubElement(book, "author")
        author.text = f"Author {i+1}"
        price = ET.SubElement(book, "price")
        price.text = str(29.99 + i)
    # ElementTree has no pretty printer before 3.9; re-parse with minidom.
    rough_string = ET.tostring(root, 'utf-8')
    reparsed = minidom.parseString(rough_string)
    pretty_xml = reparsed.toprettyxml(indent=" ")
    print("生成的XML:")
    print(pretty_xml)
    # Persist to disk.
    with open('catalog.xml', 'w', encoding='utf-8') as f:
        f.write(pretty_xml)
    # Parse the file back.
    print("\n解析XML文件:")
    tree = ET.parse('catalog.xml')
    root = tree.getroot()
    # Walk the parsed structure element by element.
    for book in root.findall('book'):
        book_id = book.get('id')
        title = book.find('title').text
        author = book.find('author').text
        price = book.find('price').text
        print(f"书ID: {book_id}, 标题: {title}, 作者: {author}, 价格: {price}")
    # BUGFIX: ElementTree's limited XPath subset has no numeric comparison
    # predicate — ".//book[price>30]" raises SyntaxError ("invalid
    # predicate"). Filter explicitly in Python instead.
    print("\nXPath查询:")
    expensive_books = [
        b for b in root.findall('book')
        if float(b.find('price').text) > 30
    ]
    for book in expensive_books:
        title = book.find('title').text
        price = book.find('price').text
        print(f"昂贵书籍: {title}, 价格: {price}")
xml_examples()
练习3:Pickle序列化
import pickle
import datetime
class User:
    """Simple user record used by the pickle examples.

    WARNING: ``_password`` is ordinary instance state and WILL be included
    when an instance is pickled; real code should strip secrets (e.g. via
    ``__getstate__``) before serializing.
    """

    def __init__(self, name, email, created_at=None):
        self.name = name
        self.email = email
        # Default to the current time when no creation time is supplied.
        self.created_at = created_at or datetime.datetime.now()
        self._password = None  # sensitive — see class docstring

    def set_password(self, password):
        """Store the password in plain text (demo only — never do this)."""
        self._password = password

    def __repr__(self):
        return f"User(name='{self.name}', email='{self.email}', created_at='{self.created_at}')"
def pickle_examples():
    """Demonstrate pickle serialization of custom objects.

    Round-trips a list of ``User`` instances through bytes and through
    ``users.pkl``. Pickle is Python-specific and can execute arbitrary
    code on load — never unpickle untrusted data.
    """
    print("\n=== Pickle序列化 ===")
    users = [
        User("Alice", "alice@example.com"),
        User("Bob", "bob@example.com"),
        User("Charlie", "charlie@example.com")
    ]
    # The password travels with the pickle — deliberate demo of the risk.
    users[0].set_password("secret123")
    # Object graph -> bytes.
    print("序列化对象:")
    pickle_data = pickle.dumps(users)
    print(f"Pickle数据长度: {len(pickle_data)} 字节")
    # Bytes -> fresh, equivalent objects.
    loaded_users = pickle.loads(pickle_data)
    print("\n反序列化对象:")
    for user in loaded_users:
        print(user)
    # File round trip; pickle files must be opened in binary mode.
    with open('users.pkl', 'wb') as f:
        pickle.dump(users, f)
    with open('users.pkl', 'rb') as f:
        file_users = pickle.load(f)
    print("\n从文件加载的用户:")
    for user in file_users:
        print(user)
    print("\n安全提示: Pickle可以执行任意代码,不要反序列化不受信任的数据!")
pickle_examples()
练习4:CSV文件处理
import csv
import json
def csv_examples():
    """Demonstrate CSV reading/writing.

    Writes/reads ``users.csv`` with DictWriter/DictReader, shows a
    custom delimiter, and converts the CSV to JSON. Creates files in
    the working directory.
    """
    print("\n=== CSV文件处理 ===")
    users = [
        {"name": "Alice", "age": 25, "city": "Beijing", "email": "alice@example.com"},
        {"name": "Bob", "age": 30, "city": "Shanghai", "email": "bob@example.com"},
        {"name": "Charlie", "age": 35, "city": "Guangzhou", "email": "charlie@example.com"}
    ]
    # newline='' is required so the csv module controls line endings itself.
    with open('users.csv', 'w', encoding='utf-8', newline='') as f:
        fieldnames = ['name', 'age', 'city', 'email']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(users)
    print("CSV文件写入完成")
    # Read it back row by row; DictReader yields one dict per data row.
    print("\n读取CSV文件:")
    with open('users.csv', 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader, 1):
            print(f"行 {i}: {dict(row)}")
    # The delimiter is configurable — semicolons are common in Europe.
    print("\n使用分号分隔符:")
    with open('users_semicolon.csv', 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f, delimiter=';')
        writer.writerow(['name', 'age', 'city'])
        writer.writerow(['Alice', 25, 'Beijing'])
        writer.writerow(['Bob', 30, 'Shanghai'])
    with open('users_semicolon.csv', 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter=';')
        for row in reader:
            print(row)
    # Cross-format conversion using the helper below.
    print("\nCSV转JSON:")
    csv_to_json('users.csv', 'users.json')


def csv_to_json(csv_file, json_file):
    """Convert *csv_file* (header row expected) into a JSON array of objects.

    Every value stays a string — the CSV module does no type inference.
    """
    with open(csv_file, 'r', encoding='utf-8') as f:
        data = list(csv.DictReader(f))
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"已转换 {len(data)} 行数据到 {json_file}")
csv_examples()
练习5:SQLite数据库
import sqlite3
import json
from datetime import datetime
def sqlite_examples():
    """Demonstrate the sqlite3 module.

    Creates ``example.db`` with users/orders tables, inserts sample rows,
    runs joined and parameterized queries, shows dict-style rows via
    ``sqlite3.Row``, and exports the whole database to JSON.
    """
    print("\n=== SQLite数据库 ===")
    # Connecting creates the file if it does not exist yet.
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        age INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''')
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        product_name TEXT NOT NULL,
        price REAL NOT NULL,
        quantity INTEGER DEFAULT 1,
        order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users (id)
    )
    ''')
    # OR IGNORE makes reruns idempotent (email is UNIQUE).
    users_data = [
        ('Alice', 'alice@example.com', 25),
        ('Bob', 'bob@example.com', 30),
        ('Charlie', 'charlie@example.com', 35)
    ]
    cursor.executemany(
        'INSERT OR IGNORE INTO users (name, email, age) VALUES (?, ?, ?)',
        users_data
    )
    # NOTE: these rows assume user ids 1-3 — true on a fresh database.
    orders_data = [
        (1, 'Python Book', 29.99, 2),
        (1, 'Mouse', 15.50, 1),
        (2, 'Keyboard', 45.00, 1),
        (3, 'Monitor', 199.99, 1)
    ]
    cursor.executemany(
        'INSERT INTO orders (user_id, product_name, price, quantity) VALUES (?, ?, ?, ?)',
        orders_data
    )
    # Nothing is visible to other connections until commit.
    conn.commit()
    print("所有用户:")
    cursor.execute('SELECT * FROM users')
    for row in cursor.fetchall():
        print(row)
    print("\n用户及其订单:")
    cursor.execute('''
    SELECT u.name, u.email, o.product_name, o.price, o.quantity
    FROM users u
    JOIN orders o ON u.id = o.user_id
    ORDER BY u.name
    ''')
    for row in cursor.fetchall():
        print(row)
    # Always bind values with '?' placeholders — never string formatting.
    print("\n查询年龄大于30的用户:")
    cursor.execute('SELECT * FROM users WHERE age > ?', (30,))
    for row in cursor.fetchall():
        print(row)
    # sqlite3.Row gives name-indexable rows; needs a cursor created AFTER
    # row_factory is set.
    print("\n使用字典式游标:")
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM users')
    for row in cursor.fetchall():
        print(dict(row))
    export_to_json(conn, 'users_export.json')
    conn.close()


def export_to_json(conn, filename):
    """Dump every table of *conn* into *filename* as one JSON object.

    Output shape: ``{table_name: [row_dict, ...], ...}``. Non-JSON types
    are stringified via ``default=str``.
    """
    cursor = conn.cursor()
    # Enumerate the user tables (includes sqlite_sequence on AUTOINCREMENT
    # databases — harmless for an export).
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = [row[0] for row in cursor.fetchall()]
    data = {}
    for table in tables:
        # Identifiers cannot be bound as parameters; these names come from
        # sqlite_master, not from user input, so interpolation is safe here.
        cursor.execute(f'SELECT * FROM {table}')
        rows = cursor.fetchall()
        col_names = [description[0] for description in cursor.description]
        data[table] = [dict(zip(col_names, row)) for row in rows]
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False, default=str)
    # BUGFIX: the scraped source lost the {filename} placeholder here.
    print(f"\n数据库已导出到 {filename}")
sqlite_examples()
练习6:SQLAlchemy ORM
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
# Shared declarative base that the ORM models below inherit from.
Base = declarative_base()
class User(Base):
    """ORM user model.

    NOTE: this shadows the pickle-demo ``User`` defined earlier in the
    script; each exercise is intended to run standalone.
    """

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer)
    # Pass the callable itself so it is evaluated per INSERT,
    # not once at import time.
    created_at = Column(DateTime, default=datetime.now)

    # One-to-many: a user owns many orders (paired with Order.user).
    orders = relationship("Order", back_populates="user")

    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}')>"
class Order(Base):
    """ORM order model; each order belongs to one user."""

    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    product_name = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    quantity = Column(Integer, default=1)
    # Callable default: evaluated per INSERT, not at import time.
    order_date = Column(DateTime, default=datetime.now)

    # Many-to-one back to the owning user (paired with User.orders).
    user = relationship("User", back_populates="orders")

    def __repr__(self):
        return f"<Order(product='{self.product_name}', price={self.price})>"
def sqlalchemy_examples():
    """Demonstrate SQLAlchemy ORM CRUD against ``orm_example.db``.

    Creates the schema, inserts users and orders, runs filtered/joined
    queries, updates and deletes rows, and aggregates with ``func.sum``.
    """
    print("\n=== SQLAlchemy ORM ===")
    # echo=True would log every emitted SQL statement.
    engine = create_engine('sqlite:///orm_example.db', echo=False)
    # Create all tables registered on Base (no-op if they exist).
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    users = [
        User(name="Alice", email="alice@example.com", age=25),
        User(name="Bob", email="bob@example.com", age=30),
        User(name="Charlie", email="charlie@example.com", age=35)
    ]
    session.add_all(users)
    session.commit()
    # Assigning the relationship (user=...) fills user_id automatically.
    orders = [
        Order(user=users[0], product_name="Python Book", price=29.99, quantity=2),
        Order(user=users[0], product_name="Mouse", price=15.50),
        Order(user=users[1], product_name="Keyboard", price=45.00),
        Order(user=users[2], product_name="Monitor", price=199.99)
    ]
    session.add_all(orders)
    session.commit()
    print("所有用户:")
    all_users = session.query(User).all()
    for user in all_users:
        print(user)
    print("\n查询年龄大于28的用户:")
    older_users = session.query(User).filter(User.age > 28).all()
    for user in older_users:
        print(user)
    print("\n用户及其订单:")
    users_with_orders = session.query(User).join(Order).all()
    for user in users_with_orders:
        print(f"{user.name} 的订单:")
        # Lazy-loads the orders collection on first access.
        for order in user.orders:
            print(f" - {order.product_name}: ${order.price} x {order.quantity}")
    # Update: mutate the mapped attribute, then commit.
    print("\n更新用户年龄:")
    user = session.query(User).filter_by(name="Alice").first()
    if user:
        user.age = 26
        session.commit()
        print(f"更新后: {user}")
    print("\n删除用户:")
    user_to_delete = session.query(User).filter_by(name="Charlie").first()
    if user_to_delete:
        session.delete(user_to_delete)
        session.commit()
        print("用户已删除")
    # Aggregate query: total spend per user.
    print("\n统计每个用户的订单总金额:")
    from sqlalchemy import func
    result = session.query(
        User.name,
        func.sum(Order.price * Order.quantity).label('total_spent')
    ).join(Order).group_by(User.id).all()
    for name, total in result:
        print(f"{name}: ${total:.2f}")
    session.close()
sqlalchemy_examples()
练习7:数据压缩
import gzip
import bz2
import lzma
import json
def compression_examples():
    """Compare gzip, bzip2 and LZMA on the same JSON payload.

    Prints compressed sizes and ratios, and round-trips the gzip variant
    through ``data.json.gz`` on disk.
    """
    print("\n=== 数据压缩 ===")
    # Highly repetitive data — ideal for showing compression ratios.
    data = {
        "users": [
            {"id": i, "name": f"User_{i}", "email": f"user_{i}@example.com"}
            for i in range(1000)
        ]
    }
    json_data = json.dumps(data)
    original_size = len(json_data.encode('utf-8'))
    print(f"原始数据大小: {original_size} 字节")
    # Gzip: fastest of the three, moderate ratio.
    gzip_data = gzip.compress(json_data.encode('utf-8'))
    gzip_size = len(gzip_data)
    gzip_ratio = (1 - gzip_size / original_size) * 100
    print(f"Gzip压缩后: {gzip_size} 字节, 压缩率: {gzip_ratio:.1f}%")
    # gzip.open handles compression transparently on write/read.
    with gzip.open('data.json.gz', 'wb') as f:
        f.write(json_data.encode('utf-8'))
    with gzip.open('data.json.gz', 'rb') as f:
        decompressed_data = f.read().decode('utf-8')
    print(f"Gzip解压缩验证: {len(decompressed_data) == original_size}")
    # Bzip2: slower, usually better ratio than gzip.
    bzip2_data = bz2.compress(json_data.encode('utf-8'))
    bzip2_size = len(bzip2_data)
    bzip2_ratio = (1 - bzip2_size / original_size) * 100
    print(f"Bzip2压缩后: {bzip2_size} 字节, 压缩率: {bzip2_ratio:.1f}%")
    # LZMA (xz): slowest, typically the best ratio.
    lzma_data = lzma.compress(json_data.encode('utf-8'))
    lzma_size = len(lzma_data)
    lzma_ratio = (1 - lzma_size / original_size) * 100
    print(f"LZMA压缩后: {lzma_size} 字节, 压缩率: {lzma_ratio:.1f}%")
    # Side-by-side summary.
    print("\n压缩效果比较:")
    print(f"Gzip: {gzip_size:6d} 字节 ({gzip_ratio:5.1f}% 压缩率)")
    print(f"Bzip2: {bzip2_size:6d} 字节 ({bzip2_ratio:5.1f}% 压缩率)")
    print(f"LZMA: {lzma_size:6d} 字节 ({lzma_ratio:5.1f}% 压缩率)")
compression_examples()
练习8:配置文件处理
import configparser
import toml
import yaml
def config_examples():
    """Demonstrate INI (configparser), TOML and YAML config handling.

    Writes config.ini / config.toml / config.yaml, reads each back, and
    converts all three to JSON via ``convert_config_to_json``. Requires
    the third-party ``toml`` and ``yaml`` (PyYAML) packages.
    """
    print("\n=== 配置文件处理 ===")
    # INI: flat sections of string values only.
    print("INI格式配置:")
    config = configparser.ConfigParser()
    config['Database'] = {
        'host': 'localhost',
        'port': '5432',
        'username': 'admin',
        'password': 'secret',
        'database': 'mydb'
    }
    config['App'] = {
        'debug': 'true',
        'log_level': 'INFO',
        'max_workers': '4'
    }
    with open('config.ini', 'w', encoding='utf-8') as f:
        config.write(f)
    config_read = configparser.ConfigParser()
    config_read.read('config.ini', encoding='utf-8')
    db_host = config_read['Database']['host']
    # getboolean parses 'true'/'false'/'1'/'0' etc. into a real bool.
    app_debug = config_read.getboolean('App', 'debug')
    print(f"数据库主机: {db_host}, 调试模式: {app_debug}")
    # TOML: typed values and arbitrary nesting.
    print("\nTOML格式配置:")
    toml_config = {
        'database': {
            'host': 'localhost',
            'port': 5432,
            'credentials': {
                'username': 'admin',
                'password': 'secret'
            }
        },
        'app': {
            'debug': True,
            'log_level': 'INFO',
            'features': ['auth', 'api', 'logging']
        }
    }
    with open('config.toml', 'w', encoding='utf-8') as f:
        toml.dump(toml_config, f)
    with open('config.toml', 'r', encoding='utf-8') as f:
        loaded_toml = toml.load(f)
    print(f"TOML配置: {loaded_toml['database']['host']}")
    # YAML: typed, nested, human-friendly.
    print("\nYAML格式配置:")
    yaml_config = {
        'server': {
            'host': '0.0.0.0',
            'port': 8000,
            'ssl': {
                'enabled': True,
                'certfile': '/path/to/cert.pem'
            }
        },
        'database': {
            'url': 'postgresql://user:pass@localhost/db',
            'pool_size': 5,
            'echo': False
        },
        'features': ['rest', 'graphql', 'websocket']
    }
    with open('config.yaml', 'w', encoding='utf-8') as f:
        yaml.dump(yaml_config, f, default_flow_style=False, allow_unicode=True)
    # safe_load refuses arbitrary Python object construction.
    with open('config.yaml', 'r', encoding='utf-8') as f:
        loaded_yaml = yaml.safe_load(f)
    print(f"YAML配置: {loaded_yaml['server']['port']}")
    print("\n配置格式转换:")
    convert_config_to_json('config.ini', 'config_from_ini.json')
    convert_config_to_json('config.toml', 'config_from_toml.json')
    convert_config_to_json('config.yaml', 'config_from_yaml.json')


def convert_config_to_json(source_file, target_file):
    """Convert an INI/TOML/YAML file to JSON, dispatching on extension.

    Silently returns for unrecognized extensions. INI values all come
    out as strings (configparser does no type inference).
    """
    if source_file.endswith('.ini'):
        config = configparser.ConfigParser()
        config.read(source_file, encoding='utf-8')
        data = {s: dict(config.items(s)) for s in config.sections()}
    elif source_file.endswith('.toml'):
        with open(source_file, 'r', encoding='utf-8') as f:
            data = toml.load(f)
    elif source_file.endswith(('.yaml', '.yml')):
        with open(source_file, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    else:
        return
    with open(target_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"已转换 {source_file} -> {target_file}")
config_examples()
今日挑战:
创建一个完整的数据导入导出工具,支持多种格式的转换和存储。
# 挑战练习:数据导入导出工具
import pandas as pd
import sqlite3
import json
import xml.etree.ElementTree as ET
from pathlib import Path
class DataTransformer:
    """Convert tabular data between JSON, CSV, XML, SQLite and Excel.

    The interchange representation is a list of flat dicts (one per
    record): every reader produces it and every writer consumes it.
    """

    def __init__(self):
        # Format names accepted by read_data/write_data via format_type.
        self.supported_formats = ['json', 'csv', 'xml', 'sqlite', 'excel']

    def detect_format(self, file_path):
        """Guess the format from the file extension; None when unknown."""
        ext = Path(file_path).suffix.lower()[1:]  # strip the leading dot
        # BUGFIX: extensions that do not literally match a format name
        # were previously undetectable ('.xlsx'/'.xls' Excel files and
        # the '.db' files our own sqlite writer produces).
        if ext == 'db':
            return 'sqlite'
        if ext in ('xlsx', 'xls'):
            return ext  # read_data accepts these spellings directly
        return ext if ext in self.supported_formats else None

    def read_data(self, file_path, format_type=None):
        """Read *file_path* into a list of dicts (auto-detecting format)."""
        if format_type is None:
            format_type = self.detect_format(file_path)
        if format_type == 'json':
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        elif format_type == 'csv':
            return pd.read_csv(file_path).to_dict('records')
        elif format_type == 'xml':
            return self._read_xml(file_path)
        elif format_type == 'sqlite':
            return self._read_sqlite(file_path)
        elif format_type in ['xlsx', 'xls']:
            return pd.read_excel(file_path).to_dict('records')
        else:
            raise ValueError(f"不支持的格式: {format_type}")

    def write_data(self, data, file_path, format_type):
        """Write the record list *data* to *file_path* as *format_type*."""
        if format_type == 'json':
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        elif format_type == 'csv':
            pd.DataFrame(data).to_csv(file_path, index=False, encoding='utf-8')
        elif format_type == 'xml':
            self._write_xml(data, file_path)
        elif format_type == 'sqlite':
            self._write_sqlite(data, file_path)
        elif format_type == 'excel':
            # Requires an Excel engine such as openpyxl to be installed.
            pd.DataFrame(data).to_excel(file_path, index=False)
        else:
            raise ValueError(f"不支持的格式: {format_type}")

    def convert(self, source_path, target_path, target_format=None):
        """Read *source_path* and re-write it as *target_format*."""
        if target_format is None:
            target_format = self.detect_format(target_path)
        source_format = self.detect_format(source_path)
        print(f"转换: {source_path} ({source_format}) -> {target_path} ({target_format})")
        data = self.read_data(source_path, source_format)
        self.write_data(data, target_path, target_format)
        print(f"转换完成! 共处理 {len(data)} 条记录")

    def _read_xml(self, file_path):
        """Parse <root><item><field>…</field>…</item>…</root> into dicts.

        All values come back as strings (or None for empty elements).
        """
        root = ET.parse(file_path).getroot()
        return [
            {child.tag: child.text for child in item}
            for item in root
        ]

    def _write_xml(self, data, file_path):
        """Write the record list as the XML shape ``_read_xml`` expects."""
        root = ET.Element('data')
        for item in data:
            item_elem = ET.SubElement(root, 'item')
            for key, value in item.items():
                child = ET.SubElement(item_elem, key)
                child.text = str(value)  # XML text must be a string
        tree = ET.ElementTree(root)
        tree.write(file_path, encoding='utf-8', xml_declaration=True)

    def _read_sqlite(self, file_path, table_name='data'):
        """Read *table_name* from an SQLite file; [] when the table is absent."""
        conn = sqlite3.connect(file_path)
        conn.row_factory = sqlite3.Row  # name-indexable rows
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        if table_name not in tables:
            conn.close()
            return []
        # Identifiers cannot be bound as parameters; table_name was just
        # validated against sqlite_master, so interpolation is safe here.
        cursor.execute(f'SELECT * FROM {table_name}')
        data = [dict(row) for row in cursor.fetchall()]
        conn.close()
        return data

    def _write_sqlite(self, data, file_path, table_name='data'):
        """Replace *table_name* in the SQLite file with the record list.

        All columns are created as TEXT and all values stringified; the
        column set is taken from the first record.
        """
        conn = sqlite3.connect(file_path)
        cursor = conn.cursor()
        # Start from a clean slate on every write.
        cursor.execute(f'DROP TABLE IF EXISTS {table_name}')
        if data:
            columns = list(data[0].keys())
            col_defs = ', '.join(f'{col} TEXT' for col in columns)
            cursor.execute(f'CREATE TABLE {table_name} (id INTEGER PRIMARY KEY, {col_defs})')
            for item in data:
                cols = ', '.join(item.keys())
                placeholders = ', '.join('?' for _ in item)
                values = [str(v) for v in item.values()]
                cursor.execute(
                    f'INSERT INTO {table_name} ({cols}) VALUES ({placeholders})',
                    values
                )
        conn.commit()
        conn.close()
def demo_data_transformer():
    """Exercise DataTransformer end to end.

    Creates a JSON fixture, converts it through every supported format
    (failures are reported, not raised — e.g. Excel needs openpyxl),
    verifies record counts, then deletes all files the script produced.
    """
    print("=== 数据转换工具演示 ===")
    transformer = DataTransformer()
    test_data = [
        {'name': 'Alice', 'age': 25, 'city': 'Beijing', 'email': 'alice@example.com'},
        {'name': 'Bob', 'age': 30, 'city': 'Shanghai', 'email': 'bob@example.com'},
        {'name': 'Charlie', 'age': 35, 'city': 'Guangzhou', 'email': 'charlie@example.com'}
    ]
    # Seed fixture in JSON, the easiest source format.
    with open('test_data.json', 'w', encoding='utf-8') as f:
        json.dump(test_data, f, indent=2, ensure_ascii=False)
    print("创建测试数据完成")
    # (source, target, explicit target format) triples to run.
    conversions = [
        ('test_data.json', 'test_data.csv', 'csv'),
        ('test_data.json', 'test_data.xml', 'xml'),
        ('test_data.json', 'test_data.db', 'sqlite'),
        ('test_data.json', 'test_data.xlsx', 'excel'),
        ('test_data.csv', 'converted_from_csv.json', 'json'),
        ('test_data.xml', 'converted_from_xml.json', 'json')
    ]
    for source, target, target_format in conversions:
        try:
            transformer.convert(source, target, target_format)
        except Exception as e:
            # Best-effort demo: report the failure and keep converting.
            print(f"转换失败 {source} -> {target}: {e}")
    # Spot-check that the converted files still hold all records.
    print("\n验证转换结果:")
    for file in ['test_data.csv', 'test_data.xml', 'converted_from_csv.json']:
        if Path(file).exists():
            data = transformer.read_data(file)
            print(f"{file}: {len(data)} 条记录")
    # Remove everything this script (all exercises) wrote to disk.
    print("\n清理测试文件...")
    for file in [
        'test_data.json', 'test_data.csv', 'test_data.xml',
        'test_data.db', 'test_data.xlsx', 'converted_from_csv.json',
        'converted_from_xml.json', 'config.ini', 'config.toml',
        'config.yaml', 'data.json', 'catalog.xml', 'users.pkl',
        'users.csv', 'users_semicolon.csv', 'users.json'
    ]:
        if Path(file).exists():
            Path(file).unlink()
    print("演示完成!")
# 运行演示
demo_data_transformer()
学习提示:
- JSON:最常用的数据交换格式,适合Web应用
- XML:结构化的标记语言,适合复杂数据结构
- Pickle:Python专用的二进制序列化,但存在安全风险
- CSV:简单的表格数据格式,适合Excel集成
- SQLite:轻量级文件数据库,适合本地存储
- SQLAlchemy:Python ORM工具,简化数据库操作
- 数据压缩:根据需求选择合适的压缩算法
- 配置文件:根据团队习惯选择INI、TOML或YAML
明天我们将学习Web开发基础!坚持练习,你的数据处理能力会越来越强
更多相关网站
- Python目录规范:呐,这个就叫专业!
- 在线文档预览kkFileView部署及使用指南
- 逐步分解,一文教会你如何用 jenkins+docker 实现主从模式
- 每天一个 Python 库:logging 用法精讲,高效简洁的输出日志
- 开源:NginxWebUI一款图形化管理Nginx配置的工具
- 如何使用Java API操作HDFS系统?_hdfs java api的常见环境准备?
- Gitlab+P3C-PMD(阿里云插件)标准化你团队的代码和提交信息
- 使用 Docker 部署 最新版本Apache Doris3.0:踩坑与解决指南
- NginxWebUI - 图形化的 Nginx 配置管理工具
- Linux服务器终端中文乱码解决_linux 终端显示中文
- 《Servlet》第05节:创建第一个Servlet程序(HelloSevlet)
- java项目相关知识点整理_java工程项目
- java高级用法之:无所不能的java,本地方法调用实况
- 还用swagger 吗!推荐一种好用的接口文档自动管理方案
- 小技巧!两分钟解决IntelliJ IDEA中文乱码问题
- JavaEE高级开发:Tomcat7优化配置_tomcat9调优
- Java Java命令学习系列(一)——Jps
- Maven快速入门(二)手动创建maven项目hellomaven
- 最近发表
- 标签列表
-
- mydisktest_v298 (35)
- sql 日期比较 (33)
- document.appendchild (35)
- 头像打包下载 (35)
- 梦幻诛仙表情包 (36)
- java面试宝典2019pdf (26)
- disk++ (30)
- 加密与解密第四版pdf (29)
- iteye (26)
- centos7.4下载 (32)
- intouch2014r2sp1永久授权 (33)
- jdk1.8.0_191下载 (27)
- axure9注册码 (30)
- 兔兔工程量计算软件下载 (27)
- ccproxy破解版 (31)
- aida64模板 (28)
- engine=innodb (33)
- shiro jwt (28)
- segoe ui是什么字体 (27)
- head first java电子版 (32)
- clickhouse中文文档 (28)
- jdk-8u181-linux-x64.tar.gz (32)
- 计算机网络自顶向下pdf (34)
- -dfile.encoding=utf-8 (33)
- jdk1.9下载 (32)