Compare commits
35 Commits
experiment
...
pcb-rev1.2
| Author | SHA1 | Date | |
|---|---|---|---|
| 419d85209e | |||
| 0f4f72253c | |||
| 08fdb75297 | |||
| d28f0b1c0c | |||
| 9147bab5bb | |||
| 0820ec1fc8 | |||
| 7d3cdbabe4 | |||
| b5e3df1054 | |||
| 55718aa1ff | |||
| 981c75020f | |||
| b56e4f36b4 | |||
| 9ba6e698b9 | |||
| db136ed79a | |||
| 6a9ff9eb0a | |||
| 7327549eea | |||
| 7e532ec641 | |||
| 5013e2359d | |||
| 4512b91763 | |||
| 5891006bcd | |||
| a81952fb8a | |||
| 92f9ce3d5a | |||
| 23d5b050dc | |||
| 2a4033d3ca | |||
| 2809d3f6e7 | |||
| 8e4f2fde21 | |||
| 9357b4d243 | |||
| 231172f794 | |||
| 5625f43f81 | |||
| 902ce980af | |||
| 834e07966a | |||
| e8beb4c8f7 | |||
| fef9e690cd | |||
| 4507275a02 | |||
| 16d5180d34 | |||
| 69e119a8a0 |
43
DEVELOP.md
43
DEVELOP.md
@@ -9,3 +9,46 @@ python -m venv test-venv
|
||||
pip install -r tests/requirements.txt
|
||||
pip install -U micropython-rp2-pico_w-stubs --target typings
|
||||
```
|
||||
|
||||
|
||||
## 'database' schema for btree db
|
||||
|
||||
### Playlist storage
|
||||
|
||||
The playlist data is stored in the btree database in a hierarchical schema. The hierarchy levels are
|
||||
separated by the '/' character. Currently, the schema is as follows: The top level for a playlist
|
||||
is the 'tag' id encoded as a hexadecimal string. Beneath this, the 'playlist' key contains the
|
||||
elements in the playlist. The keys used for the playlist entries must be decimal integers,
|
||||
left-padded with zeros so their length is 5 (e.g. format `{:05}`).
|
||||
|
||||
#### Playlist modes
|
||||
|
||||
The 'playlistshuffle' key located under the 'tag' key can be 'no' or 'yes' and specifies whether the
|
||||
playlist is in shuffle mode. Should this key be absent the default value is 'no'.
|
||||
|
||||
The 'playlistpersist' key located under the 'tag' key can be 'no', 'track' or 'offset'. Should this
|
||||
key be absent the default value is 'track'.
|
||||
|
||||
* When it is 'no', the playlist position is not saved when playback stops. If shuffle mode is
|
||||
active, the shuffle random seed is also not saved.
|
||||
* When it is 'track', the currently playing track is saved when playback stops. If shuffle mode is
|
||||
active, the shuffle random seed is also saved. Should playback reach the last track (in shuffle
|
||||
mode: the last track in the permutated order), the saved position is reset and playback is
|
||||
stopped. The next time the playlist is started it will start from the first track and with a new
|
||||
shuffle seed if applicable.
|
||||
* When it is 'offset', the operation is basically the same as in 'track' mode. The difference is
|
||||
that the offset in the currently playing track is also saved and playback will resume at that
|
||||
position.
|
||||
|
||||
The 'playlistpos' key located under the 'tag' key stores the key of the current playlist
|
||||
entry. The 'playlistshuffleseed' key stores the random seed used to shuffle the playlist.
|
||||
The 'playlistposoffset' key stores the offset in the current playlist entry.
|
||||
|
||||
#### Example
|
||||
|
||||
For example, a playlist with two entries 'a.mp3' and 'b.mp3' for a tag with the id '00aa11bb22'
|
||||
would be stored in the following key/value pairs in the btree db:
|
||||
|
||||
* 00aa11bb22/playlist/00000: a.mp3
|
||||
* 00aa11bb22/playlist/00001: b.mp3
|
||||
* 00aa11bb22/playlistpos: 00000
|
||||
|
||||
@@ -5,18 +5,18 @@
|
||||
(condition "A.Type == 'pad' && (B.Type == 'text' || B.Type == 'graphic')"))
|
||||
|
||||
(rule "drill hole size (mechanical)"
|
||||
(constraint hole_size (min 0.3mm) (max 6.3mm)))
|
||||
(constraint hole_size (min 0.2mm) (max 6.3mm)))
|
||||
|
||||
(rule "Minimum Via Hole Size"
|
||||
(constraint hole_size (min 0.3mm))
|
||||
(constraint hole_size (min 0.2mm))
|
||||
(condition "A.Type == 'via'"))
|
||||
|
||||
(rule "Minimum Via Diameter"
|
||||
(constraint via_diameter (min 20mil))
|
||||
(constraint via_diameter (min 0.35mm))
|
||||
(condition "A.Type == 'via'"))
|
||||
|
||||
(rule "PTH Hole Size"
|
||||
(constraint hole_size (min 12mil) (max 6.35mm))
|
||||
(constraint hole_size (min 0.2mm) (max 6.35mm))
|
||||
(condition "A.isPlated()"))
|
||||
|
||||
(rule "Minimum Non-plated Hole Size"
|
||||
@@ -24,5 +24,5 @@
|
||||
(condition "A.Type == 'pad' && !A.isPlated()"))
|
||||
|
||||
(rule "Pad to Track clearance"
|
||||
(constraint clearance (min 0.2mm))
|
||||
(condition "A.isPlated() && A.Type != 'Via' && B.Type == 'track'"))
|
||||
(constraint clearance (min 0.1mm))
|
||||
(condition "A.isPlated() && A.Type != 'Via' && B.Type == 'track'"))
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"board": {
|
||||
"active_layer": 5,
|
||||
"active_layer": 0,
|
||||
"active_layer_preset": "",
|
||||
"auto_track_width": true,
|
||||
"hidden_netclasses": [],
|
||||
@@ -18,17 +18,17 @@
|
||||
"zones": 0.6
|
||||
},
|
||||
"selection_filter": {
|
||||
"dimensions": true,
|
||||
"footprints": true,
|
||||
"dimensions": false,
|
||||
"footprints": false,
|
||||
"graphics": true,
|
||||
"keepouts": true,
|
||||
"keepouts": false,
|
||||
"lockedItems": false,
|
||||
"otherItems": true,
|
||||
"pads": true,
|
||||
"otherItems": false,
|
||||
"pads": false,
|
||||
"text": true,
|
||||
"tracks": true,
|
||||
"vias": true,
|
||||
"zones": true
|
||||
"zones": false
|
||||
},
|
||||
"visible_items": [
|
||||
"vias",
|
||||
@@ -51,7 +51,7 @@
|
||||
"conflict_shadows",
|
||||
"shapes"
|
||||
],
|
||||
"visible_layers": "00000000_00000000_0fffffff_ffffffff",
|
||||
"visible_layers": "00000000_00000000_0ffffff7_ffffffff",
|
||||
"zone_display_mode": 0
|
||||
},
|
||||
"git": {
|
||||
|
||||
@@ -37,9 +37,9 @@
|
||||
"other_text_thickness": 0.15,
|
||||
"other_text_upright": false,
|
||||
"pads": {
|
||||
"drill": 3.2,
|
||||
"height": 3.2,
|
||||
"width": 3.2
|
||||
"drill": 0.8,
|
||||
"height": 1.6,
|
||||
"width": 1.6
|
||||
},
|
||||
"silk_line_width": 0.1,
|
||||
"silk_text_italic": false,
|
||||
@@ -48,7 +48,7 @@
|
||||
"silk_text_thickness": 0.1,
|
||||
"silk_text_upright": false,
|
||||
"zones": {
|
||||
"min_clearance": 0.0
|
||||
"min_clearance": 0.4064
|
||||
}
|
||||
},
|
||||
"diff_pair_dimensions": [
|
||||
@@ -60,31 +60,11 @@
|
||||
],
|
||||
"drc_exclusions": [
|
||||
[
|
||||
"items_not_allowed|117348000|131000000|4c513f4f-0c90-4a64-b2d1-bf0189c761db|00000000-0000-0000-0000-000000000000",
|
||||
"lib_footprint_mismatch|148587019|59126370|4f17c230-1bc1-45e3-8816-9cf9d13e3a5c|00000000-0000-0000-0000-000000000000",
|
||||
""
|
||||
],
|
||||
[
|
||||
"items_not_allowed|117348000|131000000|fea7aba8-73c4-4553-8cc8-83472c47a83b|00000000-0000-0000-0000-000000000000",
|
||||
""
|
||||
],
|
||||
[
|
||||
"items_not_allowed|122420000|136975000|8ffc6a0d-806e-4b99-922f-2baead2bd98b|00000000-0000-0000-0000-000000000000",
|
||||
""
|
||||
],
|
||||
[
|
||||
"items_not_allowed|148270000|136975000|804b9d74-91fa-4f75-bc54-8f55b801562f|00000000-0000-0000-0000-000000000000",
|
||||
""
|
||||
],
|
||||
[
|
||||
"items_not_allowed|148720706|136551802|f2c1e20e-a272-42d8-a551-3e9074b96921|00000000-0000-0000-0000-000000000000",
|
||||
""
|
||||
],
|
||||
[
|
||||
"nonmirrored_text_on_back_layer|210811000|131073000|e87ca627-5327-4f7e-ac1a-c1eadd662c0a|00000000-0000-0000-0000-000000000000",
|
||||
""
|
||||
],
|
||||
[
|
||||
"silk_overlap|115550000|131000000|cd433b4c-40ac-48cc-b03e-5acf3357d88b|fea7aba8-73c4-4553-8cc8-83472c47a83b",
|
||||
"nonmirrored_text_on_back_layer|213233000|81790000|e87ca627-5327-4f7e-ac1a-c1eadd662c0a|00000000-0000-0000-0000-000000000000",
|
||||
""
|
||||
]
|
||||
],
|
||||
@@ -153,22 +133,22 @@
|
||||
},
|
||||
"rules": {
|
||||
"max_error": 0.005,
|
||||
"min_clearance": 0.2032,
|
||||
"min_clearance": 0.15,
|
||||
"min_connection": 0.0,
|
||||
"min_copper_edge_clearance": 0.2,
|
||||
"min_groove_width": 0.0,
|
||||
"min_hole_clearance": 0.35,
|
||||
"min_hole_to_hole": 0.45,
|
||||
"min_hole_to_hole": 0.2,
|
||||
"min_microvia_diameter": 0.2,
|
||||
"min_microvia_drill": 0.1,
|
||||
"min_resolved_spokes": 2,
|
||||
"min_silk_clearance": 0.0,
|
||||
"min_text_height": 1.0,
|
||||
"min_text_thickness": 0.15,
|
||||
"min_through_hole_diameter": 0.3,
|
||||
"min_track_width": 0.2032,
|
||||
"min_through_hole_diameter": 0.2,
|
||||
"min_track_width": 0.15,
|
||||
"min_via_annular_width": 0.15,
|
||||
"min_via_diameter": 0.5,
|
||||
"min_via_diameter": 0.35,
|
||||
"solder_mask_to_copper_clearance": 0.005,
|
||||
"use_height_for_length_calcs": true
|
||||
},
|
||||
@@ -280,7 +260,12 @@
|
||||
"equivalence_files": []
|
||||
},
|
||||
"erc": {
|
||||
"erc_exclusions": [],
|
||||
"erc_exclusions": [
|
||||
[
|
||||
"pin_to_pin|1549400|977900|f11a46f2-13fb-4d63-9b53-b032e49a375d|4b6603d6-e95b-4131-942b-96ddc8a9a606|/7226ca0f-138a-46b4-89f3-dc32dfb1f9f7|/7226ca0f-138a-46b4-89f3-dc32dfb1f9f7|/7226ca0f-138a-46b4-89f3-dc32dfb1f9f7",
|
||||
"Raspberry Pi Pico W Datasheet p. 9: If the ADC is not used or ADC performance is not critical, this pin can be connected to digital ground."
|
||||
]
|
||||
],
|
||||
"meta": {
|
||||
"version": 0
|
||||
},
|
||||
@@ -468,9 +453,9 @@
|
||||
"extra_units": "error",
|
||||
"footprint_filter": "ignore",
|
||||
"footprint_link_issues": "warning",
|
||||
"four_way_junction": "ignore",
|
||||
"four_way_junction": "warning",
|
||||
"global_label_dangling": "warning",
|
||||
"hier_label_mismatch": "error",
|
||||
"hier_label_mismatch": "warning",
|
||||
"label_dangling": "error",
|
||||
"label_multiple_wires": "warning",
|
||||
"lib_symbol_issues": "warning",
|
||||
@@ -485,14 +470,14 @@
|
||||
"no_connect_dangling": "warning",
|
||||
"pin_not_connected": "error",
|
||||
"pin_not_driven": "error",
|
||||
"pin_to_pin": "warning",
|
||||
"pin_to_pin": "error",
|
||||
"power_pin_not_driven": "error",
|
||||
"same_local_global_label": "warning",
|
||||
"similar_label_and_power": "warning",
|
||||
"similar_labels": "warning",
|
||||
"similar_power": "warning",
|
||||
"simulation_model_issue": "ignore",
|
||||
"single_global_label": "ignore",
|
||||
"single_global_label": "warning",
|
||||
"unannotated": "error",
|
||||
"unconnected_wire_endpoint": "warning",
|
||||
"undefined_netclass": "error",
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -25,3 +25,12 @@ int mp_hal_is_pin_reserved(int n);
|
||||
#define MICROPY_HW_PIN_RESERVED(i) mp_hal_is_pin_reserved(i)
|
||||
|
||||
#define MICROPY_PY_THREAD (0)
|
||||
|
||||
#define TONBERRY_POWER_EN 22
|
||||
|
||||
#define MICROPY_BOARD_STARTUP() \
|
||||
{ \
|
||||
gpio_init(TONBERRY_POWER_EN); \
|
||||
gpio_set_dir(TONBERRY_POWER_EN, true); \
|
||||
gpio_put(TONBERRY_POWER_EN, true); \
|
||||
}
|
||||
|
||||
@@ -91,19 +91,21 @@ void i2s_stop(void)
|
||||
{
|
||||
if (!i2s_context.playback_active)
|
||||
return;
|
||||
bool have_data = false;
|
||||
do {
|
||||
while (true) {
|
||||
const long flags = save_and_disable_interrupts();
|
||||
const int next_buf = (i2s_context.cur_playing + 1) % AUDIO_BUFS;
|
||||
have_data = i2s_context.has_data[next_buf];
|
||||
const bool have_data = i2s_context.has_data[next_buf];
|
||||
if (!have_data) {
|
||||
i2s_context.playback_active = false;
|
||||
shared_context.underruns = 0;
|
||||
restore_interrupts(flags);
|
||||
break;
|
||||
}
|
||||
__wfi();
|
||||
restore_interrupts(flags);
|
||||
if (have_data)
|
||||
__wfi();
|
||||
} while (have_data);
|
||||
const long flags = save_and_disable_interrupts();
|
||||
i2s_context.playback_active = false;
|
||||
shared_context.underruns = 0;
|
||||
restore_interrupts(flags);
|
||||
__nop(); // Ensure at least two instructions between enable interrupts and subsequent disable
|
||||
__nop();
|
||||
}
|
||||
// Workaround rp2040 E13
|
||||
dma_channel_set_irq1_enabled(i2s_context.dma_ch, false);
|
||||
dma_channel_abort(i2s_context.dma_ch);
|
||||
|
||||
@@ -130,57 +130,62 @@ static void sd_dump_cid [[maybe_unused]] (void)
|
||||
static bool sd_read_csd(struct sd_context *sd_context)
|
||||
{
|
||||
uint8_t buf[16];
|
||||
if (sd_cmd_read(9, 0, 16, buf)) {
|
||||
const uint8_t crc = sd_crc7(15, buf);
|
||||
const uint8_t card_crc = buf[15] >> 1;
|
||||
if (card_crc != crc) {
|
||||
printf("CRC mismatch: Got %02hhx, expected %02hhx\n", card_crc, crc);
|
||||
// Some cheap SD cards always report CRC=0, don't fail in that case
|
||||
if (card_crc != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
const unsigned csd_ver = buf[0] >> 6;
|
||||
unsigned blocksize [[maybe_unused]] = 0;
|
||||
unsigned blocks = 0;
|
||||
unsigned version [[maybe_unused]] = 0;
|
||||
switch (csd_ver) {
|
||||
case 0: {
|
||||
if (sd_context->sdhc_sdxc) {
|
||||
printf("sd_init: Got CSD v1.0 but card is SDHC/SDXC?\n");
|
||||
return false;
|
||||
}
|
||||
const unsigned read_bl_len = buf[5] & 0xf;
|
||||
if (read_bl_len < 9 || read_bl_len > 11) {
|
||||
printf("Invalid read_bl_len in CSD 1.0\n");
|
||||
return false;
|
||||
}
|
||||
blocksize = 1 << (buf[5] & 0xf);
|
||||
const unsigned c_size_mult = (buf[9] & 0x1) << 2 | (buf[10] & 0xc0) >> 6;
|
||||
const unsigned c_size = (buf[6] & 0x3) << 10 | (buf[7] << 2) | (buf[8] & 0xc0) >> 6;
|
||||
blocks = (c_size + 1) * (1 << (c_size_mult + 2));
|
||||
version = 1;
|
||||
break;
|
||||
}
|
||||
case 1: {
|
||||
blocksize = SD_SECTOR_SIZE;
|
||||
const unsigned c_size = (buf[7] & 0x3f) << 16 | buf[8] << 8 | buf[9];
|
||||
blocks = (c_size + 1) * 1024;
|
||||
version = 2;
|
||||
break;
|
||||
}
|
||||
case 2: {
|
||||
printf("sd_init: Got CSD v3.0, but SDUC does not support SPI.\n");
|
||||
if (!sd_cmd_read(9, 0, 16, buf)) {
|
||||
printf("Failed to read CSD\n");
|
||||
return false;
|
||||
}
|
||||
const uint8_t crc = sd_crc7(15, buf);
|
||||
const uint8_t card_crc = buf[15] >> 1;
|
||||
if (card_crc != crc) {
|
||||
printf("CRC mismatch: Got %02hhx, expected %02hhx\n", card_crc, crc);
|
||||
// Some cheap SD cards always report CRC=0, don't fail in that case
|
||||
if (card_crc != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
sd_context->blocks = blocks;
|
||||
sd_context->blocksize = blocksize;
|
||||
#ifdef SD_DEBUG
|
||||
printf("CSD version %u.0, blocksize %u, blocks %u, capacity %llu MiB\n", version, blocksize, blocks,
|
||||
((uint64_t)blocksize * blocks) / (1024 * 1024));
|
||||
#endif
|
||||
}
|
||||
const unsigned csd_ver = buf[0] >> 6;
|
||||
unsigned blocksize [[maybe_unused]] = 0;
|
||||
unsigned blocks = 0;
|
||||
unsigned version [[maybe_unused]] = 0;
|
||||
unsigned max_speed [[maybe_unused]] = 0;
|
||||
switch (csd_ver) {
|
||||
case 0: {
|
||||
if (sd_context->sdhc_sdxc) {
|
||||
printf("sd_init: Got CSD v1.0 but card is SDHC/SDXC?\n");
|
||||
return false;
|
||||
}
|
||||
const unsigned read_bl_len = buf[5] & 0xf;
|
||||
if (read_bl_len < 9 || read_bl_len > 11) {
|
||||
printf("Invalid read_bl_len in CSD 1.0\n");
|
||||
return false;
|
||||
}
|
||||
blocksize = 1 << (buf[5] & 0xf);
|
||||
const unsigned c_size_mult = (buf[9] & 0x1) << 2 | (buf[10] & 0xc0) >> 6;
|
||||
const unsigned c_size = (buf[6] & 0x3) << 10 | (buf[7] << 2) | (buf[8] & 0xc0) >> 6;
|
||||
blocks = (c_size + 1) * (1 << (c_size_mult + 2));
|
||||
version = 1;
|
||||
max_speed = buf[3];
|
||||
break;
|
||||
}
|
||||
case 1: {
|
||||
blocksize = SD_SECTOR_SIZE;
|
||||
const unsigned c_size = (buf[7] & 0x3f) << 16 | buf[8] << 8 | buf[9];
|
||||
blocks = (c_size + 1) * 1024;
|
||||
version = 2;
|
||||
max_speed = buf[3];
|
||||
break;
|
||||
}
|
||||
case 2: {
|
||||
printf("sd_init: Got CSD v3.0, but SDUC does not support SPI.\n");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
sd_context->blocks = blocks;
|
||||
sd_context->blocksize = blocksize;
|
||||
#ifdef SD_DEBUG
|
||||
printf("CSD version %u.0, blocksize %u, blocks %u, capacity %llu MiB, max speed %u\n", version, blocksize, blocks,
|
||||
((uint64_t)blocksize * blocks) / (1024 * 1024), max_speed);
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -191,52 +196,52 @@ bool sd_init(struct sd_context *sd_context, int mosi, int miso, int sck, int ss,
|
||||
}
|
||||
|
||||
if (!sd_early_init()) {
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
|
||||
if (!sd_check_interface_condition()) {
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
|
||||
uint32_t ocr;
|
||||
if (!sd_read_ocr(&ocr)) {
|
||||
printf("sd_init: read OCR failed\n");
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
if ((ocr & 0x00380000) != 0x00380000) {
|
||||
printf("sd_init: unsupported card voltage range\n");
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
|
||||
if (!sd_send_op_cond())
|
||||
return false;
|
||||
goto out_spi;
|
||||
|
||||
sd_spi_set_bitrate(rate);
|
||||
|
||||
if (!sd_read_ocr(&ocr)) {
|
||||
printf("sd_init: read OCR failed\n");
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
if (!(ocr & (1 << 31))) {
|
||||
printf("sd_init: card not powered up but !idle?\n");
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
sd_context->sdhc_sdxc = (ocr & (1 << 30));
|
||||
|
||||
if (!sd_read_csd(sd_context)) {
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
|
||||
if (sd_context->blocksize != SD_SECTOR_SIZE) {
|
||||
if (sd_context->blocksize != 1024 && sd_context->blocksize != 2048) {
|
||||
printf("sd_init: Unsupported block size %u\n", sd_context->blocksize);
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
// Attempt SET_BLOCKLEN command
|
||||
uint8_t resp[1];
|
||||
if (!sd_cmd(16, SD_SECTOR_SIZE, 1, resp)) {
|
||||
printf("sd_init: SET_BLOCKLEN failed\n");
|
||||
return false;
|
||||
goto out_spi;
|
||||
}
|
||||
// Successfully set blocksize to SD_SECTOR_SIZE, adjust context
|
||||
sd_context->blocks *= sd_context->blocksize / SD_SECTOR_SIZE;
|
||||
@@ -253,6 +258,10 @@ bool sd_init(struct sd_context *sd_context, int mosi, int miso, int sck, int ss,
|
||||
|
||||
sd_context->initialized = true;
|
||||
return true;
|
||||
|
||||
out_spi:
|
||||
sd_spi_deinit();
|
||||
return false;
|
||||
}
|
||||
|
||||
bool sd_deinit(struct sd_context *sd_context)
|
||||
|
||||
@@ -12,13 +12,15 @@
|
||||
#include <pico/time.h>
|
||||
#include <string.h>
|
||||
|
||||
typedef enum { DMA_READ_TOKEN, DMA_READ, DMA_IDLE } sd_dma_state;
|
||||
|
||||
struct sd_dma_context {
|
||||
uint8_t *read_buf;
|
||||
size_t len;
|
||||
uint8_t crc_buf[2];
|
||||
uint8_t read_token_buf;
|
||||
uint8_t wrdata;
|
||||
_Atomic enum { DMA_READ_TOKEN, DMA_READ, DMA_IDLE } state;
|
||||
_Atomic sd_dma_state state;
|
||||
};
|
||||
|
||||
struct sd_spi_context {
|
||||
@@ -111,8 +113,18 @@ static void __time_critical_func(sd_spi_dma_isr)(void)
|
||||
|
||||
void sd_spi_wait_complete(void)
|
||||
{
|
||||
while (sd_spi_context.sd_dma_context.state != DMA_IDLE)
|
||||
while (true) {
|
||||
const long flags = save_and_disable_interrupts();
|
||||
const sd_dma_state state = sd_spi_context.sd_dma_context.state;
|
||||
if (state == DMA_IDLE) {
|
||||
restore_interrupts(flags);
|
||||
return;
|
||||
}
|
||||
__wfi();
|
||||
restore_interrupts(flags);
|
||||
__nop(); // Ensure at least two instructions between enable interrupts and subsequent disable
|
||||
__nop();
|
||||
}
|
||||
}
|
||||
|
||||
bool sd_cmd_read_is_complete(void) { return sd_spi_context.sd_dma_context.state == DMA_IDLE; }
|
||||
@@ -208,6 +220,12 @@ bool sd_cmd_read_complete(void)
|
||||
sd_spi_wait_complete();
|
||||
gpio_put(sd_spi_context.ss, true);
|
||||
sd_spi_read_blocking(0xff, &buf, 1);
|
||||
if (sd_spi_context.sd_dma_context.read_token_buf != 0xfe) {
|
||||
#ifdef SD_DEBUG
|
||||
printf("read failed: invalid read token %02hhx\n", sd_spi_context.sd_dma_context.read_token_buf);
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
#ifdef SD_READ_CRC_CHECK
|
||||
const uint16_t expect_crc = sd_crc16(sd_spi_context.sd_dma_context.len, sd_spi_context.sd_dma_context.read_buf);
|
||||
const uint16_t act_crc = sd_spi_context.sd_dma_context.crc_buf[0] << 8 | sd_spi_context.sd_dma_context.crc_buf[1];
|
||||
@@ -218,7 +236,7 @@ bool sd_cmd_read_complete(void)
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
return (sd_spi_context.sd_dma_context.read_token_buf == 0xfe);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool sd_cmd_write(uint8_t cmd, uint32_t arg, unsigned datalen, uint8_t data[const static datalen])
|
||||
|
||||
@@ -52,10 +52,12 @@ static inline void sd_spi_pio_program_init(PIO pio, uint sm, uint offset, uint m
|
||||
sm_config_set_out_shift(&c, false, true, 8);
|
||||
sm_config_set_in_shift(&c, false, true, 8);
|
||||
|
||||
// high speed SPI needs to bypass the input synchronizers on the MISO pin
|
||||
hw_set_bits(&pio->input_sync_bypass, 1u << miso);
|
||||
|
||||
const unsigned pio_freq = bitrate*4;
|
||||
const float div = clock_get_hz(clk_sys) / (float)pio_freq;
|
||||
// for some reason, small clkdiv values (even integer ones) cause issues
|
||||
sm_config_set_clkdiv(&c, div < 2.5f ? 2.5f : div);
|
||||
sm_config_set_clkdiv(&c, div);
|
||||
pio_sm_init(pio, sm, offset, &c);
|
||||
}
|
||||
%}
|
||||
|
||||
@@ -2,24 +2,51 @@
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from collections import namedtuple
|
||||
import os
|
||||
import time
|
||||
from utils import TimerManager
|
||||
|
||||
|
||||
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons'))
|
||||
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb'))
|
||||
|
||||
# Should be ~ 6dB steps
|
||||
VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
|
||||
|
||||
|
||||
class PlayerApp:
|
||||
class TagStateMachine:
|
||||
def __init__(self, parent, timer_manager):
|
||||
self.parent = parent
|
||||
self.timer_manager = timer_manager
|
||||
self.current_tag = None
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
|
||||
def onTagChange(self, new_tag):
|
||||
if new_tag is not None:
|
||||
self.timer_manager.cancel(self.onTagRemoveDelay)
|
||||
if new_tag == self.current_tag:
|
||||
return
|
||||
# Change playlist on new tag
|
||||
if new_tag is not None:
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
self.current_tag = new_tag
|
||||
self.parent.onNewTag(new_tag)
|
||||
else:
|
||||
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
|
||||
|
||||
def onTagRemoveDelay(self):
|
||||
if self.current_tag is not None:
|
||||
self.current_tag = None
|
||||
self.parent.onTagRemoved()
|
||||
|
||||
def __init__(self, deps: Dependencies):
|
||||
self.current_tag = None
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
self.timer_manager = TimerManager()
|
||||
self.tag_state_machine = self.TagStateMachine(self, self.timer_manager)
|
||||
self.player = deps.mp3player(self)
|
||||
self.nfc = deps.nfcreader(self)
|
||||
self.nfc = deps.nfcreader(self.tag_state_machine)
|
||||
self.playlist_db = deps.playlistdb(self)
|
||||
self.tag_mode = self.playlist_db.getSetting('tagmode')
|
||||
self.playing_tag = None
|
||||
self.playlist = None
|
||||
self.buttons = deps.buttons(self) if deps.buttons is not None else None
|
||||
self.mp3file = None
|
||||
self.volume_pos = 3
|
||||
@@ -30,36 +57,29 @@ class PlayerApp:
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
|
||||
def onTagChange(self, new_tag):
|
||||
if new_tag is not None:
|
||||
self.timer_manager.cancel(self.onTagRemoveDelay)
|
||||
if new_tag == self.current_tag:
|
||||
return
|
||||
# Change playlist on new tag
|
||||
if new_tag is not None:
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
self.current_tag = new_tag
|
||||
uid_str = ''.join('{:02x}'.format(x) for x in new_tag)
|
||||
try:
|
||||
testfiles = [f'/sd/{uid_str}/'.encode() + name for name in os.listdir(f'/sd/{uid_str}'.encode())
|
||||
if name.endswith(b'mp3')]
|
||||
except OSError as ex:
|
||||
print(f'Could not get playlist for tag {uid_str}: {ex}')
|
||||
self.current_tag = None
|
||||
self.player.stop()
|
||||
return
|
||||
testfiles.sort()
|
||||
self._set_playlist(testfiles)
|
||||
else:
|
||||
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
|
||||
def onNewTag(self, new_tag):
|
||||
"""
|
||||
Callback (typically called by TagStateMachine) to signal that a new tag has been presented.
|
||||
"""
|
||||
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
|
||||
if self.tag_mode == 'tagremains' or (self.tag_mode == 'tagstartstop' and new_tag != self.playing_tag):
|
||||
self._set_playlist(uid_str)
|
||||
self.playing_tag = new_tag
|
||||
elif self.tag_mode == 'tagstartstop':
|
||||
print('Tag presented again, stopping playback')
|
||||
self._unset_playlist()
|
||||
self.playing_tag = None
|
||||
|
||||
def onTagRemoveDelay(self):
|
||||
if self.current_tag is not None:
|
||||
def onTagRemoved(self):
|
||||
"""
|
||||
Callback (typically called by TagStateMachine) to signal that a tag has been removed.
|
||||
"""
|
||||
if self.tag_mode == 'tagremains':
|
||||
print('Tag gone, stopping playback')
|
||||
self.current_tag = None
|
||||
self.player.stop()
|
||||
self._unset_playlist()
|
||||
|
||||
def onButtonPressed(self, what):
|
||||
assert self.buttons is not None
|
||||
if what == self.buttons.VOLUP:
|
||||
self.volume_pos = min(self.volume_pos + 1, len(VOLUME_CURVE) - 1)
|
||||
self.player.set_volume(VOLUME_CURVE[self.volume_pos])
|
||||
@@ -70,24 +90,39 @@ class PlayerApp:
|
||||
self._play_next()
|
||||
|
||||
def onPlaybackDone(self):
|
||||
assert self.mp3file is not None
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self._play_next()
|
||||
|
||||
def _set_playlist(self, files: list[bytes]):
|
||||
self.playlist_pos = 0
|
||||
self.playlist = files
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
def _set_playlist(self, tag: bytes):
|
||||
self._unset_playlist()
|
||||
self.playlist = self.playlist_db.getPlaylistForTag(tag)
|
||||
self._play(self.playlist.getCurrentPath() if self.playlist is not None else None,
|
||||
self.playlist.getPlaybackOffset() if self.playlist is not None else 0)
|
||||
|
||||
def _unset_playlist(self):
|
||||
if self.playlist is not None:
|
||||
pos = self.player.stop()
|
||||
if pos is not None:
|
||||
self.playlist.setPlaybackOffset(pos)
|
||||
self.playlist = None
|
||||
|
||||
def _play_next(self):
|
||||
if self.playlist_pos + 1 < len(self.playlist):
|
||||
self.playlist_pos += 1
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
if self.playlist is None:
|
||||
return
|
||||
filename = self.playlist.getNextPath()
|
||||
self._play(filename)
|
||||
if filename is None:
|
||||
self.playlist = None
|
||||
self.playing_tag = None
|
||||
|
||||
def _play(self, filename: bytes):
|
||||
def _play(self, filename: bytes | None, offset=0):
|
||||
if self.mp3file is not None:
|
||||
self.player.stop()
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self.mp3file = open(filename, 'rb')
|
||||
self.player.play(self.mp3file)
|
||||
if filename is not None:
|
||||
print(f'Playing {filename!r}')
|
||||
self.mp3file = open(filename, 'rb')
|
||||
self.player.play(self.mp3file, offset)
|
||||
|
||||
@@ -42,10 +42,8 @@ VBAT_ADC = Pin.board.GP26
|
||||
|
||||
|
||||
def board_init():
|
||||
# Keep power turned on
|
||||
# POWER_EN turned on in MICROPY_BOARD_STARTUP
|
||||
# TODO: Implement soft power off
|
||||
POWER_EN.init(mode=Pin.OUT)
|
||||
POWER_EN.value(1)
|
||||
|
||||
# SD_DO / MISO input doesn't need any special configuration
|
||||
# Set 8 mA drive strength for SCK and MOSI
|
||||
|
||||
@@ -3,8 +3,11 @@
|
||||
|
||||
import aiorepl # type: ignore
|
||||
import asyncio
|
||||
from errno import ENOENT
|
||||
import machine
|
||||
import micropython
|
||||
import network
|
||||
import os
|
||||
import time
|
||||
from math import pi, sin, pow
|
||||
|
||||
@@ -15,7 +18,7 @@ from mfrc522 import MFRC522
|
||||
from mp3player import MP3Player
|
||||
from nfc import Nfc
|
||||
from rp2_neopixel import NeoPixel
|
||||
from utils import Buttons, SDContext, TimerManager
|
||||
from utils import BTreeFileManager, Buttons, SDContext, TimerManager
|
||||
|
||||
try:
|
||||
import hwconfig
|
||||
@@ -54,36 +57,78 @@ async def rainbow(np, period=10):
|
||||
machine.mem32[0x40030000 + 0x00] = 0x10
|
||||
|
||||
|
||||
def setup_wifi():
|
||||
network.hostname("TonberryPico")
|
||||
wlan = network.WLAN(network.WLAN.IF_AP)
|
||||
wlan.config(ssid=f"TonberryPicoAP_{machine.unique_id().hex()}", security=wlan.SEC_OPEN)
|
||||
wlan.active(True)
|
||||
|
||||
|
||||
DB_PATH = '/sd/tonberry.db'
|
||||
|
||||
|
||||
def run():
|
||||
asyncio.new_event_loop()
|
||||
# Setup LEDs
|
||||
np = NeoPixel(hwconfig.LED_DIN, hwconfig.LED_COUNT, sm=1)
|
||||
asyncio.create_task(rainbow(np))
|
||||
|
||||
# Wifi with default config
|
||||
setup_wifi()
|
||||
|
||||
# Setup MP3 player
|
||||
with SDContext(mosi=hwconfig.SD_DI, miso=hwconfig.SD_DO, sck=hwconfig.SD_SCK, ss=hwconfig.SD_CS,
|
||||
baudrate=hwconfig.SD_CLOCKRATE), \
|
||||
AudioContext(hwconfig.I2S_DIN, hwconfig.I2S_DCLK, hwconfig.I2S_LRCLK) as audioctx:
|
||||
baudrate=hwconfig.SD_CLOCKRATE):
|
||||
# Temporary hack: build database from folders if no database exists
|
||||
# Can be removed once playlists can be created via API
|
||||
try:
|
||||
_ = os.stat(DB_PATH)
|
||||
except OSError as ex:
|
||||
if ex.errno == ENOENT:
|
||||
print("No playlist DB found, trying to build DB from tag dirs")
|
||||
builddb()
|
||||
|
||||
# Setup NFC
|
||||
reader = MFRC522(spi_id=hwconfig.RC522_SPIID, sck=hwconfig.RC522_SCK, miso=hwconfig.RC522_MISO,
|
||||
mosi=hwconfig.RC522_MOSI, cs=hwconfig.RC522_SS, rst=hwconfig.RC522_RST, tocard_retries=20)
|
||||
with BTreeFileManager(DB_PATH) as playlistdb, \
|
||||
AudioContext(hwconfig.I2S_DIN, hwconfig.I2S_DCLK, hwconfig.I2S_LRCLK) as audioctx:
|
||||
|
||||
# Setup app
|
||||
deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app),
|
||||
nfcreader=lambda the_app: Nfc(reader, the_app),
|
||||
buttons=lambda the_app: Buttons(the_app, pin_volup=hwconfig.BUTTON_VOLUP,
|
||||
pin_voldown=hwconfig.BUTTON_VOLDOWN,
|
||||
pin_next=hwconfig.BUTTON_NEXT))
|
||||
the_app = app.PlayerApp(deps)
|
||||
# Setup NFC
|
||||
reader = MFRC522(spi_id=hwconfig.RC522_SPIID, sck=hwconfig.RC522_SCK, miso=hwconfig.RC522_MISO,
|
||||
mosi=hwconfig.RC522_MOSI, cs=hwconfig.RC522_SS, rst=hwconfig.RC522_RST, tocard_retries=20)
|
||||
|
||||
# Start
|
||||
asyncio.create_task(aiorepl.task({'timer_manager': TimerManager(),
|
||||
'app': the_app}))
|
||||
asyncio.get_event_loop().run_forever()
|
||||
# Setup app
|
||||
deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app),
|
||||
nfcreader=lambda the_app: Nfc(reader, the_app),
|
||||
buttons=lambda the_app: Buttons(the_app, pin_volup=hwconfig.BUTTON_VOLUP,
|
||||
pin_voldown=hwconfig.BUTTON_VOLDOWN,
|
||||
pin_next=hwconfig.BUTTON_NEXT),
|
||||
playlistdb=lambda _: playlistdb)
|
||||
the_app = app.PlayerApp(deps)
|
||||
|
||||
# Start
|
||||
asyncio.create_task(aiorepl.task({'timer_manager': TimerManager(),
|
||||
'app': the_app}))
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
|
||||
def builddb():
|
||||
"""
|
||||
For testing, build a playlist db based on the previous tag directory format.
|
||||
Can be removed once uploading files / playlist via the web api is possible.
|
||||
"""
|
||||
try:
|
||||
os.unlink(DB_PATH)
|
||||
except OSError:
|
||||
pass
|
||||
with BTreeFileManager(DB_PATH) as db:
|
||||
for name, type_, _, _ in os.ilistdir(b'/sd'):
|
||||
if type_ != 0x4000:
|
||||
continue
|
||||
fl = [b'/sd/' + name + b'/' + x for x in os.listdir(b'/sd/' + name) if x.endswith(b'.mp3')]
|
||||
db.createPlaylistForTag(name, fl)
|
||||
os.sync()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if machine.Pin(17, machine.Pin.IN, machine.Pin.PULL_UP).value() != 0:
|
||||
time.sleep(1)
|
||||
time.sleep(1)
|
||||
if machine.Pin(hwconfig.BUTTON_VOLUP, machine.Pin.IN, machine.Pin.PULL_UP).value() != 0:
|
||||
run()
|
||||
|
||||
@@ -21,14 +21,19 @@ class MP3Player:
|
||||
self.mp3task = None
|
||||
self.volume = 128
|
||||
self.cb = cb
|
||||
self.pos = 0
|
||||
|
||||
def play(self, stream):
|
||||
def play(self, stream, offset=0):
|
||||
"""
|
||||
Play from byte stream.
|
||||
If offset > 0, discard the first offset bytes
|
||||
"""
|
||||
if self.mp3task is not None:
|
||||
self.mp3task.cancel()
|
||||
self.mp3task = None
|
||||
if offset > 0:
|
||||
stream.seek(offset, 1)
|
||||
self.pos = offset
|
||||
self.mp3task = asyncio.create_task(self._play_task(stream))
|
||||
|
||||
def stop(self):
|
||||
@@ -38,6 +43,8 @@ class MP3Player:
|
||||
if self.mp3task is not None:
|
||||
self.mp3task.cancel()
|
||||
self.mp3task = None
|
||||
return self.pos
|
||||
return None
|
||||
|
||||
def set_volume(self, volume: int):
|
||||
"""
|
||||
@@ -60,6 +67,7 @@ class MP3Player:
|
||||
# End of file
|
||||
break
|
||||
_, _, underruns = await self.audiocore.async_put(data[:bytes_read])
|
||||
self.pos += bytes_read
|
||||
if underruns > known_underruns:
|
||||
print(f"{underruns:x}")
|
||||
known_underruns = underruns
|
||||
|
||||
@@ -4,7 +4,8 @@
|
||||
from utils.buttons import Buttons
|
||||
from utils.mbrpartition import MBRPartition
|
||||
from utils.pinindex import get_pin_index
|
||||
from utils.playlistdb import BTreeDB, BTreeFileManager
|
||||
from utils.sdcontext import SDContext
|
||||
from utils.timer import TimerManager
|
||||
|
||||
__all__ = ["Buttons", "get_pin_index", "MBRPartition", "SDContext", "TimerManager"]
|
||||
__all__ = ["BTreeDB", "BTreeFileManager", "Buttons", "get_pin_index", "MBRPartition", "SDContext", "TimerManager"]
|
||||
|
||||
387
software/src/utils/playlistdb.py
Normal file
387
software/src/utils/playlistdb.py
Normal file
@@ -0,0 +1,387 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import btree
|
||||
import random
|
||||
import time
|
||||
try:
|
||||
import typing
|
||||
from typing import TYPE_CHECKING, Iterable # type: ignore
|
||||
except ImportError:
|
||||
TYPE_CHECKING = False
|
||||
if TYPE_CHECKING:
|
||||
class IPlaylist(typing.Protocol):
|
||||
def getPaths(self) -> Iterable[bytes]: ...
|
||||
def getCurrentPath(self) -> bytes: ...
|
||||
def getNextPath(self) -> bytes | None: ...
|
||||
|
||||
class IPlaylistDB(typing.Protocol):
|
||||
def getPlaylistForTag(self, tag: bytes) -> IPlaylist: ...
|
||||
else:
|
||||
class IPlaylistDB(object):
|
||||
...
|
||||
|
||||
class IPlaylist(object):
|
||||
...
|
||||
|
||||
|
||||
class BTreeDB(IPlaylistDB):
|
||||
SHUFFLE_NO = b'no'
|
||||
SHUFFLE_YES = b'yes'
|
||||
PERSIST_NO = b'no'
|
||||
PERSIST_TRACK = b'track'
|
||||
PERSIST_OFFSET = b'offset'
|
||||
DEFAULT_SETTINGS = {
|
||||
b'tagmode': b'tagremains'
|
||||
}
|
||||
|
||||
class Playlist(IPlaylist):
|
||||
def __init__(self, parent: "BTreeDB", tag: bytes, pos: int, persist, shuffle):
|
||||
self.parent = parent
|
||||
self.tag = tag
|
||||
self.pos = pos
|
||||
self.persist = persist
|
||||
self.shuffle = shuffle
|
||||
self.length = self.parent._getPlaylistLength(self.tag)
|
||||
self._shuffle()
|
||||
|
||||
def _getPlaylistPos(self):
|
||||
"""
|
||||
Gets the position to pass to parent._getPlaylistEntry etc.
|
||||
"""
|
||||
if self.shuffle == BTreeDB.SHUFFLE_YES:
|
||||
return self.shuffle_order[self.pos]
|
||||
else:
|
||||
return self.pos
|
||||
|
||||
def _shuffle(self, reshuffle=False):
|
||||
if self.shuffle == BTreeDB.SHUFFLE_NO:
|
||||
return
|
||||
|
||||
self.shuffle_seed = None
|
||||
# Try to get seed from DB if persisted
|
||||
if self.persist != BTreeDB.PERSIST_NO and not reshuffle:
|
||||
self.shuffle_seed = self.parent._getPlaylistShuffleSeed(self.tag)
|
||||
if self.shuffle_seed is None:
|
||||
# Either not persisted or could not read from db
|
||||
self.shuffle_seed = time.ticks_cpu()
|
||||
if self.persist != BTreeDB.PERSIST_NO:
|
||||
self.parent._setPlaylistShuffleSeed(self.tag, self.shuffle_seed)
|
||||
# TODO: Find an algorithm for shuffling that does not use O(n) memory for playlist of length n
|
||||
random.seed(self.shuffle_seed)
|
||||
entries = list(range(0, self.length))
|
||||
# We don't have random.shuffle in micropython, so emulate it with random.choice
|
||||
self.shuffle_order = []
|
||||
while len(entries) > 0:
|
||||
chosen = random.choice(entries)
|
||||
self.shuffle_order.append(chosen)
|
||||
entries.remove(chosen)
|
||||
|
||||
def getPaths(self):
|
||||
"""
|
||||
Get entire playlist in storage order
|
||||
"""
|
||||
return self.parent._getPlaylistValueIterator(self.tag)
|
||||
|
||||
def getCurrentPath(self):
|
||||
"""
|
||||
Get path of file that should be played.
|
||||
"""
|
||||
return self.parent._getPlaylistEntry(self.tag, self._getPlaylistPos())
|
||||
|
||||
def getNextPath(self):
|
||||
"""
|
||||
Select next track and return path.
|
||||
"""
|
||||
if self.pos + 1 >= self.length:
|
||||
self.pos = 0
|
||||
if self.persist != BTreeDB.PERSIST_NO:
|
||||
self.parent._setPlaylistPos(self.tag, self.pos)
|
||||
self.setPlaybackOffset(0)
|
||||
self._shuffle(True)
|
||||
return None
|
||||
|
||||
self.pos += 1
|
||||
if self.persist != BTreeDB.PERSIST_NO:
|
||||
self.parent._setPlaylistPos(self.tag, self.pos)
|
||||
self.setPlaybackOffset(0)
|
||||
return self.getCurrentPath()
|
||||
|
||||
def setPlaybackOffset(self, offset):
|
||||
"""
|
||||
Store the current position in the track for PERSIST_OFFSET mode
|
||||
"""
|
||||
if self.persist != BTreeDB.PERSIST_OFFSET:
|
||||
return
|
||||
self.parent._setPlaylistPosOffset(self.tag, offset)
|
||||
|
||||
def getPlaybackOffset(self):
|
||||
"""
|
||||
Get the current position in the track for PERSIST_OFFSET mode
|
||||
"""
|
||||
if self.persist != BTreeDB.PERSIST_OFFSET:
|
||||
return 0
|
||||
return self.parent._getPlaylistPosOffset(self.tag)
|
||||
|
||||
def __init__(self, db: btree.BTree, flush_func: typing.Callable | None = None):
|
||||
self.db = db
|
||||
self.flush_func = flush_func
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistPos(tag):
|
||||
return b''.join([tag, b'/playlistpos'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistPosOffset(tag):
|
||||
return b''.join([tag, b'/playlistposoffset'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistShuffle(tag):
|
||||
return b''.join([tag, b'/playlistshuffle'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistShuffleSeed(tag):
|
||||
return b''.join([tag, b'/playlistshuffleseed'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistPersist(tag):
|
||||
return b''.join([tag, b'/playlistpersist'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistEntry(tag, pos):
|
||||
return b''.join([tag, b'/playlist/', '{:05}'.format(pos).encode()])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistStart(tag):
|
||||
return b''.join([tag, b'/playlist/'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistStartEnd(tag):
|
||||
return (b''.join([tag, b'/playlist/']),
|
||||
b''.join([tag, b'/playlist0']))
|
||||
|
||||
def _flush(self):
|
||||
"""
|
||||
Flush the database and call the flush_func if it was provided.
|
||||
"""
|
||||
self.db.flush()
|
||||
if self.flush_func is not None:
|
||||
self.flush_func()
|
||||
|
||||
def _getPlaylistValueIterator(self, tag: bytes):
|
||||
start, end = self._keyPlaylistStartEnd(tag)
|
||||
return self.db.values(start, end)
|
||||
|
||||
def _getPlaylistEntry(self, tag: bytes, pos: int) -> bytes:
|
||||
return self.db[self._keyPlaylistEntry(tag, pos)]
|
||||
|
||||
def _setPlaylistPos(self, tag: bytes, pos: int, flush=True):
|
||||
self.db[self._keyPlaylistPos(tag)] = str(pos).encode()
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _setPlaylistPosOffset(self, tag: bytes, offset: int, flush=True):
|
||||
self.db[self._keyPlaylistPosOffset(tag)] = str(offset).encode()
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _getPlaylistPosOffset(self, tag: bytes) -> int:
|
||||
return int(self.db.get(self._keyPlaylistPosOffset(tag), b'0'))
|
||||
|
||||
def _getPlaylistShuffleSeed(self, tag: bytes) -> int | None:
|
||||
try:
|
||||
return int(self.db[self._keyPlaylistShuffleSeed(tag)])
|
||||
except (ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def _setPlaylistShuffleSeed(self, tag, seed: int, flush=True):
|
||||
self.db[self._keyPlaylistShuffleSeed(tag)] = str(seed).encode()
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _getPlaylistLength(self, tag: bytes) -> int:
|
||||
start, end = self._keyPlaylistStartEnd(tag)
|
||||
for k in self.db.keys(end, start, btree.DESC):
|
||||
# There is a bug in btreedb that causes an additional key after 'end' to be returned when iterating in
|
||||
# descending order
|
||||
# Check for this and skip it if needed
|
||||
elements = k.split(b'/')
|
||||
if len(elements) >= 2 and elements[1] == b'playlist':
|
||||
last = k
|
||||
break
|
||||
print(last)
|
||||
elements = last.split(b'/')
|
||||
if len(elements) != 3:
|
||||
raise RuntimeError("Malformed playlist key")
|
||||
return int(elements[2])+1
|
||||
|
||||
def _savePlaylist(self, tag, entries, persist, shuffle, flush=True):
|
||||
self._deletePlaylist(tag, False)
|
||||
for idx, entry in enumerate(entries):
|
||||
self.db[self._keyPlaylistEntry(tag, idx)] = entry
|
||||
self.db[self._keyPlaylistPersist(tag)] = persist
|
||||
self.db[self._keyPlaylistShuffle(tag)] = shuffle
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _deletePlaylist(self, tag, flush=True):
|
||||
start_key, end_key = self._keyPlaylistStartEnd(tag)
|
||||
for k in self.db.keys(start_key, end_key):
|
||||
try:
|
||||
del self.db[k]
|
||||
except KeyError:
|
||||
pass
|
||||
for k in (self._keyPlaylistPos(tag), self._keyPlaylistPosOffset(tag),
|
||||
self._keyPlaylistPersist(tag), self._keyPlaylistShuffle(tag),
|
||||
self._keyPlaylistShuffleSeed(tag)):
|
||||
try:
|
||||
del self.db[k]
|
||||
except KeyError:
|
||||
pass
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def getPlaylistForTag(self, tag: bytes):
|
||||
"""
|
||||
Lookup the playlist for 'tag' and return the Playlist object. Return None if no playlist exists for the given
|
||||
tag.
|
||||
"""
|
||||
persist = self.db.get(self._keyPlaylistPersist(tag), self.PERSIST_TRACK)
|
||||
pos = 0
|
||||
if persist != self.PERSIST_NO and self._keyPlaylistPos(tag) in self.db:
|
||||
try:
|
||||
pos = int(self.db[self._keyPlaylistPos(tag)])
|
||||
except ValueError:
|
||||
pass
|
||||
if self._keyPlaylistEntry(tag, 0) not in self.db:
|
||||
# Empty playlist
|
||||
return None
|
||||
if self._keyPlaylistEntry(tag, pos) not in self.db:
|
||||
pos = 0
|
||||
shuffle = self.db.get(self._keyPlaylistShuffle(tag), self.SHUFFLE_NO)
|
||||
return self.Playlist(self, tag, pos, persist, shuffle)
|
||||
|
||||
def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes], persist=PERSIST_TRACK,
|
||||
shuffle=SHUFFLE_NO):
|
||||
"""
|
||||
Create and save a playlist for 'tag' and return the Playlist object. If a playlist already existed for 'tag' it
|
||||
is overwritten.
|
||||
"""
|
||||
assert persist in (self.PERSIST_NO, self.PERSIST_TRACK, self.PERSIST_OFFSET)
|
||||
assert shuffle in (self.SHUFFLE_NO, self.SHUFFLE_YES)
|
||||
self._savePlaylist(tag, entries, persist, shuffle)
|
||||
return self.getPlaylistForTag(tag)
|
||||
|
||||
def getSetting(self, key: bytes | str) -> str:
|
||||
if key is str:
|
||||
key = key.encode()
|
||||
return self.db.get(b'settings/' + key, self.DEFAULT_SETTINGS[key]).decode()
|
||||
|
||||
def validate(self, dump=False):
|
||||
"""
|
||||
Validate the structure of the playlist database.
|
||||
"""
|
||||
result = True
|
||||
|
||||
def fail(msg):
|
||||
nonlocal result
|
||||
print(msg)
|
||||
result = False
|
||||
|
||||
last_tag = None
|
||||
last_pos = None
|
||||
for k in self.db.keys():
|
||||
fields = k.split(b'/')
|
||||
if len(fields) <= 1:
|
||||
fail(f'Malformed key {k!r}')
|
||||
continue
|
||||
if fields[0] == b'settings':
|
||||
val = self.db[k].decode()
|
||||
print(f'Setting {fields[1].decode()} = {val}')
|
||||
continue
|
||||
if last_tag != fields[0]:
|
||||
last_tag = fields[0]
|
||||
last_pos = None
|
||||
if dump:
|
||||
print(f'Tag {fields[0]}')
|
||||
if fields[1] == b'playlist':
|
||||
if len(fields) != 3:
|
||||
fail(f'Malformed playlist entry: {k!r}')
|
||||
continue
|
||||
try:
|
||||
idx = int(fields[2])
|
||||
except ValueError:
|
||||
fail(f'Malformed playlist entry: {k!r}')
|
||||
continue
|
||||
if len(fields[2]) != 5:
|
||||
fail(f'Bad index width for {last_tag} at {idx}')
|
||||
if (last_pos is not None and last_pos + 1 != idx) or \
|
||||
(last_pos is None and idx != 0):
|
||||
fail(f'Bad playlist entry sequence for {last_tag} at {idx}')
|
||||
last_pos = idx
|
||||
if dump:
|
||||
print(f'\tTrack {idx}: {self.db[k]!r}')
|
||||
elif fields[1] == b'playlistpos':
|
||||
val = self.db[k]
|
||||
try:
|
||||
idx = int(val)
|
||||
except ValueError:
|
||||
fail(f'Malformed playlist position: {val!r}')
|
||||
continue
|
||||
if 0 > idx or idx > last_pos:
|
||||
fail(f'Playlist position out of range for {last_tag}: {idx}')
|
||||
elif dump:
|
||||
print(f'\tPosition {idx}')
|
||||
elif fields[1] == b'playlistshuffle':
|
||||
val = self.db[k]
|
||||
if val not in (b'no', b'yes'):
|
||||
fail(f'Bad playlistshuffle value for {last_tag}: {val!r}')
|
||||
if dump and val == b'yes':
|
||||
print('\tShuffle')
|
||||
elif fields[1] == b'playlistpersist':
|
||||
val = self.db[k]
|
||||
if val not in (b'no', b'track', b'offset'):
|
||||
fail(f'Bad playlistpersist value for {last_tag}: {val!r}')
|
||||
elif dump:
|
||||
print(f'\tPersist: {val.decode()}')
|
||||
elif fields[1] == b'playlistshuffleseed':
|
||||
val = self.db[k]
|
||||
try:
|
||||
_ = int(val)
|
||||
except ValueError:
|
||||
fail(f' Bad playlistshuffleseed value for {last_tag}: {val!r}')
|
||||
elif fields[1] == b'playlistposoffset':
|
||||
val = self.db[k]
|
||||
try:
|
||||
_ = int(val)
|
||||
except ValueError:
|
||||
fail(f' Bad playlistposoffset value for {last_tag}: {val!r}')
|
||||
else:
|
||||
fail(f'Unknown key {k!r}')
|
||||
return result
|
||||
|
||||
|
||||
class BTreeFileManager:
|
||||
"""
|
||||
Context manager for a BTreeDB playlist db backed by a file in the filesystem.
|
||||
"""
|
||||
def __init__(self, db_path: str | bytes):
|
||||
self.db_path = db_path
|
||||
|
||||
def __enter__(self):
|
||||
try:
|
||||
self.db_file = open(self.db_path, 'r+b')
|
||||
except OSError:
|
||||
self.db_file = open(self.db_path, 'w+b')
|
||||
try:
|
||||
self.db = btree.open(self.db_file, pagesize=512, cachesize=1024)
|
||||
btdb = BTreeDB(self.db, lambda: self.db_file.flush())
|
||||
btdb.validate(True) # while testing, validate and dump DB on startup
|
||||
return btdb
|
||||
except Exception:
|
||||
self.db_file.close()
|
||||
raise
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.db.close()
|
||||
self.db_file.close()
|
||||
25
software/tests/mocks/btree.py
Normal file
25
software/tests/mocks/btree.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
class BTree:
|
||||
def close(self): ...
|
||||
|
||||
def values(self, start_key: str | bytes, end_key: str | bytes | None = None, flags=None) -> Iterable[str | bytes]:
|
||||
pass
|
||||
|
||||
def __setitem__(self, key: str | bytes, val: str | bytes): ...
|
||||
|
||||
def flush(self): ...
|
||||
|
||||
def get(self, key: str | bytes, default: str | bytes | None = None) -> str | bytes: ...
|
||||
|
||||
|
||||
def open(dbfile) -> BTree:
|
||||
pass
|
||||
|
||||
|
||||
DESC = 1
|
||||
INCL = 2
|
||||
248
software/tests/test_playerapp.py
Normal file
248
software/tests/test_playerapp.py
Normal file
@@ -0,0 +1,248 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import app
|
||||
import builtins
|
||||
import pytest # type: ignore
|
||||
import time
|
||||
import utils
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def micropythonify():
|
||||
def time_ticks_ms():
|
||||
return time.time_ns() // 1000000
|
||||
time.ticks_ms = time_ticks_ms
|
||||
yield
|
||||
del time.ticks_ms
|
||||
|
||||
|
||||
class FakeFile:
|
||||
def __init__(self, filename, mode):
|
||||
self.filename = filename
|
||||
self.mode = mode
|
||||
self.closed = False
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
|
||||
class FakeMp3Player:
|
||||
def __init__(self):
|
||||
self.volume: int | None = None
|
||||
self.track: FakeFile | None = None
|
||||
|
||||
def set_volume(self, vol: int):
|
||||
self.volume = vol
|
||||
|
||||
def play(self, track: FakeFile, offset: int):
|
||||
self.track = track
|
||||
|
||||
def stop(self):
|
||||
self.track = None
|
||||
|
||||
|
||||
class FakeTimerManager:
|
||||
def __init__(self): pass
|
||||
def cancel(self, timer): pass
|
||||
|
||||
def schedule(self, when, what):
|
||||
what()
|
||||
|
||||
|
||||
class FakeNfcReader:
|
||||
tag_callback = None
|
||||
|
||||
def __init__(self, tag_callback=None):
|
||||
FakeNfcReader.tag_callback = tag_callback
|
||||
|
||||
|
||||
class FakeButtons:
|
||||
def __init__(self): pass
|
||||
|
||||
|
||||
class FakePlaylistDb:
|
||||
class FakePlaylist:
|
||||
def __init__(self, parent, pos=0):
|
||||
self.parent = parent
|
||||
self.pos = 0
|
||||
|
||||
def getCurrentPath(self):
|
||||
return self.parent.tracklist[self.pos]
|
||||
|
||||
def getNextPath(self):
|
||||
self.pos += 1
|
||||
if self.pos >= len(self.parent.tracklist):
|
||||
return None
|
||||
return self.parent.tracklist[self.pos]
|
||||
|
||||
def getPlaybackOffset(self):
|
||||
return 0
|
||||
|
||||
def __init__(self, tracklist=[b'test/path.mp3']):
|
||||
self.tracklist = tracklist
|
||||
|
||||
def getPlaylistForTag(self, tag: bytes):
|
||||
return self.FakePlaylist(self)
|
||||
|
||||
def getSetting(self, key: bytes | str):
|
||||
if key == 'tagmode':
|
||||
return 'tagremains'
|
||||
return None
|
||||
|
||||
|
||||
def fake_open(filename, mode):
|
||||
return FakeFile(filename, mode)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def faketimermanager(monkeypatch):
|
||||
monkeypatch.setattr(utils.timer.TimerManager, '_instance', FakeTimerManager())
|
||||
|
||||
|
||||
def test_construct_app(micropythonify, faketimermanager):
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: FakePlaylistDb())
|
||||
_ = app.PlayerApp(deps)
|
||||
assert fake_mp3.volume is not None
|
||||
|
||||
|
||||
def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch):
|
||||
fake_db = FakePlaylistDb()
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda x: FakeNfcReader(x),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
assert "r" in fake_mp3.track.mode
|
||||
assert "b" in fake_mp3.track.mode
|
||||
|
||||
|
||||
def test_playlist_seq(micropythonify, faketimermanager, monkeypatch):
|
||||
fake_db = FakePlaylistDb([b'track1.mp3', b'track2.mp3', b'track3.mp3'])
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda x: FakeNfcReader(x),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
dut = app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track1.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track2.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track3.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is None
|
||||
|
||||
|
||||
def test_playlist_unknown_tag(micropythonify, faketimermanager, monkeypatch):
|
||||
class FakeNoPlaylistDb:
|
||||
def getPlaylistForTag(self, tag):
|
||||
return None
|
||||
|
||||
def getSetting(self, key: bytes | str):
|
||||
return None
|
||||
|
||||
fake_db = FakeNoPlaylistDb()
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda x: FakeNfcReader(x),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is None
|
||||
|
||||
|
||||
def test_tagmode_startstop(micropythonify, faketimermanager, monkeypatch):
|
||||
class MyFakePlaylistDb(FakePlaylistDb):
|
||||
def __init__(self, tracklist=[b'test/path.mp3']):
|
||||
super().__init__(tracklist)
|
||||
|
||||
def getSetting(self, key: bytes | str):
|
||||
if key == 'tagmode':
|
||||
return 'tagstartstop'
|
||||
return None
|
||||
|
||||
fake_db = MyFakePlaylistDb()
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda x: FakeNfcReader(x),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
# Present tag to start playback
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
# Removing tag should not stop playback
|
||||
FakeNfcReader.tag_callback.onTagChange(None)
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
# Presenting tag should stop playback
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is None
|
||||
# Nothing should change here
|
||||
FakeNfcReader.tag_callback.onTagChange(None)
|
||||
assert fake_mp3.track is None
|
||||
# Presenting tag again should start playback again
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
|
||||
|
||||
def test_tagmode_remains(micropythonify, faketimermanager, monkeypatch):
|
||||
class MyFakePlaylistDb(FakePlaylistDb):
|
||||
def __init__(self, tracklist=[b'test/path.mp3']):
|
||||
super().__init__(tracklist)
|
||||
|
||||
def getSetting(self, key: bytes | str):
|
||||
if key == 'tagmode':
|
||||
return 'tagremains'
|
||||
return None
|
||||
|
||||
fake_db = MyFakePlaylistDb()
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda x: FakeNfcReader(x),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
# Present tag to start playback
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
# Remove tag to stop playback
|
||||
FakeNfcReader.tag_callback.onTagChange(None)
|
||||
assert fake_mp3.track is None
|
||||
# Presenting tag again should start playback again
|
||||
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
197
software/tests/utils_test/test_btreedb.py
Normal file
197
software/tests/utils_test/test_btreedb.py
Normal file
@@ -0,0 +1,197 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import btree
|
||||
import pytest
|
||||
import time
|
||||
from utils import BTreeDB
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def micropythonify():
|
||||
def time_ticks_cpu():
|
||||
return time.time_ns()
|
||||
time.ticks_cpu = time_ticks_cpu
|
||||
yield
|
||||
del time.ticks_cpu
|
||||
|
||||
|
||||
class FakeDB:
|
||||
def __init__(self, contents):
|
||||
self.contents = contents
|
||||
self.saved_contents = dict(contents)
|
||||
|
||||
def flush(self):
|
||||
self.saved_contents = dict(self.contents)
|
||||
|
||||
def values(self, start_key=None, end_key=None, flags=None):
|
||||
res = []
|
||||
for key in sorted(self.contents):
|
||||
if start_key is not None and start_key > key:
|
||||
continue
|
||||
if end_key is not None and end_key <= key:
|
||||
break
|
||||
yield self.contents[key]
|
||||
res.append(self.contents[key])
|
||||
|
||||
def keys(self, start_key=None, end_key=None, flags=None):
|
||||
keys = []
|
||||
if flags is not None and flags & btree.DESC != 0:
|
||||
start_key, end_key = end_key, start_key
|
||||
for key in sorted(self.contents):
|
||||
if start_key is not None and start_key > key:
|
||||
continue
|
||||
if end_key is not None and end_key <= key:
|
||||
break
|
||||
keys.append(key)
|
||||
if flags is not None and flags & btree.DESC != 0:
|
||||
keys.reverse()
|
||||
return iter(keys)
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self.contents.get(key, default)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.contents[key]
|
||||
|
||||
def __setitem__(self, key, val):
|
||||
self.contents[key] = val
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.contents[key]
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.contents
|
||||
|
||||
|
||||
def test_playlist_load():
|
||||
contents = {b'foo/part': b'no',
|
||||
b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlisttt': b'no'
|
||||
}
|
||||
uut = BTreeDB(FakeDB(contents))
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert list(pl.getPaths()) == [b'track1', b'track2']
|
||||
assert pl.getCurrentPath() == b'track1'
|
||||
|
||||
|
||||
def test_playlist_nextpath():
|
||||
contents = FakeDB({b'foo/part': b'no',
|
||||
b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlisttt': b'no'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getNextPath() == b'track2'
|
||||
assert contents.saved_contents[b'foo/playlistpos'] == b'1'
|
||||
|
||||
|
||||
def test_playlist_nextpath_last():
|
||||
contents = FakeDB({b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getNextPath() is None
|
||||
assert contents.saved_contents[b'foo/playlistpos'] == b'0'
|
||||
|
||||
|
||||
def test_playlist_create():
|
||||
contents = FakeDB({b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
newplaylist = [b'never gonna give you up.mp3', b'durch den monsun.mp3']
|
||||
uut = BTreeDB(contents)
|
||||
new_pl = uut.createPlaylistForTag(b'foo', newplaylist)
|
||||
assert list(new_pl.getPaths()) == newplaylist
|
||||
assert new_pl.getCurrentPath() == newplaylist[0]
|
||||
assert uut.validate(True)
|
||||
|
||||
|
||||
def test_playlist_load_notexist():
|
||||
contents = FakeDB({b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
assert uut.getPlaylistForTag(b'notfound') is None
|
||||
|
||||
|
||||
def test_playlist_starts_at_beginning_in_persist_no_mode():
|
||||
contents = FakeDB({b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlistpersist': b'no',
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getCurrentPath() == b'track1'
|
||||
assert pl.getNextPath() == b'track2'
|
||||
del pl
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getCurrentPath() == b'track1'
|
||||
|
||||
|
||||
@pytest.mark.parametrize("mode", [b'no', b'track'])
|
||||
def test_playlist_ignores_offset_in_other_modes(mode):
|
||||
contents = FakeDB({b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlistpersist': mode,
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
pl.setPlaybackOffset(42)
|
||||
del pl
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getPlaybackOffset() == 0
|
||||
|
||||
|
||||
def test_playlist_stores_offset_in_offset_mode():
|
||||
contents = FakeDB({b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlistpersist': b'offset',
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
pl.setPlaybackOffset(42)
|
||||
del pl
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getPlaybackOffset() == 42
|
||||
|
||||
|
||||
def test_playlist_resets_offset_on_next_track():
|
||||
contents = FakeDB({b'foo/playlist/00000': b'track1',
|
||||
b'foo/playlist/00001': b'track2',
|
||||
b'foo/playlistpersist': b'offset',
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
pl.setPlaybackOffset(42)
|
||||
assert pl.getNextPath() == b'track2'
|
||||
del pl
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getCurrentPath() == b'track2'
|
||||
assert pl.getPlaybackOffset() == 0
|
||||
|
||||
|
||||
def test_playlist_shuffle():
|
||||
contents_dict = {b'foo/playlistpersist': b'track',
|
||||
b'foo/playlistshuffle': b'yes',
|
||||
}
|
||||
for i in range(256):
|
||||
contents_dict['foo/playlist/{:05}'.format(i).encode()] = 'track{}'.format(i).encode()
|
||||
contents = FakeDB(contents_dict)
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
shuffled = False
|
||||
last_idx = int(pl.getCurrentPath().removeprefix(b'track'))
|
||||
while (t := pl.getNextPath()) is not None:
|
||||
idx = int(t.removeprefix(b'track'))
|
||||
if idx != last_idx + 1:
|
||||
shuffled = True
|
||||
break
|
||||
# A false negative ratr of 1 in 256! should be good enough for this test
|
||||
assert shuffled
|
||||
@@ -10,6 +10,7 @@ include(../../lib/micropython/lib/pico-sdk/pico_sdk_init.cmake)
|
||||
project(standalone_mp3)
|
||||
|
||||
option(ENABLE_WRITE_TEST "Enable write test" OFF)
|
||||
option(ENABLE_READ_TEST "Enable read test" ON)
|
||||
option(ENABLE_PLAY_TEST "Enable mp3 playback test" OFF)
|
||||
option(ENABLE_SD_READ_CRC "Enable crc check when reading from sd card" OFF)
|
||||
option(ENABLE_SD_DEBUG "Enable debug output for sd card driver" OFF)
|
||||
@@ -37,6 +38,10 @@ if(ENABLE_WRITE_TEST)
|
||||
target_compile_definitions(standalone_mp3 PRIVATE WRITE_TEST)
|
||||
endif()
|
||||
|
||||
if(ENABLE_READ_TEST)
|
||||
target_compile_definitions(standalone_mp3 PRIVATE READ_TEST)
|
||||
endif()
|
||||
|
||||
if(ENABLE_PLAY_TEST)
|
||||
target_compile_definitions(standalone_mp3 PRIVATE PLAY_TEST)
|
||||
endif()
|
||||
|
||||
@@ -157,7 +157,7 @@ static void write_test(struct sd_context *sd_context)
|
||||
data_buffer[i] ^= 0xff;
|
||||
}
|
||||
|
||||
if(!sd_writeblock(sd_context, 0, data_buffer)) {
|
||||
if (!sd_writeblock(sd_context, 0, data_buffer)) {
|
||||
printf("sd_writeblock failed\n");
|
||||
return;
|
||||
}
|
||||
@@ -165,6 +165,20 @@ static void write_test(struct sd_context *sd_context)
|
||||
} while (data_buffer[SD_SECTOR_SIZE - 1] != 0xAA);
|
||||
}
|
||||
|
||||
static void read_test(struct sd_context *sd_context)
|
||||
{
|
||||
uint8_t data_buffer[512];
|
||||
const uint64_t before = time_us_64();
|
||||
for (int block = 0; block < 245760; ++block) {
|
||||
if (!sd_readblock(sd_context, block, data_buffer)) {
|
||||
printf("sd_readblock(%d) failed\n", block);
|
||||
return;
|
||||
}
|
||||
}
|
||||
const uint64_t elapsed = time_us_64() - before;
|
||||
printf("%llu ms elapsed, %f kB/s\n", elapsed / 1000LLU, 128 * 1024.f / (elapsed / 1000000.f));
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
stdio_init_all();
|
||||
@@ -172,10 +186,23 @@ int main()
|
||||
|
||||
struct sd_context sd_context;
|
||||
|
||||
if (!sd_init(&sd_context, 3, 4, 2, 5, 15000000)) {
|
||||
#define DRIVE_STRENGTH GPIO_DRIVE_STRENGTH_8MA
|
||||
#define SLEW_RATE GPIO_SLEW_RATE_SLOW
|
||||
|
||||
gpio_set_drive_strength(2, DRIVE_STRENGTH);
|
||||
gpio_set_slew_rate(2, SLEW_RATE);
|
||||
gpio_set_drive_strength(3, DRIVE_STRENGTH);
|
||||
gpio_set_slew_rate(3, SLEW_RATE);
|
||||
|
||||
if (!sd_init(&sd_context, 3, 4, 2, 5, 25000000)) {
|
||||
printf("sd_init failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
#ifdef READ_TEST
|
||||
read_test(&sd_context);
|
||||
#endif
|
||||
|
||||
#ifdef WRITE_TEST
|
||||
write_test(&sd_context);
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user