API review

Proposer: Jeremy Leibs and Tim Field

Present at review:

  • Patrick
  • Josh
  • Bhaskara
  • Caroline
  • Lorenz
  • Tully

New API

C++

The new C++ API works on the premise of creating "views" of a bag using "queries". A Query is an abstract class which defines a function that filters whether or not the messages from a connection are to be included. This function has access to topic_name, datatype, md5sum, message definition as well as the connection header. Additionally, each Query can specify a start and end time for the range of times it includes.

Multiple queries can be added to a View, including queries from different bags. The View then provides an iterator interface across the bags, sorted based on time.

Python

The Python API is similar, except the the "query" is specified as optional arguments to the readMessages function, which returns the "view" as a generator.

C++

   1 namespace rosbag {
   2 
   3 namespace bagmode
   4 {
   5     //! The possible modes to open a bag in
   6     enum BagMode
   7     {
   8         Read    = 1 << 0,
   9         Write   = 1 << 1,
  10         Append  = 1 << 2
  11     };
  12 }
  13 
  14 namespace compression
  15 {
  16     enum CompressionType
  17     {
  18         None = 0,
  19         BZ2  = 1
  20     };
  21 }
  22 
  23 class Bag
  24 {
  25 public:
  26     Bag();
  27 
  28     //! Open a bag file
  29     Bag(std::string const& filename, uint32_t mode = bagmode::Read);
  30 
  31     ~Bag();
  32 
  33     //! Open a bag file
  34     bool open(std::string const& filename, uint32_t mode = bagmode::Read);
  35 
  36     //! Close the bag file (write to disk, append index, etc.)
  37     void close();
  38 
  39     //! Get the filename of the bag
  40     std::string getFileName() const;
  41 
  42     //! Get the mode the bag is in
  43     uint32_t getMode() const;
  44 
  45     //! Get the major-version of the open bagfile
  46     uint32_t getMajorVersion() const;
  47 
  48     //! Get the minor-version of the open bagfile
  49     uint32_t getMinorVersion() const;
  50 
  51     //! Get the current size of the bagfile (a lower bound on bag-file size)
  52     uint64_t getSize() const;
  53 
  54     //! Set the compression method to use for writing chunks
  55     void setCompression(CompressionType compression);
  56 
  57     //! Get the compression method to use for writing chunks
  58     CompressionType getCompression() const;
  59 
  60     //! Set the threshold for creating new chunks
  61     void setChunkThreshold(uint32_t chunk_threshold);
  62 
  63     //! Get the threshold for creating new chunks
  64     uint32_t getChunkThreshold() const;
  65 
  66     //! Write a message into the bag file
  67     /*!
  68      * \param topic The topic name
  69      * \param event The message event to be added
  70      *
  71      * Can throw BagNotOpenException or BagIOException
  72      */
  73     template<class T>
  74     void write(std::string const& topic, ros::MessageEvent<T> const& event);
  75 
  76     //! Write a message into the bag file
  77     /*!
  78      * \param topic The topic name
  79      * \param time  Timestamp of the message
  80      * \param msg   The message to be added
  81      *
  82      * Can throw BagNotOpenException or BagIOException
  83      */
  84     template<class T>
  85     void write(std::string const& topic, ros::Time const& time, T const& msg);
  86 
  87     //! Write a message into the bag file
  88     /*!
  89      * \param topic The topic name
  90      * \param time  Timestamp of the message
  91      * \param msg   The message to be added
  92      *
  93      * Can throw BagNotOpenException or BagIOException
  94      */
  95     template<class T>
  96     void write(std::string const& topic, ros::Time const& time, T& msg);
  97 
  98     //! Write a message into the bag file
  99     /*!
 100      * \param topic The topic name
 101      * \param time  Timestamp of the message
 102      * \param msg   A MessageInstance
 103      * \param connection_header  A connection header.
 104      *
 105      * Can throw BagNotOpenException or BagIOException
 106      */
 107     void write(std::string const& topic, ros::Time const& time, MessageInstance const& msg,
 108                            boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
 109 }
 110 
 111 //! A struct grouping together the information about a connection, used by Query
 112 struct ConnectionInfo
 113 {
 114     uint32_t    id;
 115     std::string topic;
 116     std::string datatype;
 117     std::string md5sum;
 118     std::string msg_def;
 119 
 120     boost::shared_ptr<ros::M_string> header;
 121 };
 122 
 123 class TopicQuery
 124 {
 125 public:
 126     TopicQuery(std::string const& topic);
 127     TopicQuery(std::vector<std::string> const& topics);
 128 
 129     bool operator()(ConnectionInfo const*) const;
 130 
 131     // ...
 132 };
 133 
 134 class TypeQuery
 135 {
 136 public:
 137     TypeQuery(std::string const& type);
 138     TypeQuery(std::vector<std::string> const& types);
 139 
 140     bool operator()(ConnectionInfo const*) const;
 141 
 142     // ...
 143 };
 144 
 145 class View
 146 {
 147 public:
 148     //! An iterator that points to a MessageInstance from a bag
 149     /*!
 150      * This iterator dereferences to a MessageInstance by VALUE, since
 151      * there does not actually exist a structure of MessageInstance
 152      * from which to return a reference.
 153      */
 154     class iterator
 155      : public boost::iterator_facade<iterator,
 156                                      MessageInstance,
 157                                      boost::forward_traversal_tag,
 158                                      MessageInstance>
 159     {
 160         // ...
 161     };
 162 
 163     //! Typedef to const_iterator
 164     typedef iterator const_iterator;
 165 
 166     //! Create a view and add a query
 167     /*!
 168      * param bag        The bag file on which to run this query
 169      * param start_time The beginning of the time_range for the query
 170      * param end_time   The end of the time_range for the query
 171      */
 172     View(Bag const& bag, ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);
 173 
 174     //! Create a view and add a query
 175     /*!
 176      * param bag        The bag file on which to run this query
 177      * param query      The actual query to evaluate which connections to include
 178      * param start_time The beginning of the time_range for the query
 179      * param end_time   The end of the time_range for the query
 180      */
 181     View(Bag const& bag, boost::function<bool(ConnectionInfo const*)> query,
 182          ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);
 183 
 184     ~View();
 185 
 186     iterator begin();
 187     iterator end();
 188     uint32_t size() const;
 189 
 190     //! Add a query to a view
 191     /*!
 192      * param bag        The bag file on which to run this query
 193      * param start_time The beginning of the time_range for the query
 194      * param end_time   The end of the time_range for the query
 195      */
 196     void addQuery(Bag const& bag, ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);
 197 
 198     //! Add a query to a view
 199     /*!
 200      * param bag        The bag file on which to run this query
 201      * param query      The actual query to evaluate which topics to include
 202      * param start_time The beginning of the time_range for the query
 203      * param end_time   The end of the time_range for the query
 204      */
 205     void addQuery(Bag const& bag, boost::function<bool(ConnectionInfo const*)> query,
 206                   ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);
 207 }
 208 
 209 //! A class pointing into a bag file
 210 /*!
 211  *  The MessageInstance class itself is fairly light weight.  It
 212  *  simply contains a pointer to a bag-file and the index_entry
 213  *  necessary to get access to the corresponding data.
 214  *
 215  *  It adheres to the necessary ros::message_traits to be directly
 216  *  serializable.
 217  *
 218  *  Note, it has a private constructor and can only be returned via an
 219  *  iterator from a rosbag::View.
 220  */
 221 class MessageInstance
 222 {
 223 public:
 224     ros::Time   const& getTime()              const;
 225     std::string const& getTopic()             const;
 226     std::string const& getDataType()          const;
 227     std::string const& getMD5Sum()            const;
 228     std::string const& getMessageDefinition() const;
 229 
 230     boost::shared_ptr<ros::M_string> getConnectionHeader() const;
 231 
 232     std::string getCallerId() const;
 233     bool        isLatching()  const;
 234 
 235     //! Templated call to instantiate a message
 236     /*!
 237      * returns NULL pointer if incompatible
 238      */
 239     template<class T>
 240     boost::shared_ptr<T const> instantiate() const;
 241   
 242     //! Write serialized message contents out to a stream
 243     template<typename Stream>
 244     void write(Stream& stream) const;
 245 
 246     //! Size of serialized message
 247     uint32_t size() const;
 248 };
 249 
 250 //! Helper function to create AdvertiseOptions from a MessageInstance
 251 /*!
 252  *  param msg         The Message instance for which to generate adveritse options
 253  *  param queue_size  The size of the outgoing queue
 254  */
 255 ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& msg, uint32_t queue_size);
 256 
 257 } // namespace rosbag
 258 

