25 de junho, 2024
As empresas de manufatura em geral têm muitos dados relacionados ao funcionamento de suas máquinas, como id da máquina, linha de produção, tempo de operação, energia consumida, entre outros. Ao considerar realizar uma análise de dados durante um mês de operação, pode ser gerado um volume muito grande de dados, o que torna muito lento, ou até mesmo inviável o processamento desses dados de forma convencional.
Como uma alternativa ao processamento de dados convencional, pode ser utilizado o PySpark para o processamento de dados distribuído. Assim, um grande volume de dados pode ser processado em paralelo, em máquinas diferentes, e o resultado obtido da orquestração desse processamento.
Para o projeto em questão, foi criado um arquivo JSON, com 1 milhão de linhas, simulando os dados de operação das máquinas de uma empresa de manufatura durante um mês. O objetivo foi somar a energia consumida por máquina durante o mês. Essa informação pode ser utilizada para verificar se houve algum desvio no consumo de energia (considerando comparar com dados históricos), ou até mesmo propor reduções no consumo de energia, considerando analisar uma possível redução para as máquinas que consomem mais energia.
A seguir é apresentado o script python utilizado para geração dos dados:
# Imports
import random
import json
from datetime import datetime, timedelta
# Função para gerar uma data e hora aleatórias dentro de um intervalo
def random_datetime(start_datetime, end_datetime):
# Converte as datas e horas para objetos datetime
start_datetime = datetime.strptime(start_datetime, "%Y-%m-%d %H:%M:%S")
end_datetime = datetime.strptime(end_datetime, "%Y-%m-%d %H:%M:%S")
# Calcula a diferença entre as datas e horas em segundos
delta = end_datetime - start_datetime
delta_seconds = delta.total_seconds()
# Gera um número de segundos aleatório dentro do intervalo
random_seconds = random.uniform(0, delta_seconds)
# Retorna a data e hora aleatórias
return start_datetime + timedelta(seconds=random_seconds)
# Gerador de dados de exemplo
def gera_dados_exemplo(num_registros):
start_datetime = "2024-06-01 00:00:00"
end_datetime = "2024-06-30 23:59:59"
linha_prod = ["Linha_1", "Linha_2", "Linha_3", "Linha_4", "Linha_5", "Linha_6", "Linha_7"]
maq_id = ["M01", "M02", "M03", "M04", "M05", "M06", "M07", "M08", "M09", "M010", "M011", "M012"]
for _ in range(num_registros):
data_hora = str(random_datetime(start_datetime, end_datetime))
linha_producao = random.choice(linha_prod)
maquina_id = random.choice(maq_id)
tempo_operacao = random.randint(1, 8) # Tempo de operação em horas
energia_consumida = round(random.uniform(40.0, 90.0), 1) # Energia consumida em kWh
custo_operacional = round(random.uniform(300.75, 500.50), 2) # Custo operacional em reais
defeitos = random.randint(0, 3)
temperatura_maquina = round(random.uniform(20.0, 75.3), 1) # Temperatura da máquina em graus Celsius
yield {"data_hora": data_hora,\
"linha_producao": linha_producao,\
"maquina_id": maquina_id,\
"tempo_operacao": tempo_operacao,\
"energia_consumida": energia_consumida,\
"custo_operacional": custo_operacional,\
"defeitos": defeitos,\
"temperatura_maquina": temperatura_maquina}
# Cria 1000000 de registros
dados_exemplo = list(gera_dados_exemplo(1000000))
# Caminho do arquivo JSON
caminho_arquivo = 'dados_manufatura.json'
# Salvando os dados no arquivo JSON
with open(caminho_arquivo, 'w') as arquivo:
for item in dados_exemplo:
json.dump(item, arquivo)
arquivo.write('\n')
A seguir é apresentado o script de processamento dos dados:
# Imports
import time
import os
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import col, regexp_replace
# Inicia a medição do tempo
start_time = time.time()
# Inicializa a SparkSession
spark = SparkSession.builder \
.appName("maior_energia_consumida") \
.getOrCreate()
# Define o schema dos dados
schema = types.StructType([
types.StructField("data_hora", types.StringType(), True),
types.StructField("linha_producao", types.StringType(), True),
types.StructField("maquina_id", types.StringType(), True),
types.StructField("tempo_operacao", types.IntegerType(), True),
types.StructField("energia_consumida", types.DecimalType(3, 1), True),
types.StructField("custo_operacional", types.DecimalType(5, 2), True),
types.StructField("defeitos", types.IntegerType(), True),
types.StructField("temperatura_maquina", types.DecimalType(3, 1), True)
])
# Carrega o arquivo JSON com o schema definido
df = spark.read.schema(schema).json("data/dados_manufatura.json")
# Cria outro DataFrame removendo duplicatas com base em todas as colunas
df_filtrado = df.dropDuplicates()
# Drop da colunas que não serão utilizadas (filtro no nível de coluna)
df = df_filtrado.drop("data_hora", "linha_producao", "tempo_operacao", "custo_operacional", "defeitos", "temperatura_maquina")
# Verifica o schema e os dados
df.printSchema()
df.show()
# Cria ou substitui uma visualização temporária com o nome 'DSATEMP'
df.createOrReplaceTempView("DSATEMP")
# Executa uma consulta SQL para somar a energia consumida agrupada por id_maquina
df_resultado = spark.sql("""
SELECT maquina_id, SUM(energia_consumida) AS total_energia_consumida
FROM DSATEMP
GROUP BY maquina_id
ORDER BY total_energia_consumida DESC
""")
# Exibe resultado
df_resultado.show()
# Salva o resultado em um arquivo CSV
output_path = "data/relatorio"
df_resultado.write.csv(output_path, header=True, mode="overwrite")
print("Resultado gravado na pasta relatorio.")
# Finaliza a medição do tempo
end_time = time.time()
# Calcula o tempo de execução
execution_time = round(end_time - start_time, 1)
# Imprime o tempo de execução
print(f"\nTempo de execução: {execution_time} segundos\n")
O script foi executado utilizado um cluster criado com o docker compose, com 2 workers, através do comando:
docker exec dsa-pyspark-master spark-submit --deploy-mode client ./apps/maior_energia_consumida.py
Obs.: Este cluster foi baseado no estudo do capítulo 02 e nas configurações de saída do capitulo 04 do curso "PySpark e Apache Kafka Para Processamento de Dados em Batch e Streaming" da Data Science Academy.
A seguir é apresentada a saída da execução:
No projeto em questão o resultado foi gravado em um arquivo .csv, podendo ser apresentado como um relatório simples das máquinas e seu consumo de energia durante o mês. Caso a empresa possua um data warehouse ou data lake, o resultado também poderá ser gravado nessas estruturas. Com o PySpark, esse processamento de 1 milhão de linhas foi realizado em segundos e com essa informação os tomadores de decisão podem ter insights valiosos.
Caso tenha alguma dúvida, sugestão ou apenas queira trocar uma ideia sobre este projeto, fico à disposição.
E-mail: gabrielapbarros15@gmail.com
LinkedIn: https://www.linkedin.com/in/gabriela-pereira-barros/