快乐学习
前程无忧、中华英才非你莫属!

原创-Pandas心法之数据结构篇-1

zhangtongle阅读(2453)

一、前言

在最基本的层面上,Pandas对象可以看作是NumPy结构化数组的增强版本,

其中,行 (X轴)和列 (Y轴)使用标签而不是简单的整数索引进行标识。

我们先介绍这三个基本熊猫的数据结构:Series,DataFrame和Index。

Series:是带有显示索引数据的一维数组说明:它的索引可以不必是整数。是具有与值相关联的显式定义的索引。
有时只是操作Series ,比操作字典更有效。

DataFrame:是一个带有显示索引数据的二维数组。
实际上DataFrame是由多个Series 组成。 DataFrame 不仅有行索引,还有列索引。

Index:一个显式索引,可以修改,可逻辑操作的一维数组。


二、Series 数据结构

import pandas as pd

'''
# 1、Series 、
说明:Series是一维数组。可以从列表或数组中创建它
'''
data = pd.Series([0.25, 0.5, 0.75, 1.0])
print(data)

# 通过values、indexs属性访问它
print(data.values)
print(data.index)

# 通过索引访问它
print(data[1])

# 通过切片来访问它
print(data[1:3])

'''说明:Pandas 的Series 比Numpy的一维数组和列表更加具有灵活性'''

# 创建带有显示索引的Series

data = pd.Series([0.25, 0.5, 0.75, 1.0],
                 index=['a', 'b', 'c', 'd'])

print(data)
# 通过显示索引访问数据
print(data['a'])

# 也可以用非连续数字来作为索引
data = pd.Series([0.25, 0.5, 0.75, 1.0],
                 index=[2, 5, 3, 7])
print(data)
print(data[5])

# 把字典转化为Series。
population_dict = {'California': 38332521,
                   'Texas': 26448193,
                   'New York': 19651127,
                   'Florida': 19552860,
                   'Illinois': 12882135}
population = pd.Series(population_dict)
print(population)

# 可以用字典的方式,读取数据
print(population['California'])

# 与字典不同的它还支持切片
print(population['California':'Florida'])

# 创建series 对象
pd.Series(data, index=index)

# 例如,data可以是列表或NumPy数组,在这种情况下index默认为整数序列
print(pd.Series([2, 4, 6]))

# Series 也可以是个标量
print(pd.Series(5, index=[100, 200, 300]))

# data 也可以是个字典
print(pd.Series({2: 'a', 1: 'b', 3: 'c'}))

# 只显示和构建设置了显示索引的数据
print(pd.Series({2: 'a', 1: 'b', 3: 'c'}, index=[3, 2]))

三、DataFrame数据结构

"""
# 2、DataFrame
说明:带有显示索引的二维数组。由多个Series组成。
"""
import pandas as pd
import numpy as np

area_dict = {'California': 423967, 'Texas': 695662, 'New York': 141297,
             'Florida': 170312, 'Illinois': 149995}

population_dict = {'California': 38332521,
                   'Texas': 26448193,
                   'New York': 19651127,
                   'Florida': 19552860,
                   'Illinois': 12882135}
#
population = pd.Series(population_dict)
area = pd.Series(area_dict)

# 通过两个Series 创建一个DataFrame. 每个Series 是一列。
states = pd.DataFrame({'population': population,
                       'area': area})
print(states)
print(states['area'])  # 访问某列就是返回Series数据

# 通过index、columns、values 属性访问DataFrame 的数据
print(states.index)
print(states.columns)
print(states.values)

# 通过单列的Series 构造单列的DataFrame
print(pd.DataFrame(population, columns=['population']))

# 通过 list of dicts 来创建DateFrame
data = [{'a': i, 'b': 2 * i} for i in range(3)]
print(data)
print(pd.DataFrame(data))

# 通过dict of Lists 来创建DataFrame
test_dict = {'id': [1, 2, 3],
             'name': ['Alice', 'Bob', 'Cindy'],
             'math': [90, 89, 99],
             'english': [89, 94, 80]
             }

