Streaming from ActionCable Channels
LiveCable components can subscribe to ActionCable channels using the stream_from method. This allows components to react to real-time broadcasts from anywhere in your application, making it easy to build collaborative features like chat rooms, live notifications, or shared dashboards.
Basic Usage
Call stream_from in a connection callback to subscribe to a channel:
module Live
module Chat
class ChatRoom < LiveCable::Component
reactive :messages, -> { [] }, shared: true
after_connect :subscribe_to_chat
private
def subscribe_to_chat
stream_from("chat_messages", coder: ActiveSupport::JSON) do |data|
messages << data
end
end
end
end
endCallback Usage
Use after_connect to set up streams. The connect callback only fires once per component lifecycle, so streams won't be recreated on reconnections.
Broadcasting to Streams
Any part of your application can broadcast to the stream using ActionCable's broadcast API:
module Live
module Chat
class ChatInput < LiveCable::Component
reactive :message, -> { "" }
actions :send_message
def send_message(params)
return if params[:message].blank?
message_data = {
id: SecureRandom.uuid,
text: params[:message],
timestamp: Time.now.to_i,
user: current_user.as_json(only: [:id, :first_name, :last_name])
}
# Broadcast to the chat stream
ActionCable.server.broadcast("chat_messages", message_data)
# Clear the input
self.message = ""
end
end
end
endHow It Works
When a broadcast is received:
- The stream callback is executed with the broadcast payload
- You can update reactive variables inside the callback
- LiveCable automatically detects the changes and broadcasts updates to all affected components
- All components sharing the same reactive variables are re-rendered
Complete Chat Example
ChatRoom Component
module Live
module Chat
class ChatRoom < LiveCable::Component
reactive :messages, -> { [] }, shared: true
reactive :typing_users, -> { [] }, shared: true
after_connect :subscribe_to_streams
private
def subscribe_to_streams
# Subscribe to chat messages
stream_from("chat_messages", coder: ActiveSupport::JSON) do |data|
messages << data
# Keep only last 100 messages
messages.shift if messages.size > 100
end
# Subscribe to typing indicators
stream_from("chat_typing", coder: ActiveSupport::JSON) do |data|
if data[:typing]
typing_users << data[:user] unless typing_users.include?(data[:user])
else
typing_users.delete(data[:user])
end
end
end
end
end
endView (app/views/live/chat/chat_room.html.erb):
<%= live_component(component) do %>
<div class="chat-room">
<div class="messages">
<% messages.each do |message| %>
<div class="message" live-key="<%= message[:id] %>">
<strong><%= message[:user][:first_name] %></strong>
<p><%= message[:text] %></p>
<small><%= Time.at(message[:timestamp]).strftime('%I:%M %p') %></small>
</div>
<% end %>
</div>
<% if typing_users.any? %>
<div class="typing-indicator">
<%= typing_users.map { |u| u[:first_name] }.join(', ') %>
<%= typing_users.size == 1 ? 'is' : 'are' %> typing...
</div>
<% end %>
</div>
<% end %>ChatInput Component
module Live
module Chat
class ChatInput < LiveCable::Component
reactive :message, -> { "" }
actions :send_message, :typing
def send_message(params)
return if message.blank?
ActionCable.server.broadcast("chat_messages", {
id: SecureRandom.uuid,
text: message,
timestamp: Time.now.to_i,
user: current_user.as_json(only: [:id, :first_name, :last_name])
})
# Clear typing indicator
ActionCable.server.broadcast("chat_typing", {
user: current_user.as_json(only: [:id, :first_name]),
typing: false
})
self.message = ""
end
def typing(params)
ActionCable.server.broadcast("chat_typing", {
user: current_user.as_json(only: [:id, :first_name]),
typing: params[:message].present?
})
end
end
end
endView (app/views/live/chat/chat_input.html.erb):
<%= live_component(component) do %>
<form data-action="submit->live#form:prevent" data-live-action-param="send_message">
<input type="text"
name="message"
value="<%= message %>"
placeholder="Type a message..."
data-action="input->live#reactive input->live#call"
data-live-action-param="typing">
<button type="submit">Send</button>
</form>
<% end %>Use Cases
Live Notifications
module Live
class NotificationCenter < LiveCable::Component
reactive :notifications, -> { [] }
after_connect :subscribe_to_notifications
private
def subscribe_to_notifications
stream_from("user_notifications_#{current_user.id}", coder: ActiveSupport::JSON) do |notification|
notifications.unshift(notification)
# Keep only last 10 notifications
notifications.pop if notifications.size > 10
end
end
end
endBroadcast notifications from anywhere:
# In a background job, controller, or model callback
ActionCable.server.broadcast(
"user_notifications_#{user.id}",
{
id: notification.id,
title: "New message",
body: "You have a new message from #{sender.name}",
timestamp: Time.now.to_i
}
)Collaborative Editing
module Live
class DocumentEditor < LiveCable::Component
reactive :document, -> { Document.find(params[:document_id]) }
reactive :active_users, -> { [] }
after_connect :subscribe_to_document
after_disconnect :leave_document
actions :update_content
def update_content(params)
document.update(content: params[:content])
ActionCable.server.broadcast("document_#{document.id}", {
type: 'content_updated',
content: params[:content],
user: current_user.as_json(only: [:id, :name])
})
end
private
def subscribe_to_document
stream_from("document_#{document.id}", coder: ActiveSupport::JSON) do |data|
case data[:type]
when 'content_updated'
# Update local state if change is from another user
if data[:user][:id] != current_user.id
document.reload
end
when 'user_joined'
active_users << data[:user] unless active_users.any? { |u| u[:id] == data[:user][:id] }
when 'user_left'
active_users.reject! { |u| u[:id] == data[:user][:id] }
end
end
# Announce presence
ActionCable.server.broadcast("document_#{document.id}", {
type: 'user_joined',
user: current_user.as_json(only: [:id, :name, :avatar_url])
})
end
def leave_document
ActionCable.server.broadcast("document_#{document.id}", {
type: 'user_left',
user: current_user.as_json(only: [:id])
})
end
end
endLive Dashboards
module Live
class Dashboard < LiveCable::Component
reactive :metrics, -> { {} }
reactive :alerts, -> { [] }
after_connect :subscribe_to_metrics
private
def subscribe_to_metrics
stream_from("dashboard_metrics", coder: ActiveSupport::JSON) do |data|
metrics.merge!(data[:metrics])
if data[:alert]
alerts.unshift(data[:alert])
alerts.pop if alerts.size > 5
end
end
end
end
endUpdate from background jobs:
class MetricsUpdateJob < ApplicationJob
def perform
metrics = {
active_users: User.active.count,
revenue: Order.today.sum(:total),
pending_orders: Order.pending.count
}
ActionCable.server.broadcast("dashboard_metrics", {
metrics: metrics,
timestamp: Time.now.to_i
})
end
endKey Features
- Automatic re-rendering: Changes to reactive variables inside stream callbacks trigger re-renders
- Shared state: Combine with
shared: truereactive variables to sync state across multiple component instances - Connection-scoped: Each user's component instances receive broadcasts independently
- Coder support: Use
coder: ActiveSupport::JSONto automatically decode JSON payloads - Multiple streams: Components can subscribe to multiple streams simultaneously
Best Practices
Do
✅ Use descriptive channel names that indicate the data type
✅ Include relevant identifiers in channel names (user_id, document_id, etc.)
✅ Use JSON coder for structured data
✅ Clean up or limit collection sizes to prevent memory bloat
✅ Use live-key attributes for list items to preserve identity
Don't
❌ Don't broadcast sensitive data without authorization checks
❌ Don't subscribe to broad channels that send unnecessary updates
❌ Don't perform expensive operations inside stream callbacks
❌ Don't forget to unsubscribe or clean up when needed
❌ Don't broadcast the same data to all components if only some need it