Example usage for write:

   1     rosbag::Bag bag;
   2     bag.open("test.bag", rosbag::bagmode::Write);
   3 
   4     std_msgs::String str;
   5     str.data = std::string("foo");
   6 
   7     std_msgs::Int32 i;
   8     i.data = 42;
   9 
  10     bag.write("chatter", ros::Time::now(), str);
  11     bag.write("numbers", ros::Time::now(), i);
  12 
  13     bag.close();

Example usage for read:

   1     rosbag::Bag bag;
   2     bag.open("test.bag", rosbag::bagmode::Read);
   3 
   4     std::vector<std::string> topics;
   5     topics.push_back(std::string("chatter"));
   6     topics.push_back(std::string("numbers"));
   7 
   8     rosbag::View view(bag, rosbag::TopicQuery(topics));
   9 
  10     foreach(rosbag::MessageInstance const m, view)
  11     {
  12         std_msgs::String::ConstPtr s = m.instantiate<std_msgs::String>();
  13         if (s != NULL)
  14             ASSERT_EQ(s->data, std::string("foo"));
  15 
  16         std_msgs::Int32::ConstPtr i = m.instantiate<std_msgs::Int32>();
  17         if (i != NULL)
  18             ASSERT_EQ(i->data, 42);
  19     }
  20 
  21     bag.close();

