0. 概述

主要参考资料:

Python 3 Module of the Week | pymotw
python 文档 | python
菜鸟教程 | runoob

重点看和科学计算相关的库,网络之类的库回头用到了再看

1. 文本格式化

1.1 基本方法

最基本的方法是使用 print 函数的格式化写法

1
print ("my name is %s,%d years old" % ('yishiyu', 10))

其中占位符的含义为(参考自: 菜鸟教程):

占位符 含义
%c 字符及其 ascii 码
%s 字符串
%d 整数
%u 无符号整数
%o 无符号八进制数
%x 无符号十六进制小写
%X 无符号十六进制大写
%f 浮点数
%e 科学计数法浮点数
%p 十六进制变量地址
辅助指令 含义
- 左对齐
+ 正数前显示加号
# 特殊进制前显示标志
0 默认填充 0 而非空格
%% 转义输出%
(var) 映射字典中的变量
m.n 最小位宽,小数精度

1.2 format 函数

format 函数是 str 类的内置函数

首先可以使用带下标的占位符

1
2
s1 = "{} {}".format("hello", "world")
s2 = "{1} {0} {1}".format("hello", "world")

然后可以设置复杂的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
print("网站名:{name}, 地址 {url}".format(name="菜鸟教程", url="www.runoob.com"))

# 通过字典设置参数
site = {"name": "菜鸟教程", "url": "www.runoob.com"}
print("网站名:{name}, 地址 {url}".format(**site))

# 通过列表索引设置参数
my_list = ['菜鸟教程', 'www.runoob.com']
print("网站名:{0[0]}, 地址 {0[1]}".format(my_list)) # "0" 是必须的

# 通过对象属性设置参数
class AssignValue(object):
def __init__(self, value):
self.value = value
my_value = AssignValue(6)
print('value 为: {0.value}'.format(my_value)) # "0" 是可选的

# format函数基本格式
print("{:.2f}".format(3.1415926))

新增格式指令(基本格式与 1.1.1 中的一样):

指令格式 含义
^ 居中对齐
< 左对齐
> 右对齐
- 显示正负号
空格 正数前填充空格
{{}} 转义大括号
数字 位宽
格式化指令 含义
{:.2f} 保留小数点后两位
{:+.2f} 带符号保留小数点后两位
{:+.2f} 带符号保留小数点后两位
{:.0f} 不带小数
{:0>2d} 数字补零 (填充左边, 宽度为 2)
{:x<4d} 数字补 x (填充右边, 宽度为 4)
{:x<4d} 数字补 x (填充右边, 宽度为 4)
{:,} 以逗号分隔的数字格式
{:.2%} 百分比格式
{:.2e} 指数记法
{:>10d} 右对齐 (默认, 宽度为 10)
{:<10d} 左对齐 (宽度为 10)
{:^10d} 中间对齐 (宽度为 10)

1.3 pprint 包

可以使用 pprint 包美观地打印数据结构

函数 作用
pprint.pprint 美观地打印数据结构
pprint.pformat 获取美观打印的字符串(可以用于写入日志)

两个函数的完整函数声明:

1
2
pprint.pprint(object, stream=None, indent=1, width=80, depth=None, *, compact=False, sort_dicts=True)
pprint.pformat(object, indent=1, width=80, depth=None, *, compact=False, sort_dicts=True)
参数 作用
object 被打印的对象
stream 打印的流,默认为 sys.stdout
width 打印宽度
depth 打印深度
compact 压缩模式

2. 数据结构

list,tuple,set,dict四种数据结构略过

2.1 Enum:枚举类型

2.1.1 普通枚举类型

1
2
3
4
5
6
import enum

class Status(enum.Enum):
s1 = 1
s2 = 2
s3 = 3

枚举在迭代的时候会按照声明的顺序生成,而不是值或名字
然后枚举值是无序的,不能比较大小,只能通过==is运算符比较

2.1.2 整数枚举类型

通过继承IntEnum,可以使得枚举可以比较大小

1
2
3
4
5
6
import enum

class Status(enum.IntEnum):
s1 = 1
s2 = 2
s3 = 3

2.1.3 唯一枚举

普通枚举的每个值可以对应多个枚举,可以通过装饰器实现唯一枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import enum

class Status_1(enum.Enum):
s1 = 1
s2 = 2
s3 = 3
sone = 1

@enum.unique
class Status_1(enum.Enum):
s1 = 1
s2 = 2
s3 = 3
# 此时这个枚举值会引发错误
sone = 1

2.2 ChainMap:连锁字典

连锁字典是一个扩展的字典,可以把多个字典串联起来

  • 连锁字典不会创建组成其字典的拷贝,而是直接引用(更改会作用到原字典上)
  • 连锁字典在搜索的时候会从第一个字典开始依次搜索所有字典,直到找到一个值
  • 连锁字典的更改只会作用到第一个字典上(无论是修改还是增加)
  • 可以根据原有的连锁字典创建新层或删除旧层

_总之,ChainMap 可以把字典像栈一样组织起来_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from collections import ChainMap

dict_a = {'a': 'A', 'c': 'C'}
dict_b = {'b': 'B', 'c': 'D'}

# 创建一个连锁字典
m = ChainMap(dict_a, dict_b)

# =================================================
# -------------------- 搜索字典 --------------------
# =================================================

print('key=value')
for k, v in m.items():
print('{:^3}={:^3}'.format(k, v))

# key=value
# b = B
# c = C
# a = A

# =================================================
# -------------------- 修改字典 --------------------
# =================================================

m['a'] = 'AAA'
m['e'] = 'EEE'
print(dict_a)

# {'a': 'AAA', 'c': 'C', 'e': 'EEE'}

# =================================================
# -------------------- 创建新层 --------------------
# =================================================

dict_c = {'f': 'FFF'}

# 添加一个新层
child_m_1 = m.new_child()
# 添加一个已有层
child_m_2 = m.new_child(dict_c)

print(child_m_1)
print(child_m_2)

# ChainMap({}, {'a': 'AAA', 'c': 'C', 'e': 'EEE'}, {'b': 'B', 'c': 'D'})
# ChainMap({'f': 'FFF'}, {'a': 'AAA', 'c': 'C', 'e': 'EEE'}, {'b': 'B', 'c': 'D'})

# =================================================
# -------------------- 删除旧层 --------------------
# =================================================


parent_m = m.parents
print(m)
print(parent_m)

# ChainMap({'a': 'AAA', 'c': 'C', 'e': 'EEE'}, {'b': 'B', 'c': 'D'})
# ChainMap({'b': 'B', 'c': 'D'})

2.3 Counter:计数器

Counter 是一个对可迭代对象的元素进行计数的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from collections import Counter

# =================================================
# --------------------- 初始化 ---------------------
# =================================================

# c = Counter()
# c = Counter('gallahad')
# c = Counter({'red': 4, 'blue': 2})
# c = Counter(cats=4, dogs=8)
c = Counter('abbccc')
print(c)
# Counter({'c': 3, 'b': 2, 'a': 1})

