Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Start Simulation example #71

Merged
merged 7 commits into from
Jul 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 137 additions & 126 deletions examples/basic_examples/start_simulation_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,6 @@
L-shape tube wall using an async call.

"""

awp_roots = sorted(
[k for k in os.environ.keys() if k.startswith("AWP_ROOT")], reverse=True
)
last_rocky_version = awp_roots[0]
if "25.1" not in last_rocky_version:
# `non_blocking` simulation only available on Rocky 25R1 and onwards.
quit()

###############################################################################
# .. image:: /_static/Lshape_tube_result.png
# :width: 400pt
Expand All @@ -47,130 +38,150 @@
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Perform the required imports and create an empty project.

import os.path
import os
import tempfile
import time

import ansys.rocky.core as pyrocky
from ansys.rocky.core import examples

# Create a temp directory to save the project.
project_dir = tempfile.mkdtemp(prefix="pyrocky_")

# Launch Rocky and open a project.
rocky = pyrocky.launch_rocky()
project = rocky.api.CreateProject()
project.SaveProject(os.path.join(project_dir, "rocky-testing.rocky"))

###############################################################################
# Configure the study
# ~~~~~~~~~~~~~~~~~~~
study = project.GetStudy()

# Download the STL file that was imported into Rocky to represent a wall.
file_name = "Lshape_tube.stl"
file_path = examples.download_file(project_dir, file_name, "pyrocky/geometries")
wall = study.ImportWall(file_path)[0]

# Create a particle with the default shape (sphere) and size distribution (single
# distribution with a sieve size of 0.1 m).
particle = study.CreateParticle()

# Create a circular surface to used as the inlet.
circular_surface = study.CreateCircularSurface()
circular_surface.SetMaxRadius(0.8, unit="m")

# Create a rectangular surface to use as the outlet.
rectangular_surface = study.CreateRectangularSurface()
rectangular_surface.SetLength(3, unit="m")
rectangular_surface.SetWidth(3, unit="m")
rectangular_surface.SetCenter((5, -7.5, 0), unit="m")

# Set the inlet and outlet.
particle_inlet = study.CreateParticleInlet(circular_surface, particle)
input_property_list = particle_inlet.GetInputPropertiesList()
input_property_list[0].SetMassFlowRate(1000)
outlet = study.CreateOutlet(rectangular_surface)

# Set the motion rotation over the Y axis and apply it on the wall and the
# rectagular surface used as the outlet.
motion_frame_source = study.GetMotionFrameSource()
motion_frame = motion_frame_source.NewFrame()
motion_frame.AddRotationMotion(angular_velocity=((0.0, 0.5, 0.0), "rad/s"))
motion_frame.ApplyTo(rectangular_surface)
motion_frame.ApplyTo(wall)

# The domain settings define the domain limits where the particles are enabled to be
# computed in the simulation.
domain = study.GetDomainSettings()
domain.DisableUseBoundaryLimits()
domain.SetCoordinateLimitsMaxValues((10, 1, 10), unit="m")

###############################################################################
# Set up the solver and run the simulation
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

solver = study.GetSolver()
simulation_duration = 5
solver.SetSimulationDuration(simulation_duration, unit="s")
study.StartSimulation(
non_blocking=True
) # `non_blocking` only available on Rocky 25R1 and onwards.

###############################################################################
# Postprocess
# ~~~~~~~~~~~
# Obtain the in and out mass flows of the particles while the simulation is
# running.
particles = study.GetParticles()

import time
awp_roots = sorted(
[k for k in os.environ.keys() if k.startswith("AWP_ROOT")], reverse=True
)
last_rocky_version = awp_roots[0]
if "25.1" in last_rocky_version:
fingeraugusto marked this conversation as resolved.
Show resolved Hide resolved
# `non_blocking` simulation only available on Rocky 25R1 and onwards.

while study.IsSimulating():
# When running an asynchronous simulation, the call to RefreshResults is required to
# ensure that the results are updated.
study.RefreshResults()
# Create a temp directory to save the project.
project_dir = tempfile.mkdtemp(prefix="pyrocky_")

# Launch Rocky and open a project.
rocky = pyrocky.launch_rocky()
project = rocky.api.CreateProject()
project.SaveProject(os.path.join(project_dir, "rocky-testing.rocky"))

###############################################################################
# Configure the study
# ~~~~~~~~~~~~~~~~~~~
study = project.GetStudy()

# Download the STL file that was imported into Rocky to represent a wall.
file_name = "Lshape_tube.stl"
file_path = examples.download_file(project_dir, file_name, "pyrocky/geometries")
wall = study.ImportWall(file_path)[0]

# Create a particle with the default shape (sphere) and size distribution (single
# distribution with a sieve size of 0.1 m).
particle = study.CreateParticle()

# Create a circular surface to used as the inlet.
circular_surface = study.CreateCircularSurface()
circular_surface.SetMaxRadius(0.8, unit="m")

# Create a rectangular surface to use as the outlet.
rectangular_surface = study.CreateRectangularSurface()
rectangular_surface.SetLength(3, unit="m")
rectangular_surface.SetWidth(3, unit="m")
rectangular_surface.SetCenter((5, -7.5, 0), unit="m")

# Set the inlet and outlet.
particle_inlet = study.CreateParticleInlet(circular_surface, particle)
input_property_list = particle_inlet.GetInputPropertiesList()
input_property_list[0].SetMassFlowRate(1000)
outlet = study.CreateOutlet(rectangular_surface)

# Set the motion rotation over the Y axis and apply it on the wall and the
# rectangular surface used as the outlet.
motion_frame_source = study.GetMotionFrameSource()
motion_frame = motion_frame_source.NewFrame()
motion_frame.AddRotationMotion(angular_velocity=((0.0, 0.5, 0.0), "rad/s"))
motion_frame.ApplyTo(rectangular_surface)
motion_frame.ApplyTo(wall)

# The domain settings define the domain limits where the particles are enabled to be
# computed in the simulation.
domain = study.GetDomainSettings()
domain.DisableUseBoundaryLimits()
domain.SetCoordinateLimitsMaxValues((10, 1, 10), unit="m")

###############################################################################
# Set up the solver and run the simulation
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

solver = study.GetSolver()
simulation_duration = 5
solver.SetSimulationDuration(simulation_duration, unit="s")
# `non_blocking` only available on Rocky 25R1 and onwards.
study.StartSimulation(non_blocking=True)

###############################################################################
# Postprocess
# ~~~~~~~~~~~
# Obtain the in and out mass flows of the particles while the simulation is
# running.
particles = study.GetParticles()

while study.IsSimulating():
# When running an asynchronous simulation, the call to RefreshResults is required
# to ensure that the results are updated.
study.RefreshResults()

times, mass_flow_in = particles.GetNumpyCurve(
"Particles Mass Flow In", unit="t/h"
)
times, mass_flow_out = particles.GetNumpyCurve(
"Particles Mass Flow Out", unit="t/h"
)

print(f"Simulation Progress: {study.GetProgress():.2f} %")
print(f"\tCurrent mass_flow_in: {mass_flow_in[-1]:.2f} t/h")
print(f"\tCurrent mass_flow_out: {mass_flow_out[-1]:.2f} t/h")

time.sleep(2)

print("Simulation Complete!")

times, mass_flow_in = particles.GetNumpyCurve("Particles Mass Flow In", unit="t/h")
times, mass_flow_out = particles.GetNumpyCurve("Particles Mass Flow Out", unit="t/h")

print(f"Simulation Progress: {100 * times[-1] / simulation_duration:.2f} %")
print(f"\tCurrent mass_flow_in: {mass_flow_in[-1]:.2f} t/h")
print(f"\tCurrent mass_flow_out: {mass_flow_out[-1]:.2f} t/h")

time.sleep(2)

print("Simulation Complete!")

# Obtain the maximum and minimum velocities of the particles at each time step.
import numpy as np

simulation_times = study.GetTimeSet()
velocity_gf = particles.GetGridFunction("Velocity : Translational : Absolute")
velocity_max = np.array(
[velocity_gf.GetMax(unit="m/s", time_step=i) for i in range(len(simulation_times))]
)
velocity_min = np.array(
[velocity_gf.GetMin(unit="m/s", time_step=i) for i in range(len(simulation_times))]
)

#################################################################################
# Plot curves
# +++++++++++

import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(2, 1)

ax1.plot(times, mass_flow_in, "b", label="Mass Flow In")
ax1.plot(times, mass_flow_out, "r", label="Mass Flow Out")
ax1.set_xlabel("Time [s]")
ax1.set_ylabel("Mass Flow [t/h]")
ax1.legend(loc="upper left")

ax2.plot(simulation_times, velocity_max, "b", label="Max Velocity")
ax2.plot(simulation_times, velocity_min, "r", label="Min Velocity")
ax2.set_xlabel("Time [s]")
ax2.set_ylabel("Velocity [m/s]")
ax2.legend(loc="upper left")

plt.draw()
# Obtain the maximum and minimum velocities of the particles at each time step.
import numpy as np

simulation_times = study.GetTimeSet()
velocity_gf = particles.GetGridFunction("Velocity : Translational : Absolute")
velocity_max = np.array(
[
velocity_gf.GetMax(unit="m/s", time_step=i)
for i in range(len(simulation_times))
]
)
velocity_min = np.array(
[
velocity_gf.GetMin(unit="m/s", time_step=i)
for i in range(len(simulation_times))
]
)

#################################################################################
# Plot curves
# +++++++++++

import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(2, 1)

ax1.plot(times, mass_flow_in, "b", label="Mass Flow In")
ax1.plot(times, mass_flow_out, "r", label="Mass Flow Out")
ax1.set_xlabel("Time [s]")
ax1.set_ylabel("Mass Flow [t/h]")
ax1.legend(loc="upper left")

ax2.plot(simulation_times, velocity_max, "b", label="Max Velocity")
ax2.plot(simulation_times, velocity_min, "r", label="Min Velocity")
ax2.set_xlabel("Time [s]")
ax2.set_ylabel("Velocity [m/s]")
ax2.legend(loc="upper left")

plt.draw()

rocky.close()
Loading