Python

   1 class ROSBagException(Exception):
   2     """
   3     Base class for exceptions in rosbag.
   4     """
   5     def __init__(self, value):
   6         self.value = value
   7 
   8 class ROSBagFormatException(ROSBagException):
   9     """
  10     Exceptions for errors relating to the bag file format.
  11     """
  12     def __init__(self, value):
  13         ROSBagException.__init__(self, value)
  14 
  15 class Compression:
  16     """
  17     Allowable compression types
  18     """
  19     NONE = 'none'
  20     BZ2  = 'bz2'
  21 
  22 class Bag(object):
  23     """
  24     Bag objects serialize messages to and from disk using the bag format.
  25     """
  26     def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, options=None):
  27         """
  28         Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default),
  29         writing or appending.  The file will be created if it doesn't exist
  30         when opened for writing or appending; it will be truncated when opened
  31         for writing.  Simultaneous reading and writing is allowed when in writing
  32         or appending mode.
  33         @param f: filename of bag to open or a stream to read from
  34         @type  f: str or file
  35         @param mode: mode, either 'r', 'w', or 'a'
  36         @type  mode: str
  37         @param compression: compression mode, see Compression
  38         @type  compression: str
  39         @param chunk_threshold: minimum number of uncompressed bytes per chunk
  40         @type  chunk_threshold: int
  41         @param options: the bag options specified via a dictionary (currently, compression and chunk_threshold)
  42         @type  options: dict
  43         @raise ValueError: if any argument is invalid
  44         @raise ROSBagException: if an error occurs opening file
  45         @raise ROSBagFormatException: if bag format is corrupted
  46         """
  47     
  48     def read_messages(self, topics=None, start_time=None, end_time=None, topic_filter=None, raw=False):
  49         """
  50         Read the messages from the bag file.
  51         @param topic: list of topics or a single topic [optional]
  52         @type  topic: list(str) or str
  53         @param start_time: earliest timestamp of message to return [optional]
  54         @type  start_time: U{roslib.rostime.Time}
  55         @param end_time: latest timestamp of message to return [optional]
  56         @type  end_time: U{roslib.rostime.Time}
  57         @param topic_filter: function to filter topics to include [optional]
  58         @type  topic_filter: function taking (topic, datatype, md5sum, msg_def) and returning bool
  59         @param raw: if True, then generate tuples of (datatype, data, md5sum, position, pytype)
  60         @type  raw: bool
  61         @return: generator of (topic, message, timestamp) tuples for each message in the bag file
  62         @rtype: generator of tuples of (topic, message, timestamp)
  63         """
  64 
  65     def write(self, topic, msg, t=None, raw=False):
  66         """
  67         Write a message to the bag.  Messages must be written in chronological order for a given topic.
  68         @param topic: name of topic
  69         @type  topic: str
  70         @param msg: message to add to bag, or tuple (if raw)
  71         @type  msg: Message or tuple of raw message data
  72         @param t: ROS time of message publication
  73         @type  t: U{roslib.rostime.Time}
  74         @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
  75         @type  raw: bool
  76         @raise ValueError: if arguments are invalid or bag is closed
  77         @raise ROSBagException: if message isn't in chronological order
  78         """
  79 
  80     def get_index(self):
  81         """
  82         Get the index.
  83         @return: the index
  84         @rtype: dict of topic -> (U{roslib.rostime.Time}, position), where position depends on the bag format.
  85         """
  86 
  87     def flush(self):
  88         """
  89         Write the open chunk to disk so subsequent reads will read all messages.
  90         @raise ValueError: if bag is closed 
  91         """
  92 
  93     def close(self):
  94         """
  95         Close the bag file.  Closing an already closed bag does nothing.
  96         """
  97 
  98     @property
  99     def options(self):
 100         """Get the options."""
 101     
 102     @property
 103     def filename(self):
 104         """Get the filename."""
 105     
 106     @property
 107     def version(self):
 108         """Get the version."""
 109     
 110     @property
 111     def mode(self):
 112         """Get the mode."""
 113 
 114     @property
 115     def size(self):
 116         """Get the size in bytes."""
 117 
 118     def _get_compression(self):
 119         """Get the compression."""
 120     
 121     def _set_compression(self, compression):
 122         """Set the compression."""
 123 
 124     compression = property(_get_compression, _set_compression)
 125     
 126     def _get_chunk_threshold(self):
 127         """Get the chunk threshold."""
 128     
 129     def _set_chunk_threshold(self, chunk_threshold):
 130         """Set the chunk threshold."""
 131 
 132     chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)