# =================================================
# --------------------- 更新值 ---------------------
# =================================================

c.update('deefff')
print(c)
# Counter({'c': 3, 'f': 3, 'b': 2, 'e': 2, 'a': 1, 'd': 1})

c.update('aab')
print(c)
# Counter({'a': 3, 'b': 3, 'c': 3, 'f': 3, 'e': 2, 'd': 1})

# =================================================
# --------------------- 访问值 ---------------------
# =================================================

print(c['a'])
for k in c.elements():
print('{}={}'.format(k, c[k]))
# 3
# a=3
# a=3
# a=3
# ...

# =================================================
# --------------------- 复杂操作 -------------------
# =================================================

c_1 = Counter('aaabbc')
c_2 = Counter('bccddd')

print(c_1+c_2)
print(c_1-c_2)
print(c_1 & c_2)
print(c_1 | c_2)
# Counter({'a': 3, 'b': 3, 'c': 3, 'd': 3})
# Counter({'a': 3, 'b': 1})
# Counter({'b': 1, 'c': 1})
# Counter({'a': 3, 'd': 3, 'b': 2, 'c': 2})

2.4 defaultdict:缺省值字典

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from collections import defaultdict

# =================================================
# --------------------- 示例 1 ---------------------
# =================================================

s = [('yellow', 1), ('blue', 2), ('yellow', 3), ('blue', 4), ('red', 1)]
d = defaultdict(list)
for k, v in s:
d[k].append(v)

print(d.items())
# dict_items([('yellow', [1, 3]), ('blue', [2, 4]), ('red', [1])])

# =================================================
# --------------------- 示例 2 ---------------------
# =================================================

def constant_factory(value):
return lambda: value


d = defaultdict(constant_factory('<missing>'))
d.update(name='John', action='ran')

print('%(name)s %(action)s to %(object)s' % d)
print('{name} {action} to {object}'.format(**d))
# John ran to <missing>
# John ran to <missing>

2.5 deque:双向队列

