-
Notifications
You must be signed in to change notification settings - Fork 0
/
dbService.py
152 lines (129 loc) · 5.38 KB
/
dbService.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# This script:
# 1) Creates the DB "datastore" and a table "dataT" (with default dataset) upon startup, if they don't exist.
# 2) Provides a set of insert/edit/read/delete methods for dataT table.
import sqlite3
import json
# SQLite database file name passed to sqlite3.connect() by the functions below.
# It is a bare file name, so it is resolved relative to the current working
# directory of the running process.
dbName = "datastore"
# Creates the database dbName (if it doesn't exist) and the table dataT, then inserts the initial data if the table is empty.
def initTableDataT():
    """Create table ``dataT`` if it is missing and seed it when it is empty.

    Connecting creates the database file ``dbName`` if it does not exist yet.
    After the optional seeding, the number of rows in ``dataT`` is printed.

    Columns of ``dataT``:
        rowid (int)      - SQLite's implicit primary key; assigned
                           automatically, never supplied by the caller.
        timestamp (text) - filled in automatically via
                           ``DEFAULT CURRENT_TIMESTAMP``.
                           NOTE(review): CURRENT_TIMESTAMP is UTC in SQLite;
                           an earlier comment described this column as local
                           time - confirm which one is intended.
        owner (text)     - identifies the sender of a record (e.g. each
                           sensor can use its own owner id).
        priority (int)   - classifies the message type or its priority.
        message (text)   - the data itself.
    """
    createDataT = "CREATE TABLE IF NOT EXISTS dataT (timestamp text DEFAULT CURRENT_TIMESTAMP, owner text, priority int, message text)"
    # Small default dataset for testing; only loaded into an empty table.
    defData = [["ana", 1, "note no.1"], ["bob", 1, "note #2"], ["carl", 2, "note #3"], ["dave", 1, "note #4"]]

    con = sqlite3.connect(dbName)
    try:
        cur = con.cursor()
        cur.execute(createDataT)  # No-op when the table already exists.
        con.commit()
        res = cur.execute("select count(*) from dataT").fetchone()
        if res[0] == 0:
            # Empty table: insert the whole default dataset in one call.
            cur.executemany("insert into dataT (owner, priority, message) values (?,?,?)", defData)
            con.commit()
        # Re-count so the printed number reflects any rows just inserted.
        res = cur.execute("select count(*) from dataT").fetchone()
    finally:
        # Close the connection even when one of the statements above raises.
        con.close()
    print("Number of rows in dataT table:", res[0])
# Fetching all rows from the table (limited amount of rows as parameter, absolute limit=100)
def getAllDataT(lim):
    """Return rows of ``dataT``, highest rowid first.

    Args:
        lim: Maximum number of rows to return. Values above 100 are clamped
            to 100 and a notice is printed. ``0`` leaves the LIMIT clause out
            entirely, so every row is returned.

    Returns:
        A list of row tuples; each tuple starts with the rowid followed by
        all columns of ``dataT``.
    """
    if lim > 100:
        print("getAllDataT absolute limit 100 rows")
        lim = 100
    limit = "" if lim == 0 else " limit " + str(lim)

    con = sqlite3.connect(dbName)
    try:
        cur = con.cursor()
        cur.execute("select rowid, * from dataT order by rowid desc " + limit)
        return cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        con.close()
# Fetching all records from the table by owner. Amount of rows as lim(it) parameter (absolute max limit=100).
def getAllByOwner(owner, lim):
    """Return the rows of ``dataT`` that belong to ``owner``, highest rowid first.

    Args:
        owner: Value matched against the ``owner`` column (bound as a SQL
            parameter).
        lim: Maximum number of rows to return. Values above 100 are clamped
            to 100 and a notice is printed. ``0`` leaves the LIMIT clause out
            entirely, so every matching row is returned.

    Returns:
        A list of row tuples; each tuple starts with the rowid followed by
        all columns of ``dataT``.
    """
    if lim > 100:
        print("getAllByOwner absolute limit 100 rows")
        lim = 100
    limit = "" if lim == 0 else " limit " + str(lim)

    # SQLite requires ORDER BY to come before LIMIT; the other order is a
    # syntax error as soon as a limit is actually appended.
    query = "select rowid, * from dataT where owner=? order by rowid desc" + limit

    con = sqlite3.connect(dbName)
    try:
        cur = con.cursor()
        cur.execute(query, (owner,))
        return cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        con.close()
