Python SQLite3数据库操作类分享

yipeiwu_com6年前Python基础

接触Python时间也不是很长的,最近有个项目需要分析数据,于是选用Python为编程语言,除了语言特性外主要还是看重Python对于SQLite3数据库良好的支持能力了,因为需要灵活处理大量的中间数据。

刚开始一些模块我还乐此不疲的写SQL语句,后来渐渐厌倦了,回想到以前捣鼓C#的时候利用反射初步构建了个SQL查询构造器,直到发现linq,于是放弃了这个计划,当然微软后来又推出了Entity Framework,这些都是后话了,而且现在我对微软的东西兴趣不是很大的,好了,扯多了,下面继续正文。

对了,再扯一句,优秀的博客程序Drupal也使用了类似的查询构造器进行数据库查询,避免直接写SQL语句,另外这样做的一点点好处就是,可以一定程度的屏蔽平台相关性,对于数据库迁移还是有帮助的。

不过我今天介绍的数据库辅助类查询构造器是个很简单的东东,甚至只限于SQLite数据库,如果有童鞋感兴趣可以完善下,我目前只要操作SQLite顺手就可以了,对于比较大的数据库应用就直接上ORM吧。


先看代码:

复制代码 代码如下:

import sqlite3

# ***************************************************
# *
# * Description: Python操作SQLite3数据库辅助类(查询构造器)
# * Author: wangye
# *
# ***************************************************

def _wrap_value(value):
    return repr(value)

def _wrap_values(values):
    return list(map(_wrap_value, values))

def _wrap_fields(fields):
    for key,value in fields.items():
        fields[key] = _wrap_value(value)
    return fields

def _concat_keys(keys):
    return "[" + "],[".join(keys) + "]"

def _concat_values(values):
    return ",".join(values)

def _concat_fields(fields, operator = (None, ",")):
    if operator:
        unit_operator, group_operator = operator
    # fields = _wrap_fields(fields)
    compiled = []
    for key,value in fields.items():
        compiled.append("[" + key + "]")
        if unit_operator:
            compiled.append(unit_operator)
            compiled.append(value)
        compiled.append(group_operator)
    compiled.pop() # pop last group_operator
    return " ".join(compiled)

class DataCondition(object):
    """
        本类用于操作SQL构造器辅助类的条件语句部分

        例如:
        DataCondition(("=", "AND"), id = 26)
        DataCondition(("=", "AND"), True, id = 26)
    """

    def __init__(self, operator = ("=", "AND"), ingroup = True, **kwargs):
        """
            构造方法
            参数:
                operator 操作符,分为(表达式操作符, 条件运算符)
                ingroup  是否分组,如果分组,将以括号包含
                kwargs   键值元组,包含数据库表的列名以及值
                         注意这里的等于号不等于实际生成SQL语句符号
                         实际符号是由operator[0]控制的
            例如:
            DataCondition(("=", "AND"), id = 26)
            (id=26)
            DataCondition((">", "OR"), id = 26, age = 35)
            (id>26 OR age>35)
            DataCondition(("LIKE", "OR"), False, name = "John", company = "Google")
            name LIKE 'John' OR company LIKE "Google"
        """
        self.ingroup = ingroup
        self.fields = kwargs
        self.operator = operator

    def __unicode__(self):
        self.fields = _wrap_fields(self.fields)
        result = _concat_fields(self.fields, self.operator)
        if self.ingroup:
            return "(" + result + ")"
        return result

    def __str__(self):
        return self.__unicode__()

    def toString(self):
        return self.__unicode__()