即是可以双向操作的队列(其实也可以进行随机操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from collections import deque

# 可以指定最大长度
d = deque('aaabbc',maxlen=20)

# =================================================
# --------------------- 两端操作 -------------------
# =================================================

d.extend('r1')
d.append('r2')
d.extendleft('l1')
d.appendleft('l2')
print(d)
# deque(['l2', '1', 'l', 'a', 'a', 'a', 'b', 'b', 'c', 'r', '1', 'r2'])

# =================================================
# --------------------- 随机操作 -------------------
# =================================================

print(d[3])
d.insert(3,'x')
print(d[3])
d.remove('x')
print(d[3])
# a
# x
# a

# =================================================
# --------------------- 消费操作 -------------------
# =================================================
# 这些操作是线程安全的

t = None
print(t,d)
t = d.pop()
print(t,d)
t = d.popleft()
print(t,d)
# None deque(['l2', '1', 'l', 'a', 'a', 'a', 'b', 'b', 'c', 'r', '1', 'r2'])
# r2 deque(['l2', '1', 'l', 'a', 'a', 'a', 'b', 'b', 'c', 'r', '1'])
# l2 deque(['1', 'l', 'a', 'a', 'a', 'b', 'b', 'c', 'r', '1'])

# =================================================
# --------------------- 旋转操作 -------------------
# =================================================

print(d)
d.rotate(2)
print(d)
d.rotate(-2)
print(d)
# deque(['1', 'l', 'a', 'a', 'a', 'b', 'b', 'c', 'r', '1'])
# deque(['r', '1', '1', 'l', 'a', 'a', 'a', 'b', 'b', 'c'])
# deque(['1', 'l', 'a', 'a', 'a', 'b', 'b', 'c', 'r', '1'])

2.6 namedtuple:具名元组

简而言之,namedtuple 可以用作快速定义简单的结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from collections import namedtuple

# Person = namedtuple('Person','name age')
Person = namedtuple('Person', ['name', 'age'])


bob = Person('Bob', 30)
jane = Person('Jane', age=29)
print(bob.name)
# Bob

for name, age in [bob, jane]:
print('{} is {} years old'.format(name, age))
# Bob is 30 years old
# Jane is 29 years old

2.7 OrderedDict:有序字典

  • 首先 OrderedDict 会记住键值对插入的顺序(更新键值对不会改变顺序)
  • 然后再进行相等性比较的时候,OrderedDict 会额外对比插入顺序(在键值对相等的基础上)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from collections import OrderedDict

d = OrderedDict()
d['a'] = 'A'
d['b'] = 'B'
d['c'] = 'C'
d['d'] = 'D'

for k,v in d.items():
print('{}:{}'.format(k,v))
# a:A
# b:B
# c:C
# d:D

# =================================================
# --------------------- 更新键值对 ------------------
# =================================================

d['c'] = 'T'
for k,v in d.items():
print('{}:{}'.format(k,v))
# a:A
# b:B
# c:T
# d:D

# =================================================
# --------------------- 重排操作 -------------------
# =================================================
d.move_to_end('c')
for k,v in d.items():
print('{}:{}'.format(k,v))
# a:A
# b:B
# d:D
# c:T

2.8 array:数组

可以把 array 看作只能存储一种数据类型的 list

代码 类型 最低大小
b int 1
B int 1
h signed short 2
H unsigned short 2
i signed int 2
I unsigned int 2
l signed long 4
L unsigned long 4
q signed long long 8
Q unsigned long long 8
f float 4
d double 8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56


import tempfile
from array import array

# =================================================
# --------------------- 基本用法 -------------------
# =================================================
import binascii

s = b'this is the array'
a = array('b', s)

print('as byte string:', s)
print('as array :', a)
print('as hex :', binascii.hexlify(a))
# as byte string: b'this is the array'
# as array : array('b', [116, 104, 105, 115, 32, 105, 115, 32, 116, 104, 101, 32, 97, 114, 114, 97, 121])
# as hex : b'7468697320697320746865206172726179'

# =================================================
# --------------------- 修改方法 -------------------
# =================================================

a = array('i', [1, 2, 3])
print(a)

a.append(6)
a.extend([777, 888])
print(a)
# array('i', [1, 2, 3])
# array('i', [1, 2, 3, 6, 777, 888])

# =================================================
# --------------------- 文件操作 -------------------
# =================================================

a = array('i', range(7))
output = tempfile.NamedTemporaryFile()
print(a)
# array('i', [0, 1, 2, 3, 4, 5, 6])

# 写入文件
a.tofile(output.file)
output.flush()

# 读取文件
with open(output.name, 'rb') as input:
raw_data = input.read()
print(binascii.hexlify(raw_data))

input.seek(0)
b = array('i')
b.fromfile(input, len(a))
print(b)
# array('i', [0, 1, 2, 3, 4, 5, 6])

2.9 heapq:堆排序算法

heapq 实现了一个最小堆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import heapq
import math
from io import StringIO


def show_tree(tree, total_width=36, fill=' '):
output = StringIO()
last_row = -1
for i, n in enumerate(tree):
if i:
# 堆是完全二叉树,根据节点下标计算层数
row = int(math.floor(math.log(i+1, 2)))
else:
row = 0
# 进入下一行
if row != last_row:
output.write('\n')
# 计算当前行节点数和单个节点宽度
collumns = 2**row
col_width = int(math.floor(total_width / collumns))
# 居中显示
output.write(str(n).center(col_width, fill))
last_row = row
print(output.getvalue())
print('-' * total_width)
print()

# =================================================
# -------------------- 堆插入操作 -------------------
# =================================================


data = [19, 9, 4, 10, 11]
heap = []
for n in data:
print('add {:>3}:'.format(n))
# 每次插入一个值,都会触发小顶堆重新排序
heapq.heappush(heap, n)
show_tree(heap)

# =================================================
# -------------------- 堆重排操作 -------------------
# =================================================

# 把整个无序的堆排序
heapq.heapify(data)
show_tree(data)

# =================================================
# -------------------- 堆访问操作 -------------------
# =================================================

# 返回最小值
top = heapq.heappop(data)
print(top)
show_tree(data)

# =================================================
# -------------------- 堆修改操作 -------------------
# =================================================

# 返回最小值,并插入一个值
top = heapq.heapreplace(data, 99)
print(top)
show_tree(data)

# =================================================
# -------------------- 堆查询操作 -------------------
# =================================================

print(heapq.nlargest(2, data))
print(heapq.nsmallest(2, data))
# [99, 19]
# [10, 11]

# =================================================
# -------------------- 堆合并操作 -------------------
# =================================================
# 把几个已经有序的堆合并成一个堆

data = [
[33, 55, 77, 99, 44],
[66, 45, 78, 24, 68],
[95, 47, 85, 31, 41]
]

for l in data:
heapq.heapify(l)

print(data)
# [[33, 44, 77, 99, 55],
# [24, 45, 78, 66, 68],
# [31, 41, 85, 47, 95]]

heap = heapq.merge(*data)
for i in heap:
print(i, end=' ')
# 24 31 33 41 44 45 77 78 66 68 85 47 95 99 55

2.10 bisect:有序列表算法

bisect 提供了基于二分法的有序列表查找和插入
其分别提供了两个方法来进行查找插入位置和插入(默认升序):

函数 作用
biset_left 查找最左的适合插入的位置
biset_right 查找最右的适合插入的位置
biset 同 biset_right
insort_left 在最左合适位置插入
insort_right 在最右合适位置插入
insort 同 insort_right

可以用这几个方法组合出一些自定义的工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from bisect import bisect_left, bisect_right


def index(a, x):
'Locate the leftmost value exactly equal to x'
i = bisect_left(a, x)
if i != len(a) and a[i] == x:
return i
raise ValueError


def find_lt(a, x):
'Find rightmost value less than x'
i = bisect_left(a, x)
if i:
return a[i-1]
raise ValueError


def find_le(a, x):
'Find rightmost value less than or equal to x'
i = bisect_right(a, x)
if i:
return a[i-1]
raise ValueError


def find_gt(a, x):
'Find leftmost value greater than x'
i = bisect_right(a, x)
if i != len(a):
return a[i]
raise ValueError


def find_ge(a, x):
'Find leftmost item greater than or equal to x'
i = bisect_left(a, x)
if i != len(a):
return a[i]
raise ValueErrors

2.11 queue:线程安全队列

queue 一共提供了三个队列:

  • Queue:普通的 FIFO 队列
  • LifoQueue:FILO 队列(栈,略)
  • PriorityQueue:优先队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import queue

# =================================================
# --------------------- 普通队列 -------------------
# =================================================

# 可以设置最大值,设置为0则为无限
q = queue.Queue(maxsize=0)

for i in range(5):
# 如果达到上限,则会阻塞,直到有线程消耗了一个对象
q.put(i,block=False,timeout=None)
# q.put_nowait(i)

while not q.empty():
# 如果为空,则会阻塞,直到有线程插入一个对象
print(q.get(block=False,timeout=None),end=' ')
# q.get_nowait()
# 0 1 2 3 4

print()

# =================================================
# --------------------- 任务队列 -------------------
# =================================================

import threading

q = queue.Queue()

def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
# 减少一个任务计数
q.task_done()

# 开启 worker 线程
threading.Thread(target=worker, daemon=True).start()

for item in range(30):
# 增加一个任务计数
q.put(item)
print('All task requests sent\n', end='')

# 阻塞直到所有任务完成
q.join()
print('All work completed')

# =================================================
# -------------------- 优先队列 1 ------------------
# =================================================

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
priority: int
item: Any=field(compare=False)

pri = [5,8,4,6,2]
data = [1,2,3,4,5]
q = queue.PriorityQueue()

for p,d in zip(pri,data):
q.put(PrioritizedItem(p,d))

while not q.empty():
t = q.get()
print(t)

# PrioritizedItem(priority=2, item=5)
# PrioritizedItem(priority=4, item=3)
# PrioritizedItem(priority=5, item=1)
# PrioritizedItem(priority=6, item=4)
# PrioritizedItem(priority=8, item=2)

# =================================================
# -------------------- 优先队列 2 ------------------
# =================================================
import functools

@functools.total_ordering
class PrioritizedClass:
def __init__(self,priority,item):
self.priority = priority
self.item = item

def show(self):
print('PrioritizedClass(priority={}, item={})'.format(self.priority,self.item))

def __eq__(self,other):
try:
return self.priority == other.priority
except AttributeError:
return NotImplemented

def __lt__(self,other):
try:
return self.priority < other.priority
except AttributeError:
return NotImplemented

pri = [5,8,4,6,2]
data = [1,2,3,4,5]
q = queue.PriorityQueue()

for p,d in zip(pri,data):
q.put(PrioritizedClass(p,d))

while not q.empty():
t = q.get()
t.show()

# PrioritizedClass(priority=2, item=5)
# PrioritizedClass(priority=4, item=3)
# PrioritizedClass(priority=5, item=1)
# PrioritizedClass(priority=6, item=4)
# PrioritizedClass(priority=8, item=2)

2.12 copy:复制

2.12.1 copy.copy:浅复制

浅复制会创建一个浅副本(shallow copy)
浅副本可以看成是一个指向原对象的引用
如果建立 list 对象的浅副本,则会创建一个新的 list,其中的元素为原 list 中元素的引用

2.12.2 copy.deepcopy():深复制

深复制会完全复制一份对象,对于 list,则会把其中的每个对象都复制一份
可以通过自定义一个类的__copy____deepcopy__方法自定义类的复制行为

3. 语言工具

3.1 functools

3.1.1 functools.partial

partial 是一个用来固定函数参数的工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import functools


def func(a, b, c=4):
print('called with ({},{},{})'.format(a, b, c))


func(1, 2)
func(8, 7, 6)

# =================================================
# --------------------- 固定参数 -------------------
# =================================================

func_p = functools.partial(func, 9, c=7)
func_p(8)
func_p('b', c='c')
# called with (9,8,7)
# called with (9,b,c)

# func_p('b', c='c',a='a')
# 重复定义位置参数会引发错误
# TypeError: func() got multiple values for argument 'a'

3.1.2 functools.tool_ordering

为了使得一个类支持比较,需要实现__lt__(),__le__(),__eq__(),__gt__(),__ge__()方法,tool_ordering 工具提供了一个简略方法,只需要实现__eq__()和另一个比较方法,tool_ordering 会自动生成其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import functools


@functools.total_ordering
class Student:
def __init__(self, lastname, firstname):
self.lastname = lastname
self.firstname = firstname

def _is_valid_operand(self, other):
return (hasattr(other, "lastname") and
hasattr(other, "firstname"))

def __eq__(self, other):
if not self._is_valid_operand(other):
return NotImplemented
return ((self.lastname.lower(), self.firstname.lower()) ==
(other.lastname.lower(), other.firstname.lower()))

def __lt__(self, other):
if not self._is_valid_operand(other):
return NotImplemented
return ((self.lastname.lower(), self.firstname.lower()) <
(other.lastname.lower(), other.firstname.lower()))


john = Student('john', 'aaa')
bob = Student('bob', 'bbb')

print(john == bob)
print(john > bob)
# False
# True

3.1.3 functools.lru_cache

lru 即 Last Recent Used(最近使用)
以装饰器的方式为一个函数添加缓存功能,条件为参数必须可以散列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import functools

# 默认最大缓存为128,不同类型参数放一块儿缓冲
# 要求参数必须可以散列,否则产生一个TypeError
@functools.lru_cache(maxsize=128,type=False)
def expensive(a, b):
print('expensive({},{})'.format(a, b))
return a*b


MAX = 3

# =================================================
# -------------------- 第一次调用 -------------------
# =================================================

for i in range(MAX):
for j in range(MAX):
expensive(i, j)
print(expensive.cache_info())
# ...
# expensive(2,0)
# expensive(2,1)
# expensive(2,2)
# CacheInfo(hits=0, misses=9, maxsize=128, currsize=9)

# =================================================
# -------------------- 第二次调用 -------------------
# =================================================

for i in range(MAX):
for j in range(MAX):
expensive(i, j)
print(expensive.cache_info())
# CacheInfo(hits=9, misses=9, maxsize=128, currsize=9)

# =================================================
# -------------------- 清空缓存 --------------------
# =================================================


expensive.cache_clear()
print(expensive.cache_info())
# CacheInfo(hits=0, misses=0, maxsize=128, currsize=0)

3.1.4 functools.reduce

reduce 可以依次处理可迭代对象中的每个元素
如果既没有初始值,可迭代对象也为空,则会引发 TypeError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import functools

def do_reduce(a,b):
print('do_reduce({},{})'.format(a,b))
return a+b

data = range(5)

# =================================================
# ------------------- 无初始值调用 ------------------
# =================================================

result = functools.reduce(do_reduce,data)
print(result)
# do_reduce(0,1)
# do_reduce(1,2)
# do_reduce(3,3)
# do_reduce(6,4)
# 10

# =================================================
# ------------------- 有初始值调用 ------------------
# =================================================

result = functools.reduce(do_reduce,data,10)
print(result)
# do_reduce(10,0)
# do_reduce(10,1)
# do_reduce(11,2)
# do_reduce(13,3)
# do_reduce(16,4)
# 20

3.2 itertools

3.2.1 合并迭代器

3.2.1.1 串行合并

itertools.chain 能把多个可迭代对象串行连接

1
2
3
4
5
6
7
8
9
10
11
12
import itertools

a=[1,2,3]
b=[4,5,6]
temp = [a,b]

c=itertools.chain(a,b)
print(list(c))
c=itertools.chain(*temp)
print(list(c))
# [1, 2, 3, 4, 5, 6]
# [1, 2, 3, 4, 5, 6]
3.2.1.2 并行合并

内置的 zip 函数可以把多个可迭代对象并行连接

1
2
3
4
5
6
7
8
9
10
11
import itertools

a = [1, 2, 3]
b = [4, 5, 6, 7]

c = zip(a, b)
print(list(c))
c = itertools.zip_longest(a, b)
print(list(c))
# [(1, 4), (2, 5), (3, 6)]
# [(1, 4), (2, 5), (3, 6), (None, 7)]

3.2.2 转换输入

map 可以完成一个从多个集合到一个集合的映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
a = range(5)
b = range(5, 10)
c = range(10, 15)
d = range(15, 20)


def add(a, b, c, d):
return (a, b, c, d, a+b+c+d)


for i in map(add, a, b, c, d):
print('{}+{}+{}+{}={}'.format(*i))

# 0+5+10+15=30
# 1+6+11+16=34
# 2+7+12+17=38
# 3+8+13+18=42
# 4+9+14+19=46

3.2.3 循环重复

itertools.cycle 可以不断循环一个迭代器

1
2
3
4
5
6
7
import itertools

a = range(5)

print(list(zip(range(13),
itertools.cycle(a))))
# [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 0), (6, 1), (7, 2), (8, 3), (9, 4), (10, 0), (11, 1), (12, 2)]

itertools.repeat 可以不断重复一个值(略)
等同于 cycle([p])

3.2.4 过滤

itertools.takewhile 返回不满足条件之前的所有内容
itertools.dropwhile 返回不满足条件元素之后(包括本身)的所有内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import itertools

a = [1,2,3,4,3,2,1]

print(list(
itertools.takewhile(
lambda x: x < 3,
a
)
))
print(list(
itertools.dropwhile(
lambda x: x < 3,
a
)
))
# [1, 2]
# [3, 4, 3, 2, 1]

filter 返回所有满足条件的内容
itertools.filterfalse 返回所有不满足条件的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import itertools

a = [1,2,3,4,3,2,1]

print(list(
filter(
lambda x: x < 3,
a
)
))
print(list(
itertools.filterfalse(
lambda x: x < 3,
a
)
))
# [1, 2, 2, 1]
# [3, 4, 3]

itertools.compress 根据选择器中的元素决定是否过滤

1
2
3
4
5
6
7
8
9
10
11
12
import itertools

data = [1, 2, 3, 4, 3, 2, 1]
selectors = itertools.cycle([True, False])

print(list(
itertools.compress(
data,
selectors
)
))
# [1, 3, 3, 1]

3.2.5 分组

itertools.groupby 可以对一个可迭代对象根据一定条件分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import functools
from itertools import *
import operator
import pprint


@functools.total_ordering
class Point:

def __init__(self, x, y):
self.x = x
self.y = y

def __repr__(self):
return '({}, {})'.format(self.x, self.y)

def __eq__(self, other):
return (self.x, self.y) == (other.x, other.y)

def __gt__(self, other):
return (self.x, self.y) > (other.x, other.y)


data = list(map(Point,
cycle([0,1,2]),
list(range(7))))
print('Data:')
pprint.pprint(data, width=35)
print()

# 给不排序的数据分组
print('Grouped, unsorted:')
for k, g in groupby(data, operator.attrgetter('x')):
print(k, list(g))
print()

data.sort()

# 给分组后的数据分组
print('Grouped, sorted:')
for k, g in groupby(data, operator.attrgetter('x')):
print(k, list(g))
print()
# Data:
# [(0, 0),
# (1, 1),
# (2, 2),
# (0, 3),
# (1, 4),
# (2, 5),
# (0, 6)]

# Grouped, unsorted:
# 0 [(0, 0)]
# 1 [(1, 1)]
# 2 [(2, 2)]
# 0 [(0, 3)]
# 1 [(1, 4)]
# 2 [(2, 5)]
# 0 [(0, 6)]

# Grouped, sorted:
# 0 [(0, 0), (0, 3), (0, 6)]
# 1 [(1, 1), (1, 4)]
# 2 [(2, 2), (2, 5)]

3.2.6 组合输入

itertools.product 可以把获取多个可迭代对象的笛卡尔积

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import itertools
import pprint

SUITS = {'H', 'D', 'C', 'S'}
FACE = {'J', 'Q', 'K', 'A'}

result = itertools.product(SUITS,FACE)
pprint.pprint(list(result),width=20)
# [('D', 'K'),
# ('D', 'Q'),
# ('D', 'J'),
# ('D', 'A'),
# ('S', 'K'),
# ('S', 'Q'),
# ('S', 'J'),
# ('S', 'A'),
# ('C', 'K'),
# ('C', 'Q'),
# ('C', 'J'),
# ('C', 'A'),
# ('H', 'K'),
# ('H', 'Q'),
# ('H', 'J'),
# ('H', 'A')]

itertools.permutations 可以把一个可迭代对象中的元素的所有排列

1
2
3
4
5
6
7
8
9
10
11
12
13
import itertools
import pprint

data = 'abcd'
pprint.pprint(list(itertools.permutations(data,3)),
width=40)
# [('a', 'b', 'c'),
# ('a', 'b', 'd'),
# ('a', 'c', 'b'),
# ...
# ('d', 'b', 'c'),
# ('d', 'c', 'a'),
# ('d', 'c', 'b')]

itertools.combinations 可以把一个可迭代对象中的元素的所有组合

1
2
3
4
5
6
7
8
9
10
import itertools
import pprint

data = 'abcd'
pprint.pprint(list(itertools.combinations(data,3)),
width=40)
# [('a', 'b', 'c'),
# ('a', 'b', 'd'),
# ('a', 'c', 'd'),
# ('b', 'c', 'd')]

itertools.combinations_with_replacement 增加了自己与自己配对

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import itertools
import pprint

data = 'abcd'
pprint.pprint(list(
itertools.combinations_with_replacement(data, 2)
), width=40)
# [('a', 'a'),
# ('a', 'b'),
# ('a', 'c'),
# ('a', 'd'),
# ('b', 'b'),
# ('b', 'c'),
# ('b', 'd'),
# ('c', 'c'),
# ('c', 'd'),
# ('d', 'd')]

3.3 contextlib

3.3.1 上下文管理器

上下文管理器即通过 with 关键字进行一些关键资源的自动管理

基本的管理可以通过如下方式定义上下文类完成
其中__exit__可以接受一个异常,如果可以处理这个异常,则返回False以阻止异常继续传播,否则返回True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Context():
def __init__(self):
print("__init__")

def __enter__(self):
print("__enter__")

def __exit__(self,exc_type,exc_val,exc_tb):
print("exit")

with Context():
print("do something")
# __init__
# __enter__
# do something
# exit

然后可以通过在上下文开启的时候获取一个资源对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Resource():
def __init__(self, descript):
self.descript = descript

def show(self):
print(self.descript)


class Context():
def __init__(self):
print("__init__")

def __enter__(self):
print("__enter__")
return Resource("a Resource")

def __exit__(self, exc_type, exc_val, exc_tb):
print("exit")


with Context() as resource:
print("do something")
resource.show()

# __init__
# __enter__
# do something
# a Resource
# exit

3.3.2 contextlib.ContextDecorator

接下来是 contextlib 的使用方法和优点
首先可以继承其中的装饰器类,用来装饰函数(有点像 Spring 中的面向切片编程 AOP)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import contextlib


class Context(contextlib.ContextDecorator):
def __init__(self, desc):
print("context init")
self.desc = desc

def __enter__(self):
print("context enter with {}".format(self.desc))

def __exit__(self, exc_type, exc_val, exc_tb):
print("context exit")


@Context("the desc")
def func():
print("in function")


func()
# context init
# context enter with the desc
# in function
# context exit

3.3.3 contextlib

然后是使用其中的管理器装饰器对象装饰一个函数
简化构造上下文管理器的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import contextlib


@contextlib.contextmanager
def make_context():
print(" entering")
try:
yield{}
except RuntimeError as err:
print(" error:{}".format(err))
finally:
print(" exiting")


print("Nomal:")
with make_context() as value:
print(" inside with statement:", value)

print("\nhandled error:")
with make_context() as value:
raise RuntimeError("runtime error example")

print("\nunhandled error:")
with make_context() as value:
raise ValueError("value error example")

# Nomal:
# entering
# inside with statement: {}
# exiting

# handled error:
# entering
# error:runtime error example
# exiting

# unhandled error:
# entering
# exiting
# Traceback (most recent call last):
# ...
# raise ValueError("value error example")
# ValueError: value error example

3.3.4 conntextlib API

有的系统资源关闭的方式比较统一,比如资源关闭 API 很多都是close()方法
contextlib 提供了一些 API 来对类似资源进行管理(当然这些资源本身有本身的资源管理器可以用)
无论 with 代码块内部是否发生异常,资源都会被安全关闭

1
2
3
4
5
6
7
8
9
10
11
import contextlib

class Door():
def __init__(self):
print("open Door")

def close(self):
print("close door")

with contextlib.closing(Door()) as door:
print("some code")

4. 日期和时间

python 提供了三个时间相关的模块:

  • time:时间相关
  • datetime:日期时间相关
  • calendar:日期的计算与格式化

4.1 time

4.1.1 核心函数

核心的时间函数有:

  • time():返回以浮点数表示的时间(从 19700101 算起)
  • ctime():返回时间的格式化表示
1
2
3
4
5
6
7
8
9
10
11
12
import time

print(time.time())
print(time.ctime())

# 时间运算
print(time.ctime(
time.time() + 15
))
# 1613917707.6227214
# Sun Feb 21 22:28:27 2021
# Sun Feb 21 22:28:42 2021

4.1.2 其他函数

函数 作用
time.monotonic 单调时钟
time.clock 处理器时钟
time.gmtime 以 UTC 格式返回时间结构体
time.localtime 返回当前时间时间结构体
time.mktime 把结构体转换成浮点数表示
time.strptime 把时间字符串解析成时间结构体
time.strftime 把时间结构体表示成字符串

4.2 datetime

作用
datetime.time 时间类(一天以内)
datetime.date 日期类
datetime.timedelta 时间差类
datetime.datetime 时间日期类

5. 数学运算

数学运算多用其他模块提供的工具(如 numpy 和 scipy),只看一些最基本的函数

5.1 随机数

random 模块提供了一系列随机数生成工具

函数 作用
random.random [0,1)随机数生成
random.uniform 生成范围内随机浮点数
random.seed 指定种子
random.getstate 获取伪随机数生成器状态
random.setstate 设置伪随机数生成器状态
random.randint 生成范围内随机整数
random.range 生成间隔区间随机数
random.choice 随机选择序列内元素
random.choices 按照预设概率随机选择序列内元素
random.shuffle 打乱一个序列
random.sample 无随机不修改原序列采样
random.normalvariate 生成正态分布随机数
random.gauss 生成高斯分布随机数

另外
然后可以生成多个 random.Random 类的实例作为随机数生成器,使其互不干扰

5.2 数学工具

math 包提供了一些常量值,以及一些工具函数,如舍入函数,截断函数等

函数 作用
math.trunc 截断小数部分
math.floor 不大于其的最大整数
math.ceil 不小于其的最大整数

6. 文件系统

关于文件的一些原生操作

6.1 os.path:平台独立的文件名管理

函数 作用
path.split 把路径分为文件夹路径和文件名
path.dirname 提取路径的文件夹路径
path.basename 提取路径的文件名
path.splitext 把路径分割为文件名路径和扩展名部分
path.commonprefix 提取多个路径的公共前缀
path.commonpath 提取多个路径的公共路径部分
path.join 使用路径分隔符连接多个路径片段
path.normpath 规范化路径
path.abspath 相对路径转绝对路径
path.getatime 获取最后访问时间
path.getmtime 获取最后修改时间
path.getctime 获取创建时间
path.isabs 是否为绝对路径
path.isfile 是否为文件
path.isdir 是否为文件夹
path.islink 是否为链接
path.ismount 是否为挂载点(linux)
path.exists 是否存在
path.lexists 链接指向文件是否存在

_os.path 对于文件夹路径和文件名的分割基于最后一个路径分隔符_

6.2 pathlib:把路径作为对象

一般最常用的就是 Path 类

6.2.1 连接操作符

/操作符可以用来连接把多个 pathlib 对象或者对象与字符串

1
2
3
4
5
6
7
import pathlib

home_root = pathlib.Path("/home")
print(home_root / "yishiyu")
print(home_root / pathlib.Path("yishiyu"))
# \home\yishiyu
# \home\yishiyu

6.2.2 工具函数

函数 作用
pathlib.Path.resolve 规范化(包括解析链接路径)
pathlib.Path.joinpath 连接多个路径片段
pathlib.Path.with_name 替换文件名部分
pathlib.Path.with_suffix 替换扩展名部分
pathlib.Path.parts 分解路径成片段
pathlib.Path.iterdir 目录迭代器
pathlib.Path.glob 正则匹配内容的目录迭代器
pathlib.Path.write_bytes 写入字节序列
pathlib.Path.write_text 写入文本
pathlib.Path.read_bytes 读取字节序列
pathlib.Path.read_text 读取文本
pathlib.Path.open 打开文件并保存句柄
pathlib.Path.mkdir 创建目录
pathlib.Path.touch 创建文件
pathlib.Path.rmdir 删除空目录
pathlib.Path.unlink 删除其他文件相关内容
pathlib.Path.is_xxx 测试函数系列
pathlib.Path.stat 获取文件信息(stat_result)
属性 作用
pathlib.Path.parents 上级文件夹列表(直到根目录)
pathlib.Path.name 文件名
pathlib.Path.suffix 扩展名
pathlib.Path.stm 不带扩展名的文件名
os.stat_result.st_ctime 最后创建时间
os.stat_result.st_mtime 最后修改时间
os.stat_result.st_atime 最后访问时间

6.3 tempfile:临时文件系统对象

  • tempfile.TemporaryFile:创建一个无名临时文件,临时文件在被关闭的时候会直接被删除(close 或者上下文管理器关闭)
  • tempfile.NamedTemporaryFile:创建一个具名临时文件,在句柄关闭后被删除
  • tempfile.TemporaryDirectory:创建一个临时文件夹,在退出 with 语句块后删除
  • tempfile.gettmpdir:获取临时文件夹所在位置
  • tempfile.gettempprefix:获取临时文件夹前缀

6.4 shutil:高层文件操作

函数 作用
shutil.copyfile 拷贝文件内容
shutil.copy 拷贝文件
shutil.copy2 带元信息拷贝文件
shutil.copymode 拷贝操作权限
shutil.copystat 拷贝元信息
shutil.copytree 递归拷贝目录
shutil.rmtree 递归删除一个目录
shutil.move 移动一个文件或者目录

7. 数据持久化存储与交换

7.1 pickle:对象串行化

函数 作用
pickle.dump 序列化并写入文件
pickle.dumps 序列化为字节序列
pickle.load 读取文件并反序列化
pickle.loads 反序列化字节序列

同时一些对象是不适合进行序列化的,比如随机数生成器对象,如果目标对象有类似getstatesetstate的函数,可以直接序列化保存其状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pickle

data_1 = ['a','b',['c','d']]

data_string = pickle.dumps(data_1)
data_2 = pickle.loads(data_string)

print(data_1)
print(data_string)
print(data_2)
print("data_1 == data_2 ? :{}".format(data_1==data_2))
# ['a', 'b', ['c', 'd']]
# b'\x80\x04\x95\x19\x00\x00\x00\x00\x00\x00\x00]\x94(\x8c\x01a\x94\x8c\x01b\x94]\x94(\x8c\x01c\x94\x8c\x01d\x94ee.'
# ['a', 'b', ['c', 'd']]
# data_1 == data_2 ? :True

7.2 shelve:对象的持久化存储

在不使用关系型数据库的时候,可以使用 shelve 存储数据
shelve 使用 dbm 非关系型数据库来存储数据
shelve 不支持并发写入,但是支持并发读取(在打开时指定’flag=r’)
同时 shelve 不支持可变子元素的修改.其存储的元素,如果任何一个子元素发生改变,则需要重新存储整个元素
可以通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import shelve

with shelve.open('test_shelf.db') as s:
s['key'] = {
'int': 10,
'float': 9.5,
'string': 'a string'
}

with shelve.open('test_shelf.db') as s:
print(s['key'])
# {'int': 10, 'float': 9.5, 'string': 'a string'}
# 这一步会不会修改数据库
s['key']['int'] = 99
print(s['key']['int'])
temp = s['key']
temp['int'] = 99
s['key'] = temp
print(s['key']['int'])
# 10
# 99

# 开启自动写回
with shelve.open('test_shelf.db', writeback=True) as s:
# 开启自动写回后这一步会修改数据库
s['key']['int'] = 999
print(s['key']['int'])
# 999

7.3 sqlite3:嵌入式关系数据库

sqlite 是一个无服务器的轻量级服务器
本体只有一个文件用来存储数据,通常用来存储一些软件内部的数据

除了下列列出的方法,还可以自定义一些非 sqlite 原生的数据类型,比如日期,序列化对象之类的,还有一些其他内容内容可以参考其他资料

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import os
import csv
import sqlite3

db_filename = 'test.db'

# 如果目标文件不存在,则会自动创建
# 如果提供的名字为 `:memory:` 则会创建一个内存文件
with sqlite3.connect(db_filename) as conn:
# =================================================
# -------------------- 执行命令 --------------------
# =================================================
conn.executescript("""
被执行的命令
""")

# =================================================
# -------------------- 查询数据 --------------------
# =================================================
cursor = conn.cursor()
cursor.execute("""
查询命令
""")
# 获取单个结果
task_id, priority, details, status = cursor.fetchone()
# 获取多个结果
for row in cursor.fetchmany(5):
task_id, priority, details, status = row
# 获取所有结果
for row in cursor.fetchall():
task_id, priority, details, status = row
# 获取查询到数据的元信息
for colinfo in cursor.description:
print(colinfo)

# =================================================
# -------------------- 命令变量 --------------------
# =================================================
# 位置参数
project_name = "my_project"
query = """
select id, priority, details, status form task_id
where project = ?
"""
cursor.execute(query,(project_name))
# 具名参数
project_name = "my_project"
query = """
select id, priority, details, status form task_id
where project = :PROJECT_NAME
"""
cursor.execute(query,{'PROJECT_NAME':project_name})
# 批量带命令执行
with open("data_file") as csv_file:
csv_reader = csv.DictReader(csv_file)
cursor = conn.cursor()
cursor.executemany(query,csv_reader)

# =================================================
# --------------------- 事务 ----------------------
# =================================================
# 事务管理通常与try execpt共同使用
# 提交事务
conn.commit()
# 回滚事务
conn.rollback()

# =================================================
# -------------------- 隔离级别 --------------------
# =================================================
# 略 可以参考python3 标准库
#

# =================================================
# -------------------- 内容导出 --------------------
# =================================================
# 导出一些列命令用来重建数据库状态
# 以迭代器的形式导出
conn.iterdump()

# =================================================
# ------------------ 使用python函数 ----------------
# =================================================
def encrypt(data):
return "after encrypt:" + data
conn.create_function("encrypt",encrypt)
query = "update task sset details = encrypt(details)"
cursor.execute(query)

7.4 常用文件格式解析

python 有众多文件格式解析的库,永远不要自己拿正则匹配啥的手动搞

8. 并发

python 提供的并发的包有:

  • subprocess:创建子进程并与之通信
  • signal:unix 信号机制
  • threading:面先对象的高层 API
  • multiprocessing:threading 的进程版,每个进程都是真正的系统进程
  • asyncio:基于类的协议系统或写成,为并发和异步 IO 管理提供了框架
  • concurrent.futures:类似进程池的线程池

主要学习 threading,multiprocessingconcurrent.futures

8.1 threading

8.1.1 通用使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import threading
import logging

# =================================================
# ------------------ 指定启动函数 -------------------
# =================================================

# 使用日志模块记录各个线程中的信息
# 同时把各种信息嵌入到日志中
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-10s) %(message)s'
)


def worker():
logging.debug("starting")
print("{} is working".format(
threading.current_thread().getName()
))
logging.debug("exiting")


# 除非明确不需要,主线程最好保证自己有子线程的引用
threads = []
for i in range(3):
t = threading.Thread(
name='thread_{}'.format(i),
target=worker)
threads.append(t)
t.start()
# [DEBUG] (thread_0 ) starting
# thread_0 is working
# [DEBUG] (thread_1 ) starting
# thread_1 is working
# [DEBUG] (thread_0 ) exiting
# [DEBUG] (thread_2 ) starting
# thread_2 is working
# [DEBUG] (thread_1 ) exiting
# [DEBUG] (thread_2 ) exiting

# =================================================
# -------------------- 派生线程 --------------------
# =================================================


class MyThread(threading.Thread):
def run(self):
logging.debug('running')


t = MyThread(name="thread_4")
t.start()
# [DEBUG] (thread_4 ) running

# =================================================
# -------------------- 延时线程 --------------------
# =================================================


def delay():
logging.debug("i am late")


# 延时任务
t = threading.Timer(1, delay)
t.start()
# [DEBUG] (Thread-1 ) i am late

8.1.2 守护线程

如上,主程序在所有线程退出后退出
但是有的进程可能不会主动退出,如一些软件自带的垃圾清理线程,可以把这些进程设置为守护进程,使得主线程无需等待这些进程就可以直接退出
当然也可以显式声明等待守护进程结束

