目 录CONTENT

文章目录

python实用脚本

Administrator
2024-10-31 / 1 评论 / 0 点赞 / 37 阅读 / 0 字

1. 文件和目录操作

创建目录:

import os
os.makedirs('new_directory', exist_ok=True)

删除目录:

import shutil
shutil.rmtree('directory_to_delete')

列出目录内容:

import os
for item in os.listdir('path/to/directory'):
    print(item)

复制文件:

import shutil
shutil.copy('source_file.txt', 'destination_file.txt')

移动文件:

import shutil
shutil.move('source_file.txt', 'destination_directory/')

重命名文件:

import os
os.rename('old_name.txt', 'new_name.txt')

读取文件内容:

with open('file.txt', 'r') as f:
    content = f.read()
    print(content)

写入文件内容:

with open('file.txt', 'w') as f:
    f.write('Hello, World!')

追加文件内容:

with open('file.txt', 'a') as f:
    f.write('\nAppending new line.')

检查文件是否存在:

import os
if os.path.exists('file.txt'):
    print('File exists.')

获取文件大小:

import os
file_size = os.path.getsize('file.txt')
print(f'File size: {file_size} bytes')

获取文件修改时间:

import os
mod_time = os.path.getmtime('file.txt')
print(f'Modified time: {mod_time}')

递归遍历目录:

import os
for root, dirs, files in os.walk('path/to/directory'):
    for name in files:
        print(os.path.join(root, name))

查找文件:

import os
for root, dirs, files in os.walk('path/to/directory'):
    if 'target_file.txt' in files:
        print(os.path.join(root, 'target_file.txt'))

压缩文件:

import zipfile
with zipfile.ZipFile('archive.zip', 'w') as zipf:
    zipf.write('file.txt')

解压缩文件:

import zipfile
with zipfile.ZipFile('archive.zip', 'r') as zipf:
    zipf.extractall('extracted_directory')

读取CSV文件:

import csv
with open('data.csv', 'r') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        print(row)

写入CSV文件:

import csv
with open('data.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['Name', 'Age'])
    writer.writerow(['Alice', 30])
    writer.writerow(['Bob', 25])

读取JSON文件:

import json
with open('data.json', 'r') as f:
    data = json.load(f)
    print(data)

写入JSON文件:

import json
data = {'name': 'Alice', 'age': 30}
with open('data.json', 'w') as f:
    json.dump(data, f)

2. 网络请求

发送GET请求:

import requests
response = requests.get('https://api.example.com/data')
print(response.text)

发送POST请求:

import requests
payload = {'key': 'value'}
response = requests.post('https://api.example.com/submit', data=payload)
print(response.text)

发送带有头部的请求:

import requests
headers = {'Content-Type': 'application/json'}
response = requests.get('https://api.example.com/data', headers=headers)
print(response.text)

下载文件:

import requests
url = 'https://example.com/file.zip'
response = requests.get(url)
with open('downloaded_file.zip', 'wb') as f:
    f.write(response.content)

上传文件:

import requests
files = {'file': open('file.txt', 'rb')}
response = requests.post('https://api.example.com/upload', files=files)
print(response.text)

处理JSON响应:

import requests
response = requests.get('https://api.example.com/data')
data = response.json()
print(data)

处理XML响应:

import requests
import xml.etree.ElementTree as ET
response = requests.get('https://api.example.com/data.xml')
root = ET.fromstring(response.text)
for child in root:
    print(child.tag, child.attrib)

设置超时:

import requests
response = requests.get('https://api.example.com/data', timeout=5)
print(response.text)

处理HTTP错误:

import requests
try:
    response = requests.get('https://api.example.com/data')
    response.raise_for_status()
except requests.exceptions.HTTPError as errh:
    print("Http Error:", errh)
except requests.exceptions.ConnectionError as errc:
    print("Error Connecting:", errc)
except requests.exceptions.Timeout as errt:
    print("Timeout Error:", errt)
except requests.exceptions.RequestException as err:
    print("OOps: Something Else", err)

使用会话:

import requests
session = requests.Session()
session.auth = ('username', 'password')
response = session.get('https://api.example.com/data')
print(response.text)

3. 数据处理

字符串分割:

text = "apple,banana,orange"
fruits = text.split(',')
print(fruits)

字符串连接:

fruits = ['apple', 'banana', 'orange']
text = ','.join(fruits)
print(text)

字符串替换:

text = "Hello, World!"
new_text = text.replace('World', 'Python')
print(new_text)

字符串格式化:

name = "Alice"
age = 30
message = f"Name: {name}, Age: {age}"
print(message)

列表排序:

numbers = [3, 1, 4, 1, 5, 9]
sorted_numbers = sorted(numbers)
print(sorted_numbers)

字典排序:

from collections import OrderedDict
data = {'b': 2, 'a': 1, 'c': 3}
ordered_data = OrderedDict(sorted(data.items()))
print(ordered_data)

过滤列表:

numbers = [3, 1, 4, 1, 5, 9]
even_numbers = [num for num in numbers if num % 2 == 0]
print(even_numbers)

统计词频:

from collections import Counter
text = "hello world hello"
word_counts = Counter(text.split())
print(word_counts)

生成随机数:

import random
random_number = random.randint(1, 100)
print(random_number)

生成随机字符串:

import random
import string
random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
print(random_string)

日期和时间处理:

from datetime import datetime
now = datetime.now()
formatted_now = now.strftime("%Y-%m-%d %H:%M:%S")
print(formatted_now)

解析日期字符串:

from datetime import datetime
date_string = "2023-10-05 15:30:00"
parsed_date = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
print(parsed_date)

计算两个日期之间的差值:

from datetime import datetime
date1 = datetime(2023, 10, 5)
date2 = datetime(2023, 10, 10)
delta = date2 - date1
print(delta.days)

生成日期范围:

from datetime import datetime, timedelta
start_date = datetime(2023, 10, 5)
end_date = datetime(2023, 10, 10)
date_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
for date in date_range:
    print(date.strftime("%Y-%m-%d"))

计算年龄:

from datetime import datetime
birthdate = datetime(1990, 1, 1)
today = datetime.today()
age = today.year - birthdate.year - ((today.month, today.day) < (birthdate.month, birthdate.day))
print(age)

4. 系统管理

执行系统命令:

import subprocess
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)

这段代码使用 subprocess 模块执行 ls -l 命令并打印输出结果。

获取当前工作目录:

import os
current_dir = os.getcwd()
print(current_dir)

此代码片段使用 os 模块获取并打印当前工作目录的路径。

更改当前工作目录:

import os
os.chdir('/path/to/new/directory')

这段代码更改当前工作目录到指定的路径。

获取环境变量:

import os
value = os.getenv('PATH')
print(value)

此代码片段获取名为 PATH 的环境变量的值并打印。

设置环境变量:

import os
os.environ['MY_VARIABLE'] = 'my_value'

这段代码设置一个名为 MY_VARIABLE 的环境变量,并赋予它值 my_value

获取系统信息:

import platform
system_info = platform.uname()
print(system_info)

此代码片段使用 platform 模块获取系统相关信息并打印。

获取CPU信息:

import psutil
cpu_percent = psutil.cpu_percent(interval=1)
print(cpu_percent)

这段代码使用 psutil 模块测量CPU使用率,并打印结果。

获取内存信息:

import psutil
memory_info = psutil.virtual_memory()
print(memory_info)

此代码片段获取虚拟内存信息并打印。

获取磁盘使用情况:

import psutil
disk_usage = psutil.disk_usage('/')
print(disk_usage)

这段代码获取根目录的磁盘使用情况并打印。

获取网络接口信息:

import psutil
net_if_addrs = psutil.net_if_addrs()
for interface, addrs in net_if_addrs.items():
    print(f"Interface: {interface}")
    for addr in addrs:
        print(f"  {addr.family.name}: {addr.address}")

此代码片段获取并打印所有网络接口的详细信息。

获取进程列表:

import psutil
for proc in psutil.process_iter(['pid', 'name']):
    print(proc.info)

这段代码遍历所有进程并打印它们的PID和名称。

杀死进程:

import psutil
for proc in psutil.process_iter(['pid', 'name']):
    if proc.info['name'] == 'process_name':
        proc.kill()

此代码片段查找名为 process_name 的进程并将其终止。

获取用户信息:

import pwd
user_info = pwd.getpwnam('username')
print(user_info)

