edman.utils のソースコード

import re
from collections import defaultdict
from typing import Union, Callable
from datetime import datetime
import dateutil.parser
from bson import ObjectId, errors


[ドキュメント]class Utils: """ | 各クラス共通の静的メソッド | インスタンス化禁止 """ def __init__(self): raise NotImplementedError('not allowed')
[ドキュメント] @staticmethod def item_literal_check(list_child: Union[dict, list]) -> bool: """ | リストデータ内にリテラルやオブジェクトのデータだけあればTrue | それ以外はFalse | | OKパターン | list_child = [1,2,3] | list_child = [1,2,objectId()] | | NGパターン | list_child = {'A':'B'} | list_child = ['A':[1,2,3]] | list_child = [1,2,{'A':'B'}] :param list_child: :type list_child: dict or list :return: :rtype: bool """ result = True if isinstance(list_child, dict): result = False if isinstance(list_child, list): for j in list_child: if isinstance(j, dict) or isinstance(j, list): result = False break return result
[ドキュメント] @staticmethod def doc_traverse(doc: dict, target_keys: list, query: list, f: Callable) -> dict: """ | ドキュメントを走査し、クエリで指定した階層に指定の関数オブジェクトを適応 | 関数適応後のドキュメントを返す :param dict doc: :param list target_keys: コールバック関数の適応対象の辞書のキーのリスト :param list query: :param Callable f: コールバック関数 :return: doc :rtype: dict """ query.reverse() # リストの削除処理速度向上のため、逆リストにする def rec(document): """ 再帰中にクエリを一つづつ消費し、最後のクエリに到達したら更新 """ for key, value in document.items(): # クエリを全て消費しているなら終了 if len(query) == 0: break if isinstance(value, dict): if key == query[-1]: del query[-1] if len(query) == 0: # 最終クエリに到達しているなら更新 f(value, target_keys) else: rec(value) else: rec(value) # リストデータは項目と同じ扱いなので繰り返す elif isinstance(value, list) and Utils.item_literal_check( value): continue elif isinstance(value, list): # 現在のクエリが数値(リストのインデックス)なら再帰に入る if query[-1].isdecimal(): del query[-1] for i in value: rec(i) # 現在のクエリとループのキーが違う場合は繰り返す elif query[-1] != key: continue # 現在のキーがクエリと一致している場合 # 次のクエリが数値(リストのインデックスの時)、 # インデックスを直接指定して再帰に入る elif len(query) >= 2 and query[-2].isdecimal(): needle = int(query[-2]) del query[-2:] if len(query) == 0: f(value[needle], target_keys) else: rec(value[needle]) else: # リスト内の辞書を先に入る時、 # 現在のクエリがインデックスでないのはおかしい # したがって例外処理の対象になる raise ValueError('クエリが正しくありません.リストの場合、インデックスを指定してください') else: # 項目の部分は関係ないので繰り返す continue try: rec(doc) except ValueError as e: raise e except IndexError as e: raise e if len(query) != 0: raise ValueError(f'クエリが実行されませんでした. 実行されなかったクエリ:{query}') return doc
[ドキュメント] @staticmethod def conv_objectid(oid: Union[str, ObjectId]) -> ObjectId: """ | 文字列だった場合ObjectIdを変換する | 元々ObjectIdならそのまま :param oid: :type oid: ObjectId or str :return: result :rtype: ObjectId """ if isinstance(oid, str): try: result = ObjectId(oid) except errors.InvalidId: raise else: result = oid return result
[ドキュメント] @staticmethod def to_datetime(s: str) -> Union[datetime, str]: """ | 日付もしくは日付時間をdatetimeオブジェクトに変換 | 日付や日付時間にならないものは文字列に変換 :param str s: :return: :rtype: datetime or str """ if not isinstance(s, str): return str(s) try: return dateutil.parser.parse(s) except ValueError: return str(s)
[ドキュメント] @staticmethod def query_check(query: list, doc: dict) -> bool: """ クエリーが正しいか評価 :param list query: :param dict doc: :return: result :rtype: bool """ result = False for key in query: if key.isdecimal(): key = int(key) try: doc = doc[key] except (KeyError, IndexError): # インデクスの指定ミスは即時終了 result = False break else: # for~elseを利用していることに注意 if isinstance(doc, dict): result = True return result
[ドキュメント] @staticmethod def item_delete(doc: dict, del_keys: tuple) -> dict: """ 特定のキーの項目を削除 _id、親と子のリファレンス、ファイルリファレンスなど :param dict doc: :param tuple del_keys: :return: item :rtype: dict """ for key in (i for i in del_keys if i in doc): del doc[key] return doc
[ドキュメント] @staticmethod def child_combine(rec_result: list) -> dict: """ | 同じコレクションのデータをリストでまとめるジェネレータ | | コレクション:ドキュメントのリストを作成 | {collection:[{key:value},{key:value}...]} :param list rec_result: :return: :rtype: dict """ for bros in rec_result: tmp_bros = defaultdict(list) for docs in bros: for collection, doc in docs.items(): tmp_bros[collection].append(doc) yield dict(tmp_bros)
[ドキュメント] @staticmethod def field_name_check(field_name: str) -> bool: """ | MongoDBの命名規則チェック(フィールド名) | void, None(Null), 文字列先頭に($ .)は使用不可 | | https://docs.mongodb.com/manual/reference/limits/#Restrictions-on-Field-Names :param str field_name: :return: :rtype: bool """ if field_name is None: return False if not isinstance(field_name, str): field_name = str(field_name) if len(field_name) == 0: return False if field_name[0] in ('$', '.'): return False return True
[ドキュメント] @staticmethod def collection_name_check(collection_name: str) -> bool: """ | MongoDBの命名規則チェック(コレクション名) | # $ None(null) '' system. | 最初がアンスコか文字 | mongoの制約の他に頭文字に#もNG | | コレクション名空間の最大長は、データベース名、ドット(.)区切り文字 | およびコレクション名(つまり <database>.<collection>)を合わせて | 120バイトを超えないこと | ただし、子のメソッド利用時はDB名を取得するタイミングではないため、 | 文字数のチェックは行えないことを留意すること | | https://docs.mongodb.com/manual/reference/limits/#Restriction-on-Collection-Names :param str collection_name: :return: :rtype: bool """ if collection_name is None: return False if not isinstance(collection_name, str): collection_name = str(collection_name) collection_name_length = len(collection_name) if collection_name_length == 0: return False if '$' in collection_name: return False if collection_name.startswith( 'system.') or collection_name.startswith('#'): return False # 先頭に記号があるとマッチする if re.match(r'(\W)', collection_name) is not None: return False return True
[ドキュメント] @staticmethod def type_cast_conv(datatype: str) -> Union[bool, int, float, str, dateutil.parser.parse]: """ データタイプに合わせて型を選択する :param int datatype: :return: :rtype: bool or int or float or str or dateutil.parser.parse """ type_table = { 'bool': bool, 'int': int, 'float': float, 'str': str, 'datetime': dateutil.parser.parse } return type_table.get(datatype, str)