Here is my custom Pluto TV script for TVHeadend.
It requires Streamlink and makes use of TVHeadend's useful pipe options.
It queries the Pluto TV API directly (so you need to have access to Pluto TV for this to work).
Copy the attached script to a location of your choice (accessible by TVH).
Enable the socket EPG grabber (External: XMLTV) and make a note of its “path”.
Edit the “SOCKET” path inside the script to match the path of the grabber.
Create an automatic IPTV network.
Set the URL to the location of the script, e.g.: pipe:///config/scripts/plutotv.py
Set “Maximum timeout (seconds)” to a high number, I use “120”.
I always disable “Scan after creation” and set “Service ID” to “1” on IPTV networks.
Whenever the network is refreshed by TVHeadend (default: every 60 minutes), the script will also push the EPG data to the socket grabber.
When the adverts start, the stream will freeze, but because of the high timeout we set on the network, the stream will stay alive.
Once the adverts end, the stream will continue:
spawn: Executing "/usr/bin/streamlink"
spawn: [cli][info] Found matching plugin pluto for URL https://pluto.tv/live-tv/64ff1d3d3a0d7ddsf8b110e9
spawn: [cli][info] Available streams: 240p (worst), 360p, 480p_alt, 480p, 720p (best)
spawn: [cli][info] Opening stream: 720p (hls-pluto)
This is where the stream will freeze:
spawn: [stream.hls][info] Filtering out segments and pausing stream output
And resume here:
spawn: [stream.hls][info] Resuming stream output
This script works without modification inside the Linuxserver.io TVH Docker image.
Script:
#!/usr/bin/python
import io
import socket
import sys
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone

import requests
# CONFIG #############################################################################################
SOCKET = "/config/epggrab/xmltv.sock"  # Path to your UNIX socket
######################################################################################################

API_URL = "https://api.pluto.tv/v2/channels"
EPG_HOURS = 8  # Length of the guide window requested from the API
REQUEST_TIMEOUT = 30  # Seconds; stops a stalled API call from hanging the refresh


def guide_window(now=None):
    """Return the ``(begin, end)`` query strings for the guide window.

    The window begins at the most recent half-hour boundary at or before
    *now* and lasts ``EPG_HOURS`` hours. *now* defaults to the current UTC
    time; passing it explicitly makes the function deterministic.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    begin = now - timedelta(
        minutes=now.minute % 30, seconds=now.second, microseconds=now.microsecond
    )
    end = begin + timedelta(hours=EPG_HOURS)
    stamp = "%Y-%m-%dT%H:%M:%S-00:00"
    return begin.strftime(stamp), end.strftime(stamp)


def to_xmltv_time(timestamp):
    """Convert an ISO 8601 timestamp string to ``YYYYmmddHHMMSS +0000``.

    The value is converted to UTC before formatting so the hard-coded
    ``+0000`` suffix is always truthful. Timestamps without an offset are
    taken to be UTC already.
    """
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if timestamp.endswith("Z"):
        timestamp = timestamp[:-1] + "+00:00"
    moment = datetime.fromisoformat(timestamp)
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    else:
        moment = moment.astimezone(timezone.utc)
    return moment.strftime("%Y%m%d%H%M%S +0000")


def add_text_element(parent, tag, text):
    """Append ``<tag>text</tag>`` to *parent*, skipping empty values."""
    if text:
        ET.SubElement(parent, tag).text = str(text)


def build_outputs(data):
    """Build the M3U playlist and the XMLTV tree from the channel list.

    *data* is the decoded JSON list of channel dicts. Returns ``(m3u, root)``
    where *m3u* is the playlist text and *root* is the ``<tv>`` element.
    """
    playlist = ["#EXTM3U"]
    root = ET.Element("tv")
    programmes = []

    for channel in data:
        raw_id = channel["_id"]
        channel_id = "pluto-" + raw_id
        name = channel["name"]
        number = channel["number"]
        logo = channel.get("colorLogoPNG", {}).get("path", "")

        channel_element = ET.SubElement(root, "channel", id=channel_id)
        ET.SubElement(channel_element, "display-name").text = name
        ET.SubElement(channel_element, "channel-number").text = str(number)
        if logo:
            # The XMLTV DTD defines <icon> as an empty element whose URL lives
            # in the "src" attribute, not in the element text.
            ET.SubElement(channel_element, "icon", src=logo)

        playlist.append(
            f'#EXTINF:-1 tvg-ID="{channel_id}" tvg-chno="{number}" '
            f'tvg-name="{name}" tvg-logo="{logo}", {name}'
        )
        playlist.append(
            f"pipe://streamlink 'https://pluto.tv/live-tv/{raw_id}' best --stdout"
        )

        for programme in channel.get("timelines") or []:
            programmes.append(
                {
                    "start": programme["start"],
                    "stop": programme["stop"],
                    "channel": channel_id,
                    "title": programme.get("title") or "Untitled",
                    "episode": programme.get("episode") or {},
                    # Carried through so the category lookup below can find it.
                    # NOTE(review): confirm the API supplies this key.
                    "category": programme.get("category", ""),
                }
            )

    # Programmes are appended only after every channel, so all <channel>
    # elements precede all <programme> elements in the document.
    for programme in programmes:
        episode = programme["episode"]
        programme_element = ET.SubElement(
            root,
            "programme",
            start=to_xmltv_time(programme["start"]),
            stop=to_xmltv_time(programme["stop"]),
            channel=programme["channel"],
        )
        ET.SubElement(programme_element, "title").text = programme["title"]
        add_text_element(programme_element, "sub-title", episode.get("name", ""))
        add_text_element(programme_element, "desc", episode.get("description", ""))
        if "season" in episode or "number" in episode:
            # "onscreen" is the free-text numbering system of the XMLTV DTD;
            # it is stated explicitly rather than left to the DTD default.
            ET.SubElement(
                programme_element, "episode-num", system="onscreen"
            ).text = f'S{episode.get("season", 0)}E{episode.get("number", 0)}'

        seen = set()
        for category in (
            episode.get("genre", ""),
            episode.get("subGenre", ""),
            programme["category"],
        ):
            # Skip blanks and repeats (genre and subGenre can be identical).
            if category and category not in seen:
                seen.add(category)
                ET.SubElement(programme_element, "category").text = str(category)

    m3u = "\n".join(playlist) + "\n"
    return m3u, root


def serialize_guide(root):
    """Return the XMLTV tree as tab-indented UTF-8 bytes with an XML header."""
    tree = ET.ElementTree(root)
    ET.indent(tree, space="\t", level=0)
    buffer = io.BytesIO()
    tree.write(buffer, encoding="UTF-8", xml_declaration=True)
    return buffer.getvalue()


def push_epg(root, socket_path=SOCKET):
    """Send the XMLTV document to the UNIX socket at *socket_path*.

    Delivery is best effort: a socket error is reported on stderr and the
    function returns normally, so the playlist can still be printed.
    """
    payload = serialize_guide(root)
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as connection:
            connection.connect(socket_path)
            connection.sendall(payload)
    except OSError as error:
        # stdout carries the playlist, so the warning has to go to stderr.
        print(f"EPG push to {socket_path} failed: {error}", file=sys.stderr)
    # NOTE: to inspect the guide, write serialize_guide(root) to a file instead.


def main():
    """Fetch the channel list, push the EPG, then print the playlist."""
    epg_begin, epg_end = guide_window()
    response = requests.get(
        f"{API_URL}?start={epg_begin}&stop={epg_end}", timeout=REQUEST_TIMEOUT
    )
    # Fail loudly on an HTTP error instead of trying to parse an error body.
    response.raise_for_status()
    m3u, root = build_outputs(response.json())
    push_epg(root)
    print(m3u)


if __name__ == "__main__":
    main()