Python 快速大批量写数据到PostgreSQL 数据表

Python和PostgreSQL数据库协同工作,需要用到psycopg2工具库,这也是Python编程中最常用的接口。但是,为了适应不同的写入速度,psycopg2工具库提供了多种执行写入的方法,常用的有以下四种:

下面对这四种方法的执行写数据到PostgreSQL数据表的速度做一个比较,看看插入10000条、100000条、1000000条记录,以上各种方法各自耗时多少。


创建数据表,定义计时函数

 CREATE TABLE IF NOT EXISTS public.test_table (
   source TEXT,
   datetime TIMESTAMP,  
   mean_temp NUMERIC  
 );


自定义的计时函数:

def measure_time(func):
    def time_it(*args, **kwargs):
        time_started = time.time()
        func(*args, **kwargs)
        time_elapsed = time.time()
        print("{execute} 耗时 {sec} 秒,插入 {rows} 行记录".format(execute=func.__name__,
                                                                                          sec=round(
                                                                                              time_elapsed - time_started,
                                                                                              4), rows=len(
                kwargs.get('values'))))

    return time_it


方法1:execute()

此方法是执行数据库操作的标准函数,为了插入大批量的数据行,就必须要针对数据使用循环,并且调用该方法,一行一行地把数据插入到数据库的表中。执行execute()的方法代码如下:

def method_execute(self, values):     
        for value in values:
            self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value)
        self.connection.commit()

众所周知,执行循环是很慢的,插入1万条、10万条、100万条数据的执行结果如下:

可以看出,数据越多,花费的时间越长,两者基本上是线性关系。


方法2:executemany()

Psycopg2提供的executemany()方法,并不是对execute()方法的简单改进,执行executemany()的方法代码如下:

@measure_time 
def method_execute_many(self, values):
        self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values)
        self.connection.commit()

下图是执行不同的数据量所耗费的时间,结果如下图:

可以看出,方法1与方法2耗时并没有明显的差别。



方法3:execute_batch()

这是Psycopg2提供的又一种方法,它减少了访问数据库服务器的次数,与executemany()相比,性能有了很大的改进。这种方法把许多语句拼接在一起,只受到page_size的限制(PostgreSQL中一般是8kB),执行execute_batch()的方法代码如下:

@measure_time
    def method_execute_batch(self, values):
        psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME),
                                      values)
        self.connection.commit()

插入不同长度的数据,耗时结果如下图:

可以看出,这种方法的执行速度,比execute()方法和executemany()方法提高了将近4倍!


方法4:构建个性化的字符串

这种方法是构建一个个性化的字符串,然后用一条execute()语句来执行,代码如下:

@measure_time
    def method_string_building(self, values):
        argument_string = ",".join("('%s', '%s')" % (x, y) for (x, y) in values)
        self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string)
        self.connection.commit()

看看这种方法的运行时间,结果如下图:


从图中的结果可以看出,这种方法是最快的,比方法1、方法2提高了20多倍,比方法3提高了3倍!

本文完整代码如下:

import time
import psycopg2
import psycopg2.extras

TABLE_NAME = 'TestTable'


def measure_time(func):
    def time_it(*args, **kwargs):
        time_started = time.time()
        func(*args, **kwargs)
        time_elapsed = time.time()
        print("{execute} 耗时 {sec} 秒,插入 {rows} 行记录".format(execute=func.__name__,
                                                                         sec=round(time_elapsed - time_started,4),
                                                                         rows=len(kwargs.get('values'))))

    return time_it


class PsycopgTest():

    def __init__(self, num_rows):
        self.num_rows = num_rows

    def create_dummy_data(self):
        values = []
        for i in range(self.num_rows):
            values.append((i + 1, 'test'))
        return values

    def connect(self):
        conn_string = "host={0} user={1} dbname={2} password={3}".format('localhost',
                                                                         'postgres',
                                                                         'tutorial', 'caspar')
        self.connection = psycopg2.connect(conn_string)
        self.cursor = self.connection.cursor()

    def create_table(self):
        self.cursor.execute(
            "CREATE TABLE IF NOT EXISTS {table} (id INT PRIMARY KEY, NAME text)".format(table=TABLE_NAME))
        self.connection.commit()

    def truncate_table(self):
        self.cursor.execute("TRUNCATE TABLE {table} RESTART IDENTITY".format(table=TABLE_NAME))
        self.connection.commit()
        
    @measure_time
    def method_execute(self, values):
        """Loop over the dataset and insert every row separately"""
        for value in values:
            self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value)
        self.connection.commit()

    @measure_time
    def method_execute_many(self, values):
        self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values)
        self.connection.commit()

    @measure_time
    def method_execute_batch(self, values):
        psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME),
                                      values)
        self.connection.commit()

    @measure_time
    def method_string_building(self, values):
        argument_string = ",".join("('%s', '%s')" % (x, y) for (x, y) in values)
        self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string)
        self.connection.commit()


def main():
    psyco = PsycopgTest(10000)
    psyco.connect()
    values = psyco.create_dummy_data()
    psyco.create_table()
    psyco.truncate_table()
    psyco.method_execute(values=values)
    # psyco.method_execute_many(values=values)
    # psyco.method_execute_batch(values=values)
    # psyco.method_string_building(values=values)


if __name__ == '__main__':
    main()

(本文完)

展开阅读全文

页面更新:2024-04-24

标签:数据表   数据   字符串   语句   函数   速度   快速   代码   数据库   时间   方法

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top