Newer
Older
istex-enrich-monitoring / script / lodex-export / lodex-export.py
@Mathis EON Mathis EON on 18 Jun 2024 16 KB wip
#!/usr/bin/env python3

import argparse
import json
import re
import sys
from enum import Enum

from pydantic import AliasChoices, BaseModel, Field, RootModel, field_validator
from typing_extensions import Annotated, Any, Callable, Iterable, Literal, cast


class Kind(Enum):
    """Tell whether a record type is an aggregation or a single measurement.

    ``Statistics.collect_contextual_data`` stores ``Aggregate`` context
    records in a list and ``Atomic`` ones as a plain dictionary.
    """

    # Plain integers: the previous ``(1,)`` carried a stray trailing comma
    # that silently turned the member value into a one-element tuple.
    Aggregate = 1
    Atomic = 2


def add_kind(kind_value):
    """Build a class decorator that attaches a ``kind()`` classmethod.

    The attached classmethod ignores the class it is called on and always
    returns ``kind_value``. The decorated class itself is returned, so the
    decorator can be stacked on a normal class definition.
    """

    def decorator(cls):
        def kind(cls) -> Kind:
            # The value is captured from the enclosing ``add_kind`` call.
            return kind_value

        cls.kind = classmethod(kind)
        return cls

    return decorator


def date_validator(value: str) -> str:
    """Check that ``value`` is written as ``yyyy-mm-dd`` and return it unchanged.

    Only the shape is checked (four, two and two ASCII digits separated by
    dashes); the calendar validity of the date is not verified.

    Raises:
        ValueError: if ``value`` does not have the expected shape.
    """
    # ``fullmatch`` is used instead of ``match`` + ``$`` because ``$`` also
    # matches just before a trailing newline, which let "2024-01-01\n" pass.
    # ``re.ASCII`` keeps ``\d`` from accepting non-ASCII digit characters.
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", value, flags=re.ASCII) is None:
        raise ValueError("Date must be in the format yyyy-mm-dd")
    return value


