Skip to content

Instantly share code, notes, and snippets.

@asilichenko
Created October 21, 2024 11:38
Show Gist options
  • Save asilichenko/aedd74c97c757d3ace205b045ea6c24c to your computer and use it in GitHub Desktop.
Save asilichenko/aedd74c97c757d3ace205b045ea6c24c to your computer and use it in GitHub Desktop.
Python Multiprocessing: Shared Value and Shared Array
import multiprocessing
from datetime import datetime
from multiprocessing import Process
from typing import List
import numpy as np
# Number of random keys each worker process generates and scores.
NUM_ITERATIONS: int = 1_000_000
# Number of elements in every generated key.
KEY_SIZE: int = 20
# Exclusive upper bound of a key element: values are drawn from [0, KEY_MAX_VAL).
KEY_MAX_VAL: int = 40
# Number of worker processes launched by main(): the CPU count of the machine.
CPU_COUNT: int = multiprocessing.cpu_count()
def calc_score(key: np.ndarray) -> int:
    """Return the score of *key*, defined as the sum of its elements.

    Args:
        key: Array of integer key values.

    Returns:
        The element sum as a built-in ``int``.
    """
    # np.sum() yields a NumPy scalar; convert it so the result really is the
    # plain ``int`` promised by the signature.
    return int(np.sum(key))
def generate_key() -> np.ndarray:
    """Draw one random candidate key.

    Returns:
        A one-dimensional ``uint8`` array holding ``KEY_SIZE`` values, each
        drawn from the half-open range ``[0, KEY_MAX_VAL)``.
    """
    candidate: np.ndarray = np.random.randint(
        low=0,
        high=KEY_MAX_VAL,
        size=KEY_SIZE,
        dtype=np.uint8,
    )
    return candidate
class SearchProcess(Process):
    """Worker process that looks for the highest-scoring random key.

    All workers share a single score/key pair. A worker publishes its
    candidate only when the candidate's score beats the shared best score.
    """

    def __init__(self, best_score: multiprocessing.Value, best_key: multiprocessing.Array) -> None:
        """Create a daemon worker bound to the shared result holders.

        Args:
            best_score: Shared value holding the best score found so far.
            best_key: Shared array holding the key that produced that score.
        """
        super().__init__(daemon=True)
        self.best_score: multiprocessing.Value = best_score
        self.best_key: multiprocessing.Array = best_key

    def claim_best(self, expected: int, key: np.ndarray) -> None:
        """Publish *key* as the new shared best if *expected* beats it.

        The comparison and both writes happen while holding the lock of
        ``best_score``, so the stored score and key always belong together.
        One log line is printed for every call, whether or not the shared
        best was replaced.

        Args:
            expected: Score of the candidate key.
            key: Candidate key; its elements overwrite the leading
                ``key.shape[0]`` slots of ``best_key``.
        """
        with self.best_score.get_lock():  # critical section
            original: int = self.best_score.value
            if expected > original:
                self.best_score.value = expected
                # A single slice assignment acquires the array's own lock
                # once, instead of once per element.
                size: int = key.shape[0]
                self.best_key[:size] = key.tolist()
            actual: int = self.best_score.value
            # Logged while still holding the lock so the printed values
            # describe one consistent snapshot of the shared state.
            now = datetime.now().strftime("%H:%M:%S.%f")
            print(f'{now} [{multiprocessing.current_process().name}]: {original = } -> {expected = }, {actual = }')

    def search(self) -> None:
        """Generate and score ``NUM_ITERATIONS`` random keys, offering each one."""
        for _ in range(NUM_ITERATIONS):
            key: np.ndarray = generate_key()
            score: int = calc_score(key)
            self.claim_best(score, key)

    def run(self) -> None:
        """Process entry point: reseed the RNG, then run the search."""
        name: str = multiprocessing.current_process().name
        print(f'> start process [{name}]')
        # With the "fork" start method every child inherits an identical copy
        # of NumPy's global RNG state and would therefore generate exactly
        # the same keys as its siblings. Reseed from fresh OS entropy so each
        # worker explores a different random sequence.
        np.random.seed()
        self.search()
        print(f'> finish process [{name}]')
def main() -> None:
    """Run the parallel search demo and print the best score found.

    Creates the shared score and key, starts ``CPU_COUNT`` worker processes,
    waits for all of them to finish and prints the final shared score.
    """
    # https://docs.python.org/3/library/array.html#module-array
    # Type code   C Type               Python Type         Minimum size in bytes
    # 'b'         signed char          int                 1
    # 'B'         unsigned char        int                 1
    # 'u'         Py_UNICODE           Unicode character   2 (see note)
    # 'h'         signed short         int                 2
    # 'H'         unsigned short       int                 2
    # 'i'         signed int           int                 2
    # 'I'         unsigned int         int                 2
    # 'l'         signed long          int                 4
    # 'L'         unsigned long        int                 4
    # 'f'         float                float               4
    # 'd'         double               float               8
    best_score: multiprocessing.Value = multiprocessing.Value('I', 0, lock=True)
    # Size the shared array from KEY_SIZE so it always matches the keys the
    # workers generate, instead of repeating the literal length here.
    best_key: multiprocessing.Array = multiprocessing.Array('B', KEY_SIZE)

    processes: List[SearchProcess] = [SearchProcess(best_score, best_key) for _ in range(CPU_COUNT)]

    for process in processes:
        process.start()

    # Wait for all processes to finish
    for process in processes:
        process.join()

    # Final value after all processes have completed
    print(f"\nFinal value: {best_score.value}")
# Run the demo only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment