Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various issues #524

Merged
merged 14 commits into from
Jul 5, 2024
2 changes: 1 addition & 1 deletion .github/workflows/windows-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ jobs:
run: python3.exe -m pip install .[test]

- name: Run tests
run: python3 -m pytest -m "server or util or client or mainloop or partner"
run: python3 -m pytest -m "server or db or client or mainloop or partner"
3 changes: 1 addition & 2 deletions example/boolean.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"""

import snap7
import snap7.util.setters

plc = snap7.client.Client()
plc.connect("192.168.200.24", 0, 3)
Expand All @@ -37,7 +36,7 @@


# play with these functions.
plc.read_area(area=Area.MK, dbnumber=0, start=20, size=2)
plc.read_area(area=Area.MK, db_number=0, start=20, size=2)

data = bytearray()
snap7.util.setters.set_int(data, 0, 127)
Expand Down
122 changes: 38 additions & 84 deletions example/example.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
"""
This is an example of how to use the snap7 library to read and write data to a PLC.
It is used to manipulate a large DB object containing over 450 'rows' which represent valves
"""

import time

import snap7.util.db
from db_layouts import rc_if_db_1_layout
from db_layouts import tank_rc_if_db_layout

import snap7
from snap7 import util

print("""

THIS IS EXAMPLE CODE MEANTH TO BE READ.

It is used to manipulate a large DB object with over
450 'rows' which represent valves

You don't have a project and PLC like I have which I used
to create the test code with.
from snap7 import Client, Row, DB
from util.db import print_row

""")

client = snap7.client.Client()
client = Client()
client.connect("192.168.200.24", 0, 3)


def get_db1():
def get_db1() -> None:
"""
Here we read out DB1, all data we is put in the all_data
variable and is a bytearray with the raw plc data
Expand All @@ -34,47 +26,19 @@ def get_db1():
row_size = 130 # size of item
index = i * row_size
offset = index + row_size # end of row in db
util.print_row(all_data[index:offset])

print_row(all_data[index:offset])

def get_db_row(db, start, size):
"""
Here you see and example of readying out a part of a DB

Args:
db (int): The db to use
start (int): The index of where to start in db data
size (int): The size of the db data to read
"""
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
data = client.db_read(db, start, type_, size)
# print_row(data[:60])
return data


def set_db_row(db, start, size, _bytearray):
"""
Here we replace a piece of data in a db block with new data

Args:
db (int): The db to use
start(int): The start within the db
size(int): The size of the data in bytes
_butearray (enumerable): The data to put in the db
"""
client.db_write(db, start, size, _bytearray)


def show_row(x):
def show_row(x: int) -> None:
"""
print data in DB of row/object X in
"""

row_size = 126

while True:
data = get_db_row(1, 4 + x * row_size, row_size)
row = snap7.util.db.DB_Row(data, rc_if_db_1_layout, layout_offset=4)
data = client.db_read(1, 4 + x * row_size, row_size)
row = Row(data, rc_if_db_1_layout, layout_offset=4)
print("name", row["RC_IF_NAME"])
print(row["RC_IF_NAME"])
break
Expand All @@ -83,24 +47,24 @@ def show_row(x):
# do some check action..


def get_row(x):
def get_row(x: int) -> Row:
row_size = 126
data = get_db_row(1, 4 + x * row_size, row_size)
row = snap7.util.db.DB_Row(data, rc_if_db_1_layout, layout_offset=4)
data = client.db_read(1, 4 + x * row_size, row_size)
row = Row(data, rc_if_db_1_layout, layout_offset=4)
return row


def set_row(x, row):
def set_row(x: int, row: Row) -> None:
"""
We use db 1, use offset 4, we replace row x. To find the correct
start_index we mulitpy by row_size by x and we put the
byte array representation of row in the PLC
"""
row_size = 126
set_db_row(1, 4 + x * row_size, row_size, row._bytearray)
client.db_write(1, 4 + x * row_size, row_size, row._bytearray)


def open_row(row):
def open_row(row: Row) -> None:
"""
open a valve
"""
Expand All @@ -115,13 +79,8 @@ def open_row(row):
row["CloseAut"] = 0
row["OpenAut"] = 1

# row['StartAut'] = True
# row['StopAut'] = False
# row['RstLi'] = True
# row['StringValue'] = 'test'


def close_row(row):
def close_row(row: Row) -> None:
"""
close a valve
"""
Expand All @@ -132,11 +91,7 @@ def close_row(row):
row["OpenAut"] = 0


# show_row(0)
# show_row(1)


def open_and_close():
def open_and_close() -> None:
for x in range(450):
row = get_row(x)
open_row(row)
Expand All @@ -150,18 +105,17 @@ def open_and_close():
set_row(x, row)