@add_kind(Kind.Atomic)
class IstexNbDoc(BaseModel):
    """Record of type ``istex.nb_doc``: a number of documents at a given date.

    Declared as ``Kind.Atomic`` and listed in ``Statistics.contexts``, so it
    is collected as context data rather than as a statistic.
    """

    # Discriminator of the record (element type).
    type: Literal["istex.nb_doc"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Number of documents.
    nb: int = Field(..., description="Nombre de documents")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class after its record type: ``Statistics.add_context`` uses
# ``context.__name__`` as the key under which context data is attached.
IstexNbDoc.__name__ = "istex.nb_doc"


@add_kind(Kind.Atomic)
class IstexNbDocEnriched(BaseModel):
    """Record of type ``istex.nb_doc_enriched``: a number of enriched documents.

    Declared as ``Kind.Atomic`` and listed in ``Statistics.contexts``, so it
    is collected as context data rather than as a statistic.
    """

    # Discriminator of the record (element type).
    type: Literal["istex.nb_doc_enriched"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Number of enriched documents.
    nb: int = Field(..., description="Nombre de documents enrichis")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class after its record type: ``Statistics.add_context`` uses
# ``context.__name__`` as the key under which context data is attached.
IstexNbDocEnriched.__name__ = "istex.nb_doc_enriched"


@add_kind(Kind.Aggregate)
class IstexCategories(BaseModel):
    """Record counting documents per category, for one of four category schemes.

    The scheme is carried by the ``type`` value (scopus, wos, inist or
    scienceMetrix). Declared as ``Kind.Aggregate``.
    """

    # Discriminator of the record; one literal per supported category scheme.
    type: Literal[
        "istex.categories.scopus",
        "istex.categories.wos",
        "istex.categories.inist",
        "istex.categories.scienceMetrix",
    ] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the category.
    category: str = Field(..., description="Nom de la catégorie")
    # Number of documents for this category.
    nb: int = Field(..., description="Nombre de documents pour cette catégorie")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to the prefix shared by its four ``type`` literals.
IstexCategories.__name__ = "istex.categories"


@add_kind(Kind.Aggregate)
class IstexLanguages(BaseModel):
    """Record of type ``istex.languages``: a number of documents for a language.

    Declared as ``Kind.Aggregate``.
    """

    # Discriminator of the record (element type).
    type: Literal["istex.languages"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Language of the documents.
    language: str = Field(..., description="Langue des documents")
    # Number of documents for this language.
    nb: int = Field(..., description="Nombre de documents pour cette langue")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
IstexLanguages.__name__ = "istex.languages"


@add_kind(Kind.Aggregate)
class IstexAccessConditions(BaseModel):
    """Record of type ``istex.access_conditions``: documents per access condition.

    Declared as ``Kind.Aggregate``.
    """

    # Discriminator of the record (element type).
    type: Literal["istex.access_conditions"] = Field(
        ..., description="Type de l'élément"
    )
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Access condition of the documents.
    access_condition: str = Field(..., description="Condition d'accès des documents")
    # Number of documents for this access condition.
    nb: int = Field(..., description="Nombre de documents pour cette condition d'accès")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
IstexAccessConditions.__name__ = "istex.access_conditions"


@add_kind(Kind.Aggregate)
class IstexGenres(BaseModel):
    """Record of type ``istex.genres``: a number of documents for a genre.

    Declared as ``Kind.Aggregate``.
    """

    # Discriminator of the record (element type).
    type: Literal["istex.genres"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Genre of the documents.
    genre: str = Field(..., description="Genre des documents")
    # Number of documents per genre.
    nb: int = Field(..., description="Nombre de documents par genre")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
IstexGenres.__name__ = "istex.genres"


@add_kind(Kind.Atomic)
class CorpusNbDoc(BaseModel):
    """Record of type ``corpus.nb_doc``: a number of documents in a corpus.

    Declared as ``Kind.Atomic`` and listed in ``Statistics.contexts``, so it
    is collected as context data rather than as a statistic.
    """

    # Discriminator of the record (element type).
    type: Literal["corpus.nb_doc"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")
    # Number of documents in the corpus.
    nb: int = Field(..., description="Nombre de documents dans le corpus")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class after its record type: ``Statistics.add_context`` uses
# ``context.__name__`` as the key under which context data is attached.
CorpusNbDoc.__name__ = "corpus.nb_doc"


@add_kind(Kind.Atomic)
class CorpusEnrichment(BaseModel):
    """Record of type ``corpus.enrichment``: enriched documents in a corpus.

    Declared as ``Kind.Atomic`` and listed in ``Statistics.statistic_units``,
    so it is collected as a statistic and later enriched with context data.
    """

    # Discriminator of the record (element type).
    type: Literal["corpus.enrichment"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")
    # Type of enrichment.
    enrichment: str = Field(..., description="Type d'enrichissement")
    # Number of enriched documents.
    nb: int = Field(..., description="Nombre de documents enrichis")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
CorpusEnrichment.__name__ = "corpus.enrichment"


@add_kind(Kind.Atomic)
class CorpusEnrichmentLang(BaseModel):
    """Record of type ``corpus.enrichment.lang``: enriched documents per language.

    Declared as ``Kind.Atomic``.
    """

    # Discriminator of the record (element type).
    type: Literal["corpus.enrichment.lang"] = Field(
        ..., description="Type de l'élément"
    )
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")
    # Type of enrichment.
    enrichment: str = Field(..., description="Type d'enrichissement")
    # Language of the enriched documents.
    language: str = Field(..., description="Langue des documents enrichis")
    # Number of enriched documents for this language.
    nb: int = Field(..., description="Nombre de documents enrichis pour cette langue")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
CorpusEnrichmentLang.__name__ = "corpus.enrichment.lang"


@add_kind(Kind.Atomic)
class CorpusLanguage(BaseModel):
    """Record of type ``corpus.language``: documents of a corpus for a language.

    Declared as ``Kind.Atomic``.
    """

    # Discriminator of the record (element type).
    type: Literal["corpus.language"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")
    # Language of the documents.
    language: str = Field(..., description="Langue des documents")
    # Number of documents for this language.
    nb: int = Field(..., description="Nombre de documents pour cette langue")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
CorpusLanguage.__name__ = "corpus.language"


@add_kind(Kind.Atomic)
class CorpusAccessCondition(BaseModel):
    """Record of type ``corpus.access_condition``: documents per access condition.

    Declared as ``Kind.Atomic`` and listed in ``Statistics.statistic_units``,
    so it is collected as a statistic and later enriched with context data.
    """

    # Discriminator of the record (element type).
    type: Literal["corpus.access_condition"] = Field(
        ..., description="Type de l'élément"
    )
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")
    # Access condition of the documents.
    access_condition: str = Field(..., description="Condition d'accès des documents")
    # Number of documents for this access condition.
    nb: int = Field(..., description="Nombre de documents pour cette condition d'accès")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
CorpusAccessCondition.__name__ = "corpus.access_condition"


@add_kind(Kind.Atomic)
class CorpusGenre(BaseModel):
    """Record of type ``corpus.genre``: documents of a corpus for a genre.

    Declared as ``Kind.Atomic``.
    """

    # Discriminator of the record (element type).
    type: Literal["corpus.genre"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")
    # Genre of the documents.
    genre: str = Field(..., description="Genre des documents")
    # Number of documents per genre.
    nb: int = Field(..., description="Nombre de documents par genre")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
CorpusGenre.__name__ = "corpus.genre"


@add_kind(Kind.Atomic)
class CorpusCategory(BaseModel):
    """Record counting the documents of a corpus per category.

    The category scheme is carried by the ``type`` value (wos, scopus, inist
    or scienceMetrix). Declared as ``Kind.Atomic``.
    """

    # Discriminator of the record; one literal per supported category scheme.
    type: Literal[
        "corpus.category.wos",
        "corpus.category.scopus",
        "corpus.category.inist",
        "corpus.category.scienceMetrix",
    ] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Name of the category. The field used to be misspelled ``cateogry``,
    # which made every record carrying a ``category`` key fail validation.
    # The old spelling is still accepted on input so that data produced
    # against the previous schema keeps loading.
    category: str = Field(
        ...,
        validation_alias=AliasChoices("category", "cateogry"),
        description="Nom de la catégorie",
    )
    # Number of documents for this category.
    nb: int = Field(..., description="Nombre de documents pour cette catégorie")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")

    @field_validator("date")
    @classmethod
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to the prefix shared by its four ``type`` literals.
CorpusCategory.__name__ = "corpus.category"


@add_kind(Kind.Atomic)
class CorpusHostGenre(BaseModel):
    """Record of type ``corpus.host.genre``: documents of a corpus for a genre.

    Declared as ``Kind.Atomic``.
    """

    # Discriminator of the record (element type).
    type: Literal["corpus.host.genre"] = Field(..., description="Type de l'élément")
    # Date of the measurement, validated as ``yyyy-mm-dd``.
    date: str = Field(..., description="Date de la mesure")
    # Genre of the documents.
    genre: str = Field(..., description="Genre des documents")
    # Number of documents per genre.
    nb: int = Field(..., description="Nombre de documents par genre")
    # Name of the corpus.
    corpus: str = Field(..., description="Nom du corpus")

    @field_validator("date")
    @classmethod  # explicit, like every other model in this file
    def check_date_format(cls, value: str) -> str:
        """Delegate the ``date`` format check to ``date_validator``."""
        return date_validator(value)


# Rename the class to its dotted record-type identifier.
CorpusHostGenre.__name__ = "corpus.host.genre"

# Union of every supported record model. This is the single source of truth:
# it is used both for type annotations and to build ``DataList`` below, so a
# new model only has to be added in one place.
RecordType = (
    IstexNbDoc
    | IstexNbDocEnriched
    | IstexCategories
    | IstexLanguages
    | IstexAccessConditions
    | IstexGenres
    | CorpusNbDoc
    | CorpusEnrichment
    | CorpusEnrichmentLang
    | CorpusLanguage
    | CorpusAccessCondition
    | CorpusGenre
    | CorpusHostGenre
    | CorpusCategory
)

# Root model validating one record; the value of the ``type`` field selects
# which member of ``RecordType`` is used for validation.
DataList = RootModel[Annotated[RecordType, Field(discriminator="type")]]


class DataModel(DataList):
    """Root model for one record, with a helper to select records by type."""

    @classmethod
    def filter_by_type(
        cls, records: RecordType, record_type: type[RecordType]
    ) -> RecordType | None:
        """Return the record when it is exactly of ``record_type``, else ``None``.

        Args:
            records: Either a validated root model wrapping a record (what
                ``DataList.model_validate_json`` returns) or a bare record.
            record_type: The record class to keep.

        Returns:
            The unwrapped record if its class is ``record_type``, otherwise
            ``None``.
        """
        # Read the wrapped record through ``.root`` instead of taking the
        # first ``(name, value)`` pair of the model iterator. With the old
        # approach a bare record (which is what the annotation promises)
        # yielded its ``type`` string and was therefore always rejected.
        record = records.root if isinstance(records, RootModel) else records

        # Exact class match, as before: subclasses are not accepted.
        return record if type(record) is record_type else None


# Aliases describing the lookup key of ``Statistics.contexts_data``.
# Date of the measurement, taken from the record's ``date`` field.
type DateKey = str
# Record class acting as context (a member of ``Statistics.contexts``).
type ContextKey = type[RecordType]
# Name of the corpus the context data belongs to.
type CorpusNameKey = str
# Full key: (date, context class, corpus name or ``None`` when the record has
# no ``corpus`` field).
type Key = tuple[DateKey, ContextKey, CorpusNameKey | None]


class Statistics:
    """Collect records, attach context data to statistics and write the result.

    Records are read line by line (one JSON document per line). Context
    records (``self.contexts``) are indexed by date, type and corpus;
    statistic records (``self.statistic_units``) are kept in reading order
    and enriched with the matching context data by ``add_context``.
    """

    def __init__(self) -> None:
        # Record types providing context (document counts).
        self.contexts: list[type[RecordType]] = [
            CorpusNbDoc,
            IstexNbDoc,
            IstexNbDocEnriched,
        ]
        # Record types kept as statistics.
        self.statistic_units: list[type[RecordType]] = [
            CorpusEnrichment,
            CorpusAccessCondition,
        ]

        self.contexts_data: dict[Key, Any] = {}
        self.corpus_statistics: list[RecordType] = []
        self.contextualized_statistics: list[dict[str, Any]] = []

    def collect_contextual_data(self, record: RecordType) -> None:
        """Store ``record`` in ``contexts_data`` when it is a context record.

        Context data (number of documents in Istex, number of enriched
        documents in Istex, number of documents in a corpus) is kept so that
        statistics can be enriched later. It may be at Istex level or at
        corpus level.

        The data is indexed by the measurement date, the context type and,
        when the record has one, the corpus name; Istex-level records have no
        ``corpus`` field and use ``None`` instead.

        ``record`` is the validated root model produced by
        ``process_lines``; it is unwrapped by ``DataModel.filter_by_type``.
        """
        for context_type in self.contexts:
            doc = DataModel.filter_by_type(record, context_type)
            if doc is None:
                continue

            fields: dict[str, Any] = dict(doc)
            key: Key = (doc.date, context_type, fields.get("corpus"))

            if doc.kind() == Kind.Aggregate:
                # Aggregates accumulate every record sharing the same key.
                self.contexts_data.setdefault(key, []).append(fields)
            else:
                # Atomic records are plain dictionaries. A later record with
                # the same key overrides the earlier values; the previous
                # ``+=`` raised ``TypeError`` on two dictionaries, which made
                # the duplicate line be reported as invalid.
                self.contexts_data.setdefault(key, {}).update(fields)

    def collect_statistics(self, record: RecordType) -> None:
        """Keep ``record`` when it is a statistic, i.e. not context data."""
        for unit_type in self.statistic_units:
            doc = DataModel.filter_by_type(record, unit_type)
            if doc is not None:
                self.corpus_statistics.append(doc)

    def add_context(self) -> None:
        """Attach the matching context data to every collected statistic.

        For each statistic and each context type, the context data is looked
        up by date, first at Istex level (no corpus) and then for the
        statistic's corpus. It is stored under the context class name; when
        both levels exist for one context type, the corpus-level data wins.
        """
        for statistic in self.corpus_statistics:
            enriched: dict[str, Any] = dict(statistic)

            for context in self.contexts:
                # ``None`` first so that corpus-level data overrides it.
                for corpus in (None, statistic.corpus):
                    context_data = self.contexts_data.get(
                        (statistic.date, context, corpus)
                    )
                    if context_data:
                        enriched[context.__name__] = context_data

            self.contextualized_statistics.append(enriched)

    def load(self, file_path: str) -> None:
        """Read every line of ``file_path`` (UTF-8), then add the context."""
        with open(file_path, encoding="utf-8") as file:
            for line in file:
                self.process_lines(line)

        self.add_context()

    def read(self) -> None:
        """Read lines from STDIN until EOF or an ``Exit`` line, then add the context."""
        for line in sys.stdin:
            if line.rstrip() == "Exit":
                break

            self.process_lines(line)

        self.add_context()

    def process_lines(self, line: str) -> None:
        """Validate one JSON line and dispatch it to the collectors.

        Blank lines are skipped. A line that cannot be processed is reported
        on STDERR and ignored, so that it is never mixed with the data
        written on STDOUT.
        """
        if not line.strip():
            return

        try:
            parsed_data = DataList.model_validate_json(line)
            # The collectors are annotated with ``RecordType`` but receive
            # the root model, which ``DataModel.filter_by_type`` unwraps.
            data: RecordType = cast(RecordType, parsed_data)

            self.collect_contextual_data(data)
            self.collect_statistics(data)
        except Exception as error:  # one bad record must not stop the run
            print(
                f"Ignored line ({type(error).__name__}): {line.rstrip()}",
                file=sys.stderr,
            )

    def write(
        self, formatter: Callable[[list[Any]], list[str]], file: str | None = None
    ) -> None:
        """Write the contextualized statistics, as rendered by ``formatter``.

        The strings returned by ``formatter`` are written verbatim, to
        ``file`` (UTF-8) when given, otherwise to STDOUT.
        """
        # Format first, so that a formatter failure does not truncate ``file``.
        records = formatter(self.contextualized_statistics)

        if file:
            with open(file, "w", encoding="utf-8") as output:
                output.writelines(records)
        else:
            sys.stdout.writelines(records)


def jsonl_formatter(
    preprocessor: Callable[[list[Any]], list[Any]] | None = None
) -> Callable[[list[Any]], list[str]]:
    def format(data: list[Any]) -> list[str]:
        d: list[Any] = data

        if preprocessor:
            d: list[Any] = preprocessor(data)

        result: list[str] = list(map(lambda elem: json.dumps(elem), d))
        return result

    return format


def group_by_preprocessor(
    key: str,
) -> Callable[[list[dict[str, Any]]], list[dict[str, list[Any]]]]:
    """Build a preprocessor grouping dictionaries by the value stored at ``key``.

    The returned function yields one ``{key: value, "data": [items]}`` entry
    per distinct value, in order of first appearance. Items that do not
    contain ``key`` are left out.
    """

    def group_by(data: list[dict[str, Any]]) -> list[dict[str, list[Any]]]:
        buckets: dict[str, list[Any]] = {}

        for entry in data:
            if key not in entry:
                continue

            buckets.setdefault(entry[key], []).append(entry)

        return [{key: name, "data": members} for name, members in buckets.items()]

    return group_by


def main() -> None:
    """Command-line entry point.

    Reads records from the file given with ``-f/--file`` (or from STDIN),
    then writes the contextualized statistics grouped by ``corpus`` as JSON
    to the file given with ``-o/--output`` (or to STDOUT).
    """
    # ``prog`` is left to argparse (script name) instead of the "PROG"
    # placeholder, so that the usage line shows the real command.
    parser = argparse.ArgumentParser(usage="%(prog)s [options]")
    _ = parser.add_argument(
        "-f",
        "--file",
        nargs="?",
        help="File to read data from. If no file is provided the STDIN is used",
        type=str,
    )

    _ = parser.add_argument(
        "-o",
        "--output",
        nargs="?",
        help="File to write data to. If no file is provided the STDOUT is used",
        type=str,
    )

    args = parser.parse_args()
    # Both options are optional, hence ``None`` when they are not provided.
    file: str | None = args.file
    output: str | None = args.output

    statistics_builder = Statistics()

    if file:
        statistics_builder.load(file)
    else:
        statistics_builder.read()

    statistics_builder.write(jsonl_formatter(group_by_preprocessor("corpus")), output)


if __name__ == "__main__":
    main()