导读:每个数据科学专业人员都必须从不同的数据源中提取、转换和加载(Extract-Transform-Load,ETL)数据。
本文将讨论如何使用Python为选定的流行数据库实现数据的ETL。对于关系数据库,选择MySQL,并将Elasticsearch作为文档数据库的例子展开。对于图形数据库,选择Neo4j。对于NoSQL,可参考此前文章中介绍的数据处理入门干货:MongoDB和pandas极简教程。
作者:萨扬·穆霍帕迪亚(Sayan Mukhopadhyay)
如需转载请联系华章科技
MySQLdb是在MySQL C接口上面开发的Python API。
1. 如何安装MySQLdb
首先,需要在计算机上安装Python MySQLdb模块。然后运行以下脚本:
#!/usr/bin/pythonimport MySQLdb
如果出现导入错误,则表示模块未正确安装。
以下是安装MySQL Python模块的说明:
$gunzip MySQL-python-1.2.2.tar.gz$tar –xvf MySQL-python-1.2.2.tar$cd MySQL-python-1.2.2$python setup.py build$python setup.py install
2. 数据库连接
在连接到MySQL数据库之前,请确保有以下内容。
3. INSERT操作
以下代码执行SQL INSERT语句,以便在STUDENT表中创建记录:
#!/usr/bin/pythonimport MySQLdb# Open database connectiondb = MySQLdb.connect("localhost","user","passwd","TEST" )# prepare a cursor object using cursor() methodcursor = db.cursor()# Prepare SQL query to INSERT a record into the database.sql = """INSERT INTO STUDENT(NAME, SUR_NAME, ROLL_NO) VALUES ('Sayan', 'Mukhopadhyay', 1)"""try: # Execute the SQL commandcursor.execute(sql) # Commit your changes in the database db.commit()except: # Rollback in case there is any error db.rollback()# disconnect from serverdb.close()
4. READ操作
以下代码从STUDENT表中提取数据并打印出来:
#!/usr/bin/pythonimport MySQLdb# Open database connectiondb = MySQLdb.connect("localhost","user","passwd","TEST" )# prepare a cursor object using cursor() methodcursor = db.cursor()# Prepare SQL query to INSERT a record into the database.sql = "SELECT * FROM STUDENT "try: # Execute the SQL commandcursor.execute(sql) # Fetch all the rows in a list of lists.results = cursor.fetchall()for row in results:fname = row[0]lname = row[1]id = row[2] # Now print fetched resultprint "name=%s,surname=%s,id=%d" % \ (fname, lname, id )except:print "Error: unable to fecth data"# disconnect from serverdb.close()
5. DELETE操作
以下代码从TEST中删除id=1的一行数据:
#!/usr/bin/pythonimport MySQLdb# Open database connectiondb = MySQLdb.connect("localhost","test","passwd","TEST")#prepare a cursor object using cursor() methodcursor = db.cursor()# PrepareSQL query to DELETE required recordssql="DELETE FROM STUDENT WHERE ROLL_NO=1"try:#Execute the SQL command cursor.execute(sql)#Commit your changes in the databasedb.commit()except:#Roll back in case there is any errordb.rollback()#disconnect from server db.close()
6. UPDATE操作
以下代码将lastname为Mukhopadhyay的记录更改为Mukherjee:
#!/usr/bin/pythonimport MySQLdb# Open database connectiondb = MySQLdb.connect("localhost","user","passwd","TEST" )# prepare a cursor object usingcursor() method cursor = db.cursor()# Prepare SQL query to UPDATE required recordssql = "UPDATE STUDENT SET SUR_NAME="Mukherjee" WHERE SUR_NAME="Mukhopadhyay""try: # Execute the SQL commandcursor.execute(sql) # Commit your changes in the databasedb.commit()except: # Rollback in case there is any errordb.rollback()# disconnect from serverdb.close()
7. COMMIT操作
提交操作提供对数据库完成修改命令,并且在此操作之后,无法将其还原。
8. ROLL-BACK操作
如果不能确认对数据的修改同时想要撤回操作,可以使用roll-back()方法。
以下是通过Python访问MySQL数据的完整示例。它将提供将数据存储为CSV文件或MySQL数据库中的数据的完整描述。
import MySQLdbimport sysout = open('Config1.txt','w')print "Enter the Data Source Type:"print "1. MySql"print "2. Text"print "3. Exit"while(1): data1 = sys.stdin.readline().strip() if(int(data1) == 1): out.write("source begin"+"\n"+"type=mysql\n") print "Enter the ip:" ip = sys.stdin.readline().strip() out.write("host=" + ip + "\n") print "Enter the database name:" db = sys.stdin.readline().strip() out.write("database=" + db + "\n") print "Enter the user name:" usr = sys.stdin.readline().strip() out.write("user=" + usr + "\n") print "Enter the password:" passwd = sys.stdin.readline().strip() out.write("password=" + passwd + "\n") connection = MySQLdb.connect(ip, usr, passwd, db) cursor = connection.cursor() query = "show tables" cursor.execute(query) data = cursor.fetchall() tables = [] for row in data: for field in row: tables.append(field.strip()) for i in range(len(tables)): print i, tables[i] tb = tables[int(sys.stdin.readline().strip())] out.write("table=" + tb + "\n") query = "describe " + tb cursor.execute(query) data = cursor.fetchall() columns = [] for row in data: columns.append(row[0].strip()) for i in range(len(columns)): print columns[i] print "Not index choose the exact column names seperated by coma" cols = sys.stdin.readline().strip() out.write("columns=" + cols + "\n") cursor.close() connection.close() out.write("source end"+"\n") print "Enter the Data Source Type:" print "1. MySql" print "2. Text" print "3. Exit" if(int(data1) == 2): print "path of text file:" path = sys.stdin.readline().strip() file = open(path) count = 0 for line in file: print line count = count + 1 if count > 3: break file.close() out.write("source begin"+"\n"+"type=text\n") out.write("path=" + path + "\n") print "enter delimeter:" dlm = sys.stdin.readline().strip() out.write("dlm=" + dlm + "\n") print "enter column indexes seperated by comma:" cols = sys.stdin.readline().strip() out.write("columns=" + cols + "\n") out.write("source end"+"\n") print "Enter the Data Source Type:" print "1. MySql" print "2. Text" print "3. Exit" if(int(data1) == 3): out.close() sys.exit()
Elasticsearch(ES)低级客户端提供从Python到ES REST端点的直接映射。Elasticsearch的一大优势是为数据分析提供了全栈解决方案。Elasticsearch作为数据库,有可配置前端Kibana、数据收集工具Logstash以及企业安全工具Shield。
下例具有称为cat、cluster、indices、ingest、nodes、snapshot和tasks的特征,根据任务分别转换为CatClient、ClusterClient、IndicesClient、IngestClient、NodesClient、SnapshotClient和TasksClient实例。这些实例是访问这些类及其方法的唯一方式。
你可以指定自己的连接类,可以通过提供的connection_class参数来使用。
# create connection to local host using the ThriftConnectionEs1=Elasticsearch(connection_class=ThriftConnection)
如果你想打开sniffing,那么有几个选择:
# create connection that will automatically inspect the cluster to get# the list of active nodes. Start with nodes running on 'esnode1' and# 'esnode2'Es1=Elasticsearch( ['esnode1', 'esnode2'],# sniff before doing anythingsniff_on_start=True,# refresh nodes after a node fails to respondsniff_on_connection_fail=True,# and also every 30 secondssniffer_timeout=30)
不同的主机可以有不同的参数,你可以为每个节点使用一个字典来指定它们。
# connect to localhost directly and another node using SSL on port 443# and an url_prefix. Note that ``port`` needs to be an int.Es1=Elasticsearch([{'host':'localhost'},{'host':'othernode','port':443,'url_prefix':'es','use_ssl':True},])
还支持SSL客户端身份验证(有关选项的详细说明,请参阅Urllib3HttpConnection)。
Es1=Elasticsearch(['localhost:443','other_host:443'],# turn on SSLuse_ssl=True,# make sure we verify SSL certificates (off by default)verify_certs=True,# provide a path to CA certs on diskca_certs='path to CA_certs',# PEM formatted SSL client certificateclient_cert='path to clientcert.pem',# PEM formatted SSL client keyclient_key='path to clientkey.pem')
许多类负责处理Elasticsearch集群。这里可以通过将参数传递给Elasticsearch类来忽略正在使用的默认子类。属于客户端的每个参数都将添加到Transport、ConnectionPool和Connection上。
例如,如果你要使用定制的ConnectionSelector类,只需传入selector_class参数即可。
整个API以很高的精确度包装了原始REST API,其中包括区分调用必需参数和可选参数。这意味着代码区分了按排位的参数和关键字参数。建议读者使用关键字参数来保证所有调用的一致性和安全性。
如果Elasticsearch返回2XX,则API调用成功(并将返回响应)。否则,将引发TransportError(或更具体的子类)的实例。你可以在异常中查看其他异常和错误状态。如果你不希望引发异常,可以通过传入ignore参数忽略状态代码或状态代码列表。
from elasticsearch import Elasticsearches=Elasticsearch()# ignore 400 cause by IndexAlreadyExistsException when creating an indexes.indices.create(index='test-index',ignore=400)# ignore 404 and 400es.indices.delete(index='test-index',ignore=[400,404])
Neo4j支持Neo4j Python驱动,并通过二进制协议与数据库连接。它试图保持简约及Python的惯用方式。
pip install neo4j-driverfrom neo4j.v1 import GraphDatabase, basic_authdriver11 = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "neo4j"))session11 = driver11.session()session11.run("CREATE (a:Person {name:'Sayan',title:'Mukhopadhyay'})")result11= session11.run("MATCH (a:Person) WHERE a.name ='Sayan' RETURN a.name AS name, a.title AS title")for recordi n result11:print("%s %s"% (record["title"], record["name"]))session11.close()
neo4j-rest-client的主要目标是确保已经使用本地Neo4j的Python程序员通过python-embedded的方式也能够访问Neo4j REST服务器。因此,neo4j-rest-client API的结构与python-embedded完全同步。但是引入了一种新的结构,以达到更加Python化的风格,并通过Neo4j团队引入的新特性来增强API。
另一个重要的数据库类是内存数据库。它在RAM中存储和处理数据。因此,对数据库的操作非常快,并且数据是灵活的。SQLite是内存数据库的一个流行范例。在Python中,需要使用sqlalchemy库来操作SQLite。在第1章的Flask和Falcon示例中,展示了如何从SQLite中选择数据。以下将展示如何在SQLite中存储Pandas数据框架:
from sqlalchemy import create_engineimport sqlite3conn = sqlite3.connect('multiplier.db')conn.execute('''CREATE TABLE if not exists multiplier (domain CHAR(50), low REAL, high REAL);''')conn.close()db_name = "sqlite:///" + prop + "_" + domain + str(i) + ".db"disk_engine = create_engine(db_name)df.to_sql('scores', disk_engine, if_exists='replace')
这部分内容请见此前的文章数据处理入门干货:MongoDB和pandas极简教程。
关于作者:Sayan Mukhopadhyay拥有超过13年的行业经验,并与瑞信、PayPal、CA Technologies、CSC和Mphasis等公司建立了联系。他对投资银行、在线支付、在线广告、IT架构和零售等领域的数据分析应用有着深刻的理解。他的专业领域是在分布式和数据驱动的环境(如实时分析、高频交易等)中,实现高性能计算。
本文摘编自《Python高级数据分析:机器学习、深度学习和NLP实例》,经出版方授权发布。
推荐语:本书介绍高级数据分析概念的广泛基础,以及最近的数据库革命,如Neo4j、弹性搜索和MongoDB。本书讨论了如何实现包括局部爬取在内的ETL技术,并应用于高频算法交易和目标导向的对话系统等领域。还有一些机器学习概念的例子,如半监督学习、深度学习和NLP。本书还涵盖了重要的传统数据分析技术,如时间序列和主成分分析等。