1. 文件和目录操作
创建目录:
import os
os.makedirs('new_directory', exist_ok=True)
删除目录:
import shutil
shutil.rmtree('directory_to_delete')
列出目录内容:
import os
for item in os.listdir('path/to/directory'):
print(item)
复制文件:
import shutil
shutil.copy('source_file.txt', 'destination_file.txt')
移动文件:
import shutil
shutil.move('source_file.txt', 'destination_directory/')
重命名文件:
import os
os.rename('old_name.txt', 'new_name.txt')
读取文件内容:
with open('file.txt', 'r') as f:
content = f.read()
print(content)
写入文件内容:
with open('file.txt', 'w') as f:
f.write('Hello, World!')
追加文件内容:
with open('file.txt', 'a') as f:
f.write('\nAppending new line.')
检查文件是否存在:
import os
if os.path.exists('file.txt'):
print('File exists.')
获取文件大小:
import os
file_size = os.path.getsize('file.txt')
print(f'File size: {file_size} bytes')
获取文件修改时间:
import os
mod_time = os.path.getmtime('file.txt')
print(f'Modified time: {mod_time}')
递归遍历目录:
import os
for root, dirs, files in os.walk('path/to/directory'):
for name in files:
print(os.path.join(root, name))
查找文件:
import os
for root, dirs, files in os.walk('path/to/directory'):
if 'target_file.txt' in files:
print(os.path.join(root, 'target_file.txt'))
压缩文件:
import zipfile
with zipfile.ZipFile('archive.zip', 'w') as zipf:
zipf.write('file.txt')
解压缩文件:
import zipfile
with zipfile.ZipFile('archive.zip', 'r') as zipf:
zipf.extractall('extracted_directory')
读取CSV文件:
import csv
with open('data.csv', 'r') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
print(row)
写入CSV文件:
import csv
with open('data.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Name', 'Age'])
writer.writerow(['Alice', 30])
writer.writerow(['Bob', 25])
读取JSON文件:
import json
with open('data.json', 'r') as f:
data = json.load(f)
print(data)
写入JSON文件:
import json
data = {'name': 'Alice', 'age': 30}
with open('data.json', 'w') as f:
json.dump(data, f)
2. 网络请求
发送GET请求:
import requests
response = requests.get('https://api.example.com/data')
print(response.text)
发送POST请求:
import requests
payload = {'key': 'value'}
response = requests.post('https://api.example.com/submit', data=payload)
print(response.text)
发送带有头部的请求:
import requests
headers = {'Content-Type': 'application/json'}
response = requests.get('https://api.example.com/data', headers=headers)
print(response.text)
下载文件:
import requests
url = 'https://example.com/file.zip'
response = requests.get(url)
with open('downloaded_file.zip', 'wb') as f:
f.write(response.content)
上传文件:
import requests
files = {'file': open('file.txt', 'rb')}
response = requests.post('https://api.example.com/upload', files=files)
print(response.text)
处理JSON响应:
import requests
response = requests.get('https://api.example.com/data')
data = response.json()
print(data)
处理XML响应:
import requests
import xml.etree.ElementTree as ET
response = requests.get('https://api.example.com/data.xml')
root = ET.fromstring(response.text)
for child in root:
print(child.tag, child.attrib)
设置超时:
import requests
response = requests.get('https://api.example.com/data', timeout=5)
print(response.text)
处理HTTP错误:
import requests
try:
response = requests.get('https://api.example.com/data')
response.raise_for_status()
except requests.exceptions.HTTPError as errh:
print("Http Error:", errh)
except requests.exceptions.ConnectionError as errc:
print("Error Connecting:", errc)
except requests.exceptions.Timeout as errt:
print("Timeout Error:", errt)
except requests.exceptions.RequestException as err:
print("OOps: Something Else", err)
使用会话:
import requests
session = requests.Session()
session.auth = ('username', 'password')
response = session.get('https://api.example.com/data')
print(response.text)
3. 数据处理
字符串分割:
text = "apple,banana,orange"
fruits = text.split(',')
print(fruits)
字符串连接:
fruits = ['apple', 'banana', 'orange']
text = ','.join(fruits)
print(text)
字符串替换:
text = "Hello, World!"
new_text = text.replace('World', 'Python')
print(new_text)
字符串格式化:
name = "Alice"
age = 30
message = f"Name: {name}, Age: {age}"
print(message)
列表排序:
numbers = [3, 1, 4, 1, 5, 9]
sorted_numbers = sorted(numbers)
print(sorted_numbers)
字典排序:
from collections import OrderedDict
data = {'b': 2, 'a': 1, 'c': 3}
ordered_data = OrderedDict(sorted(data.items()))
print(ordered_data)
过滤列表:
numbers = [3, 1, 4, 1, 5, 9]
even_numbers = [num for num in numbers if num % 2 == 0]
print(even_numbers)
统计词频:
from collections import Counter
text = "hello world hello"
word_counts = Counter(text.split())
print(word_counts)
生成随机数:
import random
random_number = random.randint(1, 100)
print(random_number)
生成随机字符串:
import random
import string
random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
print(random_string)
日期和时间处理:
from datetime import datetime
now = datetime.now()
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S")
print(formatted_now)
解析日期字符串:
from datetime import datetime
date_string = "2023-10-05 15:30:00"
parsed_date = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
print(parsed_date)
计算两个日期之间的差值:
from datetime import datetime
date1 = datetime(2023, 10, 5)
date2 = datetime(2023, 10, 10)
delta = date2 - date1
print(delta.days)
生成日期范围:
from datetime import datetime, timedelta
start_date = datetime(2023, 10, 5)
end_date = datetime(2023, 10, 10)
date_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
for date in date_range:
print(date.strftime("%Y-%m-%d"))
计算年龄:
from datetime import datetime
birthdate = datetime(1990, 1, 1)
today = datetime.today()
age = today.year - birthdate.year - ((today.month, today.day) < (birthdate.month, birthdate.day))
print(age)
4. 系统管理
执行系统命令:
import subprocess
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)
这段代码使用 subprocess
模块执行 ls -l
命令并打印输出结果。
获取当前工作目录:
import os
current_dir = os.getcwd()
print(current_dir)
此代码片段使用 os
模块获取并打印当前工作目录的路径。
更改当前工作目录:
import os
os.chdir('/path/to/new/directory')
这段代码更改当前工作目录到指定的路径。
获取环境变量:
import os
value = os.getenv('PATH')
print(value)
此代码片段获取名为 PATH
的环境变量的值并打印。
设置环境变量:
import os
os.environ['MY_VARIABLE'] = 'my_value'
这段代码设置一个名为 MY_VARIABLE
的环境变量,并赋予它值 my_value
。
获取系统信息:
import platform
system_info = platform.uname()
print(system_info)
此代码片段使用 platform
模块获取系统相关信息并打印。
获取CPU信息:
import psutil
cpu_percent = psutil.cpu_percent(interval=1)
print(cpu_percent)
这段代码使用 psutil
模块测量CPU使用率,并打印结果。
获取内存信息:
import psutil
memory_info = psutil.virtual_memory()
print(memory_info)
此代码片段获取虚拟内存信息并打印。
获取磁盘使用情况:
import psutil
disk_usage = psutil.disk_usage('/')
print(disk_usage)
这段代码获取根目录的磁盘使用情况并打印。
获取网络接口信息:
import psutil
net_if_addrs = psutil.net_if_addrs()
for interface, addrs in net_if_addrs.items():
print(f"Interface: {interface}")
for addr in addrs:
print(f" {addr.family.name}: {addr.address}")
此代码片段获取并打印所有网络接口的详细信息。
获取进程列表:
import psutil
for proc in psutil.process_iter(['pid', 'name']):
print(proc.info)
这段代码遍历所有进程并打印它们的PID和名称。
杀死进程:
import psutil
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] == 'process_name':
proc.kill()
此代码片段查找名为 process_name
的进程并将其终止。
获取用户信息:
import pwd
user_info = pwd.getpwnam('username')
print(user_info)
这段代码使用 pwd
模块根据用户名获取用户信息并打印。
获取组信息:
import grp
group_info = grp.getgrnam('groupname')
print(group_info)
此代码片段根据组名获取组信息并打印。
获取系统日志:
import logging
logging.basicConfig(filename='app.log', level=logging.INFO)
logging.info('This is an info message')
这段代码配置日志系统,并将一条信息级别的消息写入到 app.log
文件中。
5. 文本处理
读取大文件:
with open('large_file.txt', 'r') as f:
for line in f:
print(line.strip())
这段代码逐行读取大文件并打印每行的内容。
逐行写入大文件:
with open('large_file.txt', 'w') as f:
for i in range(1000000):
f.write(f'Line {i}\n')
这段代码逐行写入100万行数据到文件中。
查找并替换文本:
with open('file.txt', 'r') as f:
content = f.read()
new_content = content.replace('old_text', 'new_text')
with open('file.txt', 'w') as f:
f.write(new_content)
这段代码读取文件内容,替换指定文本,然后写回文件。
读取并处理CSV文件:
import csv
with open('data.csv', 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
print(row['Name'], row['Age'])
这段代码读取CSV文件,并打印每行的姓名和年龄。
写入CSV文件:
import csv
with open('data.csv', 'w', newline='') as csvfile:
fieldnames = ['Name', 'Age']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerow({'Name': 'Alice', 'Age': 30})
writer.writerow({'Name': 'Bob', 'Age': 25})
这段代码创建一个CSV文件并写入两行数据。
读取并处理JSON文件:
import json
with open('data.json', 'r') as f:
data = json.load(f)
for item in data:
print(item['name'], item['age'])
这段代码读取JSON文件并打印每个条目的姓名和年龄。
写入JSON文件:
import json
data = [{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]
with open('data.json', 'w') as f:
json.dump(data, f, indent=4)
这段代码将数据写入JSON文件,并格式化输出。
读取并处理XML文件:
import xml.etree.ElementTree as ET
tree = ET.parse('data.xml')
root = tree.getroot()
for child in root:
print(child.tag, child.attrib)
这段代码读取XML文件并打印每个元素的标签和属性。
写入XML文件:
import xml.etree.ElementTree as ET
root = ET.Element("root")
doc = ET.SubElement(root, "doc")
ET.SubElement(doc, "field1", name="name").text = "some value1"
ET.SubElement(doc, "field2", name="name").text = "some value2"
tree = ET.ElementTree(root)
tree.write("output.xml")
这段代码创建一个XML文件并写入数据。
正则表达式匹配:
import re
text = "The quick brown fox jumps over the lazy dog"
pattern = r'\b\w{5}\b'
matches = re.findall(pattern, text)
print(matches)
这段代码使用正则表达式找出所有5个字符长度的单词。
正则表达式替换:
import re
text = "The quick brown fox jumps over the lazy dog"
new_text = re.sub(r'\b\w{5}\b', '*****', text)
print(new_text)
这段代码使用正则表达式替换5个字符长度的单词为星号。
提取URL:
import re
text = "Visit http://example.com and https://another-example.com"
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text)
print(urls)
这段代码使用正则表达式提取文本中的URL。
提取电子邮件地址:
import re
text = "Contact us at support@example.com or feedback@example.org"
emails = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text)
print(emails)
这段代码使用正则表达式提取电子邮件地址。
提取电话号码:
import re
text = "Call us at 123-456-7890 or 987-654-3210"
phone_numbers = re.findall(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text)
print(phone_numbers)
这段代码使用正则表达式提取电话号码。
提取日期:
import re
text = "Meeting on 2023-10-05 and 2023-10-10"
dates = re.findall(r'\b\d{4}-\d{2}-\d{2}\b', text)
print(dates)
这段代码使用正则表达式提取日期。
6. 数学和科学计算
基本数学运算:
import math
print(math.sqrt(16)) # 平方根
print(math.pow(2, 3)) # 幂运算
print(math.sin(math.pi / 2)) # 正弦函数
这段代码演示了如何使用 Python 的 math
模块进行基本的数学运算。
矩阵运算:
import numpy as np
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
c = np.dot(a, b) # 矩阵乘法
print(c)
这段代码使用 numpy
库进行矩阵乘法运算。
统计分析:
import numpy as np
data = np.array([1, 2, 3, 4, 5])
mean = np.mean(data)
median = np.median(data)
std = np.std(data)
print(f"Mean: {mean}, Median: {median}, Standard Deviation: {std}")
这段代码计算了一组数据的平均值、中位数和标准差。
线性回归:
import numpy as np
from sklearn.linear_model import LinearRegression
X = np.array([[1], [2], [3], [4], [5]])
y = np.array([2, 4, 6, 8, 10])
model = LinearRegression()
model.fit(X, y)
print(model.coef_, model.intercept_)
这段代码使用 sklearn
库进行简单的线性回归分析。
聚类分析:
from sklearn.cluster import KMeans
import numpy as np
X = np.array([[1, 2], [1, 4], [1, 0],
[10, 2], [10, 4], [10, 0]])
kmeans = KMeans(n_clusters=2, random_state=0).fit(X)
print(kmeans.labels_)
这段代码使用 sklearn
库的 KMeans
算法进行聚类分析,并打印每个点的聚类标签。
7. Web开发
简单的Flask应用:
from flask import Flask
app = Flask(__name__)
@app.route('/')
def home():
return 'Hello, World!'
if __name__ == '__main__':
app.run(debug=True)
这个简单的Flask应用定义了一个路由,访问网站根目录时会返回“Hello, World!”。
Flask路由参数:
from flask import Flask
app = Flask(__name__)
@app.route('/user/<username>')
def show_user_profile(username):
return f'User {username}'
if __name__ == '__main__':
app.run(debug=True)
这段代码演示了如何在Flask中使用路由参数。
Flask表单提交:
from flask import Flask, request
app = Flask(__name__)
@app.route('/login', methods=['POST'])
def login():
username = request.form['username']
password = request.form['password']
return f'Username: {username}, Password: {password}'
if __name__ == '__main__':
app.run(debug=True)
这个Flask应用处理POST请求,并从表单中获取用户名和密码。
Flask模板渲染:
from flask import Flask, render_template
app = Flask(__name__)
@app.route('/')
def home():
return render_template('index.html')
if __name__ == '__main__':
app.run(debug=True)
这段代码展示了如何在Flask中渲染模板。
Flask静态文件:
from flask import Flask, send_from_directory
app = Flask(__name__)
@app.route('/static/<path:filename>')
def serve_static(filename):
return send_from_directory('static', filename)
if __name__ == '__main__':
app.run(debug=True)
这段代码演示了如何在Flask中提供静态文件。
Django简单项目:
django-admin startproject myproject
cd myproject
python manage.py runserver
这些命令用于创建一个新的Django项目并启动开发服务器。
Django模型定义:
from django.db import models
class Person(models.Model):
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
这段代码定义了一个Django模型,用于存储人名。
Django视图:
from django.shortcuts import render
from .models import Person
def person_list(request):
persons = Person.objects.all()
return render(request, 'person_list.html', {'persons': persons})
这个Django视图渲染一个模板,显示所有人员列表。
Django模板:
<!-- templates/person_list.html -->
<h1>Person List</h1>
<ul>
{% for person in persons %}
<li>{{ person.first_name }} {{ person.last_name }}</li>
{% endfor %}
</ul>
这是Django模板文件,用于显示人员列表。
Django表单:
from django import forms
class PersonForm(forms.ModelForm):
class Meta:
model = Person
fields = ['first_name', 'last_name']
这个Django表单基于Person
模型,允许用户输入名字和姓氏。
8. 数据库操作
SQLite数据库操作:
import sqlite3
conn = sqlite3.connect('example.db')
c = conn.cursor()
# 创建表
c.execute('''CREATE TABLE stocks
(date text, trans text, symbol text, qty real, price real)''')
# 插入数据
c.execute("INSERT INTO stocks VALUES ('2023-10-05','BUY','RHAT',100,35.14)")
# 提交事务
conn.commit()
# 查询数据
c.execute("SELECT * FROM stocks WHERE symbol = 'RHAT'")
print(c.fetchall())
# 关闭连接
conn.close()
MySQL数据库操作:
import mysql.connector
conn = mysql.connector.connect(
host='localhost',
user='yourusername',
password='yourpassword',
database='yourdatabase'
)
cursor = conn.cursor()
# 创建表
cursor.execute('''CREATE TABLE stocks
(date VARCHAR(255), trans VARCHAR(255), symbol VARCHAR(255), qty DOUBLE, price DOUBLE)''')
# 插入数据
cursor.execute("INSERT INTO stocks (date, trans, symbol, qty, price) VALUES (%s, %s, %s, %s, %s)",
('2023-10-05', 'BUY', 'RHAT', 100, 35.14))
# 提交事务
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM stocks WHERE symbol = 'RHAT'")
for row in cursor.fetchall():
print(row)
# 关闭连接
cursor.close()
conn.close()
PostgreSQL数据库操作:
import psycopg2
conn = psycopg2.connect(
host='localhost',
database='yourdatabase',
user='yourusername',
password='yourpassword'
)
cursor = conn.cursor()
# 创建表
cursor.execute('''CREATE TABLE stocks
(date VARCHAR(255), trans VARCHAR(255), symbol VARCHAR(255), qty DOUBLE PRECISION, price DOUBLE PRECISION)''')
# 插入数据
cursor.execute("INSERT INTO stocks (date, trans, symbol, qty, price) VALUES (%s, %s, %s, %s, %s)",
('2023-10-05', 'BUY', 'RHAT', 100, 35.14))
# 提交事务
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM stocks WHERE symbol = 'RHAT'")
for row in cursor.fetchall():
print(row)
# 关闭连接
cursor.close()
conn.close()
SQLAlchemy ORM操作:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///example.db')
Base = declarative_base()
class Person(Base):
__tablename__ = 'persons'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 插入数据
new_person = Person(name='Alice', age=30)
session.add(new_person)
session.commit()
# 查询数据
persons = session.query(Person).filter_by(name='Alice').all()
for person in persons:
print(person.id, person.name, person.age)
# 关闭会话
session.close()
以上是四种不同数据库的操作示例,包括SQLite、MySQL、PostgreSQL以及使用SQLAlchemy ORM进行数据库操作。每个示例都包括了连接数据库、创建表、插入数据、查询数据和关闭连接的基本步骤。
9. 其他
定时任务:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
这段代码使用 schedule
库来安排一个每10秒执行一次的任务。
多线程:
import threading
def print_numbers():
for i in range(10):
print(i)
def print_letters():
for letter in 'abcdef':
print(letter)
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)
t1.start()
t2.start()
t1.join()
t2.join()
这段代码创建两个线程,分别打印数字和字母。
多进程:
import multiprocessing
def print_numbers():
for i in range(10):
print(i)
def print_letters():
for letter in 'abcdef':
print(letter)
p1 = multiprocessing.Process(target=print_numbers)
p2 = multiprocessing.Process(target=print_letters)
p1.start()
p2.start()
p1.join()
p2.join()
这段代码创建两个进程,分别执行不同的函数。
异步编程:
import asyncio
async def print_numbers():
for i in range(10):
print(i)
await asyncio.sleep(1)
async def print_letters():
for letter in 'abcdef':
print(letter)
await asyncio.sleep(1)
async def main():
task1 = asyncio.create_task(print_numbers())
task2 = asyncio.create_task(print_letters())
await task1
await task2
asyncio.run(main())
这段代码使用 asyncio
库来执行两个异步任务。
日志记录:
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.debug('This is a debug message')
logging.info('This is an info message')
logging.warning('This is a warning message')
logging.error('This is an error message')
logging.critical('This is a critical message')
这段代码配置并使用 Python 的 logging
模块来记录不同级别的日志信息。
配置文件读取:
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
db_host = config['DATABASE']['host']
db_port = config['DATABASE']['port']
db_user = config['DATABASE']['user']
db_password = config['DATABASE']['password']
print(f'DB Host: {db_host}, Port: {db_port}, User: {db_user}, Password: {db_password}')
这段代码使用 configparser
模块读取配置文件,并提取数据库连接信息。
评论区