Skip to content

Add bot.send("text") #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
# micropython-utelegram

This library provides a **microPython** interface for for a subset of the **Telegram Bot API**. Have been tested on an **ESP32** but should work just fine on an **ESP8266**
This library provides a **microPython** interface for for a subset of the **Telegram Bot API**. Have been tested on an **ESP32**.

Note that this module can't work on ESP8266 because axTLS version used currently by uPy doesn't support ciphersuites of telegram bot api
<br />(see: https://docs.micropython.org/en/latest/esp8266/general.html?highlight=certificate#ssl-tls-limitations and https://forum.micropython.org/viewtopic.php?t=3246).

## Your first bot

On the demo folder you will find an example bot.
You will find an example bot.

First you'll need to create a new bot using the **BotFather** to get a token for your bot. Once you have it rename the **config.py-demo** and set the variables (WiFI SID/password and your bot token):
First you'll need to edit the config dictionary on the main.py file, or create a new file config.json with the needed data about your wifi connection and the token of your bot.

```python
wifi_config = {
'ssid':'DEMO',
'password':'PASSW0RD'
}

utelegram_config = {
config = {
'ssid': 'DEMO',
'password': 'PASSW0RD',
'token': 'TOKEN'
}
```

If you have your **ESP32** connected as **/dev/ttyUSB0** you can use the upload.sh script to upload the bot code to your **micropython enabled ESP32**:

```bash
./upload.sh
```config.json
{
"ssid": "DEMO",
"password": "PASSW0RD",
"token": "TOKEN"
}
```

### Example bot code

#### Initialize bot
Expand Down Expand Up @@ -105,13 +105,13 @@ bot.send(message['message']['chat']['id'], 'pong')
We can either let the bot loop but itself to reply to messages:

```python
bot.listen()
bot.listen_blocking()
```

Or we can loop manually using the **read_once()** function:
Or we can loop manually using the **update()** function:

```python
bot.read_once()
bot.update()
```

Using this method you should add a sleep between each time we poll the Telegram API
Expand Down
8 changes: 0 additions & 8 deletions demo/config.py-demo

This file was deleted.

34 changes: 0 additions & 34 deletions demo/main.py

This file was deleted.

13 changes: 0 additions & 13 deletions demo/upload.sh

This file was deleted.

1 change: 0 additions & 1 deletion demo/utelegram.py

This file was deleted.

47 changes: 47 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import utelegram
import network
import utime
import json

def demo():
config = {
'ssid': 'DEMO',
'password': 'PASSW0RD',
'token': 'TOKEN',
}
try:
with open('config.json', 'r') as f:
config = json.loads(f.read())
except:
pass
print('Config: {}'.format(config))
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.scan()
sta_if.connect(config['ssid'], config['password'])

print('WAITING FOR NETWORK')
while not sta_if.isconnected():
utime.sleep(1)

def get_message(message):
bot.send(message['message']['chat']['id'], message['message']['text'].upper())

def reply_ping(message):
print(message)
bot.send(message['message']['chat']['id'], 'pong')

if sta_if.isconnected():
bot = utelegram.ubot(config['token'])
bot.register('/ping', reply_ping)
bot.set_default_handler(get_message)

print('BOT LISTENING')
while True:
bot.update()
utime.sleep(3)
else:
print('NOT CONNECTED - aborting')

if __name__ == "__main__":
demo()
60 changes: 36 additions & 24 deletions utelegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,87 @@
import ujson
import urequests


class ubot:

def __init__(self, token, offset=0):
class DEBUG_LEVELS:
DISABLED = 0
INFO = 1
VERBOSE = 2
DEBUG = 3
def __init__(self, token, offset=-1, debuglevel=DEBUG_LEVELS.DEBUG):
self.DEBUG_LEVEL = debuglevel
self.url = 'https://api.telegram.org/bot' + token
self.commands = {}
self.default_handler = None
self.message_offset = offset
self.message_offset = offset-1
self.sleep_btw_updates = 3

messages = self.read_messages()
if messages:
if self.message_offset==0:
self.message_offset = messages[-1]['update_id']
else:
for message in messages:
if message['update_id'] >= self.message_offset:
self.message_offset = message['update_id']
break

def myprint(self, level, txt):
if level <= self.DEBUG_LEVEL:
print(txt)

def send(self, chat_id, text):
data = {'chat_id': chat_id, 'text': text}
try:
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
response = urequests.post(self.url + '/sendMessage', json=data, headers=headers)
response.close()
self.myprint(self.DEBUG_LEVELS.INFO, 'Send: {}'.format(data))
return True
except:
self.myprint(self.DEBUG_LEVELS.INFO, 'Send failed! Data: {}'.format(data))
return False

def read_messages(self):
result = []
self.query_updates = {
'offset': self.message_offset,
query_updates = {
'offset': self.message_offset + 1,
'limit': 1,
'timeout': 30,
'timeout': 5,
'allowed_updates': ['message']}

header = {'Content-Type': 'application/json'}
try:
update_messages = urequests.post(self.url + '/getUpdates', json=self.query_updates).json()
self.myprint(self.DEBUG_LEVELS.DEBUG, 'Request new messages: {}'.format(query_updates))
update_messages = urequests.post(self.url + '/getUpdates', json=query_updates, headers=header).json()
if 'result' in update_messages:
for item in update_messages['result']:
self.myprint(self.DEBUG_LEVELS.INFO, 'Received message: {}'.format(item))
result.append(item)
return result
except (ValueError):
self.myprint(self.DEBUG_LEVELS.INFO, 'Received message ValueError.')
return None
except (OSError):
self.myprint(self.DEBUG_LEVELS.INFO, 'Received message OSError')
print("OSError: request timed out")
return None

def listen(self):
def listen_blocking(self):
while True:
self.myprint(self.DEBUG_LEVELS.VERBOSE, 'Check for updates.')
self.read_once()
time.sleep(self.sleep_btw_updates)
gc.collect()

def update(self):
self.myprint(self.DEBUG_LEVELS.VERBOSE, 'Check for updates.')
self.read_once()
gc.collect()

def read_once(self):
messages = self.read_messages()
if messages:
if self.message_offset==0:
if self.message_offset == 0:
self.message_offset = messages[-1]['update_id']
self.myprint(self.DEBUG_LEVELS.DEBUG, 'New message_offset: {}'.format(self.message_offset))
self.message_handler(messages[-1])
else:
for message in messages:
if message['update_id'] >= self.message_offset:
if message['update_id'] > self.message_offset:
self.message_offset = message['update_id']
self.myprint(self.DEBUG_LEVELS.DEBUG, 'New message_offset: {}'.format(self.message_offset))
self.message_handler(message)
break
#break

def register(self, command, handler):
self.commands[command] = handler

Expand All @@ -83,6 +94,7 @@ def set_sleep_btw_updates(self, sleep_time):
self.sleep_btw_updates = sleep_time

def message_handler(self, message):
self.myprint(self.DEBUG_LEVELS.DEBUG, 'Handle message: {}'.format(message))
if 'text' in message['message']:
parts = message['message']['text'].split(' ')
if parts[0] in self.commands:
Expand Down