时间戳

10位(秒级)

1
round(time.time())

13位(毫秒级)

1
round(time.time() * 10 ** 3)

格式化输出

1
2
In [24]: time.strftime('%Y-%m-%d %H:%M:%S')
Out[24]: '2022-06-12 22:16:59'

解析字符串

1
2
3
4
5
In [34]: time.strptime('2022-06-13', '%Y-%m-%d')
Out[34]: time.struct_time(tm_year=2022, tm_mon=6, tm_mday=13, tm_hour=0, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=164, tm_isdst=-1)

In [35]: time.strptime("30 Nov 00", "%d %b %y")
Out[35]: time.struct_time(tm_year=2000, tm_mon=11, tm_mday=30, tm_hour=0, tm_min=0, tm_sec=0, tm_wday=3, tm_yday=335, tm_isdst=-1)

localtime转时间戳

1
2
3
4
5
6
7
8
In [30]: time.localtime()
Out[30]: time.struct_time(tm_year=2022, tm_mon=6, tm_mday=12, tm_hour=22, tm_min=38, tm_sec=39, tm_wday=6, tm_yday=163, tm_isdst=0)

In [33]: time.mktime(time.localtime())
Out[33]: 1655044789.0

# Split the current local time into its individual calendar/clock fields.
# time.localtime() is called exactly once so that all six values describe the
# same instant; calling it once per field can mix values taken on either side
# of a second/minute/day rollover.
# The first six struct_time items are, in order:
# tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec.
year, month, day, hour, minute, second = time.localtime()[:6]

PurePath路径处理(以下输出为 Windows 下的结果;在其他系统上解析 Windows 路径请改用 PureWindowsPath)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pathlib import PurePath

In [5]: path = 'C:\\Users\\Desktop\\hello-world\\cxs.png'
In [6]: pwp = PurePath(path)

In [7]: pwp.parts
Out[7]: ('C:\\', 'Users', 'Desktop', 'hello-world', 'cxs.png')

# 文件名
In [17]: pwp.name
Out[17]: 'cxs.png'

# 文件类型
In [18]: pwp.suffix
Out[18]: '.png'

lxml的使用

两种方式构造 root
  1. etree.HTML(会自动添加html节点)
1
2
3
4
In [2]: from lxml import etree
In [3]: root1 = etree.HTML(text)
In [8]: root1
Out[8]: <Element html at 0x20b73f77488>
  2. html.fromstring
1
2
3
4
In [6]: from lxml import html
In [7]: root2 = html.fromstring(text)
In [9]: root2
Out[9]: <Element div at 0x20b75078778>
html节点相关操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Common lxml node operations. `root` is an already-parsed element and `src`
# is the image URL to insert; both are expected to be defined by the caller.

# Remove every element of the given types together with its subtree.
# NOTE(review): lxml documents that strip_elements also drops the tail text
# after each removed element unless with_tail=False is passed — confirm that
# this is the intended behavior.
etree.strip_elements(root, "script", "style")
# Remove only the tags themselves; their text and children are kept and
# merged into the parent.
etree.strip_tags(root, "comment")
# Remove specific nodes picked with a CSS selector.
for a in root.cssselect("a[action-type='feed_list_url']"):
    a.getparent().remove(a)
# Modify an attribute of a node.
img1 = root.cssselect("img")[0]
img1.attrib["src"] = "..."
# Build child nodes (append an <img> child to every matching <div>).
divs = root.cssselect("div.image-block")
for ele in divs:
    img = etree.SubElement(ele, "img")
    img.attrib["src"] = src

# Collect all plain text of the tree (the original referenced an undefined
# `tree`; every other statement in this snippet operates on `root`).
content = "".join(root.itertext()).strip()
# Serialize back to HTML; `root` may also be any sub-node. A bare `return`
# is a syntax error at module level, so the result is bound to a name; use
# `return html_text` when this code lives inside a function.
html_text = etree.tostring(root, encoding="unicode", method="html")
cleaner过滤器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from lxml.html.clean import Cleaner

