wordpress网站入口/上海职业技能培训机构
在我们编程时,有一些代码是固定的,例如Socket连接的代码,读取文件内容的代码,一般情况下我都是到网上搜一下然后直接粘贴下来改一改,当然如果你能自己记住所有的代码那更厉害,但是自己写毕竟不如粘贴来的快,而且自己写的代码还要测试,而一段经过测试的代码则可以多次使用,所以这里我就自己总结了一下python中常用的编程模板,如果还有哪些漏掉了请大家及时补充哈。
一、读写文件
1、读文件
(1)、一次性读取全部内容
filepath='D:/data.txt' #文件路径with open(filepath, 'r') as f:print f.read()
(2)读取固定字节大小
# -*- coding: UTF-8 -*-filepath='D:/data.txt' #文件路径f = open(filepath, 'r')
content=""
try:while True:chunk = f.read(8)if not chunk:breakcontent+=chunk
finally:f.close()print content
(3)每次读取一行
# -*- coding: UTF-8 -*-filepath='D:/data.txt' #文件路径f = open(filepath, "r")
content=""
try:while True:line = f.readline()if not line:breakcontent+=line
finally:f.close()print content
(4)一次读取所有的行
# -*- coding: UTF-8 -*-filepath='D:/data.txt' #文件路径with open(filepath, "r") as f:txt_list = f.readlines()for i in txt_list:print i,
2、写文件
# -*- coding: UTF-8 -*-filepath='D:/data1.txt' #文件路径with open(filepath, "w") as f: #w会覆盖原来的文件,a会在文件末尾追加f.write('1234')
二、连接Mysql数据库
1、连接
#!/usr/bin/python
# -*- coding: UTF-8 -*-import MySQLdbDB_URL='localhost'
USER_NAME='root'
PASSWD='1234'
DB_NAME='test'# 打开数据库连接
db = MySQLdb.connect(DB_URL,USER_NAME,PASSWD,DB_NAME)# 使用cursor()方法获取操作游标
cursor = db.cursor()# 使用execute方法执行SQL语句
cursor.execute("SELECT VERSION()")# 使用 fetchone() 方法获取一条数据库。
data = cursor.fetchone()print "Database version : %s " % data# 关闭数据库连接
db.close()
2、创建表
#!/usr/bin/python
# -*- coding: UTF-8 -*-import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# 如果数据表已经存在使用 execute() 方法删除表。
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")# 创建数据表SQL语句
sql = """CREATE TABLE EMPLOYEE (FIRST_NAME CHAR(20) NOT NULL,LAST_NAME CHAR(20),AGE INT, SEX CHAR(1),INCOME FLOAT )"""cursor.execute(sql)# 关闭数据库连接
db.close()
3、插入
#!/usr/bin/python
# -*- coding: UTF-8 -*-import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME)VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:# 执行sql语句cursor.execute(sql)# 提交到数据库执行db.commit()
except:# Rollback in case there is any errordb.rollback()# 关闭数据库连接
db.close()
4、查询
#!/usr/bin/python
# -*- coding: UTF-8 -*-import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE \WHERE INCOME > '%d'" % (1000)
try:# 执行SQL语句cursor.execute(sql)# 获取所有记录列表results = cursor.fetchall()for row in results:fname = row[0]lname = row[1]age = row[2]sex = row[3]income = row[4]# 打印结果print "fname=%s,lname=%s,age=%d,sex=%s,income=%d" % \(fname, lname, age, sex, income )
except:print "Error: unable to fecth data"# 关闭数据库连接
db.close()
5、更新
#!/usr/bin/python
# -*- coding: UTF-8 -*-import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1WHERE SEX = '%c'" % ('M')
try:# 执行SQL语句cursor.execute(sql)# 提交到数据库执行db.commit()
except:# 发生错误时回滚db.rollback()# 关闭数据库连接
db.close()
三、Socket
1、服务器
from socket import *
from time import ctimeHOST = ''
PORT = 21568
BUFSIZ = 1024
ADDR = (HOST, PORT)tcpSerSock = socket(AF_INET, SOCK_STREAM)
tcpSerSock.bind(ADDR)
tcpSerSock.listen(5)while True:print 'waiting for connection...'tcpCliSock, addr = tcpSerSock.accept() print '...connected from:', addrwhile True:try:data = tcpCliSock.recv(BUFSIZ) print '<', datatcpCliSock.send('[%s] %s' % (ctime(), data)) except:print 'disconnect from:', addrtcpCliSock.close() break
tcpSerSock.close()
2、客户端
from socket import *HOST = 'localhost'
PORT = 21568
BUFSIZ = 1024
ADDR = (HOST, PORT)tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR) try:while True:data = raw_input('>')if data == 'close':breakif not data:continuetcpCliSock.send(data) data = tcpCliSock.recv(BUFSIZ) print data
except:tcpCliSock.close()
四、多线程
import time, threading# 新线程执行的代码:
def loop():print 'thread %s is running...' % threading.current_thread().namen = 0while n < 5:n = n + 1print 'thread %s >>> %s' % (threading.current_thread().name, n)time.sleep(1)print 'thread %s ended.' % threading.current_thread().nameprint 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name
转载:http://blog.csdn.net/xingjiarong/article/details/50651235