さらに、「LLMはすべてをこなすことができるというわけではありません。大抵のことはできません。
しかし、現在のLLMはすでにかなり価値のあるものになっています」「私の挙げた事例はほとんど簡単なタスクで、コンピューターサイエンスの学部生なら誰でも学んで実行できるレベルのタスクです。
しかし、これらのタスクに関する質問に対して何でも気軽に回答してくれるような魔法のような友だちは誰も持っていません。しかし、LLMならそれが可能です。
LLMはまだプログラマーの仕事を完全に代替することはできませんが、簡単なタスクなら十分に代替可能です」と語りました。
⇧ AIが誤った情報を提供した場合に、全く知見の無い領域だと判断が付かなくて誤った知識が定着してしまうリスクもありますけどね...
で、結局のところ、情報の正しさを確かめるために、自分で検索することになるんですけどね...
ORM(Object Relational Mapping)とは
Wikipediaによりますと、
オブジェクト関係マッピング(英: Object-relational mapping、O/RM、ORM)とは、データベースとオブジェクト指向プログラミング言語の間の非互換なデータを変換するプログラミング技法である。オブジェクト関連マッピングとも呼ぶ。
実際には、オブジェクト指向言語から使える「仮想」オブジェクトデータベースを構築する手法である。オブジェクト関係マッピングを行うソフトウェアパッケージは商用のものもフリーなものもあるが、場合によっては独自に開発することもある。
⇧ とのこと。
とで、互いにデータの扱いが異なるので、相互にデータのやり取りする際に煩雑な処理が発生してしまうのだけど、そのあたりの処理を担ってくれるのが「ORM(Object Relational Mapping)」ですと。
イメージとしては、
⇧ 上図のように、
の橋渡しをしてくれる部分を担ってくれるのが「ORM(Object Relational Mapping)」ですと。
SQLAlchemyとは
公式のドキュメントによりますと、
SQLAlchemy is the Python SQL toolkit and Object Relational Mapper that gives application developers the full power and flexibility of SQL.
⇧ とあり、Python向けの「ORM(Object Relational Mapping)」ライブラリですと。
⇧ 上図のようなライブラリの構成らしいですと。
実際にアプリケーション側では、
⇧ 上図のように、「SQLAlchemyRepository」インターフェイスを実装した抽象クラスを継承したRepositoryクラスを用意していく感じになるっぽいのかね。
ただ、「SQLAlchemy」としては、そのようなインターフェイスを用意していないらしいので、「Repository pattern」を実現したいのであれば、アプリケーションの実装者が作るしかないみたい...
PythonのORMのSQLAlchemyでPostgreSQLとやり取りするのに必要な情報
とりあえず、
⇧ メジャーバージョンとして、
- 1.x系
- 2.x系
が公開されているようですが、2.x系を利用していく感じになるかと。
2.x系を利用するための要件としては、
⇧ とあるので、「Python 3.7」以上がインストールされている環境であれば利用できる模様。
と思ったら、
⇧ 各々の「RDBMS(Relational DataBase Management Systems)」向けのPythonライブラリをインストールする必要があるらしい。
公式のドキュメントを見ても、
⇧ インストールについては触れられていないという...
インストールします。
非同期処理の場合は、「psycopg」の方が必要らしいので、インストール。
何やら、
⇧ 他にも、依存関係が必要だったらしい...
SQLAlchemyのドキュメントが分かり辛い...
インストール。
とりあえず、
⇧ 上記の記事の時に利用してるPython仮想環境にログインし、「SQLAlchemy」をインストールします。
⇧ インストールされました。
PostgreSQL側にテーブルを作成
今回は、「WSL 2(Windows Subsystem for Linux 2)」環境の「Rocky Linux」にインストールしているPostgreSQLを利用していきます。
このあたりは、ご自分の環境に合わせたものをご利用ください。
userテーブルを作成します。
-- CREATE TABLE -- CREATE TABLE "user" ( id SERIAL NOT NULL, first_name VARCHAR(255), last_name VARCHAR(255), first_name_kana VARCHAR(255), last_name_kana VARCHAR(255), email VARCHAR(128), birth DATE, gender INTEGER, tel VARCHAR(15), create_user VARCHAR(255) NOT NULL, create_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, update_user VARCHAR(255), update_at TIMESTAMP WITHOUT TIME ZONE, PRIMARY KEY (id) ); -- コメント -- COMMENT ON TABLE "user" IS 'ユーザー情報'; COMMENT ON COLUMN "user"."id" IS 'ユーザーID'; COMMENT ON COLUMN "user"."first_name" IS '名'; COMMENT ON COLUMN "user"."last_name" IS '氏'; COMMENT ON COLUMN "user"."first_name_kana" IS '名カナ'; COMMENT ON COLUMN "user"."last_name_kana" IS '氏カナ'; COMMENT ON COLUMN "user"."email" IS 'メールアドレス'; COMMENT ON COLUMN "user"."birth" IS '生年月日'; COMMENT ON COLUMN "user"."gender" IS '性別'; COMMENT ON COLUMN "user"."tel" IS '電話番号'; COMMENT ON COLUMN "user"."create_user" IS '登録ユーザー'; COMMENT ON COLUMN "user"."create_at" IS '登録日時'; COMMENT ON COLUMN "user"."update_user" IS '更新ユーザー'; COMMENT ON COLUMN "user"."update_at" IS '更新日時';
⇧ とりあえず、userテーブルが作成されました。
PostgreSQLの認証方式をscram-sha-256にする
で、今回、SQLAlchemyで利用するPostgreSQLのドライバーが「psycopg3」ってものなのだけど(ライブラリの名称は「psycopg」なのがややこしい)、認証方式が「scram-sha-256」になっている必要があるので、
⇧ 上記サイト様を参考に、
- /var/lib/data/xx(※1)/postgres.conf
- /var/lib/data/xx(※1)/pg_hba.conf
※1 xxは、PostgreSQLのバージョン。あったり無かったり。
を探して、
⇧ を参考に設定を変更します。
認証方式を「scram-sha-256」に変更後、ユーザーのパスワードを変更するのを忘れずに。
PythonのORMの1つであるSQLAlchemyのAPIでPostgreSQLとやり取りしてみる
PostgreSQL側にテーブルも作成できたのですが、
⇧ 上記サイト様にありますように、「SQLAlchemy」で用意されているAPIを利用して、テーブルを作成することもできる模様。
ちなみに、
Itemクラスには__init__関数を定義していないが、コンストラクタに引数を指定して定義できた。
⇧ 上記サイト様によりますと、エンティティクラスに、__init__を定義していなくても、引数ありでインスタンス化できるらしい。
では、「SQLAlchemy」のAPIで基本的なCRUD処理を試してみますか。
以下、記載している以外のソースコードについては、変更がないということで、過去の記事をご参考くださいませ。
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\main\resources\application-dev.yml
# データベース接続情報 database: user: test_user password: password host: 172.24.91.141 port: 5432 database: testdb dialect: postgresql driver: psycopg_async encoding: utf8 # logging: True pool_size: 5 pool_connection_timeout: max_overflow: 5 pool_recycle: 3600 pool_pre_ping: True
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\main\py\config\app_config.py
import os import sys from pathlib import Path import asyncio import yaml from injector import Module, provider, singleton from app.src.main.py.database.database import DatabaseConnection from app.src.main.py.database.database_config_interface import DatabaseConfigInterface class AppConfigError(Exception): pass @singleton class AppConfig(Module, DatabaseConfigInterface): DB_USER: str DB_PASS: str DB_HOST: str DB_PORT: str DB_DATABASE: str DB_DIALECT: str DB_DRIVER: str DB_ENCODING: str SQL_LOGGING: str DB_POOL_SIZE: int DB_POOL_CONN_TIMEOUT: int DB_MAX_OVERFLOW: int DB_POOL_RECYCLE: int DB_POOL_PRE_PING: bool ENV: str = os.getenv("ENV") APP_CONFIG_FILE_PREFIX: str = "application-" APP_CONFIG_FILE_EXTETION: str = ".yml" APP_CONFIG_FILE_PATH: str = "resources" APP_CONFIG_FILE_KEY_ROOT_DATABASE: str = "database" @singleton @provider def provide_database_connection(self) -> DatabaseConnection: return DatabaseConnection( self.db_url(), self.migration_url(), self.get_option() ) def __init__(self): # APP_CONFIG_FILE: str = "".join( [self.APP_CONFIG_FILE_PREFIX, self.ENV, self.APP_CONFIG_FILE_EXTETION] ) APP_CONFIG_FILE_LOCATION: str = os.path.join( self.APP_CONFIG_FILE_PATH, APP_CONFIG_FILE ) ROOT_DIR: str = Path(os.path.dirname(__file__)).parent.parent APP_CONFIG_FILE_LOCATION = os.path.join(ROOT_DIR, APP_CONFIG_FILE_LOCATION) try: with open(APP_CONFIG_FILE_LOCATION, "r", encoding="utf8") as yml: config = yaml.safe_load(yml) self.DB_USER = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE]["user"] self.DB_PASS = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE]["password"] self.DB_HOST = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE]["host"] self.DB_PORT = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE]["port"] self.DB_DATABASE = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE][ "database" ] self.DB_DIALECT = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE]["dialect"] self.DB_DRIVER = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE]["driver"] self.DB_ENCODING = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE][ "encoding" ] self.SQL_LOGGING = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE]["logging"] self.DB_POOL_SIZE = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE][ "pool_size" ] self.DB_POOL_CONN_TIMEOUT = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE][ "pool_connection_timeout" ] self.DB_MAX_OVERFLOW = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE][ "max_overflow" ] self.DB_POOL_RECYCLE = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE][ "pool_recycle" ] self.DB_POOL_PRE_PING = config[self.APP_CONFIG_FILE_KEY_ROOT_DATABASE][ "pool_pre_ping" ] if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) except Exception as e: raise AppConfigError("setting error") def db_url(self) -> str: # return f"{self.DB_DIALECT}+{self.DB_DRIVER}://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_DATABASE}?charset={self.DB_ENCODING}" return f"{self.DB_DIALECT}+{self.DB_DRIVER}://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_DATABASE}" def migration_url(self) -> str: # return f"{self.DB_DIALECT}+{self.DB_DRIVER}://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_DATABASE}?charset={self.DB_ENCODING}" return f"{self.DB_DIALECT}+{self.DB_DRIVER}://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_DATABASE}" def get_option(self): return { "echo": self.SQL_LOGGING, "echo_pool": self.SQL_LOGGING, "pool_size": self.DB_POOL_SIZE, "pool_timeout": self.DB_POOL_CONN_TIMEOUT, "max_overflow": self.DB_MAX_OVERFLOW, "pool_recycle": self.DB_POOL_RECYCLE, "pool_pre_ping": self.DB_POOL_PRE_PING, "client_encoding": self.DB_ENCODING, } ApplicationConfig = AppConfig()
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\main\py\database\database.py
import asyncio from contextlib import asynccontextmanager from injector import singleton from sqlalchemy.ext.asyncio import ( AsyncEngine , async_scoped_session , create_async_engine , AsyncSession ) from sqlalchemy.orm import sessionmaker #from sqlmodel.ext.asyncio.session import AsyncSession class DatabaseConnection: @singleton def __init__(self, connection_url: str, migration_url: str, option: dict = {}): self.connection_url = connection_url self.migration_url = migration_url self.option = option self.engine = self.get_async_engine() self.session = self.get_session(self.engine) @asynccontextmanager async def get_db(self): async with self.session() as session: yield session async def close_engine(self): if self.engine: await self.engine.dispose() await self.session.close() self.engine = None self.session = None def get_url(self) -> str: return self.connection_url def get_migration_url(self) -> str: return self.migration_url def get_async_engine(self) -> AsyncEngine: return create_async_engine(self.connection_url, **self.option) def get_session(self, engine: AsyncEngine) -> AsyncSession: async_session_factory = sessionmaker( autocommit=False , autoflush=False , bind=engine , class_=AsyncSession , expire_on_commit=True ) # セッションのスコープ設定 return async_scoped_session(async_session_factory, scopefunc=asyncio.current_task)
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\main\py\code\gender.py
from enum import IntEnum class Gender(IntEnum): MAN = 0 FEMALE = 1
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\main\py\entity\user_entity.py
from dataclasses import dataclass from datetime import date, datetime # from sqlalchemy.ext.declarative import declarative_base from app.src.main.py.entity.base.declarative_base import Base # SQLテーブルのカラム設定用 from sqlalchemy import ( Table, Column, Integer, Float, String, Date, DateTime, Boolean, ForeignKey, ) # @dataclass # class UserEntity: # id: int # first_name: str # last_name: str # first_name_kana: str # last_name_kana: str # email: str # birth: date | None = None # gender: int | None = None # address_id: int | None = None # tel: str | None = None # created_at: datetime = datetime.now() # update_at: datetime | None = None # Base = declarative_base() class UserEntity(Base): __tablename__ = "user" __table_args__ = {"comment": "ユーザー情報"} id: int = Column(name="id", type_=Integer, primary_key=True, comment="ユーザーID") first_name: str = Column(name="first_name", type_=String(255), comment="名") last_name: str = Column(name="last_name", type_=String(255), comment="氏") first_name_kana: str = Column( name="first_name_kana", type_=String(255), comment="名カナ" ) last_name_kana: str = Column( name="last_name_kana", type_=String(255), comment="氏カナ" ) email: str = Column(name="email", type_=String(128), comment="メールアドレス") birth: date = Column(name="birth", type_=Date, comment="生年月日") gender: int = Column(name="gender", type_=Integer, comment="性別") tel: str = Column(name="tel", type_=String(15), comment="電話番号") create_user: datetime = Column( name="create_user", type_=String(255), nullable=False, comment="登録ユーザー" ) create_at: datetime = Column( name="create_at", type_=DateTime, default=datetime.now(), nullable=False, comment="登録日時", ) update_user: datetime = Column( name="update_user", type_=String(255), nullable=True, comment="更新ユーザー" ) update_at: datetime = Column( name="update_at", type_=DateTime, nullable=True, comment="更新日時" )
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\main\py\repository\user_repository.py
from injector import inject, singleton from sqlalchemy import select, insert, update, delete, func from app.src.main.py.database.database import DatabaseConnection from app.src.main.py.entity.user_entity import UserEntity @singleton class UserRepository: @inject def __init__(self, db: DatabaseConnection) -> None: self.db = db # Count async def count(self) -> int: """ レコード件数を取得する """ async with self.db.get_db() as db_session: result = await db_session.execute(select(func.count(UserEntity.id))) count = result.scalar_one() return count # Delete async def delete(self, user_id: int) -> bool: """ ユーザー情報を削除する """ async with self.db.get_db() as db_session: result = await db_session.execute( delete(UserEntity).where(UserEntity.id == user_id) ) await db_session.commit() return result.rowcount > 0 # Select async def find_by_id(self, user_id: int) -> UserEntity: """ ユーザー情報を検索する """ async with self.db.get_db() as db_session: result = await db_session.execute( select(UserEntity).where(UserEntity.id == user_id) ) return result.scalar_one_or_none() async def find_latest_one(self) -> UserEntity: """ ユーザー情報を検索する(最新の1件) """ async with self.db.get_db() as db_session: result = await db_session.execute( select(UserEntity).order_by(UserEntity.id.desc()).limit(1) ) return result.scalar_one_or_none() # Insert async def insert(self, user: UserEntity) -> bool: """ ユーザー情報を登録する """ async with self.db.get_db() as db_session: db_session.add(user) await db_session.commit() return True # Update async def update(self, user: UserEntity) -> bool: """ ユーザー情報を更新する """ async with self.db.get_db() as db_session: result = await db_session.execute( select(UserEntity).where(UserEntity.id == user.id) ) existing_user = result.scalar_one_or_none() if existing_user: for column in UserEntity.__table__.columns.keys(): setattr(existing_user, column, getattr(user, column)) await db_session.commit() return True return False
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\main\py\service\user_service.py
from app.src.main.py.entity.user_entity import UserEntity from app.src.main.py.repository.user_repository import UserRepository from injector import inject, singleton @singleton class UserService: @inject def __init__(self, user_repository: UserRepository): self.user_repository = user_repository async def count(self) -> int: return await self.user_repository.count() async def delete(self, user_id: int) -> bool: return await self.user_repository.delete(user_id) async def find_user(self, user_id: int) -> UserEntity: return await self.user_repository.find_by_id(user_id) async def find_latest_one(self) -> UserEntity: return await self.user_repository.find_latest_one() async def insert(self, user: UserEntity) -> bool: return await self.user_repository.insert(user) async def update(self, user: UserEntity) -> bool: return await self.user_repository.update(user)
テスト用のコード。
■C:\Users\Toshinobu\Desktop\soft_work\python_work\fastapi\app\src\test\py\service\test_user_service.py
import pytest import asyncio import sys from datetime import date, datetime from pprint import pprint from injector import Injector, inject # from app.src.main.py.config.app_config import AppConfig from app.src.main.py.service.user_service import UserService from app.src.main.py.entity.user_entity import UserEntity # from app.src.main.py.repository.user_repository import UserRepository from app.src.main.py.core.dependency_inject import di_injector from app.src.main.py.code.gender import Gender if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) class TestUserService: # def __del__(self) -> None: # print("TestUserServiceのデストラクタを実行") # @pytest.fixture(autouse=True) # def setup(self): # self.user_service = UserService # UserServiceのインスタンスを作成 # yield # #@inject # @pytest.fixture(autouse=True) # def execute_before_test(self, user_service: UserService): # """ # テスト実行前処理 # """ # self.user_service = user_service # yield # @pytest.fixture(scope="class", autouse=True) # def di(self): # #DependencyInjector.get_class(DependencyInjector, AppConfig) # # Injector([AppConfig()]) @pytest.fixture(scope="function", autouse=True) def execute_before_test(self, request): """ テスト実行前処理 """ print("\n") print("【テスト開始】" + str(request.node.name)) yield @pytest.fixture(scope="function", autouse=True) def execute_after_test(self, request): """ テスト実行後処理 """ yield print("【テスト完了】" + str(request.node.name), end="") # del(self) ### テスト ### ################################################################ # テスト01 # ・insert ################################################################ @pytest.mark.asyncio async def test_01(self): user_service = di_injector.get_class(UserService) user_id: int = 1 user: UserEntity = UserEntity( id=user_id, first_name="鈴木", last_name="一郎", first_name_kana="スズキ", last_name_kana="イチロウ", email="ichiro@hoge.com", birth=date(1983, 7, 20), gender=int(Gender.MAN), tel="000-8888-8888", create_user="system", create_at=datetime.now(), # update_user=None, # update_at=None, ) user_count: int = await user_service.count() print(user_count) if user_count != 0: # 最新のユーザー情報を取得 latest_user: UserEntity = await user_service.find_latest_one() user.id = latest_user.id + 1 await user_service.insert(user) ################################################################ # テスト02 # ・count # ・select ################################################################ @pytest.mark.asyncio async def test_02(self): user_service = di_injector.get_class(UserService) user_id: int = 1 user_count: int = await user_service.count() print("■■■レコード件数■■■") print(user_count) if user_count != 0: # 最新のユーザー情報を取得 latest_user: UserEntity = await user_service.find_latest_one() user_id = latest_user.id user: UserEntity = await user_service.find_user(user_id) print("■■■取得結果■■■") pprint(vars(user)) ################################################################ # テスト03 # ・update ################################################################ @pytest.mark.asyncio async def test_03(self): user_service = di_injector.get_class(UserService) user_id: int = 1 user_count: int = await user_service.count() print("■■■レコード件数■■■") print(user_count) if user_count != 0: # 最新のユーザー情報を取得 latest_user: UserEntity = await user_service.find_latest_one() user_id = latest_user.id user: UserEntity = await user_service.find_user(user_id) print("■■■取得結果■■■") pprint(vars(user)) # 更新処理 user.birth = date(9999, 7, 20) result: bool = await user_service.update(user) print(result) ################################################################ # テスト04 # ・delete ################################################################ @pytest.mark.asyncio async def test_04(self): user_service = di_injector.get_class(UserService) user_id: int = 1 user_count: int = await user_service.count() print("■■■レコード件数■■■") print(user_count) if user_count != 0: # 最新のユーザー情報を取得 latest_user: UserEntity = await user_service.find_latest_one() user_id = latest_user.id result: bool = await user_service.delete(user_id) print(result)
⇧ で保存。
後は、データベースの処理とかを非同期にしたので、テスト側でも非同期のライブラリをインストールする必要があるようなので、インストール。
で、テストを実行。
Pythonの「依存性注入(DI:Dependency Injection)」の挙動が、いまいちよく分からないですが...
あと、必要なライブラリが分かり辛過ぎるんよね...
そして、
⇧ 性能問題とかも考えていかねばならないっぽいですな...
とりあえず、
⇧ 利用者が多いはずなので、実用的な実装の情報が見つかることを願うばかりですな...
「RDBMS(Relational DataBase Management Systems)」に対するドライバーに依りけりとは思いますが、
- 同期
- 非同期
の両方が選択肢としてあるなら、
まとめ
あとは、使えるドライバーがそもそも違うのでここも注意するところかなと思います。ただ、非同期処理対応のasyncpgはpsycopg2よりも3倍早いということなので、かなりメリットはありそうな気がします。
⇧「非同期」の方が性能が良い傾向にあるのかね?
今日も今日とて、Pythonの学習のモチベーションが上がらない...
毎度モヤモヤ感が半端ない…
今回はこのへんで。