Python Parallel Processing with ThreadPoolExecutor Gives Wrong Results with Keras Model

I am using the concurrent.futures.ThreadPoolExecutor class to run predictions in parallel with a Keras model, each with a different set of weights.

However, the predictions made in parallel are not correct.

Below is a reproducible sample that creates 10 sets of weights and then calculates the model's error for each set, with and without parallel processing.

I set a NumPy random seed so that the generated weights are the same across runs.

import tensorflow.keras
import numpy
import concurrent.futures

numpy.random.seed(1)

def create_rand_weights(model, num_models):
    random_model_weights = []
    for model_idx in range(num_models):
        random_weights = []
        for layer_idx in range(len(model.weights)):
            layer_shape = model.weights[layer_idx].shape
            if len(layer_shape) > 1:
                layer_weights = numpy.random.rand(layer_shape[0], layer_shape[1])
            else:
                layer_weights = numpy.random.rand(layer_shape[0])
            random_weights.append(layer_weights)
        random_weights = numpy.array(random_weights, dtype=object)
        random_model_weights.append(random_weights)
    
    random_model_weights = numpy.array(random_model_weights)
    return random_model_weights

def model_error(model_weights):
    global data_inputs, data_outputs, model
    model.set_weights(model_weights)
    predictions = model.predict(data_inputs)
    mae = tensorflow.keras.losses.MeanAbsoluteError()
    abs_error = mae(data_outputs, predictions).numpy() + 0.00000001
    return abs_error

input_layer  = tensorflow.keras.layers.Input(shape=(3,))
dense_layer1 = tensorflow.keras.layers.Dense(5, activation="relu")(input_layer)
output_layer = tensorflow.keras.layers.Dense(1, activation="linear")(dense_layer1)
model = tensorflow.keras.Model(inputs=input_layer, outputs=output_layer)

data_inputs = numpy.array([[0.02, 0.1, 0.15],
                           [0.7, 0.6, 0.8],
                           [1.5, 1.2, 1.7],
                           [3.2, 2.9, 3.1]])    
data_outputs = numpy.array([[0.1],
                            [0.6],
                            [1.3],
                            [2.5]])

num_models = 10
random_model_weights = create_rand_weights(model, num_models)

ExecutorClass = concurrent.futures.ThreadPoolExecutor
thread_output = []
with ExecutorClass(max_workers=2) as executor:
    output = executor.map(model_error, random_model_weights)
for out in output:
    thread_output.append(out)
thread_output=numpy.array(thread_output)
print("Wrong Outputs using Threads")
print(thread_output)

print("\n\n")

correct_output = []
for idx in range(num_models):
    error = model_error(random_model_weights[idx])
    correct_output.append(error)
correct_output=numpy.array(correct_output)
print("Correct Outputs without Threads")
print(correct_output)

These are the correct model outputs, computed without parallel processing:

[6.78012372 3.42922212 4.96738673 6.64474774 6.83102609 4.41165734 3.34482099 7.6132908  7.97145654 6.98378612]

These are the wrong model outputs, computed with parallel processing:

[3.42922212 3.42922212 6.90911246 6.64474774 4.41165734 3.34482099 7.6132908  7.97145654 6.98378612 6.98378612]

Even though I set a random seed for NumPy, the outputs from the parallel version still vary between runs.

Solution:

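It appears that the model is not thread-safe. Every worker thread calls set_weights() and predict() on the same global model, so one thread can most likely overwrite the weights before another thread has finished predicting. You can modify just this function so that each call works on its own clone of the model: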

def model_error(model_weights):
    global data_inputs, data_outputs, model
    _model = tensorflow.keras.models.clone_model(model)
    _model.set_weights(model_weights)
    predictions = _model.predict(data_inputs)
    mae = tensorflow.keras.losses.MeanAbsoluteError()
    abs_error = mae(data_outputs, predictions).numpy() + 0.00000001
    return abs_error
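
The suspected race can be illustrated without TensorFlow. The sketch below is only an analogy, not the Keras internals: ToyModel, shared_error and isolated_error are hypothetical names invented for this example, and the time.sleep call is an assumption added purely to widen the window in which threads interleave. It contrasts one shared mutable object with a per-call copy, which mirrors the clone_model idea above.

```python
import concurrent.futures
import copy
import time


class ToyModel:
    """Hypothetical stand-in for a Keras model: mutable weights plus predict()."""

    def __init__(self):
        self.weight = 0.0

    def set_weights(self, weight):
        self.weight = weight

    def predict(self, x):
        # Artificial delay (an assumption for this demo) so that another
        # thread has time to change self.weight before it is read.
        time.sleep(0.01)
        return self.weight * x


shared_model = ToyModel()


def shared_error(weight):
    # Unsafe: every thread mutates the same object before predicting.
    shared_model.set_weights(weight)
    return shared_model.predict(1.0)


def isolated_error(weight):
    # Safe: each call works on its own copy of the model.
    local_model = copy.deepcopy(shared_model)
    local_model.set_weights(weight)
    return local_model.predict(1.0)


weights = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    shared_results = list(executor.map(shared_error, weights))
    isolated_results = list(executor.map(isolated_error, weights))

# The shared version may contain duplicated or shifted values.
print("shared model:  ", shared_results)
# The isolated version returns one result per input, in input order.
print("isolated model:", isolated_results)
```

Copying (or cloning) on every call trades some overhead for isolation. If that cost matters, creating one copy per worker thread is a possible refinement, though it is not shown here.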