Skip to content

Commit

Permalink
Merge branch 'dev_slurm_sched'
Browse files Browse the repository at this point in the history
  • Loading branch information
BerengerBerthoul committed Aug 29, 2024
2 parents e1b21b9 + b2e3ff3 commit 7fbafe1
Show file tree
Hide file tree
Showing 47 changed files with 1,161 additions and 258 deletions.
8 changes: 8 additions & 0 deletions .slurm_draft/draft_0_subprocess_auto_join/master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import subprocess

print('master begin')
p = subprocess.Popen(['python -u worker.py > out.txt 2> err.txt'], shell=True)
# worker.py is launched asynchronously
print('master end')
# master.py will finish quickly (not waiting worker.py to finish)
# However, worker.py will still continue until it is done
5 changes: 5 additions & 0 deletions .slurm_draft/draft_0_subprocess_auto_join/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import time

for i in range(0,500):
print(f'worker {i}')
time.sleep(1)
36 changes: 36 additions & 0 deletions .slurm_draft/draft_1_sockets/master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import socket
#from socket_utils import send
import socket_utils
import subprocess

LOCALHOST = '127.0.0.1'

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# setup master's socket
s.bind((LOCALHOST, 0)) # 0: let the OS choose an available port
s.listen()
port = s.getsockname()[1]

n = 4

# launch workers
workers = []
for i in range(0,n):
p = subprocess.Popen([f'python -u worker.py {port} {i} > out.txt 2> err.txt'], shell=True)
workers.append(p)

# recv worker messages
remaining_workers = n
while n>0:
conn, addr = s.accept()
with conn:
print(f"Connected by {addr}")
msg = socket_utils.recv(conn)
print(msg)
n -= 1

# wait for workers to finish and collect error codes
returncodes = []
for p in workers:
returncode = p.wait()
returncodes.append(returncode)
31 changes: 31 additions & 0 deletions .slurm_draft/draft_1_sockets/socket_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
def send(sock, msg):
msg_bytes = msg.encode('utf-8')

msg_len = len(msg_bytes)
sent = sock.send(msg_len.to_bytes(8,'big')) # send int64 big endian
if sent == 0:
raise RuntimeError('Socket send broken: could not send message size')

totalsent = 0
while totalsent < msg_len:
sent = sock.send(msg_bytes[totalsent:])
if sent == 0:
raise RuntimeError('Socket send broken: could not send message')
totalsent = totalsent + sent

def recv(sock):
msg_len_bytes = sock.recv(8)
if msg_len_bytes == b'':
raise RuntimeError('Socket recv broken: no message size')
msg_len = int.from_bytes(msg_len_bytes, 'big')

chunks = []
bytes_recv = 0
while bytes_recv < msg_len:
chunk = sock.recv(min(msg_len-bytes_recv, 4096))
if chunk == b'':
raise RuntimeError('Socket recv broken: could not receive message')
chunks.append(chunk)
bytes_recv += len(chunk)
msg_bytes = b''.join(chunks)
return msg_bytes.decode('utf-8')
14 changes: 14 additions & 0 deletions .slurm_draft/draft_1_sockets/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import socket
import socket_utils
#import time
import sys

LOCALHOST = '127.0.0.1'

assert len(sys.argv) == 3
server_port = int(sys.argv[1])
test_idx = int(sys.argv[2])

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((LOCALHOST, server_port))
socket_utils.send(s, f'Hello from {test_idx}')
17 changes: 17 additions & 0 deletions .slurm_draft/draft_2_srun/master_1p.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
echo "launch proc 0"
srun --exclusive --ntasks=1 --qos c1_inter_giga -l bash worker.sh 0 &
echo "detach proc 0"

echo "launch proc 1"
srun --exclusive --ntasks=1 --qos c1_inter_giga -l bash worker.sh 1 &
echo "detach proc 1"

echo "launch proc 2"
srun --exclusive --ntasks=1 --qos c1_inter_giga -l bash worker.sh 2 &
echo "detach proc 2"

echo "launch proc 3"
srun --exclusive --ntasks=1 --qos c1_inter_giga -l bash worker.sh 3 &
echo "detach proc 3"

wait
18 changes: 18 additions & 0 deletions .slurm_draft/draft_2_srun/master_1p_no_exclusive.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

echo "launch proc 0"
srun --ntasks=1 --qos c1_inter_giga -l bash worker.sh 0 &
echo "detach proc 0"

