Python clickhouse-driver 类库使用学习总结

实践环境

Python 3.9.13

clickhouse-driver 0.2.9

实践操作

# -*- coding:utf-8 -*-
import clickhouse_driver
if __name__ == '__main__':
 # Walk-through of the clickhouse_driver DB-API: connect, run DDL/DML, then
 # read results back with fetchone/fetchmany/fetchall. The "prints:" notes are
 # the outputs recorded by the original author.
 host = '192.168.88.131'
 port = 9000 # NOTE: must not be the default 8123 (presumably the HTTP port, whereas this driver uses the native-protocol port -- confirm)
 username = 'testacc'
 password = 'test1234'
 database = 'default'
 # Connection option 1: keyword arguments
 # conn = clickhouse_driver.connect(database = database,
 # user = username,
 # password=password,
 # host = host,
 # port = port)
 # Connection option 2: connection URL
 connection_str = f'clickhouse://{username}:{password}@{host}:{port}/{database}'
 conn = clickhouse_driver.connect(connection_str)
 cursor = conn.cursor()
 cursor.execute('SHOW TABLES')
 res = cursor.fetchall()
 print(res) # prints something like [('table1',), ('test',)]
 # Drop the table so the demo starts from a clean state
 cursor.execute('DROP TABLE IF EXISTS test')
 print(cursor.fetchall()) # prints: []
 cursor.execute('CREATE TABLE test (x Int32) ENGINE = Memory')
 print(cursor.fetchall()) # prints: []
 # Rows may be given as dicts keyed by column name ...
 cursor.executemany('INSERT INTO test (x) VALUES', [{'x': 100}])
 print(cursor.rowcount) # row count reported for the last statement; prints: 1
 # ... or as lists of column values; several rows can be inserted at once
 cursor.executemany('INSERT INTO test (x) VALUES', [[300],[400]])
 cursor.executemany('INSERT INTO test (x) VALUES', [[200]])
 print(cursor.rowcount) # prints: 1

 # INSERT ... SELECT with a named query parameter (%(limit)s)
 cursor.execute('INSERT INTO test (x) SELECT * FROM system.numbers LIMIT %(limit)s', {'limit': 3})
 print(cursor.rowcount) # prints: 1
 cursor.execute('SELECT sum(x) AS sum_value FROM test')
 print(cursor.rowcount) # prints: 1
 print(cursor.columns_with_types) # column names and types of the result; prints: [('sum_value', 'Int64')]
 # NOTE(review): the recorded outputs below show 5 rows and omit the 300/400
 # rows inserted above; they were presumably captured before that insert was
 # added -- re-run to confirm.
 cursor.execute('SELECT * FROM test')
 print(cursor.rowcount) # prints: 5
 print(cursor.columns_with_types) # prints: [('x', 'Int32')]
 res = cursor.fetchall()
 print(res) # prints: [(100,), (200,), (0,), (1,), (2,)]
 print(cursor.fetchone()) # nothing left after fetchall(); prints: None
 #############################
 cursor.execute('SELECT * FROM test')
 print(cursor.fetchone()) # prints: (100,)
 # Fetch only the next 3 rows
 print(cursor.fetchmany(3)) # prints: [(200,), (0,), (1,)]
 #############################
 cursor.execute('SELECT * FROM test')
 print(cursor.fetchone()) # prints: (100,)
 print(cursor.fetchall()) # remaining rows only; prints: [(200,), (0,), (1,), (2,)]
 cursor.close()
 conn.close()

说明:

conn = clickhouse_driver.connect(connection_str)

connection_str

'clickhouse://[{username}:{password}@]{host}[:{port}][/{database}]'

其中,方括号[]中的部分为可选项,可以省略;{database}省略时默认为default

类库封装

