This document describes the current stable version of Kombu (5.0). For development docs, go here.

Source code for kombu.transport.sqlalchemy.models

"""Kombu transport using SQLAlchemy as the message store."""

import datetime

from sqlalchemy import (Column, Integer, String, Text, DateTime,
                        Sequence, Boolean, ForeignKey, SmallInteger, Index)
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import relation
from sqlalchemy.schema import MetaData

class_registry = {}
metadata = MetaData()
ModelBase = declarative_base(metadata=metadata, class_registry=class_registry)


[docs]class Queue: """The queue class.""" __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'} id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True, autoincrement=True) name = Column(String(200), unique=True) def __init__(self, name): self.name = name def __str__(self): return f'<Queue({self.name})>' @declared_attr def messages(cls): return relation('Message', backref='queue', lazy='noload')
[docs]class Message: """The message class.""" __table_args__ = ( Index('ix_kombu_message_timestamp_id', 'timestamp', 'id'), {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'} ) id = Column(Integer, Sequence('message_id_sequence'), primary_key=True, autoincrement=True) visible = Column(Boolean, default=True, index=True) sent_at = Column('timestamp', DateTime, nullable=True, index=True, onupdate=datetime.datetime.now) payload = Column(Text, nullable=False) version = Column(SmallInteger, nullable=False, default=1) __mapper_args__ = {'version_id_col': version} def __init__(self, payload, queue): self.payload = payload self.queue = queue def __str__(self): return '<Message: {0.sent_at} {0.payload} {0.queue_id}>'.format(self) @declared_attr def queue_id(self): return Column( Integer, ForeignKey( '%s.id' % class_registry['Queue'].__tablename__, name='FK_kombu_message_queue' ) )