Example usage for write:

   1 import rosbag
   2 from std_msgs.msg import Int32, String
   3 
   4 bag = rosbag.Bag('test.bag', 'w')
   5 
   6 str = String()
   7 str.data = 'foo'
   8 
   9 i = Int32()
  10 i.data = 42
  11 
  12 bag.write('chatter', str);
  13 bag.write('numbers', i);
  14 
  15 bag.close();

Example usage for read:

   1 import rosbag
   2 bag = rosbag.Bag('test.bag')
   3 for (topic, msg, t) in bag.read_messages(topics=['chatter', 'numbers']):
   4     print msg
   5 bag.close();

Question / concerns / comments

Enter your thoughts on the API and any questions / concerns you have here. Please sign your name. Anything you want to address in the API review should be marked down here before the start of the meeting.

Tully

  • is multiple compression types supported within one bag? What happens if I write some messages with one compression type and then call setCompression with another? What does getCompression return in that case?
    • Yes, compression types can vary by chunk (see Bags/Format/2.0). The set/getCompression methods refer to the compression method that will be used to write the next message to the bag.

    • If setCompression is called with a different compression type while a chunk is open, the chunk is closed.

    • getCompression returns the current compression method being used to write.

  • does addQuery do an AND operator, OR operator or append to a list?
    • addQuery essentially does an OR operator. The view provides access to the (sorted) aggregate of all messages returned from each bag with each query.
  • What are the rules about modifying a view? What happens to existing iterators when a view is modified?
    • Since Views and Bags are only additive (there is no removeQuery), iterators stay valid. Iteration semantics are the same as an STL Set. Your iterator always points to the same element, however, the value of *(iterator + 1) may change either through adding new Queries to the View or new elements to the bag.