echo "launch proc 1"
srun --ntasks=1 --qos c1_inter_giga -l bash worker.sh 1 &
echo "detach proc 1"

echo "launch proc 2"
srun --ntasks=1 --qos c1_inter_giga -l bash worker.sh 2 &
echo "detach proc 2"

echo "launch proc 3"
srun --ntasks=1 --qos c1_inter_giga -l bash worker.sh 3 &
echo "detach proc 3"

wait
17 changes: 17 additions & 0 deletions .slurm_draft/draft_2_srun/master_multi_node.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
echo "launch proc 0"
srun --exclusive --ntasks=43 --qos c1_inter_giga -l bash worker.sh 0 &
echo "detach proc 0"

echo "launch proc 1"
srun --exclusive --ntasks=4 --qos c1_inter_giga -l bash worker.sh 1 &
echo "detach proc 1"

echo "launch proc 2"
srun --exclusive --ntasks=45 --qos c1_inter_giga -l bash worker.sh 2 &
echo "detach proc 2"

echo "launch proc 3"
srun --exclusive --ntasks=4 --qos c1_inter_giga -l bash worker.sh 3 &
echo "detach proc 3"

wait
1 change: 1 addition & 0 deletions .slurm_draft/draft_2_srun/master_simple.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
srun --ntasks=1 -l bash worker.sh 0
18 changes: 18 additions & 0 deletions .slurm_draft/draft_2_srun/slurm_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

#SBATCH --job-name=pytest_par
#SBATCH --time 00:30:00
#SBATCH --qos=co_short_std
#SBATCH --ntasks=88
#SBATCH --nodes=2-2
#SBATCH --output=slurm.%j.out
#SBATCH --error=slurm.%j.err

#date
#source /scratchm/sonics/dist/2023-11/source.sh --env sonics_dev --compiler gcc@12 --mpi intel-oneapi

date
#./master_1p.sh
#./master_1p_no_exclusive.sh
./master_multi_node.sh
date
16 changes: 16 additions & 0 deletions .slurm_draft/draft_2_srun/slurm_mpi_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

#SBATCH --job-name=test_slurm_pytest
#SBATCH --ntasks=48
#SBATCH --time 0-0:10
#SBATCH --qos=c1_test_giga
#SBATCH --output=slurm.%j.out
#SBATCH --error=slurm.%j.err

#date
#source /scratchm/sonics/dist/2023-11/source.sh --env sonics_dev --compiler gcc@12 --mpi intel-oneapi

date
#python3 master.py
./master.sh
date
18 changes: 18 additions & 0 deletions .slurm_draft/draft_2_srun/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import socket
import socket_utils
import time
import sys
import datetime

LOCALHOST = '127.0.0.1'

assert len(sys.argv) == 3
server_port = int(sys.argv[1])
test_idx = int(sys.argv[2])

print(f'start proc {test_idx} - ',datetime.datetime.now())

#with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# time.sleep(10)
# s.connect((LOCALHOST, server_port))
# socket_utils.send(s, f'Hello from {test_idx}')
7 changes: 7 additions & 0 deletions .slurm_draft/draft_2_srun/worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

printf "start proc $1 "
hostname
date
sleep 10
printf "end proc $1 "
date
14 changes: 14 additions & 0 deletions .slurm_draft/draft_3_sockets_ip/job_worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
#MSUB -r deploy_test
#MSUB -o sonics.out
#MSUB -e sonics.err
#MSUB -n 1
#MSUB -T 1600
#MSUB -A
####MSUB -x
#MSUB -q milan
#MSUB -Q test
#MSUB -m scratch,work

hostname -I
python worker.py
4 changes: 4 additions & 0 deletions .slurm_draft/draft_3_sockets_ip/machine_conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

ips = ['10.30.14.3', '10.136.0.3', '172.28.5.3', '10.137.0.3', '10.137.128.3']
ip = ips[1]
port = 10000
18 changes: 18 additions & 0 deletions .slurm_draft/draft_3_sockets_ip/master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import socket
#from socket_utils import send
import socket_utils
import subprocess
from machine_conf import ip, port

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# setup master's socket
s.bind((ip, port)) # port=0: let the OS choose an available port
s.listen()
port = s.getsockname()[1]

