0. 概述 主要参考资料:
Python 3 Module of the Week | pymotw python 文档 | python 菜鸟教程 | runoob
重点看和科学计算相关的库,网络之类的库回头用到了再看
1. 文本格式化 1.1 基本方法 最基本的方法是使用 print 函数的格式化写法
1 print ("my name is %s,%d years old" % ('yishiyu' , 10 ))
其中占位符的含义为(参考自: 菜鸟教程 ):
占位符
含义
%c
字符及其 ascii 码
%s
字符串
%d
整数
%u
无符号整数
%o
无符号八进制数
%x
无符号十六进制小写
%X
无符号十六进制大写
%f
浮点数
%e
科学计数法浮点数
%p
十六进制变量地址
辅助指令
含义
-
左对齐
+
正数前显示加号
#
特殊进制前显示标志
0
默认填充 0 而非空格
%%
转义输出%
(var)
映射字典中的变量
m.n
最小位宽,小数精度
format 函数是 str 类的内置函数
首先可以使用带下标的占位符
1 2 s1 = "{} {}" .format ("hello" , "world" ) s2 = "{1} {0} {1}" .format ("hello" , "world" )
然后可以设置复杂的参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 print("网站名:{name}, 地址 {url}" .format (name="菜鸟教程" , url="www.runoob.com" )) site = {"name" : "菜鸟教程" , "url" : "www.runoob.com" } print("网站名:{name}, 地址 {url}" .format (**site)) my_list = ['菜鸟教程' , 'www.runoob.com' ] print("网站名:{0[0]}, 地址 {0[1]}" .format (my_list)) class AssignValue (object ): def __init__ (self, value ): self.value = value my_value = AssignValue(6 ) print('value 为: {0.value}' .format (my_value)) print("{:.2f}" .format (3.1415926 ))
新增格式指令(基本格式与 1.1.1 中的一样):
指令格式
含义
^
居中对齐
<
左对齐
>
右对齐
-
显示正负号
空格
正数前填充空格
{{}}
转义大括号
数字
位宽
格式化指令
含义
{:.2f}
保留小数点后两位
{:+.2f}
带符号保留小数点后两位
{:+.2f}
带符号保留小数点后两位
{:.0f}
不带小数
{:0>2d}
数字补零 (填充左边, 宽度为 2)
{:x<4d}
数字补 x (填充右边, 宽度为 4)
{:x<4d}
数字补 x (填充右边, 宽度为 4)
{:,}
以逗号分隔的数字格式
{:.2%}
百分比格式
{:.2e}
指数记法
{:>10d}
右对齐 (默认, 宽度为 10)
{:<10d}
左对齐 (宽度为 10)
{:^10d}
中间对齐 (宽度为 10)
1.3 pprint 包 可以使用 pprint 包美观地打印数据结构
函数
作用
pprint.pprint
美观地打印数据结构
pprint.pformat
获取美观打印的字符串(可以用于写入日志)
两个函数的完整函数声明:
1 2 pprint.pprint(object , stream=None , indent=1 , width=80 , depth=None , *, compact=False , sort_dicts=True ) pprint.pformat(object , indent=1 , width=80 , depth=None , *, compact=False , sort_dicts=True )
参数
作用
object
被打印的对象
stream
打印的流,默认为 sys.stdout
width
打印宽度
depth
打印深度
compact
压缩模式
2. 数据结构 list
,tuple
,set
,dict
四种数据结构略过
2.1 Enum:枚举类型 2.1.1 普通枚举类型 1 2 3 4 5 6 import enumclass Status (enum.Enum ): s1 = 1 s2 = 2 s3 = 3
枚举在迭代的时候会按照声明的顺序生成,而不是值或名字 然后枚举值是无序的,不能比较大小,只能通过==
和is
运算符比较
2.1.2 整数枚举类型 通过继承IntEnum
,可以使得枚举可以比较大小
1 2 3 4 5 6 import enumclass Status (enum.IntEnum ): s1 = 1 s2 = 2 s3 = 3
2.1.3 唯一枚举 普通枚举的每个值可以对应多个枚举,可以通过装饰器实现唯一枚举
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import enumclass Status_1 (enum.Enum ): s1 = 1 s2 = 2 s3 = 3 sone = 1 @enum.unique class Status_1 (enum.Enum ): s1 = 1 s2 = 2 s3 = 3 sone = 1
2.2 ChainMap:连锁字典 连锁字典是一个扩展的字典,可以把多个字典串联起来
连锁字典不会创建组成其字典的拷贝,而是直接引用(更改会作用到原字典上)
连锁字典在搜索的时候会从第一个字典开始依次搜索所有字典,直到找到一个值
连锁字典的更改只会作用到第一个字典上(无论是修改还是增加)
可以根据原有的连锁字典创建新层或删除旧层
_总之,ChainMap 可以把字典像栈一样组织起来_
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 from collections import ChainMapdict_a = {'a' : 'A' , 'c' : 'C' } dict_b = {'b' : 'B' , 'c' : 'D' } m = ChainMap(dict_a, dict_b) print('key=value' ) for k, v in m.items(): print('{:^3}={:^3}' .format (k, v)) m['a' ] = 'AAA' m['e' ] = 'EEE' print(dict_a) dict_c = {'f' : 'FFF' } child_m_1 = m.new_child() child_m_2 = m.new_child(dict_c) print(child_m_1) print(child_m_2) parent_m = m.parents print(m) print(parent_m)
2.3 Counter:计数器 Counter 是一个对可迭代对象的元素进行计数的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from collections import Counterc = Counter('abbccc' ) print(c) c.update('deefff' ) print(c) c.update('aab' ) print(c) print(c['a' ]) for k in c.elements(): print('{}={}' .format (k, c[k])) c_1 = Counter('aaabbc' ) c_2 = Counter('bccddd' ) print(c_1+c_2) print(c_1-c_2) print(c_1 & c_2) print(c_1 | c_2)
2.4 defaultdict:缺省值字典 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from collections import defaultdicts = [('yellow' , 1 ), ('blue' , 2 ), ('yellow' , 3 ), ('blue' , 4 ), ('red' , 1 )] d = defaultdict(list ) for k, v in s: d[k].append(v) print(d.items()) def constant_factory (value ): return lambda : value d = defaultdict(constant_factory('<missing>' )) d.update(name='John' , action='ran' ) print('%(name)s %(action)s to %(object)s' % d) print('{name} {action} to {object}' .format (**d))
2.5 deque:双向队列 即是可以双向操作的队列(其实也可以进行随机操作)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 from collections import dequed = deque('aaabbc' ,maxlen=20 ) d.extend('r1' ) d.append('r2' ) d.extendleft('l1' ) d.appendleft('l2' ) print(d) print(d[3 ]) d.insert(3 ,'x' ) print(d[3 ]) d.remove('x' ) print(d[3 ]) t = None print(t,d) t = d.pop() print(t,d) t = d.popleft() print(t,d) print(d) d.rotate(2 ) print(d) d.rotate(-2 ) print(d)
2.6 namedtuple:具名元组 简而言之,namedtuple 可以用作快速定义简单的结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from collections import namedtuplePerson = namedtuple('Person' , ['name' , 'age' ]) bob = Person('Bob' , 30 ) jane = Person('Jane' , age=29 ) print(bob.name) for name, age in [bob, jane]: print('{} is {} years old' .format (name, age))
2.7 OrderedDict:有序字典
首先 OrderedDict 会记住键值对插入的顺序(更新键值对不会改变顺序)
然后再进行相等性比较的时候,OrderedDict 会额外对比插入顺序(在键值对相等的基础上)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from collections import OrderedDictd = OrderedDict() d['a' ] = 'A' d['b' ] = 'B' d['c' ] = 'C' d['d' ] = 'D' for k,v in d.items(): print('{}:{}' .format (k,v)) d['c' ] = 'T' for k,v in d.items(): print('{}:{}' .format (k,v)) d.move_to_end('c' ) for k,v in d.items(): print('{}:{}' .format (k,v))
2.8 array:数组 可以把 array 看作只能存储一种数据类型的 list
代码
类型
最低大小
b
int
1
B
int
1
h
signed short
2
H
unsigned short
2
i
signed int
2
I
unsigned int
2
l
signed long
4
L
unsigned long
4
q
signed long long
8
Q
unsigned long long
8
f
float
4
d
double
8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import tempfilefrom array import arrayimport binasciis = b'this is the array' a = array('b' , s) print('as byte string:' , s) print('as array :' , a) print('as hex :' , binascii.hexlify(a)) a = array('i' , [1 , 2 , 3 ]) print(a) a.append(6 ) a.extend([777 , 888 ]) print(a) a = array('i' , range (7 )) output = tempfile.NamedTemporaryFile() print(a) a.tofile(output.file) output.flush() with open (output.name, 'rb' ) as input : raw_data = input .read() print(binascii.hexlify(raw_data)) input .seek(0 ) b = array('i' ) b.fromfile(input , len (a)) print(b)
2.9 heapq:堆排序算法 heapq 实现了一个最小堆
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 import heapqimport mathfrom io import StringIOdef show_tree (tree, total_width=36 , fill=' ' ): output = StringIO() last_row = -1 for i, n in enumerate (tree): if i: row = int (math.floor(math.log(i+1 , 2 ))) else : row = 0 if row != last_row: output.write('\n' ) collumns = 2 **row col_width = int (math.floor(total_width / collumns)) output.write(str (n).center(col_width, fill)) last_row = row print(output.getvalue()) print('-' * total_width) print() data = [19 , 9 , 4 , 10 , 11 ] heap = [] for n in data: print('add {:>3}:' .format (n)) heapq.heappush(heap, n) show_tree(heap) heapq.heapify(data) show_tree(data) top = heapq.heappop(data) print(top) show_tree(data) top = heapq.heapreplace(data, 99 ) print(top) show_tree(data) print(heapq.nlargest(2 , data)) print(heapq.nsmallest(2 , data)) data = [ [33 , 55 , 77 , 99 , 44 ], [66 , 45 , 78 , 24 , 68 ], [95 , 47 , 85 , 31 , 41 ] ] for l in data: heapq.heapify(l) print(data) heap = heapq.merge(*data) for i in heap: print(i, end=' ' )
2.10 bisect:有序列表算法 bisect 提供了基于二分法的有序列表查找和插入 其分别提供了两个方法来进行查找插入位置和插入(默认升序):
函数
作用
biset_left
查找最左的适合插入的位置
biset_right
查找最右的适合插入的位置
biset
同 biset_right
insort_left
在最左合适位置插入
insort_right
在最右合适位置插入
insort
同 insort_right
可以用这几个方法组合出一些自定义的工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from bisect import bisect_left, bisect_rightdef index (a, x ): 'Locate the leftmost value exactly equal to x' i = bisect_left(a, x) if i != len (a) and a[i] == x: return i raise ValueError def find_lt (a, x ): 'Find rightmost value less than x' i = bisect_left(a, x) if i: return a[i-1 ] raise ValueError def find_le (a, x ): 'Find rightmost value less than or equal to x' i = bisect_right(a, x) if i: return a[i-1 ] raise ValueError def find_gt (a, x ): 'Find leftmost value greater than x' i = bisect_right(a, x) if i != len (a): return a[i] raise ValueError def find_ge (a, x ): 'Find leftmost item greater than or equal to x' i = bisect_left(a, x) if i != len (a): return a[i] raise ValueErrors
2.11 queue:线程安全队列 queue 一共提供了三个队列:
Queue:普通的 FIFO 队列
LifoQueue:FILO 队列(栈,略)
PriorityQueue:优先队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 import queueq = queue.Queue(maxsize=0 ) for i in range (5 ): q.put(i,block=False ,timeout=None ) while not q.empty(): print(q.get(block=False ,timeout=None ),end=' ' ) print() import threadingq = queue.Queue() def worker (): while True : item = q.get() print(f'Working on {item} ' ) print(f'Finished {item} ' ) q.task_done() threading.Thread(target=worker, daemon=True ).start() for item in range (30 ): q.put(item) print('All task requests sent\n' , end='' ) q.join() print('All work completed' ) from dataclasses import dataclass, fieldfrom typing import Any@dataclass(order=True ) class PrioritizedItem : priority: int item: Any=field(compare=False ) pri = [5 ,8 ,4 ,6 ,2 ] data = [1 ,2 ,3 ,4 ,5 ] q = queue.PriorityQueue() for p,d in zip (pri,data): q.put(PrioritizedItem(p,d)) while not q.empty(): t = q.get() print(t) import functools@functools.total_ordering class PrioritizedClass : def __init__ (self,priority,item ): self.priority = priority self.item = item def show (self ): print('PrioritizedClass(priority={}, item={})' .format (self.priority,self.item)) def __eq__ (self,other ): try : return self.priority == other.priority except AttributeError: return NotImplemented def __lt__ (self,other ): try : return self.priority < other.priority except AttributeError: return NotImplemented pri = [5 ,8 ,4 ,6 ,2 ] data = [1 ,2 ,3 ,4 ,5 ] q = queue.PriorityQueue() for p,d in zip (pri,data): q.put(PrioritizedClass(p,d)) while not q.empty(): t = q.get() t.show()
2.12 copy:复制 2.12.1 copy.copy:浅复制 浅复制会创建一个浅副本(shallow copy) 浅副本可以看成是一个指向原对象的引用 如果建立 list 对象的浅副本,则会创建一个新的 list,其中的元素为原 list 中元素的引用
2.12.2 copy.deepcopy():深复制 深复制会完全复制一份对象,对于 list,则会把其中的每个对象都复制一份 可以通过自定义一个类的__copy__
和__deepcopy__
方法自定义类的复制行为
3. 语言工具 partial 是一个用来固定函数参数的工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import functoolsdef func (a, b, c=4 ): print('called with ({},{},{})' .format (a, b, c)) func(1 , 2 ) func(8 , 7 , 6 ) func_p = functools.partial(func, 9 , c=7 ) func_p(8 ) func_p('b' , c='c' )
为了使得一个类支持比较,需要实现__lt__(),__le__(),__eq__(),__gt__(),__ge__()
方法,tool_ordering 工具提供了一个简略方法,只需要实现__eq__()
和另一个比较方法,tool_ordering 会自动生成其他方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import functools@functools.total_ordering class Student : def __init__ (self, lastname, firstname ): self.lastname = lastname self.firstname = firstname def _is_valid_operand (self, other ): return (hasattr (other, "lastname" ) and hasattr (other, "firstname" )) def __eq__ (self, other ): if not self._is_valid_operand(other): return NotImplemented return ((self.lastname.lower(), self.firstname.lower()) == (other.lastname.lower(), other.firstname.lower())) def __lt__ (self, other ): if not self._is_valid_operand(other): return NotImplemented return ((self.lastname.lower(), self.firstname.lower()) < (other.lastname.lower(), other.firstname.lower())) john = Student('john' , 'aaa' ) bob = Student('bob' , 'bbb' ) print(john == bob) print(john > bob)
lru 即 Last Recent Used(最近使用) 以装饰器的方式为一个函数添加缓存功能,条件为参数必须可以散列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import functools@functools.lru_cache(maxsize=128 ,type =False ) def expensive (a, b ): print('expensive({},{})' .format (a, b)) return a*b MAX = 3 for i in range (MAX): for j in range (MAX): expensive(i, j) print(expensive.cache_info()) for i in range (MAX): for j in range (MAX): expensive(i, j) print(expensive.cache_info()) expensive.cache_clear() print(expensive.cache_info())
reduce 可以依次处理可迭代对象中的每个元素 如果既没有初始值,可迭代对象也为空,则会引发 TypeError
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import functoolsdef do_reduce (a,b ): print('do_reduce({},{})' .format (a,b)) return a+b data = range (5 ) result = functools.reduce(do_reduce,data) print(result) result = functools.reduce(do_reduce,data,10 ) print(result)
3.2.1 合并迭代器 3.2.1.1 串行合并 itertools.chain 能把多个可迭代对象串行连接
1 2 3 4 5 6 7 8 9 10 11 12 import itertoolsa=[1 ,2 ,3 ] b=[4 ,5 ,6 ] temp = [a,b] c=itertools.chain(a,b) print(list (c)) c=itertools.chain(*temp) print(list (c))
3.2.1.2 并行合并 内置的 zip 函数可以把多个可迭代对象并行连接
1 2 3 4 5 6 7 8 9 10 11 import itertoolsa = [1 , 2 , 3 ] b = [4 , 5 , 6 , 7 ] c = zip (a, b) print(list (c)) c = itertools.zip_longest(a, b) print(list (c))
3.2.2 转换输入 map 可以完成一个从多个集合到一个集合的映射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 a = range (5 ) b = range (5 , 10 ) c = range (10 , 15 ) d = range (15 , 20 ) def add (a, b, c, d ): return (a, b, c, d, a+b+c+d) for i in map (add, a, b, c, d): print('{}+{}+{}+{}={}' .format (*i))
3.2.3 循环重复 itertools.cycle 可以不断循环一个迭代器
1 2 3 4 5 6 7 import itertoolsa = range (5 ) print(list (zip (range (13 ), itertools.cycle(a))))
itertools.repeat 可以不断重复一个值(略) 等同于 cycle([p])
3.2.4 过滤 itertools.takewhile 返回不满足条件之前 的所有内容 itertools.dropwhile 返回不满足条件元素之后(包括本身) 的所有内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import itertoolsa = [1 ,2 ,3 ,4 ,3 ,2 ,1 ] print(list ( itertools.takewhile( lambda x: x < 3 , a ) )) print(list ( itertools.dropwhile( lambda x: x < 3 , a ) ))
filter 返回所有满足条件的内容 itertools.filterfalse 返回所有不满足条件的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import itertoolsa = [1 ,2 ,3 ,4 ,3 ,2 ,1 ] print(list ( filter ( lambda x: x < 3 , a ) )) print(list ( itertools.filterfalse( lambda x: x < 3 , a ) ))
itertools.compress 根据选择器中的元素决定是否过滤
1 2 3 4 5 6 7 8 9 10 11 12 import itertoolsdata = [1 , 2 , 3 , 4 , 3 , 2 , 1 ] selectors = itertools.cycle([True , False ]) print(list ( itertools.compress( data, selectors ) ))
3.2.5 分组 itertools.groupby 可以对一个可迭代对象根据一定条件分组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 import functoolsfrom itertools import *import operatorimport pprint@functools.total_ordering class Point : def __init__ (self, x, y ): self.x = x self.y = y def __repr__ (self ): return '({}, {})' .format (self.x, self.y) def __eq__ (self, other ): return (self.x, self.y) == (other.x, other.y) def __gt__ (self, other ): return (self.x, self.y) > (other.x, other.y) data = list (map (Point, cycle([0 ,1 ,2 ]), list (range (7 )))) print('Data:' ) pprint.pprint(data, width=35 ) print() print('Grouped, unsorted:' ) for k, g in groupby(data, operator.attrgetter('x' )): print(k, list (g)) print() data.sort() print('Grouped, sorted:' ) for k, g in groupby(data, operator.attrgetter('x' )): print(k, list (g)) print()
3.2.6 组合输入 itertools.product 可以把获取多个可迭代对象的笛卡尔积
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import itertoolsimport pprintSUITS = {'H' , 'D' , 'C' , 'S' } FACE = {'J' , 'Q' , 'K' , 'A' } result = itertools.product(SUITS,FACE) pprint.pprint(list (result),width=20 )
itertools.permutations 可以把一个可迭代对象中的元素的所有排列
1 2 3 4 5 6 7 8 9 10 11 12 13 import itertoolsimport pprintdata = 'abcd' pprint.pprint(list (itertools.permutations(data,3 )), width=40 )
itertools.combinations 可以把一个可迭代对象中的元素的所有组合
1 2 3 4 5 6 7 8 9 10 import itertoolsimport pprintdata = 'abcd' pprint.pprint(list (itertools.combinations(data,3 )), width=40 )
itertools.combinations_with_replacement 增加了自己与自己配对
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import itertoolsimport pprintdata = 'abcd' pprint.pprint(list ( itertools.combinations_with_replacement(data, 2 ) ), width=40 )
3.3 contextlib 3.3.1 上下文管理器 上下文管理器即通过 with 关键字进行一些关键资源的自动管理
基本的管理可以通过如下方式定义上下文类完成 其中__exit__
可以接受一个异常,如果可以处理这个异常,则返回False
以阻止异常继续传播,否则返回True
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class Context (): def __init__ (self ): print("__init__" ) def __enter__ (self ): print("__enter__" ) def __exit__ (self,exc_type,exc_val,exc_tb ): print("exit" ) with Context(): print("do something" )
然后可以通过在上下文开启的时候获取一个资源对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class Resource (): def __init__ (self, descript ): self.descript = descript def show (self ): print(self.descript) class Context (): def __init__ (self ): print("__init__" ) def __enter__ (self ): print("__enter__" ) return Resource("a Resource" ) def __exit__ (self, exc_type, exc_val, exc_tb ): print("exit" ) with Context() as resource: print("do something" ) resource.show()
3.3.2 contextlib.ContextDecorator 接下来是 contextlib 的使用方法和优点 首先可以继承其中的装饰器类,用来装饰函数(有点像 Spring 中的面向切片编程 AOP)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import contextlibclass Context (contextlib.ContextDecorator ): def __init__ (self, desc ): print("context init" ) self.desc = desc def __enter__ (self ): print("context enter with {}" .format (self.desc)) def __exit__ (self, exc_type, exc_val, exc_tb ): print("context exit" ) @Context("the desc" ) def func (): print("in function" ) func()
3.3.3 contextlib 然后是使用其中的管理器装饰器对象装饰一个函数 简化构造上下文管理器的过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import contextlib@contextlib.contextmanager def make_context (): print(" entering" ) try : yield {} except RuntimeError as err: print(" error:{}" .format (err)) finally : print(" exiting" ) print("Nomal:" ) with make_context() as value: print(" inside with statement:" , value) print("\nhandled error:" ) with make_context() as value: raise RuntimeError("runtime error example" ) print("\nunhandled error:" ) with make_context() as value: raise ValueError("value error example" )
3.3.4 conntextlib API 有的系统资源关闭的方式比较统一,比如资源关闭 API 很多都是close()
方法 contextlib 提供了一些 API 来对类似资源进行管理(当然这些资源本身有本身的资源管理器可以用) 无论 with 代码块内部是否发生异常,资源都会被安全关闭
1 2 3 4 5 6 7 8 9 10 11 import contextlibclass Door (): def __init__ (self ): print("open Door" ) def close (self ): print("close door" ) with contextlib.closing(Door()) as door: print("some code" )
4. 日期和时间 python 提供了三个时间相关的模块:
time:时间相关
datetime:日期时间相关
calendar:日期的计算与格式化
4.1 time 4.1.1 核心函数 核心的时间函数有:
time():返回以浮点数表示的时间(从 19700101 算起)
ctime():返回时间的格式化表示
1 2 3 4 5 6 7 8 9 10 11 12 import timeprint(time.time()) print(time.ctime()) print(time.ctime( time.time() + 15 ))
4.1.2 其他函数
函数
作用
time.monotonic
单调时钟
time.clock
处理器时钟
time.gmtime
以 UTC 格式返回时间结构体
time.localtime
返回当前时间时间结构体
time.mktime
把结构体转换成浮点数表示
time.strptime
把时间字符串解析成时间结构体
time.strftime
把时间结构体表示成字符串
4.2 datetime
类
作用
datetime.time
时间类(一天以内)
datetime.date
日期类
datetime.timedelta
时间差类
datetime.datetime
时间日期类
5. 数学运算 数学运算多用其他模块提供的工具(如 numpy 和 scipy),只看一些最基本的函数
5.1 随机数 random 模块提供了一系列随机数生成工具
函数
作用
random.random
[0,1)随机数生成
random.uniform
生成范围内随机浮点数
random.seed
指定种子
random.getstate
获取伪随机数生成器状态
random.setstate
设置伪随机数生成器状态
random.randint
生成范围内随机整数
random.range
生成间隔区间随机数
random.choice
随机选择序列内元素
random.choices
按照预设概率随机选择序列内元素
random.shuffle
打乱一个序列
random.sample
无随机不修改原序列采样
random.normalvariate
生成正态分布随机数
random.gauss
生成高斯分布随机数
另外 然后可以生成多个 random.Random 类的实例作为随机数生成器,使其互不干扰
5.2 数学工具 math 包提供了一些常量值,以及一些工具函数,如舍入函数,截断函数等
函数
作用
math.trunc
截断小数部分
math.floor
不大于其的最大整数
math.ceil
不小于其的最大整数
6. 文件系统 关于文件的一些原生操作
6.1 os.path:平台独立的文件名管理
函数
作用
path.split
把路径分为文件夹路径和文件名
path.dirname
提取路径的文件夹路径
path.basename
提取路径的文件名
path.splitext
把路径分割为文件名路径和扩展名部分
path.commonprefix
提取多个路径的公共前缀
path.commonpath
提取多个路径的公共路径部分
path.join
使用路径分隔符连接多个路径片段
path.normpath
规范化路径
path.abspath
相对路径转绝对路径
path.getatime
获取最后访问时间
path.getmtime
获取最后修改时间
path.getctime
获取创建时间
path.isabs
是否为绝对路径
path.isfile
是否为文件
path.isdir
是否为文件夹
path.islink
是否为链接
path.ismount
是否为挂载点(linux)
path.exists
是否存在
path.lexists
链接指向文件是否存在
_os.path 对于文件夹路径和文件名的分割基于最后一个路径分隔符_
6.2 pathlib:把路径作为对象
一般最常用的就是 Path 类
6.2.1 连接操作符 /
操作符可以用来连接把多个 pathlib 对象或者对象与字符串
1 2 3 4 5 6 7 import pathlibhome_root = pathlib.Path("/home" ) print(home_root / "yishiyu" ) print(home_root / pathlib.Path("yishiyu" ))
6.2.2 工具函数
函数
作用
pathlib.Path.resolve
规范化(包括解析链接路径)
pathlib.Path.joinpath
连接多个路径片段
pathlib.Path.with_name
替换文件名部分
pathlib.Path.with_suffix
替换扩展名部分
pathlib.Path.parts
分解路径成片段
pathlib.Path.iterdir
目录迭代器
pathlib.Path.glob
正则匹配内容的目录迭代器
pathlib.Path.write_bytes
写入字节序列
pathlib.Path.write_text
写入文本
pathlib.Path.read_bytes
读取字节序列
pathlib.Path.read_text
读取文本
pathlib.Path.open
打开文件并保存句柄
pathlib.Path.mkdir
创建目录
pathlib.Path.touch
创建文件
pathlib.Path.rmdir
删除空目录
pathlib.Path.unlink
删除其他文件相关内容
pathlib.Path.is_xxx
测试函数系列
pathlib.Path.stat
获取文件信息(stat_result)
属性
作用
pathlib.Path.parents
上级文件夹列表(直到根目录)
pathlib.Path.name
文件名
pathlib.Path.suffix
扩展名
pathlib.Path.stm
不带扩展名的文件名
os.stat_result.st_ctime
最后创建时间
os.stat_result.st_mtime
最后修改时间
os.stat_result.st_atime
最后访问时间
6.3 tempfile:临时文件系统对象
tempfile.TemporaryFile:创建一个无名临时文件,临时文件在被关闭的时候会直接被删除(close 或者上下文管理器关闭)
tempfile.NamedTemporaryFile:创建一个具名临时文件,在句柄关闭后被删除
tempfile.TemporaryDirectory:创建一个临时文件夹,在退出 with 语句块后删除
tempfile.gettmpdir:获取临时文件夹所在位置
tempfile.gettempprefix:获取临时文件夹前缀
6.4 shutil:高层文件操作
函数
作用
shutil.copyfile
拷贝文件内容
shutil.copy
拷贝文件
shutil.copy2
带元信息拷贝文件
shutil.copymode
拷贝操作权限
shutil.copystat
拷贝元信息
shutil.copytree
递归拷贝目录
shutil.rmtree
递归删除一个目录
shutil.move
移动一个文件或者目录
7. 数据持久化存储与交换 7.1 pickle:对象串行化
函数
作用
pickle.dump
序列化并写入文件
pickle.dumps
序列化为字节序列
pickle.load
读取文件并反序列化
pickle.loads
反序列化字节序列
同时一些对象是不适合进行序列化的,比如随机数生成器对象,如果目标对象有类似getstate
和setstate
的函数,可以直接序列化保存其状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import pickledata_1 = ['a' ,'b' ,['c' ,'d' ]] data_string = pickle.dumps(data_1) data_2 = pickle.loads(data_string) print(data_1) print(data_string) print(data_2) print("data_1 == data_2 ? :{}" .format (data_1==data_2))
7.2 shelve:对象的持久化存储 在不使用关系型数据库的时候,可以使用 shelve 存储数据 shelve 使用 dbm 非关系型数据库来存储数据 shelve 不支持并发写入,但是支持并发读取(在打开时指定’flag=r’) 同时 shelve 不支持可变子元素的修改.其存储的元素,如果任何一个子元素发生改变,则需要重新存储整个元素 可以通过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import shelvewith shelve.open ('test_shelf.db' ) as s: s['key' ] = { 'int' : 10 , 'float' : 9.5 , 'string' : 'a string' } with shelve.open ('test_shelf.db' ) as s: print(s['key' ]) s['key' ]['int' ] = 99 print(s['key' ]['int' ]) temp = s['key' ] temp['int' ] = 99 s['key' ] = temp print(s['key' ]['int' ]) with shelve.open ('test_shelf.db' , writeback=True ) as s: s['key' ]['int' ] = 999 print(s['key' ]['int' ])
7.3 sqlite3:嵌入式关系数据库 sqlite 是一个无服务器的轻量级服务器 本体只有一个文件用来存储数据,通常用来存储一些软件内部的数据
除了下列列出的方法,还可以自定义一些非 sqlite 原生的数据类型,比如日期,序列化对象之类的,还有一些其他内容内容可以参考其他资料
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 import osimport csvimport sqlite3db_filename = 'test.db' with sqlite3.connect(db_filename) as conn: conn.executescript(""" 被执行的命令 """ ) cursor = conn.cursor() cursor.execute(""" 查询命令 """ ) task_id, priority, details, status = cursor.fetchone() for row in cursor.fetchmany(5 ): task_id, priority, details, status = row for row in cursor.fetchall(): task_id, priority, details, status = row for colinfo in cursor.description: print(colinfo) project_name = "my_project" query = """ select id, priority, details, status form task_id where project = ? """ cursor.execute(query,(project_name)) project_name = "my_project" query = """ select id, priority, details, status form task_id where project = :PROJECT_NAME """ cursor.execute(query,{'PROJECT_NAME' :project_name}) with open ("data_file" ) as csv_file: csv_reader = csv.DictReader(csv_file) cursor = conn.cursor() cursor.executemany(query,csv_reader) conn.commit() conn.rollback() conn.iterdump() def encrypt (data ): return "after encrypt:" + data conn.create_function("encrypt" ,encrypt) query = "update task sset details = encrypt(details)" cursor.execute(query)
7.4 常用文件格式解析 python 有众多文件格式解析的库,永远不要自己拿正则匹配啥的手动搞
8. 并发 python 提供的并发的包有:
subprocess:创建子进程并与之通信
signal:unix 信号机制
threading:面先对象的高层 API
multiprocessing:threading 的进程版,每个进程都是真正的系统进程
asyncio:基于类的协议系统或写成,为并发和异步 IO 管理提供了框架
concurrent.futures:类似进程池的线程池
主要学习 threading
,multiprocessing
和concurrent.futures
8.1 threading 8.1.1 通用使用方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 import threadingimport logginglogging.basicConfig( level=logging.DEBUG, format ='[%(levelname)s] (%(threadName)-10s) %(message)s' ) def worker (): logging.debug("starting" ) print("{} is working" .format ( threading.current_thread().getName() )) logging.debug("exiting" ) threads = [] for i in range (3 ): t = threading.Thread( name='thread_{}' .format (i), target=worker) threads.append(t) t.start() class MyThread (threading.Thread ): def run (self ): logging.debug('running' ) t = MyThread(name="thread_4" ) t.start() def delay (): logging.debug("i am late" ) t = threading.Timer(1 , delay) t.start()
8.1.2 守护线程 如上,主程序在所有线程退出后退出 但是有的进程可能不会主动退出,如一些软件自带的垃圾清理线程,可以把这些进程设置为守护进程,使得主线程无需等待这些进程就可以直接退出 当然也可以显式声明等待守护进程结束
1 2 3 4 5 6 7 8 9 10 11 12 import threadingt = threading.Thread( target=worker, daemon=True ) t.start() t.join()
8.1.3 进程同步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 import threadingimport logginglogging.basicConfig( level=logging.DEBUG, format ='[%(levelname)s] (%(threadName)-10s) %(message)s' ) def demo (e1, e2 ): logging.debug("start" ) e1.wait() logging.debug("e1 is set" ) e2.wait(3 ) logging.debug("e2 ignored" ) logging.debug("exiting" ) e1 = threading.Event() e2 = threading.Event() t = threading.Thread( name="demo thread" , target=demo, args=(e1, e2) ) t.start() logging.debug("ready!" ) e1.set () def consumer (cond ): with cond: cond.wait() logging.debug("resource is available to sonsumer" ) def producer (cond ): with cond: logging.debug("make resource available" ) cond.notifyAll() condition = threading.Condition() c1 = threading.Thread(name="c1" , target=consumer, args=(condition,)) c2 = threading.Thread(name="c2" , target=consumer, args=(condition,)) p = threading.Thread(name="p" , target=producer, args=(condition,)) c1.start() c2.start() p.start() maxconnections = 5 pool_sema = threading.Semaphore(value=maxconnections) with pool_sema: try : pass finally : pass
8.2 multiprocessing multiprocessing 可以规避掉全局解释器锁(GIL)带来的瓶颈
8.2.1 通用使用方法 指定目标函数,构造派生,守护进程的使用方法都与线程类似,略过
同时由于与线程不同,进程试可以单独存在的,所有必要的时候需要将其杀死
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import multiprocessingimport timeimport logginglogging.basicConfig( level=logging.DEBUG, format ='[%(levelname)s] %(message)s' ) def worker (): while True : logging.debug("{} is working" .format ( multiprocessing.current_process().__name__)) time.sleep(0.1 ) if __name__ == "__main__" : multiprocessing.log_to_stderr(logging.DEBUG) p = multiprocessing.Process() p.start() time.sleep(3 ) p.terminate() p.join()
8.2.2 进程间通信 进程事件,锁机制,Condition,Semaphore 和线程事件类似,略过 进程间可以通过multiprocessing.Manager
创建的特殊容器交换数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import multiprocessingdef worker (d, key, value ): d[key] = value if __name__ == "__main__" : manager = multiprocessing.Manager() d = manager.dict () jobs = [ multiprocessing.Process( target=worker, args=(d, i, i*2 ) ) for i in range (5 ) ] for j in jobs: j.start() for j in jobs: j.join() print(d)
8.2.3 进程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import multiprocessingdef do_calculation (data ): return data*2 def start_process (): print("Starting" , multiprocessing.current_process().name) if __name__ == "__main__" : inputs = list (range (10 )) pool_size = multiprocessing.cpu_count() * 2 pool = multiprocessing.Pool( maxtasksperchild=2 , processes=pool_size, initializer=start_process ) outputs = pool.map (do_calculation, inputs) pool.close() pool.join() print(outputs)
8.3 concurrent.futures 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 from concurrent import futuresimport threadingimport timedef task (n ): print("{} is working" .format (threading.current_thread().name)) time.sleep(0.5 ) print("{} finished" .format (threading.current_thread().name)) return n/10 ex = futures.ThreadPoolExecutor(max_workers=3 ) results = ex.map (task, range (5 , 10 )) print("waiting for the result" ) real_results = list (results) print(real_results) f = ex.submit(task, 10 ) print(f.result()) wait_for = [ ex.submit(task, i) for i in range (5 , 8 ) ] for f in futures.as_completed(wait_for): print("the result is {}" .format (f.result())) def done (fn: futures.Future ): if fn.cancelled(): print("{}:cancelled" .format (fn.arg)) elif fn.done(): error = fn.exception() if error: print("{}: error returned" .format (fn.arg)) else : result = fn.result() print("{}:value({}) returned" .format (fn.arg, result)) f = ex.submit(task, 5 ) f.arg = 5 f.add_done_callback(done) def task_with_error (n ): print("{}:starting" .format (n)) raise ValueError("value error" ) f = ex.submit(task_with_error, 5 ) try : result = f.result() except ValueError as e: print("main:{} when assesing result" .format (e))