'''
部分可选参数如下,具体查阅源码:
``scripts``:
Removes any ``<script>`` tags.

``javascript``:
Removes any Javascript, like an ``onclick`` attribute. Also removes stylesheets
as they could contain Javascript.

``comments``:
Removes any comments.

``style``:
Removes any style tags.

``inline_style``
Removes any style attributes. Defaults to the value of the ``style`` option.

``links``:
Removes any ``<link>`` tags

``meta``:
Removes any ``<meta>`` tags

``page_structure``:
Structural parts of a page: ``<head>``, ``<html>``, ``<title>``.

``processing_instructions``:
Removes any processing instructions.

``embedded``:
Removes any embedded objects (flash, iframes)

``frames``:
Removes any frame-related tags

``forms``:
Removes any form tags

``annoying_tags``:
Tags that aren't *wrong*, but are annoying. ``<blink>`` and ``<marquee>``

``remove_tags``:
A list of tags to remove. Only the tags will be removed,
their content will get pulled up into the parent tag.

``kill_tags``:
A list of tags to kill. Killing also removes the tag's content,
i.e. the whole subtree, not just the tag itself.

``allow_tags``:
A list of tags to include (default include all).

``remove_unknown_tags``:
Remove any tags that aren't standard parts of HTML.
'''

# Build a Cleaner with the ``style`` option switched on (per the option list
# quoted above, that removes style tags); all other options keep their
# defaults.
# NOTE(review): recent lxml releases are reported to ship this cleaner as the
# separate ``lxml_html_clean`` package — confirm against the installed version.
cleaner = Cleaner(style=True)
# Run the cleaner over the HTML held in ``tag_text`` (defined elsewhere).
clean_text = cleaner.clean_html(tag_text)

asyncio

基本使用
  • async 包装函数创建一个协程
  • await 可以针对耗时的操作进行挂起,类似yield,函数让出控制权
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
import time

from pprint import pprint

async def print_myself(name: str, slp: float) -> str:
    """Announce ``name``, sleep without blocking the loop, then report completion.

    Args:
        name: Label printed at start and embedded in the returned message.
        slp: Number of seconds to suspend via ``asyncio.sleep``.

    Returns:
        The message ``"<name>函数已执行完毕"``.
    """
    print(f'My name is {name}')
    # Awaiting hands control back to the event loop so other tasks can run
    # while this coroutine is suspended.
    await asyncio.sleep(slp)
    return f"{name}函数已执行完毕"

def callback(future):
    """Done-callback: print the result of a finished future.

    Args:
        future: A completed future/task; ``future.result()`` is read, so an
            exception stored in the future is re-raised here.
    """
    print("Callback: ", future.result())

# Calling an ``async def`` function only creates a coroutine object; nothing
# runs until it is scheduled on an event loop.
coroutine1 = print_myself("cxs", 1)
coroutine2 = print_myself("cxw", 2)
coroutine3 = print_myself("clw", 3)

# Create an event loop explicitly and install it as the current loop.
# (Relying on asyncio.get_event_loop() to create one implicitly is deprecated.)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Two equivalent ways to wrap a coroutine into a future-like Task:
#   task = loop.create_task(coroutine1)
#   task = asyncio.ensure_future(coroutine1, loop=loop)
# A coroutine object may be wrapped only once, so a single form is used here.
task = loop.create_task(coroutine1)

# Every coroutine is scheduled exactly once; ``task`` is reused for the first.
tasks = [
    task,
    asyncio.ensure_future(coroutine2, loop=loop),
    asyncio.ensure_future(coroutine3, loop=loop),
]

# Type check: a Task is a subclass of asyncio.Future (checked on the single
# task, not on the list).
print(isinstance(task, asyncio.Future))

# Attach a done-callback to receive the task's return value.
task.add_done_callback(callback)

# Drive the loop until all tasks finish.
loop.run_until_complete(asyncio.wait(tasks))
# Alternative form; the tasks are already done here, so this only gathers
# their results.
loop.run_until_complete(asyncio.gather(*tasks))