1
2
3
4
5
6
7
8
9
10
11
12
import threading

# 声明守护进程
t = threading.Thread(
target=worker,
daemon=True)

t.start()
# 显式声明等待守护进程
t.join()
# 设置等待守护进程时间上限(秒)
# t.join(3)

8.1.3 进程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import threading
import logging

# 使用日志模块记录各个线程中的信息
# 同时把各种信息嵌入到日志中
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-10s) %(message)s'
)

# =================================================
# -------------------- 事件通信 --------------------
# =================================================


def demo(e1, e2):
logging.debug("start")
e1.wait()
logging.debug("e1 is set")
e2.wait(3)
logging.debug("e2 ignored")
logging.debug("exiting")


e1 = threading.Event()
e2 = threading.Event()
t = threading.Thread(
name="demo thread",
target=demo,
args=(e1, e2)
)
t.start()
logging.debug("ready!")

e1.set()
# [DEBUG] (demo thread) start
# [DEBUG] (MainThread) ready!
# [DEBUG] (demo thread) e1 is set
# [DEBUG] (demo thread) e2 ignored
# [DEBUG] (demo thread) exiting

# =================================================
# ------------------- 线程锁同步 -------------------
# =================================================
# 普通不可重入锁
# lock = threading.Lock()
# 请求锁,得到后立即上锁
# lock.acquire()
# lock.acquire(超时值)
# 询问锁,如果已上锁则返回True,否则为False
# lock.locked()
# 释放锁
# lock.release()