这段代码使用 pwd 模块根据用户名获取用户信息并打印。

获取组信息:

import grp
group_info = grp.getgrnam('groupname')
print(group_info)

此代码片段根据组名获取组信息并打印。

获取系统日志:

import logging
logging.basicConfig(filename='app.log', level=logging.INFO)
logging.info('This is an info message')

这段代码配置日志系统,并将一条信息级别的消息写入到 app.log 文件中。

5. 文本处理

读取大文件:

with open('large_file.txt', 'r') as f:
    for line in f:
        print(line.strip())

这段代码逐行读取大文件并打印每行的内容。

逐行写入大文件:

with open('large_file.txt', 'w') as f:
    for i in range(1000000):
        f.write(f'Line {i}\n')

这段代码逐行写入100万行数据到文件中。

查找并替换文本:

with open('file.txt', 'r') as f:
    content = f.read()
new_content = content.replace('old_text', 'new_text')
with open('file.txt', 'w') as f:
    f.write(new_content)

这段代码读取文件内容,替换指定文本,然后写回文件。

读取并处理CSV文件:

import csv
with open('data.csv', 'r') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        print(row['Name'], row['Age'])

这段代码读取CSV文件,并打印每行的姓名和年龄。

写入CSV文件:

import csv
with open('data.csv', 'w', newline='') as csvfile:
    fieldnames = ['Name', 'Age']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'Name': 'Alice', 'Age': 30})
    writer.writerow({'Name': 'Bob', 'Age': 25})

这段代码创建一个CSV文件并写入两行数据。

读取并处理JSON文件:

import json
with open('data.json', 'r') as f:
    data = json.load(f)
    for item in data:
        print(item['name'], item['age'])

这段代码读取JSON文件并打印每个条目的姓名和年龄。

写入JSON文件:

import json
data = [{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]
with open('data.json', 'w') as f:
    json.dump(data, f, indent=4)

这段代码将数据写入JSON文件,并格式化输出。

读取并处理XML文件:

import xml.etree.ElementTree as ET
tree = ET.parse('data.xml')
root = tree.getroot()
for child in root:
    print(child.tag, child.attrib)

这段代码读取XML文件并打印每个元素的标签和属性。

写入XML文件:

import xml.etree.ElementTree as ET
root = ET.Element("root")
doc = ET.SubElement(root, "doc")
ET.SubElement(doc, "field1", name="name").text = "some value1"
ET.SubElement(doc, "field2", name="name").text = "some value2"
tree = ET.ElementTree(root)
tree.write("output.xml")

这段代码创建一个XML文件并写入数据。

正则表达式匹配:

import re
text = "The quick brown fox jumps over the lazy dog"
pattern = r'\b\w{5}\b'
matches = re.findall(pattern, text)
print(matches)

这段代码使用正则表达式找出所有5个字符长度的单词。

正则表达式替换:

import re
text = "The quick brown fox jumps over the lazy dog"
new_text = re.sub(r'\b\w{5}\b', '*****', text)
print(new_text)

这段代码使用正则表达式替换5个字符长度的单词为星号。

提取URL:

import re
text = "Visit http://example.com  and https://another-example.com"
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text)
print(urls)

这段代码使用正则表达式提取文本中的URL。

提取电子邮件地址:

import re
text = "Contact us at support@example.com or feedback@example.org"
emails = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text)
print(emails)

这段代码使用正则表达式提取电子邮件地址。

提取电话号码:

import re
text = "Call us at 123-456-7890 or 987-654-3210"
phone_numbers = re.findall(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text)
print(phone_numbers)

这段代码使用正则表达式提取电话号码。

提取日期:

import re
text = "Meeting on 2023-10-05 and 2023-10-10"
dates = re.findall(r'\b\d{4}-\d{2}-\d{2}\b', text)
print(dates)

这段代码使用正则表达式提取日期。

6. 数学和科学计算

基本数学运算:

import math
print(math.sqrt(16))  # 平方根
print(math.pow(2, 3))  # 幂运算
print(math.sin(math.pi / 2))  # 正弦函数

这段代码演示了如何使用 Python 的 math 模块进行基本的数学运算。

矩阵运算:

import numpy as np
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
c = np.dot(a, b)  # 矩阵乘法
print(c)

这段代码使用 numpy 库进行矩阵乘法运算。

统计分析:

import numpy as np
data = np.array([1, 2, 3, 4, 5])
mean = np.mean(data)
median = np.median(data)
std = np.std(data)
print(f"Mean: {mean}, Median: {median}, Standard Deviation: {std}")

这段代码计算了一组数据的平均值、中位数和标准差。

线性回归:

import numpy as np
from sklearn.linear_model import LinearRegression
X = np.array([[1], [2], [3], [4], [5]])
y = np.array([2, 4, 6, 8, 10])
model = LinearRegression()
model.fit(X, y)
print(model.coef_, model.intercept_)

这段代码使用 sklearn 库进行简单的线性回归分析。

聚类分析:

from sklearn.cluster import KMeans
import numpy as np
X = np.array([[1, 2], [1, 4], [1, 0],
              [10, 2], [10, 4], [10, 0]])
kmeans = KMeans(n_clusters=2, random_state=0).fit(X)
print(kmeans.labels_)

这段代码使用 sklearn 库的 KMeans 算法进行聚类分析,并打印每个点的聚类标签。

7. Web开发

简单的Flask应用:

from flask import Flask
app = Flask(__name__)

@app.route('/')
def home():
    return 'Hello, World!'

if __name__ == '__main__':
    app.run(debug=True)

这个简单的Flask应用定义了一个路由,访问网站根目录时会返回“Hello, World!”。

Flask路由参数:

from flask import Flask
app = Flask(__name__)

@app.route('/user/<username>')
def show_user_profile(username):
    return f'User {username}'

if __name__ == '__main__':
    app.run(debug=True)

这段代码演示了如何在Flask中使用路由参数。

Flask表单提交:

from flask import Flask, request
app = Flask(__name__)

@app.route('/login', methods=['POST'])
def login():
    username = request.form['username']
    password = request.form['password']
    return f'Username: {username}, Password: {password}'

if __name__ == '__main__':
    app.run(debug=True)

这个Flask应用处理POST请求,并从表单中获取用户名和密码。

Flask模板渲染:

from flask import Flask, render_template
app = Flask(__name__)

@app.route('/')
def home():
    return render_template('index.html')

if __name__ == '__main__':
    app.run(debug=True)

这段代码展示了如何在Flask中渲染模板。

Flask静态文件:

from flask import Flask, send_from_directory
app = Flask(__name__)

@app.route('/static/<path:filename>')
def serve_static(filename):
    return send_from_directory('static', filename)

if __name__ == '__main__':
    app.run(debug=True)

这段代码演示了如何在Flask中提供静态文件。

Django简单项目:

django-admin startproject myproject
cd myproject
python manage.py runserver

这些命令用于创建一个新的Django项目并启动开发服务器。

Django模型定义:

from django.db import models

class Person(models.Model):
    first_name = models.CharField(max_length=30)
    last_name = models.CharField(max_length=30)

这段代码定义了一个Django模型,用于存储人名。

Django视图:

from django.shortcuts import render
from .models import Person

def person_list(request):
    persons = Person.objects.all()
    return render(request, 'person_list.html', {'persons': persons})

这个Django视图渲染一个模板,显示所有人员列表。

Django模板:

<!-- templates/person_list.html -->
<h1>Person List</h1>
<ul>
    {% for person in persons %}
        <li>{{ person.first_name }} {{ person.last_name }}</li>
    {% endfor %}
</ul>

这是Django模板文件,用于显示人员列表。

Django表单:

from django import forms

class PersonForm(forms.ModelForm):
    class Meta:
        model = Person
        fields = ['first_name', 'last_name']

这个Django表单基于Person模型,允许用户输入名字和姓氏。

8. 数据库操作

SQLite数据库操作:

import sqlite3
conn = sqlite3.connect('example.db')
c = conn.cursor()
# 创建表
c.execute('''CREATE TABLE stocks
             (date text, trans text, symbol text, qty real, price real)''')
# 插入数据
c.execute("INSERT INTO stocks VALUES ('2023-10-05','BUY','RHAT',100,35.14)")
# 提交事务
conn.commit()
# 查询数据
c.execute("SELECT * FROM stocks WHERE symbol = 'RHAT'")
print(c.fetchall())
# 关闭连接
conn.close()

MySQL数据库操作:

import mysql.connector
conn = mysql.connector.connect(
    host='localhost',
    user='yourusername',
    password='yourpassword',
    database='yourdatabase'
)
cursor = conn.cursor()
# 创建表
cursor.execute('''CREATE TABLE stocks
                   (date VARCHAR(255), trans VARCHAR(255), symbol VARCHAR(255), qty DOUBLE, price DOUBLE)''')
# 插入数据
cursor.execute("INSERT INTO stocks (date, trans, symbol, qty, price) VALUES (%s, %s, %s, %s, %s)",
               ('2023-10-05', 'BUY', 'RHAT', 100, 35.14))
# 提交事务
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM stocks WHERE symbol = 'RHAT'")
for row in cursor.fetchall():
    print(row)
# 关闭连接
cursor.close()
conn.close()

PostgreSQL数据库操作:

import psycopg2
conn = psycopg2.connect(
    host='localhost',
    database='yourdatabase',
    user='yourusername',
    password='yourpassword'
)
cursor = conn.cursor()
# 创建表
cursor.execute('''CREATE TABLE stocks
                   (date VARCHAR(255), trans VARCHAR(255), symbol VARCHAR(255), qty DOUBLE PRECISION, price DOUBLE PRECISION)''')
# 插入数据
cursor.execute("INSERT INTO stocks (date, trans, symbol, qty, price) VALUES (%s, %s, %s, %s, %s)",
               ('2023-10-05', 'BUY', 'RHAT', 100, 35.14))
# 提交事务
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM stocks WHERE symbol = 'RHAT'")
for row in cursor.fetchall():
    print(row)
# 关闭连接
cursor.close()
conn.close()

SQLAlchemy ORM操作:

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///example.db')
Base = declarative_base()
class Person(Base):
    __tablename__ = 'persons'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# 插入数据
new_person = Person(name='Alice', age=30)
session.add(new_person)
session.commit()
# 查询数据
persons = session.query(Person).filter_by(name='Alice').all()
for person in persons:
    print(person.id, person.name, person.age)
# 关闭会话
session.close()

以上是四种不同数据库的操作示例,包括SQLite、MySQL、PostgreSQL以及使用SQLAlchemy ORM进行数据库操作。每个示例都包括了连接数据库、创建表、插入数据、查询数据和关闭连接的基本步骤。

9. 其他

定时任务:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).seconds.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

这段代码使用 schedule 库来安排一个每10秒执行一次的任务。

多线程:

import threading

def print_numbers():
    for i in range(10):
        print(i)

def print_letters():
    for letter in 'abcdef':
        print(letter)

t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)
t1.start()
t2.start()
t1.join()
t2.join()

这段代码创建两个线程,分别打印数字和字母。

多进程:

import multiprocessing

def print_numbers():
    for i in range(10):
        print(i)

def print_letters():
    for letter in 'abcdef':
        print(letter)

p1 = multiprocessing.Process(target=print_numbers)
p2 = multiprocessing.Process(target=print_letters)
p1.start()
p2.start()
p1.join()
p2.join()

这段代码创建两个进程,分别执行不同的函数。

异步编程:

import asyncio

async def print_numbers():
    for i in range(10):
        print(i)
        await asyncio.sleep(1)

async def print_letters():
    for letter in 'abcdef':
        print(letter)
        await asyncio.sleep(1)

async def main():
    task1 = asyncio.create_task(print_numbers())
    task2 = asyncio.create_task(print_letters())
    await task1
    await task2

asyncio.run(main())

这段代码使用 asyncio 库来执行两个异步任务。

日志记录:

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.debug('This is a debug message')
logging.info('This is an info message')
logging.warning('This is a warning message')
logging.error('This is an error message')
logging.critical('This is a critical message')

这段代码配置并使用 Python 的 logging 模块来记录不同级别的日志信息。

配置文件读取:

import configparser

config = configparser.ConfigParser()
config.read('config.ini')
db_host = config['DATABASE']['host']
db_port = config['DATABASE']['port']
db_user = config['DATABASE']['user']
db_password = config['DATABASE']['password']
print(f'DB Host: {db_host}, Port: {db_port}, User: {db_user}, Password: {db_password}')

这段代码使用 configparser 模块读取配置文件,并提取数据库连接信息。

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区