PySpark

June 25, 2024


Contents

  • 1. Business problem
  • 2. Solution proposal
  • 3. Data creation
  • 4. Processing Script
  • 5. Script execution
  • 6. Conclusion

1. Business problem

Manufacturing companies generally hold large amounts of data about the operation of their machines, such as machine ID, production line, operating time, and energy consumed, among others. When analyzing a full month of operation, the data volume can become so large that processing it in a conventional way is very slow or even impracticable.

2. Solution proposal

As an alternative to conventional data processing, PySpark can be used for distributed data processing. A large data volume can thus be processed in parallel on different machines, with the final result obtained by orchestrating this processing.

For this project, a JSON file with 1 million rows was created, simulating one month of machine operating data from a manufacturing company. The objective was to sum the energy consumed per machine during the month. This information can be used to check whether energy consumption has deviated (by comparison with historical data), or to propose consumption reductions, focusing the analysis on the machines that consume the most energy.
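The aggregation at the heart of the project, summing the energy consumed per machine, can be illustrated on a few toy records in plain Python (the values below are hypothetical):

```python
from collections import defaultdict

# Toy records with the same shape as the project's data (hypothetical values)
records = [
    {"machine_id": "M01", "energy_consumed": 55.2},
    {"machine_id": "M02", "energy_consumed": 48.7},
    {"machine_id": "M01", "energy_consumed": 60.1},
]

# Sum the energy consumed per machine
totals = defaultdict(float)
for record in records:
    totals[record["machine_id"]] += record["energy_consumed"]

# Sort machines by total consumption, highest first
report = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(report)
```

PySpark performs this same grouping and summing, but partitioned across workers instead of in a single loop.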

3. Data creation

Below is the Python script used to generate the data:

# Imports
import random
import json
from datetime import datetime, timedelta

# Function to generate a random date and time within a range
def random_datetime(start_datetime, end_datetime):
    # Convert the dates and times to datetime objects
    start_datetime = datetime.strptime(start_datetime, "%Y-%m-%d %H:%M:%S")
    end_datetime = datetime.strptime(end_datetime, "%Y-%m-%d %H:%M:%S")

    # Calculate the difference between the dates and times in seconds
    delta = end_datetime - start_datetime
    delta_seconds = delta.total_seconds()

    # Generate a random number of seconds within the interval
    random_seconds = random.uniform(0, delta_seconds)

    # Return the random date and time
    return start_datetime + timedelta(seconds=random_seconds)


# Sample data generator
def generate_example_data(num_records):

    start_datetime = "2024-06-01 00:00:00"
    end_datetime = "2024-06-30 23:59:59"
    production_lines = ["Line_1", "Line_2", "Line_3", "Line_4", "Line_5", "Line_6", "Line_7"]
    machine_ids = ["M01", "M02", "M03", "M04", "M05", "M06", "M07", "M08", "M09", "M010", "M011", "M012"]

    for _ in range(num_records):
        # Use a name other than "datetime" to avoid shadowing the imported class
        timestamp = str(random_datetime(start_datetime, end_datetime))
        production_line = random.choice(production_lines)
        machine_id = random.choice(machine_ids)
        operation_time = random.randint(1, 8) # Operation time in hours
        energy_consumed = round(random.uniform(40.0, 90.0), 1) # Energy consumed in kWh
        operational_cost = round(random.uniform(300.75, 500.50), 2) # Operational cost in reais
        defects = random.randint(0, 3)
        machine_temperature = round(random.uniform(20.0, 75.3), 1) # Machine temperature in degrees Celsius

        yield {"datetime": timestamp,
               "production_line": production_line,
               "machine_id": machine_id,
               "operation_time": operation_time,
               "energy_consumed": energy_consumed,
               "operational_cost": operational_cost,
               "defects": defects,
               "machine_temperature": machine_temperature}

# Create 1000000 records
example_data = list(generate_example_data(1000000))

# Path to the JSON file
file_path = 'manufacturing_data.json'

# Saving the data to the JSON file
with open(file_path, 'w') as file:
    for item in example_data:
        json.dump(item, file)
        file.write('\n')
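Since the processing script depends on the file being valid JSON Lines (one JSON object per line), a quick sanity check after generation can catch formatting problems early. The sketch below writes a couple of records the same way and reads them back; the sample values and temporary file name are illustrative:

```python
import json
import os
import tempfile

# Sample records in the same shape as the generated data (illustrative values)
sample = [
    {"machine_id": "M01", "energy_consumed": 55.2},
    {"machine_id": "M02", "energy_consumed": 48.7},
]

# Write one JSON object per line (JSON Lines), as the generator script does
path = os.path.join(tempfile.gettempdir(), "sample_manufacturing_data.json")
with open(path, "w") as file:
    for item in sample:
        json.dump(item, file)
        file.write("\n")

# Read back and verify that each line parses and carries the expected keys
with open(path) as file:
    parsed = [json.loads(line) for line in file]

assert all({"machine_id", "energy_consumed"} <= item.keys() for item in parsed)
print(f"{len(parsed)} valid JSON Lines records")
```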

4. Processing Script

Below is the data processing script:

# Imports
import time
import os
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import col, regexp_replace

# Start measuring time
start_time = time.time()

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("highest_energy_consumption") \
    .getOrCreate()

# Define the schema of the data
schema = types.StructType([
    types.StructField("datetime", types.StringType(), True),
    types.StructField("production_line", types.StringType(), True),
    types.StructField("machine_id", types.StringType(), True),
    types.StructField("operation_time", types.IntegerType(), True),
    types.StructField("energy_consumed", types.DecimalType(3, 1), True),
    types.StructField("operational_cost", types.DecimalType(5, 2), True),
    types.StructField("defects", types.IntegerType(), True),
    types.StructField("machine_temperature", types.DecimalType(3, 1), True)
])

# Load the JSON file with the defined schema
df = spark.read.schema(schema).json("data/manufacturing_data.json")

# Create another DataFrame removing duplicates based on all columns
df_filtered = df.dropDuplicates()

# Drop columns that will not be used (column-level filtering)
df = df_filtered.drop("datetime", "production_line", "operation_time", "operational_cost", "defects", "machine_temperature")

# Check the schema and the data
df.printSchema()
df.show()

# Create or replace a temporary view with the name 'DSATEMP'
df.createOrReplaceTempView("DSATEMP")

# Execute a SQL query to sum the energy consumed, grouped by machine_id
df_result = spark.sql("""
            SELECT machine_id, SUM(energy_consumed) AS total_energy_consumed
            FROM DSATEMP
            GROUP BY machine_id
            ORDER BY total_energy_consumed DESC
          """)

# Show result
df_result.show()

# Save the result to a CSV file
output_path = "data/report"
df_result.write.csv(output_path, header=True, mode="overwrite")

print("Result saved in the report folder.")

# End time measurement
end_time = time.time()

# Calculate execution time
execution_time = round(end_time - start_time, 1)

# Print execution time
print(f"\nExecution time: {execution_time} seconds\n")

5. Script execution

The script was executed on a cluster created with Docker Compose, with 2 workers, using the command:

docker exec dsa-pyspark-master spark-submit --deploy-mode client ./apps/highest_energy_consumed.py

Note: This cluster was based on chapter 02, and the output configurations on chapter 04, of the Data Science Academy course "PySpark and Apache Kafka for Batch and Streaming Data Processing".

The execution output is shown below:

(Execution output screenshot: saida_execucao_english.png)

6. Conclusion

In this project, the result was saved to a .csv file, which can serve as a simple report of the machines and their energy consumption during the month. If the company has a data warehouse or a data lake, the result can also be written to those systems. With PySpark, processing these 1 million rows took only seconds, and from this information decision-makers can obtain valuable insights.
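For a quick look at the report outside Spark, the CSV output can be read back with the standard library. Note that Spark writes the output folder as one or more part files; the sketch below parses one such part file with the header the script writes (the contents shown are illustrative, not actual results):

```python
import csv
import io

# A part file as Spark would write it with header=True (illustrative contents)
part_file = io.StringIO(
    "machine_id,total_energy_consumed\n"
    "M07,4210.5\n"
    "M02,3998.1\n"
)

# Read the report rows into (machine, total) pairs keyed by the header
reader = csv.DictReader(part_file)
report = [(row["machine_id"], float(row["total_energy_consumed"])) for row in reader]
print(report)
```

In practice the same parsing would be applied to each part-*.csv file inside the data/report folder.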

Contact

If you have any questions or suggestions, or just want to talk about this project, feel free to contact me.

E-mail: gabrielapbarros15@gmail.com

LinkedIn: https://www.linkedin.com/in/gabriela-pereira-barros/