Creating the Services and Testing

Having established the pipeline structure, identified its components, and delineated the inputs and outputs for each, we can now proceed with the implementation of these components. As outlined in the general guidelines for creating a component in Chapter 1.5, the first step is to define the services. By “services” we mean the functions that will be deployed on the servers. These services are defined independently of the servers so that the core functionality can be tested thoroughly before further development.

Data Component

We begin with the data collection and cleaning component, hereafter referred to as the data component. The primary objective of this component is to clean raw data from a CSV file. As previously mentioned, this CSV file will be sourced from the web application. In defining the service, our focus will remain on the core functionality rather than on the specifics of how variables will be received by the server. The service function accepts a CSV file, cleans and structures the data, and returns the processed results. The starting code for this service is already available in the stock_price_prediction notebook; the data cleaning and structuring code has simply been moved into a separate function. All lists are also flattened before being returned, which simplifies the communication between the components later on. Below you can see an example of the service:

import pandas as pd
from sklearn.model_selection import train_test_split

def clean_data(csv_file):
    data = pd.read_csv(csv_file)
    data['Date'] = pd.to_datetime(data['Date'])
    data['Previous_Close'] = data['Close'].shift(1)
    data = data.dropna()
    
    x = data[['Previous_Close']]
    y = data['Close']
    dates = data['Date']
    
    x_train, x_test, y_train, y_test, dates_train, dates_test = train_test_split(
        x, y, dates, test_size=0.2, shuffle=False
    )
    
    # Flatten the lists
    x_train_flat = [item for sublist in x_train.values for item in sublist]
    x_test_flat = [item for sublist in x_test.values for item in sublist]
    y_train_flat = y_train.tolist()
    y_test_flat = y_test.tolist()
    dates_train_str = dates_train.dt.strftime('%Y-%m-%d').tolist()
    dates_test_str = dates_test.dt.strftime('%Y-%m-%d').tolist()
    
    return x_train_flat, x_test_flat, y_train_flat, y_test_flat, dates_train_str, dates_test_str

This function processes a CSV file by converting the ‘Date’ column into the appropriate format. It creates a new feature, Previous_Close, based on the prior day’s closing value and removes any rows containing NaN values. The function then splits the data into training and testing sets using train_test_split(), and finally, flattens the resulting lists before returning them.
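To make the effect of these steps concrete, the following minimal sketch applies them to a tiny in-memory table instead of a CSV file. The six-row price series is invented purely for illustration and is not taken from the MSFT data.

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Hypothetical price series, used only for illustration.
data = pd.DataFrame({
    "Date": pd.to_datetime([
        "2024-01-01", "2024-01-02", "2024-01-03",
        "2024-01-04", "2024-01-05", "2024-01-08",
    ]),
    "Close": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0],
})

# shift(1) moves each closing price down one row, so the first row
# has no previous close (NaN) and is removed by dropna().
data["Previous_Close"] = data["Close"].shift(1)
data = data.dropna()

# shuffle=False keeps the rows in chronological order, so the test set
# is always the most recent part of the series.
x_train, x_test, y_train, y_test = train_test_split(
    data[["Previous_Close"]], data["Close"], test_size=0.2, shuffle=False
)

# Flatten the single-column feature matrix into a plain list.
x_train_flat = [item for row in x_train.values for item in row]
x_test_flat = [item for row in x_test.values for item in row]

print(x_train_flat, y_train.tolist())
print(x_test_flat, y_test.tolist())
```

Five rows survive the dropna() call, so a test size of 0.2 leaves four rows for training and the final row for testing.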

Testing the Service

To verify the functionality of this service, it is recommended to write a simple test function that invokes the clean_data function. A basic test function is provided in the data_test.py file. This function specifies the CSV file to be used as input, calls clean_data, and prints the resulting values. The results are also saved to a JSON file for subsequent use when testing the next component, namely the training component. The CSV file used for testing is a shortened version of the one employed in the stock price prediction pipeline, which simplifies the testing process by reducing the volume of data. The test_data_service function is defined as follows:

import json

# clean_data is assumed to be imported from the module that defines it
def test_data_service():
    csv_file = 'MSFT.US.test.csv'
    returned_data = clean_data(csv_file)
    print(returned_data)

    #save the data to a json file
    with open('cleaned_data.json', 'w') as f:
        #add data with variable names to json file:
        json.dump({
            'x_train': list(returned_data[0]),
            'x_test': list(returned_data[1]),
            'y_train': list(returned_data[2]),
            'y_test': list(returned_data[3]),
            'dates_train': list(returned_data[4]),
            'dates_test': list(returned_data[5])
        }, f)
    return

Ensure that this function is invoked in the file, and then execute python data_test.py to observe the output generated by the clean_data function. Should the service fail to function as expected, you may utilize the built-in debugger in VS Code to diagnose and rectify the issue.
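Printing the output requires a manual inspection on every run. As a possible extension, the sketch below shows how a few automatic checks could be applied to the six lists returned by clean_data. The helper name check_cleaned_data and the sample values are hypothetical and not part of data_test.py.

```python
import json

def check_cleaned_data(result):
    """Hypothetical helper: sanity-check the six lists returned by clean_data."""
    x_train, x_test, y_train, y_test, dates_train, dates_test = result

    # Features, targets, and dates must line up row by row.
    assert len(x_train) == len(y_train) == len(dates_train)
    assert len(x_test) == len(y_test) == len(dates_test)

    # With shuffle=False the training period ends before the test period.
    # ISO-formatted date strings (YYYY-MM-DD) sort chronologically.
    if dates_train and dates_test:
        assert dates_train[-1] < dates_test[0]

    # The flattened lists should survive a JSON round trip unchanged.
    payload = json.dumps({
        "x_train": x_train, "x_test": x_test,
        "y_train": y_train, "y_test": y_test,
        "dates_train": dates_train, "dates_test": dates_test,
    })
    return json.loads(payload)

# Invented sample values in the same shape as the clean_data output.
sample = (
    [10.0, 11.0], [12.0],
    [11.0, 12.0], [13.0],
    ["2024-01-02", "2024-01-03"], ["2024-01-04"],
)
restored = check_cleaned_data(sample)
print(restored["x_train"])
```

In the real test, the tuple returned by clean_data would be passed to the helper instead of the invented sample.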

Training Component

We will now apply a similar approach to the training component. The service for this component is intended to receive the cleaned data, train a linear regression model, and return the trained model. It is important to note that the x_train list was flattened in the data component to streamline later steps (particularly during the implementation of gRPC). However, this list must be converted back into a 2D array, which necessitates the following line of code:

x_train = np.array(x_train).reshape(-1, 1)

Apart from this adjustment, the existing code from the stock_price_prediction notebook can be utilized. Additionally, consideration must be given to how the trained model will be transmitted to the subsequent component. To facilitate this, the model will be serialized into a binary format. The train_model function is implemented as follows:

import numpy as np
from sklearn.linear_model import LinearRegression
import pickle

def train_model(x_train, y_train):
    model = LinearRegression()
    x_train = np.array(x_train).reshape(-1, 1)
    model.fit(x_train, y_train)
    model_binary = pickle.dumps(model)

    return model_binary
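The reshape and serialization steps can be tried in isolation. The sketch below uses invented training values that follow y = 2x + 1, trains a model on them, and then restores it from its binary form the way a receiving component might.

```python
import pickle

import numpy as np
from sklearn.linear_model import LinearRegression

# Invented training data following y = 2x + 1, for illustration only.
x_train_flat = [1.0, 2.0, 3.0, 4.0]
y_train = [3.0, 5.0, 7.0, 9.0]

# scikit-learn expects a 2D feature matrix: one row per sample,
# one column per feature. reshape(-1, 1) turns the flat list into that.
x_train = np.array(x_train_flat).reshape(-1, 1)
print(x_train.shape)

model = LinearRegression()
model.fit(x_train, y_train)

# Serialize to bytes, then restore as the receiving side would.
model_binary = pickle.dumps(model)
restored_model = pickle.loads(model_binary)

prediction = restored_model.predict(np.array([[5.0]]))
print(prediction)
```

Note that pickle.loads can execute arbitrary code, so serialized models should only be loaded from sources you trust.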

Testing the Service

As with the previous component, it is imperative to test the service by writing a test function that invokes the train_model function. For this purpose, we will use the JSON file generated during the testing of the data component to extract the cleaned data. This time, the test function will also save the resulting model for use in testing the final component. Since the model is serialized into a binary format, it will be saved accordingly. The test function is implemented as follows:

import json

# train_model is assumed to be imported from the module that defines it
def test_train_model():
    # Read the JSON file
    with open('cleaned_data.json', 'r') as f:
        data = json.load(f)

    # Extract x_train and y_train values
    x_train = data['x_train']
    y_train = data['y_train']

    # Call the train_model function
    model_binary = train_model(x_train, y_train)

    with open('model.pkl', 'wb') as f:
        f.write(model_binary)
    

    # Print the result
    print("Model trained and serialized successfully.")
    print(f"Serialized model size: {len(model_binary)} bytes")

Ensure that the JSON file created during the data cleaning process is moved to the directory containing the training component before executing the file with the testing function.

Model Testing Component

The pipeline produces several datasets (x_train, y_train, dates_train, x_test, y_test, and dates_test) as well as the trained model. Of these, the service function for the model testing component takes only the model and the three test sets as arguments. The stock_price_prediction notebook again provides an outline for constructing this service. The service generates predictions, calculates the RMSE (Root Mean Square Error), and produces a plot of the results. The key difference in this context is that the plot must be returned in a format suitable for transmission between components. We have opted to encode the plot as PNG bytes in memory; these bytes can also be written to a file for easy visualization. The test_model function is implemented as follows:

import numpy as np
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
import io

def test_model(model, x_test, y_test, dates_test):
    x_test = np.array(x_test).reshape(-1, 1)
    y_pred = model.predict(x_test)
    print(f"x_test in testing service: {x_test}")
    print(f"y_pred in testing service: {y_pred}")
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    print(f"RMSE: {rmse}")

    # Create a BytesIO object to save the plot in-memory
    plot_stream = io.BytesIO()

    # Plot the results
    plt.figure(figsize=(14, 7))
    plt.plot(dates_test, y_test, label='Actual')
    plt.plot(dates_test, y_pred, label='Predicted')

    # Format the date on the x-axis
    plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=120))  # Set major ticks every 120 days
    plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))

    plt.gcf().autofmt_xdate()  # Rotate and right-align the date labels

    plt.xlabel('Date')
    plt.ylabel('Close Price')
    plt.title('MSFT Stock Price Prediction')
    plt.legend()
    plt.grid(True)

    # Save the plot to the BytesIO object
    plt.savefig(plot_stream, format='png')
    plt.close()

    # Get the binary data from the BytesIO object
    plot_stream.seek(0)
    plot_binary = plot_stream.read()

    return rmse, plot_binary

This function reshapes x_test into a 2D array before making predictions with the model’s predict function. The RMSE is calculated from the predicted and actual y_test values, providing a measure of the model’s accuracy. A plot is then generated to compare the predicted values with the actual ones visually. The plot is written to an in-memory buffer as a PNG image, and its binary data is returned together with the RMSE value.
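As a quick check of what the RMSE measures, the sketch below computes it by hand for four invented values and compares the result with the scikit-learn based expression used in the service. Only one prediction is off, by 4, so the mean squared error is 16 / 4 = 4 and the RMSE is 2.

```python
import numpy as np
from sklearn.metrics import mean_squared_error

# Invented values for illustration: only the last prediction is wrong.
y_test = np.array([10.0, 12.0, 14.0, 16.0])
y_pred = np.array([10.0, 12.0, 14.0, 20.0])

# RMSE by hand: square the errors, average them, take the square root.
errors = y_pred - y_test
rmse_manual = np.sqrt(np.mean(errors ** 2))

# The same quantity computed as in the service function.
rmse_sklearn = np.sqrt(mean_squared_error(y_test, y_pred))

print(rmse_manual, rmse_sklearn)
```

Because the errors are squared before averaging, a single large miss raises the RMSE more than several small ones, and the result is expressed in the same unit as the closing price.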

Testing the Service

To verify the functionality of this service, a test function should be written. This function uses the cleaned data saved in the JSON file and the model saved as a binary file while testing the training component. The test function then calls the test_model function. The implementation is as follows:

import pickle
import json
import numpy as np
from datetime import datetime
from test_service import test_model

def test_test_model():
    # Load the model from the pickle file
    with open('model.pkl', 'rb') as file:
        model = pickle.load(file)

    # Load the test data from the JSON file
    with open('cleaned_data.json', 'r') as file:
        test_data = json.load(file)

    # Extract and convert the test data
    x_test = np.array(test_data['x_test'])
    y_test = np.array(test_data['y_test'])
    dates_test = [datetime.strptime(date, '%Y-%m-%d') for date in test_data['dates_test']]

    # Call the test_model function
    test_model(model, x_test, y_test, dates_test)

    # Check the output (this can be more sophisticated with assertions)
    print("Test completed successfully.")

# Run the test function
if __name__ == "__main__":
    test_test_model()

Before running this file, ensure that the JSON and pickle files are moved to the directory containing the test file.
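The test above only prints a success message. As its comment suggests, the check could be made stricter with assertions on the returned values. The sketch below shows one way to verify that plot bytes produced through io.BytesIO form a valid PNG image. The function render_plot_bytes is a hypothetical, simplified stand-in for the plotting part of test_model, and the Agg backend is selected on the assumption that the test runs without a display.

```python
import io

import matplotlib
matplotlib.use("Agg")  # non-interactive backend, no display required
import matplotlib.pyplot as plt

# Every PNG file starts with this fixed 8-byte signature.
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"

def render_plot_bytes(y_test, y_pred):
    """Hypothetical stand-in for the plotting part of test_model."""
    stream = io.BytesIO()
    plt.figure(figsize=(4, 3))
    plt.plot(y_test, label="Actual")
    plt.plot(y_pred, label="Predicted")
    plt.legend()
    plt.savefig(stream, format="png")
    plt.close()
    # getvalue() returns the whole buffer regardless of the stream position.
    return stream.getvalue()

plot_binary = render_plot_bytes([1.0, 2.0, 3.0], [1.1, 1.9, 3.2])

# Assertions of the kind that could replace the final print statement.
assert isinstance(plot_binary, bytes)
assert plot_binary.startswith(PNG_SIGNATURE)
print(f"Plot size: {len(plot_binary)} bytes")
```

In the real test, the same two assertions could be applied to the plot_binary value returned by test_model, together with a check that the returned RMSE is not negative.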

Conclusion

We have now successfully developed and tested the three services necessary for our components. Having confirmed that these functions perform as intended, we can proceed to the next stage with confidence.