# [1].直接写入参数test_dict
test_dict_df = pd.DataFrame(test_dict, index=['a', 'b', 'c'])
print(test_dict_df)

# 即使字典中缺少某些键,也会用NaN来进行填充
print(pd.DataFrame([{'a': 1, 'b': 2}, {'b': 3, 'c': 4}]))

# 通过Numpy的二维数组,来创建DataFrame
print(pd.DataFrame(np.random.rand(3, 2),  # 3行,2列的Numpy二维数组
                   columns=['foo', 'bar'],  # 给每列起名字
                   index=['a', 'b', 'c']))  # 给每行加索引

# 通过Numpy结构化数据创建DataFrame
A = np.zeros(3, dtype=[('A', 'i8'), ('B', 'f8')])
print(pd.DataFrame(A))

Python 异步编程-Gevent 总结

zhangtongle阅读(2382)

1、Gevent介绍与案例

2、 案例之非确定性task与服务器异步

3、 状态判断与杀死进程

4、超时控制

5、猴子补丁与事件异步通信

6、异步安全操作之队列

7、组和池

8、锁和信号量以及线程局部变量

9、子进程

10、Actor 模型

11、案例-聊天服务器

12、案例-ZeroMQ与Servers

13、案例-Websockets

以上来源参考:https://sdiehl.github.io/gevent-tutorial/ 对此进行翻译和整理,部分修改以适用于gevent1.4.0 版本。

Python-异步编程-gevent-13

zhangtongle阅读(2333)

Websockets

需要 gevent-websocket 的 Websocket 示例。

# Simple gevent-websocket server
import json
import random

from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler

class WebSocketApp(object):
    '''Send random data to the websocket'''

    def __call__(self, environ, start_response):
        ws = environ['wsgi.websocket']
        x = 0
        while True:
            data = json.dumps({'x': x, 'y': random.randint(1, 5)})
            ws.send(data)
            x += 1
            sleep(0.5)

server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),
    handler_class=WebSocketHandler)
server.serve_forever()

HTML Page:

<html>
    <head>
        <title>Minimal websocket application</title>
        <script type="text/javascript" src="jquery.min.js"></script>
        <script type="text/javascript">
        $(function() {
            // Open up a connection to our server
            var ws = new WebSocket("ws://localhost:10000/");

            // What do we do when we get a message?
            ws.onmessage = function(evt) {
                $("#placeholder").append('<p>' + evt.data + '</p>')
            }
            // Just update our conn_status field with the connection status
            ws.onopen = function(evt) {
                $('#conn_status').html('<b>Connected</b>');
            }
            ws.onerror = function(evt) {
                $('#conn_status').html('<b>Error</b>');
            }
            ws.onclose = function(evt) {
                $('#conn_status').html('<b>Closed</b>');
            }
        });
    </script>
    </head>
    <body>
        <h1>WebSocket Example</h1>
        <div id="conn_status">Not Connected</div>
        <div id="placeholder" style="width:600px;height:300px;"></div>
    </body>
</html>

Python-异步编程-gevent-12

zhangtongle阅读(2588)

Gevent ZeroMQ

ZeroMQ 被其作者描述为“充当并发框架的套接字库”。 它是一个非常强大的消息传递层,用于构建并发和分布式应用程序。

ZeroMQ 提供了多种套接字原语,其中最简单的是请求-响应套接字对。 套接字有两种感兴趣的方法 send 和 recv,这两种方法通常都是阻塞操作。 但这已由 Travis Cline 的一个出色的库(现在是 pyzmq 的一部分)进行了补救,该库使用 gevent.socket 以非阻塞方式轮询 ZeroMQ 套接字。

# Note: Remember to ``pip install pyzmq``
import gevent
import zmq.green as zmq

# Global Context
context = zmq.Context()

def server():
    server_socket = context.socket(zmq.REQ)
    server_socket.bind("tcp://127.0.0.1:5000")

    for request in range(1,10):
        server_socket.send("Hello")
        print('Switched to Server for %s' % request)
        # Implicit context switch occurs here
        server_socket.recv()