# Inserting a new record. Timestamp and rowid (PK) are inserted automatically.
def putNewTask(owner, priority, message):
    """Insert a new record into ``dataT``.

    Only owner, priority and message are supplied; the insert statement does
    not set rowid or timestamp, which are left to the database.

    Args:
        owner: Value for the ``owner`` column.
        priority: Value for the ``priority`` column; converted with ``int()``.
        message: Value for the ``message`` column.

    Returns:
        ``{'status': 'ok'}`` once the insert has been committed.

    Raises:
        ValueError, TypeError: If ``priority`` cannot be converted to int.
    """
    print("db put new taks")
    # TODO: validate parameters
    # Convert before opening the connection so a bad priority cannot leave a
    # connection open.
    values = (owner, int(priority), message)

    con = sqlite3.connect(dbName)
    try:
        cur = con.cursor()
        cur.execute("insert into dataT (owner, priority, message) values (?,?,?)", values)
        con.commit()
    finally:
        # Close the connection even when the insert or commit raises.
        con.close()
    return {'status': 'ok'}
# Updating a record, identified by rowid (primary key).
def updateDataT(rowid, owner, priority, message):
    """Overwrite owner, priority and message of the record with the given rowid.

    Args:
        rowid: Primary key of the record to update.
        owner: New value for the ``owner`` column.
        priority: New value for the ``priority`` column; converted with
            ``int()``.
        message: New value for the ``message`` column.

    Returns:
        ``{'status': 'ok'}`` once the update has been committed. This is
        returned even when no row has that rowid.

    Raises:
        ValueError, TypeError: If ``priority`` cannot be converted to int.
    """
    # Convert before opening the connection so a bad priority cannot leave a
    # connection open.
    values = (owner, int(priority), message, rowid)

    con = sqlite3.connect(dbName)
    try:
        cur = con.cursor()
        # TODO: Validate that rowid exists and return an appropriate error code.
        cur.execute("UPDATE dataT SET owner = ?, priority = ?, message = ? WHERE rowid=?", values)
        con.commit()
    finally:
        # Close the connection even when the update or commit raises.
        con.close()
    return {'status': 'ok'}
# Deleting a record, identified by rowid (primary key)
def deleteTask(rowid):
    """Delete the record with the given rowid from ``dataT``.

    Args:
        rowid: Primary key of the record to delete.

    Returns:
        ``{'status': 'ok'}`` once the delete has been committed. This is
        returned even when no row has that rowid.
    """
    con = sqlite3.connect(dbName)
    try:
        cur = con.cursor()
        # TODO: Validate that rowid exists and return an appropriate error code.
        cur.execute("delete from dataT where rowid=?", (rowid,))
        con.commit()
    finally:
        # Close the connection even when the delete or commit raises.
        con.close()
    return {'status': 'ok'}
# Fetching a record based on a rowid (PK).
def getById(rowid):
    """Fetch the record with the given rowid from ``dataT``.

    Args:
        rowid: Primary key of the record to read.

    Returns:
        A one-element list holding the row tuple (rowid followed by all
        columns of ``dataT``). When no row matches, the single element is
        ``None``, i.e. the result is ``[None]``.
    """
    con = sqlite3.connect(dbName)
    try:
        cur = con.cursor()
        # TODO: Validate that rowid exists and return an appropriate error code.
        cur.execute("select rowid, * from dataT where rowid=?", (rowid,))
        return [cur.fetchone()]
    finally:
        # Close the connection even when the query raises.
        con.close()
# Internal method for transforming the fetched rows into a list of dictionaries.
def rowsToListDict(rows):
    """Convert row sequences into a list of dictionaries.

    Args:
        rows: Iterable of rows, each indexable with at least five items in
            the order rowid, timestamp, owner, priority, message. ``None``
            entries (what ``fetchone()`` yields when nothing matched) are
            skipped instead of raising ``TypeError``.

    Returns:
        A list with one dict per non-``None`` row, using the keys ``rowId``,
        ``timestamp``, ``owner``, ``priority`` and ``message``.

    Raises:
        IndexError: If a row has fewer than five items.
    """
    keys = ('rowId', 'timestamp', 'owner', 'priority', 'message')
    # Index explicitly (rather than zip) so a short row still raises instead
    # of silently producing a partial dict.
    return [
        {key: row[pos] for pos, key in enumerate(keys)}
        for row in rows
        if row is not None
    ]
# Module-level startup: the calls below run whenever this file is executed or
# imported.
#
# Create the dataT table if it doesn't exist (seeding it when empty) and print its row count.
initTableDataT()
#
# Read up to 100 rows from the table, for testing purposes; the returned list is discarded.
getAllDataT(100)