def set_part_db(start, size, _bytearray):
def set_part_db(start: int, size: int, _bytearray: bytearray) -> None:
data = _bytearray[start : start + size]
set_db_row(1, start, size, data)
client.db_write(1, start, size, data)


def write_data_db(dbnumber, all_data, size):
area = snap7.types.S7AreaDB
dbnumber = 1
client.write_area(area, dbnumber, 0, size, all_data)
# def write_data_db(dbnumber, all_data, size):
# area = snap7.types.S7AreaDB
# client.write_area(area, dbnumber, 0, size, all_data)


def open_and_close_db1():
def open_and_close_db1() -> None:
t = time.time()
db1 = make_item_db(1)
all_data = db1._bytearray
Expand All @@ -172,7 +126,7 @@ def open_and_close_db1():
# set_part_db(4+x*126, 126, all_data)

t = time.time()
write_data_db(1, all_data, 4 + 126 * 450)
client.write_area(1, all_data, 4 + 126 * 450)
print(f"opening all valves took: {time.time() - t}")

print("sleep...")
Expand All @@ -184,24 +138,24 @@ def open_and_close_db1():
print(time.time() - t)

t = time.time()
write_data_db(1, all_data, 4 + 126 * 450)
client.write_area(1, all_data, 4 + 126 * 450)
print(f"closing all valves took: {time.time() - t}")


def read_tank_db():
def read_tank_db() -> None:
db73 = make_tank_db()
print(len(db73))
for x, (name, row) in enumerate(db73):
print(row)


def make_item_db(db_number):
def make_item_db(db_number: int) -> DB:
t = time.time()
all_data = client.db_upload(db_number)
all_data = client.upload(db_number)

print(f"getting all data took: {time.time() - t}")

db1 = snap7.util.db.DB(
db1 = DB(
db_number, # the db we use
all_data, # bytearray from the plc
rc_if_db_1_layout, # layout specification
Expand All @@ -216,18 +170,18 @@ def make_item_db(db_number):
return db1


def make_tank_db():
tank_data = client.db_upload(73)
db73 = snap7.util.db.DB(73, tank_data, tank_rc_if_db_layout, 238, 2, id_field="RC_IF_NAME")
def make_tank_db() -> DB:
tank_data = client.upload(73)
db73 = DB(73, tank_data, tank_rc_if_db_layout, 238, 2, id_field="RC_IF_NAME")
return db73


def print_tag():
def print_tag() -> None:
db1 = make_item_db(1)
print(db1["5V315"])


def print_open():
def print_open() -> None:
db1 = make_item_db(1)
for x, (name, row) in enumerate(db1):
if row["BatchName"]:
Expand Down
4 changes: 2 additions & 2 deletions example/logo_7_8.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

import snap7
from snap7.logo import Logo

# for setup the Logo connection please follow this link
# https://snap7.sourceforge.net/logo.html
Expand All @@ -13,7 +13,7 @@

logger = logging.getLogger(__name__)

plc = snap7.logo.Logo()
plc = Logo()
plc.connect("192.168.0.41", 0x1000, 0x2000)

if plc.get_connected():
Expand Down
9 changes: 5 additions & 4 deletions example/read_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import ctypes

import snap7.util.getters
from snap7.common import check_error
from snap7 import Client
from error import check_error
from snap7.types import S7DataItem, Area, WordLen
from snap7.util import get_real, get_int

client = snap7.client.Client()
client = Client()
client.connect("10.100.5.2", 0, 2)

data_items = (S7DataItem * 3)()
Expand Down Expand Up @@ -53,7 +54,7 @@

result_values = []
# function to cast bytes to match data_types[] above
byte_to_value = [snap7.util.getters.get_real, snap7.util.getters.get_real, snap7.util.getters.get_int]
byte_to_value = [get_real, get_real, get_int]

# unpack and test the result of each read
for i in range(0, len(data_items)):
Expand Down
2 changes: 1 addition & 1 deletion example/write_multi.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ctypes
import snap7
from snap7.types import Area, S7DataItem, WordLen
from snap7.util import set_int, set_real, get_int, get_real, get_s5time
from util.db import set_int, set_real, get_int, get_real, get_s5time


client = snap7.client.Client()
Expand Down
15 changes: 7 additions & 8 deletions snap7/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@

from importlib.metadata import version, PackageNotFoundError

from . import client
from . import common
from . import error
from . import logo
from . import server
from . import types
from . import util
from .client import Client
from .server import Server
from .logo import Logo
from .partner import Partner
from .util.db import Row, DB
from .types import Area, Block, WordLen, SrvEvent, SrvArea

__all__ = ["client", "common", "error", "logo", "server", "types", "util"]
__all__ = ["Client", "Server", "Logo", "Partner", "Row", "DB", "Area", "Block", "WordLen", "SrvEvent", "SrvArea"]

try:
__version__ = version("python-snap7")
Expand Down
Loading
Loading