# -*- coding:utf-8 -*-
import re
import traceback
import clickhouse_driver
from utils.log import logger
class ClickhouseCli:
    """Small convenience wrapper around a ``clickhouse_driver`` DB-API connection.

    Every query method opens a fresh cursor, runs the statement and always
    closes the cursor again. Failures are re-raised as a plain ``Exception``
    whose message contains the failing query and the formatted traceback.

    The instance can also be used as a context manager; the connection is
    closed on exit.

    NOTE(review): the driver's DB-API ``connect()`` may defer the actual
    network connection until the first query runs -- confirm. If so,
    connectivity problems surface from the query methods rather than from
    ``__init__``.
    """

    # Extracts the table expression from "ALTER TABLE <table> UPDATE ...".
    # The match is non-greedy on purpose: a greedy ``.+`` runs up to the LAST
    # "update" in the statement, so a value such as 'please update me' would
    # be swallowed into the table name and produce a broken OPTIMIZE query.
    _UPDATE_TABLE_RE = re.compile(r'alter\s+table\s+(.+?)\s+update\b', re.I)

    def __init__(self, db_name='', db_host='', port=3306, user='', password='', connect_timeout=15):
        """Store the connection settings and create the connection object.

        :param db_name: database to connect to
        :param db_host: server host name or IP address
        :param port: server port. NOTE(review): the default 3306 is kept for
            backward compatibility only; the usage examples in this file pass
            9000 explicitly, so callers should normally do the same.
        :param user: account name
        :param password: account password
        :param connect_timeout: passed through to ``clickhouse_driver.connect``
        :raises Exception: if creating the connection object fails
        """
        # Assigned first so that close()/__del__ are safe even if connecting fails.
        self.dbconn = None
        self.host = db_host
        self.port = port
        self.user = user
        self.passwd = password
        self.db_name = db_name
        self.connect_timeout = connect_timeout
        self.connect_config = {
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'password': self.passwd,
            'database': self.db_name,
            'connect_timeout': self.connect_timeout,
        }
        try:
            self.__connect_database()
        except Exception as exc:
            raise Exception(
                '初始化数据库(%s)连接失败:%s' % (self.db_name, traceback.format_exc())
            ) from exc
        logger.debug('初始化数据库连接成功(数据库:%s)' % self.db_name)

    def __connect_database(self):
        """(Re)create the DB-API connection from the stored settings."""
        self.dbconn = clickhouse_driver.connect(**self.connect_config)

    def _get_cursor(self, log_db_name=False):
        """Return a new cursor, reconnecting once if the connection is unusable.

        :param log_db_name: append the database name to the reconnect log line
        """
        try:
            return self.dbconn.cursor()
        except Exception:
            message = '获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc()
            if log_db_name:
                message += '(%s)' % self.db_name
            logger.error(message)
            self.__connect_database()
            return self.dbconn.cursor()

    def _run(self, action, query, params, on_success=None, log_db_name=False):
        """Execute ``query`` on a fresh cursor and always close that cursor.

        :param action: operation name embedded in the error message
        :param query: SQL text
        :param params: query parameters / rows, forwarded to ``cursor.execute``
        :param on_success: optional callable receiving the cursor after the
            statement ran; its return value becomes the result of this method
        :param log_db_name: forwarded to ``_get_cursor``
        :raises Exception: wrapping any failure, with the query and traceback
        """
        cursor = self._get_cursor(log_db_name)
        try:
            cursor.execute(query, params)
            if on_success is None:
                return None
            return on_success(cursor)
        except Exception as exc:
            raise Exception(
                f'执行数据库{action}操作({query})失败:{traceback.format_exc()}'
            ) from exc
        finally:
            cursor.close()

    def insert(self, query, params=None):
        """Execute an INSERT statement.

        Examples::

            query:  "INSERT INTO test (x) VALUES"
            params: [{'x': 100}]

            query:  "INSERT INTO test (x) VALUES"
            params: [[100]]

            query:  "INSERT INTO test (x) SELECT * FROM system.numbers LIMIT %(limit)s"
            params: {'limit': 3}

        :raises Exception: if the statement fails
        """
        self._run('插入', query, params)

    def delete(self, query, params=None):
        """Execute a delete statement.

        Example::

            query:  "ALTER TABLE test DELETE WHERE id = %(id)s"
            params: {'id': 1}

        The values may also be written directly into the query, e.g.
        ``ALTER TABLE test DELETE WHERE id = 2``.

        :raises Exception: if the statement fails
        """
        self._run('删除', query, params)

    def update(self, query, params=None, auto_optimize=False):
        """Execute an update statement, optionally followed by OPTIMIZE ... FINAL.

        Example::

            query:  "ALTER TABLE test UPDATE log_message=%(log_message)s WHERE id = %(id)s"
            params: {'log_message': 'log message', 'id': 2}

        The values may also be written directly into the query, e.g.
        ``ALTER TABLE test UPDATE log_message='log message' WHERE id = 2``.

        :param auto_optimize: when True and the query has the form
            ``ALTER TABLE <table> UPDATE ...``, run
            ``OPTIMIZE TABLE <table> FINAL`` on the same cursor afterwards
        :raises Exception: if the update or the optimize statement fails
        """
        def optimize(cursor):
            if not auto_optimize:
                return
            match = self._UPDATE_TABLE_RE.search(query)
            if match:
                cursor.execute(f'OPTIMIZE TABLE {match.group(1)} FINAL')

        self._run('更新', query, params, optimize)

    def select(self, query, params=None):
        """Run a query and return only its first row.

        Example (fetch the record whose id is 2)::

            query:  "SELECT * FROM test WHERE id = %(id)s"
            params: {'id': 2}

        The values may also be written directly into the query, e.g.
        ``SELECT * FROM test WHERE id = 2``.

        :return: the first row, or an empty list when there are no rows
        :raises Exception: if the query fails
        """
        def first_row(cursor):
            rows = cursor.fetchall()
            return rows[0] if rows else []

        return self._run('查询', query, params, first_row)

    def select_many(self, query, params=None):
        """Run a query and return all of its rows.

        :return: whatever ``cursor.fetchall()`` returns for the query
        :raises Exception: if the query fails
        """
        return self._run('查询', query, params, lambda cursor: cursor.fetchall(), log_db_name=True)

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        # getattr keeps this safe on a partially constructed instance.
        conn = getattr(self, 'dbconn', None)
        if conn is not None:
            # Drop the reference first so a failing close() is not retried.
            self.dbconn = None
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    def __del__(self):
        self.close()
 
 