loop.close()
配合多线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import time
import asyncio

from threading import Thread, current_thread


def start_loop(loop):
    """Install ``loop`` as the calling thread's event loop and run it.

    Blocks the calling thread in ``loop.run_forever()`` until the loop is
    stopped.

    Args:
        loop: The asyncio event loop to run.
    """
    asyncio.set_event_loop(loop)
    loop.run_forever()


def get_thread_name():
    """Print the ``Thread`` object of the thread that calls this function."""
    print("current thread name: ", current_thread())


async def coroutine_work(x):
    """Coroutine: report the running thread, then sleep ``x`` seconds.

    Prints the whole-second timestamp before and after the sleep.

    Args:
        x: Seconds to suspend via ``asyncio.sleep``.
    """
    get_thread_name()
    print("start: ", int(time.time()))
    # Non-blocking sleep: other coroutines on the same loop keep running.
    await asyncio.sleep(x)
    print("end: ", int(time.time()))


def common_work(x):
    """Ordinary blocking function: sleep ``x`` seconds and print the elapsed time.

    Args:
        x: Seconds to block the calling thread with ``time.sleep``.
    """
    get_thread_name()
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments during the sleep.
    start = time.perf_counter()
    time.sleep(x)
    end = time.perf_counter()
    print(f"耗时{end - start}")


def main():
    """Run two coroutines on an event loop hosted by a child thread.

    The main thread creates the loop, starts a child thread that runs it,
    submits the coroutines thread-safely, waits for both to finish, and then
    stops the loop so the child thread and the process can exit.
    """
    get_thread_name()
    loop = asyncio.new_event_loop()
    worker = Thread(target=start_loop, args=(loop,))  # child thread
    worker.start()

    # Register the coroutines on the loop from this (foreign) thread; each
    # call returns a concurrent.futures.Future for the coroutine's outcome.
    futures = [
        asyncio.run_coroutine_threadsafe(coroutine_work(4), loop),
        asyncio.run_coroutine_threadsafe(coroutine_work(2), loop),
    ]
    try:
        for future in futures:
            # Blocks until the coroutine finishes; re-raises its exception.
            future.result()
    finally:
        # run_forever() never returns by itself: stop the loop from this
        # thread, wait for the child thread to exit, then release the loop.
        loop.call_soon_threadsafe(loop.stop)
        worker.join()
        loop.close()


if __name__ == "__main__":
    main()

'''
输出如下:

current thread name: <_MainThread(MainThread, started 36316)>
current thread name: <Thread(Thread-1, started 4136)>
start: 1654077800
current thread name: <Thread(Thread-1, started 4136)>
start: 1654077800
end: 1654077802
end: 1654077804
'''

从输出结果可以得出:

  • 两个协程都是在子线程中运行,区别于主线程
  • 两个协程是同时启动,并发运行

requests 发送文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Upload an in-memory file as multipart/form-data with requests.
# `file_name`, `resp` (an earlier download response) and `content_type` are
# expected to be defined by the surrounding code.

# Multipart file part: (filename, file-like object, content type).
files = {
    "file": (
        file_name,
        BytesIO(resp.content),
        content_type,  # 'image/png'
    ),
}
# Extra form fields sent alongside the file.
# NOTE(review): the token is hard-coded; it presumably should come from
# configuration or the environment. "mime_type" is "text/css" while the
# example content type is 'image/png' — confirm which value the service expects.
data = {
    "token": "4l10x2T4B/4=",
    "bucket": "common",
    "mime_type": "text/css",
}
url = "https://xxx.com/oss/upload"
# requests applies no timeout by default, so an unresponsive server would
# block forever; bound the wait explicitly.
upload_resp = requests.post(url, files=files, data=data, timeout=30)
# Fail loudly on 4xx/5xx instead of trying to parse an error page as JSON.
upload_resp.raise_for_status()
result = upload_resp.json()