Skip to content

Instantly share code, notes, and snippets.

@gurland
Last active December 7, 2023 11:08
Show Gist options
  • Save gurland/ef507be68a3eedfab8b5fecbcd71021f to your computer and use it in GitHub Desktop.
Save gurland/ef507be68a3eedfab8b5fecbcd71021f to your computer and use it in GitHub Desktop.
How to delete all your messages from a Telegram chat? Easy, just use this program.
# Copyright (c) 2017 Stanislav Bobokalo & Alexey Borontov
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from time import sleep
from os import getenv
from pyrogram import Client
from pyrogram.api.functions.messages import Search
from pyrogram.api.types import InputPeerSelf, InputMessagesFilterEmpty
from pyrogram.api.types.messages import ChannelMessages
from pyrogram.errors import FloodWait, UnknownError
# Telegram API credentials: taken from the environment when set (and
# non-empty), otherwise requested interactively.
# The id is always converted to int, whichever source it came from.
API_ID = int(getenv('API_ID') or input('Enter your Telegram API id: '))
# Read the hash from its own variable (the original looked up 'API_ID' here,
# so an exported API id was silently used as the hash).
API_HASH = getenv('API_HASH') or input('Enter your Telegram API hash: ')

# Module-level client used by Cleaner and by the __main__ section below.
app = Client("client", api_id=API_ID, api_hash=API_HASH)
app.start()
class Cleaner:
    """Find and delete the current user's messages in one supergroup.

    Typical use: call ``select_supergroup()`` to choose the chat
    interactively, then ``run()`` to collect the message ids and delete them.
    All network calls go through the module-level ``app`` client.
    """

    def __init__(self, peer=None, chat_id=None):
        """Store the target chat and start with an empty id list.

        Args:
            peer: Resolved input peer of the target chat (used for searching).
            chat_id: Id of the target chat (used for deleting).
        """
        self.peer = peer
        self.chat_id = chat_id
        self.message_ids = []  # ids collected by update_ids()
        self.add_offset = 0    # search offset used by search_messages()

    def select_supergroup(self):
        """Ask the user to pick one of their supergroups and remember it.

        Returns:
            A ``(dialog, peer)`` tuple for the chosen supergroup.

        Raises:
            LookupError: If the account has no supergroup dialogs.
        """
        dialogs = app.get_dialogs()
        groups = [x for x in dialogs.dialogs if x.chat.type == 'supergroup']
        if not groups:
            raise LookupError('No supergroups found among your dialogs')

        for number, group in enumerate(groups, start=1):
            print(f'{number}. {group.chat.title}')
        print('')

        # Re-prompt until the number is in range: with plain indexing, 0 or a
        # negative number would silently select a group from the END of the
        # list, and this tool is destructive.
        while True:
            try:
                group_n = int(input('Insert group number: '))
            except ValueError:
                group_n = 0
            if 1 <= group_n <= len(groups):
                break
            print(f'Please enter a number between 1 and {len(groups)}')

        selected_group = groups[group_n - 1]
        selected_group_peer = app.resolve_peer(selected_group.chat.id)
        self.peer = selected_group_peer
        self.chat_id = selected_group.chat.id
        print(f'Selected {selected_group.chat.title}\n')
        return selected_group, selected_group_peer

    def run(self):
        """Collect the ids of all own messages in the chat, then delete them."""
        q = self.search_messages()
        self.update_ids(q)
        messages_count = q.count
        print(f'Found {messages_count} your messages in selected supergroup')

        # The first page (offset 0) is already collected; fetch the remaining
        # pages only. Stopping below messages_count avoids the trailing
        # request that would always come back empty.
        for offset in range(100, messages_count, 100):
            self.add_offset = offset
            self.update_ids(self.search_messages())

        self.delete_messages()

    @staticmethod
    def chunks(l, n):
        """Yield successive n-sized chunks from l.
        https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks#answer-312464"""
        for i in range(0, len(l), n):
            yield l[i:i + n]

    def update_ids(self, query: 'ChannelMessages'):
        """Append the ids of ``query.messages`` to ``self.message_ids``.

        Returns:
            The number of messages contained in ``query``.
        """
        self.message_ids.extend(msg.id for msg in query.messages)
        return len(query.messages)

    def delete_messages(self):
        """Delete every collected message, 100 ids per request."""
        print(f'Deleting {len(self.message_ids)} messages with next message IDs:')
        print(self.message_ids)
        for message_ids_chunk in self.chunks(self.message_ids, 100):
            # Retry the same chunk after a flood wait; previously the chunk
            # was skipped once the sleep finished, leaving messages behind.
            while True:
                try:
                    app.delete_messages(chat_id=self.chat_id, message_ids=message_ids_chunk)
                    break
                except FloodWait as flood_exception:
                    sleep(flood_exception.x)

    def search_messages(self):
        """Request one page (up to 100) of own messages at ``self.add_offset``.

        Returns:
            The raw result of the ``messages.Search`` call.
        """
        print(f'Searching messages. OFFSET: {self.add_offset}')
        return app.send(
            Search(
                peer=self.peer,
                q='',
                filter=InputMessagesFilterEmpty(),
                min_date=0,
                max_date=0,
                offset_id=0,
                add_offset=self.add_offset,
                limit=100,
                max_id=0,
                min_id=0,
                hash=0,
                from_id=InputPeerSelf()  # restrict results to the current user
            )
        )