if __name__ == '__main__':
 # Usage demo for ClickhouseCli. The "prints:" notes are the outputs recorded
 # by the original author.
 # assumes a table test(id, event_date, log_message) already exists in testdb
 # -- its DDL is not shown here; TODO confirm
 from datetime import datetime
 db_cli = ClickhouseCli(db_name='testdb', db_host='192.168.88.131', port=9000, user='testacc', password='test1234', connect_timeout=15)
 # Insert a single row, then several rows in one call
 db_cli.insert('INSERT INTO test (id, event_date, log_message) VALUES', [[1, datetime.now(), 'test message 1']])
 db_cli.insert('INSERT INTO test (id, event_date, log_message) VALUES', [[2, datetime.now(), 'test message 2'],[3, datetime.now(), 'test message 3']])
 res = db_cli.select_many('SELECT * FROM test')
 print(res)
 # prints: [(1, datetime.date(2025, 9, 25), 'test message 1'), (2, datetime.date(2025, 9, 25), 'test message 2'), (3, datetime.date(2025, 9, 25), 'test message 3')]

 # select() returns only the first matching row; parameters may be passed
 # separately or written into the query text
 res = db_cli.select('SELECT * FROM test WHERE id = %(id)s', {'id': 2})
 print(res) # prints: (2, datetime.date(2025, 9, 25), 'test message 2')
 res = db_cli.select('SELECT * FROM test WHERE id = 2')
 print(res)

 # Parameterized update, followed by OPTIMIZE TABLE ... FINAL (auto_optimize=True)
 db_cli.update("ALTER TABLE test UPDATE log_message = %(log_message)s WHERE id = %(id)s", {'log_message':'log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S'), 'id': 2}, auto_optimize=True)
 res = db_cli.select('SELECT * FROM test WHERE id = 2')
 print(res)

 # NOTE(review): here the value is interpolated into the SQL text with %;
 # prefer the parameterized form above for values that are not trusted.
 db_cli.update("ALTER TABLE test UPDATE log_message = '%s' WHERE id = 2" % ('log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S')))
 res = db_cli.select('SELECT * FROM test WHERE id = 2') # NOTE(review): result is never printed or used
 # Delete using two named parameters
 db_cli.delete('ALTER TABLE test DELETE WHERE id in (%(id)s, %(id2)s)', {'id': 2, 'id2': 3})

注意:误区

在当前驱动版本下验证发现:使用类似以下代码,尝试将当前数据库切换至目标数据库test_db,然后获取test_db数据库中的所有表

# Pitfall demo: the USE statement does not affect the following call --
# SHOW TABLES still lists the database chosen when the connection was created
# (presumably because each call runs on a fresh cursor; confirm).
# NOTE(review): select() returns only the first row; select_many() is needed
# to get every table.
db_cli.select('USE `test_db`')
tables = db_cli.select('SHOW TABLES')

实际执行结果:db_cli.select('SHOW TABLES')返回的总是初始化连接时所连接数据库中的表。解决方法如下:

# Name the database explicitly instead of relying on USE. select_many() is
# used because select() returns only the first row of the result.
tables = db_cli.select_many('SHOW TABLES FROM test_db')

参考链接

https://pypi.org/project/clickhouse-driver/#description

https://clickhouse-driver.readthedocs.io/en/latest/dbapi.html#clickhouse_driver.dbapi.connect

https://clickhouse-driver.readthedocs.io/en/latest/dbapi.html#cursor

作者:授客 原文地址:https://www.cnblogs.com/shouke/p/19134089

%s 个评论

要回复文章请先登录注册