print('waiting for socket connection')
conn, addr = s.accept()
with conn:
print(f"Connected by {addr}")
msg = socket_utils.recv(conn)
print(msg)
31 changes: 31 additions & 0 deletions .slurm_draft/draft_3_sockets_ip/socket_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
def send(sock, msg):
msg_bytes = msg.encode('utf-8')

msg_len = len(msg_bytes)
sent = sock.send(msg_len.to_bytes(4,'big')) # send int64 big endian
if sent == 0:
raise RuntimeError('Socket send broken: could not send message size')

totalsent = 0
while totalsent < msg_len:
sent = sock.send(msg_bytes[totalsent:])
if sent == 0:
raise RuntimeError('Socket send broken: could not send message')
totalsent = totalsent + sent

def recv(sock):
msg_len_bytes = sock.recv(4)
if msg_len_bytes == b'':
raise RuntimeError('Socket recv broken: no message size')
msg_len = int.from_bytes(msg_len_bytes, 'big')

chunks = []
bytes_recv = 0
while bytes_recv < msg_len:
chunk = sock.recv(min(msg_len-bytes_recv, 4096))
if chunk == b'':
raise RuntimeError('Socket recv broken: could not receive message')
chunks.append(chunk)
bytes_recv += len(chunk)
msg_bytes = b''.join(chunks)
return msg_bytes.decode('utf-8')
10 changes: 10 additions & 0 deletions .slurm_draft/draft_3_sockets_ip/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import socket
import socket_utils
import time

from machine_conf import ip, port

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((ip, port))
socket_utils.send(s, f'Hello from {socket.gethostname()}')
time.sleep(2)
16 changes: 16 additions & 0 deletions .slurm_draft/hello_mpi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "mpi.h"
#include <iostream>

int main(int argc, char *argv[]) {
MPI_Init(&argc, &argv);

int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
std::cout << "mpi proc = " << rank << "/" << world_size << "\n";

MPI_Finalize();

return 0;
}
41 changes: 41 additions & 0 deletions .slurm_draft/master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import socket
#from socket_utils import send
import socket_utils
import subprocess
import datetime

LOCALHOST = '127.0.0.1'

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# setup master's socket
s.bind((LOCALHOST, 0)) # 0: let the OS choose an available port
s.listen()
port = s.getsockname()[1]

n = 4

# launch workers
workers = []
for i in range(0,n):
#p = subprocess.Popen([f'python3 -u worker.py {port} {i} > out.txt 2> err.txt'], shell=True)
print('starting subprocess - ',datetime.datetime.now())
p = subprocess.Popen([f'srun --exclusive --ntasks=1 --qos c1_inter_giga -l python3 -u worker.py {port} {i} > out_{i}.txt 2> err_{i}.txt'], shell=True) # --exclusive for SLURM to parallelize with srun (https://stackoverflow.com/a/66805905/1583122)
print('detached subprocess - ',datetime.datetime.now())
workers.append(p)

# recv worker messages
remaining_workers = n
while n>0:
print(f'remaining_workers={n} - ',datetime.datetime.now())
conn, addr = s.accept()
with conn:
msg = socket_utils.recv(conn)
print(msg)
n -= 1

# wait for workers to finish and collect error codes
returncodes = []
for p in workers:
print('wait to finish - ',datetime.datetime.now())
returncode = p.wait()
returncodes.append(returncode)
14 changes: 14 additions & 0 deletions .slurm_draft/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash

#SBATCH --job-name=pytest_par
#SBATCH --time 00:30:00
#SBATCH --qos=co_short_std
#SBATCH --ntasks=1
##SBATCH --nodes=2-2
#SBATCH --output=slurm.%j.out
#SBATCH --error=slurm.%j.err

#echo $TOTO
whoami
#srun --exclusive --ntasks=1 -l hostname
nproc --all
16 changes: 16 additions & 0 deletions .slurm_draft/slurm_impi.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

#SBATCH --job-name=test_slurm_pytest
#SBATCH --ntasks=48
#SBATCH --time 0-0:10
#SBATCH --qos=c1_test_giga
#SBATCH --output=slurm.%j.out
#SBATCH --error=slurm.%j.err

#date
#source /scratchm/sonics/dist/2023-11/source.sh --env sonics_dev --compiler gcc@12 --mpi intel-oneapi

date
#python3 master.py
./master.sh
date
Loading

0 comments on commit 7fbafe1

Please sign in to comment.