PySpark¶

25 de junho, 2024

logo-projeto.png

Sumário

  • 1. Problema de negócio
  • 2. Proposta de solução
  • 3. Geração dos dados
  • 4. Script de processamento
  • 5. Execução do Script
  • 6. Conclusão

1. Problema de negócio ¶

As empresas de manufatura em geral têm muitos dados relacionados ao funcionamento de suas máquinas, como id da máquina, linha de produção, tempo de operação, energia consumida, entre outros. Ao considerar realizar uma análise de dados durante um mês de operação, pode ser gerado um volume muito grande de dados, o que torna muito lento, ou até mesmo inviável o processamento desses dados de forma convencional.

2. Proposta de solução ¶

Como uma alternativa ao processamento de dados convencional, pode ser utilizado o PySpark para o processamento de dados distribuído. Assim, um grande volume de dados pode ser processado em paralelo, em máquinas diferentes, e o resultado obtido da orquestração desse processamento.

Para o projeto em questão, foi criado um arquivo JSON, com 1 milhão de linhas, simulando os dados de operação das máquinas de uma empresa de manufatura durante um mês. O objetivo foi somar a energia consumida por máquina durante o mês. Essa informação pode ser utilizada para verificar se houve algum desvio no consumo de energia (considerando comparar com dados históricos), ou até mesmo propor reduções no consumo de energia, considerando analisar uma possível redução para as máquinas que consomem mais energia.

3. Geração dos dados ¶

A seguir é apresentado o script python utilizado para geração dos dados:

# Imports
import random
import json
from datetime import datetime, timedelta

# Função para gerar uma data e hora aleatórias dentro de um intervalo
def random_datetime(start_datetime, end_datetime):
    # Converte as datas e horas para objetos datetime
    start_datetime = datetime.strptime(start_datetime, "%Y-%m-%d %H:%M:%S")
    end_datetime = datetime.strptime(end_datetime, "%Y-%m-%d %H:%M:%S")

    # Calcula a diferença entre as datas e horas em segundos
    delta = end_datetime - start_datetime
    delta_seconds = delta.total_seconds()

    # Gera um número de segundos aleatório dentro do intervalo
    random_seconds = random.uniform(0, delta_seconds)

    # Retorna a data e hora aleatórias
    return start_datetime + timedelta(seconds=random_seconds)


# Gerador de dados de exemplo
def gera_dados_exemplo(num_registros):

    start_datetime = "2024-06-01 00:00:00"
    end_datetime = "2024-06-30 23:59:59"
    linha_prod = ["Linha_1", "Linha_2", "Linha_3", "Linha_4", "Linha_5", "Linha_6", "Linha_7"]
    maq_id = ["M01", "M02", "M03", "M04", "M05", "M06", "M07", "M08", "M09", "M010", "M011", "M012"]

    for _ in range(num_registros):
        data_hora = str(random_datetime(start_datetime, end_datetime))
        linha_producao = random.choice(linha_prod)
        maquina_id = random.choice(maq_id)
        tempo_operacao = random.randint(1, 8) # Tempo de operação em horas
        energia_consumida = round(random.uniform(40.0, 90.0), 1) # Energia consumida em kWh
        custo_operacional = round(random.uniform(300.75, 500.50), 2) # Custo operacional em reais
        defeitos = random.randint(0, 3)
        temperatura_maquina = round(random.uniform(20.0, 75.3), 1) # Temperatura da máquina em graus Celsius

        yield {"data_hora": data_hora,\
               "linha_producao": linha_producao,\
               "maquina_id": maquina_id,\
               "tempo_operacao": tempo_operacao,\
               "energia_consumida": energia_consumida,\
               "custo_operacional": custo_operacional,\
               "defeitos": defeitos,\
               "temperatura_maquina": temperatura_maquina}

# Cria 1000000 de registros
dados_exemplo = list(gera_dados_exemplo(1000000))

# Caminho do arquivo JSON
caminho_arquivo = 'dados_manufatura.json'

# Salvando os dados no arquivo JSON
with open(caminho_arquivo, 'w') as arquivo:
    for item in dados_exemplo:
        json.dump(item, arquivo)
        arquivo.write('\n')

4. Script de processamento ¶

A seguir é apresentado o script de processamento dos dados:

# Imports
import time
import os
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import col, regexp_replace

# Inicia a medição do tempo
start_time = time.time()

# Inicializa a SparkSession
spark = SparkSession.builder \
    .appName("maior_energia_consumida") \
    .getOrCreate()

# Define o schema dos dados
schema = types.StructType([
    types.StructField("data_hora", types.StringType(), True),
    types.StructField("linha_producao", types.StringType(), True),
    types.StructField("maquina_id", types.StringType(), True),
    types.StructField("tempo_operacao", types.IntegerType(), True),
    types.StructField("energia_consumida", types.DecimalType(3, 1), True),
    types.StructField("custo_operacional", types.DecimalType(5, 2), True),
    types.StructField("defeitos", types.IntegerType(), True),
    types.StructField("temperatura_maquina", types.DecimalType(3, 1), True)
])

# Carrega o arquivo JSON com o schema definido
df = spark.read.schema(schema).json("data/dados_manufatura.json")

# Cria outro DataFrame removendo duplicatas com base em todas as colunas
df_filtrado = df.dropDuplicates()

# Drop da colunas que não serão utilizadas (filtro no nível de coluna)
df = df_filtrado.drop("data_hora", "linha_producao", "tempo_operacao", "custo_operacional", "defeitos", "temperatura_maquina")

# Verifica o schema e os dados
df.printSchema()
df.show()

# Cria ou substitui uma visualização temporária com o nome 'DSATEMP'
df.createOrReplaceTempView("DSATEMP")

# Executa uma consulta SQL para somar a energia consumida agrupada por id_maquina
df_resultado = spark.sql("""
            SELECT maquina_id, SUM(energia_consumida) AS total_energia_consumida
            FROM DSATEMP
            GROUP BY maquina_id
            ORDER BY total_energia_consumida DESC
          """)

# Exibe resultado
df_resultado.show()

# Salva o resultado em um arquivo CSV
output_path = "data/relatorio"
df_resultado.write.csv(output_path, header=True, mode="overwrite")

print("Resultado gravado na pasta relatorio.")

# Finaliza a medição do tempo
end_time = time.time()

# Calcula o tempo de execução
execution_time = round(end_time - start_time, 1)

# Imprime o tempo de execução
print(f"\nTempo de execução: {execution_time} segundos\n")

5. Execução do Script ¶

O script foi executado utilizado um cluster criado com o docker compose, com 2 workers, através do comando:

docker exec dsa-pyspark-master spark-submit --deploy-mode client ./apps/maior_energia_consumida.py

Obs.: Este cluster foi baseado no estudo do capítulo 02 e nas configurações de saída do capitulo 04 do curso "PySpark e Apache Kafka Para Processamento de Dados em Batch e Streaming" da Data Science Academy.

A seguir é apresentada a saída da execução:

saida_execucao.png

6. Conclusão ¶

No projeto em questão o resultado foi gravado em um arquivo .csv, podendo ser apresentado como um relatório simples das máquinas e seu consumo de energia durante o mês. Caso a empresa possua um data warehouse ou data lake, o resultado também poderá ser gravado nessas estruturas. Com o PySpark, esse processamento de 1 milhão de linhas foi realizado em segundos e com essa informação os tomadores de decisão podem ter insights valiosos.

Entre em contato¶

Caso tenha alguma dúvida, sugestão ou apenas queira trocar uma ideia sobre este projeto, fico à disposição.

E-mail: gabrielapbarros15@gmail.com

LinkedIn: https://www.linkedin.com/in/gabriela-pereira-barros/