forked from WebOfTrust/keripy
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request WebOfTrust#760 from SmithSamuelM/main
added streaming.py and test_streaming.py preliminary setup for annota…
- Loading branch information
Showing
4 changed files
with
136 additions
and
67 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
keri.core.streaming module | ||
Provides support for Streamer and Annotater | ||
""" | ||
|
||
|
||
from typing import NamedTuple | ||
from collections import namedtuple | ||
|
||
from .. import kering | ||
|
||
from .. import help | ||
|
||
|
||
from . import coring | ||
|
||
|
||
def annot(): | ||
"""Annotate CESR stream""" | ||
|
||
|
||
|
||
def denot(): | ||
"""De-annotate CESR stream""" | ||
|
||
|
||
|
||
class Streamer: | ||
""" | ||
Streamer is CESR sniffable stream class | ||
Has the following public properties: | ||
Properties: | ||
Methods: | ||
Hidden: | ||
""" | ||
|
||
def __init__(self, stream): | ||
"""Initialize instance | ||
Parameters: | ||
stream (bytes | bytearray): sniffable CESR stream | ||
""" | ||
self._stream = bytes(stream) | ||
|
||
|
||
@property | ||
def stream(self): | ||
"""stream property getter | ||
""" | ||
return self._stream | ||
|
||
@property | ||
def text(self): | ||
"""expanded stream as qb64 text | ||
Returns: | ||
stream (bytes): expanded text qb64 version of stream | ||
""" | ||
return self._stream | ||
|
||
@property | ||
def binary(self): | ||
"""compacted stream as qb2 binary | ||
Returns: | ||
stream (bytes): compacted binary qb2 version of stream | ||
""" | ||
return self._stream | ||
|
||
@property | ||
def texter(self): | ||
"""expanded stream as Texter instance | ||
Returns: | ||
texter (Texter): Texter primitive of stream suitable wrapping | ||
""" | ||
return self._stream | ||
|
||
|
||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
tests.core.test_streaming module | ||
""" | ||
|
||
|
||
import pytest | ||
|
||
|
||
from keri import kering | ||
|
||
from keri.help import helping | ||
|
||
from keri.core import (Matter, ) | ||
|
||
|
||
from keri.core import streaming | ||
from keri.core.streaming import (annot, denot, Streamer) | ||
|
||
|
||
def test_annot(): | ||
"""Test annot function Annotate""" | ||
|
||
"""End Test""" | ||
|
||
def test_streamer(): | ||
"""Test streamer instance""" | ||
|
||
"""End Test""" | ||
|
||
if __name__ == "__main__": | ||
test_annot() | ||
test_streamer() | ||
|
||
|
||
|