Ir para conteúdo
  • 0

Aula - Atualização Incremental com API Paginada


Henrique Marsal

Pergunta

  • Alunos

Peguei a aula e tentei adaptar o script que faz a atualização incremental dos dados no mysyql na API Movidesk no tipo oData. mas o script não lida com a alternativa de inserir novas colunas . eu preciso dar um drop no banco e reinserir os dados outra vez. Preciso de algum help para me ajudar o script a lidar com a opção de ao inserir novas colunas ele fazer att full da coluna nova e depois incremental normal. 

Já tentei umas 200x com GPT mas na hora de rodar o script ele travava.

 

import requests
import pandas as pd
from dotenv import load_dotenv
import os
import time
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    Integer,
    String,
    DECIMAL,
    DATETIME,
    inspect,
    text,
)
from sqlalchemy.exc import IntegrityError
from datetime import datetime, timedelta
 
# Carrega as variáveis de ambiente do arquivo .env
load_dotenv()
 
# Define a URL base da API Movidesk
 
# Pega o token do arquivo .env
token = os.getenv("API_TOKEN")
 
# Função para chamar a API Movidesk e obter os dados paginados
def chamar_api_movidesk(token, params=None😞
    lista_dados_todas_paginas = []
    skip = 0  # Controle de paginação, começa com 0
    top = 1000  # Número máximo de resultados por página
 
    # Configura parâmetros fixos
    if params is None:
        params = {}
 
    # Inclui o token e a paginação nos parâmetros
    params.update({"token": token, "$top": top, "$skip": skip})
 
    while True:
        response = requests.get(base_url, params=params)
        response_ajustado_json = response.json()
 
        # Verifica se a resposta contém dados
        if isinstance(response_ajustado_json, list) and response_ajustado_json:
            lista_dados_todas_paginas.extend(response_ajustado_json)
        else:
            # Se a resposta for uma lista vazia, significa que não há mais páginas
            break
 
        # Atualiza o valor de skip para pegar a próxima página
        skip += top
        params["$skip"] = skip
        time.sleep(1)  # Respeita o limite de requisições da API
 
    return lista_dados_todas_paginas
 
# Função para obter todos os tickets sem filtro
def chamar_api_inicial_completa():
    # Definindo campos de interesse
    params = {"$select": "id,createdDate,lastUpdate,type,category,urgency,status,ownerTeam,serviceFull,resolvedIn,slaSolutionDate,slaRealResponseDate,slaResponseDate"}
    lista_dados_todas_paginas = chamar_api_movidesk(token, params)
    df = pd.DataFrame(lista_dados_todas_paginas)
    return df
 
# Função para filtrar tickets criados nos últimos dois dias
def atualizar_dados_ultimos_dois_dias():
    dois_dias_atras = (datetime.now() - timedelta(days=2)).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
    # Definindo parâmetros com filtro de data
    params = {
        "$select": "id,createdDate,lastUpdate",
        "$filter": f"createdDate ge {dois_dias_atras}",
    }
    lista_dados_todas_paginas = chamar_api_movidesk(token, params)
    df = pd.DataFrame(lista_dados_todas_paginas)
    return df
 
# Converta as colunas de data para o formato aceito pelo MySQL
data_columns = ["createdDate", "lastUpdate"]
 
# Conexão com o banco de dados MySQL - PUXANDO OS DADOS DO ARQUIVO .env
db_user = os.getenv("DB_USER")  # Nome do usuário no MySQL
db_password = os.getenv("DB_PASSWORD")  # Senha do MySQL
db_host = os.getenv("DB_HOST")  # Host do banco de dados
db_name = os.getenv("DB_NAME")  # Nome do banco de dados
 
# String de conexão com o banco de dados, passando as variáveis do .env
engine = create_engine(f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}")
 
# Instanciando o objeto MetaData para criar a tabela no banco de dados
metadata = MetaData()
 
# Crie a tabela com todas as colunas necessárias e defina a chave primária
tickets_table = Table(
    "tickets",
    metadata,
    Column("id", String(255), primary_key=True),
    Column("createdDate", DATETIME),
    Column("lastUpdate", DATETIME),
)
 
# Verifica se a tabela já existe no banco de dados
inspector = inspect(engine)
if "tickets" in inspector.get_table_names():
    with engine.connect() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM tickets"))
        count = result.fetchone()[0]
        if count > 0:
            # Se a tabela tiver dados, atualiza os últimos dois dias
            df_incremental = atualizar_dados_ultimos_dois_dias()
            if df_incremental.empty:
                print("Não há dados para atualizar.")
            else:
                # Converte as colunas de data para o formato correto
                for col in data_columns:
                    if col in df_incremental.columns:
                        df_incremental[col] = pd.to_datetime(
                            df_incremental[col]
                        ).dt.strftime("%Y-%m-%d %H:%M:%S")
 
                inseridos_com_sucesso = 0
                for index, row in df_incremental.iterrows():
                    try:
                        row.to_frame().T.to_sql(
                            "tickets", con=engine, if_exists="append", index=False
                        )
                        inseridos_com_sucesso += 1
                    except IntegrityError:
                        continue  # Ignora duplicações
 
                print(
                    f"Total de registros inseridos com sucesso: {inseridos_com_sucesso}"
                )
        else:
            print("Tabela existente mas sem dados, realizando carga inicial completa.")
            df_inicial = chamar_api_inicial_completa()
else:
    print("Tabela não existe, criando tabela e realizando carga inicial completa.")
    metadata.create_all(engine)  # Cria a tabela
    df_inicial = chamar_api_inicial_completa()
 
# Verifica se df_inicial foi criado e insere dados no banco
if "df_inicial" in locals():
    for col in data_columns:
        if col in df_inicial.columns:
            df_inicial[col] = pd.to_datetime(df_inicial[col]).dt.strftime(
                "%Y-%m-%d %H:%M:%S"
            )
 
    inseridos_com_sucesso = 0
    for index, row in df_inicial.iterrows():
        try:
            row.to_frame().T.to_sql(
                "tickets", con=engine, if_exists="append", index=False
            )
            inseridos_com_sucesso += 1
        except IntegrityError:
            continue  # Ignora duplicações
 
    print(f"Total de registros inseridos com sucesso: {inseridos_com_sucesso}")
Editado por Henrique Marsal
Link para o comentário
Compartilhar em outros sites

1 resposta a esta questão

Posts Recomendados

  • 0
  • Alunos
17 horas atrás, Henrique Marsal disse:

Peguei a aula e tentei adaptar o script que faz a atualização incremental dos dados no mysyql na API Movidesk no tipo oData. mas o script não lida com a alternativa de inserir novas colunas . eu preciso dar um drop no banco e reinserir os dados outra vez. Preciso de algum help para me ajudar o script a lidar com a opção de ao inserir novas colunas ele fazer att full da coluna nova e depois incremental normal. 

Já tentei umas 200x com GPT mas na hora de rodar o script ele travava.

 

import requests
import pandas as pd
from dotenv import load_dotenv
import os
import time
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    Integer,
    String,
    DECIMAL,
    DATETIME,
    inspect,
    text,
)
from sqlalchemy.exc import IntegrityError
from datetime import datetime, timedelta
 
# Carrega as variáveis de ambiente do arquivo .env
load_dotenv()
 
# Define a URL base da API Movidesk
 
# Pega o token do arquivo .env
token = os.getenv("API_TOKEN")
 
# Função para chamar a API Movidesk e obter os dados paginados
def chamar_api_movidesk(token, params=None😞
    lista_dados_todas_paginas = []
    skip = 0  # Controle de paginação, começa com 0
    top = 1000  # Número máximo de resultados por página
 
    # Configura parâmetros fixos
    if params is None:
        params = {}
 
    # Inclui o token e a paginação nos parâmetros
    params.update({"token": token, "$top": top, "$skip": skip})
 
    while True:
        response = requests.get(base_url, params=params)
        response_ajustado_json = response.json()
 
        # Verifica se a resposta contém dados
        if isinstance(response_ajustado_json, list) and response_ajustado_json:
            lista_dados_todas_paginas.extend(response_ajustado_json)
        else:
            # Se a resposta for uma lista vazia, significa que não há mais páginas
            break
 
        # Atualiza o valor de skip para pegar a próxima página
        skip += top
        params["$skip"] = skip
        time.sleep(1)  # Respeita o limite de requisições da API
 
    return lista_dados_todas_paginas
 
# Função para obter todos os tickets sem filtro
def chamar_api_inicial_completa():
    # Definindo campos de interesse
    params = {"$select": "id,createdDate,lastUpdate,type,category,urgency,status,ownerTeam,serviceFull,resolvedIn,slaSolutionDate,slaRealResponseDate,slaResponseDate"}
    lista_dados_todas_paginas = chamar_api_movidesk(token, params)
    df = pd.DataFrame(lista_dados_todas_paginas)
    return df
 
# Função para filtrar tickets criados nos últimos dois dias
def atualizar_dados_ultimos_dois_dias():
    dois_dias_atras = (datetime.now() - timedelta(days=2)).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
    # Definindo parâmetros com filtro de data
    params = {
        "$select": "id,createdDate,lastUpdate",
        "$filter": f"createdDate ge {dois_dias_atras}",
    }
    lista_dados_todas_paginas = chamar_api_movidesk(token, params)
    df = pd.DataFrame(lista_dados_todas_paginas)
    return df
 
# Converta as colunas de data para o formato aceito pelo MySQL
data_columns = ["createdDate", "lastUpdate"]
 
# Conexão com o banco de dados MySQL - PUXANDO OS DADOS DO ARQUIVO .env
db_user = os.getenv("DB_USER")  # Nome do usuário no MySQL
db_password = os.getenv("DB_PASSWORD")  # Senha do MySQL
db_host = os.getenv("DB_HOST")  # Host do banco de dados
db_name = os.getenv("DB_NAME")  # Nome do banco de dados
 
# String de conexão com o banco de dados, passando as variáveis do .env
engine = create_engine(f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}")
 
# Instanciando o objeto MetaData para criar a tabela no banco de dados
metadata = MetaData()
 
# Crie a tabela com todas as colunas necessárias e defina a chave primária
tickets_table = Table(
    "tickets",
    metadata,
    Column("id", String(255), primary_key=True),
    Column("createdDate", DATETIME),
    Column("lastUpdate", DATETIME),
)
 
# Verifica se a tabela já existe no banco de dados
inspector = inspect(engine)
if "tickets" in inspector.get_table_names():
    with engine.connect() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM tickets"))
        count = result.fetchone()[0]
        if count > 0:
            # Se a tabela tiver dados, atualiza os últimos dois dias
            df_incremental = atualizar_dados_ultimos_dois_dias()
            if df_incremental.empty:
                print("Não há dados para atualizar.")
            else:
                # Converte as colunas de data para o formato correto
                for col in data_columns:
                    if col in df_incremental.columns:
                        df_incremental[col] = pd.to_datetime(
                            df_incremental[col]
                        ).dt.strftime("%Y-%m-%d %H:%M:%S")
 
                inseridos_com_sucesso = 0
                for index, row in df_incremental.iterrows():
                    try:
                        row.to_frame().T.to_sql(
                            "tickets", con=engine, if_exists="append", index=False
                        )
                        inseridos_com_sucesso += 1
                    except IntegrityError:
                        continue  # Ignora duplicações
 
                print(
                    f"Total de registros inseridos com sucesso: {inseridos_com_sucesso}"
                )
        else:
            print("Tabela existente mas sem dados, realizando carga inicial completa.")
            df_inicial = chamar_api_inicial_completa()
else:
    print("Tabela não existe, criando tabela e realizando carga inicial completa.")
    metadata.create_all(engine)  # Cria a tabela
    df_inicial = chamar_api_inicial_completa()
 
# Verifica se df_inicial foi criado e insere dados no banco
if "df_inicial" in locals():
    for col in data_columns:
        if col in df_inicial.columns:
            df_inicial[col] = pd.to_datetime(df_inicial[col]).dt.strftime(
                "%Y-%m-%d %H:%M:%S"
            )
 
    inseridos_com_sucesso = 0
    for index, row in df_inicial.iterrows():
        try:
            row.to_frame().T.to_sql(
                "tickets", con=engine, if_exists="append", index=False
            )
            inseridos_com_sucesso += 1
        except IntegrityError:
            continue  # Ignora duplicações
 
    print(f"Total de registros inseridos com sucesso: {inseridos_com_sucesso}")

Para resolver o problema de lidar com a inserção de novas colunas sem precisar fazer um drop na tabela, você pode seguir uma abordagem onde o script verifica quais colunas existem no banco de dados e compara com as colunas da API. Se houver novas colunas, ele faz uma atualização full para essas colunas e continua com o processo incremental. Além disso, caso alguma coluna não venha mais da API, o script emite um alerta.

Exemplo sugirido pelo nosso amigo GPT:
 

import requests
import pandas as pd
from dotenv import load_dotenv
import os
import time
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.exc import IntegrityError
from datetime import datetime, timedelta

# Carrega as variáveis de ambiente do arquivo .env
load_dotenv()

# Define a URL base da API Movidesk
base_url = "https://api.movidesk.com/public/v1/tickets/past"
token = os.getenv("API_TOKEN")

# Função para chamar a API Movidesk e obter os dados paginados
def chamar_api_movidesk(token, params=None):
    lista_dados_todas_paginas = []
    skip = 0  
    top = 1000  

    if params is None:
        params = {}

    params.update({"token": token, "$top": top, "$skip": skip})

    while True:
        response = requests.get(base_url, params=params)
        response_ajustado_json = response.json()

        if isinstance(response_ajustado_json, list) and response_ajustado_json:
            lista_dados_todas_paginas.extend(response_ajustado_json)
        else:
            break

        skip += top
        params["$skip"] = skip
        time.sleep(1)

    return lista_dados_todas_paginas

# Função para comparar colunas entre o banco de dados e a API
def verificar_novas_colunas(api_columns, db_columns):
    novas_colunas = [col for col in api_columns if col not in db_columns]
    colunas_faltantes = [col for col in db_columns if col not in api_columns]

    return novas_colunas, colunas_faltantes

# Função para obter as colunas atuais da tabela do banco
def get_db_columns(engine, tabela):
    inspector = inspect(engine)
    return [col["name"] for col in inspector.get_columns(tabela)]

# Função para atualizar os últimos dois dias
def atualizar_dados_ultimos_dois_dias():
    dois_dias_atras = (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%SZ")
    params = {"$select": "id,createdDate,lastUpdate", "$filter": f"createdDate ge {dois_dias_atras}"}
    lista_dados_todas_paginas = chamar_api_movidesk(token, params)
    return pd.DataFrame(lista_dados_todas_paginas)

# Conexão com o banco de dados MySQL
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")

engine = create_engine(f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}")

# Pega os dados da API
df_api = chamar_api_movidesk(token)
api_columns = df_api.columns.tolist()

# Verifica se a tabela já existe no banco
if "tickets" in inspect(engine).get_table_names():
    db_columns = get_db_columns(engine, "tickets")

    novas_colunas, colunas_faltantes = verificar_novas_colunas(api_columns, db_columns)

    if novas_colunas:
        print(f"Novas colunas detectadas: {novas_colunas}. Realizando atualização completa.")
        # Código para lidar com a inserção completa dessas colunas
        df_api[novas_colunas].to_sql('tickets', con=engine, if_exists='replace', index=False)
    else:
        print("Nenhuma nova coluna detectada, realizando atualização incremental.")
        df_incremental = atualizar_dados_ultimos_dois_dias()
        df_incremental.to_sql('tickets', con=engine, if_exists='append', index=False)

    if colunas_faltantes:
        print(f"Alerta: As seguintes colunas estão ausentes da API: {colunas_faltantes}")

else:
    print("Tabela 'tickets' não existe, criando e realizando carga completa.")
    df_api.to_sql('tickets', con=engine, if_exists='replace', index=False)

Espero ter ajudado! 
 

  • Like 1
Link para o comentário
Compartilhar em outros sites

Faça login para comentar

Você vai ser capaz de deixar um comentário após fazer o login



Entrar Agora
×
×
  • Criar Novo...