def client():
    client_socket = context.socket(zmq.REP)
    client_socket.connect("tcp://127.0.0.1:5000")

    for request in range(1,10):

        client_socket.recv()
        print('Switched to Client for %s' % request)
        # Implicit context switch occurs here
        client_socket.send("World")

publisher = gevent.spawn(server)
client    = gevent.spawn(client)

gevent.joinall([publisher, client])

Switched to Server for 1
Switched to Client for 1
Switched to Server for 2
Switched to Client for 2
Switched to Server for 3
Switched to Client for 3
Switched to Server for 4
Switched to Client for 4
Switched to Server for 5
Switched to Client for 5
Switched to Server for 6
Switched to Client for 6
Switched to Server for 7
Switched to Client for 7
Switched to Server for 8
Switched to Client for 8
Switched to Server for 9
Switched to Client for 9

Simple Servers

# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``

from gevent.server import StreamServer

def handle(socket, address):
    socket.send("Hello from a telnet!\n")
    for i in range(5):
        socket.send(str(i) + '\n')
    socket.close()

server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()

WSGI Servers

Gevent 提供了两个 WSGI 服务器,用于通过 HTTP 提供内容。 此后称为 wsgi 和 pywsgi:

gevent.wsgi.WSGIServer
gevent.pywsgi.WSGIServer
在 1.0.x 之前的早期 gevent 版本中,gevent 使用 libevent 而不是 libev。 Libevent 包含一个快速的 HTTP 服务器,它被 gevent 的 wsgi 服务器使用。

在 gevent 1.0.x 中没有包含 http 服务器。 相反,gevent.wsgi 现在是 gevent.pywsgi 中纯 Python 服务器的别名。

Streaming Servers

如果您使用的是 gevent 1.0.x,则本节不适用

对于那些熟悉流式 HTTP 服务的人来说,核心思想是在标头中我们不指定内容的长度。 相反,我们保持连接打开并沿管道冲洗块,在每个前面加上一个表示块长度的十六进制数字。 当发送大小为零的块时,流将关闭。

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

8
<p>Hello

9
World</p>

0

无法在 wsgi 中创建上述 HTTP 连接,因为不支持流式传输。 相反,它必须缓冲。

from gevent.wsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'
    body = '<p>Hello World</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()

然而,使用 pywsgi 我们可以将我们的处理程序编写为生成器并通过 chun 生成结果块

from gevent.pywsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    yield "<p>Hello"
    yield "World</p>"

WSGIServer(('', 8000), application).serve_forever()

但无论如何,与其他 Python 服务器相比,Gevent 服务器的性能是惊人的。 libev 是一项经过严格审查的技术,众所周知,它的衍生服务器在规模上表现良好。

要进行基准测试,请尝试使用 Apache Benchmark ab 或查看 Python WSGI 服务器的基准以与其他服务器进行比较。

$ ab -n 10000 -c 100 http://127.0.0.1:8000/

Python-异步编程-gevent-11

zhangtongle阅读(2352)

Chat Server

最后一个激励示例,一个实时聊天室。 此示例需要 Flask(但不一定如此,您可以使用 Django、Pyramid 等)。 可以在此处找到相应的 Javascript 和 HTML 文件 https://github.com/ZhangTongLe/minichat

# Micro gevent chatroom.
# ----------------------

from flask import Flask, render_template, request

from gevent import queue
from gevent.pywsgi import WSGIServer

import simplejson as json

app = Flask(__name__)
app.debug = True

rooms = {
    'topic1': Room(),
    'topic2': Room(),
}

users = {}

class Room(object):

    def __init__(self):
        self.users = set()
        self.messages = []

    def backlog(self, size=25):
        return self.messages[-size:]

    def subscribe(self, user):
        self.users.add(user)

    def add(self, message):
        for user in self.users:
            print(user)
            user.queue.put_nowait(message)
        self.messages.append(message)

class User(object):

    def __init__(self):
        self.queue = queue.Queue()

@app.route('/')
def choose_name():
    return render_template('choose.html')

@app.route('/<uid>')
def main(uid):
    return render_template('main.html',
        uid=uid,
        rooms=rooms.keys()
    )

@app.route('/<room>/<uid>')
def join(room, uid):
    user = users.get(uid, None)

    if not user:
        users[uid] = user = User()

    active_room = rooms[room]
    active_room.subscribe(user)
    print('subscribe %s %s' % (active_room, user))

    messages = active_room.backlog()

    return render_template('room.html',
        room=room, uid=uid, messages=messages)

@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):
    user = users[uid]
    room = rooms[room]

    message = request.form['message']
    room.add(':'.join([uid, message]))

    return ''

@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
    try:
        msg = users[uid].queue.get(timeout=10)
    except queue.Empty:
        msg = []
    return json.dumps(msg)

if __name__ == "__main__":
    http = WSGIServer(('', 5000), app)
    http.serve_forever()

Python-异步编程-gevent-10

zhangtongle阅读(2314)

Actors

Actor 模型是由 Erlang 语言推广的更高级别的并发模型。 简而言之,主要思想是您有一组独立的 Actor,它们有一个收件箱,他们可以从中接收来自其他 Actor 的消息。 Actor 内的主循环遍历其消息并根据其所需的行为采取行动。

Gevent 没有原始 Actor 类型,但我们可以使用子类 Greenlet 中的 Queue 非常简单地定义一个。

import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):

    def __init__(self):
        self.inbox = Queue()
        Greenlet.__init__(self)

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplemented()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)
In a use case:

import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Pinger(Actor):
    def receive(self, message):
        print(message)
        pong.inbox.put('ping')
        gevent.sleep(0)

class Ponger(Actor):
    def receive(self, message):
        print(message)
        ping.inbox.put('pong')
        gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])

Python-异步编程-gevent-9

zhangtongle阅读(2301)

子进程

从 gevent 1.0 开始,已经添加了 gevent.subprocess——Python 的 subprocess 模块的修补版本。 它支持对子进程的协作等待。

# 中间插入子进程
import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())

'''

cron
cron
cron
cron
cron
Linux

'''

许多人还希望将 gevent 和 multiprocessing 一起使用。 最明显的挑战之一是多处理提供的进程间通信在默认情况下是不合作的。 由于基于 multiprocessing.Connection 的对象(例如 Pipe)公开了它们的底层文件描述符,因此 gevent.socket.wait_read 和 wait_write 可用于在实际读取/写入之前协同等待准备读取/准备写入事件:

import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()

def relay():
    for i in xrange(10):
        msg = b.recv()
        c.send(msg + " in " + str(i))

def put_msg():
    for i in xrange(10):
        wait_write(a.fileno())
        a.send('hi')

def get_msg():
    for i in xrange(10):
        wait_read(d.fileno())
        print(d.recv())

if __name__ == '__main__':
    proc = Process(target=relay)
    proc.start()

    g1 = gevent.spawn(get_msg)
    g2 = gevent.spawn(put_msg)
    gevent.joinall([g1, g2], timeout=1)

但是请注意,多处理和 gevent 的组合带来了某些依赖于操作系统的陷阱,其中包括:

在符合 POSIX 的系统上分叉后,孩子的 gevent 状态是不适定的。一个副作用是 greenlets 在 multiprocessing.Process 创建之前产生,在父进程和子进程中运行。
上面 put_msg() 中的 a.send() 可能仍会以非合作方式阻塞调用线程:准备写入事件仅确保可以写入一个字节。在尝试写入完成之前,底层缓冲区可能已满。
上述基于 wait_write() / wait_read() 的方法在 Windows 上不起作用(IOError: 3 is not a socket (file is not supported)),因为 Windows 无法监视事件管道。
Python 包 gipc 在符合 POSIX 的系统和 Windows 系统上以一种非常透明的方式为您克服了这些挑战。它提供了 gevent-aware multiprocessing.Process-based child processes 和基于管道的 gevent-cooperative 进程间通信。

Python-异步编程-gevent-8

zhangtongle阅读(2268)

锁和信号量