# 可重入锁
# 同一个线程可以重复获取,但是需要释放同样次数
# lock = threading.RLock()

# =================================================
# ------------------- 线程锁技巧 -------------------
# =================================================
# 可以使用with语句管理上下文,以下两种方法等效
# with lock:
# logging.debug("something")
#
# lock.acquire()
# try:
# logging.debug("something")
# finally:
# lock.release()

# =================================================
# ------------------- 同步线程 --------------------
# =================================================
# condition 在wait的时候就会释放上下文中请求的锁


def consumer(cond):
with cond:
cond.wait()
logging.debug("resource is available to sonsumer")


def producer(cond):
with cond:
logging.debug("make resource available")
cond.notifyAll()


condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
p = threading.Thread(name="p", target=producer, args=(condition,))

c1.start()
c2.start()
p.start()
# [DEBUG] (p ) make resource available
# [DEBUG] (c1 ) resource is available to sonsumer
# [DEBUG] (c2 ) resource is available to sonsumer

# =================================================
# ------------------- 信号量机制 --------------------
# =================================================
# 信号量机制多用于连接池类似的东西
maxconnections = 5
pool_sema = threading.Semaphore(value=maxconnections)
with pool_sema:
# conn = connectdb()
try:
# ... use connection ...
pass
finally:
# conn.close()
pass