# Script entry point: pick a supergroup, delete own messages, and always
# stop the client afterwards.
if __name__ == '__main__':
    try:
        deleter = Cleaner()
        deleter.select_supergroup()
        deleter.run()
    except UnknownError as e:
        print(f'UnknownError occured: {e}')
        print('Probably API has changed, ask developers to update this utility')
    finally:
        # Runs on success and on any exception.
        app.stop()
@mydemondsandme
Copy link

TypeError: GetDialogs.get_dialogs() got an unexpected keyword argument 'pinned_only'

@rajmondx
Copy link

rajmondx commented Apr 30, 2023

The Pyrogram library changed and now requires async/await, so I changed the code to make it work with the newest update (April 2023):

# Copyright (c) 2017 Stanislav Bobokalo & Alexey Borontov

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio

from time import sleep
from os import getenv

from pyrogram import Client
from pyrogram.enums import ChatType
from pyrogram.errors import UnknownError, FloodWait
from pyrogram.raw.functions.messages import Search
from pyrogram.raw.types import InputMessagesFilterEmpty, InputPeerSelf
from pyrogram.raw.types.messages import ChannelMessages

# from pyrogram.api.functions.messages import Search
# from pyrogram.api.types import InputPeerSelf, InputMessagesFilterEmpty
# from pyrogram.api.types.messages import ChannelMessages
# from pyrogram.errors import FloodWait, UnknownError


# Telegram API credentials: taken from the environment when set (and
# non-empty), otherwise requested interactively.
# The id is always converted to int, whichever source it came from.
API_ID = int(getenv('API_ID') or input('Enter your Telegram API id: '))
# Read the hash from its own variable (the original looked up 'API_ID' here,
# so an exported API id was silently used as the hash).
API_HASH = getenv('API_HASH') or input('Enter your Telegram API hash: ')

# Module-level client used by Cleaner and by the __main__ section below.
app = Client("client", api_id=API_ID, api_hash=API_HASH)
app.start()


