#ifndef OSRM_ENGINE_DATA_WATCHDOG_HPP #define OSRM_ENGINE_DATA_WATCHDOG_HPP #include "engine/datafacade/contiguous_internalmem_datafacade.hpp" #include "engine/datafacade/shared_memory_allocator.hpp" #include "engine/datafacade_factory.hpp" #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" #include "storage/shared_monitor.hpp" #include #include #include #include #include #include namespace osrm { namespace engine { namespace detail { // We need this wrapper type since template-template specilization of FacadeT is broken on clang // when it is combined with an templated alias (DataFacade in this case). // See https://godbolt.org/g/ZS6Xmt for an example. template class DataWatchdogImpl; template class DataWatchdogImpl> final { using mutex_type = typename storage::SharedMonitor::mutex_type; using Facade = datafacade::ContiguousInternalMemoryDataFacade; public: DataWatchdogImpl(const std::string &dataset_name) : dataset_name(dataset_name), active(true) { // create the initial facade before launching the watchdog thread { boost::interprocess::scoped_lock current_region_lock(barrier.get_mutex()); auto &shared_register = barrier.data(); auto static_region_id = shared_register.Find(dataset_name + "/static"); auto updatable_region_id = shared_register.Find(dataset_name + "/updatable"); if (static_region_id == storage::SharedRegionRegister::INVALID_REGION_ID || updatable_region_id == storage::SharedRegionRegister::INVALID_REGION_ID) { throw util::exception("Could not find shared memory region for \"" + dataset_name + "/data\". Did you run osrm-datastore?"); } static_shared_region = &shared_register.GetRegion(static_region_id); updatable_shared_region = &shared_register.GetRegion(updatable_region_id); static_region = *static_shared_region; updatable_region = *updatable_shared_region; facade_factory = DataFacadeFactory( std::make_shared( std::vector{ static_region.shm_key, updatable_region.shm_key})); } watcher = std::thread(&DataWatchdogImpl::Run, this); } ~DataWatchdogImpl() { active = false; barrier.notify_all(); watcher.join(); } std::shared_ptr Get(const api::BaseParameters ¶ms) const { return facade_factory.Get(params); } std::shared_ptr Get(const api::TileParameters ¶ms) const { return facade_factory.Get(params); } private: void Run() { while (active) { boost::interprocess::scoped_lock current_region_lock(barrier.get_mutex()); while (active && static_region.timestamp == static_shared_region->timestamp && updatable_region.timestamp == updatable_shared_region->timestamp) { barrier.wait(current_region_lock); } if (!active) break; if (static_region.timestamp != static_shared_region->timestamp) { static_region = *static_shared_region; } if (updatable_region.timestamp != updatable_shared_region->timestamp) { updatable_region = *updatable_shared_region; } util::Log() << "updated facade to regions " << (int)static_region.shm_key << " and " << (int)updatable_region.shm_key << " with timestamps " << static_region.timestamp << " and " << updatable_region.timestamp; facade_factory = DataFacadeFactory( std::make_shared( std::vector{ static_region.shm_key, updatable_region.shm_key})); } util::Log() << "DataWatchdog thread stopped"; } const std::string dataset_name; storage::SharedMonitor barrier; std::thread watcher; bool active; storage::SharedRegion static_region; storage::SharedRegion updatable_region; storage::SharedRegion *static_shared_region; storage::SharedRegion *updatable_shared_region; DataFacadeFactory facade_factory; }; } // This class monitors the shared memory region that contains the pointers to // the data and layout regions that should be used. This region is updated // once a new dataset arrives. template class FacadeT> using DataWatchdog = detail::DataWatchdogImpl>; } } #endif