8.2 multiprocessing

multiprocessing 可以规避掉全局解释器锁(GIL)带来的瓶颈

8.2.1 通用使用方法

指定目标函数,构造派生,守护进程的使用方法都与线程类似,略过

同时由于与线程不同,进程试可以单独存在的,所有必要的时候需要将其杀死

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import multiprocessing
import time
import logging

logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] %(message)s'
)


def worker():
while True:
logging.debug("{} is working".format(
multiprocessing.current_process().__name__))
time.sleep(0.1)


if __name__ == "__main__":
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process()

p.start()

time.sleep(3)

# 分别是 SIGTERM 信号和 SIGKILL 信号
# SIGINT: 用户ctrl-c终止进程
# SIGTERM: 系统终止进程信号
# SIGKILL: 系统强制终止进程信号
p.terminate()
# p.kill()

# 等待目标进程终止
p.join()
# [INFO/Process-1] child process calling self.run()
# [INFO/Process-1] process shutting down
# [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-1] running the remaining "atexit" finalizers
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down
# [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
# [DEBUG/MainProcess] running the remaining "atexit" finalizers

8.2.2 进程间通信

进程事件,锁机制,Condition,Semaphore 和线程事件类似,略过
进程间可以通过multiprocessing.Manager创建的特殊容器交换数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing


def worker(d, key, value):
d[key] = value


if __name__ == "__main__":
manager = multiprocessing.Manager()
# 创建共享数据结构
d = manager.dict()
jobs = [
multiprocessing.Process(
target=worker,
args=(d, i, i*2)
)
for i in range(5)
]

# 多个进程共用的数据结构
for j in jobs:
j.start()

for j in jobs:
j.join()

print(d)
# {1: 2, 0: 0, 3: 6, 4: 8, 2: 4}

8.2.3 进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import multiprocessing


def do_calculation(data):
return data*2


def start_process():
print("Starting", multiprocessing.current_process().name)


if __name__ == "__main__":
inputs = list(range(10))
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
# 每个进程最多完成的任务数,执行完后重启,避免耗费太多资源
maxtasksperchild=2,
processes=pool_size,
initializer=start_process
)

# 使用多个进程执行计算
outputs = pool.map(do_calculation, inputs)
pool.close()
pool.join()

print(outputs)
# Starting SpawnPoolWorker-1
# Starting SpawnPoolWorker-2
# Starting SpawnPoolWorker-4
# Starting SpawnPoolWorker-6
# Starting SpawnPoolWorker-3
# Starting SpawnPoolWorker-5
# Starting SpawnPoolWorker-8
# Starting SpawnPoolWorker-7
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

8.3 concurrent.futures

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from concurrent import futures
import threading
import time


def task(n):
print("{} is working".format(threading.current_thread().name))
time.sleep(0.5)
print("{} finished".format(threading.current_thread().name))
return n/10


# 构造线程池
ex = futures.ThreadPoolExecutor(max_workers=3)

# =================================================
# --------------------- 大量调度 -------------------
# =================================================

# 使用线程执行计算
results = ex.map(task, range(5, 10))
print("waiting for the result")
real_results = list(results)
print(real_results)
# ThreadPoolExecutor-0_0 is working
# ThreadPoolExecutor-0_1 is working
# ThreadPoolExecutor-0_2 is working
# waiting for the result
# ThreadPoolExecutor-0_0 finished
# ThreadPoolExecutor-0_0 is working
# ThreadPoolExecutor-0_1 finished
# ThreadPoolExecutor-0_1 is working
# ThreadPoolExecutor-0_2 finished
# ThreadPoolExecutor-0_0 finished
# ThreadPoolExecutor-0_1 finished
# [0.5, 0.6, 0.7, 0.8, 0.9]

# =================================================
# --------------------- 单个调度 -------------------
# =================================================

# 提交单个任务
f = ex.submit(task, 10)
print(f.result())
# ThreadPoolExecutor-0_2 is working
# ThreadPoolExecutor-0_2 finished
# 1.0

# =================================================
# -------------------- 自动处理结果 -----------------
# =================================================
wait_for = [
ex.submit(task, i)
for i in range(5, 8)
]

# 完成任务后自动处理
for f in futures.as_completed(wait_for):
print("the result is {}".format(f.result()))
# ThreadPoolExecutor-0_2 is working
# ThreadPoolExecutor-0_0 is working
# ThreadPoolExecutor-0_1 is working
# ThreadPoolExecutor-0_0 finished
# ThreadPoolExecutor-0_1 finished
# the result is 0.6
# ThreadPoolExecutor-0_2 finished
# the result is 0.7
# the result is 0.5

# =================================================
# -------------------- 指定回调函数 -----------------
# =================================================


def done(fn: futures.Future):
if fn.cancelled():
print("{}:cancelled".format(fn.arg))
elif fn.done():
error = fn.exception()
if error:
print("{}: error returned".format(fn.arg))
else:
result = fn.result()
print("{}:value({}) returned".format(fn.arg, result))


f = ex.submit(task, 5)
f.arg = 5
f.add_done_callback(done)
# ThreadPoolExecutor-0_0 is working
# ThreadPoolExecutor-0_0 finished
# 5:value(0.5) returned

# =================================================
# --------------------- 撤销任务 -------------------
# =================================================
# Future对象支持撤销执行
# cancel函数返回一个布尔值,表示是否撤销成功
# f.cancel()
# cancelled() 查询任务是否被撤销

# =================================================
# --------------------- 错误处理 -------------------
# =================================================


def task_with_error(n):
print("{}:starting".format(n))
raise ValueError("value error")


f = ex.submit(task_with_error, 5)
try:
result = f.result()
except ValueError as e:
print("main:{} when assesing result".format(e))

# main:value error when assesing result

# =================================================
# --------------------- 上下文管理 ------------------
# =================================================
# 线程池可以结合上下文管理关键字:with
# with futures.ThreadPoolExecutor(max_workers=2) as ex:
# ex.submit(task,1)
# ex.submit(task,3)

# =================================================
# --------------------- 进程池调度 ------------------
# =================================================
# concurrent.futures是支持进程池调度的
# ex = futures.ProcessPoolExecutor(max_workers=2)
# 同时支持对进程的调度
# pid1 = os.getpid()
# os.kill(pid1,signal.SIGHUP)
# 如果进程被杀死或者意外退出
# ProcessPoolExecutor可以看成是意外中断,无法执行再调度任务
# 会触发 futures.process.BrokenProcessPool 异常
# try:
# result = f.result()
# except futures.process.BrokenProcessPool as e:
# print(e)