"""
# 7、分层索引、
"""
import pandas as pd
import numpy as np
index = [('California', 2000), ('California', 2010),
('New York', 2000), ('New York', 2010),
('Texas', 2000), ('Texas', 2010)]
populations = [33871648, 37253956,
18976457, 19378102,
20851820, 25145561]
pop = pd.Series(populations, index=index)
print(pop)
# 切片
print(pop[('California', 2010):('Texas', 2000)])
# 例如,如果您需要从2010年中选择所有值
# 通过列表生成式,来生成2010对应的索引数据
print(pop[[i for i in pop.index if i[1] == 2010]])
# 上面的列表生成式的索引方式,代码看起来还是有些小复杂
# 下面我们用MultiIndex分层索引
'''
请注意,其中MultiIndex包含多个索引级别-在这种情况下,包括状态名称和年份,以及为每个数据点编码这些级别的多个标签。
'''
index = pd.MultiIndex.from_tuples(index)
print(index)
# 重置索引
pop = pop.reindex(index)
print(pop)
# 再次查询2010年的所有数据
print(pop[:, 2010])
# 取消堆叠
pop_df = pop.unstack()
print(pop_df)
# 堆叠
print(pop_df.stack())
# 添加一列
pop_df = pd.DataFrame({'total': pop,
'under18': [9267089, 9284094,
4687374, 4318033,
5906301, 6879014]})
print(pop_df)
# 根据上述数据,我们在此按年份计算了18岁以下的人口比例
f_u18 = pop_df['under18'] / pop_df['total']
print(f_u18.unstack())
# 创建多个索引的df
df = pd.DataFrame(np.random.rand(4, 2),
index=[['a', 'a', 'b', 'b'], [1, 2, 1, 2]],
columns=['data1', 'data2'])
print(df)
data = {('California', 2000): 33871648,
('California', 2010): 37253956,
('Texas', 2000): 20851820,
('Texas', 2010): 25145561,
('New York', 2000): 18976457,
('New York', 2010): 19378102}
print(pd.Series(data))
# 通过多个一维数组创建多维索引
index1 = pd.MultiIndex.from_arrays([['a', 'a', 'b', 'b'], [1, 2, 1, 2]])
print(index1)
# 通过多个元组创建多维索引
index2 = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1), ('b', 2)])
print(index2)
# 通过单个索引的笛卡尔积构造它
index3 = pd.MultiIndex.from_product([['a', 'b'], [1, 2]])
print(index3)
# 通过传递levels(包含每个级别的可用索引值labels的列表的列表)和(引用这些标签的列表的列表)直接使用其内部编码来构造:
index4 = pd.MultiIndex(levels=[['a', 'b'], [1, 2]],
codes=[[0, 0, 1, 1], [0, 1, 0, 1]])
print(index4)
print(pop.index)
# 给多层索引的Series 索引起名
pop.index.names = ['state', 'year']
print(pop)
# 层次索引和列
index = pd.MultiIndex.from_product([[2013, 2014], [1, 2]], names=['year', 'visit'])
columns = pd.MultiIndex.from_product([['Bob', 'Guido', 'Sue'], ['HR', 'Temp']], names=['subject', 'type'])
# mock 一些数据
data = np.round(np.random.randn(4, 6), 1)
data[:, ::2] *= 10
data += 37
# create the DataFrame
health_data = pd.DataFrame(data, index=index, columns=columns)
'''DataFrames乘法索引'''
print(health_data)
# 通过人名来访问
print(health_data['Guido'])
# 索引和切片一个多指标
print(pop)
# 通过部分索引取值
print(pop['California'])
# 通过完整索引取值
print(pop['California', 2000])
# 通过部分索引,切片取值
print(pop.loc['California':'New York'])
# 使用部分索引取值.
print(pop.loc[:, 2000])
# 使用布尔索引
print(pop[pop > 22000000])
# 使用花式索引
print(pop[['California', 'Texas']])
print(health_data)
print(health_data['Guido', 'HR'])
print(health_data.iloc[:2, :2]) # 2行,2列
print(health_data.loc[:, ('Bob', 'HR')])
# 元组中创建切片将导致语法错误:
# print(health_data.loc[(:,1), (:, 'HR')])
# 上述解决方案
idx = pd.IndexSlice
print(health_data.loc[idx[:, 1], idx[:, 'HR']])
'''
警告 如果索引未排序,许多MultiIndex切片操作将失败。
'''
index = pd.MultiIndex.from_product([['a', 'c', 'b'], [1, 2]])
data = pd.Series(np.random.rand(6), index=index)
data.index.names = ['char', 'int']
print(data)
# 出错的原因就是,未对MultiIndex进行排序的结果。
# 熊猫提供了许多方便的例程来执行这种排序
# sort_index()和sortlevel()方法
try:
data['a':'b']
except KeyError as e:
print(type(e))
print(e)
data = data.sort_index()
print(data)
print(data['a':'b'])
'''
Stacking and unstacking indices¶
说明:
如前所述,可以将数据集从堆叠的多索引转换为简单的二维表示形式,可以选择指定要使用的级别:
'''
print(pop)
print('---------------------')
print(pop.unstack(level=0))
print('---------------------')
print(pop.unstack(level=1))
print('---------------------')
# 与unstack()相反stack(),在这里可用于恢复原始系列:
print(pop.unstack().stack()) # 还原
'''指标设置和重置'''
# 重新排列层次结构数据的另一种方法是将索引标签转换为列
# 这可以通过该reset_index方法来完成
pop_flat = pop.reset_index(name='population')
print(pop_flat)
print(pop_flat.set_index(['state', 'year']))
'''多指标上的数据聚合'''
# 每年两次访问中的测量结果进行平均。
data_mean = health_data.mean(level='year')
print(data_mean)
# 使用axis关键字,我们还可以在列的各个级别之间取均值
print(data_mean.mean(axis=1, level='type'))
原创-Pandas心法之处理丢失数据-3
"""
# 6、处理丢失数据
"""
# 现实世界中的数据很少是干净且同质的。特别是,许多有趣的数据集将丢失一些数据
import numpy as np
import pandas as pd
# 第一种:缺少的数据为None
vals1 = np.array([1, None, 3, 4])
print(vals1)
# 因为数组里面有None控制,so,聚合函数报错
print(vals1.sum()) # 报错
# 第二种:另一个缺少的数据表示形式NaN
vals2 = np.array([1, np.nan, 3, 4])
print(vals2)
print(vals2.dtype)
# 这种缺失的数据,在计算和聚合的时候不会错误
print(1 + np.nan)
print(0 * np.nan)
# 使用nan式聚合函数
print(np.sum(vals2)) # 错误
print(np.nansum(vals2))
print(np.nanmin(vals2))
# NaN and None in Pandas¶
# 在pandas 里面None 和nan d都统一处理NaN
print(pd.Series([1, np.nan, 2, None]))
x = pd.Series(range(2), dtype=int)
print(x)
x[0] = None
print(x)
# 对空值进行处理
'''
isnull():生成一个布尔值掩码,指示缺少的值
notnull(): 的反面 isnull()
dropna():返回过滤后的数据版本
fillna():返回填充或估算缺失值的数据副本
'''
# 检查空值
data = pd.Series([1, np.nan, 'hello', None])
# 为空的布尔数组
print(data.isnull())
# 为空的数据
print(data[data.isnull()])
# 不为空的数据
print(data[data.notnull()])
# 删除空值
print(data.dropna())
# DataFrame 的空值处理
df = pd.DataFrame([[1, np.nan, 2],
[2, 3, 5],
[np.nan, 4, 6]])
print(df)
print('----------------')
print(df.dropna()) # 暴力的只要行中包括na就全行删除。
print(df.dropna(axis='columns')) # 只要列中包括na,就删除整列
# 增加一列全是nan的
df[3] = np.nan
print(df)
print('----------------')
# 删除某一列中全是nan 的数据
print(df.dropna(axis='columns', how='all'))
# 删除每行少于3个非空值的数据 ,thresh 为控制参数
print(df.dropna(axis='rows', thresh=3))
# 此处的第一行和最后一行已删除,因为它们仅包含两个非空值
# 填充空值,一味的删除也不是个办法
# 有时,您宁可使用有效值替换它们,也不要丢弃NA值
data = pd.Series([1, np.nan, 2, None, 3], index=list('abcde'))
print(data)
# 把空值都填充为零
print(data.fillna(0))
# 把空值填充为上一行不为空的值,叫向前填充
print(data.fillna(method='ffill'))
# 把空值填充为下一行不为空的值,叫向后填充
print(data.fillna(method='bfill'))
print(df)
print('-------')
print(df.fillna(method='ffill', axis=1)) # 沿X轴方向向前填充
# 请注意,如果在向前填充过程中先前值不可用,则NA值将保留。
原创-Pandas心法之初级数据操作与索引对齐-2
1、数据操作
"""
# 4、对熊猫中的数据进行操作
由于Pandas设计为可与NumPy配合使用,
因此任何NumPy ufunc均可在PandasSeries和DataFrame对象上使用
"""
import pandas as pd
import numpy as np
rng = np.random.RandomState(42)
ser = pd.Series(rng.randint(0, 10, 4))
print(ser)
df = pd.DataFrame(rng.randint(0, 10, (3, 4)),
columns=['A', 'B', 'C', 'D'])
print(df)
print(np.max(ser)) # 求Series 中最大的值
# 复杂一点的计算
print(np.sin(df * np.pi / 4))
2、索引对齐
'''
# 5、索引对齐
对于两个Series或两个DataFrame对象的二进制操作,Pandas将在执行操作的过程中对齐索引。在处理不完整的数据时,这非常方便
'''
# 两个Series 中有不同的index 索引
area = pd.Series({'Alaska': 1723337, 'Texas': 695662,
'California': 423967}, name='area')
population = pd.Series({'California': 38332521, 'Texas': 26448193,
'New York': 19651127}, name='population')
print(population / area)
# 找出两个Series 中所有行索引
print(area.index | population.index)
print(area.index & population.index)
# 处理索引不对齐的情况、
A = pd.Series([2, 4, 6], index=[0, 1, 2])
B = pd.Series([1, 3, 5], index=[1, 2, 3])
print(A + B)
# 使用NaN不是理想的行为.
# 处理方式,在索引不对齐的情况,把不存在的索引值填充为零。
print(A.add(B, fill_value=0))
df1 = pd.DataFrame(rng.randint(0, 20, (2, 2)),
columns=list('AB'))
print(df1)
print('------------------')
df2 = pd.DataFrame(rng.randint(0, 10, (3, 3)),
columns=list('BAC'))
print(df2)
print('------------------')
print(df1+df2)
'''
请注意,无论两个对象中的索引顺序如何,
索引都正确对齐,并且对结果中的索引进行排序
'''
# 更好的处理NaN处理方式
print(df1.stack()) # 堆叠,把列索引变行索引,相当于行的二级索引。
fill = df1.stack().mean()
print(fill)
df1.add(df2, fill_value=fill) # 把NaN 填充为均值。
'''
基础的运算符可以直接应用与DataFrame
+ add()
- sub(), subtract()
* mul(), multiply()
/ truediv(),div(),divide()
// floordiv()
% mod()
** pow()
'''
# numpy 二维数组处理 减法
A = rng.randint(10, size=(3, 4))
# print(A)
# print(A - A[0])
# DataFrame数据 处理方式
df = pd.DataFrame(A, columns=list('QRST'), index=['a', 'b', 'c'])
print(df)
print('--------------------')
# 取出列索引为Q 行索引名为a 的数据:3
print(df['Q']['a'])
# 。你的目的虽然是取出行索引全部的数据。但是语法方式行不通。
print(df['a']) # 错误
# 通过堆叠,你可以取出行索引为a 的全部数据。
print(df.stack()['a']) # 正确
print(df - df.iloc[0]) # 默认沿着行 axis =1 计算。
print(df['Q']) # 列索引
print(df.iloc[0]) # 行索引
# 如果要改为按列操作,
# 则可以在指定axis关键字的同时使用前面提到的对象方法:
print(df.subtract(df['R'], axis=0))
# 行数据索引为0,列索引步长为2.
halfrow = df.iloc[0, ::2]
print(halfrow)
print(df - halfrow)
原创-Pandas心法之数据结构篇-1
一、前言
在最基本的层面上,Pandas对象可以看作是NumPy结构化数组的增强版本,
其中,行 (X轴)和列 (Y轴)使用标签而不是简单的整数索引进行标识。
我们先介绍这三个基本熊猫的数据结构:Series,DataFrame和Index。
Series:是带有显示索引数据的一维数组说明:它的索引可以不必是整数。是具有与值相关联的显式定义的索引。
有时只是操作Series ,比操作字典更有效。
DataFrame:是一个带有显示索引数据的二维数组。
实际上DataFrame是由多个Series 组成。 DataFrame 不仅有行索引,还有列索引。
Index:一个显式索引,可以修改,可逻辑操作的一维数组。
二、Series 数据结构
import pandas as pd
'''
# 1、Series 、
说明:Series是一维数组。可以从列表或数组中创建它
'''
data = pd.Series([0.25, 0.5, 0.75, 1.0])
print(data)
# 通过values、indexs属性访问它
print(data.values)
print(data.index)
# 通过索引访问它
print(data[1])
# 通过切片来访问它
print(data[1:3])
'''说明:Pandas 的Series 比Numpy的一维数组和列表更加具有灵活性'''
# 创建带有显示索引的Series
data = pd.Series([0.25, 0.5, 0.75, 1.0],
index=['a', 'b', 'c', 'd'])
print(data)
# 通过显示索引访问数据
print(data['a'])
# 也可以用非连续数字来作为索引
data = pd.Series([0.25, 0.5, 0.75, 1.0],
index=[2, 5, 3, 7])
print(data)
print(data[5])
# 把字典转化为Series。
population_dict = {'California': 38332521,
'Texas': 26448193,
'New York': 19651127,
'Florida': 19552860,
'Illinois': 12882135}
population = pd.Series(population_dict)
print(population)
# 可以用字典的方式,读取数据
print(population['California'])
# 与字典不同的它还支持切片
print(population['California':'Florida'])
# 创建series 对象
pd.Series(data, index=index)
# 例如,data可以是列表或NumPy数组,在这种情况下index默认为整数序列
print(pd.Series([2, 4, 6]))
# Series 也可以是个标量
print(pd.Series(5, index=[100, 200, 300]))
# data 也可以是个字典
print(pd.Series({2: 'a', 1: 'b', 3: 'c'}))
# 只显示和构建设置了显示索引的数据
print(pd.Series({2: 'a', 1: 'b', 3: 'c'}, index=[3, 2]))
三、DataFrame数据结构
"""
# 2、DataFrame
说明:带有显示索引的二维数组。由多个Series组成。
"""
import pandas as pd
import numpy as np
area_dict = {'California': 423967, 'Texas': 695662, 'New York': 141297,
'Florida': 170312, 'Illinois': 149995}
population_dict = {'California': 38332521,
'Texas': 26448193,
'New York': 19651127,
'Florida': 19552860,
'Illinois': 12882135}
#
population = pd.Series(population_dict)
area = pd.Series(area_dict)
# 通过两个Series 创建一个DataFrame. 每个Series 是一列。
states = pd.DataFrame({'population': population,
'area': area})
print(states)
print(states['area']) # 访问某列就是返回Series数据
# 通过index、columns、values 属性访问DataFrame 的数据
print(states.index)
print(states.columns)
print(states.values)
# 通过单列的Series 构造单列的DataFrame
print(pd.DataFrame(population, columns=['population']))
# 通过 list of dicts 来创建DateFrame
data = [{'a': i, 'b': 2 * i} for i in range(3)]
print(data)
print(pd.DataFrame(data))
# 通过dict of Lists 来创建DataFrame
test_dict = {'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Cindy'],
'math': [90, 89, 99],
'english': [89, 94, 80]
}
# [1].直接写入参数test_dict
test_dict_df = pd.DataFrame(test_dict, index=['a', 'b', 'c'])
print(test_dict_df)
# 即使字典中缺少某些键,也会用NaN来进行填充
print(pd.DataFrame([{'a': 1, 'b': 2}, {'b': 3, 'c': 4}]))
# 通过Numpy的二维数组,来创建DataFrame
print(pd.DataFrame(np.random.rand(3, 2), # 3行,2列的Numpy二维数组
columns=['foo', 'bar'], # 给每列起名字
index=['a', 'b', 'c'])) # 给每行加索引
# 通过Numpy结构化数据创建DataFrame
A = np.zeros(3, dtype=[('A', 'i8'), ('B', 'f8')])
print(pd.DataFrame(A))
Python 异步编程-Gevent 总结
1、Gevent介绍与案例
2、 案例之非确定性task与服务器异步
3、 状态判断与杀死进程
4、超时控制
5、猴子补丁与事件异步通信
6、异步安全操作之队列
7、组和池
8、锁和信号量以及线程局部变量
9、子进程
10、Actor 模型
11、案例-聊天服务器
12、案例-ZeroMQ与Servers
13、案例-Websockets
以上来源参考:https://sdiehl.github.io/gevent-tutorial/ 对此进行翻译和整理,部分修改以适用于gevent1.4.0 版本。
Python-异步编程-gevent-13
Websockets
需要 gevent-websocket 的 Websocket 示例。
# Simple gevent-websocket server
import json
import random
from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler
class WebSocketApp(object):
'''Send random data to the websocket'''
def __call__(self, environ, start_response):
ws = environ['wsgi.websocket']
x = 0
while True:
data = json.dumps({'x': x, 'y': random.randint(1, 5)})
ws.send(data)
x += 1
sleep(0.5)
server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),
handler_class=WebSocketHandler)
server.serve_forever()
HTML Page:
<html>
<head>
<title>Minimal websocket application</title>
<script type="text/javascript" src="jquery.min.js"></script>
<script type="text/javascript">
$(function() {
// Open up a connection to our server
var ws = new WebSocket("ws://localhost:10000/");
// What do we do when we get a message?
ws.onmessage = function(evt) {
$("#placeholder").append('<p>' + evt.data + '</p>')
}
// Just update our conn_status field with the connection status
ws.onopen = function(evt) {
$('#conn_status').html('<b>Connected</b>');
}
ws.onerror = function(evt) {
$('#conn_status').html('<b>Error</b>');
}
ws.onclose = function(evt) {
$('#conn_status').html('<b>Closed</b>');
}
});
</script>
</head>
<body>
<h1>WebSocket Example</h1>
<div id="conn_status">Not Connected</div>
<div id="placeholder" style="width:600px;height:300px;"></div>
</body>
</html>
Python-异步编程-gevent-12
Gevent ZeroMQ
ZeroMQ 被其作者描述为“充当并发框架的套接字库”。 它是一个非常强大的消息传递层,用于构建并发和分布式应用程序。
ZeroMQ 提供了多种套接字原语,其中最简单的是请求-响应套接字对。 套接字有两种感兴趣的方法 send 和 recv,这两种方法通常都是阻塞操作。 但这已由 Travis Cline 的一个出色的库(现在是 pyzmq 的一部分)进行了补救,该库使用 gevent.socket 以非阻塞方式轮询 ZeroMQ 套接字。
# Note: Remember to ``pip install pyzmq``
import gevent
import zmq.green as zmq
# Global Context
context = zmq.Context()
def server():
server_socket = context.socket(zmq.REQ)
server_socket.bind("tcp://127.0.0.1:5000")
for request in range(1,10):
server_socket.send("Hello")
print('Switched to Server for %s' % request)
# Implicit context switch occurs here
server_socket.recv()
def client():
client_socket = context.socket(zmq.REP)
client_socket.connect("tcp://127.0.0.1:5000")
for request in range(1,10):
client_socket.recv()
print('Switched to Client for %s' % request)
# Implicit context switch occurs here
client_socket.send("World")
publisher = gevent.spawn(server)
client = gevent.spawn(client)
gevent.joinall([publisher, client])
Switched to Server for 1
Switched to Client for 1
Switched to Server for 2
Switched to Client for 2
Switched to Server for 3
Switched to Client for 3
Switched to Server for 4
Switched to Client for 4
Switched to Server for 5
Switched to Client for 5
Switched to Server for 6
Switched to Client for 6
Switched to Server for 7
Switched to Client for 7
Switched to Server for 8
Switched to Client for 8
Switched to Server for 9
Switched to Client for 9
Simple Servers
# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``
from gevent.server import StreamServer
def handle(socket, address):
socket.send("Hello from a telnet!\n")
for i in range(5):
socket.send(str(i) + '\n')
socket.close()
server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()
WSGI Servers
Gevent 提供了两个 WSGI 服务器,用于通过 HTTP 提供内容。 此后称为 wsgi 和 pywsgi:
gevent.wsgi.WSGIServer
gevent.pywsgi.WSGIServer
在 1.0.x 之前的早期 gevent 版本中,gevent 使用 libevent 而不是 libev。 Libevent 包含一个快速的 HTTP 服务器,它被 gevent 的 wsgi 服务器使用。
在 gevent 1.0.x 中没有包含 http 服务器。 相反,gevent.wsgi 现在是 gevent.pywsgi 中纯 Python 服务器的别名。
Streaming Servers
如果您使用的是 gevent 1.0.x,则本节不适用
对于那些熟悉流式 HTTP 服务的人来说,核心思想是在标头中我们不指定内容的长度。 相反,我们保持连接打开并沿管道冲洗块,在每个前面加上一个表示块长度的十六进制数字。 当发送大小为零的块时,流将关闭。
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
8
<p>Hello
9
World</p>
0
无法在 wsgi 中创建上述 HTTP 连接,因为不支持流式传输。 相反,它必须缓冲。
from gevent.wsgi import WSGIServer
def application(environ, start_response):
status = '200 OK'
body = '<p>Hello World</p>'
headers = [
('Content-Type', 'text/html')
]
start_response(status, headers)
return [body]
WSGIServer(('', 8000), application).serve_forever()
然而,使用 pywsgi 我们可以将我们的处理程序编写为生成器并通过 chun 生成结果块
from gevent.pywsgi import WSGIServer
def application(environ, start_response):
status = '200 OK'
headers = [
('Content-Type', 'text/html')
]
start_response(status, headers)
yield "<p>Hello"
yield "World</p>"
WSGIServer(('', 8000), application).serve_forever()
但无论如何,与其他 Python 服务器相比,Gevent 服务器的性能是惊人的。 libev 是一项经过严格审查的技术,众所周知,它的衍生服务器在规模上表现良好。
要进行基准测试,请尝试使用 Apache Benchmark ab 或查看 Python WSGI 服务器的基准以与其他服务器进行比较。
$ ab -n 10000 -c 100 http://127.0.0.1:8000/
Python-异步编程-gevent-11
Chat Server
最后一个激励示例,一个实时聊天室。 此示例需要 Flask(但不一定如此,您可以使用 Django、Pyramid 等)。 可以在此处找到相应的 Javascript 和 HTML 文件 https://github.com/ZhangTongLe/minichat
# Micro gevent chatroom.
# ----------------------
from flask import Flask, render_template, request
from gevent import queue
from gevent.pywsgi import WSGIServer
import simplejson as json
app = Flask(__name__)
app.debug = True
rooms = {
'topic1': Room(),
'topic2': Room(),
}
users = {}
class Room(object):
def __init__(self):
self.users = set()
self.messages = []
def backlog(self, size=25):
return self.messages[-size:]
def subscribe(self, user):
self.users.add(user)
def add(self, message):
for user in self.users:
print(user)
user.queue.put_nowait(message)
self.messages.append(message)
class User(object):
def __init__(self):
self.queue = queue.Queue()
@app.route('/')
def choose_name():
return render_template('choose.html')
@app.route('/<uid>')
def main(uid):
return render_template('main.html',
uid=uid,
rooms=rooms.keys()
)
@app.route('/<room>/<uid>')
def join(room, uid):
user = users.get(uid, None)
if not user:
users[uid] = user = User()
active_room = rooms[room]
active_room.subscribe(user)
print('subscribe %s %s' % (active_room, user))
messages = active_room.backlog()
return render_template('room.html',
room=room, uid=uid, messages=messages)
@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):
user = users[uid]
room = rooms[room]
message = request.form['message']
room.add(':'.join([uid, message]))
return ''
@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
try:
msg = users[uid].queue.get(timeout=10)
except queue.Empty:
msg = []
return json.dumps(msg)
if __name__ == "__main__":
http = WSGIServer(('', 5000), app)
http.serve_forever()
Python-异步编程-gevent-10
Actors
Actor 模型是由 Erlang 语言推广的更高级别的并发模型。 简而言之,主要思想是您有一组独立的 Actor,它们有一个收件箱,他们可以从中接收来自其他 Actor 的消息。 Actor 内的主循环遍历其消息并根据其所需的行为采取行动。
Gevent 没有原始 Actor 类型,但我们可以使用子类 Greenlet 中的 Queue 非常简单地定义一个。
import gevent
from gevent.queue import Queue
class Actor(gevent.Greenlet):
def __init__(self):
self.inbox = Queue()
Greenlet.__init__(self)
def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()
def _run(self):
self.running = True
while self.running:
message = self.inbox.get()
self.receive(message)
In a use case:
import gevent
from gevent.queue import Queue
from gevent import Greenlet
class Pinger(Actor):
def receive(self, message):
print(message)
pong.inbox.put('ping')
gevent.sleep(0)
class Ponger(Actor):
def receive(self, message):
print(message)
ping.inbox.put('pong')
gevent.sleep(0)
ping = Pinger()
pong = Ponger()
ping.start()
pong.start()
ping.inbox.put('start')
gevent.joinall([ping, pong])
Python-异步编程-gevent-9
子进程
从 gevent 1.0 开始,已经添加了 gevent.subprocess——Python 的 subprocess 模块的修补版本。 它支持对子进程的协作等待。
# 中间插入子进程
import gevent
from gevent.subprocess import Popen, PIPE
def cron():
while True:
print("cron")
gevent.sleep(0.2)
g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())
'''
cron
cron
cron
cron
cron
Linux
'''
许多人还希望将 gevent 和 multiprocessing 一起使用。 最明显的挑战之一是多处理提供的进程间通信在默认情况下是不合作的。 由于基于 multiprocessing.Connection 的对象(例如 Pipe)公开了它们的底层文件描述符,因此 gevent.socket.wait_read 和 wait_write 可用于在实际读取/写入之前协同等待准备读取/准备写入事件:
import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write
# To Process
a, b = Pipe()
# From Process
c, d = Pipe()
def relay():
for i in xrange(10):
msg = b.recv()
c.send(msg + " in " + str(i))
def put_msg():
for i in xrange(10):
wait_write(a.fileno())
a.send('hi')
def get_msg():
for i in xrange(10):
wait_read(d.fileno())
print(d.recv())
if __name__ == '__main__':
proc = Process(target=relay)
proc.start()
g1 = gevent.spawn(get_msg)
g2 = gevent.spawn(put_msg)
gevent.joinall([g1, g2], timeout=1)
但是请注意,多处理和 gevent 的组合带来了某些依赖于操作系统的陷阱,其中包括:
在符合 POSIX 的系统上分叉后,孩子的 gevent 状态是不适定的。一个副作用是 greenlets 在 multiprocessing.Process 创建之前产生,在父进程和子进程中运行。
上面 put_msg() 中的 a.send() 可能仍会以非合作方式阻塞调用线程:准备写入事件仅确保可以写入一个字节。在尝试写入完成之前,底层缓冲区可能已满。
上述基于 wait_write() / wait_read() 的方法在 Windows 上不起作用(IOError: 3 is not a socket (file is not supported)),因为 Windows 无法监视事件管道。
Python 包 gipc 在符合 POSIX 的系统和 Windows 系统上以一种非常透明的方式为您克服了这些挑战。它提供了 gevent-aware multiprocessing.Process-based child processes 和基于管道的 gevent-cooperative 进程间通信。