Patrick

  • The Bag::write() methods should be compatible with the roscpp 1.1 serialization API, where it's valid to have messages that don't inherit from ros::Message. They should be templated on the message type, see ros::Publisher::publish().

    • Of course - this was an oversight. Our implementation of write() directly calls a function templated on message type, so this is an easy fix.

  • I'd leave out bagmode::Default and default mode to bagmode::Read, it's clearer to me.

    • Sounds good. Updated.
  • What does getVersion() return as compared to getMajorVersion() and getMinorVersion()?

    • e.g. a bag with version 1.2 would have version=102, majorVersion=1, minorVersion=2. Here are the implementations:

   1 int Bag::getVersion()      const { return version_;       }
   2 int Bag::getMajorVersion() const { return version_ / 100; }
   3 int Bag::getMinorVersion() const { return version_ % 100; }
    • As you can see, getVersion() is redundant. I'll remove it.

    • Btw, I suppose minor version is there to signify forward compatibility of bag file formats, but that convention doesn't seem to have been followed (1.0 to 1.1 to 1.2 were all forwards incompatible). I bumped the new format version to 2.0 to indicate forwards incompatibility.
  • What is CompressionType? An enum? What are the possible values?

    • An enum. None and BZ2. (see above)
  • I'd like a Bag constructor that takes the filename and (optional) mode.

    • Sure. RAII FTW. I added an optional compression argument as well.

More:

  • I'm not sure the Query class really needs to exist. You could change View::addQuery() to

   1 void addQuery(Bag& bag, const boost::function<bool(ConnectionInfo const*)>& query,
   2               ros::Time const& start_time = ros::TIME_MIN,
   3               ros::Time const& end_time   = ros::TIME_MAX);
   4 
   5 // And TopicQuery becomes
   6 class TopicQuery
   7 {
   8     TopicQuery(std::vector<std::string> const& topics);
   9 
  10     bool operator() (ConnectionInfo const*) const;

which is more flexible in the (unlikely?) case of implementing your own query filters.

  • In addQuery(), can bag be const? Does the view actually modify the bag?

  • TopicQuery and TypeQuery should have constructors taking just a single topic/type for convenience.

Josh

  • It seems a bit odd to have to attempt to instantiate an object to check its type... what about a MsgInstance::isType<>() call?

  • Python: why are _get_compression/_set_compression/_get_chunk.../_set_chunk... prefixed by an underscore?

Meeting agenda

To be filled out by proposer based on comments gathered during API review period

Conclusion

Package status change mark change manifest)

  • /!\ Add API for write using MessageEvent directly (DONE)

  • /!\ All ints need to become sized (DONE)

  • /!\ No compression in Constructor (DONE)

  • /!\ Remove Query class and replace with functor (DONE)

  • /!\ View::addQuery should take a const Bag (DONE)

  • /!\ Add isType to MessageInstance (DONE)

  • /!\ Go through Python coding standards (DONE)

  • /!\ In python readMessage make topics take a string vs. list (DONE)

  • /!\ Look at adding python and C++ calls to repack (we decided against; simple to implement in user code)


Wiki: rosbag/Reviews/2010-04-22_API_Review (last edited 2010-05-03 04:24:52 by TimField)