diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6c0bb97 --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +.DS_Store +*.pyc +*~ +*.sqlite +*.sqlite-journal +settings_local.py +local_settings.py +.*.sw[po] +dist/ +*.egg-info +doc/__build/* +build/ +locale/ +pip-log.txt +devdatabase.db +.directory +bundle_version.gen +celeryd.log +celeryd.pid diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..6737c55 --- /dev/null +++ b/AUTHORS @@ -0,0 +1,10 @@ +Ask Solem +Tareque Hossain +Flavio [FlaPer87] Percoco Premoli +Petar Radosevic +Tomaž Muraus +Peter Hoffmann +David Ziegler +David Clymer +Pascal Hartig +Marcin Lulek (ergo) diff --git a/Changelog b/Changelog new file mode 100644 index 0000000..ab8d3cb --- /dev/null +++ b/Changelog @@ -0,0 +1,8 @@ +t================ + Change history +================ + +0.1.0 +===== + +* Initial port diff --git a/INSTALL b/INSTALL new file mode 100644 index 0000000..34642b0 --- /dev/null +++ b/INSTALL @@ -0,0 +1,21 @@ +Installation +============ + +You can install ``ghettoq`` either via the Python Package Index (PyPI) +or from source. + +To install using ``pip``,:: + + $ pip install ghettoq + + +To install using ``easy_install``,:: + + $ easy_install ghettoq + + +If you have downloaded a source tarball you can install it +by doing the following,:: + + $ python setup.py build + # python setup.py install # as root diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..81d19ce --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +Copyright (c) 2009, Ask Solem +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + +Neither the name of Ask Solem nor the names of its contributors may be used +to endorse or promote products derived from this software without specific +prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS +BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..06f1d8c --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,10 @@ +include AUTHORS +include Changelog +include INSTALL +include LICENSE +include MANIFEST.in +include README +include THANKS +include TODO +include setup.cfg +recursive-include djkombu * diff --git a/README b/README new file mode 120000 index 0000000..92cacd2 --- /dev/null +++ b/README @@ -0,0 +1 @@ +README.rst \ No newline at end of file diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..2fbe159 --- /dev/null +++ b/README.rst @@ -0,0 +1,52 @@ +=================================================== +kombu-sqlalchemy - Kombu transport using SQLAlchemy +=================================================== + +:version: 0.9.0 + +Introduction +============ + +This package enables you to use SQLAlchemy as the message store +for `Kombu`_. + + +``kombu-sqlalchemy`` contains a single transport, +``sqlakombu.transport.Transport``, which is used like this:: + + >>> from kombu.connection import BrokerConnection + >>> c = BrokerConnection(transport="sqlakombu.transport.Transport") + + +.. _`Kombu`: http://pypi.python.org/pypi/kombu + +Installation +============ + +You can install ``kombu-sqlalchemy`` either via the Python Package Index (PyPI) +or from source. + +To install using ``pip``,:: + + $ pip install kombu-sqlalchemy + + +To install using ``easy_install``,:: + + $ easy_install kombu-sqlalchemy + + +If you have downloaded a source tarball you can install it +by doing the following,:: + + $ python setup.py build + # python setup.py install # as root + +License +======= + +This software is licensed under the ``New BSD License``. See the ``LICENSE`` +file in the top distribution directory for the full license text. + +.. # vim: syntax=rst expandtab tabstop=4 shiftwidth=4 shiftround + diff --git a/THANKS b/THANKS new file mode 100644 index 0000000..f829111 --- /dev/null +++ b/THANKS @@ -0,0 +1,3 @@ +Thanks to Rajesh Dhawan and other authors of django-queue-service +for the database model implementation. +See http://code.google.com/p/django-queue-service/. diff --git a/TODO b/TODO new file mode 100644 index 0000000..baf3adc --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +See http://github.com/ask/ghettoq/issues/ diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..5d3d2a1 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,4 @@ +[nosetests] +verbosity = 1 +detailed-errors = 1 +with-coverage = 1 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..5d71f2c --- /dev/null +++ b/setup.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import os +import codecs + +try: + from setuptools import setup, find_packages, Command +except ImportError: + from ez_setup import use_setuptools + use_setuptools() + from setuptools import setup, find_packages, Command + +from distutils.command.install_data import install_data +from distutils.command.install import INSTALL_SCHEMES +import sys + +import sqlakombu + +packages, data_files = [], [] +root_dir = os.path.dirname(__file__) +if root_dir != '': + os.chdir(root_dir) +src_dir = "sqlakombu" + +install_requires = [] + + +def osx_install_data(install_data): + + def finalize_options(self): + self.set_undefined_options("install", ("install_lib", "install_dir")) + install_data.finalize_options(self) + + +def fullsplit(path, result=None): + if result is None: + result = [] + head, tail = os.path.split(path) + if head == '': + return [tail] + result + if head == path: + return result + return fullsplit(head, [tail] + result) + + +for scheme in INSTALL_SCHEMES.values(): + scheme['data'] = scheme['purelib'] + +SKIP_EXTENSIONS = [".pyc", ".pyo", ".swp", ".swo"] + + +def is_unwanted_file(filename): + for skip_ext in SKIP_EXTENSIONS: + if filename.endswith(skip_ext): + return True + return False + + +for dirpath, dirnames, filenames in os.walk(src_dir): + # Ignore dirnames that start with '.' + for i, dirname in enumerate(dirnames): + if dirname.startswith("."): + del dirnames[i] + for filename in filenames: + if filename.endswith(".py"): + packages.append('.'.join(fullsplit(dirpath))) + elif is_unwanted_file(filename): + pass + else: + data_files.append([dirpath, [os.path.join(dirpath, f) for f in + filenames]]) + +setup( + name='kombu-sqlalchemy', + version=sqlakombu.__version__, + description=sqlakombu.__doc__, + author=sqlakombu.__author__, + author_email=sqlakombu.__contact__, + url=sqlakombu.__homepage__, + platforms=["any"], + license='BSD', + packages=packages, + data_files=data_files, + zip_safe=False, + test_suite="nose.collector", + install_requires=install_requires, + classifiers=[ + "Development Status :: 4 - Beta", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries :: Python Modules", + ], + long_description=codecs.open('README', "r", "utf-8").read(), +) diff --git a/sqlakombu/__init__.py b/sqlakombu/__init__.py new file mode 100644 index 0000000..d78a054 --- /dev/null +++ b/sqlakombu/__init__.py @@ -0,0 +1,8 @@ +"""Kombu transport using SQLAlchemy as the message store.""" +VERSION = (0, 9, 0) +__version__ = ".".join(map(str, VERSION)) +__author__ = "Ask Solem" +__contact__ = "ask@celeryproject.org" +__homepage__ = "http://github.com/ask/kombu-sqlalchemy/" +__docformat__ = "restructuredtext" +__license__ = "BSD" diff --git a/sqlakombu/models.py b/sqlakombu/models.py new file mode 100644 index 0000000..bd1a719 --- /dev/null +++ b/sqlakombu/models.py @@ -0,0 +1,49 @@ +import datetime + +from sqlalchemy import Column, Integer, String, Text, DateTime, \ + Sequence, Boolean, ForeignKey, SmallInteger +from sqlalchemy.orm import relation +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.schema import MetaData + +metadata = MetaData() +ModelBase = declarative_base(metadata=metadata) + + +class Queue(ModelBase): + __tablename__ = 'kombu_queue' + __table_args__ = {"sqlite_autoincrement": True} + + id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True, + autoincrement=True) + name = Column(String(200), unique=True) + messages = relation("Message", backref='queue', lazy='noload') + + def __init__(self, name): + self.name = name + + def __str__(self): + return "" % (self.name) + +class Message(ModelBase): + __tablename__ = 'kombu_message' + __table_args__ = {"sqlite_autoincrement": True} + + id = Column(Integer, Sequence('message_id_sequence'), primary_key=True, + autoincrement=True) + visible = Column(Boolean, default=True, index=True) + sent_at = Column('timestamp', DateTime, nullable=True, index=True, + onupdate = datetime.datetime.now) + payload = Column(Text, nullable=False) + queue_id = Column(SmallInteger, ForeignKey('ghettoq_queue.id', name='FK_qhettoq_message_queue')) + version = Column(SmallInteger, nullable=False, default=1) + + __mapper_args__ = {'version_id_col': version} + + def __init__(self, payload, queue): + self.payload = payload + self.queue = queue + + def __str__(self): + return "" % (self.visible, self.sent_at, self.payload, self.queue_id) + diff --git a/sqlakombu/transport.py b/sqlakombu/transport.py new file mode 100644 index 0000000..4455cd4 --- /dev/null +++ b/sqlakombu/transport.py @@ -0,0 +1,83 @@ +from Queue import Empty + +from anyjson import serialize, deserialize +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from kombu.transport import virtual + +from sqlakombu.models import Queue, Message + + +class Channel(virtual.Channel): + _session = None + _engines = {} # engine cache + + def _open(self): + conninfo = self.connection.client + if conninfo.host not in self._engines: + engine = sqlalchemy.create_engine(conninfo.host) + Session = sessionmaker(bind=engine) + metadata.create_all(engine) + self._engines[conninfo.host] = engine, Session + return self._engines[conninfo.host] + + @property + def session(self): + if self._session is None: + _, Session = self._open() + self._session = Session() + return self._session + + def _get_or_create_queue(self, queue): + obj = self.session.query(Queue).filter(Queue.name == queue) \ + .first() + if not obj: + obj = Queue(queue) + self.session.add(obj) + self.session.commit() + return obj + + def _new_queue(self, queue, **kwargs): + self._get_or_create(queue) + + def _put(self, queue, message, **kwargs): + obj = self._get_or_create(queue) + message = Message(serialize(payload), queue) + self.session.add(message) + self.session.commit() + + def _get(self, queue): + obj = self._get_or_create(queue) + msg = self.session.query(Message) \ + .filter(Message.queue_id == obj.id) \ + .filter(Message.visible != 0) \ + .order_by(Message.sent_at) \ + .order_by(Message.id) \ + .limit(1) \ + .first() + if msg: + msg.visible = False + self.session.commit() + return deserialize(msg.payload) + + def _query_all(self, queue): + obj = self._get_or_create(queue) + return self.session.query(Message) \ + .filter(queue_id == obj.id) + + def _purge(self, queue): + count = self._query_all(queue).delete(synchronize_session=False) + self.session.commit() + return count + + def _size(self, queue): + return self._query_all(queue).count() + + +class Transport(virtual.Transport): + Channel = Channel + + default_port = 0 + connection_errors = () + channel_errors = ()