title | summary |
---|---|
TiDB 和 Python 的简单 CRUD 应用程序 |
给出一个 TiDB 和 Python 的简单 CRUD 应用程序示例。 |
本文档将展示如何使用 TiDB 和 Python 来构造一个简单的 CRUD 应用程序。
注意:
推荐使用 Python 3.10 及以上版本进行 TiDB 的应用程序的编写。
本节将介绍 TiDB 集群的启动方法。
你可以部署一个本地测试的 TiDB 集群或正式的 TiDB 集群。详细步骤,请参考:
基于 Git 的预配置的开发环境:现在就试试
该环境会自动克隆代码,并通过 TiUP 部署测试集群。
git clone https://github.com/pingcap-inc/tidb-example-python.git
SQLAlchemy 为当前比较流行的开源 Python ORM 之一。此处将以 SQLAlchemy 1.4.44 版本进行说明。
import uuid
from typing import List
from sqlalchemy import create_engine, String, Column, Integer, select, func
from sqlalchemy.orm import declarative_base, sessionmaker
engine = create_engine('mysql://root:@127.0.0.1:4000/test')
Base = declarative_base()
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
class Player(Base):
__tablename__ = "player"
id = Column(String(36), primary_key=True)
coins = Column(Integer)
goods = Column(Integer)
def __repr__(self):
return f'Player(id={self.id!r}, coins={self.coins!r}, goods={self.goods!r})'
def random_player(amount: int) -> List[Player]:
players = []
for _ in range(amount):
players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000))
return players
def simple_example() -> None:
with Session() as session:
# create a player, who has a coin and a goods.
session.add(Player(id="test", coins=1, goods=1))
# get this player, and print it.
get_test_stmt = select(Player).where(Player.id == "test")
for player in session.scalars(get_test_stmt):
print(player)
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
session.bulk_save_objects(player_list[idx:idx + 114])
# print the number of players
count = session.query(func.count(Player.id)).scalar()
print(f'number of players: {count}')
# print 3 players.
three_players = session.query(Player).limit(3).all()
for player in three_players:
print(player)
session.commit()
def trade_check(session: Session, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
# sell player goods check
sell_player = session.query(Player.goods).filter(Player.id == sell_id).with_for_update().one()
if sell_player.goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
buy_player = session.query(Player.coins).filter(Player.id == buy_id).with_for_update().one()
if buy_player.coins < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
with Session() as session:
if trade_check(session, sell_id, buy_id, amount, price) is False:
return
# deduct the goods of seller, and raise his/her the coins
session.query(Player).filter(Player.id == sell_id). \
update({'goods': Player.goods - amount, 'coins': Player.coins + price})
# deduct the coins of buyer, and raise his/her the goods
session.query(Player).filter(Player.id == buy_id). \
update({'goods': Player.goods + amount, 'coins': Player.coins - price})
session.commit()
print("trade success")
def trade_example() -> None:
with Session() as session:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
session.add(Player(id="1", coins=100, goods=0))
session.add(Player(id="2", coins=114514, goods=20))
session.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(sell_id="2", buy_id="1", amount=2, price=100)
with Session() as session:
traders = session.query(Player).filter(Player.id.in_(("1", "2"))).all()
for player in traders:
print(player)
session.commit()
simple_example()
trade_example()
相较于直接使用 Driver,SQLAlchemy 屏蔽了创建数据库连接时,不同数据库差异的细节。SQLAlchemy 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地简化了代码量。
Player
类为数据库表在程序内的映射。Player
的每个属性都对应着 player
表的一个字段。SQLAlchemy 使用 Player
类为了给 SQLAlchemy 提供更多的信息,使用了形如以上示例中的 id = Column(String(36), primary_key=True)
的类型定义,用来指示字段类型和其附加属性。id = Column(String(36), primary_key=True)
表示 id
字段为 String
类型,对应数据库类型为 VARCHAR
,长度为 36
,且为主键。
关于 SQLAlchemy 的更多使用方法,你可以参考 SQLAlchemy 官网。
peewee 为当前比较流行的开源 Python ORM 之一。此处将以 peewee 3.15.4 版本进行说明。
import os
import uuid
from typing import List
from peewee import *
from playhouse.db_url import connect
db = connect('mysql://root:@127.0.0.1:4000/test')
class Player(Model):
id = CharField(max_length=36, primary_key=True)
coins = IntegerField()
goods = IntegerField()
class Meta:
database = db
table_name = "player"
def random_player(amount: int) -> List[Player]:
players = []
for _ in range(amount):
players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000))
return players
def simple_example() -> None:
# create a player, who has a coin and a goods.
Player.create(id="test", coins=1, goods=1)
# get this player, and print it.
test_player = Player.select().where(Player.id == "test").get()
print(f'id:{test_player.id}, coins:{test_player.coins}, goods:{test_player.goods}')
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
Player.bulk_create(player_list, 114)
# print the number of players
count = Player.select().count()
print(f'number of players: {count}')
# print 3 players.
three_players = Player.select().limit(3)
for player in three_players:
print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')
def trade_check(sell_id: str, buy_id: str, amount: int, price: int) -> bool:
sell_goods = Player.select(Player.goods).where(Player.id == sell_id).get().goods
if sell_goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
buy_coins = Player.select(Player.coins).where(Player.id == buy_id).get().coins
if buy_coins < price:
print(f'buy player {buy_id} coins not enough')
return False
return True
def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
with db.atomic() as txn:
try:
if trade_check(sell_id, buy_id, amount, price) is False:
txn.rollback()
return
# deduct the goods of seller, and raise his/her the coins
Player.update(goods=Player.goods - amount, coins=Player.coins + price).where(Player.id == sell_id).execute()
# deduct the coins of buyer, and raise his/her the goods
Player.update(goods=Player.goods + amount, coins=Player.coins - price).where(Player.id == buy_id).execute()
except Exception as err:
txn.rollback()
print(f'something went wrong: {err}')
else:
txn.commit()
print("trade success")
def trade_example() -> None:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
Player.create(id="1", coins=100, goods=0)
Player.create(id="2", coins=114514, goods=20)
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
after_trade_players = Player.select().where(Player.id.in_(["1", "2"]))
for player in after_trade_players:
print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')
db.connect()
# recreate the player table
db.drop_tables([Player])
db.create_tables([Player])
simple_example()
trade_example()
相较于直接使用 Driver,peewee 屏蔽了创建数据库连接时,不同数据库差异的细节。peewee 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地简化了代码量。
Player
类为数据库表在程序内的映射。Player
的每个属性都对应着 player
表的一个字段。peewee 使用 Player
类为了给 peewee 提供更多的信息,使用了形如以上示例中的 id = CharField(max_length=36, primary_key=True)
的类型定义,用来指示字段类型和其附加属性。id = CharField(max_length=36, primary_key=True)
表示 id
字段为 CharField
类型,对应数据库类型为 VARCHAR
,长度为 36
,且为主键。
关于 peewee 的更多使用方法,你可以参考 peewee 官网。
mysqlclient 为当前比较流行的开源 Python Driver 之一。此处将以 mysqlclient 2.1.1 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。
import uuid
from typing import List
import MySQLdb
from MySQLdb import Connection
from MySQLdb.cursors import Cursor
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
return MySQLdb.connect(
host="127.0.0.1",
port=4000,
user="root",
password="",
database="test",
autocommit=autocommit
)
def create_player(cursor: Cursor, player: tuple) -> None:
cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)
def get_player(cursor: Cursor, player_id: str) -> tuple:
cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
return cursor.fetchone()
def get_players_with_limit(cursor: Cursor, limit: int) -> List[tuple]:
cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
players = []
for _ in range(amount):
players.append((uuid.uuid4(), 10000, 10000))
return players
def bulk_create_player(cursor: Cursor, players: List[tuple]) -> None:
cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)
def get_count(cursor: Cursor) -> None:
cursor.execute("SELECT count(*) FROM player")
return cursor.fetchone()[0]
def trade_check(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"
# sell player goods check
cursor.execute(get_player_with_lock_sql, (sell_id,))
_, sell_goods = cursor.fetchone()
if sell_goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
cursor.execute(get_player_with_lock_sql, (buy_id,))
buy_coins, _ = cursor.fetchone()
if buy_coins < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade_update(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
# deduct the goods of seller, and raise his/her the coins
cursor.execute(update_player_sql, (-amount, price, sell_id))
# deduct the coins of buyer, and raise his/her the goods
cursor.execute(update_player_sql, (amount, -price, buy_id))
def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
with connection.cursor() as cursor:
if trade_check(cursor, sell_id, buy_id, amount, price) is False:
connection.rollback()
return
try:
trade_update(cursor, sell_id, buy_id, amount, price)
except Exception as err:
connection.rollback()
print(f'something went wrong: {err}')
else:
connection.commit()
print("trade success")
def simple_example() -> None:
with get_connection(autocommit=True) as conn:
with conn.cursor() as cur:
# create a player, who has a coin and a goods.
create_player(cur, ("test", 1, 1))
# get this player, and print it.
test_player = get_player(cur, "test")
print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
bulk_create_player(cur, player_list[idx:idx + 114])
# print the number of players
count = get_count(cur)
print(f'number of players: {count}')
# print 3 players.
three_players = get_players_with_limit(cur, 3)
for player in three_players:
print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')
def trade_example() -> None:
with get_connection(autocommit=False) as conn:
with conn.cursor() as cur:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
create_player(cur, ("1", 100, 0))
create_player(cur, ("2", 114514, 20))
conn.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(conn, sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(conn, sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
with conn.cursor() as cur:
_, player1_coin, player1_goods = get_player(cur, "1")
print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
_, player2_coin, player2_goods = get_player(cur, "2")
print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')
simple_example()
trade_example()
Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player
,与 ORM 不同,因为没有数据对象的存在,Player
将以元组 (tuple) 进行表示。
关于 mysqlclient 的更多使用方法,你可以参考 mysqlclient 官方文档。
PyMySQL 为当前比较流行的开源 Python Driver 之一。此处将以 PyMySQL 1.0.2 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。
import uuid
from typing import List
import pymysql.cursors
from pymysql import Connection
from pymysql.cursors import DictCursor
def get_connection(autocommit: bool = False) -> Connection:
return pymysql.connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test',
cursorclass=DictCursor,
autocommit=autocommit)
def create_player(cursor: DictCursor, player: tuple) -> None:
cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)
def get_player(cursor: DictCursor, player_id: str) -> dict:
cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
return cursor.fetchone()
def get_players_with_limit(cursor: DictCursor, limit: int) -> tuple:
cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
players = []
for _ in range(amount):
players.append((uuid.uuid4(), 10000, 10000))
return players
def bulk_create_player(cursor: DictCursor, players: List[tuple]) -> None:
cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)
def get_count(cursor: DictCursor) -> int:
cursor.execute("SELECT count(*) as count FROM player")
return cursor.fetchone()['count']
def trade_check(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"
# sell player goods check
cursor.execute(get_player_with_lock_sql, (sell_id,))
seller = cursor.fetchone()
if seller['goods'] < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
cursor.execute(get_player_with_lock_sql, (buy_id,))
buyer = cursor.fetchone()
if buyer['coins'] < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade_update(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
# deduct the goods of seller, and raise his/her the coins
cursor.execute(update_player_sql, (-amount, price, sell_id))
# deduct the coins of buyer, and raise his/her the goods
cursor.execute(update_player_sql, (amount, -price, buy_id))
def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
with connection.cursor() as cursor:
if trade_check(cursor, sell_id, buy_id, amount, price) is False:
connection.rollback()
return
try:
trade_update(cursor, sell_id, buy_id, amount, price)
except Exception as err:
connection.rollback()
print(f'something went wrong: {err}')
else:
connection.commit()
print("trade success")
def simple_example() -> None:
with get_connection(autocommit=True) as connection:
with connection.cursor() as cur:
# create a player, who has a coin and a goods.
create_player(cur, ("test", 1, 1))
# get this player, and print it.
test_player = get_player(cur, "test")
print(test_player)
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
bulk_create_player(cur, player_list[idx:idx + 114])
# print the number of players
count = get_count(cur)
print(f'number of players: {count}')
# print 3 players.
three_players = get_players_with_limit(cur, 3)
for player in three_players:
print(player)
def trade_example() -> None:
with get_connection(autocommit=False) as connection:
with connection.cursor() as cur:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
create_player(cur, ("1", 100, 0))
create_player(cur, ("2", 114514, 20))
connection.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(connection, sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(connection, sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
with connection.cursor() as cur:
print(get_player(cur, "1"))
print(get_player(cur, "2"))
simple_example()
trade_example()
Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player
,与 ORM 不同,因为没有数据对象的存在,Player
将以 dict 进行表示。
关于 PyMySQL 的更多使用方法,你可以参考 PyMySQL 官方文档。
mysql-connector-python 为当前比较流行的开源 Python Driver 之一。此处将以 mysql-connector-python 8.0.31 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。
import uuid
from typing import List
from mysql.connector import connect, MySQLConnection
from mysql.connector.cursor import MySQLCursor
def get_connection(autocommit: bool = True) -> MySQLConnection:
connection = connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test')
connection.autocommit = autocommit
return connection
def create_player(cursor: MySQLCursor, player: tuple) -> None:
cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)
def get_player(cursor: MySQLCursor, player_id: str) -> tuple:
cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
return cursor.fetchone()
def get_players_with_limit(cursor: MySQLCursor, limit: int) -> List[tuple]:
cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
players = []
for _ in range(amount):
players.append((str(uuid.uuid4()), 10000, 10000))
return players
def bulk_create_player(cursor: MySQLCursor, players: List[tuple]) -> None:
cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)
def get_count(cursor: MySQLCursor) -> int:
cursor.execute("SELECT count(*) FROM player")
return cursor.fetchone()[0]
def trade_check(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"
# sell player goods check
cursor.execute(get_player_with_lock_sql, (sell_id,))
_, sell_goods = cursor.fetchone()
if sell_goods < amount:
print(f'sell player {sell_id} goods not enough')
return False
# buy player coins check
cursor.execute(get_player_with_lock_sql, (buy_id,))
buy_coins, _ = cursor.fetchone()
if buy_coins < price:
print(f'buy player {buy_id} coins not enough')
return False
def trade_update(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
# deduct the goods of seller, and raise his/her the coins
cursor.execute(update_player_sql, (-amount, price, sell_id))
# deduct the coins of buyer, and raise his/her the goods
cursor.execute(update_player_sql, (amount, -price, buy_id))
def trade(connection: MySQLConnection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
with connection.cursor() as cursor:
if trade_check(cursor, sell_id, buy_id, amount, price) is False:
connection.rollback()
return
try:
trade_update(cursor, sell_id, buy_id, amount, price)
except Exception as err:
connection.rollback()
print(f'something went wrong: {err}')
else:
connection.commit()
print("trade success")
def simple_example() -> None:
with get_connection(autocommit=True) as connection:
with connection.cursor() as cur:
# create a player, who has a coin and a goods.
create_player(cur, ("test", 1, 1))
# get this player, and print it.
test_player = get_player(cur, "test")
print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')
# create players with bulk inserts.
# insert 1919 players totally, with 114 players per batch.
# each player has a random UUID
player_list = random_player(1919)
for idx in range(0, len(player_list), 114):
bulk_create_player(cur, player_list[idx:idx + 114])
# print the number of players
count = get_count(cur)
print(f'number of players: {count}')
# print 3 players.
three_players = get_players_with_limit(cur, 3)
for player in three_players:
print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')
def trade_example() -> None:
with get_connection(autocommit=False) as conn:
with conn.cursor() as cur:
# create two players
# player 1: id is "1", has only 100 coins.
# player 2: id is "2", has 114514 coins, and 20 goods.
create_player(cur, ("1", 100, 0))
create_player(cur, ("2", 114514, 20))
conn.commit()
# player 1 wants to buy 10 goods from player 2.
# it will cost 500 coins, but player 1 cannot afford it.
# so this trade will fail, and nobody will lose their coins or goods
trade(conn, sell_id="2", buy_id="1", amount=10, price=500)
# then player 1 has to reduce the incoming quantity to 2.
# this trade will be successful
trade(conn, sell_id="2", buy_id="1", amount=2, price=100)
# let's take a look for player 1 and player 2 currently
with conn.cursor() as cur:
_, player1_coin, player1_goods = get_player(cur, "1")
print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
_, player2_coin, player2_goods = get_player(cur, "2")
print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')
simple_example()
trade_example()
Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player
,与 ORM 不同,因为没有数据对象的存在,Player
将以 tuple 进行表示。
关于 mysql-connector-python 的更多使用方法,你可以参考 mysql-connector-python 官方文档。
本节将逐步介绍代码的运行方法。
建议:
在 Gitpod Playground 中尝试 Python 与 TiDB 的连接:现在就试试
本示例需手动初始化表,若你使用本地集群,可直接运行:
mysql --host 127.0.0.1 --port 4000 -u root < player_init.sql
mycli --host 127.0.0.1 --port 4000 -u root --no-warn < player_init.sql
若不使用本地集群,或未安装命令行客户端,请用喜欢的方式(如 Navicat、DBeaver 等 GUI 工具)直接登录集群,并运行 player_init.sql
文件内的 SQL 语句。
若你使用了 TiDB Serverless 集群,此处需使用系统本地的 CA 证书,并将证书路径记为 <ca_path>
以供后续指代。请参考以下系统相关的证书路径地址:
/etc/ssl/cert.pem
/etc/ssl/certs/ca-certificates.crt
/etc/pki/tls/certs/ca-bundle.crt
/etc/ssl/ca-bundle.pem
若设置后仍有证书错误,请查阅 TiDB Serverless 安全连接文档。
若你使用 TiDB Serverless 集群,更改 sqlalchemy_example.py
内 create_engine
函数的入参:
engine = create_engine('mysql://root:@127.0.0.1:4000/test')
若你设定的密码为 123456
,而且从 TiDB Serverless 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 create_engine
更改为:
engine = create_engine('mysql://2aEp24QWEDLqRFs.root:[email protected]:4000/test', connect_args={
"ssl_mode": "VERIFY_IDENTITY",
"ssl": {
"ca": "<ca_path>"
}
})
若你使用 TiDB Serverless 集群,更改 peewee_example.py
内 connect
函数的入参:
db = connect('mysql://root:@127.0.0.1:4000/test')
若你设定的密码为 123456
,而且从 TiDB Serverless 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 connect
更改为:
-
peewee 将 PyMySQL 作为 Driver 时:
db = connect('mysql://2aEp24QWEDLqRFs.root:[email protected]:4000/test', ssl_verify_cert=True, ssl_ca="<ca_path>")
-
peewee 将 mysqlclient 作为 Driver 时:
db = connect('mysql://2aEp24QWEDLqRFs.root:[email protected]:4000/test', ssl_mode="VERIFY_IDENTITY", ssl={"ca": "<ca_path>"})
由于 peewee 会将参数透传至 Driver 中,使用 peewee 时请注意 Driver 的使用类型。
若你使用 TiDB Serverless 集群,更改 mysqlclient_example.py
内 get_connection
函数:
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
return MySQLdb.connect(
host="127.0.0.1",
port=4000,
user="root",
password="",
database="test",
autocommit=autocommit
)
若你设定的密码为 123456
,而且从 TiDB Serverless 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 get_connection
更改为:
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
return MySQLdb.connect(
host="xxx.tidbcloud.com",
port=4000,
user="2aEp24QWEDLqRFs.root",
password="123456",
database="test",
autocommit=autocommit,
ssl_mode="VERIFY_IDENTITY",
ssl={
"ca": "<ca_path>"
}
)
若你使用 TiDB Serverless 集群,更改 pymysql_example.py
内 get_connection
函数:
def get_connection(autocommit: bool = False) -> Connection:
return pymysql.connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test',
cursorclass=DictCursor,
autocommit=autocommit)
若你设定的密码为 123456
,而且从 TiDB Serverless 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 get_connection
更改为:
def get_connection(autocommit: bool = False) -> Connection:
return pymysql.connect(host='xxx.tidbcloud.com',
port=4000,
user='2aEp24QWEDLqRFs.root',
password='123546',
database='test',
cursorclass=DictCursor,
autocommit=autocommit,
ssl_ca='<ca_path>',
ssl_verify_cert=True,
ssl_verify_identity=True)
若你使用 TiDB Serverless 集群,更改 mysql_connector_python_example.py
内 get_connection
函数:
def get_connection(autocommit: bool = True) -> MySQLConnection:
connection = connect(host='127.0.0.1',
port=4000,
user='root',
password='',
database='test')
connection.autocommit = autocommit
return connection
若你设定的密码为 123456
,而且从 TiDB Serverless 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 get_connection
更改为:
def get_connection(autocommit: bool = True) -> MySQLConnection:
connection = connect(
host="xxx.tidbcloud.com",
port=4000,
user="2aEp24QWEDLqRFs.root",
password="123456",
database="test",
autocommit=autocommit,
ssl_ca='<ca_path>',
ssl_verify_identity=True
)
connection.autocommit = autocommit
return connection
运行前请先安装依赖:
pip3 install -r requirement.txt
当以后需要多次运行脚本时,请在每次运行前先依照表初始化一节再次进行表初始化。
python3 sqlalchemy_example.py
python3 peewee_example.py
python3 mysqlclient_example.py
python3 pymysql_example.py
python3 mysql_connector_python_example.py