diff --git a/.gitignore b/.gitignore index b9135cd..adcb68e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ -project.pico-go \ No newline at end of file +project.pico-go +.vscode/ +.picowgo +blink.py +.DS_STORE \ No newline at end of file diff --git a/README.md b/README.md index 8d80468..23f3332 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,211 @@ + # at_pico_w -Communicate with the atPlatform using a Raspberry Pi Pico W. -Developed with [MicroPython](https://micropython.org/). +For UMass 2022 IoT Projects. + +# Table of Contents + +- [Prerequisites](#prerequisites) +- [Instructions](#instructions) + * [Forking the Project](#forking-the-project) + * [Setting up Micropython on your Pico W](#setting-up-micropython-on-your-pico-w) + * [Getting Started - Blinking the LED](#getting-started---blinking-the-led) + * [Connecting to the atPlatform](#connecting-to-the-atplatform) + * [Libraries](#libraries) + +# Prerequisites + +- [VSCode](https://code.visualstudio.com/Download) with the [Pico-W-Go extension](https://marketplace.visualstudio.com/items?itemName=paulober.pico-w-go) +- [Git](https://git-scm.com/) and your own [GitHub](https://github.com) account +- A [Raspberry Pi Pico W](https://www.canakit.com/raspberry-pi-pico-w.html) and [a micro-USB to USB-A cable](https://m.media-amazon.com/images/I/61kT7kpt2hL._AC_SY450_.jpg) (to connect your Pico to your Computer) +- [FileZilla](https://filezilla-project.org/) or any other FTP software. +- Two [atSigns](https://my.atsign.com/go) and their [.atKeys files](https://www.youtube.com/watch?v=2Uy-sLQdQcA&ab_channel=Atsign) + +# Instructions + +## 0. Introduction + +Hi UMass students! I've wrote some code for you to get your Pico Ws setup with the atPlatform. Big shoutout to @realvarx on GitHub for developing AES CTR and RSA-2048 private key signing on the Pico W. + +Let me know if you have any questions on Discord (Jeremy#7970) or by email (jeremy.tubongbanua@atsign.com) or just on our [discord](https://discord.atsign.com). + +Be sure to get the first 4 prerequisites under [Prerequisites](#prerequisites) before you start. The last 2 prerequisites (FTP software and 2 atSigns) can be done later. + +## 1. Getting the right Micropython Firmware for your Pico W + +1. Go to [atsign-foundation/micropython/releases](https://github.com/atsign-foundation/micropython/releases) and download the `.uf2` file. This `.uf2` file is specially built firmware to allow a certain encryption that Atsign uses (AES-256 CTR mode) to work on the Pico W. Big shoutout to @realvarx for developing all the Atsign encryption for the Pico W. + +2. Unplug your Pico W from your computer. Hold down the BOOTSEL button on the Pico W and then plug back the Pico W into your computer (all while holding down the BOOTSEL button). Once the Pico W is plugged in, you can let go of the BOOTSEL button. Your Pico W should now be in bootloader mode. + +You should see the Pico on your computer as a USB drive (see image below). + + + +3. Drag and drop the `firmware.uf2` file into the Pico W. This will flash the Pico W with the new firmware. Now the Pico W should automatically restart so no need to unplug/replug it. + +## 2. Getting Started - Blinking the LED + +Now let's create our first project. + +1. Open VSCode and open an empty folder somewhere on your computer. This is where your project will be. + +2. Get the [Pico-W-Go extension](https://marketplace.visualstudio.com/items?itemName=paulober.pico-w-go) + +3. Create a file named `blink.py` and write the following code: + +```py +# blink.py + +import machine +import time + +led = machine.Pin("LED", machine.Pin.OUT) # "LED" is the on board LED + +# blink ten times +for i in range(10): + print('Blinking... %s' %str(i+1)) + led.toggle() + time.sleep(0.5) +``` + +Note: You could also name it `main.py`. However, `main.py` is the default file that the Pico W will run when it starts up. But with the [Pico-W-Go extension](https://marketplace.visualstudio.com/items?itemName=paulober.pico-w-go), you can choose *when* to run a file (regardless of name) which is how we will be running our files from here on out. I recommend not having a main.py at all since it tends to bug out the Pico W trying to run your experimental code as soon as it boots which can get annoying. + + +4. Open the command pallette via `Ctrl + Shift + P` (or `Cmd + Shift + P` if you are on a Mac) and type `Configure Project` and press Enter. The extension should setup your current folder as a Pico W project by initializing a `.vscode/` hidden folder and a `.picowgo` hidden file. Now, any Pico-W-Go commands should work. You can also access Pico-W-Go commands by clicking on "All Commands" at the bottom of VSCode. + +VSCode bottom toolbar with Pico-W-Go commands: + + + +6. Now try connecting to your Pico W and see if it works. You can do this by clicking on the "Connect" button. + +7. Make sure you have your `blink.py` python file open in the editor and Run the "Run current file" command. You should see the onboard LED blink 10 times. + +## 3. Forking the Project + +Now we know your Pico W is working swell, let's get into some atPlatform. + +First, let's create a fork of this repository branch on your own GitHub account. This gives you a copy of the code that you can edit on your own system. + +1. Fork this repository by clicking "Fork" +2. Go into your terminal where you like to keep your code projects and do the following: +- `mkdir at_pico_w` (make an at_pico_w folder) +- `cd at_pico_w` (change directories) +- `git clone /at_pico_w.git> .` (clone your fork into the folder you've created) +- `git checkout -b umass2022` (create a new branch called umass2022 and go into it) +- `git reset --hard origin/umass2022` (reset the branch to the origin) + +Now you should have all of the code in your folder. Your folder should look something like this: + + + +## 4. Connecting to WiFi + +1. Edit the `settings.json` by adding your WiFi and Password, leave the atSign blank for now. + +settings.json + +```json +{ + "ssid": "********", + "password": "&&&&", + "atSign": "" +} +``` + + +2. You should see a file called `test_1_wifi.py` in your project (since you forked the repository). Open this file in VSCode and run the Pico-W-Go command "Run this current file". + +3. The code is: +```py +def main(): + from lib.at_client import io_util + from lib import wifi + + # Add your SSID an Password in `settings.json` + ssid, password, atSign = io_util.read_settings() + del atSign # atSign not needed in memory right now + + print('\nConnecting to %s (Ctrl+C to stop)...' % ssid) + wlan = wifi.init_wlan(ssid, password) + + if not wlan == None: + print('Connected to WiFi %s: %s' %(ssid, str(wlan.isconnected()))) + else: + print('Failed to connect to \'%s\'... :(' %ssid) + +if __name__ == '__main__': + main() +``` + +4. Your output should be similar to: + +``` +Connecting to Soup (Ctrl+C to stop)... +Connected to WiFi Soup: True +``` + +## 5. Connecting to your device atSign's atServer. + +1. If you do not have all of the prerequisites, it is time to get them, especially: [FileZilla](https://filezilla-project.org/) or any other FTP software and two [atSigns](https://my.atsign.com/go) and their [.atKeys files](https://www.youtube.com/watch?v=2Uy-sLQdQcA&ab_channel=Atsign). Continue reading to find out how to get your .atKeys files. + +2. Add the atSign you wish your device's atSign to be in the `settings.json`. This is an atSign you own and got from [my.atsign.com/go](https://my.atsign.com/go). + +settings.json + +``` +{ + "ssid": "******", + "password": "***", + "atSign": "@alice" +} + +``` + +3. To get an .atKeys file belonging to an atSign, go to 0:54 of this [video](https://youtu.be/2Uy-sLQdQcA?t=54). The easiest way is to download [atmospherePro](https://atsign.com/apps/atmospherepro/) on your computer (via Microsoft Store on Windows or the Appstore if you're on Mac) and onboarding your atSign via the onboarding widget in the app. There are other ways to generate the encryption .atKeys file for an atSign (such as the [at_onboarding_cli on Dart](), [RegisterCLI on Java](), the [sshnp_register_tool in at_tools](), [OnboardingCLI in Java]()) but generating it through [atmospherePro](https://atsign.com/apps/atmospherepro/) is the easiest if you don't want to deal with any code. If you have an .atKeys file (like '@bob232.atKeys'), then move onto the next step. We are going to put our atKeys on the Pico W. + +4. Run the "Start FTP server" command to start the FTP server on your Pico W Go. You should be given an address in the terminal (similar to the image below): + + + +5. Open [FileZilla](https://filezilla-project.org/) and connect to this address. If it asks for a password, the password should be `pico`. If you've uploaded and files to the Pico before, you should see them once you've connected. Similar to below: + + + +6. Create a `keys/` directory. This is where you will drag and drop the .atKeys file. Your `/keys/` directory should look like this: + + + +7. Close the FTP server by pressing "Stop" on the bottom toolbar on VSCode. + +Stop button: + + + +Yay our FTP server is closed: + + + +8. Now we have to put our keys in the Pico W, it's time to run `test_3_pkam_authenticate.py`. + +Output should be similar to: + + + +If you don't see this, ensure your settings.json looks like this (with an atSign you own): -## Usage -- Install the latest `firmware.uf2` onto your Pico W from [atsign-foundation/micropython Releases](https://github.com/atsign-foundation/micropython/releases), as this is patched to enable AES CTR, which is used by atSigns. -- Fill all the fields of the `settings.json` file (ssid/passw of your Wi-Fi network and atSign). -- Download [Thonny IDE](https://thonny.org/) and place all the files of this repository in the Pico W file system. -- Place your `.atKeys` file in the `~/keys/` directory (if the folder doesn't exist, create it manually) -- Run `main.py` and select option `3` in the REPL ("Get privateKey for @[yourAtSign]") -- Re-launch the REPL (run `main.py` again) -- Now you can select option `2` in the REPL to automatically get authenticated in your DESS -- Enjoy! :) +```json +{ + "ssid": "****", + "password": "****", + "atSign": "@fascinatingsnow" +} +``` -(If you get an error when attempting to find the secondary or when trying to connect to it, run again the REPL) +and that you have the .atKeys file in the `/keys/` directory. -(You can uncomment a few commented lines in the `send_verb` method to see the llookup verb answer content decrypted. Warning: some stored keys are not encrypted, if you try to see the decrypted content of one of those stored keys, you will get an error. This exception will be handled in the future) + -## Libraries -- [iot-core-micropython](https://github.com/GoogleCloudPlatform/iot-core-micropython) -- [uasn1](https://github.com/mkomon/uasn1) +## 6. Sending end-to-end encrypted data \ No newline at end of file diff --git a/aes.py b/aes.py deleted file mode 100644 index b924e75..0000000 --- a/aes.py +++ /dev/null @@ -1,18 +0,0 @@ -import ucryptolib -import os -import binascii - -def aes_decrypt(encryptedText, selfEncryptionKey): - ciphertext = binascii.a2b_base64(encryptedText) - key = binascii.a2b_base64(selfEncryptionKey) - iv = hex_str_to_bytes("00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00") - aes = ucryptolib.aes(key, 6, iv) - decrypted = aes.decrypt(bytearray(ciphertext)) - return decrypted.decode('utf-8').rstrip("\x10") # .decode('utf-8') - -def hex_str_to_bytes(hex_str): - parts = hex_str.split(' ') - together = "".join(parts) - return bytes.fromhex(together) - - \ No newline at end of file diff --git a/lib/aes.py b/lib/aes.py new file mode 100644 index 0000000..5ecd85f --- /dev/null +++ b/lib/aes.py @@ -0,0 +1,33 @@ +import ubinascii +import ucryptolib + +def aes_decrypt(encryptedText: str, aes256Base64Key: str) -> str: + key = ubinascii.a2b_base64(aes256Base64Key) + iv = hex_str_to_bytes("00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00") + aes = ucryptolib.aes(key, 6, iv) + decrypted = aes.decrypt(bytearray(ubinascii.a2b_base64(encryptedText))) + return decrypted.decode('utf-8').rstrip("\x10") + +def aes_encrypt(plain_text: str, aes256Base64Key: str) -> str: + key = ubinascii.a2b_base64(aes256Base64Key) + iv = hex_str_to_bytes("00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00") + aes = ucryptolib.aes(key, 6, iv) + encrypted = aes.encrypt(bytearray(ubinascii.a2b_base64(plain_text))) + return encrypted.decode('utf-8').rstrip("\x10") + +def hex_str_to_bytes(hex_str): + parts = hex_str.split(' ') + together = "".join(parts) + return bytes.fromhex(together) + +def str_to_bytes(s: str) -> bytes: + return ubinascii.a2b_base64(s) + +def str_to_bytearray(s: str) -> bytearray: + return bytearray(str_to_bytes(s)) + +def bytearray_to_str(b: bytearray) -> str: + return str(b) + +def bytes_to_str(b: bytes) -> str: + return str(ubinascii.b2a_base64(b)) \ No newline at end of file diff --git a/lib/at_client/at_utils.py b/lib/at_client/at_utils.py new file mode 100644 index 0000000..217de47 --- /dev/null +++ b/lib/at_client/at_utils.py @@ -0,0 +1,23 @@ +import ubinascii + +def without_prefix(atSign: str): + if atSign.startswith('@'): + return atSign[1:] + return atSign + +def format_atSign(atSign: str): + if not atSign.startswith('@'): + return '@' + atSign + return atSign + +def str_to_bytes(s: str) -> bytes: + return ubinascii.a2b_base64(s) + +def str_to_bytearray(s: str) -> bytearray: + return bytearray(str_to_bytes(s)) + +def bytearray_to_str(b: bytearray) -> str: + return str(b) + +def bytes_to_str(b: bytes) -> str: + return str(ubinascii.b2a_base64(b)) \ No newline at end of file diff --git a/lib/at_client/io_util.py b/lib/at_client/io_util.py new file mode 100644 index 0000000..41301b9 --- /dev/null +++ b/lib/at_client/io_util.py @@ -0,0 +1,14 @@ +import ujson +from lib.at_client import at_utils + +def read_settings(): + with open('settings.json') as f: + info = ujson.loads(f.read()) + return info['ssid'], info['password'], info['atSign'].replace('@', '') + +def read_key(atSign: str): + atSign = at_utils.without_prefix(atSign) + path = '/keys/@' + atSign + '_key.atKeys' + with open(path) as f: + info = ujson.loads(f.read()) + return info['aesEncryptPrivateKey'], info['aesEncryptPublicKey'], info['aesPkamPrivateKey'], info['aesPkamPublicKey'], info['selfEncryptionKey'] \ No newline at end of file diff --git a/lib/at_client/keys_util.py b/lib/at_client/keys_util.py new file mode 100644 index 0000000..c704854 --- /dev/null +++ b/lib/at_client/keys_util.py @@ -0,0 +1,43 @@ +from lib.at_client.io_util import read_key +from lib.aes import aes_decrypt +from lib.at_client.at_utils import without_prefix +from lib.pem_service import get_pem_parameters, get_pem_key +import os + +# initializes /keys/@alice/ with RSA keys in their pem form +# e.g. /keys/@alice/aesEncryptPrivateKey_pem.json +def initialize_keys(atSign: str) -> None: + aesEncryptPrivateKey, aesEncryptPublicKey, aesPkamPrivateKey, aesPkamPublicKey, selfEncryptionKey = read_key(atSign) + + try: + os.mkdir('/keys/@%s' % without_prefix(atSign)) + except: + pass + + # aesEncryptPrivateKey_pem_parameters = get_pem_parameters(get_pem_key(aes_decrypt(aesEncryptPrivateKey, selfEncryptionKey), 'private'), 'private') + # with open('/keys/@%s/aesEncryptPrivateKey_pem.json' % without_prefix(atSign), 'w') as w: + # w.write("{\n\"aesEncryptPrivateKey\": [\n" + str(aesEncryptPrivateKey_pem_parameters[0]) + ",\n" + str(aesEncryptPrivateKey_pem_parameters[1]) + ",\n" + str(aesEncryptPrivateKey_pem_parameters[2]) + ",\n" + str(aesEncryptPrivateKey_pem_parameters[3]) + ",\n" + str(aesEncryptPrivateKey_pem_parameters[4]) + "\n]\n}") + # del aesEncryptPrivateKey_pem_parameters, aesEncryptPrivateKey + + # aesEncryptPublicKey_pem_parameters = get_pem_parameters(get_pem_key(aes_decrypt(aesEncryptPublicKey, selfEncryptionKey), 'public'), 'public') + # with open('/keys/@%s/aesEncryptPublicKey_pem.json' % without_prefix(atSign), 'w') as w: + # w.write("{\n\"aesEncryptPublicKey\": [\n" + str(aesEncryptPublicKey_pem_parameters[0]) + ",\n" + str(aesEncryptPublicKey_pem_parameters[1]) + ",\n" + str(aesEncryptPublicKey_pem_parameters[2]) + ",\n" + str(aesEncryptPublicKey_pem_parameters[3]) + ",\n" + str(aesEncryptPublicKey_pem_parameters[4]) + "\n]\n}") + # del aesEncryptPublicKey_pem_parameters, aesEncryptPublicKey + + aesPkamPrivateKey_pem_parameters = get_pem_parameters(get_pem_key(aes_decrypt(aesPkamPrivateKey, selfEncryptionKey), 'private'), 'private') + with open('/keys/@%s/aesPkamPrivateKey_pem.json' % without_prefix(atSign), 'w') as w: + w.write("{\n\"aesPkamPrivateKey\": [\n" + str(aesPkamPrivateKey_pem_parameters[0]) + ",\n" + str(aesPkamPrivateKey_pem_parameters[1]) + ",\n" + str(aesPkamPrivateKey_pem_parameters[2]) + ",\n" + str(aesPkamPrivateKey_pem_parameters[3]) + ",\n" + str(aesPkamPrivateKey_pem_parameters[4]) + "\n]\n}") + del aesPkamPrivateKey_pem_parameters, aesPkamPrivateKey + + # aesPkamPublicKey_pem_parameters = get_pem_parameters(get_pem_key(aes_decrypt(aesPkamPublicKey, selfEncryptionKey))) + # with open('/keys/@%s/aesPkamPublicKey_pem.json' % without_prefix(atSign), 'w') as w: + # w.write("{\n\"aesPkamPublicKey\": [\n" + str(aesPkamPublicKey_pem_parameters[0]) + ",\n" + str(aesPkamPublicKey_pem_parameters[1]) + ",\n" + str(aesPkamPublicKey_pem_parameters[2]) + ",\n" + str(aesPkamPublicKey_pem_parameters[3]) + ",\n" + str(aesPkamPublicKey_pem_parameters[4]) + "\n]\n}") + # del aesPkamPublicKey_pem_parameters, aesPkamPublicKey + + del selfEncryptionKey + +def get_pem_pkam_private_key_from_file(atSign: str): + import ujson + with open('/keys/@%s/aesPkamPrivateKey_pem.json' %without_prefix(atSign), 'r') as f: + info = ujson.loads(f.read()) + return info["aesPkamPrivateKey"] \ No newline at end of file diff --git a/lib/at_client/remote_secondary.py b/lib/at_client/remote_secondary.py new file mode 100644 index 0000000..26c4069 --- /dev/null +++ b/lib/at_client/remote_secondary.py @@ -0,0 +1,107 @@ +import time +import usocket +import ussl as ssl # type: ignore +from lib.at_client import at_utils + +class RemoteSecondary: + + # rootUrl e.g. root.atsign.org:64 + # atSign e.g. "alice" or "@alice" + def __init__(self, rootUrl: str, atSign: str, wlan=None): + self.rootUrl = rootUrl + self.atSign = at_utils.format_atSign(atSign) + self.wlan = wlan + + + + def is_connected(self) -> bool: + return self.ss is not None + + # returns two variables: response, command + # executes verb on secondary + def send_verb(self, verb: str): + self.ss.write((verb + "\r\n").encode()) + response = b'' + time.sleep(2) + data = self.ss.read() + time.sleep(2) + print('data from verb: %s' % data) + if data is not None: + response += data + parts = response.decode().split('\n') + else: + parts = ['', ''] + + return parts[0], parts[1] + + def get_secondary_address(self): + return self.secondary_address + + # initializes self.ss (usocket.socket object), secondary_address nullable + def connect_to_secondary(self, secondary_address=None) -> None: + # print('Connecting to secondary... ', end="") + + if(secondary_address == None): + rootHost = self.rootUrl.split(':')[0] + # rootPort = int(self.rootUrl.split(':')[1]) + rootPort = 64 + # print('Finding secondary...') + self.secondary_address = self.find_secondary(self.atSign, rootHost, rootPort) + time.sleep(2) + else: + self.secondary_address = secondary_address + + ss_split = self.secondary_address.split(":") + print(ss_split) + address = ss_split[0] + port = ss_split[1] + + a = usocket.getaddrinfo(address, int(port))[0][-1] + s = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM) + + try: + s.connect(a) + except OSError as e: + if str(e) == '119': # For non-Blocking sockets 119 is EINPROGRESS + print("In Progress") + else: + raise e + + s.setblocking(False) + ss = ssl.wrap_socket(s, do_handshake = True) + self.ss = ss + + # returns the secondary address (as a string) of a given atSign, rootHost, and rootPort + def find_secondary(self, atSign: str, rootHost: str, rootPort: int) -> str: + atSign = at_utils.without_prefix(atSign) + # print('Finding secondary for @' + atSign + '...') + a = usocket.getaddrinfo(rootHost, rootPort)[0][-1] + s = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM) + + try: + s.connect(a) + except OSError as e: + if str(e) == '119': # For non-Blocking sockets 119 is EINPROGRESS + print("In Progress") + else: + raise e + + s.setblocking(False) + time.sleep(1) + ss = ssl.wrap_socket(s, do_handshake = True) + time.sleep(1) + + ss.write((atSign + "\r\n").encode()) + time.sleep(1) + + response = b'' + data = ss.read() + time.sleep(0.5) + response += data + secondary = response.decode().replace('@', '') + secondary = secondary.replace('\r\n', '') + + ss.close() + # print('Address found: %s' % secondary) + return secondary + \ No newline at end of file diff --git a/ntp_client.py b/lib/ntp_client.py similarity index 97% rename from ntp_client.py rename to lib/ntp_client.py index ac499b0..4276388 100644 --- a/ntp_client.py +++ b/lib/ntp_client.py @@ -1,4 +1,5 @@ -import network # type: ignore +# ntp, network time protocol client + import time import usocket as socket # type: ignore import struct diff --git a/pem_service.py b/lib/pem_service.py similarity index 59% rename from pem_service.py rename to lib/pem_service.py index c15a4ad..f3b4e20 100644 --- a/pem_service.py +++ b/lib/pem_service.py @@ -1,5 +1,4 @@ import io -import sys import ubinascii import uasn1 @@ -8,7 +7,6 @@ def read_pem(input_data): data = [] state = 0 for line in input_data: - # print(line) if state == 0: if line.startswith('-----BEGIN'): state = 1 @@ -35,6 +33,8 @@ def strid(id): s = 'OCTET STRING' elif id == uasn1.Null: s = 'NULL' + elif id == uasn1.BitString: + s = 'BIT STRING' elif id == uasn1.ObjectIdentifier: s = 'OBJECT IDENTIFIER' elif id == uasn1.Enumerated: @@ -55,9 +55,11 @@ def strclass(id): s = 'UNIVERSAL' elif id == uasn1.ClassApplication: s = 'APPLICATION' + elif id == uasn1.BitString: + s = 'BIT STRING' elif id == uasn1.ClassContext: s = 'CONTEXT' - elif id == san1.ClassPrivate: + elif id == uasn1.ClassPrivate: s = 'PRIVATE' else: raise ValueError('Illegal class: %#02x' % id) @@ -71,6 +73,7 @@ def prettyprint(input_data, output, indent=0): """Pretty print ASN.1 data.""" while not input_data.eof(): tag = input_data.peek() + # print(output.getvalue()) if tag[1] == uasn1.TypePrimitive: tag, value = input_data.read() output.write(' ' * indent) @@ -84,8 +87,8 @@ def prettyprint(input_data, output, indent=0): prettyprint(input_data, output, indent+2) input_data.leave() -def get_pem_parameters(pem): - formatted_pem = format_pem(pem) +def get_pem_parameters(pem, type: str): + formatted_pem = format_pem(pem, type) input_data = read_pem(formatted_pem) data = [] for line in input_data: @@ -115,12 +118,19 @@ def get_pem_parameters(pem): # text.append(str(i)) return result #, text -def get_pem_key(pkcs8): - formatted_pkcs8 = format_pem(pkcs8) +def get_pem_key(pkcs8, type: str): + # print('formatted_pkcs8') + formatted_pkcs8 = format_pem(pkcs8, type) + # print(formatted_pkcs8) + + # print('input_data') input_data = read_pem(formatted_pkcs8) + # print(input_data) + data = [] for line in input_data: data.append(line) + if isinstance(data[0], str): data = b''.join(data) elif isinstance(data[0], int): @@ -128,6 +138,9 @@ def get_pem_key(pkcs8): else: print('invalid data') + # print('data') + # print(data) + dec = uasn1.Decoder() dec.start(data) @@ -140,11 +153,73 @@ def get_pem_key(pkcs8): # print(value) return value -def format_pem(pem): +# type == "public" or "private" +def format_pem(pem, type: str): pem_list = [] for i in range(0, len(pem), 64): pem_list.append(pem[i:i+64]) - pem_list.insert(0, "-----BEGIN RSA PRIVATE KEY-----") - pem_list.append("-----END RSA PRIVATE KEY-----") + pem_list.insert(0, "-----BEGIN RSA %s KEY-----" %type.upper()) + pem_list.append("-----END RSA %s KEY-----" %type.upper()) return pem_list + +def get_public_n_e(publicRsaKeyDecrypted: str): + """ + get the n and e value of a public rsa key (decrypted, base 64, pkcs1 e.g.: 'MIIBIjANBgkqhkiG9w0BAQE...') + + Example: + + sk = '***' # selfEncryptionKey + encryptedEncryptPublicKey = '***' # aesEncryptPublicKey + + from lib import aes + from lib.pem_service import get_public_n_e + + n, e = get_public_n_e(aes.aes_decrypt(encryptedEncryptPublicKey, sk)) + print(n, e) # e.g. : '17722134712468768015452030444478829164426015687915321737937997713634046043898347302696251047824063488994... 65537' + + """ + formatted_pem = format_pem(publicRsaKeyDecrypted, "public") + input_data = read_pem(formatted_pem) + + from lib import uasn1 + dec = uasn1.Decoder() + dec.start(input_data) + + import io + s = io.StringIO() + + prettyprint(dec, s) + + # print(s.getvalue()) + + incorrect_ascii = s.getvalue().split('\n')[4].replace(' [UNIVERSAL] BIT STRING (value \'', '').replace('\')', '') + # print('incorrect_ascii %s' %incorrect_ascii) + hex_str = ubinascii.hexlify(ubinascii.a2b_base64(incorrect_ascii), ' ').decode() # '00 30 82...' + # print('hex_str %s' %hex_str) + correct_hex_str = hex_str[3:] + # print('hex_str removed %s' %correct_hex_str) + # convert back to ascii + correct_ascii = ubinascii.unhexlify(correct_hex_str.replace(' ', '')) + # print(correct_ascii) + + input_data = correct_ascii + dec = uasn1.Decoder() + dec.start(input_data) + + s = io.StringIO() + + prettyprint(dec, s) + # print(s.getvalue()) + + n_e_array = s.getvalue().split('\n')[1:3] + # print('n_e_array %s' %n_e_array) + numbers = [] + for value in n_e_array: + value = value.replace(' [UNIVERSAL] INTEGER (value ', '').replace(')', '') + # print(value) + numbers.append(int(value)) + + # print('n: %s' %numbers[0]) + # print('e: %s' %numbers[1]) + return numbers[0], numbers[1] \ No newline at end of file diff --git a/lib/third_party/rsa/pkcs1.py b/lib/third_party/rsa/pkcs1.py index 5d3e561..0adfca5 100644 --- a/lib/third_party/rsa/pkcs1.py +++ b/lib/third_party/rsa/pkcs1.py @@ -235,8 +235,8 @@ def decrypt(crypto, priv_key): cleartext = transform.int2bytes(decrypted, blocksize) # If we can't find the cleartext marker, decryption failed. - if cleartext[0:2] != b'\x00\x02': - raise DecryptionError('Decryption failed') + # if cleartext[0:2] != b'\x00\x02': + # raise DecryptionError('Decryption failed') # Find the 00 separator between the padding and the message try: diff --git a/lib/uasn1.py b/lib/uasn1.py index 434f312..568f056 100644 --- a/lib/uasn1.py +++ b/lib/uasn1.py @@ -8,6 +8,7 @@ Boolean = 0x01 Integer = 0x02 +BitString = 0x03 OctetString = 0x04 Null = 0x05 ObjectIdentifier = 0x06 @@ -333,6 +334,9 @@ def _read_value(self, nr, length): bytes_data = self._read_bytes(length) if nr == Boolean: value = self._decode_boolean(bytes_data) + elif nr == BitString: + value = binascii.b2a_base64(bytes_data).decode().rstrip('\n') + value = self._decode_bit_string(bytes_data) elif nr in (Integer, Enumerated): value = self._decode_integer(bytes_data) elif nr == OctetString: @@ -405,6 +409,11 @@ def _decode_octet_string(self, bytes_data): """Decode an octet string.""" a = binascii.b2a_base64(bytes_data) return a.decode().rstrip('\n') + + def _decode_bit_string(self, bytes_data): + """Decode a bit string""" + a = binascii.b2a_base64(bytes_data).decode().rstrip('\n') + return a def _decode_null(self, bytes_data): """Decode a Null value.""" diff --git a/lib/wifi.py b/lib/wifi.py new file mode 100644 index 0000000..6461290 --- /dev/null +++ b/lib/wifi.py @@ -0,0 +1,10 @@ +import network + +# Returns network.WLAN object +def init_wlan(ssid: str, password: str): + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + wlan.connect(ssid, password) + while not wlan.isconnected(): + pass + return wlan \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 6bf54ad..0000000 --- a/main.py +++ /dev/null @@ -1,211 +0,0 @@ -import network # type: ignore -import time -from ntp_client import sync_time, format_time_string, format_time_id -import usocket as socket # type: ignore -import ussl as ssl # type: ignore -import sys -import machine -import ujson -from third_party import rsa -from ubinascii import b2a_base64 -from third_party import string -from aes import aes_decrypt - -ssid = password = atSign = '' -aesEncryptPrivateKey = aesEncryptPublicKey = aesPkamPrivateKey = aesPkamPublicKey = selfEncryptionKey = '' -privateKey = [] - -sensor_temp = machine.ADC(4) -conversion_factor = 3.3 / (65535) - -def main(): - - global ssid, password, atSign, privateKey, aesEncryptPrivateKey, aesEncryptPublicKey, aesPkamPrivateKey, aesPkamPublicKey, selfEncryptionKey - ssid, password, atSign, privateKey = read_settings() - aesEncryptPrivateKey, aesEncryptPublicKey, aesPkamPrivateKey, aesPkamPublicKey, selfEncryptionKey = read_key(atSign) - pkamPrivateKey = aes_decrypt(aesPkamPrivateKey, selfEncryptionKey) - # print(pkamPrivateKey) - - wlan = network.WLAN(network.STA_IF) # type: ignore - wlan.active(True) - wlan.connect(ssid, password) - - while not wlan.isconnected() and wlan.status() >= 0: - time.sleep(1) - print("Wi-Fi .. Connecting") - - print ("Wi-Fi Connected") - - sync_time() - - while True: - print("Welcome! What would you like to do?\n\t1) REPL\n\t2) REPL (from:atSign challenge is completed automatically)\n\t3) Get privateKey for @" + atSign + "\n\t4) Temperature sensor menu\n\t5) Exit") - opt= input("> ") - - if int(opt) == 1 or int(opt) == 2: - secondary = find_secondary(atSign) - ss = connect_to_secondary(secondary) - - print('Connected\n') - command = '@' - - if int(opt) == 2: - response, command = send_verb(ss, 'from:' + atSign) - if 'data:_' in response: - print("Signing challenge... (This process can take up to 1 minute)") - private_key = rsa.PrivateKey(privateKey[0], privateKey[1], privateKey[2], privateKey[3], privateKey[4]) - challenge = response.replace('@data:', '') - signature = b42_urlsafe_encode(rsa.sign(challenge, private_key, 'SHA-256')) - print(signature) - response, command = send_verb(ss, 'pkam:' + signature) - print(response) - else: - ss.close() - sys.exit(1) - - while True: - verb = str(input(command)) - response, command = send_verb(ss, verb) - print(response) - if('error:AT' in response): - ss.close() - sys.exit(1) - - elif int(opt) == 3: - from pem_service import get_pem_parameters, get_pem_key - - pemKey = get_pem_key(pkamPrivateKey) - privateKey = get_pem_parameters(pemKey) - with open('settings.json', 'w') as w: - # ujson.dump({"ssid": ssid, "password" : password, "atSign" : atSign, "privateKey" : privateKey}, w) - # w.write("{\n\t\"ssid\": " + ssid + ",\n\t\"password\": " + password + ",\n\t\"atSign\": "+ atSign + ",\n\t\"privateKey\": [" + text[0] + ",\n\t" + text[1] + ",\n\t" + text[2] + ",\n\t" + text[3] + ",\n\t" + text[4] + "\n\t]\n}") - - w.write("{\n\t\"ssid\": \"" + ssid + "\",\n\t\"password\": \"" + password + "\",\n\t\"atSign\": \"" + atSign + - "\",\n\t\"privateKey\": [\n\t\t\t\t\t" + str(privateKey[0]) + ",\n\t\t\t\t\t" + str(privateKey[1]) + ",\n\t\t\t\t\t" + str(privateKey[2]) + - ",\n\t\t\t\t\t" + str(privateKey[3]) + ",\n\t\t\t\t\t" + str(privateKey[4]) + "\n\t\t\t\t ]\n}") - - print("Your privateKey has been generated. Re-launch REPL to continue") - sys.exit() - - - elif int(opt) == 4: - while True: - print("Temperature sensor menu\n\t1) See current temperature\n\t2) Record current temperature\n\t3) Exit") - tmp = input("> ") - if int(tmp) == 1: - print("--------------------\n") - print(format_time_string() + "\nTemperature: " + str(measure_temp())) - print("\n--------------------") - elif int(tmp) == 2: - tm = str(measure_temp()) - verb = "update:" + str(format_time_id()) + ".temperature@" + atSign + " " + tm - secondary = find_secondary(atSign) - ts = connect_to_secondary(secondary) - print('Connected\n') - response, command = send_verb(ts, 'from:' + atSign) - print(response) - pkam = str(input(command)) - response, command = send_verb(ts, pkam) - print(response) - if('success' in response): - send_verb(ts, verb) - print("Current temperature measurement has been recorded:\n" + verb.replace('update:', '') + '\n') - else: - print('Could not record temperature: Wrong pkam') - ts.close() - elif int(tmp) == 3: - break - else: - print('Invalid option. Please enter a number in the range [1-3]') - elif int(opt) == 5: - sys.exit(1) - else: - print('Invalid option. Please enter a number in the range [1-4]') - - -def send_verb(skt, verb): - skt.write((verb + "\r\n").encode()) - time.sleep(1) - response = b'' - data = skt.read() - response += data - parts = response.decode().split('\n') - -# if 'llookup:' in verb: -# content = parts[0].replace('data:', '') -# parts[0] = aes_decrypt(content, selfEncryptionKey) - - return parts[0], parts[1] - -def measure_temp(): - reading = sensor_temp.read_u16() * conversion_factor - temperature = 27 - (reading - 0.706)/0.001721 - return temperature - -def read_settings(): - with open('settings.json') as f: - info = ujson.loads(f.read()) - return info['ssid'], info['password'], info['atSign'].replace('@', ''), info['privateKey'] - -def read_key(atSign): - path = '/keys/@' + atSign + '_key.atKeys' - with open(path) as f: - info = ujson.loads(f.read()) - return info['aesEncryptPrivateKey'], info['aesEncryptPublicKey'], info['aesPkamPrivateKey'], info['aesPkamPublicKey'], info['selfEncryptionKey'] - -def connect_to_secondary(secondary): - print('Connecting to secondary... ', end="") - - secondary = secondary.split(':') - address = secondary[0] - port = secondary[1] - - a = socket.getaddrinfo(address, int(port))[0][-1] - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - try: - s.connect(a) - except OSError as e: - if str(e) == '119': # For non-Blocking sockets 119 is EINPROGRESS - print("In Progress") - else: - raise e - - s.setblocking(False) - ss = ssl.wrap_socket(s, do_handshake = True) - return ss - -def find_secondary(atSign): - print('Finding secondary for @' + atSign + '...') - a = socket.getaddrinfo('root.atsign.org', 64)[0][-1] - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - try: - s.connect(a) - except OSError as e: - if str(e) == '119': # For non-Blocking sockets 119 is EINPROGRESS - print("In Progress") - else: - raise e - - s.setblocking(False) - ss = ssl.wrap_socket(s, do_handshake = True) - - ss.write((atSign + "\r\n").encode()) - time.sleep(1) - - response = b'' - data = ss.read() - response += data - secondary = response.decode().replace('@', '') - secondary = secondary.replace('\r\n', '') - - ss.close() - print('Address found: %s' % secondary) - return secondary - -def b42_urlsafe_encode(payload): - return string.translate(b2a_base64(payload)[:-1].decode('utf-8'),{ ord('+'):'-', ord('/'):'_' }) - -if __name__ == '__main__': - main() diff --git a/settings.json b/settings.json index f1e306a..b25c80c 100644 --- a/settings.json +++ b/settings.json @@ -1,6 +1,5 @@ { - "ssid": "", - "password": "", - "atSign": "", - "privateKey": [] + "ssid": "***", + "password": "***", + "atSign": "@fascinatingsnow" } \ No newline at end of file diff --git a/test_1_wifi.py b/test_1_wifi.py new file mode 100644 index 0000000..598f414 --- /dev/null +++ b/test_1_wifi.py @@ -0,0 +1,26 @@ + + +def main(): + """ + Sample output + + >>> Connecting to Soup (Ctrl+C to stop)... + >>> Connected to WiFi Soup: True + """ + from lib.at_client import io_util + from lib import wifi + + # Add your SSID an Password in `settings.json` + ssid, password, atSign = io_util.read_settings() + del atSign # atSign not needed in memory right now + + print('\nConnecting to %s (Ctrl+C to stop)...' % ssid) + wlan = wifi.init_wlan(ssid, password) + + if not wlan == None: + print('Connected to WiFi %s: %s' %(ssid, str(wlan.isconnected()))) + else: + print('Failed to connect to \'%s\'... :(' %ssid) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/test_2_find_secondary_address.py b/test_2_find_secondary_address.py new file mode 100644 index 0000000..54accc1 --- /dev/null +++ b/test_2_find_secondary_address.py @@ -0,0 +1,39 @@ + + +def main(): + """ + + Finds the secondary address of `atSign` from settings.json. Ensure `SSID` AND `password` are correct in settings.json. + + Sample output + + >>> Connecting to Soup (Ctrl+C to stop)... + >>> Connecting to remote secondary of atSign @fascinatingsnow... + >>> Connected to secondary of atSign @fascinatingsnow with address 6fe57327-01c1-5bbc-8a3c-3af1df169545.swarm0002.atsign.zone:5004 + """ + from lib.at_client import io_util + from lib.at_client.remote_secondary import RemoteSecondary + from lib import wifi + + # get values + ssid, password, atSign = io_util.read_settings() + rootUrl = 'root.atsign.org:64' + rootHost = rootUrl.split(':')[0] + rootPort = int(rootUrl.split(':')[1]) + + # connect to internet + print('\nConnecting to %s (Ctrl+C to stop)...' % ssid) + wlan = wifi.init_wlan(ssid, password) + print('Connected: ' %str(not wlan == None)) + + # initialize RemtoeSecondary object + rs = RemoteSecondary(rootUrl, atSign, wlan) + + # find secondary address + print('Connecting to remote secondary of atSign @%s...' % atSign) + ss_address = rs.find_secondary(atSign, rootHost, rootPort) + print('Connected to secondary of atSign @%s with address %s' %(atSign, ss_address)) + +if __name__ == '__main__': + main() + diff --git a/test_3_pkam_authenticate.py b/test_3_pkam_authenticate.py new file mode 100644 index 0000000..3f4f5af --- /dev/null +++ b/test_3_pkam_authenticate.py @@ -0,0 +1,50 @@ + + +def main(): + import sys + shouldRun = str(input('Run? (y/n): ')) + if shouldRun == 'n': + sys.exit(1) + + from lib.at_client import keys_util + from lib.at_client import io_util + from lib.at_client import remote_secondary + from lib import wifi + from lib.third_party import rsa + from lib.at_client.at_utils import without_prefix + ssid, password, atSign = io_util.read_settings() + keys_util.initialize_keys(atSign) # initialize /keys/@{your_atSign}/ containing _pem.json keys extracted from /keys/@fascinatingsnow_key.atKeys, you really only need to run this once per atSign + + atSignWithoutPrefix = without_prefix(atSign) + rootUrl = 'root.atsign.org:64' + + print('Connecting to WiFi %s...' % ssid) + wlan = wifi.init_wlan(ssid, password) + + remote_secondary = remote_secondary.RemoteSecondary(rootUrl, atSign, wlan) + print('Connecting to remote secondary of atSign @%s...' % atSignWithoutPrefix) + remote_secondary.connect_to_secondary() + print('Connected to secondary of atSign @%s with address %s' %(atSignWithoutPrefix, str(remote_secondary.get_secondary_address()))) + if remote_secondary.is_connected(): + print('Sending from:%s' % atSign) + response, command = remote_secondary.send_verb('from:%s' %atSignWithoutPrefix) + challenge = response.replace('@data:', '') + print('Challenge: %s' % challenge) + print('Digesting...') + pemPkamPrivateKey = keys_util.get_pem_pkam_private_key_from_file(atSign) # parameters + rsaPkamPrivateKey = rsa.PrivateKey(pemPkamPrivateKey[0], pemPkamPrivateKey[1], pemPkamPrivateKey[2], pemPkamPrivateKey[3], pemPkamPrivateKey[4]) + signature = b42_urlsafe_encode(rsa.sign(challenge, rsaPkamPrivateKey, 'SHA-256')) + print('Signature: %s' % str(signature)) + response, command = remote_secondary.send_verb('pkam:' + signature) + print(response) # data:success + + del signature, challenge, rsaPkamPrivateKey, pemPkamPrivateKey, ssid, password + sys.exit(1) + +def b42_urlsafe_encode(payload): + from lib.third_party import string + from ubinascii import b2a_base64 + return string.translate(b2a_base64(payload)[:-1].decode('utf-8'),{ ord('+'):'-', ord('/'):'_' }) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/test_4_send_data.py b/test_4_send_data.py new file mode 100644 index 0000000..7be7a94 --- /dev/null +++ b/test_4_send_data.py @@ -0,0 +1,76 @@ + + +def main(): + shouldRun = str(input('Run? (y/n): ')) + if shouldRun == 'n': + import sys + sys.exit(1) + + from lib.at_client import keys_util + from lib.at_client import io_util + from lib.at_client import remote_secondary + from lib import wifi + from lib.third_party import rsa + from lib.at_client.at_utils import without_prefix + ssid, password, atSign = io_util.read_settings() + keys_util.initialize_keys(atSign) # initialize /keys/@{your_atSign}/ containing _pem.json keys extracted from /keys/@fascinatingsnow_key.atKeys, you really only need to run this once per atSign + + atSignWithoutPrefix = without_prefix(atSign) + rootUrl = 'root.atsign.org:64' + + print('Connecting to WiFi %s...' % ssid) + wlan = wifi.init_wlan(ssid, password) + + remote_secondary = remote_secondary.RemoteSecondary(rootUrl, atSign, wlan) + print('Connecting to remote secondary of atSign @%s...' % atSignWithoutPrefix) + remote_secondary.connect_to_secondary() + print('Connected to secondary of atSign @%s with address %s' %(atSignWithoutPrefix, str(remote_secondary.get_secondary_address()))) + if remote_secondary.is_connected(): + print('Sending from:%s' % atSign) + response, command = remote_secondary.send_verb('from:%s' %atSignWithoutPrefix) + challenge = response.replace('@data:', '') + print('Challenge: %s' % challenge) + print('Digesting...') + pemPkamPrivateKey = keys_util.get_pem_pkam_private_key_from_file(atSign) # parameters + rsaPkamPrivateKey = rsa.PrivateKey(pemPkamPrivateKey[0], pemPkamPrivateKey[1], pemPkamPrivateKey[2], pemPkamPrivateKey[3], pemPkamPrivateKey[4]) + signature = b42_urlsafe_encode(rsa.sign(challenge, rsaPkamPrivateKey, 'SHA-256')) + print('Signature: %s' % str(signature)) + response, command = remote_secondary.send_verb('pkam:' + signature) + print(response) # data:success + + del signature, challenge, rsaPkamPrivateKey, pemPkamPrivateKey, ssid, password + + import time + time.sleep(1) + del time + + # Now we're logged in! + # Consider + # - @smoothalligator as the app atSign + # - @fascinatingsnow as the IoT Pico W atSign + + # A) Receive data from another atSign + # theirAtSign = '@smoothalligator' + # keyName = 'led' + # for i in range(50): + # response, command = remote_secondary.send_verb('lookup:%s%s' % (keyName, theirAtSign)) + # if response != None: + # response = response.replace('@data:', '') + # print(response) # 0, 1 + + # B) Send a message to another atSign + # theirAtSign = '@smoothalligator' + # keyName = 'message' + # value = 'some message' + # updateCommand = ('update:%s%s %s' %(keyName, theirAtSign, value)) + # print('ran command: %s' %updateCommand) + # response, command = remote_secondary.send_verb(updateCommand) + # print(response) + +def b42_urlsafe_encode(payload): + from lib.third_party import string + from ubinascii import b2a_base64 + return string.translate(b2a_base64(payload)[:-1].decode('utf-8'),{ ord('+'):'-', ord('/'):'_' }) + +if __name__ == '__main__': + main() \ No newline at end of file