diff --git a/src/aiy/__init__.py b/src/aiy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/aiy/common.py b/src/aiy/common.py new file mode 100644 index 00000000..c97de241 --- /dev/null +++ b/src/aiy/common.py @@ -0,0 +1,227 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This library provides common drivers for the AIY projects.""" + +import itertools +import os +import threading +import time + +import RPi.GPIO as GPIO + + +class Button(object): + """Detect edges on the given GPIO channel.""" + + def __init__(self, + channel, + polarity=GPIO.FALLING, + pull_up_down=GPIO.PUD_UP, + debounce_time=0.15): + """A simple GPIO-based button driver. + + This driver supports a simple GPIO-based button. It works by detecting + edges on the given GPIO channel. Debouncing is automatic. + + Args: + channel: the GPIO pin number to use (BCM mode) + polarity: the GPIO polarity to detect; either GPIO.FALLING or + GPIO.RISING. + pull_up_down: whether the port should be pulled up or down; defaults to + GPIO.PUD_UP. + debounce_time: the time used in debouncing the button in seconds. + """ + + if polarity not in [GPIO.FALLING, GPIO.RISING]: + raise ValueError( + 'polarity must be one of: GPIO.FALLING or GPIO.RISING') + + self.channel = int(channel) + self.polarity = polarity + self.expected_value = polarity == GPIO.RISING + self.debounce_time = debounce_time + + GPIO.setmode(GPIO.BCM) + GPIO.setup(channel, GPIO.IN, pull_up_down=pull_up_down) + + self.callback = None + + def wait_for_press(self): + """Waits for the button to be pressed. + + This method blocks until the button is pressed. + """ + GPIO.add_event_detect(self.channel, self.polarity) + while True: + if GPIO.event_detected(self.channel) and self._debounce(): + GPIO.remove_event_detect(self.channel) + return + else: + time.sleep(0.1) + + def on_press(self, callback): + """Calls the callback whenever the button is pressed. + + Args: + callback: a function to call whenever the button is pressed. It should + take a single channel number. + + Example: + def MyButtonPressHandler(channel): + print "button pressed: channel = %d" % channel + my_button.on_press(MyButtonPressHandler) + """ + GPIO.remove_event_detect(self.channel) + self.callback = callback + GPIO.add_event_detect( + self.channel, self.polarity, callback=self._debounce_and_callback) + + def _debounce_and_callback(self, _): + if self._debounce(): + self.callback() + + def _debounce(self): + """Debounces the GPIO signal. + + Check that the input holds the expected value for the debounce + period, to avoid false trigger on short pulses. + """ + start = time.time() + while time.time() < start + self.debounce_time: + if GPIO.input(self.channel) != self.expected_value: + return False + time.sleep(0.01) + return True + + +class LED: + """Starts a background thread to show patterns with the LED. + + Simple usage: + my_led = LED(channel = 25) + my_led.start() + my_led.set_state(LED_BEACON) + my_led.stop() + """ + + LED_OFF = 0 + LED_ON = 1 + LED_BLINK = 2 + LED_BLINK_3 = 3 + LED_BEACON = 4 + LED_BEACON_DARK = 5 + LED_DECAY = 6 + LED_PULSE_SLOW = 7 + LED_PULSE_QUICK = 8 + + def __init__(self, channel): + self.animator = threading.Thread(target=self._animate) + self.channel = channel + self.iterator = None + self.running = False + self.state = None + self.sleep = 0 + + GPIO.setmode(GPIO.BCM) + GPIO.setup(channel, GPIO.OUT) + self.pwm = GPIO.PWM(channel, 100) + + self.lock = threading.Lock() + + def start(self): + """Starts the LED driver.""" + with self.lock: + if not self.running: + self.running = True + self.pwm.start(0) # off by default + self.animator.start() + + def stop(self): + """Stops the LED driver and sets the LED to off.""" + with self.lock: + if self.running: + self.running = False + self.animator.join() + self.pwm.stop() + + def set_state(self, state): + """Sets the LED driver's new state. + + Note the LED driver must be started for this to have any effect. + """ + with self.lock: + self.state = state + + def _animate(self): + while True: + state = None + running = False + with self.lock: + state = self.state + self.state = None + running = self.running + if not running: + return + if state: + if not self._parse_state(state): + raise ValueError('unsupported state: %d' % state) + if self.iterator: + self.pwm.ChangeDutyCycle(next(self.iterator)) + time.sleep(self.sleep) + else: + # We can also wait for a state change here with a Condition. + time.sleep(1) + + def _parse_state(self, state): + self.iterator = None + self.sleep = 0.0 + if state == self.LED_OFF: + self.pwm.ChangeDutyCycle(0) + return True + if state == self.LED_ON: + self.pwm.ChangeDutyCycle(100) + return True + if state == self.LED_BLINK: + self.iterator = itertools.cycle([0, 100]) + self.sleep = 0.5 + return True + if state == self.LED_BLINK_3: + self.iterator = itertools.cycle([0, 100] * 3 + [0, 0]) + self.sleep = 0.25 + return True + if state == self.LED_BEACON: + self.iterator = itertools.cycle( + itertools.chain([30] * 100, [100] * 8, range(100, 30, -5))) + self.sleep = 0.05 + return True + if state == self.LED_BEACON_DARK: + self.iterator = itertools.cycle( + itertools.chain([0] * 100, range(0, 30, 3), range(30, 0, -3))) + self.sleep = 0.05 + return True + if state == self.LED_DECAY: + self.iterator = itertools.cycle(range(100, 0, -2)) + self.sleep = 0.05 + return True + if state == self.LED_PULSE_SLOW: + self.iterator = itertools.cycle( + itertools.chain(range(0, 100, 2), range(100, 0, -2))) + self.sleep = 0.1 + return True + if state == self.LED_PULSE_QUICK: + self.iterator = itertools.cycle( + itertools.chain(range(0, 100, 5), range(100, 0, -5))) + self.sleep = 0.05 + return True + return False diff --git a/src/led.py b/src/led.py index dca70ab5..1dd1e248 100644 --- a/src/led.py +++ b/src/led.py @@ -11,15 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Signal states on a LED""" -'''Signal states on a LED''' - -import itertools import logging import os -import threading import time +import aiy.common import RPi.GPIO as GPIO logger = logging.getLogger('led') @@ -31,83 +29,6 @@ ] -class LED: - - """Starts a background thread to show patterns with the LED.""" - - def __init__(self, channel): - self.animator = threading.Thread(target=self._animate) - self.channel = channel - self.iterator = None - self.running = False - self.state = None - self.sleep = 0 - - GPIO.setup(channel, GPIO.OUT) - self.pwm = GPIO.PWM(channel, 100) - - def start(self): - self.pwm.start(0) # off by default - self.running = True - self.animator.start() - - def stop(self): - self.running = False - self.animator.join() - self.pwm.stop() - GPIO.output(self.channel, GPIO.LOW) - - def set_state(self, state): - self.state = state - - def _animate(self): - # TODO(ensonic): refactor or add justification - # pylint: disable=too-many-branches - while self.running: - if self.state: - if self.state == 'on': - self.iterator = None - self.sleep = 0.0 - self.pwm.ChangeDutyCycle(100) - elif self.state == 'off': - self.iterator = None - self.sleep = 0.0 - self.pwm.ChangeDutyCycle(0) - elif self.state == 'blink': - self.iterator = itertools.cycle([0, 100]) - self.sleep = 0.5 - elif self.state == 'blink-3': - self.iterator = itertools.cycle([0, 100] * 3 + [0, 0]) - self.sleep = 0.25 - elif self.state == 'beacon': - self.iterator = itertools.cycle( - itertools.chain([30] * 100, [100] * 8, range(100, 30, -5))) - self.sleep = 0.05 - elif self.state == 'beacon-dark': - self.iterator = itertools.cycle( - itertools.chain([0] * 100, range(0, 30, 3), range(30, 0, -3))) - self.sleep = 0.05 - elif self.state == 'decay': - self.iterator = itertools.cycle(range(100, 0, -2)) - self.sleep = 0.05 - elif self.state == 'pulse-slow': - self.iterator = itertools.cycle( - itertools.chain(range(0, 100, 2), range(100, 0, -2))) - self.sleep = 0.1 - elif self.state == 'pulse-quick': - self.iterator = itertools.cycle( - itertools.chain(range(0, 100, 5), range(100, 0, -5))) - self.sleep = 0.05 - else: - logger.warning("unsupported state: %s", self.state) - self.state = None - if self.iterator: - self.pwm.ChangeDutyCycle(next(self.iterator)) - time.sleep(self.sleep) - else: - time.sleep(1) - - def main(): logging.basicConfig( level=logging.INFO, @@ -117,25 +38,26 @@ def main(): import configargparse parser = configargparse.ArgParser( default_config_files=CONFIG_FILES, - description="Status LED daemon") + description="Status LED daemon" + ) parser.add_argument('-G', '--gpio-pin', default=25, type=int, help='GPIO pin for the LED (default: 25)') args = parser.parse_args() led = None state_map = { - "starting": "pulse-quick", - "ready": "beacon-dark", - "listening": "on", - "thinking": "pulse-quick", - "stopping": "pulse-quick", - "power-off": "off", - "error": "blink-3", + "starting": aiy.common.LED.LED_PULSE_QUICK, + "ready": aiy.common.LED.LED_BEACON_DARK, + "listening": aiy.common.LED.LED_ON, + "thinking": aiy.common.LED.LED_PULSE_QUICK, + "stopping": aiy.common.LED.LED_PULSE_QUICK, + "power-off": aiy.common.LED.LED_OFF, + "error": aiy.common.LED.LED_BLINK_3, } try: GPIO.setmode(GPIO.BCM) - led = LED(args.gpio_pin) + led = aiy.common.LED(args.gpio_pin) led.start() while True: try: @@ -156,5 +78,6 @@ def main(): led.stop() GPIO.cleanup() + if __name__ == '__main__': main()