class DataHelper(object):

    """
        SQLite3 数据查询辅助类
    """

    def __init__(self, filename):
        """
            构造方法
            参数: filename 为SQLite3 数据库文件名
        """
        self.file_name = filename

    def open(self):
        """
            打开数据库并设置游标
        """
        self.connection = sqlite3.connect(self.file_name)
        self.cursor = self.connection.cursor()
        return self

    def close(self):
        """
            关闭数据库,注意若不显式调用此方法,
            在类被回收时也会尝试调用
        """
        if hasattr(self, "connection") and self.connection:
            self.connection.close()

    def __del__(self):
        """
            析构方法,做一些清理工作
        """
        self.close()

    def commit(self):
        """
            提交事务
            SELECT语句不需要此操作,默认的execute方法的
            commit_at_once设为True会隐式调用此方法,
            否则就需要显示调用本方法。
        """
        self.connection.commit()

    def execute(self, sql = None, commit_at_once = True):
        """
            执行SQL语句
            参数:
                sql  要执行的SQL语句,若为None,则调用构造器生成的SQL语句。
                commit_at_once 是否立即提交事务,如果不立即提交,
                对于非查询操作,则需要调用commit显式提交。
        """
        if not sql:
            sql = self.sql
        self.cursor.execute(sql)
        if commit_at_once:
            self.commit()

    def fetchone(self, sql = None):
        """
            取一条记录
        """
        self.execute(sql, False)
        return self.cursor.fetchone()

    def fetchall(self, sql = None):
        """
            取所有记录
        """
        self.execute(sql, False)
        return self.cursor.fetchall()

    def __concat_keys(self, keys):
        return _concat_keys(keys)

    def __concat_values(self, values):
        return _concat_values(values)

    def table(self, *args):
        """
            设置查询的表,多个表名用逗号分隔
        """
        self.tables = args
        self.tables_snippet = self.__concat_keys(self.tables)
        return self

    def __wrap_value(self, value):
        return _wrap_value(value)

    def __wrap_values(self, values):
        return _wrap_values(values)

    def __wrap_fields(self, fields):
        return _wrap_fields(fields)

    def __where(self):
        # self.condition_snippet
        if hasattr(self, "condition_snippet"):
            self.where_snippet = " WHERE " + self.condition_snippet

    def __select(self):
        template = "SELECT %(keys)s FROM %(tables)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "keys" : self.__concat_keys(self.body_keys),
        }
        self.sql = template % body_snippet_fields

    def __insert(self):
        template = "INSERT INTO %(tables)s (%(keys)s) VALUES (%(values)s)"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "keys" : self.__concat_keys(list(self.body_fields.keys())),
            "values" : self.__concat_values(list(self.body_fields.values()))
        }
        self.sql = template % body_snippet_fields

    def __update(self):
        template = "UPDATE %(tables)s SET %(fields)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "fields" : _concat_fields(self.body_fields, ("=",","))
        }
        self.sql = template % body_snippet_fields

    def __delete(self):
        template = "DELETE FROM %(tables)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet
        }
        self.sql = template % body_snippet_fields

    def __build(self):
        {
            "SELECT": self.__select,
            "INSERT": self.__insert,
            "UPDATE": self.__update,
            "DELETE": self.__delete
        }[self.current_token]()

    def __unicode__(self):
        return self.sql

    def __str__(self):
        return self.__unicode__()

    def select(self, *args):
        self.current_token = "SELECT"
        self.body_keys = args
        self.__build()
        return self

    def insert(self, **kwargs):
        self.current_token = "INSERT"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self

    def update(self, **kwargs):
        self.current_token = "UPDATE"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self

    def delete(self, *conditions):
        self.current_token = "DELETE"
        self.__build()
        #if *conditions:
        self.where(*conditions)
        return self

    def where(self, *conditions):
        conditions = list(map(str, conditions))
        self.condition_snippet = " AND ".join(conditions)
        self.__where()
        if hasattr(self, "where_snippet"):
            self.sql += self.where_snippet
        return self

下面举几个例子供大家参考吧:

复制代码 代码如下:

db = DataHelper("/home/wangye/sample.db3")
db.open() # 打开数据库
db.execute("""
    CREATE TABLE [staffs] (
      [staff_id] INTEGER PRIMARY KEY AUTOINCREMENT,
      [staff_name] TEXT NOT NULL,
      [staff_cardnum] TEXT NOT NULL,
      [staff_reserved] INTEGER NOT NULL
)
""") # 直接执行SQL语句,注意这里commit_at_once默认为True

db.table("staffs").insert(staff_name="John", staff_cardnum="1001", staff_reserved=0)
# 插入一条记录

rs = db.table("staffs").select("staff_id", "staff_name").fetchall()
# 直接取出所有staff_id和staff_name

rs = db.table("staffs").select("staff_name").where(DataCondition(("=", "AND"), id = 1)).fetchone()
# 取一条staff_id为1的staff_name

rs = db.table("staffs").select("staff_name").where(DataCondition(("<", "AND"), id = 100), DataCondition(("=", "AND"), staff_reserved = 1)).fetchone()
# 取一条id小于100并且staff_reserved为1的staff_name记录

db.close() # 关闭数据库

目前还没有让其支持星号(*)操作符,另外在多表同名列操作方面处理得也不是很好,这个只用于日常简单的脚本操作,最好不要用于生产环境,因为可能有未知问题。

相关文章

浅谈Python 中整型对象的存储问题

在 Python 整型对象所存储的位置是不同的, 有一些是一直存储在某个存储里面, 而其它的, 则在使用时开辟出空间. 说这句话的理由, 可以看看如下代码: a = 5 b = 5...

Python 图像处理: 生成二维高斯分布蒙版的实例

Python 图像处理: 生成二维高斯分布蒙版的实例

在图像处理以及图像特效中,经常会用到一种成高斯分布的蒙版,蒙版可以用来做图像融合,将不同内容的两张图像结合蒙版,可以营造不同的艺术效果。 这里II 表示合成后的图像,FF 表示前景图,...

深入浅析Python科学计算库Scipy及安装步骤

一、Scipy 入门 1.1、Scipy 简介及安装 官网:http://www.scipy.org/SciPy 安装:在C:\Python27\Scripts下打开cmd执行: 执...

python将多个文本文件合并为一个文本的代码(便于搜索)

但是,当一本书学过之后,对一般的技术和函数都有了印象,突然想要查找某个函数的实例代码时,却感到很困难,因为一本书的源代码目录很长,往往有几十甚至上百个源代码文件,想要找到自己想要的函数实...

python操作cfg配置文件方式

*.cfg文件一般是程序运行的配置文件,python为读写常见配置文件提供了一个ConfigParser模块,所以在python中解析配置文件相当简单,下面就举例说明一下具体的操作方法。...