信号量是一种低级同步原语,它允许 greenlet 协调和限制并发访问或执行。信号量公开了两个方法,acquire和release的信号量已经被获取和释放被称为绑定的旗语的次数之间的差异。如果信号量边界达到 0,它将阻塞,直到另一个 greenlet 释放其获取。

from gevent import sleep
from gevent.__semaphore import BoundedSemaphore
from gevent.pool import Pool

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, range(0, 2))
pool.map(worker2, range(3, 6))

# Worker 0 acquired semaphore
# Worker 1 acquired semaphore
# Worker 0 released semaphore
# Worker 1 released semaphore
# Worker 3 acquired semaphore
# Worker 4 acquired semaphore
# Worker 3 released semaphore
# Worker 4 released semaphore
# Worker 5 acquired semaphore
# Worker 5 released semaphore

线程局部变量

Gevent 还允许您指定 greenlet 上下文的本地数据。在内部,这是作为全局查找实现的,它寻址由 greenlet 的getcurrent()值作为键的私有命名空间。

import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)
    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2, g2])

Python-异步编程-gevent-6

zhangtongle阅读(2221)

队列

队列是有序的数据集,它们具有通常的put/get 操作,但以一种可以在 Greenlets 之间安全操作的方式编写。

例如,如果一个 Greenlet 从队列中抓取一个项目,另一个同时执行的 Greenlet 将不会抓取同一个项目。

import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in range(1, 25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

'''
输出结果

Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker john got task 5
Worker nancy got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker john got task 11
Worker nancy got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker john got task 17
Worker nancy got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker john got task 23
Worker nancy got task 24
Quitting time!
Quitting time!
Quitting time!

Process finished with exit code 0

'''

从输出结果中,可以看出从队列取出来的资源,并没有重复。

队列也可以阻塞put或get在需要时阻塞。

每个put and get操作都有一个非阻塞对应物,put_nowait并且 get_nowait不会阻塞,而是在操作不可能时引发gevent.queue.Empty或 引发gevent.queue.Full。

在这个例子中,我们让老板同时运行到工人,并且对队列有一个限制,防止它包含三个以上的元素。此限制意味着put 操作将阻塞,直到队列上有空间。相反,get如果队列上没有要获取的元素,则操作将阻塞,它还需要一个超时参数,以允许队列退出,gevent.queue.Empty如果在超时的时间范围内找不到工作,则异常退出 。

import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)  # 队列最大3

def worker(name):
    try:
        while True:
            task = tasks.get(timeout=1)  # 将队列大小减 1
            print('Worker %s got task %s' % (name, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    老板会等到个别工人下班
     free,因为任务队列的最大大小是 3。
    """

    for i in range(1, 10):
        tasks.put(i)  # 只能添加3次,被阻塞了
        print('我是迭代1')
    print('在迭代 1 中分配所有工作')

    for i in range(10, 20):
        tasks.put(i)
        print('我是迭代2')
    print('在迭代 2 中分配所有工作')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

'''
我是迭代1
我是迭代1
我是迭代1
Worker steve got task 1
Worker john got task 2
Worker bob got task 3
我是迭代1
我是迭代1
我是迭代1
Worker steve got task 4
Worker john got task 5
Worker bob got task 6
我是迭代1
我是迭代1
我是迭代1
在迭代 1 中分配所有工作
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
我是迭代2
我是迭代2
我是迭代2
Worker steve got task 10
Worker john got task 11
Worker bob got task 12
我是迭代2
我是迭代2
我是迭代2
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
我是迭代2
我是迭代2
我是迭代2
Worker steve got task 16
Worker john got task 17
Worker bob got task 18
我是迭代2
在迭代 2 中分配所有工作
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!

Process finished with exit code 0

'''

Python 异步编程-Gevent-4

zhangtongle阅读(2275)

Gevent超时控制

超时是对代码块或 Greenlet 运行时的限制。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')

它们还可以在with语句中与上下文管理器一起使用。

import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

此外,gevent 还为各种 Greenlet 和数据结构相关调用提供超时参数。例如:

import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')

Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

特别的技术,给特别的你!

联系QQ:1071235258QQ群:710045715
error: Sorry,暂时内容不可复制!