51 class subscription_node
64 void set_next(subscription_node* sub)
70 subscription_node* get_next()
const
78 set_next(ETL_NULLPTR);
82 void append(subscription_node* sub)
84 if (sub != ETL_NULLPTR)
86 sub->set_next(get_next());
91 subscription_node* p_next;
99 class subscription :
public subscription_node
103 friend class message_broker;
114 virtual message_id_span_t message_id_list()
const = 0;
123 subscription* next_subscription()
const
125 return static_cast<subscription*
>(get_next());
131 using etl::imessage_router::receive;
137 : imessage_router(
etl::imessage_router::MESSAGE_BROKER)
146 : imessage_router(
etl::imessage_router::MESSAGE_BROKER, successor_)
155 : imessage_router(id_)
165 : imessage_router(id_, successor_)
176 initialise_insertion_point(new_sub.get_router(), &new_sub);
182 initialise_insertion_point(&router, ETL_NULLPTR);
188 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, msg);
191 virtual void receive(etl::shared_message shared_msg) ETL_OVERRIDE
193 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, shared_msg);
197 virtual void receive(etl::message_router_id_t destination_router_id,
198 const etl::imessage& msg) ETL_OVERRIDE
207 while (sub != ETL_NULLPTR)
209 message_id_span_t message_ids = sub->message_id_list();
211 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(),
id);
213 if (itr != message_ids.end())
215 etl::imessage_router* router = sub->get_router();
217 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS ||
218 destination_router_id == router->get_message_router_id())
220 router->receive(msg);
224 sub = sub->next_subscription();
236 virtual void receive(etl::message_router_id_t destination_router_id,
237 etl::shared_message shared_msg) ETL_OVERRIDE
246 while (sub != ETL_NULLPTR)
248 message_id_span_t message_ids = sub->message_id_list();
250 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(),
id);
252 if (itr != message_ids.end())
254 etl::imessage_router* router = sub->get_router();
256 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS ||
257 destination_router_id == router->get_message_router_id())
259 router->receive(shared_msg);
263 sub = sub->next_subscription();
274 using imessage_router::accepts;
287 while (sub != ETL_NULLPTR)
289 message_id_span_t message_ids = sub->message_id_list();
291 message_id_span_t::iterator itr = etl::find(message_ids.
begin(), message_ids.
end(),
id);
293 if (itr != message_ids.
end())
297 if (router->accepts(
id))
303 sub = sub->next_subscription();
328 ETL_DEPRECATED
virtual bool is_null_router() const ETL_OVERRIDE
334 virtual bool is_producer() const ETL_OVERRIDE
340 virtual bool is_consumer() const ETL_OVERRIDE
348 return head.get_next() == ETL_NULLPTR;
354 void initialise_insertion_point(
const etl::imessage_router* p_router, etl::message_broker::subscription* p_new_sub)
356 const etl::imessage_router* p_target_router = p_router;
358 subscription_node* p_sub = head.get_next();
359 subscription_node* p_sub_previous = &head;
361 while (p_sub != ETL_NULLPTR)
364 if (
static_cast<subscription*
>(p_sub)->get_router() == p_target_router)
367 p_sub_previous->set_next(p_sub->get_next());
375 p_sub = p_sub->get_next();
376 p_sub_previous = p_sub_previous->get_next();
379 if (p_new_sub != ETL_NULLPTR)
382 p_sub_previous->append(p_new_sub);
386 subscription_node head;