Files
jean_app/classificacao_veranicos.py
2025-06-01 16:44:49 -03:00

427 lines
16 KiB
Python
Executable File

import os
import pandas as pd
import numpy as np
from time import sleep
from datetime import datetime
import csv
from odf.opendocument import OpenDocumentSpreadsheet
from odf.table import Table, TableRow, TableCell
from odf.text import P
def listar_csvs(diretorio):
arquivos_csv = [nome for nome in os.listdir(diretorio) if nome.endswith('.csv') and os.path.isfile(os.path.join(diretorio, nome))]
return arquivos_csv
def processar_Classificao_Veranicos():
print("processar classificacao veranicos..")
origem = '41veranicoschuvosa'
files = listar_csvs(origem)
decendio = carregar_decendios('decendio.csv')
for file in files:
print(f"Encontrada pasta: {file}")
qtd_veranicos,estacao = carregar_estacao_com_veranico(file)
print(estacao)
rodar_decendio(decendio,qtd_veranicos, estacao, file)
from datetime import datetime
def classificar_aplitude(valor: int) -> int:
if 0 <= valor <= 3:
return 0
elif 4 <= valor <= 10:
return 1
elif 11 <= valor <= 20:
return 2
elif 21 <= valor <= 31:
return 3
else: # 32 ou mais
return 4
def diferenca_dias(data1, data2):
formato = "%Y-%m-%d"
# Converter para string se for Timestamp
if not isinstance(data1, str):
data1 = data1.strftime(formato)
if not isinstance(data2, str):
data2 = data2.strftime(formato)
# Verificar se as datas são válidas
if not data1 or not data2:
print(f"Erro: Data inválida -> data1: {data1}, data2: {data2}")
return None
try:
d1 = datetime.strptime(data1, formato)
d2 = datetime.strptime(data2, formato)
return (d2 - d1).days
except ValueError as e:
print(f"Erro ao converter datas: {data1}, {data2} - {e}")
return None
def rodar_decendio(decendios,qtd_veranicos, estacoes, file):
destino= "42veranicosclassificacao"
if not os.path.exists(destino):
os.makedirs(destino)
#estacoes= [2143011]
#estacoes= [2142009]
resultados = []
for estacao in estacoes: # percorre todas as estações array de estacoes
estacao_fitrada = qtd_veranicos[qtd_veranicos['CODIGO'] == estacao]
print("Quantidade de itens na estação", estacao, ":", len(estacao_fitrada))
exit
if len(estacao_fitrada) == 0:
print(f"Erro: Nenhuma linha encontrada para a estação {estacao} em qtd_veranicos.")
break
total = len(estacao_fitrada)
for index, selecao_estacao in estacao_fitrada.iterrows(): # Itera pelas linhas do DataFrame
print(f"\rProcessando estacao {index+1}/{total}", end='')
data_inicio = pd.to_datetime(selecao_estacao['INICIO'])
data_fim = pd.to_datetime(selecao_estacao['FINAL'])
if pd.isna(data_inicio) or pd.isna(data_fim):
print(f"Erro: Datas inválidas na linha {index} da estação {estacao} - INICIO: {data_inicio}, FINAL: {data_fim}")
continue
filtro = decendios[(decendios['INICIO'] <= data_fim) & (decendios['FINAL'] >= data_inicio)]
quantidade_veranicos = len(filtro)
if quantidade_veranicos <= 0:
print(f"estacao {estacao} - Linha {index} - Data Início: {data_inicio}, Data Fim: {data_fim}, Qtd Veranicos: {quantidade_veranicos}")
continue
dia =selecao_estacao['QTDDIAS']
filtro = filtro.copy() # Para evitar avisos do Pandas
filtro['dias_chuva'] = 0
filtro['dias_decendio'] = 0
filtro['amplitude'] =classificar_aplitude(dia)
diferenca=diferenca_dias(data_inicio, data_fim)+1
quantidade_veranicos = len(filtro) # Conta o número de linhas
#print(index,"- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -")
#print ('estacao: ',selecao_estacao , ' Qtd Dias: ', diferenca)
if quantidade_veranicos <=0:
print(f"Tem algum problema registros ({quantidade_veranicos} linhas).")
elif quantidade_veranicos <= 1:
#print(f"Tem mais de 1 registros ({quantidade_veranicos} linhas).")
filtro.loc[filtro.index[0], 'dias_chuva'] = diferenca_dias(data_inicio, data_fim)+1
filtro.loc[filtro.index[0], 'dias_decendio'] =diferenca_dias(filtro.iloc[0]['INICIO'], filtro.iloc[0]['FINAL'])+1
filtro["diferenca"] = abs(filtro["dias_chuva"] - filtro["dias_decendio"])
df_sorted = filtro.sort_values(by="diferenca")
filtro = df_sorted.iloc[:1]
elif quantidade_veranicos <= 2:
#print(f"Tem mais de 2 registros ({quantidade_veranicos} linhas).")
filtro.loc[filtro.index[0], 'dias_chuva'] = diferenca_dias(data_inicio, filtro.iloc[0]['FINAL'])+1
filtro.loc[filtro.index[1], 'dias_chuva'] = diferenca_dias(filtro.iloc[1]['INICIO'], data_fim)+1
filtro.loc[filtro.index[0], 'dias_decendio'] =diferenca_dias(filtro.iloc[0]['INICIO'], filtro.iloc[0]['FINAL'])+1
filtro.loc[filtro.index[1], 'dias_decendio'] =diferenca_dias(filtro.iloc[1]['INICIO'], filtro.iloc[1]['FINAL'])+1
filtro["diferenca"] = abs(filtro["dias_chuva"] - filtro["dias_decendio"])
df_sorted = filtro.sort_values(by="diferenca")
filtro = df_sorted.iloc[:1]
elif quantidade_veranicos <= 3:
#print(f"Tem mais de 3 registros ({quantidade_veranicos} linhas).")
filtro.loc[filtro.index[0], 'dias_chuva'] = diferenca_dias(data_inicio, filtro.iloc[0]['FINAL'])+1
filtro.loc[filtro.index[1], 'dias_chuva'] = diferenca_dias(filtro.iloc[1]['INICIO'], filtro.iloc[1]['FINAL']) +1
filtro.loc[filtro.index[2], 'dias_chuva'] = diferenca_dias(filtro.iloc[2]['INICIO'], data_fim)+1
filtro.loc[filtro.index[0], 'dias_decendio'] =diferenca_dias(filtro.iloc[0]['INICIO'], filtro.iloc[0]['FINAL'])+1
filtro.loc[filtro.index[1], 'dias_decendio'] =diferenca_dias(filtro.iloc[1]['INICIO'], filtro.iloc[1]['FINAL'])+1
filtro.loc[filtro.index[2], 'dias_decendio'] =diferenca_dias(filtro.iloc[2]['INICIO'], filtro.iloc[2]['FINAL'])+1
filtro["diferenca"] = abs(filtro["dias_chuva"] - filtro["dias_decendio"])
df_sorted = filtro.sort_values(by="diferenca")
filtro = df_sorted.iloc[:2]
#print(df_sorted)
#linha_menor_diferenca = df_sorted.iloc[0]
#linha_menor_diferenca2 = df_sorted.iloc[1]
# Pegando apenas as duas primeiras linhas do df_sorted
# Exibindo resultado
#print(linha_menor_diferenca)
#print(linha_menor_diferenca2)
elif quantidade_veranicos <= 4:
#print(f"Tem mais de 4 registros ({quantidade_veranicos} linhas).")
filtro.loc[filtro.index[0], 'dias_chuva'] = diferenca_dias(data_inicio, filtro.iloc[0]['FINAL'])
filtro.loc[filtro.index[1], 'dias_chuva'] = diferenca_dias(filtro.iloc[1]['INICIO'], filtro.iloc[1]['FINAL'])
filtro.loc[filtro.index[2], 'dias_chuva'] = diferenca_dias(filtro.iloc[2]['INICIO'], filtro.iloc[2]['FINAL'])
filtro.loc[filtro.index[3], 'dias_chuva'] = diferenca_dias(filtro.iloc[3]['INICIO'], data_fim)+1
filtro.loc[filtro.index[0], 'dias_decendio'] =diferenca_dias(filtro.iloc[0]['INICIO'], filtro.iloc[0]['FINAL'])+1
filtro.loc[filtro.index[1], 'dias_decendio'] =diferenca_dias(filtro.iloc[1]['INICIO'], filtro.iloc[1]['FINAL'])+1
filtro.loc[filtro.index[2], 'dias_decendio'] =diferenca_dias(filtro.iloc[2]['INICIO'], filtro.iloc[2]['FINAL'])+1
filtro.loc[filtro.index[3], 'dias_decendio'] =diferenca_dias(filtro.iloc[3]['INICIO'], filtro.iloc[3]['FINAL'])+1
filtro["diferenca"] = abs(filtro["dias_chuva"] - filtro["dias_decendio"])
df_sorted = filtro.sort_values(by="diferenca")
filtro = df_sorted.iloc[:3]
#print(filtro)
dados = []
# print('->', selecao_estacao)
for index, row in filtro.iterrows():
dados.append({
'CODIGO': selecao_estacao['CODIGO'],
'DECENDIO': row['DECENDIO'],
'KEY': row['DECENDIO'] + str(pd.to_datetime(row['INICIO']).year),
'AMPLITUDE': row['amplitude'],
'ANO': pd.to_datetime(row['INICIO']).year,
})
#print('************************************************************')
resultados.append(pd.DataFrame(dados))
file= f"./{destino}/descendio_"+file
if os.path.exists(file): os.remove(file)
pd.concat(resultados, ignore_index=True).to_csv(file, sep=';', encoding='utf-8', index=False)
def transformar_decendio():
origem = '42veranicosclassificacao'
files = listar_csvs(origem)
for file in files:
print(f"Encontrada pasta: ./{origem}/{file}")
df = pd.read_csv("./42veranicosclassificacao/"+file, sep=";")
df_pivot = df.pivot(index="CODIGO", columns="DECENDIO", values="AMPLITUDE")
#df_pivot.reset_index(inplace=True)
#df_pivot.to_csv("./43descendiofinal/final"+file, sep=";", encoding="utf-8", index=False)
#Definir o ano hidrológico
def calcular_ano_hidrologico(data):
ano_atual = data.year
if data.month >= 9: # Setembro a Dezembro -> Ano atual/Ano seguinte
return f"{ano_atual}/{ano_atual + 1}"
else: # Janeiro a Abril -> Ano anterior/Ano atual
return f"{ano_atual - 1}/{ano_atual}"
import pandas as pd
def gerarmatrizembraco():
print("gerar csv 45 ano veranicos..")
origem = '42veranicosclassificacao'
destino= "47descendiofinal"
if not os.path.exists(destino):
os.makedirs(destino)
print(f"Pasta '{destino}' criada.")
files = listar_csvs(origem)
for file in files:
print(f"Encontrada pasta: ./{origem}/{file}")
estacao = pd.read_csv(f"./{origem}/{file}", delimiter=";", encoding="utf-8")
codigos = estacao['CODIGO'].unique()
for codigo in codigos:
print(codigo)
ano_hidrologico = pd.read_csv("./decendio.csv", sep=";")
# Converter INICIO para datetime
ano_hidrologico["INICIO"] = pd.to_datetime(ano_hidrologico["INICIO"], format="%d/%m/%Y")
# Criar coluna Decendio2 corretamente
ano_hidrologico["KEY"] = ano_hidrologico["DECENDIO"] + ano_hidrologico["INICIO"].dt.strftime("%Y")
# Aplicar a função corrigida
ano_hidrologico["ANO"] = ano_hidrologico["INICIO"].apply(calcular_ano_hidrologico)
ano_hidrologico["AMPLITUDE"] = 0
ano_hidrologico["ESTACAO"] = codigo
# Salvar CSV corrigido
file2= f"./{destino}/{codigo}.csv"
if os.path.exists(file2): os.remove(file2)
ano_hidrologico.to_csv(file2, sep=";", encoding="utf-8", index=False)
def gerarmatrizfinal():
print("preencehr com os dados veranicos..")
origem= '42veranicosclassificacao'
destino = '47descendiofinal'
if not os.path.exists(destino):
os.makedirs(destino)
print(f"Pasta '{destino}' criada.")
files = listar_csvs(origem)
for file in files:
print(f"Encontrada pasta: ./{origem}/{file}")
estacao = pd.read_csv(f"./{origem}/{file}", delimiter=";", encoding="utf-8")
codigos = estacao['CODIGO'].unique()
for codigo in codigos:
print(codigo)
filedestino= f"./{destino}/{codigo}.csv"
print(filedestino)
destino_df = pd.read_csv(filedestino, delimiter=";", encoding="utf-8")
origem = estacao[estacao['CODIGO'] == codigo]
for index, row in origem.iterrows():
key = row['KEY']
amplitude = row['AMPLITUDE']
mask = destino_df['KEY'] == key
if mask.any():
destino_df.loc[mask, 'AMPLITUDE'] = amplitude
# Salva o dataset de destino atualizado
destino_df.to_csv(filedestino, sep=";", index=False, encoding="utf-8")
def exportarcalc():
destino = "51planilha"
origem = '47descendiofinal'
if not os.path.exists(destino):
os.makedirs(destino)
print(f"Pasta '{destino}' criada.")
files = listar_csvs(origem)
for file in files:
print(f"Encontrada pasta: ./{origem}/{file}")
df = pd.read_csv(f"./{origem}/{file}", delimiter=";", encoding="utf-8")
ods = OpenDocumentSpreadsheet()
table = Table(name="Dados")
# Adicionar linha do ESTACAO
estacao = os.path.splitext(file)[0]
ano_hidrologico = f"ESTACAO: {estacao}"
row_ano = TableRow()
cell_ano = TableCell()
cell_ano.addElement(P(text=ano_hidrologico))
row_ano.addElement(cell_ano)
table.addElement(row_ano)
# Preparar os anos hidrológicos únicos
anos = df['ANO'].unique() # Pegar anos únicos
# Adicionar linha de cabeçalho com os anos
row_periodo = TableRow()
# Primeira célula vazia para alinhamento
cell_vazia = TableCell()
cell_vazia.addElement(P(text="DECÊNDIO"))
row_periodo.addElement(cell_vazia)
# Adicionar cada ano como cabeçalho
for ano in anos:
cell_ano = TableCell()
cell_ano.addElement(P(text=str(ano)))
row_periodo.addElement(cell_ano)
table.addElement(row_periodo)
# Agrupar dados por decêndio
decendios = df['DECENDIO'].unique()
# Adicionar os dados
for decendio in decendios:
table_row = TableRow()
# Primeira célula com o nome do decêndio
cell_decendio = TableCell()
cell_decendio.addElement(P(text=decendio))
table_row.addElement(cell_decendio)
# Adicionar valores para cada ano
for ano in anos:
# Filtrar o valor para o decêndio e ano específico
# Changed 'AMPLIUDE' to 'AMPLITUDE'
valor = df[(df['DECENDIO'] == decendio) & (df['ANO'] == ano)]['AMPLITUDE'].iloc[0] if not df[(df['DECENDIO'] == decendio) & (df['ANO'] == ano)].empty else 0
cell_valor = TableCell()
cell_valor.addElement(P(text=str(valor)))
table_row.addElement(cell_valor)
table.addElement(table_row)
# Adicionar a tabela ao documento
ods.spreadsheet.addElement(table)
file2 = f"./{destino}/{estacao}.ods"
if os.path.exists(file2):
os.remove(file2)
# Salvar o arquivo ODS
ods.save(file2)
def carregar_decendios(file):
print("Abrindo arquivo decendio.")
dados_df = pd.read_csv(file, sep=';', decimal=',', parse_dates=['INICIO', 'FINAL'], dayfirst=True)
start_date = '1980-01-01'
end_date = '2027-12-31'
filtered_df = dados_df[(dados_df['INICIO'] >= start_date) & (dados_df['FINAL'] <= end_date)]
dados_df=filtered_df
#dados_df.to_csv('-valores de origem .csv', sep=";", encoding='utf-8', decimal=',', index=False)
return dados_df
def carregar_estacao_com_veranico(file):
print("Abrindo estacao com veranico")
dados_df = pd.read_csv('./41veranicoschuvosa/'+file, sep=';', decimal=',', parse_dates=['INICIO', 'FINAL'])
codigos_unicos = dados_df['CODIGO'].unique() # Retorna uma lista de valores únicos
return dados_df, codigos_unicos