class Cleaner:
    """Find and delete the current user's messages in one supergroup.

    Typical use: call ``select_supergroup()`` to choose the chat
    interactively, then await ``run()`` to collect the message ids and delete
    them. All network calls go through the module-level ``app`` client.
    """

    # Messages requested per search page and ids sent per delete request.
    PAGE_SIZE = 100

    def __init__(self, peer=None, chat_id=None):
        """Store the target chat and start with an empty id list.

        Args:
            peer: Resolved input peer of the target chat.
            chat_id: Id of the target chat (used for searching and deleting).
        """
        self.peer = peer
        self.chat_id = chat_id
        self.message_ids = []  # ids collected by search_messages_and_update_ids()
        self.add_offset = 0    # offset of the next search page

    def select_supergroup(self):
        """Ask the user to pick one of their supergroups and remember it.

        Returns:
            A ``(dialog, peer)`` tuple for the chosen supergroup.

        Raises:
            LookupError: If the account has no supergroup dialogs.
        """
        dialogs = app.get_dialogs()
        groups = [x for x in dialogs if x.chat.type == ChatType.SUPERGROUP]
        if not groups:
            raise LookupError('No supergroups found among your dialogs')

        for number, group in enumerate(groups, start=1):
            print(f'{number}. {group.chat.title}')

        print('')

        # Re-prompt until the number is in range: with plain indexing, 0 or a
        # negative number would silently select a group from the END of the
        # list, and this tool is destructive.
        while True:
            try:
                group_n = int(input('Insert group number: '))
            except ValueError:
                group_n = 0
            if 1 <= group_n <= len(groups):
                break
            print(f'Please enter a number between 1 and {len(groups)}')

        selected_group = groups[group_n - 1]

        selected_group_peer = app.resolve_peer(selected_group.chat.id)
        self.peer = selected_group_peer
        self.chat_id = selected_group.chat.id

        print(f'Selected {selected_group.chat.title}\n')

        return selected_group, selected_group_peer

    async def run(self):
        """Collect the ids of all own messages in the chat, then delete them."""
        messages_count = 0

        # Keep paging until a search returns nothing. The size of the first
        # page is NOT the total number of messages, so it cannot be used to
        # decide how many pages to fetch (that capped the run at 200 ids).
        while True:
            found = await self.search_messages_and_update_ids()
            if not found:
                break
            messages_count += found
            # Advance by what was actually returned so a short page cannot
            # cause messages to be skipped.
            self.add_offset += found

        print(f'Found {messages_count} your messages in selected supergroup')

        await self.delete_messages()

    @staticmethod
    def chunks(l, n):
        """Yield successive n-sized chunks from l.
        https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks#answer-312464"""
        for i in range(0, len(l), n):
            yield l[i:i + n]

    async def delete_messages(self):
        """Delete every collected message, ``PAGE_SIZE`` ids per request."""
        print(f'Deleting {len(self.message_ids)} messages with next message IDs:')
        print(self.message_ids)
        for message_ids_chunk in self.chunks(self.message_ids, self.PAGE_SIZE):
            # Retry the same chunk after a flood wait; previously the chunk
            # was skipped once the wait finished, leaving messages behind.
            while True:
                try:
                    await app.delete_messages(chat_id=self.chat_id, message_ids=message_ids_chunk)
                    break
                except FloodWait as flood_exception:
                    # Pyrogram v2 exposes the wait time as ``.value``; use a
                    # non-blocking sleep so the event loop keeps running.
                    await asyncio.sleep(flood_exception.value)

    async def search_messages_and_update_ids(self):
        """Fetch one page of own messages at ``self.add_offset`` and store their ids.

        Returns:
            The number of messages found on this page.
        """
        print(f'Searching messages. OFFSET: {self.add_offset}')

        found = 0
        async for message in app.search_messages(
            self.chat_id,
            query="",
            limit=self.PAGE_SIZE,
            offset=self.add_offset,
            from_user="me",  # only the current user's messages, not everyone's
        ):
            self.message_ids.append(message.id)
            found += 1

        return found


# Script entry point: pick a supergroup interactively, run the async cleanup
# to completion, and always stop the client afterwards.
if __name__ == '__main__':
    try:
        cleaner = Cleaner()
        cleaner.select_supergroup()

        # Drive the coroutine on the current event loop until it finishes.
        asyncio.get_event_loop().run_until_complete(cleaner.run())
    except UnknownError as error:
        print(f'UnknownError occured: {error}')
        print('Probably API has changed, ask developers to update this utility')
    finally:
        # Runs on success and on any exception.
        app.stop()

@qupear
Copy link

qupear commented Jul 7, 2023

The Pyrogram library changed and now requires async/await, so I changed the code to make it work with the newest update (April 2023):

After entering number of a group I get this response:

DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
Searching messages. OFFSET: 0
Found 100 your messages in selected supergroup
Searching messages. OFFSET: 100
Deleting 200 messages with next message IDs:

After listing 200 IDs app stops.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment